package cache

import (
	"bytes"
	"context"
	"io"
	"net/http"
	"net/http/httptest"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"codeberg.org/VARASYS/ZDDC/zddc/internal/config"
)

// newOutboxFixture starts an httptest upstream served by upstreamHandler,
// builds a Cache in "cache" mode rooted at a fresh temp dir and pointed at
// that upstream, creates an Outbox for it and installs it with SetOutbox.
// The upstream is closed via t.Cleanup; any construction error fails the
// calling test immediately.
func newOutboxFixture(t *testing.T, upstreamHandler http.HandlerFunc) (*Cache, *Outbox, *httptest.Server) {
	t.Helper()

	upstream := httptest.NewServer(upstreamHandler)
	t.Cleanup(upstream.Close)

	root := t.TempDir()
	c, err := New(config.Config{Root: root, Upstream: upstream.URL, Mode: "cache"})
	if err != nil {
		t.Fatalf("New: %v", err)
	}
	o, err := NewOutbox(c)
	if err != nil {
		t.Fatalf("NewOutbox: %v", err)
	}
	c.SetOutbox(o)
	return c, o, upstream
}

// TestNewOutbox_CreatesDirectoryWith0700 checks that NewOutbox reports
// <root>/OutboxDir as its directory, that the directory exists, and that no
// group/other permission bits are set on it.
func TestNewOutbox_CreatesDirectoryWith0700(t *testing.T) {
	root := t.TempDir()
	c, err := New(config.Config{Root: root, Upstream: "http://example.com", Mode: "cache"})
	if err != nil {
		t.Fatalf("New: %v", err)
	}
	o, err := NewOutbox(c)
	if err != nil {
		t.Fatalf("NewOutbox: %v", err)
	}

	want := filepath.Join(root, OutboxDir)
	if o.Dir() != want {
		t.Errorf("Dir() = %q, want %q", o.Dir(), want)
	}
	info, err := os.Stat(want)
	if err != nil {
		t.Fatalf("stat: %v", err)
	}
	if mode := info.Mode().Perm(); mode&0o077 != 0 {
		t.Errorf("dir mode %o exposes group/other bits", mode)
	}
}

// TestEnqueue_PersistsBodyAndMeta checks that Enqueue records the method,
// raw URI, content type and base mod time on the returned entry, and writes
// the request body to <outbox>/<id>/body.bin.
func TestEnqueue_PersistsBodyAndMeta(t *testing.T) {
	_, o, _ := newOutboxFixture(t, func(w http.ResponseWriter, r *http.Request) {})

	body := []byte("hello world")
	r := httptest.NewRequest(http.MethodPut, "/foo.txt", bytes.NewReader(body))
	r.Header.Set("Content-Type", "text/plain")
	base := time.Date(2026, 5, 8, 12, 0, 0, 0, time.UTC)

	entry, err := o.Enqueue(r, base)
	if err != nil {
		t.Fatalf("Enqueue: %v", err)
	}
	if entry.ID == "" {
		t.Fatal("entry.ID empty")
	}
	if entry.Method != "PUT" || entry.RawURI != "/foo.txt" {
		t.Errorf("entry method/uri wrong: %+v", entry)
	}
	if entry.ContentType != "text/plain" {
		t.Errorf("ContentType = %q", entry.ContentType)
	}
	if !entry.BaseModTime.Equal(base) {
		t.Errorf("BaseModTime = %v, want %v", entry.BaseModTime, base)
	}

	// Body file should contain the request bytes.
	got, err := os.ReadFile(filepath.Join(o.Dir(), entry.ID, "body.bin"))
	if err != nil {
		t.Fatalf("read body.bin: %v", err)
	}
	if !bytes.Equal(got, body) {
		t.Errorf("body = %q, want %q", got, body)
	}
}

// TestPending_OrdersByID enqueues three entries and checks that Pending
// returns all of them with IDs in non-decreasing lexical order.
func TestPending_OrdersByID(t *testing.T) {
	_, o, _ := newOutboxFixture(t, func(w http.ResponseWriter, r *http.Request) {})

	for i := 0; i < 3; i++ {
		r := httptest.NewRequest(http.MethodPut, "/x.txt", strings.NewReader("data"))
		if _, err := o.Enqueue(r, time.Time{}); err != nil {
			t.Fatalf("enqueue %d: %v", i, err)
		}
		time.Sleep(2 * time.Millisecond) // ensure unique unix-nano IDs
	}

	got, err := o.Pending()
	if err != nil {
		t.Fatalf("Pending: %v", err)
	}
	if len(got) != 3 {
		t.Fatalf("len = %d", len(got))
	}
	for i := 1; i < len(got); i++ {
		if got[i].ID < got[i-1].ID {
			t.Errorf("entries not lex-sorted at index %d: %q < %q", i, got[i].ID, got[i-1].ID)
		}
	}
}

// TestReplay_DeletesOnSuccess checks that a 204 from upstream counts as one
// replayed entry, that upstream received the queued body, and that the
// entry directory is removed afterwards.
func TestReplay_DeletesOnSuccess(t *testing.T) {
	var lastBody []byte
	_, o, _ := newOutboxFixture(t, func(w http.ResponseWriter, r *http.Request) {
		lastBody, _ = io.ReadAll(r.Body)
		w.WriteHeader(http.StatusNoContent)
	})

	r := httptest.NewRequest(http.MethodPut, "/foo.txt", bytes.NewReader([]byte("payload")))
	r.Header.Set("Content-Type", "text/plain")
	entry, err := o.Enqueue(r, time.Time{})
	if err != nil {
		t.Fatalf("Enqueue: %v", err)
	}

	replayed, conflicts, err := o.Replay(context.Background())
	if err != nil {
		t.Fatalf("Replay: %v", err)
	}
	if replayed != 1 || conflicts != 0 {
		t.Errorf("replayed=%d conflicts=%d, want 1/0", replayed, conflicts)
	}
	if string(lastBody) != "payload" {
		t.Errorf("upstream got body %q", lastBody)
	}

	// Entry directory should be gone.
	if _, err := os.Stat(filepath.Join(o.Dir(), entry.ID)); !os.IsNotExist(err) {
		t.Errorf("entry not removed: %v", err)
	}
}

// TestReplay_412RenamesToConflict checks that a 412 from upstream counts as
// a conflict, that the entry directory is renamed to a name starting with
// <id>+outboxConflictPrefix, and that Pending no longer lists it.
func TestReplay_412RenamesToConflict(t *testing.T) {
	_, o, _ := newOutboxFixture(t, func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusPreconditionFailed)
	})

	r := httptest.NewRequest(http.MethodPut, "/foo.txt", bytes.NewReader([]byte("x")))
	entry, err := o.Enqueue(r, time.Date(2026, 5, 1, 0, 0, 0, 0, time.UTC))
	if err != nil {
		t.Fatalf("Enqueue: %v", err)
	}

	// Replay's error is deliberately not asserted: this test pins only the
	// counters and the resulting on-disk state.
	replayed, conflicts, _ := o.Replay(context.Background())
	if replayed != 0 || conflicts != 1 {
		t.Errorf("replayed=%d conflicts=%d, want 0/1", replayed, conflicts)
	}

	// Original entry dir gone, conflict-renamed dir present.
	if _, err := os.Stat(filepath.Join(o.Dir(), entry.ID)); !os.IsNotExist(err) {
		t.Error("original entry not renamed")
	}
	entries, err := os.ReadDir(o.Dir())
	if err != nil {
		t.Fatalf("ReadDir: %v", err)
	}
	foundConflict := false
	for _, e := range entries {
		if strings.HasPrefix(e.Name(), entry.ID+outboxConflictPrefix) {
			foundConflict = true
		}
	}
	if !foundConflict {
		t.Errorf("no conflict-renamed dir found among %v", dirNames(entries))
	}

	// Pending should now exclude the conflict.
	pending, err := o.Pending()
	if err != nil {
		t.Fatalf("Pending: %v", err)
	}
	if len(pending) != 0 {
		t.Errorf("Pending() includes conflicts: %d entries", len(pending))
	}
}

// TestReplay_NetworkErrorLeavesEntry points the cache at an address that is
// expected to refuse connections and checks that Replay counts nothing and
// leaves the entry directory in place.
func TestReplay_NetworkErrorLeavesEntry(t *testing.T) {
	root := t.TempDir()
	c, err := New(config.Config{Root: root, Upstream: "http://127.0.0.1:1", Mode: "cache"})
	if err != nil {
		t.Fatalf("New: %v", err)
	}
	c.client.Timeout = 200 * time.Millisecond // keep the failing dial short
	o, err := NewOutbox(c)
	if err != nil {
		t.Fatalf("NewOutbox: %v", err)
	}

	r := httptest.NewRequest(http.MethodPut, "/foo.txt", bytes.NewReader([]byte("retry-me")))
	entry, err := o.Enqueue(r, time.Time{})
	if err != nil {
		// Without this check an empty entry.ID would make the Stat below
		// hit the outbox directory itself and pass vacuously.
		t.Fatalf("Enqueue: %v", err)
	}

	// Replay's error is deliberately not asserted: whether a deferred entry
	// surfaces as an error is not part of what this test pins.
	replayed, conflicts, _ := o.Replay(context.Background())
	if replayed != 0 || conflicts != 0 {
		t.Errorf("replayed=%d conflicts=%d, want 0/0 (deferred)", replayed, conflicts)
	}
	if _, err := os.Stat(filepath.Join(o.Dir(), entry.ID)); err != nil {
		t.Errorf("entry was removed despite network error: %v", err)
	}
}

// TestReplay_4xxNon412Drops checks that a 403 from upstream is counted as
// replayed (not as a conflict) and that the entry directory is removed.
func TestReplay_4xxNon412Drops(t *testing.T) {
	_, o, _ := newOutboxFixture(t, func(w http.ResponseWriter, r *http.Request) {
		http.Error(w, "Forbidden", http.StatusForbidden)
	})

	r := httptest.NewRequest(http.MethodPut, "/foo.txt", bytes.NewReader([]byte("x")))
	entry, err := o.Enqueue(r, time.Time{})
	if err != nil {
		t.Fatalf("Enqueue: %v", err)
	}

	// Replay's error is deliberately not asserted; only counters and disk
	// state are pinned here.
	replayed, conflicts, _ := o.Replay(context.Background())

	// 4xx-other-than-412 counts as "done" (we drop it; retrying won't help).
	if replayed != 1 || conflicts != 0 {
		t.Errorf("replayed=%d conflicts=%d", replayed, conflicts)
	}
	if _, err := os.Stat(filepath.Join(o.Dir(), entry.ID)); !os.IsNotExist(err) {
		t.Errorf("entry not dropped after 403: %v", err)
	}
}

// TestReplay_5xxLeavesEntry checks that a 500 from upstream counts as
// neither replayed nor conflict and leaves the entry directory in place.
func TestReplay_5xxLeavesEntry(t *testing.T) {
	_, o, _ := newOutboxFixture(t, func(w http.ResponseWriter, r *http.Request) {
		http.Error(w, "Internal", http.StatusInternalServerError)
	})

	r := httptest.NewRequest(http.MethodPut, "/foo.txt", bytes.NewReader([]byte("x")))
	entry, err := o.Enqueue(r, time.Time{})
	if err != nil {
		// Without this check an empty entry.ID would make the Stat below
		// hit the outbox directory itself and pass vacuously.
		t.Fatalf("Enqueue: %v", err)
	}

	// Replay's error is deliberately not asserted: whether a deferred entry
	// surfaces as an error is not part of what this test pins.
	replayed, conflicts, _ := o.Replay(context.Background())
	if replayed != 0 || conflicts != 0 {
		t.Errorf("replayed=%d conflicts=%d, want 0/0 (deferred)", replayed, conflicts)
	}
	if _, err := os.Stat(filepath.Join(o.Dir(), entry.ID)); err != nil {
		t.Errorf("entry should remain after 500: %v", err)
	}
}

// TestReplay_SendsIfUnmodifiedSinceWhenBaseSet checks that replaying a PUT
// enqueued with a non-zero base time sends an If-Unmodified-Since header
// that parses back to that base time.
func TestReplay_SendsIfUnmodifiedSinceWhenBaseSet(t *testing.T) {
	var seenIUS string
	_, o, _ := newOutboxFixture(t, func(w http.ResponseWriter, r *http.Request) {
		seenIUS = r.Header.Get("If-Unmodified-Since")
		w.WriteHeader(http.StatusNoContent)
	})

	base := time.Date(2026, 1, 2, 3, 4, 5, 0, time.UTC)
	r := httptest.NewRequest(http.MethodPut, "/foo.txt", bytes.NewReader([]byte("x")))
	if _, err := o.Enqueue(r, base); err != nil {
		t.Fatalf("Enqueue: %v", err)
	}

	// Replay's results are not asserted directly; a failed replay shows up
	// as a missing header below.
	_, _, _ = o.Replay(context.Background())

	if seenIUS == "" {
		// Fatal: parsing an empty header below would only add noise.
		t.Fatal("upstream did not see If-Unmodified-Since")
	}
	parsed, err := http.ParseTime(seenIUS)
	if err != nil {
		t.Fatalf("parse IUS: %v", err)
	}
	if !parsed.Equal(base) {
		t.Errorf("IUS = %v, want %v", parsed, base)
	}
}

// TestReplay_OmitsIfUnmodifiedSinceForPOST checks that a replayed POST
// reaches upstream without an If-Unmodified-Since header.
//
// NOTE(review): the entry is enqueued with the zero base time, so this test
// cannot tell "omitted because POST" from "omitted because no base time" --
// confirm against Replay whether a non-zero base should be used here.
func TestReplay_OmitsIfUnmodifiedSinceForPOST(t *testing.T) {
	var (
		called  bool
		seenIUS string
	)
	_, o, _ := newOutboxFixture(t, func(w http.ResponseWriter, r *http.Request) {
		called = true
		seenIUS = r.Header.Get("If-Unmodified-Since")
		w.WriteHeader(http.StatusOK)
	})

	r := httptest.NewRequest(http.MethodPost, "/submit", bytes.NewReader([]byte("data")))
	if _, err := o.Enqueue(r, time.Time{}); err != nil {
		t.Fatalf("Enqueue: %v", err)
	}

	// Replay's results are not asserted directly; the called flag below
	// proves the request actually went out.
	_, _, _ = o.Replay(context.Background())

	if !called {
		// Otherwise an empty seenIUS would pass without upstream ever
		// having been contacted.
		t.Fatal("upstream never received the replayed POST")
	}
	if seenIUS != "" {
		t.Errorf("POST should not send If-Unmodified-Since, got %q", seenIUS)
	}
}

// TestReplay_PreservesQueryString checks that the request URI seen by
// upstream on replay, including the query string, matches what was enqueued.
func TestReplay_PreservesQueryString(t *testing.T) {
	var seenURI string
	_, o, _ := newOutboxFixture(t, func(w http.ResponseWriter, r *http.Request) {
		seenURI = r.URL.RequestURI()
		w.WriteHeader(http.StatusOK)
	})

	r := httptest.NewRequest(http.MethodPost, "/foo?x=1&y=2", strings.NewReader(""))
	if _, err := o.Enqueue(r, time.Time{}); err != nil {
		t.Fatalf("Enqueue: %v", err)
	}

	// Replay's results are not asserted directly; a failed replay shows up
	// as an empty seenURI below.
	_, _, _ = o.Replay(context.Background())

	if seenURI != "/foo?x=1&y=2" {
		t.Errorf("upstream saw URI %q", seenURI)
	}
}

// TestServeHTTP_OnlineWriteForwards checks that a PUT through ServeHTTP with
// a reachable upstream relays upstream's status and body, and leaves the
// outbox empty.
func TestServeHTTP_OnlineWriteForwards(t *testing.T) {
	var lastBody []byte
	c, _, _ := newOutboxFixture(t, func(w http.ResponseWriter, r *http.Request) {
		lastBody, _ = io.ReadAll(r.Body)
		w.WriteHeader(http.StatusCreated)
	})

	rec := httptest.NewRecorder()
	r := httptest.NewRequest(http.MethodPut, "/x.txt", bytes.NewReader([]byte("data")))
	r.Header.Set("Content-Type", "text/plain")
	c.ServeHTTP(rec, r)

	if rec.Code != http.StatusCreated {
		t.Errorf("status = %d, want 201", rec.Code)
	}
	if string(lastBody) != "data" {
		t.Errorf("upstream body = %q", lastBody)
	}

	// No outbox entry should have been created for an online write.
	pending, err := c.outbox.Pending()
	if err != nil {
		t.Fatalf("Pending: %v", err)
	}
	if len(pending) != 0 {
		t.Errorf("pending = %d, want 0", len(pending))
	}
}

// TestServeHTTP_OfflineWriteQueues checks that a PUT through ServeHTTP with
// an unreachable upstream answers 202 with the cache header set to "queued"
// and a body mentioning outbox_id, and leaves exactly one pending entry.
func TestServeHTTP_OfflineWriteQueues(t *testing.T) {
	root := t.TempDir()
	c, err := New(config.Config{Root: root, Upstream: "http://127.0.0.1:1", Mode: "cache"})
	if err != nil {
		t.Fatalf("New: %v", err)
	}
	c.client.Timeout = 200 * time.Millisecond // keep the failing dial short
	o, err := NewOutbox(c)
	if err != nil {
		t.Fatalf("NewOutbox: %v", err)
	}
	c.SetOutbox(o)

	rec := httptest.NewRecorder()
	r := httptest.NewRequest(http.MethodPut, "/x.txt", bytes.NewReader([]byte("offline-data")))
	r.Header.Set("Content-Type", "text/plain")
	c.ServeHTTP(rec, r)

	if rec.Code != http.StatusAccepted {
		t.Fatalf("status = %d, want 202; body=%q", rec.Code, rec.Body.String())
	}
	if got := rec.Header().Get(HeaderName); got != "queued" {
		t.Errorf("cache header = %q", got)
	}
	if !strings.Contains(rec.Body.String(), "outbox_id") {
		t.Errorf("body missing outbox_id: %q", rec.Body.String())
	}

	pending, err := o.Pending()
	if err != nil {
		t.Fatalf("Pending: %v", err)
	}
	if len(pending) != 1 {
		t.Errorf("pending = %d, want 1", len(pending))
	}
}

// TestServeHTTP_OfflineWriteNoOutbox503 checks that a PUT through ServeHTTP
// with an unreachable upstream and no outbox installed answers 503.
func TestServeHTTP_OfflineWriteNoOutbox503(t *testing.T) {
	root := t.TempDir()
	c, err := New(config.Config{Root: root, Upstream: "http://127.0.0.1:1", Mode: "cache"})
	if err != nil {
		t.Fatalf("New: %v", err)
	}
	c.client.Timeout = 200 * time.Millisecond // keep the failing dial short

	// No outbox installed.
	rec := httptest.NewRecorder()
	r := httptest.NewRequest(http.MethodPut, "/x.txt", bytes.NewReader([]byte("data")))
	c.ServeHTTP(rec, r)

	if rec.Code != http.StatusServiceUnavailable {
		t.Errorf("status = %d, want 503", rec.Code)
	}
}

// TestServeHTTP_OnlinePutEvictsCachedEntry seeds a file under the cache root
// and checks that a successful PUT to the matching path removes it.
func TestServeHTTP_OnlinePutEvictsCachedEntry(t *testing.T) {
	c, _, _ := newOutboxFixture(t, func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusNoContent)
	})

	cached := filepath.Join(c.root, "x.txt")
	if err := os.WriteFile(cached, []byte("old"), 0o644); err != nil {
		t.Fatalf("seed: %v", err)
	}

	rec := httptest.NewRecorder()
	r := httptest.NewRequest(http.MethodPut, "/x.txt", bytes.NewReader([]byte("new")))
	c.ServeHTTP(rec, r)

	if rec.Code != http.StatusNoContent {
		t.Fatalf("status = %d", rec.Code)
	}
	if _, err := os.Stat(cached); !os.IsNotExist(err) {
		t.Error("cached entry not evicted after successful PUT")
	}
}

// TestServeHTTP_PUTSendsIfUnmodifiedSinceFromCachedMtime seeds a cached file
// with a known mtime and checks that a PUT to the matching path carries an
// If-Unmodified-Since header equal to that mtime.
func TestServeHTTP_PUTSendsIfUnmodifiedSinceFromCachedMtime(t *testing.T) {
	var seenIUS string
	c, _, _ := newOutboxFixture(t, func(w http.ResponseWriter, r *http.Request) {
		seenIUS = r.Header.Get("If-Unmodified-Since")
		w.WriteHeader(http.StatusNoContent)
	})

	// Seed cached file with a specific mtime.
	cached := filepath.Join(c.root, "y.txt")
	if err := os.WriteFile(cached, []byte("old"), 0o644); err != nil {
		t.Fatalf("seed: %v", err)
	}
	when := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
	if err := os.Chtimes(cached, when, when); err != nil {
		t.Fatalf("chtimes: %v", err)
	}

	rec := httptest.NewRecorder()
	r := httptest.NewRequest(http.MethodPut, "/y.txt", bytes.NewReader([]byte("new")))
	c.ServeHTTP(rec, r)

	if seenIUS == "" {
		t.Fatal("upstream did not see If-Unmodified-Since")
	}
	parsed, err := http.ParseTime(seenIUS)
	if err != nil {
		// Report a malformed header as such rather than as a time mismatch.
		t.Fatalf("parse IUS %q: %v", seenIUS, err)
	}
	if !parsed.Equal(when) {
		t.Errorf("IUS = %v, want %v (cached mtime)", parsed, when)
	}
}

// TestEnqueue_RejectsOversizeBody checks that Enqueue returns an error for a
// body ten bytes larger than MaxOutboxBodyBytes.
func TestEnqueue_RejectsOversizeBody(t *testing.T) {
	_, o, _ := newOutboxFixture(t, func(w http.ResponseWriter, r *http.Request) {})

	huge := make([]byte, MaxOutboxBodyBytes+10)
	r := httptest.NewRequest(http.MethodPut, "/big", bytes.NewReader(huge))
	if _, err := o.Enqueue(r, time.Time{}); err == nil {
		t.Error("expected error for oversize body")
	}
}

// TestNewOutboxID_LexSortable generates five IDs a couple of milliseconds
// apart and checks that each is strictly greater than the previous one.
func TestNewOutboxID_LexSortable(t *testing.T) {
	var ids []string
	for i := 0; i < 5; i++ {
		id, err := newOutboxID()
		if err != nil {
			t.Fatalf("id: %v", err)
		}
		ids = append(ids, id)
		time.Sleep(2 * time.Millisecond)
	}
	for i := 1; i < len(ids); i++ {
		if ids[i] <= ids[i-1] {
			t.Errorf("IDs not strictly increasing: %q vs %q", ids[i-1], ids[i])
		}
	}
}

// TestRunReplayLoop_StopsOnContextCancel checks that RunReplayLoop returns
// within two seconds of its context being cancelled.
func TestRunReplayLoop_StopsOnContextCancel(t *testing.T) {
	_, o, _ := newOutboxFixture(t, func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	})

	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{})
	go func() {
		o.RunReplayLoop(ctx)
		close(done)
	}()
	cancel()

	select {
	case <-done:
		// ok
	case <-time.After(2 * time.Second):
		t.Fatal("RunReplayLoop did not exit on context cancel")
	}
}

// TestConcurrentEnqueue_NoIDCollision enqueues N requests from N goroutines
// and checks that none fail and that Pending reports exactly N entries.
func TestConcurrentEnqueue_NoIDCollision(t *testing.T) {
	_, o, _ := newOutboxFixture(t, func(w http.ResponseWriter, r *http.Request) {})

	const N = 30
	var (
		wg    sync.WaitGroup
		fails int32
	)
	for i := 0; i < N; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			r := httptest.NewRequest(http.MethodPut, "/x", strings.NewReader("data"))
			if _, err := o.Enqueue(r, time.Time{}); err != nil {
				atomic.AddInt32(&fails, 1)
			}
		}()
	}
	wg.Wait()

	if n := atomic.LoadInt32(&fails); n != 0 {
		t.Errorf("%d enqueues failed", n)
	}
	pending, err := o.Pending()
	if err != nil {
		t.Fatalf("Pending: %v", err)
	}
	if len(pending) != N {
		t.Errorf("pending = %d, want %d", len(pending), N)
	}
}

// dirNames returns the Name() of each directory entry, in input order, for
// use in failure messages.
func dirNames(entries []os.DirEntry) []string {
	out := make([]string, len(entries))
	for i, e := range entries {
		out[i] = e.Name()
	}
	return out
}