ZDDC/zddc/internal/cache/outbox_test.go
ZDDC 8a049ca2a4 feat(client): outbox — offline write queue + replay with If-Unmodified-Since
PUT / POST / DELETE in client mode now work end-to-end. Online: the
cache layer forwards to upstream and (on success) drops any cached
entry for the path so the next read fetches fresh. PUT/DELETE include
If-Unmodified-Since derived from the cached file's mtime so the master
can reject conflicting writes with 412 Precondition Failed.

When upstream is unreachable, the request is captured in the outbox
at <root>/.zddc-outbox/<id>/ — directory per queued write, mode 0700,
containing meta.json (method, RawURI, Content-Type, base mtime,
queued-at) and body.bin (request body, capped at 256 MiB). The client
gets 202 Accepted + X-ZDDC-Cache: queued and a JSON envelope.

A background replay loop started by runClient processes the queue:
- 2xx → delete entry; drop cached path so next read fetches fresh
- 412 → rename to <id>.conflict-<RFC3339>/ for manual reconciliation
       (body + meta intact for inspection or re-submit)
- 4xx other → drop (retry won't help; logged at WARN)
- 5xx / transport error → leave for next pass

Replay schedule: eager at startup, then every 30s while entries are
pending, falling back to every 5min while idle. The loop honors the
graceful-shutdown context.
Disabled in --mode=proxy (proxy persists nothing by design — offline
writes return 503 instead of queueing).

Outbox IDs are <unix-nano-base16>-<hex-random> so lex-sort = queue
order; concurrent enqueues never collide. Conflict-rename appends a
4-char random suffix on the unlikely same-second collision.

The local cache is intentionally not updated for offline writes:
until upstream confirms the write, the user's reads still see the
upstream-cached version (or 503 if uncached). Trade-off: no "did my queued write
actually win?" ambiguity, at the cost of not seeing one's own
offline edits immediately. Phase 5 will surface .conflict-<ts>/
directories in browse views.

Tests (20 new in outbox_test.go, 5 new in cache_test.go covering
the write path): NewOutbox creates 0700 dir, Enqueue persists meta
+ body, Pending returns lex-sorted entries excluding conflicts,
Replay deletes on 2xx / renames on 412 / leaves on transport error
/ leaves on 5xx / drops on 4xx-other, IUS sent only for PUT/DELETE
with base mtime, query string preserved, ServeHTTP online write
forwards + evicts cache, ServeHTTP offline write queues with 202,
ServeHTTP offline + no outbox returns 503, ServeHTTP PUT sends IUS
from cached mtime, oversize body rejected, IDs lex-sortable,
RunReplayLoop stops on context cancel, concurrent Enqueue 30×
no collisions. Full suite + go vet clean.

Doc updates: zddc/README.md gains a "Writes (online + offline
outbox)" subsection covering both paths and replay outcomes;
"What client mode is NOT, yet" now lists only conflict UI and
multi-tenancy. AGENTS.md client-mode pipeline gains writes +
mirror-mode bullets. ARCHITECTURE.md adds a "Writes: outbox +
offline replay" subsection with the trade-off rationale and the
phase-5-deferred conflict UI hand-off.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 08:20:07 -05:00

463 lines
14 KiB
Go

package cache
import (
"bytes"
"context"
"io"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
"codeberg.org/VARASYS/ZDDC/zddc/internal/config"
)
// newOutboxFixture starts an httptest server running upstreamHandler, builds a
// cache-mode Cache rooted in a fresh temp directory that points at that server,
// creates an Outbox for the cache and installs it with SetOutbox.
//
// The server is closed through t.Cleanup. Any construction error aborts the
// calling test via t.Fatalf.
func newOutboxFixture(t *testing.T, upstreamHandler http.HandlerFunc) (*Cache, *Outbox, *httptest.Server) {
	t.Helper()

	srv := httptest.NewServer(upstreamHandler)
	t.Cleanup(srv.Close)

	cfg := config.Config{
		Root:     t.TempDir(),
		Upstream: srv.URL,
		Mode:     "cache",
	}
	cc, err := New(cfg)
	if err != nil {
		t.Fatalf("New: %v", err)
	}

	box, err := NewOutbox(cc)
	if err != nil {
		t.Fatalf("NewOutbox: %v", err)
	}
	cc.SetOutbox(box)

	return cc, box, srv
}
// TestNewOutbox_CreatesDirectoryWith0700 verifies that NewOutbox creates the
// outbox directory at <root>/<OutboxDir>, that Dir() reports that path, and
// that the directory carries no group/other permission bits.
func TestNewOutbox_CreatesDirectoryWith0700(t *testing.T) {
	root := t.TempDir()
	c, err := New(config.Config{Root: root, Upstream: "http://example.com", Mode: "cache"})
	if err != nil {
		t.Fatalf("New: %v", err)
	}
	o, err := NewOutbox(c)
	if err != nil {
		t.Fatalf("NewOutbox: %v", err)
	}
	want := filepath.Join(root, OutboxDir)
	if o.Dir() != want {
		t.Errorf("Dir() = %q, want %q", o.Dir(), want)
	}
	info, err := os.Stat(want)
	if err != nil {
		t.Fatalf("stat: %v", err)
	}
	if !info.IsDir() {
		t.Fatalf("%q exists but is not a directory", want)
	}
	// Only the group/other bits are inspected, not equality with 0700, so the
	// assertion does not depend on which owner bits ended up set.
	if mode := info.Mode().Perm(); mode&0o077 != 0 {
		t.Errorf("dir mode %o exposes group/other bits", mode)
	}
}
// TestEnqueue_PersistsBodyAndMeta verifies that Enqueue returns an entry
// describing the request (method, raw URI, content type, base mtime) and that
// the request body and a metadata file are written under <outbox>/<id>/.
func TestEnqueue_PersistsBodyAndMeta(t *testing.T) {
	_, o, _ := newOutboxFixture(t, func(w http.ResponseWriter, r *http.Request) {})
	body := []byte("hello world")
	r := httptest.NewRequest(http.MethodPut, "/foo.txt", bytes.NewReader(body))
	r.Header.Set("Content-Type", "text/plain")
	base := time.Date(2026, 5, 8, 12, 0, 0, 0, time.UTC)
	entry, err := o.Enqueue(r, base)
	if err != nil {
		t.Fatalf("Enqueue: %v", err)
	}
	if entry.ID == "" {
		t.Fatal("entry.ID empty")
	}
	if entry.Method != "PUT" || entry.RawURI != "/foo.txt" {
		t.Errorf("entry method/uri wrong: %+v", entry)
	}
	if entry.ContentType != "text/plain" {
		t.Errorf("ContentType = %q", entry.ContentType)
	}
	if !entry.BaseModTime.Equal(base) {
		t.Errorf("BaseModTime = %v, want %v", entry.BaseModTime, base)
	}
	entryDir := filepath.Join(o.Dir(), entry.ID)
	// Body file should contain the request bytes.
	got, err := os.ReadFile(filepath.Join(entryDir, "body.bin"))
	if err != nil {
		t.Fatalf("read body.bin: %v", err)
	}
	if !bytes.Equal(got, body) {
		t.Errorf("body = %q, want %q", got, body)
	}
	// The metadata half of the test name: the file must exist and be non-empty.
	// NOTE(review): "meta.json" is the name given in the outbox layout
	// description; confirm it matches the constant used by Enqueue.
	if info, err := os.Stat(filepath.Join(entryDir, "meta.json")); err != nil {
		t.Errorf("stat meta.json: %v", err)
	} else if info.Size() == 0 {
		t.Error("meta.json is empty")
	}
}
// TestPending_OrdersByID enqueues three writes a couple of milliseconds apart
// and checks that Pending returns all of them with IDs in non-decreasing
// lexical order.
func TestPending_OrdersByID(t *testing.T) {
	_, o, _ := newOutboxFixture(t, func(http.ResponseWriter, *http.Request) {})

	const count = 3
	for n := 0; n < count; n++ {
		req := httptest.NewRequest(http.MethodPut, "/x.txt", strings.NewReader("data"))
		if _, err := o.Enqueue(req, time.Time{}); err != nil {
			t.Fatalf("enqueue %d: %v", n, err)
		}
		// Space the enqueues out so each ID gets a distinct timestamp part.
		time.Sleep(2 * time.Millisecond)
	}

	pending, err := o.Pending()
	if err != nil {
		t.Fatalf("Pending: %v", err)
	}
	if len(pending) != count {
		t.Fatalf("len = %d", len(pending))
	}

	for idx := 1; idx < len(pending); idx++ {
		prev, cur := pending[idx-1].ID, pending[idx].ID
		if cur < prev {
			t.Errorf("entries not lex-sorted at index %d: %q < %q", idx, cur, prev)
		}
	}
}
// TestReplay_DeletesOnSuccess verifies that a queued PUT is replayed to
// upstream with its original body and that, on a 204 response, Replay counts
// it as replayed and removes the entry directory.
func TestReplay_DeletesOnSuccess(t *testing.T) {
	var lastBody []byte
	_, o, _ := newOutboxFixture(t, func(w http.ResponseWriter, r *http.Request) {
		lastBody, _ = io.ReadAll(r.Body)
		w.WriteHeader(http.StatusNoContent)
	})
	r := httptest.NewRequest(http.MethodPut, "/foo.txt", bytes.NewReader([]byte("payload")))
	r.Header.Set("Content-Type", "text/plain")
	// A failed enqueue must stop the test here rather than surface later as a
	// misleading replay-count mismatch.
	entry, err := o.Enqueue(r, time.Time{})
	if err != nil {
		t.Fatalf("Enqueue: %v", err)
	}
	replayed, conflicts, err := o.Replay(context.Background())
	if err != nil {
		t.Fatalf("Replay: %v", err)
	}
	if replayed != 1 || conflicts != 0 {
		t.Errorf("replayed=%d conflicts=%d, want 1/0", replayed, conflicts)
	}
	if string(lastBody) != "payload" {
		t.Errorf("upstream got body %q", lastBody)
	}
	// Entry directory should be gone.
	if _, err := os.Stat(filepath.Join(o.Dir(), entry.ID)); !os.IsNotExist(err) {
		t.Errorf("entry not removed: %v", err)
	}
}
// TestReplay_412RenamesToConflict verifies that when upstream answers a
// replayed write with 412 Precondition Failed, Replay counts it as a conflict,
// the original entry directory disappears, a directory named
// <id><outboxConflictPrefix>... appears in its place, and Pending no longer
// lists it.
func TestReplay_412RenamesToConflict(t *testing.T) {
	_, o, _ := newOutboxFixture(t, func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusPreconditionFailed)
	})
	r := httptest.NewRequest(http.MethodPut, "/foo.txt", bytes.NewReader([]byte("x")))
	entry, err := o.Enqueue(r, time.Date(2026, 5, 1, 0, 0, 0, 0, time.UTC))
	if err != nil {
		t.Fatalf("Enqueue: %v", err)
	}
	// The error return is logged rather than asserted: this test pins the
	// counters and the on-disk outcome only.
	replayed, conflicts, err := o.Replay(context.Background())
	if err != nil {
		t.Logf("Replay returned (not asserted): %v", err)
	}
	if replayed != 0 || conflicts != 1 {
		t.Errorf("replayed=%d conflicts=%d, want 0/1", replayed, conflicts)
	}
	// Original entry dir gone, conflict-renamed dir present.
	if _, err := os.Stat(filepath.Join(o.Dir(), entry.ID)); !os.IsNotExist(err) {
		t.Error("original entry not renamed")
	}
	entries, err := os.ReadDir(o.Dir())
	if err != nil {
		t.Fatalf("ReadDir: %v", err)
	}
	foundConflict := false
	for _, e := range entries {
		if strings.HasPrefix(e.Name(), entry.ID+outboxConflictPrefix) {
			foundConflict = true
			break
		}
	}
	if !foundConflict {
		t.Errorf("no conflict-renamed dir found among %v", dirNames(entries))
	}
	// Pending should now exclude the conflict.
	pending, err := o.Pending()
	if err != nil {
		t.Fatalf("Pending: %v", err)
	}
	if len(pending) != 0 {
		t.Errorf("Pending() includes conflicts: %d entries", len(pending))
	}
}
// TestReplay_NetworkErrorLeavesEntry verifies that when the upstream request
// fails at the transport level, Replay reports nothing replayed and no
// conflicts, and the queued entry stays on disk for a later pass.
func TestReplay_NetworkErrorLeavesEntry(t *testing.T) {
	root := t.TempDir()
	// NOTE(review): 127.0.0.1:1 is presumably chosen because nothing listens
	// there; the short client timeout below bounds the test either way.
	c, err := New(config.Config{Root: root, Upstream: "http://127.0.0.1:1", Mode: "cache"})
	if err != nil {
		t.Fatalf("New: %v", err)
	}
	c.client.Timeout = 200 * time.Millisecond
	o, err := NewOutbox(c)
	if err != nil {
		t.Fatalf("NewOutbox: %v", err)
	}
	r := httptest.NewRequest(http.MethodPut, "/foo.txt", bytes.NewReader([]byte("retry-me")))
	entry, err := o.Enqueue(r, time.Time{})
	if err != nil {
		t.Fatalf("Enqueue: %v", err)
	}
	// The error return is logged rather than asserted: this test pins the
	// counters and the on-disk outcome only.
	replayed, conflicts, err := o.Replay(context.Background())
	if err != nil {
		t.Logf("Replay returned (not asserted): %v", err)
	}
	if replayed != 0 || conflicts != 0 {
		t.Errorf("replayed=%d conflicts=%d, want 0/0 (deferred)", replayed, conflicts)
	}
	if _, err := os.Stat(filepath.Join(o.Dir(), entry.ID)); err != nil {
		t.Errorf("entry was removed despite network error: %v", err)
	}
}
// TestReplay_4xxNon412Drops verifies that a 403 from upstream causes Replay to
// count the entry as replayed (not as a conflict) and to remove it from disk.
func TestReplay_4xxNon412Drops(t *testing.T) {
	_, o, _ := newOutboxFixture(t, func(w http.ResponseWriter, r *http.Request) {
		http.Error(w, "Forbidden", http.StatusForbidden)
	})
	r := httptest.NewRequest(http.MethodPut, "/foo.txt", bytes.NewReader([]byte("x")))
	entry, err := o.Enqueue(r, time.Time{})
	if err != nil {
		t.Fatalf("Enqueue: %v", err)
	}
	// The error return is logged rather than asserted: this test pins the
	// counters and the on-disk outcome only.
	replayed, conflicts, err := o.Replay(context.Background())
	if err != nil {
		t.Logf("Replay returned (not asserted): %v", err)
	}
	// 4xx-other-than-412 counts as "done" (we drop it; retrying won't help).
	if replayed != 1 || conflicts != 0 {
		t.Errorf("replayed=%d conflicts=%d, want 1/0", replayed, conflicts)
	}
	if _, err := os.Stat(filepath.Join(o.Dir(), entry.ID)); !os.IsNotExist(err) {
		t.Errorf("entry not dropped after 403: %v", err)
	}
}
// TestReplay_5xxLeavesEntry verifies that a 500 from upstream leaves the
// queued entry on disk and is counted neither as replayed nor as a conflict.
func TestReplay_5xxLeavesEntry(t *testing.T) {
	_, o, _ := newOutboxFixture(t, func(w http.ResponseWriter, r *http.Request) {
		http.Error(w, "Internal", http.StatusInternalServerError)
	})
	r := httptest.NewRequest(http.MethodPut, "/foo.txt", bytes.NewReader([]byte("x")))
	entry, err := o.Enqueue(r, time.Time{})
	if err != nil {
		t.Fatalf("Enqueue: %v", err)
	}
	// The error return is logged rather than asserted: this test pins the
	// counters and the on-disk outcome only.
	replayed, conflicts, err := o.Replay(context.Background())
	if err != nil {
		t.Logf("Replay returned (not asserted): %v", err)
	}
	if replayed != 0 || conflicts != 0 {
		t.Errorf("replayed=%d conflicts=%d, want 0/0 (deferred)", replayed, conflicts)
	}
	if _, err := os.Stat(filepath.Join(o.Dir(), entry.ID)); err != nil {
		t.Errorf("entry should remain after 500: %v", err)
	}
}
// TestReplay_SendsIfUnmodifiedSinceWhenBaseSet verifies that replaying a PUT
// queued with a non-zero base mtime sends an If-Unmodified-Since header whose
// value parses back to that same instant.
func TestReplay_SendsIfUnmodifiedSinceWhenBaseSet(t *testing.T) {
	var seenIUS string
	_, o, _ := newOutboxFixture(t, func(w http.ResponseWriter, r *http.Request) {
		seenIUS = r.Header.Get("If-Unmodified-Since")
		w.WriteHeader(http.StatusNoContent)
	})
	base := time.Date(2026, 1, 2, 3, 4, 5, 0, time.UTC)
	r := httptest.NewRequest(http.MethodPut, "/foo.txt", bytes.NewReader([]byte("x")))
	if _, err := o.Enqueue(r, base); err != nil {
		t.Fatalf("Enqueue: %v", err)
	}
	if _, _, err := o.Replay(context.Background()); err != nil {
		t.Fatalf("Replay: %v", err)
	}
	// Fatal rather than Error: with an empty header the parse below would fail
	// too and bury the real cause under a confusing "parse IUS" message.
	if seenIUS == "" {
		t.Fatal("upstream did not see If-Unmodified-Since")
	}
	parsed, err := http.ParseTime(seenIUS)
	if err != nil {
		t.Fatalf("parse IUS: %v", err)
	}
	if !parsed.Equal(base) {
		t.Errorf("IUS = %v, want %v", parsed, base)
	}
}
// TestReplay_OmitsIfUnmodifiedSinceForPOST verifies that a replayed POST
// reaches upstream without an If-Unmodified-Since header.
//
// NOTE(review): the entry is queued with a zero base time, so this does not
// distinguish "omitted because the method is POST" from "omitted because no
// base mtime was recorded" — confirm which the implementation keys on before
// tightening the test with a non-zero base.
func TestReplay_OmitsIfUnmodifiedSinceForPOST(t *testing.T) {
	var (
		called  bool
		seenIUS string
	)
	_, o, _ := newOutboxFixture(t, func(w http.ResponseWriter, r *http.Request) {
		called = true
		seenIUS = r.Header.Get("If-Unmodified-Since")
		w.WriteHeader(http.StatusOK)
	})
	r := httptest.NewRequest(http.MethodPost, "/submit", bytes.NewReader([]byte("data")))
	if _, err := o.Enqueue(r, time.Time{}); err != nil {
		t.Fatalf("Enqueue: %v", err)
	}
	if _, _, err := o.Replay(context.Background()); err != nil {
		t.Fatalf("Replay: %v", err)
	}
	// Without this guard an unreplayed entry would leave seenIUS empty and the
	// assertion below would pass vacuously.
	if !called {
		t.Fatal("upstream was never contacted; header check would be vacuous")
	}
	if seenIUS != "" {
		t.Errorf("POST should not send If-Unmodified-Since, got %q", seenIUS)
	}
}
// TestReplay_PreservesQueryString verifies that the request URI seen by
// upstream during replay is the queued one, query string included.
func TestReplay_PreservesQueryString(t *testing.T) {
	var seenURI string
	_, o, _ := newOutboxFixture(t, func(w http.ResponseWriter, r *http.Request) {
		seenURI = r.URL.RequestURI()
		w.WriteHeader(http.StatusOK)
	})
	r := httptest.NewRequest(http.MethodPost, "/foo?x=1&y=2", strings.NewReader(""))
	if _, err := o.Enqueue(r, time.Time{}); err != nil {
		t.Fatalf("Enqueue: %v", err)
	}
	if _, _, err := o.Replay(context.Background()); err != nil {
		t.Fatalf("Replay: %v", err)
	}
	if seenURI != "/foo?x=1&y=2" {
		t.Errorf("upstream saw URI %q", seenURI)
	}
}
// TestServeHTTP_OnlineWriteForwards verifies that a PUT served while upstream
// is reachable is forwarded with its body, that the upstream status is relayed
// to the client, and that nothing is queued in the outbox.
func TestServeHTTP_OnlineWriteForwards(t *testing.T) {
	var lastBody []byte
	c, _, _ := newOutboxFixture(t, func(w http.ResponseWriter, r *http.Request) {
		lastBody, _ = io.ReadAll(r.Body)
		w.WriteHeader(http.StatusCreated)
	})
	rec := httptest.NewRecorder()
	r := httptest.NewRequest(http.MethodPut, "/x.txt", bytes.NewReader([]byte("data")))
	r.Header.Set("Content-Type", "text/plain")
	c.ServeHTTP(rec, r)
	if rec.Code != http.StatusCreated {
		t.Errorf("status = %d, want 201", rec.Code)
	}
	if string(lastBody) != "data" {
		t.Errorf("upstream body = %q", lastBody)
	}
	// No outbox entry should have been created for an online write. The error
	// is checked so a listing failure cannot pass as "zero pending".
	pending, err := c.outbox.Pending()
	if err != nil {
		t.Fatalf("Pending: %v", err)
	}
	if len(pending) != 0 {
		t.Errorf("pending = %d, want 0", len(pending))
	}
}
// TestServeHTTP_OfflineWriteQueues verifies that a PUT served while upstream
// cannot be reached is answered with 202 Accepted, the cache header set to
// "queued", a body mentioning outbox_id, and exactly one pending outbox entry.
func TestServeHTTP_OfflineWriteQueues(t *testing.T) {
	root := t.TempDir()
	c, err := New(config.Config{Root: root, Upstream: "http://127.0.0.1:1", Mode: "cache"})
	if err != nil {
		t.Fatalf("New: %v", err)
	}
	c.client.Timeout = 200 * time.Millisecond
	o, err := NewOutbox(c)
	if err != nil {
		t.Fatalf("NewOutbox: %v", err)
	}
	c.SetOutbox(o)
	rec := httptest.NewRecorder()
	r := httptest.NewRequest(http.MethodPut, "/x.txt", bytes.NewReader([]byte("offline-data")))
	r.Header.Set("Content-Type", "text/plain")
	c.ServeHTTP(rec, r)
	if rec.Code != http.StatusAccepted {
		t.Fatalf("status = %d, want 202; body=%q", rec.Code, rec.Body.String())
	}
	if got := rec.Header().Get(HeaderName); got != "queued" {
		t.Errorf("cache header = %q", got)
	}
	if !strings.Contains(rec.Body.String(), "outbox_id") {
		t.Errorf("body missing outbox_id: %q", rec.Body.String())
	}
	pending, err := o.Pending()
	if err != nil {
		t.Fatalf("Pending: %v", err)
	}
	if len(pending) != 1 {
		t.Errorf("pending = %d, want 1", len(pending))
	}
}
// TestServeHTTP_OfflineWriteNoOutbox503 verifies that a PUT served while
// upstream cannot be reached, on a cache with no outbox installed, is answered
// with 503 Service Unavailable.
func TestServeHTTP_OfflineWriteNoOutbox503(t *testing.T) {
	root := t.TempDir()
	c, err := New(config.Config{Root: root, Upstream: "http://127.0.0.1:1", Mode: "cache"})
	if err != nil {
		t.Fatalf("New: %v", err)
	}
	c.client.Timeout = 200 * time.Millisecond
	// No outbox installed.
	rec := httptest.NewRecorder()
	r := httptest.NewRequest(http.MethodPut, "/x.txt", bytes.NewReader([]byte("data")))
	c.ServeHTTP(rec, r)
	if rec.Code != http.StatusServiceUnavailable {
		t.Errorf("status = %d, want 503", rec.Code)
	}
}
// TestServeHTTP_OnlinePutEvictsCachedEntry seeds a file under the cache root,
// serves a PUT for the same path against an upstream that answers 204, and
// checks that the seeded file no longer exists afterwards.
func TestServeHTTP_OnlinePutEvictsCachedEntry(t *testing.T) {
	c, _, _ := newOutboxFixture(t, func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusNoContent)
	})

	seedPath := filepath.Join(c.root, "x.txt")
	if err := os.WriteFile(seedPath, []byte("old"), 0o644); err != nil {
		t.Fatalf("seed: %v", err)
	}

	req := httptest.NewRequest(http.MethodPut, "/x.txt", bytes.NewReader([]byte("new")))
	resp := httptest.NewRecorder()
	c.ServeHTTP(resp, req)
	if resp.Code != http.StatusNoContent {
		t.Fatalf("status = %d", resp.Code)
	}

	_, statErr := os.Stat(seedPath)
	if !os.IsNotExist(statErr) {
		t.Error("cached entry not evicted after successful PUT")
	}
}
// TestServeHTTP_PUTSendsIfUnmodifiedSinceFromCachedMtime seeds a file under the
// cache root with a known mtime, serves a PUT for that path, and verifies that
// upstream receives an If-Unmodified-Since header equal to that mtime.
func TestServeHTTP_PUTSendsIfUnmodifiedSinceFromCachedMtime(t *testing.T) {
	var seenIUS string
	c, _, _ := newOutboxFixture(t, func(w http.ResponseWriter, r *http.Request) {
		seenIUS = r.Header.Get("If-Unmodified-Since")
		w.WriteHeader(http.StatusNoContent)
	})
	// Seed cached file with a specific mtime.
	cached := filepath.Join(c.root, "y.txt")
	if err := os.WriteFile(cached, []byte("old"), 0o644); err != nil {
		t.Fatalf("seed: %v", err)
	}
	when := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
	if err := os.Chtimes(cached, when, when); err != nil {
		t.Fatalf("chtimes: %v", err)
	}
	rec := httptest.NewRecorder()
	r := httptest.NewRequest(http.MethodPut, "/y.txt", bytes.NewReader([]byte("new")))
	c.ServeHTTP(rec, r)
	if seenIUS == "" {
		t.Fatal("upstream did not see If-Unmodified-Since")
	}
	// A malformed header must be reported as such rather than compared as the
	// zero time.
	parsed, err := http.ParseTime(seenIUS)
	if err != nil {
		t.Fatalf("parse IUS %q: %v", seenIUS, err)
	}
	if !parsed.Equal(when) {
		t.Errorf("IUS = %v, want %v (cached mtime)", parsed, when)
	}
}
// TestEnqueue_RejectsOversizeBody checks that Enqueue returns an error when the
// request body is ten bytes longer than MaxOutboxBodyBytes.
func TestEnqueue_RejectsOversizeBody(t *testing.T) {
	_, o, _ := newOutboxFixture(t, func(http.ResponseWriter, *http.Request) {})

	// Zero-filled payload just past the limit; the content is irrelevant.
	oversized := bytes.NewReader(make([]byte, MaxOutboxBodyBytes+10))
	req := httptest.NewRequest(http.MethodPut, "/big", oversized)

	if _, err := o.Enqueue(req, time.Time{}); err == nil {
		t.Error("expected error for oversize body")
	}
}
// TestNewOutboxID_LexSortable generates five IDs a couple of milliseconds apart
// and checks that each one compares strictly greater than its predecessor.
func TestNewOutboxID_LexSortable(t *testing.T) {
	const samples = 5

	ids := make([]string, 0, samples)
	for len(ids) < samples {
		id, err := newOutboxID()
		if err != nil {
			t.Fatalf("id: %v", err)
		}
		ids = append(ids, id)
		// Pause between calls so consecutive IDs are generated at distinct times.
		time.Sleep(2 * time.Millisecond)
	}

	prev := ids[0]
	for _, cur := range ids[1:] {
		if cur <= prev {
			t.Errorf("IDs not strictly increasing: %q vs %q", prev, cur)
		}
		prev = cur
	}
}
// TestRunReplayLoop_StopsOnContextCancel runs RunReplayLoop in a goroutine,
// cancels its context straight away, and fails if the loop has not returned
// within two seconds.
func TestRunReplayLoop_StopsOnContextCancel(t *testing.T) {
	_, o, _ := newOutboxFixture(t, func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
	})

	ctx, cancel := context.WithCancel(context.Background())
	exited := make(chan struct{})
	go func() {
		defer close(exited)
		o.RunReplayLoop(ctx)
	}()

	cancel()

	deadline := time.NewTimer(2 * time.Second)
	defer deadline.Stop()
	select {
	case <-exited:
		// Loop returned in time.
	case <-deadline.C:
		t.Fatal("RunReplayLoop did not exit on context cancel")
	}
}
// TestConcurrentEnqueue_NoIDCollision enqueues N writes from N goroutines at
// once and verifies that every Enqueue succeeds, that Pending lists exactly N
// entries, and that all listed IDs are distinct.
func TestConcurrentEnqueue_NoIDCollision(t *testing.T) {
	_, o, _ := newOutboxFixture(t, func(w http.ResponseWriter, r *http.Request) {})
	const N = 30
	var wg sync.WaitGroup
	var fails int32
	for i := 0; i < N; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			r := httptest.NewRequest(http.MethodPut, "/x", strings.NewReader("data"))
			if _, err := o.Enqueue(r, time.Time{}); err != nil {
				atomic.AddInt32(&fails, 1)
				// Logged (not Fatal, which must stay on the test goroutine) so
				// the cause of a failed enqueue is visible.
				t.Logf("Enqueue: %v", err)
			}
		}()
	}
	wg.Wait()
	if n := atomic.LoadInt32(&fails); n != 0 {
		t.Errorf("%d enqueues failed", n)
	}
	pending, err := o.Pending()
	if err != nil {
		t.Fatalf("Pending: %v", err)
	}
	if len(pending) != N {
		t.Errorf("pending = %d, want %d", len(pending), N)
	}
	// Assert uniqueness directly instead of relying on the count alone.
	seen := make(map[string]struct{}, len(pending))
	for i := range pending {
		id := pending[i].ID
		if _, dup := seen[id]; dup {
			t.Errorf("duplicate outbox ID %q", id)
		}
		seen[id] = struct{}{}
	}
}
// dirNames returns the Name() of every entry, in the order given. The result
// is always a non-nil slice with one element per input entry.
func dirNames(entries []os.DirEntry) []string {
	names := make([]string, 0, len(entries))
	for _, entry := range entries {
		names = append(names, entry.Name())
	}
	return names
}