diff --git a/pico-cp/app.py b/pico-cp/app.py index ea01650..9bc0d63 100644 --- a/pico-cp/app.py +++ b/pico-cp/app.py @@ -18,7 +18,7 @@ import board, busio, digitalio, analogio, pwmio, displayio, vectorio, time, json, gc, os, supervisor supervisor.runtime.autoreload = False # we write our own files (log + pushed programs); never self-restart -APP_VERSION = "0.0.18" # firmware version (the A/B updater pushes/compares this) +APP_VERSION = "0.0.19" # firmware version (the A/B updater pushes/compares this) try: import rtc # set from the editor's clock SysEx so the log has real timestamps except ImportError: @@ -453,6 +453,13 @@ class App: self.continue_on = False; self._advance = False; self._grid = {} # auto-advance + pad hit-test geometry self._next_pending = None; self._seam_t = 0; self._need_redraw = False # gapless seam between tracks self._heavy_redraw_at = 0 # deferred build_grid + draw_log deadline (so B's intro isn't blocked by SPI/alloc) + self._grid_li = None; self._grid_n = 0; self._grid_geo = (0, 0, 0, 0) # chunked build_grid progress (1 lane / loop iter) + self._heavy_log_pending = False + self._beat_ns = 60_000_000_000 // self.bpm # cached: ns per quarter note; refreshed on every bpm change + self._note_buf = bytearray([0x90, 0, 0]) # reused for every Note On (no per-click bytes() alloc) + self._clock_byte = bytes([0xF8]) # singleton MIDI Clock tick (24 PPQN) + self._start_byte = bytes([0xFA]); self._stop_byte = bytes([0xFC]) + self._lastRefresh = 0.0 # for the "force refresh after Xms even if a beat is imminent" guard self._displayed_bpm = -1; self._clock_next = 0 # lazy BPM redraw + MIDI Clock Out tick scheduler self._clock_in_last_t = 0; self._clock_in_avg = 0; self._slaved = False # MIDI Clock In: smoothed tracker + slave flag self.sl = 0; self.rebuild_setlists() # built-in playlists (baked) + user playlists (programs.json) @@ -551,7 +558,8 @@ class App: self.bpm, self.lanes, self.bars, self.ramp, self.trainer = parse_program(prog) self.master = self.lanes[0]; self._ramp_base = self.bpm; self._lastbar = -1; self._muted = False self._dirty = False; self._overlay = None # fresh load -> no unsaved edits - self._next_pending = None; self._need_redraw = False; self._heavy_redraw_at = 0 # discard any prepared seam (user navigated away) + self._next_pending = None; self._need_redraw = False # discard any prepared seam (user navigated away) + self._heavy_redraw_at = 0; self._heavy_log_pending = False; self._grid_li = None # cancel any in-progress chunked rebuild while len(self.g_overlay): self.g_overlay.pop() # dismiss any open modal self._reset_clock(); self.draw_bpm(); self.draw_status(); self.draw_train() self.build_grid(); self.draw_log() @@ -940,15 +948,15 @@ class App: except OSError: self.can_write = False def _step_dur(self, L, step): - beat = 60_000_000_000 / self.bpm + beat = self._beat_ns # cached ns/beat (refreshed on every bpm change + ramp tick) if L['poly']: # ~ polymeter: fit this lane's whole cycle into lane 1's bar m = self.lanes[0]; master_bar = beat * (m['steps'] // m['sub']) - return int(master_bar / L['steps']) + return master_bar // L['steps'] sub = L['sub'] if L['swing'] and sub % 2 == 0: # swing even subdivisions: long-short (2:1) pairs - pair = beat / (sub // 2) - return int(pair * 2 / 3) if (step % sub) % 2 == 0 else int(pair / 3) - return int(beat / sub) # straight: a step = one beat / subdivision + pair = beat // (sub // 2) + return (pair * 2) // 3 if (step % sub) % 2 == 0 else pair // 3 + return beat // sub # straight: a step = one beat / subdivision def _reset_clock(self): now = time.monotonic_ns() for L in self.lanes: @@ -971,7 +979,10 @@ class App: self.led.set(*self.rgb) def midi_send(self, note, vel): # device-as-conductor: a note per click to the computer if self.midi is None: return - try: self.midi.write(bytes([0x90 | ((MIDI_CHANNEL - 1) & 0x0F), note, vel])) # Note On, channel 1..16 + b = self._note_buf # reused bytearray -> zero alloc per click (hot path) + b[0] = 0x90 | ((MIDI_CHANNEL - 1) & 0x0F) # Note On, channel 1..16 + b[1] = note & 0x7F; b[2] = vel & 0x7F + try: self.midi.write(b) except Exception: pass # ---------- transport ---------- @@ -981,18 +992,18 @@ class App: self._reset_clock(); self._start_play() self._clock_next = time.monotonic_ns() # start MIDI Clock Out from zero if MIDI_CLOCK_OUT and MIDI_CLOCK_OUT_TRANSPORT and self.midi is not None: - try: self.midi.write(bytes([0xFA])) # Start + try: self.midi.write(self._start_byte) # Start (reused singleton) except Exception: pass else: self.spk.duty_cycle = 0; self.reset_playheads(); self._log_play() if MIDI_CLOCK_OUT and MIDI_CLOCK_OUT_TRANSPORT and self.midi is not None: - try: self.midi.write(bytes([0xFC])) # Stop + try: self.midi.write(self._stop_byte) # Stop (reused singleton) except Exception: pass self.led_rest(); self.draw_meters() # LED shows run state: red running / green stopped def set_bpm(self, v): v = max(5, min(300, v)) if v != self.bpm: - self.bpm = v + self.bpm = v; self._beat_ns = 60_000_000_000 // v # keep cached beat duration in sync self.draw_bpm(); self.draw_meters() # total time depends on bpm def goto(self, i): was = self.running @@ -1016,7 +1027,7 @@ class App: # Slave decay: if no Clock In tick in the last 1s, fall back to internal tempo if self._slaved and (now - self._clock_in_last_t) > 1_000_000_000: self._slaved = False if self.running: - fired = [] + fired_best = 0; fired_prio = -1 # int tracking, no per-tick list alloc for li, L in enumerate(self.lanes): if self._advance: break # seam armed - skip remaining lanes for THIS tick adv = False @@ -1032,19 +1043,19 @@ class App: bar_pos = self._m_steps / mlen seg_bar = (bar_pos % self.bars) if self.bars else bar_pos new_bpm = max(5, min(300, int(self._ramp_base + seg_bar / self.ramp['every'] * self.ramp['amt']))) - if new_bpm != self.bpm: self.bpm = new_bpm + if new_bpm != self.bpm: self.bpm = new_bpm; self._beat_ns = 60_000_000_000 // new_bpm lvl = 0 if L['mute'] else L['levels'][L['step']] if lvl > 0: - fired.append(lvl) + p = PRIO.get(lvl, 0) + if p > fired_prio: fired_prio = p; fired_best = lvl # accent > normal > ghost if not self._muted: # gap trainer: silent during the rest bars self.midi_send(SOUND_GM.get(L['sound'], GM_DEFAULT), MIDI_VEL.get(lvl, 90)) L['next'] += self._step_dur(L, L['step']); adv = True if adv and li < len(self.lane_pads): self._move_playhead(li, L['step']) - if fired and not self._muted: - best = max(fired, key=lambda l: PRIO.get(l, 0)) + if fired_best and not self._muted: if not MUTE_SPEAKER and not (SPEAKER_AUTO_MUTE and self.midi_host): - self.click(best) # speaker silent if user muted it / auto-mute on + host present - self.flash(best) + self.click(fired_best) # speaker silent if user muted it / auto-mute on + host present + self.flash(fired_best) base = self._led_base() # decay the beat pulse back down to the red running base if self.rgb != base: r = base[0] + (self.rgb[0]-base[0])*7//10 @@ -1057,10 +1068,12 @@ class App: self._do_advance() # MIDI Clock Out (master): 24 PPQN; interval follows the live bpm (so continuous ramps carry through) if self.running and MIDI_CLOCK_OUT and self.midi is not None and not self._slaved: # don't echo to the master + clk = self._clock_byte # reused singleton bytes (no per-tick alloc) + tick_ns = self._beat_ns // 24 # cached: ns per Clock pulse while now >= self._clock_next: - try: self.midi.write(bytes([0xF8])) + try: self.midi.write(clk) except Exception: pass - self._clock_next += int(60_000_000_000 / max(1, self.bpm) / 24) + self._clock_next += tick_ns def _on_new_bar(self, bar): # Pre-parse the next track during the LAST bar of this segment, so the swap at the seam is allocation-free if self.bars and self.continue_on and self._next_pending is None and bar == self.bars - 1: @@ -1207,35 +1220,48 @@ class App: # ---------- pad grid (each lane = a row of step pads; playhead lit as it plays) ---------- def _padbase(self, L, s): return 0 if L['mute'] else L['levels'][s] - def build_grid(self): + def build_grid(self): # synchronous: kick off chunked rebuild and run to completion + self._grid_rebuild_start() + while self._grid_li is not None: self._grid_rebuild_step() + def _grid_rebuild_start(self): # tear down + gridlines + initial state for chunked rebuild while len(self.g_grid): self.g_grid.pop() self.lane_pads = []; self.lane_lit = [] - gc.collect() # 64-128 vectorio allocs in this fn want a defragmented heap + gc.collect() # 64-128 vectorio allocs incoming - want a defragmented heap n = min(len(self.lanes), MAXLANES) top = GRID_TOP; rowh = min(40, ((LOG_TOP - 10) - top) // max(1, n)) px0 = 60; usable = WIDTH - 8 - px0 - 12; gridh = n * rowh self._grid = {'top': top, 'rowh': rowh, 'px0': px0, 'usable': usable, 'n': n} # for touch hit-testing - # vertical gridlines at the master lane's beats, full height -> beats line up across lanes - m = self.lanes[0]; mbeats = max(1, m['steps'] // max(1, m['sub'])) - for bcol in range(mbeats): - self.g_grid.append(rect(px0 + 6 + (bcol * usable) // mbeats, top, 1, gridh, C_GRID)) - for li in range(n): - L = self.lanes[li]; y = top + li * rowh; cy = y + rowh // 2 - tg, w, h = make_text((L.get('sound', '') or '?')[:7], FONT_S, C_MUTE, C_BG) - tg.x = 8; tg.y = cy - h // 2; self.g_grid.append(tg) - steps = L['steps']; sub = L['sub']; stepw = max(1, usable // steps) - side = max(5, min(15, stepw - 1, rowh - 6)) # square edge for the main pulse - rad = max(2, min(side // 2, stepw // 2 - 1)) # smaller circle for subdivisions - pads = [] - for s in range(steps): - cxp = px0 + 6 + (s * usable) // steps # proportional -> beats line up across lanes - if s % sub == 0: # main beat -> square - p = vectorio.Rectangle(pixel_shader=self.pad_pal, width=side, height=side, - x=cxp - side // 2, y=cy - side // 2) - else: # subdivision -> circle - p = vectorio.Circle(pixel_shader=self.pad_pal, radius=rad, x=cxp, y=cy) - p.color_index = self._padbase(L, s); self.g_grid.append(p); pads.append(p) - self.lane_pads.append(pads); self.lane_lit.append(-1) + m = self.lanes[0] if self.lanes else None + if m is not None: # vertical gridlines (cheap; one pass before chunked lanes) + mbeats = max(1, m['steps'] // max(1, m['sub'])) + for bcol in range(mbeats): + self.g_grid.append(rect(px0 + 6 + (bcol * usable) // mbeats, top, 1, gridh, C_GRID)) + self._grid_n = n + self._grid_geo = (top, rowh, px0, usable) + self._grid_li = 0 if n > 0 else None + self.dirty = True + def _grid_rebuild_step(self): # build ONE lane row; sets _grid_li=None when done + li = self._grid_li + if li is None: return + if li >= self._grid_n or li >= len(self.lanes): + self._grid_li = None; return # done; main loop falls through to draw_log + top, rowh, px0, usable = self._grid_geo + L = self.lanes[li]; y = top + li * rowh; cy = y + rowh // 2 + tg, w, h = make_text((L.get('sound', '') or '?')[:7], FONT_S, C_MUTE, C_BG) + tg.x = 8; tg.y = cy - h // 2; self.g_grid.append(tg) + steps = L['steps']; sub = L['sub']; stepw = max(1, usable // steps) + side = max(5, min(15, stepw - 1, rowh - 6)) # square edge for the main pulse + rad = max(2, min(side // 2, stepw // 2 - 1)) # smaller circle for subdivisions + pal = self.pad_pal; pads = [] + for s in range(steps): + cxp = px0 + 6 + (s * usable) // steps # proportional -> beats line up across lanes + if s % sub == 0: + p = vectorio.Rectangle(pixel_shader=pal, width=side, height=side, x=cxp - side // 2, y=cy - side // 2) + else: + p = vectorio.Circle(pixel_shader=pal, radius=rad, x=cxp, y=cy) + p.color_index = self._padbase(L, s); self.g_grid.append(p); pads.append(p) + self.lane_pads.append(pads); self.lane_lit.append(-1) + self._grid_li = li + 1 self.dirty = True def _move_playhead(self, li, step): pads = self.lane_pads[li]; prev = self.lane_lit[li] @@ -1412,26 +1438,47 @@ class App: try: os.stat("/trial"); committed = False # we're a freshly-pushed build on trial except OSError: committed = True while True: - self.tick(); self.poll() - if self._need_redraw: # post-seam fast pass: cheap header/status bits, runs immediately - self._need_redraw = False - self.draw_bpm(); self.draw_status(); self.draw_train(); self.draw_meters() - if self._heavy_redraw_at and time.monotonic() >= self._heavy_redraw_at: - self._heavy_redraw_at = 0 # post-seam slow pass: build_grid + draw_log are ~hundreds of ms, - self.build_grid(); self.draw_log() # deferred so B's first few beats fire on time - tnow = time.monotonic() - if tnow >= self._uiNext: # ~4x/s: tick the stopwatch + bar counter - self._uiNext = tnow + 0.25; self.draw_meters(); self.draw_bpm() # bpm follows the continuous ramp - if not committed and tnow - boot > 5: # booted & ran fine for 5s -> confirm the update - try: os.remove("/trial") + try: + self.tick(); self.poll() + if self._need_redraw: # post-seam fast pass: cheap header/status bits, runs immediately + self._need_redraw = False + self.draw_bpm(); self.draw_status(); self.draw_train(); self.draw_meters() + if self._heavy_redraw_at and time.monotonic() >= self._heavy_redraw_at: + self._heavy_redraw_at = 0 # post-seam slow pass: kick off the chunked rebuild + self._grid_rebuild_start(); self._heavy_log_pending = True + if self._grid_li is not None: # process ONE lane per loop iter -> tick() runs between lanes + self._grid_rebuild_step() + elif self._heavy_log_pending: # grid done -> draw_log (cheap-ish; also one shot) + self._heavy_log_pending = False; self.draw_log() + tnow = time.monotonic() + if tnow >= self._uiNext: # ~4x/s: tick the stopwatch + bar counter + self._uiNext = tnow + 0.25; self.draw_meters(); self.draw_bpm() # bpm follows the continuous ramp + if not committed and tnow - boot > 5: # booted & ran fine for 5s -> confirm the update + try: os.remove("/trial") + except Exception: pass + committed = True + # Refresh display ~30x/s, BUT skip if a beat is due within ~35ms (refresh() blocks SPI for that long) + # so the click never gets pushed late by the redraw. Force-refresh after 0.5s anyway so visuals stay live. + if self.dirty and tnow >= self._refreshNext: + safe = True + if self.running and self.lanes: + now_ns = time.monotonic_ns(); next_beat = self.lanes[0]['next'] + for L in self.lanes: + if L['next'] < next_beat: next_beat = L['next'] + safe = (next_beat - now_ns) > 35_000_000 or (tnow - self._lastRefresh) > 0.5 + if safe: + if self.display.refresh(): self.dirty = False + self._lastRefresh = tnow; self._refreshNext = tnow + 0.033 + else: + self._refreshNext = tnow + 0.005 # check again soon; don't wait the full 33ms + time.sleep(0.0005) + except MemoryError: # surface, gc, keep running (don't crash on a fragmented heap) + try: print("MemoryError: gc + continue") except Exception: pass - committed = True - # Refresh at most ~30x/s. display.refresh() BLOCKS while it streams pixels over SPI, which - # would otherwise delay the next beat's MIDI note and make the audio stutter; throttling it - # keeps the click timing tight (the visuals lag a few ms, which is imperceptible). - if self.dirty and tnow >= self._refreshNext: - if self.display.refresh(): self.dirty = False - self._refreshNext = tnow + 0.033 - time.sleep(0.0005) + gc.collect(); time.sleep(0.05) + except Exception as e: # any other transient error: log, continue + try: print("tick error:", e) + except Exception: pass + time.sleep(0.05) App().run()