Phase 3 + 4 live two-instance smoke tests against the synthetic
~/zddc-test-data fixture surfaced three real bugs that the unit
tests missed. All three are fixed in this commit.

1. walker: filenames with spaces/parens land on disk percent-encoded

walkSubtree was passing the URL-encoded child URL (built via
url.PathEscape) to fetchFileIfNeeded → cachePathFor, so a file
named "Foo (IFI) - Bar.md" landed at
<root>/.../Foo%20%28IFI%29%20-%20Bar.md on disk. Then purgeOrphans
iterated os.ReadDir (which sees the encoded names) and compared
against upstreamNames (decoded names from the listing JSON). Every
fetched file whose name needed escaping was classified as an orphan
and immediately deleted: a 180-file walk produced
"fetched=180 purged=111" with only 70 files remaining.

Fix: walker now maintains two parallel path strings: dirURL
(URL-encoded for HTTP requests) and dirPath (decoded for disk
keys). fetchFileIfNeeded, fetchListing, persistOnly, and
purgeOrphans all take the decoded path. listingCachePathFor
gets dirPath too. Smoke confirmed: dirs=29 files=180 fetched=179
purged=0 (one file already cached from the user's GET that
triggered the walk).
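
For illustration, the encoded/decoded split described above can be
sketched as follows. This is a minimal sketch, not the walker's actual
code: childPaths is a hypothetical helper, and the "/docs" directory is
an assumed example value. Only url.PathEscape and the dirURL/dirPath
naming come from the description above.

```go
package main

import (
	"fmt"
	"net/url"
	"path"
)

// childPaths is a hypothetical helper (not from the walker itself).
// It returns the two forms needed for one child entry: urlPath is
// percent-encoded for the HTTP request, diskPath keeps the decoded
// name for cache keys and for comparison against listing names.
func childPaths(dirURL, dirPath, name string) (urlPath, diskPath string) {
	urlPath = dirURL + "/" + url.PathEscape(name)
	diskPath = path.Join(dirPath, name)
	return urlPath, diskPath
}

func main() {
	u, d := childPaths("/docs", "/docs", "Foo (IFI) - Bar.md")
	fmt.Println("request:", u)
	fmt.Println("on disk:", d)
}
```

Keeping both strings side by side means the on-disk name always matches
the decoded name in the listing, so an orphan check that compares the
two no longer sees a mismatch.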

2. outbox: replay loop sleeps 5min after eager startup pass

RunReplayLoop's idle-poll interval is 5min. After the eager
startup pass with 0 entries, the loop sleeps 5min, so even if a
PUT-while-offline arrives 1 second later, replay won't fire for
~5 min. The cache returned 202 promptly but the queued write sat
on disk until either a 5min nap elapsed or another PUT happened.

Fix: Outbox gains a wake chan (buffered=1, drop-on-full).
Enqueue posts to it after writing meta.json. RunReplayLoop selects
on wake alongside the timer, so a new offline write triggers an
immediate replay attempt. Smoke confirmed: PUT queued at T+0,
master back at T+3, replay completes at T+3 (was previously a
30s wait through the timer-based poll).
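
The buffered=1, drop-on-full behaviour can be checked in isolation with
a small sketch like the one below. The notifier type and its methods
are hypothetical stand-ins for the Outbox wake field, written only to
show that a burst of signals collapses into a single pending wake.

```go
package main

import "fmt"

// notifier is a hypothetical stand-in for the Outbox wake channel.
type notifier struct {
	wake chan struct{}
}

func newNotifier() *notifier {
	return &notifier{wake: make(chan struct{}, 1)}
}

// signal posts a wake without blocking. It reports whether the wake
// was queued (true) or dropped because one was already pending (false).
func (n *notifier) signal() bool {
	select {
	case n.wake <- struct{}{}:
		return true
	default:
		return false
	}
}

// drain counts the wakes currently buffered, without blocking.
func (n *notifier) drain() int {
	count := 0
	for {
		select {
		case <-n.wake:
			count++
		default:
			return count
		}
	}
}

func main() {
	n := newNotifier()
	for i := 0; i < 5; i++ {
		n.signal()
	}
	fmt.Println("pending wakes after burst of 5:", n.drain())
}
```

Dropping the extra signals is safe here because one wake is enough: the
replay pass re-reads the whole outbox directory, so it picks up every
entry written before it ran.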

3. master: PUT/DELETE didn't honor If-Unmodified-Since

The cache's outbox sends If-Unmodified-Since: <cached-mtime> on
replay so the master can reject conflicting writes with 412. The
master's checkIfMatch only evaluated If-Match (ETag-based), so
the cache's mtime-based precondition was silently ignored. Result:
an offline PUT staged before an external mod would clobber the
newer external content on replay, which is silent data loss in the
exact scenario the outbox is designed to detect.

Fix: checkIfMatch now also evaluates If-Unmodified-Since per
RFC 7232 §3.4, returning 412 when the file's current mtime is
strictly later than the header value (1-second resolution to
match HTTP-Date precision). Smoke confirmed: cache GET → external
mod via direct file write → cache offline PUT → master back →
replay sends IUS → master 412 → outbox entry renamed to
<id>.conflict-<RFC3339>/ → master content preserved (the
external mod, not the stale offline write).

Also added an info-level "outbox: replay attempt" log to tryReplay
so an operator watching the cache logs sees the replay loop is
alive even when every entry defers (transport error). Previously
the loop was silent unless a replay actually completed (200) or
conflicted (412).

go vet + go test ./... + go test -race ./internal/{cache,auth,handler}/...
all green. Synthetic ~/zddc-test-data fixture (553 files, 144 PDFs)
exercises the walker against realistic ZDDC filenames including
spaces, parens, and accented characters that the unit tests'
"a.txt" / "b.txt" inputs never hit.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

455 lines
14 KiB
Go
package cache

import (
	"context"
	"crypto/rand"
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"os"
	"path/filepath"
	"sort"
	"sync"
	"time"
)

// OutboxDir is the leaf directory under the cache root where queued
// offline writes are persisted. Each entry is itself a directory:
//
//	<cache-root>/.zddc-outbox/<id>/meta.json (request metadata)
//	<cache-root>/.zddc-outbox/<id>/body.bin  (request body)
//
// Conflicted replays (412 Precondition Failed) are renamed to
//
//	<cache-root>/.zddc-outbox/<id>.conflict-<timestamp>/
//
// Operators clear the conflict directory after manually reconciling.
const OutboxDir = ".zddc-outbox"

// outboxConflictPrefix is the marker placed between the entry ID and
// the timestamp of a conflict-renamed entry. The timestamp uses the
// compact UTC layout 20060102T150405Z, which is filename-safe because
// it contains no ":". The full literal is `.conflict-<timestamp>` and
// is grep-friendly across the whole tree.
const outboxConflictPrefix = ".conflict-"

// MaxOutboxBodyBytes caps the per-entry body size persisted to disk.
// Larger writes still go through online; only the offline-queue path
// has this limit. Default 256 MiB matches MaxWriteBytes on master.
const MaxOutboxBodyBytes = 256 * 1024 * 1024

// ReplayInterval governs how often the background loop attempts
// replay when the outbox is non-empty. Empty-outbox iterations sleep
// 10× longer so an idle client doesn't spin.
const (
	ReplayInterval     = 30 * time.Second
	ReplayIdleInterval = 5 * time.Minute
)

// OutboxEntry is the persisted shape of a queued write. The body
// lives in a sibling file (body.bin) so it can be streamed back to
// upstream at replay time without buffering the whole thing in
// memory.
type OutboxEntry struct {
	// ID is the directory name. Sortable by lexical order so replays
	// happen in queue order: <unix-nano>-<random>.
	ID string `json:"id"`

	// Method, RawURI, ContentType identify the request.
	Method      string `json:"method"`
	RawURI      string `json:"raw_uri"`
	ContentType string `json:"content_type,omitempty"`

	// BaseModTime is the local cache file's mtime at queue time. Used
	// at replay to send If-Unmodified-Since so the master rejects the
	// write (412) if its file changed since we observed it. Zero =
	// no precondition (e.g. PUT to a path with no prior cache).
	BaseModTime time.Time `json:"base_mod_time,omitempty"`

	QueuedAt time.Time `json:"queued_at"`
}

// Outbox owns the on-disk write-queue + the background replay loop.
// Constructed from cache.New when client mode is active; nil
// otherwise. Safe for concurrent Enqueue calls.
type Outbox struct {
	cache *Cache
	dir   string
	mu    sync.Mutex // serializes file creation; replay holds it briefly per-entry

	// wake is a non-blocking signal channel. Enqueue posts to it
	// (drop-on-full) so RunReplayLoop reacts to new entries within
	// ms, not after its idle-poll interval. Buffered=1 so a burst of
	// enqueues collapses to a single wake.
	wake chan struct{}
}

// NewOutbox opens (and creates if missing) the outbox directory under
// the cache root (c.root). Mode 0700: single-user-trust at the FS layer.
func NewOutbox(c *Cache) (*Outbox, error) {
	dir := filepath.Join(c.root, OutboxDir)
	if err := os.MkdirAll(dir, 0o700); err != nil {
		return nil, fmt.Errorf("create outbox dir: %w", err)
	}
	// MkdirAll leaves an existing directory's mode untouched, so
	// tighten it explicitly. Best-effort.
	_ = os.Chmod(dir, 0o700)
	return &Outbox{cache: c, dir: dir, wake: make(chan struct{}, 1)}, nil
}

// Dir exposes the on-disk path for tests / diagnostics.
func (o *Outbox) Dir() string { return o.dir }

// Enqueue persists the request to disk so it can be replayed when
// upstream is reachable again. Returns the created entry. The body
// reader is fully consumed; callers should not assume r.Body is
// readable afterwards.
//
// baseModTime is the local cache file's mtime at the moment we
// decided to queue (zero when none, e.g. PUT to a never-cached
// path). It becomes the If-Unmodified-Since precondition at replay.
func (o *Outbox) Enqueue(r *http.Request, baseModTime time.Time) (*OutboxEntry, error) {
	o.mu.Lock()
	defer o.mu.Unlock()

	id, err := newOutboxID()
	if err != nil {
		return nil, fmt.Errorf("outbox id: %w", err)
	}
	entryDir := filepath.Join(o.dir, id)
	if err := os.MkdirAll(entryDir, 0o700); err != nil {
		return nil, fmt.Errorf("create entry dir: %w", err)
	}

	// Body first: failure cleans up the empty entry dir.
	bodyPath := filepath.Join(entryDir, "body.bin")
	bf, err := os.OpenFile(bodyPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o600)
	if err != nil {
		_ = os.RemoveAll(entryDir)
		return nil, fmt.Errorf("create body file: %w", err)
	}
	limited := http.MaxBytesReader(nil, r.Body, MaxOutboxBodyBytes)
	if _, err := io.Copy(bf, limited); err != nil {
		_ = bf.Close()
		_ = os.RemoveAll(entryDir)
		return nil, fmt.Errorf("save body: %w", err)
	}
	if err := bf.Close(); err != nil {
		_ = os.RemoveAll(entryDir)
		return nil, err
	}

	entry := OutboxEntry{
		ID:          id,
		Method:      r.Method,
		RawURI:      r.URL.RequestURI(),
		ContentType: r.Header.Get("Content-Type"),
		BaseModTime: baseModTime,
		QueuedAt:    time.Now().UTC().Truncate(time.Second),
	}
	metaBytes, err := json.MarshalIndent(entry, "", " ")
	if err != nil {
		_ = os.RemoveAll(entryDir)
		return nil, err
	}
	metaPath := filepath.Join(entryDir, "meta.json")
	if err := writeFileAtomic(metaPath, metaBytes, 0o600); err != nil {
		_ = os.RemoveAll(entryDir)
		return nil, err
	}
	// Signal the replay loop. Non-blocking: buffered=1 collapses
	// a burst of enqueues to a single wake. If the loop is busy
	// replaying, our entry will be picked up on the next pass.
	select {
	case o.wake <- struct{}{}:
	default:
	}
	return &entry, nil
}

// Pending returns the queued entries in lexical-ID order (which is
// queue order, since IDs lead with unix-nano). Entries that fail to
// parse are skipped. Conflict-renamed entries are excluded.
func (o *Outbox) Pending() ([]OutboxEntry, error) {
	dirEntries, err := os.ReadDir(o.dir)
	if err != nil {
		if os.IsNotExist(err) {
			return nil, nil
		}
		return nil, err
	}
	var ids []string
	for _, e := range dirEntries {
		if !e.IsDir() {
			continue
		}
		name := e.Name()
		if isConflictName(name) {
			continue
		}
		ids = append(ids, name)
	}
	sort.Strings(ids)
	out := make([]OutboxEntry, 0, len(ids))
	for _, id := range ids {
		entry, err := o.loadEntry(id)
		if err != nil {
			slog.Debug("outbox: skip unreadable entry", "id", id, "err", err)
			continue
		}
		out = append(out, *entry)
	}
	return out, nil
}

// loadEntry reads + parses meta.json for the given ID.
func (o *Outbox) loadEntry(id string) (*OutboxEntry, error) {
	metaPath := filepath.Join(o.dir, id, "meta.json")
	bytes, err := os.ReadFile(metaPath)
	if err != nil {
		return nil, err
	}
	var entry OutboxEntry
	if err := json.Unmarshal(bytes, &entry); err != nil {
		return nil, err
	}
	return &entry, nil
}

// Replay iterates Pending() in order and fires each entry against
// upstream. Returns the count of successfully-replayed entries and
// the count of conflict-renamed entries. Network errors leave the
// entry in place for the next iteration.
func (o *Outbox) Replay(ctx context.Context) (replayed, conflicts int, err error) {
	pending, err := o.Pending()
	if err != nil {
		return 0, 0, err
	}
	for _, entry := range pending {
		if ctx.Err() != nil {
			return replayed, conflicts, ctx.Err()
		}
		outcome, err := o.replayOne(ctx, entry)
		switch outcome {
		case replayDone:
			replayed++
		case replayConflict:
			conflicts++
		case replayDefer:
			// transient: leave entry in place
			if err != nil {
				slog.Debug("outbox: defer entry", "id", entry.ID, "err", err)
			}
		}
	}
	return replayed, conflicts, nil
}

type replayOutcome int

const (
	replayDone replayOutcome = iota
	replayConflict
	replayDefer
)

// replayOne fires a single outbox entry. Outcomes:
//
//   - 2xx → entry directory deleted (replayDone).
//   - 412 Precondition Failed → entry renamed to <id>.conflict-<ts>/
//     (replayConflict). Operator manually reconciles.
//   - 4xx other than 412 → entry deleted (replayDone with a warn log).
//     The master rejected the request for a reason other than
//     concurrency; retrying won't help.
//   - 5xx, network errors → entry left in place (replayDefer).
func (o *Outbox) replayOne(ctx context.Context, entry OutboxEntry) (replayOutcome, error) {
	bodyPath := filepath.Join(o.dir, entry.ID, "body.bin")
	bf, err := os.Open(bodyPath)
	if err != nil {
		// Body missing: entry is malformed. Move to conflict so we
		// don't loop on it forever.
		_ = o.markConflict(entry.ID)
		return replayConflict, fmt.Errorf("open body: %w", err)
	}
	defer bf.Close()

	target := o.cache.upstream + entry.RawURI
	req, err := http.NewRequestWithContext(ctx, entry.Method, target, bf)
	if err != nil {
		_ = o.markConflict(entry.ID)
		return replayConflict, err
	}
	if entry.ContentType != "" {
		req.Header.Set("Content-Type", entry.ContentType)
	}
	if !entry.BaseModTime.IsZero() {
		req.Header.Set("If-Unmodified-Since", entry.BaseModTime.UTC().Format(http.TimeFormat))
	}
	if o.cache.bearer != "" {
		req.Header.Set("Authorization", "Bearer "+o.cache.bearer)
	}

	resp, err := o.cache.client.Do(req)
	if err != nil {
		// Transport / network error: defer.
		return replayDefer, err
	}
	defer resp.Body.Close()
	_, _ = io.Copy(io.Discard, resp.Body) // drain so connection reuses

	switch {
	case resp.StatusCode >= 200 && resp.StatusCode < 300:
		// Success. For PUT and DELETE, invalidate the local cache
		// entry so the user's view stays consistent. (We don't have
		// the new body here; just remove the cached entry so next
		// read fetches fresh.)
		if entry.Method == http.MethodPut || entry.Method == http.MethodDelete {
			if path, ok := o.cache.cachePathForURI(entry.RawURI); ok {
				_ = os.Remove(path)
			}
		}
		_ = os.RemoveAll(filepath.Join(o.dir, entry.ID))
		slog.Info("outbox: replayed", "id", entry.ID, "method", entry.Method, "uri", entry.RawURI, "status", resp.StatusCode)
		return replayDone, nil

	case resp.StatusCode == http.StatusPreconditionFailed:
		// Conflict: base version on master changed since we observed.
		_ = o.markConflict(entry.ID)
		slog.Warn("outbox: conflict", "id", entry.ID, "method", entry.Method, "uri", entry.RawURI)
		return replayConflict, nil

	case resp.StatusCode >= 400 && resp.StatusCode < 500:
		// Permanent rejection (auth, bad request, etc). Drop the
		// entry; retrying won't help.
		_ = os.RemoveAll(filepath.Join(o.dir, entry.ID))
		slog.Warn("outbox: dropped", "id", entry.ID, "method", entry.Method, "uri", entry.RawURI, "status", resp.StatusCode)
		return replayDone, nil

	default:
		// 5xx: defer for next iteration.
		return replayDefer, fmt.Errorf("upstream status %d", resp.StatusCode)
	}
}

// markConflict renames the entry directory to <id>.conflict-<ts>/.
// If the rename target already exists, a short random suffix is
// appended to avoid the collision. Best-effort.
func (o *Outbox) markConflict(id string) error {
	src := filepath.Join(o.dir, id)
	ts := time.Now().UTC().Format("20060102T150405Z")
	dst := filepath.Join(o.dir, id+outboxConflictPrefix+ts)
	if _, err := os.Stat(dst); err == nil {
		dst = dst + "-" + shortRandom()
	}
	return os.Rename(src, dst)
}

// RunReplayLoop is the background goroutine that drives Replay on a
// schedule. Stops when ctx is cancelled. Safe to call once per
// process; not designed for multiple concurrent loops over the same
// outbox.
//
// Wakes on three signals:
//   - timer (ReplayInterval when entries pending, ReplayIdleInterval when idle)
//   - Enqueue posting to o.wake (instant reaction to new offline writes)
//   - context cancel (shutdown)
func (o *Outbox) RunReplayLoop(ctx context.Context) {
	// Eagerly attempt replay at startup so a re-launched client
	// catches up before any user request fires.
	o.tryReplay(ctx)
	for {
		interval := ReplayIdleInterval
		pending, err := o.Pending()
		if err == nil && len(pending) > 0 {
			interval = ReplayInterval
		}
		select {
		case <-ctx.Done():
			return
		case <-time.After(interval):
			o.tryReplay(ctx)
		case <-o.wake:
			// New entry arrived while we were idle. Replay
			// immediately rather than waiting for the timer.
			o.tryReplay(ctx)
		}
	}
}

// tryReplay is a single replay pass with a bounded timeout. Errors
// logged at debug; per-entry outcomes logged inside Replay/replayOne.
// The pass itself logs at info when it runs against >0 entries so an
// operator watching the log sees the replay loop is alive even if
// every attempt defers.
func (o *Outbox) tryReplay(ctx context.Context) {
	tCtx, cancel := context.WithTimeout(ctx, 2*time.Minute)
	defer cancel()
	pending, _ := o.Pending()
	if len(pending) > 0 {
		slog.Info("outbox: replay attempt", "pending", len(pending))
	}
	replayed, conflicts, err := o.Replay(tCtx)
	if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
		slog.Debug("outbox: replay error", "err", err)
	}
	if replayed > 0 || conflicts > 0 {
		slog.Info("outbox: replay pass complete", "replayed", replayed, "conflicts", conflicts)
	}
}

// isConflictName reports whether the directory name is an outbox
// conflict-renamed entry (`<id>.conflict-<ts>` form).
func isConflictName(name string) bool {
	for i := 0; i+len(outboxConflictPrefix) <= len(name); i++ {
		if name[i:i+len(outboxConflictPrefix)] == outboxConflictPrefix {
			return true
		}
	}
	return false
}

// newOutboxID returns a sortable-by-time ID with a random suffix:
// <unix-nano-base16>-<6-hex>. Lexical sort ≅ chronological for the
// foreseeable lifetime of the outbox (until ~year 2262).
func newOutboxID() (string, error) {
	var randBuf [3]byte
	if _, err := rand.Read(randBuf[:]); err != nil {
		return "", err
	}
	return fmt.Sprintf("%016x-%s", time.Now().UnixNano(), hex.EncodeToString(randBuf[:])), nil
}

// shortRandom returns 4 random hex chars. Used to disambiguate
// conflict-renamed directories that collide on the same one-second
// timestamp.
func shortRandom() string {
	var b [2]byte
	_, _ = rand.Read(b[:])
	return hex.EncodeToString(b[:])
}

// writeFileAtomic writes data to path via temp + rename in the same
// directory. Mode applied to the final file.
func writeFileAtomic(path string, data []byte, mode os.FileMode) error {
	tmp, err := os.CreateTemp(filepath.Dir(path), ".tmp-*")
	if err != nil {
		return err
	}
	tmpName := tmp.Name()
	if _, err := tmp.Write(data); err != nil {
		_ = tmp.Close()
		_ = os.Remove(tmpName)
		return err
	}
	if err := tmp.Chmod(mode); err != nil {
		_ = tmp.Close()
		_ = os.Remove(tmpName)
		return err
	}
	if err := tmp.Close(); err != nil {
		_ = os.Remove(tmpName)
		return err
	}
	return os.Rename(tmpName, path)
}