PM_K-1: chunked firmware transfer (reliable), LED run/stop indicator, revert bg tint

APP_VERSION -> 0.0.6. Device firmware + editor change in lockstep — one-time manual
copy of 0.0.6 needed (the broken single-shot updater can't deliver it).

Update transport (fixes the failed/bricking updates):
- Editor now pushes app.py in 512-byte flow-controlled chunks: begin(0x21,len) ->
  data(0x22)* -> commit(0x23), waiting for each ACK before the next. A single ~38KB
  SysEx overran the device's USB-MIDI input buffer and arrived corrupt.
- Device writes chunks straight to /app.new, and on commit verifies length + no NUL +
  App().run()/APP_VERSION present before the A/B install; rejects (NAK) otherwise and
  keeps the working build. All errors caught -> never bricks.

Run/stop indicator moved off the screen onto the RGB LED (per feedback that recoloring
the whole background is wrong — it forces a full-screen SPI repaint and fringes the
anti-aliased text):
- Dim GREEN when stopped ("on"), dim RED while playing; the beat pulse flashes brighter
  and now decays back to the running base instead of to black. Background is static black.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Me Here 2026-05-29 11:33:43 -05:00
parent 711a02fcc1
commit 5e71df6b17
3 changed files with 85 additions and 44 deletions

View file

@ -1197,15 +1197,31 @@ async function updateFirmware() { // A/B firmware update over USB-MIDI, with a
if (!confirm("Device firmware: " + (dev || "unknown") + "\nNew build: " + latest + if (!confirm("Device firmware: " + (dev || "unknown") + "\nNew build: " + latest +
(upToDate ? "\n\nSame version. Re-install anyway?" (upToDate ? "\n\nSame version. Re-install anyway?"
: "\n\nUpdate now? The device reboots, runs the new build, and auto-rolls-back if it fails to start."))) return; : "\n\nUpdate now? The device reboots, runs the new build, and auto-rolls-back if it fails to start."))) return;
const bytes = [0xF0, 0x7D, 0x20]; const err = await _pushFirmware(src);
for (let i = 0; i < src.length; i++) bytes.push(src.charCodeAt(i) & 0x7F); // app.py is ASCII if (err) return alert("Update didn't complete (" + err + ").\n\nThe device kept its working firmware — nothing was installed. " +
bytes.push(0xF7); "Make sure it's plugged in and NOT in editor mode (don't hold A), on firmware 0.0.6+, then retry. " +
const p = new Promise((res) => { _saveCb = res; setTimeout(() => { if (_saveCb) { _saveCb = null; res(null); } }, 5000); }); "(If the device is older than 0.0.6, drag app.py onto the drive once in editor mode to get the chunked updater.)");
_send(bytes); alert("Update sent ✓ — the device verified v" + latest + " and is rebooting into it. It auto-confirms after a few seconds, or rolls back if it won't start.");
const ok = await p; }
alert(ok === true ? "Update sent ✓ — the device is rebooting into the new build (v" + latest + "). It auto-confirms after a few seconds, or rolls back if it won't start." // One ACK/NAK await (the device replies 0x7F=ok / 0x7E=rejected after each SysEx); resolves true/false/null(timeout).
: ok === false ? "The device is in editor mode (read-only to the updater). Reboot it normally (don't hold A) and try again." function _ack(timeout) {
: "No acknowledgement from the device. Make sure it's connected and not in editor mode — or drag app.py onto the drive in editor mode."); return new Promise((res) => { _saveCb = res; setTimeout(() => { if (_saveCb) { _saveCb = null; res(null); } }, timeout); });
}
// Push app.py in small, flow-controlled chunks: a giant single SysEx overruns the device's MIDI input
// buffer and arrives corrupt. begin(0x21,len) -> data(0x22)* -> commit(0x23); wait for each ACK.
async function _pushFirmware(src) {
const data = []; for (let i = 0; i < src.length; i++) data.push(src.charCodeAt(i) & 0x7F); // 7-bit ASCII
const n = data.length;
_send([0xF0, 0x7D, 0x21, n & 0x7F, (n >> 7) & 0x7F, (n >> 14) & 0x7F, (n >> 21) & 0x7F, 0xF7]);
if (await _ack(3000) !== true) return "handshake";
const CH = 512;
for (let o = 0; o < n; o += CH) {
_send([0xF0, 0x7D, 0x22].concat(data.slice(o, o + CH)).concat([0xF7]));
const a = await _ack(4000);
if (a !== true) return "transfer at " + o + "/" + n + (a === false ? " (rejected)" : " (timeout)");
}
_send([0xF0, 0x7D, 0x23, 0xF7]); // commit: device verifies the whole file, then reboots
return (await _ack(6000)) === true ? null : "verify";
} }
// Where the new app.py comes from: the site when online (the https editor, same-origin), else let the user // Where the new app.py comes from: the site when online (the https editor, same-origin), else let the user
// pick the file — so the OFFLINE editor that ships ON the device can update too (file:// pages can't fetch). // pick the file — so the OFFLINE editor that ships ON the device can update too (file:// pages can't fetch).

View file

@ -18,7 +18,7 @@
import board, busio, digitalio, analogio, pwmio, displayio, vectorio, time, json, gc, os, supervisor import board, busio, digitalio, analogio, pwmio, displayio, vectorio, time, json, gc, os, supervisor
supervisor.runtime.autoreload = False # we write our own files (log + pushed programs); never self-restart supervisor.runtime.autoreload = False # we write our own files (log + pushed programs); never self-restart
APP_VERSION = "0.0.5" # firmware version (the A/B updater pushes/compares this) APP_VERSION = "0.0.6" # firmware version (the A/B updater pushes/compares this)
try: try:
import rtc # set from the editor's clock SysEx so the log has real timestamps import rtc # set from the editor's clock SysEx so the log has real timestamps
except ImportError: except ImportError:
@ -75,7 +75,9 @@ DEFAULT_PROGRAMS = [
C_BG, C_PANEL, C_TXT, C_MUTE = 0x06090E, 0x1C222C, 0xC7D0DB, 0x788494 C_BG, C_PANEL, C_TXT, C_MUTE = 0x06090E, 0x1C222C, 0xC7D0DB, 0x788494
C_CYAN, C_AMBER, C_GREEN, C_DIM = 0x0AB3F7, 0xFF9B2E, 0x2FE07A, 0x243240 C_CYAN, C_AMBER, C_GREEN, C_DIM = 0x0AB3F7, 0xFF9B2E, 0x2FE07A, 0x243240
C_BTN = 0x1C222C C_BTN = 0x1C222C
LEVEL_RGB = {2: (255, 110, 0), 1: (0, 150, 255), 3: (130, 70, 255)} LEVEL_RGB = {2: (255, 110, 0), 1: (0, 150, 255), 3: (130, 70, 255)} # beat pulse: accent / normal / ghost
LED_IDLE = (0, 80, 0) # RGB LED resting colour when stopped: dim green ("on")
LED_RUN = (110, 0, 0) # RGB LED resting colour when playing: dim red (beats pulse brighter on top)
# voice -> General-MIDI note (USB-MIDI bridge), and level -> MIDI velocity # voice -> General-MIDI note (USB-MIDI bridge), and level -> MIDI velocity
SOUND_GM = {"kick":36,"kick808":36,"kick909":36, "snare":38,"snare808":38,"snare909":38, SOUND_GM = {"kick":36,"kick808":36,"kick909":36, "snare":38,"snare808":38,"snare909":38,
"clap":39,"clap808":39,"clap909":39, "rim":37, "hatClosed":42,"hat808":42,"hat909":42, "clap":39,"clap808":39,"clap909":39, "rim":37, "hatClosed":42,"hat808":42,"hat909":42,
@ -91,7 +93,6 @@ MIN_LOG_SEC = 5 # don't log plays shorter t
PAD_DIM = (0x10161E, 0x0A3A52, 0x4A3010, 0x2A1D4A) # idle pad: mute / normal / accent / ghost PAD_DIM = (0x10161E, 0x0A3A52, 0x4A3010, 0x2A1D4A) # idle pad: mute / normal / accent / ghost
PAD_LIT = (0x39414D, 0x0AB3F7, 0xFF9B2E, 0x967BFF) # playhead pad: mute / normal / accent / ghost PAD_LIT = (0x39414D, 0x0AB3F7, 0xFF9B2E, 0x967BFF) # playhead pad: mute / normal / accent / ghost
C_GRID = 0x1A2330 # faint vertical beat gridlines (beats line up across lanes) C_GRID = 0x1A2330 # faint vertical beat gridlines (beats line up across lanes)
C_RUNBG = 0x161D28 # background tint while running (vs near-black when stopped)
# WS2812 RGB LED - self-contained via the core neopixel_write module (no external library) # WS2812 RGB LED - self-contained via the core neopixel_write module (no external library)
class RGB: class RGB:
@ -331,6 +332,7 @@ class App:
self.midi_in = usb_midi.ports[0] if (MIDI_ENABLED and usb_midi and len(usb_midi.ports) > 0) else None self.midi_in = usb_midi.ports[0] if (MIDI_ENABLED and usb_midi and len(usb_midi.ports) > 0) else None
self._mbuf = bytearray(64); self.midi_host = False; self.last_midi_in = 0.0 self._mbuf = bytearray(64); self.midi_host = False; self.last_midi_in = 0.0
self._sx = bytearray(); self._sxon = False # USB-MIDI SysEx assembler (clock + pushed programs) self._sx = bytearray(); self._sxon = False # USB-MIDI SysEx assembler (clock + pushed programs)
self._fw = None; self._fw_len = 0 # chunked firmware transfer: staging file + expected size
self.led = RGB(P_RGB) self.led = RGB(P_RGB)
self.buz = pwmio.PWMOut(P_BUZ, frequency=1600, variable_frequency=True, duty_cycle=0) self.buz = pwmio.PWMOut(P_BUZ, frequency=1600, variable_frequency=True, duty_cycle=0)
self.buz_off = 0 self.buz_off = 0
@ -347,7 +349,7 @@ class App:
self.lane_pads = []; self.lane_lit = [] self.lane_pads = []; self.lane_lit = []
self.usb_conn = False; self._m_steps = 0 # USB-connected state; master-lane steps (for the bar counter) self.usb_conn = False; self._m_steps = 0 # USB-connected state; master-lane steps (for the bar counter)
self._uiNext = 0.0; self._lastTs = None; self._lastBs = None # throttle the stopwatch/bar redraw self._uiNext = 0.0; self._lastTs = None; self._lastBs = None # throttle the stopwatch/bar redraw
self.ic_midi_pal = None; self.ic_usb_pal = None; self.bg_pal = None self.ic_midi_pal = None; self.ic_usb_pal = None
# practice history - persisted to /history.json (next to programs.json) when we own the filesystem # practice history - persisted to /history.json (next to programs.json) when we own the filesystem
self.can_write = self._probe_write() self.can_write = self._probe_write()
self.log = self._load_log() self.log = self._load_log()
@ -355,7 +357,7 @@ class App:
self._armed = None; self.log_rows = [] self._armed = None; self.log_rows = []
self._build_scene() self._build_scene()
self.load(0) # load() also draws the (track-filtered) practice log self.load(0) # load() also draws the (track-filtered) practice log
self.draw_icons(); self.draw_meters() self.draw_icons(); self.draw_meters(); self.led_rest() # LED green = on
def _btn(self, pin): def _btn(self, pin):
d = digitalio.DigitalInOut(pin); d.direction = digitalio.Direction.INPUT; d.pull = digitalio.Pull.UP d = digitalio.DigitalInOut(pin); d.direction = digitalio.Direction.INPUT; d.pull = digitalio.Pull.UP
@ -364,8 +366,7 @@ class App:
# ---------- scene graph ---------- # ---------- scene graph ----------
def _build_scene(self): def _build_scene(self):
root = displayio.Group(); self.display.root_group = root root = displayio.Group(); self.display.root_group = root
self.bg_pal = solid(C_BG) # recolored on play/stop (black <-> running gray) root.append(rect(0, 0, WIDTH, HEIGHT, C_BG)) # static background (run state shows on the LED)
root.append(vectorio.Rectangle(pixel_shader=self.bg_pal, width=WIDTH, height=HEIGHT, x=0, y=0))
# header: VARASYS logo (left, no tagline) + version (small, top, right of the logo) + MIDI/USB icons (right) # header: VARASYS logo (left, no tagline) + version (small, top, right of the logo) + MIDI/USB icons (right)
if LOGO: if LOGO:
tg, _p, lw, lh = make_glyph(LOGO, C_CYAN, C_BG); tg.x = 10; tg.y = 9; root.append(tg) tg, _p, lw, lh = make_glyph(LOGO, C_CYAN, C_BG); tg.x = 10; tg.y = 9; root.append(tg)
@ -430,12 +431,14 @@ class App:
self.buz.frequency = {2: 2300, 1: 1600, 3: 1050}.get(level, 1600) self.buz.frequency = {2: 2300, 1: 1600, 3: 1050}.get(level, 1600)
self.buz.duty_cycle = {2: 42000, 1: 30000, 3: 14000}.get(level, 30000) self.buz.duty_cycle = {2: 42000, 1: 30000, 3: 14000}.get(level, 30000)
self.buz_off = time.monotonic_ns() + 22_000_000 self.buz_off = time.monotonic_ns() + 22_000_000
def _led_base(self):
return LED_RUN if self.running else LED_IDLE # dim red while playing / dim green when stopped
def flash(self, level): def flash(self, level):
self.rgb = LEVEL_RGB.get(level, (0, 150, 255)) self.rgb = LEVEL_RGB.get(level, (0, 150, 255)) # bright beat pulse, fades back to the base in tick()
self.led.set(*self.rgb)
def led_rest(self): # settle to the resting colour (green idle / red running)
self.rgb = self._led_base()
self.led.set(*self.rgb) self.led.set(*self.rgb)
def led_off(self):
self.rgb = (0, 0, 0)
self.led.set(0, 0, 0)
def midi_send(self, note, vel): # device-as-conductor: a note per click to the computer def midi_send(self, note, vel): # device-as-conductor: a note per click to the computer
if self.midi is None: return if self.midi is None: return
try: self.midi.write(bytes([0x90, note, vel])) # Note On (percussive - no Note Off needed) try: self.midi.write(bytes([0x90, note, vel])) # Note On (percussive - no Note Off needed)
@ -445,8 +448,8 @@ class App:
def toggle(self): def toggle(self):
self.running = not self.running self.running = not self.running
if self.running: self._reset_clock(); self._start_play() if self.running: self._reset_clock(); self._start_play()
else: self.buz.duty_cycle = 0; self.led_off(); self.reset_playheads(); self._log_play() else: self.buz.duty_cycle = 0; self.reset_playheads(); self._log_play()
self.draw_runbg(); self.draw_meters() self.led_rest(); self.draw_meters() # LED shows run state: red running / green stopped
def set_bpm(self, v): def set_bpm(self, v):
v = max(30, min(300, v)) v = max(30, min(300, v))
if v != self.bpm: if v != self.bpm:
@ -457,7 +460,7 @@ class App:
if was: self.running = False; self._log_play() # close out the track that was playing if was: self.running = False; self._log_play() # close out the track that was playing
self.load(i) self.load(i)
if was: self.running = True; self._reset_clock(); self._start_play() if was: self.running = True; self._reset_clock(); self._start_play()
self.draw_runbg(); self.draw_meters() self.led_rest(); self.draw_meters()
def tap(self): def tap(self):
now = time.monotonic() now = time.monotonic()
if not hasattr(self, '_taps'): self._taps = [] if not hasattr(self, '_taps'): self._taps = []
@ -488,10 +491,13 @@ class App:
best = max(fired, key=lambda l: PRIO.get(l, 0)) best = max(fired, key=lambda l: PRIO.get(l, 0))
if not MUTE_BUZZER and not self.midi_host: self.click(best) # computer plays it instead if not MUTE_BUZZER and not self.midi_host: self.click(best) # computer plays it instead
self.flash(best) self.flash(best)
if self.rgb != (0, 0, 0): base = self._led_base() # decay the beat pulse back down to the red running base
r, g, b = self.rgb; r = r*7//10; g = g*7//10; b = b*7//10 if self.rgb != base:
self.rgb = (r, g, b) if (r+g+b) > 12 else (0, 0, 0) r = base[0] + (self.rgb[0]-base[0])*7//10
self.led.set(*self.rgb) g = base[1] + (self.rgb[1]-base[1])*7//10
b = base[2] + (self.rgb[2]-base[2])*7//10
if abs(r-base[0])+abs(g-base[1])+abs(b-base[2]) < 6: r, g, b = base
self.rgb = (r, g, b); self.led.set(r, g, b)
# ---------- inputs ---------- # ---------- inputs ----------
def poll(self): def poll(self):
@ -532,7 +538,7 @@ class App:
if host != self.midi_host: if host != self.midi_host:
self.midi_host = host self.midi_host = host
if host: self.buz.duty_cycle = 0 # silence the buzzer when the computer takes over if host: self.buz.duty_cycle = 0 # silence the buzzer when the computer takes over
self.led_off(); self.draw_icons() self.led_rest(); self.draw_icons()
uc = bool(getattr(supervisor.runtime, "usb_connected", True)) # connected to a computer? uc = bool(getattr(supervisor.runtime, "usb_connected", True)) # connected to a computer?
if uc != self.usb_conn: if uc != self.usb_conn:
self.usb_conn = uc; self.draw_icons() self.usb_conn = uc; self.draw_icons()
@ -544,9 +550,6 @@ class App:
self._place(self.g_name, self.name[:22], 12, 116, C_TXT, C_BG, FONT_M) self._place(self.g_name, self.name[:22], 12, 116, C_TXT, C_BG, FONT_M)
self._place(self.g_idx, "%d/%d" % (self.idx + 1, len(self.programs)), 0, 120, self._place(self.g_idx, "%d/%d" % (self.idx + 1, len(self.programs)), 0, 120,
C_DIM, C_BG, FONT_S, right_edge=WIDTH - 12) C_DIM, C_BG, FONT_S, right_edge=WIDTH - 12)
def draw_runbg(self): # run/stop indicator: tint the whole background
if self.bg_pal is not None: self.bg_pal[0] = C_RUNBG if self.running else C_BG
self.dirty = True
def draw_icons(self): # recolor the MIDI/USB icons by state (tear-free palette swap) def draw_icons(self): # recolor the MIDI/USB icons by state (tear-free palette swap)
if self.ic_midi_pal is not None: if self.ic_midi_pal is not None:
_recolor(self.ic_midi_pal, C_GREEN if self.midi_host else C_DIM, C_BG) _recolor(self.ic_midi_pal, C_GREEN if self.midi_host else C_DIM, C_BG)
@ -701,23 +704,45 @@ class App:
if self.midi: self.midi.write(bytes([0xF0, 0x7D, 0x7F, 0xF7])) # ACK ok if self.midi: self.midi.write(bytes([0xF0, 0x7D, 0x7F, 0xF7])) # ACK ok
except OSError: except OSError:
if self.midi: self.midi.write(bytes([0xF0, 0x7D, 0x7E, 0xF7])) # NAK: read-only (editor mode) if self.midi: self.midi.write(bytes([0xF0, 0x7D, 0x7E, 0xF7])) # NAK: read-only (editor mode)
elif cmd == 0x20: # A/B firmware update: install new app.py to the trial slot # A/B firmware update, sent as small flow-controlled chunks (a single huge SysEx overruns the
# USB-MIDI input buffer and arrives corrupt). begin(0x21,len) -> data(0x22)* -> commit(0x23).
elif cmd == 0x21: # BEGIN: open the staging file; sx[2:6] = expected length (7-bit)
self._fw_len = (sx[2] | (sx[3] << 7) | (sx[4] << 14) | (sx[5] << 21)) if len(sx) >= 6 else 0
try: try:
data = bytes(sx[2:]) try: self._fw.close()
# sanity-check the transfer before touching the working build: a corrupt/truncated push except Exception: pass
# (e.g. a dropped MIDI byte, or a 7-bit-mangled non-ASCII char -> NUL) must NOT be installed. self._fw = open("/app.new", "wb"); self._ack(True)
if (0 in data) or (b"App().run()" not in data) or (b"APP_VERSION" not in data): except Exception: # read-only (editor mode) / no space
if self.midi: self.midi.write(bytes([0xF0, 0x7D, 0x7E, 0xF7])) # NAK: rejected, kept current build self._fw = None; self._ack(False)
return elif cmd == 0x22: # DATA: append a chunk to the staging file
try:
if self._fw is None: raise OSError()
self._fw.write(bytes(sx[2:])); self._fw.flush(); self._ack(True)
except Exception:
try: self._fw.close()
except Exception: pass
self._fw = None; self._ack(False)
elif cmd == 0x23: # COMMIT: verify the whole file, then A/B install + reboot
try:
try: self._fw.close()
except Exception: pass
self._fw = None; gc.collect()
with open("/app.new", "rb") as f: data = f.read()
if (self._fw_len and len(data) != self._fw_len) or (0 in data) \
or (b"App().run()" not in data) or (b"APP_VERSION" not in data):
try: os.remove("/app.new") # corrupt/truncated -> reject, keep the working build
except OSError: pass
self._ack(False); return
try: os.remove("/app.bak") try: os.remove("/app.bak")
except OSError: pass except OSError: pass
os.rename("/app.py", "/app.bak") # keep the current build as the rollback os.rename("/app.py", "/app.bak") # current build becomes the rollback
with open("/app.py", "wb") as f: f.write(data) os.rename("/app.new", "/app.py")
open("/trial", "w").close() # arm the trial; the loader reverts if it won't boot open("/trial", "w").close() # arm the trial; the loader reverts if it won't boot
if self.midi: self.midi.write(bytes([0xF0, 0x7D, 0x7F, 0xF7])) # ACK -> rebooting self._ack(True); time.sleep(0.4); supervisor.reload()
time.sleep(0.4); supervisor.reload() except Exception: # catch ALL (read-only, MemoryError, ...) -> never brick
except Exception: # catch ALL (OSError read-only, MemoryError, ...) -> never brick self._ack(False)
if self.midi: self.midi.write(bytes([0xF0, 0x7D, 0x7E, 0xF7])) # NAK def _ack(self, ok):
if self.midi: self.midi.write(bytes([0xF0, 0x7D, 0x7F if ok else 0x7E, 0xF7]))
def run(self): def run(self):
if self.touch.addr is None: if self.touch.addr is None: