package cache import ( "context" "crypto/rand" "encoding/hex" "encoding/json" "errors" "fmt" "io" "log/slog" "net/http" "os" "path/filepath" "sort" "sync" "time" ) // OutboxDir is the leaf directory under the cache root where queued // offline writes are persisted. Each entry is itself a directory: // // /.zddc-outbox//meta.json — request metadata // /.zddc-outbox//body.bin — request body // // Conflicted replays (412 Precondition Failed) are renamed to // // /.zddc-outbox/.conflict-/ // // Operators clear the conflict directory after manually reconciling. const OutboxDir = ".zddc-outbox" // outboxConflictPrefix is appended to a conflict-renamed entry. The // timestamp is RFC3339 (filename-safe — uses ":" via percent-encoding // when written to the filename). The full literal is // `.conflict-` and is grep-friendly across the whole tree. const outboxConflictPrefix = ".conflict-" // MaxOutboxBodyBytes caps the per-entry body size persisted to disk. // Larger writes still go through online — only the offline-queue path // has this limit. Default 256 MiB matches MaxWriteBytes on master. const MaxOutboxBodyBytes = 256 * 1024 * 1024 // ReplayInterval governs how often the background loop attempts // replay when the outbox is non-empty. Empty-outbox iterations sleep // 10× longer so an idle client doesn't spin. const ( ReplayInterval = 30 * time.Second ReplayIdleInterval = 5 * time.Minute ) // OutboxEntry is the persisted shape of a queued write. The body // lives in a sibling file (body.bin) so it can be streamed back to // upstream at replay time without buffering the whole thing in // memory. type OutboxEntry struct { // ID is the directory name. Sortable by lexical order so replays // happen in queue order: -. ID string `json:"id"` // Method, RawURI, ContentType identify the request. Method string `json:"method"` RawURI string `json:"raw_uri"` ContentType string `json:"content_type,omitempty"` // BaseModTime is the local cache file's mtime at queue time. 
Used // at replay to send If-Unmodified-Since so the master rejects the // write (412) if its file changed since we observed it. Zero = // no precondition (e.g. PUT to a path with no prior cache). BaseModTime time.Time `json:"base_mod_time,omitempty"` QueuedAt time.Time `json:"queued_at"` } // Outbox owns the on-disk write-queue + the background replay loop. // Constructed from cache.New when client mode is active; nil // otherwise. Safe for concurrent Enqueue calls. type Outbox struct { cache *Cache dir string mu sync.Mutex // serializes file creation; replay holds it briefly per-entry // wake is a non-blocking signal channel. Enqueue posts to it // (drop-on-full) so RunReplayLoop reacts to new entries within // ms, not after its idle-poll interval. Buffered=1 so a burst of // enqueues collapses to a single wake. wake chan struct{} } // NewOutbox opens (and creates if missing) the outbox directory under // cfg.Root. Mode 0700 — single-user-trust at the FS layer. func NewOutbox(c *Cache) (*Outbox, error) { dir := filepath.Join(c.root, OutboxDir) if err := os.MkdirAll(dir, 0o700); err != nil { return nil, fmt.Errorf("create outbox dir: %w", err) } _ = os.Chmod(dir, 0o700) return &Outbox{cache: c, dir: dir, wake: make(chan struct{}, 1)}, nil } // Dir exposes the on-disk path for tests / diagnostics. func (o *Outbox) Dir() string { return o.dir } // Enqueue persists the request to disk so it can be replayed when // upstream is reachable again. Returns the created entry. The body // reader is fully consumed; callers should not assume r.Body is // readable afterwards. // // baseModTime is the local cache file's mtime at the moment we // decided to queue (zero when none — e.g. PUT to a never-cached // path). It becomes the If-Unmodified-Since precondition at replay. 
func (o *Outbox) Enqueue(r *http.Request, baseModTime time.Time) (*OutboxEntry, error) { o.mu.Lock() defer o.mu.Unlock() id, err := newOutboxID() if err != nil { return nil, fmt.Errorf("outbox id: %w", err) } entryDir := filepath.Join(o.dir, id) if err := os.MkdirAll(entryDir, 0o700); err != nil { return nil, fmt.Errorf("create entry dir: %w", err) } // Body first — failure cleans up the empty entry dir. bodyPath := filepath.Join(entryDir, "body.bin") bf, err := os.OpenFile(bodyPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o600) if err != nil { _ = os.RemoveAll(entryDir) return nil, fmt.Errorf("create body file: %w", err) } limited := http.MaxBytesReader(nil, r.Body, MaxOutboxBodyBytes) if _, err := io.Copy(bf, limited); err != nil { _ = bf.Close() _ = os.RemoveAll(entryDir) return nil, fmt.Errorf("save body: %w", err) } if err := bf.Close(); err != nil { _ = os.RemoveAll(entryDir) return nil, err } entry := OutboxEntry{ ID: id, Method: r.Method, RawURI: r.URL.RequestURI(), ContentType: r.Header.Get("Content-Type"), BaseModTime: baseModTime, QueuedAt: time.Now().UTC().Truncate(time.Second), } metaBytes, err := json.MarshalIndent(entry, "", " ") if err != nil { _ = os.RemoveAll(entryDir) return nil, err } metaPath := filepath.Join(entryDir, "meta.json") if err := writeFileAtomic(metaPath, metaBytes, 0o600); err != nil { _ = os.RemoveAll(entryDir) return nil, err } // Signal the replay loop. Non-blocking — buffered=1 collapses // a burst of enqueues to a single wake. If the loop is busy // replaying, our entry will be picked up on the next pass. select { case o.wake <- struct{}{}: default: } return &entry, nil } // Pending returns the queued entries in lexical-ID order (which is // queue order, since IDs lead with unix-nano). Entries that fail to // parse are skipped. Conflict-renamed entries are excluded. 
func (o *Outbox) Pending() ([]OutboxEntry, error) { dirEntries, err := os.ReadDir(o.dir) if err != nil { if os.IsNotExist(err) { return nil, nil } return nil, err } var ids []string for _, e := range dirEntries { if !e.IsDir() { continue } name := e.Name() if isConflictName(name) { continue } ids = append(ids, name) } sort.Strings(ids) out := make([]OutboxEntry, 0, len(ids)) for _, id := range ids { entry, err := o.loadEntry(id) if err != nil { slog.Debug("outbox: skip unreadable entry", "id", id, "err", err) continue } out = append(out, *entry) } return out, nil } // loadEntry reads + parses meta.json for the given ID. func (o *Outbox) loadEntry(id string) (*OutboxEntry, error) { metaPath := filepath.Join(o.dir, id, "meta.json") bytes, err := os.ReadFile(metaPath) if err != nil { return nil, err } var entry OutboxEntry if err := json.Unmarshal(bytes, &entry); err != nil { return nil, err } return &entry, nil } // Replay iterates Pending() in order and fires each entry against // upstream. Returns the count of successfully-replayed entries and // the count of conflict-renamed entries. Network errors leave the // entry in place for the next iteration. func (o *Outbox) Replay(ctx context.Context) (replayed, conflicts int, err error) { pending, err := o.Pending() if err != nil { return 0, 0, err } for _, entry := range pending { if ctx.Err() != nil { return replayed, conflicts, ctx.Err() } outcome, err := o.replayOne(ctx, entry) switch outcome { case replayDone: replayed++ case replayConflict: conflicts++ case replayDefer: // transient — leave entry in place if err != nil { slog.Debug("outbox: defer entry", "id", entry.ID, "err", err) } } } return replayed, conflicts, nil } type replayOutcome int const ( replayDone replayOutcome = iota replayConflict replayDefer ) // replayOne fires a single outbox entry. Outcomes: // // - 2xx → entry directory deleted (replayDone). // - 412 Precondition Failed → entry renamed to .conflict-/ // (replayConflict). 
Operator manually reconciles. // - 4xx other than 412 → entry deleted (replayDone with a warn log). // The master rejected the request for a reason other than // concurrency; retrying won't help. // - 5xx, network errors → entry left in place (replayDefer). func (o *Outbox) replayOne(ctx context.Context, entry OutboxEntry) (replayOutcome, error) { bodyPath := filepath.Join(o.dir, entry.ID, "body.bin") bf, err := os.Open(bodyPath) if err != nil { // Body missing — entry is malformed. Move to conflict so we // don't loop on it forever. _ = o.markConflict(entry.ID) return replayConflict, fmt.Errorf("open body: %w", err) } defer bf.Close() target := o.cache.upstream + entry.RawURI req, err := http.NewRequestWithContext(ctx, entry.Method, target, bf) if err != nil { _ = o.markConflict(entry.ID) return replayConflict, err } if entry.ContentType != "" { req.Header.Set("Content-Type", entry.ContentType) } if !entry.BaseModTime.IsZero() { req.Header.Set("If-Unmodified-Since", entry.BaseModTime.UTC().Format(http.TimeFormat)) } if o.cache.bearer != "" { req.Header.Set("Authorization", "Bearer "+o.cache.bearer) } resp, err := o.cache.client.Do(req) if err != nil { // Transport / network error — defer. return replayDefer, err } defer resp.Body.Close() _, _ = io.Copy(io.Discard, resp.Body) // drain so connection reuses switch { case resp.StatusCode >= 200 && resp.StatusCode < 300: // Success. For PUT, refresh the local cache from upstream so // the user's view stays consistent. (We don't have the new // body here; just remove the cached entry so next read // fetches fresh.) 
if entry.Method == http.MethodPut || entry.Method == http.MethodDelete { if path, ok := o.cache.cachePathForURI(entry.RawURI); ok { _ = os.Remove(path) } } _ = os.RemoveAll(filepath.Join(o.dir, entry.ID)) slog.Info("outbox: replayed", "id", entry.ID, "method", entry.Method, "uri", entry.RawURI, "status", resp.StatusCode) return replayDone, nil case resp.StatusCode == http.StatusPreconditionFailed: // Conflict — base version on master changed since we observed. _ = o.markConflict(entry.ID) slog.Warn("outbox: conflict", "id", entry.ID, "method", entry.Method, "uri", entry.RawURI) return replayConflict, nil case resp.StatusCode >= 400 && resp.StatusCode < 500: // Permanent rejection (auth, bad request, etc). Drop the // entry — retrying won't help. _ = os.RemoveAll(filepath.Join(o.dir, entry.ID)) slog.Warn("outbox: dropped", "id", entry.ID, "method", entry.Method, "uri", entry.RawURI, "status", resp.StatusCode) return replayDone, nil default: // 5xx — defer for next iteration. return replayDefer, fmt.Errorf("upstream status %d", resp.StatusCode) } } // markConflict renames the entry directory to .conflict-/. // Idempotent: if the rename target already exists, a counter is // appended. Best-effort. func (o *Outbox) markConflict(id string) error { src := filepath.Join(o.dir, id) ts := time.Now().UTC().Format("20060102T150405Z") dst := filepath.Join(o.dir, id+outboxConflictPrefix+ts) if _, err := os.Stat(dst); err == nil { dst = dst + "-" + shortRandom() } return os.Rename(src, dst) } // RunReplayLoop is the background goroutine that drives Replay on a // schedule. Stops when ctx is cancelled. Safe to call once per // process; not designed for multiple concurrent loops over the same // outbox. 
// // Wakes on three signals: // - timer (ReplayInterval when entries pending, ReplayIdleInterval when idle) // - Enqueue posting to o.wake (instant reaction to new offline writes) // - context cancel (shutdown) func (o *Outbox) RunReplayLoop(ctx context.Context) { // Eagerly attempt replay at startup so a re-launched client // catches up before any user request fires. o.tryReplay(ctx) for { interval := ReplayIdleInterval pending, err := o.Pending() if err == nil && len(pending) > 0 { interval = ReplayInterval } select { case <-ctx.Done(): return case <-time.After(interval): o.tryReplay(ctx) case <-o.wake: // New entry arrived while we were idle. Replay // immediately rather than waiting for the timer. o.tryReplay(ctx) } } } // tryReplay is a single replay pass with a bounded timeout. Errors // logged at debug; per-entry outcomes logged inside Replay/replayOne. // The pass itself logs at info when it runs against >0 entries so an // operator watching the log sees the replay loop is alive even if // every attempt defers. func (o *Outbox) tryReplay(ctx context.Context) { tCtx, cancel := context.WithTimeout(ctx, 2*time.Minute) defer cancel() pending, _ := o.Pending() if len(pending) > 0 { slog.Info("outbox: replay attempt", "pending", len(pending)) } replayed, conflicts, err := o.Replay(tCtx) if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { slog.Debug("outbox: replay error", "err", err) } if replayed > 0 || conflicts > 0 { slog.Info("outbox: replay pass complete", "replayed", replayed, "conflicts", conflicts) } } // isConflictName reports whether the directory name is an outbox // conflict-renamed entry (`.conflict-` form). func isConflictName(name string) bool { for i := 0; i+len(outboxConflictPrefix) <= len(name); i++ { if name[i:i+len(outboxConflictPrefix)] == outboxConflictPrefix { return true } } return false } // newOutboxID returns a sortable-by-time ID with a random suffix: // -<6-hex>. 
// Lexical sort ≅ chronological for the foreseeable lifetime of the
// outbox (until ~year 2262, when UnixNano overflows int64).
func newOutboxID() (string, error) {
	var randBuf [3]byte
	if _, err := rand.Read(randBuf[:]); err != nil {
		return "", err
	}
	return fmt.Sprintf("%016x-%s", time.Now().UnixNano(), hex.EncodeToString(randBuf[:])), nil
}

// shortRandom returns 4 random hex chars. Used to disambiguate
// conflict-renamed directories whose second-resolution timestamps
// collide. Best-effort: if the random source fails, the zero-filled
// buffer yields "0000".
func shortRandom() string {
	var b [2]byte
	_, _ = rand.Read(b[:])
	return hex.EncodeToString(b[:])
}

// writeFileAtomic writes data to path via a temp file in the same
// directory followed by a rename, so readers see either the previous
// file or the complete new one, never a partial write. mode is applied
// to the final file. The temp file is fsynced before the rename, so if
// the rename survives a crash the contents do too. It is also removed
// on every failure path, including a failed rename, so no .tmp-* files
// are left behind.
func writeFileAtomic(path string, data []byte, mode os.FileMode) error {
	tmp, err := os.CreateTemp(filepath.Dir(path), ".tmp-*")
	if err != nil {
		return err
	}
	tmpName := tmp.Name()

	// fail discards the temp file and passes err through. Closing an
	// already-closed *os.File just returns an error, which is ignored.
	fail := func(err error) error {
		_ = tmp.Close()
		_ = os.Remove(tmpName)
		return err
	}

	if _, err := tmp.Write(data); err != nil {
		return fail(err)
	}
	if err := tmp.Chmod(mode); err != nil {
		return fail(err)
	}
	if err := tmp.Sync(); err != nil {
		return fail(err)
	}
	if err := tmp.Close(); err != nil {
		return fail(err)
	}
	if err := os.Rename(tmpName, path); err != nil {
		return fail(err)
	}
	return nil
}