PM_K-1 0.0.19: chunked build_grid, predictive refresh, zero-alloc hot path

The stutter and the MemoryError were two faces of the same problem: the main loop
allocates and blocks too much. This pass attacks both.

CHUNKED build_grid -- the biggest single audio fix.
- build_grid() split into _grid_rebuild_start (tear-down + gridlines + state) and
  _grid_rebuild_step (build ONE lane row + its pads).
- run() loop processes one chunk per iter, so tick() runs between every lane. A 16-pad
  lane chunk is ~30-50ms but a tick now interleaves; the old ~300ms blackout is gone.
- The synchronous build_grid() still works (used by load() etc) -- it loops the steps
  to completion in one call, same as before for non-seam paths.
- load() / switch_setlist cancel any in-progress chunked rebuild so a navigation
  doesn't leave the grid half-built.

PREDICTIVE display.refresh -- the second-biggest audio fix.
- display.refresh() blocks SPI for ~30ms streaming pixels. A beat scheduled in that
  window was firing late.
- The refresh decision now scans lanes[i]['next'] (cheap; n<=8) and skips refresh if
  any beat is due within 35ms.
- Force-refresh after 0.5s so visuals stay live at fast subdivision rates.

ZERO-ALLOC HOT PATH -- attacks fragmentation / MemoryError frequency.
- self._note_buf: reused bytearray for every MIDI Note On (was a fresh bytes() / click).
- self._clock_byte / _start_byte / _stop_byte: singleton bytes for transport messages.
- fired_best / fired_prio ints replace fired = [] (was ~1000 list allocs/sec at speed,
  plus a max(key=lambda) call per tick -- gone).
- self._beat_ns cached in set_bpm + the ramp interpolator; _step_dur reads it instead
  of doing 60_000_000_000 / self.bpm every step. Integer // instead of float /.
- The MIDI Clock Out also caches its tick period via _beat_ns // 24.

CRASH SAFETY -- the run loop's while True body is now wrapped in try/except. A
MemoryError logs + gc.collect() + continues. Any other Exception logs + continues. The
metronome now stutters through transient errors instead of dying with a traceback.

This is roughly the ceiling of what straight Python can do here without a custom-C
tick module. The remaining stutter sources are the displayio scene-graph itself + the
RP2040 bytecode interpreter overhead, and those move us to Rust (or a C extension) if
audible.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Me Here 2026-05-30 08:38:18 -05:00
parent 93c1fb62e9
commit 672f892ea1

View file

@ -18,7 +18,7 @@
import board, busio, digitalio, analogio, pwmio, displayio, vectorio, time, json, gc, os, supervisor import board, busio, digitalio, analogio, pwmio, displayio, vectorio, time, json, gc, os, supervisor
supervisor.runtime.autoreload = False # we write our own files (log + pushed programs); never self-restart supervisor.runtime.autoreload = False # we write our own files (log + pushed programs); never self-restart
APP_VERSION = "0.0.18" # firmware version (the A/B updater pushes/compares this) APP_VERSION = "0.0.19" # firmware version (the A/B updater pushes/compares this)
try: try:
import rtc # set from the editor's clock SysEx so the log has real timestamps import rtc # set from the editor's clock SysEx so the log has real timestamps
except ImportError: except ImportError:
@ -453,6 +453,13 @@ class App:
self.continue_on = False; self._advance = False; self._grid = {} # auto-advance + pad hit-test geometry self.continue_on = False; self._advance = False; self._grid = {} # auto-advance + pad hit-test geometry
self._next_pending = None; self._seam_t = 0; self._need_redraw = False # gapless seam between tracks self._next_pending = None; self._seam_t = 0; self._need_redraw = False # gapless seam between tracks
self._heavy_redraw_at = 0 # deferred build_grid + draw_log deadline (so B's intro isn't blocked by SPI/alloc) self._heavy_redraw_at = 0 # deferred build_grid + draw_log deadline (so B's intro isn't blocked by SPI/alloc)
self._grid_li = None; self._grid_n = 0; self._grid_geo = (0, 0, 0, 0) # chunked build_grid progress (1 lane / loop iter)
self._heavy_log_pending = False
self._beat_ns = 60_000_000_000 // self.bpm # cached: ns per quarter note; refreshed on every bpm change
self._note_buf = bytearray([0x90, 0, 0]) # reused for every Note On (no per-click bytes() alloc)
self._clock_byte = bytes([0xF8]) # singleton MIDI Clock tick (24 PPQN)
self._start_byte = bytes([0xFA]); self._stop_byte = bytes([0xFC])
self._lastRefresh = 0.0 # for the "force refresh after Xms even if a beat is imminent" guard
self._displayed_bpm = -1; self._clock_next = 0 # lazy BPM redraw + MIDI Clock Out tick scheduler self._displayed_bpm = -1; self._clock_next = 0 # lazy BPM redraw + MIDI Clock Out tick scheduler
self._clock_in_last_t = 0; self._clock_in_avg = 0; self._slaved = False # MIDI Clock In: smoothed tracker + slave flag self._clock_in_last_t = 0; self._clock_in_avg = 0; self._slaved = False # MIDI Clock In: smoothed tracker + slave flag
self.sl = 0; self.rebuild_setlists() # built-in playlists (baked) + user playlists (programs.json) self.sl = 0; self.rebuild_setlists() # built-in playlists (baked) + user playlists (programs.json)
@ -551,7 +558,8 @@ class App:
self.bpm, self.lanes, self.bars, self.ramp, self.trainer = parse_program(prog) self.bpm, self.lanes, self.bars, self.ramp, self.trainer = parse_program(prog)
self.master = self.lanes[0]; self._ramp_base = self.bpm; self._lastbar = -1; self._muted = False self.master = self.lanes[0]; self._ramp_base = self.bpm; self._lastbar = -1; self._muted = False
self._dirty = False; self._overlay = None # fresh load -> no unsaved edits self._dirty = False; self._overlay = None # fresh load -> no unsaved edits
self._next_pending = None; self._need_redraw = False; self._heavy_redraw_at = 0 # discard any prepared seam (user navigated away) self._next_pending = None; self._need_redraw = False # discard any prepared seam (user navigated away)
self._heavy_redraw_at = 0; self._heavy_log_pending = False; self._grid_li = None # cancel any in-progress chunked rebuild
while len(self.g_overlay): self.g_overlay.pop() # dismiss any open modal while len(self.g_overlay): self.g_overlay.pop() # dismiss any open modal
self._reset_clock(); self.draw_bpm(); self.draw_status(); self.draw_train() self._reset_clock(); self.draw_bpm(); self.draw_status(); self.draw_train()
self.build_grid(); self.draw_log() self.build_grid(); self.draw_log()
@ -940,15 +948,15 @@ class App:
except OSError: self.can_write = False except OSError: self.can_write = False
def _step_dur(self, L, step): def _step_dur(self, L, step):
beat = 60_000_000_000 / self.bpm beat = self._beat_ns # cached ns/beat (refreshed on every bpm change + ramp tick)
if L['poly']: # ~ polymeter: fit this lane's whole cycle into lane 1's bar if L['poly']: # ~ polymeter: fit this lane's whole cycle into lane 1's bar
m = self.lanes[0]; master_bar = beat * (m['steps'] // m['sub']) m = self.lanes[0]; master_bar = beat * (m['steps'] // m['sub'])
return int(master_bar / L['steps']) return master_bar // L['steps']
sub = L['sub'] sub = L['sub']
if L['swing'] and sub % 2 == 0: # swing even subdivisions: long-short (2:1) pairs if L['swing'] and sub % 2 == 0: # swing even subdivisions: long-short (2:1) pairs
pair = beat / (sub // 2) pair = beat // (sub // 2)
return int(pair * 2 / 3) if (step % sub) % 2 == 0 else int(pair / 3) return (pair * 2) // 3 if (step % sub) % 2 == 0 else pair // 3
return int(beat / sub) # straight: a step = one beat / subdivision return beat // sub # straight: a step = one beat / subdivision
def _reset_clock(self): def _reset_clock(self):
now = time.monotonic_ns() now = time.monotonic_ns()
for L in self.lanes: for L in self.lanes:
@ -971,7 +979,10 @@ class App:
self.led.set(*self.rgb) self.led.set(*self.rgb)
def midi_send(self, note, vel): # device-as-conductor: a note per click to the computer def midi_send(self, note, vel): # device-as-conductor: a note per click to the computer
if self.midi is None: return if self.midi is None: return
try: self.midi.write(bytes([0x90 | ((MIDI_CHANNEL - 1) & 0x0F), note, vel])) # Note On, channel 1..16 b = self._note_buf # reused bytearray -> zero alloc per click (hot path)
b[0] = 0x90 | ((MIDI_CHANNEL - 1) & 0x0F) # Note On, channel 1..16
b[1] = note & 0x7F; b[2] = vel & 0x7F
try: self.midi.write(b)
except Exception: pass except Exception: pass
# ---------- transport ---------- # ---------- transport ----------
@ -981,18 +992,18 @@ class App:
self._reset_clock(); self._start_play() self._reset_clock(); self._start_play()
self._clock_next = time.monotonic_ns() # start MIDI Clock Out from zero self._clock_next = time.monotonic_ns() # start MIDI Clock Out from zero
if MIDI_CLOCK_OUT and MIDI_CLOCK_OUT_TRANSPORT and self.midi is not None: if MIDI_CLOCK_OUT and MIDI_CLOCK_OUT_TRANSPORT and self.midi is not None:
try: self.midi.write(bytes([0xFA])) # Start try: self.midi.write(self._start_byte) # Start (reused singleton)
except Exception: pass except Exception: pass
else: else:
self.spk.duty_cycle = 0; self.reset_playheads(); self._log_play() self.spk.duty_cycle = 0; self.reset_playheads(); self._log_play()
if MIDI_CLOCK_OUT and MIDI_CLOCK_OUT_TRANSPORT and self.midi is not None: if MIDI_CLOCK_OUT and MIDI_CLOCK_OUT_TRANSPORT and self.midi is not None:
try: self.midi.write(bytes([0xFC])) # Stop try: self.midi.write(self._stop_byte) # Stop (reused singleton)
except Exception: pass except Exception: pass
self.led_rest(); self.draw_meters() # LED shows run state: red running / green stopped self.led_rest(); self.draw_meters() # LED shows run state: red running / green stopped
def set_bpm(self, v): def set_bpm(self, v):
v = max(5, min(300, v)) v = max(5, min(300, v))
if v != self.bpm: if v != self.bpm:
self.bpm = v self.bpm = v; self._beat_ns = 60_000_000_000 // v # keep cached beat duration in sync
self.draw_bpm(); self.draw_meters() # total time depends on bpm self.draw_bpm(); self.draw_meters() # total time depends on bpm
def goto(self, i): def goto(self, i):
was = self.running was = self.running
@ -1016,7 +1027,7 @@ class App:
# Slave decay: if no Clock In tick in the last 1s, fall back to internal tempo # Slave decay: if no Clock In tick in the last 1s, fall back to internal tempo
if self._slaved and (now - self._clock_in_last_t) > 1_000_000_000: self._slaved = False if self._slaved and (now - self._clock_in_last_t) > 1_000_000_000: self._slaved = False
if self.running: if self.running:
fired = [] fired_best = 0; fired_prio = -1 # int tracking, no per-tick list alloc
for li, L in enumerate(self.lanes): for li, L in enumerate(self.lanes):
if self._advance: break # seam armed - skip remaining lanes for THIS tick if self._advance: break # seam armed - skip remaining lanes for THIS tick
adv = False adv = False
@ -1032,19 +1043,19 @@ class App:
bar_pos = self._m_steps / mlen bar_pos = self._m_steps / mlen
seg_bar = (bar_pos % self.bars) if self.bars else bar_pos seg_bar = (bar_pos % self.bars) if self.bars else bar_pos
new_bpm = max(5, min(300, int(self._ramp_base + seg_bar / self.ramp['every'] * self.ramp['amt']))) new_bpm = max(5, min(300, int(self._ramp_base + seg_bar / self.ramp['every'] * self.ramp['amt'])))
if new_bpm != self.bpm: self.bpm = new_bpm if new_bpm != self.bpm: self.bpm = new_bpm; self._beat_ns = 60_000_000_000 // new_bpm
lvl = 0 if L['mute'] else L['levels'][L['step']] lvl = 0 if L['mute'] else L['levels'][L['step']]
if lvl > 0: if lvl > 0:
fired.append(lvl) p = PRIO.get(lvl, 0)
if p > fired_prio: fired_prio = p; fired_best = lvl # accent > normal > ghost
if not self._muted: # gap trainer: silent during the rest bars if not self._muted: # gap trainer: silent during the rest bars
self.midi_send(SOUND_GM.get(L['sound'], GM_DEFAULT), MIDI_VEL.get(lvl, 90)) self.midi_send(SOUND_GM.get(L['sound'], GM_DEFAULT), MIDI_VEL.get(lvl, 90))
L['next'] += self._step_dur(L, L['step']); adv = True L['next'] += self._step_dur(L, L['step']); adv = True
if adv and li < len(self.lane_pads): self._move_playhead(li, L['step']) if adv and li < len(self.lane_pads): self._move_playhead(li, L['step'])
if fired and not self._muted: if fired_best and not self._muted:
best = max(fired, key=lambda l: PRIO.get(l, 0))
if not MUTE_SPEAKER and not (SPEAKER_AUTO_MUTE and self.midi_host): if not MUTE_SPEAKER and not (SPEAKER_AUTO_MUTE and self.midi_host):
self.click(best) # speaker silent if user muted it / auto-mute on + host present self.click(fired_best) # speaker silent if user muted it / auto-mute on + host present
self.flash(best) self.flash(fired_best)
base = self._led_base() # decay the beat pulse back down to the red running base base = self._led_base() # decay the beat pulse back down to the red running base
if self.rgb != base: if self.rgb != base:
r = base[0] + (self.rgb[0]-base[0])*7//10 r = base[0] + (self.rgb[0]-base[0])*7//10
@ -1057,10 +1068,12 @@ class App:
self._do_advance() self._do_advance()
# MIDI Clock Out (master): 24 PPQN; interval follows the live bpm (so continuous ramps carry through) # MIDI Clock Out (master): 24 PPQN; interval follows the live bpm (so continuous ramps carry through)
if self.running and MIDI_CLOCK_OUT and self.midi is not None and not self._slaved: # don't echo to the master if self.running and MIDI_CLOCK_OUT and self.midi is not None and not self._slaved: # don't echo to the master
clk = self._clock_byte # reused singleton bytes (no per-tick alloc)
tick_ns = self._beat_ns // 24 # cached: ns per Clock pulse
while now >= self._clock_next: while now >= self._clock_next:
try: self.midi.write(bytes([0xF8])) try: self.midi.write(clk)
except Exception: pass except Exception: pass
self._clock_next += int(60_000_000_000 / max(1, self.bpm) / 24) self._clock_next += tick_ns
def _on_new_bar(self, bar): def _on_new_bar(self, bar):
# Pre-parse the next track during the LAST bar of this segment, so the swap at the seam is allocation-free # Pre-parse the next track during the LAST bar of this segment, so the swap at the seam is allocation-free
if self.bars and self.continue_on and self._next_pending is None and bar == self.bars - 1: if self.bars and self.continue_on and self._next_pending is None and bar == self.bars - 1:
@ -1207,35 +1220,48 @@ class App:
# ---------- pad grid (each lane = a row of step pads; playhead lit as it plays) ---------- # ---------- pad grid (each lane = a row of step pads; playhead lit as it plays) ----------
def _padbase(self, L, s): def _padbase(self, L, s):
return 0 if L['mute'] else L['levels'][s] return 0 if L['mute'] else L['levels'][s]
def build_grid(self): def build_grid(self): # synchronous: kick off chunked rebuild and run to completion
self._grid_rebuild_start()
while self._grid_li is not None: self._grid_rebuild_step()
def _grid_rebuild_start(self): # tear down + gridlines + initial state for chunked rebuild
while len(self.g_grid): self.g_grid.pop() while len(self.g_grid): self.g_grid.pop()
self.lane_pads = []; self.lane_lit = [] self.lane_pads = []; self.lane_lit = []
gc.collect() # 64-128 vectorio allocs in this fn want a defragmented heap gc.collect() # 64-128 vectorio allocs incoming - want a defragmented heap
n = min(len(self.lanes), MAXLANES) n = min(len(self.lanes), MAXLANES)
top = GRID_TOP; rowh = min(40, ((LOG_TOP - 10) - top) // max(1, n)) top = GRID_TOP; rowh = min(40, ((LOG_TOP - 10) - top) // max(1, n))
px0 = 60; usable = WIDTH - 8 - px0 - 12; gridh = n * rowh px0 = 60; usable = WIDTH - 8 - px0 - 12; gridh = n * rowh
self._grid = {'top': top, 'rowh': rowh, 'px0': px0, 'usable': usable, 'n': n} # for touch hit-testing self._grid = {'top': top, 'rowh': rowh, 'px0': px0, 'usable': usable, 'n': n} # for touch hit-testing
# vertical gridlines at the master lane's beats, full height -> beats line up across lanes m = self.lanes[0] if self.lanes else None
m = self.lanes[0]; mbeats = max(1, m['steps'] // max(1, m['sub'])) if m is not None: # vertical gridlines (cheap; one pass before chunked lanes)
for bcol in range(mbeats): mbeats = max(1, m['steps'] // max(1, m['sub']))
self.g_grid.append(rect(px0 + 6 + (bcol * usable) // mbeats, top, 1, gridh, C_GRID)) for bcol in range(mbeats):
for li in range(n): self.g_grid.append(rect(px0 + 6 + (bcol * usable) // mbeats, top, 1, gridh, C_GRID))
L = self.lanes[li]; y = top + li * rowh; cy = y + rowh // 2 self._grid_n = n
tg, w, h = make_text((L.get('sound', '') or '?')[:7], FONT_S, C_MUTE, C_BG) self._grid_geo = (top, rowh, px0, usable)
tg.x = 8; tg.y = cy - h // 2; self.g_grid.append(tg) self._grid_li = 0 if n > 0 else None
steps = L['steps']; sub = L['sub']; stepw = max(1, usable // steps) self.dirty = True
side = max(5, min(15, stepw - 1, rowh - 6)) # square edge for the main pulse def _grid_rebuild_step(self): # build ONE lane row; sets _grid_li=None when done
rad = max(2, min(side // 2, stepw // 2 - 1)) # smaller circle for subdivisions li = self._grid_li
pads = [] if li is None: return
for s in range(steps): if li >= self._grid_n or li >= len(self.lanes):
cxp = px0 + 6 + (s * usable) // steps # proportional -> beats line up across lanes self._grid_li = None; return # done; main loop falls through to draw_log
if s % sub == 0: # main beat -> square top, rowh, px0, usable = self._grid_geo
p = vectorio.Rectangle(pixel_shader=self.pad_pal, width=side, height=side, L = self.lanes[li]; y = top + li * rowh; cy = y + rowh // 2
x=cxp - side // 2, y=cy - side // 2) tg, w, h = make_text((L.get('sound', '') or '?')[:7], FONT_S, C_MUTE, C_BG)
else: # subdivision -> circle tg.x = 8; tg.y = cy - h // 2; self.g_grid.append(tg)
p = vectorio.Circle(pixel_shader=self.pad_pal, radius=rad, x=cxp, y=cy) steps = L['steps']; sub = L['sub']; stepw = max(1, usable // steps)
p.color_index = self._padbase(L, s); self.g_grid.append(p); pads.append(p) side = max(5, min(15, stepw - 1, rowh - 6)) # square edge for the main pulse
self.lane_pads.append(pads); self.lane_lit.append(-1) rad = max(2, min(side // 2, stepw // 2 - 1)) # smaller circle for subdivisions
pal = self.pad_pal; pads = []
for s in range(steps):
cxp = px0 + 6 + (s * usable) // steps # proportional -> beats line up across lanes
if s % sub == 0:
p = vectorio.Rectangle(pixel_shader=pal, width=side, height=side, x=cxp - side // 2, y=cy - side // 2)
else:
p = vectorio.Circle(pixel_shader=pal, radius=rad, x=cxp, y=cy)
p.color_index = self._padbase(L, s); self.g_grid.append(p); pads.append(p)
self.lane_pads.append(pads); self.lane_lit.append(-1)
self._grid_li = li + 1
self.dirty = True self.dirty = True
def _move_playhead(self, li, step): def _move_playhead(self, li, step):
pads = self.lane_pads[li]; prev = self.lane_lit[li] pads = self.lane_pads[li]; prev = self.lane_lit[li]
@ -1412,26 +1438,47 @@ class App:
try: os.stat("/trial"); committed = False # we're a freshly-pushed build on trial try: os.stat("/trial"); committed = False # we're a freshly-pushed build on trial
except OSError: committed = True except OSError: committed = True
while True: while True:
self.tick(); self.poll() try:
if self._need_redraw: # post-seam fast pass: cheap header/status bits, runs immediately self.tick(); self.poll()
self._need_redraw = False if self._need_redraw: # post-seam fast pass: cheap header/status bits, runs immediately
self.draw_bpm(); self.draw_status(); self.draw_train(); self.draw_meters() self._need_redraw = False
if self._heavy_redraw_at and time.monotonic() >= self._heavy_redraw_at: self.draw_bpm(); self.draw_status(); self.draw_train(); self.draw_meters()
self._heavy_redraw_at = 0 # post-seam slow pass: build_grid + draw_log are ~hundreds of ms, if self._heavy_redraw_at and time.monotonic() >= self._heavy_redraw_at:
self.build_grid(); self.draw_log() # deferred so B's first few beats fire on time self._heavy_redraw_at = 0 # post-seam slow pass: kick off the chunked rebuild
tnow = time.monotonic() self._grid_rebuild_start(); self._heavy_log_pending = True
if tnow >= self._uiNext: # ~4x/s: tick the stopwatch + bar counter if self._grid_li is not None: # process ONE lane per loop iter -> tick() runs between lanes
self._uiNext = tnow + 0.25; self.draw_meters(); self.draw_bpm() # bpm follows the continuous ramp self._grid_rebuild_step()
if not committed and tnow - boot > 5: # booted & ran fine for 5s -> confirm the update elif self._heavy_log_pending: # grid done -> draw_log (cheap-ish; also one shot)
try: os.remove("/trial") self._heavy_log_pending = False; self.draw_log()
tnow = time.monotonic()
if tnow >= self._uiNext: # ~4x/s: tick the stopwatch + bar counter
self._uiNext = tnow + 0.25; self.draw_meters(); self.draw_bpm() # bpm follows the continuous ramp
if not committed and tnow - boot > 5: # booted & ran fine for 5s -> confirm the update
try: os.remove("/trial")
except Exception: pass
committed = True
# Refresh display ~30x/s, BUT skip if a beat is due within ~35ms (refresh() blocks SPI for that long)
# so the click never gets pushed late by the redraw. Force-refresh after 0.5s anyway so visuals stay live.
if self.dirty and tnow >= self._refreshNext:
safe = True
if self.running and self.lanes:
now_ns = time.monotonic_ns(); next_beat = self.lanes[0]['next']
for L in self.lanes:
if L['next'] < next_beat: next_beat = L['next']
safe = (next_beat - now_ns) > 35_000_000 or (tnow - self._lastRefresh) > 0.5
if safe:
if self.display.refresh(): self.dirty = False
self._lastRefresh = tnow; self._refreshNext = tnow + 0.033
else:
self._refreshNext = tnow + 0.005 # check again soon; don't wait the full 33ms
time.sleep(0.0005)
except MemoryError: # surface, gc, keep running (don't crash on a fragmented heap)
try: print("MemoryError: gc + continue")
except Exception: pass except Exception: pass
committed = True gc.collect(); time.sleep(0.05)
# Refresh at most ~30x/s. display.refresh() BLOCKS while it streams pixels over SPI, which except Exception as e: # any other transient error: log, continue
# would otherwise delay the next beat's MIDI note and make the audio stutter; throttling it try: print("tick error:", e)
# keeps the click timing tight (the visuals lag a few ms, which is imperceptible). except Exception: pass
if self.dirty and tnow >= self._refreshNext: time.sleep(0.05)
if self.display.refresh(): self.dirty = False
self._refreshNext = tnow + 0.033
time.sleep(0.0005)
App().run() App().run()