# VARASYS PolyMeter - PM_X-1 "Explorer" firmware (CircuitPython edition) # Pimoroni Explorer (PIM744): RP2350B + 2.8" ST7789V 320x240 + 6 buttons (A/B/C/X/Y/Z) + piezo. # # Sibling to PM_K-1 (the 52Pi EP-0172 kit in ../pico-cp/). Same engine, same program-string # grammar, same programs.json, same web editor, same live-sync protocol. The Explorer build is # READ-ONLY on the device (no on-device beat editing). All editing happens in the web editor # with Live sync on; the device reflects DELTAs in real time and emits play/stop/bpm/sel back. # # WHY CIRCUITPYTHON: the board mounts as a USB drive (CIRCUITPY) carrying this code + your # tracks + an offline copy of the editor; edits in the web editor are pushed over USB-MIDI. # Display is initialized by the official board definition (board.DISPLAY pre-built); we just # use it. Pinout in ./README.md. import board, busio, digitalio, pwmio, displayio, vectorio, time, json, gc, os, supervisor supervisor.runtime.autoreload = False # we write our own files (log + pushed programs); never self-restart APP_VERSION = "0.0.3" # firmware version (the A/B updater pushes/compares this) DEVICE_ID = "X" # 'X' = Explorer, 'K' = 52Pi kit (per docs/livesync-protocol.md and the version reply) try: import rtc # set from the editor's clock SysEx so the log has real timestamps except ImportError: rtc = None try: import usb_midi # default-enabled on RP2350 - sends a MIDI note per click to the computer except ImportError: usb_midi = None try: from binascii import a2b_base64 # decode the base64-encoded .mpy pushed by the editor's one-click update except ImportError: a2b_base64 = None # ============================== CONFIG (tweak if needed) ============================== MIDI_ENABLED = True # send a USB-MIDI note per click (play via the web editor's "Device audio") MIDI_CHANNEL = 10 # 1..16 - GM channel 10 is the drum channel MIDI_CLOCK_OUT = False # send 24 PPQN MIDI Clock so a DAW can slave its tempo to the metronome MIDI_CLOCK_OUT_TRANSPORT = True MIDI_CLOCK_IN = False # follow an external 24 PPQN clock MIDI_CLOCK_IN_TRANSPORT = True MUTE_SPEAKER = False # always silence the on-board piezo SPEAKER_AUTO_MUTE = False # auto-mute the piezo when a MIDI host is listening. DEFAULT OFF on Explorer: # Live sync sends a FULL heartbeat every 5s which would silence the piezo otherwise. # Toggle to Auto in Settings if you ARE using "Device audio" in the editor. AMP_EN_ACTIVE_HIGH = True # piezo amp enable polarity. If you HEAR sound from the piezo only when click() # has just timed out (~22ms after a beat), flip this to False - your amp is active-low. DISPLAY_ROTATION = 270 # 0=native landscape; 90/270 = portrait. Hold the device with A/B/C # on top: try 270 first; if upside-down try 90; 180 = flipped landscape. # ----- pins (Pimoroni Explorer board layout) ----- P_AUDIO = board.GP12 # piezo PWM (variable frequency) P_AMPEN = board.GP13 # piezo amp enable (high = on) P_BTNA, P_BTNB, P_BTNC = board.GP16, board.GP15, board.GP14 # left-side buttons (top to bottom) P_BTNX, P_BTNY, P_BTNZ = board.GP17, board.GP18, board.GP19 # right-side buttons (top to bottom) P_SDA, P_SCL = board.GP20, board.GP21 # QwSTEMMA (unused by the firmware - future expansion) # Display is initialised by the board definition (8-bit parallel bus). We grab board.DISPLAY + # call display.rotation = DISPLAY_ROTATION (above) to turn it portrait so the layout has the same # shape as the PM_K-1 Kit's portrait UI but in 240x320 instead of 320x480. WIDTH, HEIGHT = 240, 320 GRID_TOP = 104 # top of the pad grid (compact header + meters + title fit above) MAXLANES = 6 # lanes shown on the pad grid (parser still accepts more; they just play silent visually) MIN_LOG_SEC = 5 # don't log plays shorter than this LOG_MENU_ROWS = 8 # log entries shown in the Practice-log menu screen # ----- BUILT-IN playlists: same defaults as the Kit so the two firmwares feel identical ----- BUILTIN_SETLISTS = [ ("Styles", [ ("Four-on-the-floor", "t120;kick:4;snare:4=.x.x;hatClosed:4/2"), ("Swing ride", "t150;ride:4/2s;kick:4=X..x;snare:4=.x.x"), ("Purdie half-time shuffle", "t92;kick:4/3=X....x...x..;snare:4/3=..gg.gX.gg.g;hatClosed:4/3=X.xX.xX.xX.x"), ("Samba (2/4)", "t104;tomLow:2/4=x...X...;hatClosed:2/4;woodblock:2/4=X.xx.xX."), ("Nanigo (6/8 bembe)", "t130;cowbell:4/3=X.xx.x.xx.x.;kick:4/3=X.....X.....;hatClosed:4/3=..x..x..x..x"), ("6/8 groove", "t100;kick:3+3=x..x..;snare:3+3=...x..;hatClosed:3+3/2"), ("7/8 (2+2+3)", "t130;kick:2+2+3=x..x..x;hatClosed:2+2+3/2"), ("5/4 (3+2)", "t112;kick:3+2=x..x.;snare:3+2=..x..;hatClosed:3+2/2"), ]), ("Practice", [ ("5 over 4 polyrhythm", "t100;kick:4;claves:5~"), ("3 over 2 hemiola", "t96;woodblock:2;cowbell:3~"), ("2 & 4 & 3 over one bar", "t100;kick:3;cowbell:2~;claves:4~"), ("Triplet hats", "t100;kick:4;snare:4=.x.x;hatClosed:4/3"), ("Tempo builder 80 up", "t80;woodblock:4;rmp80/4/4"), ("Gap trainer (play 2 / rest 2)", "t100;kick:4;hatClosed:4/2;tr2/2"), ]), ("Song (continuous)", [ ("Intro - hats & kick", "t88;b4;kick:4=X.x.;hatClosed:4/2=gggggggg"), ("Groove in - backbeat", "t88;b4;kick:4=X.x.;snare:4=.X.X;hatClosed:4/2"), ("Half-time shuffle", "t92;b4;kick:4/3=X....x...x..;snare:4/3=..gg.gX.gg.g;hatClosed:4/3=X.xX.xX.xX.x"), ("Build - ramp 92-120", "t92;b4;rmp92/4/2;kick:4;snare:4=.X.X;hatClosed:4/2"), ("Four-on-the-floor (909)", "t124;b4;kick909:4;clap909:4=.X.X;hat909:4/2=.X.X.X.X"), ("Samba break (2/4)", "t116;b4;tomLow:2/4=x...X...;hatClosed:2/4;woodblock:2/4=X.xx.xX."), ("Peak - 16ths", "t132;b4;kick:4=X..x;snare:4=.X.X;hatClosed:4/4"), ("Outro - ramp down", "t132;b4;rmp132/-7/1;kick:4=X..x;hatClosed:4/2=gggggggg"), ]), ] # ============================== COLOURS ============================== C_BG, C_PANEL, C_TXT, C_MUTE = 0x06090E, 0x1C222C, 0xC7D0DB, 0x788494 C_CYAN, C_AMBER, C_GREEN, C_DIM = 0x0AB3F7, 0xFF9B2E, 0x2FE07A, 0x243240 C_BTN = 0x1C222C C_RUN_IDLE = 0x2FE07A # run-state dot (green when stopped, red when playing, bright on each beat) C_RUN_GO = 0xFF5A5A C_RUN_PULSE = 0xFFEC78 SOUND_GM = {"kick":36,"kick808":36,"kick909":36, "snare":38,"snare808":38,"snare909":38, "clap":39,"clap808":39,"clap909":39, "rim":37, "hatClosed":42,"hat808":42,"hat909":42, "hatOpen":46,"openHat808":46, "ride":51,"ride909":51, "crash":49,"crash909":49, "tomLow":41,"tom808":45,"tomMid":45,"tomHigh":48, "tambourine":54, "cowbell":56,"cowbell808":56, "woodblock":76,"jamblock":76, "claves":75, "beep":37} GM_DEFAULT = 37 HELP_PAGES = ( ("Transport & Navigation", ( "Hold portrait with A/B/C on top.", "A: play / stop", "B: tap tempo", "C: menu (this)", "X: prev track (hold to repeat)", "Z: next track (hold to repeat)", "Y: tempo -1 (-5 after 1.5s held)", "X+Z chord: tempo +1", )), ("Menu navigation", ( "X / Z: move cursor up / down", "Y: decrement the focused value", "A: cycle / increment / select", "B: back (cancel)", "C: close the menu", )), ("Editing & sync", ( "Edit on the web at metronome.varasys.io", "Click 'Live sync' to mirror live", "Beat patterns are read-only on device", "Tracks + tempo + transport sync both ways", "Built-in playlists baked, user lists", " live in /programs.json", )), ("Status & Hardware", ( "MIDI badge green: laptop listening", "USB badge cyan: connected to a computer", "Run dot: green=stop / red=play + pulse", "Squares = main beats, circles = subs", "Ramp arrow: track has a tempo ramp", "Gap symbol: silent rest bars", )), ) MIDI_VEL = {2: 120, 1: 90, 3: 45} # accent / normal / ghost PAD_DIM = (0x10161E, 0x0A3A52, 0x4A3010, 0x2A1D4A) # idle pad: mute / normal / accent / ghost PAD_LIT = (0x39414D, 0x0AB3F7, 0xFF9B2E, 0x967BFF) # playhead pad: mute / normal / accent / ghost C_GRID = 0x1A2330 # faint vertical beat gridlines (beats line up across lanes) # ============================== ANTI-ALIASED FONTS (binary blobs on the drive; see ../pico/gen_font.py) ============================== def load_font(path): with open(path, "rb") as f: blob = f.read() count = blob[0]; p = 1; pixoff = 1 + count * 7; glyphs = {} for _ in range(count): cp = (blob[p] << 8) | blob[p+1]; w = blob[p+2]; h = blob[p+3] xoff = blob[p+4]; xoff = xoff - 256 if xoff > 127 else xoff top = blob[p+5]; adv = blob[p+6]; p += 7 glyphs[cp] = (w, h, xoff, top, adv, pixoff); pixoff += (w * h + 1) // 2 return (glyphs, blob) FONT_S = load_font("/font_s.bin") # small - pad-grid lane labels + meter rows FONT_M = load_font("/font_m.bin") # labels / buttons FONT_L = load_font("/font_l.bin") # big BPM gc.collect() def _blend(bg, fg, i): t = i * 17 r = (((bg >> 16) & 0xFF)*(255-t) + ((fg >> 16) & 0xFF)*t) // 255 g = (((bg >> 8) & 0xFF)*(255-t) + ((fg >> 8) & 0xFF)*t) // 255 b = ((bg & 0xFF)*(255-t) + (fg & 0xFF)*t) // 255 return (r << 16) | (g << 8) | b def make_text(s, font, fg, bg): """Render a string into a displayio TileGrid (anti-aliased via a 16-step blend palette).""" glyphs, blob = font w = 0; top0 = 999; bot = 0 for c in s: g = glyphs.get(ord(c)) if not g: continue w += g[4] if g[1]: if g[3] < top0: top0 = g[3] if g[3] + g[1] > bot: bot = g[3] + g[1] if top0 == 999: top0 = 0 w = max(1, w); h = max(1, bot - top0) gc.collect() bmp = displayio.Bitmap(w, h, 16) pal = displayio.Palette(16) for i in range(16): pal[i] = _blend(bg, fg, i) pen = 0 for c in s: g = glyphs.get(ord(c)) if not g: continue gw, gh, xoff, gtop, adv, off = g for j in range(gh): row = (gtop - top0) + j for i in range(gw): k = j * gw + i byte = blob[off + (k >> 1)] nib = (byte >> 4) if (k & 1) == 0 else (byte & 0xF) if nib: x = pen + xoff + i if 0 <= x < w and 0 <= row < h: bmp[x, row] = nib pen += adv return displayio.TileGrid(bmp, pixel_shader=pal), w, h def load_alpha(path): try: with open(path, "rb") as f: blob = f.read() return (blob[0], blob[1], blob) except Exception: return None def make_glyph(asset, fg, bg): w, h, blob = asset gc.collect() bmp = displayio.Bitmap(w, h, 16); pal = displayio.Palette(16) for i in range(16): pal[i] = _blend(bg, fg, i) for k in range(w * h): byte = blob[2 + (k >> 1)] nib = (byte >> 4) if (k & 1) == 0 else (byte & 0xF) if nib: bmp[k % w, k // w] = nib return displayio.TileGrid(bmp, pixel_shader=pal), pal, w, h def _recolor(pal, fg, bg): for i in range(16): pal[i] = _blend(bg, fg, i) LOGO = load_alpha("/logo.bin") ICON_MIDI = load_alpha("/midi.bin") ICON_USB = load_alpha("/usb.bin") gc.collect() # ============================== POLYMETER ENGINE ============================== PAT = {'X': 2, 'x': 1, 'g': 3, '.': 0, '-': 0, '_': 0} PRIO = {2: 3, 1: 2, 3: 1} def parse_program(s): bpm = 120; lanes = []; bars = 0; ramp = None; trainer = None for tok in s.strip().split(';'): tok = tok.strip() if not tok: continue if tok[0] == 't' and tok[1:].isdigit(): bpm = int(tok[1:]); continue if tok[0] == 'b' and tok[1:].isdigit(): bars = int(tok[1:]); continue if tok.startswith('rmp'): p = tok[3:].split('/') if len(p) == 3: try: ramp = {'start': int(p[0]), 'amt': int(p[1]), 'every': max(1, int(p[2]))} except ValueError: pass continue if tok.startswith('tr') and '/' in tok and ':' not in tok: p = tok[2:].split('/') if len(p) == 2: try: trainer = {'play': max(0, int(p[0])), 'mute': max(0, int(p[1]))} except ValueError: pass continue if ':' not in tok: continue lane = _parse_lane(tok) if lane: lanes.append(lane) if not lanes: lanes = [_parse_lane("beep:4")] return max(5, min(300, bpm)), lanes, bars, ramp, trainer def _parse_lane(tok): poly = '~' in tok; mute = '!' in tok tok = tok.replace('~', '').replace('!', '') gain = '' if '@' in tok: tok, _, g = tok.partition('@'); gain = '@' + g sound, _, rest = tok.partition(':') pattern = None if '=' in rest: rest, _, pattern = rest.partition('=') sub = 1; swing = False if '/' in rest: rest, _, sd = rest.partition('/') swing = sd.endswith('s'); sd = sd.rstrip('s') sub = int(sd) if sd.isdigit() else 1 groups = [int(g) for g in rest.split('+') if g.isdigit()] or [4] beats = sum(groups); starts = set(); acc = 0 for gp in groups: starts.add(acc); acc += gp steps = beats * sub if pattern: levels = [PAT.get(ch, 0) for ch in pattern] if len(levels) < steps: levels += [0] * (steps - len(levels)) steps = len(levels) else: levels = [] for i in range(steps): if i % sub == 0: levels.append(2 if (i // sub) in starts else 1) else: levels.append(0) return {'sound': sound, 'sub': sub, 'swing': swing, 'steps': steps, 'levels': levels, 'poly': poly, 'mute': mute, 'groups': groups, 'gain': gain} PAT_CH = {2: 'X', 1: 'x', 3: 'g', 0: '.'} def lane_to_str(L): s = L['sound'] + ':' + '+'.join(str(g) for g in L.get('groups', [4])) if L['sub'] != 1 or L['swing']: s += '/' + str(L['sub']) + ('s' if L['swing'] else '') s += '=' + ''.join(PAT_CH.get(v, '.') for v in L['levels']) s += L.get('gain', '') if L['poly']: s += '~' if L['mute']: s += '!' return s _ALNUM = "abcdefghijklmnopqrstuvwxyz0123456789" def _slkey(t): return "".join(c for c in t.lower() if c in _ALNUM) def load_user_setlists(): try: with open("/programs.json") as f: d = json.load(f) except Exception as e: print("programs.json:", e); return [] def items_of(pl): return [(p.get("name", "?"), p.get("prog", "")) for p in pl if p.get("prog")] out = [] try: if isinstance(d.get("setlists"), list): for sl in d["setlists"]: it = items_of(sl.get("programs", [])) if it: out.append((sl.get("title", "My set list"), it)) elif isinstance(d.get("programs"), list): it = items_of(d["programs"]) if it: out.append((d.get("title", "My set list"), it)) except Exception as e: print("setlists:", e) return out def solid(color): p = displayio.Palette(1); p[0] = color; return p def rect(x, y, w, h, color): return vectorio.Rectangle(pixel_shader=solid(color), width=w, height=h, x=x, y=y) # ============================== APP ============================== class App: def __init__(self): self.display = board.DISPLAY # board.c built the BusDisplay; we just use it try: self.display.rotation = DISPLAY_ROTATION # turn portrait (240x320) - same shape as the Kit's UI except Exception: pass try: self.display.auto_refresh = False # we manage refresh in run() (predictive skip + ~20Hz throttle) except Exception: pass self.i2c = busio.I2C(scl=P_SCL, sda=P_SDA, frequency=400_000) # QwSTEMMA - unused by the firmware, available to user code self.midi = usb_midi.ports[1] if (MIDI_ENABLED and usb_midi and len(usb_midi.ports) > 1) else None self.midi_in = usb_midi.ports[0] if (MIDI_ENABLED and usb_midi and len(usb_midi.ports) > 0) else None self._mbuf = bytearray(64); self.midi_host = False; self.last_midi_in = 0.0 self._sx = bytearray(); self._sxon = False # USB-MIDI SysEx assembler self._fw = None; self._fw_n = 0 # chunked firmware transfer state self.spk = pwmio.PWMOut(P_AUDIO, frequency=1600, variable_frequency=True, duty_cycle=0) self.amp_en = digitalio.DigitalInOut(P_AMPEN); self.amp_en.direction = digitalio.Direction.OUTPUT self._amp(False) # amp off when no audio playing (saves power, kills hum) self.spk_off = 0 # buttons - all active-low with internal pull-ups self.btnA = self._btn(P_BTNA); self.btnB = self._btn(P_BTNB); self.btnC = self._btn(P_BTNC) self.btnX = self._btn(P_BTNX); self.btnY = self._btn(P_BTNY); self.btnZ = self._btn(P_BTNZ) self._prev = {'A': True, 'B': True, 'C': True, 'X': True, 'Y': True, 'Z': True} self._held_t = {'X': 0, 'Y': 0, 'Z': 0} # press start time (monotonic_ns) for hold-repeat self._next_rep = {'X': 0, 'Y': 0, 'Z': 0} # next "auto repeat" deadline for held buttons self._chord_xz = 0 # 0 = not in chord; else monotonic_ns of the chord start self.running = False; self.bpm = 120; self.idx = 0; self.lanes = []; self.bars = 0 self.ramp = None; self.trainer = None; self._lastbar = -1; self._muted = False; self._ramp_base = 120 self._overlay = None # menu stack: None / 'menu' / 'settings' / 'help' / 'about' / 'log' / 'msg' self._modal_cursor = 0 # focused row in the current modal self._modal_rows = [] # tuples (label, value_str_or_None, action) for current modal self.continue_on = False; self._advance = False self._next_pending = None; self._seam_t = 0; self._need_redraw = False self._heavy_redraw_at = 0 self._grid_li = None; self._grid_n = 0; self._grid_geo = (0, 0, 0, 0) self._grid_pi = 0; self._grid_lane_st = None; self._grid_pads = [] self._heavy_log_pending = False self._beat_ns = 60_000_000_000 // self.bpm self._note_buf = bytearray([0x90, 0, 0]) self._clock_byte = bytes([0xF8]) self._start_byte = bytes([0xFA]); self._stop_byte = bytes([0xFC]) self._lastRefresh = 0.0 try: o = os.urandom(4); self._sync_origin = "d" + "".join("%02x" % b for b in o) except Exception: self._sync_origin = "d%08x" % (time.monotonic_ns() & 0xFFFFFFFF) self._sync_armed = False; self._sync_seq = 0; self._sync_applying = False self._sync_heartbeat_next = 0.0 self._displayed_bpm = -1; self._clock_next = 0 self._clock_in_last_t = 0; self._clock_in_avg = 0; self._slaved = False self.sl = 0; self.rebuild_setlists() self.dirty = True self.pad_pal = displayio.Palette(8) for i in range(4): self.pad_pal[i] = PAD_DIM[i]; self.pad_pal[i + 4] = PAD_LIT[i] self.lane_pads = []; self.lane_lit = [] self.usb_conn = False; self._m_steps = 0 self._uiNext = 0.0; self._lastTs = None; self._lastBs = None self._seg_start = 0.0 self._refreshNext = 0.0 self.ic_midi_pal = None; self.ic_usb_pal = None # practice log self.can_write = self._probe_write() self._load_settings() self.log = self._load_log() self.play_start = None; self.play_bpm = 0; self.play_name = "" self._log_scroll = 0 self._build_scene() self.load(0) self.draw_icons(); self.draw_meters(); self._set_run_dot() def _btn(self, pin): d = digitalio.DigitalInOut(pin); d.direction = digitalio.Direction.INPUT; d.pull = digitalio.Pull.UP return d # ---------- scene graph (240x320 portrait; same shape as the Kit's UI, shorter) ---------- def _build_scene(self): root = displayio.Group(); self.display.root_group = root root.append(rect(0, 0, WIDTH, HEIGHT, C_BG)) # Header (y 0..28): VARASYS logo + version + (right edge) MIDI/USB badges + run dot if LOGO: tg, _p, lw, lh = make_glyph(LOGO, C_CYAN, C_BG); tg.x = 8; tg.y = 8; root.append(tg) lx = 8 + lw else: tg, w, h = make_text("VARASYS", FONT_M, C_CYAN, C_BG); tg.x = 8; tg.y = 8; root.append(tg) lx = 8 + w vtg, vw, vh = make_text("v" + APP_VERSION, FONT_S, C_DIM, C_BG); vtg.x = lx + 4; vtg.y = 10; root.append(vtg) # Run dot + MIDI/USB icons packed at the right edge (replaces the Kit's hamburger; C button opens the menu) self.run_dot_pal = displayio.Palette(1); self.run_dot_pal[0] = C_RUN_IDLE self.run_dot = vectorio.Circle(pixel_shader=self.run_dot_pal, radius=4, x=WIDTH - 12, y=14) root.append(self.run_dot) x = WIDTH - 22 # icons live to the LEFT of the run dot for asset, attr in ((ICON_USB, "ic_usb_pal"), (ICON_MIDI, "ic_midi_pal")): if asset: tg, pal, w, h = make_glyph(asset, C_DIM, C_BG); x -= w; tg.x = x; tg.y = 8; x -= 6 root.append(tg); setattr(self, attr, pal) root.append(rect(0, 28, WIDTH, 1, C_PANEL)) # header divider # Dynamic groups - layout order matches the Kit (BPM big at top-right + meters left; # then ramp/trainer indicators; then setlist tab + CONT; then track title; then pad grid). self.g_bpm = displayio.Group(); root.append(self.g_bpm) # big BPM (right, y ~44) self.g_time = displayio.Group(); root.append(self.g_time) # elapsed [of total] (left, y ~50) self.g_bar = displayio.Group(); root.append(self.g_bar) # bar [of total] (left, y ~78) self.g_train = displayio.Group(); root.append(self.g_train) # ramp / gap-trainer indicators (y ~100) self.g_idx = displayio.Group(); root.append(self.g_idx) # set-list tab (y ~118) self.g_cont = displayio.Group(); root.append(self.g_cont) # CONT (auto-advance) toggle (y ~118) self.g_name = displayio.Group(); root.append(self.g_name) # track title (y ~134) self.g_grid = displayio.Group(); root.append(self.g_grid) # lanes x step pads (y >= GRID_TOP=138) self.g_overlay = displayio.Group(); root.append(self.g_overlay) # modals (drawn on top) def _place(self, group, s, x, y, fg, bg, font, right_edge=None): while len(group): group.pop() self.dirty = True if not s: return tg, w, h = make_text(s, font, fg, bg) tg.x = (right_edge - w) if right_edge is not None else x; tg.y = y; group.append(tg) # ---------- program ---------- def rebuild_setlists(self): self.setlists = [{'title': t, 'items': it, 'builtin': True} for t, it in BUILTIN_SETLISTS] seen = set(_slkey(t) for t, _ in BUILTIN_SETLISTS) for t, it in load_user_setlists(): if _slkey(t) in seen: continue seen.add(_slkey(t)); self.setlists.append({'title': t, 'items': it, 'builtin': False}) if self.sl >= len(self.setlists): self.sl = 0 def switch_setlist(self, delta=1): if len(self.setlists) < 2: return if self._sync_applying: return was = self.running if was: self.running = False; self._log_play() self.sl = (self.sl + delta) % len(self.setlists) self.load(0) if was: self.running = True; self._reset_clock(); self._start_play() self._set_run_dot(); self.draw_meters() self._sync_broadcast("sel=%d/%d" % (self.sl, self.idx)) def load(self, i): items = self.setlists[self.sl]['items'] self.idx = i % len(items) self.name, prog = items[self.idx] self.bpm, self.lanes, self.bars, self.ramp, self.trainer = parse_program(prog) self._beat_ns = 60_000_000_000 // max(1, self.bpm); self._rebuild_dur_all() self.master = self.lanes[0]; self._ramp_base = self.bpm; self._lastbar = -1; self._muted = False self._overlay = None self._next_pending = None; self._need_redraw = False self._heavy_redraw_at = 0; self._heavy_log_pending = False; self._grid_li = None while len(self.g_overlay): self.g_overlay.pop() self._reset_clock(); self.draw_bpm(); self.draw_status(); self.draw_train() self.build_grid() def _prog_str(self): parts = ['t' + str(self.bpm)] if self.bars: parts.append('b' + str(self.bars)) if self.ramp: parts.append('rmp%d/%d/%d' % (self.ramp.get('start', self.bpm), self.ramp['amt'], self.ramp['every'])) if self.trainer: parts.append('tr%d/%d' % (self.trainer['play'], self.trainer['mute'])) for L in self.lanes: parts.append(lane_to_str(L)) return ';'.join(parts) def toggle_continue(self): self.continue_on = not self.continue_on; self.draw_status() # ---------- modal: 4-screen menu navigated by buttons (Settings / Help / About / Practice log) ---------- def _show_msg(self, text): self._overlay = 'msg'; g = self.g_overlay while len(g): g.pop() px, py, pw, ph = 24, 90, WIDTH - 48, 60 g.append(rect(px, py, pw, ph, C_PANEL)); g.append(rect(px, py, pw, 2, C_AMBER)) t, w, h = make_text(text[:32], FONT_S, C_TXT, C_PANEL); t.x = px + 12; t.y = py + 12; g.append(t) t2, w2, h2 = make_text("(A/C to dismiss)", FONT_S, C_DIM, C_PANEL); t2.x = px + 12; t2.y = py + 34; g.append(t2) self.dirty = True def _close_overlay(self): self._overlay = None; self._modal_cursor = 0; self._modal_rows = [] while len(self.g_overlay): self.g_overlay.pop() self.dirty = True def _show_menu(self): gc.collect() self._overlay = 'menu'; self._modal_cursor = 0; self._draw_menu() def _draw_menu(self): g = self.g_overlay while len(g): g.pop() PX, PY, PW, RH = 18, 32, WIDTH - 36, 22 rows = ( ("Continue: " + ("on" if self.continue_on else "off"), None, self._menu_toggle_continue), ("Settings >", None, self._show_settings), ("Practice log >", None, self._show_log), ("Help >", None, self._show_help), ("About", None, self._show_about), ) self._modal_rows = rows PH = 24 + len(rows) * RH + 18 g.append(rect(PX, PY, PW, PH, C_PANEL)); g.append(rect(PX, PY, PW, 2, C_CYAN)) t, w, h = make_text("Menu", FONT_M, C_TXT, C_PANEL); t.x = PX + 12; t.y = PY + 6; g.append(t) for i, (label, _v, _act) in enumerate(rows): yy = PY + 26 + i * RH sel = (i == self._modal_cursor) if sel: g.append(rect(PX + 6, yy, 3, RH - 4, C_CYAN)) # left-edge caret g.append(rect(PX + 12, yy, PW - 24, RH - 4, C_BTN)) col = C_CYAN if (sel and label.startswith("Continue") and self.continue_on) else (C_TXT if sel else C_MUTE) lt, lw, lh = make_text(label, FONT_S, col, C_BTN if sel else C_PANEL) lt.x = PX + 18; lt.y = yy + 4; g.append(lt) self.dirty = True def _menu_toggle_continue(self): self.continue_on = not self.continue_on; self.draw_status(); self._draw_menu() # ---------- Settings sub-modal (Speaker / MIDI Out / Channel / Clock Out / Clock In) ---------- def _show_settings(self): gc.collect() self._overlay = 'settings'; self._modal_cursor = 0; self._draw_settings() def _draw_settings(self): g = self.g_overlay while len(g): g.pop() PX, PY, PW, RH = 10, 28, WIDTH - 20, 22 sm = "Off" if MUTE_SPEAKER else ("Auto" if SPEAKER_AUTO_MUTE else "Always") rows = ( ("Speaker", sm, self._adj_speaker), ("MIDI Out", "on" if MIDI_ENABLED else "off", self._adj_midi_out), ("Channel", str(MIDI_CHANNEL), self._adj_midi_ch), ("Clock Out", "on" if MIDI_CLOCK_OUT else "off", self._adj_clock_out), ("Clock In", "on" if MIDI_CLOCK_IN else "off", self._adj_clock_in), ) self._modal_rows = rows PH = 24 + len(rows) * RH + 14 g.append(rect(PX, PY, PW, PH, C_PANEL)); g.append(rect(PX, PY, PW, 2, C_CYAN)) t, w, h = make_text("Settings", FONT_M, C_TXT, C_PANEL); t.x = PX + 12; t.y = PY + 5; g.append(t) for i, (label, value, _adj) in enumerate(rows): yy = PY + 26 + i * RH sel = (i == self._modal_cursor) if sel: g.append(rect(PX + 4, yy, 3, RH - 4, C_CYAN)) g.append(rect(PX + 10, yy, PW - 20, RH - 4, C_BTN)) lt, lw, lh = make_text(label, FONT_S, C_TXT if sel else C_MUTE, C_BTN if sel else C_PANEL) lt.x = PX + 16; lt.y = yy + 5; g.append(lt) vt, vw, vh = make_text(value, FONT_M, C_TXT if sel else C_MUTE, C_BTN if sel else C_PANEL) vt.x = PX + PW - vw - 14; vt.y = yy + 3; g.append(vt) self.dirty = True def _adj_speaker(self, d): global MUTE_SPEAKER, SPEAKER_AUTO_MUTE modes = ("auto", "always", "off") cur = "off" if MUTE_SPEAKER else ("auto" if SPEAKER_AUTO_MUTE else "always") i = (modes.index(cur) + d) % 3 MUTE_SPEAKER = (modes[i] == "off"); SPEAKER_AUTO_MUTE = (modes[i] == "auto") if MUTE_SPEAKER: self.spk.duty_cycle = 0; self._amp(False) self._save_settings(); self._draw_settings() def _adj_midi_out(self, d): global MIDI_ENABLED MIDI_ENABLED = not MIDI_ENABLED; self._save_settings(); self._draw_settings() def _adj_midi_ch(self, d): global MIDI_CHANNEL MIDI_CHANNEL = ((MIDI_CHANNEL - 1 + d) % 16) + 1 self._save_settings(); self._draw_settings() def _adj_clock_out(self, d): global MIDI_CLOCK_OUT MIDI_CLOCK_OUT = not MIDI_CLOCK_OUT if MIDI_CLOCK_OUT: self._clock_next = time.monotonic_ns() self._save_settings(); self._draw_settings() def _adj_clock_in(self, d): global MIDI_CLOCK_IN MIDI_CLOCK_IN = not MIDI_CLOCK_IN if not MIDI_CLOCK_IN: self._slaved = False self._save_settings(); self._draw_settings() # ---------- Help sub-modal (paginated; cursor not used, X/Z page through) ---------- def _show_help(self): gc.collect() self._overlay = 'help'; self._help_page = 0; self._modal_cursor = 0; self._draw_help() def _draw_help(self): g = self.g_overlay while len(g): g.pop() PX, PY, PW = 10, 26, WIDTH - 20 title, lines = HELP_PAGES[self._help_page] PH = 22 + 13 * len(lines) + 18 g.append(rect(PX, PY, PW, PH, C_PANEL)); g.append(rect(PX, PY, PW, 2, C_CYAN)) t, w, h = make_text(title, FONT_M, C_TXT, C_PANEL); t.x = PX + 10; t.y = PY + 5; g.append(t) pi, piw, pih = make_text("%d / %d" % (self._help_page + 1, len(HELP_PAGES)), FONT_S, C_DIM, C_PANEL) pi.x = PX + PW - piw - 10; pi.y = PY + 9; g.append(pi) yy = PY + 24 for ln in lines: lt, lw, lh = make_text(ln[:44], FONT_S, C_TXT, C_PANEL); lt.x = PX + 10; lt.y = yy; g.append(lt) yy += 14 hint, hw, hh = make_text("X / Z = page, C = close", FONT_S, C_DIM, C_PANEL) hint.x = PX + 10; hint.y = PY + PH - 14; g.append(hint) self.dirty = True # ---------- About sub-modal ---------- def _show_about(self): gc.collect() self._overlay = 'about'; self._modal_cursor = 0; self._draw_about() def _draw_about(self): import sys gc.collect() try: free = gc.mem_free() except Exception: free = 0 try: cp_ver = "%d.%d.%d" % sys.implementation.version[:3] except Exception: cp_ver = "?" up_min = int(time.monotonic()) // 60 lines = ( ("VARASYS PolyMeter", C_CYAN), ("PM_X-1 Explorer", C_TXT), ("", None), ("Firmware: v" + APP_VERSION, C_TXT), ("Free RAM: %d KB" % (free // 1024), C_TXT), ("Uptime: %dm" % up_min, C_TXT), ("CircuitPython: " + cp_ver, C_TXT), ("", None), ("metronome.varasys.io", C_DIM), ) g = self.g_overlay while len(g): g.pop() PX, PY, PW = 20, 26, WIDTH - 40; PH = 10 + 14 * len(lines) + 20 g.append(rect(PX, PY, PW, PH, C_PANEL)); g.append(rect(PX, PY, PW, 2, C_CYAN)) yy = PY + 8 for text, col in lines: if col is not None: lt, lw, lh = make_text(text, FONT_S, col, C_PANEL); lt.x = PX + 12; lt.y = yy; g.append(lt) yy += 14 hint, hw, hh = make_text("C = close", FONT_S, C_DIM, C_PANEL) hint.x = PX + 14; hint.y = PY + PH - 14; g.append(hint) self.dirty = True # ---------- Practice log sub-modal (replaces the Kit's screen-footer log) ---------- def _show_log(self): gc.collect() self._overlay = 'log'; self._log_scroll = 0; self._draw_log_modal() def _draw_log_modal(self): g = self.g_overlay while len(g): g.pop() PX, PY, PW = 6, 26, WIDTH - 12; PH = 12 + LOG_MENU_ROWS * 14 + 22 g.append(rect(PX, PY, PW, PH, C_PANEL)); g.append(rect(PX, PY, PW, 2, C_CYAN)) t, w, h = make_text("Practice log", FONT_M, C_TXT, C_PANEL); t.x = PX + 10; t.y = PY + 4; g.append(t) rows = [(i, e) for i, e in enumerate(self.log) if e.get("name") == self.name] if not rows: tg, w, h = make_text("no plays over 5s yet", FONT_S, C_DIM, C_PANEL); tg.x = PX + 12; tg.y = PY + 28; g.append(tg) else: top = self._log_scroll; yy = PY + 24 for k in range(min(LOG_MENU_ROWS, len(rows) - top)): _oi, e = rows[top + k] dur = "%d:%02d" % (e["dur"] // 60, e["dur"] % 60) bars = e.get("bars", 0); bstr = (" %dbar" % bars) if bars else "" line = "%s %3dbpm %s%s" % (e.get("t", "--:--"), e["bpm"], dur, bstr) lt, lw, lh = make_text(line, FONT_S, C_TXT, C_PANEL); lt.x = PX + 12; lt.y = yy; g.append(lt) yy += 14 hint, hw, hh = make_text("X/Z scroll, C close", FONT_S, C_DIM, C_PANEL) hint.x = PX + 10; hint.y = PY + PH - 14; g.append(hint) self.dirty = True # ---------- Settings persistence ---------- def _load_settings(self): global MUTE_SPEAKER, SPEAKER_AUTO_MUTE, MIDI_ENABLED, MIDI_CHANNEL, MIDI_CLOCK_OUT, MIDI_CLOCK_IN try: with open("/settings.json") as f: d = json.load(f) except Exception: return try: sm = d.get("speaker", "auto") MUTE_SPEAKER = (sm == "off"); SPEAKER_AUTO_MUTE = (sm == "auto") MIDI_ENABLED = bool(d.get("midi_out", MIDI_ENABLED)) MIDI_CHANNEL = max(1, min(16, int(d.get("midi_channel", MIDI_CHANNEL)))) MIDI_CLOCK_OUT = bool(d.get("clock_out", MIDI_CLOCK_OUT)) MIDI_CLOCK_IN = bool(d.get("clock_in", MIDI_CLOCK_IN)) except Exception as e: print("settings:", e) def _save_settings(self): if not self.can_write: return sm = "off" if MUTE_SPEAKER else ("auto" if SPEAKER_AUTO_MUTE else "always") d = {"speaker": sm, "midi_out": MIDI_ENABLED, "midi_channel": MIDI_CHANNEL, "clock_out": MIDI_CLOCK_OUT, "clock_in": MIDI_CLOCK_IN} try: with open("/settings.json", "w") as f: json.dump(d, f) except OSError: self.can_write = False # ---------- step grids (cached per-lane ns durations: tuple lookup, no method call in tick) ---------- def _rebuild_dur(self, L): beat = self._beat_ns sub = max(1, L['sub']); steps = max(1, L['steps']) if L.get('poly') and self.lanes: m = self.lanes[0]; master_bar = beat * (m['steps'] // max(1, m['sub'])) d = master_bar // steps; L['durs'] = tuple(d for _ in range(steps)) elif L.get('swing') and sub % 2 == 0: pair = beat // max(1, sub // 2); lng = (pair * 2) // 3; sht = pair // 3 L['durs'] = tuple(lng if (s % sub) % 2 == 0 else sht for s in range(steps)) else: d = beat // sub; L['durs'] = tuple(d for _ in range(steps)) def _rebuild_dur_all(self): for L in self.lanes: self._rebuild_dur(L) def _reset_clock(self): now = time.monotonic_ns() for L in self.lanes: L['next'] = now; L['step'] = -1 self._m_steps = 0 self._seg_start = time.monotonic() # ---------- audio + run-state indicator ---------- def _amp(self, on): # respect AMP_EN_ACTIVE_HIGH (flip in CONFIG if your amp is active-low) self.amp_en.value = on if AMP_EN_ACTIVE_HIGH else not on def click(self, level): self._amp(True) # enable the amp briefly while we drive the piezo self.spk.frequency = {2: 2300, 1: 1600, 3: 1050}.get(level, 1600) self.spk.duty_cycle = {2: 42000, 1: 30000, 3: 14000}.get(level, 30000) self.spk_off = time.monotonic_ns() + 22_000_000 # silence + amp off scheduled in tick() def _set_run_dot(self): self.run_dot_pal[0] = C_RUN_GO if self.running else C_RUN_IDLE self.dirty = True def flash(self, level): # brief bright pulse on the run dot (replaces the Kit's RGB LED) self.run_dot_pal[0] = C_RUN_PULSE self.dirty = True # ---------- Live sync (HELLO/FULL/DELTA/BYE on SysEx 0x40-0x43; see src/livesync.js for the editor side) ---------- def _sync_send(self, op, text): if self.midi is None: return b = bytearray((0xF0, 0x7D, op)) for c in text: v = ord(c); b.append(v if v < 0x80 else 0x3F) b.append(0xF7) try: self.midi.write(b) except Exception: pass def _sync_broadcast(self, evt): if not self._sync_armed or self._sync_applying or self.midi is None: return text = "%s;%d;%s" % (self._sync_origin, self._sync_seq, evt); self._sync_seq += 1 self._sync_send(0x42, text) def _sync_broadcast_full(self): if not self._sync_armed or self.midi is None: return try: patch = self._prog_str() except Exception: return text = "%s;%d;%d;%d;%d;%s" % (self._sync_origin, self._sync_seq, 1 if self.running else 0, self.sl, self.idx, patch) self._sync_seq += 1 self._sync_send(0x41, text) self._sync_heartbeat_next = time.monotonic() + 5.0 def _sync_apply_full(self, running, patch): self._sync_applying = True try: try: gc.collect() try: cur = self._prog_str() except Exception: cur = None if patch and patch != cur: bpm, lanes, bars, ramp, trainer = parse_program(patch) self.bpm = bpm; self.lanes = lanes; self.bars = bars; self.ramp = ramp; self.trainer = trainer self._beat_ns = 60_000_000_000 // max(1, bpm); self._rebuild_dur_all() self.master = self.lanes[0]; self._ramp_base = self.bpm; self._lastbar = -1; self._muted = False self._overlay = None while len(self.g_overlay): self.g_overlay.pop() self._reset_clock() self.draw_bpm(); self.draw_status(); self.draw_train(); self.draw_meters() self.build_grid() if running and not self.running: self.toggle() elif (not running) and self.running: self.toggle() except Exception as e: try: print("sync FULL apply:", e) except Exception: pass finally: self._sync_applying = False def _sync_apply_delta(self, evt): self._sync_applying = True try: eq = evt.find('=') key = evt if eq < 0 else evt[:eq] val = '' if eq < 0 else evt[eq+1:] if key == 'play': if not self.running: self.toggle() elif key == 'stop': if self.running: self.toggle() elif key == 'bpm': try: self.set_bpm(int(val)) except Exception: pass elif key == 'sel': p = val.split('/') if len(p) == 2: try: sl = int(p[0]); item = int(p[1]) if sl >= 0 and item >= 0: if sl < len(self.setlists) and sl != self.sl: self.sl = sl items = self.setlists[self.sl]['items'] if 0 <= item < len(items) and item != self.idx: self.goto(item) except Exception: pass elif key == 'beat': # PM_X-1 doesn't EMIT beat= (no on-device editing) but DOES apply p = val.split('/') if len(p) == 3: try: li = int(p[0]); s = int(p[1]); lvl = int(p[2]) if 0 <= li < len(self.lanes): L = self.lanes[li] if 0 <= s < len(L['levels']): L['levels'][s] = lvl & 3 if li < len(self.lane_pads) and s < len(self.lane_pads[li]): lit = (self.lane_lit[li] == s) self.lane_pads[li][s].color_index = self._padbase(L, s) + (4 if lit else 0) self.dirty = True except Exception: pass elif key == 'lane': # apply but don't emit p = val.split('/') if len(p) >= 3: try: li = int(p[0]); field = p[1]; v = '/'.join(p[2:]) if 0 <= li < len(self.lanes): L = self.lanes[li]; structural = False if field == 'sound': L['sound'] = v elif field == 'groups': try: L['groups'] = [int(x) for x in v.split('+')]; structural = True except Exception: pass elif field == 'sub': try: L['sub'] = int(v); structural = True except Exception: pass elif field == 'swing': L['swing'] = (v == '1'); structural = True elif field == 'enabled': L['mute'] = not (v == '1') elif field == 'gain': try: L['gain'] = int(v) except Exception: pass elif field == 'poly': L['poly'] = (v == '1'); structural = True if structural: self._regen_levels(L) if li == 0 and structural: self._rebuild_dur_all() else: self._rebuild_dur(L) if structural: self.build_grid() self.dirty = True except Exception: pass finally: self._sync_applying = False def _regen_levels(self, L): # called on remote lane= deltas to recompute default accents sub = L['sub']; groups = L['groups']; starts = set(); acc = 0 for gp in groups: starts.add(acc); acc += gp L['steps'] = sum(groups) * sub L['levels'] = [(2 if (i // sub) in starts else 1) if i % sub == 0 else 0 for i in range(L['steps'])] def midi_send(self, note, vel): if self.midi is None: return b = self._note_buf b[0] = 0x90 | ((MIDI_CHANNEL - 1) & 0x0F) b[1] = note & 0x7F; b[2] = vel & 0x7F try: self.midi.write(b) except Exception: pass # ---------- transport ---------- def toggle(self): self.running = not self.running if self.running: self._reset_clock(); self._start_play() self._clock_next = time.monotonic_ns() if MIDI_CLOCK_OUT and MIDI_CLOCK_OUT_TRANSPORT and self.midi is not None: try: self.midi.write(self._start_byte) except Exception: pass else: self.spk.duty_cycle = 0; self._amp(False); self.reset_playheads(); self._log_play() if MIDI_CLOCK_OUT and MIDI_CLOCK_OUT_TRANSPORT and self.midi is not None: try: self.midi.write(self._stop_byte) except Exception: pass self._set_run_dot(); self.draw_meters() self._sync_broadcast("play" if self.running else "stop") def set_bpm(self, v): v = max(5, min(300, v)) if v != self.bpm: self.bpm = v; self._beat_ns = 60_000_000_000 // v self._rebuild_dur_all() self._sync_broadcast("bpm=%d" % v) def goto(self, i): was = self.running if was: self.running = False; self._log_play() self.load(i) if was: self.running = True; self._reset_clock(); self._start_play() self._set_run_dot(); self.draw_meters() self._sync_broadcast("sel=%d/%d" % (self.sl, self.idx)) def tap(self): now = time.monotonic() if not hasattr(self, '_taps'): self._taps = [] self._taps = [t for t in self._taps if now - t < 2.4] self._taps.append(now) if len(self._taps) >= 2: span = (self._taps[-1] - self._taps[0]) / (len(self._taps) - 1) if span > 0: self.set_bpm(round(60 / span)) # ---------- scheduler ---------- def tick(self): now = time.monotonic_ns() if self.spk_off and now >= self.spk_off: self.spk.duty_cycle = 0; self.spk_off = 0; self._amp(False) if self._slaved and (now - self._clock_in_last_t) > 1_000_000_000: self._slaved = False if self.running: fired_best = 0; fired_prio = -1 for li, L in enumerate(self.lanes): if self._advance: break adv = False while now >= L['next']: L['step'] = (L['step'] + 1) % L['steps'] if li == 0: self._m_steps += 1 nb = (self._m_steps - 1) // L['steps'] if nb != self._lastbar: self._lastbar = nb; self._on_new_bar(nb) if self._advance: break if self.ramp and L['steps'] > 0 and not self._slaved: mlen = L['steps'] bar_pos = self._m_steps / mlen seg_bar = (bar_pos % self.bars) if self.bars else bar_pos new_bpm = max(5, min(300, int(self._ramp_base + seg_bar / self.ramp['every'] * self.ramp['amt']))) if new_bpm != self.bpm: self.bpm = new_bpm; self._beat_ns = 60_000_000_000 // new_bpm self._rebuild_dur_all() lvl = 0 if L['mute'] else L['levels'][L['step']] if lvl > 0: p = PRIO.get(lvl, 0) if p > fired_prio: fired_prio = p; fired_best = lvl if not self._muted: self.midi_send(SOUND_GM.get(L['sound'], GM_DEFAULT), MIDI_VEL.get(lvl, 90)) L['next'] += L['durs'][L['step']]; adv = True if adv and li < len(self.lane_pads): self._move_playhead(li, L['step']) if fired_best and not self._muted: if not MUTE_SPEAKER and not (SPEAKER_AUTO_MUTE and self.midi_host): self.click(fired_best) self.flash(fired_best) # Decay the run-dot pulse back to base if self.run_dot_pal[0] == C_RUN_PULSE: self.run_dot_pal[0] = C_RUN_GO if self.running else C_RUN_IDLE self.dirty = True if self._advance: self._advance = False self._do_advance() if self.running and MIDI_CLOCK_OUT and self.midi is not None and not self._slaved: clk = self._clock_byte tick_ns = self._beat_ns // 24 while now >= self._clock_next: try: self.midi.write(clk) except Exception: pass self._clock_next += tick_ns def _on_new_bar(self, bar): if self.bars and self.continue_on and self._next_pending is None and bar == self.bars - 1: self._prepare_next() if self.bars and bar > 0 and bar % self.bars == 0: self._seg_start = time.monotonic() if self.continue_on: if self._next_pending is None: self._prepare_next() if self._next_pending is not None: self._seam_t = self.lanes[0]['next'] self._advance = True t = self.trainer self._muted = bool(t and (t['play'] + t['mute']) and (bar % (t['play'] + t['mute'])) >= t['play']) def _prepare_next(self): items = self.setlists[self.sl]['items'] nxt = (self.idx + 1) % len(items) if nxt == self.idx: return name, prog = items[nxt] gc.collect() try: bpm, lanes, bars, ramp, trainer = parse_program(prog) except MemoryError: gc.collect(); return beat = 60_000_000_000 // max(1, bpm) for L in lanes: sub = max(1, L['sub']); steps = max(1, L['steps']) if L.get('poly'): m = lanes[0]; mbar = beat * (m['steps'] // max(1, m['sub'])) d = mbar // steps; L['durs'] = tuple(d for _ in range(steps)) elif L.get('swing') and sub % 2 == 0: pair = beat // max(1, sub // 2); lng = (pair * 2) // 3; sht = pair // 3 L['durs'] = tuple(lng if (s % sub) % 2 == 0 else sht for s in range(steps)) else: d = beat // sub; L['durs'] = tuple(d for _ in range(steps)) self._next_pending = {'lanes': lanes, 'bpm': bpm, 'bars': bars, 'ramp': ramp, 'trainer': trainer, 'name': name, 'idx': nxt} def _do_advance(self): n = self._next_pending if n is None: return self._next_pending = None self.lanes = n['lanes']; self.bpm = n['bpm']; self.bars = n['bars'] self.ramp = n['ramp']; self.trainer = n['trainer']; self.name = n['name']; self.idx = n['idx'] self._beat_ns = 60_000_000_000 // max(1, self.bpm); self._rebuild_dur_all() self._ramp_base = self.bpm; self._lastbar = -1; self._muted = False; self._m_steps = 0 self._overlay = None while len(self.g_overlay): self.g_overlay.pop() seam = self._seam_t for L in self.lanes: L['next'] = seam; L['step'] = -1 self._need_redraw = True self._heavy_redraw_at = time.monotonic() + 0.6 self._seg_start = time.monotonic() self._set_run_dot() # ---------- inputs (6 buttons - active low) ---------- def _modal_select(self): """A inside a modal: invoke the focused row's action (cycle adjuster) or close on About/Help.""" if self._overlay in ('help', 'about', 'msg', 'log'): self._close_overlay(); return if not self._modal_rows: return i = self._modal_cursor if i < 0 or i >= len(self._modal_rows): return _label, _v, fn = self._modal_rows[i] if fn is None: return # adjusters take one argument (+1); plain actions take none try: fn(1) except TypeError: fn() def _modal_back(self): """B inside a modal: step out to the parent (Settings -> menu, Help -> menu, etc.).""" if self._overlay in ('settings', 'help', 'about', 'log'): self._show_menu(); return self._close_overlay() def _modal_up(self): if self._overlay == 'help': if self._help_page > 0: self._help_page -= 1; self._draw_help() return if self._overlay == 'log': self._log_scroll = max(0, self._log_scroll - 1); self._draw_log_modal(); return n = len(self._modal_rows) if n: self._modal_cursor = (self._modal_cursor - 1) % n; self._redraw_modal() def _modal_down(self): if self._overlay == 'help': if self._help_page < len(HELP_PAGES) - 1: self._help_page += 1; self._draw_help() return if self._overlay == 'log': rows = [e for e in self.log if e.get("name") == self.name] if self._log_scroll + LOG_MENU_ROWS < len(rows): self._log_scroll += 1; self._draw_log_modal() return n = len(self._modal_rows) if n: self._modal_cursor = (self._modal_cursor + 1) % n; self._redraw_modal() def _modal_decrement(self): """Y inside a modal: -1 on the focused row's adjuster (if it is one).""" if self._overlay in ('help', 'about', 'msg', 'log'): return if not self._modal_rows: return i = self._modal_cursor if i < 0 or i >= len(self._modal_rows): return _label, _v, fn = self._modal_rows[i] if fn is None: return try: fn(-1) except TypeError: pass # plain action -> Y does nothing def _redraw_modal(self): if self._overlay == 'menu': self._draw_menu() elif self._overlay == 'settings': self._draw_settings() elif self._overlay == 'help': self._draw_help() elif self._overlay == 'about': self._draw_about() elif self._overlay == 'log': self._draw_log_modal() def _on_btn_X(self): if self._overlay: self._modal_up(); return self.goto(self.idx - 1) def _on_btn_Z(self): if self._overlay: self._modal_down(); return self.goto(self.idx + 1) def _on_btn_Y(self): if self._overlay: self._modal_decrement(); return # tempo down: 1 normally, 5 after long hold step = -5 if (time.monotonic_ns() - self._held_t['Y']) > 1_500_000_000 else -1 self.set_bpm(self.bpm + step) def _on_chord_XZ(self): # X+Z chord -> tempo up (mirrors Y for tempo down) step = 5 if (time.monotonic_ns() - self._chord_xz) > 1_500_000_000 else 1 self.set_bpm(self.bpm + step) def poll(self): now_ns = time.monotonic_ns() # Sample all six buttons (active-low; True = released) a = self.btnA.value; b = self.btnB.value; c = self.btnC.value x = self.btnX.value; y = self.btnY.value; z = self.btnZ.value # ---- A: play/stop ---- if (not a) and self._prev['A']: if self._overlay: self._modal_select() else: self.toggle() # ---- B: tap tempo / modal back ---- if (not b) and self._prev['B']: if self._overlay: self._modal_back() else: self.tap() # ---- C: menu open/close ---- if (not c) and self._prev['C']: if self._overlay: self._close_overlay() else: self._show_menu() # ---- X/Z chord detection (tempo up) ---- x_pressed_now = (not x) and self._prev['X'] z_pressed_now = (not z) and self._prev['Z'] chord_window = 100_000_000 # 100ms if x_pressed_now and (not z) and not self._prev['Z'] and (now_ns - self._held_t['Z']) < chord_window: self._chord_xz = now_ns; self._on_chord_XZ() elif z_pressed_now and (not x) and not self._prev['X'] and (now_ns - self._held_t['X']) < chord_window: self._chord_xz = now_ns; self._on_chord_XZ() else: # ---- single-press X / Z ---- if x_pressed_now: self._held_t['X'] = now_ns; self._next_rep['X'] = now_ns + 350_000_000 if not self._chord_xz: self._on_btn_X() if z_pressed_now: self._held_t['Z'] = now_ns; self._next_rep['Z'] = now_ns + 350_000_000 if not self._chord_xz: self._on_btn_Z() if x and z: self._chord_xz = 0 # both released -> chord state clears # ---- Y: tempo down (or modal decrement) ---- if (not y) and self._prev['Y']: self._held_t['Y'] = now_ns; self._next_rep['Y'] = now_ns + 350_000_000 self._on_btn_Y() # ---- hold-repeat for X / Y / Z ---- if (not x) and not self._prev['X'] and now_ns >= self._next_rep['X']: self._next_rep['X'] = now_ns + 120_000_000 if self._chord_xz: self._on_chord_XZ() else: self._on_btn_X() if (not z) and not self._prev['Z'] and now_ns >= self._next_rep['Z']: self._next_rep['Z'] = now_ns + 120_000_000 if self._chord_xz: self._on_chord_XZ() else: self._on_btn_Z() if (not y) and not self._prev['Y'] and now_ns >= self._next_rep['Y']: self._next_rep['Y'] = now_ns + 120_000_000; self._on_btn_Y() # Commit previous-state self._prev['A'] = a; self._prev['B'] = b; self._prev['C'] = c self._prev['X'] = x; self._prev['Y'] = y; self._prev['Z'] = z # USB-MIDI in: any byte = a host is listening (heartbeat); also assemble SysEx if self.midi_in is not None: try: n = self.midi_in.readinto(self._mbuf) except Exception: n = 0 if n: self.last_midi_in = time.monotonic() self._feed_midi(self._mbuf, n) host = bool(self.last_midi_in) and (time.monotonic() - self.last_midi_in) < 1.0 if host != self.midi_host: self.midi_host = host if host and SPEAKER_AUTO_MUTE: self.spk.duty_cycle = 0; self._amp(False) self._set_run_dot(); self.draw_icons() uc = bool(getattr(supervisor.runtime, "usb_connected", True)) if uc != self.usb_conn: self.usb_conn = uc; self.draw_icons() # ---------- drawing ---------- def draw_bpm(self): if self.bpm == self._displayed_bpm: return self._displayed_bpm = self.bpm # Smaller than the Kit's BPM: FONT_M instead of FONT_L. Kit's FONT_L was ~30px tall at 480 - too big proportionally at 320. self._place(self.g_bpm, str(self.bpm), 0, 30, C_TXT, C_BG, FONT_M, right_edge=WIDTH-8) def draw_status(self): sl = self.setlists[self.sl] # setlist tab line at y=66; muted = built-in, cyan = your own self._place(self.g_idx, "%s %d/%d" % (sl['title'][:13], self.idx + 1, len(sl['items'])), 6, 66, C_MUTE if sl['builtin'] else C_CYAN, C_BG, FONT_S) self._place(self.g_cont, "CONT", 0, 66, C_GREEN if self.continue_on else C_DIM, C_BG, FONT_S, right_edge=WIDTH-6) # track title at y=82 (FONT_M; ~16 px tall, fits above GRID_TOP=104) self._place(self.g_name, self.name[:22], 6, 82, C_TXT, C_BG, FONT_M) def draw_train(self): g = self.g_train while len(g): g.pop() x = 6; y = 52 # ramp / gap-trainer indicators below the meters row, above the setlist tab if self.ramp: up = self.ramp['amt'] >= 0 pts = [(0, 9), (12, 9), (12, 0)] if up else [(0, 0), (0, 9), (12, 9)] g.append(vectorio.Polygon(pixel_shader=solid(C_AMBER), points=pts, x=x, y=y)); x += 16 a = self.ramp['amt']; lbl = ("+%d" % a if a >= 0 else "%d" % a) + "/%db" % self.ramp['every'] tg, w, h = make_text(lbl, FONT_S, C_AMBER, C_BG); tg.x = x; tg.y = y; g.append(tg); x += w + 14 if self.trainer: g.append(rect(x, y, 4, 9, C_CYAN)); g.append(rect(x + 6, y, 4, 9, C_DIM)) x += 14 tg, w, h = make_text("%d/%db" % (self.trainer['play'], self.trainer['mute']), FONT_S, C_CYAN, C_BG) tg.x = x; tg.y = y; g.append(tg) self.dirty = True def draw_icons(self): if self.ic_midi_pal is not None: _recolor(self.ic_midi_pal, C_GREEN if self.midi_host else C_DIM, C_BG) if self.ic_usb_pal is not None: _recolor(self.ic_usb_pal, C_CYAN if self.usb_conn else C_DIM, C_BG) self.dirty = True def _fmt_t(self, s): s = int(s) return "%d:%02d:%02d" % (s // 3600, (s % 3600) // 60, s % 60) if s >= 3600 else "%d:%02d" % (s // 60, s % 60) def draw_meters(self): run = self.running and self.play_start is not None mlen = self.lanes[0]['steps'] if self.lanes else 1 bpb = (self.lanes[0]['steps'] // max(1, self.lanes[0]['sub'])) if self.lanes else 4 el = (time.monotonic() - self._seg_start) if run else 0 mbars = max(0, self._m_steps - 1) // max(1, mlen) cur = ("%d" % ((mbars % self.bars + 1) if self.bars else (mbars + 1))) if run else "-" if self.bars: ts = "%s of %s" % (self._fmt_t(el), self._fmt_t(self.bars * bpb * 60.0 / self.bpm)) bs = "bar %s of %d" % (cur, self.bars) else: ts = self._fmt_t(el); bs = "bar %s" % cur if ts != self._lastTs: self._place(self.g_time, ts, 6, 32, C_TXT, C_BG, FONT_S); self._lastTs = ts if bs != self._lastBs: self._place(self.g_bar, bs, 6, 44, C_MUTE, C_BG, FONT_S); self._lastBs = bs # ---------- pad grid (chunked rebuild; per-pad chunks so audio interleaves) ---------- def _padbase(self, L, s): return 0 if L['mute'] else L['levels'][s] def build_grid(self): self._grid_rebuild_start() while self._grid_li is not None: self._grid_rebuild_step() def _grid_rebuild_start(self): while len(self.g_grid): self.g_grid.pop() self.lane_pads = []; self.lane_lit = [] gc.collect() n = min(len(self.lanes), MAXLANES) top = GRID_TOP; rowh = min(30, ((HEIGHT - 6) - top) // max(1, n)) # more vertical room in portrait -> taller rows px0 = 48; usable = WIDTH - 8 - px0 - 8; gridh = n * rowh # narrower screen -> tighter lane-label column self._grid = {'top': top, 'rowh': rowh, 'px0': px0, 'usable': usable, 'n': n} m = self.lanes[0] if self.lanes else None if m is not None: mbeats = max(1, m['steps'] // max(1, m['sub'])) for bcol in range(mbeats): self.g_grid.append(rect(px0 + 6 + (bcol * usable) // mbeats, top, 1, gridh, C_GRID)) self._grid_n = n self._grid_geo = (top, rowh, px0, usable) self._grid_li = 0 if n > 0 else None self._grid_pi = 0; self._grid_lane_st = None; self._grid_pads = [] self.dirty = True def _grid_rebuild_step(self): li = self._grid_li if li is None: return if li >= self._grid_n or li >= len(self.lanes): self._grid_li = None; return L = self.lanes[li] top, rowh, px0, usable = self._grid_geo y = top + li * rowh; cy = y + rowh // 2 st = self._grid_lane_st if st is None: tg, w, h = make_text((L.get('sound', '') or '?')[:6], FONT_S, C_MUTE, C_BG) # 6-char label fits the 48px lane column tg.x = 4; tg.y = cy - h // 2; self.g_grid.append(tg) steps = L['steps']; sub = L['sub']; stepw = max(1, usable // steps) side = max(5, min(14, stepw - 1, rowh - 6)) # squares can be bigger in portrait rad = max(2, min(side // 2, stepw // 2 - 1)) self._grid_lane_st = (cy, steps, sub, stepw, side, rad) self._grid_pi = 0; self._grid_pads = []; self.dirty = True return cy_, steps, sub, stepw, side, rad = st s = self._grid_pi if s >= steps: self.lane_pads.append(self._grid_pads); self.lane_lit.append(-1) self._grid_pads = []; self._grid_lane_st = None; self._grid_li = li + 1 return cxp = px0 + 6 + (s * usable) // steps pal = self.pad_pal if s % sub == 0: p = vectorio.Rectangle(pixel_shader=pal, width=side, height=side, x=cxp - side // 2, y=cy_ - side // 2) else: p = vectorio.Circle(pixel_shader=pal, radius=rad, x=cxp, y=cy_) p.color_index = self._padbase(L, s); self.g_grid.append(p); self._grid_pads.append(p) self._grid_pi = s + 1 self.dirty = True def _move_playhead(self, li, step): pads = self.lane_pads[li]; prev = self.lane_lit[li] if 0 <= prev < len(pads): pads[prev].color_index = self._padbase(self.lanes[li], prev) if step < len(pads): pads[step].color_index = self._padbase(self.lanes[li], step) + 4 self.lane_lit[li] = step; self.dirty = True def reset_playheads(self): for li, pads in enumerate(self.lane_pads): prev = self.lane_lit[li] if 0 <= prev < len(pads): pads[prev].color_index = self._padbase(self.lanes[li], prev) self.lane_lit[li] = -1 self.dirty = True # ---------- practice log (saved to /history.json) ---------- def _probe_write(self): try: with open("/.wtest", "w") as f: f.write("1") try: os.remove("/.wtest") except Exception: pass return True except OSError: return False def _load_log(self): try: with open("/history.json") as f: return json.load(f).get("log", []) except Exception: return [] def _save_log(self): if not self.can_write: return try: with open("/history.json", "w") as f: json.dump({"log": self.log[:200]}, f) except OSError: self.can_write = False def _start_play(self): self.play_start = time.monotonic(); self.play_bpm = self.bpm; self.play_name = self.name def _log_play(self): if self.play_start is None: return dur = int(time.monotonic() - self.play_start); self.play_start = None if dur < MIN_LOG_SEC: return mlen = self.lanes[0]['steps'] if self.lanes else 1 t = time.localtime() self.log.insert(0, {"t": "%02d:%02d" % (t.tm_hour, t.tm_min), "bpm": self.play_bpm, "dur": dur, "bars": self._m_steps // max(1, mlen), "name": self.play_name}) del self.log[200:] self._save_log() # ---------- USB-MIDI in: SysEx assembler (clock + editor-pushed programs + live-sync) ---------- def _feed_midi(self, buf, n): now_ns = time.monotonic_ns() if MIDI_CLOCK_IN else 0 for i in range(n): b = buf[i] if b == 0xF0: self._sx = bytearray(); self._sxon = True elif b == 0xF7: if self._sxon: self._handle_sysex(self._sx) self._sxon = False elif b == 0xF8 and MIDI_CLOCK_IN: self._slave_tick(now_ns) elif b == 0xFA and MIDI_CLOCK_IN_TRANSPORT and MIDI_CLOCK_IN: self._slave_start() elif b == 0xFB and MIDI_CLOCK_IN_TRANSPORT and MIDI_CLOCK_IN: self._slave_start() elif b == 0xFC and MIDI_CLOCK_IN_TRANSPORT and MIDI_CLOCK_IN: self._slave_stop() elif b >= 0xF8: pass elif self._sxon: if len(self._sx) < 60000: self._sx.append(b) else: self._sxon = False def _slave_tick(self, now_ns): if self._clock_in_last_t == 0: self._clock_in_last_t = now_ns; self._slaved = True; return interval = now_ns - self._clock_in_last_t self._clock_in_last_t = now_ns if interval < 8_300_000 or interval > 500_000_000: return if self._clock_in_avg == 0: self._clock_in_avg = interval else: self._clock_in_avg = (self._clock_in_avg * 7 + interval) // 8 new_bpm = max(5, min(300, int(60_000_000_000 // (self._clock_in_avg * 24)))) if new_bpm != self.bpm: self.bpm = new_bpm; self._beat_ns = 60_000_000_000 // new_bpm; self._rebuild_dur_all() self._slaved = True def _slave_start(self): if not self.running: self.running = True; self._reset_clock(); self._start_play() self._set_run_dot(); self.draw_meters() self._clock_in_last_t = 0; self._clock_in_avg = 0 def _slave_stop(self): if self.running: self.running = False self.spk.duty_cycle = 0; self._amp(False) self.reset_playheads(); self._log_play() self._set_run_dot(); self.draw_meters() self._clock_in_last_t = 0; self._clock_in_avg = 0; self._slaved = False def _handle_sysex(self, sx): if len(sx) < 2 or sx[0] != 0x7D: return cmd = sx[1] if cmd == 0x01 and len(sx) >= 8 and rtc is not None: try: rtc.RTC().datetime = time.struct_time((2000 + sx[2], sx[3], sx[4], sx[5], sx[6], sx[7], 0, -1, -1)) except Exception: pass elif cmd == 0x02: # version query -> reply 0x03 + "X;" if self.midi: payload = DEVICE_ID + ";" + APP_VERSION self.midi.write(bytes([0xF0, 0x7D, 0x03]) + payload.encode() + bytes([0xF7])) elif cmd == 0x40 or cmd == 0x41 or cmd == 0x42 or cmd == 0x43: try: text = "".join(chr(b) if 0x20 <= b < 0x7F else "" for b in sx[2:]) except Exception: return origin = text.split(";", 1)[0] if text else "" if origin == self._sync_origin: return self._sync_armed = True if cmd == 0x40: self._sync_broadcast_full() elif cmd == 0x43: self._sync_armed = False elif cmd == 0x41: parts = text.split(";", 5) if len(parts) >= 6: try: running = parts[2] == "1"; patch = parts[5] self._sync_apply_full(running, patch) except Exception: pass elif cmd == 0x42: parts = text.split(";", 2) if len(parts) >= 3: self._sync_apply_delta(parts[2]) elif cmd == 0x10: try: with open("/programs.json", "wb") as f: f.write(bytes(sx[2:])) self.rebuild_setlists(); self.load(0) self._ack(True) except Exception: self._ack(False) elif cmd == 0x21: try: try: self._fw.close() except Exception: pass self._fw = open("/app.new", "wb"); self._fw_n = 0; self._ack(True) except Exception: self._fw = None; self._ack(False) elif cmd == 0x22: try: if self._fw is None or a2b_base64 is None: raise OSError() self._fw.write(a2b_base64(bytes(sx[2:]))) self._fw.flush() self._fw_n += 1 if self._fw_n % 50 == 0: gc.collect() self._ack(True) except Exception: try: self._fw.close() except Exception: pass self._fw = None; self._ack(False) elif cmd == 0x23: try: try: self._fw.close() except Exception: pass self._fw = None; gc.collect() with open("/app.new", "rb") as f: head = f.read(2) if os.stat("/app.new")[6] < 4000 or len(head) < 2 or head[0] != 0x43 or head[1] != 0x06: try: os.remove("/app.new") except OSError: pass self._ack(False); return try: os.remove("/app.bak") except OSError: pass os.rename("/app.mpy", "/app.bak") os.rename("/app.new", "/app.mpy") open("/trial", "w").close() self._ack(True); time.sleep(0.4); supervisor.reload() except Exception: self._ack(False) def _ack(self, ok): if self.midi: self.midi.write(bytes([0xF0, 0x7D, 0x7F if ok else 0x7E, 0xF7])) def run(self): boot = time.monotonic() try: os.stat("/trial"); committed = False except OSError: committed = True while True: try: self.tick(); self.poll() if self._need_redraw: self._need_redraw = False self.draw_bpm(); self.draw_status(); self.draw_train(); self.draw_meters() if self._heavy_redraw_at and time.monotonic() >= self._heavy_redraw_at: self._heavy_redraw_at = 0 self._grid_rebuild_start() if self._grid_li is not None: self._grid_rebuild_step() tnow = time.monotonic() if tnow >= self._uiNext: self._uiNext = tnow + 0.25; self.draw_meters(); self.draw_bpm() if self._sync_armed and tnow >= self._sync_heartbeat_next: self._sync_broadcast_full() if not committed and tnow - boot > 5: try: os.remove("/trial") except Exception: pass committed = True if self.dirty and tnow >= self._refreshNext: safe = True if self.running and self.lanes: nb = self.lanes[0]['next'] safe = (nb - time.monotonic_ns()) > 10_000_000 or (tnow - self._lastRefresh) > 0.2 if safe: if self.display.refresh(): self.dirty = False self._lastRefresh = tnow; self._refreshNext = tnow + 0.05 else: self._refreshNext = tnow + 0.003 time.sleep(0.0005) except MemoryError: try: print("MemoryError: gc + continue") except Exception: pass gc.collect(); time.sleep(0.05) except Exception as e: try: print("tick error:", e) except Exception: pass time.sleep(0.05) App().run()