From 8a049ca2a4ce5b7bf27aa97984d7942f093507cb Mon Sep 17 00:00:00 2001 From: ZDDC Date: Fri, 8 May 2026 08:20:07 -0500 Subject: [PATCH] =?UTF-8?q?feat(client):=20outbox=20=E2=80=94=20offline=20?= =?UTF-8?q?write=20queue=20+=20replay=20with=20If-Unmodified-Since?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PUT / POST / DELETE in client mode now work end-to-end. Online: the cache layer forwards to upstream and (on success) drops any cached entry for the path so the next read fetches fresh. PUT/DELETE include If-Unmodified-Since derived from the cached file's mtime so the master can reject conflicting writes with 412 Precondition Failed. When upstream is unreachable, the request is captured in the outbox at <cache-root>/.zddc-outbox/<id>/ — directory per queued write, mode 0700, containing meta.json (method, RawURI, Content-Type, base mtime, queued-at) and body.bin (request body, capped at 256 MiB). The client gets 202 Accepted + X-ZDDC-Cache: queued and a JSON envelope. A background replay loop started by runClient processes the queue: - 2xx → delete entry; drop cached path so next read fetches fresh - 412 → rename to <id>.conflict-<ts>/ for manual reconciliation (body + meta intact for inspection or re-submit) - 4xx other → drop (retry won't help; logged at WARN) - 5xx / transport error → leave for next pass Replay schedule: eager at startup, then 30s while pending falling back to 5min while idle. Loop honors graceful-shutdown context. Disabled in --mode=proxy (proxy persists nothing by design — offline writes return 503 instead of queueing). Outbox IDs are <unix-nano-hex>-<rand-hex> so lex-sort = queue order; concurrent enqueues never collide. Conflict-rename appends a 4-char random suffix on the unlikely same-second collision. The local cache is intentionally not updated for offline writes: until upstream confirms, the user's reads still see the upstream-cached version (or 503 if uncached). Trade-off: no "did my queued write actually win?" 
ambiguity, at the cost of not seeing one's own offline edits immediately. Phase 5 will surface <id>.conflict-<ts>/ directories in browse views. Tests (20 new in outbox_test.go, 5 new in cache_test.go covering the write path): NewOutbox creates 0700 dir, Enqueue persists meta + body, Pending returns lex-sorted entries excluding conflicts, Replay deletes on 2xx / renames on 412 / leaves on transport error / leaves on 5xx / drops on 4xx-other, If-Unmodified-Since (IUS) sent only for PUT/DELETE with base mtime, query string preserved, ServeHTTP online write forwards + evicts cache, ServeHTTP offline write queues with 202, ServeHTTP offline + no outbox returns 503, ServeHTTP PUT sends IUS from cached mtime, oversize body rejected, IDs lex-sortable, RunReplayLoop stops on context cancel, concurrent Enqueue 30× no collisions. Full suite + go vet clean. Doc updates: zddc/README.md gains a "Writes (online + offline outbox)" subsection covering both paths and replay outcomes; "What client mode is NOT, yet" now lists only conflict UI and multi-tenancy. AGENTS.md client-mode pipeline gains writes + mirror-mode bullets. ARCHITECTURE.md adds a "Writes: outbox + offline replay" subsection with the trade-off rationale and the phase-5-deferred conflict UI hand-off. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- AGENTS.md | 6 +- ARCHITECTURE.md | 18 ++ zddc/README.md | 20 +- zddc/cmd/zddc-server/main.go | 22 +- zddc/internal/cache/cache.go | 187 +++++++++++- zddc/internal/cache/cache_test.go | 10 +- zddc/internal/cache/outbox.go | 426 ++++++++++++++++++++++++++ zddc/internal/cache/outbox_test.go | 463 +++++++++++++++++++++++++++++ 8 files changed, 1136 insertions(+), 16 deletions(-) create mode 100644 zddc/internal/cache/outbox.go create mode 100644 zddc/internal/cache/outbox_test.go diff --git a/AGENTS.md b/AGENTS.md index 591c4e8..6fdeaa0 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -465,13 +465,15 @@ When `--upstream ` is set, the binary runs as a **downstream client** of an Three modes via `--mode ` (default `cache`). Cache directory layout is intentionally a normal ZDDC root: `/foo/bar.txt` → `/foo/bar.txt`. Unset `--upstream` and the same root serves as a plain master, useful for portable offline snapshots. -Pipeline (GET/HEAD only in phase 2): +Pipeline: - Cache hit → serve immediately + background `If-Modified-Since` revalidate (304 no-op, 200 overwrite, 403/404 purge). - Cache miss → forward to upstream; stream response simultaneously to client and a tmp-file atomically renamed into the cache. - Network error + cached version → serve stale + `X-ZDDC-Cache: offline`. - Network error + no cache → 503 + `X-ZDDC-Cache: offline`. -- Directories (`/.../`) always proxy live; no listing cache yet (phase 3 / mirror mode). +- Directory listings cached as `/.zddc-listing.` sidecars (Accept-varied). - `Cache-Control: no-store` / `private` responses pass through but are not persisted. +- **Writes** (PUT / POST / DELETE) forward to upstream when online; on transport error, queue in `/.zddc-outbox//` (meta + body) and return `202 Accepted` + `X-ZDDC-Cache: queued`. Background loop replays in order — 2xx deletes the entry, 412 → `.conflict-/`, 4xx-other drops, 5xx defers. 
PUT/DELETE include `If-Unmodified-Since` from the cached mtime so the master can reject conflicting writes. +- **Mirror mode** (`--mode mirror`): adds an access-triggered subtree walker (rate-limited via `--mirror-min-interval`, default 1h) that recursively pre-fetches under `--mirror-subtree`s; idle mirrors generate zero upstream traffic. Two-instance smoke test recipe: diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 370c92d..ac9f112 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -536,6 +536,24 @@ Properties: - **Revocation latency** is bounded by access frequency. Documented behavior, not a guarantee. - **Bounded concurrency** keeps walks from starving the user's interactive requests on the same connection pool. +#### Writes: outbox + offline replay + +`PUT` / `POST` / `DELETE` are handled by `cache.handleWrite`. Online: forwarded to upstream; on success the cached entry for the path (if any) is dropped so the next read fetches fresh. PUT/DELETE include `If-Unmodified-Since` from the cached file's mtime — the master returns `412 Precondition Failed` if its file changed since the cache observed it, so concurrent writes can't silently clobber. + +When upstream is unreachable, the request is captured in the **outbox** (`zddc/internal/cache/outbox.go`) under `<cache-root>/.zddc-outbox/<id>/` — `meta.json` (method, raw URI, content-type, base mtime, queued-at) + `body.bin` (request body, capped at `MaxOutboxBodyBytes` = 256 MiB). The client gets back `202 Accepted` + `X-ZDDC-Cache: queued` and a JSON envelope referencing the queued entry. + +A background `RunReplayLoop` started by `runClient` in main.go replays in queue order: +- `2xx` → entry deleted; cached entry for the path (if any) dropped so the next read fetches fresh. +- `412` → entry renamed to `<id>.conflict-<ts>/`. The conflict directory keeps both `meta.json` and `body.bin` intact for manual reconciliation. +- `4xx` other than `412` → entry dropped (won't succeed on retry; logged at `WARN`). 
+- `5xx` / transport error → left in place for the next pass. + +Replay schedule: an eager pass at startup, then 30s while pending, 5min while idle. Honors graceful-shutdown context cancellation. Disabled in `--mode=proxy` (proxy mode persists nothing by design — offline writes just return `503`). + +ID encoding (`<unix-nano-hex>-<rand-hex>`) is lex-sortable so directory iteration replays in queue order without an explicit index. `markConflict` appends `.conflict-<ts>` to the directory name; if a same-second conflict collides (unlikely), a 4-char random suffix is appended. + +The local cache is not updated for offline writes by design — until upstream confirms, the user's reads still see the upstream-cached version (or 503 if uncached). Trade-off: the user doesn't see their own offline edits immediately, but no "did the queued write actually win?" ambiguity. Phase 5 will add a conflict-resolution UI that surfaces `<id>.conflict-<ts>/` directories alongside the cached files in browse views. + #### Multi-tenancy: explicitly out of scope (v1) The local instance forwards a single bearer (loaded from `--bearer-file` at startup) regardless of who's calling locally. Single-user-trust on a laptop. For multi-user scenarios, run multiple instances on the same host, or front the local server with your own auth proxy that injects per-user bearers downstream — both options keep the cache layer's design surface minimal. diff --git a/zddc/README.md b/zddc/README.md index ab75acf..ef62cc3 100644 --- a/zddc/README.md +++ b/zddc/README.md @@ -290,9 +290,27 @@ Properties: - **Revocation latency** is bounded by access frequency: a revoked file in an idle mirror remains until the next walk fires. - **Bounded concurrency** (4 parallel fetches per walk) so the walker doesn't starve the user's interactive requests sharing the same connection pool. +### Writes (online + offline outbox) + +`PUT` / `POST` / `DELETE` work in client mode. 
Online: the cache layer forwards the request to upstream and (on a successful 2xx) drops any cached entry for that path so the next read fetches fresh content. The master's response (status, headers, body) is forwarded verbatim to the client. + +`PUT` and `DELETE` to a path that already exists in the local cache include an `If-Unmodified-Since: <cached-mtime>` precondition derived from the cached file's mtime. The master rejects with `412 Precondition Failed` if its file changed since the cache observed it — the client can refetch and merge. + +When upstream is unreachable, the request is captured in the **outbox** at `<cache-root>/.zddc-outbox/<id>/` (mode 0700) — a directory per queued write, with `meta.json` (method, URI, content-type, base mtime, queued-at) and `body.bin` (request body up to `MaxOutboxBodyBytes` = 256 MiB). The client gets back `202 Accepted` + `X-ZDDC-Cache: queued` and a JSON envelope describing the queued entry. + +A background loop replays the outbox in queue order: +- `2xx` → entry deleted; if the original was a PUT/DELETE, the local cache entry is dropped so the next read fetches fresh. +- `412 Precondition Failed` → entry renamed to `<id>.conflict-<ts>/` for manual reconciliation. The conflict directory keeps the body and meta intact so the operator can inspect or re-submit. +- `4xx` other than `412` (e.g. `403 Forbidden` after token rotation) → entry dropped; retrying won't help. Logged at `WARN`. +- `5xx` or transport error → left in place for the next replay pass. + +Replay schedule: an eager pass at startup, then every 30s while the outbox is non-empty, falling back to every 5min when idle. The loop honors graceful-shutdown context cancellation. + +The outbox is disabled in `--mode=proxy` (proxy mode persists nothing by design — offline writes return `503`). + ### What client mode is NOT, yet -- **No write path**: `PUT`/`POST`/`DELETE` return `405`. The offline write outbox lands in a later phase. +- **No conflict UI**: `<id>.conflict-<ts>/` directories accumulate after 412s. 
Phase 5 will surface them in the browse view with a "resolve" affordance. For now, operators inspect via the filesystem. - **No multi-tenancy**: the local instance forwards a single bearer to upstream regardless of who's calling locally. For multi-user deployments, run multiple instances or front the local server with your own auth proxy. ## Access control: the `.zddc` cascade diff --git a/zddc/cmd/zddc-server/main.go b/zddc/cmd/zddc-server/main.go index 014390c..2785030 100644 --- a/zddc/cmd/zddc-server/main.go +++ b/zddc/cmd/zddc-server/main.go @@ -306,15 +306,31 @@ func runClient(cfg config.Config) { } } + // Outbox: persist + replay offline writes. Only enabled in cache + // or mirror modes (proxy mode doesn't persist anything by design). + // A failure here is non-fatal: writes still flow live, but + // transport errors return 503 instead of being queued. + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT) + defer cancel() + + if cfg.Mode != "proxy" { + outbox, err := cache.NewOutbox(cacheLayer) + if err != nil { + slog.Warn("outbox init failed; offline writes will return 503", "err", err) + } else { + cacheLayer.SetOutbox(outbox) + pending, _ := outbox.Pending() + slog.Info("outbox ready", "dir", outbox.Dir(), "pending_at_startup", len(pending)) + go outbox.RunReplayLoop(ctx) + } + } + tlsCfg, useTLS, err := tlsutil.TLSConfig(cfg) if err != nil { slog.Error("failed to configure TLS", "err", err) os.Exit(1) } - ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT) - defer cancel() - auditLogger := setupAccessAuditLog(cfg.AccessLog) var inner http.Handler = cacheLayer diff --git a/zddc/internal/cache/cache.go b/zddc/internal/cache/cache.go index 799a1c4..d722acf 100644 --- a/zddc/internal/cache/cache.go +++ b/zddc/internal/cache/cache.go @@ -24,6 +24,7 @@ package cache import ( "crypto/tls" + "encoding/json" "fmt" "io" "log/slog" @@ -72,8 +73,18 @@ type Cache struct { // assume it 
runs before the response completes. Nil in proxy/cache // modes; set in mirror mode. onAccess func(urlPath string) + + // outbox holds the offline write queue. Set by main.go after + // construction (avoids a circular dep at New time, since Outbox + // needs a *Cache reference). Nil = writes when offline get 503. + outbox *Outbox } +// SetOutbox installs the offline-write queue. Called once by main.go +// after both Cache and Outbox are built. nil disables outbox-backed +// offline writes. +func (c *Cache) SetOutbox(o *Outbox) { c.outbox = o } + // New constructs a Cache from the loaded configuration. Validates // upstream URL, reads the bearer-file (if configured), prepares the // HTTP client honoring SkipTLSVerify, and ensures the cache root @@ -149,12 +160,19 @@ func (c *Cache) Upstream() string { return c.upstream } // ServeHTTP is the cache layer's HTTP entry point. Replaces the // master-side dispatcher in client mode. func (c *Cache) ServeHTTP(w http.ResponseWriter, r *http.Request) { - // Phase 2: read-only. Writes are deferred to the outbox phase. - // Forward HEAD as GET-without-body to keep the response shape - // consistent with what http.ServeContent would do. - if r.Method != http.MethodGet && r.Method != http.MethodHead { - w.Header().Set("Allow", "GET, HEAD") - http.Error(w, "Method Not Allowed: writes are not yet supported in client mode", http.StatusMethodNotAllowed) + // Writes (PUT / POST / DELETE) flow through handleWrite — try + // upstream live; on transport error, queue in the outbox for + // replay. Phase 4 supports PUT/POST/DELETE; OPTIONS/PATCH still + // 405 since the master doesn't accept them anyway. 
+ switch r.Method { + case http.MethodPut, http.MethodPost, http.MethodDelete: + c.handleWrite(w, r) + return + case http.MethodGet, http.MethodHead: + // fall through to read pipeline + default: + w.Header().Set("Allow", "GET, HEAD, PUT, POST, DELETE") + http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) return } @@ -463,6 +481,16 @@ func (c *Cache) persistOnly(resp *http.Response, urlPath string) error { return os.Rename(tmpName, finalPath) } +// cachePathForURI is cachePathFor with the query string stripped — +// the cache is keyed by path only. Used by the outbox to map a +// queued write back to its cached file. +func (c *Cache) cachePathForURI(rawURI string) (string, bool) { + if i := strings.Index(rawURI, "?"); i >= 0 { + rawURI = rawURI[:i] + } + return c.cachePathFor(rawURI) +} + // cachePathFor maps a URL path to a local filesystem path under the // cache root. Returns ok=false on inputs that would escape the root, // reserve a marker filename, or otherwise be unsafe to write. @@ -500,6 +528,153 @@ func (c *Cache) maybeWriteMarker() { }) } +// handleWrite proxies a write request (PUT / POST / DELETE) to +// upstream. On a transport error, the request is captured in the +// outbox (if configured) and the client gets 202 Accepted with a +// JSON envelope describing the queued entry. Online HTTP errors +// (4xx/5xx from the master) are forwarded verbatim. +// +// PUT/DELETE include an If-Unmodified-Since precondition derived +// from the local cache file's mtime so the master can reject the +// write if its file changed since we observed it. POST never sends +// a precondition (POST semantics are application-defined). +func (c *Cache) handleWrite(w http.ResponseWriter, r *http.Request) { + // Capture base mtime for PUT/DELETE so replay can use the same + // precondition we'd send live. 
+ var baseModTime time.Time + if r.Method == http.MethodPut || r.Method == http.MethodDelete { + if path, ok := c.cachePathFor(r.URL.Path); ok { + if info, err := os.Stat(path); err == nil && !info.IsDir() { + baseModTime = info.ModTime() + } + } + } + + // We may need to send the body to upstream AND then queue it on + // network failure. Buffer up to MaxOutboxBodyBytes so we can + // rewind. Larger requests are rejected with 413 below, before + // anything is sent upstream (there is no streaming pass-through). + bodyBytes, bodyErr := readBoundedBody(r) + if bodyErr != nil { + http.Error(w, "Bad Request: "+bodyErr.Error(), http.StatusRequestEntityTooLarge) + return + } + + upReq, err := c.buildUpstreamWriteRequest(r, bodyBytes, baseModTime) + if err != nil { + http.Error(w, "Bad Request: "+err.Error(), http.StatusBadRequest) + return + } + resp, err := c.client.Do(upReq) + if err != nil { + // Transport error → queue if outbox is available. + if c.outbox != nil { + r.Body = io.NopCloser(bytesReader(bodyBytes)) + entry, qErr := c.outbox.Enqueue(r, baseModTime) + if qErr != nil { + slog.Warn("outbox enqueue failed", "err", qErr) + w.Header().Set(HeaderName, "offline") + http.Error(w, "Service Unavailable: upstream unreachable and outbox unavailable", http.StatusServiceUnavailable) + return + } + w.Header().Set("Content-Type", "application/json; charset=utf-8") + w.Header().Set(HeaderName, "queued") + w.WriteHeader(http.StatusAccepted) + _ = json.NewEncoder(w).Encode(map[string]any{ + "queued": true, + "outbox_id": entry.ID, + "method": entry.Method, + "uri": entry.RawURI, + "queued_at": entry.QueuedAt, + "base_mtime": entry.BaseModTime, + }) + return + } + w.Header().Set(HeaderName, "offline") + http.Error(w, "Service Unavailable: upstream unreachable", http.StatusServiceUnavailable) + return + } + defer resp.Body.Close() + + // Forward upstream's response verbatim. 
+ for k, vv := range resp.Header { + if isHopByHop(k) { + continue + } + for _, v := range vv { + w.Header().Add(k, v) + } + } + w.Header().Set(HeaderName, "proxy") + w.WriteHeader(resp.StatusCode) + _, _ = io.Copy(w, resp.Body) + + // On a successful write, drop the cached entry so the next read + // fetches fresh upstream content (which now includes the user's + // change). For PUT we could be smarter (write the new body + // directly to cache) but eviction is simplest. + if c.persist && resp.StatusCode >= 200 && resp.StatusCode < 300 { + if r.Method == http.MethodPut || r.Method == http.MethodDelete { + if path, ok := c.cachePathFor(r.URL.Path); ok { + _ = os.Remove(path) + } + } + } +} + +// buildUpstreamWriteRequest constructs the outbound write request +// with the buffered body, content-type passthrough, bearer, and +// (for PUT/DELETE with a base mtime) an If-Unmodified-Since +// precondition. +func (c *Cache) buildUpstreamWriteRequest(r *http.Request, body []byte, baseModTime time.Time) (*http.Request, error) { + target := c.upstream + r.URL.RequestURI() + upReq, err := http.NewRequestWithContext(r.Context(), r.Method, target, bytesReader(body)) + if err != nil { + return nil, err + } + if ct := r.Header.Get("Content-Type"); ct != "" { + upReq.Header.Set("Content-Type", ct) + } + if !baseModTime.IsZero() && (r.Method == http.MethodPut || r.Method == http.MethodDelete) { + upReq.Header.Set("If-Unmodified-Since", baseModTime.UTC().Format(http.TimeFormat)) + } + if c.bearer != "" { + upReq.Header.Set("Authorization", "Bearer "+c.bearer) + } + upReq.ContentLength = int64(len(body)) + return upReq, nil +} + +// readBoundedBody slurps the request body up to MaxOutboxBodyBytes. +// Errors out (413) if the body exceeds the cap so we never partial- +// queue. Returns nil for empty bodies. 
+func readBoundedBody(r *http.Request) ([]byte, error) { + if r.Body == nil { + return nil, nil + } + defer r.Body.Close() + limited := http.MaxBytesReader(nil, r.Body, MaxOutboxBodyBytes) + return io.ReadAll(limited) +} + +// bytesReader is a tiny helper that returns a *bytes.Reader without +// importing bytes everywhere — keeps build/grep noise down. +func bytesReader(b []byte) *bytesReaderType { return &bytesReaderType{r: b} } + +type bytesReaderType struct { + r []byte + i int +} + +func (b *bytesReaderType) Read(p []byte) (int, error) { + if b.i >= len(b.r) { + return 0, io.EOF + } + n := copy(p, b.r[b.i:]) + b.i += n + return n, nil +} + // listingFormat collapses an Accept header to "json" or "html". Mirrors // the master's content-negotiation: anything Accept-ing application/ // json wins JSON, otherwise HTML. diff --git a/zddc/internal/cache/cache_test.go b/zddc/internal/cache/cache_test.go index 0617869..2ec9be0 100644 --- a/zddc/internal/cache/cache_test.go +++ b/zddc/internal/cache/cache_test.go @@ -85,18 +85,20 @@ func TestNew_BearerFileEmptyRejected(t *testing.T) { } } -func TestServeHTTP_RejectsWriteMethods(t *testing.T) { +func TestServeHTTP_RejectsUnsupportedMethods(t *testing.T) { + // PUT/POST/DELETE are now supported (phase 4 outbox). Only + // methods we don't handle at all (PATCH, OPTIONS, etc.) get 405. 
c, _ := newTestCache(t, "cache", func(w http.ResponseWriter, r *http.Request) { - t.Errorf("upstream should not be called for write methods") + t.Errorf("upstream should not be called for unsupported methods") }) - for _, method := range []string{http.MethodPut, http.MethodPost, http.MethodDelete} { + for _, method := range []string{http.MethodPatch, http.MethodOptions, "TRACE"} { rec := httptest.NewRecorder() r := httptest.NewRequest(method, "/foo", nil) c.ServeHTTP(rec, r) if rec.Code != http.StatusMethodNotAllowed { t.Errorf("%s = %d, want 405", method, rec.Code) } - if got := rec.Header().Get("Allow"); got != "GET, HEAD" { + if got := rec.Header().Get("Allow"); !strings.Contains(got, "GET") || !strings.Contains(got, "PUT") { t.Errorf("%s Allow = %q", method, got) } } diff --git a/zddc/internal/cache/outbox.go b/zddc/internal/cache/outbox.go new file mode 100644 index 0000000..9b85052 --- /dev/null +++ b/zddc/internal/cache/outbox.go @@ -0,0 +1,426 @@ +package cache + +import ( + "context" + "crypto/rand" + "encoding/hex" + "encoding/json" + "errors" + "fmt" + "io" + "log/slog" + "net/http" + "os" + "path/filepath" + "sort" + "sync" + "time" +) + +// OutboxDir is the leaf directory under the cache root where queued +// offline writes are persisted. Each entry is itself a directory: +// +// <cache-root>/.zddc-outbox/<id>/meta.json — request metadata +// <cache-root>/.zddc-outbox/<id>/body.bin — request body +// +// Conflicted replays (412 Precondition Failed) are renamed to +// +// <cache-root>/.zddc-outbox/<id>.conflict-<ts>/ +// +// Operators clear the conflict directory after manually reconciling. +const OutboxDir = ".zddc-outbox" + +// outboxConflictPrefix is the marker markConflict inserts between the +// entry ID and a compact UTC timestamp (layout 20060102T150405Z, which +// needs no escaping in a filename). The full directory name is +// `<id>.conflict-<ts>` and is grep-friendly across the whole tree. +const outboxConflictPrefix = ".conflict-" + +// MaxOutboxBodyBytes caps the per-entry body size persisted to disk. 
+// Live writes are buffered under the same cap in handleWrite, so larger +// bodies get 413 even online. Default 256 MiB matches MaxWriteBytes on master. +const MaxOutboxBodyBytes = 256 * 1024 * 1024 + +// ReplayInterval governs how often the background loop attempts +// replay when the outbox is non-empty. Empty-outbox iterations sleep +// 10× longer so an idle client doesn't spin. +const ( + ReplayInterval = 30 * time.Second + ReplayIdleInterval = 5 * time.Minute +) + +// OutboxEntry is the persisted shape of a queued write. The body +// lives in a sibling file (body.bin) so it can be streamed back to +// upstream at replay time without buffering the whole thing in +// memory. +type OutboxEntry struct { + // ID is the directory name. Sortable by lexical order so replays + // happen in queue order: <unix-nano-hex>-<rand-hex>. + ID string `json:"id"` + + // Method, RawURI, ContentType identify the request. + Method string `json:"method"` + RawURI string `json:"raw_uri"` + ContentType string `json:"content_type,omitempty"` + + // BaseModTime is the local cache file's mtime at queue time. Used + // at replay to send If-Unmodified-Since so the master rejects the + // write (412) if its file changed since we observed it. Zero = + // no precondition (e.g. PUT to a path with no prior cache). + BaseModTime time.Time `json:"base_mod_time,omitempty"` + + QueuedAt time.Time `json:"queued_at"` +} + +// Outbox owns the on-disk write-queue + the background replay loop. +// Constructed with NewOutbox (runClient does so outside proxy mode) and +// installed via Cache.SetOutbox. Safe for concurrent Enqueue calls. +type Outbox struct { + cache *Cache + dir string + mu sync.Mutex // serializes Enqueue; the replay path does not take it +} + +// NewOutbox opens (and creates if missing) the outbox directory under +// the cache root (c.root). Mode 0700 — single-user-trust at the FS layer. 
+func NewOutbox(c *Cache) (*Outbox, error) { + dir := filepath.Join(c.root, OutboxDir) + if err := os.MkdirAll(dir, 0o700); err != nil { + return nil, fmt.Errorf("create outbox dir: %w", err) + } + _ = os.Chmod(dir, 0o700) + return &Outbox{cache: c, dir: dir}, nil +} + +// Dir exposes the on-disk path for tests / diagnostics. +func (o *Outbox) Dir() string { return o.dir } + +// Enqueue persists the request to disk so it can be replayed when +// upstream is reachable again. Returns the created entry. The body +// reader is fully consumed; callers should not assume r.Body is +// readable afterwards. +// +// baseModTime is the local cache file's mtime at the moment we +// decided to queue (zero when none — e.g. PUT to a never-cached +// path). It becomes the If-Unmodified-Since precondition at replay. +func (o *Outbox) Enqueue(r *http.Request, baseModTime time.Time) (*OutboxEntry, error) { + o.mu.Lock() + defer o.mu.Unlock() + + id, err := newOutboxID() + if err != nil { + return nil, fmt.Errorf("outbox id: %w", err) + } + entryDir := filepath.Join(o.dir, id) + if err := os.MkdirAll(entryDir, 0o700); err != nil { + return nil, fmt.Errorf("create entry dir: %w", err) + } + + // Body first — failure cleans up the empty entry dir. 
+ bodyPath := filepath.Join(entryDir, "body.bin") + bf, err := os.OpenFile(bodyPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o600) + if err != nil { + _ = os.RemoveAll(entryDir) + return nil, fmt.Errorf("create body file: %w", err) + } + limited := http.MaxBytesReader(nil, r.Body, MaxOutboxBodyBytes) + if _, err := io.Copy(bf, limited); err != nil { + _ = bf.Close() + _ = os.RemoveAll(entryDir) + return nil, fmt.Errorf("save body: %w", err) + } + if err := bf.Close(); err != nil { + _ = os.RemoveAll(entryDir) + return nil, err + } + + entry := OutboxEntry{ + ID: id, + Method: r.Method, + RawURI: r.URL.RequestURI(), + ContentType: r.Header.Get("Content-Type"), + BaseModTime: baseModTime, + QueuedAt: time.Now().UTC().Truncate(time.Second), + } + metaBytes, err := json.MarshalIndent(entry, "", " ") + if err != nil { + _ = os.RemoveAll(entryDir) + return nil, err + } + metaPath := filepath.Join(entryDir, "meta.json") + if err := writeFileAtomic(metaPath, metaBytes, 0o600); err != nil { + _ = os.RemoveAll(entryDir) + return nil, err + } + return &entry, nil +} + +// Pending returns the queued entries in lexical-ID order (which is +// queue order, since IDs lead with unix-nano). Entries that fail to +// parse are skipped. Conflict-renamed entries are excluded. +func (o *Outbox) Pending() ([]OutboxEntry, error) { + dirEntries, err := os.ReadDir(o.dir) + if err != nil { + if os.IsNotExist(err) { + return nil, nil + } + return nil, err + } + var ids []string + for _, e := range dirEntries { + if !e.IsDir() { + continue + } + name := e.Name() + if isConflictName(name) { + continue + } + ids = append(ids, name) + } + sort.Strings(ids) + out := make([]OutboxEntry, 0, len(ids)) + for _, id := range ids { + entry, err := o.loadEntry(id) + if err != nil { + slog.Debug("outbox: skip unreadable entry", "id", id, "err", err) + continue + } + out = append(out, *entry) + } + return out, nil +} + +// loadEntry reads + parses meta.json for the given ID. 
+func (o *Outbox) loadEntry(id string) (*OutboxEntry, error) { + metaPath := filepath.Join(o.dir, id, "meta.json") + bytes, err := os.ReadFile(metaPath) + if err != nil { + return nil, err + } + var entry OutboxEntry + if err := json.Unmarshal(bytes, &entry); err != nil { + return nil, err + } + return &entry, nil +} + +// Replay iterates Pending() in order and fires each entry against +// upstream. Returns the count of successfully-replayed entries and +// the count of conflict-renamed entries. Network errors leave the +// entry in place for the next iteration. +func (o *Outbox) Replay(ctx context.Context) (replayed, conflicts int, err error) { + pending, err := o.Pending() + if err != nil { + return 0, 0, err + } + for _, entry := range pending { + if ctx.Err() != nil { + return replayed, conflicts, ctx.Err() + } + outcome, err := o.replayOne(ctx, entry) + switch outcome { + case replayDone: + replayed++ + case replayConflict: + conflicts++ + case replayDefer: + // transient — leave entry in place + if err != nil { + slog.Debug("outbox: defer entry", "id", entry.ID, "err", err) + } + } + } + return replayed, conflicts, nil +} + +type replayOutcome int + +const ( + replayDone replayOutcome = iota + replayConflict + replayDefer +) + +// replayOne fires a single outbox entry. Outcomes: +// +// - 2xx → entry directory deleted (replayDone). +// - 412 Precondition Failed → entry renamed to .conflict-/ +// (replayConflict). Operator manually reconciles. +// - 4xx other than 412 → entry deleted (replayDone with a warn log). +// The master rejected the request for a reason other than +// concurrency; retrying won't help. +// - 5xx, network errors → entry left in place (replayDefer). +func (o *Outbox) replayOne(ctx context.Context, entry OutboxEntry) (replayOutcome, error) { + bodyPath := filepath.Join(o.dir, entry.ID, "body.bin") + bf, err := os.Open(bodyPath) + if err != nil { + // Body missing — entry is malformed. Move to conflict so we + // don't loop on it forever. 
+ _ = o.markConflict(entry.ID) + return replayConflict, fmt.Errorf("open body: %w", err) + } + defer bf.Close() + + target := o.cache.upstream + entry.RawURI + req, err := http.NewRequestWithContext(ctx, entry.Method, target, bf) + if err != nil { + _ = o.markConflict(entry.ID) + return replayConflict, err + } + if entry.ContentType != "" { + req.Header.Set("Content-Type", entry.ContentType) + } + if !entry.BaseModTime.IsZero() { + req.Header.Set("If-Unmodified-Since", entry.BaseModTime.UTC().Format(http.TimeFormat)) + } + if o.cache.bearer != "" { + req.Header.Set("Authorization", "Bearer "+o.cache.bearer) + } + + resp, err := o.cache.client.Do(req) + if err != nil { + // Transport / network error — defer. + return replayDefer, err + } + defer resp.Body.Close() + _, _ = io.Copy(io.Discard, resp.Body) // drain so connection reuses + + switch { + case resp.StatusCode >= 200 && resp.StatusCode < 300: + // Success. For PUT, refresh the local cache from upstream so + // the user's view stays consistent. (We don't have the new + // body here; just remove the cached entry so next read + // fetches fresh.) + if entry.Method == http.MethodPut || entry.Method == http.MethodDelete { + if path, ok := o.cache.cachePathForURI(entry.RawURI); ok { + _ = os.Remove(path) + } + } + _ = os.RemoveAll(filepath.Join(o.dir, entry.ID)) + slog.Info("outbox: replayed", "id", entry.ID, "method", entry.Method, "uri", entry.RawURI, "status", resp.StatusCode) + return replayDone, nil + + case resp.StatusCode == http.StatusPreconditionFailed: + // Conflict — base version on master changed since we observed. + _ = o.markConflict(entry.ID) + slog.Warn("outbox: conflict", "id", entry.ID, "method", entry.Method, "uri", entry.RawURI) + return replayConflict, nil + + case resp.StatusCode >= 400 && resp.StatusCode < 500: + // Permanent rejection (auth, bad request, etc). Drop the + // entry — retrying won't help. 
+ _ = os.RemoveAll(filepath.Join(o.dir, entry.ID)) + slog.Warn("outbox: dropped", "id", entry.ID, "method", entry.Method, "uri", entry.RawURI, "status", resp.StatusCode) + return replayDone, nil + + default: + // 5xx — defer for next iteration. + return replayDefer, fmt.Errorf("upstream status %d", resp.StatusCode) + } +} + +// markConflict renames the entry directory to .conflict-/. +// Idempotent: if the rename target already exists, a counter is +// appended. Best-effort. +func (o *Outbox) markConflict(id string) error { + src := filepath.Join(o.dir, id) + ts := time.Now().UTC().Format("20060102T150405Z") + dst := filepath.Join(o.dir, id+outboxConflictPrefix+ts) + if _, err := os.Stat(dst); err == nil { + dst = dst + "-" + shortRandom() + } + return os.Rename(src, dst) +} + +// RunReplayLoop is the background goroutine that drives Replay on a +// schedule. Stops when ctx is cancelled. Safe to call once per +// process; not designed for multiple concurrent loops over the same +// outbox. +func (o *Outbox) RunReplayLoop(ctx context.Context) { + // Eagerly attempt replay at startup so a re-launched client + // catches up before any user request fires. + o.tryReplay(ctx) + for { + interval := ReplayIdleInterval + pending, err := o.Pending() + if err == nil && len(pending) > 0 { + interval = ReplayInterval + } + select { + case <-ctx.Done(): + return + case <-time.After(interval): + o.tryReplay(ctx) + } + } +} + +// tryReplay is a single replay pass with a bounded timeout. Errors +// logged at debug; per-entry outcomes logged inside Replay/replayOne. 
+func (o *Outbox) tryReplay(ctx context.Context) {
+	passCtx, stop := context.WithTimeout(ctx, 2*time.Minute)
+	defer stop()
+
+	done, clashed, err := o.Replay(passCtx)
+	switch {
+	case err == nil:
+		// nothing to report
+	case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
+		// The pass was cut short by shutdown or by the pass timeout;
+		// not worth a log line.
+	default:
+		slog.Debug("outbox: replay error", "err", err)
+	}
+
+	if done == 0 && clashed == 0 {
+		return
+	}
+	slog.Info("outbox: replay pass complete", "replayed", done, "conflicts", clashed)
+}
+
+// isConflictName reports whether name contains outboxConflictPrefix
+// anywhere, i.e. whether the directory is a conflict-renamed outbox
+// entry rather than a pending one.
+func isConflictName(name string) bool {
+	marker := outboxConflictPrefix
+	for start := 0; start <= len(name)-len(marker); start++ {
+		if name[start:start+len(marker)] == marker {
+			return true
+		}
+	}
+	return false
+}
+
+// newOutboxID builds an ID of the form <unix-nano, 16 hex digits>-<6 hex>.
+// The zero-padded timestamp comes first, so lexical order follows
+// creation time (until the int64 nanosecond clock runs out around the
+// year 2262). The random tail separates IDs minted at the same
+// instant.
+func newOutboxID() (string, error) {
+	tail := make([]byte, 3)
+	if _, err := rand.Read(tail); err != nil {
+		return "", err
+	}
+	stamp := time.Now().UnixNano()
+	return fmt.Sprintf("%016x-%s", stamp, hex.EncodeToString(tail)), nil
+}
+
+// shortRandom returns 4 hex characters (2 random bytes). It is used
+// to disambiguate conflict-renamed directories whose timestamped
+// names collide.
+func shortRandom() string {
+	raw := make([]byte, 2)
+	_, _ = rand.Read(raw) // best-effort: a failed read still yields a usable suffix
+	return hex.EncodeToString(raw)
+}
+
+// writeFileAtomic writes data to a temporary file in path's directory
+// and then renames it over path. The requested mode is applied to the
+// file before the rename.
+func writeFileAtomic(path string, data []byte, mode os.FileMode) error {
+	tmp, err := os.CreateTemp(filepath.Dir(path), ".tmp-*")
+	if err != nil {
+		return err
+	}
+	tmpName := tmp.Name()
+	// abort closes the temp file (harmless if already closed) and
+	// removes it, so no failure path leaves a stray .tmp-* file in
+	// the target directory.
+	abort := func(cause error) error {
+		_ = tmp.Close()
+		_ = os.Remove(tmpName)
+		return cause
+	}
+	if _, err := tmp.Write(data); err != nil {
+		return abort(err)
+	}
+	if err := tmp.Chmod(mode); err != nil {
+		return abort(err)
+	}
+	if err := tmp.Close(); err != nil {
+		return abort(err)
+	}
+	// A failed rename used to leave the temp file behind; clean it up.
+	if err := os.Rename(tmpName, path); err != nil {
+		return abort(err)
+	}
+	return nil
+}
diff --git a/zddc/internal/cache/outbox_test.go b/zddc/internal/cache/outbox_test.go
new file mode 100644
index 0000000..00441b3
--- /dev/null
+++ b/zddc/internal/cache/outbox_test.go
@@ -0,0 +1,463 @@
+package cache
+
+import (
+	"bytes"
+	"context"
+	"io"
+	"net/http"
+	"net/http/httptest"
+	"os"
+	"path/filepath"
+	"strings"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"codeberg.org/VARASYS/ZDDC/zddc/internal/config"
+)
+
+func newOutboxFixture(t *testing.T, upstreamHandler http.HandlerFunc) (*Cache, *Outbox, *httptest.Server) {
+	t.Helper()
+	upstream := httptest.NewServer(upstreamHandler)
+	t.Cleanup(upstream.Close)
+	root := t.TempDir()
+	c, err := New(config.Config{Root: root, Upstream: upstream.URL, Mode: "cache"})
+	if err != nil {
+		t.Fatalf("New: %v", err)
+	}
+	o, err := NewOutbox(c)
+	if err != nil {
+		t.Fatalf("NewOutbox: %v", err)
+	}
+	c.SetOutbox(o)
+	return c, o, upstream
+}
+
+func TestNewOutbox_CreatesDirectoryWith0700(t *testing.T) {
+	root := t.TempDir()
+	c, _ := New(config.Config{Root: root, Upstream: "http://example.com", Mode: "cache"})
+	o, err := NewOutbox(c)
+	if err != nil {
+		t.Fatalf("NewOutbox: %v", err)
+	}
+	want := filepath.Join(root, OutboxDir)
+	if o.Dir() != want {
+		t.Errorf("Dir() = %q, want %q", o.Dir(), want)
+	}
+	info, err := os.Stat(want)
+	if err != nil {
+		t.Fatalf("stat: %v", err)
+	}
+	if mode := info.Mode().Perm(); mode&0o077 != 0 {
+		t.Errorf("dir mode %o exposes group/other bits", mode)
+	}
+}
+
+func TestEnqueue_PersistsBodyAndMeta(t *testing.T) {
+	_, o, _ := newOutboxFixture(t, func(w http.ResponseWriter, r *http.Request) {})
+	body := []byte("hello world")
+	r := httptest.NewRequest(http.MethodPut, "/foo.txt", bytes.NewReader(body))
+	r.Header.Set("Content-Type", "text/plain")
+	base := time.Date(2026, 5, 8, 12, 0, 0, 0, time.UTC)
+
+	entry, err := o.Enqueue(r, base)
+	if err != nil {
+		t.Fatalf("Enqueue: %v", err)
+	}
+	if entry.ID == "" {
+		t.Fatal("entry.ID empty")
+	}
+	if entry.Method != "PUT" || entry.RawURI != "/foo.txt" {
+		t.Errorf("entry method/uri wrong: %+v", entry)
+	}
+	if entry.ContentType != "text/plain" {
+		t.Errorf("ContentType = %q", entry.ContentType)
+	}
+	if !entry.BaseModTime.Equal(base) {
+		t.Errorf("BaseModTime = %v, want %v", entry.BaseModTime, base)
+	}
+
+	// Body file should contain the request bytes.
+	got, err := os.ReadFile(filepath.Join(o.Dir(), entry.ID, "body.bin"))
+	if err != nil {
+		t.Fatalf("read body.bin: %v", err)
+	}
+	if !bytes.Equal(got, body) {
+		t.Errorf("body = %q, want %q", got, body)
+	}
+}
+
+func TestPending_OrdersByID(t *testing.T) {
+	_, o, _ := newOutboxFixture(t, func(w http.ResponseWriter, r *http.Request) {})
+	for i := 0; i < 3; i++ {
+		r := httptest.NewRequest(http.MethodPut, "/x.txt", strings.NewReader("data"))
+		_, err := o.Enqueue(r, time.Time{})
+		if err != nil {
+			t.Fatalf("enqueue %d: %v", i, err)
+		}
+		time.Sleep(2 * time.Millisecond) // ensure unique unix-nano IDs
+	}
+	got, err := o.Pending()
+	if err != nil {
+		t.Fatalf("Pending: %v", err)
+	}
+	if len(got) != 3 {
+		t.Fatalf("len = %d", len(got))
+	}
+	for i := 1; i < len(got); i++ {
+		if got[i].ID < got[i-1].ID {
+			t.Errorf("entries not lex-sorted at index %d: %q < %q", i, got[i].ID, got[i-1].ID)
+		}
+	}
+}
+
+func TestReplay_DeletesOnSuccess(t *testing.T) {
+	var lastBody []byte
+	_, o, _ := newOutboxFixture(t, func(w http.ResponseWriter, r *http.Request) {
+		lastBody, _ = io.ReadAll(r.Body)
+		w.WriteHeader(http.StatusNoContent)
+	})
+	r := httptest.NewRequest(http.MethodPut, "/foo.txt", bytes.NewReader([]byte("payload")))
+	r.Header.Set("Content-Type", "text/plain")
+	entry, _ := o.Enqueue(r, time.Time{})
+
+	replayed, conflicts, err := o.Replay(context.Background())
+	if err != nil {
+		t.Fatalf("Replay: %v", err)
+	}
+	if replayed != 1 || conflicts != 0 {
+		t.Errorf("replayed=%d conflicts=%d, want 1/0", replayed, conflicts)
+	}
+	if string(lastBody) != "payload" {
+		t.Errorf("upstream got body %q", lastBody)
+	}
+	// Entry directory should be gone.
+	if _, err := os.Stat(filepath.Join(o.Dir(), entry.ID)); !os.IsNotExist(err) {
+		t.Errorf("entry not removed: %v", err)
+	}
+}
+
+func TestReplay_412RenamesToConflict(t *testing.T) {
+	_, o, _ := newOutboxFixture(t, func(w http.ResponseWriter, r *http.Request) {
+		w.WriteHeader(http.StatusPreconditionFailed)
+	})
+	r := httptest.NewRequest(http.MethodPut, "/foo.txt", bytes.NewReader([]byte("x")))
+	entry, _ := o.Enqueue(r, time.Date(2026, 5, 1, 0, 0, 0, 0, time.UTC))
+
+	replayed, conflicts, _ := o.Replay(context.Background())
+	if replayed != 0 || conflicts != 1 {
+		t.Errorf("replayed=%d conflicts=%d, want 0/1", replayed, conflicts)
+	}
+	// Original entry dir gone, conflict-renamed dir present.
+	if _, err := os.Stat(filepath.Join(o.Dir(), entry.ID)); !os.IsNotExist(err) {
+		t.Error("original entry not renamed")
+	}
+	entries, _ := os.ReadDir(o.Dir())
+	foundConflict := false
+	for _, e := range entries {
+		if strings.HasPrefix(e.Name(), entry.ID+outboxConflictPrefix) {
+			foundConflict = true
+		}
+	}
+	if !foundConflict {
+		t.Errorf("no conflict-renamed dir found among %v", dirNames(entries))
+	}
+	// Pending should now exclude the conflict.
+	pending, _ := o.Pending()
+	if len(pending) != 0 {
+		t.Errorf("Pending() includes conflicts: %d entries", len(pending))
+	}
+}
+
+func TestReplay_NetworkErrorLeavesEntry(t *testing.T) {
+	root := t.TempDir()
+	c, _ := New(config.Config{Root: root, Upstream: "http://127.0.0.1:1", Mode: "cache"})
+	c.client.Timeout = 200 * time.Millisecond
+	o, _ := NewOutbox(c)
+	r := httptest.NewRequest(http.MethodPut, "/foo.txt", bytes.NewReader([]byte("retry-me")))
+	entry, _ := o.Enqueue(r, time.Time{})
+
+	replayed, conflicts, _ := o.Replay(context.Background())
+	if replayed != 0 || conflicts != 0 {
+		t.Errorf("replayed=%d conflicts=%d, want 0/0 (deferred)", replayed, conflicts)
+	}
+	if _, err := os.Stat(filepath.Join(o.Dir(), entry.ID)); err != nil {
+		t.Errorf("entry was removed despite network error: %v", err)
+	}
+}
+
+func TestReplay_4xxNon412Drops(t *testing.T) {
+	_, o, _ := newOutboxFixture(t, func(w http.ResponseWriter, r *http.Request) {
+		http.Error(w, "Forbidden", http.StatusForbidden)
+	})
+	r := httptest.NewRequest(http.MethodPut, "/foo.txt", bytes.NewReader([]byte("x")))
+	entry, _ := o.Enqueue(r, time.Time{})
+
+	replayed, conflicts, _ := o.Replay(context.Background())
+	// 4xx-other-than-412 counts as "done" (we drop it; retrying won't help).
+	if replayed != 1 || conflicts != 0 {
+		t.Errorf("replayed=%d conflicts=%d", replayed, conflicts)
+	}
+	if _, err := os.Stat(filepath.Join(o.Dir(), entry.ID)); !os.IsNotExist(err) {
+		t.Errorf("entry not dropped after 403: %v", err)
+	}
+}
+
+func TestReplay_5xxLeavesEntry(t *testing.T) {
+	_, o, _ := newOutboxFixture(t, func(w http.ResponseWriter, r *http.Request) {
+		http.Error(w, "Internal", http.StatusInternalServerError)
+	})
+	r := httptest.NewRequest(http.MethodPut, "/foo.txt", bytes.NewReader([]byte("x")))
+	entry, _ := o.Enqueue(r, time.Time{})
+
+	replayed, conflicts, _ := o.Replay(context.Background())
+	if replayed != 0 || conflicts != 0 {
+		t.Errorf("replayed=%d conflicts=%d, want 0/0 (deferred)", replayed, conflicts)
+	}
+	if _, err := os.Stat(filepath.Join(o.Dir(), entry.ID)); err != nil {
+		t.Errorf("entry should remain after 500: %v", err)
+	}
+}
+
+func TestReplay_SendsIfUnmodifiedSinceWhenBaseSet(t *testing.T) {
+	var seenIUS string
+	_, o, _ := newOutboxFixture(t, func(w http.ResponseWriter, r *http.Request) {
+		seenIUS = r.Header.Get("If-Unmodified-Since")
+		w.WriteHeader(http.StatusNoContent)
+	})
+	base := time.Date(2026, 1, 2, 3, 4, 5, 0, time.UTC)
+	r := httptest.NewRequest(http.MethodPut, "/foo.txt", bytes.NewReader([]byte("x")))
+	_, _ = o.Enqueue(r, base)
+
+	_, _, _ = o.Replay(context.Background())
+	if seenIUS == "" {
+		t.Error("upstream did not see If-Unmodified-Since")
+	}
+	parsed, err := http.ParseTime(seenIUS)
+	if err != nil {
+		t.Fatalf("parse IUS: %v", err)
+	}
+	if !parsed.Equal(base) {
+		t.Errorf("IUS = %v, want %v", parsed, base)
+	}
+}
+
+func TestReplay_OmitsIfUnmodifiedSinceForPOST(t *testing.T) {
+	var seenIUS string
+	_, o, _ := newOutboxFixture(t, func(w http.ResponseWriter, r *http.Request) {
+		seenIUS = r.Header.Get("If-Unmodified-Since")
+		w.WriteHeader(http.StatusOK)
+	})
+	r := httptest.NewRequest(http.MethodPost, "/submit", bytes.NewReader([]byte("data")))
+	_, _ = o.Enqueue(r, time.Time{})
+
+	_, _, _ = o.Replay(context.Background())
+	if seenIUS != "" {
+		t.Errorf("POST should not send If-Unmodified-Since, got %q", seenIUS)
+	}
+}
+
+func TestReplay_PreservesQueryString(t *testing.T) {
+	var seenURI string
+	_, o, _ := newOutboxFixture(t, func(w http.ResponseWriter, r *http.Request) {
+		seenURI = r.URL.RequestURI()
+		w.WriteHeader(http.StatusOK)
+	})
+	r := httptest.NewRequest(http.MethodPost, "/foo?x=1&y=2", strings.NewReader(""))
+	_, _ = o.Enqueue(r, time.Time{})
+	_, _, _ = o.Replay(context.Background())
+	if seenURI != "/foo?x=1&y=2" {
+		t.Errorf("upstream saw URI %q", seenURI)
+	}
+}
+
+func TestServeHTTP_OnlineWriteForwards(t *testing.T) {
+	var lastBody []byte
+	c, _, _ := newOutboxFixture(t, func(w http.ResponseWriter, r *http.Request) {
+		lastBody, _ = io.ReadAll(r.Body)
+		w.WriteHeader(http.StatusCreated)
+	})
+	rec := httptest.NewRecorder()
+	r := httptest.NewRequest(http.MethodPut, "/x.txt", bytes.NewReader([]byte("data")))
+	r.Header.Set("Content-Type", "text/plain")
+	c.ServeHTTP(rec, r)
+	if rec.Code != http.StatusCreated {
+		t.Errorf("status = %d, want 201", rec.Code)
+	}
+	if string(lastBody) != "data" {
+		t.Errorf("upstream body = %q", lastBody)
+	}
+	// No outbox entry should have been created for an online write.
+	pending, _ := c.outbox.Pending()
+	if len(pending) != 0 {
+		t.Errorf("pending = %d, want 0", len(pending))
+	}
+}
+
+func TestServeHTTP_OfflineWriteQueues(t *testing.T) {
+	root := t.TempDir()
+	c, _ := New(config.Config{Root: root, Upstream: "http://127.0.0.1:1", Mode: "cache"})
+	c.client.Timeout = 200 * time.Millisecond
+	o, _ := NewOutbox(c)
+	c.SetOutbox(o)
+
+	rec := httptest.NewRecorder()
+	r := httptest.NewRequest(http.MethodPut, "/x.txt", bytes.NewReader([]byte("offline-data")))
+	r.Header.Set("Content-Type", "text/plain")
+	c.ServeHTTP(rec, r)
+
+	if rec.Code != http.StatusAccepted {
+		t.Fatalf("status = %d, want 202; body=%q", rec.Code, rec.Body.String())
+	}
+	if got := rec.Header().Get(HeaderName); got != "queued" {
+		t.Errorf("cache header = %q", got)
+	}
+	if !strings.Contains(rec.Body.String(), "outbox_id") {
+		t.Errorf("body missing outbox_id: %q", rec.Body.String())
+	}
+	pending, _ := o.Pending()
+	if len(pending) != 1 {
+		t.Errorf("pending = %d, want 1", len(pending))
+	}
+}
+
+func TestServeHTTP_OfflineWriteNoOutbox503(t *testing.T) {
+	root := t.TempDir()
+	c, _ := New(config.Config{Root: root, Upstream: "http://127.0.0.1:1", Mode: "cache"})
+	c.client.Timeout = 200 * time.Millisecond
+	// No outbox installed.
+
+	rec := httptest.NewRecorder()
+	r := httptest.NewRequest(http.MethodPut, "/x.txt", bytes.NewReader([]byte("data")))
+	c.ServeHTTP(rec, r)
+	if rec.Code != http.StatusServiceUnavailable {
+		t.Errorf("status = %d, want 503", rec.Code)
+	}
+}
+
+func TestServeHTTP_OnlinePutEvictsCachedEntry(t *testing.T) {
+	c, _, _ := newOutboxFixture(t, func(w http.ResponseWriter, r *http.Request) {
+		w.WriteHeader(http.StatusNoContent)
+	})
+	cached := filepath.Join(c.root, "x.txt")
+	if err := os.WriteFile(cached, []byte("old"), 0o644); err != nil {
+		t.Fatalf("seed: %v", err)
+	}
+	rec := httptest.NewRecorder()
+	r := httptest.NewRequest(http.MethodPut, "/x.txt", bytes.NewReader([]byte("new")))
+	c.ServeHTTP(rec, r)
+	if rec.Code != http.StatusNoContent {
+		t.Fatalf("status = %d", rec.Code)
+	}
+	if _, err := os.Stat(cached); !os.IsNotExist(err) {
+		t.Error("cached entry not evicted after successful PUT")
+	}
+}
+
+func TestServeHTTP_PUTSendsIfUnmodifiedSinceFromCachedMtime(t *testing.T) {
+	var seenIUS string
+	c, _, _ := newOutboxFixture(t, func(w http.ResponseWriter, r *http.Request) {
+		seenIUS = r.Header.Get("If-Unmodified-Since")
+		w.WriteHeader(http.StatusNoContent)
+	})
+	// Seed cached file with a specific mtime.
+	cached := filepath.Join(c.root, "y.txt")
+	if err := os.WriteFile(cached, []byte("old"), 0o644); err != nil {
+		t.Fatalf("seed: %v", err)
+	}
+	when := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
+	if err := os.Chtimes(cached, when, when); err != nil {
+		t.Fatalf("chtimes: %v", err)
+	}
+
+	rec := httptest.NewRecorder()
+	r := httptest.NewRequest(http.MethodPut, "/y.txt", bytes.NewReader([]byte("new")))
+	c.ServeHTTP(rec, r)
+	if seenIUS == "" {
+		t.Fatal("upstream did not see If-Unmodified-Since")
+	}
+	parsed, _ := http.ParseTime(seenIUS)
+	if !parsed.Equal(when) {
+		t.Errorf("IUS = %v, want %v (cached mtime)", parsed, when)
+	}
+}
+
+func TestEnqueue_RejectsOversizeBody(t *testing.T) {
+	_, o, _ := newOutboxFixture(t, func(w http.ResponseWriter, r *http.Request) {})
+	huge := make([]byte, MaxOutboxBodyBytes+10)
+	r := httptest.NewRequest(http.MethodPut, "/big", bytes.NewReader(huge))
+	_, err := o.Enqueue(r, time.Time{})
+	if err == nil {
+		t.Error("expected error for oversize body")
+	}
+}
+
+func TestNewOutboxID_LexSortable(t *testing.T) {
+	var ids []string
+	for i := 0; i < 5; i++ {
+		id, err := newOutboxID()
+		if err != nil {
+			t.Fatalf("id: %v", err)
+		}
+		ids = append(ids, id)
+		time.Sleep(2 * time.Millisecond)
+	}
+	for i := 1; i < len(ids); i++ {
+		if ids[i] <= ids[i-1] {
+			t.Errorf("IDs not strictly increasing: %q vs %q", ids[i-1], ids[i])
+		}
+	}
+}
+
+func TestRunReplayLoop_StopsOnContextCancel(t *testing.T) {
+	_, o, _ := newOutboxFixture(t, func(w http.ResponseWriter, r *http.Request) {
+		w.WriteHeader(http.StatusOK)
+	})
+	ctx, cancel := context.WithCancel(context.Background())
+	done := make(chan struct{})
+	go func() {
+		o.RunReplayLoop(ctx)
+		close(done)
+	}()
+	cancel()
+	select {
+	case <-done:
+		// ok
+	case <-time.After(2 * time.Second):
+		t.Fatal("RunReplayLoop did not exit on context cancel")
+	}
+}
+
+func TestConcurrentEnqueue_NoIDCollision(t *testing.T) {
+	_, o, _ := newOutboxFixture(t, func(w http.ResponseWriter, r *http.Request) {})
+	const N = 30
+	var wg sync.WaitGroup
+	var fails int32
+	for i := 0; i < N; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			r := httptest.NewRequest(http.MethodPut, "/x", strings.NewReader("data"))
+			if _, err := o.Enqueue(r, time.Time{}); err != nil {
+				atomic.AddInt32(&fails, 1)
+			}
+		}()
+	}
+	wg.Wait()
+	if fails != 0 {
+		t.Errorf("%d enqueues failed", fails)
+	}
+	pending, _ := o.Pending()
+	if len(pending) != N {
+		t.Errorf("pending = %d, want %d", len(pending), N)
+	}
+}
+
+func dirNames(entries []os.DirEntry) []string {
+	out := make([]string, len(entries))
+	for i, e := range entries {
+		out[i] = e.Name()
+	}
+	return out
+}