metronome/pico-cp/app.py
Me Here fd8446658d PM_K-1 0.0.14: gapless seam + continuous ramp + MIDI Clock Out (master); speaker rename
Speaker rename (production device has a full audio circuit, not a buzzer):
- MUTE_BUZZER -> MUTE_SPEAKER, self.buz -> self.spk, P_BUZ -> P_SPK
- New SPEAKER_AUTO_MUTE flag (default True): mute the speaker when a MIDI host is
  detected (the old hardcoded behavior; now a setting).

Gapless seam between tracks (Continue):
- _prepare_next() pre-parses the next playlist item during the LAST bar so the swap is
  allocation-free. _do_advance() swaps lanes/bpm/bars/ramp/trainer in with
  lanes[0]['next'] = seam_t (the wall-clock time of the boundary step we just hit), no
  _reset_clock - the next tick fires step 0 of the new track exactly at the boundary.
  tick() breaks out at the seam so the old voice's boundary beat is NOT fired (it'd be
  the new track's step 0 a few ms later). Visuals (build_grid + draws) are deferred
  one display-refresh cycle behind the audio via _need_redraw, so the audio doesn't
  wait for them.

Continuous ramp:
- Replaced the bar-boundary set_bpm step with per-master-step linear interpolation:
  bpm = _ramp_base + amt * ((m_steps/mlen) % bars) / ramp.every (clamped 30..300).
  The integer-clamped bpm glides smoothly across the segment. draw_bpm() is now lazy
  (skips the bitmap alloc if the displayed integer hasn't changed), and the periodic
  meter tick in run() also redraws BPM so the big number follows the ramp.

MIDI Clock Out (master):
- New flags: MIDI_CHANNEL (default 10 = GM drum), MIDI_CLOCK_OUT (default OFF),
  MIDI_CLOCK_OUT_TRANSPORT (default ON). midi_send() now uses the configured channel.
  In tick(), when running + MIDI_CLOCK_OUT, stream 0xF8 at 24 PPQN with the interval
  computed live from self.bpm (so it follows the continuous ramp). toggle() sends
  0xFA on Start and 0xFC on Stop when transport is enabled.

Verified in harness: seam keeps lanes[0]['next'] = seam_t (no _reset_clock); ramp 80
glides via +0.25/step (visible as 80->81 in 4 master steps at rmp80/4/4); Clock Out
math sound (60/120/180 BPM -> 41.67/20.83/13.89 ms tick interval).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-30 07:11:19 -05:00

1148 lines
66 KiB
Python

# VARASYS PolyMeter - PM_K-1 "Kit" firmware (CircuitPython edition)
# Raspberry Pi Pico (Pico / Pico W / Pico 2) on the 52Pi EP-0172 "Pico Breadboard Kit Plus":
# 3.5" ST7796 320x480 cap-touch (GT911), PSP joystick, WS2812 RGB, speaker, 2 buttons.
#
# WHY CIRCUITPYTHON: the board then mounts as a USB drive (CIRCUITPY) carrying this code, your
# tracks (programs.json) and a copy of the editor - edit on the web, "Save to device" writes
# programs.json here, and CircuitPython auto-reloads with the new grooves. It also sends USB-MIDI
# (a note per click) so the web editor can play it out the computer's speakers ("Device audio").
# Runs the SAME program strings as metronome.varasys.io.
#
# INSTALL: flash CircuitPython (https://circuitpython.org/board/raspberry_pi_pico/), then copy
# this file as code.py plus programs.json onto the CIRCUITPY drive. It runs on boot.
#
# Fallback: the simpler MicroPython firmware (pico/main.py) is always available - BOOTSEL +
# drag a MicroPython .uf2 to go back. The Pico cannot be bricked.
#
# Untested-panel notes & calibration flags are in CONFIG + pico-cp/README.md.
import board, busio, digitalio, analogio, pwmio, displayio, vectorio, time, json, gc, os, supervisor
supervisor.runtime.autoreload = False # we write our own files (log + pushed programs); never self-restart
APP_VERSION = "0.0.14" # firmware version (the A/B updater pushes/compares this)
try:
import rtc # set from the editor's clock SysEx so the log has real timestamps
except ImportError:
rtc = None
try: # CircuitPython 9.x
from fourwire import FourWire
from busdisplay import BusDisplay
except ImportError: # CircuitPython 8.x
from displayio import FourWire
from displayio import Display as BusDisplay
try:
import neopixel_write # core module on RP2040 - drives WS2812 with no external library
except ImportError:
neopixel_write = None
try:
import usb_midi # default-enabled on RP2040 - sends a MIDI note per click to the computer
except ImportError:
usb_midi = None
try:
from binascii import a2b_base64 # decode the base64-encoded .mpy pushed by the editor's one-click update
except ImportError:
a2b_base64 = None
# ============================== CONFIG (tweak if needed) ==============================
SPI_BAUD = 62_500_000 # faster SPI = smaller tearing window; drop to 40_000_000 if unstable
LED_BRIGHTNESS = 0.15 # WS2812 sits right next to you - keep it dim (0..1)
MIDI_ENABLED = True # send a USB-MIDI note per click (play via the web editor's "Device audio")
MIDI_CHANNEL = 10 # 1..16 - GM channel 10 is the drum channel (what DAWs auto-route to drums)
MIDI_CLOCK_OUT = False # send 24 PPQN MIDI Clock so a DAW can slave its tempo to the metronome
MIDI_CLOCK_OUT_TRANSPORT = True # also send Start (0xFA) / Stop (0xFC) on play / stop (relevant if MIDI_CLOCK_OUT)
MUTE_SPEAKER = False # always silence the on-board speaker
SPEAKER_AUTO_MUTE = True # auto-mute the speaker when a MIDI host is listening (computer plays it instead)
WIDTH, HEIGHT = 320, 480
MADCTL = 0x48 # portrait; 0x48 swaps R/B for this BGR panel (cyan reads cyan). Use 0x40 if reversed.
INVERT_COLORS = True # most ST7796 modules need inversion ON; set False if colours look negative
# Touch (GT911) - flip if taps land wrong:
TOUCH_SWAP_XY = False
TOUCH_INVERT_X = False
TOUCH_INVERT_Y = False
TOUCH_DEBUG = False
# Joystick:
JOY_INVERT_X = False
JOY_INVERT_Y = False
JOY_DEADZONE = 9000
# ----- pins (fixed by the EP-0172 board) -----
P_SCK, P_MOSI, P_CS, P_DC, P_RST = board.GP2, board.GP3, board.GP5, board.GP6, board.GP7
P_SDA, P_SCL = board.GP8, board.GP9
P_RGB, P_SPK, P_BTNA, P_BTNB = board.GP12, board.GP13, board.GP15, board.GP14
P_JOYX, P_JOYY = board.GP26, board.GP27
# ----- BUILT-IN playlists: the standard defaults from the web editor, baked in here so they update with
# firmware and the user can't change/delete them. User playlists live separately in programs.json
# (pushed from the editor) and never touch these. (ASCII only - it's pushed 7-bit + the fonts are ASCII.)
BUILTIN_SETLISTS = [
("Styles", [
("Four-on-the-floor", "t120;kick:4;snare:4=.x.x;hatClosed:4/2"),
("Swing ride", "t150;ride:4/2s;kick:4=X..x;snare:4=.x.x"),
("Purdie half-time shuffle", "t92;kick:4/3=X....x...x..;snare:4/3=..gg.gX.gg.g;hatClosed:4/3=X.xX.xX.xX.x"),
("Samba (2/4)", "t104;tomLow:2/4=x...X...;hatClosed:2/4;woodblock:2/4=X.xx.xX."),
("Nanigo (6/8 bembe)", "t130;cowbell:4/3=X.xx.x.xx.x.;kick:4/3=X.....X.....;hatClosed:4/3=..x..x..x..x"),
("6/8 groove", "t100;kick:3+3=x..x..;snare:3+3=...x..;hatClosed:3+3/2"),
("7/8 (2+2+3)", "t130;kick:2+2+3=x..x..x;hatClosed:2+2+3/2"),
("5/4 (3+2)", "t112;kick:3+2=x..x.;snare:3+2=..x..;hatClosed:3+2/2"),
]),
("Practice", [
("5 over 4 polyrhythm", "t100;kick:4;claves:5~"),
("3 over 2 hemiola", "t96;woodblock:2;cowbell:3~"),
("2 & 4 & 3 over one bar", "t100;kick:3;cowbell:2~;claves:4~"),
("Triplet hats", "t100;kick:4;snare:4=.x.x;hatClosed:4/3"),
("Accents - cycle the pads", "t92;kick:4=X..X;snare:4=.X.X;hatClosed:4/2"),
("Tempo builder 80 up", "t80;woodblock:4;rmp80/4/4"),
("Gap trainer (play 2 / rest 2)", "t100;kick:4;hatClosed:4/2;tr2/2"),
]),
("Song (continuous)", [ # ~4-bar sections; with Continue on they roll one into the next
("Intro - hats & kick", "t88;b4;kick:4=X.x.;hatClosed:4/2=gggggggg"),
("Groove in - backbeat", "t88;b4;kick:4=X.x.;snare:4=.X.X;hatClosed:4/2"),
("Half-time shuffle", "t92;b4;kick:4/3=X....x...x..;snare:4/3=..gg.gX.gg.g;hatClosed:4/3=X.xX.xX.xX.x"),
("Build - ramp 92-120", "t92;b4;rmp92/4/2;kick:4;snare:4=.X.X;hatClosed:4/2"),
("Four-on-the-floor (909)", "t124;b4;kick909:4;clap909:4=.X.X;hat909:4/2=.X.X.X.X"),
("Samba break (2/4)", "t116;b4;tomLow:2/4=x...X...;hatClosed:2/4;woodblock:2/4=X.xx.xX."),
("Peak - 16ths", "t132;b4;kick:4=X..x;snare:4=.X.X;hatClosed:4/4"),
("Outro - ramp down", "t132;b4;rmp132/-7/1;kick:4=X..x;hatClosed:4/2=gggggggg"),
]),
]
# ============================== COLOURS (0xRRGGBB; displayio handles 565) ==============================
C_BG, C_PANEL, C_TXT, C_MUTE = 0x06090E, 0x1C222C, 0xC7D0DB, 0x788494
C_CYAN, C_AMBER, C_GREEN, C_DIM = 0x0AB3F7, 0xFF9B2E, 0x2FE07A, 0x243240
C_BTN = 0x1C222C
LEVEL_RGB = {2: (255, 110, 0), 1: (0, 150, 255), 3: (130, 70, 255)} # beat pulse: accent / normal / ghost
LED_IDLE = (0, 80, 0) # RGB LED resting colour when stopped: dim green ("on")
LED_RUN = (110, 0, 0) # RGB LED resting colour when playing: dim red (beats pulse brighter on top)
# voice -> General-MIDI note (USB-MIDI bridge), and level -> MIDI velocity
SOUND_GM = {"kick":36,"kick808":36,"kick909":36, "snare":38,"snare808":38,"snare909":38,
"clap":39,"clap808":39,"clap909":39, "rim":37, "hatClosed":42,"hat808":42,"hat909":42,
"hatOpen":46,"openHat808":46, "ride":51,"ride909":51, "crash":49,"crash909":49,
"tomLow":41,"tom808":45,"tomMid":45,"tomHigh":48, "tambourine":54,
"cowbell":56,"cowbell808":56, "woodblock":76,"jamblock":76, "claves":75, "beep":37}
GM_DEFAULT = 37
SOUNDS = ["kick", "snare", "clap", "rim", "hatClosed", "hatOpen", "ride", "crash", # lane-editor sound cycle
"tomLow", "tomMid", "tomHigh", "cowbell", "woodblock", "claves", "tambourine", "beep"]
MIDI_VEL = {2: 120, 1: 90, 3: 45} # accent / normal / ghost
MAXLANES = 5 # lanes shown on the pad grid (extras still play)
GRID_TOP = 158 # top of the pad grid (leaves room for time/bar/ramp/tab rows)
LOG_TOP, LOG_ROWH, LOG_ROWS = 302, 16, 9 # practice-history log area (below the pad grid)
MIN_LOG_SEC = 5 # don't log plays shorter than this
PAD_DIM = (0x10161E, 0x0A3A52, 0x4A3010, 0x2A1D4A) # idle pad: mute / normal / accent / ghost
PAD_LIT = (0x39414D, 0x0AB3F7, 0xFF9B2E, 0x967BFF) # playhead pad: mute / normal / accent / ghost
C_GRID = 0x1A2330 # faint vertical beat gridlines (beats line up across lanes)
C_RED = 0xFF5A5A # unsaved-edits (dirty) track title
# WS2812 RGB LED - self-contained via the core neopixel_write module (no external library)
class RGB:
def __init__(self, pin):
self.ok = neopixel_write is not None
if self.ok:
self.io = digitalio.DigitalInOut(pin); self.io.direction = digitalio.Direction.OUTPUT
self.buf = bytearray(3)
def set(self, r, g, b):
if not self.ok: return
# WS2812 wants GRB order; scale down so it isn't blinding
self.buf[0] = int(g * LED_BRIGHTNESS); self.buf[1] = int(r * LED_BRIGHTNESS); self.buf[2] = int(b * LED_BRIGHTNESS)
try: neopixel_write.neopixel_write(self.io, self.buf)
except Exception: self.ok = False
# ============================== ANTI-ALIASED FONTS (binary blobs on the drive; see pico/gen_font.py) ==============================
def load_font(path):
with open(path, "rb") as f:
blob = f.read()
count = blob[0]; p = 1; pixoff = 1 + count * 7; glyphs = {}
for _ in range(count):
cp = (blob[p] << 8) | blob[p+1]; w = blob[p+2]; h = blob[p+3]
xoff = blob[p+4]; xoff = xoff - 256 if xoff > 127 else xoff
top = blob[p+5]; adv = blob[p+6]; p += 7
glyphs[cp] = (w, h, xoff, top, adv, pixoff); pixoff += (w * h + 1) // 2
return (glyphs, blob)
FONT_S = load_font("/font_s.bin") # small - pad-grid lane labels
FONT_M = load_font("/font_m.bin") # labels / buttons
FONT_L = load_font("/font_l.bin") # big BPM
gc.collect()
def _blend(bg, fg, i):
t = i * 17
r = (((bg >> 16) & 0xFF)*(255-t) + ((fg >> 16) & 0xFF)*t) // 255
g = (((bg >> 8) & 0xFF)*(255-t) + ((fg >> 8) & 0xFF)*t) // 255
b = ((bg & 0xFF)*(255-t) + (fg & 0xFF)*t) // 255
return (r << 16) | (g << 8) | b
def make_text(s, font, fg, bg):
"""Render a string into a displayio TileGrid (anti-aliased via a 16-step blend palette)."""
glyphs, blob = font
w = 0; top0 = 999; bot = 0
for c in s:
g = glyphs.get(ord(c))
if not g: continue
w += g[4]
if g[1]:
if g[3] < top0: top0 = g[3]
if g[3] + g[1] > bot: bot = g[3] + g[1]
if top0 == 999: top0 = 0
w = max(1, w); h = max(1, bot - top0)
gc.collect()
bmp = displayio.Bitmap(w, h, 16)
pal = displayio.Palette(16)
for i in range(16): pal[i] = _blend(bg, fg, i)
pen = 0
for c in s:
g = glyphs.get(ord(c))
if not g: continue
gw, gh, xoff, gtop, adv, off = g
for j in range(gh):
row = (gtop - top0) + j
for i in range(gw):
k = j * gw + i
byte = blob[off + (k >> 1)]
nib = (byte >> 4) if (k & 1) == 0 else (byte & 0xF)
if nib:
x = pen + xoff + i
if 0 <= x < w and 0 <= row < h: bmp[x, row] = nib
pen += adv
return displayio.TileGrid(bmp, pixel_shader=pal), w, h
# ---- single-image alpha assets (logo, status icons) - blit like a one-off glyph; see gen_assets.py ----
def load_alpha(path):
try:
with open(path, "rb") as f: blob = f.read()
return (blob[0], blob[1], blob) # (w, h, bytes); pixels start at offset 2
except Exception:
return None # missing/corrupt -> caller falls back to text (no crash)
def make_glyph(asset, fg, bg):
w, h, blob = asset
gc.collect()
bmp = displayio.Bitmap(w, h, 16); pal = displayio.Palette(16)
for i in range(16): pal[i] = _blend(bg, fg, i)
for k in range(w * h):
byte = blob[2 + (k >> 1)]
nib = (byte >> 4) if (k & 1) == 0 else (byte & 0xF)
if nib: bmp[k % w, k // w] = nib
return displayio.TileGrid(bmp, pixel_shader=pal), pal, w, h
def _recolor(pal, fg, bg): # re-tint a stored asset palette in place (tear-free)
for i in range(16): pal[i] = _blend(bg, fg, i)
LOGO = load_alpha("/logo.bin") # VARASYS wordmark (no tagline)
ICON_MIDI = load_alpha("/midi.bin") # DIN-5: green when a MIDI host is listening
ICON_USB = load_alpha("/usb.bin") # trident: lit when USB-connected to a computer
gc.collect()
# ============================== POLYMETER ENGINE (same semantics as the web/MicroPython) ==============================
PAT = {'X': 2, 'x': 1, 'g': 3, '.': 0, '-': 0, '_': 0}
PRIO = {2: 3, 1: 2, 3: 1}
def parse_program(s):
bpm = 120; lanes = []; bars = 0; ramp = None; trainer = None
for tok in s.strip().split(';'):
tok = tok.strip()
if not tok: continue
if tok[0] == 't' and tok[1:].isdigit():
bpm = int(tok[1:]); continue
if tok[0] == 'b' and tok[1:].isdigit(): # b<n> = segment length in bars (totals + Continue)
bars = int(tok[1:]); continue # (lane sounds like "beep:4" have a ':' -> not matched here)
if tok.startswith('rmp'): # rmp<start>/<amount>/<everyBars> tempo ramp (amount may be -)
p = tok[3:].split('/')
if len(p) == 3:
try: ramp = {'start': int(p[0]), 'amt': int(p[1]), 'every': max(1, int(p[2]))}
except ValueError: pass
continue
if tok.startswith('tr') and '/' in tok and ':' not in tok: # tr<play>/<mute> gap trainer (bars)
p = tok[2:].split('/')
if len(p) == 2:
try: trainer = {'play': max(0, int(p[0])), 'mute': max(0, int(p[1]))}
except ValueError: pass
continue
if ':' not in tok: continue
lane = _parse_lane(tok)
if lane: lanes.append(lane)
if not lanes: lanes = [_parse_lane("beep:4")]
return max(30, min(300, bpm)), lanes, bars, ramp, trainer
def _parse_lane(tok):
poly = '~' in tok; mute = '!' in tok
tok = tok.replace('~', '').replace('!', '')
gain = ''
if '@' in tok: tok, _, g = tok.partition('@'); gain = '@' + g # preserve @db for round-trip (engine ignores it)
sound, _, rest = tok.partition(':')
pattern = None
if '=' in rest: rest, _, pattern = rest.partition('=')
sub = 1; swing = False
if '/' in rest:
rest, _, sd = rest.partition('/')
swing = sd.endswith('s'); sd = sd.rstrip('s') # "/2s" = swung eighths
sub = int(sd) if sd.isdigit() else 1
groups = [int(g) for g in rest.split('+') if g.isdigit()] or [4]
beats = sum(groups); starts = set(); acc = 0
for gp in groups: starts.add(acc); acc += gp
steps = beats * sub
if pattern:
levels = [PAT.get(ch, 0) for ch in pattern]
if len(levels) < steps: levels += [0] * (steps - len(levels))
steps = len(levels)
else:
levels = []
for i in range(steps):
if i % sub == 0: levels.append(2 if (i // sub) in starts else 1)
else: levels.append(0)
return {'sound': sound, 'sub': sub, 'swing': swing, 'steps': steps, 'levels': levels,
'poly': poly, 'mute': mute, 'groups': groups, 'gain': gain}
PAT_CH = {2: 'X', 1: 'x', 3: 'g', 0: '.'} # level -> pattern char (inverse of PAT)
def lane_to_str(L): # serialize a lane back to the share grammar (round-trips)
s = L['sound'] + ':' + '+'.join(str(g) for g in L.get('groups', [4]))
if L['sub'] != 1 or L['swing']: s += '/' + str(L['sub']) + ('s' if L['swing'] else '')
s += '=' + ''.join(PAT_CH.get(v, '.') for v in L['levels'])
s += L.get('gain', '')
if L['poly']: s += '~'
if L['mute']: s += '!'
return s
_ALNUM = "abcdefghijklmnopqrstuvwxyz0123456789"
def _slkey(t): # normalise a title for built-in/user de-dup (no str.isalnum on CircuitPython)
return "".join(c for c in t.lower() if c in _ALNUM)
def load_user_setlists():
# User playlists from /programs.json (pushed by the editor). New {setlists:[{title,programs:[..]}]} form,
# or the old flat {programs:[..]} (one list). Built-ins are baked in BUILTIN_SETLISTS, never here.
try:
with open("/programs.json") as f: d = json.load(f)
except Exception as e:
print("programs.json:", e); return []
def items_of(pl): return [(p.get("name", "?"), p.get("prog", "")) for p in pl if p.get("prog")]
out = []
try:
if isinstance(d.get("setlists"), list):
for sl in d["setlists"]:
it = items_of(sl.get("programs", []))
if it: out.append((sl.get("title", "My set list"), it))
elif isinstance(d.get("programs"), list):
it = items_of(d["programs"])
if it: out.append((d.get("title", "My set list"), it))
except Exception as e:
print("setlists:", e)
return out
# ============================== GT911 TOUCH ==============================
class GT911:
def __init__(self, i2c):
self.i2c = i2c; self.addr = None
while not i2c.try_lock(): pass
try: found = i2c.scan()
finally: i2c.unlock()
for a in (0x5D, 0x14):
if a in found: self.addr = a; break
if self.addr is None and found: self.addr = found[0]
def _rd(self, reg, n):
b = bytearray(n)
while not self.i2c.try_lock(): pass
try:
self.i2c.writeto(self.addr, bytes([reg >> 8, reg & 0xFF]))
self.i2c.readfrom_into(self.addr, b)
finally: self.i2c.unlock()
return b
def _wr(self, reg, val):
while not self.i2c.try_lock(): pass
try: self.i2c.writeto(self.addr, bytes([reg >> 8, reg & 0xFF, val]))
finally: self.i2c.unlock()
def read(self):
if self.addr is None: return None
try: st = self._rd(0x814E, 1)[0]
except OSError: return None
if not (st & 0x80): return None
n = st & 0x0F; pt = None
if n >= 1:
b = self._rd(0x8150, 4); tx = b[0] | (b[1] << 8); ty = b[2] | (b[3] << 8)
pt = self._map(tx, ty)
try: self._wr(0x814E, 0)
except OSError: pass
return pt
def _map(self, tx, ty):
if TOUCH_DEBUG: print("touch raw", tx, ty)
if TOUCH_SWAP_XY: tx, ty = ty, tx
if TOUCH_INVERT_X: tx = WIDTH - 1 - tx
if TOUCH_INVERT_Y: ty = HEIGHT - 1 - ty
if 0 <= tx < WIDTH and 0 <= ty < HEIGHT: return (tx, ty)
return None
# ============================== DISPLAY SETUP ==============================
def st7796_init():
inv = b'\x21\x00' if INVERT_COLORS else b'\x20\x00'
return (
b'\x01\x80\x78' # SWRESET + 120ms
b'\x11\x80\x78' # SLPOUT + 120ms
b'\xF0\x01\xC3' b'\xF0\x01\x96' # command-set unlock
+ bytes([0x36, 0x01, MADCTL]) +
b'\x3A\x01\x55' # 16bpp
b'\xB4\x01\x01'
b'\xB6\x03\x80\x02\x3B'
b'\xE8\x08\x40\x8A\x00\x00\x29\x19\xA5\x33'
b'\xC1\x01\x06' b'\xC2\x01\xA7'
b'\xC5\x81\x18\x78' # VCOM + 120ms
b'\xE0\x0E\xF0\x09\x0B\x06\x04\x15\x2F\x54\x42\x3C\x17\x14\x18\x1B'
b'\xE1\x0E\xE0\x09\x0B\x06\x04\x03\x2B\x43\x42\x3B\x16\x14\x17\x1B'
b'\xF0\x01\x3C' b'\xF0\x81\x69\x78' # lock + 120ms
+ inv +
b'\x29\x80\x32' # DISPON + 50ms
)
def make_display():
displayio.release_displays()
spi = busio.SPI(clock=P_SCK, MOSI=P_MOSI)
bus = FourWire(spi, command=P_DC, chip_select=P_CS, reset=P_RST, baudrate=SPI_BAUD)
return BusDisplay(bus, st7796_init(), width=WIDTH, height=HEIGHT, auto_refresh=False)
def solid(color):
p = displayio.Palette(1); p[0] = color; return p
def rect(x, y, w, h, color):
return vectorio.Rectangle(pixel_shader=solid(color), width=w, height=h, x=x, y=y)
# ============================== APP ==============================
class App:
def __init__(self):
self.display = make_display()
self.i2c = busio.I2C(scl=P_SCL, sda=P_SDA, frequency=400_000)
self.touch = GT911(self.i2c)
self.midi = usb_midi.ports[1] if (MIDI_ENABLED and usb_midi and len(usb_midi.ports) > 1) else None
self.midi_in = usb_midi.ports[0] if (MIDI_ENABLED and usb_midi and len(usb_midi.ports) > 0) else None
self._mbuf = bytearray(64); self.midi_host = False; self.last_midi_in = 0.0
self._sx = bytearray(); self._sxon = False # USB-MIDI SysEx assembler (clock + pushed programs)
self._fw = None # chunked firmware transfer: staging file handle
self.led = RGB(P_RGB)
self.spk = pwmio.PWMOut(P_SPK, frequency=1600, variable_frequency=True, duty_cycle=0)
self.spk_off = 0
self.btnA = self._btn(P_BTNA); self.btnB = self._btn(P_BTNB)
self._aPrev = True; self._bPrev = True
self.jx = analogio.AnalogIn(P_JOYX); self.jy = analogio.AnalogIn(P_JOYY)
self._joyNext = 0
self._touchDown = False; self._touchSeen = 0
self.running = False; self.bpm = 120; self.idx = 0; self.lanes = []; self.bars = 0; self.rgb = (0, 0, 0)
self.ramp = None; self.trainer = None; self._lastbar = -1; self._muted = False; self._ramp_base = 120
self._dirty = False; self._overlay = None; self._ovbtns = [] # on-device editing: unsaved edits + modal
self.continue_on = False; self._advance = False; self._grid = {} # auto-advance + pad hit-test geometry
self._next_pending = None; self._seam_t = 0; self._need_redraw = False # gapless seam between tracks
self._displayed_bpm = -1; self._clock_next = 0 # lazy BPM redraw + MIDI Clock Out tick scheduler
self.sl = 0; self.rebuild_setlists() # built-in playlists (baked) + user playlists (programs.json)
self.dirty = True
self.pad_pal = displayio.Palette(8) # 0-3 idle levels (mute/normal/accent/ghost), 4-7 the lit playhead
for i in range(4): self.pad_pal[i] = PAD_DIM[i]; self.pad_pal[i + 4] = PAD_LIT[i]
self.lane_pads = []; self.lane_lit = []
self.usb_conn = False; self._m_steps = 0 # USB-connected state; master-lane steps (for the bar counter)
self._uiNext = 0.0; self._lastTs = None; self._lastBs = None # throttle the stopwatch/bar redraw
self._seg_start = 0.0 # timer origin; resets with the bar counter (each segment)
self._refreshNext = 0.0; self._touchNext = 0.0 # cap display refresh + touch polling (tighter MIDI timing)
self.ic_midi_pal = None; self.ic_usb_pal = None
# practice history - persisted to /history.json (next to programs.json) when we own the filesystem
self.can_write = self._probe_write()
self.log = self._load_log()
self.play_start = None; self.play_bpm = 0; self.play_name = ""
self._armed = None; self.log_rows = []
self._build_scene()
self.load(0) # load() also draws the (track-filtered) practice log
self.draw_icons(); self.draw_meters(); self.led_rest() # LED green = on
def _btn(self, pin):
d = digitalio.DigitalInOut(pin); d.direction = digitalio.Direction.INPUT; d.pull = digitalio.Pull.UP
return d
# ---------- scene graph ----------
def _build_scene(self):
root = displayio.Group(); self.display.root_group = root
root.append(rect(0, 0, WIDTH, HEIGHT, C_BG)) # static background (run state shows on the LED)
# header: VARASYS logo (left, no tagline) + version (small, top, right of the logo) + MIDI/USB icons (right)
if LOGO:
tg, _p, lw, lh = make_glyph(LOGO, C_CYAN, C_BG); tg.x = 10; tg.y = 9; root.append(tg)
lx = 10 + lw
else:
tg, w, h = make_text("VARASYS", FONT_M, C_CYAN, C_BG); tg.x = 10; tg.y = 8; root.append(tg)
lx = 10 + w
vtg, vw, vh = make_text("v" + APP_VERSION, FONT_S, C_DIM, C_BG); vtg.x = lx + 6; vtg.y = 8; root.append(vtg)
x = WIDTH - 12
for asset, attr in ((ICON_USB, "ic_usb_pal"), (ICON_MIDI, "ic_midi_pal")):
if asset:
tg, pal, w, h = make_glyph(asset, C_DIM, C_BG); x -= w; tg.x = x; tg.y = 8; x -= 8
root.append(tg); setattr(self, attr, pal)
root.append(rect(0, 38, WIDTH, 2, C_PANEL))
# dynamic groups
self.g_bpm = displayio.Group(); root.append(self.g_bpm) # big tempo (right)
self.g_time = displayio.Group(); root.append(self.g_time) # elapsed [of total] (left)
self.g_bar = displayio.Group(); root.append(self.g_bar) # bar [of total] (left)
self.g_train = displayio.Group(); root.append(self.g_train) # ramp / gap-trainer indicators
self.g_cont = displayio.Group(); root.append(self.g_cont) # CONT (Continue auto-advance) toggle indicator
self.g_name = displayio.Group(); root.append(self.g_name) # track title (red when edited/unsaved)
self.g_idx = displayio.Group(); root.append(self.g_idx) # set-list tab (tap to switch playlist)
self.g_grid = displayio.Group(); root.append(self.g_grid) # lanes x step pads
root.append(rect(0, LOG_TOP - 6, WIDTH, 2, C_PANEL)) # divider above the history log
self.g_log = displayio.Group(); root.append(self.g_log) # practice history (tap a row to delete)
self.g_overlay = displayio.Group(); root.append(self.g_overlay) # modal (save/revert) - drawn on top
# run/stop shows on the RGB LED; tap beats to edit, tap the title to save/revert, tap the tab to switch lists
def _place(self, group, s, x, y, fg, bg, font, right_edge=None):
while len(group): group.pop()
self.dirty = True
if not s: return
tg, w, h = make_text(s, font, fg, bg)
tg.x = (right_edge - w) if right_edge is not None else x; tg.y = y; group.append(tg)
def _center(self, group, s, cx, cy, fg, bg, font):
while len(group): group.pop()
tg, w, h = make_text(s, font, fg, bg); tg.x = cx - w//2; tg.y = cy - h//2; group.append(tg)
self.dirty = True
# ---------- program ----------
def rebuild_setlists(self):
# built-in playlists first (read-only), then user playlists from programs.json (a baked title always wins)
self.setlists = [{'title': t, 'items': it, 'builtin': True} for t, it in BUILTIN_SETLISTS]
seen = set(_slkey(t) for t, _ in BUILTIN_SETLISTS)
for t, it in load_user_setlists():
if _slkey(t) in seen: continue
seen.add(_slkey(t)); self.setlists.append({'title': t, 'items': it, 'builtin': False})
if self.sl >= len(self.setlists): self.sl = 0
def switch_setlist(self, delta=1):
if len(self.setlists) < 2: return
was = self.running
if was: self.running = False; self._log_play()
self.sl = (self.sl + delta) % len(self.setlists)
self.load(0)
if was: self.running = True; self._reset_clock(); self._start_play()
self.led_rest(); self.draw_meters()
def load(self, i):
items = self.setlists[self.sl]['items']
self.idx = i % len(items)
self.name, prog = items[self.idx]
self.bpm, self.lanes, self.bars, self.ramp, self.trainer = parse_program(prog)
self.master = self.lanes[0]; self._ramp_base = self.bpm; self._lastbar = -1; self._muted = False
self._dirty = False; self._overlay = None # fresh load -> no unsaved edits
self._next_pending = None; self._need_redraw = False # discard any prepared seam (user navigated away)
while len(self.g_overlay): self.g_overlay.pop() # dismiss any open modal
self._reset_clock(); self.draw_bpm(); self.draw_status(); self.draw_train()
self.build_grid(); self.draw_log()
def _prog_str(self): # serialize the current (possibly edited) track to a program string
parts = ['t' + str(self.bpm)]
if self.bars: parts.append('b' + str(self.bars))
if self.ramp: parts.append('rmp%d/%d/%d' % (self.ramp.get('start', self.bpm), self.ramp['amt'], self.ramp['every']))
if self.trainer: parts.append('tr%d/%d' % (self.trainer['play'], self.trainer['mute']))
for L in self.lanes: parts.append(lane_to_str(L))
return ';'.join(parts)
# ---------- on-device editing: tap a beat to cycle it; tap the title to save/revert ----------
def _grid_hit(self, tx, ty): # map a touch to (kind, lane[, step]) on the pad grid
g = self._grid
if not g or not (g['top'] <= ty < g['top'] + g['n'] * g['rowh']): return None
li = (ty - g['top']) // g['rowh']
if li >= g['n']: return None
if tx < g['px0']: return ('lane', li) # tapped the lane label (lane editor = 0.1.0)
L = self.lanes[li]; steps = L['steps']
s = int((tx - g['px0'] - 6) * steps / g['usable'] + 0.5)
return ('beat', li, max(0, min(steps - 1, s)))
def _cycle_beat(self, li, s): # off -> normal -> accent -> ghost -> off
L = self.lanes[li]
L['levels'][s] = {0: 1, 1: 2, 2: 3, 3: 0}[L['levels'][s]]
base = self._padbase(L, s); lit = (self.lane_lit[li] == s)
self.lane_pads[li][s].color_index = base + 4 if lit else base
self._set_dirty()
def _set_dirty(self):
if not self._dirty: self._dirty = True; self.draw_status()
self.dirty = True
def toggle_continue(self):
self.continue_on = not self.continue_on; self.draw_status()
def _user_list(self, title): # find or create a user playlist
for s in self.setlists:
if not s['builtin'] and s['title'] == title: return s
s = {'title': title, 'items': [], 'builtin': False}; self.setlists.append(s); return s
def _persist_user(self): # write all user playlists back to /programs.json
user = [s for s in self.setlists if not s['builtin']]
data = {"setlists": [{"title": s['title'],
"programs": [{"name": n, "prog": p} for n, p in s['items']]} for s in user]}
try:
with open("/programs.json", "w") as f: json.dump(data, f)
return True
except OSError:
return False # editor mode: the drive is read-only to us
def _save_edit(self):
prog = self._prog_str(); sl = self.setlists[self.sl]
if sl['builtin']: # built-ins are read-only -> save a USER copy
tgt = self._user_list("My edits"); names = [n for n, _ in tgt['items']]
if self.name in names: tgt['items'][names.index(self.name)] = (self.name, prog)
else: tgt['items'].append((self.name, prog))
dest = ("My edits", self.name)
else:
sl['items'] = list(sl['items']); sl['items'][self.idx] = (self.name, prog)
dest = (sl['title'], self.name)
if not self._persist_user():
self._show_msg("Read-only: reboot without holding A"); return
self.rebuild_setlists() # refresh, then jump to the saved (user) copy
for i, s in enumerate(self.setlists):
if not s['builtin'] and s['title'] == dest[0]:
self.sl = i; names = [n for n, _ in s['items']]
self.load(names.index(dest[1]) if dest[1] in names else 0); return
self.load(0)
def _revert(self):
self.load(self.idx) # reload from source -> discard edits
# ---------- modal overlay (save / revert / message) ----------
def _show_saverevert(self):
self._overlay = 'saverevert'; g = self.g_overlay
while len(g): g.pop()
px, py, pw, ph = 24, 178, WIDTH - 48, 116
g.append(rect(px, py, pw, ph, C_PANEL)); g.append(rect(px, py, pw, 2, C_CYAN))
t, w, h = make_text("Unsaved edits", FONT_M, C_TXT, C_PANEL); t.x = px + 14; t.y = py + 12; g.append(t)
self._ovbtns = []; by = py + 44; bh = 50; gap = 12; bw = (pw - 3 * gap) // 2
for i, (lbl, col, act) in enumerate((("SAVE", C_GREEN, self._save_edit), ("REVERT", C_AMBER, self._revert))):
bx = px + gap + i * (bw + gap)
g.append(rect(bx, by, bw, bh, C_BTN)); g.append(rect(bx, by, bw, 2, col))
tt, tw, th = make_text(lbl, FONT_M, col, C_BTN); tt.x = bx + (bw - tw) // 2; tt.y = by + (bh - th) // 2; g.append(tt)
self._ovbtns.append((bx, by, bx + bw, by + bh, act))
c, cw, ch = make_text("tap outside to cancel", FONT_S, C_DIM, C_PANEL); c.x = px + 14; c.y = py + ph - 16; g.append(c)
self.dirty = True
def _show_msg(self, text):
self._overlay = 'msg'; g = self.g_overlay
while len(g): g.pop()
px, py, pw, ph = 24, 200, WIDTH - 48, 64
g.append(rect(px, py, pw, ph, C_PANEL)); g.append(rect(px, py, pw, 2, C_AMBER))
t, w, h = make_text(text[:28], FONT_S, C_TXT, C_PANEL); t.x = px + 12; t.y = py + 14; g.append(t)
t2, w2, h2 = make_text("(tap to dismiss)", FONT_S, C_DIM, C_PANEL); t2.x = px + 12; t2.y = py + 38; g.append(t2)
self.dirty = True
def _close_overlay(self):
self._overlay = None
while len(self.g_overlay): self.g_overlay.pop()
self.dirty = True
def _tap_overlay(self, tx, ty):
if self._overlay == 'msg': self._close_overlay(); return
for x0, y0, x1, y1, act in self._ovbtns: # each action manages the panel (lane edits redraw it live)
if x0 <= tx <= x1 and y0 <= ty <= y1: act(); return
self._close_overlay() # tapped outside a button -> cancel / done
def _handle_tap(self, tx, ty):
if self._overlay: self._tap_overlay(tx, ty); return
if 112 <= ty <= 126: # set-list tab line
if tx > WIDTH - 56: self.toggle_continue() # right end = CONT (auto-advance) toggle
else: self.switch_setlist(1)
return
if 128 <= ty <= 154: # track-title line
if self._dirty: self._show_saverevert()
return
hit = self._grid_hit(tx, ty)
if hit and hit[0] == 'beat': self._cycle_beat(hit[1], hit[2]); return
if hit and hit[0] == 'lane': self._show_laneedit(hit[1]); return # tap the instrument name -> lane editor
self._tap_log(tx, ty) # else the practice log
# ---------- lane editor (tap the instrument name): sound / beats / sub / swing / mute + add / remove ----------
def _show_laneedit(self, li):
self._overlay = 'lane'; self._edit_li = li; self._draw_laneedit()
def _draw_laneedit(self):
li = self._edit_li; L = self.lanes[li]; g = self.g_overlay
while len(g): g.pop()
self._ovbtns = []
PX, PY, PW, RH = 14, 54, WIDTH - 28, 34
g.append(rect(PX, PY, PW, RH * 7 + 30, C_PANEL)); g.append(rect(PX, PY, PW, 2, C_CYAN))
t, w, h = make_text("Edit lane %d of %d" % (li + 1, len(self.lanes)), FONT_S, C_MUTE, C_PANEL)
t.x = PX + 12; t.y = PY + 8; g.append(t)
y = [PY + 28]
def vrow(label, value, fn): # label + [<] value [>]; left tap = fn(-1), right = fn(+1)
yy = y[0]
lt, lw, lh = make_text(label, FONT_S, C_MUTE, C_PANEL); lt.x = PX + 12; lt.y = yy + 9; g.append(lt)
g.append(rect(PX + 108, yy + 3, 28, RH - 8, C_BTN))
at, aw, ah = make_text("<", FONT_M, C_CYAN, C_BTN); at.x = PX + 108 + 9; at.y = yy + 7; g.append(at)
vt, vw, vh = make_text(value, FONT_M, C_TXT, C_PANEL); vt.x = PX + 146; vt.y = yy + 5; g.append(vt)
g.append(rect(PX + PW - 36, yy + 3, 28, RH - 8, C_BTN))
gt, gw, gh = make_text(">", FONT_M, C_CYAN, C_BTN); gt.x = PX + PW - 36 + 9; gt.y = yy + 7; g.append(gt)
self._ovbtns.append((PX + 104, yy, PX + 140, yy + RH, lambda: fn(-1)))
self._ovbtns.append((PX + PW - 40, yy, PX + PW, yy + RH, lambda: fn(1)))
y[0] += RH
vrow("Sound", L['sound'][:9], self._edit_sound)
vrow("Beats", str(sum(L['groups'])), self._edit_beats)
vrow("Subdiv", str(L['sub']), self._edit_sub)
vrow("Swing", "on" if L['swing'] else "off", self._edit_swing)
vrow("Mute", "yes" if L['mute'] else "no", self._edit_mute)
yy = y[0] + 2; bw = (PW - 36) // 2 # + Lane | Remove
g.append(rect(PX + 12, yy, bw, RH - 6, C_BTN))
a, aw, ah = make_text("+ Lane", FONT_S, C_GREEN if len(self.lanes) < MAXLANES else C_DIM, C_BTN); a.x = PX + 22; a.y = yy + 8; g.append(a)
self._ovbtns.append((PX + 12, yy, PX + 12 + bw, yy + RH, self._edit_add))
g.append(rect(PX + PW - 12 - bw, yy, bw, RH - 6, C_BTN))
r, rw, rh = make_text("Remove", FONT_S, C_AMBER if len(self.lanes) > 1 else C_DIM, C_BTN); r.x = PX + PW - 12 - bw + 14; r.y = yy + 8; g.append(r)
self._ovbtns.append((PX + PW - 12 - bw, yy, PX + PW - 12, yy + RH, self._edit_remove))
yy += RH + 2
g.append(rect(PX + 12, yy, PW - 24, RH - 4, C_BTN))
d, dw, dh = make_text("Done", FONT_M, C_CYAN, C_BTN); d.x = PX + (PW - dw) // 2; d.y = yy + 5; g.append(d)
self._ovbtns.append((PX + 12, yy, PX + PW - 12, yy + RH, self._edit_done))
self.dirty = True
def _regen_levels(self, L): # default accents after a beats/sub change
sub = L['sub']; groups = L['groups']; starts = set(); acc = 0
for gp in groups: starts.add(acc); acc += gp
L['steps'] = sum(groups) * sub
L['levels'] = [(2 if (i // sub) in starts else 1) if i % sub == 0 else 0 for i in range(L['steps'])]
def _lane_dirty(self, structural):
if structural: self._regen_levels(self.lanes[self._edit_li])
self.build_grid()
if not self._dirty: self._dirty = True; self.draw_status()
self._draw_laneedit() # refresh the modal with the new values
def _edit_sound(self, d):
L = self.lanes[self._edit_li]; i = SOUNDS.index(L['sound']) if L['sound'] in SOUNDS else 0
L['sound'] = SOUNDS[(i + d) % len(SOUNDS)]; self._lane_dirty(False)
def _edit_beats(self, d):
L = self.lanes[self._edit_li]; L['groups'] = [max(1, min(12, sum(L['groups']) + d))]; self._lane_dirty(True)
def _edit_sub(self, d):
L = self.lanes[self._edit_li]; L['sub'] = max(1, min(8, L['sub'] + d)); self._lane_dirty(True)
def _edit_swing(self, d):
L = self.lanes[self._edit_li]; L['swing'] = not L['swing']; self._lane_dirty(False)
def _edit_mute(self, d):
L = self.lanes[self._edit_li]; L['mute'] = not L['mute']; self._lane_dirty(False)
def _edit_add(self):
if len(self.lanes) >= MAXLANES: return
self.lanes.insert(self._edit_li + 1, _parse_lane("beep:4")); self._edit_li += 1; self._lane_dirty(False)
def _edit_remove(self):
if len(self.lanes) <= 1: return
del self.lanes[self._edit_li]
if self._edit_li >= len(self.lanes): self._edit_li = len(self.lanes) - 1
self._lane_dirty(False)
def _edit_done(self):
self._close_overlay()
def _step_dur(self, L, step):
beat = 60_000_000_000 / self.bpm
if L['poly']: # ~ polymeter: fit this lane's whole cycle into lane 1's bar
m = self.lanes[0]; master_bar = beat * (m['steps'] // m['sub'])
return int(master_bar / L['steps'])
sub = L['sub']
if L['swing'] and sub % 2 == 0: # swing even subdivisions: long-short (2:1) pairs
pair = beat / (sub // 2)
return int(pair * 2 / 3) if (step % sub) % 2 == 0 else int(pair / 3)
return int(beat / sub) # straight: a step = one beat / subdivision
def _reset_clock(self):
now = time.monotonic_ns()
for L in self.lanes:
L['next'] = now; L['step'] = -1
self._m_steps = 0 # restart the bar count
self._seg_start = time.monotonic() # and the on-screen timer (resets with the bar counter)
# ---------- audio + light ----------
def click(self, level):
self.spk.frequency = {2: 2300, 1: 1600, 3: 1050}.get(level, 1600)
self.spk.duty_cycle = {2: 42000, 1: 30000, 3: 14000}.get(level, 30000)
self.spk_off = time.monotonic_ns() + 22_000_000
def _led_base(self):
return LED_RUN if self.running else LED_IDLE # dim red while playing / dim green when stopped
def flash(self, level):
self.rgb = LEVEL_RGB.get(level, (0, 150, 255)) # bright beat pulse, fades back to the base in tick()
self.led.set(*self.rgb)
def led_rest(self): # settle to the resting colour (green idle / red running)
self.rgb = self._led_base()
self.led.set(*self.rgb)
def midi_send(self, note, vel): # device-as-conductor: a note per click to the computer
if self.midi is None: return
try: self.midi.write(bytes([0x90 | ((MIDI_CHANNEL - 1) & 0x0F), note, vel])) # Note On, channel 1..16
except Exception: pass
# ---------- transport ----------
def toggle(self):
self.running = not self.running
if self.running:
self._reset_clock(); self._start_play()
self._clock_next = time.monotonic_ns() # start MIDI Clock Out from zero
if MIDI_CLOCK_OUT and MIDI_CLOCK_OUT_TRANSPORT and self.midi is not None:
try: self.midi.write(bytes([0xFA])) # Start
except Exception: pass
else:
self.spk.duty_cycle = 0; self.reset_playheads(); self._log_play()
if MIDI_CLOCK_OUT and MIDI_CLOCK_OUT_TRANSPORT and self.midi is not None:
try: self.midi.write(bytes([0xFC])) # Stop
except Exception: pass
self.led_rest(); self.draw_meters() # LED shows run state: red running / green stopped
def set_bpm(self, v):
v = max(30, min(300, v))
if v != self.bpm:
self.bpm = v
self.draw_bpm(); self.draw_meters() # total time depends on bpm
def goto(self, i):
was = self.running
if was: self.running = False; self._log_play() # close out the track that was playing
self.load(i)
if was: self.running = True; self._reset_clock(); self._start_play()
self.led_rest(); self.draw_meters()
def tap(self):
now = time.monotonic()
if not hasattr(self, '_taps'): self._taps = []
self._taps = [t for t in self._taps if now - t < 2.4]
self._taps.append(now)
if len(self._taps) >= 2:
span = (self._taps[-1] - self._taps[0]) / (len(self._taps) - 1)
if span > 0: self.set_bpm(round(60 / span))
# ---------- scheduler ----------
def tick(self):
now = time.monotonic_ns()
if self.spk_off and now >= self.spk_off: self.spk.duty_cycle = 0; self.spk_off = 0
if self.running:
fired = []
for li, L in enumerate(self.lanes):
if self._advance: break # seam armed - skip remaining lanes for THIS tick
adv = False
while now >= L['next']:
L['step'] = (L['step'] + 1) % L['steps']
if li == 0:
self._m_steps += 1 # count master-lane steps -> bars
nb = self._m_steps // L['steps']
if nb != self._lastbar: self._lastbar = nb; self._on_new_bar(nb)
if self._advance: break # seam armed - suppress this step's firing
if self.ramp and L['steps'] > 0: # CONTINUOUS ramp: interpolate bpm at every master step
mlen = L['steps']
bar_pos = self._m_steps / mlen
seg_bar = (bar_pos % self.bars) if self.bars else bar_pos
new_bpm = max(30, min(300, int(self._ramp_base + seg_bar / self.ramp['every'] * self.ramp['amt'])))
if new_bpm != self.bpm: self.bpm = new_bpm
lvl = 0 if L['mute'] else L['levels'][L['step']]
if lvl > 0:
fired.append(lvl)
if not self._muted: # gap trainer: silent during the rest bars
self.midi_send(SOUND_GM.get(L['sound'], GM_DEFAULT), MIDI_VEL.get(lvl, 90))
L['next'] += self._step_dur(L, L['step']); adv = True
if adv and li < len(self.lane_pads): self._move_playhead(li, L['step'])
if fired and not self._muted:
best = max(fired, key=lambda l: PRIO.get(l, 0))
if not MUTE_SPEAKER and not (SPEAKER_AUTO_MUTE and self.midi_host):
self.click(best) # speaker silent if user muted it / auto-mute on + host present
self.flash(best)
base = self._led_base() # decay the beat pulse back down to the red running base
if self.rgb != base:
r = base[0] + (self.rgb[0]-base[0])*7//10
g = base[1] + (self.rgb[1]-base[1])*7//10
b = base[2] + (self.rgb[2]-base[2])*7//10
if abs(r-base[0])+abs(g-base[1])+abs(b-base[2]) < 6: r, g, b = base
self.rgb = (r, g, b); self.led.set(r, g, b)
if self._advance: # Continue: gapless swap to the prepared track at seam_t
self._advance = False
self._do_advance()
# MIDI Clock Out (master): 24 PPQN; interval follows the live bpm (so continuous ramps carry through)
if self.running and MIDI_CLOCK_OUT and self.midi is not None:
while now >= self._clock_next:
try: self.midi.write(bytes([0xF8]))
except Exception: pass
self._clock_next += int(60_000_000_000 / max(1, self.bpm) / 24)
def _on_new_bar(self, bar):
# Pre-parse the next track during the LAST bar of this segment, so the swap at the seam is allocation-free
if self.bars and self.continue_on and self._next_pending is None and bar == self.bars - 1:
self._prepare_next()
if self.bars and bar > 0 and bar % self.bars == 0: # segment boundary
self._seg_start = time.monotonic() # timer resets with the bar counter
if self.continue_on and self._next_pending is not None:
self._seam_t = self.lanes[0]['next'] # the wall-clock time of THIS boundary step
self._advance = True # tick() will swap to the prepared track
# Note: per-master-step continuous ramp handles the bpm reset implicitly (seg_bar wraps to 0)
t = self.trainer # gap trainer: silence during the rest bars
self._muted = bool(t and (t['play'] + t['mute']) and (bar % (t['play'] + t['mute'])) >= t['play'])
def _prepare_next(self): # parse the next playlist item into a side holder
items = self.setlists[self.sl]['items']
nxt = (self.idx + 1) % len(items)
if nxt == self.idx: return # 1-item playlist -> just loop, no swap
name, prog = items[nxt]
bpm, lanes, bars, ramp, trainer = parse_program(prog)
self._next_pending = {'lanes': lanes, 'bpm': bpm, 'bars': bars, 'ramp': ramp,
'trainer': trainer, 'name': name, 'idx': nxt}
def _do_advance(self): # gapless seam: swap the prepared track in at seam_t
n = self._next_pending
if n is None: return
self._next_pending = None
self.lanes = n['lanes']; self.bpm = n['bpm']; self.bars = n['bars']
self.ramp = n['ramp']; self.trainer = n['trainer']; self.name = n['name']; self.idx = n['idx']
self._ramp_base = self.bpm; self._lastbar = -1; self._muted = False; self._m_steps = 0
self._dirty = False; self._overlay = None
while len(self.g_overlay): self.g_overlay.pop()
seam = self._seam_t
for L in self.lanes: L['next'] = seam; L['step'] = -1 # NEXT tick fires step 0 of the new track at seam_t
self._need_redraw = True # visuals (grid + draws) catch up on the next refresh
self._seg_start = time.monotonic() # reset the on-screen timer
self.led_rest()
# ---------- inputs ----------
def poll(self):
a = self.btnA.value
if (not a) and self._aPrev: self.toggle()
self._aPrev = a
b = self.btnB.value
if (not b) and self._bPrev: self.tap()
self._bPrev = b
now = time.monotonic_ns()
if now >= self._joyNext:
x = self.jx.value - 32768; y = self.jy.value - 32768
if JOY_INVERT_X: x = -x
if JOY_INVERT_Y: y = -y
if abs(y) > JOY_DEADZONE:
self.set_bpm(self.bpm + (1 if y > 0 else -1) * (5 if abs(y) > 26000 else 1))
self._joyNext = now + 70_000_000
elif abs(x) > JOY_DEADZONE:
self.goto(self.idx + (1 if x > 0 else -1)); self._joyNext = now + 350_000_000; return
else:
self._joyNext = now + 20_000_000
nowms = time.monotonic()
if nowms >= self._touchNext: # poll touch ~30x/s (the I2C read adds loop latency -> MIDI jitter)
self._touchNext = nowms + 0.033
pt = self.touch.read()
if pt:
self._touchSeen = nowms
if not self._touchDown:
self._touchDown = True; self._handle_tap(pt[0], pt[1])
elif self._touchDown and (nowms - self._touchSeen) > 0.14:
self._touchDown = False
# USB-MIDI in: any byte = a host is listening (heartbeat); also assemble SysEx (clock / pushed programs)
if self.midi_in is not None:
try: n = self.midi_in.readinto(self._mbuf)
except Exception: n = 0
if n:
self.last_midi_in = nowms
self._feed_midi(self._mbuf, n)
host = bool(self.last_midi_in) and (nowms - self.last_midi_in) < 1.0
if host != self.midi_host:
self.midi_host = host
if host and SPEAKER_AUTO_MUTE: self.spk.duty_cycle = 0 # auto-mute when the computer takes over
self.led_rest(); self.draw_icons()
uc = bool(getattr(supervisor.runtime, "usb_connected", True)) # connected to a computer?
if uc != self.usb_conn:
self.usb_conn = uc; self.draw_icons()
# ---------- drawing ----------
def draw_bpm(self): # lazy: skip the bitmap alloc if the displayed integer is unchanged
if self.bpm == self._displayed_bpm: return
self._displayed_bpm = self.bpm
self._place(self.g_bpm, str(self.bpm), 0, 44, C_TXT, C_BG, FONT_L, right_edge=WIDTH-12)
def draw_status(self): # set-list tab (tap=switch) + CONT toggle, above the item title
sl = self.setlists[self.sl]
# tab: playlist + position; muted = built-in (read-only), cyan = your own
self._place(self.g_idx, "%s %d/%d" % (sl['title'][:11], self.idx + 1, len(sl['items'])),
12, 118, C_MUTE if sl['builtin'] else C_CYAN, C_BG, FONT_S)
self._place(self.g_cont, "CONT", 0, 118, C_GREEN if self.continue_on else C_DIM, C_BG, FONT_S, right_edge=WIDTH-12)
# title turns red when edited (tap it to save/revert)
self._place(self.g_name, self.name[:20], 12, 134, C_RED if self._dirty else C_TXT, C_BG, FONT_M)
def draw_train(self): # ramp + gap-trainer indicators (symbol + params), when set
g = self.g_train
while len(g): g.pop()
x = 12; y = 100
if self.ramp:
up = self.ramp['amt'] >= 0
pts = [(0, 9), (12, 9), (12, 0)] if up else [(0, 0), (0, 9), (12, 9)] # rising / falling ramp
g.append(vectorio.Polygon(pixel_shader=solid(C_AMBER), points=pts, x=x, y=y)); x += 16
a = self.ramp['amt']; lbl = ("+%d" % a if a >= 0 else "%d" % a) + "/%db" % self.ramp['every']
tg, w, h = make_text(lbl, FONT_S, C_AMBER, C_BG); tg.x = x; tg.y = y; g.append(tg); x += w + 14
if self.trainer:
g.append(rect(x, y, 4, 9, C_CYAN)); g.append(rect(x + 6, y, 4, 9, C_DIM)) # play | rest
x += 14
tg, w, h = make_text("%d/%db" % (self.trainer['play'], self.trainer['mute']), FONT_S, C_CYAN, C_BG)
tg.x = x; tg.y = y; g.append(tg)
self.dirty = True
def draw_icons(self): # recolor the MIDI/USB icons by state (tear-free palette swap)
if self.ic_midi_pal is not None:
_recolor(self.ic_midi_pal, C_GREEN if self.midi_host else C_DIM, C_BG)
if self.ic_usb_pal is not None:
_recolor(self.ic_usb_pal, C_CYAN if self.usb_conn else C_DIM, C_BG)
self.dirty = True
def _fmt_t(self, s): # m:ss, or h:mm:ss past an hour
s = int(s)
return "%d:%02d:%02d" % (s // 3600, (s % 3600) // 60, s % 60) if s >= 3600 else "%d:%02d" % (s // 60, s % 60)
def draw_meters(self): # running time [of total] + bar [of total]; ~4x/s from run()
run = self.running and self.play_start is not None
mlen = self.lanes[0]['steps'] if self.lanes else 1
bpb = (self.lanes[0]['steps'] // max(1, self.lanes[0]['sub'])) if self.lanes else 4
el = (time.monotonic() - self._seg_start) if run else 0 # time within the current segment (resets with the bar)
mbars = self._m_steps // max(1, mlen) # whole master bars elapsed
cur = ("%d" % ((mbars % self.bars + 1) if self.bars else (mbars + 1))) if run else "-" # cycle 1..N
if self.bars: # track has a length (b<n>): show "X of TOTAL"
ts = "%s of %s" % (self._fmt_t(el), self._fmt_t(self.bars * bpb * 60.0 / self.bpm))
bs = "bar %s of %d" % (cur, self.bars)
else:
ts = self._fmt_t(el); bs = "bar %s" % cur
if ts != self._lastTs:
self._place(self.g_time, ts, 12, 50, C_TXT, C_BG, FONT_M); self._lastTs = ts
if bs != self._lastBs:
self._place(self.g_bar, bs, 12, 78, C_MUTE, C_BG, FONT_M); self._lastBs = bs
# ---------- pad grid (each lane = a row of step pads; playhead lit as it plays) ----------
def _padbase(self, L, s):
return 0 if L['mute'] else L['levels'][s]
def build_grid(self):
while len(self.g_grid): self.g_grid.pop()
self.lane_pads = []; self.lane_lit = []
n = min(len(self.lanes), MAXLANES)
top = GRID_TOP; rowh = min(40, ((LOG_TOP - 10) - top) // max(1, n))
px0 = 60; usable = WIDTH - 8 - px0 - 12; gridh = n * rowh
self._grid = {'top': top, 'rowh': rowh, 'px0': px0, 'usable': usable, 'n': n} # for touch hit-testing
# vertical gridlines at the master lane's beats, full height -> beats line up across lanes
m = self.lanes[0]; mbeats = max(1, m['steps'] // max(1, m['sub']))
for bcol in range(mbeats):
self.g_grid.append(rect(px0 + 6 + (bcol * usable) // mbeats, top, 1, gridh, C_GRID))
for li in range(n):
L = self.lanes[li]; y = top + li * rowh; cy = y + rowh // 2
tg, w, h = make_text((L.get('sound', '') or '?')[:7], FONT_S, C_MUTE, C_BG)
tg.x = 8; tg.y = cy - h // 2; self.g_grid.append(tg)
steps = L['steps']; sub = L['sub']; stepw = max(1, usable // steps)
side = max(5, min(15, stepw - 1, rowh - 6)) # square edge for the main pulse
rad = max(2, min(side // 2, stepw // 2 - 1)) # smaller circle for subdivisions
pads = []
for s in range(steps):
cxp = px0 + 6 + (s * usable) // steps # proportional -> beats line up across lanes
if s % sub == 0: # main beat -> square
p = vectorio.Rectangle(pixel_shader=self.pad_pal, width=side, height=side,
x=cxp - side // 2, y=cy - side // 2)
else: # subdivision -> circle
p = vectorio.Circle(pixel_shader=self.pad_pal, radius=rad, x=cxp, y=cy)
p.color_index = self._padbase(L, s); self.g_grid.append(p); pads.append(p)
self.lane_pads.append(pads); self.lane_lit.append(-1)
self.dirty = True
def _move_playhead(self, li, step):
pads = self.lane_pads[li]; prev = self.lane_lit[li]
if 0 <= prev < len(pads): pads[prev].color_index = self._padbase(self.lanes[li], prev)
if step < len(pads): pads[step].color_index = self._padbase(self.lanes[li], step) + 4
self.lane_lit[li] = step; self.dirty = True
def reset_playheads(self):
for li, pads in enumerate(self.lane_pads):
prev = self.lane_lit[li]
if 0 <= prev < len(pads): pads[prev].color_index = self._padbase(self.lanes[li], prev)
self.lane_lit[li] = -1
self.dirty = True
# ---------- practice history (saved to /history.json, next to programs.json) ----------
def _probe_write(self):
try:
with open("/.wtest", "w") as f: f.write("1")
try: os.remove("/.wtest")
except Exception: pass
return True
except OSError:
return False # editor mode: the computer owns the FS
def _load_log(self):
try:
with open("/history.json") as f: return json.load(f).get("log", [])
except Exception:
return []
def _save_log(self):
if not self.can_write: return
try:
with open("/history.json", "w") as f: json.dump({"log": self.log[:200]}, f)
except OSError:
self.can_write = False
def _start_play(self):
self.play_start = time.monotonic(); self.play_bpm = self.bpm; self.play_name = self.name
def _log_play(self):
if self.play_start is None: return
dur = int(time.monotonic() - self.play_start); self.play_start = None
if dur < MIN_LOG_SEC: return # skip plays under 5 seconds
mlen = self.lanes[0]['steps'] if self.lanes else 1
t = time.localtime()
self.log.insert(0, {"t": "%02d:%02d" % (t.tm_hour, t.tm_min), "bpm": self.play_bpm,
"dur": dur, "bars": self._m_steps // max(1, mlen), "name": self.play_name})
del self.log[200:]; self._armed = None
self._save_log(); self.draw_log()
def draw_log(self):
g = self.g_log
while len(g): g.pop()
self.log_rows = []
hdr, w, h = make_text("PRACTICE LOG - THIS TRACK", FONT_S, C_MUTE, C_BG); hdr.x = 10; hdr.y = LOG_TOP; g.append(hdr)
rows = [(i, e) for i, e in enumerate(self.log) if e.get("name") == self.name] # current track only
if not rows:
tg, w, h = make_text("no plays over 5s yet", FONT_S, C_DIM, C_BG); tg.x = 10; tg.y = LOG_TOP + LOG_ROWH; g.append(tg)
self.dirty = True; return
y = LOG_TOP + LOG_ROWH + 2
for k in range(min(LOG_ROWS, len(rows))):
oi, e = rows[k]; armed = (oi == self._armed) # oi = index into self.log (for delete)
dur = "%d:%02d" % (e["dur"] // 60, e["dur"] % 60)
bars = e.get("bars", 0); bstr = (" %dbar" % bars) if bars else ""
line = "%s%s %3dbpm %s%s" % ("x " if armed else "", e.get("t", "--:--"), e["bpm"], dur, bstr)
tg, w, h = make_text(line, FONT_S, C_AMBER if armed else C_TXT, C_BG); tg.x = 10; tg.y = y; g.append(tg)
self.log_rows.append((y - 2, y + LOG_ROWH - 2, oi))
y += LOG_ROWH
self.dirty = True
def _tap_log(self, x, ty):
for y0, y1, idx in self.log_rows:
if y0 <= ty <= y1:
if self._armed == idx: del self.log[idx]; self._armed = None; self._save_log(); self.draw_log() # confirm delete
else: self._armed = idx; self.draw_log() # arm (tap again)
return
if self._armed is not None: self._armed = None; self.draw_log() # tapped elsewhere -> cancel
# ---------- USB-MIDI in: SysEx assembler (clock + editor-pushed programs) ----------
def _feed_midi(self, buf, n):
for i in range(n):
b = buf[i]
if b == 0xF0: self._sx = bytearray(); self._sxon = True
elif b == 0xF7:
if self._sxon: self._handle_sysex(self._sx)
self._sxon = False
elif b >= 0xF8: pass # real-time (e.g. Active Sensing 0xFE) - ignore
elif self._sxon:
if len(self._sx) < 60000: self._sx.append(b) # big enough for a pushed firmware (app.py)
else: self._sxon = False # overflow guard
def _handle_sysex(self, sx):
if len(sx) < 2 or sx[0] != 0x7D: return # 0x7D = our (educational) manufacturer id
cmd = sx[1]
if cmd == 0x01 and len(sx) >= 8 and rtc is not None: # set clock: yr-2000, mo, dd, hh, mm, ss
try: rtc.RTC().datetime = time.struct_time((2000 + sx[2], sx[3], sx[4], sx[5], sx[6], sx[7], 0, -1, -1))
except Exception: pass
elif cmd == 0x02: # version query -> reply 0x03 + APP_VERSION
if self.midi: self.midi.write(bytes([0xF0, 0x7D, 0x03]) + APP_VERSION.encode() + bytes([0xF7]))
elif cmd == 0x10: # write /programs.json (user playlists) pushed from the editor
try:
with open("/programs.json", "wb") as f: f.write(bytes(sx[2:]))
self.rebuild_setlists(); self.load(0) # built-ins untouched; show the refreshed lists
self._ack(True)
except Exception:
self._ack(False) # read-only (editor mode) etc.
# A/B firmware update, sent as small flow-controlled chunks (a single huge SysEx overruns the
# USB-MIDI input buffer and arrives corrupt). begin(0x21,len) -> data(0x22)* -> commit(0x23).
elif cmd == 0x21: # BEGIN firmware transfer: open the .mpy staging file
try:
try: self._fw.close()
except Exception: pass
self._fw = open("/app.new", "wb"); self._ack(True)
except Exception: # read-only (editor mode) / no space
self._fw = None; self._ack(False)
elif cmd == 0x22: # DATA: a base64 chunk (multiple of 4) -> decode -> append
try:
if self._fw is None or a2b_base64 is None: raise OSError()
self._fw.write(a2b_base64(bytes(sx[2:]))); self._ack(True)
except Exception:
try: self._fw.close()
except Exception: pass
self._fw = None; self._ack(False)
elif cmd == 0x23: # COMMIT: verify it's a CircuitPython .mpy, then A/B install
try:
try: self._fw.close()
except Exception: pass
self._fw = None; gc.collect()
with open("/app.new", "rb") as f: head = f.read(2)
if os.stat("/app.new")[6] < 4000 or len(head) < 2 or head[0] != 0x43 or head[1] != 0x06:
try: os.remove("/app.new") # not a CircuitPython mpy v6 -> reject, keep the working build
except OSError: pass
self._ack(False); return
try: os.remove("/app.bak")
except OSError: pass
os.rename("/app.mpy", "/app.bak") # current build becomes the rollback
os.rename("/app.new", "/app.mpy")
open("/trial", "w").close() # arm the trial; the loader reverts if it won't boot
self._ack(True); time.sleep(0.4); supervisor.reload()
except Exception: # catch ALL (read-only, MemoryError, ...) -> never brick
self._ack(False)
def _ack(self, ok):
if self.midi: self.midi.write(bytes([0xF0, 0x7D, 0x7F if ok else 0x7E, 0xF7]))
def run(self):
if self.touch.addr is None:
print("GT911 touch not found")
boot = time.monotonic()
try: os.stat("/trial"); committed = False # we're a freshly-pushed build on trial
except OSError: committed = True
while True:
self.tick(); self.poll()
if self._need_redraw: # post-seam: visuals catch up AFTER the audio swap
self._need_redraw = False
self.draw_bpm(); self.draw_status(); self.draw_train()
self.build_grid(); self.draw_log(); self.draw_meters()
tnow = time.monotonic()
if tnow >= self._uiNext: # ~4x/s: tick the stopwatch + bar counter
self._uiNext = tnow + 0.25; self.draw_meters(); self.draw_bpm() # bpm follows the continuous ramp
if not committed and tnow - boot > 5: # booted & ran fine for 5s -> confirm the update
try: os.remove("/trial")
except Exception: pass
committed = True
# Refresh at most ~30x/s. display.refresh() BLOCKS while it streams pixels over SPI, which
# would otherwise delay the next beat's MIDI note and make the audio stutter; throttling it
# keeps the click timing tight (the visuals lag a few ms, which is imperceptible).
if self.dirty and tnow >= self._refreshNext:
if self.display.refresh(): self.dirty = False
self._refreshNext = tnow + 0.033
time.sleep(0.0005)
App().run()