PM_K-1 0.0.21: live-sync protocol (HELLO/FULL/DELTA/BYE) - device side

Implements the device half of the docs/livesync-protocol.md contract that the
other (editor) Claude wrote in src/livesync.js. SysEx opcodes 0x40..0x43 on the
existing 0x7D manufacturer id; ASCII payload reusing the share-grammar tokens.

State + helpers (App.__init__):
- self._sync_origin: short random id ("d" + 8-hex from os.urandom(4)) - lets
  peers drop their own echoes (composite USB MIDI may loop a frame back).
- self._sync_seq: monotonic counter.
- self._sync_armed: True after any HELLO/FULL/DELTA seen (until BYE).
- self._sync_applying: echo guard - True while applying a remote change so
  the broadcast hooks early-out and we never ping-pong.
- self._sync_heartbeat_next: deadline for the periodic 5s FULL.

Transport:
- _sync_send(op, text): builds the SysEx frame, ASCII-safe (replaces high bits
  with '?'), writes via self.midi.write.
- _sync_broadcast(evt): one DELTA, guarded.
- _sync_broadcast_full(): one FULL with origin;seq;running;sl;item;patch; also
  resets the heartbeat timer.

Receive (extends _handle_sysex):
- 0x40 HELLO -> _sync_broadcast_full (reply with our state).
- 0x41 FULL  -> _sync_apply_full(running, patch): diff against current
  _prog_str(); only rebuild if different (avoids flicker on a heartbeat that
  matches local state); reconcile transport either way.
- 0x42 DELTA -> _sync_apply_delta(evt) for the seven verbs:
  - play / stop -> toggle if state differs.
  - bpm=<n>      -> set_bpm.
  - sel=<sl>/<item> -> switch + goto; -1/-1 sentinel ignored.
  - beat=<l>/<s>/<lvl> -> set lanes[l]['levels'][s], recolor pad, mark dirty.
  - lane=<l>/<field>/<value> -> sound/groups/sub/swing/gain/poly/enabled; if
    structural, _regen_levels + _rebuild_dur (or _rebuild_dur_all when lane 0
    changed, so poly lanes follow) + build_grid.
- 0x43 BYE -> _sync_armed = False.

Local mutators broadcast:
- toggle()         -> "play" | "stop"
- set_bpm()        -> "bpm=<n>"
- goto() / switch_setlist() -> "sel=<sl>/<item>"
- _cycle_beat()    -> "beat=<l>/<s>/<lvl>"
- _lane_dirty()    -> FULL (coalesced; matches editor's syncPatchSoon idiom)

Heartbeat in the run loop emits a FULL every 5s while armed.

Plus docs/livesync-protocol.md (the spec the other Claude wrote in parallel)
gets committed too. The patch field round-trips through parse_program <->
_prog_str on the device side and setupToPatch <-> patchToSetup on the editor
side, all already share-grammar compatible.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Me Here 2026-05-30 09:59:19 -05:00
parent eae9057baf
commit 47fa6d7ce7

View file

@ -18,7 +18,7 @@
import board, busio, digitalio, analogio, pwmio, displayio, vectorio, time, json, gc, os, supervisor import board, busio, digitalio, analogio, pwmio, displayio, vectorio, time, json, gc, os, supervisor
supervisor.runtime.autoreload = False # we write our own files (log + pushed programs); never self-restart supervisor.runtime.autoreload = False # we write our own files (log + pushed programs); never self-restart
APP_VERSION = "0.0.20" # firmware version (the A/B updater pushes/compares this) APP_VERSION = "0.0.21" # firmware version (the A/B updater pushes/compares this)
try: try:
import rtc # set from the editor's clock SysEx so the log has real timestamps import rtc # set from the editor's clock SysEx so the log has real timestamps
except ImportError: except ImportError:
@ -461,6 +461,12 @@ class App:
self._clock_byte = bytes([0xF8]) # singleton MIDI Clock tick (24 PPQN) self._clock_byte = bytes([0xF8]) # singleton MIDI Clock tick (24 PPQN)
self._start_byte = bytes([0xFA]); self._stop_byte = bytes([0xFC]) self._start_byte = bytes([0xFA]); self._stop_byte = bytes([0xFC])
self._lastRefresh = 0.0 # for the "force refresh after Xms even if a beat is imminent" guard self._lastRefresh = 0.0 # for the "force refresh after Xms even if a beat is imminent" guard
try: # live sync: short random id so peers can drop their own echoes
o = os.urandom(4); self._sync_origin = "d" + "".join("%02x" % b for b in o)
except Exception:
self._sync_origin = "d%08x" % (time.monotonic_ns() & 0xFFFFFFFF)
self._sync_armed = False; self._sync_seq = 0; self._sync_applying = False
self._sync_heartbeat_next = 0.0 # next periodic FULL broadcast deadline (when armed)
self._displayed_bpm = -1; self._clock_next = 0 # lazy BPM redraw + MIDI Clock Out tick scheduler self._displayed_bpm = -1; self._clock_next = 0 # lazy BPM redraw + MIDI Clock Out tick scheduler
self._clock_in_last_t = 0; self._clock_in_avg = 0; self._slaved = False # MIDI Clock In: smoothed tracker + slave flag self._clock_in_last_t = 0; self._clock_in_avg = 0; self._slaved = False # MIDI Clock In: smoothed tracker + slave flag
self.sl = 0; self.rebuild_setlists() # built-in playlists (baked) + user playlists (programs.json) self.sl = 0; self.rebuild_setlists() # built-in playlists (baked) + user playlists (programs.json)
@ -546,12 +552,14 @@ class App:
if self.sl >= len(self.setlists): self.sl = 0 if self.sl >= len(self.setlists): self.sl = 0
def switch_setlist(self, delta=1): def switch_setlist(self, delta=1):
if len(self.setlists) < 2: return if len(self.setlists) < 2: return
if self._sync_applying: return # the editor sends sel=... directly; don't ping-pong
was = self.running was = self.running
if was: self.running = False; self._log_play() if was: self.running = False; self._log_play()
self.sl = (self.sl + delta) % len(self.setlists) self.sl = (self.sl + delta) % len(self.setlists)
self.load(0) self.load(0)
if was: self.running = True; self._reset_clock(); self._start_play() if was: self.running = True; self._reset_clock(); self._start_play()
self.led_rest(); self.draw_meters() self.led_rest(); self.draw_meters()
self._sync_broadcast("sel=%d/%d" % (self.sl, self.idx))
def load(self, i): def load(self, i):
items = self.setlists[self.sl]['items'] items = self.setlists[self.sl]['items']
self.idx = i % len(items) self.idx = i % len(items)
@ -589,6 +597,7 @@ class App:
base = self._padbase(L, s); lit = (self.lane_lit[li] == s) base = self._padbase(L, s); lit = (self.lane_lit[li] == s)
self.lane_pads[li][s].color_index = base + 4 if lit else base self.lane_pads[li][s].color_index = base + 4 if lit else base
self._set_dirty() self._set_dirty()
self._sync_broadcast("beat=%d/%d/%d" % (li, s, L['levels'][s]))
def _set_dirty(self): def _set_dirty(self):
if not self._dirty: self._dirty = True; self.draw_status() if not self._dirty: self._dirty = True; self.draw_status()
self.dirty = True self.dirty = True
@ -726,6 +735,8 @@ class App:
if structural and self._edit_li == 0: self._rebuild_dur_all() # master changed -> polymeter lanes follow if structural and self._edit_li == 0: self._rebuild_dur_all() # master changed -> polymeter lanes follow
else: self._rebuild_dur(self.lanes[self._edit_li]) else: self._rebuild_dur(self.lanes[self._edit_li])
self.build_grid() self.build_grid()
if not self._sync_applying: # coalesce structural / multi-field lane edits into one FULL
self._sync_broadcast_full()
if not self._dirty: self._dirty = True; self.draw_status() if not self._dirty: self._dirty = True; self.draw_status()
self._draw_laneedit() # refresh the modal with the new values self._draw_laneedit() # refresh the modal with the new values
def _edit_sound(self, d): def _edit_sound(self, d):
@ -994,6 +1005,119 @@ class App:
def led_rest(self): # settle to the resting colour (green idle / red running) def led_rest(self): # settle to the resting colour (green idle / red running)
self.rgb = self._led_base() self.rgb = self._led_base()
self.led.set(*self.rgb) self.led.set(*self.rgb)
# ---------- Live sync (HELLO/FULL/DELTA/BYE on SysEx 0x40-0x43; see src/livesync.js for the editor side) ----------
def _sync_send(self, op, text):
if self.midi is None: return
b = bytearray((0xF0, 0x7D, op))
for c in text: # ASCII-only payload (the share grammar uses ; / = digits letters)
v = ord(c); b.append(v if v < 0x80 else 0x3F)
b.append(0xF7)
try: self.midi.write(b)
except Exception: pass
def _sync_broadcast(self, evt): # one DELTA event; suppressed while applying a remote change (echo guard)
if not self._sync_armed or self._sync_applying or self.midi is None: return
text = "%s;%d;%s" % (self._sync_origin, self._sync_seq, evt); self._sync_seq += 1
self._sync_send(0x42, text)
def _sync_broadcast_full(self): # FULL snapshot: running + sl + item + patch (coalesces structural edits)
if not self._sync_armed or self.midi is None: return
try: patch = self._prog_str()
except Exception: return
text = "%s;%d;%d;%d;%d;%s" % (self._sync_origin, self._sync_seq,
1 if self.running else 0, self.sl, self.idx, patch)
self._sync_seq += 1
self._sync_send(0x41, text)
self._sync_heartbeat_next = time.monotonic() + 5.0 # next periodic heartbeat
def _sync_apply_full(self, running, patch): # accept the peer's snapshot as ground truth
self._sync_applying = True
try:
try:
gc.collect()
# Diff before rebuilding -> avoid grid flicker / lost focus on a heartbeat that matches local state
try: cur = self._prog_str()
except Exception: cur = None
if patch and patch != cur:
bpm, lanes, bars, ramp, trainer = parse_program(patch)
self.bpm = bpm; self.lanes = lanes; self.bars = bars; self.ramp = ramp; self.trainer = trainer
self._beat_ns = 60_000_000_000 // max(1, bpm); self._rebuild_dur_all()
self.master = self.lanes[0]; self._ramp_base = self.bpm; self._lastbar = -1; self._muted = False
self._dirty = False; self._overlay = None
while len(self.g_overlay): self.g_overlay.pop()
self._reset_clock()
self.draw_bpm(); self.draw_status(); self.draw_train(); self.draw_meters()
self.build_grid(); self.draw_log()
if running and not self.running: self.toggle()
elif (not running) and self.running: self.toggle()
except Exception as e:
try: print("sync FULL apply:", e)
except Exception: pass
finally:
self._sync_applying = False
def _sync_apply_delta(self, evt): # one mutation
self._sync_applying = True
try:
eq = evt.find('=')
key = evt if eq < 0 else evt[:eq]
val = '' if eq < 0 else evt[eq+1:]
if key == 'play':
if not self.running: self.toggle()
elif key == 'stop':
if self.running: self.toggle()
elif key == 'bpm':
try: self.set_bpm(int(val))
except Exception: pass
elif key == 'sel': # sel=-1/-1 = "no selection" sentinel -> ignore
p = val.split('/')
if len(p) == 2:
try:
sl = int(p[0]); item = int(p[1])
if sl >= 0 and item >= 0:
if sl < len(self.setlists) and sl != self.sl: self.sl = sl
items = self.setlists[self.sl]['items']
if 0 <= item < len(items) and item != self.idx: self.goto(item)
except Exception: pass
elif key == 'beat': # beat=lane/step/level (0=mute 1=normal 2=accent 3=ghost)
p = val.split('/')
if len(p) == 3:
try:
li = int(p[0]); s = int(p[1]); lvl = int(p[2])
if 0 <= li < len(self.lanes):
L = self.lanes[li]
if 0 <= s < len(L['levels']):
L['levels'][s] = lvl & 3
if li < len(self.lane_pads) and s < len(self.lane_pads[li]):
lit = (self.lane_lit[li] == s)
self.lane_pads[li][s].color_index = self._padbase(L, s) + (4 if lit else 0)
self._set_dirty()
except Exception: pass
elif key == 'lane': # lane=lane/field/value (field: sound|groups|sub|swing|gain|poly|enabled)
p = val.split('/')
if len(p) >= 3:
try:
li = int(p[0]); field = p[1]; v = '/'.join(p[2:])
if 0 <= li < len(self.lanes):
L = self.lanes[li]; structural = False
if field == 'sound': L['sound'] = v
elif field == 'groups':
try: L['groups'] = [int(x) for x in v.split('+')]; structural = True
except Exception: pass
elif field == 'sub':
try: L['sub'] = int(v); structural = True
except Exception: pass
elif field == 'swing': L['swing'] = (v == '1'); structural = True # swing changes the dur grid
elif field == 'enabled': L['mute'] = not (v == '1')
elif field == 'gain':
try: L['gain'] = int(v)
except Exception: pass
elif field == 'poly': L['poly'] = (v == '1'); structural = True
if structural: self._regen_levels(L)
if li == 0 and structural: self._rebuild_dur_all() # master changed -> poly lanes follow
else: self._rebuild_dur(L)
if structural: self.build_grid()
self._set_dirty()
except Exception: pass
finally:
self._sync_applying = False
def midi_send(self, note, vel): # device-as-conductor: a note per click to the computer def midi_send(self, note, vel): # device-as-conductor: a note per click to the computer
if self.midi is None: return if self.midi is None: return
b = self._note_buf # reused bytearray -> zero alloc per click (hot path) b = self._note_buf # reused bytearray -> zero alloc per click (hot path)
@ -1017,18 +1141,21 @@ class App:
try: self.midi.write(self._stop_byte) # Stop (reused singleton) try: self.midi.write(self._stop_byte) # Stop (reused singleton)
except Exception: pass except Exception: pass
self.led_rest(); self.draw_meters() # LED shows run state: red running / green stopped self.led_rest(); self.draw_meters() # LED shows run state: red running / green stopped
self._sync_broadcast("play" if self.running else "stop")
def set_bpm(self, v): def set_bpm(self, v):
v = max(5, min(300, v)) v = max(5, min(300, v))
if v != self.bpm: if v != self.bpm:
self.bpm = v; self._beat_ns = 60_000_000_000 // v self.bpm = v; self._beat_ns = 60_000_000_000 // v
self._rebuild_dur_all() # step grids follow the new beat duration self._rebuild_dur_all() # step grids follow the new beat duration
# Don't draw here -- the 4Hz UI tick redraws bpm/meters; calling per joystick nudge allocated text bitmaps fast enough to trigger GC pauses # Don't draw here -- the 4Hz UI tick redraws bpm/meters; calling per joystick nudge allocated text bitmaps fast enough to trigger GC pauses
self._sync_broadcast("bpm=%d" % v)
def goto(self, i): def goto(self, i):
was = self.running was = self.running
if was: self.running = False; self._log_play() # close out the track that was playing if was: self.running = False; self._log_play() # close out the track that was playing
self.load(i) self.load(i)
if was: self.running = True; self._reset_clock(); self._start_play() if was: self.running = True; self._reset_clock(); self._start_play()
self.led_rest(); self.draw_meters() self.led_rest(); self.draw_meters()
self._sync_broadcast("sel=%d/%d" % (self.sl, self.idx))
def tap(self): def tap(self):
now = time.monotonic() now = time.monotonic()
if not hasattr(self, '_taps'): self._taps = [] if not hasattr(self, '_taps'): self._taps = []
@ -1426,6 +1553,26 @@ class App:
except Exception: pass except Exception: pass
elif cmd == 0x02: # version query -> reply 0x03 + APP_VERSION elif cmd == 0x02: # version query -> reply 0x03 + APP_VERSION
if self.midi: self.midi.write(bytes([0xF0, 0x7D, 0x03]) + APP_VERSION.encode() + bytes([0xF7])) if self.midi: self.midi.write(bytes([0xF0, 0x7D, 0x03]) + APP_VERSION.encode() + bytes([0xF7]))
elif cmd == 0x40 or cmd == 0x41 or cmd == 0x42 or cmd == 0x43: # Live sync (see src/livesync.js)
try: text = "".join(chr(b) if 0x20 <= b < 0x7F else "" for b in sx[2:])
except Exception: return
origin = text.split(";", 1)[0] if text else ""
if origin == self._sync_origin: return # drop our own echoes (composite USB may loop)
self._sync_armed = True
if cmd == 0x40: # HELLO -> reply with our current FULL
self._sync_broadcast_full()
elif cmd == 0x43: # BYE -> peer disconnected; stop heartbeats
self._sync_armed = False
elif cmd == 0x41: # FULL: origin;seq;running;sl;item;patch...
parts = text.split(";", 5)
if len(parts) >= 6:
try:
running = parts[2] == "1"; patch = parts[5]
self._sync_apply_full(running, patch)
except Exception: pass
elif cmd == 0x42: # DELTA: origin;seq;evt
parts = text.split(";", 2)
if len(parts) >= 3: self._sync_apply_delta(parts[2])
elif cmd == 0x10: # write /programs.json (user playlists) pushed from the editor elif cmd == 0x10: # write /programs.json (user playlists) pushed from the editor
try: try:
with open("/programs.json", "wb") as f: f.write(bytes(sx[2:])) with open("/programs.json", "wb") as f: f.write(bytes(sx[2:]))
@ -1497,6 +1644,8 @@ class App:
tnow = time.monotonic() tnow = time.monotonic()
if tnow >= self._uiNext: # ~4x/s: tick the stopwatch + bar counter if tnow >= self._uiNext: # ~4x/s: tick the stopwatch + bar counter
self._uiNext = tnow + 0.25; self.draw_meters(); self.draw_bpm() # bpm follows the continuous ramp self._uiNext = tnow + 0.25; self.draw_meters(); self.draw_bpm() # bpm follows the continuous ramp
if self._sync_armed and tnow >= self._sync_heartbeat_next:
self._sync_broadcast_full() # periodic FULL: device is the convergence authority
if not committed and tnow - boot > 5: # booted & ran fine for 5s -> confirm the update if not committed and tnow - boot > 5: # booted & ran fine for 5s -> confirm the update
try: os.remove("/trial") try: os.remove("/trial")
except Exception: pass except Exception: pass