ZDDC/zddc/internal/cache/outbox.go
2026-06-11 13:32:31 -05:00

455 lines
14 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package cache
import (
	"context"
	"crypto/rand"
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"os"
	"path/filepath"
	"sort"
	"strings"
	"sync"
	"time"
)
// OutboxDir is the leaf directory under the cache root where queued
// offline writes are persisted. Each entry is itself a directory:
//
// <cache-root>/.zddc-outbox/<id>/meta.json — request metadata
// <cache-root>/.zddc-outbox/<id>/body.bin — request body
//
// Entries that could not be replayed cleanly (for example a 412
// Precondition Failed from upstream) are renamed to
//
// <cache-root>/.zddc-outbox/<id>.conflict-<timestamp>/
//
// where <timestamp> is a compact UTC stamp (layout 20060102T150405Z).
// Operators clear the conflict directory after manually reconciling.
const OutboxDir = ".zddc-outbox"
// outboxConflictPrefix is the marker placed between an entry's ID and
// the timestamp when the entry directory is renamed out of the queue.
// The timestamp is a compact UTC stamp (layout 20060102T150405Z); it
// contains no ":" so it is usable in a file name as-is. The full
// suffix is `.conflict-<timestamp>` and is grep-friendly across the
// whole tree.
const outboxConflictPrefix = ".conflict-"
// MaxOutboxBodyBytes caps the per-entry body size persisted to disk
// (256 MiB). Only the offline-queue path applies this limit: Enqueue
// fails when the request body is larger.
// NOTE(review): the original comment states this matches MaxWriteBytes
// on master and that larger writes still go through while online;
// neither is visible in this file — confirm.
const MaxOutboxBodyBytes = 256 * 1024 * 1024
// Replay scheduling used by RunReplayLoop.
const (
// ReplayInterval is the delay between replay attempts while the
// outbox holds at least one pending entry.
ReplayInterval = 30 * time.Second
// ReplayIdleInterval is the delay used while the outbox is empty.
// It is 10× ReplayInterval so an idle client doesn't spin.
ReplayIdleInterval = 5 * time.Minute
)
// OutboxEntry is the persisted shape of a queued write; it is what
// Enqueue serializes to meta.json. The body lives in a sibling file
// (body.bin) so it can be streamed back to upstream at replay time
// without buffering the whole thing in memory.
type OutboxEntry struct {
// ID is the entry's directory name, <unix-nano-hex>-<random-hex>.
// It sorts lexically in queue order, so replays happen in the order
// the writes were queued.
ID string `json:"id"`
// Method, RawURI, ContentType identify the request. RawURI is the
// request URI (path plus query) that replay appends to the upstream
// base URL; ContentType is replayed only when non-empty.
Method string `json:"method"`
RawURI string `json:"raw_uri"`
ContentType string `json:"content_type,omitempty"`
// BaseModTime is the local cache file's mtime at queue time. Used
// at replay to send If-Unmodified-Since so the master rejects the
// write (412) if its file changed since we observed it. Zero =
// no precondition (e.g. PUT to a path with no prior cache).
// Note: omitempty has no effect on a struct-typed field such as
// time.Time, so a zero value is still written to meta.json.
BaseModTime time.Time `json:"base_mod_time,omitempty"`
// QueuedAt is the UTC time, truncated to whole seconds, at which
// Enqueue created the entry.
QueuedAt time.Time `json:"queued_at"`
}
// Outbox owns the on-disk write queue and the background replay loop.
// Per the original author's note it is constructed from cache.New when
// client mode is active and is nil otherwise (that call site is not
// visible in this file). Safe for concurrent Enqueue calls.
type Outbox struct {
// cache supplies the upstream base URL, bearer token, HTTP client
// and cache-path lookup used at replay time.
cache *Cache
// dir is the outbox directory, <cache-root>/.zddc-outbox.
dir string
// mu is held by Enqueue for the whole time an entry is written.
// NOTE(review): the original comment said replay also holds it
// briefly per entry, but none of the replay methods visible in this
// file take the lock — confirm whether that is intended.
mu sync.Mutex // serializes entry creation in Enqueue
// wake is a non-blocking signal channel. Enqueue posts to it
// (drop-on-full) so RunReplayLoop reacts to new entries within
// ms, not after its idle-poll interval. Buffered=1 so a burst of
// enqueues collapses to a single wake.
wake chan struct{}
}
// NewOutbox returns an Outbox whose directory is <c.root>/.zddc-outbox,
// creating that directory (and any missing parents) when needed. The
// directory is private to the owning user (mode 0700).
//
// It returns an error only when the directory cannot be created.
func NewOutbox(c *Cache) (*Outbox, error) {
	outboxPath := filepath.Join(c.root, OutboxDir)

	err := os.MkdirAll(outboxPath, 0o700)
	if err != nil {
		return nil, fmt.Errorf("create outbox dir: %w", err)
	}
	// MkdirAll leaves the mode of a pre-existing directory alone, so
	// tighten it explicitly. Best-effort: a failure here is not fatal.
	_ = os.Chmod(outboxPath, 0o700)

	ob := &Outbox{
		cache: c,
		dir:   outboxPath,
		wake:  make(chan struct{}, 1),
	}
	return ob, nil
}
// Dir returns the path of the outbox directory on disk. Intended for
// tests and diagnostics.
func (o *Outbox) Dir() string {
	return o.dir
}
// Enqueue persists the request to disk so it can be replayed when
// upstream is reachable again, and returns the created entry. The body
// reader is fully consumed (but not closed); callers should not assume
// r.Body is readable afterwards. A request without a body (r.Body ==
// nil) is queued with an empty body file.
//
// baseModTime is the local cache file's mtime at the moment we
// decided to queue (zero when none — e.g. PUT to a never-cached
// path). It becomes the If-Unmodified-Since precondition at replay.
//
// On any failure — including a body larger than MaxOutboxBodyBytes —
// the partially written entry directory is removed and an error is
// returned.
func (o *Outbox) Enqueue(r *http.Request, baseModTime time.Time) (*OutboxEntry, error) {
	o.mu.Lock()
	defer o.mu.Unlock()

	id, err := newOutboxID()
	if err != nil {
		return nil, fmt.Errorf("outbox id: %w", err)
	}
	entryDir := filepath.Join(o.dir, id)
	if err := os.MkdirAll(entryDir, 0o700); err != nil {
		return nil, fmt.Errorf("create entry dir: %w", err)
	}

	// Body first, metadata last: Pending skips directories without a
	// readable meta.json, so an entry only becomes visible to replay
	// once its body is completely on disk.
	if err := saveOutboxBody(filepath.Join(entryDir, "body.bin"), r.Body); err != nil {
		_ = os.RemoveAll(entryDir)
		return nil, err
	}

	entry := OutboxEntry{
		ID:          id,
		Method:      r.Method,
		RawURI:      r.URL.RequestURI(),
		ContentType: r.Header.Get("Content-Type"),
		BaseModTime: baseModTime,
		QueuedAt:    time.Now().UTC().Truncate(time.Second),
	}
	metaBytes, err := json.MarshalIndent(entry, "", " ")
	if err != nil {
		_ = os.RemoveAll(entryDir)
		return nil, fmt.Errorf("encode entry metadata: %w", err)
	}
	if err := writeFileAtomic(filepath.Join(entryDir, "meta.json"), metaBytes, 0o600); err != nil {
		_ = os.RemoveAll(entryDir)
		return nil, fmt.Errorf("write entry metadata: %w", err)
	}

	// Signal the replay loop. Non-blocking — buffered=1 collapses
	// a burst of enqueues to a single wake. If the loop is busy
	// replaying, our entry will be picked up on the next pass.
	select {
	case o.wake <- struct{}{}:
	default:
	}
	return &entry, nil
}

// saveOutboxBody streams body into a new file at path, refusing more
// than MaxOutboxBodyBytes, and flushes the file to stable storage
// before returning so a crash cannot leave a queued entry with a
// truncated body. A nil body is stored as an empty file.
func saveOutboxBody(path string, body io.ReadCloser) error {
	if body == nil {
		// Requests built without a body carry a nil Body; reading
		// through it would panic.
		body = http.NoBody
	}
	f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o600)
	if err != nil {
		return fmt.Errorf("create body file: %w", err)
	}
	if _, err := io.Copy(f, http.MaxBytesReader(nil, body, MaxOutboxBodyBytes)); err != nil {
		_ = f.Close()
		return fmt.Errorf("save body: %w", err)
	}
	if err := f.Sync(); err != nil {
		_ = f.Close()
		return fmt.Errorf("sync body file: %w", err)
	}
	if err := f.Close(); err != nil {
		return fmt.Errorf("close body file: %w", err)
	}
	return nil
}
// Pending lists the queued entries sorted by ID, which is queue order
// because IDs start with the enqueue time. Sub-directories whose
// meta.json cannot be read or parsed are skipped (logged at debug),
// and conflict-renamed directories are left out.
//
// A missing outbox directory yields (nil, nil); any other error from
// reading the directory is returned.
func (o *Outbox) Pending() ([]OutboxEntry, error) {
	listing, err := os.ReadDir(o.dir)
	switch {
	case err == nil:
		// fall through to the scan below
	case os.IsNotExist(err):
		return nil, nil
	default:
		return nil, err
	}

	names := make([]string, 0, len(listing))
	for _, de := range listing {
		if name := de.Name(); de.IsDir() && !isConflictName(name) {
			names = append(names, name)
		}
	}
	sort.Strings(names)

	entries := make([]OutboxEntry, 0, len(names))
	for _, name := range names {
		loaded, loadErr := o.loadEntry(name)
		if loadErr != nil {
			slog.Debug("outbox: skip unreadable entry", "id", name, "err", loadErr)
			continue
		}
		entries = append(entries, *loaded)
	}
	return entries, nil
}
// loadEntry reads and parses <outbox>/<id>/meta.json.
//
// The returned entry's ID is always set to id, the directory name the
// entry was loaded from, regardless of what meta.json says. Replay
// builds file-system paths from entry.ID (to open the body, rename the
// entry aside, or delete it), so an empty or mismatched ID in a
// damaged meta.json must never be able to point those operations at
// the outbox directory itself or at a different entry.
//
// It returns an error when meta.json cannot be read or is not valid
// JSON.
func (o *Outbox) loadEntry(id string) (*OutboxEntry, error) {
	raw, err := os.ReadFile(filepath.Join(o.dir, id, "meta.json"))
	if err != nil {
		return nil, err
	}
	var entry OutboxEntry
	if err := json.Unmarshal(raw, &entry); err != nil {
		return nil, fmt.Errorf("parse meta.json of entry %q: %w", id, err)
	}
	// The directory name is authoritative; see the doc comment.
	entry.ID = id
	return &entry, nil
}
// Replay walks Pending() in queue order and fires each entry against
// upstream via replayOne.
//
// It returns the number of entries that left the queue as done
// (replayed, or dropped after a permanent rejection) and the number
// that were set aside as conflicts. Deferred entries (network errors,
// retryable upstream statuses) stay in place for the next pass and
// are counted in neither total.
//
// The error is non-nil only when the queue cannot be listed or when
// ctx is done before an entry is started; in the latter case the
// counts reflect the work completed so far.
func (o *Outbox) Replay(ctx context.Context) (replayed, conflicts int, err error) {
	pending, err := o.Pending()
	if err != nil {
		return 0, 0, err
	}
	for _, entry := range pending {
		if ctxErr := ctx.Err(); ctxErr != nil {
			return replayed, conflicts, ctxErr
		}
		outcome, oneErr := o.replayOne(ctx, entry)
		switch outcome {
		case replayDone:
			replayed++
		case replayConflict:
			conflicts++
			// replayOne pairs replayConflict with an error only when
			// the entry itself was unusable (not an upstream 412).
			// Surface it so the operator can tell why it was set aside.
			if oneErr != nil {
				slog.Warn("outbox: entry set aside as unreplayable", "id", entry.ID, "err", oneErr)
			}
		case replayDefer:
			// transient — leave entry in place
			if oneErr != nil {
				slog.Debug("outbox: defer entry", "id", entry.ID, "err", oneErr)
			}
		}
	}
	return replayed, conflicts, nil
}
// replayOutcome classifies what replayOne did with a single entry.
type replayOutcome int
const (
// replayDone: the entry left the queue — either upstream accepted
// it or it was dropped after a permanent rejection.
replayDone replayOutcome = iota
// replayConflict: the entry was set aside under a conflict name
// (upstream answered 412, or the entry itself was unusable).
replayConflict
// replayDefer: the entry stays queued and is retried on a later
// pass.
replayDefer
)
// replayOne fires a single outbox entry against upstream. Outcomes:
//
//   - 2xx → entry directory deleted (replayDone). For PUT and DELETE
//     the locally cached copy is removed so the next read refetches.
//   - 412 Precondition Failed → entry renamed to <id>.conflict-<ts>/
//     (replayConflict). Operator manually reconciles.
//   - 408 Request Timeout, 429 Too Many Requests → entry left in place
//     (replayDefer). These are 4xx codes but describe a transient
//     condition, so dropping the queued write would lose data.
//   - any other 4xx → entry deleted (replayDone with a warn log). The
//     master rejected the request for a reason other than concurrency;
//     retrying won't help.
//   - everything else (5xx, unfollowed 3xx) and transport errors →
//     entry left in place (replayDefer).
//   - body file unreadable or request cannot be built → entry renamed
//     aside (replayConflict) together with the causing error.
func (o *Outbox) replayOne(ctx context.Context, entry OutboxEntry) (replayOutcome, error) {
	entryDir := filepath.Join(o.dir, entry.ID)
	bf, err := os.Open(filepath.Join(entryDir, "body.bin"))
	if err != nil {
		// Body missing — entry is malformed. Move it aside so we
		// don't loop on it forever.
		o.setAsideEntry(entry.ID)
		return replayConflict, fmt.Errorf("open body: %w", err)
	}
	// Safety net only: every path that renames or deletes the entry
	// closes the file first, and a second Close is harmless.
	defer bf.Close()

	target := o.cache.upstream + entry.RawURI
	req, err := http.NewRequestWithContext(ctx, entry.Method, target, bf)
	if err != nil {
		_ = bf.Close()
		o.setAsideEntry(entry.ID)
		return replayConflict, err
	}
	// NewRequest infers a length only for in-memory bodies. Supply the
	// file size so the upload carries Content-Length instead of going
	// out chunked. A zero ContentLength with a non-nil Body means
	// "unknown", so an empty file is sent as NoBody instead.
	if info, statErr := bf.Stat(); statErr == nil {
		if size := info.Size(); size > 0 {
			req.ContentLength = size
		} else {
			req.Body = http.NoBody
		}
	}
	if entry.ContentType != "" {
		req.Header.Set("Content-Type", entry.ContentType)
	}
	if !entry.BaseModTime.IsZero() {
		req.Header.Set("If-Unmodified-Since", entry.BaseModTime.UTC().Format(http.TimeFormat))
	}
	if o.cache.bearer != "" {
		req.Header.Set("Authorization", "Bearer "+o.cache.bearer)
	}

	resp, err := o.cache.client.Do(req)
	if err != nil {
		// Transport / network error — defer.
		return replayDefer, err
	}
	defer resp.Body.Close()
	_, _ = io.Copy(io.Discard, resp.Body) // drain so connection reuses

	// Release the body file before touching the entry directory:
	// renaming or deleting a directory that still contains an open
	// file fails on some platforms.
	_ = bf.Close()

	status := resp.StatusCode
	switch {
	case status >= 200 && status < 300:
		// Success. We don't have the new upstream state here, so just
		// remove the cached copy; the next read fetches it fresh.
		if entry.Method == http.MethodPut || entry.Method == http.MethodDelete {
			if path, ok := o.cache.cachePathForURI(entry.RawURI); ok {
				_ = os.Remove(path)
			}
		}
		if err := os.RemoveAll(entryDir); err != nil {
			slog.Warn("outbox: could not delete replayed entry", "id", entry.ID, "err", err)
		}
		slog.Info("outbox: replayed", "id", entry.ID, "method", entry.Method, "uri", entry.RawURI, "status", status)
		return replayDone, nil
	case status == http.StatusPreconditionFailed:
		// Conflict — base version on master changed since we observed.
		o.setAsideEntry(entry.ID)
		slog.Warn("outbox: conflict", "id", entry.ID, "method", entry.Method, "uri", entry.RawURI)
		return replayConflict, nil
	case status == http.StatusRequestTimeout, status == http.StatusTooManyRequests:
		// Transient despite being 4xx — keep the write queued.
		return replayDefer, fmt.Errorf("upstream status %d", status)
	case status >= 400 && status < 500:
		// Permanent rejection (auth, bad request, etc). Drop the
		// entry — retrying won't help.
		if err := os.RemoveAll(entryDir); err != nil {
			slog.Warn("outbox: could not delete dropped entry", "id", entry.ID, "err", err)
		}
		slog.Warn("outbox: dropped", "id", entry.ID, "method", entry.Method, "uri", entry.RawURI, "status", status)
		return replayDone, nil
	default:
		// 5xx (or an unexpected 1xx/3xx) — defer for next iteration.
		return replayDefer, fmt.Errorf("upstream status %d", status)
	}
}

// setAsideEntry renames the entry out of the queue via markConflict
// and logs a failure to do so. A failed rename leaves the entry
// queued, so it will be picked up again on the next pass.
func (o *Outbox) setAsideEntry(id string) {
	if err := o.markConflict(id); err != nil {
		slog.Warn("outbox: could not rename entry aside", "id", id, "err", err)
	}
}
// markConflict moves the entry directory <id>/ out of the queue by
// renaming it to <id>.conflict-<ts>/, where <ts> is the current UTC
// time in the compact layout 20060102T150405Z. When a directory with
// that name already exists, a short random hex suffix is appended to
// the new name. The rename error, if any, is returned to the caller.
func (o *Outbox) markConflict(id string) error {
	stamp := time.Now().UTC().Format("20060102T150405Z")
	from := filepath.Join(o.dir, id)
	to := filepath.Join(o.dir, id+outboxConflictPrefix+stamp)

	_, statErr := os.Stat(to)
	if statErr == nil {
		// Name already taken — disambiguate.
		to += "-" + shortRandom()
	}
	return os.Rename(from, to)
}
// RunReplayLoop drives replay passes until ctx is cancelled. It blocks,
// so run it in its own goroutine. Intended to be started once per
// process; it is not designed for several loops over the same outbox.
//
// One pass runs immediately at startup so a re-launched client
// catches up before any user request fires. After that a pass runs
// whenever either of these fires:
//   - the timer: ReplayInterval while entries are pending,
//     ReplayIdleInterval while the queue is empty (or cannot be listed)
//   - o.wake, posted by Enqueue when a new offline write is queued
func (o *Outbox) RunReplayLoop(ctx context.Context) {
	o.tryReplay(ctx)

	// nextDelay picks the timer period from the current queue state.
	nextDelay := func() time.Duration {
		queued, err := o.Pending()
		if err != nil || len(queued) == 0 {
			return ReplayIdleInterval
		}
		return ReplayInterval
	}

	for {
		timeout := time.After(nextDelay())
		select {
		case <-ctx.Done():
			return
		case <-o.wake:
			// A new entry arrived; don't wait for the timer.
			o.tryReplay(ctx)
		case <-timeout:
			o.tryReplay(ctx)
		}
	}
}
// tryReplay runs one replay pass bounded by a two-minute timeout.
//
// When the queue is non-empty it logs the attempt at info level
// before replaying, so an operator watching the log can see the loop
// is alive even if every entry defers. Errors from the pass are logged
// at debug, except context cancellation and deadline expiry, which are
// not logged. A summary is logged at info when at least one entry was
// replayed or set aside as a conflict.
func (o *Outbox) tryReplay(ctx context.Context) {
	passCtx, stop := context.WithTimeout(ctx, 2*time.Minute)
	defer stop()

	if queued, _ := o.Pending(); len(queued) > 0 {
		slog.Info("outbox: replay attempt", "pending", len(queued))
	}

	done, clashed, err := o.Replay(passCtx)
	if err != nil {
		expected := errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)
		if !expected {
			slog.Debug("outbox: replay error", "err", err)
		}
	}
	if done > 0 || clashed > 0 {
		slog.Info("outbox: replay pass complete", "replayed", done, "conflicts", clashed)
	}
}
// isConflictName reports whether name is an outbox conflict-renamed
// entry (`<id>.conflict-<ts>` form), i.e. whether it contains the
// conflict marker anywhere in the name.
func isConflictName(name string) bool {
	return strings.Contains(name, outboxConflictPrefix)
}
// newOutboxID builds an entry ID of the form
// <unix-nano as 16 zero-padded hex digits>-<6 random hex digits>.
// Because the time component is fixed-width and leads the ID, lexical
// order follows creation order; the random tail separates IDs created
// in the same nanosecond. An error is returned only when the random
// source fails.
func newOutboxID() (string, error) {
	suffix := make([]byte, 3)
	if _, err := rand.Read(suffix); err != nil {
		return "", err
	}
	stamp := time.Now().UnixNano()
	id := fmt.Sprintf("%016x", stamp) + "-" + hex.EncodeToString(suffix)
	return id, nil
}
// shortRandom returns 4 hex characters taken from 2 random bytes. It
// is used to disambiguate conflict-renamed directories whose names
// would otherwise collide on the same timestamp.
func shortRandom() string {
	raw := make([]byte, 2)
	// Best-effort: the result of the read is deliberately not checked.
	_, _ = rand.Read(raw)
	return hex.EncodeToString(raw)
}
// writeFileAtomic writes data to path by creating a temp file in the
// same directory, flushing it to stable storage, and renaming it over
// path, so readers never observe a partially written file. mode is
// applied to the final file.
//
// On any failure — including a failed rename — the temp file is
// removed and the error is returned; path is left as it was.
func writeFileAtomic(path string, data []byte, mode os.FileMode) error {
	tmp, err := os.CreateTemp(filepath.Dir(path), ".tmp-*")
	if err != nil {
		return err
	}
	tmpName := tmp.Name()
	// discard closes the temp file (a repeated Close is harmless) and
	// deletes it, then hands the original error back.
	discard := func(cause error) error {
		_ = tmp.Close()
		_ = os.Remove(tmpName)
		return cause
	}

	if _, err := tmp.Write(data); err != nil {
		return discard(err)
	}
	if err := tmp.Chmod(mode); err != nil {
		return discard(err)
	}
	// Flush before the rename: otherwise a crash shortly after could
	// leave the new name pointing at an empty or truncated file.
	if err := tmp.Sync(); err != nil {
		return discard(err)
	}
	if err := tmp.Close(); err != nil {
		return discard(err)
	}
	if err := os.Rename(tmpName, path); err != nil {
		return discard(err)
	}
	return nil
}