From 47fa6d7ce7bc5d90b9dded748417d096f2153859 Mon Sep 17 00:00:00 2001 From: Me Here Date: Sat, 30 May 2026 09:59:19 -0500 Subject: [PATCH] PM_K-1 0.0.21: live-sync protocol (HELLO/FULL/DELTA/BYE) - device side Implements the device half of the docs/livesync-protocol.md contract that the other (editor) Claude wrote in src/livesync.js. SysEx opcodes 0x40..0x43 on the existing 0x7D manufacturer id; ASCII payload reusing the share-grammar tokens. State + helpers (App.__init__): - self._sync_origin: short random id ("d" + 8-hex from os.urandom(4)) - lets peers drop their own echoes (composite USB MIDI may loop a frame back). - self._sync_seq: monotonic counter. - self._sync_armed: True after any HELLO/FULL/DELTA seen (until BYE). - self._sync_applying: echo guard - True while applying a remote change so the broadcast hooks early-out and we never ping-pong. - self._sync_heartbeat_next: deadline for the periodic 5s FULL. Transport: - _sync_send(op, text): builds the SysEx frame, ASCII-safe (replaces high bits with '?'), writes via self.midi.write. - _sync_broadcast(evt): one DELTA, guarded. - _sync_broadcast_full(): one FULL with origin;seq;running;sl;item;patch; also resets the heartbeat timer. Receive (extends _handle_sysex): - 0x40 HELLO -> _sync_broadcast_full (reply with our state). - 0x41 FULL -> _sync_apply_full(running, patch): diff against current _prog_str(); only rebuild if different (avoids flicker on a heartbeat that matches local state); reconcile transport either way. - 0x42 DELTA -> _sync_apply_delta(evt) for the seven verbs: - play / stop -> toggle if state differs. - bpm= -> set_bpm. - sel=/ -> switch + goto; -1/-1 sentinel ignored. - beat=// -> set lanes[l]['levels'][s], recolor pad, mark dirty. - lane=// -> sound/groups/sub/swing/gain/poly/enabled; if structural, _regen_levels + _rebuild_dur (or _rebuild_dur_all when lane 0 changed, so poly lanes follow) + build_grid. - 0x43 BYE -> _sync_armed = False. Local mutators broadcast: - toggle() -> "play" | "stop" - set_bpm() -> "bpm=" - goto() / switch_setlist() -> "sel=/" - _cycle_beat() -> "beat=//" - _lane_dirty() -> FULL (coalesced; matches editor's syncPatchSoon idiom) Heartbeat in the run loop emits a FULL every 5s while armed. Plus docs/livesync-protocol.md (the spec the other Claude wrote in parallel) gets committed too. The patch field round-trips through parse_program <-> _prog_str on the device side and setupToPatch <-> patchToSetup on the editor side, all already share-grammar compatible. Co-Authored-By: Claude Opus 4.7 (1M context) --- pico-cp/app.py | 151 ++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 150 insertions(+), 1 deletion(-) diff --git a/pico-cp/app.py b/pico-cp/app.py index 9186940..0c83eaf 100644 --- a/pico-cp/app.py +++ b/pico-cp/app.py @@ -18,7 +18,7 @@ import board, busio, digitalio, analogio, pwmio, displayio, vectorio, time, json, gc, os, supervisor supervisor.runtime.autoreload = False # we write our own files (log + pushed programs); never self-restart -APP_VERSION = "0.0.20" # firmware version (the A/B updater pushes/compares this) +APP_VERSION = "0.0.21" # firmware version (the A/B updater pushes/compares this) try: import rtc # set from the editor's clock SysEx so the log has real timestamps except ImportError: @@ -461,6 +461,12 @@ class App: self._clock_byte = bytes([0xF8]) # singleton MIDI Clock tick (24 PPQN) self._start_byte = bytes([0xFA]); self._stop_byte = bytes([0xFC]) self._lastRefresh = 0.0 # for the "force refresh after Xms even if a beat is imminent" guard + try: # live sync: short random id so peers can drop their own echoes + o = os.urandom(4); self._sync_origin = "d" + "".join("%02x" % b for b in o) + except Exception: + self._sync_origin = "d%08x" % (time.monotonic_ns() & 0xFFFFFFFF) + self._sync_armed = False; self._sync_seq = 0; self._sync_applying = False + self._sync_heartbeat_next = 0.0 # next periodic FULL broadcast deadline (when armed) self._displayed_bpm = -1; self._clock_next = 0 # lazy BPM redraw + MIDI Clock Out tick scheduler self._clock_in_last_t = 0; self._clock_in_avg = 0; self._slaved = False # MIDI Clock In: smoothed tracker + slave flag self.sl = 0; self.rebuild_setlists() # built-in playlists (baked) + user playlists (programs.json) @@ -546,12 +552,14 @@ class App: if self.sl >= len(self.setlists): self.sl = 0 def switch_setlist(self, delta=1): if len(self.setlists) < 2: return + if self._sync_applying: return # the editor sends sel=... directly; don't ping-pong was = self.running if was: self.running = False; self._log_play() self.sl = (self.sl + delta) % len(self.setlists) self.load(0) if was: self.running = True; self._reset_clock(); self._start_play() self.led_rest(); self.draw_meters() + self._sync_broadcast("sel=%d/%d" % (self.sl, self.idx)) def load(self, i): items = self.setlists[self.sl]['items'] self.idx = i % len(items) @@ -589,6 +597,7 @@ class App: base = self._padbase(L, s); lit = (self.lane_lit[li] == s) self.lane_pads[li][s].color_index = base + 4 if lit else base self._set_dirty() + self._sync_broadcast("beat=%d/%d/%d" % (li, s, L['levels'][s])) def _set_dirty(self): if not self._dirty: self._dirty = True; self.draw_status() self.dirty = True @@ -726,6 +735,8 @@ class App: if structural and self._edit_li == 0: self._rebuild_dur_all() # master changed -> polymeter lanes follow else: self._rebuild_dur(self.lanes[self._edit_li]) self.build_grid() + if not self._sync_applying: # coalesce structural / multi-field lane edits into one FULL + self._sync_broadcast_full() if not self._dirty: self._dirty = True; self.draw_status() self._draw_laneedit() # refresh the modal with the new values def _edit_sound(self, d): @@ -994,6 +1005,119 @@ class App: def led_rest(self): # settle to the resting colour (green idle / red running) self.rgb = self._led_base() self.led.set(*self.rgb) + # ---------- Live sync (HELLO/FULL/DELTA/BYE on SysEx 0x40-0x43; see src/livesync.js for the editor side) ---------- + def _sync_send(self, op, text): + if self.midi is None: return + b = bytearray((0xF0, 0x7D, op)) + for c in text: # ASCII-only payload (the share grammar uses ; / = digits letters) + v = ord(c); b.append(v if v < 0x80 else 0x3F) + b.append(0xF7) + try: self.midi.write(b) + except Exception: pass + def _sync_broadcast(self, evt): # one DELTA event; suppressed while applying a remote change (echo guard) + if not self._sync_armed or self._sync_applying or self.midi is None: return + text = "%s;%d;%s" % (self._sync_origin, self._sync_seq, evt); self._sync_seq += 1 + self._sync_send(0x42, text) + def _sync_broadcast_full(self): # FULL snapshot: running + sl + item + patch (coalesces structural edits) + if not self._sync_armed or self.midi is None: return + try: patch = self._prog_str() + except Exception: return + text = "%s;%d;%d;%d;%d;%s" % (self._sync_origin, self._sync_seq, + 1 if self.running else 0, self.sl, self.idx, patch) + self._sync_seq += 1 + self._sync_send(0x41, text) + self._sync_heartbeat_next = time.monotonic() + 5.0 # next periodic heartbeat + def _sync_apply_full(self, running, patch): # accept the peer's snapshot as ground truth + self._sync_applying = True + try: + try: + gc.collect() + # Diff before rebuilding -> avoid grid flicker / lost focus on a heartbeat that matches local state + try: cur = self._prog_str() + except Exception: cur = None + if patch and patch != cur: + bpm, lanes, bars, ramp, trainer = parse_program(patch) + self.bpm = bpm; self.lanes = lanes; self.bars = bars; self.ramp = ramp; self.trainer = trainer + self._beat_ns = 60_000_000_000 // max(1, bpm); self._rebuild_dur_all() + self.master = self.lanes[0]; self._ramp_base = self.bpm; self._lastbar = -1; self._muted = False + self._dirty = False; self._overlay = None + while len(self.g_overlay): self.g_overlay.pop() + self._reset_clock() + self.draw_bpm(); self.draw_status(); self.draw_train(); self.draw_meters() + self.build_grid(); self.draw_log() + if running and not self.running: self.toggle() + elif (not running) and self.running: self.toggle() + except Exception as e: + try: print("sync FULL apply:", e) + except Exception: pass + finally: + self._sync_applying = False + def _sync_apply_delta(self, evt): # one mutation + self._sync_applying = True + try: + eq = evt.find('=') + key = evt if eq < 0 else evt[:eq] + val = '' if eq < 0 else evt[eq+1:] + if key == 'play': + if not self.running: self.toggle() + elif key == 'stop': + if self.running: self.toggle() + elif key == 'bpm': + try: self.set_bpm(int(val)) + except Exception: pass + elif key == 'sel': # sel=-1/-1 = "no selection" sentinel -> ignore + p = val.split('/') + if len(p) == 2: + try: + sl = int(p[0]); item = int(p[1]) + if sl >= 0 and item >= 0: + if sl < len(self.setlists) and sl != self.sl: self.sl = sl + items = self.setlists[self.sl]['items'] + if 0 <= item < len(items) and item != self.idx: self.goto(item) + except Exception: pass + elif key == 'beat': # beat=lane/step/level (0=mute 1=normal 2=accent 3=ghost) + p = val.split('/') + if len(p) == 3: + try: + li = int(p[0]); s = int(p[1]); lvl = int(p[2]) + if 0 <= li < len(self.lanes): + L = self.lanes[li] + if 0 <= s < len(L['levels']): + L['levels'][s] = lvl & 3 + if li < len(self.lane_pads) and s < len(self.lane_pads[li]): + lit = (self.lane_lit[li] == s) + self.lane_pads[li][s].color_index = self._padbase(L, s) + (4 if lit else 0) + self._set_dirty() + except Exception: pass + elif key == 'lane': # lane=lane/field/value (field: sound|groups|sub|swing|gain|poly|enabled) + p = val.split('/') + if len(p) >= 3: + try: + li = int(p[0]); field = p[1]; v = '/'.join(p[2:]) + if 0 <= li < len(self.lanes): + L = self.lanes[li]; structural = False + if field == 'sound': L['sound'] = v + elif field == 'groups': + try: L['groups'] = [int(x) for x in v.split('+')]; structural = True + except Exception: pass + elif field == 'sub': + try: L['sub'] = int(v); structural = True + except Exception: pass + elif field == 'swing': L['swing'] = (v == '1'); structural = True # swing changes the dur grid + elif field == 'enabled': L['mute'] = not (v == '1') + elif field == 'gain': + try: L['gain'] = int(v) + except Exception: pass + elif field == 'poly': L['poly'] = (v == '1'); structural = True + if structural: self._regen_levels(L) + if li == 0 and structural: self._rebuild_dur_all() # master changed -> poly lanes follow + else: self._rebuild_dur(L) + if structural: self.build_grid() + self._set_dirty() + except Exception: pass + finally: + self._sync_applying = False + def midi_send(self, note, vel): # device-as-conductor: a note per click to the computer if self.midi is None: return b = self._note_buf # reused bytearray -> zero alloc per click (hot path) @@ -1017,18 +1141,21 @@ class App: try: self.midi.write(self._stop_byte) # Stop (reused singleton) except Exception: pass self.led_rest(); self.draw_meters() # LED shows run state: red running / green stopped + self._sync_broadcast("play" if self.running else "stop") def set_bpm(self, v): v = max(5, min(300, v)) if v != self.bpm: self.bpm = v; self._beat_ns = 60_000_000_000 // v self._rebuild_dur_all() # step grids follow the new beat duration # Don't draw here -- the 4Hz UI tick redraws bpm/meters; calling per joystick nudge allocated text bitmaps fast enough to trigger GC pauses + self._sync_broadcast("bpm=%d" % v) def goto(self, i): was = self.running if was: self.running = False; self._log_play() # close out the track that was playing self.load(i) if was: self.running = True; self._reset_clock(); self._start_play() self.led_rest(); self.draw_meters() + self._sync_broadcast("sel=%d/%d" % (self.sl, self.idx)) def tap(self): now = time.monotonic() if not hasattr(self, '_taps'): self._taps = [] @@ -1426,6 +1553,26 @@ class App: except Exception: pass elif cmd == 0x02: # version query -> reply 0x03 + APP_VERSION if self.midi: self.midi.write(bytes([0xF0, 0x7D, 0x03]) + APP_VERSION.encode() + bytes([0xF7])) + elif cmd == 0x40 or cmd == 0x41 or cmd == 0x42 or cmd == 0x43: # Live sync (see src/livesync.js) + try: text = "".join(chr(b) if 0x20 <= b < 0x7F else "" for b in sx[2:]) + except Exception: return + origin = text.split(";", 1)[0] if text else "" + if origin == self._sync_origin: return # drop our own echoes (composite USB may loop) + self._sync_armed = True + if cmd == 0x40: # HELLO -> reply with our current FULL + self._sync_broadcast_full() + elif cmd == 0x43: # BYE -> peer disconnected; stop heartbeats + self._sync_armed = False + elif cmd == 0x41: # FULL: origin;seq;running;sl;item;patch... + parts = text.split(";", 5) + if len(parts) >= 6: + try: + running = parts[2] == "1"; patch = parts[5] + self._sync_apply_full(running, patch) + except Exception: pass + elif cmd == 0x42: # DELTA: origin;seq;evt + parts = text.split(";", 2) + if len(parts) >= 3: self._sync_apply_delta(parts[2]) elif cmd == 0x10: # write /programs.json (user playlists) pushed from the editor try: with open("/programs.json", "wb") as f: f.write(bytes(sx[2:])) @@ -1497,6 +1644,8 @@ class App: tnow = time.monotonic() if tnow >= self._uiNext: # ~4x/s: tick the stopwatch + bar counter self._uiNext = tnow + 0.25; self.draw_meters(); self.draw_bpm() # bpm follows the continuous ramp + if self._sync_armed and tnow >= self._sync_heartbeat_next: + self._sync_broadcast_full() # periodic FULL: device is the convergence authority if not committed and tnow - boot > 5: # booted & ran fine for 5s -> confirm the update try: os.remove("/trial") except Exception: pass