diff --git a/build.sh b/build.sh index b3b6edc..0c81d15 100755 --- a/build.sh +++ b/build.sh @@ -19,6 +19,10 @@ MPYC="$PWD/tools/mpy-cross"; ROOT="$PWD" [[ -x "$MPYC" ]] || { echo "error: $MPYC missing (Adafruit mpy-cross for CircuitPython 10.2.1)" >&2; exit 1; } ( cd pico-cp && "$MPYC" app.py -o "$ROOT/dist/app.mpy" ) # compile from pico-cp/ so tracebacks read "app.py" echo "precompiled dist/app.mpy ($(stat -c%s dist/app.mpy) bytes <- $(stat -c%s pico-cp/app.py) source)" +# PM_X-1 Explorer firmware uses the same mpy-cross. Output as dist/explorer-app.mpy so the Kit + Explorer +# bundles each ship their own precompiled binary; the served URLs follow the same one-target-per-file rule. +( cd pico-explorer && "$MPYC" app.py -o "$ROOT/dist/explorer-app.mpy" ) +echo "precompiled dist/explorer-app.mpy ($(stat -c%s dist/explorer-app.mpy) bytes <- $(stat -c%s pico-explorer/app.py) source)" python3 - <<'PY' import os, pathlib, re @@ -40,7 +44,7 @@ def build(name): for name in ("index.html","editor.html","editor-beta.html","player.html","teacher.html","stage.html","micro.html","showcase.html","kit.html", "embed.html", - "info-editor.html","info-player.html","info-teacher.html","info-stage.html","info-micro.html","info-showcase.html","info-kit.html"): + "info-editor.html","info-player.html","info-teacher.html","info-stage.html","info-micro.html","info-showcase.html","info-kit.html","info-explorer.html"): print("built %s (%dKB)" % (name, build(name) // 1024)) pathlib.Path("dist/embed.js").write_text(pathlib.Path("embed.js").read_text()) # loader, served as-is print("copied embed.js") @@ -55,6 +59,12 @@ pathlib.Path("dist/pico-cp-app.py").write_text(_appsrc) # served for version r # the editor pushes the PRECOMPILED .mpy (base64); serve it next to the source pathlib.Path("dist/pico-cp-app.mpy").write_bytes(pathlib.Path("dist/app.mpy").read_bytes()) print("copied pico-cp-app.py + pico-cp-app.mpy") +_xsrc = pathlib.Path("pico-explorer/app.py").read_text() # PM_X-1 Explorer firmware (sibling to the Kit) +_xbad = [(i, c) for i, c in enumerate(_xsrc) if ord(c) > 0x7F] +assert not _xbad, "pico-explorer/app.py has non-ASCII at %r -- keep it ASCII (version regex + clean source)" % (_xbad[:5],) +pathlib.Path("dist/pico-explorer-app.py").write_text(_xsrc) # editor reads APP_VERSION from here +pathlib.Path("dist/pico-explorer-app.mpy").write_bytes(pathlib.Path("dist/explorer-app.mpy").read_bytes()) +print("copied pico-explorer-app.py + pico-explorer-app.mpy") import zipfile # PM_K-1 CircuitPython drive bundle (download → unzip onto CIRCUITPY) with zipfile.ZipFile("dist/pm_k1_circuitpy.zip", "w", zipfile.ZIP_DEFLATED) as z: for f in ("code.py", "boot.py", "programs.json", "font_s.bin", "font_m.bin", "font_l.bin", @@ -63,4 +73,13 @@ with zipfile.ZipFile("dist/pm_k1_circuitpy.zip", "w", zipfile.ZIP_DEFLATED) as z z.write("dist/app.mpy", "app.mpy") # the precompiled firmware (NOT app.py - too big to compile on-device) z.write("dist/editor.html", "editor.html") # offline copy of the editor, on the drive print("zipped pm_k1_circuitpy.zip") +# PM_X-1 Explorer drive bundle (download → unzip onto CIRCUITPY on the Pimoroni Explorer with CircuitPython for Pico 2) +with zipfile.ZipFile("dist/pm_x1_circuitpy.zip", "w", zipfile.ZIP_DEFLATED) as z: + for f in ("code.py", "boot.py", "programs.json", "README.md"): + z.write("pico-explorer/" + f, f) + for f in ("font_s.bin", "font_m.bin", "font_l.bin", "logo.bin", "midi.bin", "usb.bin"): + z.write("pico-cp/" + f, f) # fonts + icons are resolution-agnostic; reuse the Kit's baked blobs + z.write("dist/explorer-app.mpy", "app.mpy") + z.write("dist/editor.html", "editor.html") +print("zipped pm_x1_circuitpy.zip") PY diff --git a/deploy.sh b/deploy.sh index f6485df..43747c2 100755 --- a/deploy.sh +++ b/deploy.sh @@ -42,7 +42,7 @@ fi echo "deployed v$BUILD -> $DEST_DIR" for f in index.html editor.html editor-beta.html player.html teacher.html stage.html micro.html showcase.html kit.html \ embed.html \ - info-editor.html info-player.html info-teacher.html info-stage.html info-micro.html info-showcase.html info-kit.html; do + info-editor.html info-player.html info-teacher.html info-stage.html info-micro.html info-showcase.html info-kit.html info-explorer.html; do sed "s|const APP_VERSION = \"[^\"]*\";|const APP_VERSION = \"$BUILD\";|" "$DIST_DIR/$f" > "$DEST_DIR/$f" echo " $f ($(stat -c '%s' "$DEST_DIR/$f") bytes)" done @@ -51,6 +51,9 @@ cp "$DIST_DIR/pico-main.py" "$DEST_DIR/pico-main.py"; echo " pico-main.py ($(st cp "$DIST_DIR/pm_k1_circuitpy.zip" "$DEST_DIR/pm_k1_circuitpy.zip"; echo " pm_k1_circuitpy.zip ($(stat -c '%s' "$DEST_DIR/pm_k1_circuitpy.zip") bytes)" # PM_K-1 CircuitPython bundle cp "$DIST_DIR/pico-cp-app.py" "$DEST_DIR/pico-cp-app.py"; echo " pico-cp-app.py ($(stat -c '%s' "$DEST_DIR/pico-cp-app.py") bytes)" # served for version reading + reference cp "$DIST_DIR/pico-cp-app.mpy" "$DEST_DIR/pico-cp-app.mpy"; echo " pico-cp-app.mpy ($(stat -c '%s' "$DEST_DIR/pico-cp-app.mpy") bytes)" # precompiled firmware the editor pushes (base64) +cp "$DIST_DIR/pm_x1_circuitpy.zip" "$DEST_DIR/pm_x1_circuitpy.zip"; echo " pm_x1_circuitpy.zip ($(stat -c '%s' "$DEST_DIR/pm_x1_circuitpy.zip") bytes)" # PM_X-1 Explorer CircuitPython bundle +cp "$DIST_DIR/pico-explorer-app.py" "$DEST_DIR/pico-explorer-app.py"; echo " pico-explorer-app.py ($(stat -c '%s' "$DEST_DIR/pico-explorer-app.py") bytes)" # served for version reading +cp "$DIST_DIR/pico-explorer-app.mpy" "$DEST_DIR/pico-explorer-app.mpy"; echo " pico-explorer-app.mpy ($(stat -c '%s' "$DEST_DIR/pico-explorer-app.mpy") bytes)" # PM_X-1 firmware (the editor pushes this when device id = X) rm -f "$DEST_DIR/player-asbuilt.html" # renamed to teacher.html rm -f "$DEST_DIR/concepts.html" # Concepts is now the landing (/) # info-*.html are first-class pages again: each form factor has a lean widget page diff --git a/docs/livesync-protocol.md b/docs/livesync-protocol.md index 6820da1..d661479 100644 --- a/docs/livesync-protocol.md +++ b/docs/livesync-protocol.md @@ -193,3 +193,20 @@ target. - Streaming the device practice log (`history.json`) up to the browser. - Mirroring device `settings.json` (LED brightness, MIDI config, etc.). - Multi‑peer / multi‑editor arbitration beyond last‑writer‑wins. + +--- + +## 7. Per‑device emit/apply matrix + +Both targets implement the **full apply path** for every verb. They differ in what +they **emit**, because on‑device editing differs: + +| Device | Emits | Applies | +|-------------|----------------------------------------------------|---------------------------------------------| +| **PM_K‑1** Kit (touchscreen + joystick) | `play` / `stop` / `bpm` / `sel` / `beat` / `lane` (FULL on structural lane edits) | all of the above | +| **PM_X‑1** Explorer (6 buttons, read‑only beats) | `play` / `stop` / `bpm` / `sel` only (no on‑device beat/lane editing) | all of the above | + +Editors don't need to special‑case the source — both DELTA streams look identical on +the wire, and the **device id is only exposed on the version query** (SysEx `0x02` +→ `0x03` reply, `;`; pre‑0.0.23 firmware sends bare version → assume +`K`). diff --git a/editor-beta.html b/editor-beta.html index c0e1ea6..c259553 100644 --- a/editor-beta.html +++ b/editor-beta.html @@ -1211,34 +1211,42 @@ async function toggleDeviceAudio() { function _queryDeviceVersion() { // ask the device its firmware version (SysEx 0x02 -> reply 0x03) return new Promise((res) => { _verCb = res; _send([0xF0, 0x7D, 0x02, 0xF7]); setTimeout(() => { if (_verCb) { _verCb = null; res(null); } }, 1500); }); } +// 0.0.23+ devices reply ";" (e.g. "K;0.0.23", "X;0.0.1"); pre-0.0.23 send bare version -> assume K. +function _parseDeviceReply(s) { + if (!s) return { id: null, version: null }; + const i = s.indexOf(";"); + return i >= 0 ? { id: s.slice(0, i), version: s.slice(i + 1) } : { id: "K", version: s }; +} +const FW_PATHS = { K: { py: "/pico-cp-app.py", mpy: "/pico-cp-app.mpy", label: "PM_K-1 Kit" }, + X: { py: "/pico-explorer-app.py", mpy: "/pico-explorer-app.mpy", label: "PM_X-1 Explorer" } }; async function updateFirmware() { // A/B firmware update over USB-MIDI: push the precompiled .mpy console.log("[fw] update start"); if (!(await _ensureMidi()) || !_midiOutputs().length) { console.log("[fw] no MIDI output (outputs:", _midiOutputs().length, ")"); - return alert("Connect the PM_K-1 (Chrome/Edge/Firefox), then try again."); + return alert("Connect the device (Chrome/Edge/Firefox), then try again."); } console.log("[fw] MIDI ok; outputs:", _midiOutputs().map((o) => o.name)); - const dev = await _queryDeviceVersion(); - console.log("[fw] device version reply:", dev); + const reply = await _queryDeviceVersion(); + const { id: devId, version: dev } = _parseDeviceReply(reply); + const paths = FW_PATHS[devId] || FW_PATHS.K; // unknown id -> assume Kit (pre-0.0.23 firmware) + console.log("[fw] device reply:", reply, "-> id:", devId || "(none)", "version:", dev, "paths:", paths.label); let latest = null, b64 = null; - // version comes from the (text) source; the payload is the precompiled .mpy bytecode — CircuitPython - // compiles a big .py at boot, which OOMs the RP2040, so we ship + push compiled bytecode instead. for (const base of ["", "https://metronome.varasys.io"]) { - try { const t = await (await fetch(base + "/pico-cp-app.py", { cache: "no-store" })).text(); + try { const t = await (await fetch(base + paths.py, { cache: "no-store" })).text(); const m = t.match(/APP_VERSION\s*=\s*["']([^"']+)["']/); if (m) latest = m[1]; } catch (_) {} - try { const r = await fetch(base + "/pico-cp-app.mpy", { cache: "no-store" }); + try { const r = await fetch(base + paths.mpy, { cache: "no-store" }); if (r.ok) { b64 = _b64(new Uint8Array(await r.arrayBuffer())); break; } } catch (_) {} } if (!b64) { // offline: let the user pick app.mpy alert("Can't reach the site.\n\nPick the firmware file (app.mpy) to flash — download it from\n" + - "metronome.varasys.io/pico-cp-app.mpy, or use the online editor at metronome.varasys.io/editor.html."); + "metronome.varasys.io" + paths.mpy + ", or use the online editor at metronome.varasys.io/editor.html."); const u8 = await _pickBinary(); if (!u8) return; b64 = _b64(u8); if (!latest) latest = "(picked .mpy)"; } if (!latest) latest = "?"; console.log("[fw] latest:", latest, "| .mpy base64 length:", b64 && b64.length); const upToDate = dev && dev === latest; - if (!confirm("Device firmware: " + (dev || "unknown") + "\nNew build: " + latest + + if (!confirm(paths.label + " firmware: " + (dev || "unknown") + "\nNew build: " + latest + (upToDate ? "\n\nSame version. Re-install anyway?" : "\n\nUpdate now? The device reboots, runs the new build, and auto-rolls-back if it fails to start."))) { console.log("[fw] confirm returned false (cancelled, OR the browser is suppressing dialogs -> reload the page)"); diff --git a/editor.html b/editor.html index 7117e86..489ff94 100644 --- a/editor.html +++ b/editor.html @@ -1204,34 +1204,42 @@ async function toggleDeviceAudio() { function _queryDeviceVersion() { // ask the device its firmware version (SysEx 0x02 -> reply 0x03) return new Promise((res) => { _verCb = res; _send([0xF0, 0x7D, 0x02, 0xF7]); setTimeout(() => { if (_verCb) { _verCb = null; res(null); } }, 1500); }); } +// 0.0.23+ devices reply ";" (e.g. "K;0.0.23", "X;0.0.1"); pre-0.0.23 send bare version -> assume K. +function _parseDeviceReply(s) { + if (!s) return { id: null, version: null }; + const i = s.indexOf(";"); + return i >= 0 ? { id: s.slice(0, i), version: s.slice(i + 1) } : { id: "K", version: s }; +} +const FW_PATHS = { K: { py: "/pico-cp-app.py", mpy: "/pico-cp-app.mpy", label: "PM_K-1 Kit" }, + X: { py: "/pico-explorer-app.py", mpy: "/pico-explorer-app.mpy", label: "PM_X-1 Explorer" } }; async function updateFirmware() { // A/B firmware update over USB-MIDI: push the precompiled .mpy console.log("[fw] update start"); if (!(await _ensureMidi()) || !_midiOutputs().length) { console.log("[fw] no MIDI output (outputs:", _midiOutputs().length, ")"); - return alert("Connect the PM_K-1 (Chrome/Edge/Firefox), then try again."); + return alert("Connect the device (Chrome/Edge/Firefox), then try again."); } console.log("[fw] MIDI ok; outputs:", _midiOutputs().map((o) => o.name)); - const dev = await _queryDeviceVersion(); - console.log("[fw] device version reply:", dev); + const reply = await _queryDeviceVersion(); + const { id: devId, version: dev } = _parseDeviceReply(reply); + const paths = FW_PATHS[devId] || FW_PATHS.K; // unknown id -> assume Kit (pre-0.0.23 firmware) + console.log("[fw] device reply:", reply, "-> id:", devId || "(none)", "version:", dev, "paths:", paths.label); let latest = null, b64 = null; - // version comes from the (text) source; the payload is the precompiled .mpy bytecode — CircuitPython - // compiles a big .py at boot, which OOMs the RP2040, so we ship + push compiled bytecode instead. for (const base of ["", "https://metronome.varasys.io"]) { - try { const t = await (await fetch(base + "/pico-cp-app.py", { cache: "no-store" })).text(); + try { const t = await (await fetch(base + paths.py, { cache: "no-store" })).text(); const m = t.match(/APP_VERSION\s*=\s*["']([^"']+)["']/); if (m) latest = m[1]; } catch (_) {} - try { const r = await fetch(base + "/pico-cp-app.mpy", { cache: "no-store" }); + try { const r = await fetch(base + paths.mpy, { cache: "no-store" }); if (r.ok) { b64 = _b64(new Uint8Array(await r.arrayBuffer())); break; } } catch (_) {} } if (!b64) { // offline: let the user pick app.mpy alert("Can't reach the site.\n\nPick the firmware file (app.mpy) to flash — download it from\n" + - "metronome.varasys.io/pico-cp-app.mpy, or use the online editor at metronome.varasys.io/editor.html."); + "metronome.varasys.io" + paths.mpy + ", or use the online editor at metronome.varasys.io/editor.html."); const u8 = await _pickBinary(); if (!u8) return; b64 = _b64(u8); if (!latest) latest = "(picked .mpy)"; } if (!latest) latest = "?"; console.log("[fw] latest:", latest, "| .mpy base64 length:", b64 && b64.length); const upToDate = dev && dev === latest; - if (!confirm("Device firmware: " + (dev || "unknown") + "\nNew build: " + latest + + if (!confirm(paths.label + " firmware: " + (dev || "unknown") + "\nNew build: " + latest + (upToDate ? "\n\nSame version. Re-install anyway?" : "\n\nUpdate now? The device reboots, runs the new build, and auto-rolls-back if it fails to start."))) { console.log("[fw] confirm returned false (cancelled, OR the browser is suppressing dialogs -> reload the page)"); diff --git a/info-explorer.html b/info-explorer.html new file mode 100644 index 0000000..12067a0 --- /dev/null +++ b/info-explorer.html @@ -0,0 +1,169 @@ + + + + + +VARASYS PM_X-1 Explorer - wiring, parts & firmware (Pimoroni Explorer / RP2350) + + + + + + + +/*@BUILD:include:src/header.html@*/ + +
+
+

PM_X-1 Explorer

+

The off-the-shelf Pimoroni Explorer Kit (RP2350, 2.8" LCD, 6 buttons, piezo) as a polymeter metronome - sibling to the PM_K-1 Kit, sharing the engine, program-string grammar, and live-sync protocol with the web editor.

+
+ +
+

What it is

+
Buildable nowRP2350 (Pico 2 class)Pimoroni Explorer PIM744~$60
+

The Pimoroni Explorer Kit (PIM744) is a finished + development board: RP2350B built in, 2.8" ST7789V 320x240 IPS LCD, 6 user buttons (A/B/C + on the left of the screen, X/Y/Z on the right), a piezo speaker, USB-C, a JST-PH battery + connector, and a mini breadboard with 6 GPIOs / 3 ADCs broken out for sensor projects. No soldering; + you flash CircuitPython and drop the firmware on. No touchscreen, no joystick, no RGB LED - + everything is driven from the 6 buttons.

+

It runs the same polymeter engine and the same program strings as the web editor. + Beat editing is done in the browser; Live sync mirrors edits to the device in real time + (HELLO/FULL/DELTA over USB-MIDI), and the device mirrors play/stop/tempo/track changes back. The + piezo clicks; a tiny on-screen dot shows run state.

+
+ +
+ Wiring - the Pimoroni Explorer fixed pinout (no breadboarding required) +
+

Everything is wired on the board; this is what the firmware reads. The display is driven by an 8-bit parallel bus initialised by CircuitPython's official board definition - we use board.DISPLAY directly.

+ + + + + + + + + + + + + + +
ComponentRP2350 pins
Display - 2.8" ST7789V, 320x240 (8-bit parallel 8080)
BL / CS / DC / WR / RD / D0-D7GP26 / GP27 / GP28 / GP30 / GP31 / GP32-GP39 (board.c)
Buttons (digital, pull-up)
A (play/stop) / B (tap tempo) / C (menu)GP16 / GP15 / GP14 (left side, top to bottom)
X (prev track) / Y (-bpm) / Z (next track)GP17 / GP18 / GP19 (right side, top to bottom)
Audio
Piezo PWMGP12
Amp enableGP13
I2C (QwSTEMMA - unused by the firmware, free for sensors)
SDA / SCLGP20 / GP21
+
+
+ +
+ Controls +
+ + + + + + + + + + + + + + + + + +
ButtonAction
Aplay / stop
Btap tempo
Cmenu (Settings / Practice log / Help / About)
Xprev track (hold to repeat)
Znext track (hold to repeat)
Ytempo -1 (hold = -5 after 1.5 s)
X + Z (chord)tempo +1 (same hold rule as Y)
In a menu
X / Zmove cursor up / down (Help: prev / next page)
Ydecrement the focused value
Acycle / increment / select
Bback (cancel)
Cclose the menu
+
+
+ +
+ Parts +
+

A finished development board, not a custom build - ballpark one-off price (USD).

+ + + + + + + +
PartQty~$
Pimoroni Explorer Kit (PIM744) - RP2350B, 2.8" ST7789V, 6 buttons, piezo + amp, USB-C160
USB-C cable - power + flashing12
Total (one-off)≈ $62
+

Reference: Pimoroni Explorer product page + · vendor code + · CircuitPython for Pimoroni Explorer (RP2350).

+
+
+ +
+ Firmware - self-contained appliance (USB drive · web-driven editing via Live sync · MIDI audio · practice log) +
+

The firmware turns the Explorer into a self-contained appliance: it mounts as a + USB drive carrying the (precompiled) firmware, your tracks and an offline copy of this editor; + drives a lanes/pads display with web-driven editing via Live sync; logs your practice to + history.json; takes new set lists pushed from the editor over USB-MIDI; and plays + out your computer's speakers over USB-MIDI. By default the firmware owns the drive (read-only to + the computer - so it can log and can't be accidentally erased); hold button A at power-on for + editor mode (drive writable).

+

+ Download CircuitPython bundle ↓ + Source + README ↗ +

+
    +
  1. Flash CircuitPython for Pimoroni Explorer (RP2350) + (download) + via BOOTSEL, unzip the bundle onto CIRCUITPY, and power-cycle. It boots into appliance mode.
  2. +
  3. Edit on the web: open the editor (beta) in Chrome / Edge / Firefox, + click 🔗 Live sync, and the Explorer mirrors your edits live (beats, tempo, track changes).
  4. +
  5. Save a set list to the device for offline use: set-list ··· menu → + 📟 Save to device. It's pushed over USB-MIDI; the device persists it to + /programs.json.
  6. +
  7. Play through your computer: click 🎹 Device audio, then press A on the device - + the full groove sounds through your speakers over USB-MIDI, in sync. A MIDI badge appears in the + header and the piezo auto-mutes.
  8. +
  9. Practice log: press CPractice log. Plays over 5 s appear (time · BPM · duration · bars).
  10. +
  11. Firmware updates: ··· menu → ⬆ Update firmware - the editor reads + the device id (X = Explorer), fetches the matching pico-explorer-app.mpy, and pushes it + over USB-MIDI. The device A/B-updates with automatic rollback if a build won't boot.
  12. +
+
+
+ +

Pairs with the touch-driven PM_K-1 Kit - same engine, same programs.json, same web editor.

+
+ +/*@BUILD:include:src/footer.html@*/ + + + + diff --git a/pico-cp/app.py b/pico-cp/app.py index bdc6f9d..bf11e5f 100644 --- a/pico-cp/app.py +++ b/pico-cp/app.py @@ -18,7 +18,8 @@ import board, busio, digitalio, analogio, pwmio, displayio, vectorio, time, json, gc, os, supervisor supervisor.runtime.autoreload = False # we write our own files (log + pushed programs); never self-restart -APP_VERSION = "0.0.22" # firmware version (the A/B updater pushes/compares this) +APP_VERSION = "0.0.23" # firmware version (the A/B updater pushes/compares this) +DEVICE_ID = "K" # 'K' = 52Pi kit, 'X' = Pimoroni Explorer (per docs/livesync-protocol.md and the version reply) try: import rtc # set from the editor's clock SysEx so the log has real timestamps except ImportError: @@ -1541,8 +1542,10 @@ class App: if cmd == 0x01 and len(sx) >= 8 and rtc is not None: # set clock: yr-2000, mo, dd, hh, mm, ss try: rtc.RTC().datetime = time.struct_time((2000 + sx[2], sx[3], sx[4], sx[5], sx[6], sx[7], 0, -1, -1)) except Exception: pass - elif cmd == 0x02: # version query -> reply 0x03 + APP_VERSION - if self.midi: self.midi.write(bytes([0xF0, 0x7D, 0x03]) + APP_VERSION.encode() + bytes([0xF7])) + elif cmd == 0x02: # version query -> reply 0x03 + ";" + if self.midi: # old firmware sent bare APP_VERSION; editor parses "contains ';'?" for back-compat + payload = DEVICE_ID + ";" + APP_VERSION + self.midi.write(bytes([0xF0, 0x7D, 0x03]) + payload.encode() + bytes([0xF7])) elif cmd == 0x40 or cmd == 0x41 or cmd == 0x42 or cmd == 0x43: # Live sync (see src/livesync.js) try: text = "".join(chr(b) if 0x20 <= b < 0x7F else "" for b in sx[2:]) except Exception: return diff --git a/pico-explorer/README.md b/pico-explorer/README.md new file mode 100644 index 0000000..9a2ca2a --- /dev/null +++ b/pico-explorer/README.md @@ -0,0 +1,74 @@ +# PM_X-1 "Explorer" — CircuitPython edition (Pimoroni Explorer · RP2350) + +The **CircuitPython** firmware for the [Pimoroni Explorer Kit (PIM744)](https://shop.pimoroni.com/products/explorer), +set up as a self-contained appliance. Sibling to the PM_K-1 build in `../pico-cp/` (the 52Pi EP-0172 +kit) — same engine, same program strings, same `programs.json`, same web editor. + +This board is a **2.8″ ST7789V 320×240 LCD + 6 user buttons (A/B/C on the left, X/Y/Z on the right) ++ piezo speaker** built around an RP2350B (Pico 2 class chip). **No touchscreen, no joystick, no +RGB LED.** Editing is done in the web editor with **Live sync** on; the device mirrors changes in +real time and emits its own play/stop/bpm/sel deltas back. + +## Controls + +| Button | Action | +| ------ | --------------------------------------------------------------------- | +| **A** | play / stop | +| **B** | tap tempo | +| **C** | menu (Settings / Help / About / Practice log) | +| **X** | prev track (hold to repeat) | +| **Y** | tempo −1 (hold to repeat; after ~1.5 s the step grows to −5) | +| **Z** | next track (hold to repeat) | +| **X + Z** | tempo +1 (chord; same hold-repeat as Y) | + +In a menu: **X / Z** move the cursor up / down, **Y** decrements the focused value, **A** commits or +cycles, **B** = back, **C** = close. + +## Install + +1. **Flash CircuitPython for Pico 2 / RP2350.** Hold **BOOTSEL** on the Explorer, plug it in over + USB-C, drop the [Pimoroni Explorer (RP2350) CircuitPython `.uf2`](https://circuitpython.org/board/pimoroni_explorer2350/) + onto the `RP2350` drive. A `CIRCUITPY` drive appears. +2. **Copy the bundle onto `CIRCUITPY`** — `boot.py`, `code.py`, **`app.mpy`**, `programs.json`, + `font_s.bin` / `font_m.bin` / `font_l.bin`, `logo.bin` / `midi.bin` / `usb.bin`, `editor.html` + (offline editor). If an old `app.py` is on the drive, delete it. +3. **Power-cycle.** It boots into appliance mode and runs. + +## Program it from the web + +Open in Chrome / Edge / Firefox. The set-list **⋯** menu → +**📟 Save to device** pushes a `programs.json` over USB-MIDI; the device persists it and reloads. +Click **🔗 Live sync** to mirror edits in real time. + +## Pin reference + +The display, buttons, and audio are wired into the board — no jumpers required. CircuitPython's +official board definition for `pimoroni_explorer2350` exposes `board.DISPLAY` pre-initialized, so +the firmware just uses it. + +| Function | GPIO | +| ----------------- | -------- | +| Button A | GP16 | +| Button B | GP15 | +| Button C | GP14 | +| Button X | GP17 | +| Button Y | GP18 | +| Button Z | GP19 | +| Piezo audio (PWM) | GP12 | +| Piezo amp enable | GP13 | +| I²C SDA (QwSTEMMA) | GP20 | +| I²C SCL (QwSTEMMA) | GP21 | +| Display | `board.DISPLAY` (8080 parallel bus on GP26..GP39, initialized by board.c) | + +## Calibration (flags at the top of `app.py`) + +- **Speaker too loud / quiet:** the piezo + amp gain is fixed in hardware. `MUTE_SPEAKER` + silences the click; `SPEAKER_AUTO_MUTE` auto-mutes when a MIDI host is listening. +- **Buttons feel inverted:** the polarity is hard-coded to active-low (pull-up). If a button + fires on release instead of press, check the `BTN_*` pin map at the top of `app.py`. +- **Display orientation:** the board's CircuitPython init mounts the panel landscape + (320 × 240). If your screen looks rotated, that's a board.c-level thing — file a CircuitPython + bug, don't patch the firmware. + +If `app.py` ever errors, CircuitPython prints the traceback **on the screen and over USB serial** — +send me that. diff --git a/pico-explorer/app.py b/pico-explorer/app.py new file mode 100644 index 0000000..6526959 --- /dev/null +++ b/pico-explorer/app.py @@ -0,0 +1,1444 @@ +# VARASYS PolyMeter - PM_X-1 "Explorer" firmware (CircuitPython edition) +# Pimoroni Explorer (PIM744): RP2350B + 2.8" ST7789V 320x240 + 6 buttons (A/B/C/X/Y/Z) + piezo. +# +# Sibling to PM_K-1 (the 52Pi EP-0172 kit in ../pico-cp/). Same engine, same program-string +# grammar, same programs.json, same web editor, same live-sync protocol. The Explorer build is +# READ-ONLY on the device (no on-device beat editing). All editing happens in the web editor +# with Live sync on; the device reflects DELTAs in real time and emits play/stop/bpm/sel back. +# +# WHY CIRCUITPYTHON: the board mounts as a USB drive (CIRCUITPY) carrying this code + your +# tracks + an offline copy of the editor; edits in the web editor are pushed over USB-MIDI. +# Display is initialized by the official board definition (board.DISPLAY pre-built); we just +# use it. Pinout in ./README.md. + +import board, busio, digitalio, pwmio, displayio, vectorio, time, json, gc, os, supervisor +supervisor.runtime.autoreload = False # we write our own files (log + pushed programs); never self-restart +APP_VERSION = "0.0.1" # firmware version (the A/B updater pushes/compares this) +DEVICE_ID = "X" # 'X' = Explorer, 'K' = 52Pi kit (per docs/livesync-protocol.md and the version reply) +try: + import rtc # set from the editor's clock SysEx so the log has real timestamps +except ImportError: + rtc = None +try: + import usb_midi # default-enabled on RP2350 - sends a MIDI note per click to the computer +except ImportError: + usb_midi = None +try: + from binascii import a2b_base64 # decode the base64-encoded .mpy pushed by the editor's one-click update +except ImportError: + a2b_base64 = None + +# ============================== CONFIG (tweak if needed) ============================== +MIDI_ENABLED = True # send a USB-MIDI note per click (play via the web editor's "Device audio") +MIDI_CHANNEL = 10 # 1..16 - GM channel 10 is the drum channel +MIDI_CLOCK_OUT = False # send 24 PPQN MIDI Clock so a DAW can slave its tempo to the metronome +MIDI_CLOCK_OUT_TRANSPORT = True +MIDI_CLOCK_IN = False # follow an external 24 PPQN clock +MIDI_CLOCK_IN_TRANSPORT = True +MUTE_SPEAKER = False # always silence the on-board piezo +SPEAKER_AUTO_MUTE = True # auto-mute the piezo when a MIDI host is listening + +# ----- pins (Pimoroni Explorer board layout) ----- +P_AUDIO = board.GP12 # piezo PWM (variable frequency) +P_AMPEN = board.GP13 # piezo amp enable (high = on) +P_BTNA, P_BTNB, P_BTNC = board.GP16, board.GP15, board.GP14 # left-side buttons (top to bottom) +P_BTNX, P_BTNY, P_BTNZ = board.GP17, board.GP18, board.GP19 # right-side buttons (top to bottom) +P_SDA, P_SCL = board.GP20, board.GP21 # QwSTEMMA (unused by the firmware - future expansion) + +# Display is initialised by the board definition (8-bit parallel bus, 320x240 landscape). +# We grab it via board.DISPLAY rather than rolling our own init sequence. +WIDTH, HEIGHT = 320, 240 +GRID_TOP = 100 # top of the pad grid (header + meters fit above) +MAXLANES = 6 # lanes shown on the pad grid (parser still accepts more; they just play silent visually) +MIN_LOG_SEC = 5 # don't log plays shorter than this +LOG_MENU_ROWS = 7 # log entries shown in the Practice-log menu screen + +# ----- BUILT-IN playlists: same defaults as the Kit so the two firmwares feel identical ----- +BUILTIN_SETLISTS = [ + ("Styles", [ + ("Four-on-the-floor", "t120;kick:4;snare:4=.x.x;hatClosed:4/2"), + ("Swing ride", "t150;ride:4/2s;kick:4=X..x;snare:4=.x.x"), + ("Purdie half-time shuffle", "t92;kick:4/3=X....x...x..;snare:4/3=..gg.gX.gg.g;hatClosed:4/3=X.xX.xX.xX.x"), + ("Samba (2/4)", "t104;tomLow:2/4=x...X...;hatClosed:2/4;woodblock:2/4=X.xx.xX."), + ("Nanigo (6/8 bembe)", "t130;cowbell:4/3=X.xx.x.xx.x.;kick:4/3=X.....X.....;hatClosed:4/3=..x..x..x..x"), + ("6/8 groove", "t100;kick:3+3=x..x..;snare:3+3=...x..;hatClosed:3+3/2"), + ("7/8 (2+2+3)", "t130;kick:2+2+3=x..x..x;hatClosed:2+2+3/2"), + ("5/4 (3+2)", "t112;kick:3+2=x..x.;snare:3+2=..x..;hatClosed:3+2/2"), + ]), + ("Practice", [ + ("5 over 4 polyrhythm", "t100;kick:4;claves:5~"), + ("3 over 2 hemiola", "t96;woodblock:2;cowbell:3~"), + ("2 & 4 & 3 over one bar", "t100;kick:3;cowbell:2~;claves:4~"), + ("Triplet hats", "t100;kick:4;snare:4=.x.x;hatClosed:4/3"), + ("Tempo builder 80 up", "t80;woodblock:4;rmp80/4/4"), + ("Gap trainer (play 2 / rest 2)", "t100;kick:4;hatClosed:4/2;tr2/2"), + ]), + ("Song (continuous)", [ + ("Intro - hats & kick", "t88;b4;kick:4=X.x.;hatClosed:4/2=gggggggg"), + ("Groove in - backbeat", "t88;b4;kick:4=X.x.;snare:4=.X.X;hatClosed:4/2"), + ("Half-time shuffle", "t92;b4;kick:4/3=X....x...x..;snare:4/3=..gg.gX.gg.g;hatClosed:4/3=X.xX.xX.xX.x"), + ("Build - ramp 92-120", "t92;b4;rmp92/4/2;kick:4;snare:4=.X.X;hatClosed:4/2"), + ("Four-on-the-floor (909)", "t124;b4;kick909:4;clap909:4=.X.X;hat909:4/2=.X.X.X.X"), + ("Samba break (2/4)", "t116;b4;tomLow:2/4=x...X...;hatClosed:2/4;woodblock:2/4=X.xx.xX."), + ("Peak - 16ths", "t132;b4;kick:4=X..x;snare:4=.X.X;hatClosed:4/4"), + ("Outro - ramp down", "t132;b4;rmp132/-7/1;kick:4=X..x;hatClosed:4/2=gggggggg"), + ]), +] + +# ============================== COLOURS ============================== +C_BG, C_PANEL, C_TXT, C_MUTE = 0x06090E, 0x1C222C, 0xC7D0DB, 0x788494 +C_CYAN, C_AMBER, C_GREEN, C_DIM = 0x0AB3F7, 0xFF9B2E, 0x2FE07A, 0x243240 +C_BTN = 0x1C222C +C_RUN_IDLE = 0x2FE07A # run-state dot (green when stopped, red when playing, bright on each beat) +C_RUN_GO = 0xFF5A5A +C_RUN_PULSE = 0xFFEC78 +SOUND_GM = {"kick":36,"kick808":36,"kick909":36, "snare":38,"snare808":38,"snare909":38, + "clap":39,"clap808":39,"clap909":39, "rim":37, "hatClosed":42,"hat808":42,"hat909":42, + "hatOpen":46,"openHat808":46, "ride":51,"ride909":51, "crash":49,"crash909":49, + "tomLow":41,"tom808":45,"tomMid":45,"tomHigh":48, "tambourine":54, + "cowbell":56,"cowbell808":56, "woodblock":76,"jamblock":76, "claves":75, "beep":37} +GM_DEFAULT = 37 +HELP_PAGES = ( + ("Transport & Navigation", ( + "A: play / stop", + "B: tap tempo", + "C: menu (this)", + "X: prev track (hold to repeat)", + "Z: next track (hold to repeat)", + "Y: tempo -1 (hold = -5 after 1.5s)", + "X+Z chord: tempo +1 (same hold rule)", + )), + ("Menu navigation", ( + "X / Z: move cursor up / down", + "Y: decrement the focused value", + "A: cycle / increment / select", + "B: back (cancel)", + "C: close the menu", + )), + ("Editing & sync", ( + "Edit on the web at metronome.varasys.io", + "Click 'Live sync' to mirror live", + "Beat patterns are read-only on device", + "Tracks + tempo + transport sync both ways", + "Built-in playlists baked, user lists", + " live in /programs.json", + )), + ("Status & Hardware", ( + "MIDI badge green: laptop listening", + "USB badge cyan: connected to a computer", + "Run dot: green=stop / red=play + pulse", + "Squares = main beats, circles = subs", + "Ramp arrow: track has a tempo ramp", + "Gap symbol: silent rest bars", + )), +) +MIDI_VEL = {2: 120, 1: 90, 3: 45} # accent / normal / ghost +PAD_DIM = (0x10161E, 0x0A3A52, 0x4A3010, 0x2A1D4A) # idle pad: mute / normal / accent / ghost +PAD_LIT = (0x39414D, 0x0AB3F7, 0xFF9B2E, 0x967BFF) # playhead pad: mute / normal / accent / ghost +C_GRID = 0x1A2330 # faint vertical beat gridlines (beats line up across lanes) + +# ============================== ANTI-ALIASED FONTS (binary blobs on the drive; see ../pico/gen_font.py) ============================== +def load_font(path): + with open(path, "rb") as f: + blob = f.read() + count = blob[0]; p = 1; pixoff = 1 + count * 7; glyphs = {} + for _ in range(count): + cp = (blob[p] << 8) | blob[p+1]; w = blob[p+2]; h = blob[p+3] + xoff = blob[p+4]; xoff = xoff - 256 if xoff > 127 else xoff + top = blob[p+5]; adv = blob[p+6]; p += 7 + glyphs[cp] = (w, h, xoff, top, adv, pixoff); pixoff += (w * h + 1) // 2 + return (glyphs, blob) + +FONT_S = load_font("/font_s.bin") # small - pad-grid lane labels + meter rows +FONT_M = load_font("/font_m.bin") # labels / buttons +FONT_L = load_font("/font_l.bin") # big BPM +gc.collect() + +def _blend(bg, fg, i): + t = i * 17 + r = (((bg >> 16) & 0xFF)*(255-t) + ((fg >> 16) & 0xFF)*t) // 255 + g = (((bg >> 8) & 0xFF)*(255-t) + ((fg >> 8) & 0xFF)*t) // 255 + b = ((bg & 0xFF)*(255-t) + (fg & 0xFF)*t) // 255 + return (r << 16) | (g << 8) | b + +def make_text(s, font, fg, bg): + """Render a string into a displayio TileGrid (anti-aliased via a 16-step blend palette).""" + glyphs, blob = font + w = 0; top0 = 999; bot = 0 + for c in s: + g = glyphs.get(ord(c)) + if not g: continue + w += g[4] + if g[1]: + if g[3] < top0: top0 = g[3] + if g[3] + g[1] > bot: bot = g[3] + g[1] + if top0 == 999: top0 = 0 + w = max(1, w); h = max(1, bot - top0) + gc.collect() + bmp = displayio.Bitmap(w, h, 16) + pal = displayio.Palette(16) + for i in range(16): pal[i] = _blend(bg, fg, i) + pen = 0 + for c in s: + g = glyphs.get(ord(c)) + if not g: continue + gw, gh, xoff, gtop, adv, off = g + for j in range(gh): + row = (gtop - top0) + j + for i in range(gw): + k = j * gw + i + byte = blob[off + (k >> 1)] + nib = (byte >> 4) if (k & 1) == 0 else (byte & 0xF) + if nib: + x = pen + xoff + i + if 0 <= x < w and 0 <= row < h: bmp[x, row] = nib + pen += adv + return displayio.TileGrid(bmp, pixel_shader=pal), w, h + +def load_alpha(path): + try: + with open(path, "rb") as f: blob = f.read() + return (blob[0], blob[1], blob) + except Exception: + return None +def make_glyph(asset, fg, bg): + w, h, blob = asset + gc.collect() + bmp = displayio.Bitmap(w, h, 16); pal = displayio.Palette(16) + for i in range(16): pal[i] = _blend(bg, fg, i) + for k in range(w * h): + byte = blob[2 + (k >> 1)] + nib = (byte >> 4) if (k & 1) == 0 else (byte & 0xF) + if nib: bmp[k % w, k // w] = nib + return displayio.TileGrid(bmp, pixel_shader=pal), pal, w, h +def _recolor(pal, fg, bg): + for i in range(16): pal[i] = _blend(bg, fg, i) + +LOGO = load_alpha("/logo.bin") +ICON_MIDI = load_alpha("/midi.bin") +ICON_USB = load_alpha("/usb.bin") +gc.collect() + +# ============================== POLYMETER ENGINE ============================== +PAT = {'X': 2, 'x': 1, 'g': 3, '.': 0, '-': 0, '_': 0} +PRIO = {2: 3, 1: 2, 3: 1} + +def parse_program(s): + bpm = 120; lanes = []; bars = 0; ramp = None; trainer = None + for tok in s.strip().split(';'): + tok = tok.strip() + if not tok: continue + if tok[0] == 't' and tok[1:].isdigit(): + bpm = int(tok[1:]); continue + if tok[0] == 'b' and tok[1:].isdigit(): + bars = int(tok[1:]); continue + if tok.startswith('rmp'): + p = tok[3:].split('/') + if len(p) == 3: + try: ramp = {'start': int(p[0]), 'amt': int(p[1]), 'every': max(1, int(p[2]))} + except ValueError: pass + continue + if tok.startswith('tr') and '/' in tok and ':' not in tok: + p = tok[2:].split('/') + if len(p) == 2: + try: trainer = {'play': max(0, int(p[0])), 'mute': max(0, int(p[1]))} + except ValueError: pass + continue + if ':' not in tok: continue + lane = _parse_lane(tok) + if lane: lanes.append(lane) + if not lanes: lanes = [_parse_lane("beep:4")] + return max(5, min(300, bpm)), lanes, bars, ramp, trainer + +def _parse_lane(tok): + poly = '~' in tok; mute = '!' in tok + tok = tok.replace('~', '').replace('!', '') + gain = '' + if '@' in tok: tok, _, g = tok.partition('@'); gain = '@' + g + sound, _, rest = tok.partition(':') + pattern = None + if '=' in rest: rest, _, pattern = rest.partition('=') + sub = 1; swing = False + if '/' in rest: + rest, _, sd = rest.partition('/') + swing = sd.endswith('s'); sd = sd.rstrip('s') + sub = int(sd) if sd.isdigit() else 1 + groups = [int(g) for g in rest.split('+') if g.isdigit()] or [4] + beats = sum(groups); starts = set(); acc = 0 + for gp in groups: starts.add(acc); acc += gp + steps = beats * sub + if pattern: + levels = [PAT.get(ch, 0) for ch in pattern] + if len(levels) < steps: levels += [0] * (steps - len(levels)) + steps = len(levels) + else: + levels = [] + for i in range(steps): + if i % sub == 0: levels.append(2 if (i // sub) in starts else 1) + else: levels.append(0) + return {'sound': sound, 'sub': sub, 'swing': swing, 'steps': steps, 'levels': levels, + 'poly': poly, 'mute': mute, 'groups': groups, 'gain': gain} + +PAT_CH = {2: 'X', 1: 'x', 3: 'g', 0: '.'} +def lane_to_str(L): + s = L['sound'] + ':' + '+'.join(str(g) for g in L.get('groups', [4])) + if L['sub'] != 1 or L['swing']: s += '/' + str(L['sub']) + ('s' if L['swing'] else '') + s += '=' + ''.join(PAT_CH.get(v, '.') for v in L['levels']) + s += L.get('gain', '') + if L['poly']: s += '~' + if L['mute']: s += '!' + return s + +_ALNUM = "abcdefghijklmnopqrstuvwxyz0123456789" +def _slkey(t): + return "".join(c for c in t.lower() if c in _ALNUM) +def load_user_setlists(): + try: + with open("/programs.json") as f: d = json.load(f) + except Exception as e: + print("programs.json:", e); return [] + def items_of(pl): return [(p.get("name", "?"), p.get("prog", "")) for p in pl if p.get("prog")] + out = [] + try: + if isinstance(d.get("setlists"), list): + for sl in d["setlists"]: + it = items_of(sl.get("programs", [])) + if it: out.append((sl.get("title", "My set list"), it)) + elif isinstance(d.get("programs"), list): + it = items_of(d["programs"]) + if it: out.append((d.get("title", "My set list"), it)) + except Exception as e: + print("setlists:", e) + return out + +def solid(color): + p = displayio.Palette(1); p[0] = color; return p + +def rect(x, y, w, h, color): + return vectorio.Rectangle(pixel_shader=solid(color), width=w, height=h, x=x, y=y) + +# ============================== APP ============================== +class App: + def __init__(self): + self.display = board.DISPLAY # board.c built the BusDisplay; we just use it + try: self.display.auto_refresh = False # we manage refresh in run() (predictive skip + ~20Hz throttle) + except Exception: pass + self.i2c = busio.I2C(scl=P_SCL, sda=P_SDA, frequency=400_000) # QwSTEMMA - unused by the firmware, available to user code + self.midi = usb_midi.ports[1] if (MIDI_ENABLED and usb_midi and len(usb_midi.ports) > 1) else None + self.midi_in = usb_midi.ports[0] if (MIDI_ENABLED and usb_midi and len(usb_midi.ports) > 0) else None + self._mbuf = bytearray(64); self.midi_host = False; self.last_midi_in = 0.0 + self._sx = bytearray(); self._sxon = False # USB-MIDI SysEx assembler + self._fw = None; self._fw_n = 0 # chunked firmware transfer state + self.spk = pwmio.PWMOut(P_AUDIO, frequency=1600, variable_frequency=True, duty_cycle=0) + self.amp_en = digitalio.DigitalInOut(P_AMPEN); self.amp_en.direction = digitalio.Direction.OUTPUT + self.amp_en.value = False # amp off when no audio playing (saves power, kills hum) + self.spk_off = 0 + # buttons - all active-low with internal pull-ups + self.btnA = self._btn(P_BTNA); self.btnB = self._btn(P_BTNB); self.btnC = self._btn(P_BTNC) + self.btnX = self._btn(P_BTNX); self.btnY = self._btn(P_BTNY); self.btnZ = self._btn(P_BTNZ) + self._prev = {'A': True, 'B': True, 'C': True, 'X': True, 'Y': True, 'Z': True} + self._held_t = {'X': 0, 'Y': 0, 'Z': 0} # press start time (monotonic_ns) for hold-repeat + self._next_rep = {'X': 0, 'Y': 0, 'Z': 0} # next "auto repeat" deadline for held buttons + self._chord_xz = 0 # 0 = not in chord; else monotonic_ns of the chord start + self.running = False; self.bpm = 120; self.idx = 0; self.lanes = []; self.bars = 0 + self.ramp = None; self.trainer = None; self._lastbar = -1; self._muted = False; self._ramp_base = 120 + self._overlay = None # menu stack: None / 'menu' / 'settings' / 'help' / 'about' / 'log' / 'msg' + self._modal_cursor = 0 # focused row in the current modal + self._modal_rows = [] # tuples (label, value_str_or_None, action) for current modal + self.continue_on = False; self._advance = False + self._next_pending = None; self._seam_t = 0; self._need_redraw = False + self._heavy_redraw_at = 0 + self._grid_li = None; self._grid_n = 0; self._grid_geo = (0, 0, 0, 0) + self._grid_pi = 0; self._grid_lane_st = None; self._grid_pads = [] + self._heavy_log_pending = False + self._beat_ns = 60_000_000_000 // self.bpm + self._note_buf = bytearray([0x90, 0, 0]) + self._clock_byte = bytes([0xF8]) + self._start_byte = bytes([0xFA]); self._stop_byte = bytes([0xFC]) + self._lastRefresh = 0.0 + try: + o = os.urandom(4); self._sync_origin = "d" + "".join("%02x" % b for b in o) + except Exception: + self._sync_origin = "d%08x" % (time.monotonic_ns() & 0xFFFFFFFF) + self._sync_armed = False; self._sync_seq = 0; self._sync_applying = False + self._sync_heartbeat_next = 0.0 + self._displayed_bpm = -1; self._clock_next = 0 + self._clock_in_last_t = 0; self._clock_in_avg = 0; self._slaved = False + self.sl = 0; self.rebuild_setlists() + self.dirty = True + self.pad_pal = displayio.Palette(8) + for i in range(4): self.pad_pal[i] = PAD_DIM[i]; self.pad_pal[i + 4] = PAD_LIT[i] + self.lane_pads = []; self.lane_lit = [] + self.usb_conn = False; self._m_steps = 0 + self._uiNext = 0.0; self._lastTs = None; self._lastBs = None + self._seg_start = 0.0 + self._refreshNext = 0.0 + self.ic_midi_pal = None; self.ic_usb_pal = None + # practice log + self.can_write = self._probe_write() + self._load_settings() + self.log = self._load_log() + self.play_start = None; self.play_bpm = 0; self.play_name = "" + self._log_scroll = 0 + self._build_scene() + self.load(0) + self.draw_icons(); self.draw_meters(); self._set_run_dot() + + def _btn(self, pin): + d = digitalio.DigitalInOut(pin); d.direction = digitalio.Direction.INPUT; d.pull = digitalio.Pull.UP + return d + + # ---------- scene graph (320x240 landscape) ---------- + def _build_scene(self): + root = displayio.Group(); self.display.root_group = root + root.append(rect(0, 0, WIDTH, HEIGHT, C_BG)) + # Header (y 0..28): VARASYS logo + version + run dot + MIDI/USB badges + if LOGO: + tg, _p, lw, lh = make_glyph(LOGO, C_CYAN, C_BG); tg.x = 8; tg.y = 6; root.append(tg) + lx = 8 + lw + else: + tg, w, h = make_text("VARASYS", FONT_M, C_CYAN, C_BG); tg.x = 8; tg.y = 6; root.append(tg) + lx = 8 + w + vtg, vw, vh = make_text("v" + APP_VERSION, FONT_S, C_DIM, C_BG); vtg.x = lx + 6; vtg.y = 8; root.append(vtg) + # MIDI/USB icons + run dot at the right of the header + x = WIDTH - 10 + for asset, attr in ((ICON_USB, "ic_usb_pal"), (ICON_MIDI, "ic_midi_pal")): + if asset: + tg, pal, w, h = make_glyph(asset, C_DIM, C_BG); x -= w; tg.x = x; tg.y = 6; x -= 6 + root.append(tg); setattr(self, attr, pal) + # Run dot at the far right + self.run_dot_pal = displayio.Palette(1); self.run_dot_pal[0] = C_RUN_IDLE + x -= 10 + self.run_dot = vectorio.Circle(pixel_shader=self.run_dot_pal, radius=4, x=x, y=14) + root.append(self.run_dot) + # Header divider + root.append(rect(0, 28, WIDTH, 1, C_PANEL)) + # dynamic groups + self.g_idx = displayio.Group(); root.append(self.g_idx) # set-list tab (left of title row) + self.g_cont = displayio.Group(); root.append(self.g_cont) # CONT (auto-advance) toggle indicator + self.g_name = displayio.Group(); root.append(self.g_name) # track title + self.g_bpm = displayio.Group(); root.append(self.g_bpm) # big tempo (right) + self.g_time = displayio.Group(); root.append(self.g_time) # elapsed [of total] (left) + self.g_bar = displayio.Group(); root.append(self.g_bar) # bar [of total] (left) + self.g_train = displayio.Group(); root.append(self.g_train) # ramp / gap-trainer indicators + self.g_grid = displayio.Group(); root.append(self.g_grid) # lanes x step pads + self.g_overlay = displayio.Group(); root.append(self.g_overlay) # modals (drawn on top) + + def _place(self, group, s, x, y, fg, bg, font, right_edge=None): + while len(group): group.pop() + self.dirty = True + if not s: return + tg, w, h = make_text(s, font, fg, bg) + tg.x = (right_edge - w) if right_edge is not None else x; tg.y = y; group.append(tg) + + # ---------- program ---------- + def rebuild_setlists(self): + self.setlists = [{'title': t, 'items': it, 'builtin': True} for t, it in BUILTIN_SETLISTS] + seen = set(_slkey(t) for t, _ in BUILTIN_SETLISTS) + for t, it in load_user_setlists(): + if _slkey(t) in seen: continue + seen.add(_slkey(t)); self.setlists.append({'title': t, 'items': it, 'builtin': False}) + if self.sl >= len(self.setlists): self.sl = 0 + def switch_setlist(self, delta=1): + if len(self.setlists) < 2: return + if self._sync_applying: return + was = self.running + if was: self.running = False; self._log_play() + self.sl = (self.sl + delta) % len(self.setlists) + self.load(0) + if was: self.running = True; self._reset_clock(); self._start_play() + self._set_run_dot(); self.draw_meters() + self._sync_broadcast("sel=%d/%d" % (self.sl, self.idx)) + def load(self, i): + items = self.setlists[self.sl]['items'] + self.idx = i % len(items) + self.name, prog = items[self.idx] + self.bpm, self.lanes, self.bars, self.ramp, self.trainer = parse_program(prog) + self._beat_ns = 60_000_000_000 // max(1, self.bpm); self._rebuild_dur_all() + self.master = self.lanes[0]; self._ramp_base = self.bpm; self._lastbar = -1; self._muted = False + self._overlay = None + self._next_pending = None; self._need_redraw = False + self._heavy_redraw_at = 0; self._heavy_log_pending = False; self._grid_li = None + while len(self.g_overlay): self.g_overlay.pop() + self._reset_clock(); self.draw_bpm(); self.draw_status(); self.draw_train() + self.build_grid() + def _prog_str(self): + parts = ['t' + str(self.bpm)] + if self.bars: parts.append('b' + str(self.bars)) + if self.ramp: parts.append('rmp%d/%d/%d' % (self.ramp.get('start', self.bpm), self.ramp['amt'], self.ramp['every'])) + if self.trainer: parts.append('tr%d/%d' % (self.trainer['play'], self.trainer['mute'])) + for L in self.lanes: parts.append(lane_to_str(L)) + return ';'.join(parts) + def toggle_continue(self): + self.continue_on = not self.continue_on; self.draw_status() + + # ---------- modal: 4-screen menu navigated by buttons (Settings / Help / About / Practice log) ---------- + def _show_msg(self, text): + self._overlay = 'msg'; g = self.g_overlay + while len(g): g.pop() + px, py, pw, ph = 24, 90, WIDTH - 48, 60 + g.append(rect(px, py, pw, ph, C_PANEL)); g.append(rect(px, py, pw, 2, C_AMBER)) + t, w, h = make_text(text[:32], FONT_S, C_TXT, C_PANEL); t.x = px + 12; t.y = py + 12; g.append(t) + t2, w2, h2 = make_text("(A/C to dismiss)", FONT_S, C_DIM, C_PANEL); t2.x = px + 12; t2.y = py + 34; g.append(t2) + self.dirty = True + def _close_overlay(self): + self._overlay = None; self._modal_cursor = 0; self._modal_rows = [] + while len(self.g_overlay): self.g_overlay.pop() + self.dirty = True + def _show_menu(self): + gc.collect() + self._overlay = 'menu'; self._modal_cursor = 0; self._draw_menu() + def _draw_menu(self): + g = self.g_overlay + while len(g): g.pop() + PX, PY, PW, RH = 24, 36, WIDTH - 48, 26 + rows = ( + ("Continue: " + ("on" if self.continue_on else "off"), None, self._menu_toggle_continue), + ("Settings >", None, self._show_settings), + ("Practice log >", None, self._show_log), + ("Help >", None, self._show_help), + ("About", None, self._show_about), + ) + self._modal_rows = rows + PH = 24 + len(rows) * RH + 18 + g.append(rect(PX, PY, PW, PH, C_PANEL)); g.append(rect(PX, PY, PW, 2, C_CYAN)) + t, w, h = make_text("Menu", FONT_M, C_TXT, C_PANEL); t.x = PX + 12; t.y = PY + 6; g.append(t) + ht, hw, hh = make_text("X / Z move, A select, C close", FONT_S, C_DIM, C_PANEL) + ht.x = PX + PW - hw - 12; ht.y = PY + 10; g.append(ht) + for i, (label, _v, _act) in enumerate(rows): + yy = PY + 26 + i * RH + sel = (i == self._modal_cursor) + if sel: + g.append(rect(PX + 6, yy, 3, RH - 4, C_CYAN)) # left-edge caret + g.append(rect(PX + 12, yy, PW - 24, RH - 4, C_BTN)) + col = C_CYAN if (sel and label.startswith("Continue") and self.continue_on) else (C_TXT if sel else C_MUTE) + lt, lw, lh = make_text(label, FONT_S, col, C_BTN if sel else C_PANEL) + lt.x = PX + 18; lt.y = yy + 4; g.append(lt) + self.dirty = True + def _menu_toggle_continue(self): + self.continue_on = not self.continue_on; self.draw_status(); self._draw_menu() + + # ---------- Settings sub-modal (Speaker / MIDI Out / Channel / Clock Out / Clock In) ---------- + def _show_settings(self): + gc.collect() + self._overlay = 'settings'; self._modal_cursor = 0; self._draw_settings() + def _draw_settings(self): + g = self.g_overlay + while len(g): g.pop() + PX, PY, PW, RH = 14, 30, WIDTH - 28, 26 + sm = "Off" if MUTE_SPEAKER else ("Auto" if SPEAKER_AUTO_MUTE else "Always") + rows = ( + ("Speaker", sm, self._adj_speaker), + ("MIDI Out", "on" if MIDI_ENABLED else "off", self._adj_midi_out), + ("Channel", str(MIDI_CHANNEL), self._adj_midi_ch), + ("Clock Out", "on" if MIDI_CLOCK_OUT else "off", self._adj_clock_out), + ("Clock In", "on" if MIDI_CLOCK_IN else "off", self._adj_clock_in), + ) + self._modal_rows = rows + PH = 24 + len(rows) * RH + 14 + g.append(rect(PX, PY, PW, PH, C_PANEL)); g.append(rect(PX, PY, PW, 2, C_CYAN)) + t, w, h = make_text("Settings", FONT_M, C_TXT, C_PANEL); t.x = PX + 12; t.y = PY + 5; g.append(t) + ht, hw, hh = make_text("X/Z row, Y -, A +, B back", FONT_S, C_DIM, C_PANEL) + ht.x = PX + PW - hw - 12; ht.y = PY + 10; g.append(ht) + for i, (label, value, _adj) in enumerate(rows): + yy = PY + 26 + i * RH + sel = (i == self._modal_cursor) + if sel: + g.append(rect(PX + 4, yy, 3, RH - 4, C_CYAN)) + g.append(rect(PX + 10, yy, PW - 20, RH - 4, C_BTN)) + lt, lw, lh = make_text(label, FONT_S, C_TXT if sel else C_MUTE, C_BTN if sel else C_PANEL) + lt.x = PX + 16; lt.y = yy + 5; g.append(lt) + vt, vw, vh = make_text(value, FONT_M, C_TXT if sel else C_MUTE, C_BTN if sel else C_PANEL) + vt.x = PX + PW - vw - 14; vt.y = yy + 3; g.append(vt) + self.dirty = True + def _adj_speaker(self, d): + global MUTE_SPEAKER, SPEAKER_AUTO_MUTE + modes = ("auto", "always", "off") + cur = "off" if MUTE_SPEAKER else ("auto" if SPEAKER_AUTO_MUTE else "always") + i = (modes.index(cur) + d) % 3 + MUTE_SPEAKER = (modes[i] == "off"); SPEAKER_AUTO_MUTE = (modes[i] == "auto") + if MUTE_SPEAKER: self.spk.duty_cycle = 0; self.amp_en.value = False + self._save_settings(); self._draw_settings() + def _adj_midi_out(self, d): + global MIDI_ENABLED + MIDI_ENABLED = not MIDI_ENABLED; self._save_settings(); self._draw_settings() + def _adj_midi_ch(self, d): + global MIDI_CHANNEL + MIDI_CHANNEL = ((MIDI_CHANNEL - 1 + d) % 16) + 1 + self._save_settings(); self._draw_settings() + def _adj_clock_out(self, d): + global MIDI_CLOCK_OUT + MIDI_CLOCK_OUT = not MIDI_CLOCK_OUT + if MIDI_CLOCK_OUT: self._clock_next = time.monotonic_ns() + self._save_settings(); self._draw_settings() + def _adj_clock_in(self, d): + global MIDI_CLOCK_IN + MIDI_CLOCK_IN = not MIDI_CLOCK_IN + if not MIDI_CLOCK_IN: self._slaved = False + self._save_settings(); self._draw_settings() + + # ---------- Help sub-modal (paginated; cursor not used, X/Z page through) ---------- + def _show_help(self): + gc.collect() + self._overlay = 'help'; self._help_page = 0; self._modal_cursor = 0; self._draw_help() + def _draw_help(self): + g = self.g_overlay + while len(g): g.pop() + PX, PY, PW = 12, 30, WIDTH - 24 + title, lines = HELP_PAGES[self._help_page] + PH = 26 + 14 * len(lines) + 22 + g.append(rect(PX, PY, PW, PH, C_PANEL)); g.append(rect(PX, PY, PW, 2, C_CYAN)) + t, w, h = make_text(title, FONT_M, C_TXT, C_PANEL); t.x = PX + 10; t.y = PY + 5; g.append(t) + pi, piw, pih = make_text("%d / %d" % (self._help_page + 1, len(HELP_PAGES)), FONT_S, C_DIM, C_PANEL) + pi.x = PX + PW - piw - 10; pi.y = PY + 9; g.append(pi) + yy = PY + 24 + for ln in lines: + lt, lw, lh = make_text(ln[:44], FONT_S, C_TXT, C_PANEL); lt.x = PX + 10; lt.y = yy; g.append(lt) + yy += 14 + hint, hw, hh = make_text("X / Z = page, C = close", FONT_S, C_DIM, C_PANEL) + hint.x = PX + 10; hint.y = PY + PH - 14; g.append(hint) + self.dirty = True + + # ---------- About sub-modal ---------- + def _show_about(self): + gc.collect() + self._overlay = 'about'; self._modal_cursor = 0; self._draw_about() + def _draw_about(self): + import sys + gc.collect() + try: free = gc.mem_free() + except Exception: free = 0 + try: cp_ver = "%d.%d.%d" % sys.implementation.version[:3] + except Exception: cp_ver = "?" + up_min = int(time.monotonic()) // 60 + lines = ( + ("VARASYS PolyMeter", C_CYAN), + ("PM_X-1 Explorer", C_TXT), + ("", None), + ("Firmware: v" + APP_VERSION, C_TXT), + ("Free RAM: %d KB" % (free // 1024), C_TXT), + ("Uptime: %dm" % up_min, C_TXT), + ("CircuitPython: " + cp_ver, C_TXT), + ("", None), + ("metronome.varasys.io", C_DIM), + ) + g = self.g_overlay + while len(g): g.pop() + PX, PY, PW = 24, 28, WIDTH - 48; PH = 12 + 16 * len(lines) + 22 + g.append(rect(PX, PY, PW, PH, C_PANEL)); g.append(rect(PX, PY, PW, 2, C_CYAN)) + yy = PY + 8 + for text, col in lines: + if col is not None: + lt, lw, lh = make_text(text, FONT_S, col, C_PANEL); lt.x = PX + 14; lt.y = yy; g.append(lt) + yy += 16 + hint, hw, hh = make_text("C = close", FONT_S, C_DIM, C_PANEL) + hint.x = PX + 14; hint.y = PY + PH - 14; g.append(hint) + self.dirty = True + + # ---------- Practice log sub-modal (replaces the Kit's screen-footer log) ---------- + def _show_log(self): + gc.collect() + self._overlay = 'log'; self._log_scroll = 0; self._draw_log_modal() + def _draw_log_modal(self): + g = self.g_overlay + while len(g): g.pop() + PX, PY, PW = 8, 28, WIDTH - 16; PH = 12 + LOG_MENU_ROWS * 16 + 28 + g.append(rect(PX, PY, PW, PH, C_PANEL)); g.append(rect(PX, PY, PW, 2, C_CYAN)) + t, w, h = make_text("Practice log (this track)", FONT_M, C_TXT, C_PANEL); t.x = PX + 10; t.y = PY + 5; g.append(t) + rows = [(i, e) for i, e in enumerate(self.log) if e.get("name") == self.name] + if not rows: + tg, w, h = make_text("no plays over 5s yet", FONT_S, C_DIM, C_PANEL); tg.x = PX + 14; tg.y = PY + 32; g.append(tg) + else: + top = self._log_scroll; yy = PY + 28 + for k in range(min(LOG_MENU_ROWS, len(rows) - top)): + _oi, e = rows[top + k] + dur = "%d:%02d" % (e["dur"] // 60, e["dur"] % 60) + bars = e.get("bars", 0); bstr = (" %dbar" % bars) if bars else "" + line = "%s %3dbpm %s%s" % (e.get("t", "--:--"), e["bpm"], dur, bstr) + lt, lw, lh = make_text(line, FONT_S, C_TXT, C_PANEL); lt.x = PX + 14; lt.y = yy; g.append(lt) + yy += 16 + hint, hw, hh = make_text("X/Z scroll, C close", FONT_S, C_DIM, C_PANEL) + hint.x = PX + 10; hint.y = PY + PH - 14; g.append(hint) + self.dirty = True + + # ---------- Settings persistence ---------- + def _load_settings(self): + global MUTE_SPEAKER, SPEAKER_AUTO_MUTE, MIDI_ENABLED, MIDI_CHANNEL, MIDI_CLOCK_OUT, MIDI_CLOCK_IN + try: + with open("/settings.json") as f: d = json.load(f) + except Exception: return + try: + sm = d.get("speaker", "auto") + MUTE_SPEAKER = (sm == "off"); SPEAKER_AUTO_MUTE = (sm == "auto") + MIDI_ENABLED = bool(d.get("midi_out", MIDI_ENABLED)) + MIDI_CHANNEL = max(1, min(16, int(d.get("midi_channel", MIDI_CHANNEL)))) + MIDI_CLOCK_OUT = bool(d.get("clock_out", MIDI_CLOCK_OUT)) + MIDI_CLOCK_IN = bool(d.get("clock_in", MIDI_CLOCK_IN)) + except Exception as e: print("settings:", e) + def _save_settings(self): + if not self.can_write: return + sm = "off" if MUTE_SPEAKER else ("auto" if SPEAKER_AUTO_MUTE else "always") + d = {"speaker": sm, "midi_out": MIDI_ENABLED, "midi_channel": MIDI_CHANNEL, + "clock_out": MIDI_CLOCK_OUT, "clock_in": MIDI_CLOCK_IN} + try: + with open("/settings.json", "w") as f: json.dump(d, f) + except OSError: self.can_write = False + + # ---------- step grids (cached per-lane ns durations: tuple lookup, no method call in tick) ---------- + def _rebuild_dur(self, L): + beat = self._beat_ns + sub = max(1, L['sub']); steps = max(1, L['steps']) + if L.get('poly') and self.lanes: + m = self.lanes[0]; master_bar = beat * (m['steps'] // max(1, m['sub'])) + d = master_bar // steps; L['durs'] = tuple(d for _ in range(steps)) + elif L.get('swing') and sub % 2 == 0: + pair = beat // max(1, sub // 2); lng = (pair * 2) // 3; sht = pair // 3 + L['durs'] = tuple(lng if (s % sub) % 2 == 0 else sht for s in range(steps)) + else: + d = beat // sub; L['durs'] = tuple(d for _ in range(steps)) + def _rebuild_dur_all(self): + for L in self.lanes: self._rebuild_dur(L) + def _reset_clock(self): + now = time.monotonic_ns() + for L in self.lanes: + L['next'] = now; L['step'] = -1 + self._m_steps = 0 + self._seg_start = time.monotonic() + + # ---------- audio + run-state indicator ---------- + def click(self, level): + self.amp_en.value = True # enable the amp briefly while we drive the piezo + self.spk.frequency = {2: 2300, 1: 1600, 3: 1050}.get(level, 1600) + self.spk.duty_cycle = {2: 42000, 1: 30000, 3: 14000}.get(level, 30000) + self.spk_off = time.monotonic_ns() + 22_000_000 # silence + amp_en off scheduled in tick() + def _set_run_dot(self): + self.run_dot_pal[0] = C_RUN_GO if self.running else C_RUN_IDLE + self.dirty = True + def flash(self, level): # brief bright pulse on the run dot (replaces the Kit's RGB LED) + self.run_dot_pal[0] = C_RUN_PULSE + self.dirty = True + + # ---------- Live sync (HELLO/FULL/DELTA/BYE on SysEx 0x40-0x43; see src/livesync.js for the editor side) ---------- + def _sync_send(self, op, text): + if self.midi is None: return + b = bytearray((0xF0, 0x7D, op)) + for c in text: + v = ord(c); b.append(v if v < 0x80 else 0x3F) + b.append(0xF7) + try: self.midi.write(b) + except Exception: pass + def _sync_broadcast(self, evt): + if not self._sync_armed or self._sync_applying or self.midi is None: return + text = "%s;%d;%s" % (self._sync_origin, self._sync_seq, evt); self._sync_seq += 1 + self._sync_send(0x42, text) + def _sync_broadcast_full(self): + if not self._sync_armed or self.midi is None: return + try: patch = self._prog_str() + except Exception: return + text = "%s;%d;%d;%d;%d;%s" % (self._sync_origin, self._sync_seq, + 1 if self.running else 0, self.sl, self.idx, patch) + self._sync_seq += 1 + self._sync_send(0x41, text) + self._sync_heartbeat_next = time.monotonic() + 5.0 + def _sync_apply_full(self, running, patch): + self._sync_applying = True + try: + try: + gc.collect() + try: cur = self._prog_str() + except Exception: cur = None + if patch and patch != cur: + bpm, lanes, bars, ramp, trainer = parse_program(patch) + self.bpm = bpm; self.lanes = lanes; self.bars = bars; self.ramp = ramp; self.trainer = trainer + self._beat_ns = 60_000_000_000 // max(1, bpm); self._rebuild_dur_all() + self.master = self.lanes[0]; self._ramp_base = self.bpm; self._lastbar = -1; self._muted = False + self._overlay = None + while len(self.g_overlay): self.g_overlay.pop() + self._reset_clock() + self.draw_bpm(); self.draw_status(); self.draw_train(); self.draw_meters() + self.build_grid() + if running and not self.running: self.toggle() + elif (not running) and self.running: self.toggle() + except Exception as e: + try: print("sync FULL apply:", e) + except Exception: pass + finally: + self._sync_applying = False + def _sync_apply_delta(self, evt): + self._sync_applying = True + try: + eq = evt.find('=') + key = evt if eq < 0 else evt[:eq] + val = '' if eq < 0 else evt[eq+1:] + if key == 'play': + if not self.running: self.toggle() + elif key == 'stop': + if self.running: self.toggle() + elif key == 'bpm': + try: self.set_bpm(int(val)) + except Exception: pass + elif key == 'sel': + p = val.split('/') + if len(p) == 2: + try: + sl = int(p[0]); item = int(p[1]) + if sl >= 0 and item >= 0: + if sl < len(self.setlists) and sl != self.sl: self.sl = sl + items = self.setlists[self.sl]['items'] + if 0 <= item < len(items) and item != self.idx: self.goto(item) + except Exception: pass + elif key == 'beat': # PM_X-1 doesn't EMIT beat= (no on-device editing) but DOES apply + p = val.split('/') + if len(p) == 3: + try: + li = int(p[0]); s = int(p[1]); lvl = int(p[2]) + if 0 <= li < len(self.lanes): + L = self.lanes[li] + if 0 <= s < len(L['levels']): + L['levels'][s] = lvl & 3 + if li < len(self.lane_pads) and s < len(self.lane_pads[li]): + lit = (self.lane_lit[li] == s) + self.lane_pads[li][s].color_index = self._padbase(L, s) + (4 if lit else 0) + self.dirty = True + except Exception: pass + elif key == 'lane': # apply but don't emit + p = val.split('/') + if len(p) >= 3: + try: + li = int(p[0]); field = p[1]; v = '/'.join(p[2:]) + if 0 <= li < len(self.lanes): + L = self.lanes[li]; structural = False + if field == 'sound': L['sound'] = v + elif field == 'groups': + try: L['groups'] = [int(x) for x in v.split('+')]; structural = True + except Exception: pass + elif field == 'sub': + try: L['sub'] = int(v); structural = True + except Exception: pass + elif field == 'swing': L['swing'] = (v == '1'); structural = True + elif field == 'enabled': L['mute'] = not (v == '1') + elif field == 'gain': + try: L['gain'] = int(v) + except Exception: pass + elif field == 'poly': L['poly'] = (v == '1'); structural = True + if structural: self._regen_levels(L) + if li == 0 and structural: self._rebuild_dur_all() + else: self._rebuild_dur(L) + if structural: self.build_grid() + self.dirty = True + except Exception: pass + finally: + self._sync_applying = False + def _regen_levels(self, L): # called on remote lane= deltas to recompute default accents + sub = L['sub']; groups = L['groups']; starts = set(); acc = 0 + for gp in groups: starts.add(acc); acc += gp + L['steps'] = sum(groups) * sub + L['levels'] = [(2 if (i // sub) in starts else 1) if i % sub == 0 else 0 for i in range(L['steps'])] + + def midi_send(self, note, vel): + if self.midi is None: return + b = self._note_buf + b[0] = 0x90 | ((MIDI_CHANNEL - 1) & 0x0F) + b[1] = note & 0x7F; b[2] = vel & 0x7F + try: self.midi.write(b) + except Exception: pass + + # ---------- transport ---------- + def toggle(self): + self.running = not self.running + if self.running: + self._reset_clock(); self._start_play() + self._clock_next = time.monotonic_ns() + if MIDI_CLOCK_OUT and MIDI_CLOCK_OUT_TRANSPORT and self.midi is not None: + try: self.midi.write(self._start_byte) + except Exception: pass + else: + self.spk.duty_cycle = 0; self.amp_en.value = False; self.reset_playheads(); self._log_play() + if MIDI_CLOCK_OUT and MIDI_CLOCK_OUT_TRANSPORT and self.midi is not None: + try: self.midi.write(self._stop_byte) + except Exception: pass + self._set_run_dot(); self.draw_meters() + self._sync_broadcast("play" if self.running else "stop") + def set_bpm(self, v): + v = max(5, min(300, v)) + if v != self.bpm: + self.bpm = v; self._beat_ns = 60_000_000_000 // v + self._rebuild_dur_all() + self._sync_broadcast("bpm=%d" % v) + def goto(self, i): + was = self.running + if was: self.running = False; self._log_play() + self.load(i) + if was: self.running = True; self._reset_clock(); self._start_play() + self._set_run_dot(); self.draw_meters() + self._sync_broadcast("sel=%d/%d" % (self.sl, self.idx)) + def tap(self): + now = time.monotonic() + if not hasattr(self, '_taps'): self._taps = [] + self._taps = [t for t in self._taps if now - t < 2.4] + self._taps.append(now) + if len(self._taps) >= 2: + span = (self._taps[-1] - self._taps[0]) / (len(self._taps) - 1) + if span > 0: self.set_bpm(round(60 / span)) + + # ---------- scheduler ---------- + def tick(self): + now = time.monotonic_ns() + if self.spk_off and now >= self.spk_off: + self.spk.duty_cycle = 0; self.spk_off = 0; self.amp_en.value = False + if self._slaved and (now - self._clock_in_last_t) > 1_000_000_000: self._slaved = False + if self.running: + fired_best = 0; fired_prio = -1 + for li, L in enumerate(self.lanes): + if self._advance: break + adv = False + while now >= L['next']: + L['step'] = (L['step'] + 1) % L['steps'] + if li == 0: + self._m_steps += 1 + nb = (self._m_steps - 1) // L['steps'] + if nb != self._lastbar: self._lastbar = nb; self._on_new_bar(nb) + if self._advance: break + if self.ramp and L['steps'] > 0 and not self._slaved: + mlen = L['steps'] + bar_pos = self._m_steps / mlen + seg_bar = (bar_pos % self.bars) if self.bars else bar_pos + new_bpm = max(5, min(300, int(self._ramp_base + seg_bar / self.ramp['every'] * self.ramp['amt']))) + if new_bpm != self.bpm: + self.bpm = new_bpm; self._beat_ns = 60_000_000_000 // new_bpm + self._rebuild_dur_all() + lvl = 0 if L['mute'] else L['levels'][L['step']] + if lvl > 0: + p = PRIO.get(lvl, 0) + if p > fired_prio: fired_prio = p; fired_best = lvl + if not self._muted: + self.midi_send(SOUND_GM.get(L['sound'], GM_DEFAULT), MIDI_VEL.get(lvl, 90)) + L['next'] += L['durs'][L['step']]; adv = True + if adv and li < len(self.lane_pads): self._move_playhead(li, L['step']) + if fired_best and not self._muted: + if not MUTE_SPEAKER and not (SPEAKER_AUTO_MUTE and self.midi_host): + self.click(fired_best) + self.flash(fired_best) + # Decay the run-dot pulse back to base + if self.run_dot_pal[0] == C_RUN_PULSE: + self.run_dot_pal[0] = C_RUN_GO if self.running else C_RUN_IDLE + self.dirty = True + if self._advance: + self._advance = False + self._do_advance() + if self.running and MIDI_CLOCK_OUT and self.midi is not None and not self._slaved: + clk = self._clock_byte + tick_ns = self._beat_ns // 24 + while now >= self._clock_next: + try: self.midi.write(clk) + except Exception: pass + self._clock_next += tick_ns + def _on_new_bar(self, bar): + if self.bars and self.continue_on and self._next_pending is None and bar == self.bars - 1: + self._prepare_next() + if self.bars and bar > 0 and bar % self.bars == 0: + self._seg_start = time.monotonic() + if self.continue_on: + if self._next_pending is None: self._prepare_next() + if self._next_pending is not None: + self._seam_t = self.lanes[0]['next'] + self._advance = True + t = self.trainer + self._muted = bool(t and (t['play'] + t['mute']) and (bar % (t['play'] + t['mute'])) >= t['play']) + def _prepare_next(self): + items = self.setlists[self.sl]['items'] + nxt = (self.idx + 1) % len(items) + if nxt == self.idx: return + name, prog = items[nxt] + gc.collect() + try: + bpm, lanes, bars, ramp, trainer = parse_program(prog) + except MemoryError: + gc.collect(); return + beat = 60_000_000_000 // max(1, bpm) + for L in lanes: + sub = max(1, L['sub']); steps = max(1, L['steps']) + if L.get('poly'): + m = lanes[0]; mbar = beat * (m['steps'] // max(1, m['sub'])) + d = mbar // steps; L['durs'] = tuple(d for _ in range(steps)) + elif L.get('swing') and sub % 2 == 0: + pair = beat // max(1, sub // 2); lng = (pair * 2) // 3; sht = pair // 3 + L['durs'] = tuple(lng if (s % sub) % 2 == 0 else sht for s in range(steps)) + else: + d = beat // sub; L['durs'] = tuple(d for _ in range(steps)) + self._next_pending = {'lanes': lanes, 'bpm': bpm, 'bars': bars, 'ramp': ramp, + 'trainer': trainer, 'name': name, 'idx': nxt} + def _do_advance(self): + n = self._next_pending + if n is None: return + self._next_pending = None + self.lanes = n['lanes']; self.bpm = n['bpm']; self.bars = n['bars'] + self.ramp = n['ramp']; self.trainer = n['trainer']; self.name = n['name']; self.idx = n['idx'] + self._beat_ns = 60_000_000_000 // max(1, self.bpm); self._rebuild_dur_all() + self._ramp_base = self.bpm; self._lastbar = -1; self._muted = False; self._m_steps = 0 + self._overlay = None + while len(self.g_overlay): self.g_overlay.pop() + seam = self._seam_t + for L in self.lanes: L['next'] = seam; L['step'] = -1 + self._need_redraw = True + self._heavy_redraw_at = time.monotonic() + 0.6 + self._seg_start = time.monotonic() + self._set_run_dot() + + # ---------- inputs (6 buttons - active low) ---------- + def _modal_select(self): + """A inside a modal: invoke the focused row's action (cycle adjuster) or close on About/Help.""" + if self._overlay in ('help', 'about', 'msg', 'log'): + self._close_overlay(); return + if not self._modal_rows: return + i = self._modal_cursor + if i < 0 or i >= len(self._modal_rows): return + _label, _v, fn = self._modal_rows[i] + if fn is None: return + # adjusters take one argument (+1); plain actions take none + try: fn(1) + except TypeError: fn() + def _modal_back(self): + """B inside a modal: step out to the parent (Settings -> menu, Help -> menu, etc.).""" + if self._overlay in ('settings', 'help', 'about', 'log'): + self._show_menu(); return + self._close_overlay() + def _modal_up(self): + if self._overlay == 'help': + if self._help_page > 0: self._help_page -= 1; self._draw_help() + return + if self._overlay == 'log': + self._log_scroll = max(0, self._log_scroll - 1); self._draw_log_modal(); return + n = len(self._modal_rows) + if n: self._modal_cursor = (self._modal_cursor - 1) % n; self._redraw_modal() + def _modal_down(self): + if self._overlay == 'help': + if self._help_page < len(HELP_PAGES) - 1: self._help_page += 1; self._draw_help() + return + if self._overlay == 'log': + rows = [e for e in self.log if e.get("name") == self.name] + if self._log_scroll + LOG_MENU_ROWS < len(rows): + self._log_scroll += 1; self._draw_log_modal() + return + n = len(self._modal_rows) + if n: self._modal_cursor = (self._modal_cursor + 1) % n; self._redraw_modal() + def _modal_decrement(self): + """Y inside a modal: -1 on the focused row's adjuster (if it is one).""" + if self._overlay in ('help', 'about', 'msg', 'log'): return + if not self._modal_rows: return + i = self._modal_cursor + if i < 0 or i >= len(self._modal_rows): return + _label, _v, fn = self._modal_rows[i] + if fn is None: return + try: fn(-1) + except TypeError: pass # plain action -> Y does nothing + def _redraw_modal(self): + if self._overlay == 'menu': self._draw_menu() + elif self._overlay == 'settings': self._draw_settings() + elif self._overlay == 'help': self._draw_help() + elif self._overlay == 'about': self._draw_about() + elif self._overlay == 'log': self._draw_log_modal() + + def _on_btn_X(self): + if self._overlay: self._modal_up(); return + self.goto(self.idx - 1) + def _on_btn_Z(self): + if self._overlay: self._modal_down(); return + self.goto(self.idx + 1) + def _on_btn_Y(self): + if self._overlay: self._modal_decrement(); return + # tempo down: 1 normally, 5 after long hold + step = -5 if (time.monotonic_ns() - self._held_t['Y']) > 1_500_000_000 else -1 + self.set_bpm(self.bpm + step) + def _on_chord_XZ(self): # X+Z chord -> tempo up (mirrors Y for tempo down) + step = 5 if (time.monotonic_ns() - self._chord_xz) > 1_500_000_000 else 1 + self.set_bpm(self.bpm + step) + + def poll(self): + now_ns = time.monotonic_ns() + # Sample all six buttons (active-low; True = released) + a = self.btnA.value; b = self.btnB.value; c = self.btnC.value + x = self.btnX.value; y = self.btnY.value; z = self.btnZ.value + # ---- A: play/stop ---- + if (not a) and self._prev['A']: + if self._overlay: self._modal_select() + else: self.toggle() + # ---- B: tap tempo / modal back ---- + if (not b) and self._prev['B']: + if self._overlay: self._modal_back() + else: self.tap() + # ---- C: menu open/close ---- + if (not c) and self._prev['C']: + if self._overlay: self._close_overlay() + else: self._show_menu() + # ---- X/Z chord detection (tempo up) ---- + x_pressed_now = (not x) and self._prev['X'] + z_pressed_now = (not z) and self._prev['Z'] + chord_window = 100_000_000 # 100ms + if x_pressed_now and (not z) and not self._prev['Z'] and (now_ns - self._held_t['Z']) < chord_window: + self._chord_xz = now_ns; self._on_chord_XZ() + elif z_pressed_now and (not x) and not self._prev['X'] and (now_ns - self._held_t['X']) < chord_window: + self._chord_xz = now_ns; self._on_chord_XZ() + else: + # ---- single-press X / Z ---- + if x_pressed_now: + self._held_t['X'] = now_ns; self._next_rep['X'] = now_ns + 350_000_000 + if not self._chord_xz: self._on_btn_X() + if z_pressed_now: + self._held_t['Z'] = now_ns; self._next_rep['Z'] = now_ns + 350_000_000 + if not self._chord_xz: self._on_btn_Z() + if x and z: self._chord_xz = 0 # both released -> chord state clears + # ---- Y: tempo down (or modal decrement) ---- + if (not y) and self._prev['Y']: + self._held_t['Y'] = now_ns; self._next_rep['Y'] = now_ns + 350_000_000 + self._on_btn_Y() + # ---- hold-repeat for X / Y / Z ---- + if (not x) and not self._prev['X'] and now_ns >= self._next_rep['X']: + self._next_rep['X'] = now_ns + 120_000_000 + if self._chord_xz: self._on_chord_XZ() + else: self._on_btn_X() + if (not z) and not self._prev['Z'] and now_ns >= self._next_rep['Z']: + self._next_rep['Z'] = now_ns + 120_000_000 + if self._chord_xz: self._on_chord_XZ() + else: self._on_btn_Z() + if (not y) and not self._prev['Y'] and now_ns >= self._next_rep['Y']: + self._next_rep['Y'] = now_ns + 120_000_000; self._on_btn_Y() + # Commit previous-state + self._prev['A'] = a; self._prev['B'] = b; self._prev['C'] = c + self._prev['X'] = x; self._prev['Y'] = y; self._prev['Z'] = z + + # USB-MIDI in: any byte = a host is listening (heartbeat); also assemble SysEx + if self.midi_in is not None: + try: n = self.midi_in.readinto(self._mbuf) + except Exception: n = 0 + if n: + self.last_midi_in = time.monotonic() + self._feed_midi(self._mbuf, n) + host = bool(self.last_midi_in) and (time.monotonic() - self.last_midi_in) < 1.0 + if host != self.midi_host: + self.midi_host = host + if host and SPEAKER_AUTO_MUTE: + self.spk.duty_cycle = 0; self.amp_en.value = False + self._set_run_dot(); self.draw_icons() + uc = bool(getattr(supervisor.runtime, "usb_connected", True)) + if uc != self.usb_conn: + self.usb_conn = uc; self.draw_icons() + + # ---------- drawing ---------- + def draw_bpm(self): + if self.bpm == self._displayed_bpm: return + self._displayed_bpm = self.bpm + self._place(self.g_bpm, str(self.bpm), 0, 56, C_TXT, C_BG, FONT_L, right_edge=WIDTH-10) + def draw_status(self): + sl = self.setlists[self.sl] + self._place(self.g_idx, "%s %d/%d" % (sl['title'][:14], self.idx + 1, len(sl['items'])), + 10, 32, C_MUTE if sl['builtin'] else C_CYAN, C_BG, FONT_S) + self._place(self.g_cont, "CONT", 0, 32, C_GREEN if self.continue_on else C_DIM, C_BG, FONT_S, right_edge=WIDTH-10) + self._place(self.g_name, self.name[:22], 10, 48, C_TXT, C_BG, FONT_M) + def draw_train(self): + g = self.g_train + while len(g): g.pop() + x = 10; y = 84 + if self.ramp: + up = self.ramp['amt'] >= 0 + pts = [(0, 9), (12, 9), (12, 0)] if up else [(0, 0), (0, 9), (12, 9)] + g.append(vectorio.Polygon(pixel_shader=solid(C_AMBER), points=pts, x=x, y=y)); x += 16 + a = self.ramp['amt']; lbl = ("+%d" % a if a >= 0 else "%d" % a) + "/%db" % self.ramp['every'] + tg, w, h = make_text(lbl, FONT_S, C_AMBER, C_BG); tg.x = x; tg.y = y; g.append(tg); x += w + 14 + if self.trainer: + g.append(rect(x, y, 4, 9, C_CYAN)); g.append(rect(x + 6, y, 4, 9, C_DIM)) + x += 14 + tg, w, h = make_text("%d/%db" % (self.trainer['play'], self.trainer['mute']), FONT_S, C_CYAN, C_BG) + tg.x = x; tg.y = y; g.append(tg) + self.dirty = True + def draw_icons(self): + if self.ic_midi_pal is not None: + _recolor(self.ic_midi_pal, C_GREEN if self.midi_host else C_DIM, C_BG) + if self.ic_usb_pal is not None: + _recolor(self.ic_usb_pal, C_CYAN if self.usb_conn else C_DIM, C_BG) + self.dirty = True + def _fmt_t(self, s): + s = int(s) + return "%d:%02d:%02d" % (s // 3600, (s % 3600) // 60, s % 60) if s >= 3600 else "%d:%02d" % (s // 60, s % 60) + def draw_meters(self): + run = self.running and self.play_start is not None + mlen = self.lanes[0]['steps'] if self.lanes else 1 + bpb = (self.lanes[0]['steps'] // max(1, self.lanes[0]['sub'])) if self.lanes else 4 + el = (time.monotonic() - self._seg_start) if run else 0 + mbars = max(0, self._m_steps - 1) // max(1, mlen) + cur = ("%d" % ((mbars % self.bars + 1) if self.bars else (mbars + 1))) if run else "-" + if self.bars: + ts = "%s of %s" % (self._fmt_t(el), self._fmt_t(self.bars * bpb * 60.0 / self.bpm)) + bs = "bar %s of %d" % (cur, self.bars) + else: + ts = self._fmt_t(el); bs = "bar %s" % cur + if ts != self._lastTs: + self._place(self.g_time, ts, 10, 66, C_TXT, C_BG, FONT_S); self._lastTs = ts + if bs != self._lastBs: + self._place(self.g_bar, bs, 10, 80, C_MUTE, C_BG, FONT_S); self._lastBs = bs + + # ---------- pad grid (chunked rebuild; per-pad chunks so audio interleaves) ---------- + def _padbase(self, L, s): + return 0 if L['mute'] else L['levels'][s] + def build_grid(self): + self._grid_rebuild_start() + while self._grid_li is not None: self._grid_rebuild_step() + def _grid_rebuild_start(self): + while len(self.g_grid): self.g_grid.pop() + self.lane_pads = []; self.lane_lit = [] + gc.collect() + n = min(len(self.lanes), MAXLANES) + top = GRID_TOP; rowh = min(22, ((HEIGHT - 6) - top) // max(1, n)) + px0 = 60; usable = WIDTH - 8 - px0 - 8; gridh = n * rowh + self._grid = {'top': top, 'rowh': rowh, 'px0': px0, 'usable': usable, 'n': n} + m = self.lanes[0] if self.lanes else None + if m is not None: + mbeats = max(1, m['steps'] // max(1, m['sub'])) + for bcol in range(mbeats): + self.g_grid.append(rect(px0 + 6 + (bcol * usable) // mbeats, top, 1, gridh, C_GRID)) + self._grid_n = n + self._grid_geo = (top, rowh, px0, usable) + self._grid_li = 0 if n > 0 else None + self._grid_pi = 0; self._grid_lane_st = None; self._grid_pads = [] + self.dirty = True + def _grid_rebuild_step(self): + li = self._grid_li + if li is None: return + if li >= self._grid_n or li >= len(self.lanes): + self._grid_li = None; return + L = self.lanes[li] + top, rowh, px0, usable = self._grid_geo + y = top + li * rowh; cy = y + rowh // 2 + st = self._grid_lane_st + if st is None: + tg, w, h = make_text((L.get('sound', '') or '?')[:7], FONT_S, C_MUTE, C_BG) + tg.x = 6; tg.y = cy - h // 2; self.g_grid.append(tg) + steps = L['steps']; sub = L['sub']; stepw = max(1, usable // steps) + side = max(4, min(12, stepw - 1, rowh - 6)) + rad = max(2, min(side // 2, stepw // 2 - 1)) + self._grid_lane_st = (cy, steps, sub, stepw, side, rad) + self._grid_pi = 0; self._grid_pads = []; self.dirty = True + return + cy_, steps, sub, stepw, side, rad = st + s = self._grid_pi + if s >= steps: + self.lane_pads.append(self._grid_pads); self.lane_lit.append(-1) + self._grid_pads = []; self._grid_lane_st = None; self._grid_li = li + 1 + return + cxp = px0 + 6 + (s * usable) // steps + pal = self.pad_pal + if s % sub == 0: + p = vectorio.Rectangle(pixel_shader=pal, width=side, height=side, x=cxp - side // 2, y=cy_ - side // 2) + else: + p = vectorio.Circle(pixel_shader=pal, radius=rad, x=cxp, y=cy_) + p.color_index = self._padbase(L, s); self.g_grid.append(p); self._grid_pads.append(p) + self._grid_pi = s + 1 + self.dirty = True + def _move_playhead(self, li, step): + pads = self.lane_pads[li]; prev = self.lane_lit[li] + if 0 <= prev < len(pads): pads[prev].color_index = self._padbase(self.lanes[li], prev) + if step < len(pads): pads[step].color_index = self._padbase(self.lanes[li], step) + 4 + self.lane_lit[li] = step; self.dirty = True + def reset_playheads(self): + for li, pads in enumerate(self.lane_pads): + prev = self.lane_lit[li] + if 0 <= prev < len(pads): pads[prev].color_index = self._padbase(self.lanes[li], prev) + self.lane_lit[li] = -1 + self.dirty = True + + # ---------- practice log (saved to /history.json) ---------- + def _probe_write(self): + try: + with open("/.wtest", "w") as f: f.write("1") + try: os.remove("/.wtest") + except Exception: pass + return True + except OSError: + return False + def _load_log(self): + try: + with open("/history.json") as f: return json.load(f).get("log", []) + except Exception: + return [] + def _save_log(self): + if not self.can_write: return + try: + with open("/history.json", "w") as f: json.dump({"log": self.log[:200]}, f) + except OSError: + self.can_write = False + def _start_play(self): + self.play_start = time.monotonic(); self.play_bpm = self.bpm; self.play_name = self.name + def _log_play(self): + if self.play_start is None: return + dur = int(time.monotonic() - self.play_start); self.play_start = None + if dur < MIN_LOG_SEC: return + mlen = self.lanes[0]['steps'] if self.lanes else 1 + t = time.localtime() + self.log.insert(0, {"t": "%02d:%02d" % (t.tm_hour, t.tm_min), "bpm": self.play_bpm, + "dur": dur, "bars": self._m_steps // max(1, mlen), "name": self.play_name}) + del self.log[200:] + self._save_log() + + # ---------- USB-MIDI in: SysEx assembler (clock + editor-pushed programs + live-sync) ---------- + def _feed_midi(self, buf, n): + now_ns = time.monotonic_ns() if MIDI_CLOCK_IN else 0 + for i in range(n): + b = buf[i] + if b == 0xF0: self._sx = bytearray(); self._sxon = True + elif b == 0xF7: + if self._sxon: self._handle_sysex(self._sx) + self._sxon = False + elif b == 0xF8 and MIDI_CLOCK_IN: self._slave_tick(now_ns) + elif b == 0xFA and MIDI_CLOCK_IN_TRANSPORT and MIDI_CLOCK_IN: self._slave_start() + elif b == 0xFB and MIDI_CLOCK_IN_TRANSPORT and MIDI_CLOCK_IN: self._slave_start() + elif b == 0xFC and MIDI_CLOCK_IN_TRANSPORT and MIDI_CLOCK_IN: self._slave_stop() + elif b >= 0xF8: pass + elif self._sxon: + if len(self._sx) < 60000: self._sx.append(b) + else: self._sxon = False + def _slave_tick(self, now_ns): + if self._clock_in_last_t == 0: + self._clock_in_last_t = now_ns; self._slaved = True; return + interval = now_ns - self._clock_in_last_t + self._clock_in_last_t = now_ns + if interval < 8_300_000 or interval > 500_000_000: return + if self._clock_in_avg == 0: self._clock_in_avg = interval + else: self._clock_in_avg = (self._clock_in_avg * 7 + interval) // 8 + new_bpm = max(5, min(300, int(60_000_000_000 // (self._clock_in_avg * 24)))) + if new_bpm != self.bpm: + self.bpm = new_bpm; self._beat_ns = 60_000_000_000 // new_bpm; self._rebuild_dur_all() + self._slaved = True + def _slave_start(self): + if not self.running: + self.running = True; self._reset_clock(); self._start_play() + self._set_run_dot(); self.draw_meters() + self._clock_in_last_t = 0; self._clock_in_avg = 0 + def _slave_stop(self): + if self.running: + self.running = False + self.spk.duty_cycle = 0; self.amp_en.value = False + self.reset_playheads(); self._log_play() + self._set_run_dot(); self.draw_meters() + self._clock_in_last_t = 0; self._clock_in_avg = 0; self._slaved = False + def _handle_sysex(self, sx): + if len(sx) < 2 or sx[0] != 0x7D: return + cmd = sx[1] + if cmd == 0x01 and len(sx) >= 8 and rtc is not None: + try: rtc.RTC().datetime = time.struct_time((2000 + sx[2], sx[3], sx[4], sx[5], sx[6], sx[7], 0, -1, -1)) + except Exception: pass + elif cmd == 0x02: # version query -> reply 0x03 + "X;" + if self.midi: + payload = DEVICE_ID + ";" + APP_VERSION + self.midi.write(bytes([0xF0, 0x7D, 0x03]) + payload.encode() + bytes([0xF7])) + elif cmd == 0x40 or cmd == 0x41 or cmd == 0x42 or cmd == 0x43: + try: text = "".join(chr(b) if 0x20 <= b < 0x7F else "" for b in sx[2:]) + except Exception: return + origin = text.split(";", 1)[0] if text else "" + if origin == self._sync_origin: return + self._sync_armed = True + if cmd == 0x40: + self._sync_broadcast_full() + elif cmd == 0x43: + self._sync_armed = False + elif cmd == 0x41: + parts = text.split(";", 5) + if len(parts) >= 6: + try: + running = parts[2] == "1"; patch = parts[5] + self._sync_apply_full(running, patch) + except Exception: pass + elif cmd == 0x42: + parts = text.split(";", 2) + if len(parts) >= 3: self._sync_apply_delta(parts[2]) + elif cmd == 0x10: + try: + with open("/programs.json", "wb") as f: f.write(bytes(sx[2:])) + self.rebuild_setlists(); self.load(0) + self._ack(True) + except Exception: + self._ack(False) + elif cmd == 0x21: + try: + try: self._fw.close() + except Exception: pass + self._fw = open("/app.new", "wb"); self._fw_n = 0; self._ack(True) + except Exception: + self._fw = None; self._ack(False) + elif cmd == 0x22: + try: + if self._fw is None or a2b_base64 is None: raise OSError() + self._fw.write(a2b_base64(bytes(sx[2:]))) + self._fw.flush() + self._fw_n += 1 + if self._fw_n % 50 == 0: gc.collect() + self._ack(True) + except Exception: + try: self._fw.close() + except Exception: pass + self._fw = None; self._ack(False) + elif cmd == 0x23: + try: + try: self._fw.close() + except Exception: pass + self._fw = None; gc.collect() + with open("/app.new", "rb") as f: head = f.read(2) + if os.stat("/app.new")[6] < 4000 or len(head) < 2 or head[0] != 0x43 or head[1] != 0x06: + try: os.remove("/app.new") + except OSError: pass + self._ack(False); return + try: os.remove("/app.bak") + except OSError: pass + os.rename("/app.mpy", "/app.bak") + os.rename("/app.new", "/app.mpy") + open("/trial", "w").close() + self._ack(True); time.sleep(0.4); supervisor.reload() + except Exception: + self._ack(False) + def _ack(self, ok): + if self.midi: self.midi.write(bytes([0xF0, 0x7D, 0x7F if ok else 0x7E, 0xF7])) + + def run(self): + boot = time.monotonic() + try: os.stat("/trial"); committed = False + except OSError: committed = True + while True: + try: + self.tick(); self.poll() + if self._need_redraw: + self._need_redraw = False + self.draw_bpm(); self.draw_status(); self.draw_train(); self.draw_meters() + if self._heavy_redraw_at and time.monotonic() >= self._heavy_redraw_at: + self._heavy_redraw_at = 0 + self._grid_rebuild_start() + if self._grid_li is not None: + self._grid_rebuild_step() + tnow = time.monotonic() + if tnow >= self._uiNext: + self._uiNext = tnow + 0.25; self.draw_meters(); self.draw_bpm() + if self._sync_armed and tnow >= self._sync_heartbeat_next: + self._sync_broadcast_full() + if not committed and tnow - boot > 5: + try: os.remove("/trial") + except Exception: pass + committed = True + if self.dirty and tnow >= self._refreshNext: + safe = True + if self.running and self.lanes: + nb = self.lanes[0]['next'] + safe = (nb - time.monotonic_ns()) > 10_000_000 or (tnow - self._lastRefresh) > 0.2 + if safe: + if self.display.refresh(): self.dirty = False + self._lastRefresh = tnow; self._refreshNext = tnow + 0.05 + else: + self._refreshNext = tnow + 0.003 + time.sleep(0.0005) + except MemoryError: + try: print("MemoryError: gc + continue") + except Exception: pass + gc.collect(); time.sleep(0.05) + except Exception as e: + try: print("tick error:", e) + except Exception: pass + time.sleep(0.05) + +App().run() diff --git a/pico-explorer/boot.py b/pico-explorer/boot.py new file mode 100644 index 0000000..d0ef5e3 --- /dev/null +++ b/pico-explorer/boot.py @@ -0,0 +1,22 @@ +# boot.py - runs once at power-on (before USB connects); decides who owns the filesystem. +# +# DEFAULT = appliance mode: the FIRMWARE owns the drive, so it can save your practice log to +# /history.json and write /programs.json that the editor pushes over USB-MIDI. The drive is then +# READ-ONLY to the computer - which also protects the firmware from accidental deletion. +# +# HOLD BUTTON A (GP16 on the Pimoroni Explorer) WHILE PLUGGING IN = editor mode: the drive is +# writable by the computer, so you can drag programs.json / code.py on from any OS or browser +# (the universal fallback). Reset afterwards to return to appliance mode. +# +# Also frees a USB endpoint (disables unused HID) and makes sure USB-MIDI is available. +import board, digitalio, storage, usb_hid, usb_midi +try: usb_hid.disable() +except Exception: pass +usb_midi.enable() +a = digitalio.DigitalInOut(board.GP16) +a.switch_to_input(pull=digitalio.Pull.UP) +appliance = a.value # value True (pull-up, not pressed) -> appliance mode +a.deinit() +if appliance: + try: storage.remount("/", readonly=False) # writable by code, read-only to the computer + except Exception: pass diff --git a/pico-explorer/code.py b/pico-explorer/code.py new file mode 100644 index 0000000..ff92cad --- /dev/null +++ b/pico-explorer/code.py @@ -0,0 +1,24 @@ +# code.py - PM_X-1 A/B firmware loader (stable; rarely changes). +# +# The real application is the PRECOMPILED app.mpy (CircuitPython compiles a big .py at boot, which +# fragments the heap and OOMs; a .mpy loads without compiling). app.bak holds the previous known-good +# build. The web editor pushes a new app.mpy to a "trial" slot over USB-MIDI; this loader runs it, and +# if it fails to boot it AUTOMATICALLY ROLLS BACK to app.bak. (Unbrickable: BOOTSEL -> drag a .uf2.) +# app.mpy clears the /trial marker once it has run healthily for ~5s. +import supervisor, os +supervisor.runtime.autoreload = False # updates reboot explicitly; never auto-restart on our own writes + +def _trial(): + try: os.stat("/trial"); return True + except OSError: return False + +try: + import app # runs the application (app.mpy; ends with App().run()) +except Exception: + if _trial(): # a freshly-pushed build crashed on startup -> roll back + try: + os.remove("/app.mpy"); os.rename("/app.bak", "/app.mpy"); os.remove("/trial") + except Exception: pass + supervisor.reload() # reboot into the restored known-good build + else: + raise # the active build failed unexpectedly (rare) -> on-screen traceback diff --git a/pico-explorer/programs.json b/pico-explorer/programs.json new file mode 100644 index 0000000..2a81802 --- /dev/null +++ b/pico-explorer/programs.json @@ -0,0 +1,3 @@ +{ + "setlists": [] +}