metronome/pico-explorer/app.py
Me Here 3805c5ee00 PM_X-1 0.0.4 + editor push diagnostics
Layout fixes (user reported BPM/time were still bumping the header at 0.0.3):
- All Y coords below the header divider shifted down 6px: BPM 30->38,
  time 32->38, bar 44->50, train 52->58, setlist tab 66->72, title 82->88.
- GRID_TOP 104 -> 110.

Restored the Kit-style footer practice log:
- LOG_TOP=218, LOG_ROWH=14, LOG_ROWS=6.
- MAXLANES dropped from 6 visible to 4 visible (rowh capped at 26 so the grid
  doesn't run into the log). Tracks with more lanes still play silently.
- _build_scene now appends g_log (with a divider above it).
- draw_log() draws the current-track log into the footer; load() + _log_play()
  + the seam apply path all call it. The Practice-log menu entry is kept for
  the full scrollable history.

Editor diagnostics for the firmware push (the user got chunk-1 ACK then the
device's MIDI badge went gray, meaning chunks 2+ never reached it):
- editor.html + editor-beta.html _pushFirmware() now logs every MIDI output
  + input it sees along with which ones _isDevicePort() matched, plus per-
  chunk send/ACK timing for the first 3 chunks and any failed chunk.
- This narrows down whether the failure is (a) wrong-port routing
  (filter doesn't match the Pimoroni Explorer's name), (b) ACK never arriving
  back to the host, or (c) chunks sent fine but the device's RX buffer is
  dropping them.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-30 22:40:11 -05:00

1476 lines
75 KiB
Python

# VARASYS PolyMeter - PM_X-1 "Explorer" firmware (CircuitPython edition)
# Pimoroni Explorer (PIM744): RP2350B + 2.8" ST7789V 320x240 + 6 buttons (A/B/C/X/Y/Z) + piezo.
#
# Sibling to PM_K-1 (the 52Pi EP-0172 kit in ../pico-cp/). Same engine, same program-string
# grammar, same programs.json, same web editor, same live-sync protocol. The Explorer build is
# READ-ONLY on the device (no on-device beat editing). All editing happens in the web editor
# with Live sync on; the device reflects DELTAs in real time and emits play/stop/bpm/sel back.
#
# WHY CIRCUITPYTHON: the board mounts as a USB drive (CIRCUITPY) carrying this code + your
# tracks + an offline copy of the editor; edits in the web editor are pushed over USB-MIDI.
# Display is initialized by the official board definition (board.DISPLAY pre-built); we just
# use it. Pinout in ./README.md.
import board, busio, digitalio, pwmio, displayio, vectorio, time, json, gc, os, supervisor
supervisor.runtime.autoreload = False # we write our own files (log + pushed programs); never self-restart
APP_VERSION = "0.0.4" # firmware version (the A/B updater pushes/compares this)
DEVICE_ID = "X" # 'X' = Explorer, 'K' = 52Pi kit (per docs/livesync-protocol.md and the version reply)
try:
import rtc # set from the editor's clock SysEx so the log has real timestamps
except ImportError:
rtc = None
try:
import usb_midi # default-enabled on RP2350 - sends a MIDI note per click to the computer
except ImportError:
usb_midi = None
try:
from binascii import a2b_base64 # decode the base64-encoded .mpy pushed by the editor's one-click update
except ImportError:
a2b_base64 = None
# ============================== CONFIG (tweak if needed) ==============================
MIDI_ENABLED = True # send a USB-MIDI note per click (play via the web editor's "Device audio")
MIDI_CHANNEL = 10 # 1..16 - GM channel 10 is the drum channel
MIDI_CLOCK_OUT = False # send 24 PPQN MIDI Clock so a DAW can slave its tempo to the metronome
MIDI_CLOCK_OUT_TRANSPORT = True
MIDI_CLOCK_IN = False # follow an external 24 PPQN clock
MIDI_CLOCK_IN_TRANSPORT = True
MUTE_SPEAKER = False # always silence the on-board piezo
SPEAKER_AUTO_MUTE = False # auto-mute the piezo when a MIDI host is listening. DEFAULT OFF on Explorer:
# Live sync sends a FULL heartbeat every 5s which would silence the piezo otherwise.
# Toggle to Auto in Settings if you ARE using "Device audio" in the editor.
AMP_EN_ACTIVE_HIGH = True # piezo amp enable polarity. If you HEAR sound from the piezo only when click()
# has just timed out (~22ms after a beat), flip this to False - your amp is active-low.
DISPLAY_ROTATION = 270 # 0=native landscape; 90/270 = portrait. Hold the device with A/B/C
# on top: try 270 first; if upside-down try 90; 180 = flipped landscape.
# ----- pins (Pimoroni Explorer board layout) -----
P_AUDIO = board.GP12 # piezo PWM (variable frequency)
P_AMPEN = board.GP13 # piezo amp enable (high = on)
P_BTNA, P_BTNB, P_BTNC = board.GP16, board.GP15, board.GP14 # left-side buttons (top to bottom)
P_BTNX, P_BTNY, P_BTNZ = board.GP17, board.GP18, board.GP19 # right-side buttons (top to bottom)
P_SDA, P_SCL = board.GP20, board.GP21 # QwSTEMMA (unused by the firmware - future expansion)
# Display is initialised by the board definition (8-bit parallel bus). We grab board.DISPLAY +
# call display.rotation = DISPLAY_ROTATION (above) to turn it portrait so the layout has the same
# shape as the PM_K-1 Kit's portrait UI but in 240x320 instead of 320x480.
WIDTH, HEIGHT = 240, 320
GRID_TOP = 110 # top of the pad grid (compact header + meters + title fit above)
MAXLANES = 4 # lanes visible on the pad grid (parser still accepts more; they just play silent)
LOG_TOP, LOG_ROWH, LOG_ROWS = 218, 14, 6 # footer practice log: rows below the grid like the Kit
MIN_LOG_SEC = 5 # don't log plays shorter than this
LOG_MENU_ROWS = 8 # log entries shown in the Practice-log menu screen (longer history view)
# ----- BUILT-IN playlists: same defaults as the Kit so the two firmwares feel identical -----
BUILTIN_SETLISTS = [
("Styles", [
("Four-on-the-floor", "t120;kick:4;snare:4=.x.x;hatClosed:4/2"),
("Swing ride", "t150;ride:4/2s;kick:4=X..x;snare:4=.x.x"),
("Purdie half-time shuffle", "t92;kick:4/3=X....x...x..;snare:4/3=..gg.gX.gg.g;hatClosed:4/3=X.xX.xX.xX.x"),
("Samba (2/4)", "t104;tomLow:2/4=x...X...;hatClosed:2/4;woodblock:2/4=X.xx.xX."),
("Nanigo (6/8 bembe)", "t130;cowbell:4/3=X.xx.x.xx.x.;kick:4/3=X.....X.....;hatClosed:4/3=..x..x..x..x"),
("6/8 groove", "t100;kick:3+3=x..x..;snare:3+3=...x..;hatClosed:3+3/2"),
("7/8 (2+2+3)", "t130;kick:2+2+3=x..x..x;hatClosed:2+2+3/2"),
("5/4 (3+2)", "t112;kick:3+2=x..x.;snare:3+2=..x..;hatClosed:3+2/2"),
]),
("Practice", [
("5 over 4 polyrhythm", "t100;kick:4;claves:5~"),
("3 over 2 hemiola", "t96;woodblock:2;cowbell:3~"),
("2 & 4 & 3 over one bar", "t100;kick:3;cowbell:2~;claves:4~"),
("Triplet hats", "t100;kick:4;snare:4=.x.x;hatClosed:4/3"),
("Tempo builder 80 up", "t80;woodblock:4;rmp80/4/4"),
("Gap trainer (play 2 / rest 2)", "t100;kick:4;hatClosed:4/2;tr2/2"),
]),
("Song (continuous)", [
("Intro - hats & kick", "t88;b4;kick:4=X.x.;hatClosed:4/2=gggggggg"),
("Groove in - backbeat", "t88;b4;kick:4=X.x.;snare:4=.X.X;hatClosed:4/2"),
("Half-time shuffle", "t92;b4;kick:4/3=X....x...x..;snare:4/3=..gg.gX.gg.g;hatClosed:4/3=X.xX.xX.xX.x"),
("Build - ramp 92-120", "t92;b4;rmp92/4/2;kick:4;snare:4=.X.X;hatClosed:4/2"),
("Four-on-the-floor (909)", "t124;b4;kick909:4;clap909:4=.X.X;hat909:4/2=.X.X.X.X"),
("Samba break (2/4)", "t116;b4;tomLow:2/4=x...X...;hatClosed:2/4;woodblock:2/4=X.xx.xX."),
("Peak - 16ths", "t132;b4;kick:4=X..x;snare:4=.X.X;hatClosed:4/4"),
("Outro - ramp down", "t132;b4;rmp132/-7/1;kick:4=X..x;hatClosed:4/2=gggggggg"),
]),
]
# ============================== COLOURS ==============================
C_BG, C_PANEL, C_TXT, C_MUTE = 0x06090E, 0x1C222C, 0xC7D0DB, 0x788494
C_CYAN, C_AMBER, C_GREEN, C_DIM = 0x0AB3F7, 0xFF9B2E, 0x2FE07A, 0x243240
C_BTN = 0x1C222C
C_RUN_IDLE = 0x2FE07A # run-state dot (green when stopped, red when playing, bright on each beat)
C_RUN_GO = 0xFF5A5A
C_RUN_PULSE = 0xFFEC78
SOUND_GM = {"kick":36,"kick808":36,"kick909":36, "snare":38,"snare808":38,"snare909":38,
"clap":39,"clap808":39,"clap909":39, "rim":37, "hatClosed":42,"hat808":42,"hat909":42,
"hatOpen":46,"openHat808":46, "ride":51,"ride909":51, "crash":49,"crash909":49,
"tomLow":41,"tom808":45,"tomMid":45,"tomHigh":48, "tambourine":54,
"cowbell":56,"cowbell808":56, "woodblock":76,"jamblock":76, "claves":75, "beep":37}
GM_DEFAULT = 37
HELP_PAGES = (
("Transport & Navigation", (
"Hold portrait with A/B/C on top.",
"A: play / stop",
"B: tap tempo",
"C: menu (this)",
"X: prev track (hold to repeat)",
"Z: next track (hold to repeat)",
"Y: tempo -1 (-5 after 1.5s held)",
"X+Z chord: tempo +1",
)),
("Menu navigation", (
"X / Z: move cursor up / down",
"Y: decrement the focused value",
"A: cycle / increment / select",
"B: back (cancel)",
"C: close the menu",
)),
("Editing & sync", (
"Edit on the web at metronome.varasys.io",
"Click 'Live sync' to mirror live",
"Beat patterns are read-only on device",
"Tracks + tempo + transport sync both ways",
"Built-in playlists baked, user lists",
" live in /programs.json",
)),
("Status & Hardware", (
"MIDI badge green: laptop listening",
"USB badge cyan: connected to a computer",
"Run dot: green=stop / red=play + pulse",
"Squares = main beats, circles = subs",
"Ramp arrow: track has a tempo ramp",
"Gap symbol: silent rest bars",
)),
)
MIDI_VEL = {2: 120, 1: 90, 3: 45} # accent / normal / ghost
PAD_DIM = (0x10161E, 0x0A3A52, 0x4A3010, 0x2A1D4A) # idle pad: mute / normal / accent / ghost
PAD_LIT = (0x39414D, 0x0AB3F7, 0xFF9B2E, 0x967BFF) # playhead pad: mute / normal / accent / ghost
C_GRID = 0x1A2330 # faint vertical beat gridlines (beats line up across lanes)
# ============================== ANTI-ALIASED FONTS (binary blobs on the drive; see ../pico/gen_font.py) ==============================
def load_font(path):
with open(path, "rb") as f:
blob = f.read()
count = blob[0]; p = 1; pixoff = 1 + count * 7; glyphs = {}
for _ in range(count):
cp = (blob[p] << 8) | blob[p+1]; w = blob[p+2]; h = blob[p+3]
xoff = blob[p+4]; xoff = xoff - 256 if xoff > 127 else xoff
top = blob[p+5]; adv = blob[p+6]; p += 7
glyphs[cp] = (w, h, xoff, top, adv, pixoff); pixoff += (w * h + 1) // 2
return (glyphs, blob)
FONT_S = load_font("/font_s.bin") # small - pad-grid lane labels + meter rows
FONT_M = load_font("/font_m.bin") # labels / buttons
FONT_L = load_font("/font_l.bin") # big BPM
gc.collect()
def _blend(bg, fg, i):
t = i * 17
r = (((bg >> 16) & 0xFF)*(255-t) + ((fg >> 16) & 0xFF)*t) // 255
g = (((bg >> 8) & 0xFF)*(255-t) + ((fg >> 8) & 0xFF)*t) // 255
b = ((bg & 0xFF)*(255-t) + (fg & 0xFF)*t) // 255
return (r << 16) | (g << 8) | b
def make_text(s, font, fg, bg):
"""Render a string into a displayio TileGrid (anti-aliased via a 16-step blend palette)."""
glyphs, blob = font
w = 0; top0 = 999; bot = 0
for c in s:
g = glyphs.get(ord(c))
if not g: continue
w += g[4]
if g[1]:
if g[3] < top0: top0 = g[3]
if g[3] + g[1] > bot: bot = g[3] + g[1]
if top0 == 999: top0 = 0
w = max(1, w); h = max(1, bot - top0)
gc.collect()
bmp = displayio.Bitmap(w, h, 16)
pal = displayio.Palette(16)
for i in range(16): pal[i] = _blend(bg, fg, i)
pen = 0
for c in s:
g = glyphs.get(ord(c))
if not g: continue
gw, gh, xoff, gtop, adv, off = g
for j in range(gh):
row = (gtop - top0) + j
for i in range(gw):
k = j * gw + i
byte = blob[off + (k >> 1)]
nib = (byte >> 4) if (k & 1) == 0 else (byte & 0xF)
if nib:
x = pen + xoff + i
if 0 <= x < w and 0 <= row < h: bmp[x, row] = nib
pen += adv
return displayio.TileGrid(bmp, pixel_shader=pal), w, h
def load_alpha(path):
try:
with open(path, "rb") as f: blob = f.read()
return (blob[0], blob[1], blob)
except Exception:
return None
def make_glyph(asset, fg, bg):
w, h, blob = asset
gc.collect()
bmp = displayio.Bitmap(w, h, 16); pal = displayio.Palette(16)
for i in range(16): pal[i] = _blend(bg, fg, i)
for k in range(w * h):
byte = blob[2 + (k >> 1)]
nib = (byte >> 4) if (k & 1) == 0 else (byte & 0xF)
if nib: bmp[k % w, k // w] = nib
return displayio.TileGrid(bmp, pixel_shader=pal), pal, w, h
def _recolor(pal, fg, bg):
for i in range(16): pal[i] = _blend(bg, fg, i)
LOGO = load_alpha("/logo.bin")
ICON_MIDI = load_alpha("/midi.bin")
ICON_USB = load_alpha("/usb.bin")
gc.collect()
# ============================== POLYMETER ENGINE ==============================
PAT = {'X': 2, 'x': 1, 'g': 3, '.': 0, '-': 0, '_': 0}
PRIO = {2: 3, 1: 2, 3: 1}
def parse_program(s):
bpm = 120; lanes = []; bars = 0; ramp = None; trainer = None
for tok in s.strip().split(';'):
tok = tok.strip()
if not tok: continue
if tok[0] == 't' and tok[1:].isdigit():
bpm = int(tok[1:]); continue
if tok[0] == 'b' and tok[1:].isdigit():
bars = int(tok[1:]); continue
if tok.startswith('rmp'):
p = tok[3:].split('/')
if len(p) == 3:
try: ramp = {'start': int(p[0]), 'amt': int(p[1]), 'every': max(1, int(p[2]))}
except ValueError: pass
continue
if tok.startswith('tr') and '/' in tok and ':' not in tok:
p = tok[2:].split('/')
if len(p) == 2:
try: trainer = {'play': max(0, int(p[0])), 'mute': max(0, int(p[1]))}
except ValueError: pass
continue
if ':' not in tok: continue
lane = _parse_lane(tok)
if lane: lanes.append(lane)
if not lanes: lanes = [_parse_lane("beep:4")]
return max(5, min(300, bpm)), lanes, bars, ramp, trainer
def _parse_lane(tok):
poly = '~' in tok; mute = '!' in tok
tok = tok.replace('~', '').replace('!', '')
gain = ''
if '@' in tok: tok, _, g = tok.partition('@'); gain = '@' + g
sound, _, rest = tok.partition(':')
pattern = None
if '=' in rest: rest, _, pattern = rest.partition('=')
sub = 1; swing = False
if '/' in rest:
rest, _, sd = rest.partition('/')
swing = sd.endswith('s'); sd = sd.rstrip('s')
sub = int(sd) if sd.isdigit() else 1
groups = [int(g) for g in rest.split('+') if g.isdigit()] or [4]
beats = sum(groups); starts = set(); acc = 0
for gp in groups: starts.add(acc); acc += gp
steps = beats * sub
if pattern:
levels = [PAT.get(ch, 0) for ch in pattern]
if len(levels) < steps: levels += [0] * (steps - len(levels))
steps = len(levels)
else:
levels = []
for i in range(steps):
if i % sub == 0: levels.append(2 if (i // sub) in starts else 1)
else: levels.append(0)
return {'sound': sound, 'sub': sub, 'swing': swing, 'steps': steps, 'levels': levels,
'poly': poly, 'mute': mute, 'groups': groups, 'gain': gain}
PAT_CH = {2: 'X', 1: 'x', 3: 'g', 0: '.'}
def lane_to_str(L):
s = L['sound'] + ':' + '+'.join(str(g) for g in L.get('groups', [4]))
if L['sub'] != 1 or L['swing']: s += '/' + str(L['sub']) + ('s' if L['swing'] else '')
s += '=' + ''.join(PAT_CH.get(v, '.') for v in L['levels'])
s += L.get('gain', '')
if L['poly']: s += '~'
if L['mute']: s += '!'
return s
_ALNUM = "abcdefghijklmnopqrstuvwxyz0123456789"
def _slkey(t):
return "".join(c for c in t.lower() if c in _ALNUM)
def load_user_setlists():
try:
with open("/programs.json") as f: d = json.load(f)
except Exception as e:
print("programs.json:", e); return []
def items_of(pl): return [(p.get("name", "?"), p.get("prog", "")) for p in pl if p.get("prog")]
out = []
try:
if isinstance(d.get("setlists"), list):
for sl in d["setlists"]:
it = items_of(sl.get("programs", []))
if it: out.append((sl.get("title", "My set list"), it))
elif isinstance(d.get("programs"), list):
it = items_of(d["programs"])
if it: out.append((d.get("title", "My set list"), it))
except Exception as e:
print("setlists:", e)
return out
def solid(color):
p = displayio.Palette(1); p[0] = color; return p
def rect(x, y, w, h, color):
return vectorio.Rectangle(pixel_shader=solid(color), width=w, height=h, x=x, y=y)
# ============================== APP ==============================
class App:
def __init__(self):
self.display = board.DISPLAY # board.c built the BusDisplay; we just use it
try: self.display.rotation = DISPLAY_ROTATION # turn portrait (240x320) - same shape as the Kit's UI
except Exception: pass
try: self.display.auto_refresh = False # we manage refresh in run() (predictive skip + ~20Hz throttle)
except Exception: pass
self.i2c = busio.I2C(scl=P_SCL, sda=P_SDA, frequency=400_000) # QwSTEMMA - unused by the firmware, available to user code
self.midi = usb_midi.ports[1] if (MIDI_ENABLED and usb_midi and len(usb_midi.ports) > 1) else None
self.midi_in = usb_midi.ports[0] if (MIDI_ENABLED and usb_midi and len(usb_midi.ports) > 0) else None
self._mbuf = bytearray(64); self.midi_host = False; self.last_midi_in = 0.0
self._sx = bytearray(); self._sxon = False # USB-MIDI SysEx assembler
self._fw = None; self._fw_n = 0 # chunked firmware transfer state
self.spk = pwmio.PWMOut(P_AUDIO, frequency=1600, variable_frequency=True, duty_cycle=0)
self.amp_en = digitalio.DigitalInOut(P_AMPEN); self.amp_en.direction = digitalio.Direction.OUTPUT
self._amp(False) # amp off when no audio playing (saves power, kills hum)
self.spk_off = 0
# buttons - all active-low with internal pull-ups
self.btnA = self._btn(P_BTNA); self.btnB = self._btn(P_BTNB); self.btnC = self._btn(P_BTNC)
self.btnX = self._btn(P_BTNX); self.btnY = self._btn(P_BTNY); self.btnZ = self._btn(P_BTNZ)
self._prev = {'A': True, 'B': True, 'C': True, 'X': True, 'Y': True, 'Z': True}
self._held_t = {'X': 0, 'Y': 0, 'Z': 0} # press start time (monotonic_ns) for hold-repeat
self._next_rep = {'X': 0, 'Y': 0, 'Z': 0} # next "auto repeat" deadline for held buttons
self._chord_xz = 0 # 0 = not in chord; else monotonic_ns of the chord start
self.running = False; self.bpm = 120; self.idx = 0; self.lanes = []; self.bars = 0
self.ramp = None; self.trainer = None; self._lastbar = -1; self._muted = False; self._ramp_base = 120
self._overlay = None # menu stack: None / 'menu' / 'settings' / 'help' / 'about' / 'log' / 'msg'
self._modal_cursor = 0 # focused row in the current modal
self._modal_rows = [] # tuples (label, value_str_or_None, action) for current modal
self.continue_on = False; self._advance = False
self._next_pending = None; self._seam_t = 0; self._need_redraw = False
self._heavy_redraw_at = 0
self._grid_li = None; self._grid_n = 0; self._grid_geo = (0, 0, 0, 0)
self._grid_pi = 0; self._grid_lane_st = None; self._grid_pads = []
self._heavy_log_pending = False
self._beat_ns = 60_000_000_000 // self.bpm
self._note_buf = bytearray([0x90, 0, 0])
self._clock_byte = bytes([0xF8])
self._start_byte = bytes([0xFA]); self._stop_byte = bytes([0xFC])
self._lastRefresh = 0.0
try:
o = os.urandom(4); self._sync_origin = "d" + "".join("%02x" % b for b in o)
except Exception:
self._sync_origin = "d%08x" % (time.monotonic_ns() & 0xFFFFFFFF)
self._sync_armed = False; self._sync_seq = 0; self._sync_applying = False
self._sync_heartbeat_next = 0.0
self._displayed_bpm = -1; self._clock_next = 0
self._clock_in_last_t = 0; self._clock_in_avg = 0; self._slaved = False
self.sl = 0; self.rebuild_setlists()
self.dirty = True
self.pad_pal = displayio.Palette(8)
for i in range(4): self.pad_pal[i] = PAD_DIM[i]; self.pad_pal[i + 4] = PAD_LIT[i]
self.lane_pads = []; self.lane_lit = []
self.usb_conn = False; self._m_steps = 0
self._uiNext = 0.0; self._lastTs = None; self._lastBs = None
self._seg_start = 0.0
self._refreshNext = 0.0
self.ic_midi_pal = None; self.ic_usb_pal = None
# practice log
self.can_write = self._probe_write()
self._load_settings()
self.log = self._load_log()
self.play_start = None; self.play_bpm = 0; self.play_name = ""
self._log_scroll = 0
self._build_scene()
self.load(0)
self.draw_icons(); self.draw_meters(); self._set_run_dot()
def _btn(self, pin):
d = digitalio.DigitalInOut(pin); d.direction = digitalio.Direction.INPUT; d.pull = digitalio.Pull.UP
return d
# ---------- scene graph (240x320 portrait; same shape as the Kit's UI, shorter) ----------
def _build_scene(self):
root = displayio.Group(); self.display.root_group = root
root.append(rect(0, 0, WIDTH, HEIGHT, C_BG))
# Header (y 0..28): VARASYS logo + version + (right edge) MIDI/USB badges + run dot
if LOGO:
tg, _p, lw, lh = make_glyph(LOGO, C_CYAN, C_BG); tg.x = 8; tg.y = 8; root.append(tg)
lx = 8 + lw
else:
tg, w, h = make_text("VARASYS", FONT_M, C_CYAN, C_BG); tg.x = 8; tg.y = 8; root.append(tg)
lx = 8 + w
vtg, vw, vh = make_text("v" + APP_VERSION, FONT_S, C_DIM, C_BG); vtg.x = lx + 4; vtg.y = 10; root.append(vtg)
# Run dot + MIDI/USB icons packed at the right edge (replaces the Kit's hamburger; C button opens the menu)
self.run_dot_pal = displayio.Palette(1); self.run_dot_pal[0] = C_RUN_IDLE
self.run_dot = vectorio.Circle(pixel_shader=self.run_dot_pal, radius=4, x=WIDTH - 12, y=14)
root.append(self.run_dot)
x = WIDTH - 22 # icons live to the LEFT of the run dot
for asset, attr in ((ICON_USB, "ic_usb_pal"), (ICON_MIDI, "ic_midi_pal")):
if asset:
tg, pal, w, h = make_glyph(asset, C_DIM, C_BG); x -= w; tg.x = x; tg.y = 8; x -= 6
root.append(tg); setattr(self, attr, pal)
root.append(rect(0, 28, WIDTH, 1, C_PANEL)) # header divider
# Dynamic groups - layout order matches the Kit (BPM big at top-right + meters left;
# then ramp/trainer indicators; then setlist tab + CONT; then track title; then pad grid).
self.g_bpm = displayio.Group(); root.append(self.g_bpm) # big BPM (right, y ~44)
self.g_time = displayio.Group(); root.append(self.g_time) # elapsed [of total] (left, y ~50)
self.g_bar = displayio.Group(); root.append(self.g_bar) # bar [of total] (left, y ~78)
self.g_train = displayio.Group(); root.append(self.g_train) # ramp / gap-trainer indicators (y ~100)
self.g_idx = displayio.Group(); root.append(self.g_idx) # set-list tab (y ~118)
self.g_cont = displayio.Group(); root.append(self.g_cont) # CONT (auto-advance) toggle (y ~118)
self.g_name = displayio.Group(); root.append(self.g_name) # track title (y ~134)
self.g_grid = displayio.Group(); root.append(self.g_grid) # lanes x step pads (y >= GRID_TOP)
root.append(rect(0, LOG_TOP - 4, WIDTH, 1, C_PANEL)) # divider above the footer practice log
self.g_log = displayio.Group(); root.append(self.g_log) # practice history (Kit-style footer)
self.g_overlay = displayio.Group(); root.append(self.g_overlay) # modals (drawn on top)
def _place(self, group, s, x, y, fg, bg, font, right_edge=None):
while len(group): group.pop()
self.dirty = True
if not s: return
tg, w, h = make_text(s, font, fg, bg)
tg.x = (right_edge - w) if right_edge is not None else x; tg.y = y; group.append(tg)
# ---------- program ----------
def rebuild_setlists(self):
self.setlists = [{'title': t, 'items': it, 'builtin': True} for t, it in BUILTIN_SETLISTS]
seen = set(_slkey(t) for t, _ in BUILTIN_SETLISTS)
for t, it in load_user_setlists():
if _slkey(t) in seen: continue
seen.add(_slkey(t)); self.setlists.append({'title': t, 'items': it, 'builtin': False})
if self.sl >= len(self.setlists): self.sl = 0
def switch_setlist(self, delta=1):
if len(self.setlists) < 2: return
if self._sync_applying: return
was = self.running
if was: self.running = False; self._log_play()
self.sl = (self.sl + delta) % len(self.setlists)
self.load(0)
if was: self.running = True; self._reset_clock(); self._start_play()
self._set_run_dot(); self.draw_meters()
self._sync_broadcast("sel=%d/%d" % (self.sl, self.idx))
def load(self, i):
items = self.setlists[self.sl]['items']
self.idx = i % len(items)
self.name, prog = items[self.idx]
self.bpm, self.lanes, self.bars, self.ramp, self.trainer = parse_program(prog)
self._beat_ns = 60_000_000_000 // max(1, self.bpm); self._rebuild_dur_all()
self.master = self.lanes[0]; self._ramp_base = self.bpm; self._lastbar = -1; self._muted = False
self._overlay = None
self._next_pending = None; self._need_redraw = False
self._heavy_redraw_at = 0; self._heavy_log_pending = False; self._grid_li = None
while len(self.g_overlay): self.g_overlay.pop()
self._reset_clock(); self.draw_bpm(); self.draw_status(); self.draw_train()
self.build_grid(); self.draw_log()
def _prog_str(self):
parts = ['t' + str(self.bpm)]
if self.bars: parts.append('b' + str(self.bars))
if self.ramp: parts.append('rmp%d/%d/%d' % (self.ramp.get('start', self.bpm), self.ramp['amt'], self.ramp['every']))
if self.trainer: parts.append('tr%d/%d' % (self.trainer['play'], self.trainer['mute']))
for L in self.lanes: parts.append(lane_to_str(L))
return ';'.join(parts)
def toggle_continue(self):
self.continue_on = not self.continue_on; self.draw_status()
# ---------- modal: 4-screen menu navigated by buttons (Settings / Help / About / Practice log) ----------
def _show_msg(self, text):
self._overlay = 'msg'; g = self.g_overlay
while len(g): g.pop()
px, py, pw, ph = 24, 90, WIDTH - 48, 60
g.append(rect(px, py, pw, ph, C_PANEL)); g.append(rect(px, py, pw, 2, C_AMBER))
t, w, h = make_text(text[:32], FONT_S, C_TXT, C_PANEL); t.x = px + 12; t.y = py + 12; g.append(t)
t2, w2, h2 = make_text("(A/C to dismiss)", FONT_S, C_DIM, C_PANEL); t2.x = px + 12; t2.y = py + 34; g.append(t2)
self.dirty = True
def _close_overlay(self):
self._overlay = None; self._modal_cursor = 0; self._modal_rows = []
while len(self.g_overlay): self.g_overlay.pop()
self.dirty = True
def _show_menu(self):
gc.collect()
self._overlay = 'menu'; self._modal_cursor = 0; self._draw_menu()
def _draw_menu(self):
g = self.g_overlay
while len(g): g.pop()
PX, PY, PW, RH = 18, 32, WIDTH - 36, 22
rows = (
("Continue: " + ("on" if self.continue_on else "off"), None, self._menu_toggle_continue),
("Settings >", None, self._show_settings),
("Practice log >", None, self._show_log),
("Help >", None, self._show_help),
("About", None, self._show_about),
)
self._modal_rows = rows
PH = 24 + len(rows) * RH + 18
g.append(rect(PX, PY, PW, PH, C_PANEL)); g.append(rect(PX, PY, PW, 2, C_CYAN))
t, w, h = make_text("Menu", FONT_M, C_TXT, C_PANEL); t.x = PX + 12; t.y = PY + 6; g.append(t)
for i, (label, _v, _act) in enumerate(rows):
yy = PY + 26 + i * RH
sel = (i == self._modal_cursor)
if sel:
g.append(rect(PX + 6, yy, 3, RH - 4, C_CYAN)) # left-edge caret
g.append(rect(PX + 12, yy, PW - 24, RH - 4, C_BTN))
col = C_CYAN if (sel and label.startswith("Continue") and self.continue_on) else (C_TXT if sel else C_MUTE)
lt, lw, lh = make_text(label, FONT_S, col, C_BTN if sel else C_PANEL)
lt.x = PX + 18; lt.y = yy + 4; g.append(lt)
self.dirty = True
def _menu_toggle_continue(self):
self.continue_on = not self.continue_on; self.draw_status(); self._draw_menu()
# ---------- Settings sub-modal (Speaker / MIDI Out / Channel / Clock Out / Clock In) ----------
def _show_settings(self):
gc.collect()
self._overlay = 'settings'; self._modal_cursor = 0; self._draw_settings()
def _draw_settings(self):
g = self.g_overlay
while len(g): g.pop()
PX, PY, PW, RH = 10, 28, WIDTH - 20, 22
sm = "Off" if MUTE_SPEAKER else ("Auto" if SPEAKER_AUTO_MUTE else "Always")
rows = (
("Speaker", sm, self._adj_speaker),
("MIDI Out", "on" if MIDI_ENABLED else "off", self._adj_midi_out),
("Channel", str(MIDI_CHANNEL), self._adj_midi_ch),
("Clock Out", "on" if MIDI_CLOCK_OUT else "off", self._adj_clock_out),
("Clock In", "on" if MIDI_CLOCK_IN else "off", self._adj_clock_in),
)
self._modal_rows = rows
PH = 24 + len(rows) * RH + 14
g.append(rect(PX, PY, PW, PH, C_PANEL)); g.append(rect(PX, PY, PW, 2, C_CYAN))
t, w, h = make_text("Settings", FONT_M, C_TXT, C_PANEL); t.x = PX + 12; t.y = PY + 5; g.append(t)
for i, (label, value, _adj) in enumerate(rows):
yy = PY + 26 + i * RH
sel = (i == self._modal_cursor)
if sel:
g.append(rect(PX + 4, yy, 3, RH - 4, C_CYAN))
g.append(rect(PX + 10, yy, PW - 20, RH - 4, C_BTN))
lt, lw, lh = make_text(label, FONT_S, C_TXT if sel else C_MUTE, C_BTN if sel else C_PANEL)
lt.x = PX + 16; lt.y = yy + 5; g.append(lt)
vt, vw, vh = make_text(value, FONT_M, C_TXT if sel else C_MUTE, C_BTN if sel else C_PANEL)
vt.x = PX + PW - vw - 14; vt.y = yy + 3; g.append(vt)
self.dirty = True
def _adj_speaker(self, d):
global MUTE_SPEAKER, SPEAKER_AUTO_MUTE
modes = ("auto", "always", "off")
cur = "off" if MUTE_SPEAKER else ("auto" if SPEAKER_AUTO_MUTE else "always")
i = (modes.index(cur) + d) % 3
MUTE_SPEAKER = (modes[i] == "off"); SPEAKER_AUTO_MUTE = (modes[i] == "auto")
if MUTE_SPEAKER: self.spk.duty_cycle = 0; self._amp(False)
self._save_settings(); self._draw_settings()
def _adj_midi_out(self, d):
global MIDI_ENABLED
MIDI_ENABLED = not MIDI_ENABLED; self._save_settings(); self._draw_settings()
def _adj_midi_ch(self, d):
global MIDI_CHANNEL
MIDI_CHANNEL = ((MIDI_CHANNEL - 1 + d) % 16) + 1
self._save_settings(); self._draw_settings()
def _adj_clock_out(self, d):
global MIDI_CLOCK_OUT
MIDI_CLOCK_OUT = not MIDI_CLOCK_OUT
if MIDI_CLOCK_OUT: self._clock_next = time.monotonic_ns()
self._save_settings(); self._draw_settings()
def _adj_clock_in(self, d):
global MIDI_CLOCK_IN
MIDI_CLOCK_IN = not MIDI_CLOCK_IN
if not MIDI_CLOCK_IN: self._slaved = False
self._save_settings(); self._draw_settings()
# ---------- Help sub-modal (paginated; cursor not used, X/Z page through) ----------
def _show_help(self):
gc.collect()
self._overlay = 'help'; self._help_page = 0; self._modal_cursor = 0; self._draw_help()
def _draw_help(self):
g = self.g_overlay
while len(g): g.pop()
PX, PY, PW = 10, 26, WIDTH - 20
title, lines = HELP_PAGES[self._help_page]
PH = 22 + 13 * len(lines) + 18
g.append(rect(PX, PY, PW, PH, C_PANEL)); g.append(rect(PX, PY, PW, 2, C_CYAN))
t, w, h = make_text(title, FONT_M, C_TXT, C_PANEL); t.x = PX + 10; t.y = PY + 5; g.append(t)
pi, piw, pih = make_text("%d / %d" % (self._help_page + 1, len(HELP_PAGES)), FONT_S, C_DIM, C_PANEL)
pi.x = PX + PW - piw - 10; pi.y = PY + 9; g.append(pi)
yy = PY + 24
for ln in lines:
lt, lw, lh = make_text(ln[:44], FONT_S, C_TXT, C_PANEL); lt.x = PX + 10; lt.y = yy; g.append(lt)
yy += 14
hint, hw, hh = make_text("X / Z = page, C = close", FONT_S, C_DIM, C_PANEL)
hint.x = PX + 10; hint.y = PY + PH - 14; g.append(hint)
self.dirty = True
# ---------- About sub-modal ----------
def _show_about(self):
gc.collect()
self._overlay = 'about'; self._modal_cursor = 0; self._draw_about()
def _draw_about(self):
import sys
gc.collect()
try: free = gc.mem_free()
except Exception: free = 0
try: cp_ver = "%d.%d.%d" % sys.implementation.version[:3]
except Exception: cp_ver = "?"
up_min = int(time.monotonic()) // 60
lines = (
("VARASYS PolyMeter", C_CYAN),
("PM_X-1 Explorer", C_TXT),
("", None),
("Firmware: v" + APP_VERSION, C_TXT),
("Free RAM: %d KB" % (free // 1024), C_TXT),
("Uptime: %dm" % up_min, C_TXT),
("CircuitPython: " + cp_ver, C_TXT),
("", None),
("metronome.varasys.io", C_DIM),
)
g = self.g_overlay
while len(g): g.pop()
PX, PY, PW = 20, 26, WIDTH - 40; PH = 10 + 14 * len(lines) + 20
g.append(rect(PX, PY, PW, PH, C_PANEL)); g.append(rect(PX, PY, PW, 2, C_CYAN))
yy = PY + 8
for text, col in lines:
if col is not None:
lt, lw, lh = make_text(text, FONT_S, col, C_PANEL); lt.x = PX + 12; lt.y = yy; g.append(lt)
yy += 14
hint, hw, hh = make_text("C = close", FONT_S, C_DIM, C_PANEL)
hint.x = PX + 14; hint.y = PY + PH - 14; g.append(hint)
self.dirty = True
# ---------- Practice log sub-modal (replaces the Kit's screen-footer log) ----------
def _show_log(self):
gc.collect()
self._overlay = 'log'; self._log_scroll = 0; self._draw_log_modal()
def _draw_log_modal(self):
g = self.g_overlay
while len(g): g.pop()
PX, PY, PW = 6, 26, WIDTH - 12; PH = 12 + LOG_MENU_ROWS * 14 + 22
g.append(rect(PX, PY, PW, PH, C_PANEL)); g.append(rect(PX, PY, PW, 2, C_CYAN))
t, w, h = make_text("Practice log", FONT_M, C_TXT, C_PANEL); t.x = PX + 10; t.y = PY + 4; g.append(t)
rows = [(i, e) for i, e in enumerate(self.log) if e.get("name") == self.name]
if not rows:
tg, w, h = make_text("no plays over 5s yet", FONT_S, C_DIM, C_PANEL); tg.x = PX + 12; tg.y = PY + 28; g.append(tg)
else:
top = self._log_scroll; yy = PY + 24
for k in range(min(LOG_MENU_ROWS, len(rows) - top)):
_oi, e = rows[top + k]
dur = "%d:%02d" % (e["dur"] // 60, e["dur"] % 60)
bars = e.get("bars", 0); bstr = (" %dbar" % bars) if bars else ""
line = "%s %3dbpm %s%s" % (e.get("t", "--:--"), e["bpm"], dur, bstr)
lt, lw, lh = make_text(line, FONT_S, C_TXT, C_PANEL); lt.x = PX + 12; lt.y = yy; g.append(lt)
yy += 14
hint, hw, hh = make_text("X/Z scroll, C close", FONT_S, C_DIM, C_PANEL)
hint.x = PX + 10; hint.y = PY + PH - 14; g.append(hint)
self.dirty = True
# ---------- Settings persistence ----------
def _load_settings(self):
global MUTE_SPEAKER, SPEAKER_AUTO_MUTE, MIDI_ENABLED, MIDI_CHANNEL, MIDI_CLOCK_OUT, MIDI_CLOCK_IN
try:
with open("/settings.json") as f: d = json.load(f)
except Exception: return
try:
sm = d.get("speaker", "auto")
MUTE_SPEAKER = (sm == "off"); SPEAKER_AUTO_MUTE = (sm == "auto")
MIDI_ENABLED = bool(d.get("midi_out", MIDI_ENABLED))
MIDI_CHANNEL = max(1, min(16, int(d.get("midi_channel", MIDI_CHANNEL))))
MIDI_CLOCK_OUT = bool(d.get("clock_out", MIDI_CLOCK_OUT))
MIDI_CLOCK_IN = bool(d.get("clock_in", MIDI_CLOCK_IN))
except Exception as e: print("settings:", e)
def _save_settings(self):
if not self.can_write: return
sm = "off" if MUTE_SPEAKER else ("auto" if SPEAKER_AUTO_MUTE else "always")
d = {"speaker": sm, "midi_out": MIDI_ENABLED, "midi_channel": MIDI_CHANNEL,
"clock_out": MIDI_CLOCK_OUT, "clock_in": MIDI_CLOCK_IN}
try:
with open("/settings.json", "w") as f: json.dump(d, f)
except OSError: self.can_write = False
# ---------- step grids (cached per-lane ns durations: tuple lookup, no method call in tick) ----------
def _rebuild_dur(self, L):
beat = self._beat_ns
sub = max(1, L['sub']); steps = max(1, L['steps'])
if L.get('poly') and self.lanes:
m = self.lanes[0]; master_bar = beat * (m['steps'] // max(1, m['sub']))
d = master_bar // steps; L['durs'] = tuple(d for _ in range(steps))
elif L.get('swing') and sub % 2 == 0:
pair = beat // max(1, sub // 2); lng = (pair * 2) // 3; sht = pair // 3
L['durs'] = tuple(lng if (s % sub) % 2 == 0 else sht for s in range(steps))
else:
d = beat // sub; L['durs'] = tuple(d for _ in range(steps))
def _rebuild_dur_all(self):
for L in self.lanes: self._rebuild_dur(L)
def _reset_clock(self):
now = time.monotonic_ns()
for L in self.lanes:
L['next'] = now; L['step'] = -1
self._m_steps = 0
self._seg_start = time.monotonic()
# ---------- audio + run-state indicator ----------
def _amp(self, on): # respect AMP_EN_ACTIVE_HIGH (flip in CONFIG if your amp is active-low)
self.amp_en.value = on if AMP_EN_ACTIVE_HIGH else not on
def click(self, level):
self._amp(True) # enable the amp briefly while we drive the piezo
self.spk.frequency = {2: 2300, 1: 1600, 3: 1050}.get(level, 1600)
self.spk.duty_cycle = {2: 42000, 1: 30000, 3: 14000}.get(level, 30000)
self.spk_off = time.monotonic_ns() + 22_000_000 # silence + amp off scheduled in tick()
def _set_run_dot(self):
self.run_dot_pal[0] = C_RUN_GO if self.running else C_RUN_IDLE
self.dirty = True
def flash(self, level): # brief bright pulse on the run dot (replaces the Kit's RGB LED)
self.run_dot_pal[0] = C_RUN_PULSE
self.dirty = True
# ---------- Live sync (HELLO/FULL/DELTA/BYE on SysEx 0x40-0x43; see src/livesync.js for the editor side) ----------
def _sync_send(self, op, text):
if self.midi is None: return
b = bytearray((0xF0, 0x7D, op))
for c in text:
v = ord(c); b.append(v if v < 0x80 else 0x3F)
b.append(0xF7)
try: self.midi.write(b)
except Exception: pass
def _sync_broadcast(self, evt):
if not self._sync_armed or self._sync_applying or self.midi is None: return
text = "%s;%d;%s" % (self._sync_origin, self._sync_seq, evt); self._sync_seq += 1
self._sync_send(0x42, text)
def _sync_broadcast_full(self):
if not self._sync_armed or self.midi is None: return
try: patch = self._prog_str()
except Exception: return
text = "%s;%d;%d;%d;%d;%s" % (self._sync_origin, self._sync_seq,
1 if self.running else 0, self.sl, self.idx, patch)
self._sync_seq += 1
self._sync_send(0x41, text)
self._sync_heartbeat_next = time.monotonic() + 5.0
def _sync_apply_full(self, running, patch):
self._sync_applying = True
try:
try:
gc.collect()
try: cur = self._prog_str()
except Exception: cur = None
if patch and patch != cur:
bpm, lanes, bars, ramp, trainer = parse_program(patch)
self.bpm = bpm; self.lanes = lanes; self.bars = bars; self.ramp = ramp; self.trainer = trainer
self._beat_ns = 60_000_000_000 // max(1, bpm); self._rebuild_dur_all()
self.master = self.lanes[0]; self._ramp_base = self.bpm; self._lastbar = -1; self._muted = False
self._overlay = None
while len(self.g_overlay): self.g_overlay.pop()
self._reset_clock()
self.draw_bpm(); self.draw_status(); self.draw_train(); self.draw_meters()
self.build_grid(); self.draw_log()
if running and not self.running: self.toggle()
elif (not running) and self.running: self.toggle()
except Exception as e:
try: print("sync FULL apply:", e)
except Exception: pass
finally:
self._sync_applying = False
def _sync_apply_delta(self, evt):
self._sync_applying = True
try:
eq = evt.find('=')
key = evt if eq < 0 else evt[:eq]
val = '' if eq < 0 else evt[eq+1:]
if key == 'play':
if not self.running: self.toggle()
elif key == 'stop':
if self.running: self.toggle()
elif key == 'bpm':
try: self.set_bpm(int(val))
except Exception: pass
elif key == 'sel':
p = val.split('/')
if len(p) == 2:
try:
sl = int(p[0]); item = int(p[1])
if sl >= 0 and item >= 0:
if sl < len(self.setlists) and sl != self.sl: self.sl = sl
items = self.setlists[self.sl]['items']
if 0 <= item < len(items) and item != self.idx: self.goto(item)
except Exception: pass
elif key == 'beat': # PM_X-1 doesn't EMIT beat= (no on-device editing) but DOES apply
p = val.split('/')
if len(p) == 3:
try:
li = int(p[0]); s = int(p[1]); lvl = int(p[2])
if 0 <= li < len(self.lanes):
L = self.lanes[li]
if 0 <= s < len(L['levels']):
L['levels'][s] = lvl & 3
if li < len(self.lane_pads) and s < len(self.lane_pads[li]):
lit = (self.lane_lit[li] == s)
self.lane_pads[li][s].color_index = self._padbase(L, s) + (4 if lit else 0)
self.dirty = True
except Exception: pass
elif key == 'lane': # apply but don't emit
p = val.split('/')
if len(p) >= 3:
try:
li = int(p[0]); field = p[1]; v = '/'.join(p[2:])
if 0 <= li < len(self.lanes):
L = self.lanes[li]; structural = False
if field == 'sound': L['sound'] = v
elif field == 'groups':
try: L['groups'] = [int(x) for x in v.split('+')]; structural = True
except Exception: pass
elif field == 'sub':
try: L['sub'] = int(v); structural = True
except Exception: pass
elif field == 'swing': L['swing'] = (v == '1'); structural = True
elif field == 'enabled': L['mute'] = not (v == '1')
elif field == 'gain':
try: L['gain'] = int(v)
except Exception: pass
elif field == 'poly': L['poly'] = (v == '1'); structural = True
if structural: self._regen_levels(L)
if li == 0 and structural: self._rebuild_dur_all()
else: self._rebuild_dur(L)
if structural: self.build_grid()
self.dirty = True
except Exception: pass
finally:
self._sync_applying = False
def _regen_levels(self, L): # called on remote lane= deltas to recompute default accents
sub = L['sub']; groups = L['groups']; starts = set(); acc = 0
for gp in groups: starts.add(acc); acc += gp
L['steps'] = sum(groups) * sub
L['levels'] = [(2 if (i // sub) in starts else 1) if i % sub == 0 else 0 for i in range(L['steps'])]
def midi_send(self, note, vel):
if self.midi is None: return
b = self._note_buf
b[0] = 0x90 | ((MIDI_CHANNEL - 1) & 0x0F)
b[1] = note & 0x7F; b[2] = vel & 0x7F
try: self.midi.write(b)
except Exception: pass
# ---------- transport ----------
def toggle(self):
self.running = not self.running
if self.running:
self._reset_clock(); self._start_play()
self._clock_next = time.monotonic_ns()
if MIDI_CLOCK_OUT and MIDI_CLOCK_OUT_TRANSPORT and self.midi is not None:
try: self.midi.write(self._start_byte)
except Exception: pass
else:
self.spk.duty_cycle = 0; self._amp(False); self.reset_playheads(); self._log_play()
if MIDI_CLOCK_OUT and MIDI_CLOCK_OUT_TRANSPORT and self.midi is not None:
try: self.midi.write(self._stop_byte)
except Exception: pass
self._set_run_dot(); self.draw_meters()
self._sync_broadcast("play" if self.running else "stop")
def set_bpm(self, v):
v = max(5, min(300, v))
if v != self.bpm:
self.bpm = v; self._beat_ns = 60_000_000_000 // v
self._rebuild_dur_all()
self._sync_broadcast("bpm=%d" % v)
def goto(self, i):
was = self.running
if was: self.running = False; self._log_play()
self.load(i)
if was: self.running = True; self._reset_clock(); self._start_play()
self._set_run_dot(); self.draw_meters()
self._sync_broadcast("sel=%d/%d" % (self.sl, self.idx))
def tap(self):
now = time.monotonic()
if not hasattr(self, '_taps'): self._taps = []
self._taps = [t for t in self._taps if now - t < 2.4]
self._taps.append(now)
if len(self._taps) >= 2:
span = (self._taps[-1] - self._taps[0]) / (len(self._taps) - 1)
if span > 0: self.set_bpm(round(60 / span))
# ---------- scheduler ----------
def tick(self):
now = time.monotonic_ns()
if self.spk_off and now >= self.spk_off:
self.spk.duty_cycle = 0; self.spk_off = 0; self._amp(False)
if self._slaved and (now - self._clock_in_last_t) > 1_000_000_000: self._slaved = False
if self.running:
fired_best = 0; fired_prio = -1
for li, L in enumerate(self.lanes):
if self._advance: break
adv = False
while now >= L['next']:
L['step'] = (L['step'] + 1) % L['steps']
if li == 0:
self._m_steps += 1
nb = (self._m_steps - 1) // L['steps']
if nb != self._lastbar: self._lastbar = nb; self._on_new_bar(nb)
if self._advance: break
if self.ramp and L['steps'] > 0 and not self._slaved:
mlen = L['steps']
bar_pos = self._m_steps / mlen
seg_bar = (bar_pos % self.bars) if self.bars else bar_pos
new_bpm = max(5, min(300, int(self._ramp_base + seg_bar / self.ramp['every'] * self.ramp['amt'])))
if new_bpm != self.bpm:
self.bpm = new_bpm; self._beat_ns = 60_000_000_000 // new_bpm
self._rebuild_dur_all()
lvl = 0 if L['mute'] else L['levels'][L['step']]
if lvl > 0:
p = PRIO.get(lvl, 0)
if p > fired_prio: fired_prio = p; fired_best = lvl
if not self._muted:
self.midi_send(SOUND_GM.get(L['sound'], GM_DEFAULT), MIDI_VEL.get(lvl, 90))
L['next'] += L['durs'][L['step']]; adv = True
if adv and li < len(self.lane_pads): self._move_playhead(li, L['step'])
if fired_best and not self._muted:
if not MUTE_SPEAKER and not (SPEAKER_AUTO_MUTE and self.midi_host):
self.click(fired_best)
self.flash(fired_best)
# Decay the run-dot pulse back to base
if self.run_dot_pal[0] == C_RUN_PULSE:
self.run_dot_pal[0] = C_RUN_GO if self.running else C_RUN_IDLE
self.dirty = True
if self._advance:
self._advance = False
self._do_advance()
if self.running and MIDI_CLOCK_OUT and self.midi is not None and not self._slaved:
clk = self._clock_byte
tick_ns = self._beat_ns // 24
while now >= self._clock_next:
try: self.midi.write(clk)
except Exception: pass
self._clock_next += tick_ns
def _on_new_bar(self, bar):
if self.bars and self.continue_on and self._next_pending is None and bar == self.bars - 1:
self._prepare_next()
if self.bars and bar > 0 and bar % self.bars == 0:
self._seg_start = time.monotonic()
if self.continue_on:
if self._next_pending is None: self._prepare_next()
if self._next_pending is not None:
self._seam_t = self.lanes[0]['next']
self._advance = True
t = self.trainer
self._muted = bool(t and (t['play'] + t['mute']) and (bar % (t['play'] + t['mute'])) >= t['play'])
def _prepare_next(self):
items = self.setlists[self.sl]['items']
nxt = (self.idx + 1) % len(items)
if nxt == self.idx: return
name, prog = items[nxt]
gc.collect()
try:
bpm, lanes, bars, ramp, trainer = parse_program(prog)
except MemoryError:
gc.collect(); return
beat = 60_000_000_000 // max(1, bpm)
for L in lanes:
sub = max(1, L['sub']); steps = max(1, L['steps'])
if L.get('poly'):
m = lanes[0]; mbar = beat * (m['steps'] // max(1, m['sub']))
d = mbar // steps; L['durs'] = tuple(d for _ in range(steps))
elif L.get('swing') and sub % 2 == 0:
pair = beat // max(1, sub // 2); lng = (pair * 2) // 3; sht = pair // 3
L['durs'] = tuple(lng if (s % sub) % 2 == 0 else sht for s in range(steps))
else:
d = beat // sub; L['durs'] = tuple(d for _ in range(steps))
self._next_pending = {'lanes': lanes, 'bpm': bpm, 'bars': bars, 'ramp': ramp,
'trainer': trainer, 'name': name, 'idx': nxt}
def _do_advance(self):
n = self._next_pending
if n is None: return
self._next_pending = None
self.lanes = n['lanes']; self.bpm = n['bpm']; self.bars = n['bars']
self.ramp = n['ramp']; self.trainer = n['trainer']; self.name = n['name']; self.idx = n['idx']
self._beat_ns = 60_000_000_000 // max(1, self.bpm); self._rebuild_dur_all()
self._ramp_base = self.bpm; self._lastbar = -1; self._muted = False; self._m_steps = 0
self._overlay = None
while len(self.g_overlay): self.g_overlay.pop()
seam = self._seam_t
for L in self.lanes: L['next'] = seam; L['step'] = -1
self._need_redraw = True
self._heavy_redraw_at = time.monotonic() + 0.6
self._seg_start = time.monotonic()
self._set_run_dot()
# ---------- inputs (6 buttons - active low) ----------
def _modal_select(self):
"""A inside a modal: invoke the focused row's action (cycle adjuster) or close on About/Help."""
if self._overlay in ('help', 'about', 'msg', 'log'):
self._close_overlay(); return
if not self._modal_rows: return
i = self._modal_cursor
if i < 0 or i >= len(self._modal_rows): return
_label, _v, fn = self._modal_rows[i]
if fn is None: return
# adjusters take one argument (+1); plain actions take none
try: fn(1)
except TypeError: fn()
def _modal_back(self):
"""B inside a modal: step out to the parent (Settings -> menu, Help -> menu, etc.)."""
if self._overlay in ('settings', 'help', 'about', 'log'):
self._show_menu(); return
self._close_overlay()
def _modal_up(self):
if self._overlay == 'help':
if self._help_page > 0: self._help_page -= 1; self._draw_help()
return
if self._overlay == 'log':
self._log_scroll = max(0, self._log_scroll - 1); self._draw_log_modal(); return
n = len(self._modal_rows)
if n: self._modal_cursor = (self._modal_cursor - 1) % n; self._redraw_modal()
def _modal_down(self):
if self._overlay == 'help':
if self._help_page < len(HELP_PAGES) - 1: self._help_page += 1; self._draw_help()
return
if self._overlay == 'log':
rows = [e for e in self.log if e.get("name") == self.name]
if self._log_scroll + LOG_MENU_ROWS < len(rows):
self._log_scroll += 1; self._draw_log_modal()
return
n = len(self._modal_rows)
if n: self._modal_cursor = (self._modal_cursor + 1) % n; self._redraw_modal()
def _modal_decrement(self):
"""Y inside a modal: -1 on the focused row's adjuster (if it is one)."""
if self._overlay in ('help', 'about', 'msg', 'log'): return
if not self._modal_rows: return
i = self._modal_cursor
if i < 0 or i >= len(self._modal_rows): return
_label, _v, fn = self._modal_rows[i]
if fn is None: return
try: fn(-1)
except TypeError: pass # plain action -> Y does nothing
def _redraw_modal(self):
if self._overlay == 'menu': self._draw_menu()
elif self._overlay == 'settings': self._draw_settings()
elif self._overlay == 'help': self._draw_help()
elif self._overlay == 'about': self._draw_about()
elif self._overlay == 'log': self._draw_log_modal()
def _on_btn_X(self):
if self._overlay: self._modal_up(); return
self.goto(self.idx - 1)
def _on_btn_Z(self):
if self._overlay: self._modal_down(); return
self.goto(self.idx + 1)
def _on_btn_Y(self):
if self._overlay: self._modal_decrement(); return
# tempo down: 1 normally, 5 after long hold
step = -5 if (time.monotonic_ns() - self._held_t['Y']) > 1_500_000_000 else -1
self.set_bpm(self.bpm + step)
def _on_chord_XZ(self): # X+Z chord -> tempo up (mirrors Y for tempo down)
step = 5 if (time.monotonic_ns() - self._chord_xz) > 1_500_000_000 else 1
self.set_bpm(self.bpm + step)
def poll(self):
now_ns = time.monotonic_ns()
# Sample all six buttons (active-low; True = released)
a = self.btnA.value; b = self.btnB.value; c = self.btnC.value
x = self.btnX.value; y = self.btnY.value; z = self.btnZ.value
# ---- A: play/stop ----
if (not a) and self._prev['A']:
if self._overlay: self._modal_select()
else: self.toggle()
# ---- B: tap tempo / modal back ----
if (not b) and self._prev['B']:
if self._overlay: self._modal_back()
else: self.tap()
# ---- C: menu open/close ----
if (not c) and self._prev['C']:
if self._overlay: self._close_overlay()
else: self._show_menu()
# ---- X/Z chord detection (tempo up) ----
x_pressed_now = (not x) and self._prev['X']
z_pressed_now = (not z) and self._prev['Z']
chord_window = 100_000_000 # 100ms
if x_pressed_now and (not z) and not self._prev['Z'] and (now_ns - self._held_t['Z']) < chord_window:
self._chord_xz = now_ns; self._on_chord_XZ()
elif z_pressed_now and (not x) and not self._prev['X'] and (now_ns - self._held_t['X']) < chord_window:
self._chord_xz = now_ns; self._on_chord_XZ()
else:
# ---- single-press X / Z ----
if x_pressed_now:
self._held_t['X'] = now_ns; self._next_rep['X'] = now_ns + 350_000_000
if not self._chord_xz: self._on_btn_X()
if z_pressed_now:
self._held_t['Z'] = now_ns; self._next_rep['Z'] = now_ns + 350_000_000
if not self._chord_xz: self._on_btn_Z()
if x and z: self._chord_xz = 0 # both released -> chord state clears
# ---- Y: tempo down (or modal decrement) ----
if (not y) and self._prev['Y']:
self._held_t['Y'] = now_ns; self._next_rep['Y'] = now_ns + 350_000_000
self._on_btn_Y()
# ---- hold-repeat for X / Y / Z ----
if (not x) and not self._prev['X'] and now_ns >= self._next_rep['X']:
self._next_rep['X'] = now_ns + 120_000_000
if self._chord_xz: self._on_chord_XZ()
else: self._on_btn_X()
if (not z) and not self._prev['Z'] and now_ns >= self._next_rep['Z']:
self._next_rep['Z'] = now_ns + 120_000_000
if self._chord_xz: self._on_chord_XZ()
else: self._on_btn_Z()
if (not y) and not self._prev['Y'] and now_ns >= self._next_rep['Y']:
self._next_rep['Y'] = now_ns + 120_000_000; self._on_btn_Y()
# Commit previous-state
self._prev['A'] = a; self._prev['B'] = b; self._prev['C'] = c
self._prev['X'] = x; self._prev['Y'] = y; self._prev['Z'] = z
# USB-MIDI in: any byte = a host is listening (heartbeat); also assemble SysEx
if self.midi_in is not None:
try: n = self.midi_in.readinto(self._mbuf)
except Exception: n = 0
if n:
self.last_midi_in = time.monotonic()
self._feed_midi(self._mbuf, n)
host = bool(self.last_midi_in) and (time.monotonic() - self.last_midi_in) < 1.0
if host != self.midi_host:
self.midi_host = host
if host and SPEAKER_AUTO_MUTE:
self.spk.duty_cycle = 0; self._amp(False)
self._set_run_dot(); self.draw_icons()
uc = bool(getattr(supervisor.runtime, "usb_connected", True))
if uc != self.usb_conn:
self.usb_conn = uc; self.draw_icons()
# ---------- drawing ----------
def draw_bpm(self):
if self.bpm == self._displayed_bpm: return
self._displayed_bpm = self.bpm
self._place(self.g_bpm, str(self.bpm), 0, 38, C_TXT, C_BG, FONT_M, right_edge=WIDTH-8)
def draw_status(self):
sl = self.setlists[self.sl]
# setlist tab line at y=72; muted = built-in, cyan = your own
self._place(self.g_idx, "%s %d/%d" % (sl['title'][:13], self.idx + 1, len(sl['items'])),
6, 72, C_MUTE if sl['builtin'] else C_CYAN, C_BG, FONT_S)
self._place(self.g_cont, "CONT", 0, 72, C_GREEN if self.continue_on else C_DIM, C_BG, FONT_S, right_edge=WIDTH-6)
# track title at y=88 (FONT_M; ~16 px tall, fits above GRID_TOP=110)
self._place(self.g_name, self.name[:22], 6, 88, C_TXT, C_BG, FONT_M)
def draw_train(self):
g = self.g_train
while len(g): g.pop()
x = 6; y = 58 # ramp / gap-trainer indicators below the meters row, above the setlist tab
if self.ramp:
up = self.ramp['amt'] >= 0
pts = [(0, 9), (12, 9), (12, 0)] if up else [(0, 0), (0, 9), (12, 9)]
g.append(vectorio.Polygon(pixel_shader=solid(C_AMBER), points=pts, x=x, y=y)); x += 16
a = self.ramp['amt']; lbl = ("+%d" % a if a >= 0 else "%d" % a) + "/%db" % self.ramp['every']
tg, w, h = make_text(lbl, FONT_S, C_AMBER, C_BG); tg.x = x; tg.y = y; g.append(tg); x += w + 14
if self.trainer:
g.append(rect(x, y, 4, 9, C_CYAN)); g.append(rect(x + 6, y, 4, 9, C_DIM))
x += 14
tg, w, h = make_text("%d/%db" % (self.trainer['play'], self.trainer['mute']), FONT_S, C_CYAN, C_BG)
tg.x = x; tg.y = y; g.append(tg)
self.dirty = True
def draw_icons(self):
if self.ic_midi_pal is not None:
_recolor(self.ic_midi_pal, C_GREEN if self.midi_host else C_DIM, C_BG)
if self.ic_usb_pal is not None:
_recolor(self.ic_usb_pal, C_CYAN if self.usb_conn else C_DIM, C_BG)
self.dirty = True
def _fmt_t(self, s):
s = int(s)
return "%d:%02d:%02d" % (s // 3600, (s % 3600) // 60, s % 60) if s >= 3600 else "%d:%02d" % (s // 60, s % 60)
def draw_meters(self):
run = self.running and self.play_start is not None
mlen = self.lanes[0]['steps'] if self.lanes else 1
bpb = (self.lanes[0]['steps'] // max(1, self.lanes[0]['sub'])) if self.lanes else 4
el = (time.monotonic() - self._seg_start) if run else 0
mbars = max(0, self._m_steps - 1) // max(1, mlen)
cur = ("%d" % ((mbars % self.bars + 1) if self.bars else (mbars + 1))) if run else "-"
if self.bars:
ts = "%s of %s" % (self._fmt_t(el), self._fmt_t(self.bars * bpb * 60.0 / self.bpm))
bs = "bar %s of %d" % (cur, self.bars)
else:
ts = self._fmt_t(el); bs = "bar %s" % cur
if ts != self._lastTs:
self._place(self.g_time, ts, 6, 38, C_TXT, C_BG, FONT_S); self._lastTs = ts
if bs != self._lastBs:
self._place(self.g_bar, bs, 6, 50, C_MUTE, C_BG, FONT_S); self._lastBs = bs
# ---------- pad grid (chunked rebuild; per-pad chunks so audio interleaves) ----------
def _padbase(self, L, s):
return 0 if L['mute'] else L['levels'][s]
def build_grid(self):
self._grid_rebuild_start()
while self._grid_li is not None: self._grid_rebuild_step()
def _grid_rebuild_start(self):
while len(self.g_grid): self.g_grid.pop()
self.lane_pads = []; self.lane_lit = []
gc.collect()
n = min(len(self.lanes), MAXLANES)
top = GRID_TOP; rowh = min(26, ((LOG_TOP - 6) - top) // max(1, n)) # leave room for the footer log below
px0 = 48; usable = WIDTH - 8 - px0 - 8; gridh = n * rowh # narrower screen -> tighter lane-label column
self._grid = {'top': top, 'rowh': rowh, 'px0': px0, 'usable': usable, 'n': n}
m = self.lanes[0] if self.lanes else None
if m is not None:
mbeats = max(1, m['steps'] // max(1, m['sub']))
for bcol in range(mbeats):
self.g_grid.append(rect(px0 + 6 + (bcol * usable) // mbeats, top, 1, gridh, C_GRID))
self._grid_n = n
self._grid_geo = (top, rowh, px0, usable)
self._grid_li = 0 if n > 0 else None
self._grid_pi = 0; self._grid_lane_st = None; self._grid_pads = []
self.dirty = True
def _grid_rebuild_step(self):
li = self._grid_li
if li is None: return
if li >= self._grid_n or li >= len(self.lanes):
self._grid_li = None; return
L = self.lanes[li]
top, rowh, px0, usable = self._grid_geo
y = top + li * rowh; cy = y + rowh // 2
st = self._grid_lane_st
if st is None:
tg, w, h = make_text((L.get('sound', '') or '?')[:6], FONT_S, C_MUTE, C_BG) # 6-char label fits the 48px lane column
tg.x = 4; tg.y = cy - h // 2; self.g_grid.append(tg)
steps = L['steps']; sub = L['sub']; stepw = max(1, usable // steps)
side = max(5, min(14, stepw - 1, rowh - 6)) # squares can be bigger in portrait
rad = max(2, min(side // 2, stepw // 2 - 1))
self._grid_lane_st = (cy, steps, sub, stepw, side, rad)
self._grid_pi = 0; self._grid_pads = []; self.dirty = True
return
cy_, steps, sub, stepw, side, rad = st
s = self._grid_pi
if s >= steps:
self.lane_pads.append(self._grid_pads); self.lane_lit.append(-1)
self._grid_pads = []; self._grid_lane_st = None; self._grid_li = li + 1
return
cxp = px0 + 6 + (s * usable) // steps
pal = self.pad_pal
if s % sub == 0:
p = vectorio.Rectangle(pixel_shader=pal, width=side, height=side, x=cxp - side // 2, y=cy_ - side // 2)
else:
p = vectorio.Circle(pixel_shader=pal, radius=rad, x=cxp, y=cy_)
p.color_index = self._padbase(L, s); self.g_grid.append(p); self._grid_pads.append(p)
self._grid_pi = s + 1
self.dirty = True
def _move_playhead(self, li, step):
pads = self.lane_pads[li]; prev = self.lane_lit[li]
if 0 <= prev < len(pads): pads[prev].color_index = self._padbase(self.lanes[li], prev)
if step < len(pads): pads[step].color_index = self._padbase(self.lanes[li], step) + 4
self.lane_lit[li] = step; self.dirty = True
def reset_playheads(self):
for li, pads in enumerate(self.lane_pads):
prev = self.lane_lit[li]
if 0 <= prev < len(pads): pads[prev].color_index = self._padbase(self.lanes[li], prev)
self.lane_lit[li] = -1
self.dirty = True
# ---------- practice log (saved to /history.json) ----------
def _probe_write(self):
try:
with open("/.wtest", "w") as f: f.write("1")
try: os.remove("/.wtest")
except Exception: pass
return True
except OSError:
return False
def _load_log(self):
try:
with open("/history.json") as f: return json.load(f).get("log", [])
except Exception:
return []
def _save_log(self):
if not self.can_write: return
try:
with open("/history.json", "w") as f: json.dump({"log": self.log[:200]}, f)
except OSError:
self.can_write = False
def _start_play(self):
self.play_start = time.monotonic(); self.play_bpm = self.bpm; self.play_name = self.name
def _log_play(self):
if self.play_start is None: return
dur = int(time.monotonic() - self.play_start); self.play_start = None
if dur < MIN_LOG_SEC: return
mlen = self.lanes[0]['steps'] if self.lanes else 1
t = time.localtime()
self.log.insert(0, {"t": "%02d:%02d" % (t.tm_hour, t.tm_min), "bpm": self.play_bpm,
"dur": dur, "bars": self._m_steps // max(1, mlen), "name": self.play_name})
del self.log[200:]
self._save_log(); self.draw_log()
def draw_log(self): # footer practice log (this track only), Kit-style
g = self.g_log
while len(g): g.pop()
gc.collect()
hdr, w, h = make_text("PRACTICE LOG", FONT_S, C_MUTE, C_BG); hdr.x = 6; hdr.y = LOG_TOP; g.append(hdr)
rows = [(i, e) for i, e in enumerate(self.log) if e.get("name") == self.name]
if not rows:
tg, w, h = make_text("no plays over 5s yet", FONT_S, C_DIM, C_BG)
tg.x = 6; tg.y = LOG_TOP + LOG_ROWH + 2; g.append(tg)
self.dirty = True; return
y = LOG_TOP + LOG_ROWH + 2
for k in range(min(LOG_ROWS, len(rows))):
_oi, e = rows[k]
dur = "%d:%02d" % (e["dur"] // 60, e["dur"] % 60)
bars = e.get("bars", 0); bstr = (" %dbar" % bars) if bars else ""
line = "%s %3dbpm %s%s" % (e.get("t", "--:--"), e["bpm"], dur, bstr)
tg, w, h = make_text(line, FONT_S, C_TXT, C_BG); tg.x = 6; tg.y = y; g.append(tg)
y += LOG_ROWH
self.dirty = True
# ---------- USB-MIDI in: SysEx assembler (clock + editor-pushed programs + live-sync) ----------
def _feed_midi(self, buf, n):
now_ns = time.monotonic_ns() if MIDI_CLOCK_IN else 0
for i in range(n):
b = buf[i]
if b == 0xF0: self._sx = bytearray(); self._sxon = True
elif b == 0xF7:
if self._sxon: self._handle_sysex(self._sx)
self._sxon = False
elif b == 0xF8 and MIDI_CLOCK_IN: self._slave_tick(now_ns)
elif b == 0xFA and MIDI_CLOCK_IN_TRANSPORT and MIDI_CLOCK_IN: self._slave_start()
elif b == 0xFB and MIDI_CLOCK_IN_TRANSPORT and MIDI_CLOCK_IN: self._slave_start()
elif b == 0xFC and MIDI_CLOCK_IN_TRANSPORT and MIDI_CLOCK_IN: self._slave_stop()
elif b >= 0xF8: pass
elif self._sxon:
if len(self._sx) < 60000: self._sx.append(b)
else: self._sxon = False
def _slave_tick(self, now_ns):
if self._clock_in_last_t == 0:
self._clock_in_last_t = now_ns; self._slaved = True; return
interval = now_ns - self._clock_in_last_t
self._clock_in_last_t = now_ns
if interval < 8_300_000 or interval > 500_000_000: return
if self._clock_in_avg == 0: self._clock_in_avg = interval
else: self._clock_in_avg = (self._clock_in_avg * 7 + interval) // 8
new_bpm = max(5, min(300, int(60_000_000_000 // (self._clock_in_avg * 24))))
if new_bpm != self.bpm:
self.bpm = new_bpm; self._beat_ns = 60_000_000_000 // new_bpm; self._rebuild_dur_all()
self._slaved = True
def _slave_start(self):
if not self.running:
self.running = True; self._reset_clock(); self._start_play()
self._set_run_dot(); self.draw_meters()
self._clock_in_last_t = 0; self._clock_in_avg = 0
def _slave_stop(self):
if self.running:
self.running = False
self.spk.duty_cycle = 0; self._amp(False)
self.reset_playheads(); self._log_play()
self._set_run_dot(); self.draw_meters()
self._clock_in_last_t = 0; self._clock_in_avg = 0; self._slaved = False
def _handle_sysex(self, sx):
if len(sx) < 2 or sx[0] != 0x7D: return
cmd = sx[1]
if cmd == 0x01 and len(sx) >= 8 and rtc is not None:
try: rtc.RTC().datetime = time.struct_time((2000 + sx[2], sx[3], sx[4], sx[5], sx[6], sx[7], 0, -1, -1))
except Exception: pass
elif cmd == 0x02: # version query -> reply 0x03 + "X;<APP_VERSION>"
if self.midi:
payload = DEVICE_ID + ";" + APP_VERSION
self.midi.write(bytes([0xF0, 0x7D, 0x03]) + payload.encode() + bytes([0xF7]))
elif cmd == 0x40 or cmd == 0x41 or cmd == 0x42 or cmd == 0x43:
try: text = "".join(chr(b) if 0x20 <= b < 0x7F else "" for b in sx[2:])
except Exception: return
origin = text.split(";", 1)[0] if text else ""
if origin == self._sync_origin: return
self._sync_armed = True
if cmd == 0x40:
self._sync_broadcast_full()
elif cmd == 0x43:
self._sync_armed = False
elif cmd == 0x41:
parts = text.split(";", 5)
if len(parts) >= 6:
try:
running = parts[2] == "1"; patch = parts[5]
self._sync_apply_full(running, patch)
except Exception: pass
elif cmd == 0x42:
parts = text.split(";", 2)
if len(parts) >= 3: self._sync_apply_delta(parts[2])
elif cmd == 0x10:
try:
with open("/programs.json", "wb") as f: f.write(bytes(sx[2:]))
self.rebuild_setlists(); self.load(0)
self._ack(True)
except Exception:
self._ack(False)
elif cmd == 0x21:
try:
try: self._fw.close()
except Exception: pass
self._fw = open("/app.new", "wb"); self._fw_n = 0; self._ack(True)
except Exception:
self._fw = None; self._ack(False)
elif cmd == 0x22:
try:
if self._fw is None or a2b_base64 is None: raise OSError()
self._fw.write(a2b_base64(bytes(sx[2:])))
self._fw.flush()
self._fw_n += 1
if self._fw_n % 50 == 0: gc.collect()
self._ack(True)
except Exception:
try: self._fw.close()
except Exception: pass
self._fw = None; self._ack(False)
elif cmd == 0x23:
try:
try: self._fw.close()
except Exception: pass
self._fw = None; gc.collect()
with open("/app.new", "rb") as f: head = f.read(2)
if os.stat("/app.new")[6] < 4000 or len(head) < 2 or head[0] != 0x43 or head[1] != 0x06:
try: os.remove("/app.new")
except OSError: pass
self._ack(False); return
try: os.remove("/app.bak")
except OSError: pass
os.rename("/app.mpy", "/app.bak")
os.rename("/app.new", "/app.mpy")
open("/trial", "w").close()
self._ack(True); time.sleep(0.4); supervisor.reload()
except Exception:
self._ack(False)
def _ack(self, ok):
if self.midi: self.midi.write(bytes([0xF0, 0x7D, 0x7F if ok else 0x7E, 0xF7]))
def run(self):
boot = time.monotonic()
try: os.stat("/trial"); committed = False
except OSError: committed = True
while True:
try:
self.tick(); self.poll()
if self._need_redraw:
self._need_redraw = False
self.draw_bpm(); self.draw_status(); self.draw_train(); self.draw_meters()
if self._heavy_redraw_at and time.monotonic() >= self._heavy_redraw_at:
self._heavy_redraw_at = 0
self._grid_rebuild_start(); self._heavy_log_pending = True
if self._grid_li is not None:
self._grid_rebuild_step()
elif self._heavy_log_pending: # grid done -> redraw footer log
self._heavy_log_pending = False; self.draw_log()
tnow = time.monotonic()
if tnow >= self._uiNext:
self._uiNext = tnow + 0.25; self.draw_meters(); self.draw_bpm()
if self._sync_armed and tnow >= self._sync_heartbeat_next:
self._sync_broadcast_full()
if not committed and tnow - boot > 5:
try: os.remove("/trial")
except Exception: pass
committed = True
if self.dirty and tnow >= self._refreshNext:
safe = True
if self.running and self.lanes:
nb = self.lanes[0]['next']
safe = (nb - time.monotonic_ns()) > 10_000_000 or (tnow - self._lastRefresh) > 0.2
if safe:
if self.display.refresh(): self.dirty = False
self._lastRefresh = tnow; self._refreshNext = tnow + 0.05
else:
self._refreshNext = tnow + 0.003
time.sleep(0.0005)
except MemoryError:
try: print("MemoryError: gc + continue")
except Exception: pass
gc.collect(); time.sleep(0.05)
except Exception as e:
try: print("tick error:", e)
except Exception: pass
time.sleep(0.05)
App().run()