Firmware (pico-cp/): the Pico now owns its filesystem by default (boot.py), so it can save the
practice log and write editor-pushed set lists; the drive is read-only to the computer, which also
protects the firmware. Hold button A at power-on for editor mode (drive writable; universal drag).
- Replaced the on-screen touch buttons with an on-device PRACTICE LOG (time · BPM · duration ·
track), newest-first, persisted to /history.json next to programs.json. Plays < 5s aren't logged;
tap a row twice to delete it. Real timestamps once the editor syncs the clock.
- USB-MIDI SysEx receiver: clock-set (0x01 -> RTC) and program-push (0x10 -> write programs.json,
reload, ACK/NAK). Autoreload is disabled so the firmware's own file writes never trigger a restart.
- Fixed swing: the parser was discarding the 's' flag, so /2s never swung. Now the scheduler uses a
per-step duration with long-short (2:1, SWING_RATIO 2/3) pairs on even subdivisions, matching the
web engine. Verified: ride:4/2s -> 266/133ms vs straight 200/200.
Editor (editor.html): requestMIDIAccess({sysex:true}); Save to device now pushes programs.json as
SysEx to the device (+ clock sync), waits for ACK, shows "Saved ✓", and falls back to downloading the
file (drag onto the drive in editor mode) when no device answers. Heartbeat also keeps the clock synced.
Web MIDI works in Chromium AND Firefox; the drag fallback covers any browser/OS incl. Safari.
Docs (pico-cp/README, info-kit, README) updated for the two modes, push programming, and the log.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
626 lines
30 KiB
Python
626 lines
30 KiB
Python
# VARASYS PolyMeter — PM_K-1 "Kit" firmware (CircuitPython edition)
|
||
# Raspberry Pi Pico (Pico / Pico W / Pico 2) on the 52Pi EP-0172 "Pico Breadboard Kit Plus":
|
||
# 3.5" ST7796 320x480 cap-touch (GT911), PSP joystick, WS2812 RGB, buzzer, 2 buttons.
|
||
#
|
||
# WHY CIRCUITPYTHON: the board then mounts as a USB drive (CIRCUITPY) carrying this code, your
|
||
# tracks (programs.json) and a copy of the editor — edit on the web, "Save to device" pushes
# programs.json here over USB-MIDI SysEx and the firmware reloads the grooves in place (autoreload
# is disabled so the firmware's own file writes never restart it). It also sends USB-MIDI
|
||
# (a note per click) so the web editor can play it out the computer's speakers ("Device audio").
|
||
# Runs the SAME program strings as metronome.varasys.io.
|
||
#
|
||
# INSTALL: flash CircuitPython (https://circuitpython.org/board/raspberry_pi_pico/), then copy
# this file as code.py, plus programs.json and the font blobs (font_s.bin, font_m.bin,
# font_l.bin — loaded at startup), onto the CIRCUITPY drive. It runs on boot.
|
||
#
|
||
# Fallback: the simpler MicroPython firmware (pico/main.py) is always available — BOOTSEL +
|
||
# drag a MicroPython .uf2 to go back. The Pico cannot be bricked.
|
||
#
|
||
# Untested-panel notes & calibration flags are in CONFIG + pico-cp/README.md.
|
||
|
||
import board, busio, digitalio, analogio, pwmio, displayio, vectorio, time, json, gc, os, supervisor
|
||
supervisor.runtime.autoreload = False # we write our own files (log + pushed programs); never self-restart
|
||
try:
|
||
import rtc # set from the editor's clock SysEx so the log has real timestamps
|
||
except ImportError:
|
||
rtc = None
|
||
try: # CircuitPython 9.x
|
||
from fourwire import FourWire
|
||
from busdisplay import BusDisplay
|
||
except ImportError: # CircuitPython 8.x
|
||
from displayio import FourWire
|
||
from displayio import Display as BusDisplay
|
||
try:
|
||
import neopixel_write # core module on RP2040 — drives WS2812 with no external library
|
||
except ImportError:
|
||
neopixel_write = None
|
||
try:
|
||
import usb_midi # default-enabled on RP2040 — sends a MIDI note per click to the computer
|
||
except ImportError:
|
||
usb_midi = None
|
||
|
||
# ============================== CONFIG (tweak if needed) ==============================

# Display SPI clock: faster SPI = smaller tearing window; drop to 40_000_000 if unstable.
SPI_BAUD = 62_500_000

# WS2812 sits right next to you — keep it dim (0..1).
LED_BRIGHTNESS = 0.15

# Send a USB-MIDI note per click (play via the web editor's "Device audio").
MIDI_ENABLED = True

# Silence the on-board buzzer (e.g. when using computer audio).
MUTE_BUZZER = False

# Panel size in pixels (portrait).
WIDTH = 320
HEIGHT = 480

# Portrait; 0x48 swaps R/B for this BGR panel (cyan reads cyan). Use 0x40 if reversed.
MADCTL = 0x48

# Most ST7796 modules need inversion ON; set False if colours look negative.
INVERT_COLORS = True

# Touch (GT911) — flip if taps land wrong:
TOUCH_SWAP_XY = False
TOUCH_INVERT_X = False
TOUCH_INVERT_Y = False
TOUCH_DEBUG = False  # print raw touch coordinates to the serial console

# Joystick:
JOY_INVERT_X = False
JOY_INVERT_Y = False
JOY_DEADZONE = 9000  # raw ADC counts either side of centre that are ignored
|
||
|
||
# ----- pins (fixed by the EP-0172 board) -----
# Display SPI bus + control lines
P_SCK = board.GP2
P_MOSI = board.GP3
P_CS = board.GP5
P_DC = board.GP6
P_RST = board.GP7
# Touch controller I2C
P_SDA = board.GP8
P_SCL = board.GP9
# WS2812 LED, buzzer and the two push buttons
P_RGB = board.GP12
P_BUZ = board.GP13
P_BTNA = board.GP15
P_BTNB = board.GP14
# Joystick analog axes
P_JOYX = board.GP26
P_JOYY = board.GP27
|
||
|
||
# ----- baked default grooves (used only if programs.json is missing/bad) -----
# Each entry is (display name, program string); the string is parsed by parse_program().
DEFAULT_PROGRAMS = [
    ("Four on the floor", "t120;kick:4;snare:4=.x.x;hatClosed:4/2"),
    ("Swing ride", "t150;ride:4/2s;kick:4=X..x;snare:4=.x.x"),
    ("7/8 (2+2+3)", "t130;kick:2+2+3=x..x..x;hatClosed:2+2+3/2"),
    ("5 over 4", "t100;kick:4;claves:5~"),
    ("Straight click", "t120;beep:4"),
]
|
||
|
||
# ============================== COLOURS (0xRRGGBB; displayio handles 565) ==============================
C_BG = 0x06090E
C_PANEL = 0x1C222C
C_TXT = 0xC7D0DB
C_MUTE = 0x788494
C_CYAN = 0x0AB3F7
C_AMBER = 0xFF9B2E
C_GREEN = 0x2FE07A
C_DIM = 0x243240
C_BTN = 0x1C222C

# step level -> WS2812 flash colour (levels: 2 = accent, 1 = normal, 3 = ghost)
LEVEL_RGB = {
    2: (255, 110, 0),
    1: (0, 150, 255),
    3: (130, 70, 255),
}

# voice -> General-MIDI note (USB-MIDI bridge), and level -> MIDI velocity
SOUND_GM = {
    "kick": 36, "kick808": 36, "kick909": 36,
    "snare": 38, "snare808": 38, "snare909": 38,
    "clap": 39, "clap808": 39, "clap909": 39,
    "rim": 37,
    "hatClosed": 42, "hat808": 42, "hat909": 42,
    "hatOpen": 46, "openHat808": 46,
    "ride": 51, "ride909": 51,
    "crash": 49, "crash909": 49,
    "tomLow": 41, "tom808": 45, "tomMid": 45, "tomHigh": 48,
    "tambourine": 54,
    "cowbell": 56, "cowbell808": 56,
    "woodblock": 76, "jamblock": 76,
    "claves": 75,
    "beep": 37,
}
GM_DEFAULT = 37  # note used for any voice missing from SOUND_GM
MIDI_VEL = {2: 120, 1: 90, 3: 45}  # accent / normal / ghost

MAXLANES = 5  # lanes shown on the pad grid (extras still play)

# practice-history log area (below the pad grid)
LOG_TOP = 302
LOG_ROWH = 16
LOG_ROWS = 9
MIN_LOG_SEC = 5  # don't log plays shorter than this

# pad palette halves, indexed by step level: mute / normal / accent / ghost
PAD_DIM = (0x10161E, 0x0A3A52, 0x4A3010, 0x2A1D4A)  # idle pad
PAD_LIT = (0x39414D, 0x0AB3F7, 0xFF9B2E, 0x967BFF)  # playhead pad
|
||
|
||
# WS2812 RGB LED — self-contained via the core neopixel_write module (no external library)
class RGB:
    """Single WS2812 pixel on `pin`, dimmed by LED_BRIGHTNESS.

    `ok` is False when neopixel_write is unavailable or a write has failed;
    in that state set() does nothing.
    """

    def __init__(self, pin):
        self.ok = neopixel_write is not None
        if not self.ok:
            return
        out = digitalio.DigitalInOut(pin)
        out.direction = digitalio.Direction.OUTPUT
        self.io = out
        self.buf = bytearray(3)

    def set(self, r, g, b):
        """Show colour (r, g, b), each 0..255 before brightness scaling."""
        if not self.ok:
            return
        # WS2812 wants GRB order; scale down so it isn't blinding
        for slot, channel in enumerate((g, r, b)):
            self.buf[slot] = int(channel * LED_BRIGHTNESS)
        try:
            neopixel_write.neopixel_write(self.io, self.buf)
        except Exception:
            # a failed write disables the LED for the rest of the session
            self.ok = False
|
||
|
||
# ============================== ANTI-ALIASED FONTS (binary blobs on the drive; see pico/gen_font.py) ==============================
|
||
def load_font(path):
    """Read a binary font blob and index its glyphs.

    Layout as parsed here: byte 0 is the glyph count, followed by one 7-byte
    record per glyph (code point hi, code point lo, width, height, signed
    x-offset, top, advance), followed by the pixel data at 4 bits per pixel,
    each glyph padded to a whole byte.

    Returns (glyphs, blob) where glyphs maps code point ->
    (width, height, xoff, top, advance, pixel_offset) and blob is the raw file.
    """
    with open(path, "rb") as fh:
        blob = fh.read()

    count = blob[0]
    pixel_at = 1 + 7 * count  # pixel data starts right after the record table
    glyphs = {}
    for n in range(count):
        base = 1 + 7 * n
        codepoint = (blob[base] << 8) | blob[base + 1]
        width = blob[base + 2]
        height = blob[base + 3]
        xoff = blob[base + 4]
        if xoff > 127:  # stored as a signed byte
            xoff -= 256
        top = blob[base + 5]
        advance = blob[base + 6]
        glyphs[codepoint] = (width, height, xoff, top, advance, pixel_at)
        pixel_at += (width * height + 1) // 2  # two pixels per byte, rounded up
    return (glyphs, blob)
|
||
|
||
# Glyph sets, loaded once at import: small (pad-grid lane labels), medium (labels / buttons)
# and large (big BPM). Loaded in that order, then the heap is compacted.
FONT_S, FONT_M, FONT_L = (
    load_font(blob_path) for blob_path in ("/font_s.bin", "/font_m.bin", "/font_l.bin")
)
gc.collect()
|
||
|
||
def _blend(bg, fg, i):
|
||
t = i * 17
|
||
r = (((bg >> 16) & 0xFF)*(255-t) + ((fg >> 16) & 0xFF)*t) // 255
|
||
g = (((bg >> 8) & 0xFF)*(255-t) + ((fg >> 8) & 0xFF)*t) // 255
|
||
b = ((bg & 0xFF)*(255-t) + (fg & 0xFF)*t) // 255
|
||
return (r << 16) | (g << 8) | b
|
||
|
||
def make_text(s, font, fg, bg):
    """Render a string into a displayio TileGrid (anti-aliased via a 16-step blend palette).

    `font` is a (glyphs, blob) pair as returned by load_font(). Characters with
    no glyph are skipped. Returns (tilegrid, width, height); the bitmap is
    cropped vertically to the glyphs actually used and is never smaller than 1x1.
    """
    glyphs, blob = font

    # Pass 1: measure the run (total advance, topmost row, bottommost row).
    width = 0
    top_min = 999
    bottom = 0
    for ch in s:
        glyph = glyphs.get(ord(ch))
        if glyph is None:
            continue
        width += glyph[4]
        glyph_h, glyph_top = glyph[1], glyph[3]
        if glyph_h:  # zero-height glyphs (e.g. space) don't affect the vertical extent
            top_min = min(top_min, glyph_top)
            bottom = max(bottom, glyph_top + glyph_h)
    if top_min == 999:
        top_min = 0
    width = max(1, width)
    height = max(1, bottom - top_min)

    gc.collect()  # free the heap before allocating the bitmap
    bmp = displayio.Bitmap(width, height, 16)
    pal = displayio.Palette(16)
    for shade in range(16):
        pal[shade] = _blend(bg, fg, shade)

    # Pass 2: copy each glyph's 4-bit pixels into the bitmap at the pen position.
    pen = 0
    for ch in s:
        glyph = glyphs.get(ord(ch))
        if glyph is None:
            continue
        gw, gh, xoff, gtop, adv, off = glyph
        k = 0  # running pixel index inside this glyph (row-major)
        for j in range(gh):
            row = (gtop - top_min) + j
            for i in range(gw):
                packed = blob[off + (k >> 1)]
                # even pixel = high nibble, odd pixel = low nibble
                shade = (packed & 0xF) if (k & 1) else (packed >> 4)
                k += 1
                if shade:
                    x = pen + xoff + i
                    if 0 <= x < width and 0 <= row < height:
                        bmp[x, row] = shade
        pen += adv
    return displayio.TileGrid(bmp, pixel_shader=pal), width, height
|
||
|
||
# ============================== POLYMETER ENGINE (same semantics as the web/MicroPython) ==============================
|
||
# pattern character -> step level (2 = accent, 1 = normal, 3 = ghost, 0 = rest)
PAT = {
    'X': 2,
    'x': 1,
    'g': 3,
    '.': 0,
    '-': 0,
    '_': 0,
}
# level -> priority when several lanes fire on the same tick (accent > normal > ghost)
PRIO = {
    2: 3,
    1: 2,
    3: 1,
}
|
||
|
||
def parse_program(s):
    """Parse a ';'-separated program string into (bpm, lanes).

    A token of the form 't<digits>' sets the tempo; any other token containing
    ':' is handed to _parse_lane(); everything else is ignored. With no lanes
    the program falls back to a single "beep:4" lane. The tempo defaults to 120
    and is clamped to 30..300.
    """
    tempo = 120
    lanes = []
    for raw in s.strip().split(';'):
        tok = raw.strip()
        if not tok:
            continue
        if tok[0] == 't' and tok[1:].isdigit():
            tempo = int(tok[1:])
        elif ':' in tok:
            lane = _parse_lane(tok)
            if lane:
                lanes.append(lane)
    if not lanes:
        lanes = [_parse_lane("beep:4")]
    return max(30, min(300, tempo)), lanes
|
||
|
||
def _parse_lane(tok):
|
||
poly = '~' in tok; mute = '!' in tok
|
||
tok = tok.replace('~', '').replace('!', '')
|
||
if '@' in tok: tok = tok.split('@')[0]
|
||
sound, _, rest = tok.partition(':')
|
||
pattern = None
|
||
if '=' in rest: rest, _, pattern = rest.partition('=')
|
||
sub = 1; swing = False
|
||
if '/' in rest:
|
||
rest, _, sd = rest.partition('/')
|
||
swing = sd.endswith('s'); sd = sd.rstrip('s') # "/2s" = swung eighths
|
||
sub = int(sd) if sd.isdigit() else 1
|
||
groups = [int(g) for g in rest.split('+') if g.isdigit()] or [4]
|
||
beats = sum(groups); starts = set(); acc = 0
|
||
for gp in groups: starts.add(acc); acc += gp
|
||
steps = beats * sub
|
||
if pattern:
|
||
levels = [PAT.get(ch, 0) for ch in pattern]
|
||
if len(levels) < steps: levels += [0] * (steps - len(levels))
|
||
steps = len(levels)
|
||
else:
|
||
levels = []
|
||
for i in range(steps):
|
||
if i % sub == 0: levels.append(2 if (i // sub) in starts else 1)
|
||
else: levels.append(0)
|
||
return {'sound': sound, 'sub': sub, 'swing': swing, 'steps': steps, 'levels': levels, 'poly': poly, 'mute': mute}
|
||
|
||
def load_programs():
    """Return the set list as [(name, program_string), ...].

    Reads /programs.json, expected to hold {"programs": [{"name": ..., "prog": ...}, ...]}.
    If the file is missing, unreadable, malformed or empty, the reason (if any)
    is printed and DEFAULT_PROGRAMS is returned instead.
    """
    try:
        with open("/programs.json") as fh:
            doc = json.load(fh)
        progs = []
        for entry in doc["programs"]:
            progs.append((entry["name"], entry["prog"]))
        if progs:
            return progs
    except Exception as err:
        print("programs.json:", err)
    return DEFAULT_PROGRAMS
|
||
|
||
# ============================== GT911 TOUCH ==============================
|
||
class GT911:
    """Minimal polling driver for the GT911 touch controller over I2C.

    `addr` is the detected I2C address, or None when the bus scan found nothing
    (read() then always returns None). Registers are 16-bit, sent high byte first.
    """

    _REG_STATUS = 0x814E   # bit 7 = data ready, low nibble = number of touch points
    _REG_POINT1 = 0x8150   # first point: x lo, x hi, y lo, y hi

    def __init__(self, i2c):
        self.i2c = i2c
        self.addr = None
        while not i2c.try_lock():
            pass
        try:
            found = i2c.scan()
        finally:
            i2c.unlock()
        # prefer the two addresses this driver knows; otherwise take whatever answered
        for candidate in (0x5D, 0x14):
            if candidate in found:
                self.addr = candidate
                break
        if self.addr is None and found:
            self.addr = found[0]

    def _rd(self, reg, n):
        """Read `n` bytes starting at 16-bit register `reg`. Raises OSError on bus errors."""
        b = bytearray(n)
        while not self.i2c.try_lock():
            pass
        try:
            self.i2c.writeto(self.addr, bytes([reg >> 8, reg & 0xFF]))
            self.i2c.readfrom_into(self.addr, b)
        finally:
            self.i2c.unlock()
        return b

    def _wr(self, reg, val):
        """Write one byte `val` to 16-bit register `reg`. Raises OSError on bus errors."""
        while not self.i2c.try_lock():
            pass
        try:
            self.i2c.writeto(self.addr, bytes([reg >> 8, reg & 0xFF, val]))
        finally:
            self.i2c.unlock()

    def read(self):
        """Return the current touch as (x, y) in screen pixels, or None.

        None means: no controller, no fresh data, no finger down, the point is
        off-screen, or an I2C transfer failed. This method never raises OSError,
        so a bus glitch cannot take down the main loop.
        """
        if self.addr is None:
            return None
        try:
            status = self._rd(self._REG_STATUS, 1)[0]
        except OSError:
            return None
        if not (status & 0x80):
            return None

        point = None
        if status & 0x0F:
            # The coordinate read must be guarded like the status read: an
            # unhandled OSError here would propagate out of App.poll().
            try:
                raw = self._rd(self._REG_POINT1, 4)
                point = self._map(raw[0] | (raw[1] << 8), raw[2] | (raw[3] << 8))
            except OSError:
                point = None

        # acknowledge the sample so the controller can post the next one
        try:
            self._wr(self._REG_STATUS, 0)
        except OSError:
            pass
        return point

    def _map(self, tx, ty):
        """Apply the TOUCH_* calibration flags; return (x, y) or None if off-screen."""
        if TOUCH_DEBUG:
            print("touch raw", tx, ty)
        if TOUCH_SWAP_XY:
            tx, ty = ty, tx
        if TOUCH_INVERT_X:
            tx = WIDTH - 1 - tx
        if TOUCH_INVERT_Y:
            ty = HEIGHT - 1 - ty
        if 0 <= tx < WIDTH and 0 <= ty < HEIGHT:
            return (tx, ty)
        return None
|
||
|
||
# ============================== DISPLAY SETUP ==============================
|
||
def st7796_init():
    """Build the ST7796 init sequence in displayio's packed format.

    Each entry is: command byte, data length (bit 7 set = a delay byte in ms
    follows the data), the data bytes, then the optional delay. MADCTL and
    INVERT_COLORS from CONFIG are spliced in.
    """
    inversion = b'\x21\x00' if INVERT_COLORS else b'\x20\x00'
    sequence = [
        b'\x01\x80\x78',                  # SWRESET + 120ms
        b'\x11\x80\x78',                  # SLPOUT + 120ms
        b'\xF0\x01\xC3', b'\xF0\x01\x96',  # command-set unlock
        bytes([0x36, 0x01, MADCTL]),
        b'\x3A\x01\x55',                  # 16bpp
        b'\xB4\x01\x01',
        b'\xB6\x03\x80\x02\x3B',
        b'\xE8\x08\x40\x8A\x00\x00\x29\x19\xA5\x33',
        b'\xC1\x01\x06', b'\xC2\x01\xA7',
        b'\xC5\x81\x18\x78',              # VCOM + 120ms
        b'\xE0\x0E\xF0\x09\x0B\x06\x04\x15\x2F\x54\x42\x3C\x17\x14\x18\x1B',
        b'\xE1\x0E\xE0\x09\x0B\x06\x04\x03\x2B\x43\x42\x3B\x16\x14\x17\x1B',
        b'\xF0\x01\x3C', b'\xF0\x81\x69\x78',  # lock + 120ms
        inversion,
        b'\x29\x80\x32',                  # DISPON + 50ms
    ]
    return b''.join(sequence)
|
||
|
||
def make_display():
    """Release any existing display and return a manually refreshed ST7796 display."""
    displayio.release_displays()
    bus = FourWire(
        busio.SPI(clock=P_SCK, MOSI=P_MOSI),
        command=P_DC,
        chip_select=P_CS,
        reset=P_RST,
        baudrate=SPI_BAUD,
    )
    # auto_refresh is off: App.run() pushes a frame only when something changed
    panel = BusDisplay(bus, st7796_init(), width=WIDTH, height=HEIGHT, auto_refresh=False)
    return panel
|
||
|
||
def solid(color):
    """Return a one-entry displayio palette holding `color` (0xRRGGBB)."""
    palette = displayio.Palette(1)
    palette[0] = color
    return palette
|
||
|
||
def rect(x, y, w, h, color):
    """Return a filled vectorio rectangle of size w x h at (x, y) in a single colour."""
    fill = displayio.Palette(1)  # one-colour palette used as the shape's shader
    fill[0] = color
    return vectorio.Rectangle(pixel_shader=fill, width=w, height=h, x=x, y=y)
|
||
|
||
# ============================== APP ==============================
|
||
class App:
    """The metronome application: scheduler, inputs, display scene and practice log.

    Controls (see poll()): button A starts/stops, button B is tap-tempo, joystick
    up/down changes the tempo, joystick left/right changes the program, and the
    touch screen arms/deletes practice-log rows. USB-MIDI out carries one note per
    click; USB-MIDI in carries the host heartbeat plus SysEx (clock set, program push).
    """

    # buzzer pitch (Hz) and PWM duty per step level: 2 = accent, 1 = normal, 3 = ghost
    _BUZ_FREQ = {2: 2300, 1: 1600, 3: 1050}
    _BUZ_DUTY = {2: 42000, 1: 30000, 3: 14000}
    # Floor for a step's duration. A zero-length step would make tick()'s catch-up
    # loop spin forever, so degenerate lanes are slowed to this instead.
    _MIN_STEP_NS = 1_000_000
    # Largest SysEx body accepted (bytes between F0 and F7).
    _SYSEX_MAX = 6000

    def __init__(self):
        self.display = make_display()
        self.i2c = busio.I2C(scl=P_SCL, sda=P_SDA, frequency=400_000)
        self.touch = GT911(self.i2c)
        # usb_midi.ports[0] is read from (host -> device), ports[1] is written to
        self.midi = usb_midi.ports[1] if (MIDI_ENABLED and usb_midi and len(usb_midi.ports) > 1) else None
        self.midi_in = usb_midi.ports[0] if (MIDI_ENABLED and usb_midi and len(usb_midi.ports) > 0) else None
        self._mbuf = bytearray(64)
        self.midi_host = False
        self.last_midi_in = 0.0
        self._sx = bytearray()  # USB-MIDI SysEx assembler (clock + pushed programs)
        self._sxon = False
        self.led = RGB(P_RGB)
        self.buz = pwmio.PWMOut(P_BUZ, frequency=1600, variable_frequency=True, duty_cycle=0)
        self.buz_off = 0
        self.btnA = self._btn(P_BTNA)
        self.btnB = self._btn(P_BTNB)
        self._aPrev = True
        self._bPrev = True
        self.jx = analogio.AnalogIn(P_JOYX)
        self.jy = analogio.AnalogIn(P_JOYY)
        self._joyNext = 0
        self._touchDown = False
        self._touchSeen = 0
        self._taps = []  # recent tap-tempo timestamps (seconds)
        self.running = False
        self.bpm = 120
        self.idx = 0
        self.lanes = []
        self.rgb = (0, 0, 0)
        self.programs = load_programs()
        self.dirty = True
        # shared pad palette: entries 0..3 idle colours, 4..7 the same levels lit
        self.pad_pal = displayio.Palette(8)
        for i in range(4):
            self.pad_pal[i] = PAD_DIM[i]
            self.pad_pal[i + 4] = PAD_LIT[i]
        self.lane_pads = []
        self.lane_lit = []
        # practice history — persisted to /history.json (next to programs.json) when we own the filesystem
        self.can_write = self._probe_write()
        self.log = self._load_log()
        self.play_start = None
        self.play_bpm = 0
        self.play_name = ""
        self._armed = None
        self.log_rows = []
        self._build_scene()
        self.load(0)
        self.draw_log()

    def _btn(self, pin):
        """Configure `pin` as a pulled-up input (reads False while pressed)."""
        d = digitalio.DigitalInOut(pin)
        d.direction = digitalio.Direction.INPUT
        d.pull = digitalio.Pull.UP
        return d

    # ---------- scene graph ----------
    def _build_scene(self):
        """Create the static scene and the empty groups the draw_* methods fill."""
        root = displayio.Group()
        self.display.root_group = root
        root.append(rect(0, 0, WIDTH, HEIGHT, C_BG))
        tg, w, h = make_text("PM_K-1 KIT", FONT_M, C_CYAN, C_BG)
        tg.x = 12
        tg.y = 8
        root.append(tg)
        root.append(rect(0, 38, WIDTH, 2, C_PANEL))
        # dynamic groups
        self.g_bpm = displayio.Group(); root.append(self.g_bpm)    # big tempo (right)
        self.g_run = displayio.Group(); root.append(self.g_run)    # RUN / STOP (left)
        self.g_name = displayio.Group(); root.append(self.g_name)  # item index + name
        self.g_midi = displayio.Group(); root.append(self.g_midi)  # "MIDI" indicator (top-right) when a host is listening
        self.g_grid = displayio.Group(); root.append(self.g_grid)  # lanes × step pads
        root.append(rect(0, LOG_TOP - 6, WIDTH, 2, C_PANEL))       # divider above the history log
        self.g_log = displayio.Group(); root.append(self.g_log)    # practice history (tap a row to delete)
        # (no on-screen buttons — transport is the joystick + buttons A/B; touch deletes log rows)

    def _place(self, group, s, x, y, fg, bg, font, right_edge=None):
        """Replace `group`'s contents with text `s` at (x, y); right-align to `right_edge` if given.

        An empty `s` just clears the group.
        """
        while len(group):
            group.pop()
        self.dirty = True
        if not s:
            return
        tg, w, h = make_text(s, font, fg, bg)
        tg.x = (right_edge - w) if right_edge is not None else x
        tg.y = y
        group.append(tg)

    def _center(self, group, s, cx, cy, fg, bg, font):
        """Replace `group`'s contents with text `s` centred on (cx, cy)."""
        while len(group):
            group.pop()
        tg, w, h = make_text(s, font, fg, bg)
        tg.x = cx - w // 2
        tg.y = cy - h // 2
        group.append(tg)
        self.dirty = True

    # ---------- program ----------
    def load(self, i):
        """Select program `i` (wrapping), parse it, reset the clock and redraw."""
        n = len(self.programs)
        self.idx = i % n
        self.name, prog = self.programs[self.idx]
        self.bpm, self.lanes = parse_program(prog)
        self.master = self.lanes[0]
        self._reset_clock()
        self.draw_bpm()
        self.draw_status()
        self.build_grid()

    def _step_dur(self, L, step):
        """Return the duration in ns of `step` in lane `L` at the current tempo.

        Never returns less than _MIN_STEP_NS, and tolerates lanes whose 'sub'
        or 'steps' is zero, so the scheduler cannot divide by zero or stall.
        """
        beat = 60_000_000_000 / self.bpm
        if L['poly']:  # ~ polymeter: fit this lane's whole cycle into lane 1's bar
            m = self.lanes[0]
            master_bar = beat * (m['steps'] // max(1, m['sub']))
            dur = int(master_bar / max(1, L['steps']))
        else:
            sub = max(1, L['sub'])
            if L['swing'] and sub % 2 == 0:  # swing even subdivisions: long–short (2:1) pairs
                pair = beat / (sub // 2)
                dur = int(pair * 2 / 3) if (step % sub) % 2 == 0 else int(pair / 3)
            else:
                dur = int(beat / sub)  # straight: a step = one beat / subdivision
        return max(self._MIN_STEP_NS, dur)

    def _reset_clock(self):
        """Make every lane due immediately, starting from its first step."""
        now = time.monotonic_ns()
        for L in self.lanes:
            L['next'] = now
            L['step'] = -1

    # ---------- audio + light ----------
    def click(self, level):
        """Start a 22 ms buzzer blip whose pitch and loudness depend on `level`."""
        self.buz.frequency = self._BUZ_FREQ.get(level, 1600)
        self.buz.duty_cycle = self._BUZ_DUTY.get(level, 30000)
        self.buz_off = time.monotonic_ns() + 22_000_000  # tick() silences it after this

    def flash(self, level):
        """Light the LED in the colour for `level`; tick() fades it out."""
        self.rgb = LEVEL_RGB.get(level, (0, 150, 255))
        self.led.set(*self.rgb)

    def led_off(self):
        self.rgb = (0, 0, 0)
        self.led.set(0, 0, 0)

    def midi_send(self, note, vel):  # device-as-conductor: a note per click to the computer
        if self.midi is None:
            return
        try:
            self.midi.write(bytes([0x90, note, vel]))  # Note On (percussive — no Note Off needed)
        except Exception:
            pass  # best effort: a failed MIDI write must never stop the click

    # ---------- transport ----------
    def toggle(self):
        """Start or stop playback; stopping closes out the practice-log entry."""
        self.running = not self.running
        if self.running:
            self._reset_clock()
            self._start_play()
        else:
            self.buz.duty_cycle = 0
            self.led_off()
            self.reset_playheads()
            self._log_play()
        self.draw_status()

    def set_bpm(self, v):
        """Set the tempo, clamped to 30..300; redraws only when it changed."""
        v = max(30, min(300, v))
        if v != self.bpm:
            self.bpm = v
            self.draw_bpm()

    def goto(self, i):
        """Switch to program `i`; if playing, log the old track and keep playing the new one."""
        was = self.running
        if was:
            self.running = False
            self._log_play()  # close out the track that was playing
        self.load(i)
        if was:
            self.running = True
            self._reset_clock()
            self._start_play()

    def tap(self):
        """Tap tempo: set the BPM from the mean interval of taps within the last 2.4 s."""
        now = time.monotonic()
        self._taps = [t for t in self._taps if now - t < 2.4]
        self._taps.append(now)
        if len(self._taps) >= 2:
            span = (self._taps[-1] - self._taps[0]) / (len(self._taps) - 1)
            if span > 0:
                self.set_bpm(round(60 / span))

    # ---------- scheduler ----------
    def tick(self):
        """Advance every due lane, emit MIDI/buzzer/LED for fired steps, and fade the LED."""
        now = time.monotonic_ns()
        if self.buz_off and now >= self.buz_off:
            self.buz.duty_cycle = 0
            self.buz_off = 0
        if self.running:
            fired = []
            for li, L in enumerate(self.lanes):
                if L['steps'] <= 0:
                    continue  # nothing to play; also avoids `% 0` below
                adv = False
                while now >= L['next']:  # catch up if the loop fell behind
                    L['step'] = (L['step'] + 1) % L['steps']
                    lvl = 0 if L['mute'] else L['levels'][L['step']]
                    if lvl > 0:
                        fired.append(lvl)
                        self.midi_send(SOUND_GM.get(L['sound'], GM_DEFAULT), MIDI_VEL.get(lvl, 90))  # one note per lane
                    L['next'] += self._step_dur(L, L['step'])
                    adv = True
                if adv and li < len(self.lane_pads):
                    self._move_playhead(li, L['step'])
            if fired:
                # buzzer/LED can show only one level: pick the strongest that fired
                best = max(fired, key=lambda l: PRIO.get(l, 0))
                if not MUTE_BUZZER and not self.midi_host:
                    self.click(best)  # otherwise the computer plays it instead
                self.flash(best)
        if self.rgb != (0, 0, 0):
            # fade to 70% per tick, snapping to off once it gets dim
            r, g, b = self.rgb
            r = r * 7 // 10
            g = g * 7 // 10
            b = b * 7 // 10
            self.rgb = (r, g, b) if (r + g + b) > 12 else (0, 0, 0)
            self.led.set(*self.rgb)

    # ---------- inputs ----------
    def poll(self):
        """Read buttons, joystick, touch and USB-MIDI in, and act on them."""
        a = self.btnA.value
        if (not a) and self._aPrev:  # falling edge = press
            self.toggle()
        self._aPrev = a
        b = self.btnB.value
        if (not b) and self._bPrev:
            self.tap()
        self._bPrev = b

        now = time.monotonic_ns()
        if now >= self._joyNext:
            x = self.jx.value - 32768
            y = self.jy.value - 32768
            if JOY_INVERT_X:
                x = -x
            if JOY_INVERT_Y:
                y = -y
            if abs(y) > JOY_DEADZONE:
                # tempo: ±1 per repeat, ±5 when pushed hard; repeats every 70 ms
                self.set_bpm(self.bpm + (1 if y > 0 else -1) * (5 if abs(y) > 26000 else 1))
                self._joyNext = now + 70_000_000
            elif abs(x) > JOY_DEADZONE:
                # program change: repeats every 350 ms; skip the rest of this poll
                self.goto(self.idx + (1 if x > 0 else -1))
                self._joyNext = now + 350_000_000
                return
            else:
                self._joyNext = now + 20_000_000

        pt = self.touch.read()
        now_s = time.monotonic()
        if pt:
            self._touchSeen = now_s
            if not self._touchDown:  # act once per touch, on first contact
                self._touchDown = True
                self._tap_log(pt[0], pt[1])
        elif self._touchDown and (now_s - self._touchSeen) > 0.14:
            self._touchDown = False  # released: no touch seen for 140 ms

        # USB-MIDI in: any byte = a host is listening (heartbeat); also assemble SysEx (clock / pushed programs)
        if self.midi_in is not None:
            try:
                n = self.midi_in.readinto(self._mbuf)
            except Exception:
                n = 0
            if n:
                self.last_midi_in = now_s
                self._feed_midi(self._mbuf, n)
        host = bool(self.last_midi_in) and (now_s - self.last_midi_in) < 1.0
        if host != self.midi_host:
            self.midi_host = host
            if host:
                self.buz.duty_cycle = 0  # silence the buzzer when the computer takes over
            self.led_off()
            self.draw_midi()

    # ---------- drawing ----------
    def draw_bpm(self):
        self._place(self.g_bpm, str(self.bpm), 0, 44, C_TXT, C_BG, FONT_L, right_edge=WIDTH - 12)

    def draw_status(self):
        self._place(self.g_run, "RUN" if self.running else "STOP", 12, 48,
                    C_GREEN if self.running else C_MUTE, C_BG, FONT_M)
        self._place(self.g_name, "%d/%d %s" % (self.idx + 1, len(self.programs), self.name[:18]),
                    12, 112, C_TXT, C_BG, FONT_M)

    def draw_midi(self):
        self._place(self.g_midi, "MIDI" if self.midi_host else "", 0, 12, C_GREEN, C_BG, FONT_M,
                    right_edge=WIDTH - 12)

    # ---------- pad grid (each lane = a row of step pads; playhead lit as it plays) ----------
    def _padbase(self, L, s):
        """Idle palette index for step `s` of lane `L` (0 when the lane is muted)."""
        return 0 if L['mute'] else L['levels'][s]

    def build_grid(self):
        """Rebuild the pad grid for the first MAXLANES lanes of the current program."""
        while len(self.g_grid):
            self.g_grid.pop()
        self.lane_pads = []
        self.lane_lit = []
        n = min(len(self.lanes), MAXLANES)
        top = 140
        rowh = min(40, (296 - top) // max(1, n))
        for li in range(n):
            L = self.lanes[li]
            y = top + li * rowh
            cy = y + rowh // 2
            tg, w, h = make_text((L.get('sound', '') or '?')[:7], FONT_S, C_MUTE, C_BG)
            tg.x = 8
            tg.y = cy - h // 2
            self.g_grid.append(tg)
            steps = L['steps']
            sub = max(1, L['sub'])       # guard the modulo below
            px0 = 60
            usable = WIDTH - 8 - px0 - 12
            stepw = max(1, usable // max(1, steps))
            r_big = max(2, min(6, stepw // 2, (rowh - 8) // 2))
            r_sml = max(2, r_big - 2)
            pads = []
            for s in range(steps):
                rad = r_big if (s % sub == 0) else r_sml  # big = beat (division), small = subdivision
                cxp = px0 + 6 + (s * usable) // steps     # proportional → beats line up across lanes
                c = vectorio.Circle(pixel_shader=self.pad_pal, radius=rad, x=cxp, y=cy)
                c.color_index = self._padbase(L, s)
                self.g_grid.append(c)
                pads.append(c)
            self.lane_pads.append(pads)
            self.lane_lit.append(-1)
        self.dirty = True

    def _move_playhead(self, li, step):
        """Un-light lane `li`'s previous pad and light the pad for `step`."""
        pads = self.lane_pads[li]
        prev = self.lane_lit[li]
        if 0 <= prev < len(pads):
            pads[prev].color_index = self._padbase(self.lanes[li], prev)
        if step < len(pads):
            pads[step].color_index = self._padbase(self.lanes[li], step) + 4  # +4 = lit half of the palette
        self.lane_lit[li] = step
        self.dirty = True

    def reset_playheads(self):
        """Return every lit pad to its idle colour."""
        for li, pads in enumerate(self.lane_pads):
            prev = self.lane_lit[li]
            if 0 <= prev < len(pads):
                pads[prev].color_index = self._padbase(self.lanes[li], prev)
            self.lane_lit[li] = -1
        self.dirty = True

    # ---------- practice history (saved to /history.json, next to programs.json) ----------
    def _probe_write(self):
        """Return True if the firmware can write to the filesystem."""
        try:
            with open("/.wtest", "w") as f:
                f.write("1")
            try:
                os.remove("/.wtest")
            except Exception:
                pass
            return True
        except OSError:
            return False  # editor mode: the computer owns the FS

    def _load_log(self):
        """Load the practice log from /history.json, newest first.

        Returns [] if the file is missing or unreadable. Entries that are not
        dicts with an int 'bpm', an int 'dur' and a str 'name' are dropped, so a
        hand-edited or damaged file cannot crash draw_log() during startup.
        """
        try:
            with open("/history.json") as f:
                raw = json.load(f).get("log", [])
        except Exception:
            return []
        if not isinstance(raw, list):
            return []
        return [e for e in raw
                if isinstance(e, dict)
                and isinstance(e.get("bpm"), int)
                and isinstance(e.get("dur"), int)
                and isinstance(e.get("name"), str)]

    def _save_log(self):
        """Write the newest 200 log entries to /history.json (no-op when read-only)."""
        if not self.can_write:
            return
        try:
            with open("/history.json", "w") as f:
                json.dump({"log": self.log[:200]}, f)
        except OSError:
            self.can_write = False  # stop retrying once a write has failed

    def _start_play(self):
        """Remember when, at what tempo and on which track this play started."""
        self.play_start = time.monotonic()
        self.play_bpm = self.bpm
        self.play_name = self.name

    def _log_play(self):
        """Close out the current play and, if long enough, add it to the log."""
        if self.play_start is None:
            return
        dur = int(time.monotonic() - self.play_start)
        self.play_start = None
        if dur < MIN_LOG_SEC:
            return  # skip plays under 5 seconds
        t = time.localtime()
        self.log.insert(0, {"t": "%02d:%02d" % (t.tm_hour, t.tm_min), "bpm": self.play_bpm,
                            "dur": dur, "name": self.play_name})
        del self.log[200:]
        self._armed = None
        self._save_log()
        self.draw_log()

    def draw_log(self):
        """Redraw the log area and rebuild the touch hit-boxes in self.log_rows."""
        g = self.g_log
        while len(g):
            g.pop()
        self.log_rows = []
        hdr, w, h = make_text("PRACTICE LOG", FONT_S, C_MUTE, C_BG)
        hdr.x = 10
        hdr.y = LOG_TOP
        g.append(hdr)
        if not self.log:
            tg, w, h = make_text("plays over 5s show here", FONT_S, C_DIM, C_BG)
            tg.x = 10
            tg.y = LOG_TOP + LOG_ROWH
            g.append(tg)
            self.dirty = True
            return
        y = LOG_TOP + LOG_ROWH + 2
        for idx in range(min(LOG_ROWS, len(self.log))):
            e = self.log[idx]
            armed = (idx == self._armed)
            dur = "%d:%02d" % (e["dur"] // 60, e["dur"] % 60)
            line = "%s%s %3d %5s %s" % ("x " if armed else "", e.get("t", "--:--"), e["bpm"], dur, e["name"][:16])
            tg, w, h = make_text(line, FONT_S, C_AMBER if armed else C_TXT, C_BG)
            tg.x = 10
            tg.y = y
            g.append(tg)
            self.log_rows.append((y - 2, y + LOG_ROWH - 2, idx))  # (top, bottom, log index)
            y += LOG_ROWH
        self.dirty = True

    def _tap_log(self, x, ty):
        """Handle a touch at (x, ty): first tap on a row arms it, second tap deletes it."""
        for y0, y1, idx in self.log_rows:
            if y0 <= ty <= y1:
                if self._armed == idx:  # confirm delete
                    del self.log[idx]
                    self._armed = None
                    self._save_log()
                    self.draw_log()
                else:                   # arm (tap again)
                    self._armed = idx
                    self.draw_log()
                return
        if self._armed is not None:     # tapped elsewhere -> cancel
            self._armed = None
            self.draw_log()

    # ---------- USB-MIDI in: SysEx assembler (clock + editor-pushed programs) ----------
    def _feed_midi(self, buf, n):
        """Feed the first `n` bytes of `buf` to the SysEx assembler.

        F0 starts a message, F7 completes it (calling _handle_sysex with the
        bytes in between). Real-time bytes (>= F8) are ignored wherever they
        appear. Any other status byte aborts a message in progress, as does a
        body longer than _SYSEX_MAX — a truncated or interleaved message must
        never be acted on, since it may be a file to write.
        """
        for i in range(n):
            b = buf[i]
            if b == 0xF0:
                self._sx = bytearray()
                self._sxon = True
            elif b == 0xF7:
                if self._sxon:
                    self._handle_sysex(self._sx)
                self._sxon = False
            elif b >= 0xF8:
                pass  # real-time (e.g. Active Sensing 0xFE) — ignore
            elif b & 0x80:
                self._sxon = False  # another message started: drop the partial SysEx
            elif self._sxon:
                if len(self._sx) < self._SYSEX_MAX:
                    self._sx.append(b)
                else:
                    self._sxon = False  # overflow guard

    @staticmethod
    def _valid_programs(payload):
        """Return True if `payload` (bytes) is JSON of the shape load_programs() reads.

        That is: an object whose "programs" value is a list of objects, each with
        a string "name" and a string "prog".
        """
        try:
            progs = json.loads(str(payload, "utf-8"))["programs"]
            return isinstance(progs, list) and all(
                isinstance(p["name"], str) and isinstance(p["prog"], str) for p in progs)
        except (ValueError, KeyError, TypeError):
            return False

    def _reply(self, code):
        """Send the 4-byte SysEx reply F0 7D <code> F7 to the host, best effort."""
        if self.midi is None:
            return
        try:
            self.midi.write(bytes([0xF0, 0x7D, code, 0xF7]))
        except Exception:
            pass  # the host will time out and fall back; never crash the main loop

    def _handle_sysex(self, sx):
        """Act on one complete SysEx body (the bytes between F0 and F7).

        Only messages starting with 0x7D are handled:
          * 0x01 yr-2000, mo, dd, hh, mm, ss — set the RTC (ignored without rtc).
          * 0x10 <json bytes> — replace /programs.json and reload. Replies 0x7F
            (ACK) on success and 0x7E (NAK) when the payload is not a valid
            programs document or the file cannot be written. The payload is
            validated BEFORE the file is touched, so a bad push can no longer
            overwrite a good set list and still be acknowledged.
        """
        if len(sx) < 2 or sx[0] != 0x7D:
            return  # 0x7D = our (educational) manufacturer id
        cmd = sx[1]
        if cmd == 0x01 and len(sx) >= 8 and rtc is not None:
            try:
                rtc.RTC().datetime = time.struct_time(
                    (2000 + sx[2], sx[3], sx[4], sx[5], sx[6], sx[7], 0, -1, -1))
            except Exception:
                pass  # out-of-range fields: keep the old clock
        elif cmd == 0x10:
            payload = bytes(sx[2:])
            ok = self._valid_programs(payload)
            if ok:
                try:
                    with open("/programs.json", "wb") as f:
                        f.write(payload)
                except OSError:
                    ok = False  # read-only (editor mode)
            if ok:
                self.programs = load_programs()
                self.idx = min(self.idx, len(self.programs) - 1)
                self.load(self.idx)
            self._reply(0x7F if ok else 0x7E)

    def run(self):
        """Main loop: schedule, poll inputs, and refresh the display when dirty."""
        if self.touch.addr is None:
            print("GT911 touch not found")
        while True:
            self.tick()
            self.poll()
            # push a complete frame only when something changed (no mid-update tearing);
            # capped at the display's refresh rate, so dirty regions stay small and quick
            if self.dirty and self.display.refresh():
                self.dirty = False
            time.sleep(0.0005)
|
||
|
||
# Entry point: construct the app (display, touch, inputs, saved log) and enter its main loop.
App().run()
|