The parallel agent's full session, committed now that it's solo: - Grammar: flam/drag/roll ornaments (f/F d/D z/Z, per-lane orns channel) across src/engine.js, pico-cp/pico-explorer/pico-scroll app.py, pico/main.py, rust/track-format, + golden vectors / conformance (tests/, rust/track-format/tests). - Live-sync deep-sync: SysEx 0x44 SLSYNC + 0x45 LOGSYNC (docs/livesync-protocol.md, src/livesync.js). - PM_E-2 notation: web engine (pm_e-2.html, build/deploy/index/embed wiring) + Rust device port (pm-ui draw_notation rewrite + LaneView.groups, pm-kit ViewMode, uisim notesim). Verified: node tests/run.mjs 47 pass / 1 known; ./rust/run.sh green; pm-kit firmware + uisim compile.
1850 lines
110 KiB
Python
1850 lines
110 KiB
Python
# VARASYS PolyMeter - PM_K-1 "Kit" firmware (CircuitPython edition)
|
|
# Raspberry Pi Pico (Pico / Pico W / Pico 2) on the 52Pi EP-0172 "Pico Breadboard Kit Plus":
|
|
# 3.5" ST7796 320x480 cap-touch (GT911), PSP joystick, WS2812 RGB, speaker, 2 buttons.
|
|
#
|
|
# WHY CIRCUITPYTHON: the board then mounts as a USB drive (CIRCUITPY) carrying this code, your
|
|
# tracks (programs.json) and a copy of the editor - edit on the web, "Save to device" writes
|
|
# programs.json here, and CircuitPython auto-reloads with the new grooves. It also sends USB-MIDI
|
|
# (a note per click) so the web editor can play it out the computer's speakers ("Device audio").
|
|
# Runs the SAME program strings as metronome.varasys.io.
|
|
#
|
|
# INSTALL: flash CircuitPython (https://circuitpython.org/board/raspberry_pi_pico/), then copy
|
|
# this file as code.py plus programs.json onto the CIRCUITPY drive. It runs on boot.
|
|
#
|
|
# Fallback: the simpler MicroPython firmware (pico/main.py) is always available - BOOTSEL +
|
|
# drag a MicroPython .uf2 to go back. The Pico cannot be bricked.
|
|
#
|
|
# Untested-panel notes & calibration flags are in CONFIG + pico-cp/README.md.
|
|
|
|
import board, busio, digitalio, analogio, pwmio, displayio, vectorio, time, json, gc, os, supervisor
|
|
supervisor.runtime.autoreload = False # we write our own files (log + pushed programs); never self-restart
|
|
APP_VERSION = "0.0.24" # firmware version (the A/B updater pushes/compares this)
|
|
DEVICE_ID = "K" # 'K' = 52Pi kit, 'X' = Pimoroni Explorer (per docs/livesync-protocol.md and the version reply)
|
|
try:
|
|
import rtc # set from the editor's clock SysEx so the log has real timestamps
|
|
except ImportError:
|
|
rtc = None
|
|
try: # CircuitPython 9.x
|
|
from fourwire import FourWire
|
|
from busdisplay import BusDisplay
|
|
except ImportError: # CircuitPython 8.x
|
|
from displayio import FourWire
|
|
from displayio import Display as BusDisplay
|
|
try:
|
|
import neopixel_write # core module on RP2040 - drives WS2812 with no external library
|
|
except ImportError:
|
|
neopixel_write = None
|
|
try:
|
|
import usb_midi # default-enabled on RP2040 - sends a MIDI note per click to the computer
|
|
except ImportError:
|
|
usb_midi = None
|
|
try:
|
|
from binascii import a2b_base64 # decode the base64-encoded .mpy pushed by the editor's one-click update
|
|
except ImportError:
|
|
a2b_base64 = None
|
|
|
|
# ============================== CONFIG (tweak if needed) ==============================
|
|
SPI_BAUD = 62_500_000 # faster SPI = smaller tearing window; drop to 40_000_000 if unstable
|
|
LED_BRIGHTNESS = 0.15 # WS2812 sits right next to you - keep it dim (0..1)
|
|
MIDI_ENABLED = True # send a USB-MIDI note per click (play via the web editor's "Device audio")
|
|
MIDI_CHANNEL = 10 # 1..16 - GM channel 10 is the drum channel (what DAWs auto-route to drums)
|
|
MIDI_CLOCK_OUT = False # send 24 PPQN MIDI Clock so a DAW can slave its tempo to the metronome
|
|
MIDI_CLOCK_OUT_TRANSPORT = True # also send Start (0xFA) / Stop (0xFC) on play / stop (relevant if MIDI_CLOCK_OUT)
|
|
MIDI_CLOCK_IN = False # follow an external 24 PPQN clock (DAW / sequencer becomes the master)
|
|
MIDI_CLOCK_IN_TRANSPORT = True # also follow Start (0xFA) / Stop (0xFC) from the master (relevant if MIDI_CLOCK_IN)
|
|
MUTE_SPEAKER = False # always silence the on-board speaker
|
|
SPEAKER_AUTO_MUTE = True # auto-mute the speaker when a MIDI host is listening (computer plays it instead)
|
|
WIDTH, HEIGHT = 320, 480
|
|
MADCTL = 0x48 # portrait; 0x48 swaps R/B for this BGR panel (cyan reads cyan). Use 0x40 if reversed.
|
|
INVERT_COLORS = True # most ST7796 modules need inversion ON; set False if colours look negative
|
|
# Touch (GT911) - flip if taps land wrong:
|
|
TOUCH_SWAP_XY = False
|
|
TOUCH_INVERT_X = False
|
|
TOUCH_INVERT_Y = False
|
|
TOUCH_DEBUG = False
|
|
# Joystick:
|
|
JOY_INVERT_X = False
|
|
JOY_INVERT_Y = False
|
|
JOY_DEADZONE = 9000
|
|
|
|
# ----- pins (fixed by the EP-0172 board) -----
|
|
P_SCK, P_MOSI, P_CS, P_DC, P_RST = board.GP2, board.GP3, board.GP5, board.GP6, board.GP7
|
|
P_SDA, P_SCL = board.GP8, board.GP9
|
|
P_RGB, P_SPK, P_BTNA, P_BTNB = board.GP12, board.GP13, board.GP15, board.GP14
|
|
P_JOYX, P_JOYY = board.GP26, board.GP27
|
|
|
|
# ----- BUILT-IN playlists: the standard defaults from the web editor, baked in here so they update with
|
|
# firmware and the user can't change/delete them. User playlists live separately in programs.json
|
|
# (pushed from the editor) and never touch these. (ASCII only - it's pushed 7-bit + the fonts are ASCII.)
|
|
BUILTIN_SETLISTS = [
|
|
("Styles", [
|
|
("Four-on-the-floor", "t120;kick:4;snare:4=.x.x;hatClosed:4/2"),
|
|
("Swing ride", "t150;ride:4/2s;kick:4=X..x;snare:4=.x.x"),
|
|
("Purdie half-time shuffle", "t92;kick:4/3=X....x...x..;snare:4/3=..gg.gX.gg.g;hatClosed:4/3=X.xX.xX.xX.x"),
|
|
("Samba (2/4)", "t104;tomLow:2/4=x...X...;hatClosed:2/4;woodblock:2/4=X.xx.xX."),
|
|
("Nanigo (6/8 bembe)", "t130;cowbell:4/3=X.xx.x.xx.x.;kick:4/3=X.....X.....;hatClosed:4/3=..x..x..x..x"),
|
|
("6/8 groove", "t100;kick:3+3=x..x..;snare:3+3=...x..;hatClosed:3+3/2"),
|
|
("7/8 (2+2+3)", "t130;kick:2+2+3=x..x..x;hatClosed:2+2+3/2"),
|
|
("5/4 (3+2)", "t112;kick:3+2=x..x.;snare:3+2=..x..;hatClosed:3+2/2"),
|
|
]),
|
|
("Practice", [
|
|
("5 over 4 polyrhythm", "t100;kick:4;claves:5~"),
|
|
("3 over 2 hemiola", "t96;woodblock:2;cowbell:3~"),
|
|
("2 & 4 & 3 over one bar", "t100;kick:3;cowbell:2~;claves:4~"),
|
|
("Triplet hats", "t100;kick:4;snare:4=.x.x;hatClosed:4/3"),
|
|
("Accents - cycle the pads", "t92;kick:4=X..X;snare:4=.X.X;hatClosed:4/2"),
|
|
("Tempo builder 80 up", "t80;woodblock:4;rmp80/4/4"),
|
|
("Gap trainer (play 2 / rest 2)", "t100;kick:4;hatClosed:4/2;tr2/2"),
|
|
]),
|
|
("Song (continuous)", [ # ~4-bar sections; with Continue on they roll one into the next
|
|
("Intro - hats & kick", "t88;b4;kick:4=X.x.;hatClosed:4/2=gggggggg"),
|
|
("Groove in - backbeat", "t88;b4;kick:4=X.x.;snare:4=.X.X;hatClosed:4/2"),
|
|
("Half-time shuffle", "t92;b4;kick:4/3=X....x...x..;snare:4/3=..gg.gX.gg.g;hatClosed:4/3=X.xX.xX.xX.x"),
|
|
("Build - ramp 92-120", "t92;b4;rmp92/4/2;kick:4;snare:4=.X.X;hatClosed:4/2"),
|
|
("Four-on-the-floor (909)", "t124;b4;kick909:4;clap909:4=.X.X;hat909:4/2=.X.X.X.X"),
|
|
("Samba break (2/4)", "t116;b4;tomLow:2/4=x...X...;hatClosed:2/4;woodblock:2/4=X.xx.xX."),
|
|
("Peak - 16ths", "t132;b4;kick:4=X..x;snare:4=.X.X;hatClosed:4/4"),
|
|
("Outro - ramp down", "t132;b4;rmp132/-7/1;kick:4=X..x;hatClosed:4/2=gggggggg"),
|
|
]),
|
|
]
|
|
|
|
# ============================== COLOURS (0xRRGGBB; displayio handles 565) ==============================
|
|
C_BG, C_PANEL, C_TXT, C_MUTE = 0x06090E, 0x1C222C, 0xC7D0DB, 0x788494
|
|
C_CYAN, C_AMBER, C_GREEN, C_DIM = 0x0AB3F7, 0xFF9B2E, 0x2FE07A, 0x243240
|
|
C_BTN = 0x1C222C
|
|
LEVEL_RGB = {2: (255, 110, 0), 1: (0, 150, 255), 3: (130, 70, 255)} # beat pulse: accent / normal / ghost
|
|
LED_IDLE = (0, 80, 0) # RGB LED resting colour when stopped: dim green ("on")
|
|
LED_RUN = (110, 0, 0) # RGB LED resting colour when playing: dim red (beats pulse brighter on top)
|
|
# voice -> General-MIDI note (USB-MIDI bridge), and level -> MIDI velocity
|
|
SOUND_GM = {"kick":36,"kick808":36,"kick909":36, "snare":38,"snare808":38,"snare909":38,
|
|
"clap":39,"clap808":39,"clap909":39, "rim":37, "hatClosed":42,"hat808":42,"hat909":42,
|
|
"hatOpen":46,"openHat808":46, "ride":51,"ride909":51, "crash":49,"crash909":49,
|
|
"tomLow":41,"tom808":45,"tomMid":45,"tomHigh":48, "tambourine":54,
|
|
"cowbell":56,"cowbell808":56, "woodblock":76,"jamblock":76, "claves":75, "beep":37}
|
|
GM_DEFAULT = 37
|
|
SOUNDS = ["kick", "snare", "clap", "rim", "hatClosed", "hatOpen", "ride", "crash", # lane-editor sound cycle
|
|
"tomLow", "tomMid", "tomHigh", "cowbell", "woodblock", "claves", "tambourine", "beep"]
|
|
HELP_PAGES = ( # paginated on-device help (rendered in _draw_help)
|
|
("Transport & Navigation", (
|
|
"Joystick up/down: tempo +/-1 (5 if held)",
|
|
"Joystick left/right: prev/next track",
|
|
"Button A: play / stop",
|
|
"Button B: tap tempo",
|
|
"Tap set-list tab: switch playlist",
|
|
"Tap CONT (top of tab): auto-advance",
|
|
"Tap hamburger: this menu",
|
|
)),
|
|
("Editing", (
|
|
"Tap a beat: off -> normal -> accent -> ghost",
|
|
"Tap an instrument name: lane editor",
|
|
"Lane editor: sound / beats / sub / swing /",
|
|
" mute, plus + Lane / Remove",
|
|
"Title turns red: unsaved edits",
|
|
"Tap red title: Save or Revert",
|
|
"Built-in edits save into 'My edits'",
|
|
)),
|
|
("Status & Hardware", (
|
|
"MIDI badge green: laptop listening",
|
|
"USB badge cyan: connected to a computer",
|
|
"RGB LED: green=stop / red=play + pulse",
|
|
"Squares = main beats, circles = subs",
|
|
"Ramp arrow: track has a tempo ramp",
|
|
"Gap symbol: silent rest bars",
|
|
"Practice log: time / BPM / dur / bars",
|
|
)),
|
|
)
|
|
MIDI_VEL = {2: 120, 1: 90, 3: 45} # accent / normal / ghost
|
|
MAXLANES = 5 # lanes shown on the pad grid (extras still play)
|
|
GRID_TOP = 158 # top of the pad grid (leaves room for time/bar/ramp/tab rows)
|
|
LOG_TOP, LOG_ROWH, LOG_ROWS = 302, 16, 9 # practice-history log area (below the pad grid)
|
|
MIN_LOG_SEC = 5 # don't log plays shorter than this
|
|
PAD_DIM = (0x10161E, 0x0A3A52, 0x4A3010, 0x2A1D4A) # idle pad: mute / normal / accent / ghost
|
|
PAD_LIT = (0x39414D, 0x0AB3F7, 0xFF9B2E, 0x967BFF) # playhead pad: mute / normal / accent / ghost
|
|
C_GRID = 0x1A2330 # faint vertical beat gridlines (beats line up across lanes)
|
|
C_RED = 0xFF5A5A # unsaved-edits (dirty) track title
|
|
|
|
# WS2812 RGB LED - self-contained via the core neopixel_write module (no external library)
|
|
class RGB:
|
|
def __init__(self, pin):
|
|
self.ok = neopixel_write is not None
|
|
if self.ok:
|
|
self.io = digitalio.DigitalInOut(pin); self.io.direction = digitalio.Direction.OUTPUT
|
|
self.buf = bytearray(3)
|
|
def set(self, r, g, b):
|
|
if not self.ok: return
|
|
# WS2812 wants GRB order; scale down so it isn't blinding
|
|
self.buf[0] = int(g * LED_BRIGHTNESS); self.buf[1] = int(r * LED_BRIGHTNESS); self.buf[2] = int(b * LED_BRIGHTNESS)
|
|
try: neopixel_write.neopixel_write(self.io, self.buf)
|
|
except Exception: self.ok = False
|
|
|
|
# ============================== ANTI-ALIASED FONTS (binary blobs on the drive; see pico/gen_font.py) ==============================
|
|
def load_font(path):
|
|
with open(path, "rb") as f:
|
|
blob = f.read()
|
|
count = blob[0]; p = 1; pixoff = 1 + count * 7; glyphs = {}
|
|
for _ in range(count):
|
|
cp = (blob[p] << 8) | blob[p+1]; w = blob[p+2]; h = blob[p+3]
|
|
xoff = blob[p+4]; xoff = xoff - 256 if xoff > 127 else xoff
|
|
top = blob[p+5]; adv = blob[p+6]; p += 7
|
|
glyphs[cp] = (w, h, xoff, top, adv, pixoff); pixoff += (w * h + 1) // 2
|
|
return (glyphs, blob)
|
|
|
|
FONT_S = load_font("/font_s.bin") # small - pad-grid lane labels
|
|
FONT_M = load_font("/font_m.bin") # labels / buttons
|
|
FONT_L = load_font("/font_l.bin") # big BPM
|
|
gc.collect()
|
|
|
|
def _blend(bg, fg, i):
|
|
t = i * 17
|
|
r = (((bg >> 16) & 0xFF)*(255-t) + ((fg >> 16) & 0xFF)*t) // 255
|
|
g = (((bg >> 8) & 0xFF)*(255-t) + ((fg >> 8) & 0xFF)*t) // 255
|
|
b = ((bg & 0xFF)*(255-t) + (fg & 0xFF)*t) // 255
|
|
return (r << 16) | (g << 8) | b
|
|
|
|
def make_text(s, font, fg, bg):
|
|
"""Render a string into a displayio TileGrid (anti-aliased via a 16-step blend palette)."""
|
|
glyphs, blob = font
|
|
w = 0; top0 = 999; bot = 0
|
|
for c in s:
|
|
g = glyphs.get(ord(c))
|
|
if not g: continue
|
|
w += g[4]
|
|
if g[1]:
|
|
if g[3] < top0: top0 = g[3]
|
|
if g[3] + g[1] > bot: bot = g[3] + g[1]
|
|
if top0 == 999: top0 = 0
|
|
w = max(1, w); h = max(1, bot - top0)
|
|
gc.collect()
|
|
bmp = displayio.Bitmap(w, h, 16)
|
|
pal = displayio.Palette(16)
|
|
for i in range(16): pal[i] = _blend(bg, fg, i)
|
|
pen = 0
|
|
for c in s:
|
|
g = glyphs.get(ord(c))
|
|
if not g: continue
|
|
gw, gh, xoff, gtop, adv, off = g
|
|
for j in range(gh):
|
|
row = (gtop - top0) + j
|
|
for i in range(gw):
|
|
k = j * gw + i
|
|
byte = blob[off + (k >> 1)]
|
|
nib = (byte >> 4) if (k & 1) == 0 else (byte & 0xF)
|
|
if nib:
|
|
x = pen + xoff + i
|
|
if 0 <= x < w and 0 <= row < h: bmp[x, row] = nib
|
|
pen += adv
|
|
return displayio.TileGrid(bmp, pixel_shader=pal), w, h
|
|
|
|
# ---- single-image alpha assets (logo, status icons) - blit like a one-off glyph; see gen_assets.py ----
|
|
def load_alpha(path):
|
|
try:
|
|
with open(path, "rb") as f: blob = f.read()
|
|
return (blob[0], blob[1], blob) # (w, h, bytes); pixels start at offset 2
|
|
except Exception:
|
|
return None # missing/corrupt -> caller falls back to text (no crash)
|
|
def make_glyph(asset, fg, bg):
|
|
w, h, blob = asset
|
|
gc.collect()
|
|
bmp = displayio.Bitmap(w, h, 16); pal = displayio.Palette(16)
|
|
for i in range(16): pal[i] = _blend(bg, fg, i)
|
|
for k in range(w * h):
|
|
byte = blob[2 + (k >> 1)]
|
|
nib = (byte >> 4) if (k & 1) == 0 else (byte & 0xF)
|
|
if nib: bmp[k % w, k // w] = nib
|
|
return displayio.TileGrid(bmp, pixel_shader=pal), pal, w, h
|
|
def _recolor(pal, fg, bg): # re-tint a stored asset palette in place (tear-free)
|
|
for i in range(16): pal[i] = _blend(bg, fg, i)
|
|
|
|
LOGO = load_alpha("/logo.bin") # VARASYS wordmark (no tagline)
|
|
ICON_MIDI = load_alpha("/midi.bin") # DIN-5: green when a MIDI host is listening
|
|
ICON_USB = load_alpha("/usb.bin") # trident: lit when USB-connected to a computer
|
|
gc.collect()
|
|
|
|
# ============================== POLYMETER ENGINE (same semantics as the web/MicroPython) ==============================
|
|
PAT = {'X': 2, 'x': 1, 'g': 3, '.': 0, '-': 0, '_': 0,
|
|
'f': 1, 'F': 2, 'd': 1, 'D': 2, 'z': 1, 'Z': 2} # ornament hits: UPPER = accented, lower = normal
|
|
ORN = {'f': 1, 'F': 1, 'd': 2, 'D': 2, 'z': 3, 'Z': 3} # ornament type: 0 none / 1 flam / 2 drag / 3 roll
|
|
PRIO = {2: 3, 1: 2, 3: 1}
|
|
# General-MIDI percussion note numbers -> voice names (so a lane can be typed as "36:4"); matches the web GM_NUM
|
|
GM_NUM = {35: "kick", 36: "kick", 37: "rim", 38: "snare", 39: "clap", 40: "snare", 41: "tomLow", 42: "hatClosed",
|
|
43: "tomLow", 44: "hatClosed", 45: "tomMid", 46: "hatOpen", 47: "tomMid", 48: "tomHigh", 49: "crash",
|
|
50: "tomHigh", 51: "ride", 53: "ride", 54: "tambourine", 56: "cowbell", 75: "claves", 76: "woodblock", 77: "woodblock"}
|
|
|
|
def _euclid(k, n, rot): # even distribution: k hits over n steps, rotated (matches web euclid())
|
|
n = max(1, n); k = max(0, min(n, k)); rot = ((rot % n) + n) % n
|
|
return [1 if ((((i + rot) % n) * k) % n) < k else 0 for i in range(n)]
|
|
|
|
def parse_program(s):
|
|
bpm = 120; lanes = []; bars = 0; ramp = None; trainer = None; rep = None; end = None
|
|
for tok in s.strip().split(';'):
|
|
tok = tok.strip()
|
|
if not tok: continue
|
|
if tok[0] == 't' and tok[1:].isdigit():
|
|
bpm = int(tok[1:]); continue
|
|
if tok[0] == 'b' and tok[1:].isdigit(): # b<n> = segment length in bars (totals + Continue)
|
|
bars = int(tok[1:]); continue # (lane sounds like "beep:4" have a ':' -> not matched here)
|
|
if tok.startswith('rmp'): # rmp<start>/<amount>/<everyBars> tempo ramp (amount may be -)
|
|
p = tok[3:].split('/')
|
|
if len(p) == 3:
|
|
try: ramp = {'start': int(p[0]), 'amt': int(p[1]), 'every': max(1, int(p[2]))}
|
|
except ValueError: pass
|
|
continue
|
|
if tok.startswith('tr') and '/' in tok and ':' not in tok: # tr<play>/<mute> gap trainer (bars)
|
|
p = tok[2:].split('/')
|
|
if len(p) == 2:
|
|
try: trainer = {'play': max(0, int(p[0])), 'mute': max(0, int(p[1]))}
|
|
except ValueError: pass
|
|
continue
|
|
if tok.startswith('rep='): # rep=<n> cycles before the end-action fires (playback flow)
|
|
try: rep = max(1, int(tok[4:]))
|
|
except ValueError: pass
|
|
continue
|
|
if tok.startswith('end='): # end=stop | end=next(+1) | end=<+/-N> relative goto; absent = loop forever
|
|
v = tok[4:]
|
|
if v == 'stop': end = 'stop'
|
|
elif v == 'next': end = 1
|
|
else:
|
|
try: end = int(v)
|
|
except ValueError: pass
|
|
continue
|
|
if ':' not in tok: continue
|
|
lane = _parse_lane(tok)
|
|
if lane: lanes.append(lane)
|
|
if not lanes: lanes = [_parse_lane("beep:4")]
|
|
return max(5, min(300, bpm)), lanes, bars, ramp, trainer, rep, end
|
|
|
|
def _parse_lane(tok):
|
|
poly = '~' in tok; mute = '!' in tok
|
|
tok = tok.replace('~', '').replace('!', '')
|
|
gain = ''
|
|
if '@' in tok: tok, _, g = tok.partition('@'); gain = '@' + g # preserve @db for round-trip (engine ignores it)
|
|
sound, _, rest = tok.partition(':')
|
|
if sound.isdigit(): sound = GM_NUM.get(int(sound), sound) # GM note-number alias (e.g. 36 -> kick)
|
|
euc = None # euclidean (k,n,rot) shorthand - pulled before the =/ splits
|
|
lp = rest.find('(')
|
|
if lp >= 0:
|
|
rp = rest.find(')', lp)
|
|
if rp > lp:
|
|
nums = [int(x) for x in rest[lp + 1:rp].split(',') if x.strip().isdigit()]
|
|
rest = rest[:lp] + rest[rp + 1:]
|
|
if nums: euc = nums
|
|
pattern = None
|
|
if '=' in rest: rest, _, pattern = rest.partition('=')
|
|
sub = 1; swing = False
|
|
if '/' in rest:
|
|
rest, _, sd = rest.partition('/')
|
|
swing = sd.endswith('s'); sd = sd.rstrip('s') # "/2s" = swung eighths
|
|
sub = int(sd) if sd.isdigit() else 1
|
|
groups = [int(g) for g in rest.split('+') if g.isdigit()] or [4]
|
|
beats = sum(groups); starts = set(); acc = 0
|
|
for gp in groups: starts.add(acc); acc += gp
|
|
if euc: # euclidean: k hits over n steps, first hit accented
|
|
k = euc[0]; n = euc[1] if len(euc) > 1 else beats * sub; rot = euc[2] if len(euc) > 2 else 0
|
|
if len(euc) > 1:
|
|
if n % beats == 0: sub = n // beats
|
|
else: groups = [n]; sub = 1
|
|
steps = n; levels = []; first = True
|
|
for h in _euclid(k, n, rot):
|
|
if h: levels.append(2 if first else 1); first = False
|
|
else: levels.append(0)
|
|
orns = [0] * len(levels) # euclid hits carry no ornament
|
|
elif pattern:
|
|
steps = beats * sub
|
|
levels = [PAT.get(ch, 0) for ch in pattern]
|
|
orns = [ORN.get(ch, 0) for ch in pattern] # per-step flam/drag/roll, parallel to levels
|
|
if len(levels) < steps:
|
|
levels += [0] * (steps - len(levels)); orns += [0] * (steps - len(orns))
|
|
steps = len(levels)
|
|
else:
|
|
steps = beats * sub
|
|
levels = []
|
|
for i in range(steps):
|
|
if i % sub == 0: levels.append(2 if (i // sub) in starts else 1) # beat: accent on group starts
|
|
else: levels.append(1) # off-beat subdivisions sound at normal (grouping IS the accent map)
|
|
orns = [0] * steps
|
|
if sound not in SOUND_GM: sound = "beep" # unknown sound -> beep (match web)
|
|
return {'sound': sound, 'sub': sub, 'swing': swing, 'steps': steps, 'levels': levels, 'orns': orns,
|
|
'poly': poly, 'mute': mute, 'groups': groups, 'gain': gain}
|
|
|
|
PAT_CH = {2: 'X', 1: 'x', 3: 'g', 0: '.'} # level -> pattern char (inverse of PAT)
|
|
ORN_CH = {1: ('f', 'F'), 2: ('d', 'D'), 3: ('z', 'Z')} # ornament -> (normal, accented) pattern char
|
|
def _cell_ch(v, o): # (level, ornament) -> one pattern char
|
|
if o in ORN_CH: return ORN_CH[o][1 if v >= 2 else 0]
|
|
return PAT_CH.get(v, '.')
|
|
def lane_to_str(L): # serialize a lane back to the share grammar (round-trips)
|
|
s = L['sound'] + ':' + '+'.join(str(g) for g in L.get('groups', [4]))
|
|
if L['sub'] != 1 or L['swing']: s += '/' + str(L['sub']) + ('s' if L['swing'] else '')
|
|
orns = L.get('orns') or [0] * len(L['levels'])
|
|
s += '=' + ''.join(_cell_ch(v, orns[i] if i < len(orns) else 0) for i, v in enumerate(L['levels']))
|
|
s += L.get('gain', '')
|
|
if L['poly']: s += '~'
|
|
if L['mute']: s += '!'
|
|
return s
|
|
|
|
_ALNUM = "abcdefghijklmnopqrstuvwxyz0123456789"
|
|
def _slkey(t): # normalise a title for built-in/user de-dup (no str.isalnum on CircuitPython)
|
|
return "".join(c for c in t.lower() if c in _ALNUM)
|
|
def load_user_setlists():
|
|
# User playlists from /programs.json (pushed by the editor). New {setlists:[{title,programs:[..]}]} form,
|
|
# or the old flat {programs:[..]} (one list). Built-ins are baked in BUILTIN_SETLISTS, never here.
|
|
try:
|
|
with open("/programs.json") as f: d = json.load(f)
|
|
except Exception as e:
|
|
print("programs.json:", e); return []
|
|
def items_of(pl): return [(p.get("name", "?"), p.get("prog", "")) for p in pl if p.get("prog")]
|
|
out = []
|
|
try:
|
|
if isinstance(d.get("setlists"), list):
|
|
for sl in d["setlists"]:
|
|
it = items_of(sl.get("programs", []))
|
|
if it: out.append((sl.get("title", "My set list"), it))
|
|
elif isinstance(d.get("programs"), list):
|
|
it = items_of(d["programs"])
|
|
if it: out.append((d.get("title", "My set list"), it))
|
|
except Exception as e:
|
|
print("setlists:", e)
|
|
return out
|
|
|
|
# ============================== GT911 TOUCH ==============================
|
|
class GT911:
|
|
def __init__(self, i2c):
|
|
self.i2c = i2c; self.addr = None
|
|
while not i2c.try_lock(): pass
|
|
try: found = i2c.scan()
|
|
finally: i2c.unlock()
|
|
for a in (0x5D, 0x14):
|
|
if a in found: self.addr = a; break
|
|
if self.addr is None and found: self.addr = found[0]
|
|
def _rd(self, reg, n):
|
|
b = bytearray(n)
|
|
while not self.i2c.try_lock(): pass
|
|
try:
|
|
self.i2c.writeto(self.addr, bytes([reg >> 8, reg & 0xFF]))
|
|
self.i2c.readfrom_into(self.addr, b)
|
|
finally: self.i2c.unlock()
|
|
return b
|
|
def _wr(self, reg, val):
|
|
while not self.i2c.try_lock(): pass
|
|
try: self.i2c.writeto(self.addr, bytes([reg >> 8, reg & 0xFF, val]))
|
|
finally: self.i2c.unlock()
|
|
def read(self):
|
|
if self.addr is None: return None
|
|
try: st = self._rd(0x814E, 1)[0]
|
|
except OSError: return None
|
|
if not (st & 0x80): return None
|
|
n = st & 0x0F; pt = None
|
|
if n >= 1:
|
|
b = self._rd(0x8150, 4); tx = b[0] | (b[1] << 8); ty = b[2] | (b[3] << 8)
|
|
pt = self._map(tx, ty)
|
|
try: self._wr(0x814E, 0)
|
|
except OSError: pass
|
|
return pt
|
|
def _map(self, tx, ty):
|
|
if TOUCH_DEBUG: print("touch raw", tx, ty)
|
|
if TOUCH_SWAP_XY: tx, ty = ty, tx
|
|
if TOUCH_INVERT_X: tx = WIDTH - 1 - tx
|
|
if TOUCH_INVERT_Y: ty = HEIGHT - 1 - ty
|
|
if 0 <= tx < WIDTH and 0 <= ty < HEIGHT: return (tx, ty)
|
|
return None
|
|
|
|
# ============================== DISPLAY SETUP ==============================
|
|
def st7796_init():
|
|
inv = b'\x21\x00' if INVERT_COLORS else b'\x20\x00'
|
|
return (
|
|
b'\x01\x80\x78' # SWRESET + 120ms
|
|
b'\x11\x80\x78' # SLPOUT + 120ms
|
|
b'\xF0\x01\xC3' b'\xF0\x01\x96' # command-set unlock
|
|
+ bytes([0x36, 0x01, MADCTL]) +
|
|
b'\x3A\x01\x55' # 16bpp
|
|
b'\xB4\x01\x01'
|
|
b'\xB6\x03\x80\x02\x3B'
|
|
b'\xE8\x08\x40\x8A\x00\x00\x29\x19\xA5\x33'
|
|
b'\xC1\x01\x06' b'\xC2\x01\xA7'
|
|
b'\xC5\x81\x18\x78' # VCOM + 120ms
|
|
b'\xE0\x0E\xF0\x09\x0B\x06\x04\x15\x2F\x54\x42\x3C\x17\x14\x18\x1B'
|
|
b'\xE1\x0E\xE0\x09\x0B\x06\x04\x03\x2B\x43\x42\x3B\x16\x14\x17\x1B'
|
|
b'\xF0\x01\x3C' b'\xF0\x81\x69\x78' # lock + 120ms
|
|
+ inv +
|
|
b'\x29\x80\x32' # DISPON + 50ms
|
|
)
|
|
|
|
def make_display():
|
|
displayio.release_displays()
|
|
spi = busio.SPI(clock=P_SCK, MOSI=P_MOSI)
|
|
bus = FourWire(spi, command=P_DC, chip_select=P_CS, reset=P_RST, baudrate=SPI_BAUD)
|
|
return BusDisplay(bus, st7796_init(), width=WIDTH, height=HEIGHT, auto_refresh=False)
|
|
|
|
def solid(color):
|
|
p = displayio.Palette(1); p[0] = color; return p
|
|
|
|
def rect(x, y, w, h, color):
|
|
return vectorio.Rectangle(pixel_shader=solid(color), width=w, height=h, x=x, y=y)
|
|
|
|
# ============================== APP ==============================
|
|
class App:
|
|
def __init__(self):
|
|
self.display = make_display()
|
|
self.i2c = busio.I2C(scl=P_SCL, sda=P_SDA, frequency=400_000)
|
|
self.touch = GT911(self.i2c)
|
|
self.midi = usb_midi.ports[1] if (MIDI_ENABLED and usb_midi and len(usb_midi.ports) > 1) else None
|
|
self.midi_in = usb_midi.ports[0] if (MIDI_ENABLED and usb_midi and len(usb_midi.ports) > 0) else None
|
|
self._mbuf = bytearray(64); self.midi_host = False; self.last_midi_in = 0.0
|
|
self._sx = bytearray(); self._sxon = False # USB-MIDI SysEx assembler (clock + pushed programs)
|
|
self._fw = None; self._fw_n = 0; self._fw_pushing = False # chunked firmware transfer state + bus-quiet flag
|
|
self.led = RGB(P_RGB)
|
|
self.spk = pwmio.PWMOut(P_SPK, frequency=1600, variable_frequency=True, duty_cycle=0)
|
|
self.spk_off = 0
|
|
self.btnA = self._btn(P_BTNA); self.btnB = self._btn(P_BTNB)
|
|
self._aPrev = True; self._bPrev = True
|
|
self.jx = analogio.AnalogIn(P_JOYX); self.jy = analogio.AnalogIn(P_JOYY)
|
|
self._joyNext = 0
|
|
self._touchDown = False; self._touchSeen = 0
|
|
self.running = False; self.bpm = 120; self.idx = 0; self.lanes = []; self.bars = 0; self.rgb = (0, 0, 0)
|
|
self.ramp = None; self.trainer = None; self._lastbar = -1; self._muted = False; self._ramp_base = 120
|
|
self.rep = None; self.end = None # per-track playback flow: rep=cycles, end=stop|next|+/-N goto
|
|
self._dirty = False; self._overlay = None; self._ovbtns = [] # on-device editing: unsaved edits + modal
|
|
self.continue_on = False; self._advance = False; self._grid = {} # auto-advance + pad hit-test geometry
|
|
self._next_pending = None; self._seam_t = 0; self._need_redraw = False # gapless seam between tracks
|
|
self._heavy_redraw_at = 0 # deferred build_grid + draw_log deadline (so B's intro isn't blocked by SPI/alloc)
|
|
self._grid_li = None; self._grid_n = 0; self._grid_geo = (0, 0, 0, 0) # chunked build_grid progress (1 PAD / loop iter)
|
|
self._grid_pi = 0; self._grid_lane_st = None; self._grid_pads = [] # per-lane sub-state for sub-pad chunking
|
|
self._heavy_log_pending = False
|
|
self._beat_ns = 60_000_000_000 // self.bpm # cached: ns per quarter note; refreshed on every bpm change
|
|
self._note_buf = bytearray([0x90, 0, 0]) # reused for every Note On (no per-click bytes() alloc)
|
|
self._clock_byte = bytes([0xF8]) # singleton MIDI Clock tick (24 PPQN)
|
|
self._start_byte = bytes([0xFA]); self._stop_byte = bytes([0xFC])
|
|
self._lastRefresh = 0.0 # for the "force refresh after Xms even if a beat is imminent" guard
|
|
try: # live sync: short random id so peers can drop their own echoes
|
|
o = os.urandom(4); self._sync_origin = "d" + "".join("%02x" % b for b in o)
|
|
except Exception:
|
|
self._sync_origin = "d%08x" % (time.monotonic_ns() & 0xFFFFFFFF)
|
|
self._sync_armed = False; self._sync_seq = 0; self._sync_applying = False
|
|
self._sync_heartbeat_next = 0.0 # next periodic FULL broadcast deadline (when armed)
|
|
self._displayed_bpm = -1; self._clock_next = 0 # lazy BPM redraw + MIDI Clock Out tick scheduler
|
|
self._clock_in_last_t = 0; self._clock_in_avg = 0; self._slaved = False # MIDI Clock In: smoothed tracker + slave flag
|
|
self.sl = 0; self.rebuild_setlists() # built-in playlists (baked) + user playlists (programs.json)
|
|
self.dirty = True
|
|
self.pad_pal = displayio.Palette(8) # 0-3 idle levels (mute/normal/accent/ghost), 4-7 the lit playhead
|
|
for i in range(4): self.pad_pal[i] = PAD_DIM[i]; self.pad_pal[i + 4] = PAD_LIT[i]
|
|
self.lane_pads = []; self.lane_lit = []
|
|
self.usb_conn = False; self._m_steps = 0 # USB-connected state; master-lane steps (for the bar counter)
|
|
self._uiNext = 0.0; self._lastTs = None; self._lastBs = None # throttle the stopwatch/bar redraw
|
|
self._seg_start = 0.0 # timer origin; resets with the bar counter (each segment)
|
|
self._refreshNext = 0.0; self._touchNext = 0.0 # cap display refresh + touch polling (tighter MIDI timing)
|
|
self.ic_midi_pal = None; self.ic_usb_pal = None
|
|
# practice history - persisted to /history.json (next to programs.json) when we own the filesystem
|
|
self.can_write = self._probe_write()
|
|
self._load_settings() # /settings.json overrides the module-level defaults
|
|
self.log = self._load_log()
|
|
self.play_start = None; self.play_bpm = 0; self.play_name = ""
|
|
self._armed = None; self.log_rows = []
|
|
self._build_scene()
|
|
self.load(0) # load() also draws the (track-filtered) practice log
|
|
self.draw_icons(); self.draw_meters(); self.led_rest() # LED green = on
|
|
|
|
def _btn(self, pin):
|
|
d = digitalio.DigitalInOut(pin); d.direction = digitalio.Direction.INPUT; d.pull = digitalio.Pull.UP
|
|
return d
|
|
|
|
# ---------- scene graph ----------
|
|
def _build_scene(self):
|
|
root = displayio.Group(); self.display.root_group = root
|
|
root.append(rect(0, 0, WIDTH, HEIGHT, C_BG)) # static background (run state shows on the LED)
|
|
# header: VARASYS logo (left, no tagline) + version (small, top, right of the logo) + MIDI/USB icons (right)
|
|
if LOGO:
|
|
tg, _p, lw, lh = make_glyph(LOGO, C_CYAN, C_BG); tg.x = 10; tg.y = 9; root.append(tg)
|
|
lx = 10 + lw
|
|
else:
|
|
tg, w, h = make_text("VARASYS", FONT_M, C_CYAN, C_BG); tg.x = 10; tg.y = 8; root.append(tg)
|
|
lx = 10 + w
|
|
vtg, vw, vh = make_text("v" + APP_VERSION, FONT_S, C_DIM, C_BG); vtg.x = lx + 6; vtg.y = 8; root.append(vtg)
|
|
# Hamburger menu (3 thin rects) at the far right; tap zone is generous so it's easy to hit.
|
|
mx = WIDTH - 30 # left edge of the icon (18 px wide x 14 px tall total)
|
|
for dy in (10, 16, 22):
|
|
root.append(rect(mx, dy, 18, 2, C_MUTE))
|
|
self._menu_bbox = (mx - 8, 0, WIDTH, 32)
|
|
x = mx - 8 # MIDI/USB icons start LEFT of the hamburger
|
|
for asset, attr in ((ICON_USB, "ic_usb_pal"), (ICON_MIDI, "ic_midi_pal")):
|
|
if asset:
|
|
tg, pal, w, h = make_glyph(asset, C_DIM, C_BG); x -= w; tg.x = x; tg.y = 8; x -= 8
|
|
root.append(tg); setattr(self, attr, pal)
|
|
root.append(rect(0, 38, WIDTH, 2, C_PANEL))
|
|
# dynamic groups
|
|
self.g_bpm = displayio.Group(); root.append(self.g_bpm) # big tempo (right)
|
|
self.g_time = displayio.Group(); root.append(self.g_time) # elapsed [of total] (left)
|
|
self.g_bar = displayio.Group(); root.append(self.g_bar) # bar [of total] (left)
|
|
self.g_train = displayio.Group(); root.append(self.g_train) # ramp / gap-trainer indicators
|
|
self.g_cont = displayio.Group(); root.append(self.g_cont) # CONT (Continue auto-advance) toggle indicator
|
|
self.g_name = displayio.Group(); root.append(self.g_name) # track title (red when edited/unsaved)
|
|
self.g_idx = displayio.Group(); root.append(self.g_idx) # set-list tab (tap to switch playlist)
|
|
self.g_grid = displayio.Group(); root.append(self.g_grid) # lanes x step pads
|
|
root.append(rect(0, LOG_TOP - 6, WIDTH, 2, C_PANEL)) # divider above the history log
|
|
self.g_log = displayio.Group(); root.append(self.g_log) # practice history (tap a row to delete)
|
|
self.g_overlay = displayio.Group(); root.append(self.g_overlay) # modal (save/revert) - drawn on top
|
|
# run/stop shows on the RGB LED; tap beats to edit, tap the title to save/revert, tap the tab to switch lists
|
|
|
|
def _place(self, group, s, x, y, fg, bg, font, right_edge=None):
|
|
while len(group): group.pop()
|
|
self.dirty = True
|
|
if not s: return
|
|
tg, w, h = make_text(s, font, fg, bg)
|
|
tg.x = (right_edge - w) if right_edge is not None else x; tg.y = y; group.append(tg)
|
|
def _center(self, group, s, cx, cy, fg, bg, font):
|
|
while len(group): group.pop()
|
|
tg, w, h = make_text(s, font, fg, bg); tg.x = cx - w//2; tg.y = cy - h//2; group.append(tg)
|
|
self.dirty = True
|
|
|
|
# ---------- program ----------
|
|
def rebuild_setlists(self):
|
|
# built-in playlists first (read-only), then user playlists from programs.json (a baked title always wins)
|
|
self.setlists = [{'title': t, 'items': it, 'builtin': True} for t, it in BUILTIN_SETLISTS]
|
|
seen = set(_slkey(t) for t, _ in BUILTIN_SETLISTS)
|
|
for t, it in load_user_setlists():
|
|
if _slkey(t) in seen: continue
|
|
seen.add(_slkey(t)); self.setlists.append({'title': t, 'items': it, 'builtin': False})
|
|
if self.sl >= len(self.setlists): self.sl = 0
|
|
def switch_setlist(self, delta=1):
|
|
if len(self.setlists) < 2: return
|
|
if self._sync_applying: return # the editor sends sel=... directly; don't ping-pong
|
|
was = self.running
|
|
if was: self.running = False; self._log_play()
|
|
self.sl = (self.sl + delta) % len(self.setlists)
|
|
self.load(0)
|
|
if was: self.running = True; self._reset_clock(); self._start_play()
|
|
self.led_rest(); self.draw_meters()
|
|
self._sync_broadcast("sel=%d/%d" % (self.sl, self.idx))
|
|
def load(self, i):
|
|
items = self.setlists[self.sl]['items']
|
|
self.idx = i % len(items)
|
|
self.name, prog = items[self.idx]
|
|
self.bpm, self.lanes, self.bars, self.ramp, self.trainer, self.rep, self.end = parse_program(prog)
|
|
self._beat_ns = 60_000_000_000 // max(1, self.bpm); self._rebuild_dur_all() # step grids ready for this lane set
|
|
self.master = self.lanes[0]; self._ramp_base = self.bpm; self._lastbar = -1; self._muted = False
|
|
self._dirty = False; self._overlay = None # fresh load -> no unsaved edits
|
|
self._next_pending = None; self._need_redraw = False # discard any prepared seam (user navigated away)
|
|
self._heavy_redraw_at = 0; self._heavy_log_pending = False; self._grid_li = None # cancel any in-progress chunked rebuild
|
|
while len(self.g_overlay): self.g_overlay.pop() # dismiss any open modal
|
|
self._reset_clock(); self.draw_bpm(); self.draw_status(); self.draw_train()
|
|
self.build_grid(); self.draw_log()
|
|
def _prog_str(self): # serialize the current (possibly edited) track to a program string
|
|
parts = ['t' + str(self.bpm)]
|
|
if self.bars: parts.append('b' + str(self.bars))
|
|
if self.ramp: parts.append('rmp%d/%d/%d' % (self.ramp.get('start', self.bpm), self.ramp['amt'], self.ramp['every']))
|
|
if self.trainer: parts.append('tr%d/%d' % (self.trainer['play'], self.trainer['mute']))
|
|
for L in self.lanes: parts.append(lane_to_str(L))
|
|
if self.end is not None: # per-track playback flow (default = loop forever -> omitted)
|
|
if self.rep and self.rep > 1: parts.append('rep=' + str(self.rep))
|
|
parts.append('end=' + ('stop' if self.end == 'stop' else 'next' if self.end == 1 else ('+%d' % self.end if self.end > 0 else str(self.end))))
|
|
return ';'.join(parts)
|
|
|
|
# ---------- on-device editing: tap a beat to cycle it; tap the title to save/revert ----------
|
|
def _grid_hit(self, tx, ty): # map a touch to (kind, lane[, step]) on the pad grid
|
|
g = self._grid
|
|
if not g or not (g['top'] <= ty < g['top'] + g['n'] * g['rowh']): return None
|
|
li = (ty - g['top']) // g['rowh']
|
|
if li >= g['n']: return None
|
|
if tx < g['px0']: return ('lane', li) # tapped the lane label (lane editor = 0.1.0)
|
|
L = self.lanes[li]; steps = L['steps']
|
|
s = int((tx - g['px0'] - 6) * steps / g['usable'] + 0.5)
|
|
return ('beat', li, max(0, min(steps - 1, s)))
|
|
def _cycle_beat(self, li, s): # off -> normal -> accent -> ghost -> off
|
|
L = self.lanes[li]
|
|
L['levels'][s] = {0: 1, 1: 2, 2: 3, 3: 0}[L['levels'][s]]
|
|
base = self._padbase(L, s); lit = (self.lane_lit[li] == s)
|
|
self.lane_pads[li][s].color_index = base + 4 if lit else base
|
|
self._set_dirty()
|
|
self._sync_broadcast("beat=%d/%d/%d" % (li, s, L['levels'][s]))
|
|
def _set_dirty(self):
|
|
if not self._dirty: self._dirty = True; self.draw_status()
|
|
self.dirty = True
|
|
def toggle_continue(self):
|
|
self.continue_on = not self.continue_on; self.draw_status()
|
|
def _user_list(self, title): # find or create a user playlist
|
|
for s in self.setlists:
|
|
if not s['builtin'] and s['title'] == title: return s
|
|
s = {'title': title, 'items': [], 'builtin': False}; self.setlists.append(s); return s
|
|
def _persist_user(self): # write all user playlists back to /programs.json
|
|
user = [s for s in self.setlists if not s['builtin']]
|
|
data = {"setlists": [{"title": s['title'],
|
|
"programs": [{"name": n, "prog": p} for n, p in s['items']]} for s in user]}
|
|
try:
|
|
with open("/programs.json", "w") as f: json.dump(data, f)
|
|
if not self._sync_applying: self._sync_send_setlists() # mirror our user library to the editor (sec 8)
|
|
return True
|
|
except OSError:
|
|
return False # editor mode: the drive is read-only to us
|
|
def _save_edit(self):
|
|
prog = self._prog_str(); sl = self.setlists[self.sl]
|
|
if sl['builtin']: # built-ins are read-only -> save a USER copy
|
|
tgt = self._user_list("My edits"); names = [n for n, _ in tgt['items']]
|
|
if self.name in names: tgt['items'][names.index(self.name)] = (self.name, prog)
|
|
else: tgt['items'].append((self.name, prog))
|
|
dest = ("My edits", self.name)
|
|
else:
|
|
sl['items'] = list(sl['items']); sl['items'][self.idx] = (self.name, prog)
|
|
dest = (sl['title'], self.name)
|
|
if not self._persist_user():
|
|
self._show_msg("Read-only: reboot without holding A"); return
|
|
self.rebuild_setlists() # refresh, then jump to the saved (user) copy
|
|
for i, s in enumerate(self.setlists):
|
|
if not s['builtin'] and s['title'] == dest[0]:
|
|
self.sl = i; names = [n for n, _ in s['items']]
|
|
self.load(names.index(dest[1]) if dest[1] in names else 0); return
|
|
self.load(0)
|
|
def _revert(self):
|
|
self.load(self.idx) # reload from source -> discard edits
|
|
# ---------- modal overlay (save / revert / message) ----------
|
|
def _show_saverevert(self):
|
|
gc.collect()
|
|
self._overlay = 'saverevert'; g = self.g_overlay
|
|
while len(g): g.pop()
|
|
px, py, pw, ph = 24, 178, WIDTH - 48, 116
|
|
g.append(rect(px, py, pw, ph, C_PANEL)); g.append(rect(px, py, pw, 2, C_CYAN))
|
|
t, w, h = make_text("Unsaved edits", FONT_M, C_TXT, C_PANEL); t.x = px + 14; t.y = py + 12; g.append(t)
|
|
self._ovbtns = []; by = py + 44; bh = 50; gap = 12; bw = (pw - 3 * gap) // 2
|
|
for i, (lbl, col, act) in enumerate((("SAVE", C_GREEN, self._save_edit), ("REVERT", C_AMBER, self._revert))):
|
|
bx = px + gap + i * (bw + gap)
|
|
g.append(rect(bx, by, bw, bh, C_BTN)); g.append(rect(bx, by, bw, 2, col))
|
|
tt, tw, th = make_text(lbl, FONT_M, col, C_BTN); tt.x = bx + (bw - tw) // 2; tt.y = by + (bh - th) // 2; g.append(tt)
|
|
self._ovbtns.append((bx, by, bx + bw, by + bh, act))
|
|
c, cw, ch = make_text("tap outside to cancel", FONT_S, C_DIM, C_PANEL); c.x = px + 14; c.y = py + ph - 16; g.append(c)
|
|
self.dirty = True
|
|
def _show_msg(self, text):
|
|
self._overlay = 'msg'; g = self.g_overlay
|
|
while len(g): g.pop()
|
|
px, py, pw, ph = 24, 200, WIDTH - 48, 64
|
|
g.append(rect(px, py, pw, ph, C_PANEL)); g.append(rect(px, py, pw, 2, C_AMBER))
|
|
t, w, h = make_text(text[:28], FONT_S, C_TXT, C_PANEL); t.x = px + 12; t.y = py + 14; g.append(t)
|
|
t2, w2, h2 = make_text("(tap to dismiss)", FONT_S, C_DIM, C_PANEL); t2.x = px + 12; t2.y = py + 38; g.append(t2)
|
|
self.dirty = True
|
|
def _close_overlay(self):
|
|
self._overlay = None
|
|
while len(self.g_overlay): self.g_overlay.pop()
|
|
self.dirty = True
|
|
def _tap_overlay(self, tx, ty):
|
|
if self._overlay == 'msg': self._close_overlay(); return
|
|
for x0, y0, x1, y1, act in self._ovbtns: # each action manages the panel (lane edits redraw it live)
|
|
if x0 <= tx <= x1 and y0 <= ty <= y1: act(); return
|
|
self._close_overlay() # tapped outside a button -> cancel / done
|
|
def _handle_tap(self, tx, ty):
|
|
if self._overlay: self._tap_overlay(tx, ty); return
|
|
x0, y0, x1, y1 = self._menu_bbox # hamburger -> main menu
|
|
if x0 <= tx <= x1 and y0 <= ty <= y1: self._show_menu(); return
|
|
if 112 <= ty <= 126: # set-list tab line
|
|
if tx > WIDTH - 56: self.toggle_continue() # right end = CONT (auto-advance) toggle
|
|
else: self.switch_setlist(1)
|
|
return
|
|
if 128 <= ty <= 154: # track-title line
|
|
if self._dirty: self._show_saverevert()
|
|
return
|
|
hit = self._grid_hit(tx, ty)
|
|
if hit and hit[0] == 'beat': self._cycle_beat(hit[1], hit[2]); return
|
|
if hit and hit[0] == 'lane': self._show_laneedit(hit[1]); return # tap the instrument name -> lane editor
|
|
self._tap_log(tx, ty) # else the practice log
|
|
# ---------- lane editor (tap the instrument name): sound / beats / sub / swing / mute + add / remove ----------
|
|
def _show_laneedit(self, li):
|
|
gc.collect()
|
|
self._overlay = 'lane'; self._edit_li = li; self._draw_laneedit()
|
|
def _draw_laneedit(self):
|
|
li = self._edit_li; L = self.lanes[li]; g = self.g_overlay
|
|
while len(g): g.pop()
|
|
self._ovbtns = []
|
|
PX, PY, PW, RH = 14, 54, WIDTH - 28, 34
|
|
g.append(rect(PX, PY, PW, RH * 7 + 30, C_PANEL)); g.append(rect(PX, PY, PW, 2, C_CYAN))
|
|
t, w, h = make_text("Edit lane %d of %d" % (li + 1, len(self.lanes)), FONT_S, C_MUTE, C_PANEL)
|
|
t.x = PX + 12; t.y = PY + 8; g.append(t)
|
|
y = [PY + 28]
|
|
def vrow(label, value, fn): # label + [<] value [>]; left tap = fn(-1), right = fn(+1)
|
|
yy = y[0]
|
|
lt, lw, lh = make_text(label, FONT_S, C_MUTE, C_PANEL); lt.x = PX + 12; lt.y = yy + 9; g.append(lt)
|
|
g.append(rect(PX + 108, yy + 3, 28, RH - 8, C_BTN))
|
|
at, aw, ah = make_text("<", FONT_M, C_CYAN, C_BTN); at.x = PX + 108 + 9; at.y = yy + 7; g.append(at)
|
|
vt, vw, vh = make_text(value, FONT_M, C_TXT, C_PANEL); vt.x = PX + 146; vt.y = yy + 5; g.append(vt)
|
|
g.append(rect(PX + PW - 36, yy + 3, 28, RH - 8, C_BTN))
|
|
gt, gw, gh = make_text(">", FONT_M, C_CYAN, C_BTN); gt.x = PX + PW - 36 + 9; gt.y = yy + 7; g.append(gt)
|
|
self._ovbtns.append((PX + 104, yy, PX + 140, yy + RH, lambda: fn(-1)))
|
|
self._ovbtns.append((PX + PW - 40, yy, PX + PW, yy + RH, lambda: fn(1)))
|
|
y[0] += RH
|
|
vrow("Sound", L['sound'][:9], self._edit_sound)
|
|
vrow("Beats", str(sum(L['groups'])), self._edit_beats)
|
|
vrow("Subdiv", str(L['sub']), self._edit_sub)
|
|
vrow("Swing", "on" if L['swing'] else "off", self._edit_swing)
|
|
vrow("Mute", "yes" if L['mute'] else "no", self._edit_mute)
|
|
yy = y[0] + 2; bw = (PW - 36) // 2 # + Lane | Remove
|
|
g.append(rect(PX + 12, yy, bw, RH - 6, C_BTN))
|
|
a, aw, ah = make_text("+ Lane", FONT_S, C_GREEN if len(self.lanes) < MAXLANES else C_DIM, C_BTN); a.x = PX + 22; a.y = yy + 8; g.append(a)
|
|
self._ovbtns.append((PX + 12, yy, PX + 12 + bw, yy + RH, self._edit_add))
|
|
g.append(rect(PX + PW - 12 - bw, yy, bw, RH - 6, C_BTN))
|
|
r, rw, rh = make_text("Remove", FONT_S, C_AMBER if len(self.lanes) > 1 else C_DIM, C_BTN); r.x = PX + PW - 12 - bw + 14; r.y = yy + 8; g.append(r)
|
|
self._ovbtns.append((PX + PW - 12 - bw, yy, PX + PW - 12, yy + RH, self._edit_remove))
|
|
yy += RH + 2
|
|
g.append(rect(PX + 12, yy, PW - 24, RH - 4, C_BTN))
|
|
d, dw, dh = make_text("Done", FONT_M, C_CYAN, C_BTN); d.x = PX + (PW - dw) // 2; d.y = yy + 5; g.append(d)
|
|
self._ovbtns.append((PX + 12, yy, PX + PW - 12, yy + RH, self._edit_done))
|
|
self.dirty = True
|
|
def _regen_levels(self, L): # default accents after a beats/sub change
|
|
sub = L['sub']; groups = L['groups']; starts = set(); acc = 0
|
|
for gp in groups: starts.add(acc); acc += gp
|
|
L['steps'] = sum(groups) * sub
|
|
L['levels'] = [(2 if (i // sub) in starts else 1) if i % sub == 0 else 0 for i in range(L['steps'])]
|
|
def _lane_dirty(self, structural):
|
|
if structural: self._regen_levels(self.lanes[self._edit_li])
|
|
if structural and self._edit_li == 0: self._rebuild_dur_all() # master changed -> polymeter lanes follow
|
|
else: self._rebuild_dur(self.lanes[self._edit_li])
|
|
self.build_grid()
|
|
if not self._sync_applying: # coalesce structural / multi-field lane edits into one FULL
|
|
self._sync_broadcast_full()
|
|
if not self._dirty: self._dirty = True; self.draw_status()
|
|
self._draw_laneedit() # refresh the modal with the new values
|
|
def _edit_sound(self, d):
|
|
L = self.lanes[self._edit_li]; i = SOUNDS.index(L['sound']) if L['sound'] in SOUNDS else 0
|
|
L['sound'] = SOUNDS[(i + d) % len(SOUNDS)]; self._lane_dirty(False)
|
|
def _edit_beats(self, d):
|
|
L = self.lanes[self._edit_li]; L['groups'] = [max(1, min(12, sum(L['groups']) + d))]; self._lane_dirty(True)
|
|
def _edit_sub(self, d):
|
|
L = self.lanes[self._edit_li]; L['sub'] = max(1, min(8, L['sub'] + d)); self._lane_dirty(True)
|
|
def _edit_swing(self, d):
|
|
L = self.lanes[self._edit_li]; L['swing'] = not L['swing']; self._lane_dirty(False)
|
|
def _edit_mute(self, d):
|
|
L = self.lanes[self._edit_li]; L['mute'] = not L['mute']; self._lane_dirty(False)
|
|
def _edit_add(self):
|
|
if len(self.lanes) >= MAXLANES: return
|
|
self.lanes.insert(self._edit_li + 1, _parse_lane("beep:4")); self._edit_li += 1; self._lane_dirty(False)
|
|
def _edit_remove(self):
|
|
if len(self.lanes) <= 1: return
|
|
del self.lanes[self._edit_li]
|
|
if self._edit_li >= len(self.lanes): self._edit_li = len(self.lanes) - 1
|
|
self._lane_dirty(False)
|
|
def _edit_done(self):
|
|
self._close_overlay()
|
|
# ---------- hamburger menu (main) + sub-modals (Settings / Help / About) ----------
|
|
def _show_menu(self):
|
|
gc.collect() # defragment before allocating modal bitmaps
|
|
self._overlay = 'menu'; self._draw_menu()
|
|
def _draw_menu(self):
|
|
g = self.g_overlay
|
|
while len(g): g.pop()
|
|
self._ovbtns = []
|
|
PX, PY, PW, RH = 24, 70, WIDTH - 48, 34
|
|
rows = (
|
|
("Save edits", C_GREEN if self._dirty else C_DIM, self._save_edit if self._dirty else None),
|
|
("Revert edits", C_AMBER if self._dirty else C_DIM, self._revert if self._dirty else None),
|
|
("Continue: " + ("on" if self.continue_on else "off"), C_CYAN if self.continue_on else C_TXT, self._menu_toggle_continue),
|
|
("Settings >", C_TXT, self._show_settings),
|
|
("Help >", C_TXT, self._show_help),
|
|
("About", C_TXT, self._show_about),
|
|
)
|
|
PH = 38 + len(rows) * RH + RH + 8
|
|
g.append(rect(PX, PY, PW, PH, C_PANEL)); g.append(rect(PX, PY, PW, 2, C_CYAN))
|
|
t, w, h = make_text("Menu", FONT_M, C_TXT, C_PANEL); t.x = PX + 14; t.y = PY + 12; g.append(t)
|
|
for i, (label, col, act) in enumerate(rows):
|
|
yy = PY + 38 + i * RH
|
|
g.append(rect(PX + 10, yy, PW - 20, RH - 4, C_BTN))
|
|
tt, tw, th = make_text(label, FONT_M, col, C_BTN); tt.x = PX + 20; tt.y = yy + 6; g.append(tt)
|
|
if act: self._ovbtns.append((PX + 10, yy, PX + PW - 10, yy + RH, act))
|
|
yy = PY + 38 + len(rows) * RH + 4
|
|
g.append(rect(PX + 10, yy, PW - 20, RH - 4, C_BTN))
|
|
dt, dw, dh = make_text("Done", FONT_M, C_CYAN, C_BTN); dt.x = PX + (PW - dw) // 2; dt.y = yy + 6; g.append(dt)
|
|
self._ovbtns.append((PX + 10, yy, PX + PW - 10, yy + RH, self._close_overlay))
|
|
self.dirty = True
|
|
def _menu_toggle_continue(self):
|
|
self.continue_on = not self.continue_on; self.draw_status(); self._draw_menu()
|
|
|
|
# ---------- Settings sub-modal (LED / Speaker / MIDI Out / Channel / Clock Out / Clock In) ----------
|
|
def _show_settings(self):
|
|
gc.collect()
|
|
self._overlay = 'settings'; self._draw_settings()
|
|
def _draw_settings(self):
|
|
g = self.g_overlay
|
|
while len(g): g.pop()
|
|
self._ovbtns = []
|
|
PX, PY, PW, RH = 14, 50, WIDTH - 28, 32
|
|
sm = "Off" if MUTE_SPEAKER else ("Auto" if SPEAKER_AUTO_MUTE else "Always")
|
|
rows = (
|
|
("LED", "%d%%" % int(LED_BRIGHTNESS * 100 + 0.5), self._adj_led),
|
|
("Speaker", sm, self._adj_speaker),
|
|
("MIDI Out", "on" if MIDI_ENABLED else "off", self._adj_midi_out),
|
|
("Channel", str(MIDI_CHANNEL), self._adj_midi_ch),
|
|
("Clock Out", "on" if MIDI_CLOCK_OUT else "off", self._adj_clock_out),
|
|
("Clock In", "on" if MIDI_CLOCK_IN else "off", self._adj_clock_in),
|
|
)
|
|
PH = 30 + len(rows) * RH + RH + 8
|
|
g.append(rect(PX, PY, PW, PH, C_PANEL)); g.append(rect(PX, PY, PW, 2, C_CYAN))
|
|
t, w, h = make_text("Settings", FONT_S, C_MUTE, C_PANEL); t.x = PX + 12; t.y = PY + 8; g.append(t)
|
|
for i, (label, value, fn) in enumerate(rows):
|
|
yy = PY + 26 + i * RH
|
|
lt, lw, lh = make_text(label, FONT_S, C_MUTE, C_PANEL); lt.x = PX + 12; lt.y = yy + 9; g.append(lt)
|
|
g.append(rect(PX + 108, yy + 3, 28, RH - 8, C_BTN))
|
|
at, aw, ah = make_text("<", FONT_M, C_CYAN, C_BTN); at.x = PX + 108 + 9; at.y = yy + 6; g.append(at)
|
|
vt, vw, vh = make_text(value, FONT_M, C_TXT, C_PANEL); vt.x = PX + 150; vt.y = yy + 4; g.append(vt)
|
|
g.append(rect(PX + PW - 36, yy + 3, 28, RH - 8, C_BTN))
|
|
gt, gw, gh = make_text(">", FONT_M, C_CYAN, C_BTN); gt.x = PX + PW - 36 + 9; gt.y = yy + 6; g.append(gt)
|
|
self._ovbtns.append((PX + 104, yy, PX + 140, yy + RH, lambda f=fn: f(-1)))
|
|
self._ovbtns.append((PX + PW - 40, yy, PX + PW, yy + RH, lambda f=fn: f(1)))
|
|
yy = PY + 26 + len(rows) * RH + 4
|
|
g.append(rect(PX + 12, yy + 2, PW - 24, RH - 4, C_BTN))
|
|
dt, dw, dh = make_text("Done", FONT_M, C_CYAN, C_BTN); dt.x = PX + (PW - dw) // 2; dt.y = yy + 5; g.append(dt)
|
|
self._ovbtns.append((PX + 12, yy, PX + PW - 12, yy + RH, self._close_overlay))
|
|
self.dirty = True
|
|
def _adj_led(self, d):
|
|
global LED_BRIGHTNESS
|
|
v = LED_BRIGHTNESS + d * 0.05
|
|
if v < 0.05: v = 0.05
|
|
if v > 0.50: v = 0.50
|
|
LED_BRIGHTNESS = round(v * 100) / 100.0
|
|
self.led.set(*self.rgb); self._save_settings(); self._draw_settings()
|
|
def _adj_speaker(self, d):
|
|
global MUTE_SPEAKER, SPEAKER_AUTO_MUTE
|
|
modes = ("auto", "always", "off")
|
|
cur = "off" if MUTE_SPEAKER else ("auto" if SPEAKER_AUTO_MUTE else "always")
|
|
i = (modes.index(cur) + d) % 3
|
|
MUTE_SPEAKER = (modes[i] == "off"); SPEAKER_AUTO_MUTE = (modes[i] == "auto")
|
|
if MUTE_SPEAKER: self.spk.duty_cycle = 0
|
|
self._save_settings(); self._draw_settings()
|
|
def _adj_midi_out(self, d):
|
|
global MIDI_ENABLED
|
|
MIDI_ENABLED = not MIDI_ENABLED; self._save_settings(); self._draw_settings()
|
|
def _adj_midi_ch(self, d):
|
|
global MIDI_CHANNEL
|
|
MIDI_CHANNEL = ((MIDI_CHANNEL - 1 + d) % 16) + 1
|
|
self._save_settings(); self._draw_settings()
|
|
def _adj_clock_out(self, d):
|
|
global MIDI_CLOCK_OUT
|
|
MIDI_CLOCK_OUT = not MIDI_CLOCK_OUT
|
|
if MIDI_CLOCK_OUT: self._clock_next = time.monotonic_ns()
|
|
self._save_settings(); self._draw_settings()
|
|
def _adj_clock_in(self, d):
|
|
global MIDI_CLOCK_IN
|
|
MIDI_CLOCK_IN = not MIDI_CLOCK_IN
|
|
if not MIDI_CLOCK_IN: self._slaved = False
|
|
self._save_settings(); self._draw_settings()
|
|
|
|
# ---------- Help sub-modal (paginated) ----------
|
|
def _show_help(self):
|
|
gc.collect()
|
|
self._overlay = 'help'; self._help_page = 0; self._draw_help()
|
|
def _draw_help(self):
|
|
g = self.g_overlay
|
|
while len(g): g.pop()
|
|
self._ovbtns = []
|
|
PX, PY, PW = 14, 50, WIDTH - 28
|
|
title, lines = HELP_PAGES[self._help_page]
|
|
PH = 38 + 18 * len(lines) + 60
|
|
g.append(rect(PX, PY, PW, PH, C_PANEL)); g.append(rect(PX, PY, PW, 2, C_CYAN))
|
|
t, w, h = make_text(title, FONT_M, C_TXT, C_PANEL); t.x = PX + 12; t.y = PY + 8; g.append(t)
|
|
pi, piw, pih = make_text("%d / %d" % (self._help_page + 1, len(HELP_PAGES)), FONT_S, C_DIM, C_PANEL)
|
|
pi.x = PX + PW - piw - 12; pi.y = PY + 12; g.append(pi)
|
|
yy = PY + 36
|
|
for ln in lines:
|
|
lt, lw, lh = make_text(ln[:42], FONT_S, C_TXT, C_PANEL); lt.x = PX + 12; lt.y = yy; g.append(lt)
|
|
yy += 16
|
|
# Nav: < (prev) | Done | > (next)
|
|
by = PY + PH - 38; bh = 32; bw = (PW - 36) // 3
|
|
for i, (lbl, col, act) in enumerate((
|
|
("<", C_CYAN if self._help_page > 0 else C_DIM,
|
|
self._help_prev if self._help_page > 0 else None),
|
|
("Done", C_CYAN, self._close_overlay),
|
|
(">", C_CYAN if self._help_page < len(HELP_PAGES) - 1 else C_DIM,
|
|
self._help_next if self._help_page < len(HELP_PAGES) - 1 else None))):
|
|
bx = PX + 12 + i * (bw + 6)
|
|
g.append(rect(bx, by, bw, bh, C_BTN))
|
|
lt, lw, lh = make_text(lbl, FONT_M, col, C_BTN); lt.x = bx + (bw - lw) // 2; lt.y = by + 6; g.append(lt)
|
|
if act: self._ovbtns.append((bx, by, bx + bw, by + bh, act))
|
|
self.dirty = True
|
|
def _help_prev(self):
|
|
self._help_page = max(0, self._help_page - 1); self._draw_help()
|
|
def _help_next(self):
|
|
self._help_page = min(len(HELP_PAGES) - 1, self._help_page + 1); self._draw_help()
|
|
|
|
# ---------- About sub-modal ----------
|
|
def _show_about(self):
|
|
gc.collect()
|
|
self._overlay = 'about'; self._draw_about()
|
|
def _draw_about(self):
|
|
import sys
|
|
gc.collect()
|
|
try: free = gc.mem_free()
|
|
except Exception: free = 0 # mem_free is CircuitPython-only
|
|
try: cp_ver = "%d.%d.%d" % sys.implementation.version[:3]
|
|
except Exception: cp_ver = "?"
|
|
up_min = int(time.monotonic()) // 60
|
|
lines = (
|
|
"VARASYS PolyMeter",
|
|
"PM_K-1 Kit",
|
|
"",
|
|
"Firmware: v" + APP_VERSION,
|
|
"Free RAM: %d KB" % (free // 1024),
|
|
"Uptime: %dm" % up_min,
|
|
"CircuitPython: " + cp_ver,
|
|
"",
|
|
"metronome.varasys.io",
|
|
)
|
|
g = self.g_overlay
|
|
while len(g): g.pop()
|
|
self._ovbtns = []
|
|
PX, PY, PW = 24, 90, WIDTH - 48; PH = 30 + 18 * len(lines) + 50
|
|
g.append(rect(PX, PY, PW, PH, C_PANEL)); g.append(rect(PX, PY, PW, 2, C_CYAN))
|
|
yy = PY + 16
|
|
for i, ln in enumerate(lines):
|
|
col = C_CYAN if i == 0 else (C_TXT if ln and i != 8 else C_DIM)
|
|
lt, lw, lh = make_text(ln, FONT_S, col, C_PANEL); lt.x = PX + 14; lt.y = yy; g.append(lt)
|
|
yy += 18
|
|
by = PY + PH - 38
|
|
g.append(rect(PX + 12, by, PW - 24, 32, C_BTN))
|
|
dt, dw, dh = make_text("Done", FONT_M, C_CYAN, C_BTN); dt.x = PX + (PW - dw) // 2; dt.y = by + 6; g.append(dt)
|
|
self._ovbtns.append((PX + 12, by, PX + PW - 12, by + 32, self._close_overlay))
|
|
self.dirty = True
|
|
|
|
# ---------- Settings persistence (/settings.json) ----------
|
|
def _load_settings(self):
|
|
global LED_BRIGHTNESS, MUTE_SPEAKER, SPEAKER_AUTO_MUTE, MIDI_ENABLED, MIDI_CHANNEL, MIDI_CLOCK_OUT, MIDI_CLOCK_IN
|
|
try:
|
|
with open("/settings.json") as f: d = json.load(f)
|
|
except Exception: return
|
|
try:
|
|
LED_BRIGHTNESS = float(d.get("led_brightness", LED_BRIGHTNESS))
|
|
sm = d.get("speaker", "auto")
|
|
MUTE_SPEAKER = (sm == "off"); SPEAKER_AUTO_MUTE = (sm == "auto")
|
|
MIDI_ENABLED = bool(d.get("midi_out", MIDI_ENABLED))
|
|
MIDI_CHANNEL = max(1, min(16, int(d.get("midi_channel", MIDI_CHANNEL))))
|
|
MIDI_CLOCK_OUT = bool(d.get("clock_out", MIDI_CLOCK_OUT))
|
|
MIDI_CLOCK_IN = bool(d.get("clock_in", MIDI_CLOCK_IN))
|
|
except Exception as e: print("settings:", e)
|
|
def _save_settings(self):
|
|
if not self.can_write: return
|
|
sm = "off" if MUTE_SPEAKER else ("auto" if SPEAKER_AUTO_MUTE else "always")
|
|
d = {"led_brightness": LED_BRIGHTNESS, "speaker": sm, "midi_out": MIDI_ENABLED,
|
|
"midi_channel": MIDI_CHANNEL, "clock_out": MIDI_CLOCK_OUT, "clock_in": MIDI_CLOCK_IN}
|
|
try:
|
|
with open("/settings.json", "w") as f: json.dump(d, f)
|
|
except OSError: self.can_write = False
|
|
|
|
def _rebuild_dur(self, L): # cache the per-step ns durations into L['durs'] (tuple lookup is ~10us)
|
|
beat = self._beat_ns
|
|
sub = max(1, L['sub']); steps = max(1, L['steps'])
|
|
if L.get('poly') and self.lanes: # polymeter: spread this lane's cycle across master's bar
|
|
m = self.lanes[0]; master_bar = beat * (m['steps'] // max(1, m['sub']))
|
|
d = master_bar // steps; L['durs'] = tuple(d for _ in range(steps))
|
|
elif L.get('swing') and sub % 2 == 0: # swing: long-short pairs
|
|
pair = beat // max(1, sub // 2); lng = (pair * 2) // 3; sht = pair // 3
|
|
L['durs'] = tuple(lng if (s % sub) % 2 == 0 else sht for s in range(steps))
|
|
else: # straight: every step is beat/sub long
|
|
d = beat // sub; L['durs'] = tuple(d for _ in range(steps))
|
|
def _rebuild_dur_all(self): # called on bpm change + lane mutation + track swap
|
|
for L in self.lanes: self._rebuild_dur(L)
|
|
def _reset_clock(self):
|
|
now = time.monotonic_ns()
|
|
for L in self.lanes:
|
|
L['next'] = now; L['step'] = -1
|
|
self._m_steps = 0 # restart the bar count
|
|
self._seg_start = time.monotonic() # and the on-screen timer (resets with the bar counter)
|
|
|
|
# ---------- audio + light ----------
|
|
def click(self, level):
|
|
self.spk.frequency = {2: 2300, 1: 1600, 3: 1050}.get(level, 1600)
|
|
self.spk.duty_cycle = {2: 42000, 1: 30000, 3: 14000}.get(level, 30000)
|
|
self.spk_off = time.monotonic_ns() + 22_000_000
|
|
def _led_base(self):
|
|
return LED_RUN if self.running else LED_IDLE # dim red while playing / dim green when stopped
|
|
def flash(self, level):
|
|
self.rgb = LEVEL_RGB.get(level, (0, 150, 255)) # bright beat pulse, fades back to the base in tick()
|
|
self.led.set(*self.rgb)
|
|
def led_rest(self): # settle to the resting colour (green idle / red running)
|
|
self.rgb = self._led_base()
|
|
self.led.set(*self.rgb)
|
|
# ---------- Live sync (HELLO/FULL/DELTA/BYE on SysEx 0x40-0x43; see src/livesync.js for the editor side) ----------
|
|
def _sync_send(self, op, text):
|
|
if self.midi is None: return
|
|
b = bytearray((0xF0, 0x7D, op))
|
|
for c in text: # ASCII-only payload (the share grammar uses ; / = digits letters)
|
|
v = ord(c); b.append(v if v < 0x80 else 0x3F)
|
|
b.append(0xF7)
|
|
try: self.midi.write(b)
|
|
except Exception: pass
|
|
def _sync_broadcast(self, evt): # one DELTA event; suppressed while applying a remote change (echo guard)
|
|
if not self._sync_armed or self._sync_applying or self.midi is None or self._fw_pushing: return
|
|
text = "%s;%d;%s" % (self._sync_origin, self._sync_seq, evt); self._sync_seq += 1
|
|
self._sync_send(0x42, text)
|
|
def _sync_broadcast_full(self): # FULL snapshot: running + sl + item + patch (coalesces structural edits)
|
|
if not self._sync_armed or self.midi is None or self._fw_pushing: return
|
|
try: patch = self._prog_str()
|
|
except Exception: return
|
|
text = "%s;%d;%d;%d;%d;%s" % (self._sync_origin, self._sync_seq,
|
|
1 if self.running else 0, self.sl, self.idx, patch)
|
|
self._sync_seq += 1
|
|
self._sync_send(0x41, text)
|
|
self._sync_heartbeat_next = time.monotonic() + 5.0 # next periodic heartbeat
|
|
def _sync_apply_full(self, running, patch): # accept the peer's snapshot as ground truth
|
|
self._sync_applying = True
|
|
try:
|
|
try:
|
|
gc.collect()
|
|
# Diff before rebuilding -> avoid grid flicker / lost focus on a heartbeat that matches local state
|
|
try: cur = self._prog_str()
|
|
except Exception: cur = None
|
|
if patch and patch != cur:
|
|
bpm, lanes, bars, ramp, trainer, rep, end = parse_program(patch)
|
|
self.bpm = bpm; self.lanes = lanes; self.bars = bars; self.ramp = ramp; self.trainer = trainer; self.rep = rep; self.end = end
|
|
self._beat_ns = 60_000_000_000 // max(1, bpm); self._rebuild_dur_all()
|
|
self.master = self.lanes[0]; self._ramp_base = self.bpm; self._lastbar = -1; self._muted = False
|
|
self._dirty = False; self._overlay = None
|
|
while len(self.g_overlay): self.g_overlay.pop()
|
|
self._reset_clock()
|
|
self.draw_bpm(); self.draw_status(); self.draw_train(); self.draw_meters()
|
|
self.build_grid(); self.draw_log()
|
|
if running and not self.running: self.toggle()
|
|
elif (not running) and self.running: self.toggle()
|
|
except Exception as e:
|
|
try: print("sync FULL apply:", e)
|
|
except Exception: pass
|
|
finally:
|
|
self._sync_applying = False
|
|
def _sync_apply_delta(self, evt): # one mutation
|
|
self._sync_applying = True
|
|
try:
|
|
eq = evt.find('=')
|
|
key = evt if eq < 0 else evt[:eq]
|
|
val = '' if eq < 0 else evt[eq+1:]
|
|
if key == 'play':
|
|
if not self.running: self.toggle()
|
|
elif key == 'stop':
|
|
if self.running: self.toggle()
|
|
elif key == 'bpm':
|
|
try: self.set_bpm(int(val))
|
|
except Exception: pass
|
|
elif key == 'sel': # sel=-1/-1 = "no selection" sentinel -> ignore
|
|
p = val.split('/')
|
|
if len(p) == 2:
|
|
try:
|
|
sl = int(p[0]); item = int(p[1])
|
|
if sl >= 0 and item >= 0:
|
|
if sl < len(self.setlists) and sl != self.sl: self.sl = sl
|
|
items = self.setlists[self.sl]['items']
|
|
if 0 <= item < len(items) and item != self.idx: self.goto(item)
|
|
except Exception: pass
|
|
elif key == 'beat': # beat=lane/step/level (0=mute 1=normal 2=accent 3=ghost)
|
|
p = val.split('/')
|
|
if len(p) == 3:
|
|
try:
|
|
li = int(p[0]); s = int(p[1]); lvl = int(p[2])
|
|
if 0 <= li < len(self.lanes):
|
|
L = self.lanes[li]
|
|
if 0 <= s < len(L['levels']):
|
|
L['levels'][s] = lvl & 3
|
|
if li < len(self.lane_pads) and s < len(self.lane_pads[li]):
|
|
lit = (self.lane_lit[li] == s)
|
|
self.lane_pads[li][s].color_index = self._padbase(L, s) + (4 if lit else 0)
|
|
self._set_dirty()
|
|
except Exception: pass
|
|
elif key == 'lane': # lane=lane/field/value (field: sound|groups|sub|swing|gain|poly|enabled)
|
|
p = val.split('/')
|
|
if len(p) >= 3:
|
|
try:
|
|
li = int(p[0]); field = p[1]; v = '/'.join(p[2:])
|
|
if 0 <= li < len(self.lanes):
|
|
L = self.lanes[li]; structural = False
|
|
if field == 'sound': L['sound'] = v
|
|
elif field == 'groups':
|
|
try: L['groups'] = [int(x) for x in v.split('+')]; structural = True
|
|
except Exception: pass
|
|
elif field == 'sub':
|
|
try: L['sub'] = int(v); structural = True
|
|
except Exception: pass
|
|
elif field == 'swing': L['swing'] = (v == '1'); structural = True # swing changes the dur grid
|
|
elif field == 'enabled': L['mute'] = not (v == '1')
|
|
elif field == 'gain':
|
|
try: L['gain'] = int(v)
|
|
except Exception: pass
|
|
elif field == 'poly': L['poly'] = (v == '1'); structural = True
|
|
if structural: self._regen_levels(L)
|
|
if li == 0 and structural: self._rebuild_dur_all() # master changed -> poly lanes follow
|
|
else: self._rebuild_dur(L)
|
|
if structural: self.build_grid()
|
|
self._set_dirty()
|
|
except Exception: pass
|
|
finally:
|
|
self._sync_applying = False
|
|
|
|
# ---------- set-list content sync (0x44) + practice-log sync (0x45); see docs/livesync-protocol.md ----------
|
|
def _sync_send_setlists(self): # 0x44: manifest of OUR user lists (programs.json shape), merge by title
|
|
if not self._sync_armed or self._sync_applying or self.midi is None or self._fw_pushing: return
|
|
user = [s for s in self.setlists if not s['builtin']]
|
|
sls = [{"title": s['title'],
|
|
"programs": [{"name": n, "prog": p} for n, p in s['items']]} for s in user]
|
|
try: body = json.dumps({"setlists": sls})
|
|
except Exception: return
|
|
self._sync_send(0x44, "%s;%d;%s" % (self._sync_origin, self._sync_seq, body)); self._sync_seq += 1
|
|
def _log_to_wire(self, e): # device entry -> wire schema {at,name,dur,bpm} (sec 9)
|
|
return {"at": e.get("at", 0), "name": e.get("name", ""), "dur": e.get("dur", 0), "bpm": e.get("bpm", 0)}
|
|
def _sync_send_log_batch(self): # 0x45: whole practice log (on connect / HELLO)
|
|
if not self._sync_armed or self._sync_applying or self.midi is None or self._fw_pushing: return
|
|
try: body = json.dumps({"log": [self._log_to_wire(e) for e in self.log]})
|
|
except Exception: return
|
|
self._sync_send(0x45, "%s;%d;%s" % (self._sync_origin, self._sync_seq, body)); self._sync_seq += 1
|
|
def _sync_send_log_one(self, e): # 0x45: a single freshly-logged session
|
|
if not self._sync_armed or self._sync_applying or self.midi is None or self._fw_pushing: return
|
|
try: body = json.dumps({"log": [self._log_to_wire(e)]})
|
|
except Exception: return
|
|
self._sync_send(0x45, "%s;%d;%s" % (self._sync_origin, self._sync_seq, body)); self._sync_seq += 1
|
|
def _sync_apply_setlists(self, body): # merge user lists by normalized title (replace / append; never delete built-ins)
|
|
self._sync_applying = True
|
|
try:
|
|
try:
|
|
d = json.loads(body); lists = d.get("setlists")
|
|
if not isinstance(lists, list): return
|
|
builtin_keys = set(_slkey(t) for t, _ in BUILTIN_SETLISTS)
|
|
changed = False
|
|
for rl in lists:
|
|
title = rl.get("title", ""); key = _slkey(title)
|
|
if not key or key in builtin_keys: continue # never overwrite a baked-in list
|
|
items = [(p.get("name", "Item"), p.get("prog", "")) for p in rl.get("programs", []) if p.get("prog")]
|
|
tgt = None
|
|
for s in self.setlists:
|
|
if not s['builtin'] and _slkey(s['title']) == key: tgt = s; break
|
|
if tgt is not None: tgt['items'] = items
|
|
else: self.setlists.append({'title': title or "Device", 'items': items, 'builtin': False})
|
|
changed = True
|
|
if changed:
|
|
self._persist_user() # write back to programs.json (no-op if read-only)
|
|
if self.sl >= len(self.setlists): self.sl = 0
|
|
self.draw_meters()
|
|
except Exception as e:
|
|
try: print("sync SLSYNC:", e)
|
|
except Exception: pass
|
|
finally:
|
|
self._sync_applying = False
|
|
def _sync_apply_log(self, body): # additive merge by (at,name); at==0 always appended
|
|
self._sync_applying = True
|
|
try:
|
|
try:
|
|
d = json.loads(body); incoming = d.get("log")
|
|
if not isinstance(incoming, list): return
|
|
have = set()
|
|
for e in self.log:
|
|
a = e.get("at", 0)
|
|
if a: have.add((a, e.get("name", "")))
|
|
added = 0
|
|
for w in incoming:
|
|
nm = w.get("name", "")
|
|
if not nm: continue
|
|
at = w.get("at", 0) or 0
|
|
if at and (at, nm) in have: continue
|
|
self.log.append({"at": at, "name": nm, "dur": w.get("dur", 0), "bpm": w.get("bpm", 0),
|
|
"t": self._hhmm(at), "bars": 0})
|
|
if at: have.add((at, nm))
|
|
added += 1
|
|
if added:
|
|
self.log.sort(key=lambda e: e.get("at", 0), reverse=True) # newest first
|
|
del self.log[200:]
|
|
self._save_log(); self.draw_log()
|
|
except Exception as e:
|
|
try: print("sync LOGSYNC:", e)
|
|
except Exception: pass
|
|
finally:
|
|
self._sync_applying = False
|
|
def _hhmm(self, at_ms): # epoch-ms -> "HH:MM" for the on-device log row (0 -> unknown)
|
|
if not at_ms: return "--:--"
|
|
try:
|
|
t = time.localtime(at_ms // 1000); return "%02d:%02d" % (t.tm_hour, t.tm_min)
|
|
except Exception:
|
|
return "--:--"
|
|
def _epoch_ms(self): # real epoch ms once the editor has set the RTC, else 0 (unset)
|
|
try:
|
|
secs = time.time()
|
|
return int(secs) * 1000 if secs > 1_000_000_000 else 0 # < 2001 -> RTC unset, no stable key
|
|
except Exception:
|
|
return 0
|
|
|
|
def midi_send(self, note, vel): # device-as-conductor: a note per click to the computer
|
|
if self.midi is None or self._fw_pushing: return # keep the bus quiet during a firmware push so ACKs aren't interleaved
|
|
b = self._note_buf # reused bytearray -> zero alloc per click (hot path)
|
|
b[0] = 0x90 | ((MIDI_CHANNEL - 1) & 0x0F) # Note On, channel 1..16
|
|
b[1] = note & 0x7F; b[2] = vel & 0x7F
|
|
try: self.midi.write(b)
|
|
except Exception: pass
|
|
|
|
# ---------- transport ----------
|
|
def toggle(self):
|
|
self.running = not self.running
|
|
if self.running:
|
|
self._reset_clock(); self._start_play()
|
|
self._clock_next = time.monotonic_ns() # start MIDI Clock Out from zero
|
|
if MIDI_CLOCK_OUT and MIDI_CLOCK_OUT_TRANSPORT and self.midi is not None:
|
|
try: self.midi.write(self._start_byte) # Start (reused singleton)
|
|
except Exception: pass
|
|
else:
|
|
self.spk.duty_cycle = 0; self.reset_playheads(); self._log_play()
|
|
if MIDI_CLOCK_OUT and MIDI_CLOCK_OUT_TRANSPORT and self.midi is not None:
|
|
try: self.midi.write(self._stop_byte) # Stop (reused singleton)
|
|
except Exception: pass
|
|
self.led_rest(); self.draw_meters() # LED shows run state: red running / green stopped
|
|
self._sync_broadcast("play" if self.running else "stop")
|
|
def set_bpm(self, v):
|
|
v = max(5, min(300, v))
|
|
if v != self.bpm:
|
|
self.bpm = v; self._beat_ns = 60_000_000_000 // v
|
|
self._rebuild_dur_all() # step grids follow the new beat duration
|
|
# Don't draw here -- the 4Hz UI tick redraws bpm/meters; calling per joystick nudge allocated text bitmaps fast enough to trigger GC pauses
|
|
self._sync_broadcast("bpm=%d" % v)
|
|
def goto(self, i):
|
|
was = self.running
|
|
if was: self.running = False; self._log_play() # close out the track that was playing
|
|
self.load(i)
|
|
if was: self.running = True; self._reset_clock(); self._start_play()
|
|
self.led_rest(); self.draw_meters()
|
|
self._sync_broadcast("sel=%d/%d" % (self.sl, self.idx))
|
|
def tap(self):
|
|
now = time.monotonic()
|
|
if not hasattr(self, '_taps'): self._taps = []
|
|
self._taps = [t for t in self._taps if now - t < 2.4]
|
|
self._taps.append(now)
|
|
if len(self._taps) >= 2:
|
|
span = (self._taps[-1] - self._taps[0]) / (len(self._taps) - 1)
|
|
if span > 0: self.set_bpm(round(60 / span))
|
|
|
|
# ---------- scheduler ----------
|
|
def tick(self):
|
|
now = time.monotonic_ns()
|
|
if self.spk_off and now >= self.spk_off: self.spk.duty_cycle = 0; self.spk_off = 0
|
|
# Slave decay: if no Clock In tick in the last 1s, fall back to internal tempo
|
|
if self._slaved and (now - self._clock_in_last_t) > 1_000_000_000: self._slaved = False
|
|
if self.running:
|
|
fired_best = 0; fired_prio = -1 # int tracking, no per-tick list alloc
|
|
for li, L in enumerate(self.lanes):
|
|
if self._advance: break # seam armed - skip remaining lanes for THIS tick
|
|
adv = False
|
|
while now >= L['next']:
|
|
L['step'] = (L['step'] + 1) % L['steps']
|
|
if li == 0:
|
|
self._m_steps += 1 # count master-lane steps -> bars
|
|
nb = (self._m_steps - 1) // L['steps'] # bar of THIS step (off-by-one fix vs 0.0.16)
|
|
if nb != self._lastbar: self._lastbar = nb; self._on_new_bar(nb)
|
|
if self._advance: break # seam armed - suppress this step's firing
|
|
if self.ramp and L['steps'] > 0 and not self._slaved: # CONTINUOUS ramp (off when slaved)
|
|
mlen = L['steps']
|
|
bar_pos = self._m_steps / mlen
|
|
seg_bar = (bar_pos % self.bars) if self.bars else bar_pos
|
|
new_bpm = max(5, min(300, int(self._ramp_base + seg_bar / self.ramp['every'] * self.ramp['amt'])))
|
|
if new_bpm != self.bpm:
|
|
self.bpm = new_bpm; self._beat_ns = 60_000_000_000 // new_bpm
|
|
self._rebuild_dur_all() # ramp moves bpm -> step grids follow
|
|
lvl = 0 if L['mute'] else L['levels'][L['step']]
|
|
if lvl > 0:
|
|
p = PRIO.get(lvl, 0)
|
|
if p > fired_prio: fired_prio = p; fired_best = lvl # accent > normal > ghost
|
|
if not self._muted: # gap trainer: silent during the rest bars
|
|
self.midi_send(SOUND_GM.get(L['sound'], GM_DEFAULT), MIDI_VEL.get(lvl, 90))
|
|
L['next'] += L['durs'][L['step']]; adv = True # zero method call, zero dict lookup, just a tuple index
|
|
if adv and li < len(self.lane_pads): self._move_playhead(li, L['step'])
|
|
if fired_best and not self._muted:
|
|
if not MUTE_SPEAKER and not (SPEAKER_AUTO_MUTE and self.midi_host):
|
|
self.click(fired_best) # speaker silent if user muted it / auto-mute on + host present
|
|
self.flash(fired_best)
|
|
base = self._led_base() # decay the beat pulse back down to the red running base
|
|
if self.rgb != base:
|
|
r = base[0] + (self.rgb[0]-base[0])*7//10
|
|
g = base[1] + (self.rgb[1]-base[1])*7//10
|
|
b = base[2] + (self.rgb[2]-base[2])*7//10
|
|
if abs(r-base[0])+abs(g-base[1])+abs(b-base[2]) < 6: r, g, b = base
|
|
self.rgb = (r, g, b); self.led.set(r, g, b)
|
|
if self._advance: # Continue: gapless swap to the prepared track at seam_t
|
|
self._advance = False
|
|
self._do_advance()
|
|
# MIDI Clock Out (master): 24 PPQN; interval follows the live bpm (so continuous ramps carry through)
|
|
if self.running and MIDI_CLOCK_OUT and self.midi is not None and not self._slaved and not self._fw_pushing:
|
|
clk = self._clock_byte # reused singleton bytes (no per-tick alloc)
|
|
tick_ns = self._beat_ns // 24 # cached: ns per Clock pulse
|
|
while now >= self._clock_next:
|
|
try: self.midi.write(clk)
|
|
except Exception: pass
|
|
self._clock_next += tick_ns
|
|
def _end_plan(self):
|
|
# Per-track playback flow. Returns None to loop forever, else (fire_bars, action) where action is
|
|
# 'stop' or a signed int goto offset. Explicit end= governs; otherwise the global Continue toggle
|
|
# acts as a default end=next (legacy behaviour, still needs b<bars> to define the segment).
|
|
end = self.end
|
|
if end is None:
|
|
if self.continue_on and self.bars: end = 1
|
|
else: return None
|
|
cyc = self.bars if self.bars else 1 # a cycle = b<bars>, else one master bar
|
|
reps = self.rep if self.rep else 1
|
|
return (cyc * reps, end)
|
|
def _goto_target(self, offset):
|
|
items = self.setlists[self.sl]['items']; n = len(items)
|
|
t = self.idx + offset
|
|
return 0 if t < 0 else (t % n if t >= n else t) # before first -> clamp; past last -> wrap (loop)
|
|
def _end_stop(self):
|
|
self.running = False; self.spk.duty_cycle = 0; self.reset_playheads(); self._log_play()
|
|
self.led_rest(); self.draw_meters(); self._sync_broadcast("stop")
|
|
def _on_new_bar(self, bar):
|
|
plan = self._end_plan() # None = loop forever; else (fire_bars, action)
|
|
if plan is not None and plan[1] != 'stop' and self._next_pending is None and bar == plan[0] - 1:
|
|
self._prepare_next(self._goto_target(plan[1])) # pre-parse the target during the bar before the seam
|
|
if self.bars and bar > 0 and bar % self.bars == 0: # segment boundary -> reset the on-screen timer
|
|
self._seg_start = time.monotonic()
|
|
if plan is not None and bar > 0 and bar == plan[0]: # fire the end-action
|
|
action = plan[1]
|
|
if not (self.bars and bar % self.bars == 0): self._seg_start = time.monotonic() # no-bars: still reset the timer
|
|
if action == 'stop':
|
|
self._end_stop()
|
|
else:
|
|
if self._next_pending is None: self._prepare_next(self._goto_target(action)) # late prep
|
|
if self._next_pending is not None:
|
|
self._seam_t = self.lanes[0]['next'] # wall-clock time of THIS boundary step
|
|
self._advance = True # tick() will swap to the prepared track
|
|
t = self.trainer # gap trainer: silence during the rest bars
|
|
self._muted = bool(t and (t['play'] + t['mute']) and (bar % (t['play'] + t['mute'])) >= t['play'])
|
|
def _prepare_next(self, target=None): # parse a playlist item into a side holder for the gapless seam
|
|
items = self.setlists[self.sl]['items']
|
|
nxt = (self.idx + 1) % len(items) if target is None else target
|
|
if nxt == self.idx: return # same track (1-item list or self-goto) -> just loop, no swap
|
|
name, prog = items[nxt]
|
|
gc.collect() # defragment before parse_program allocates new lanes
|
|
try:
|
|
bpm, lanes, bars, ramp, trainer, rep, end = parse_program(prog)
|
|
except MemoryError:
|
|
gc.collect(); return # leave _next_pending None -> the segment just loops
|
|
beat = 60_000_000_000 // max(1, bpm) # pre-compute B's durs against B's bpm so the seam swap is allocation-free
|
|
for L in lanes:
|
|
sub = max(1, L['sub']); steps = max(1, L['steps'])
|
|
if L.get('poly'):
|
|
m = lanes[0]; mbar = beat * (m['steps'] // max(1, m['sub']))
|
|
d = mbar // steps; L['durs'] = tuple(d for _ in range(steps))
|
|
elif L.get('swing') and sub % 2 == 0:
|
|
pair = beat // max(1, sub // 2); lng = (pair * 2) // 3; sht = pair // 3
|
|
L['durs'] = tuple(lng if (s % sub) % 2 == 0 else sht for s in range(steps))
|
|
else:
|
|
d = beat // sub; L['durs'] = tuple(d for _ in range(steps))
|
|
self._next_pending = {'lanes': lanes, 'bpm': bpm, 'bars': bars, 'ramp': ramp,
|
|
'trainer': trainer, 'name': name, 'idx': nxt, 'rep': rep, 'end': end}
|
|
def _do_advance(self): # gapless seam: swap the prepared track in at seam_t
|
|
n = self._next_pending
|
|
if n is None: return
|
|
self._next_pending = None
|
|
self.lanes = n['lanes']; self.bpm = n['bpm']; self.bars = n['bars']
|
|
self.ramp = n['ramp']; self.trainer = n['trainer']; self.name = n['name']; self.idx = n['idx']
|
|
self.rep = n['rep']; self.end = n['end'] # the swapped-in track's own playback flow governs from here
|
|
self._beat_ns = 60_000_000_000 // max(1, self.bpm); self._rebuild_dur_all() # B's step grids built at the seam
|
|
self._ramp_base = self.bpm; self._lastbar = -1; self._muted = False; self._m_steps = 0
|
|
self._dirty = False; self._overlay = None
|
|
while len(self.g_overlay): self.g_overlay.pop()
|
|
seam = self._seam_t
|
|
for L in self.lanes: L['next'] = seam; L['step'] = -1 # NEXT tick fires step 0 of the new track at seam_t
|
|
self._need_redraw = True # cheap header bits: meters/bpm/status/train -> next refresh
|
|
self._heavy_redraw_at = time.monotonic() + 0.6 # heavy: build_grid + draw_log deferred ~0.6s so B's intro plays unblocked
|
|
self._seg_start = time.monotonic() # reset the on-screen timer
|
|
self.led_rest()
|
|
|
|
# ---------- inputs ----------
|
|
def poll(self):
|
|
a = self.btnA.value
|
|
if (not a) and self._aPrev: self.toggle()
|
|
self._aPrev = a
|
|
b = self.btnB.value
|
|
if (not b) and self._bPrev: self.tap()
|
|
self._bPrev = b
|
|
now = time.monotonic_ns()
|
|
if now >= self._joyNext:
|
|
x = self.jx.value - 32768; y = self.jy.value - 32768
|
|
if JOY_INVERT_X: x = -x
|
|
if JOY_INVERT_Y: y = -y
|
|
if abs(y) > JOY_DEADZONE:
|
|
self.set_bpm(self.bpm + (1 if y > 0 else -1) * (5 if abs(y) > 26000 else 1))
|
|
self._joyNext = now + 70_000_000
|
|
elif abs(x) > JOY_DEADZONE:
|
|
self.goto(self.idx + (1 if x > 0 else -1)); self._joyNext = now + 350_000_000; return
|
|
else:
|
|
self._joyNext = now + 20_000_000
|
|
nowms = time.monotonic()
|
|
if nowms >= self._touchNext: # poll touch ~30x/s (the I2C read adds loop latency -> MIDI jitter)
|
|
self._touchNext = nowms + 0.033
|
|
pt = self.touch.read()
|
|
if pt:
|
|
self._touchSeen = nowms
|
|
if not self._touchDown:
|
|
self._touchDown = True; self._handle_tap(pt[0], pt[1])
|
|
elif self._touchDown and (nowms - self._touchSeen) > 0.14:
|
|
self._touchDown = False
|
|
# USB-MIDI in: any byte = a host is listening (heartbeat); also assemble SysEx (clock / pushed programs)
|
|
if self.midi_in is not None:
|
|
try: n = self.midi_in.readinto(self._mbuf)
|
|
except Exception: n = 0
|
|
if n:
|
|
self.last_midi_in = nowms
|
|
self._feed_midi(self._mbuf, n)
|
|
host = bool(self.last_midi_in) and (nowms - self.last_midi_in) < 1.0
|
|
if host != self.midi_host:
|
|
self.midi_host = host
|
|
if host and SPEAKER_AUTO_MUTE: self.spk.duty_cycle = 0 # auto-mute when the computer takes over
|
|
self.led_rest(); self.draw_icons()
|
|
uc = bool(getattr(supervisor.runtime, "usb_connected", True)) # connected to a computer?
|
|
if uc != self.usb_conn:
|
|
self.usb_conn = uc; self.draw_icons()
|
|
|
|
# ---------- drawing ----------
|
|
def draw_bpm(self): # lazy: skip the bitmap alloc if the displayed integer is unchanged
|
|
if self.bpm == self._displayed_bpm: return
|
|
self._displayed_bpm = self.bpm
|
|
self._place(self.g_bpm, str(self.bpm), 0, 44, C_TXT, C_BG, FONT_L, right_edge=WIDTH-12)
|
|
def draw_status(self): # set-list tab (tap=switch) + CONT toggle, above the item title
|
|
sl = self.setlists[self.sl]
|
|
# tab: playlist + position; muted = built-in (read-only), cyan = your own
|
|
self._place(self.g_idx, "%s %d/%d" % (sl['title'][:11], self.idx + 1, len(sl['items'])),
|
|
12, 118, C_MUTE if sl['builtin'] else C_CYAN, C_BG, FONT_S)
|
|
self._place(self.g_cont, "CONT", 0, 118, C_GREEN if self.continue_on else C_DIM, C_BG, FONT_S, right_edge=WIDTH-12)
|
|
# title turns red when edited (tap it to save/revert)
|
|
self._place(self.g_name, self.name[:20], 12, 134, C_RED if self._dirty else C_TXT, C_BG, FONT_M)
|
|
def draw_train(self): # ramp + gap-trainer indicators (symbol + params), when set
|
|
g = self.g_train
|
|
while len(g): g.pop()
|
|
x = 12; y = 100
|
|
if self.ramp:
|
|
up = self.ramp['amt'] >= 0
|
|
pts = [(0, 9), (12, 9), (12, 0)] if up else [(0, 0), (0, 9), (12, 9)] # rising / falling ramp
|
|
g.append(vectorio.Polygon(pixel_shader=solid(C_AMBER), points=pts, x=x, y=y)); x += 16
|
|
a = self.ramp['amt']; lbl = ("+%d" % a if a >= 0 else "%d" % a) + "/%db" % self.ramp['every']
|
|
tg, w, h = make_text(lbl, FONT_S, C_AMBER, C_BG); tg.x = x; tg.y = y; g.append(tg); x += w + 14
|
|
if self.trainer:
|
|
g.append(rect(x, y, 4, 9, C_CYAN)); g.append(rect(x + 6, y, 4, 9, C_DIM)) # play | rest
|
|
x += 14
|
|
tg, w, h = make_text("%d/%db" % (self.trainer['play'], self.trainer['mute']), FONT_S, C_CYAN, C_BG)
|
|
tg.x = x; tg.y = y; g.append(tg)
|
|
self.dirty = True
|
|
def draw_icons(self): # recolor the MIDI/USB icons by state (tear-free palette swap)
|
|
if self.ic_midi_pal is not None:
|
|
_recolor(self.ic_midi_pal, C_GREEN if self.midi_host else C_DIM, C_BG)
|
|
if self.ic_usb_pal is not None:
|
|
_recolor(self.ic_usb_pal, C_CYAN if self.usb_conn else C_DIM, C_BG)
|
|
self.dirty = True
|
|
def _fmt_t(self, s): # m:ss, or h:mm:ss past an hour
|
|
s = int(s)
|
|
return "%d:%02d:%02d" % (s // 3600, (s % 3600) // 60, s % 60) if s >= 3600 else "%d:%02d" % (s // 60, s % 60)
|
|
def draw_meters(self): # running time [of total] + bar [of total]; ~4x/s from run()
|
|
run = self.running and self.play_start is not None
|
|
mlen = self.lanes[0]['steps'] if self.lanes else 1
|
|
bpb = (self.lanes[0]['steps'] // max(1, self.lanes[0]['sub'])) if self.lanes else 4
|
|
el = (time.monotonic() - self._seg_start) if run else 0 # time within the current segment (resets with the bar)
|
|
mbars = max(0, self._m_steps - 1) // max(1, mlen) # bar containing THIS step (off-by-one fix vs 0.0.16)
|
|
cur = ("%d" % ((mbars % self.bars + 1) if self.bars else (mbars + 1))) if run else "-" # cycle 1..N
|
|
if self.bars: # track has a length (b<n>): show "X of TOTAL"
|
|
ts = "%s of %s" % (self._fmt_t(el), self._fmt_t(self.bars * bpb * 60.0 / self.bpm))
|
|
bs = "bar %s of %d" % (cur, self.bars)
|
|
else:
|
|
ts = self._fmt_t(el); bs = "bar %s" % cur
|
|
if ts != self._lastTs:
|
|
self._place(self.g_time, ts, 12, 50, C_TXT, C_BG, FONT_M); self._lastTs = ts
|
|
if bs != self._lastBs:
|
|
self._place(self.g_bar, bs, 12, 78, C_MUTE, C_BG, FONT_M); self._lastBs = bs
|
|
|
|
# ---------- pad grid (each lane = a row of step pads; playhead lit as it plays) ----------
|
|
def _padbase(self, L, s):
|
|
return 0 if L['mute'] else L['levels'][s]
|
|
def build_grid(self): # synchronous: kick off chunked rebuild and run to completion
|
|
self._grid_rebuild_start()
|
|
while self._grid_li is not None: self._grid_rebuild_step()
|
|
def _grid_rebuild_start(self): # tear down + gridlines + initial state for chunked rebuild
|
|
while len(self.g_grid): self.g_grid.pop()
|
|
self.lane_pads = []; self.lane_lit = []
|
|
gc.collect() # 64-128 vectorio allocs incoming - want a defragmented heap
|
|
n = min(len(self.lanes), MAXLANES)
|
|
top = GRID_TOP; rowh = min(40, ((LOG_TOP - 10) - top) // max(1, n))
|
|
px0 = 60; usable = WIDTH - 8 - px0 - 12; gridh = n * rowh
|
|
self._grid = {'top': top, 'rowh': rowh, 'px0': px0, 'usable': usable, 'n': n} # for touch hit-testing
|
|
m = self.lanes[0] if self.lanes else None
|
|
if m is not None: # vertical gridlines (cheap; one pass before chunked lanes)
|
|
mbeats = max(1, m['steps'] // max(1, m['sub']))
|
|
for bcol in range(mbeats):
|
|
self.g_grid.append(rect(px0 + 6 + (bcol * usable) // mbeats, top, 1, gridh, C_GRID))
|
|
self._grid_n = n
|
|
self._grid_geo = (top, rowh, px0, usable)
|
|
self._grid_li = 0 if n > 0 else None
|
|
self._grid_pi = 0; self._grid_lane_st = None; self._grid_pads = [] # start at lane 0, pad 0, no lane initialized yet
|
|
self.dirty = True
|
|
def _grid_rebuild_step(self): # PER-PAD chunk: build at most one rectangle, then yield
|
|
li = self._grid_li
|
|
if li is None: return
|
|
if li >= self._grid_n or li >= len(self.lanes):
|
|
self._grid_li = None; return # whole rebuild done -> main loop runs draw_log
|
|
L = self.lanes[li]
|
|
top, rowh, px0, usable = self._grid_geo
|
|
y = top + li * rowh; cy = y + rowh // 2
|
|
st = self._grid_lane_st
|
|
if st is None: # first chunk on this lane: draw the instrument label + cache the geometry
|
|
tg, w, h = make_text((L.get('sound', '') or '?')[:7], FONT_S, C_MUTE, C_BG)
|
|
tg.x = 8; tg.y = cy - h // 2; self.g_grid.append(tg)
|
|
steps = L['steps']; sub = L['sub']; stepw = max(1, usable // steps)
|
|
side = max(5, min(15, stepw - 1, rowh - 6)) # square edge for the main pulse
|
|
rad = max(2, min(side // 2, stepw // 2 - 1)) # smaller circle for subdivisions
|
|
self._grid_lane_st = (cy, steps, sub, stepw, side, rad)
|
|
self._grid_pi = 0; self._grid_pads = []; self.dirty = True
|
|
return # one chunk = "init this lane"; next iter does the first pad
|
|
cy_, steps, sub, stepw, side, rad = st
|
|
s = self._grid_pi
|
|
if s >= steps: # this lane finished; commit and advance to next
|
|
self.lane_pads.append(self._grid_pads); self.lane_lit.append(-1)
|
|
self._grid_pads = []; self._grid_lane_st = None; self._grid_li = li + 1
|
|
return
|
|
cxp = px0 + 6 + (s * usable) // steps # proportional -> beats line up across lanes
|
|
pal = self.pad_pal
|
|
if s % sub == 0:
|
|
p = vectorio.Rectangle(pixel_shader=pal, width=side, height=side, x=cxp - side // 2, y=cy_ - side // 2)
|
|
else:
|
|
p = vectorio.Circle(pixel_shader=pal, radius=rad, x=cxp, y=cy_)
|
|
p.color_index = self._padbase(L, s); self.g_grid.append(p); self._grid_pads.append(p)
|
|
self._grid_pi = s + 1
|
|
self.dirty = True
|
|
def _move_playhead(self, li, step):
|
|
pads = self.lane_pads[li]; prev = self.lane_lit[li]
|
|
if 0 <= prev < len(pads): pads[prev].color_index = self._padbase(self.lanes[li], prev)
|
|
if step < len(pads): pads[step].color_index = self._padbase(self.lanes[li], step) + 4
|
|
self.lane_lit[li] = step; self.dirty = True
|
|
def reset_playheads(self):
|
|
for li, pads in enumerate(self.lane_pads):
|
|
prev = self.lane_lit[li]
|
|
if 0 <= prev < len(pads): pads[prev].color_index = self._padbase(self.lanes[li], prev)
|
|
self.lane_lit[li] = -1
|
|
self.dirty = True
|
|
|
|
# ---------- practice history (saved to /history.json, next to programs.json) ----------
|
|
def _probe_write(self):
|
|
try:
|
|
with open("/.wtest", "w") as f: f.write("1")
|
|
try: os.remove("/.wtest")
|
|
except Exception: pass
|
|
return True
|
|
except OSError:
|
|
return False # editor mode: the computer owns the FS
|
|
def _load_log(self):
|
|
try:
|
|
with open("/history.json") as f: return json.load(f).get("log", [])
|
|
except Exception:
|
|
return []
|
|
def _save_log(self):
|
|
if not self.can_write: return
|
|
try:
|
|
with open("/history.json", "w") as f: json.dump({"log": self.log[:200]}, f)
|
|
except OSError:
|
|
self.can_write = False
|
|
def _start_play(self):
|
|
self.play_start = time.monotonic(); self.play_bpm = self.bpm; self.play_name = self.name
|
|
def _log_play(self):
|
|
if self.play_start is None: return
|
|
dur = int(time.monotonic() - self.play_start); self.play_start = None
|
|
if dur < MIN_LOG_SEC: return # skip plays under 5 seconds
|
|
mlen = self.lanes[0]['steps'] if self.lanes else 1
|
|
t = time.localtime()
|
|
e = {"t": "%02d:%02d" % (t.tm_hour, t.tm_min), "bpm": self.play_bpm,
|
|
"dur": dur, "bars": self._m_steps // max(1, mlen), "name": self.play_name,
|
|
"at": self._epoch_ms()} # epoch ms (when RTC set) = cross-half dedup key (sec 9)
|
|
self.log.insert(0, e)
|
|
del self.log[200:]; self._armed = None
|
|
self._save_log(); self.draw_log()
|
|
self._sync_send_log_one(e) # mirror this session to the editor (no-op if not armed)
|
|
def draw_log(self):
|
|
g = self.g_log
|
|
while len(g): g.pop()
|
|
self.log_rows = []
|
|
gc.collect() # several text bitmaps allocated below want a clean heap
|
|
hdr, w, h = make_text("PRACTICE LOG - THIS TRACK", FONT_S, C_MUTE, C_BG); hdr.x = 10; hdr.y = LOG_TOP; g.append(hdr)
|
|
rows = [(i, e) for i, e in enumerate(self.log) if e.get("name") == self.name] # current track only
|
|
if not rows:
|
|
tg, w, h = make_text("no plays over 5s yet", FONT_S, C_DIM, C_BG); tg.x = 10; tg.y = LOG_TOP + LOG_ROWH; g.append(tg)
|
|
self.dirty = True; return
|
|
y = LOG_TOP + LOG_ROWH + 2
|
|
for k in range(min(LOG_ROWS, len(rows))):
|
|
oi, e = rows[k]; armed = (oi == self._armed) # oi = index into self.log (for delete)
|
|
dur = "%d:%02d" % (e["dur"] // 60, e["dur"] % 60)
|
|
bars = e.get("bars", 0); bstr = (" %dbar" % bars) if bars else ""
|
|
line = "%s%s %3dbpm %s%s" % ("x " if armed else "", e.get("t", "--:--"), e["bpm"], dur, bstr)
|
|
tg, w, h = make_text(line, FONT_S, C_AMBER if armed else C_TXT, C_BG); tg.x = 10; tg.y = y; g.append(tg)
|
|
self.log_rows.append((y - 2, y + LOG_ROWH - 2, oi))
|
|
y += LOG_ROWH
|
|
self.dirty = True
|
|
def _tap_log(self, x, ty):
|
|
for y0, y1, idx in self.log_rows:
|
|
if y0 <= ty <= y1:
|
|
if self._armed == idx: del self.log[idx]; self._armed = None; self._save_log(); self.draw_log() # confirm delete
|
|
else: self._armed = idx; self.draw_log() # arm (tap again)
|
|
return
|
|
if self._armed is not None: self._armed = None; self.draw_log() # tapped elsewhere -> cancel
|
|
|
|
# ---------- USB-MIDI in: SysEx assembler (clock + editor-pushed programs) ----------
|
|
def _feed_midi(self, buf, n):
|
|
now_ns = time.monotonic_ns() if MIDI_CLOCK_IN else 0 # only timestamp when slave is enabled
|
|
for i in range(n):
|
|
b = buf[i]
|
|
if b == 0xF0: self._sx = bytearray(); self._sxon = True
|
|
elif b == 0xF7:
|
|
if self._sxon: self._handle_sysex(self._sx)
|
|
self._sxon = False
|
|
elif b == 0xF8 and MIDI_CLOCK_IN: self._slave_tick(now_ns) # 24 PPQN clock tick from a master
|
|
elif b == 0xFA and MIDI_CLOCK_IN_TRANSPORT and MIDI_CLOCK_IN: self._slave_start() # Start
|
|
elif b == 0xFB and MIDI_CLOCK_IN_TRANSPORT and MIDI_CLOCK_IN: self._slave_start() # Continue (no SPP -> treat as Start)
|
|
elif b == 0xFC and MIDI_CLOCK_IN_TRANSPORT and MIDI_CLOCK_IN: self._slave_stop() # Stop
|
|
elif b >= 0xF8: pass # other real-time (Active Sensing 0xFE etc.) - ignore
|
|
elif self._sxon:
|
|
if len(self._sx) < 60000: self._sx.append(b) # big enough for a pushed firmware (app.py)
|
|
else: self._sxon = False # overflow guard
|
|
def _slave_tick(self, now_ns): # one 24 PPQN tick: smooth the interval -> bpm
|
|
if self._clock_in_last_t == 0:
|
|
self._clock_in_last_t = now_ns; self._slaved = True; return # first tick: just record the timestamp
|
|
interval = now_ns - self._clock_in_last_t
|
|
self._clock_in_last_t = now_ns
|
|
# reject out-of-range intervals (30..300 BPM at 24 PPQN -> 8.33..83.3 ms per tick)
|
|
if interval < 8_300_000 or interval > 500_000_000: return
|
|
if self._clock_in_avg == 0: self._clock_in_avg = interval
|
|
else: self._clock_in_avg = (self._clock_in_avg * 7 + interval) // 8 # exponential smoothing, alpha = 1/8
|
|
new_bpm = max(5, min(300, int(60_000_000_000 // (self._clock_in_avg * 24))))
|
|
if new_bpm != self.bpm:
|
|
self.bpm = new_bpm; self._beat_ns = 60_000_000_000 // new_bpm; self._rebuild_dur_all()
|
|
self._slaved = True
|
|
def _slave_start(self): # master sent Start (or Continue) -> start playback
|
|
if not self.running:
|
|
self.running = True; self._reset_clock(); self._start_play()
|
|
self.led_rest(); self.draw_meters() # NOTE: do not echo 0xFA on output (we're slaved)
|
|
self._clock_in_last_t = 0; self._clock_in_avg = 0 # next tick re-establishes the smoothed interval
|
|
def _slave_stop(self): # master sent Stop -> stop playback
|
|
if self.running:
|
|
self.running = False; self.spk.duty_cycle = 0; self.reset_playheads(); self._log_play()
|
|
self.led_rest(); self.draw_meters()
|
|
self._clock_in_last_t = 0; self._clock_in_avg = 0; self._slaved = False
|
|
def _handle_sysex(self, sx):
|
|
if len(sx) < 2 or sx[0] != 0x7D: return # 0x7D = our (educational) manufacturer id
|
|
cmd = sx[1]
|
|
if cmd == 0x01 and len(sx) >= 8 and rtc is not None: # set clock: yr-2000, mo, dd, hh, mm, ss
|
|
try: rtc.RTC().datetime = time.struct_time((2000 + sx[2], sx[3], sx[4], sx[5], sx[6], sx[7], 0, -1, -1))
|
|
except Exception: pass
|
|
elif cmd == 0x02: # version query -> reply 0x03 + "<device_id>;<APP_VERSION>"
|
|
if self.midi: # old firmware sent bare APP_VERSION; editor parses "contains ';'?" for back-compat
|
|
payload = DEVICE_ID + ";" + APP_VERSION
|
|
self.midi.write(bytes([0xF0, 0x7D, 0x03]) + payload.encode() + bytes([0xF7]))
|
|
elif cmd == 0x40 or cmd == 0x41 or cmd == 0x42 or cmd == 0x43 or cmd == 0x44 or cmd == 0x45: # Live sync (see src/livesync.js)
|
|
try: text = "".join(chr(b) if 0x20 <= b < 0x7F else "" for b in sx[2:])
|
|
except Exception: return
|
|
origin = text.split(";", 1)[0] if text else ""
|
|
if origin == self._sync_origin: return # drop our own echoes (composite USB may loop)
|
|
self._sync_armed = True
|
|
if cmd == 0x40: # HELLO -> reply with our FULL + set-list library + practice log
|
|
self._sync_broadcast_full(); self._sync_send_setlists(); self._sync_send_log_batch()
|
|
elif cmd == 0x43: # BYE -> peer disconnected; stop heartbeats
|
|
self._sync_armed = False
|
|
elif cmd == 0x41: # FULL: origin;seq;running;sl;item;patch...
|
|
parts = text.split(";", 5)
|
|
if len(parts) >= 6:
|
|
try:
|
|
running = parts[2] == "1"; patch = parts[5]
|
|
self._sync_apply_full(running, patch)
|
|
except Exception: pass
|
|
elif cmd == 0x42: # DELTA: origin;seq;evt
|
|
parts = text.split(";", 2)
|
|
if len(parts) >= 3: self._sync_apply_delta(parts[2])
|
|
elif cmd == 0x44: # SLSYNC: origin;seq;json (set-list content)
|
|
parts = text.split(";", 2)
|
|
if len(parts) >= 3: self._sync_apply_setlists(parts[2])
|
|
elif cmd == 0x45: # LOGSYNC: origin;seq;json (practice entries)
|
|
parts = text.split(";", 2)
|
|
if len(parts) >= 3: self._sync_apply_log(parts[2])
|
|
elif cmd == 0x10: # write /programs.json (user playlists) pushed from the editor
|
|
try:
|
|
with open("/programs.json", "wb") as f: f.write(bytes(sx[2:]))
|
|
self.rebuild_setlists(); self.load(0) # built-ins untouched; show the refreshed lists
|
|
self._ack(True)
|
|
except Exception:
|
|
self._ack(False) # read-only (editor mode) etc.
|
|
# A/B firmware update, sent as small flow-controlled chunks (a single huge SysEx overruns the
|
|
# USB-MIDI input buffer and arrives corrupt). begin(0x21,len) -> data(0x22)* -> commit(0x23).
|
|
elif cmd == 0x21: # BEGIN firmware transfer: open the .mpy staging file
|
|
try:
|
|
try: self._fw.close()
|
|
except Exception: pass
|
|
self._fw = open("/app.new", "wb"); self._fw_n = 0
|
|
self._fw_pushing = True # silence Note On / Clock Out / Live-sync broadcasts during the push
|
|
self._ack(True)
|
|
except Exception: # read-only (editor mode) / no space
|
|
self._fw = None; self._fw_pushing = False; self._ack(False)
|
|
elif cmd == 0x22: # DATA: a base64 chunk (multiple of 4) -> decode -> append
|
|
try:
|
|
if self._fw is None or a2b_base64 is None: raise OSError()
|
|
self._fw.write(a2b_base64(bytes(sx[2:])))
|
|
self._fw.flush() # small, predictable per-chunk flush (no slow burst flushes later)
|
|
self._fw_n += 1
|
|
gc.collect() # SysEx assembler allocates a fresh bytearray per chunk;
|
|
self._ack(True) # GC every chunk so 600 chunks' worth of garbage doesn't accumulate
|
|
except Exception:
|
|
try: self._fw.close()
|
|
except Exception: pass
|
|
self._fw = None; self._fw_pushing = False; self._ack(False)
|
|
elif cmd == 0x23: # COMMIT: verify it's a CircuitPython .mpy, then A/B install
|
|
try:
|
|
try: self._fw.close()
|
|
except Exception: pass
|
|
self._fw = None; gc.collect()
|
|
with open("/app.new", "rb") as f: head = f.read(2)
|
|
if os.stat("/app.new")[6] < 4000 or len(head) < 2 or head[0] != 0x43 or head[1] != 0x06:
|
|
try: os.remove("/app.new") # not a CircuitPython mpy v6 -> reject, keep the working build
|
|
except OSError: pass
|
|
self._fw_pushing = False; self._ack(False); return
|
|
try: os.remove("/app.bak")
|
|
except OSError: pass
|
|
os.rename("/app.mpy", "/app.bak") # current build becomes the rollback
|
|
os.rename("/app.new", "/app.mpy")
|
|
open("/trial", "w").close() # arm the trial; the loader reverts if it won't boot
|
|
self._fw_pushing = False
|
|
self._ack(True); time.sleep(0.4); supervisor.reload()
|
|
except Exception: # catch ALL (read-only, MemoryError, ...) -> never brick
|
|
self._fw_pushing = False; self._ack(False)
|
|
def _ack(self, ok):
|
|
if self.midi: self.midi.write(bytes([0xF0, 0x7D, 0x7F if ok else 0x7E, 0xF7]))
|
|
|
|
def run(self):
|
|
if self.touch.addr is None:
|
|
print("GT911 touch not found")
|
|
boot = time.monotonic()
|
|
try: os.stat("/trial"); committed = False # we're a freshly-pushed build on trial
|
|
except OSError: committed = True
|
|
while True:
|
|
try:
|
|
self.tick(); self.poll()
|
|
if self._need_redraw: # post-seam fast pass: cheap header/status bits, runs immediately
|
|
self._need_redraw = False
|
|
self.draw_bpm(); self.draw_status(); self.draw_train(); self.draw_meters()
|
|
if self._heavy_redraw_at and time.monotonic() >= self._heavy_redraw_at:
|
|
self._heavy_redraw_at = 0 # post-seam slow pass: kick off the chunked rebuild
|
|
self._grid_rebuild_start(); self._heavy_log_pending = True
|
|
if self._grid_li is not None: # process ONE lane per loop iter -> tick() runs between lanes
|
|
self._grid_rebuild_step()
|
|
elif self._heavy_log_pending: # grid done -> draw_log (cheap-ish; also one shot)
|
|
self._heavy_log_pending = False; self.draw_log()
|
|
tnow = time.monotonic()
|
|
if tnow >= self._uiNext: # ~4x/s: tick the stopwatch + bar counter
|
|
self._uiNext = tnow + 0.25; self.draw_meters(); self.draw_bpm() # bpm follows the continuous ramp
|
|
if self._sync_armed and tnow >= self._sync_heartbeat_next:
|
|
self._sync_broadcast_full() # periodic FULL: device is the convergence authority
|
|
if not committed and tnow - boot > 5: # booted & ran fine for 5s -> confirm the update
|
|
try: os.remove("/trial")
|
|
except Exception: pass
|
|
committed = True
|
|
# Refresh display ~20x/s, skip ONLY when the MASTER lane's next step is within ~10ms (its alignment
|
|
# matters most musically; sub-lanes can take a ~few ms jitter without audible problem). Force-refresh
|
|
# after 200ms so titles + meters still feel live at fast subdivisions.
|
|
if self.dirty and tnow >= self._refreshNext:
|
|
safe = True
|
|
if self.running and self.lanes:
|
|
nb = self.lanes[0]['next'] # master only -> doesn't starve at fine subdivisions
|
|
safe = (nb - time.monotonic_ns()) > 10_000_000 or (tnow - self._lastRefresh) > 0.2
|
|
if safe:
|
|
if self.display.refresh(): self.dirty = False
|
|
self._lastRefresh = tnow; self._refreshNext = tnow + 0.05
|
|
else:
|
|
self._refreshNext = tnow + 0.003 # check again very soon; don't wait the 50ms
|
|
time.sleep(0.0005)
|
|
except MemoryError: # surface, gc, keep running (don't crash on a fragmented heap)
|
|
try: print("MemoryError: gc + continue")
|
|
except Exception: pass
|
|
gc.collect(); time.sleep(0.05)
|
|
except Exception as e: # any other transient error: log, continue
|
|
try: print("tick error:", e)
|
|
except Exception: pass
|
|
time.sleep(0.05)
|
|
|
|
App().run()
|