metronome/pico-cp/app.py
Me Here 9651e8bc6a pm-kit: fix jumpy jog (stop drawing mid-spin) - smooth steady spin
The ~1s hitch was the once-per-second readout: show_stats() allocates text
bitmaps (GC pause) and display.refresh() blocks the SPI blit, both stalling the
step loop exactly every second. Now the rate is measured silently while spinning
and the readout (steps + peak) is redrawn only when you release; a gc.collect()
on release + before spinning keeps the heap clean. Steady spin does zero display
work -> smooth.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-05 22:13:07 -05:00

2028 lines
123 KiB
Python

# VARASYS PolyMeter - PM_K-1 "Kit" firmware (CircuitPython edition)
# Raspberry Pi Pico (Pico / Pico W / Pico 2) on the 52Pi EP-0172 "Pico Breadboard Kit Plus":
# 3.5" ST7796 320x480 cap-touch (GT911), PSP joystick, WS2812 RGB, speaker, 2 buttons.
#
# WHY CIRCUITPYTHON: the board then mounts as a USB drive (CIRCUITPY) carrying this code, your
# tracks (programs.json) and a copy of the editor - edit on the web, "Save to device" writes
# programs.json here. NOTE: supervisor auto-reload is switched off below (the firmware writes its
# own files), so a save does not restart the code by itself. It also sends USB-MIDI
# (a note per click) so the web editor can play it out the computer's speakers ("Device audio").
# Runs the SAME program strings as metronome.varasys.io.
#
# INSTALL: flash CircuitPython (https://circuitpython.org/board/raspberry_pi_pico/), then copy
# this file as code.py plus programs.json onto the CIRCUITPY drive. It runs on boot.
#
# Fallback: the simpler MicroPython firmware (pico/main.py) is always available - BOOTSEL +
# drag a MicroPython .uf2 to go back. The Pico cannot be bricked.
#
# Untested-panel notes & calibration flags are in CONFIG + pico-cp/README.md.
import board, busio, digitalio, analogio, pwmio, displayio, vectorio, time, json, gc, os, supervisor, math
supervisor.runtime.autoreload = False # we write our own files (log + pushed programs); never self-restart
APP_VERSION = "0.0.24" # firmware version (the A/B updater pushes/compares this)
DEVICE_ID = "K" # 'K' = 52Pi kit, 'X' = Pimoroni Explorer (per docs/livesync-protocol.md and the version reply)
try:
import rtc # set from the editor's clock SysEx so the log has real timestamps
except ImportError:
rtc = None
try: # CircuitPython 9.x
from fourwire import FourWire
from busdisplay import BusDisplay
except ImportError: # CircuitPython 8.x
from displayio import FourWire
from displayio import Display as BusDisplay
try:
import neopixel_write # core module on RP2040 - drives WS2812 with no external library
except ImportError:
neopixel_write = None
try:
import usb_midi # default-enabled on RP2040 - sends a MIDI note per click to the computer
except ImportError:
usb_midi = None
try:
from binascii import a2b_base64 # decode the base64-encoded .mpy pushed by the editor's one-click update
except ImportError:
a2b_base64 = None
# ============================== CONFIG (tweak if needed) ==============================
SPI_BAUD = 62_500_000 # faster SPI = smaller tearing window; drop to 40_000_000 if unstable
LED_BRIGHTNESS = 0.15 # WS2812 sits right next to you - keep it dim (0..1)
MIDI_ENABLED = True # send a USB-MIDI note per click (play via the web editor's "Device audio")
MIDI_CHANNEL = 10 # 1..16 - GM channel 10 is the drum channel (what DAWs auto-route to drums)
MIDI_CLOCK_OUT = False # send 24 PPQN MIDI Clock so a DAW can slave its tempo to the metronome
MIDI_CLOCK_OUT_TRANSPORT = True # also send Start (0xFA) / Stop (0xFC) on play / stop (relevant if MIDI_CLOCK_OUT)
MIDI_CLOCK_IN = False # follow an external 24 PPQN clock (DAW / sequencer becomes the master)
MIDI_CLOCK_IN_TRANSPORT = True # also follow Start (0xFA) / Stop (0xFC) from the master (relevant if MIDI_CLOCK_IN)
MUTE_SPEAKER = False # always silence the on-board speaker
SPEAKER_AUTO_MUTE = True # auto-mute the speaker when a MIDI host is listening (computer plays it instead)
WIDTH, HEIGHT = 320, 480
MADCTL = 0x48 # portrait; 0x48 swaps R/B for this BGR panel (cyan reads cyan). Use 0x40 if reversed.
INVERT_COLORS = True # most ST7796 modules need inversion ON; set False if colours look negative
# Touch (GT911) - flip if taps land wrong:
TOUCH_SWAP_XY = False
TOUCH_INVERT_X = False
TOUCH_INVERT_Y = False
TOUCH_DEBUG = False
# Joystick:
JOY_INVERT_X = False
JOY_INVERT_Y = False
JOY_DEADZONE = 9000
# Pendulum stepper (optional): a 4-input unipolar motor (e.g. ULN2003) swung in time with the beat.
STEPPER_ENABLED = True # set False if no motor is wired (the pins just stay free)
PEND_SWING_DEG = 120 # total swing arc, end-to-end, in degrees - drives BOTH the screen graphic and the arm
STEPPER_STEPS_PER_REV = 4096 # your motor's half-steps per full 360 turn (28BYJ-48 half-step ~4096); maps deg -> steps
STEPPER_ARC = round(STEPPER_STEPS_PER_REV * PEND_SWING_DEG / 360.0) # half-steps for one end-to-end swing
STEPPER_MAX_RATE = 600 # top half-steps/sec the motor sustains smoothly (jog spins here; tune via jog mode)
STEPPER_ACCEL = 1800 # half-steps/sec^2 ramp so it reaches top speed without stalling (lower if it stalls)
STEPPER_JOG_START = 150 # jog kickoff half-steps/sec from rest (keep <= the motor's pull-in rate)
# ----- pins (fixed by the EP-0172 board) -----
P_SCK, P_MOSI, P_CS, P_DC, P_RST = board.GP2, board.GP3, board.GP5, board.GP6, board.GP7
P_SDA, P_SCL = board.GP8, board.GP9
P_RGB, P_SPK, P_BTNA, P_BTNB = board.GP12, board.GP13, board.GP15, board.GP14
P_JOYX, P_JOYY = board.GP26, board.GP27
P_STEP = (board.GP18, board.GP19, board.GP20, board.GP21) # pendulum stepper IN1..IN4 (free pins on the EP-0172)
# ----- BUILT-IN playlists: the standard defaults from the web editor, baked in here so they update with
# firmware and the user can't change/delete them. User playlists live separately in programs.json
# (pushed from the editor) and never touch these. (ASCII only - it's pushed 7-bit + the fonts are ASCII.)
BUILTIN_SETLISTS = [
("Styles", [
("Four-on-the-floor", "t120;kick:4;snare:4=.x.x;hatClosed:4/2"),
("Swing ride", "t150;ride:4/2s;kick:4=X..x;snare:4=.x.x"),
("Purdie half-time shuffle", "t92;kick:4/3=X....x...x..;snare:4/3=..gg.gX.gg.g;hatClosed:4/3=X.xX.xX.xX.x"),
("Samba (2/4)", "t104;tomLow:2/4=x...X...;hatClosed:2/4;woodblock:2/4=X.xx.xX."),
("Nanigo (6/8 bembe)", "t130;cowbell:4/3=X.xx.x.xx.x.;kick:4/3=X.....X.....;hatClosed:4/3=..x..x..x..x"),
("6/8 groove", "t100;kick:3+3=x..x..;snare:3+3=...x..;hatClosed:3+3/2"),
("7/8 (2+2+3)", "t130;kick:2+2+3=x..x..x;hatClosed:2+2+3/2"),
("5/4 (3+2)", "t112;kick:3+2=x..x.;snare:3+2=..x..;hatClosed:3+2/2"),
]),
("Practice", [
("5 over 4 polyrhythm", "t100;kick:4;claves:5~"),
("3 over 2 hemiola", "t96;woodblock:2;cowbell:3~"),
("2 & 4 & 3 over one bar", "t100;kick:3;cowbell:2~;claves:4~"),
("Triplet hats", "t100;kick:4;snare:4=.x.x;hatClosed:4/3"),
("Accents - cycle the pads", "t92;kick:4=X..X;snare:4=.X.X;hatClosed:4/2"),
("Tempo builder 80 up", "t80;woodblock:4;rmp80/4/4"),
("Gap trainer (play 2 / rest 2)", "t100;kick:4;hatClosed:4/2;tr2/2"),
]),
("Song (continuous)", [ # ~4-bar sections; with Continue on they roll one into the next
("Intro - hats & kick", "t88;b4;kick:4=X.x.;hatClosed:4/2=gggggggg"),
("Groove in - backbeat", "t88;b4;kick:4=X.x.;snare:4=.X.X;hatClosed:4/2"),
("Half-time shuffle", "t92;b4;kick:4/3=X....x...x..;snare:4/3=..gg.gX.gg.g;hatClosed:4/3=X.xX.xX.xX.x"),
("Build - ramp 92-120", "t92;b4;rmp92/4/2;kick:4;snare:4=.X.X;hatClosed:4/2"),
("Four-on-the-floor (909)", "t124;b4;kick909:4;clap909:4=.X.X;hat909:4/2=.X.X.X.X"),
("Samba break (2/4)", "t116;b4;tomLow:2/4=x...X...;hatClosed:2/4;woodblock:2/4=X.xx.xX."),
("Peak - 16ths", "t132;b4;kick:4=X..x;snare:4=.X.X;hatClosed:4/4"),
("Outro - ramp down", "t132;b4;rmp132/-7/1;kick:4=X..x;hatClosed:4/2=gggggggg"),
]),
]
# ============================== COLOURS (0xRRGGBB; displayio handles 565) ==============================
C_BG, C_PANEL, C_TXT, C_MUTE = 0x06090E, 0x1C222C, 0xC7D0DB, 0x788494
C_CYAN, C_AMBER, C_GREEN, C_DIM = 0x0AB3F7, 0xFF9B2E, 0x2FE07A, 0x243240
C_BTN = 0x1C222C
LEVEL_RGB = {2: (255, 110, 0), 1: (0, 150, 255), 3: (130, 70, 255)} # beat pulse: accent / normal / ghost
LED_IDLE = (0, 80, 0) # RGB LED resting colour when stopped: dim green ("on")
LED_RUN = (110, 0, 0) # RGB LED resting colour when playing: dim red (beats pulse brighter on top)
# voice -> General-MIDI note (USB-MIDI bridge), and level -> MIDI velocity
SOUND_GM = {"kick":36,"kick808":36,"kick909":36, "snare":38,"snare808":38,"snare909":38,
"clap":39,"clap808":39,"clap909":39, "rim":37, "hatClosed":42,"hat808":42,"hat909":42,
"hatOpen":46,"openHat808":46, "ride":51,"ride909":51, "crash":49,"crash909":49,
"tomLow":41,"tom808":45,"tomMid":45,"tomHigh":48, "tambourine":54,
"cowbell":56,"cowbell808":56, "woodblock":76,"jamblock":76, "claves":75, "beep":37}
GM_DEFAULT = 37
SOUNDS = ["kick", "snare", "clap", "rim", "hatClosed", "hatOpen", "ride", "crash", # lane-editor sound cycle
"tomLow", "tomMid", "tomHigh", "cowbell", "woodblock", "claves", "tambourine", "beep"]
HELP_PAGES = ( # paginated on-device help (rendered in _draw_help)
("Transport & Navigation", (
"Joystick up/down: tempo +/-1 (5 if held)",
"Joystick left/right: prev/next track",
"Button A: play / stop",
"Button B: tap tempo",
"Tap set-list tab: switch playlist",
"Tap CONT (top of tab): auto-advance",
"Tap hamburger: this menu",
)),
("Editing", (
"Tap a beat: off -> normal -> accent -> ghost",
"Tap an instrument name: lane editor",
"Lane editor: sound / beats / sub / swing /",
" mute, plus + Lane / Remove",
"Title turns red: unsaved edits",
"Tap red title: Save or Revert",
"Built-in edits save into 'My edits'",
)),
("Status & Hardware", (
"MIDI badge green: laptop listening",
"USB badge cyan: connected to a computer",
"RGB LED: green=stop / red=play + pulse",
"Squares = main beats, circles = subs",
"Ramp arrow: track has a tempo ramp",
"Gap symbol: silent rest bars",
"Practice log: time / BPM / dur / bars",
)),
)
MIDI_VEL = {2: 120, 1: 90, 3: 45} # accent / normal / ghost
MAXLANES = 5 # lanes shown on the pad grid (extras still play)
GRID_TOP = 158 # top of the pad grid (leaves room for time/bar/ramp/tab rows)
LOG_TOP, LOG_ROWH, LOG_ROWS = 302, 16, 9 # practice-history log area (below the pad grid)
MIN_LOG_SEC = 5 # don't log plays shorter than this
# On-screen pendulum (drawn over the log area while playing): inverted-metronome style - pivot near
# the bottom, weighted bob swinging up top. Mirrors the physical stepper arm (same beat phase).
PEND_PX = WIDTH // 2 # pivot x (screen centre)
PEND_PY = HEIGHT - 16 # pivot y (near the bottom edge)
PEND_LEN = 140 # arm length (px)
PEND_THETA = math.radians(PEND_SWING_DEG) / 2.0 # half-swing angle (radians), derived from PEND_SWING_DEG
PAD_DIM = (0x10161E, 0x0A3A52, 0x4A3010, 0x2A1D4A) # idle pad: mute / normal / accent / ghost
PAD_LIT = (0x39414D, 0x0AB3F7, 0xFF9B2E, 0x967BFF) # playhead pad: mute / normal / accent / ghost
C_GRID = 0x1A2330 # faint vertical beat gridlines (beats line up across lanes)
C_RED = 0xFF5A5A # unsaved-edits (dirty) track title
# WS2812 RGB LED - self-contained via the core neopixel_write module (no external library)
class RGB:
    """One WS2812 pixel driven directly through the core ``neopixel_write`` module.

    ``ok`` is False when the module is unavailable or a write has failed; every
    later ``set()`` call is then a silent no-op.
    """

    def __init__(self, pin):
        self.ok = neopixel_write is not None
        if not self.ok:
            return  # nothing to drive - leave the pin unclaimed
        self.io = digitalio.DigitalInOut(pin)
        self.io.direction = digitalio.Direction.OUTPUT
        self.buf = bytearray(3)

    def set(self, r, g, b):
        """Show colour (r, g, b), each scaled by LED_BRIGHTNESS."""
        if not self.ok:
            return
        # WS2812 wants GRB order; scale down so it isn't blinding
        for slot, level in enumerate((g, r, b)):
            self.buf[slot] = int(level * LED_BRIGHTNESS)
        try:
            neopixel_write.neopixel_write(self.io, self.buf)
        except Exception:
            self.ok = False  # give up on the LED rather than crash the metronome
# Pendulum stepper - a 4-input unipolar motor (e.g. ULN2003) driven as a metronome arm. Half-step
# 8-phase sequence (smoothest). Non-blocking: the app steps it toward a beat-derived target each loop.
HALF_SEQ = ((1, 0, 0, 0), (1, 1, 0, 0), (0, 1, 0, 0), (0, 1, 1, 0),
            (0, 0, 1, 0), (0, 0, 1, 1), (0, 0, 0, 1), (1, 0, 0, 1))


class Pendulum:
    """Half-step driver for a 4-coil stepper used as the metronome arm.

    ``ok`` is True only when exactly four output pins were claimed. When it is
    False the position bookkeeping still runs but no coil is ever touched, so a
    missing or partly-claimed motor can never raise from the step loop.
    """

    def __init__(self, pins):
        self.io = []
        try:
            for p in pins:
                d = digitalio.DigitalInOut(p)
                d.direction = digitalio.Direction.OUTPUT
                d.value = False
                self.io.append(d)
            self.ok = len(self.io) == 4
        except Exception:
            self.ok = False
        self.phase = 0  # index into HALF_SEQ (advances +/-1 per half-step)
        self.pos = 0    # arm position in half-steps from the 'home' extreme

    def _write(self):
        """Energise the coils for the current phase (no-op when the motor is unusable)."""
        if not self.ok:
            return  # fewer than 4 pins claimed: indexing self.io[0..3] would raise
        pat = HALF_SEQ[self.phase & 7]
        for coil, level in zip(self.io, pat):
            coil.value = bool(level)

    def step_toward(self, target):
        """Move at most one half-step toward ``target`` (does nothing when already there)."""
        if target > self.pos:
            self.phase += 1
            self.pos += 1
            self._write()
        elif target < self.pos:
            self.phase -= 1
            self.pos -= 1
            self._write()

    def spin(self, cw):
        """One free half-step either way (jog/test mode); ``pos`` is left unchanged."""
        self.phase += 1 if cw else -1
        self._write()

    def release(self):
        """De-energise all coils (cool + quiet when idle)."""
        for d in self.io:
            d.value = False
# ============================== ANTI-ALIASED FONTS (binary blobs on the drive; see pico/gen_font.py) ==============================
def load_font(path):
    """Read a binary font blob and index its glyph table.

    Layout as read here: byte 0 is the glyph count, followed by one 7-byte
    record per glyph (codepoint hi, codepoint lo, width, height, signed x
    offset, top, advance), followed by the packed pixel data (two 4-bit
    pixels per byte, each glyph rounded up to a whole byte).

    Returns ``(glyphs, blob)`` where ``glyphs[codepoint]`` is
    ``(w, h, xoff, top, adv, pixel_offset)``.
    """
    with open(path, "rb") as f:
        blob = f.read()
    count = blob[0]
    glyphs = {}
    pixoff = 1 + 7 * count  # pixel data starts right after the record table
    for n in range(count):
        rec = 1 + 7 * n
        w = blob[rec + 2]
        h = blob[rec + 3]
        xoff = blob[rec + 4]
        if xoff > 127:
            xoff -= 256  # stored as a signed byte
        codepoint = (blob[rec] << 8) | blob[rec + 1]
        glyphs[codepoint] = (w, h, xoff, blob[rec + 5], blob[rec + 6], pixoff)
        pixoff += (w * h + 1) // 2
    return (glyphs, blob)
FONT_S = load_font("/font_s.bin") # small - pad-grid lane labels
FONT_M = load_font("/font_m.bin") # labels / buttons
FONT_L = load_font("/font_l.bin") # big BPM
gc.collect()
def _blend(bg, fg, i):
t = i * 17
r = (((bg >> 16) & 0xFF)*(255-t) + ((fg >> 16) & 0xFF)*t) // 255
g = (((bg >> 8) & 0xFF)*(255-t) + ((fg >> 8) & 0xFF)*t) // 255
b = ((bg & 0xFF)*(255-t) + (fg & 0xFF)*t) // 255
return (r << 16) | (g << 8) | b
def make_text(s, font, fg, bg):
    """Render a string into a displayio TileGrid (anti-aliased via a 16-step blend palette).

    ``font`` is the ``(glyphs, blob)`` pair from ``load_font``. Characters with no
    glyph are skipped. Returns ``(tilegrid, width, height)``; width and height are
    at least 1 so an empty string still yields a valid bitmap.
    """
    glyphs, blob = font

    # Pass 1: measure - total advance, plus the tallest vertical extent of any inked glyph.
    width = 0
    top0 = None
    bottom = 0
    for ch in s:
        g = glyphs.get(ord(ch))
        if not g:
            continue
        width += g[4]
        if g[1]:
            if top0 is None or g[3] < top0:
                top0 = g[3]
            if g[3] + g[1] > bottom:
                bottom = g[3] + g[1]
    if top0 is None:
        top0 = 0
    w = max(1, width)
    h = max(1, bottom - top0)

    gc.collect()  # free what we can before the bitmap allocation
    bmp = displayio.Bitmap(w, h, 16)
    pal = displayio.Palette(16)
    for i in range(16):
        pal[i] = _blend(bg, fg, i)

    # Pass 2: unpack each glyph's 4-bit coverage values into the bitmap.
    pen = 0
    for ch in s:
        g = glyphs.get(ord(ch))
        if not g:
            continue
        gw, gh, xoff, gtop, adv, off = g
        left = pen + xoff
        row0 = gtop - top0
        k = 0  # linear pixel index inside the glyph
        for row in range(row0, row0 + gh):
            for col in range(gw):
                packed = blob[off + (k >> 1)]
                nib = (packed & 0xF) if (k & 1) else (packed >> 4)  # even index = high nibble
                k += 1
                x = left + col
                if nib and 0 <= x < w and 0 <= row < h:
                    bmp[x, row] = nib
        pen += adv
    return displayio.TileGrid(bmp, pixel_shader=pal), w, h
# ---- single-image alpha assets (logo, status icons) - blit like a one-off glyph; see gen_assets.py ----
def load_alpha(path):
    """Load a single-image alpha asset.

    Returns ``(w, h, blob)`` with pixels starting at offset 2 (two 4-bit pixels
    per byte), or None when the file is missing, unreadable, zero-sized or too
    short for its declared size, so the caller can fall back to text instead of
    failing later while unpacking pixels.
    """
    try:
        with open(path, "rb") as f:
            blob = f.read()
        w, h = blob[0], blob[1]
    except Exception:
        return None  # missing/corrupt -> caller falls back to text (no crash)
    # make_glyph reads blob[2 + (k >> 1)] for k < w*h, so that many bytes must exist.
    if w == 0 or h == 0 or len(blob) < 2 + (w * h + 1) // 2:
        return None
    return (w, h, blob)
def make_glyph(asset, fg, bg):
    """Turn a ``load_alpha`` asset into a TileGrid tinted from ``bg`` to ``fg``.

    Returns ``(tilegrid, palette, w, h)``; the palette is handed back so the
    caller can re-tint the image later without rebuilding the bitmap.
    """
    w, h, blob = asset
    gc.collect()
    bmp = displayio.Bitmap(w, h, 16)
    pal = displayio.Palette(16)
    for i in range(16):
        pal[i] = _blend(bg, fg, i)
    k = 0  # linear pixel index; pixel data begins at byte 2
    for y in range(h):
        for x in range(w):
            packed = blob[2 + (k >> 1)]
            nib = (packed & 0xF) if (k & 1) else (packed >> 4)
            if nib:
                bmp[x, y] = nib
            k += 1
    return displayio.TileGrid(bmp, pixel_shader=pal), pal, w, h
def _recolor(pal, fg, bg):
    """Re-tint a stored 16-entry asset palette in place (tear-free)."""
    step = 0
    while step < 16:
        pal[step] = _blend(bg, fg, step)
        step += 1
LOGO = load_alpha("/logo.bin") # VARASYS wordmark (no tagline)
ICON_MIDI = load_alpha("/midi.bin") # DIN-5: green when a MIDI host is listening
ICON_USB = load_alpha("/usb.bin") # trident: lit when USB-connected to a computer
gc.collect()
# ============================== POLYMETER ENGINE (same semantics as the web/MicroPython) ==============================
PAT = {'X': 2, 'x': 1, 'g': 3, '.': 0, '-': 0, '_': 0,
'f': 1, 'F': 2, 'd': 1, 'D': 2, 'z': 1, 'Z': 2} # ornament hits: UPPER = accented, lower = normal
ORN = {'f': 1, 'F': 1, 'd': 2, 'D': 2, 'z': 3, 'Z': 3} # ornament type: 0 none / 1 flam / 2 drag / 3 roll
PRIO = {2: 3, 1: 2, 3: 1}
# General-MIDI percussion note numbers -> voice names (so a lane can be typed as "36:4"); matches the web GM_NUM
GM_NUM = {35: "kick", 36: "kick", 37: "rim", 38: "snare", 39: "clap", 40: "snare", 41: "tomLow", 42: "hatClosed",
43: "tomLow", 44: "hatClosed", 45: "tomMid", 46: "hatOpen", 47: "tomMid", 48: "tomHigh", 49: "crash",
50: "tomHigh", 51: "ride", 53: "ride", 54: "tambourine", 56: "cowbell", 75: "claves", 76: "woodblock", 77: "woodblock"}
def _euclid(k, n, rot): # even distribution: k hits over n steps, rotated (matches web euclid())
n = max(1, n); k = max(0, min(n, k)); rot = ((rot % n) + n) % n
return [1 if ((((i + rot) % n) * k) % n) < k else 0 for i in range(n)]
def parse_program(s):
    """Parse a ';'-separated program string.

    Recognised tokens: ``t<n>`` tempo, ``b<n>`` segment length in bars,
    ``rmp<start>/<amount>/<everyBars>`` tempo ramp, ``tr<play>/<mute>`` gap
    trainer, ``rep=<n>``, ``end=stop|next|<+/-N>``, and lane tokens (anything
    else containing ':'). Malformed tokens are ignored.

    Returns ``(bpm, lanes, bars, ramp, trainer, rep, end)`` with bpm clamped to
    5..300 and a single "beep:4" lane when no lane was given.
    """
    bpm = 120
    bars = 0
    lanes = []
    ramp = trainer = rep = end = None
    end_words = {'stop': 'stop', 'next': 1}
    for raw in s.strip().split(';'):
        tok = raw.strip()
        if not tok:
            continue
        if tok[1:].isdigit():
            if tok[0] == 't':
                bpm = int(tok[1:])
                continue
            if tok[0] == 'b':  # (lane sounds like "beep:4" have a ':' -> not matched here)
                bars = int(tok[1:])
                continue
        if tok.startswith('rmp'):  # amount may be negative
            parts = tok[3:].split('/')
            if len(parts) == 3:
                try:
                    ramp = {'start': int(parts[0]), 'amt': int(parts[1]), 'every': max(1, int(parts[2]))}
                except ValueError:
                    pass
            continue
        if tok.startswith('tr') and '/' in tok and ':' not in tok:
            parts = tok[2:].split('/')
            if len(parts) == 2:
                try:
                    trainer = {'play': max(0, int(parts[0])), 'mute': max(0, int(parts[1]))}
                except ValueError:
                    pass
            continue
        if tok.startswith('rep='):  # cycles before the end-action fires
            try:
                rep = max(1, int(tok[4:]))
            except ValueError:
                pass
            continue
        if tok.startswith('end='):  # absent = loop forever
            v = tok[4:]
            if v in end_words:
                end = end_words[v]
            else:
                try:
                    end = int(v)  # relative goto
                except ValueError:
                    pass
            continue
        if ':' in tok:
            lane = _parse_lane(tok)
            if lane:
                lanes.append(lane)
    if not lanes:
        lanes = [_parse_lane("beep:4")]
    return max(5, min(300, bpm)), lanes, bars, ramp, trainer, rep, end
def _parse_lane(tok):
    """Parse one lane token, e.g. ``kick:3+2/2s=X.x..@-6~!`` or ``hat:4(3,8,1)``.

    Grammar as handled here: ``sound:groups[/sub[s]][(k[,n[,rot]])][=pattern][@gain][~][!]``
    where ``~`` marks a poly lane, ``!`` a muted lane and a numeric sound is a
    General-MIDI note alias.

    Returns a lane dict with keys sound, sub, swing, steps, levels, orns, poly,
    mute, groups, gain. The result always has at least one step: zero-sized
    groups, a zero subdivision and a zero euclidean length are clamped, so
    ``steps == len(levels) >= 1`` and nothing divides by zero.
    """
    poly = '~' in tok
    mute = '!' in tok
    tok = tok.replace('~', '').replace('!', '')
    gain = ''
    if '@' in tok:
        tok, _, g = tok.partition('@')
        gain = '@' + g  # preserve @db for round-trip (engine ignores it)
    sound, _, rest = tok.partition(':')
    if sound.isdigit():
        sound = GM_NUM.get(int(sound), sound)  # GM note-number alias (e.g. 36 -> kick)

    euc = None  # euclidean (k,n,rot) shorthand - pulled before the =/ splits
    lp = rest.find('(')
    if lp >= 0:
        rp = rest.find(')', lp)
        if rp > lp:
            nums = [int(x) for x in rest[lp + 1:rp].split(',') if x.strip().isdigit()]
            rest = rest[:lp] + rest[rp + 1:]
            if nums:
                euc = nums

    pattern = None
    if '=' in rest:
        rest, _, pattern = rest.partition('=')

    sub = 1
    swing = False
    if '/' in rest:
        rest, _, sd = rest.partition('/')
        swing = sd.endswith('s')  # "/2s" = swung eighths
        sd = sd.rstrip('s')
        sub = max(1, int(sd)) if sd.isdigit() else 1  # "/0" would give a zero-step lane

    groups = []
    for part in rest.split('+'):
        if part.isdigit() and int(part) > 0:  # drop zero-length groups ("kick:0")
            groups.append(int(part))
    if not groups:
        groups = [4]
    beats = sum(groups)
    starts = set()
    acc = 0
    for gp in groups:
        starts.add(acc)
        acc += gp

    if euc:  # euclidean: k hits over n steps, first hit accented
        k = euc[0]
        n = max(1, euc[1]) if len(euc) > 1 else beats * sub  # same lower bound _euclid applies
        rot = euc[2] if len(euc) > 2 else 0
        if len(euc) > 1:
            if n % beats == 0:
                sub = n // beats
            else:
                groups = [n]
                sub = 1
        steps = n
        levels = []
        first = True
        for hit in _euclid(k, n, rot):
            if hit:
                levels.append(2 if first else 1)
                first = False
            else:
                levels.append(0)
        orns = [0] * len(levels)  # euclid hits carry no ornament
    elif pattern:
        steps = beats * sub
        levels = [PAT.get(ch, 0) for ch in pattern]
        orns = [ORN.get(ch, 0) for ch in pattern]  # per-step flam/drag/roll, parallel to levels
        if len(levels) < steps:  # pad a short pattern with rests
            levels += [0] * (steps - len(levels))
            orns += [0] * (steps - len(orns))
        steps = len(levels)  # a longer pattern extends the lane
    else:
        steps = beats * sub
        levels = []
        for i in range(steps):
            if i % sub == 0:
                levels.append(2 if (i // sub) in starts else 1)  # beat: accent on group starts
            else:
                levels.append(1)  # off-beat subdivisions sound at normal (grouping IS the accent map)
        orns = [0] * steps

    if sound not in SOUND_GM:
        sound = "beep"  # unknown sound -> beep (match web)
    return {'sound': sound, 'sub': sub, 'swing': swing, 'steps': steps, 'levels': levels, 'orns': orns,
            'poly': poly, 'mute': mute, 'groups': groups, 'gain': gain}
PAT_CH = {2: 'X', 1: 'x', 3: 'g', 0: '.'} # level -> pattern char (inverse of PAT)
ORN_CH = {1: ('f', 'F'), 2: ('d', 'D'), 3: ('z', 'Z')} # ornament -> (normal, accented) pattern char
def _cell_ch(v, o): # (level, ornament) -> one pattern char
if o in ORN_CH: return ORN_CH[o][1 if v >= 2 else 0]
return PAT_CH.get(v, '.')
def lane_to_str(L):
    """Serialize a lane dict back to the share grammar (round-trips through the lane parser).

    Output order: ``sound:groups[/sub[s]]=pattern[@gain][~][!]``. The pattern is
    always written out explicitly, one character per entry of ``levels``.
    """
    parts = [L['sound'], ':', '+'.join(str(g) for g in L.get('groups', [4]))]
    if L['sub'] != 1 or L['swing']:
        parts.append('/' + str(L['sub']))
        if L['swing']:
            parts.append('s')
    orns = L.get('orns') or [0] * len(L['levels'])
    cells = []
    for i, level in enumerate(L['levels']):
        orn = orns[i] if i < len(orns) else 0  # tolerate a short ornament list
        cells.append(_cell_ch(level, orn))
    parts.append('=' + ''.join(cells))
    parts.append(L.get('gain', ''))
    if L['poly']:
        parts.append('~')
    if L['mute']:
        parts.append('!')
    return ''.join(parts)
_ALNUM = "abcdefghijklmnopqrstuvwxyz0123456789"
def _slkey(t): # normalise a title for built-in/user de-dup (no str.isalnum on CircuitPython)
return "".join(c for c in t.lower() if c in _ALNUM)
def load_user_setlists(path="/programs.json"):
    """Load the user playlists pushed by the editor.

    Accepts the new ``{"setlists": [{"title", "programs": [...]}]}`` form or the
    old flat ``{"programs": [...]}`` form (one list). Built-ins are baked into
    BUILTIN_SETLISTS and never come from here.

    Returns a list of ``(title, [(name, prog), ...])``. A missing or unparsable
    file yields ``[]``. Malformed entries are skipped one at a time, so one bad
    playlist no longer discards the ones after it. Titles and names are always
    returned as str and programs are only accepted when they are non-empty str,
    so later string handling cannot fail on a number or null.
    """
    try:
        with open(path) as f:
            data = json.load(f)
    except Exception as e:  # best effort: boot with the built-ins only
        print("programs.json:", e)
        return []
    if not isinstance(data, dict):
        print("setlists:", "top-level JSON value is not an object")
        return []

    def label(value, fallback):
        if isinstance(value, str):
            return value
        return fallback if value is None else str(value)

    def items_of(programs):
        rows = []
        if not isinstance(programs, list):
            return rows
        for p in programs:
            if not isinstance(p, dict):
                continue
            prog = p.get("prog")
            if isinstance(prog, str) and prog:
                rows.append((label(p.get("name"), "?"), prog))
        return rows

    out = []
    if isinstance(data.get("setlists"), list):
        for sl in data["setlists"]:
            if not isinstance(sl, dict):
                continue
            rows = items_of(sl.get("programs", []))
            if rows:
                out.append((label(sl.get("title"), "My set list"), rows))
    elif isinstance(data.get("programs"), list):
        rows = items_of(data["programs"])
        if rows:
            out.append((label(data.get("title"), "My set list"), rows))
    return out
# ============================== GT911 TOUCH ==============================
class GT911:
    """Minimal single-touch reader for a GT911 on a shared I2C bus.

    The address is found by scanning: 0x5D or 0x14 is preferred, otherwise the
    first device found is used. ``addr`` stays None when the bus is empty, and
    ``read()`` then always returns None.
    """

    def __init__(self, i2c):
        self.i2c = i2c
        self.addr = None
        while not i2c.try_lock():
            pass
        try:
            found = i2c.scan()
        finally:
            i2c.unlock()
        for a in (0x5D, 0x14):
            if a in found:
                self.addr = a
                break
        if self.addr is None and found:
            self.addr = found[0]

    def _rd(self, reg, n):
        """Read ``n`` bytes starting at 16-bit register ``reg``."""
        b = bytearray(n)
        while not self.i2c.try_lock():
            pass
        try:
            self.i2c.writeto(self.addr, bytes([reg >> 8, reg & 0xFF]))
            self.i2c.readfrom_into(self.addr, b)
        finally:
            self.i2c.unlock()
        return b

    def _wr(self, reg, val):
        """Write one byte ``val`` to 16-bit register ``reg``."""
        while not self.i2c.try_lock():
            pass
        try:
            self.i2c.writeto(self.addr, bytes([reg >> 8, reg & 0xFF, val]))
        finally:
            self.i2c.unlock()

    def read(self):
        """Return the current touch as ``(x, y)`` in screen pixels, or None.

        None means: no controller, no fresh sample, no finger down, a point
        outside the screen, or an I2C error. Bus errors never propagate; a
        failed coordinate read is treated like "no touch", and the status
        register is still cleared so the controller can post the next sample.
        """
        if self.addr is None:
            return None
        try:
            st = self._rd(0x814E, 1)[0]
        except OSError:
            return None
        if not (st & 0x80):  # bit 7 clear -> nothing new to read
            return None
        pt = None
        if st & 0x0F:  # low nibble = number of touch points
            try:
                b = self._rd(0x8150, 4)
                pt = self._map(b[0] | (b[1] << 8), b[2] | (b[3] << 8))
            except OSError:
                pt = None
        try:
            self._wr(0x814E, 0)  # acknowledge the sample
        except OSError:
            pass
        return pt

    def _map(self, tx, ty):
        """Apply the TOUCH_* orientation flags; None when the point is off-screen."""
        if TOUCH_DEBUG:
            print("touch raw", tx, ty)
        if TOUCH_SWAP_XY:
            tx, ty = ty, tx
        if TOUCH_INVERT_X:
            tx = WIDTH - 1 - tx
        if TOUCH_INVERT_Y:
            ty = HEIGHT - 1 - ty
        if 0 <= tx < WIDTH and 0 <= ty < HEIGHT:
            return (tx, ty)
        return None
# ============================== DISPLAY SETUP ==============================
def st7796_init():
    """Build the ST7796 init sequence passed to BusDisplay.

    Each entry is: command byte, length byte (bit 7 set = a delay byte in ms
    follows the data), then the data bytes. MADCTL and INVERT_COLORS from
    CONFIG are spliced in.
    """
    invert = b'\x21\x00' if INVERT_COLORS else b'\x20\x00'
    chunks = (
        b'\x01\x80\x78',                    # SWRESET + 120ms
        b'\x11\x80\x78',                    # SLPOUT + 120ms
        b'\xF0\x01\xC3', b'\xF0\x01\x96',   # command-set unlock
        bytes([0x36, 0x01, MADCTL]),
        b'\x3A\x01\x55',                    # 16bpp
        b'\xB4\x01\x01',
        b'\xB6\x03\x80\x02\x3B',
        b'\xE8\x08\x40\x8A\x00\x00\x29\x19\xA5\x33',
        b'\xC1\x01\x06', b'\xC2\x01\xA7',
        b'\xC5\x81\x18\x78',                # VCOM + 120ms
        b'\xE0\x0E\xF0\x09\x0B\x06\x04\x15\x2F\x54\x42\x3C\x17\x14\x18\x1B',
        b'\xE1\x0E\xE0\x09\x0B\x06\x04\x03\x2B\x43\x42\x3B\x16\x14\x17\x1B',
        b'\xF0\x01\x3C', b'\xF0\x81\x69\x78',  # lock + 120ms
        invert,
        b'\x29\x80\x32',                    # DISPON + 50ms
    )
    return b''.join(chunks)
def make_display():
    """Release any existing display, then bring up the ST7796 over SPI with auto-refresh off."""
    displayio.release_displays()
    spi_bus = busio.SPI(clock=P_SCK, MOSI=P_MOSI)
    panel_bus = FourWire(spi_bus, command=P_DC, chip_select=P_CS, reset=P_RST, baudrate=SPI_BAUD)
    init_sequence = st7796_init()
    # auto_refresh=False: nothing is pushed to the panel until refresh() is called
    return BusDisplay(panel_bus, init_sequence, width=WIDTH, height=HEIGHT, auto_refresh=False)
def solid(color):
    """Return a one-entry displayio palette holding ``color`` (a flat-fill pixel shader)."""
    pal = displayio.Palette(1)
    pal[0] = color
    return pal
def rect(x, y, w, h, color):
    """Return a filled vectorio rectangle at (x, y), ``w`` by ``h`` pixels, in ``color``."""
    fill = solid(color)
    return vectorio.Rectangle(pixel_shader=fill, x=x, y=y, width=w, height=h)
# ============================== APP ==============================
class App:
def __init__(self):
    """Claim the hardware, initialise all playback/UI/sync state, then draw the first screen.

    Statement order matters: the set lists must exist before load(0), and the
    scene groups must exist before anything is drawn into them.
    """
    # --- display, touch, USB-MIDI ports ---
    self.display = make_display()
    self.i2c = busio.I2C(scl=P_SCL, sda=P_SDA, frequency=400_000)
    self.touch = GT911(self.i2c)
    # ports[1] is used for sending, ports[0] for receiving; both are None when MIDI is off/unavailable
    self.midi = usb_midi.ports[1] if (MIDI_ENABLED and usb_midi and len(usb_midi.ports) > 1) else None
    self.midi_in = usb_midi.ports[0] if (MIDI_ENABLED and usb_midi and len(usb_midi.ports) > 0) else None
    self._mbuf = bytearray(64); self.midi_host = False; self.last_midi_in = 0.0
    self._sx = bytearray(); self._sxon = False # USB-MIDI SysEx assembler (clock + pushed programs)
    self._fw = None; self._fw_n = 0; self._fw_pushing = False # chunked firmware transfer state + bus-quiet flag
    # --- LED, speaker, pendulum, buttons, joystick ---
    self.led = RGB(P_RGB)
    self.spk = pwmio.PWMOut(P_SPK, frequency=1600, variable_frequency=True, duty_cycle=0)
    self.spk_off = 0
    self.pend = Pendulum(P_STEP) if STEPPER_ENABLED else None # beat-synced pendulum arm (optional)
    self._pend_beat0 = 0; self._pend_dir = 1; self._pend_last = 0; self._pend_on = False
    self._pendNext = 0.0 # ~30fps cadence for the on-screen pendulum
    self.btnA = self._btn(P_BTNA); self.btnB = self._btn(P_BTNB)
    self._aPrev = True; self._bPrev = True
    # buttons are pulled up (see _btn), so .value is False while a button is held
    self._jog = (not self.btnA.value) and (not self.btnB.value) # both held at boot -> hidden jog/test mode
    self.jx = analogio.AnalogIn(P_JOYX); self.jy = analogio.AnalogIn(P_JOYY)
    self._joyNext = 0
    self._touchDown = False; self._touchSeen = 0
    # --- playback state ---
    self.running = False; self.bpm = 120; self.idx = 0; self.lanes = []; self.bars = 0; self.rgb = (0, 0, 0)
    self.ramp = None; self.trainer = None; self._lastbar = -1; self._muted = False; self._ramp_base = 120
    self.rep = None; self.end = None # per-track playback flow: rep=cycles, end=stop|next|+/-N goto
    # NOTE: self._dirty (unsaved track edits) is a different flag from self.dirty (set further down)
    self._dirty = False; self._overlay = None; self._ovbtns = [] # on-device editing: unsaved edits + modal
    self.continue_on = False; self._advance = False; self._grid = {} # auto-advance + pad hit-test geometry
    self._next_pending = None; self._seam_t = 0; self._need_redraw = False # gapless seam between tracks
    self._heavy_redraw_at = 0 # deferred build_grid + draw_log deadline (so B's intro isn't blocked by SPI/alloc)
    self._grid_li = None; self._grid_n = 0; self._grid_geo = (0, 0, 0, 0) # chunked build_grid progress (1 PAD / loop iter)
    self._grid_pi = 0; self._grid_lane_st = None; self._grid_pads = [] # per-lane sub-state for sub-pad chunking
    self._heavy_log_pending = False
    self._beat_ns = 60_000_000_000 // self.bpm # cached: ns per quarter note; refreshed on every bpm change
    # --- preallocated MIDI messages ---
    self._note_buf = bytearray([0x90, 0, 0]) # reused for every Note On (no per-click bytes() alloc)
    self._clock_byte = bytes([0xF8]) # singleton MIDI Clock tick (24 PPQN)
    self._start_byte = bytes([0xFA]); self._stop_byte = bytes([0xFC])
    self._lastRefresh = 0.0 # for the "force refresh after Xms even if a beat is imminent" guard
    # --- live sync ---
    try: # live sync: short random id so peers can drop their own echoes
        o = os.urandom(4); self._sync_origin = "d" + "".join("%02x" % b for b in o)
    except Exception:
        # no os.urandom available: fall back to the low 32 bits of the monotonic clock
        self._sync_origin = "d%08x" % (time.monotonic_ns() & 0xFFFFFFFF)
    self._sync_armed = False; self._sync_seq = 0; self._sync_applying = False
    self._sync_heartbeat_next = 0.0 # next periodic FULL broadcast deadline (when armed)
    self._displayed_bpm = -1; self._clock_next = 0 # lazy BPM redraw + MIDI Clock Out tick scheduler
    self._clock_in_last_t = 0; self._clock_in_avg = 0; self._slaved = False # MIDI Clock In: smoothed tracker + slave flag
    # --- set lists + pad palette ---
    self.sl = 0; self.rebuild_setlists() # built-in playlists (baked) + user playlists (programs.json)
    self.dirty = True
    self.pad_pal = displayio.Palette(8) # 0-3 idle levels (mute/normal/accent/ghost), 4-7 the lit playhead
    for i in range(4): self.pad_pal[i] = PAD_DIM[i]; self.pad_pal[i + 4] = PAD_LIT[i]
    self.lane_pads = []; self.lane_lit = []
    self.usb_conn = False; self._m_steps = 0 # USB-connected state; master-lane steps (for the bar counter)
    self._uiNext = 0.0; self._lastTs = None; self._lastBs = None # throttle the stopwatch/bar redraw
    self._seg_start = 0.0 # timer origin; resets with the bar counter (each segment)
    self._refreshNext = 0.0; self._touchNext = 0.0 # cap display refresh + touch polling (tighter MIDI timing)
    self.ic_midi_pal = None; self.ic_usb_pal = None
    # practice history - persisted to /history.json (next to programs.json) when we own the filesystem
    self.can_write = self._probe_write()
    self._load_settings() # /settings.json overrides the module-level defaults
    self.log = self._load_log()
    self.play_start = None; self.play_bpm = 0; self.play_name = ""
    self._armed = None; self.log_rows = []
    # --- first draw ---
    self._build_scene()
    self.load(0) # load() also draws the (track-filtered) practice log
    self.draw_icons(); self.draw_meters(); self.led_rest() # LED green = on
def _btn(self, pin):
    """Claim ``pin`` as a pulled-up digital input and return it (reads False while pressed)."""
    button = digitalio.DigitalInOut(pin)
    button.direction = digitalio.Direction.INPUT
    button.pull = digitalio.Pull.UP
    return button
# ---------- scene graph ----------
def _build_scene(self):
    """Build the displayio scene graph once: static header art plus empty groups for dynamic content.

    Append order is z-order (later entries draw on top), so the overlay group is
    appended last. Dynamic groups start empty and are filled by the draw methods.
    """
    root = displayio.Group(); self.display.root_group = root
    root.append(rect(0, 0, WIDTH, HEIGHT, C_BG)) # static background (run state shows on the LED)
    # header: VARASYS logo (left, no tagline) + version (small, top, right of the logo) + MIDI/USB icons (right)
    if LOGO:
        tg, _p, lw, lh = make_glyph(LOGO, C_CYAN, C_BG); tg.x = 10; tg.y = 9; root.append(tg)
        lx = 10 + lw
    else:
        # logo asset missing/corrupt: fall back to a text wordmark
        tg, w, h = make_text("VARASYS", FONT_M, C_CYAN, C_BG); tg.x = 10; tg.y = 8; root.append(tg)
        lx = 10 + w
    vtg, vw, vh = make_text("v" + APP_VERSION, FONT_S, C_DIM, C_BG); vtg.x = lx + 6; vtg.y = 8; root.append(vtg)
    # Hamburger menu (3 thin rects) at the far right; tap zone is generous so it's easy to hit.
    mx = WIDTH - 30 # left edge of the icon (18 px wide x 14 px tall total)
    for dy in (10, 16, 22):
        root.append(rect(mx, dy, 18, 2, C_MUTE))
    self._menu_bbox = (mx - 8, 0, WIDTH, 32)
    x = mx - 8 # MIDI/USB icons start LEFT of the hamburger
    # icons are laid out right-to-left; each palette is kept on self (ic_usb_pal / ic_midi_pal)
    for asset, attr in ((ICON_USB, "ic_usb_pal"), (ICON_MIDI, "ic_midi_pal")):
        if asset:
            tg, pal, w, h = make_glyph(asset, C_DIM, C_BG); x -= w; tg.x = x; tg.y = 8; x -= 8
            root.append(tg); setattr(self, attr, pal)
    root.append(rect(0, 38, WIDTH, 2, C_PANEL))
    # dynamic groups
    self.g_bpm = displayio.Group(); root.append(self.g_bpm) # big tempo (right)
    self.g_time = displayio.Group(); root.append(self.g_time) # elapsed [of total] (left)
    self.g_bar = displayio.Group(); root.append(self.g_bar) # bar [of total] (left)
    self.g_train = displayio.Group(); root.append(self.g_train) # ramp / gap-trainer indicators
    self.g_cont = displayio.Group(); root.append(self.g_cont) # CONT (Continue auto-advance) toggle indicator
    self.g_name = displayio.Group(); root.append(self.g_name) # track title (red when edited/unsaved)
    self.g_idx = displayio.Group(); root.append(self.g_idx) # set-list tab (tap to switch playlist)
    self.g_grid = displayio.Group(); root.append(self.g_grid) # lanes x step pads
    root.append(rect(0, LOG_TOP - 6, WIDTH, 2, C_PANEL)) # divider above the history log
    self.g_log = displayio.Group(); root.append(self.g_log) # practice history (tap a row to delete)
    self.g_pend = displayio.Group(); root.append(self.g_pend) # swinging pendulum, shown over the log while playing
    self.g_pend.append(rect(PEND_PX - 26, PEND_PY + 4, 52, 4, C_PANEL)) # little stand/base under the pivot
    # arm = thin triangle: 8 px base at the pivot, tip PEND_LEN px straight up (the rest position)
    self._pend_arm = vectorio.Polygon(pixel_shader=solid(C_DIM),
        points=[(PEND_PX - 4, PEND_PY), (PEND_PX + 4, PEND_PY), (PEND_PX, PEND_PY - PEND_LEN)], x=0, y=0)
    self.g_pend.append(self._pend_arm)
    self.g_pend.append(vectorio.Circle(pixel_shader=solid(C_PANEL), radius=7, x=PEND_PX, y=PEND_PY)) # pivot
    self._pend_bob = vectorio.Circle(pixel_shader=solid(C_CYAN), radius=13, x=PEND_PX, y=PEND_PY - PEND_LEN)
    self.g_pend.append(self._pend_bob)
    self.g_pend.hidden = True # only visible while running
    self.g_overlay = displayio.Group(); root.append(self.g_overlay) # modal (save/revert) - drawn on top
    # run/stop shows on the RGB LED; tap beats to edit, tap the title to save/revert, tap the tab to switch lists
def _place(self, group, s, x, y, fg, bg, font, right_edge=None):
while len(group): group.pop()
self.dirty = True
if not s: return
tg, w, h = make_text(s, font, fg, bg)
tg.x = (right_edge - w) if right_edge is not None else x; tg.y = y; group.append(tg)
def _center(self, group, s, cx, cy, fg, bg, font):
    # Swap whatever `group` holds for one text label centred on (cx, cy), then set self.dirty.
    while len(group):
        group.pop()
    label, width, height = make_text(s, font, fg, bg)
    label.x = cx - width // 2
    label.y = cy - height // 2
    group.append(label)
    self.dirty = True
# ---------- program ----------
def rebuild_setlists(self):
    # Rebuild self.setlists: the built-in playlists come first (flagged builtin=True), followed by
    # the user playlists returned by load_user_setlists(). A user playlist whose _slkey() matches
    # an earlier title is skipped, so a baked-in title always wins. self.sl is reset to 0 if it no
    # longer points at a valid playlist.
    playlists = []
    for title, items in BUILTIN_SETLISTS:
        playlists.append({'title': title, 'items': items, 'builtin': True})
    self.setlists = playlists
    taken = set()
    for title, _ in BUILTIN_SETLISTS:
        taken.add(_slkey(title))
    for title, items in load_user_setlists():
        key = _slkey(title)
        if key not in taken:
            taken.add(key)
            playlists.append({'title': title, 'items': items, 'builtin': False})
    if self.sl >= len(playlists):
        self.sl = 0
def switch_setlist(self, delta=1):
    # Move `delta` playlists forward (wrapping) and load its first track. Does nothing when there
    # is only one playlist, or while a remote sync change is being applied (the editor sends
    # sel=... itself, so reacting here would ping-pong). Playback, if running, is logged, paused
    # across the switch and restarted on the new track.
    if len(self.setlists) < 2 or self._sync_applying:
        return
    resume = self.running
    if resume:
        self.running = False
        self._log_play()
    self.sl = (self.sl + delta) % len(self.setlists)
    self.load(0)
    if resume:
        self.running = True
        self._reset_clock()
        self._start_play()
    self.led_rest()
    self.draw_meters()
    self._sync_broadcast("sel=%d/%d" % (self.sl, self.idx))
def load(self, i):
    # Load track `i` (index wraps around) from the current playlist: parse its program string,
    # rebuild the per-step duration grids, clear edit/modal/seam state, restart the clock and
    # redraw every panel that depends on the track.
    items = self.setlists[self.sl]['items']
    self.idx = i % len(items)
    self.name, prog = items[self.idx]
    (self.bpm, self.lanes, self.bars, self.ramp,
     self.trainer, self.rep, self.end) = parse_program(prog)
    self._beat_ns = 60_000_000_000 // max(1, self.bpm)
    self._rebuild_dur_all()                 # step grids ready for this lane set
    self.master = self.lanes[0]
    self._ramp_base = self.bpm
    self._lastbar = -1
    self._muted = False
    self._dirty = False                     # fresh load -> no unsaved edits
    self._overlay = None
    self._next_pending = None               # discard any prepared seam (user navigated away)
    self._need_redraw = False
    self._heavy_redraw_at = 0               # cancel any in-progress chunked rebuild
    self._heavy_log_pending = False
    self._grid_li = None
    overlay = self.g_overlay
    while len(overlay):                     # dismiss any open modal
        overlay.pop()
    self._reset_clock()
    self.draw_bpm()
    self.draw_status()
    self.draw_train()
    self.build_grid()
    self.draw_log()
def _prog_str(self): # serialize the current (possibly edited) track to a program string
parts = ['t' + str(self.bpm)]
if self.bars: parts.append('b' + str(self.bars))
if self.ramp: parts.append('rmp%d/%d/%d' % (self.ramp.get('start', self.bpm), self.ramp['amt'], self.ramp['every']))
if self.trainer: parts.append('tr%d/%d' % (self.trainer['play'], self.trainer['mute']))
for L in self.lanes: parts.append(lane_to_str(L))
if self.end is not None: # per-track playback flow (default = loop forever -> omitted)
if self.rep and self.rep > 1: parts.append('rep=' + str(self.rep))
parts.append('end=' + ('stop' if self.end == 'stop' else 'next' if self.end == 1 else ('+%d' % self.end if self.end > 0 else str(self.end))))
return ';'.join(parts)
# ---------- on-device editing: tap a beat to cycle it; tap the title to save/revert ----------
def _grid_hit(self, tx, ty): # map a touch to (kind, lane[, step]) on the pad grid
g = self._grid
if not g or not (g['top'] <= ty < g['top'] + g['n'] * g['rowh']): return None
li = (ty - g['top']) // g['rowh']
if li >= g['n']: return None
if tx < g['px0']: return ('lane', li) # tapped the lane label (lane editor = 0.1.0)
L = self.lanes[li]; steps = L['steps']
s = int((tx - g['px0'] - 6) * steps / g['usable'] + 0.5)
return ('beat', li, max(0, min(steps - 1, s)))
def _cycle_beat(self, li, s): # off -> normal -> accent -> ghost -> off
L = self.lanes[li]
L['levels'][s] = {0: 1, 1: 2, 2: 3, 3: 0}[L['levels'][s]]
base = self._padbase(L, s); lit = (self.lane_lit[li] == s)
self.lane_pads[li][s].color_index = base + 4 if lit else base
self._set_dirty()
self._sync_broadcast("beat=%d/%d/%d" % (li, s, L['levels'][s]))
def _set_dirty(self):
if not self._dirty: self._dirty = True; self.draw_status()
self.dirty = True
def toggle_continue(self):
    # Flip the CONT (Continue auto-advance) flag and redraw the status line that shows it.
    if self.continue_on:
        self.continue_on = False
    else:
        self.continue_on = True
    self.draw_status()
def _user_list(self, title): # find or create a user playlist
for s in self.setlists:
if not s['builtin'] and s['title'] == title: return s
s = {'title': title, 'items': [], 'builtin': False}; self.setlists.append(s); return s
def _persist_user(self):
    # Write every user (non-built-in) playlist to /programs.json as
    # {"setlists": [{"title": ..., "programs": [{"name": ..., "prog": ...}, ...]}, ...]}.
    # Unless a remote change is being applied, the library is then mirrored to the editor.
    # Returns True on success, False when an OSError occurs (e.g. the drive is read-only to us).
    playlists = []
    for sl in self.setlists:
        if sl['builtin']:
            continue
        tracks = [{"name": n, "prog": p} for n, p in sl['items']]
        playlists.append({"title": sl['title'], "programs": tracks})
    data = {"setlists": playlists}
    try:
        with open("/programs.json", "w") as f:
            json.dump(data, f)
        if not self._sync_applying:
            self._sync_send_setlists()      # mirror our user library to the editor (sec 8)
    except OSError:
        return False                        # editor mode: the drive is read-only to us
    return True
def _save_edit(self):
prog = self._prog_str(); sl = self.setlists[self.sl]
if sl['builtin']: # built-ins are read-only -> save a USER copy
tgt = self._user_list("My edits"); names = [n for n, _ in tgt['items']]
if self.name in names: tgt['items'][names.index(self.name)] = (self.name, prog)
else: tgt['items'].append((self.name, prog))
dest = ("My edits", self.name)
else:
sl['items'] = list(sl['items']); sl['items'][self.idx] = (self.name, prog)
dest = (sl['title'], self.name)
if not self._persist_user():
self._show_msg("Read-only: reboot without holding A"); return
self.rebuild_setlists() # refresh, then jump to the saved (user) copy
for i, s in enumerate(self.setlists):
if not s['builtin'] and s['title'] == dest[0]:
self.sl = i; names = [n for n, _ in s['items']]
self.load(names.index(dest[1]) if dest[1] in names else 0); return
self.load(0)
def _revert(self):
self.load(self.idx) # reload from source -> discard edits
# ---------- modal overlay (save / revert / message) ----------
def _show_saverevert(self):
    # Draw the "Unsaved edits" modal with SAVE and REVERT buttons into the overlay group and
    # record each button's hit box + action in self._ovbtns.
    gc.collect()
    self._overlay = 'saverevert'
    panel = self.g_overlay
    while len(panel):
        panel.pop()
    left, top, wide, tall = 24, 178, WIDTH - 48, 116
    panel.append(rect(left, top, wide, tall, C_PANEL))
    panel.append(rect(left, top, wide, 2, C_CYAN))
    heading, _, _ = make_text("Unsaved edits", FONT_M, C_TXT, C_PANEL)
    heading.x = left + 14
    heading.y = top + 12
    panel.append(heading)
    self._ovbtns = []
    gap = 12
    btn_y = top + 44
    btn_h = 50
    btn_w = (wide - 3 * gap) // 2
    btn_x = left + gap
    for caption, colour, action in (("SAVE", C_GREEN, self._save_edit),
                                    ("REVERT", C_AMBER, self._revert)):
        panel.append(rect(btn_x, btn_y, btn_w, btn_h, C_BTN))
        panel.append(rect(btn_x, btn_y, btn_w, 2, colour))
        cap, cap_w, cap_h = make_text(caption, FONT_M, colour, C_BTN)
        cap.x = btn_x + (btn_w - cap_w) // 2
        cap.y = btn_y + (btn_h - cap_h) // 2
        panel.append(cap)
        self._ovbtns.append((btn_x, btn_y, btn_x + btn_w, btn_y + btn_h, action))
        btn_x += btn_w + gap
    hint, _, _ = make_text("tap outside to cancel", FONT_S, C_DIM, C_PANEL)
    hint.x = left + 14
    hint.y = top + tall - 16
    panel.append(hint)
    self.dirty = True
def _show_msg(self, text):
    # Show a one-line message modal (text cut to 28 characters) with a "(tap to dismiss)" hint.
    self._overlay = 'msg'
    panel = self.g_overlay
    while len(panel):
        panel.pop()
    left, top, wide, tall = 24, 200, WIDTH - 48, 64
    panel.append(rect(left, top, wide, tall, C_PANEL))
    panel.append(rect(left, top, wide, 2, C_AMBER))
    body, _, _ = make_text(text[:28], FONT_S, C_TXT, C_PANEL)
    body.x = left + 12
    body.y = top + 14
    panel.append(body)
    hint, _, _ = make_text("(tap to dismiss)", FONT_S, C_DIM, C_PANEL)
    hint.x = left + 12
    hint.y = top + 38
    panel.append(hint)
    self.dirty = True
def _close_overlay(self):
self._overlay = None
while len(self.g_overlay): self.g_overlay.pop()
self.dirty = True
def _tap_overlay(self, tx, ty):
if self._overlay == 'msg': self._close_overlay(); return
for x0, y0, x1, y1, act in self._ovbtns: # each action manages the panel (lane edits redraw it live)
if x0 <= tx <= x1 and y0 <= ty <= y1: act(); return
self._close_overlay() # tapped outside a button -> cancel / done
def _handle_tap(self, tx, ty):
if self._overlay: self._tap_overlay(tx, ty); return
x0, y0, x1, y1 = self._menu_bbox # hamburger -> main menu
if x0 <= tx <= x1 and y0 <= ty <= y1: self._show_menu(); return
if 112 <= ty <= 126: # set-list tab line
if tx > WIDTH - 56: self.toggle_continue() # right end = CONT (auto-advance) toggle
else: self.switch_setlist(1)
return
if 128 <= ty <= 154: # track-title line
if self._dirty: self._show_saverevert()
return
hit = self._grid_hit(tx, ty)
if hit and hit[0] == 'beat': self._cycle_beat(hit[1], hit[2]); return
if hit and hit[0] == 'lane': self._show_laneedit(hit[1]); return # tap the instrument name -> lane editor
self._tap_log(tx, ty) # else the practice log
# ---------- lane editor (tap the instrument name): sound / beats / sub / swing / mute + add / remove ----------
def _show_laneedit(self, li):
gc.collect()
self._overlay = 'lane'; self._edit_li = li; self._draw_laneedit()
def _draw_laneedit(self):
    # (Re)draw the lane editor for lane self._edit_li: five "label [<] value [>]" rows
    # (Sound / Beats / Subdiv / Swing / Mute), a "+ Lane" | "Remove" row and a Done button.
    # Every tappable box is recorded in self._ovbtns; a left arrow calls the row's editor with
    # -1, a right arrow with +1.
    li = self._edit_li
    lane = self.lanes[li]
    panel = self.g_overlay
    while len(panel):
        panel.pop()
    self._ovbtns = []
    PX, PY, PW, RH = 14, 54, WIDTH - 28, 34
    panel.append(rect(PX, PY, PW, RH * 7 + 30, C_PANEL))
    panel.append(rect(PX, PY, PW, 2, C_CYAN))
    head, _, _ = make_text("Edit lane %d of %d" % (li + 1, len(self.lanes)), FONT_S, C_MUTE, C_PANEL)
    head.x = PX + 12
    head.y = PY + 8
    panel.append(head)
    fields = (
        ("Sound", lane['sound'][:9], self._edit_sound),
        ("Beats", str(sum(lane['groups'])), self._edit_beats),
        ("Subdiv", str(lane['sub']), self._edit_sub),
        ("Swing", "on" if lane['swing'] else "off", self._edit_swing),
        ("Mute", "yes" if lane['mute'] else "no", self._edit_mute),
    )
    row_y = PY + 28
    for label, value, fn in fields:
        name_t, _, _ = make_text(label, FONT_S, C_MUTE, C_PANEL)
        name_t.x = PX + 12
        name_t.y = row_y + 9
        panel.append(name_t)
        panel.append(rect(PX + 108, row_y + 3, 28, RH - 8, C_BTN))
        less_t, _, _ = make_text("<", FONT_M, C_CYAN, C_BTN)
        less_t.x = PX + 108 + 9
        less_t.y = row_y + 7
        panel.append(less_t)
        value_t, _, _ = make_text(value, FONT_M, C_TXT, C_PANEL)
        value_t.x = PX + 146
        value_t.y = row_y + 5
        panel.append(value_t)
        panel.append(rect(PX + PW - 36, row_y + 3, 28, RH - 8, C_BTN))
        more_t, _, _ = make_text(">", FONT_M, C_CYAN, C_BTN)
        more_t.x = PX + PW - 36 + 9
        more_t.y = row_y + 7
        panel.append(more_t)
        # bind fn per row (default argument) so each button keeps its own editor
        self._ovbtns.append((PX + 104, row_y, PX + 140, row_y + RH, lambda f=fn: f(-1)))
        self._ovbtns.append((PX + PW - 40, row_y, PX + PW, row_y + RH, lambda f=fn: f(1)))
        row_y += RH
    # + Lane | Remove
    yy = row_y + 2
    half_w = (PW - 36) // 2
    panel.append(rect(PX + 12, yy, half_w, RH - 6, C_BTN))
    add_t, _, _ = make_text("+ Lane", FONT_S, C_GREEN if len(self.lanes) < MAXLANES else C_DIM, C_BTN)
    add_t.x = PX + 22
    add_t.y = yy + 8
    panel.append(add_t)
    self._ovbtns.append((PX + 12, yy, PX + 12 + half_w, yy + RH, self._edit_add))
    rem_x = PX + PW - 12 - half_w
    panel.append(rect(rem_x, yy, half_w, RH - 6, C_BTN))
    rem_t, _, _ = make_text("Remove", FONT_S, C_AMBER if len(self.lanes) > 1 else C_DIM, C_BTN)
    rem_t.x = rem_x + 14
    rem_t.y = yy + 8
    panel.append(rem_t)
    self._ovbtns.append((rem_x, yy, PX + PW - 12, yy + RH, self._edit_remove))
    # Done
    yy += RH + 2
    panel.append(rect(PX + 12, yy, PW - 24, RH - 4, C_BTN))
    done_t, done_w, _ = make_text("Done", FONT_M, C_CYAN, C_BTN)
    done_t.x = PX + (PW - done_w) // 2
    done_t.y = yy + 5
    panel.append(done_t)
    self._ovbtns.append((PX + 12, yy, PX + PW - 12, yy + RH, self._edit_done))
    self.dirty = True
def _regen_levels(self, L): # default accents after a beats/sub change
sub = L['sub']; groups = L['groups']; starts = set(); acc = 0
for gp in groups: starts.add(acc); acc += gp
L['steps'] = sum(groups) * sub
L['levels'] = [(2 if (i // sub) in starts else 1) if i % sub == 0 else 0 for i in range(L['steps'])]
def _lane_dirty(self, structural):
if structural: self._regen_levels(self.lanes[self._edit_li])
if structural and self._edit_li == 0: self._rebuild_dur_all() # master changed -> polymeter lanes follow
else: self._rebuild_dur(self.lanes[self._edit_li])
self.build_grid()
if not self._sync_applying: # coalesce structural / multi-field lane edits into one FULL
self._sync_broadcast_full()
if not self._dirty: self._dirty = True; self.draw_status()
self._draw_laneedit() # refresh the modal with the new values
def _edit_sound(self, d):
    # Step the edited lane's sound `d` places through SOUNDS (wrapping); a sound that is not in
    # SOUNDS is treated as index 0.
    lane = self.lanes[self._edit_li]
    current = lane['sound']
    pos = SOUNDS.index(current) if current in SOUNDS else 0
    lane['sound'] = SOUNDS[(pos + d) % len(SOUNDS)]
    self._lane_dirty(False)
def _edit_beats(self, d):
L = self.lanes[self._edit_li]; L['groups'] = [max(1, min(12, sum(L['groups']) + d))]; self._lane_dirty(True)
def _edit_sub(self, d):
L = self.lanes[self._edit_li]; L['sub'] = max(1, min(8, L['sub'] + d)); self._lane_dirty(True)
def _edit_swing(self, d):
L = self.lanes[self._edit_li]; L['swing'] = not L['swing']; self._lane_dirty(False)
def _edit_mute(self, d):
L = self.lanes[self._edit_li]; L['mute'] = not L['mute']; self._lane_dirty(False)
def _edit_add(self):
    # Insert a new "beep:4" lane right after the edited one and move the editor onto it.
    # Ignored once MAXLANES lanes exist.
    if len(self.lanes) >= MAXLANES:
        return
    new_pos = self._edit_li + 1
    self.lanes.insert(new_pos, _parse_lane("beep:4"))
    self._edit_li = new_pos
    self._lane_dirty(False)
def _edit_remove(self):
if len(self.lanes) <= 1: return
del self.lanes[self._edit_li]
if self._edit_li >= len(self.lanes): self._edit_li = len(self.lanes) - 1
self._lane_dirty(False)
def _edit_done(self):
self._close_overlay()
# ---------- hamburger menu (main) + sub-modals (Settings / Help / About) ----------
def _show_menu(self):
gc.collect() # defragment before allocating modal bitmaps
self._overlay = 'menu'; self._draw_menu()
def _draw_menu(self):
    # (Re)draw the main menu: Save / Revert (dimmed and not tappable unless there are unsaved
    # edits), the Continue toggle, Settings, Help, About and a Done button. Tappable rows are
    # recorded in self._ovbtns.
    panel = self.g_overlay
    while len(panel):
        panel.pop()
    self._ovbtns = []
    PX, PY, PW, RH = 24, 70, WIDTH - 48, 34
    edited = self._dirty
    cont = self.continue_on
    rows = (
        ("Save edits", C_GREEN if edited else C_DIM, self._save_edit if edited else None),
        ("Revert edits", C_AMBER if edited else C_DIM, self._revert if edited else None),
        ("Continue: " + ("on" if cont else "off"), C_CYAN if cont else C_TXT, self._menu_toggle_continue),
        ("Settings  >", C_TXT, self._show_settings),
        ("Help  >", C_TXT, self._show_help),
        ("About", C_TXT, self._show_about),
    )
    PH = 38 + len(rows) * RH + RH + 8
    panel.append(rect(PX, PY, PW, PH, C_PANEL))
    panel.append(rect(PX, PY, PW, 2, C_CYAN))
    heading, _, _ = make_text("Menu", FONT_M, C_TXT, C_PANEL)
    heading.x = PX + 14
    heading.y = PY + 12
    panel.append(heading)
    row_y = PY + 38
    for label, colour, action in rows:
        panel.append(rect(PX + 10, row_y, PW - 20, RH - 4, C_BTN))
        item, _, _ = make_text(label, FONT_M, colour, C_BTN)
        item.x = PX + 20
        item.y = row_y + 6
        panel.append(item)
        if action:
            self._ovbtns.append((PX + 10, row_y, PX + PW - 10, row_y + RH, action))
        row_y += RH
    done_y = PY + 38 + len(rows) * RH + 4
    panel.append(rect(PX + 10, done_y, PW - 20, RH - 4, C_BTN))
    done_t, done_w, _ = make_text("Done", FONT_M, C_CYAN, C_BTN)
    done_t.x = PX + (PW - done_w) // 2
    done_t.y = done_y + 6
    panel.append(done_t)
    self._ovbtns.append((PX + 10, done_y, PX + PW - 10, done_y + RH, self._close_overlay))
    self.dirty = True
def _menu_toggle_continue(self):
self.continue_on = not self.continue_on; self.draw_status(); self._draw_menu()
# ---------- Settings sub-modal (LED / Speaker / MIDI Out / Channel / Clock Out / Clock In) ----------
def _show_settings(self):
gc.collect()
self._overlay = 'settings'; self._draw_settings()
def _draw_settings(self):
    # (Re)draw the Settings modal: one "label [<] value [>]" row per setting (LED, Speaker,
    # MIDI Out, Channel, Clock Out, Clock In) plus a Done button. The arrows call the row's
    # adjuster with -1 / +1; hit boxes go into self._ovbtns.
    panel = self.g_overlay
    while len(panel):
        panel.pop()
    self._ovbtns = []
    PX, PY, PW, RH = 14, 50, WIDTH - 28, 32
    if MUTE_SPEAKER:
        speaker_mode = "Off"
    elif SPEAKER_AUTO_MUTE:
        speaker_mode = "Auto"
    else:
        speaker_mode = "Always"
    rows = (
        ("LED", "%d%%" % int(LED_BRIGHTNESS * 100 + 0.5), self._adj_led),
        ("Speaker", speaker_mode, self._adj_speaker),
        ("MIDI Out", "on" if MIDI_ENABLED else "off", self._adj_midi_out),
        ("Channel", str(MIDI_CHANNEL), self._adj_midi_ch),
        ("Clock Out", "on" if MIDI_CLOCK_OUT else "off", self._adj_clock_out),
        ("Clock In", "on" if MIDI_CLOCK_IN else "off", self._adj_clock_in),
    )
    PH = 30 + len(rows) * RH + RH + 8
    panel.append(rect(PX, PY, PW, PH, C_PANEL))
    panel.append(rect(PX, PY, PW, 2, C_CYAN))
    heading, _, _ = make_text("Settings", FONT_S, C_MUTE, C_PANEL)
    heading.x = PX + 12
    heading.y = PY + 8
    panel.append(heading)
    row_y = PY + 26
    for label, value, fn in rows:
        name_t, _, _ = make_text(label, FONT_S, C_MUTE, C_PANEL)
        name_t.x = PX + 12
        name_t.y = row_y + 9
        panel.append(name_t)
        panel.append(rect(PX + 108, row_y + 3, 28, RH - 8, C_BTN))
        less_t, _, _ = make_text("<", FONT_M, C_CYAN, C_BTN)
        less_t.x = PX + 108 + 9
        less_t.y = row_y + 6
        panel.append(less_t)
        value_t, _, _ = make_text(value, FONT_M, C_TXT, C_PANEL)
        value_t.x = PX + 150
        value_t.y = row_y + 4
        panel.append(value_t)
        panel.append(rect(PX + PW - 36, row_y + 3, 28, RH - 8, C_BTN))
        more_t, _, _ = make_text(">", FONT_M, C_CYAN, C_BTN)
        more_t.x = PX + PW - 36 + 9
        more_t.y = row_y + 6
        panel.append(more_t)
        self._ovbtns.append((PX + 104, row_y, PX + 140, row_y + RH, lambda f=fn: f(-1)))
        self._ovbtns.append((PX + PW - 40, row_y, PX + PW, row_y + RH, lambda f=fn: f(1)))
        row_y += RH
    done_y = PY + 26 + len(rows) * RH + 4
    panel.append(rect(PX + 12, done_y + 2, PW - 24, RH - 4, C_BTN))
    done_t, done_w, _ = make_text("Done", FONT_M, C_CYAN, C_BTN)
    done_t.x = PX + (PW - done_w) // 2
    done_t.y = done_y + 5
    panel.append(done_t)
    self._ovbtns.append((PX + 12, done_y, PX + PW - 12, done_y + RH, self._close_overlay))
    self.dirty = True
def _adj_led(self, d):
    # Nudge LED_BRIGHTNESS by d * 5%, clamped to 5%..50% and rounded to two decimals, then
    # re-apply the current colour, persist and refresh the Settings modal.
    global LED_BRIGHTNESS
    level = min(0.50, max(0.05, LED_BRIGHTNESS + d * 0.05))
    LED_BRIGHTNESS = round(level * 100) / 100.0
    self.led.set(*self.rgb)
    self._save_settings()
    self._draw_settings()
def _adj_speaker(self, d):
    # Cycle the speaker mode through auto -> always -> off (d = +1) or backwards (d = -1).
    # Switching to "off" silences the speaker PWM immediately. Persists and refreshes the modal.
    global MUTE_SPEAKER, SPEAKER_AUTO_MUTE
    order = ("auto", "always", "off")
    if MUTE_SPEAKER:
        current = "off"
    elif SPEAKER_AUTO_MUTE:
        current = "auto"
    else:
        current = "always"
    chosen = order[(order.index(current) + d) % 3]
    MUTE_SPEAKER = (chosen == "off")
    SPEAKER_AUTO_MUTE = (chosen == "auto")
    if MUTE_SPEAKER:
        self.spk.duty_cycle = 0
    self._save_settings()
    self._draw_settings()
def _adj_midi_out(self, d):
    # Toggle MIDI output on/off (direction `d` is ignored), persist and refresh the modal.
    global MIDI_ENABLED
    MIDI_ENABLED = False if MIDI_ENABLED else True
    self._save_settings()
    self._draw_settings()
def _adj_midi_ch(self, d):
    # Step the MIDI channel by `d`, wrapping within 1..16; persist and refresh the modal.
    global MIDI_CHANNEL
    zero_based = (MIDI_CHANNEL - 1 + d) % 16
    MIDI_CHANNEL = zero_based + 1
    self._save_settings()
    self._draw_settings()
def _adj_clock_out(self, d):
    # Toggle MIDI clock output (direction `d` is ignored). When switching it on,
    # self._clock_next is reset to the current monotonic time. Persists and refreshes the modal.
    global MIDI_CLOCK_OUT
    MIDI_CLOCK_OUT = False if MIDI_CLOCK_OUT else True
    if MIDI_CLOCK_OUT:
        self._clock_next = time.monotonic_ns()
    self._save_settings()
    self._draw_settings()
def _adj_clock_in(self, d):
    # Toggle MIDI clock input (direction `d` is ignored). Switching it off also clears
    # self._slaved. Persists and refreshes the modal.
    global MIDI_CLOCK_IN
    MIDI_CLOCK_IN = False if MIDI_CLOCK_IN else True
    if not MIDI_CLOCK_IN:
        self._slaved = False
    self._save_settings()
    self._draw_settings()
# ---------- Help sub-modal (paginated) ----------
def _show_help(self):
gc.collect()
self._overlay = 'help'; self._help_page = 0; self._draw_help()
def _draw_help(self):
    # (Re)draw the current Help page: title, "page / total" counter, the page's lines (each cut
    # to 42 characters) and a "<  Done  >" navigation row. The prev/next buttons are dimmed and
    # not tappable on the first/last page.
    panel = self.g_overlay
    while len(panel):
        panel.pop()
    self._ovbtns = []
    PX, PY, PW = 14, 50, WIDTH - 28
    page = self._help_page
    title, lines = HELP_PAGES[page]
    PH = 38 + 18 * len(lines) + 60
    panel.append(rect(PX, PY, PW, PH, C_PANEL))
    panel.append(rect(PX, PY, PW, 2, C_CYAN))
    heading, _, _ = make_text(title, FONT_M, C_TXT, C_PANEL)
    heading.x = PX + 12
    heading.y = PY + 8
    panel.append(heading)
    counter, counter_w, _ = make_text("%d / %d" % (page + 1, len(HELP_PAGES)), FONT_S, C_DIM, C_PANEL)
    counter.x = PX + PW - counter_w - 12
    counter.y = PY + 12
    panel.append(counter)
    line_y = PY + 36
    for ln in lines:
        row, _, _ = make_text(ln[:42], FONT_S, C_TXT, C_PANEL)
        row.x = PX + 12
        row.y = line_y
        panel.append(row)
        line_y += 16
    # Nav:  < (prev) | Done | > (next)
    has_prev = page > 0
    has_next = page < len(HELP_PAGES) - 1
    nav = (
        ("<", C_CYAN if has_prev else C_DIM, self._help_prev if has_prev else None),
        ("Done", C_CYAN, self._close_overlay),
        (">", C_CYAN if has_next else C_DIM, self._help_next if has_next else None),
    )
    by = PY + PH - 38
    bh = 32
    bw = (PW - 36) // 3
    bx = PX + 12
    for caption, colour, action in nav:
        panel.append(rect(bx, by, bw, bh, C_BTN))
        cap, cap_w, _ = make_text(caption, FONT_M, colour, C_BTN)
        cap.x = bx + (bw - cap_w) // 2
        cap.y = by + 6
        panel.append(cap)
        if action:
            self._ovbtns.append((bx, by, bx + bw, by + bh, action))
        bx += bw + 6
    self.dirty = True
def _help_prev(self):
self._help_page = max(0, self._help_page - 1); self._draw_help()
def _help_next(self):
    # Advance one Help page (never past the last entry of HELP_PAGES) and redraw.
    last = len(HELP_PAGES) - 1
    page = self._help_page + 1
    self._help_page = last if page > last else page
    self._draw_help()
# ---------- About sub-modal ----------
def _show_about(self):
gc.collect()
self._overlay = 'about'; self._draw_about()
def _draw_about(self):
    # Draw the About modal: product name, firmware version, free RAM (after a collect), uptime in
    # minutes, the interpreter version and the site URL, plus a Done button. Free RAM falls back
    # to 0 and the version to "?" when the runtime does not provide them.
    import sys
    gc.collect()
    try:
        free = gc.mem_free()
    except Exception:
        free = 0                            # mem_free is CircuitPython-only
    try:
        cp_ver = "%d.%d.%d" % sys.implementation.version[:3]
    except Exception:
        cp_ver = "?"
    up_min = int(time.monotonic()) // 60
    lines = (
        "VARASYS PolyMeter",
        "PM_K-1  Kit",
        "",
        "Firmware: v" + APP_VERSION,
        "Free RAM: %d KB" % (free // 1024),
        "Uptime:   %dm" % up_min,
        "CircuitPython: " + cp_ver,
        "",
        "metronome.varasys.io",
    )
    panel = self.g_overlay
    while len(panel):
        panel.pop()
    self._ovbtns = []
    PX, PY, PW = 24, 90, WIDTH - 48
    PH = 30 + 18 * len(lines) + 50
    panel.append(rect(PX, PY, PW, PH, C_PANEL))
    panel.append(rect(PX, PY, PW, 2, C_CYAN))
    line_y = PY + 16
    for i, ln in enumerate(lines):
        if i == 0:
            colour = C_CYAN                 # product name
        elif ln and i != 8:
            colour = C_TXT
        else:
            colour = C_DIM                  # blank lines and the URL (index 8)
        row, _, _ = make_text(ln, FONT_S, colour, C_PANEL)
        row.x = PX + 14
        row.y = line_y
        panel.append(row)
        line_y += 18
    by = PY + PH - 38
    panel.append(rect(PX + 12, by, PW - 24, 32, C_BTN))
    done_t, done_w, _ = make_text("Done", FONT_M, C_CYAN, C_BTN)
    done_t.x = PX + (PW - done_w) // 2
    done_t.y = by + 6
    panel.append(done_t)
    self._ovbtns.append((PX + 12, by, PX + PW - 12, by + 32, self._close_overlay))
    self.dirty = True
# ---------- Settings persistence (/settings.json) ----------
def _load_settings(self):
global LED_BRIGHTNESS, MUTE_SPEAKER, SPEAKER_AUTO_MUTE, MIDI_ENABLED, MIDI_CHANNEL, MIDI_CLOCK_OUT, MIDI_CLOCK_IN
global STEPPER_MAX_RATE, STEPPER_ACCEL, STEPPER_JOG_START, PEND_SWING_DEG, STEPPER_STEPS_PER_REV, STEPPER_ARC, PEND_THETA
try:
with open("/settings.json") as f: d = json.load(f)
except Exception: return
try:
LED_BRIGHTNESS = float(d.get("led_brightness", LED_BRIGHTNESS))
sm = d.get("speaker", "auto")
MUTE_SPEAKER = (sm == "off"); SPEAKER_AUTO_MUTE = (sm == "auto")
MIDI_ENABLED = bool(d.get("midi_out", MIDI_ENABLED))
MIDI_CHANNEL = max(1, min(16, int(d.get("midi_channel", MIDI_CHANNEL))))
MIDI_CLOCK_OUT = bool(d.get("clock_out", MIDI_CLOCK_OUT))
MIDI_CLOCK_IN = bool(d.get("clock_in", MIDI_CLOCK_IN))
# Stepper/pendulum tuning (so you can dial it in by editing the file, no recompile):
STEPPER_MAX_RATE = max(50, min(4000, int(d.get("stepper_max_rate", STEPPER_MAX_RATE))))
STEPPER_ACCEL = max(50, int(d.get("stepper_accel", STEPPER_ACCEL)))
STEPPER_JOG_START = max(1, int(d.get("stepper_jog_start", STEPPER_JOG_START)))
PEND_SWING_DEG = max(1, min(180, int(d.get("pend_swing_deg", PEND_SWING_DEG))))
STEPPER_STEPS_PER_REV = max(1, int(d.get("stepper_steps_per_rev", STEPPER_STEPS_PER_REV)))
PEND_THETA = math.radians(PEND_SWING_DEG) / 2.0 # keep derived values in sync
STEPPER_ARC = round(STEPPER_STEPS_PER_REV * PEND_SWING_DEG / 360.0)
except Exception as e: print("settings:", e)
def _save_settings(self):
if not self.can_write: return
sm = "off" if MUTE_SPEAKER else ("auto" if SPEAKER_AUTO_MUTE else "always")
d = {"led_brightness": LED_BRIGHTNESS, "speaker": sm, "midi_out": MIDI_ENABLED,
"midi_channel": MIDI_CHANNEL, "clock_out": MIDI_CLOCK_OUT, "clock_in": MIDI_CLOCK_IN}
try:
with open("/settings.json", "w") as f: json.dump(d, f)
except OSError: self.can_write = False
def _rebuild_dur(self, L): # cache the per-step ns durations into L['durs'] (tuple lookup is ~10us)
beat = self._beat_ns
sub = max(1, L['sub']); steps = max(1, L['steps'])
if L.get('poly') and self.lanes: # polymeter: spread this lane's cycle across master's bar
m = self.lanes[0]; master_bar = beat * (m['steps'] // max(1, m['sub']))
d = master_bar // steps; L['durs'] = tuple(d for _ in range(steps))
elif L.get('swing') and sub % 2 == 0: # swing: long-short pairs
pair = beat // max(1, sub // 2); lng = (pair * 2) // 3; sht = pair // 3
L['durs'] = tuple(lng if (s % sub) % 2 == 0 else sht for s in range(steps))
else: # straight: every step is beat/sub long
d = beat // sub; L['durs'] = tuple(d for _ in range(steps))
def _rebuild_dur_all(self): # called on bpm change + lane mutation + track swap
for L in self.lanes: self._rebuild_dur(L)
def _reset_clock(self):
now = time.monotonic_ns()
for L in self.lanes:
L['next'] = now; L['step'] = -1
self._m_steps = 0 # restart the bar count
self._seg_start = time.monotonic() # and the on-screen timer (resets with the bar counter)
self._pend_beat0 = now; self._pend_dir = 1 # restart the pendulum swing, aligned to the beat clock
def _pend_service(self, now): # advance the swing clock + drive the arm (called each tick while running)
beat = self._beat_ns
if beat <= 0: return
while now - self._pend_beat0 >= beat: # reach an extreme exactly on each beat, then reverse
self._pend_beat0 += beat; self._pend_dir = -self._pend_dir
p = self.pend
if p is None or not p.ok: return # no motor wired -> clock still advanced for the screen graphic
max_arc = (STEPPER_MAX_RATE * beat) // 1_000_000_000 # cap the arc to what the motor can sweep in a beat
arc = STEPPER_ARC if STEPPER_ARC < max_arc else max_arc
if arc < 1: arc = 1
frac = (now - self._pend_beat0) / beat
if frac > 1.0: frac = 1.0
elif frac < 0.0: frac = 0.0
desired = int(frac * arc) if self._pend_dir > 0 else int((1.0 - frac) * arc)
if desired != p.pos and (now - self._pend_last) >= (1_000_000_000 // STEPPER_MAX_RATE):
p.step_toward(desired); self._pend_last = now
def _pend_show(self, on): # swap: pendulum visible while playing, log when stopped
try:
self.g_pend.hidden = not on; self.g_log.hidden = on
except Exception:
pass
self.dirty = True
def draw_pendulum(self, now):
    # Move the on-screen pendulum to the swing phase at `now` (ns): the phase within the current
    # beat (clamped to 0..1, mirrored when swinging back) maps linearly onto an angle between
    # -PEND_THETA and +PEND_THETA; the bob and the arm tip are placed at that angle.
    beat = self._beat_ns
    if beat <= 0:
        return
    phase = max(0.0, min(1.0, (now - self._pend_beat0) / beat))
    if self._pend_dir > 0:
        swing = phase
    else:
        swing = 1.0 - phase                 # 0..1 across the swing (matches the motor)
    angle = (swing * 2.0 - 1.0) * PEND_THETA
    tip_x = PEND_PX + int(PEND_LEN * math.sin(angle))
    tip_y = PEND_PY - int(PEND_LEN * math.cos(angle))
    self._pend_bob.x = tip_x
    self._pend_bob.y = tip_y
    self._pend_arm.points = [(PEND_PX - 4, PEND_PY), (PEND_PX + 4, PEND_PY), (tip_x, tip_y)]
    self.dirty = True
def _jog_loop(self):                                  # hidden stepper jog/test (A+B held at boot)
    # Joystick L/R spins the motor CCW/CW. The joystick only selects the DIRECTION; speed ramps
    # to STEPPER_MAX_RATE. The on-screen needle + RGB LED show direction. Runs forever (never
    # returns); power-cycle with no buttons held to return to normal.
    # Layout of this method: (1) build a full-screen test UI in the overlay group, (2) sample the
    # joystick centre, (3) loop: a control update every >= 4 ms, one motor step whenever the
    # current rate says a step is due, and a silent once-per-second rate measurement.
    while len(self.g_overlay): self.g_overlay.pop()
    self.g_overlay.append(rect(0, 40, WIDTH, HEIGHT - 40, C_BG))      # cover the normal UI
    for s, fnt, col, yy in (("STEPPER JOG TEST", FONT_M, C_CYAN, 64),
                            ("Joystick  L = CCW    R = CW", FONT_S, C_TXT, 96),
                            ("spins at max speed (ramped)", FONT_S, C_DIM, 114),
                            ("power-cycle (no buttons) to exit", FONT_S, C_DIM, 132)):
        tg, w, h = make_text(s, fnt, col, C_BG); tg.x = 12; tg.y = yy; self.g_overlay.append(tg)
    self.g_overlay.append(rect(PEND_PX - 26, PEND_PY + 4, 52, 4, C_PANEL))   # stand
    arm = vectorio.Polygon(pixel_shader=solid(C_DIM),
        points=[(PEND_PX - 4, PEND_PY), (PEND_PX + 4, PEND_PY), (PEND_PX, PEND_PY - PEND_LEN)], x=0, y=0)
    self.g_overlay.append(arm)
    self.g_overlay.append(vectorio.Circle(pixel_shader=solid(C_PANEL), radius=7, x=PEND_PX, y=PEND_PY))
    bob = vectorio.Circle(pixel_shader=solid(C_CYAN), radius=13, x=PEND_PX, y=PEND_PY - PEND_LEN)
    self.g_overlay.append(bob)
    rg = displayio.Group(); self.g_overlay.append(rg)              # step-count + rate readout (redrawn only when the motor stops)
    def show_stats(total, rate, peak):
        # Replace the readout with fresh text. Allocates text bitmaps, so it is only called at
        # start-up and after the motor has stopped - never while stepping.
        while len(rg): rg.pop()
        t, w, h = make_text("steps: %d" % total, FONT_M, C_TXT, C_BG); t.x = 12; t.y = 158; rg.append(t)
        t, w, h = make_text("rate: %d/s   peak: %d/s" % (rate, peak), FONT_S, C_AMBER, C_BG)
        t.x = 12; t.y = 186; rg.append(t)
    show_stats(0, 0, 0)
    self.display.refresh()
    # short settle delay, then take the resting joystick reading as "centre"
    time.sleep(0.1); center = self.jx.value
    def set_needle(d):                                # d = +1 CW (green), -1 CCW (blue), 0 centre (off)
        # Sets the LED colour, points the on-screen arm/bob fully left, right or straight up,
        # and refreshes the display (a blocking call, so only used on direction changes).
        if d > 0: self.led.set(0, 150, 0); a = PEND_THETA
        elif d < 0: self.led.set(0, 0, 255); a = -PEND_THETA
        else: self.led.set(0, 0, 0); a = 0.0
        bx = PEND_PX + int(PEND_LEN * math.sin(a)); by = PEND_PY - int(PEND_LEN * math.cos(a))
        bob.x = bx; bob.y = by
        arm.points = [(PEND_PX - 4, PEND_PY), (PEND_PX + 4, PEND_PY), (bx, by)]
        self.display.refresh()
    # Joystick = DIRECTION only (no fine speed). Spin at STEPPER_MAX_RATE, reached via an
    # acceleration ramp (STEPPER_ACCEL) so the motor doesn't stall trying to start at top speed;
    # reversing decelerates through zero, then accelerates the other way.
    # State: spin = committed direction (+1/-1, 0 = stopped); cur = current step rate (steps/s);
    # total = steps taken; win = steps in the current sample window; lastrate/peak = measured rates.
    spin = 0; cur = 0.0; total = 0; win = 0; peak = 0; lastrate = 0
    # timestamps (seconds): last = last motor step, tsample = start of rate window, tjoy = last control update
    now = time.monotonic(); last = now; tsample = now; tjoy = now
    gc.collect()                                      # clean heap before spinning (avoid a GC pause mid-spin)
    while True:                                       # no per-iteration sleep: tight step timing in this mode
        now = time.monotonic()
        if now - tjoy >= 0.004:                       # control update (joystick + accel), off the step hot-path
            cdt = now - tjoy; tjoy = now
            dx = self.jx.value - center
            # requested direction: sign of the deflection, or 0 inside the dead zone
            want = (1 if dx > 0 else -1) if abs(dx) > JOY_DEADZONE else 0
            if spin == 0 and want != 0:               # start from rest at the safe pull-in rate
                spin = want; cur = float(STEPPER_JOG_START); set_needle(spin)
            # full speed only while the stick still asks for the committed direction; else ramp to 0
            target = float(STEPPER_MAX_RATE) if (spin != 0 and want == spin) else 0.0
            if cur < target: cur = min(target, cur + STEPPER_ACCEL * cdt)        # accelerate
            elif cur > target: cur = max(0.0, cur - STEPPER_ACCEL * cdt)         # decelerate
            if cur <= 0.0 and spin != 0 and want != spin:    # finished slowing -> stop, or flip direction
                if self.pend is not None and self.pend.ok: self.pend.release()
                if want == 0:                         # stopped -> now safe to draw the readout + tidy the heap
                    spin = 0; show_stats(total, lastrate, peak); set_needle(0); gc.collect()
                else:
                    spin = want; cur = float(STEPPER_JOG_START); set_needle(spin)
        # step hot-path: one step once 1/cur seconds have passed since the previous step
        if spin != 0 and cur > 0.0 and self.pend is not None and self.pend.ok and now - last >= 1.0 / cur:
            self.pend.spin(spin > 0); last = now; total += 1; win += 1
        if now - tsample >= 1.0:                      # measure rate SILENTLY (drawing here is what hitched it)
            lastrate = int(win / (now - tsample))
            if lastrate > peak: peak = lastrate
            win = 0; tsample = now
# ---------- audio + light ----------
def click(self, level):
    # Start a speaker click for a beat level: 2 -> 2300 Hz (loudest duty), 3 -> 1050 Hz
    # (quietest), anything else -> 1600 Hz. self.spk_off is set 22 ms ahead of the current
    # monotonic time.
    if level == 2:
        freq, duty = 2300, 42000
    elif level == 3:
        freq, duty = 1050, 14000
    else:
        freq, duty = 1600, 30000
    self.spk.frequency = freq
    self.spk.duty_cycle = duty
    self.spk_off = time.monotonic_ns() + 22_000_000
def _led_base(self):
    # Resting LED colour: LED_RUN while playing, LED_IDLE when stopped.
    if self.running:
        return LED_RUN
    return LED_IDLE
def flash(self, level):
    # Light the LED with the colour for this beat level (LEVEL_RGB; (0, 150, 255) for an unknown
    # level) and remember it in self.rgb.
    colour = LEVEL_RGB.get(level, (0, 150, 255))
    self.rgb = colour
    self.led.set(*self.rgb)
def led_rest(self):
    # Settle the LED to the resting colour returned by _led_base() and remember it in self.rgb.
    resting = self._led_base()
    self.rgb = resting
    self.led.set(*self.rgb)
# ---------- Live sync (HELLO/FULL/DELTA/BYE on SysEx 0x40-0x43; see src/livesync.js for the editor side) ----------
def _sync_send(self, op, text):
if self.midi is None: return
b = bytearray((0xF0, 0x7D, op))
for c in text: # ASCII-only payload (the share grammar uses ; / = digits letters)
v = ord(c); b.append(v if v < 0x80 else 0x3F)
b.append(0xF7)
try: self.midi.write(b)
except Exception: pass
def _sync_broadcast(self, evt): # one DELTA event; suppressed while applying a remote change (echo guard)
if not self._sync_armed or self._sync_applying or self.midi is None or self._fw_pushing: return
text = "%s;%d;%s" % (self._sync_origin, self._sync_seq, evt); self._sync_seq += 1
self._sync_send(0x42, text)
def _sync_broadcast_full(self): # FULL snapshot: running + sl + item + patch (coalesces structural edits)
if not self._sync_armed or self.midi is None or self._fw_pushing: return
try: patch = self._prog_str()
except Exception: return
text = "%s;%d;%d;%d;%d;%s" % (self._sync_origin, self._sync_seq,
1 if self.running else 0, self.sl, self.idx, patch)
self._sync_seq += 1
self._sync_send(0x41, text)
self._sync_heartbeat_next = time.monotonic() + 5.0 # next periodic heartbeat
def _sync_apply_full(self, running, patch): # accept the peer's snapshot as ground truth
self._sync_applying = True
try:
try:
gc.collect()
# Diff before rebuilding -> avoid grid flicker / lost focus on a heartbeat that matches local state
try: cur = self._prog_str()
except Exception: cur = None
if patch and patch != cur:
bpm, lanes, bars, ramp, trainer, rep, end = parse_program(patch)
self.bpm = bpm; self.lanes = lanes; self.bars = bars; self.ramp = ramp; self.trainer = trainer; self.rep = rep; self.end = end
self._beat_ns = 60_000_000_000 // max(1, bpm); self._rebuild_dur_all()
self.master = self.lanes[0]; self._ramp_base = self.bpm; self._lastbar = -1; self._muted = False
self._dirty = False; self._overlay = None
while len(self.g_overlay): self.g_overlay.pop()
self._reset_clock()
self.draw_bpm(); self.draw_status(); self.draw_train(); self.draw_meters()
self.build_grid(); self.draw_log()
if running and not self.running: self.toggle()
elif (not running) and self.running: self.toggle()
except Exception as e:
try: print("sync FULL apply:", e)
except Exception: pass
finally:
self._sync_applying = False
def _sync_apply_delta(self, evt):
    # Apply ONE remote mutation received as a DELTA event, written "key" or "key=value":
    #   play | stop | bpm=N | sel=SL/ITEM | beat=LANE/STEP/LEVEL | lane=LANE/FIELD/VALUE
    # _sync_applying is held for the whole call so the setters used here (toggle / set_bpm / goto) do
    # not broadcast the change straight back. Malformed or out-of-range values are ignored.
    self._sync_applying = True
    try:
        eq = evt.find('=')
        key = evt if eq < 0 else evt[:eq]
        val = '' if eq < 0 else evt[eq+1:]
        if key == 'play':
            if not self.running: self.toggle()
        elif key == 'stop':
            if self.running: self.toggle()
        elif key == 'bpm':
            try: self.set_bpm(int(val))
            except Exception: pass
        elif key == 'sel':                       # sel=-1/-1 = "no selection" sentinel -> ignore
            p = val.split('/')
            if len(p) == 2:
                try:
                    sl = int(p[0]); item = int(p[1])
                    if sl >= 0 and item >= 0:
                        switched = sl < len(self.setlists) and sl != self.sl
                        if switched: self.sl = sl
                        items = self.setlists[self.sl]['items']
                        # FIX: also load when only the set list changed. Previously a switch to another
                        # list with the SAME item index updated self.sl but kept the old track loaded.
                        if 0 <= item < len(items) and (switched or item != self.idx):
                            self.goto(item)
                except Exception: pass
        elif key == 'beat':                      # beat=lane/step/level (0=mute 1=normal 2=accent 3=ghost)
            p = val.split('/')
            if len(p) == 3:
                try:
                    li = int(p[0]); s = int(p[1]); lvl = int(p[2])
                    if 0 <= li < len(self.lanes):
                        L = self.lanes[li]
                        if 0 <= s < len(L['levels']):
                            L['levels'][s] = lvl & 3
                            # recolour the pad in place, keeping the playhead highlight (+4) if it is lit
                            if li < len(self.lane_pads) and s < len(self.lane_pads[li]):
                                lit = (self.lane_lit[li] == s)
                                self.lane_pads[li][s].color_index = self._padbase(L, s) + (4 if lit else 0)
                            self._set_dirty()
                except Exception: pass
        elif key == 'lane':                      # lane=lane/field/value (field: sound|groups|sub|swing|gain|poly|enabled)
            p = val.split('/')
            if len(p) >= 3:
                try:
                    li = int(p[0]); field = p[1]; v = '/'.join(p[2:])   # the value itself may contain '/'
                    if 0 <= li < len(self.lanes):
                        L = self.lanes[li]; structural = False
                        if field == 'sound': L['sound'] = v
                        elif field == 'groups':
                            try: L['groups'] = [int(x) for x in v.split('+')]; structural = True
                            except Exception: pass
                        elif field == 'sub':
                            try: L['sub'] = int(v); structural = True
                            except Exception: pass
                        elif field == 'swing': L['swing'] = (v == '1'); structural = True   # swing changes the dur grid
                        elif field == 'enabled': L['mute'] = not (v == '1')
                        elif field == 'gain':
                            try: L['gain'] = int(v)
                            except Exception: pass
                        elif field == 'poly': L['poly'] = (v == '1'); structural = True
                        if structural: self._regen_levels(L)
                        if li == 0 and structural: self._rebuild_dur_all()   # master changed -> poly lanes follow
                        else: self._rebuild_dur(L)
                        if structural: self.build_grid()
                        self._set_dirty()
                except Exception: pass
    finally:
        self._sync_applying = False
# ---------- set-list content sync (0x44) + practice-log sync (0x45); see docs/livesync-protocol.md ----------
def _sync_send_setlists(self):
    # 0x44 SLSYNC: publish OUR user set lists (built-ins excluded) in the programs.json shape,
    # {"setlists": [{"title": ..., "programs": [{"name": ..., "prog": ...}, ...]}, ...]}.
    # Same send guards as the other broadcasts.
    if not self._sync_armed or self._sync_applying or self.midi is None or self._fw_pushing:
        return
    manifest = []
    for entry in self.setlists:
        if entry['builtin']:
            continue
        programs = [{"name": nm, "prog": pg} for nm, pg in entry['items']]
        manifest.append({"title": entry['title'], "programs": programs})
    try:
        body = json.dumps({"setlists": manifest})
    except Exception:
        return
    self._sync_send(0x44, "%s;%d;%s" % (self._sync_origin, self._sync_seq, body))
    self._sync_seq += 1
def _log_to_wire(self, e):
    # Project a device log entry onto the wire schema {at, name, dur, bpm}; missing fields get
    # 0 (numbers) or "" (name). Device-only fields such as "t" and "bars" are not sent.
    wire = {}
    for field, fallback in (("at", 0), ("name", ""), ("dur", 0), ("bpm", 0)):
        wire[field] = e.get(field, fallback)
    return wire
def _sync_send_log_batch(self):
    # 0x45 LOGSYNC: send the WHOLE practice log in one message (used on connect / HELLO).
    if not self._sync_armed or self._sync_applying or self.midi is None or self._fw_pushing:
        return
    try:
        entries = []
        for e in self.log:
            entries.append(self._log_to_wire(e))
        body = json.dumps({"log": entries})
    except Exception:
        return
    self._sync_send(0x45, "%s;%d;%s" % (self._sync_origin, self._sync_seq, body))
    self._sync_seq += 1
def _sync_send_log_one(self, e):
    # 0x45 LOGSYNC: send a single, freshly logged session (same envelope as the batch form).
    if not self._sync_armed or self._sync_applying or self.midi is None or self._fw_pushing:
        return
    try:
        wire = self._log_to_wire(e)
        body = json.dumps({"log": [wire]})
    except Exception:
        return
    self._sync_send(0x45, "%s;%d;%s" % (self._sync_origin, self._sync_seq, body))
    self._sync_seq += 1
def _sync_apply_setlists(self, body):
    # Merge the peer's user set lists (JSON body, {"setlists": [{"title", "programs": [{"name","prog"}]}]})
    # into ours, matching by normalized title (_slkey): an existing user list gets its items replaced,
    # an unknown title is appended. Built-in lists are never overwritten and nothing is deleted.
    # FIX: malformed entries (non-dict list/program, non-string title, non-list "programs") are now
    # skipped one by one. Previously the first bad entry raised and aborted the merge AFTER earlier
    # lists had already been replaced in memory, so they were neither persisted nor redrawn.
    self._sync_applying = True
    try:
        d = json.loads(body)
        lists = d.get("setlists")
        if not isinstance(lists, list): return
        builtin_keys = set(_slkey(t) for t, _ in BUILTIN_SETLISTS)
        changed = False
        for rl in lists:
            if not isinstance(rl, dict): continue
            title = rl.get("title", "")
            if not isinstance(title, str): continue
            key = _slkey(title)
            if not key or key in builtin_keys: continue       # never overwrite a baked-in list
            progs = rl.get("programs", [])
            if not isinstance(progs, list): continue
            # keep only entries that actually carry a program string
            items = [(p.get("name", "Item"), p.get("prog", "")) for p in progs
                     if isinstance(p, dict) and p.get("prog")]
            tgt = None
            for s in self.setlists:
                if not s['builtin'] and _slkey(s['title']) == key: tgt = s; break
            if tgt is not None: tgt['items'] = items
            else: self.setlists.append({'title': title or "Device", 'items': items, 'builtin': False})
            changed = True
        if changed:
            self._persist_user()                              # write back to programs.json (no-op if read-only)
            if self.sl >= len(self.setlists): self.sl = 0
            self.draw_meters()
    except Exception as e:
        try: print("sync SLSYNC:", e)
        except Exception: pass
    finally:
        self._sync_applying = False
def _sync_apply_log(self, body):
    # Additive merge of practice-log entries from the peer (JSON body, {"log": [{at,name,dur,bpm}, ...]}).
    # Entries are de-duplicated on (at, name); an entry with at == 0 has no stable key and is always
    # appended. After a merge the log is re-sorted newest first, capped at 200, saved and redrawn.
    # FIXES: (1) non-dict entries are skipped instead of aborting the merge half-way (which left new
    # rows appended but unsorted and unsaved); (2) _armed is cleared after the sort because it is an
    # index into self.log and would otherwise point at a different session (_log_play clears it too).
    self._sync_applying = True
    try:
        d = json.loads(body)
        incoming = d.get("log")
        if not isinstance(incoming, list): return
        have = set()
        for e in self.log:
            a = e.get("at", 0)
            if a: have.add((a, e.get("name", "")))
        added = 0
        for w in incoming:
            if not isinstance(w, dict): continue
            nm = w.get("name", "")
            if not nm: continue
            at = w.get("at", 0) or 0
            if at and (at, nm) in have: continue
            self.log.append({"at": at, "name": nm, "dur": w.get("dur", 0), "bpm": w.get("bpm", 0),
                             "t": self._hhmm(at), "bars": 0})
            if at: have.add((at, nm))
            added += 1
        if added:
            self.log.sort(key=lambda e: e.get("at", 0), reverse=True)   # newest first
            del self.log[200:]
            self._armed = None                                          # indices moved -> disarm any pending delete
            self._save_log(); self.draw_log()
    except Exception as e:
        try: print("sync LOGSYNC:", e)
        except Exception: pass
    finally:
        self._sync_applying = False
def _hhmm(self, at_ms):
    # Epoch milliseconds -> "HH:MM" (via time.localtime) for the on-device log row.
    # A falsy timestamp, or one that cannot be converted, yields the placeholder "--:--".
    if at_ms:
        try:
            tm = time.localtime(at_ms // 1000)
            return "%02d:%02d" % (tm.tm_hour, tm.tm_min)
        except Exception:
            pass
    return "--:--"
def _epoch_ms(self):
    # Wall-clock time as epoch milliseconds (whole seconds * 1000), or 0 when the clock looks unset.
    # "Unset" means time.time() is not past 1_000_000_000 s (before Sept 2001): no stable key then.
    try:
        now_s = time.time()
        if now_s > 1_000_000_000:
            return int(now_s) * 1000
        return 0
    except Exception:
        return 0
def midi_send(self, note, vel):
    # Device-as-conductor: emit one Note On per click to the computer.
    # Skipped when there is no MIDI port, and during a firmware push so ACKs are not interleaved.
    if self.midi is None or self._fw_pushing:
        return
    msg = self._note_buf                           # reused bytearray -> no allocation per click (hot path)
    channel_nibble = (MIDI_CHANNEL - 1) & 0x0F     # MIDI_CHANNEL is 1..16
    msg[0] = 0x90 | channel_nibble                 # Note On status byte
    msg[1] = note & 0x7F
    msg[2] = vel & 0x7F
    try:
        self.midi.write(msg)
    except Exception:
        pass                                       # best effort: a failed write must not stall the scheduler
# ---------- transport ----------
def toggle(self):
    # Flip the transport. Starting resets the clock and opens a play session; stopping silences the
    # speaker, clears the playheads and logs the session. With MIDI Clock Out transport enabled a
    # Start/Stop byte is written as well. The new state is then shown and broadcast as "play"/"stop".
    starting = not self.running
    self.running = starting
    if starting:
        self._reset_clock()
        self._start_play()
        self._clock_next = time.monotonic_ns()     # start MIDI Clock Out from zero
    else:
        self.spk.duty_cycle = 0
        self.reset_playheads()
        self._log_play()
    if MIDI_CLOCK_OUT and MIDI_CLOCK_OUT_TRANSPORT and self.midi is not None:
        try:
            # reused singleton bytes: Start when starting, Stop when stopping
            self.midi.write(self._start_byte if starting else self._stop_byte)
        except Exception:
            pass
    self.led_rest()                                # LED shows run state: red running / green stopped
    self.draw_meters()
    self._sync_broadcast("play" if self.running else "stop")
def set_bpm(self, v):
    # Set the tempo, clamped to 5..300 BPM. A value equal to the current tempo is a no-op (no rebuild,
    # no broadcast).
    # No drawing here: the 4 Hz UI tick redraws bpm/meters. Drawing per joystick nudge allocated text
    # bitmaps fast enough to trigger GC pauses.
    target = min(300, v)
    if target < 5:
        target = 5
    if target == self.bpm:
        return
    self.bpm = target
    self._beat_ns = 60_000_000_000 // target
    self._rebuild_dur_all()                        # step grids follow the new beat duration
    self._sync_broadcast("bpm=%d" % target)
def goto(self, i):
    # Load item i of the current set list. If a track was playing, its session is closed out (logged)
    # first and playback restarts on the new item with a fresh clock. The selection is then broadcast
    # as "sel=SL/ITEM".
    resume = self.running
    if resume:
        self.running = False
        self._log_play()                           # close out the track that was playing
    self.load(i)
    if resume:
        self.running = True
        self._reset_clock()
        self._start_play()
    self.led_rest()
    self.draw_meters()
    self._sync_broadcast("sel=%d/%d" % (self.sl, self.idx))
def tap(self):
    # Tap tempo: keep the taps from the last 2.4 s, and once there are at least two set the tempo
    # from the mean interval between the oldest and newest of them.
    now = time.monotonic()
    recent = [t for t in getattr(self, '_taps', []) if now - t < 2.4]   # _taps is created lazily
    recent.append(now)
    self._taps = recent
    if len(recent) < 2:
        return
    span = (recent[-1] - recent[0]) / (len(recent) - 1)
    if span > 0:
        self.set_bpm(round(60 / span))
# ---------- scheduler ----------
def tick(self):
    # Scheduler pass, called from the main loop on every iteration. In order it: ends a finished speaker
    # pulse, times out MIDI-clock slaving, advances every lane whose next step is due (one MIDI note per
    # audible step; at most one speaker click + LED flash per pass, for the highest-priority level that
    # fired), decays the LED toward its base colour, performs an armed gapless track swap, emits MIDI
    # Clock Out pulses, and services the pendulum.
    # PRIO, SOUND_GM, GM_DEFAULT, MIDI_VEL, MUTE_SPEAKER, SPEAKER_AUTO_MUTE and MIDI_CLOCK_OUT are
    # module-level names defined outside this section.
    now = time.monotonic_ns()
    if self.spk_off and now >= self.spk_off: self.spk.duty_cycle = 0; self.spk_off = 0   # speaker pulse deadline reached
    # Slave decay: if no Clock In tick in the last 1s, fall back to internal tempo
    if self._slaved and (now - self._clock_in_last_t) > 1_000_000_000: self._slaved = False
    if self.running:
        fired_best = 0; fired_prio = -1 # int tracking, no per-tick list alloc
        for li, L in enumerate(self.lanes):
            if self._advance: break # seam armed - skip remaining lanes for THIS tick
            adv = False
            # catch-up loop: fire every step of this lane whose scheduled time has passed
            while now >= L['next']:
                L['step'] = (L['step'] + 1) % L['steps']
                if li == 0:
                    self._m_steps += 1 # count master-lane steps -> bars
                    nb = (self._m_steps - 1) // L['steps'] # bar of THIS step (off-by-one fix vs 0.0.16)
                    if nb != self._lastbar: self._lastbar = nb; self._on_new_bar(nb)
                    # NOTE(review): when _on_new_bar() ends playback via _end_stop(), _advance is not set,
                    # so this boundary step still appears to fire below - confirm that is intended.
                    if self._advance: break # seam armed - suppress this step's firing
                    if self.ramp and L['steps'] > 0 and not self._slaved: # CONTINUOUS ramp (off when slaved)
                        mlen = L['steps']
                        bar_pos = self._m_steps / mlen
                        seg_bar = (bar_pos % self.bars) if self.bars else bar_pos
                        # tempo = base + (bars elapsed / 'every') * 'amt', clamped to 5..300
                        new_bpm = max(5, min(300, int(self._ramp_base + seg_bar / self.ramp['every'] * self.ramp['amt'])))
                        if new_bpm != self.bpm:
                            self.bpm = new_bpm; self._beat_ns = 60_000_000_000 // new_bpm
                            self._rebuild_dur_all() # ramp moves bpm -> step grids follow
                lvl = 0 if L['mute'] else L['levels'][L['step']]
                if lvl > 0:
                    p = PRIO.get(lvl, 0)
                    if p > fired_prio: fired_prio = p; fired_best = lvl # accent > normal > ghost
                    if not self._muted: # gap trainer: silent during the rest bars
                        self.midi_send(SOUND_GM.get(L['sound'], GM_DEFAULT), MIDI_VEL.get(lvl, 90))
                L['next'] += L['durs'][L['step']]; adv = True # zero method call, zero dict lookup, just a tuple index
            if adv and li < len(self.lane_pads): self._move_playhead(li, L['step'])
        if fired_best and not self._muted:
            if not MUTE_SPEAKER and not (SPEAKER_AUTO_MUTE and self.midi_host):
                self.click(fired_best) # speaker silent if user muted it / auto-mute on + host present
            self.flash(fired_best)
        base = self._led_base() # decay the beat pulse back down to the red running base
        if self.rgb != base:
            # each pass keeps 7/10 of the remaining distance to base; snap once the total difference is < 6
            r = base[0] + (self.rgb[0]-base[0])*7//10
            g = base[1] + (self.rgb[1]-base[1])*7//10
            b = base[2] + (self.rgb[2]-base[2])*7//10
            if abs(r-base[0])+abs(g-base[1])+abs(b-base[2]) < 6: r, g, b = base
            self.rgb = (r, g, b); self.led.set(r, g, b)
        if self._advance: # Continue: gapless swap to the prepared track at seam_t
            self._advance = False
            self._do_advance()
    # MIDI Clock Out (master): 24 PPQN; interval follows the live bpm (so continuous ramps carry through)
    if self.running and MIDI_CLOCK_OUT and self.midi is not None and not self._slaved and not self._fw_pushing:
        clk = self._clock_byte # reused singleton bytes (no per-tick alloc)
        tick_ns = self._beat_ns // 24 # cached: ns per Clock pulse
        while now >= self._clock_next:   # catch up on every pulse that is due
            try: self.midi.write(clk)
            except Exception: pass
            self._clock_next += tick_ns
    if self.running: # pendulum: swing while playing, free-wheel-off when stopped
        if not self._pend_on: # just started -> show the pendulum over the log
            self._pend_on = True; self._pend_show(True)
        self._pend_service(now)
    elif self._pend_on: # just stopped -> restore the log, de-energize the motor
        self._pend_on = False; self._pend_show(False)
        if self.pend is not None and self.pend.ok: self.pend.release()
def _end_plan(self):
    # Per-track playback flow. Returns None to loop forever, otherwise (fire_bars, action) where
    # action is 'stop' or a signed goto offset. An explicit end= on the track governs; without one,
    # the global Continue toggle acts as a default "next" (+1) - but only for tracks that declare a
    # bar length, which defines the segment.
    action = self.end
    if action is None:
        if not (self.continue_on and self.bars):
            return None
        action = 1
    cycle = self.bars or 1                         # one cycle = the declared bar count, else one master bar
    repeats = self.rep or 1
    return (cycle * repeats, action)
def _goto_target(self, offset):
    # Resolve a relative goto into an item index of the current set list:
    # before the first item clamps to 0, past the last item wraps around (loop).
    count = len(self.setlists[self.sl]['items'])
    target = self.idx + offset
    if target < 0:
        return 0
    if target >= count:
        return target % count
    return target
def _end_stop(self):
    # End-of-track 'stop' action: halt the transport, silence the speaker, clear the playheads, log
    # the session, refresh LED + meters and tell the peer ("stop").
    self.running = False
    self.spk.duty_cycle = 0
    self.reset_playheads()
    self._log_play()
    self.led_rest()
    self.draw_meters()
    self._sync_broadcast("stop")
def _on_new_bar(self, bar):
    # Called by the scheduler when the master lane enters bar number `bar` (0-based). Handles the
    # pre-parse of the next track one bar before the end, the on-screen segment timer, the end-action
    # itself (stop, or arm the gapless seam) and the gap-trainer mute state for this bar.
    plan = self._end_plan()                        # None = loop forever; else (fire_bars, action)
    if plan is not None:
        fire_bar, action = plan[0], plan[1]
        # pre-parse the target during the bar BEFORE the seam
        if action != 'stop' and self._next_pending is None and bar == fire_bar - 1:
            self._prepare_next(self._goto_target(action))
    on_segment = bool(self.bars and bar > 0 and bar % self.bars == 0)
    firing = plan is not None and bar > 0 and bar == plan[0]
    if on_segment or firing:
        self._seg_start = time.monotonic()         # segment boundary / end-action -> reset the on-screen timer
    if firing:
        action = plan[1]
        if action == 'stop':
            self._end_stop()
        else:
            if self._next_pending is None:
                self._prepare_next(self._goto_target(action))   # late prep
            if self._next_pending is not None:
                self._seam_t = self.lanes[0]['next']            # scheduled time of THIS boundary step
                self._advance = True                            # tick() will swap to the prepared track
    # gap trainer: audible for 'play' bars, then silent for 'mute' bars, repeating
    muted = False
    trainer = self.trainer
    if trainer:
        cycle = trainer['play'] + trainer['mute']
        if cycle:
            muted = (bar % cycle) >= trainer['play']
    self._muted = muted
def _prepare_next(self, target=None):
    # Parse a set-list item into the side holder _next_pending so the seam swap can be gapless.
    # target is an item index; None means "the item after the current one" (wrapping).
    # If the target is the current item (1-item list or self-goto), or parsing runs out of memory,
    # _next_pending is left untouched and the segment simply keeps looping.
    items = self.setlists[self.sl]['items']
    if target is None:
        nxt = (self.idx + 1) % len(items)
    else:
        nxt = target
    if nxt == self.idx:
        return
    name, prog = items[nxt]
    gc.collect()                                   # defragment before parse_program allocates new lanes
    try:
        parsed = parse_program(prog)
    except MemoryError:
        gc.collect()
        return
    bpm, lanes, bars, ramp, trainer, rep, end = parsed
    # Pre-compute the new track's step durations (ns) against ITS OWN bpm.
    beat = 60_000_000_000 // max(1, bpm)
    for lane in lanes:
        sub = max(1, lane['sub'])
        steps = max(1, lane['steps'])
        if lane.get('poly'):
            # poly lane: its steps evenly divide one master bar
            master = lanes[0]
            bar_ns = beat * (master['steps'] // max(1, master['sub']))
            durs = (bar_ns // steps,) * steps
        elif lane.get('swing') and sub % 2 == 0:
            # swing: each pair of steps splits 2/3 (long) + 1/3 (short)
            pair = beat // max(1, sub // 2)
            long_ns = (pair * 2) // 3
            short_ns = pair // 3
            durs = tuple(long_ns if (s % sub) % 2 == 0 else short_ns for s in range(steps))
        else:
            durs = (beat // sub,) * steps
        lane['durs'] = durs
    self._next_pending = {'lanes': lanes, 'bpm': bpm, 'bars': bars, 'ramp': ramp,
                          'trainer': trainer, 'name': name, 'idx': nxt, 'rep': rep, 'end': end}
def _do_advance(self):
    # Gapless seam: make the track prepared in _next_pending the current one, aligned to _seam_t.
    # Does nothing when no track is pending.
    pending = self._next_pending
    if pending is None:
        return
    self._next_pending = None
    self.lanes = pending['lanes']
    self.bpm = pending['bpm']
    self.bars = pending['bars']
    self.ramp = pending['ramp']
    self.trainer = pending['trainer']
    self.name = pending['name']
    self.idx = pending['idx']
    # the swapped-in track's own playback flow governs from here
    self.rep = pending['rep']
    self.end = pending['end']
    self._beat_ns = 60_000_000_000 // max(1, self.bpm)
    self._rebuild_dur_all()                        # the new track's step grids are built at the seam
    self._ramp_base = self.bpm
    self._lastbar = -1
    self._muted = False
    self._m_steps = 0
    self._dirty = False
    self._overlay = None
    while len(self.g_overlay):
        self.g_overlay.pop()
    seam = self._seam_t
    for lane in self.lanes:
        lane['next'] = seam                        # NEXT tick fires step 0 of the new track at seam_t
        lane['step'] = -1
    self._need_redraw = True                       # cheap header bits: meters/bpm/status/train -> next refresh
    self._heavy_redraw_at = time.monotonic() + 0.6 # heavy: build_grid + draw_log deferred ~0.6s so the intro plays unblocked
    self._seg_start = time.monotonic()             # reset the on-screen timer
    self.led_rest()
# ---------- inputs ----------
def poll(self):
    # Input pass, called from the main loop after tick(): buttons A/B (edge-triggered), joystick
    # (rate-limited through _joyNext), touch panel (~30x/s), USB-MIDI input, then host / USB presence.
    # JOY_INVERT_X/Y, JOY_DEADZONE and SPEAKER_AUTO_MUTE are module-level names defined outside this section.
    # Button A: act once on the transition from a truthy to a falsy reading -> start/stop
    a = self.btnA.value
    if (not a) and self._aPrev: self.toggle()
    self._aPrev = a
    # Button B: same edge detection -> tap tempo
    b = self.btnB.value
    if (not b) and self._bPrev: self.tap()
    self._bPrev = b
    now = time.monotonic_ns()
    if now >= self._joyNext:
        x = self.jx.value - 32768; y = self.jy.value - 32768   # re-centre the readings around 0
        if JOY_INVERT_X: x = -x
        if JOY_INVERT_Y: y = -y
        if abs(y) > JOY_DEADZONE:
            # vertical: nudge the tempo by 1, or by 5 on a hard push; repeat every 70 ms
            self.set_bpm(self.bpm + (1 if y > 0 else -1) * (5 if abs(y) > 26000 else 1))
            self._joyNext = now + 70_000_000
        elif abs(x) > JOY_DEADZONE:
            # horizontal: previous / next item, at most every 350 ms; the rest of this pass is skipped
            self.goto(self.idx + (1 if x > 0 else -1)); self._joyNext = now + 350_000_000; return
        else:
            self._joyNext = now + 20_000_000
    nowms = time.monotonic()   # NOTE: despite the name this is in SECONDS (time.monotonic())
    if nowms >= self._touchNext: # poll touch ~30x/s (the I2C read adds loop latency -> MIDI jitter)
        self._touchNext = nowms + 0.033
        pt = self.touch.read()
        if pt:
            self._touchSeen = nowms
            if not self._touchDown:   # only the first contact of a press is handled as a tap
                self._touchDown = True; self._handle_tap(pt[0], pt[1])
        elif self._touchDown and (nowms - self._touchSeen) > 0.14:   # no contact for 0.14 s -> released
            self._touchDown = False
    # USB-MIDI in: any byte = a host is listening (heartbeat); also assemble SysEx (clock / pushed programs)
    if self.midi_in is not None:
        try: n = self.midi_in.readinto(self._mbuf)
        except Exception: n = 0
        if n:
            self.last_midi_in = nowms
            self._feed_midi(self._mbuf, n)
    host = bool(self.last_midi_in) and (nowms - self.last_midi_in) < 1.0   # host present = MIDI input within the last second
    if host != self.midi_host:
        self.midi_host = host
        if host and SPEAKER_AUTO_MUTE: self.spk.duty_cycle = 0 # auto-mute when the computer takes over
        self.led_rest(); self.draw_icons()
    uc = bool(getattr(supervisor.runtime, "usb_connected", True)) # connected to a computer? (treated as True if the attribute is missing)
    if uc != self.usb_conn:
        self.usb_conn = uc; self.draw_icons()
# ---------- drawing ----------
def draw_bpm(self):
    # Redraw the large tempo readout, right-aligned. Lazy: when the displayed integer is unchanged
    # nothing is drawn, which avoids a text-bitmap allocation.
    value = self.bpm
    if value == self._displayed_bpm:
        return
    self._displayed_bpm = value
    self._place(self.g_bpm, str(value), 0, 44, C_TXT, C_BG, FONT_L, right_edge=WIDTH-12)
def draw_status(self):
    # Header row: set-list tab (title + position), the CONT toggle, and the item title below them.
    current = self.setlists[self.sl]
    # tab: playlist + position; muted colour = built-in (read-only), cyan = your own
    tab_text = "%s %d/%d" % (current['title'][:11], self.idx + 1, len(current['items']))
    tab_color = C_MUTE if current['builtin'] else C_CYAN
    self._place(self.g_idx, tab_text, 12, 118, tab_color, C_BG, FONT_S)
    cont_color = C_GREEN if self.continue_on else C_DIM
    self._place(self.g_cont, "CONT", 0, 118, cont_color, C_BG, FONT_S, right_edge=WIDTH-12)
    # title turns red when edited (tap it to save/revert)
    title_color = C_RED if self._dirty else C_TXT
    self._place(self.g_name, self.name[:20], 12, 134, title_color, C_BG, FONT_M)
def draw_train(self):
    # Ramp + gap-trainer indicators (a small symbol followed by its parameters), drawn only for the
    # features the current track has set. The group is always cleared first.
    grp = self.g_train
    while len(grp):
        grp.pop()
    x = 12
    y = 100
    ramp = self.ramp
    if ramp:
        amt = ramp['amt']
        if amt >= 0:
            wedge = [(0, 9), (12, 9), (12, 0)]     # rising ramp
            label = "+%d" % amt
        else:
            wedge = [(0, 0), (0, 9), (12, 9)]      # falling ramp
            label = "%d" % amt
        grp.append(vectorio.Polygon(pixel_shader=solid(C_AMBER), points=wedge, x=x, y=y))
        x += 16
        label = label + "/%db" % ramp['every']
        tg, w, h = make_text(label, FONT_S, C_AMBER, C_BG)
        tg.x = x
        tg.y = y
        grp.append(tg)
        x += w + 14
    trainer = self.trainer
    if trainer:
        grp.append(rect(x, y, 4, 9, C_CYAN))       # play |
        grp.append(rect(x + 6, y, 4, 9, C_DIM))    # rest
        x += 14
        tg, w, h = make_text("%d/%db" % (trainer['play'], trainer['mute']), FONT_S, C_CYAN, C_BG)
        tg.x = x
        tg.y = y
        grp.append(tg)
    self.dirty = True
def draw_icons(self):
    # Recolour the MIDI / USB status icons by state (a palette swap, so no tearing); an icon whose
    # palette is None is skipped. Always marks the display dirty.
    midi_pal = self.ic_midi_pal
    if midi_pal is not None:
        _recolor(midi_pal, C_GREEN if self.midi_host else C_DIM, C_BG)
    usb_pal = self.ic_usb_pal
    if usb_pal is not None:
        _recolor(usb_pal, C_CYAN if self.usb_conn else C_DIM, C_BG)
    self.dirty = True
def _fmt_t(self, s):
    # Format a duration in seconds as "m:ss", or "h:mm:ss" from one hour up (fractions are truncated).
    total = int(s)
    if total >= 3600:
        return "%d:%02d:%02d" % (total // 3600, (total % 3600) // 60, total % 60)
    return "%d:%02d" % (total // 60, total % 60)
def draw_meters(self):
    # Stopwatch + bar counter: elapsed time within the current segment [of total] and the current
    # bar [of total]. Called about 4x/s; each text is only re-rendered when its string changed.
    playing = self.running and self.play_start is not None
    if self.lanes:
        master = self.lanes[0]
        mlen = master['steps']
        bpb = master['steps'] // max(1, master['sub'])    # beats per bar
    else:
        mlen = 1
        bpb = 4
    elapsed = (time.monotonic() - self._seg_start) if playing else 0   # resets with the segment
    mbars = max(0, self._m_steps - 1) // max(1, mlen)     # bar containing THIS step (off-by-one fix vs 0.0.16)
    if playing:
        cur = "%d" % ((mbars % self.bars + 1) if self.bars else (mbars + 1))   # cycle 1..N
    else:
        cur = "-"
    if self.bars:
        # the track declares a length: show "X of TOTAL"
        ts = "%s of %s" % (self._fmt_t(elapsed), self._fmt_t(self.bars * bpb * 60.0 / self.bpm))
        bs = "bar %s of %d" % (cur, self.bars)
    else:
        ts = self._fmt_t(elapsed)
        bs = "bar %s" % cur
    if ts != self._lastTs:
        self._place(self.g_time, ts, 12, 50, C_TXT, C_BG, FONT_M)
        self._lastTs = ts
    if bs != self._lastBs:
        self._place(self.g_bar, bs, 12, 78, C_MUTE, C_BG, FONT_M)
        self._lastBs = bs
# ---------- pad grid (each lane = a row of step pads; playhead lit as it plays) ----------
def _padbase(self, L, s):
    # Base palette index for step s of lane L: the step's level, or 0 when the lane is muted.
    if L['mute']:
        return 0
    return L['levels'][s]
def build_grid(self):
    # Synchronous grid rebuild: start the chunked rebuild, then step it until it reports completion
    # (_grid_li back to None).
    self._grid_rebuild_start()
    while True:
        if self._grid_li is None:
            break
        self._grid_rebuild_step()
def _grid_rebuild_start(self):
    # Begin a chunked grid rebuild: tear down the old grid, compute the geometry, draw the vertical
    # beat lines, and reset the cursor that _grid_rebuild_step() advances.
    grid = self.g_grid
    while len(grid):
        grid.pop()
    self.lane_pads = []
    self.lane_lit = []
    gc.collect()                                   # many vectorio allocations follow - start from a clean heap
    n = min(len(self.lanes), MAXLANES)
    top = GRID_TOP
    rowh = min(40, ((LOG_TOP - 10) - top) // max(1, n))
    px0 = 60
    usable = WIDTH - 8 - px0 - 12
    gridh = n * rowh
    self._grid = {'top': top, 'rowh': rowh, 'px0': px0, 'usable': usable, 'n': n}   # for touch hit-testing
    if self.lanes:
        # vertical gridlines, one per master beat (cheap; a single pass before the chunked lanes)
        master = self.lanes[0]
        mbeats = max(1, master['steps'] // max(1, master['sub']))
        for beat_col in range(mbeats):
            grid.append(rect(px0 + 6 + (beat_col * usable) // mbeats, top, 1, gridh, C_GRID))
    self._grid_n = n
    self._grid_geo = (top, rowh, px0, usable)
    self._grid_li = 0 if n > 0 else None           # None = nothing to build
    # start at lane 0, pad 0, with no lane initialized yet
    self._grid_pi = 0
    self._grid_lane_st = None
    self._grid_pads = []
    self.dirty = True
def _grid_rebuild_step(self):
    # One chunk of the grid rebuild, then return so the caller can do other work. A chunk is ONE of:
    # finish the whole rebuild, initialize a lane (label + geometry), commit a finished lane, or
    # build a single pad.
    lane_i = self._grid_li
    if lane_i is None:
        return
    if lane_i >= self._grid_n or lane_i >= len(self.lanes):
        self._grid_li = None                       # whole rebuild done
        return
    lane = self.lanes[lane_i]
    top, rowh, px0, usable = self._grid_geo
    row_cy = top + lane_i * rowh + rowh // 2
    state = self._grid_lane_st
    if state is None:
        # first chunk on this lane: draw the instrument label and cache the lane geometry
        tg, w, h = make_text((lane.get('sound', '') or '?')[:7], FONT_S, C_MUTE, C_BG)
        tg.x = 8
        tg.y = row_cy - h // 2
        self.g_grid.append(tg)
        steps = lane['steps']
        sub = lane['sub']
        stepw = max(1, usable // steps)
        side = max(5, min(15, stepw - 1, rowh - 6))        # square edge for the main pulse
        rad = max(2, min(side // 2, stepw // 2 - 1))       # smaller circle for subdivisions
        self._grid_lane_st = (row_cy, steps, sub, stepw, side, rad)
        self._grid_pi = 0
        self._grid_pads = []
        self.dirty = True
        return                                     # the next call builds this lane's first pad
    cy, steps, sub, stepw, side, rad = state
    s = self._grid_pi
    if s >= steps:
        # lane finished: commit its pads and move on to the next lane
        self.lane_pads.append(self._grid_pads)
        self.lane_lit.append(-1)
        self._grid_pads = []
        self._grid_lane_st = None
        self._grid_li = lane_i + 1
        return
    cx = px0 + 6 + (s * usable) // steps           # proportional -> beats line up across lanes
    shader = self.pad_pal
    if s % sub == 0:
        pad = vectorio.Rectangle(pixel_shader=shader, width=side, height=side,
                                 x=cx - side // 2, y=cy - side // 2)
    else:
        pad = vectorio.Circle(pixel_shader=shader, radius=rad, x=cx, y=cy)
    pad.color_index = self._padbase(lane, s)
    self.g_grid.append(pad)
    self._grid_pads.append(pad)
    self._grid_pi = s + 1
    self.dirty = True
def _move_playhead(self, li, step):
    # Move lane li's playhead highlight to `step`: the previously lit pad returns to its base colour
    # and the new pad gets its base colour + 4 (the lit variant).
    pads = self.lane_pads[li]
    count = len(pads)
    previous = self.lane_lit[li]
    if 0 <= previous < count:
        pads[previous].color_index = self._padbase(self.lanes[li], previous)
    if step < count:
        pads[step].color_index = 4 + self._padbase(self.lanes[li], step)
    self.lane_lit[li] = step
    self.dirty = True
def reset_playheads(self):
    # Clear every lane's playhead: the lit pad (if any) goes back to its base colour and the lane is
    # marked as having no lit step (-1).
    lit = self.lane_lit
    for lane_i, pads in enumerate(self.lane_pads):
        previous = lit[lane_i]
        if 0 <= previous < len(pads):
            pads[previous].color_index = self._padbase(self.lanes[lane_i], previous)
        lit[lane_i] = -1
    self.dirty = True
# ---------- practice history (saved to /history.json, next to programs.json) ----------
def _probe_write(self):
    # Can we write to the file system? Creates a scratch file and removes it again.
    # Returns False when the write raises OSError (editor mode: the computer owns the FS).
    probe = "/.wtest"
    try:
        with open(probe, "w") as fh:
            fh.write("1")
    except OSError:
        return False
    try:
        os.remove(probe)
    except Exception:
        pass                                       # a leftover scratch file is harmless
    return True
def _load_log(self):
    # Read the saved practice log from /history.json. Any failure (missing file, bad JSON,
    # unexpected shape) yields an empty list.
    try:
        with open("/history.json") as fh:
            data = json.load(fh)
        return data.get("log", [])
    except Exception:
        return []
def _save_log(self):
    # Persist at most the first 200 log entries to /history.json. Skipped while the file system is
    # known to be read-only; an OSError during the write marks it read-only from now on.
    if self.can_write:
        try:
            with open("/history.json", "w") as fh:
                json.dump({"log": self.log[:200]}, fh)
        except OSError:
            self.can_write = False
def _start_play(self):
    # Open a play session: remember when it started and which tempo / track name it started with
    # (these are what _log_play() records later).
    self.play_start = time.monotonic()
    self.play_bpm = self.bpm
    self.play_name = self.name
def _log_play(self):
    # Close the open play session (if any) and record it at the head of the practice log.
    # Sessions shorter than MIN_LOG_SEC are dropped. The log is capped at 200 entries, saved,
    # redrawn, and the new entry is mirrored to the editor.
    started = self.play_start
    if started is None:
        return
    dur = int(time.monotonic() - started)
    self.play_start = None
    if dur < MIN_LOG_SEC:
        return                                     # too short to be worth logging
    mlen = self.lanes[0]['steps'] if self.lanes else 1
    clock = time.localtime()
    entry = {"t": "%02d:%02d" % (clock.tm_hour, clock.tm_min), "bpm": self.play_bpm,
             "dur": dur, "bars": self._m_steps // max(1, mlen), "name": self.play_name,
             "at": self._epoch_ms()}               # epoch ms (when the RTC is set) = cross-half dedup key
    self.log.insert(0, entry)
    del self.log[200:]
    self._armed = None                             # row indices shifted -> cancel any pending delete
    self._save_log()
    self.draw_log()
    self._sync_send_log_one(entry)                 # mirror this session to the editor (no-op if not armed)
def draw_log(self):
    # Redraw the practice-log panel for the CURRENT track: a header, then up to LOG_ROWS rows, or a
    # placeholder line when there is nothing to show. log_rows is rebuilt with each row's touch band
    # and its index into self.log (used by _tap_log for delete).
    grp = self.g_log
    while len(grp):
        grp.pop()
    self.log_rows = []
    gc.collect()                                   # several text bitmaps are allocated below
    hdr, w, h = make_text("PRACTICE LOG - THIS TRACK", FONT_S, C_MUTE, C_BG)
    hdr.x = 10
    hdr.y = LOG_TOP
    grp.append(hdr)
    mine = [(i, e) for i, e in enumerate(self.log) if e.get("name") == self.name]   # current track only
    if not mine:
        tg, w, h = make_text("no plays over 5s yet", FONT_S, C_DIM, C_BG)
        tg.x = 10
        tg.y = LOG_TOP + LOG_ROWH
        grp.append(tg)
        self.dirty = True
        return
    y = LOG_TOP + LOG_ROWH + 2
    for k, (oi, e) in enumerate(mine):
        if k >= LOG_ROWS:
            break
        armed = (oi == self._armed)                # oi = index into self.log (for delete)
        secs = e["dur"]
        dur = "%d:%02d" % (secs // 60, secs % 60)
        bars = e.get("bars", 0)
        bstr = (" %dbar" % bars) if bars else ""
        line = "%s%s %3dbpm %s%s" % ("x " if armed else "", e.get("t", "--:--"), e["bpm"], dur, bstr)
        tg, w, h = make_text(line, FONT_S, C_AMBER if armed else C_TXT, C_BG)
        tg.x = 10
        tg.y = y
        grp.append(tg)
        self.log_rows.append((y - 2, y + LOG_ROWH - 2, oi))
        y += LOG_ROWH
    self.dirty = True
def _tap_log(self, x, ty):
    # Touch inside the log panel. Tapping a row arms it for deletion; tapping the armed row again
    # deletes that entry and saves. A tap that hits no row cancels a pending arm.
    for y0, y1, row in self.log_rows:
        if not (y0 <= ty <= y1):
            continue
        if self._armed == row:
            del self.log[row]                      # confirm delete
            self._armed = None
            self._save_log()
        else:
            self._armed = row                      # arm (tap again to delete)
        self.draw_log()
        return
    if self._armed is not None:
        self._armed = None                         # tapped elsewhere -> cancel
        self.draw_log()
# ---------- USB-MIDI in: SysEx assembler (clock + editor-pushed programs) ----------
def _feed_midi(self, buf, n):
    # Consume the first n bytes of buf from USB-MIDI in: assemble SysEx messages (F0 ... F7) and
    # dispatch them, and route real-time bytes (clock / start / continue / stop) to the slave handlers
    # when MIDI Clock In (and its transport option) is enabled. Real-time bytes never enter a SysEx.
    stamp = time.monotonic_ns() if MIDI_CLOCK_IN else 0    # only timestamp when slave mode is enabled
    for i in range(n):
        byte = buf[i]
        if byte == 0xF0:
            # SysEx start: begin a fresh payload
            self._sx = bytearray()
            self._sxon = True
        elif byte == 0xF7:
            # SysEx end: hand over the payload if one was being assembled
            if self._sxon:
                self._handle_sysex(self._sx)
            self._sxon = False
        elif byte >= 0xF8:
            # system real-time; anything not handled here (Active Sensing 0xFE etc.) is ignored
            if byte == 0xF8:
                if MIDI_CLOCK_IN:
                    self._slave_tick(stamp)        # 24 PPQN clock tick from a master
            elif byte == 0xFA or byte == 0xFB:
                if MIDI_CLOCK_IN_TRANSPORT and MIDI_CLOCK_IN:
                    self._slave_start()            # Start, or Continue (no SPP -> treated as Start)
            elif byte == 0xFC:
                if MIDI_CLOCK_IN_TRANSPORT and MIDI_CLOCK_IN:
                    self._slave_stop()             # Stop
        elif self._sxon:
            if len(self._sx) < 60000:
                self._sx.append(byte)              # big enough for a pushed firmware (app.py)
            else:
                self._sxon = False                 # overflow guard: drop the message
def _slave_tick(self, now_ns):
    # One incoming 24 PPQN clock tick: smooth the tick interval and derive the tempo from it.
    # The first tick after a reset only records its timestamp.
    previous = self._clock_in_last_t
    self._clock_in_last_t = now_ns
    if previous == 0:
        self._slaved = True
        return
    interval = now_ns - previous
    # Accept only intervals of 8.3 ms .. 500 ms per tick; at 24 PPQN that is roughly 300 BPM down to
    # 5 BPM, the same range the tempo is clamped to below. Anything else is ignored.
    if interval < 8_300_000 or interval > 500_000_000:
        return
    if self._clock_in_avg == 0:
        self._clock_in_avg = interval
    else:
        self._clock_in_avg = (self._clock_in_avg * 7 + interval) // 8   # exponential smoothing, alpha = 1/8
    tempo = int(60_000_000_000 // (self._clock_in_avg * 24))
    tempo = max(5, min(300, tempo))
    if tempo != self.bpm:
        self.bpm = tempo
        self._beat_ns = 60_000_000_000 // tempo
        self._rebuild_dur_all()
    self._slaved = True
def _slave_start(self):
    # The clock master sent Start (or Continue): begin playback if we are stopped.
    # NOTE: no Start byte is echoed on the output here (we are the slave).
    # Either way the clock-in smoothing is reset so the next tick re-establishes the interval.
    if not self.running:
        self.running = True
        self._reset_clock()
        self._start_play()
        self.led_rest()
        self.draw_meters()
    self._clock_in_last_t = 0
    self._clock_in_avg = 0
def _slave_stop(self):
    # The clock master sent Stop: halt playback if we are running (silence the speaker, clear the
    # playheads, log the session, refresh LED + meters). Either way the clock-in state is reset and
    # slaving ends.
    if self.running:
        self.running = False
        self.spk.duty_cycle = 0
        self.reset_playheads()
        self._log_play()
        self.led_rest()
        self.draw_meters()
    self._clock_in_last_t = 0
    self._clock_in_avg = 0
    self._slaved = False
def _handle_sysex(self, sx):
    # Dispatch one assembled SysEx payload. sx is the message without the F0/F7 framing:
    # sx[0] must be 0x7D (our manufacturer id), sx[1] is the command, sx[2:] its data.
    #   0x01 set clock | 0x02 version query | 0x40-0x45 live sync | 0x10 write /programs.json
    #   0x21 / 0x22 / 0x23 firmware push: begin / data chunk / commit
    # Write-type commands answer through _ack(). rtc, a2b_base64, DEVICE_ID and APP_VERSION are
    # module-level names defined outside this section (rtc and a2b_base64 may be None - checked below).
    if len(sx) < 2 or sx[0] != 0x7D: return # 0x7D = our (educational) manufacturer id
    cmd = sx[1]
    if cmd == 0x01 and len(sx) >= 8 and rtc is not None: # set clock: yr-2000, mo, dd, hh, mm, ss
        try: rtc.RTC().datetime = time.struct_time((2000 + sx[2], sx[3], sx[4], sx[5], sx[6], sx[7], 0, -1, -1))
        except Exception: pass
    elif cmd == 0x02: # version query -> reply 0x03 + "device_id;APP_VERSION"
        if self.midi: # old firmware sent bare APP_VERSION; editor parses "contains ';'?" for back-compat
            payload = DEVICE_ID + ";" + APP_VERSION
            self.midi.write(bytes([0xF0, 0x7D, 0x03]) + payload.encode() + bytes([0xF7]))
    elif cmd == 0x40 or cmd == 0x41 or cmd == 0x42 or cmd == 0x43 or cmd == 0x44 or cmd == 0x45: # Live sync (see src/livesync.js)
        # decode the payload as text, dropping every byte outside printable ASCII (0x20..0x7E)
        try: text = "".join(chr(b) if 0x20 <= b < 0x7F else "" for b in sx[2:])
        except Exception: return
        origin = text.split(";", 1)[0] if text else ""
        if origin == self._sync_origin: return # drop our own echoes (composite USB may loop)
        self._sync_armed = True   # any sync message from a peer arms live sync (BYE disarms it again below)
        if cmd == 0x40: # HELLO -> reply with our FULL + set-list library + practice log
            self._sync_broadcast_full(); self._sync_send_setlists(); self._sync_send_log_batch()
        elif cmd == 0x43: # BYE -> peer disconnected; stop heartbeats
            self._sync_armed = False
        elif cmd == 0x41: # FULL: origin;seq;running;sl;item;patch...
            parts = text.split(";", 5)   # maxsplit 5: the patch itself may contain ';'
            if len(parts) >= 6:
                try:
                    # only running (parts[2]) and patch (parts[5]) are applied; sl/item (parts[3], parts[4]) are not used here
                    running = parts[2] == "1"; patch = parts[5]
                    self._sync_apply_full(running, patch)
                except Exception: pass
        elif cmd == 0x42: # DELTA: origin;seq;evt
            parts = text.split(";", 2)
            if len(parts) >= 3: self._sync_apply_delta(parts[2])
        elif cmd == 0x44: # SLSYNC: origin;seq;json (set-list content)
            parts = text.split(";", 2)
            if len(parts) >= 3: self._sync_apply_setlists(parts[2])
        elif cmd == 0x45: # LOGSYNC: origin;seq;json (practice entries)
            parts = text.split(";", 2)
            if len(parts) >= 3: self._sync_apply_log(parts[2])
    elif cmd == 0x10: # write /programs.json (user playlists) pushed from the editor
        try:
            with open("/programs.json", "wb") as f: f.write(bytes(sx[2:]))
            self.rebuild_setlists(); self.load(0) # built-ins untouched; show the refreshed lists
            self._ack(True)
        except Exception:
            self._ack(False) # read-only (editor mode) etc.
    # A/B firmware update, sent as small flow-controlled chunks (a single huge SysEx overruns the
    # USB-MIDI input buffer and arrives corrupt). begin(0x21,len) -> data(0x22)* -> commit(0x23).
    elif cmd == 0x21: # BEGIN firmware transfer: open the .mpy staging file
        try:
            try: self._fw.close()   # drop a staging file left open by an earlier, unfinished push
            except Exception: pass
            self._fw = open("/app.new", "wb"); self._fw_n = 0
            self._fw_pushing = True # silence Note On / Clock Out / Live-sync broadcasts during the push
            self._ack(True)
        except Exception: # read-only (editor mode) / no space
            self._fw = None; self._fw_pushing = False; self._ack(False)
    elif cmd == 0x22: # DATA: a base64 chunk (multiple of 4) -> decode -> append
        try:
            if self._fw is None or a2b_base64 is None: raise OSError()   # no transfer open / no decoder -> NACK below
            self._fw.write(a2b_base64(bytes(sx[2:])))
            self._fw.flush() # small, predictable per-chunk flush (no slow burst flushes later)
            self._fw_n += 1
            gc.collect() # SysEx assembler allocates a fresh bytearray per chunk;
            self._ack(True) # GC every chunk so 600 chunks' worth of garbage doesn't accumulate
        except Exception:
            # abort the transfer: close the staging file, re-enable normal MIDI output, NACK
            try: self._fw.close()
            except Exception: pass
            self._fw = None; self._fw_pushing = False; self._ack(False)
    elif cmd == 0x23: # COMMIT: verify it's a CircuitPython .mpy, then A/B install
        try:
            try: self._fw.close()
            except Exception: pass
            self._fw = None; gc.collect()
            with open("/app.new", "rb") as f: head = f.read(2)
            # sanity check: size (os.stat()[6]) of at least 4000 bytes and header bytes 0x43 0x06
            if os.stat("/app.new")[6] < 4000 or len(head) < 2 or head[0] != 0x43 or head[1] != 0x06:
                try: os.remove("/app.new") # not a CircuitPython mpy v6 -> reject, keep the working build
                except OSError: pass
                self._fw_pushing = False; self._ack(False); return
            try: os.remove("/app.bak")
            except OSError: pass
            os.rename("/app.mpy", "/app.bak") # current build becomes the rollback
            os.rename("/app.new", "/app.mpy")
            open("/trial", "w").close() # arm the trial; the loader reverts if it won't boot
            self._fw_pushing = False
            self._ack(True); time.sleep(0.4); supervisor.reload()   # ACK first, brief pause, then reload
        except Exception: # catch ALL (read-only, MemoryError, ...) -> never brick
            self._fw_pushing = False; self._ack(False)
def _ack(self, ok):
if self.midi: self.midi.write(bytes([0xF0, 0x7D, 0x7F if ok else 0x7E, 0xF7]))
def run(self):
    """Run the firmware main loop; does not return normally.

    Before looping: enters the jog/test loop when `self._jog` is set, reports a
    missing touch controller, and checks for "/trial" (present -> this build is
    still on trial and must be confirmed).

    Each pass, in order: tick() + poll(), the cheap post-seam redraw, the chunked
    grid rebuild (one lane per pass) followed by draw_log(), the 0.25 s meter/BPM
    update, the 0.03 s pendulum frame while running, the sync heartbeat, the
    one-shot trial confirmation after 5 s, and finally the throttled display
    refresh. MemoryError and any other Exception are printed and the loop goes on.
    """
    if self._jog:                        # A+B held at boot -> hidden stepper jog/test mode
        self._jog_loop()
    if self.touch.addr is None:
        print("GT911 touch not found")

    started = time.monotonic()
    try:
        os.stat("/trial")                # marker exists -> freshly-pushed build on trial
    except OSError:
        confirmed = True
    else:
        confirmed = False

    while True:
        try:
            self.tick()
            self.poll()

            # Post-seam fast pass: cheap header/status bits, drawn immediately.
            if self._need_redraw:
                self._need_redraw = False
                self.draw_bpm()
                self.draw_status()
                self.draw_train()
                self.draw_meters()

            # Post-seam slow pass: once its deadline passes, start the chunked rebuild.
            heavy_at = self._heavy_redraw_at
            if heavy_at and time.monotonic() >= heavy_at:
                self._heavy_redraw_at = 0
                self._grid_rebuild_start()
                self._heavy_log_pending = True

            if self._grid_li is not None:
                # ONE lane per loop pass, so tick() keeps running between lanes.
                self._grid_rebuild_step()
            elif self._heavy_log_pending:
                # Grid finished -> draw the log exactly once.
                self._heavy_log_pending = False
                self.draw_log()

            now = time.monotonic()

            if now >= self._uiNext:      # ~4x/s: stopwatch + bar counter; bpm follows the ramp
                self._uiNext = now + 0.25
                self.draw_meters()
                self.draw_bpm()

            if self.running and now >= self._pendNext:   # ~30fps on-screen pendulum
                self._pendNext = now + 0.03
                self.draw_pendulum(time.monotonic_ns())

            if self._sync_armed and now >= self._sync_heartbeat_next:
                self._sync_broadcast_full()              # periodic FULL state broadcast

            if not confirmed and now - started > 5:
                # Ran for 5 s without dying -> confirm the update by dropping the marker.
                try:
                    os.remove("/trial")
                except Exception:
                    pass
                confirmed = True

            # Refresh at most every 50 ms. While running, hold off when the MASTER lane's
            # next step is within 10 ms, unless the last refresh is more than 200 ms old.
            if self.dirty and now >= self._refreshNext:
                may_refresh = True
                if self.running and self.lanes:
                    master_due = self.lanes[0]['next']   # master lane only
                    clear_of_step = (master_due - time.monotonic_ns()) > 10_000_000
                    may_refresh = clear_of_step or (now - self._lastRefresh) > 0.2
                if not may_refresh:
                    self._refreshNext = now + 0.003      # retry very soon, not in 50 ms
                else:
                    if self.display.refresh():
                        self.dirty = False
                    self._lastRefresh = now
                    self._refreshNext = now + 0.05

            time.sleep(0.0005)
        except MemoryError:
            # Fragmented heap: report, collect, keep running.
            try:
                print("MemoryError: gc + continue")
            except Exception:
                pass
            gc.collect()
            time.sleep(0.05)
        except Exception as err:
            # Any other transient error: log it and carry on.
            try:
                print("tick error:", err)
            except Exception:
                pass
            time.sleep(0.05)
# Module entry point: build the App and enter its main loop as soon as this file is executed.
# run() loops forever, so nothing placed after this line would run.
# NOTE(review): presumably left without a __main__ guard on purpose, because a separate loader
# imports this module as /app.mpy - confirm before adding one.
App().run()