--mode mirror layers an access-triggered walker on top of the cache
pipeline. When an incoming request's URL falls under one of the
configured --mirror-subtree paths, the scheduler kicks off a recursive
walk of that subtree iff (a) no walk for that subtree is in flight and
(b) now - last_walk_at >= --mirror-min-interval (default 1h). Walks
run in a goroutine; the user's request never blocks on scheduling.
Why access-triggered: a naive "walk on a fixed timer" would produce
thundering-herd polls on a master from many vendor mirrors, most of
which are idle most of the time. Demand-triggering means idle mirrors
generate zero upstream traffic until someone hits them; active
mirrors stay current as a side effect of normal use.
The walk:
1. Recursively fetches JSON listings under the subtree, persisting
each at <dir>/.zddc-listing.json so directory browsing works
offline for walked subtrees.
2. For each file, fires a conditional If-Modified-Since GET (bounded
parallelism; default 4 concurrent) — 304 no-op, 200 overwrites,
403/404 purges the local cache.
3. After enumeration, per-directory orphan purge: local files absent
from upstream's filtered listing are removed (handles upstream
deletes + ACL revocations).
State persists at <root>/.zddc-mirror-state.json as
{subtrees: {<path>: {last_walk_at}}}. In-flight tracking is in-memory
only — a crash mid-walk lets the next access retry without manual
cleanup. Subtree path matching is longest-prefix-wins; "/" is a
catch-all (full mirror, the default when --mode=mirror is set without
explicit --mirror-subtree).
The cache layer also gained directory-listing caching (it works
independently of mirror mode, and mirror mode builds on it). Directory
listings are now stored as <dir>/.zddc-listing.<html|json> sidecars,
one per Accept-header variant. Hit/miss/offline semantics mirror the
file pipeline. Phase 2's limitation that directories were always
proxied live (no offline browsing) is now resolved for any directory
the user has visited or that mirror mode has walked.
Mirror scope follows directly from auth: the walker uses the local
instance's bearer token, so it sees exactly what the user can see
upstream. Admin bearer → full mirror; vendor bearer → vendor's
permitted subtree; no code distinguishes the cases.
New flags (also as ZDDC_* env vars), ignored when --mode != mirror:
- --mirror-subtree <csv> — repeatable subtrees (comma-separated); if
  empty while --mode=mirror, defaults to "/" (full mirror)
- --mirror-min-interval <duration> — default 1h
Tests (15 new in walker_test.go, 3 new in cache_test.go): subtree
normalization, longest-prefix matching, root-as-catch-all, walk
fetches all files in scope, out-of-scope URLs are no-op, rate-
limiting prevents double-walks within min-interval, walks re-fire
after interval elapses, orphan purge removes local-only files,
state file survives restart, concurrent triggers don't double-walk,
end-to-end ServeHTTP-kicks-mirror-on-access, listing format varies
by Accept, listing offline serves stale, persisted state atomic
write + corrupt-input handling. Full suite + go vet clean.
Doc updates: zddc/README.md flags table gains the two new entries
plus a "Mirror mode (access-triggered subtree walker)" subsection
with trigger semantics and properties; the "What client mode is NOT,
yet" list shrinks accordingly. AGENTS.md env-var table gains the
two new entries. ARCHITECTURE.md "Master + proxy/cache/mirror"
section now documents the walker scheduler / walk algorithm / state
file in a "Mirror walker (access-triggered)" subsection.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
438 lines
14 KiB
Go
438 lines
14 KiB
Go
package cache
|
|
|
|
import (
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"codeberg.org/VARASYS/ZDDC/zddc/internal/config"
|
|
)
|
|
|
|
// fakeUpstream returns a handler that serves a small fixed tree:
//
//	/Vendors/Acme/                 → JSON listing of [a.txt, b.txt, sub/]
//	/Vendors/Acme/a.txt            → "alpha"
//	/Vendors/Acme/b.txt            → "beta"
//	/Vendors/Acme/sub/             → JSON listing of [c.txt]
//	/Vendors/Acme/sub/c.txt        → "charlie"
//	/Vendors/Beta/                 → JSON listing of [out-of-scope.txt]
//	/Vendors/Beta/out-of-scope.txt → "should-not-be-fetched"
//
// Any other path gets a 404.
//
// The returned map counts every request per URL path (values are int64)
// so tests can assert which paths the walker visits. The increment is a
// read-modify-write, so it is serialized with a mutex: without it,
// concurrent requests for the same path (the walker fetches in parallel)
// could drop increments and hide exactly the double-fetches the
// concurrency tests are meant to catch.
func fakeUpstream() (http.HandlerFunc, *sync.Map) {
	hits := &sync.Map{}
	var mu sync.Mutex
	return func(w http.ResponseWriter, r *http.Request) {
		mu.Lock()
		old, _ := hits.LoadOrStore(r.URL.Path, int64(0))
		hits.Store(r.URL.Path, old.(int64)+1)
		mu.Unlock()

		switch r.URL.Path {
		case "/Vendors/Acme/":
			w.Header().Set("Content-Type", "application/json")
			_, _ = w.Write([]byte(`[{"name":"a.txt","is_dir":false},{"name":"b.txt","is_dir":false},{"name":"sub/","is_dir":true}]`))
		case "/Vendors/Acme/a.txt":
			_, _ = w.Write([]byte("alpha"))
		case "/Vendors/Acme/b.txt":
			_, _ = w.Write([]byte("beta"))
		case "/Vendors/Acme/sub/":
			w.Header().Set("Content-Type", "application/json")
			_, _ = w.Write([]byte(`[{"name":"c.txt","is_dir":false}]`))
		case "/Vendors/Acme/sub/c.txt":
			_, _ = w.Write([]byte("charlie"))
		case "/Vendors/Beta/":
			w.Header().Set("Content-Type", "application/json")
			_, _ = w.Write([]byte(`[{"name":"out-of-scope.txt","is_dir":false}]`))
		case "/Vendors/Beta/out-of-scope.txt":
			_, _ = w.Write([]byte("should-not-be-fetched"))
		default:
			http.NotFound(w, r)
		}
	}, hits
}
|
|
|
|
func newWalkerTest(t *testing.T, subtrees []string, minInterval time.Duration) (*Cache, *MirrorScheduler, *httptest.Server, *sync.Map) {
|
|
t.Helper()
|
|
handler, hits := fakeUpstream()
|
|
upstream := httptest.NewServer(handler)
|
|
t.Cleanup(upstream.Close)
|
|
root := t.TempDir()
|
|
c, err := New(config.Config{Root: root, Upstream: upstream.URL, Mode: "mirror"})
|
|
if err != nil {
|
|
t.Fatalf("New: %v", err)
|
|
}
|
|
sched, err := NewMirrorScheduler(c, subtrees, minInterval, 0)
|
|
if err != nil {
|
|
t.Fatalf("NewMirrorScheduler: %v", err)
|
|
}
|
|
if sched == nil {
|
|
t.Fatal("expected scheduler, got nil")
|
|
}
|
|
return c, sched, upstream, hits
|
|
}
|
|
|
|
// hitsCount reports how many times upstream served urlPath. A path that
// was never requested has no entry in hits and counts as 0.
func hitsCount(hits *sync.Map, urlPath string) int64 {
	if v, ok := hits.Load(urlPath); ok {
		return v.(int64)
	}
	return 0
}
|
|
|
|
// waitForFile blocks until path exists. It polls every 20ms and fails the
// test if the file still has not appeared once deadline has elapsed. This
// lets tests wait on files the background walker writes without a fixed
// sleep.
func waitForFile(t *testing.T, path string, deadline time.Duration) {
	t.Helper()
	for stop := time.Now().Add(deadline); time.Now().Before(stop); time.Sleep(20 * time.Millisecond) {
		if _, statErr := os.Stat(path); statErr == nil {
			return
		}
	}
	t.Fatalf("file did not appear within %v: %s", deadline, path)
}
|
|
|
|
func TestNewMirrorScheduler_NoSubtrees(t *testing.T) {
|
|
root := t.TempDir()
|
|
c, _ := New(config.Config{Root: root, Upstream: "http://example.com", Mode: "cache"})
|
|
sched, err := NewMirrorScheduler(c, nil, 0, 0)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if sched != nil {
|
|
t.Errorf("expected nil scheduler for empty subtree list")
|
|
}
|
|
}
|
|
|
|
func TestMirrorScheduler_NormalizesSubtrees(t *testing.T) {
|
|
root := t.TempDir()
|
|
c, _ := New(config.Config{Root: root, Upstream: "http://example.com", Mode: "mirror"})
|
|
sched, _ := NewMirrorScheduler(c, []string{"Vendors/Acme/", "/Vendors/Beta", " Public ", "/"}, 0, 0)
|
|
got := sched.Subtrees()
|
|
want := []string{"/Vendors/Acme", "/Vendors/Beta", "/Public", "/"}
|
|
if len(got) != len(want) {
|
|
t.Fatalf("subtrees = %v, want %v", got, want)
|
|
}
|
|
for i := range got {
|
|
if got[i] != want[i] {
|
|
t.Errorf("subtree[%d] = %q, want %q", i, got[i], want[i])
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestMatchSubtree(t *testing.T) {
|
|
c, _ := New(config.Config{Root: t.TempDir(), Upstream: "http://example.com", Mode: "mirror"})
|
|
sched, _ := NewMirrorScheduler(c, []string{"/Vendors/Acme", "/Public"}, 0, 0)
|
|
cases := []struct {
|
|
url string
|
|
want string
|
|
}{
|
|
{"/Vendors/Acme/foo.txt", "/Vendors/Acme"},
|
|
{"/Vendors/Acme", "/Vendors/Acme"},
|
|
{"/Vendors/Acme/sub/x.txt", "/Vendors/Acme"},
|
|
{"/Vendors/Beta/x.txt", ""},
|
|
{"/Public/file", "/Public"},
|
|
{"/Other", ""},
|
|
{"", ""},
|
|
}
|
|
for _, tc := range cases {
|
|
if got := sched.matchSubtree(tc.url); got != tc.want {
|
|
t.Errorf("matchSubtree(%q) = %q, want %q", tc.url, got, tc.want)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestMatchSubtree_RootMatchesEverything(t *testing.T) {
|
|
c, _ := New(config.Config{Root: t.TempDir(), Upstream: "http://example.com", Mode: "mirror"})
|
|
sched, _ := NewMirrorScheduler(c, []string{"/"}, 0, 0)
|
|
for _, u := range []string{"/anything", "/Project/X", "/"} {
|
|
if got := sched.matchSubtree(u); got != "/" {
|
|
t.Errorf("matchSubtree(%q) = %q, want /", u, got)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestTrigger_FetchesSubtree(t *testing.T) {
|
|
c, sched, _, hits := newWalkerTest(t, []string{"/Vendors/Acme"}, 5*time.Millisecond)
|
|
|
|
// Trigger via a request URL under the subtree.
|
|
sched.Trigger("/Vendors/Acme/a.txt")
|
|
// Wait for the walker to drop all expected files.
|
|
waitForFile(t, filepath.Join(c.root, "Vendors", "Acme", "a.txt"), 2*time.Second)
|
|
waitForFile(t, filepath.Join(c.root, "Vendors", "Acme", "b.txt"), 2*time.Second)
|
|
waitForFile(t, filepath.Join(c.root, "Vendors", "Acme", "sub", "c.txt"), 2*time.Second)
|
|
|
|
// JSON listing sidecars exist for the walked dirs.
|
|
waitForFile(t, filepath.Join(c.root, "Vendors", "Acme", listingCachePrefix+"json"), 2*time.Second)
|
|
waitForFile(t, filepath.Join(c.root, "Vendors", "Acme", "sub", listingCachePrefix+"json"), 2*time.Second)
|
|
|
|
// Out-of-scope path was not visited.
|
|
if got := hitsCount(hits, "/Vendors/Beta/"); got != 0 {
|
|
t.Errorf("walker hit out-of-scope /Vendors/Beta/ %d times", got)
|
|
}
|
|
}
|
|
|
|
func TestTrigger_OutOfScopeIsNoOp(t *testing.T) {
|
|
_, sched, _, hits := newWalkerTest(t, []string{"/Vendors/Acme"}, 5*time.Millisecond)
|
|
sched.Trigger("/Other/path.txt")
|
|
time.Sleep(150 * time.Millisecond)
|
|
// No upstream calls should have happened.
|
|
count := 0
|
|
hits.Range(func(_, _ interface{}) bool { count++; return true })
|
|
if count != 0 {
|
|
t.Errorf("triggered %d upstream calls for out-of-scope URL", count)
|
|
}
|
|
}
|
|
|
|
func TestTrigger_RateLimitsRapidAccesses(t *testing.T) {
|
|
_, sched, _, hits := newWalkerTest(t, []string{"/Vendors/Acme"}, 1*time.Hour)
|
|
|
|
sched.Trigger("/Vendors/Acme/a.txt")
|
|
// Wait for first walk to complete.
|
|
deadline := time.Now().Add(2 * time.Second)
|
|
for time.Now().Before(deadline) {
|
|
sched.mu.Lock()
|
|
flight := sched.inFlight["/Vendors/Acme"]
|
|
st := sched.state["/Vendors/Acme"]
|
|
sched.mu.Unlock()
|
|
if !flight && !st.LastWalkAt.IsZero() {
|
|
break
|
|
}
|
|
time.Sleep(20 * time.Millisecond)
|
|
}
|
|
first := hitsCount(hits, "/Vendors/Acme/a.txt")
|
|
// Subsequent triggers within the min-interval are no-ops.
|
|
for i := 0; i < 5; i++ {
|
|
sched.Trigger("/Vendors/Acme/a.txt")
|
|
}
|
|
time.Sleep(100 * time.Millisecond)
|
|
second := hitsCount(hits, "/Vendors/Acme/a.txt")
|
|
if second != first {
|
|
t.Errorf("rate limit failed: hits went %d → %d", first, second)
|
|
}
|
|
}
|
|
|
|
func TestTrigger_AfterIntervalElapsesWalksAgain(t *testing.T) {
|
|
_, sched, _, hits := newWalkerTest(t, []string{"/Vendors/Acme"}, 50*time.Millisecond)
|
|
|
|
sched.Trigger("/Vendors/Acme/a.txt")
|
|
deadline := time.Now().Add(2 * time.Second)
|
|
for time.Now().Before(deadline) {
|
|
sched.mu.Lock()
|
|
flight := sched.inFlight["/Vendors/Acme"]
|
|
sched.mu.Unlock()
|
|
if !flight {
|
|
break
|
|
}
|
|
time.Sleep(20 * time.Millisecond)
|
|
}
|
|
first := hitsCount(hits, "/Vendors/Acme/a.txt")
|
|
|
|
// Wait past min-interval, trigger again, walk should re-fire.
|
|
time.Sleep(100 * time.Millisecond)
|
|
sched.Trigger("/Vendors/Acme/a.txt")
|
|
deadline = time.Now().Add(2 * time.Second)
|
|
for time.Now().Before(deadline) {
|
|
if hitsCount(hits, "/Vendors/Acme/a.txt") > first {
|
|
break
|
|
}
|
|
time.Sleep(20 * time.Millisecond)
|
|
}
|
|
second := hitsCount(hits, "/Vendors/Acme/a.txt")
|
|
if second <= first {
|
|
t.Errorf("expected walk after interval; hits %d → %d", first, second)
|
|
}
|
|
}
|
|
|
|
func TestTrigger_PurgesOrphanedFiles(t *testing.T) {
|
|
c, sched, _, _ := newWalkerTest(t, []string{"/Vendors/Acme"}, 5*time.Millisecond)
|
|
|
|
// Pre-seed an orphan that's NOT in upstream's listing.
|
|
if err := os.MkdirAll(filepath.Join(c.root, "Vendors", "Acme"), 0o755); err != nil {
|
|
t.Fatalf("mkdir: %v", err)
|
|
}
|
|
orphan := filepath.Join(c.root, "Vendors", "Acme", "stale.txt")
|
|
if err := os.WriteFile(orphan, []byte("orphan"), 0o644); err != nil {
|
|
t.Fatalf("seed: %v", err)
|
|
}
|
|
|
|
sched.Trigger("/Vendors/Acme/a.txt")
|
|
waitForFile(t, filepath.Join(c.root, "Vendors", "Acme", "a.txt"), 2*time.Second)
|
|
|
|
// Wait for walk to fully complete (orphan purge happens after fetches).
|
|
deadline := time.Now().Add(2 * time.Second)
|
|
for time.Now().Before(deadline) {
|
|
sched.mu.Lock()
|
|
flight := sched.inFlight["/Vendors/Acme"]
|
|
sched.mu.Unlock()
|
|
if !flight {
|
|
break
|
|
}
|
|
time.Sleep(20 * time.Millisecond)
|
|
}
|
|
|
|
if _, err := os.Stat(orphan); !os.IsNotExist(err) {
|
|
t.Errorf("orphan file not purged: %v", err)
|
|
}
|
|
}
|
|
|
|
func TestPersistedState_SurvivesRestart(t *testing.T) {
|
|
c, sched, _, _ := newWalkerTest(t, []string{"/Vendors/Acme"}, 5*time.Millisecond)
|
|
|
|
sched.Trigger("/Vendors/Acme/a.txt")
|
|
deadline := time.Now().Add(2 * time.Second)
|
|
for time.Now().Before(deadline) {
|
|
sched.mu.Lock()
|
|
flight := sched.inFlight["/Vendors/Acme"]
|
|
st := sched.state["/Vendors/Acme"]
|
|
sched.mu.Unlock()
|
|
if !flight && !st.LastWalkAt.IsZero() {
|
|
break
|
|
}
|
|
time.Sleep(20 * time.Millisecond)
|
|
}
|
|
|
|
// State file should exist.
|
|
statePath := filepath.Join(c.root, MirrorStateFile)
|
|
if _, err := os.Stat(statePath); err != nil {
|
|
t.Fatalf("state file missing: %v", err)
|
|
}
|
|
|
|
// New scheduler against the same root reads the prior state and
|
|
// honors the rate-limit gate.
|
|
sched2, _ := NewMirrorScheduler(c, []string{"/Vendors/Acme"}, 1*time.Hour, 0)
|
|
st := sched2.state["/Vendors/Acme"]
|
|
if st.LastWalkAt.IsZero() {
|
|
t.Error("restart did not load prior LastWalkAt from state file")
|
|
}
|
|
}
|
|
|
|
func TestTrigger_ConcurrentSameSubtreeDoesNotDoubleWalk(t *testing.T) {
|
|
_, sched, _, hits := newWalkerTest(t, []string{"/Vendors/Acme"}, 1*time.Hour)
|
|
var wg sync.WaitGroup
|
|
for i := 0; i < 10; i++ {
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
sched.Trigger("/Vendors/Acme/a.txt")
|
|
}()
|
|
}
|
|
wg.Wait()
|
|
deadline := time.Now().Add(2 * time.Second)
|
|
for time.Now().Before(deadline) {
|
|
sched.mu.Lock()
|
|
flight := sched.inFlight["/Vendors/Acme"]
|
|
sched.mu.Unlock()
|
|
if !flight {
|
|
break
|
|
}
|
|
time.Sleep(20 * time.Millisecond)
|
|
}
|
|
// Each file should have been fetched exactly once even though we
|
|
// triggered 10 times concurrently.
|
|
if got := hitsCount(hits, "/Vendors/Acme/a.txt"); got != 1 {
|
|
t.Errorf("a.txt fetched %d times, want 1", got)
|
|
}
|
|
}
|
|
|
|
func TestServeHTTP_KicksMirrorOnAccess(t *testing.T) {
|
|
// End-to-end through the cache layer's ServeHTTP, verifying the
|
|
// onAccess hook fires and the walker prefetches.
|
|
upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
switch r.URL.Path {
|
|
case "/Vendors/Acme/":
|
|
w.Header().Set("Content-Type", "application/json")
|
|
_, _ = w.Write([]byte(`[{"name":"only.txt","is_dir":false}]`))
|
|
case "/Vendors/Acme/only.txt":
|
|
_, _ = w.Write([]byte("data"))
|
|
default:
|
|
http.NotFound(w, r)
|
|
}
|
|
}))
|
|
defer upstream.Close()
|
|
|
|
root := t.TempDir()
|
|
c, err := New(config.Config{Root: root, Upstream: upstream.URL, Mode: "mirror"})
|
|
if err != nil {
|
|
t.Fatalf("New: %v", err)
|
|
}
|
|
sched, err := NewMirrorScheduler(c, []string{"/Vendors/Acme"}, 5*time.Millisecond, 0)
|
|
if err != nil {
|
|
t.Fatalf("NewMirrorScheduler: %v", err)
|
|
}
|
|
|
|
rec := httptest.NewRecorder()
|
|
c.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/Vendors/Acme/only.txt", nil))
|
|
if rec.Code != http.StatusOK {
|
|
t.Fatalf("status = %d", rec.Code)
|
|
}
|
|
// The user's request itself fetched only.txt. Then the walker
|
|
// (kicked async) fetches the listing + only.txt. So only.txt may
|
|
// be fetched twice. The walker MUST have hit the listing.
|
|
waitForFile(t, filepath.Join(root, "Vendors", "Acme", listingCachePrefix+"json"), 2*time.Second)
|
|
waitForFile(t, filepath.Join(root, "Vendors", "Acme", "only.txt"), 2*time.Second)
|
|
|
|
// Drain the walk before t.TempDir cleanup races with file writes.
|
|
waitForWalkDrain(t, sched, "/Vendors/Acme", 2*time.Second)
|
|
}
|
|
|
|
// waitForWalkDrain blocks until no walk is in flight for subtree.
|
|
// Avoids TempDir cleanup races where the walker is still writing
|
|
// when the test finishes.
|
|
func waitForWalkDrain(t *testing.T, sched *MirrorScheduler, subtree string, deadline time.Duration) {
|
|
t.Helper()
|
|
end := time.Now().Add(deadline)
|
|
for time.Now().Before(end) {
|
|
sched.mu.Lock()
|
|
flight := sched.inFlight[subtree]
|
|
sched.mu.Unlock()
|
|
if !flight {
|
|
return
|
|
}
|
|
time.Sleep(20 * time.Millisecond)
|
|
}
|
|
t.Fatalf("walk for %s did not drain within %v", subtree, deadline)
|
|
}
|
|
|
|
func TestPersistMirrorState_AtomicWrite(t *testing.T) {
|
|
dir := t.TempDir()
|
|
path := filepath.Join(dir, MirrorStateFile)
|
|
state := map[string]subtreeState{
|
|
"/Vendors/Acme": {LastWalkAt: time.Date(2026, 5, 8, 12, 0, 0, 0, time.UTC)},
|
|
}
|
|
persistMirrorState(path, state)
|
|
|
|
got := loadMirrorState(path)
|
|
if got["/Vendors/Acme"].LastWalkAt.IsZero() {
|
|
t.Error("round-trip lost LastWalkAt")
|
|
}
|
|
// No leftover tmp files.
|
|
entries, _ := os.ReadDir(dir)
|
|
for _, e := range entries {
|
|
if strings.HasPrefix(e.Name(), ".zddc-mirror-state-tmp-") {
|
|
t.Errorf("leftover tmp: %s", e.Name())
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestLoadMirrorState_MissingReturnsEmpty(t *testing.T) {
|
|
got := loadMirrorState(filepath.Join(t.TempDir(), "does-not-exist"))
|
|
if got == nil || len(got) != 0 {
|
|
t.Errorf("expected empty map, got %v", got)
|
|
}
|
|
}
|
|
|
|
func TestLoadMirrorState_CorruptReturnsEmpty(t *testing.T) {
|
|
dir := t.TempDir()
|
|
path := filepath.Join(dir, "garbage")
|
|
_ = os.WriteFile(path, []byte("not json"), 0o644)
|
|
got := loadMirrorState(path)
|
|
if got == nil || len(got) != 0 {
|
|
t.Errorf("corrupt should yield empty map, got %v", got)
|
|
}
|
|
}
|