ZDDC/zddc/internal/cache/outbox.go
ZDDC 8a049ca2a4 feat(client): outbox — offline write queue + replay with If-Unmodified-Since
PUT / POST / DELETE in client mode now work end-to-end. Online: the
cache layer forwards to upstream and (on success) drops any cached
entry for the path so the next read fetches fresh. PUT/DELETE include
If-Unmodified-Since derived from the cached file's mtime so the master
can reject conflicting writes with 412 Precondition Failed.

When upstream is unreachable, the request is captured in the outbox
at <root>/.zddc-outbox/<id>/ — directory per queued write, mode 0700,
containing meta.json (method, RawURI, Content-Type, base mtime,
queued-at) and body.bin (request body, capped at 256 MiB). The client
gets 202 Accepted + X-ZDDC-Cache: queued and a JSON envelope.

A background replay loop started by runClient processes the queue:
- 2xx → delete entry; drop cached path so next read fetches fresh
- 412 → rename to <id>.conflict-<RFC3339>/ for manual reconciliation
       (body + meta intact for inspection or re-submit)
- 4xx other → drop (retry won't help; logged at WARN)
- 5xx / transport error → leave for next pass
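
The outcome table above can be sketched as a single classification function. The `outcome` type and its constant names are illustrative only; outbox.go uses its own internal type.

```go
package main

import "fmt"

// outcome names are illustrative, not the identifiers used in outbox.go.
type outcome string

const (
	deleteEntry  outcome = "delete"
	markConflict outcome = "conflict"
	dropEntry    outcome = "drop"
	retryLater   outcome = "retry"
)

// classify maps an upstream result to the action listed above.
// transportErr is true when no HTTP response was received at all.
func classify(status int, transportErr bool) outcome {
	switch {
	case transportErr:
		return retryLater
	case status >= 200 && status < 300:
		return deleteEntry
	case status == 412:
		return markConflict
	case status >= 400 && status < 500:
		return dropEntry
	default:
		// 5xx and anything else unexpected: keep for the next pass.
		return retryLater
	}
}

func main() {
	for _, s := range []int{204, 412, 403, 503} {
		fmt.Println(s, classify(s, false))
	}
	fmt.Println("transport error:", classify(0, true))
}
```

The 412 case has to be tested before the general 4xx range, otherwise a conflict would be dropped instead of preserved.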

Replay schedule: eager at startup, then 30s while pending falling
back to 5min while idle. Loop honors graceful-shutdown context.
Disabled in --mode=proxy (proxy persists nothing by design — offline
writes return 503 instead of queueing).

Outbox IDs are <unix-nano-base16>-<hex-random> so lex-sort = queue
order; concurrent enqueues never collide. Conflict-rename appends a
4-char random suffix on the unlikely same-second collision.

The local cache is intentionally not updated for offline writes:
until upstream confirms, the user's reads still see the upstream-cached
version (or 503 if uncached). Trade-off: no "did my queued write
actually win?" ambiguity, at the cost of not seeing one's own
offline edits immediately. Phase 5 will surface .conflict-<ts>/
directories in browse views.

Tests (20 new in outbox_test.go, 5 new in cache_test.go covering
the write path): NewOutbox creates 0700 dir, Enqueue persists meta
+ body, Pending returns lex-sorted entries excluding conflicts,
Replay deletes on 2xx / renames on 412 / leaves on transport error
/ leaves on 5xx / drops on 4xx-other, IUS sent only for PUT/DELETE
with base mtime, query string preserved, ServeHTTP online write
forwards + evicts cache, ServeHTTP offline write queues with 202,
ServeHTTP offline + no outbox returns 503, ServeHTTP PUT sends IUS
from cached mtime, oversize body rejected, IDs lex-sortable,
RunReplayLoop stops on context cancel, concurrent Enqueue 30×
no collisions. Full suite + go vet clean.

Doc updates: zddc/README.md gains a "Writes (online + offline
outbox)" subsection covering both paths and replay outcomes;
"What client mode is NOT, yet" now lists only conflict UI and
multi-tenancy. AGENTS.md client-mode pipeline gains writes +
mirror-mode bullets. ARCHITECTURE.md adds a "Writes: outbox +
offline replay" subsection with the trade-off rationale and the
phase-5-deferred conflict UI hand-off.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 08:20:07 -05:00


package cache

import (
	"context"
	"crypto/rand"
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"os"
	"path/filepath"
	"sort"
	"sync"
	"time"
)

// OutboxDir is the leaf directory under the cache root where queued
// offline writes are persisted. Each entry is itself a directory:
//
//	<cache-root>/.zddc-outbox/<id>/meta.json   request metadata
//	<cache-root>/.zddc-outbox/<id>/body.bin    request body
//
// Conflicted replays (412 Precondition Failed) are renamed to
//
//	<cache-root>/.zddc-outbox/<id>.conflict-<timestamp>/
//
// Operators clear the conflict directory after manually reconciling.
const OutboxDir = ".zddc-outbox"

// outboxConflictPrefix is the marker inserted between the entry ID and
// the conflict timestamp. The timestamp is UTC in the compact
// "20060102T150405Z" layout, which contains no ":" and is therefore
// filename-safe as written. The full literal is `.conflict-<timestamp>`
// and is grep-friendly across the whole tree.
const outboxConflictPrefix = ".conflict-"

// MaxOutboxBodyBytes caps the per-entry body size persisted to disk.
// Larger writes still go through online; only the offline-queue path
// has this limit. Default 256 MiB matches MaxWriteBytes on master.
const MaxOutboxBodyBytes = 256 * 1024 * 1024

// ReplayInterval governs how often the background loop attempts
// replay when the outbox is non-empty. Empty-outbox iterations sleep
// 10× longer so an idle client doesn't spin.
const (
	ReplayInterval     = 30 * time.Second
	ReplayIdleInterval = 5 * time.Minute
)

// OutboxEntry is the persisted shape of a queued write. The body
// lives in a sibling file (body.bin) so it can be streamed back to
// upstream at replay time without buffering the whole thing in
// memory.
type OutboxEntry struct {
	// ID is the directory name. Sortable by lexical order so replays
	// happen in queue order: <unix-nano>-<random>.
	ID string `json:"id"`
	// Method, RawURI, ContentType identify the request.
	Method      string `json:"method"`
	RawURI      string `json:"raw_uri"`
	ContentType string `json:"content_type,omitempty"`
	// BaseModTime is the local cache file's mtime at queue time. Used
	// at replay to send If-Unmodified-Since so the master rejects the
	// write (412) if its file changed since we observed it. Zero =
	// no precondition (e.g. PUT to a path with no prior cache).
	BaseModTime time.Time `json:"base_mod_time,omitempty"`
	QueuedAt    time.Time `json:"queued_at"`
}

// Outbox owns the on-disk write-queue + the background replay loop.
// Constructed from cache.New when client mode is active; nil
// otherwise. Safe for concurrent Enqueue calls.
type Outbox struct {
	cache *Cache
	dir   string
	mu    sync.Mutex // serializes Enqueue; the replay path in this file does not take it
}

// NewOutbox opens (and creates if missing) the outbox directory under
// the cache root (c.root). Mode 0700: single-user trust at the FS layer.
func NewOutbox(c *Cache) (*Outbox, error) {
	dir := filepath.Join(c.root, OutboxDir)
	if err := os.MkdirAll(dir, 0o700); err != nil {
		return nil, fmt.Errorf("create outbox dir: %w", err)
	}
	// MkdirAll leaves an existing directory's mode alone, so tighten it
	// explicitly. Best-effort.
	_ = os.Chmod(dir, 0o700)
	return &Outbox{cache: c, dir: dir}, nil
}

// Dir exposes the on-disk path for tests / diagnostics.
func (o *Outbox) Dir() string { return o.dir }

// Enqueue persists the request to disk so it can be replayed when
// upstream is reachable again. Returns the created entry. The body
// reader is fully consumed; callers should not assume r.Body is
// readable afterwards.
//
// baseModTime is the local cache file's mtime at the moment we
// decided to queue (zero when none, e.g. PUT to a never-cached
// path). It becomes the If-Unmodified-Since precondition at replay.
func (o *Outbox) Enqueue(r *http.Request, baseModTime time.Time) (*OutboxEntry, error) {
	o.mu.Lock()
	defer o.mu.Unlock()

	id, err := newOutboxID()
	if err != nil {
		return nil, fmt.Errorf("outbox id: %w", err)
	}
	entryDir := filepath.Join(o.dir, id)
	if err := os.MkdirAll(entryDir, 0o700); err != nil {
		return nil, fmt.Errorf("create entry dir: %w", err)
	}

	// Body first: any failure removes the partially written entry dir.
	bodyPath := filepath.Join(entryDir, "body.bin")
	bf, err := os.OpenFile(bodyPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o600)
	if err != nil {
		_ = os.RemoveAll(entryDir)
		return nil, fmt.Errorf("create body file: %w", err)
	}
	limited := http.MaxBytesReader(nil, r.Body, MaxOutboxBodyBytes)
	if _, err := io.Copy(bf, limited); err != nil {
		_ = bf.Close()
		_ = os.RemoveAll(entryDir)
		return nil, fmt.Errorf("save body: %w", err)
	}
	if err := bf.Close(); err != nil {
		_ = os.RemoveAll(entryDir)
		return nil, err
	}

	entry := OutboxEntry{
		ID:          id,
		Method:      r.Method,
		RawURI:      r.URL.RequestURI(),
		ContentType: r.Header.Get("Content-Type"),
		BaseModTime: baseModTime,
		QueuedAt:    time.Now().UTC().Truncate(time.Second),
	}
	metaBytes, err := json.MarshalIndent(entry, "", " ")
	if err != nil {
		_ = os.RemoveAll(entryDir)
		return nil, err
	}
	// meta.json is written last, so an entry without it is incomplete
	// and Pending skips it.
	metaPath := filepath.Join(entryDir, "meta.json")
	if err := writeFileAtomic(metaPath, metaBytes, 0o600); err != nil {
		_ = os.RemoveAll(entryDir)
		return nil, err
	}
	return &entry, nil
}

// Pending returns the queued entries in lexical-ID order (which is
// queue order, since IDs lead with unix-nano). Entries that fail to
// parse are skipped. Conflict-renamed entries are excluded.
func (o *Outbox) Pending() ([]OutboxEntry, error) {
	dirEntries, err := os.ReadDir(o.dir)
	if err != nil {
		if os.IsNotExist(err) {
			return nil, nil
		}
		return nil, err
	}
	var ids []string
	for _, e := range dirEntries {
		if !e.IsDir() {
			continue
		}
		name := e.Name()
		if isConflictName(name) {
			continue
		}
		ids = append(ids, name)
	}
	sort.Strings(ids)

	out := make([]OutboxEntry, 0, len(ids))
	for _, id := range ids {
		entry, err := o.loadEntry(id)
		if err != nil {
			slog.Debug("outbox: skip unreadable entry", "id", id, "err", err)
			continue
		}
		out = append(out, *entry)
	}
	return out, nil
}

// loadEntry reads + parses meta.json for the given ID.
func (o *Outbox) loadEntry(id string) (*OutboxEntry, error) {
	metaPath := filepath.Join(o.dir, id, "meta.json")
	bytes, err := os.ReadFile(metaPath)
	if err != nil {
		return nil, err
	}
	var entry OutboxEntry
	if err := json.Unmarshal(bytes, &entry); err != nil {
		return nil, err
	}
	return &entry, nil
}

// Replay iterates Pending() in order and fires each entry against
// upstream. Returns the count of entries cleared from the queue (2xx
// successes plus non-412 4xx rejections that were dropped) and the
// count of conflict-renamed entries. Network errors and 5xx responses
// leave the entry in place for the next iteration.
func (o *Outbox) Replay(ctx context.Context) (replayed, conflicts int, err error) {
	pending, err := o.Pending()
	if err != nil {
		return 0, 0, err
	}
	for _, entry := range pending {
		if ctx.Err() != nil {
			return replayed, conflicts, ctx.Err()
		}
		outcome, err := o.replayOne(ctx, entry)
		switch outcome {
		case replayDone:
			replayed++
		case replayConflict:
			conflicts++
		case replayDefer:
			// Transient: leave the entry in place.
			if err != nil {
				slog.Debug("outbox: defer entry", "id", entry.ID, "err", err)
			}
		}
	}
	return replayed, conflicts, nil
}

type replayOutcome int

const (
	replayDone replayOutcome = iota
	replayConflict
	replayDefer
)

// replayOne fires a single outbox entry. Outcomes:
//
//   - 2xx → entry directory deleted (replayDone).
//   - 412 Precondition Failed → entry renamed to <id>.conflict-<ts>/
//     (replayConflict). Operator manually reconciles.
//   - 4xx other than 412 → entry deleted (replayDone with a warn log).
//     The master rejected the request for a reason other than
//     concurrency; retrying won't help.
//   - 5xx, network errors → entry left in place (replayDefer).
func (o *Outbox) replayOne(ctx context.Context, entry OutboxEntry) (replayOutcome, error) {
	bodyPath := filepath.Join(o.dir, entry.ID, "body.bin")
	bf, err := os.Open(bodyPath)
	if err != nil {
		// Body missing: the entry is malformed. Move it to conflict so
		// we don't loop on it forever.
		_ = o.markConflict(entry.ID)
		return replayConflict, fmt.Errorf("open body: %w", err)
	}
	defer bf.Close()

	target := o.cache.upstream + entry.RawURI
	req, err := http.NewRequestWithContext(ctx, entry.Method, target, bf)
	if err != nil {
		_ = o.markConflict(entry.ID)
		return replayConflict, err
	}
	if entry.ContentType != "" {
		req.Header.Set("Content-Type", entry.ContentType)
	}
	if !entry.BaseModTime.IsZero() {
		req.Header.Set("If-Unmodified-Since", entry.BaseModTime.UTC().Format(http.TimeFormat))
	}
	if o.cache.bearer != "" {
		req.Header.Set("Authorization", "Bearer "+o.cache.bearer)
	}

	resp, err := o.cache.client.Do(req)
	if err != nil {
		// Transport / network error: defer.
		return replayDefer, err
	}
	defer resp.Body.Close()
	_, _ = io.Copy(io.Discard, resp.Body) // drain so the connection can be reused

	switch {
	case resp.StatusCode >= 200 && resp.StatusCode < 300:
		// Success. For PUT and DELETE, drop the cached copy so the next
		// read fetches fresh from upstream. (The new body is not
		// available here, so the cache cannot be refreshed in place.)
		if entry.Method == http.MethodPut || entry.Method == http.MethodDelete {
			if path, ok := o.cache.cachePathForURI(entry.RawURI); ok {
				_ = os.Remove(path)
			}
		}
		_ = os.RemoveAll(filepath.Join(o.dir, entry.ID))
		slog.Info("outbox: replayed", "id", entry.ID, "method", entry.Method, "uri", entry.RawURI, "status", resp.StatusCode)
		return replayDone, nil
	case resp.StatusCode == http.StatusPreconditionFailed:
		// Conflict: the base version on master changed since we observed it.
		_ = o.markConflict(entry.ID)
		slog.Warn("outbox: conflict", "id", entry.ID, "method", entry.Method, "uri", entry.RawURI)
		return replayConflict, nil
	case resp.StatusCode >= 400 && resp.StatusCode < 500:
		// Permanent rejection (auth, bad request, etc). Drop the
		// entry; retrying won't help.
		_ = os.RemoveAll(filepath.Join(o.dir, entry.ID))
		slog.Warn("outbox: dropped", "id", entry.ID, "method", entry.Method, "uri", entry.RawURI, "status", resp.StatusCode)
		return replayDone, nil
	default:
		// 5xx (or any other unexpected status): defer for next iteration.
		return replayDefer, fmt.Errorf("upstream status %d", resp.StatusCode)
	}
}

// markConflict renames the entry directory to <id>.conflict-<ts>/,
// where <ts> is a second-resolution UTC timestamp. If the rename
// target already exists, a short random suffix is appended to keep
// the name unique. Best-effort.
func (o *Outbox) markConflict(id string) error {
	src := filepath.Join(o.dir, id)
	ts := time.Now().UTC().Format("20060102T150405Z")
	dst := filepath.Join(o.dir, id+outboxConflictPrefix+ts)
	if _, err := os.Stat(dst); err == nil {
		dst = dst + "-" + shortRandom()
	}
	return os.Rename(src, dst)
}

// RunReplayLoop is the background goroutine that drives Replay on a
// schedule. Stops when ctx is cancelled. Safe to call once per
// process; not designed for multiple concurrent loops over the same
// outbox.
func (o *Outbox) RunReplayLoop(ctx context.Context) {
	// Eagerly attempt replay at startup so a re-launched client
	// catches up before any user request fires.
	o.tryReplay(ctx)
	for {
		interval := ReplayIdleInterval
		pending, err := o.Pending()
		if err == nil && len(pending) > 0 {
			interval = ReplayInterval
		}
		select {
		case <-ctx.Done():
			return
		case <-time.After(interval):
			o.tryReplay(ctx)
		}
	}
}

// tryReplay is a single replay pass with a bounded timeout. Errors
// logged at debug; per-entry outcomes logged inside Replay/replayOne.
func (o *Outbox) tryReplay(ctx context.Context) {
	tCtx, cancel := context.WithTimeout(ctx, 2*time.Minute)
	defer cancel()
	replayed, conflicts, err := o.Replay(tCtx)
	if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
		slog.Debug("outbox: replay error", "err", err)
	}
	if replayed > 0 || conflicts > 0 {
		slog.Info("outbox: replay pass complete", "replayed", replayed, "conflicts", conflicts)
	}
}

// isConflictName reports whether the directory name is an outbox
// conflict-renamed entry (`<id>.conflict-<ts>` form). It is a plain
// substring search for outboxConflictPrefix.
func isConflictName(name string) bool {
	for i := 0; i+len(outboxConflictPrefix) <= len(name); i++ {
		if name[i:i+len(outboxConflictPrefix)] == outboxConflictPrefix {
			return true
		}
	}
	return false
}

// newOutboxID returns a sortable-by-time ID with a random suffix:
// <unix-nano-base16>-<6-hex>. Lexical sort ≅ chronological for the
// foreseeable lifetime of the outbox (until ~year 2262).
func newOutboxID() (string, error) {
	var randBuf [3]byte
	if _, err := rand.Read(randBuf[:]); err != nil {
		return "", err
	}
	return fmt.Sprintf("%016x-%s", time.Now().UnixNano(), hex.EncodeToString(randBuf[:])), nil
}

// shortRandom returns 4 random hex chars. Used to disambiguate
// conflict-renamed directories that collide on the same
// second-resolution timestamp.
func shortRandom() string {
	var b [2]byte
	_, _ = rand.Read(b[:])
	return hex.EncodeToString(b[:])
}

// writeFileAtomic writes data to path via temp + rename in the same
// directory. Mode applied to the final file.
func writeFileAtomic(path string, data []byte, mode os.FileMode) error {
	tmp, err := os.CreateTemp(filepath.Dir(path), ".tmp-*")
	if err != nil {
		return err
	}
	tmpName := tmp.Name()
	if _, err := tmp.Write(data); err != nil {
		_ = tmp.Close()
		_ = os.Remove(tmpName)
		return err
	}
	if err := tmp.Chmod(mode); err != nil {
		_ = tmp.Close()
		_ = os.Remove(tmpName)
		return err
	}
	if err := tmp.Close(); err != nil {
		_ = os.Remove(tmpName)
		return err
	}
	return os.Rename(tmpName, path)
}