ZDDC/zddc/internal/cache/outbox.go
ZDDC 70d49ba111 fix(client): three bugs found by live smoke testing
Phase 3 + 4 live two-instance smoke tests against the synthetic
~/zddc-test-data fixture surfaced three real bugs that the unit
tests missed. All three are fixed in this commit.

1. walker: filenames with spaces/parens land on disk percent-encoded

   walkSubtree was passing the URL-encoded child URL (built via
   url.PathEscape) to fetchFileIfNeeded → cachePathFor, so a file
   named "Foo (IFI) - Bar.md" landed at <root>/.../Foo%20%28IFI%29
   %20-%20Bar.md on disk. Then purgeOrphans iterated os.ReadDir
   (which sees the encoded names) and compared against upstreamNames
   (decoded names from the listing JSON). Every fetched file was
   classified as an orphan and immediately deleted: a 180-file walk
   produced "fetched=180 purged=111" with only 70 files remaining.

   Fix: walker now maintains two parallel path strings — dirURL
   (URL-encoded for HTTP requests) and dirPath (decoded for disk
   keys). fetchFileIfNeeded, fetchListing, persistOnly, and
   purgeOrphans all take the decoded path. listingCachePathFor
   gets dirPath too. Smoke confirmed: dirs=29 files=180 fetched=179
   purged=0 (one file already cached from the user's GET that
   triggered the walk).

2. outbox: replay loop sleeps 5min after eager startup pass

   RunReplayLoop's idle-poll interval is 5min. After the eager
   startup pass with 0 entries, the loop sleeps 5min — even if a
   PUT-while-offline arrives 1 second later, replay won't fire for
   ~5 min. The cache returned 202 promptly but the queued write sat
   on disk until either a 5min nap elapsed or another PUT happened.

   Fix: Outbox gains a wake chan (buffered=1, drop-on-full).
   Enqueue posts to it after writing meta.json. RunReplayLoop selects
   on wake alongside the timer, so a new offline write triggers an
   immediate replay attempt. Smoke confirmed: PUT queued at T+0,
   master back at T+3, replay completes at T+3 (was previously a
   30s wait through the timer-based poll).

3. master: PUT/DELETE didn't honor If-Unmodified-Since

   The cache's outbox sends If-Unmodified-Since: <cached-mtime> on
   replay so the master can reject conflicting writes with 412. The
   master's checkIfMatch only evaluated If-Match (ETag-based), so
   the cache's mtime-based precondition was silently ignored. Result:
   an offline PUT staged before an external mod would clobber the
   newer external content on replay — silent data loss in the exact
   scenario the outbox is designed to detect.

   Fix: checkIfMatch now also evaluates If-Unmodified-Since per
   RFC 7232 §3.4, returning 412 when the file's current mtime is
   strictly later than the header value (1-second resolution to
   match HTTP-Date precision). Smoke confirmed: cache GET → external
   mod via direct file write → cache offline PUT → master back →
   replay sends IUS → master 412 → outbox entry renamed to
   <id>.conflict-<RFC3339>/ → master content preserved (the
   external mod, not the stale offline write).

Also added an info-level "outbox: replay attempt" log to tryReplay
so an operator watching the cache logs sees the replay loop is
alive even when every entry defers (transport error). Previously
the loop was silent unless a replay actually completed (200) or
conflicted (412).

go vet + go test ./... + go test -race ./internal/{cache,auth,handler}/...
all green. Synthetic ~/zddc-test-data fixture (553 files, 144 PDFs)
exercises the walker against realistic ZDDC filenames including
spaces, parens, and accented characters that the unit tests'
"a.txt" / "b.txt" inputs never hit.

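The two parallel path strings from item 1 can be illustrated with the
same kind of filename the fixture contains. This is a sketch, not the
walker's code: childPaths is a hypothetical helper, and the "/docs" and
"docs" arguments are made-up inputs.

```go
package main

import (
	"fmt"
	"net/url"
	"path"
)

// childPaths is a hypothetical helper showing the two parallel strings:
// childURL is percent-encoded for the HTTP request, childPath keeps the
// decoded name for use as the on-disk key.
func childPaths(dirURL, dirPath, name string) (childURL, childPath string) {
	childURL = dirURL + "/" + url.PathEscape(name)
	childPath = path.Join(dirPath, name)
	return childURL, childPath
}

func main() {
	u, p := childPaths("/docs", "docs", "Foo (IFI) - Bar.md")
	fmt.Println(u) // /docs/Foo%20%28IFI%29%20-%20Bar.md
	fmt.Println(p) // docs/Foo (IFI) - Bar.md
}
```

Keeping the disk key decoded means a later os.ReadDir returns names that
compare equal to the decoded names in the listing JSON.
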
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 09:34:07 -05:00

455 lines
14 KiB
Go

package cache
import (
"context"
"crypto/rand"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
"net/http"
"os"
"path/filepath"
"sort"
"sync"
"time"
)
// OutboxDir is the leaf directory under the cache root where queued
// offline writes are persisted. Each entry is itself a directory:
//
// <cache-root>/.zddc-outbox/<id>/meta.json — request metadata
// <cache-root>/.zddc-outbox/<id>/body.bin — request body
//
// Conflicted replays (412 Precondition Failed) are renamed to
//
// <cache-root>/.zddc-outbox/<id>.conflict-<ts>/
//
// where <ts> is a UTC timestamp in the layout 20060102T150405Z (see
// markConflict). Operators clear the conflict directory after manually
// reconciling.
const OutboxDir = ".zddc-outbox"
// outboxConflictPrefix is the marker placed between an entry ID and the
// conflict timestamp when an entry is renamed. The timestamp uses the
// compact UTC layout 20060102T150405Z (see markConflict), which contains
// no ":" and is therefore filename-safe. The full suffix is
// `.conflict-<timestamp>` and is grep-friendly across the whole tree.
const outboxConflictPrefix = ".conflict-"
// MaxOutboxBodyBytes caps the per-entry body size persisted to disk.
// Larger writes still go through online — only the offline-queue path
// has this limit. Default 256 MiB matches MaxWriteBytes on master.
const MaxOutboxBodyBytes = 256 * 1024 * 1024
// ReplayInterval governs how often the background loop attempts
// replay when the outbox is non-empty. Empty-outbox iterations sleep
// 10× longer so an idle client doesn't spin.
const (
ReplayInterval = 30 * time.Second
ReplayIdleInterval = 5 * time.Minute
)
// OutboxEntry is the persisted shape of a queued write. The body
// lives in a sibling file (body.bin) so it can be streamed back to
// upstream at replay time without buffering the whole thing in
// memory.
type OutboxEntry struct {
// ID is the directory name. Sortable by lexical order so replays
// happen in queue order: <unix-nano>-<random>.
ID string `json:"id"`
// Method, RawURI, ContentType identify the request.
Method string `json:"method"`
RawURI string `json:"raw_uri"`
ContentType string `json:"content_type,omitempty"`
// BaseModTime is the local cache file's mtime at queue time. Used
// at replay to send If-Unmodified-Since so the master rejects the
// write (412) if its file changed since we observed it. Zero =
// no precondition (e.g. PUT to a path with no prior cache).
BaseModTime time.Time `json:"base_mod_time,omitempty"`
QueuedAt time.Time `json:"queued_at"`
}
// Outbox owns the on-disk write-queue + the background replay loop.
// Constructed from cache.New when client mode is active; nil
// otherwise. Safe for concurrent Enqueue calls.
type Outbox struct {
cache *Cache
dir string
mu sync.Mutex // serializes Enqueue's on-disk writes; the replay path in this file does not take it
// wake is a non-blocking signal channel. Enqueue posts to it
// (drop-on-full) so RunReplayLoop reacts to new entries within
// ms, not after its idle-poll interval. Buffered=1 so a burst of
// enqueues collapses to a single wake.
wake chan struct{}
}
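// NewOutbox opens (and creates if missing) the outbox directory under
// the cache root (c.root). Mode 0700: single-user trust at the FS layer.
// The explicit Chmod tightens a directory that already existed, since
// MkdirAll leaves the mode of an existing directory untouched.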
func NewOutbox(c *Cache) (*Outbox, error) {
dir := filepath.Join(c.root, OutboxDir)
if err := os.MkdirAll(dir, 0o700); err != nil {
return nil, fmt.Errorf("create outbox dir: %w", err)
}
_ = os.Chmod(dir, 0o700)
return &Outbox{cache: c, dir: dir, wake: make(chan struct{}, 1)}, nil
}
// Dir exposes the on-disk path for tests / diagnostics.
func (o *Outbox) Dir() string { return o.dir }
// Enqueue persists the request to disk so it can be replayed when
// upstream is reachable again. Returns the created entry. The body
// reader is fully consumed; callers should not assume r.Body is
// readable afterwards.
//
// baseModTime is the local cache file's mtime at the moment we
// decided to queue (zero when none — e.g. PUT to a never-cached
// path). It becomes the If-Unmodified-Since precondition at replay.
func (o *Outbox) Enqueue(r *http.Request, baseModTime time.Time) (*OutboxEntry, error) {
o.mu.Lock()
defer o.mu.Unlock()
id, err := newOutboxID()
if err != nil {
return nil, fmt.Errorf("outbox id: %w", err)
}
entryDir := filepath.Join(o.dir, id)
if err := os.MkdirAll(entryDir, 0o700); err != nil {
return nil, fmt.Errorf("create entry dir: %w", err)
}
// Body first — failure cleans up the empty entry dir.
bodyPath := filepath.Join(entryDir, "body.bin")
bf, err := os.OpenFile(bodyPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o600)
if err != nil {
_ = os.RemoveAll(entryDir)
return nil, fmt.Errorf("create body file: %w", err)
}
limited := http.MaxBytesReader(nil, r.Body, MaxOutboxBodyBytes)
if _, err := io.Copy(bf, limited); err != nil {
_ = bf.Close()
_ = os.RemoveAll(entryDir)
return nil, fmt.Errorf("save body: %w", err)
}
if err := bf.Close(); err != nil {
_ = os.RemoveAll(entryDir)
return nil, err
}
entry := OutboxEntry{
ID: id,
Method: r.Method,
RawURI: r.URL.RequestURI(),
ContentType: r.Header.Get("Content-Type"),
BaseModTime: baseModTime,
QueuedAt: time.Now().UTC().Truncate(time.Second),
}
metaBytes, err := json.MarshalIndent(entry, "", " ")
if err != nil {
_ = os.RemoveAll(entryDir)
return nil, err
}
metaPath := filepath.Join(entryDir, "meta.json")
if err := writeFileAtomic(metaPath, metaBytes, 0o600); err != nil {
_ = os.RemoveAll(entryDir)
return nil, err
}
// Signal the replay loop. Non-blocking — buffered=1 collapses
// a burst of enqueues to a single wake. If the loop is busy
// replaying, our entry will be picked up on the next pass.
select {
case o.wake <- struct{}{}:
default:
}
return &entry, nil
}
// Pending returns the queued entries in lexical-ID order (which is
// queue order, since IDs lead with unix-nano). Entries that fail to
// parse are skipped. Conflict-renamed entries are excluded.
func (o *Outbox) Pending() ([]OutboxEntry, error) {
dirEntries, err := os.ReadDir(o.dir)
if err != nil {
if os.IsNotExist(err) {
return nil, nil
}
return nil, err
}
var ids []string
for _, e := range dirEntries {
if !e.IsDir() {
continue
}
name := e.Name()
if isConflictName(name) {
continue
}
ids = append(ids, name)
}
sort.Strings(ids)
out := make([]OutboxEntry, 0, len(ids))
for _, id := range ids {
entry, err := o.loadEntry(id)
if err != nil {
slog.Debug("outbox: skip unreadable entry", "id", id, "err", err)
continue
}
out = append(out, *entry)
}
return out, nil
}
// loadEntry reads + parses meta.json for the given ID.
func (o *Outbox) loadEntry(id string) (*OutboxEntry, error) {
metaPath := filepath.Join(o.dir, id, "meta.json")
bytes, err := os.ReadFile(metaPath)
if err != nil {
return nil, err
}
var entry OutboxEntry
if err := json.Unmarshal(bytes, &entry); err != nil {
return nil, err
}
return &entry, nil
}
// Replay iterates Pending() in order and fires each entry against
// upstream. Returns the count of successfully-replayed entries and
// the count of conflict-renamed entries. Network errors leave the
// entry in place for the next iteration.
func (o *Outbox) Replay(ctx context.Context) (replayed, conflicts int, err error) {
pending, err := o.Pending()
if err != nil {
return 0, 0, err
}
for _, entry := range pending {
if ctx.Err() != nil {
return replayed, conflicts, ctx.Err()
}
outcome, err := o.replayOne(ctx, entry)
switch outcome {
case replayDone:
replayed++
case replayConflict:
conflicts++
case replayDefer:
// transient — leave entry in place
if err != nil {
slog.Debug("outbox: defer entry", "id", entry.ID, "err", err)
}
}
}
return replayed, conflicts, nil
}
type replayOutcome int
const (
replayDone replayOutcome = iota
replayConflict
replayDefer
)
// replayOne fires a single outbox entry. Outcomes:
//
// - 2xx → entry directory deleted (replayDone).
// - 412 Precondition Failed → entry renamed to <id>.conflict-<ts>/
// (replayConflict). Operator manually reconciles.
// - 4xx other than 412 → entry deleted (replayDone with a warn log).
// The master rejected the request for a reason other than
// concurrency; retrying won't help.
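// - 5xx, network errors → entry left in place (replayDefer).
// - body.bin unreadable, or the request cannot be built → entry
// renamed like a conflict (replayConflict) so the loop does not
// retry it forever.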
func (o *Outbox) replayOne(ctx context.Context, entry OutboxEntry) (replayOutcome, error) {
bodyPath := filepath.Join(o.dir, entry.ID, "body.bin")
bf, err := os.Open(bodyPath)
if err != nil {
// Body missing — entry is malformed. Move to conflict so we
// don't loop on it forever.
_ = o.markConflict(entry.ID)
return replayConflict, fmt.Errorf("open body: %w", err)
}
defer bf.Close()
target := o.cache.upstream + entry.RawURI
req, err := http.NewRequestWithContext(ctx, entry.Method, target, bf)
if err != nil {
_ = o.markConflict(entry.ID)
return replayConflict, err
}
if entry.ContentType != "" {
req.Header.Set("Content-Type", entry.ContentType)
}
if !entry.BaseModTime.IsZero() {
req.Header.Set("If-Unmodified-Since", entry.BaseModTime.UTC().Format(http.TimeFormat))
}
if o.cache.bearer != "" {
req.Header.Set("Authorization", "Bearer "+o.cache.bearer)
}
resp, err := o.cache.client.Do(req)
if err != nil {
// Transport / network error — defer.
return replayDefer, err
}
defer resp.Body.Close()
_, _ = io.Copy(io.Discard, resp.Body) // drain so connection reuses
switch {
case resp.StatusCode >= 200 && resp.StatusCode < 300:
// Success. For PUT and DELETE, drop the locally cached copy so
// the user's view stays consistent. We don't have the master's
// resulting state here, so removing the cached file makes the
// next read fetch fresh.
if entry.Method == http.MethodPut || entry.Method == http.MethodDelete {
if path, ok := o.cache.cachePathForURI(entry.RawURI); ok {
_ = os.Remove(path)
}
}
_ = os.RemoveAll(filepath.Join(o.dir, entry.ID))
slog.Info("outbox: replayed", "id", entry.ID, "method", entry.Method, "uri", entry.RawURI, "status", resp.StatusCode)
return replayDone, nil
case resp.StatusCode == http.StatusPreconditionFailed:
// Conflict — base version on master changed since we observed.
_ = o.markConflict(entry.ID)
slog.Warn("outbox: conflict", "id", entry.ID, "method", entry.Method, "uri", entry.RawURI)
return replayConflict, nil
case resp.StatusCode >= 400 && resp.StatusCode < 500:
// Permanent rejection (auth, bad request, etc). Drop the
// entry — retrying won't help.
_ = os.RemoveAll(filepath.Join(o.dir, entry.ID))
slog.Warn("outbox: dropped", "id", entry.ID, "method", entry.Method, "uri", entry.RawURI, "status", resp.StatusCode)
return replayDone, nil
default:
// 5xx, or any other status not handled above: defer for the next iteration.
return replayDefer, fmt.Errorf("upstream status %d", resp.StatusCode)
}
}
// markConflict renames the entry directory to <id>.conflict-<ts>/,
// where <ts> is the current UTC time in the layout 20060102T150405Z.
// If the rename target already exists, a short random suffix is
// appended to avoid the collision. Best-effort.
func (o *Outbox) markConflict(id string) error {
src := filepath.Join(o.dir, id)
ts := time.Now().UTC().Format("20060102T150405Z")
dst := filepath.Join(o.dir, id+outboxConflictPrefix+ts)
if _, err := os.Stat(dst); err == nil {
dst = dst + "-" + shortRandom()
}
return os.Rename(src, dst)
}
// RunReplayLoop is the background goroutine that drives Replay on a
// schedule. Stops when ctx is cancelled. Safe to call once per
// process; not designed for multiple concurrent loops over the same
// outbox.
//
// Wakes on three signals:
// - timer (ReplayInterval when entries pending, ReplayIdleInterval when idle)
// - Enqueue posting to o.wake (instant reaction to new offline writes)
// - context cancel (shutdown)
func (o *Outbox) RunReplayLoop(ctx context.Context) {
// Eagerly attempt replay at startup so a re-launched client
// catches up before any user request fires.
o.tryReplay(ctx)
for {
interval := ReplayIdleInterval
pending, err := o.Pending()
if err == nil && len(pending) > 0 {
interval = ReplayInterval
}
select {
case <-ctx.Done():
return
case <-time.After(interval):
o.tryReplay(ctx)
case <-o.wake:
// New entry arrived while we were idle. Replay
// immediately rather than waiting for the timer.
o.tryReplay(ctx)
}
}
}
// tryReplay is a single replay pass with a bounded timeout. Errors
// logged at debug; per-entry outcomes logged inside Replay/replayOne.
// The pass itself logs at info when it runs against >0 entries so an
// operator watching the log sees the replay loop is alive even if
// every attempt defers.
func (o *Outbox) tryReplay(ctx context.Context) {
tCtx, cancel := context.WithTimeout(ctx, 2*time.Minute)
defer cancel()
pending, _ := o.Pending()
if len(pending) > 0 {
slog.Info("outbox: replay attempt", "pending", len(pending))
}
replayed, conflicts, err := o.Replay(tCtx)
if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
slog.Debug("outbox: replay error", "err", err)
}
if replayed > 0 || conflicts > 0 {
slog.Info("outbox: replay pass complete", "replayed", replayed, "conflicts", conflicts)
}
}
// isConflictName reports whether the directory name is an outbox
// conflict-renamed entry (`<id>.conflict-<ts>` form).
func isConflictName(name string) bool {
for i := 0; i+len(outboxConflictPrefix) <= len(name); i++ {
if name[i:i+len(outboxConflictPrefix)] == outboxConflictPrefix {
return true
}
}
return false
}
// newOutboxID returns a sortable-by-time ID with a random suffix:
// <unix-nano-base16>-<6-hex>. Lexical sort ≅ chronological for the
// foreseeable lifetime of the outbox (until ~year 2262).
func newOutboxID() (string, error) {
var randBuf [3]byte
if _, err := rand.Read(randBuf[:]); err != nil {
return "", err
}
return fmt.Sprintf("%016x-%s", time.Now().UnixNano(), hex.EncodeToString(randBuf[:])), nil
}
// shortRandom returns 4 random hex chars. Used to disambiguate
// conflict-renamed directories that collide on the same
// second-resolution timestamp.
func shortRandom() string {
var b [2]byte
_, _ = rand.Read(b[:])
return hex.EncodeToString(b[:])
}
// writeFileAtomic writes data to path via temp + rename in the same
// directory. Mode applied to the final file.
func writeFileAtomic(path string, data []byte, mode os.FileMode) error {
tmp, err := os.CreateTemp(filepath.Dir(path), ".tmp-*")
if err != nil {
return err
}
tmpName := tmp.Name()
if _, err := tmp.Write(data); err != nil {
_ = tmp.Close()
_ = os.Remove(tmpName)
return err
}
if err := tmp.Chmod(mode); err != nil {
_ = tmp.Close()
_ = os.Remove(tmpName)
return err
}
if err := tmp.Close(); err != nil {
_ = os.Remove(tmpName)
return err
}
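if err := os.Rename(tmpName, path); err != nil {
_ = os.Remove(tmpName) // don't leave the temp file behind on failure
return err
}
return nil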
}