diff --git a/AGENTS.md b/AGENTS.md index c206755..591c4e8 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -451,6 +451,8 @@ ZDDC_ROOT=/path/to/your/archive ZDDC_TLS_CERT=none ZDDC_ADDR=:8080 \ | `ZDDC_MODE` | `cache` | Client mode: `proxy` (forward live, no persistence), `cache` (default; persist responses on access), `mirror` (phase 3 — currently behaves like `cache`). Ignored when `ZDDC_UPSTREAM` is empty. | | `ZDDC_BEARER_FILE` | *(empty)* | Path to a 0600 file containing the master-issued token (see `/.tokens` on the master). Forwarded as `Authorization: Bearer …` to upstream on every request. Ignored when `ZDDC_UPSTREAM` is empty. | | `ZDDC_SKIP_TLS_VERIFY` | *(empty)* | `1` accepts self-signed / untrusted upstream certs. Distinct from `ZDDC_NO_AUTH`. Dev / internal-CA scenarios only. | +| `ZDDC_MIRROR_SUBTREE` | *(empty)* | Comma-separated URL subtrees the access-triggered mirror walker keeps current (e.g. `/Vendors/Acme,/Public`). Empty + `ZDDC_MODE=mirror` = full mirror (`/`). Ignored when `ZDDC_MODE != mirror`. | +| `ZDDC_MIRROR_MIN_INTERVAL` | `1h` | Minimum gap between walks of the same mirror subtree. Idle subtrees generate zero upstream traffic until next access. Format is Go `time.ParseDuration`. | | `ZDDC_OPA_URL` | `internal` | Policy decider endpoint. `internal` (default) = in-process Go evaluator (same `.zddc` cascade we always had). `http(s)://...` or `unix:///...` = external OPA — every access decision becomes a `POST /v1/data/zddc/access/allow` to the configured endpoint. Federal customers with their own audited Rego use this; commercial deployments leave it `internal`. | | `ZDDC_OPA_FAIL_OPEN` | *(empty)* | External OPA only. `1` = allow on transport error; default = fail closed (deny). | | `ZDDC_OPA_CACHE_TTL` | `1s` | External OPA only. Per-decision cache TTL — amortizes round-trips on bursty patterns (e.g. `.archive` listings hit the same `(email, dir)` tuple many times). `0` disables. Format is Go `time.ParseDuration`. 
| diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 130276b..370c92d 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -481,7 +481,7 @@ Three sub-modes within client mode, controlled by `--mode `: |---|---|---|---| | `proxy` | no | no | thin pass-through; nothing on local disk | | `cache` (default) | yes | no | field engineer — what you've viewed is available offline | -| `mirror` | yes | yes (planned, phase 3) | vendor mirrors of their subtree; admin backups; complete offline working set | +| `mirror` | yes | yes (access-triggered, subtree-scoped) | vendor mirrors of their subtree; admin backups; complete offline working set | Internally the modes collapse to two switches on a single request-handling pipeline (`persist`, `warm`). Proxy is cache without disk writes; mirror is cache plus an access-triggered walker. Implementation factor: `cache.New` reads `cfg.Mode` once and sets `c.persist = mode != "proxy"`; the warmer is the only path that doesn't yet exist (phase 3). @@ -511,6 +511,31 @@ Responses with `Cache-Control: no-store` or `Cache-Control: private` pass throug Hop-by-hop headers per RFC 7230 §6.1 (`Connection`, `Keep-Alive`, `Transfer-Encoding`, etc.) are dropped from forwarded responses; Go's transport drops most automatically, but the cache layer adds a guard for the cases that slip through. +#### Mirror walker (access-triggered) + +`--mode mirror` adds an access-triggered subtree warmer (`zddc/internal/cache/walker.go`) on top of the cache pipeline. Naive design ("walk on a fixed timer") would scale poorly: many vendor mirrors against one master would generate thundering-herd polls of subtrees no human has looked at in months. Instead, walks are demand-triggered, rate-limited per-subtree. + +Trigger policy (`MirrorScheduler.Trigger(urlPath)` is installed as the cache layer's `onAccess` hook, called in a goroutine on every authenticated request): + +1. Match `urlPath` against the configured `--mirror-subtree`s. 
Longest prefix wins; `/` is a catch-all (full mirror). +2. If a walk is already in flight for that subtree, no-op. +3. If `now - last_walk_at < --mirror-min-interval` (default 1h), no-op. +4. Otherwise, mark in-flight and kick a walk goroutine. + +Walk: + +1. Recursively fetch JSON listings under the subtree, persisting each as `<dir>/.zddc-listing.json` (so directory browsing works offline for walked subtrees). +2. For each file, fire a conditional `If-Modified-Since` GET (bounded parallelism — default 4 concurrent, configurable). 304 = no-op; 200 = overwrite; 403/404 = purge. +3. Per-directory orphan purge: any file present locally but absent from the upstream listing is removed (handles upstream deletes + ACL revocations). + +State persists at `<root>/.zddc-mirror-state.json` as `{subtrees: {<subtree>: {last_walk_at}}}`. In-flight tracking is in-memory only — a crash mid-walk lets the next access retry without manual cleanup. + +Properties: +- **Idle mirrors are quiet.** No requests means no walks means zero upstream traffic. +- **Active mirrors stay current as a side effect of normal use** (no explicit refresh gesture). +- **Revocation latency** is bounded by access frequency. Documented behavior, not a guarantee. +- **Bounded concurrency** keeps walks from starving the user's interactive requests on the same connection pool. + #### Multi-tenancy: explicitly out of scope (v1) The local instance forwards a single bearer (loaded from `--bearer-file` at startup) regardless of who's calling locally. Single-user-trust on a laptop. For multi-user scenarios, run multiple instances on the same host, or front the local server with your own auth proxy that injects per-user bearers downstream — both options keep the cache layer's design surface minimal. diff --git a/zddc/README.md b/zddc/README.md index 5094dc7..ab75acf 100644 --- a/zddc/README.md +++ b/zddc/README.md @@ -215,8 +215,8 @@ Three modes via `--mode`: | Mode | Persists responses? | Subtree warmer? 
| Use case | |---|---|---|---| | `proxy` | no | no | thin pass-through; nothing on local disk | -| `cache` (default) | yes | no | field engineer — what you've viewed is available offline | -| `mirror` | yes | yes (phase 3) | vendor mirrors, admin backups, complete offline working set | +| `cache` (default) | yes | no (only what you visit) | field engineer — what you've viewed is available offline | +| `mirror` | yes | yes (access-triggered) | vendor mirrors, admin backups, complete offline working set | The cache directory layout is a normal ZDDC root: `/foo/bar.txt` is stored at `/foo/bar.txt`. No sidecar metadata. Running @@ -237,6 +237,8 @@ provides ops provenance. | `--bearer-file ` / `ZDDC_BEARER_FILE` | Path to a 0600 file with a master-issued token (see `/.tokens` on the master). Forwarded as `Authorization: Bearer …` on every upstream request. | | `--skip-tls-verify` / `ZDDC_SKIP_TLS_VERIFY` | Accept self-signed / untrusted upstream certs. Distinct from `--no-auth`. Dev / internal-CA scenarios only. | | `--no-auth` / `ZDDC_NO_AUTH` | Skip ACL enforcement on incoming requests to the local instance. The common case for personal field-engineer / cache deployments where the laptop is single-user-trust and the master already filtered. | +| `--mirror-subtree ` / `ZDDC_MIRROR_SUBTREE` | Mirror-mode only. Comma-separated URL subtrees the access-triggered walker keeps current. Empty + `--mode=mirror` = full mirror (`/`). | +| `--mirror-min-interval ` / `ZDDC_MIRROR_MIN_INTERVAL` | Mirror-mode only. Minimum gap between walks of the same subtree. Default `1h`. Idle subtrees generate zero upstream traffic until next access. | ### Pipeline @@ -272,11 +274,25 @@ zddc-server \ Browse `http://localhost:8444/`. Files you visit appear under `/tmp/zddc-mirror/` mirroring the master's path layout. Disconnect, refresh — previously-visited files keep working. Reconnect — background revalidates run on every cache hit, picking up master-side changes the next time you reload. 
+### Mirror mode (access-triggered subtree walker) + +`--mode mirror` adds an access-triggered walker on top of the cache pipeline. When a request arrives at a path under one of the configured `--mirror-subtree`s, the scheduler kicks off a recursive walk of that subtree if `now - last_walk_at >= --mirror-min-interval` and no walk is already in flight for that subtree. The walk: + +1. Recursively fetches JSON listings under the subtree (each listing also persisted as the JSON sidecar `.zddc-listing.json`, so directory browsing works offline). +2. For each file in the listing, fires a conditional `If-Modified-Since` GET — `304` is a no-op, `200` writes new bytes, `403`/`404` purges the local cache entry. +3. After enumeration, purges local files in each walked directory that no longer appear in the master's filtered listing (handles upstream deletes + ACL revocations). + +State (`{subtree → last_walk_at}`) persists in `<root>/.zddc-mirror-state.json` so restarts honor the last-walked timestamp. In-flight tracking is in-memory only — a crash mid-walk lets the next access retry immediately. + +Properties: +- **Idle mirrors are quiet.** No requests means no walks means no traffic to the master. A vendor who hasn't logged in for three months triggers exactly one walk on next access. +- **Active mirrors stay current as a side effect of use.** Browsing always touches the access-trigger; walks fire at most once per `--mirror-min-interval` per subtree. +- **Revocation latency** is bounded by access frequency: a revoked file in an idle mirror remains until the next walk fires. +- **Bounded concurrency** (4 parallel fetches per walk) keeps the walker from starving the user's interactive requests sharing the same connection pool. + ### What client mode is NOT, yet - **No write path**: `PUT`/`POST`/`DELETE` return `405`. The offline write outbox lands in a later phase. -- **No mirror walker**: `--mode mirror` is accepted but currently behaves like `cache` (no proactive prefetching). 
Phase 3 adds the access-triggered walk scheduler. -- **No listing cache**: directories always proxy live, so offline browsing of a directory you didn't visit while online won't show anything. Mirror mode + listing caching is phase 3. - **No multi-tenancy**: the local instance forwards a single bearer to upstream regardless of who's calling locally. For multi-user deployments, run multiple instances or front the local server with your own auth proxy. ## Access control: the `.zddc` cascade diff --git a/zddc/cmd/zddc-server/main.go b/zddc/cmd/zddc-server/main.go index 5823947..014390c 100644 --- a/zddc/cmd/zddc-server/main.go +++ b/zddc/cmd/zddc-server/main.go @@ -289,6 +289,23 @@ func runClient(cfg config.Config) { slog.Warn("--no-auth enabled: incoming requests are not ACL-checked locally; trusting upstream's filtering.") } + // Mirror walker: only constructed when --mode=mirror with at least + // one subtree (config validation ensures a default of "/" applies + // when the operator opted into mirror without specifying). Hooks + // itself into cacheLayer.onAccess; no further wiring needed here. + if cfg.Mode == "mirror" && len(cfg.MirrorSubtree) > 0 { + sched, err := cache.NewMirrorScheduler(cacheLayer, cfg.MirrorSubtree, cfg.MirrorMinInterval, 0) + if err != nil { + slog.Error("mirror scheduler init failed", "err", err) + os.Exit(1) + } + if sched != nil { + slog.Info("mirror walker armed", + "subtrees", sched.Subtrees(), + "min_interval", sched.MinInterval()) + } + } + tlsCfg, useTLS, err := tlsutil.TLSConfig(cfg) if err != nil { slog.Error("failed to configure TLS", "err", err) diff --git a/zddc/internal/cache/cache.go b/zddc/internal/cache/cache.go index 07658d2..799a1c4 100644 --- a/zddc/internal/cache/cache.go +++ b/zddc/internal/cache/cache.go @@ -48,6 +48,12 @@ const MarkerFile = ".zddc-upstream" // proxy, offline. const HeaderName = "X-ZDDC-Cache" +// listingCachePrefix is the basename prefix used for directory-listing +// sidecar files. 
Suffix is the format ("html" or "json"). Dot-prefixed +// so the local "serve cache root as plain master" mode hides them via +// the existing dispatch dot-prefix guard. +const listingCachePrefix = ".zddc-listing." + // Cache is the request handler installed in main.go when cfg.Upstream // is non-empty. It is safe for concurrent ServeHTTP calls. type Cache struct { @@ -59,6 +65,13 @@ type Cache struct { client *http.Client markerOnce sync.Once + + // onAccess is invoked (when non-nil) after a request is dispatched. + // The walker scheduler installs this hook to kick mirror walks based + // on incoming traffic. Always called in a goroutine — must not + // assume it runs before the response completes. Nil in proxy/cache + // modes; set in mirror mode. + onAccess func(urlPath string) } // New constructs a Cache from the loaded configuration. Validates @@ -145,12 +158,21 @@ func (c *Cache) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - // Directory listings are always proxied live in v1. The cache - // directory's actual filesystem listing would be inaccurate (it - // only contains visited files), and full listing-cache support - // belongs with the mirror walker in phase 3. + // Mirror walker hook: notify the scheduler that this URL was + // touched so it can decide whether to kick a subtree walk. Runs + // after we've started serving — the user's request never blocks + // on walk scheduling. + if c.onAccess != nil { + go c.onAccess(r.URL.Path) + } + + // Directory listings: try sidecar listing-cache, fall back to + // proxy. The Accept header determines which sidecar (HTML vs + // JSON) to consult — the master serves both formats and the + // browser asks for HTML for navigation, JSON for the inline-JS + // listing fetcher. 
if strings.HasSuffix(r.URL.Path, "/") { - c.proxy(w, r, false /* writeToCache */) + c.serveDirectory(w, r) return } @@ -172,6 +194,23 @@ func (c *Cache) ServeHTTP(w http.ResponseWriter, r *http.Request) { c.proxy(w, r, c.persist) } +// serveDirectory handles directory requests. Tries the sidecar +// listing cache first (selected by Accept), revalidates in +// background on a hit, falls back to upstream proxy on miss. +func (c *Cache) serveDirectory(w http.ResponseWriter, r *http.Request) { + if c.persist { + if path, ok := c.listingCachePathFor(r.URL.Path, r.Header.Get("Accept")); ok { + info, err := os.Stat(path) + if err == nil && !info.IsDir() { + c.serveListingFromDisk(w, r, path, info, "hit") + go c.revalidateListing(r.URL.Path, r.Header.Get("Accept"), info.ModTime()) + return + } + } + } + c.proxyDirectory(w, r, c.persist) +} + // proxy forwards the request to upstream and serves the response back // to the client. When writeToCache is true and the response is a // cacheable 200, the body is also persisted under cfg.Root. @@ -461,6 +500,210 @@ func (c *Cache) maybeWriteMarker() { }) } +// listingFormat collapses an Accept header to "json" or "html". Mirrors +// the master's content-negotiation: anything Accept-ing application/ +// json wins JSON, otherwise HTML. +func listingFormat(accept string) string { + if strings.Contains(strings.ToLower(accept), "application/json") { + return "json" + } + return "html" +} + +// listingCachePathFor returns the on-disk sidecar path for a cached +// directory listing. urlPath is the directory URL (always ending in +// "/"); accept is the request's Accept header. 
+func (c *Cache) listingCachePathFor(urlPath, accept string) (string, bool) { + if !strings.HasSuffix(urlPath, "/") { + return "", false + } + if strings.Contains(urlPath, "..") { + return "", false + } + rel := filepath.FromSlash(strings.Trim(urlPath, "/")) + dir := filepath.Join(c.root, rel) + if !strings.HasPrefix(dir, c.root) { + return "", false + } + return filepath.Join(dir, listingCachePrefix+listingFormat(accept)), true +} + +// serveListingFromDisk serves a cached directory listing. Like +// serveFromDisk but always sets Content-Type from the format suffix +// (the master's headers may have been gzip-stripped or otherwise +// massaged by the time we cached them). +func (c *Cache) serveListingFromDisk(w http.ResponseWriter, r *http.Request, path string, info os.FileInfo, cacheState string) { + f, err := os.Open(path) + if err != nil { + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + defer f.Close() + if strings.HasSuffix(path, ".json") { + w.Header().Set("Content-Type", "application/json; charset=utf-8") + } else { + w.Header().Set("Content-Type", "text/html; charset=utf-8") + } + w.Header().Set(HeaderName, cacheState) + http.ServeContent(w, r, filepath.Base(path), info.ModTime(), f) +} + +// proxyDirectory is proxy() with directory-specific persistence: the +// listing sidecar (selected by Accept) is the cache target instead of +// the literal URL path. Falls back to serving stale on network error. 
+func (c *Cache) proxyDirectory(w http.ResponseWriter, r *http.Request, writeToCache bool) { + upReq, err := c.buildUpstreamRequest(r) + if err != nil { + http.Error(w, "Bad Request: "+err.Error(), http.StatusBadRequest) + return + } + resp, err := c.client.Do(upReq) + if err != nil { + if writeToCache && r.Method == http.MethodGet { + if path, ok := c.listingCachePathFor(r.URL.Path, r.Header.Get("Accept")); ok { + if info, sErr := os.Stat(path); sErr == nil && !info.IsDir() { + c.serveListingFromDisk(w, r, path, info, "offline") + return + } + } + } + slog.Warn("upstream directory fetch failed", "url", upReq.URL.String(), "err", err) + w.Header().Set(HeaderName, "offline") + http.Error(w, "Service Unavailable: upstream unreachable", http.StatusServiceUnavailable) + return + } + defer resp.Body.Close() + + for k, vv := range resp.Header { + if isHopByHop(k) { + continue + } + for _, v := range vv { + w.Header().Add(k, v) + } + } + cacheable := writeToCache && resp.StatusCode == http.StatusOK && c.responseCacheable(resp) + if cacheable { + w.Header().Set(HeaderName, "miss") + } else { + w.Header().Set(HeaderName, "proxy") + } + w.WriteHeader(resp.StatusCode) + if r.Method == http.MethodHead || resp.StatusCode == http.StatusNotModified { + return + } + if !cacheable { + _, _ = io.Copy(w, resp.Body) + return + } + if err := c.streamAndPersistListing(w, resp, r.URL.Path, r.Header.Get("Accept")); err != nil { + slog.Debug("listing stream-and-persist error", "url", r.URL.Path, "err", err) + } else { + c.maybeWriteMarker() + } +} + +// streamAndPersistListing is streamAndPersist for directory listings, +// resolving the cache target via listingCachePathFor (Accept-aware). 
+func (c *Cache) streamAndPersistListing(w http.ResponseWriter, resp *http.Response, urlPath, accept string) error { + finalPath, ok := c.listingCachePathFor(urlPath, accept) + if !ok { + _, err := io.Copy(w, resp.Body) + return err + } + if err := os.MkdirAll(filepath.Dir(finalPath), 0o755); err != nil { + _, copyErr := io.Copy(w, resp.Body) + if copyErr != nil { + return copyErr + } + return err + } + tmp, err := os.CreateTemp(filepath.Dir(finalPath), ".zddc-cache-tmp-*") + if err != nil { + _, copyErr := io.Copy(w, resp.Body) + if copyErr != nil { + return copyErr + } + return err + } + tmpName := tmp.Name() + mw := io.MultiWriter(tmp, w) + if _, err := io.Copy(mw, resp.Body); err != nil { + _ = tmp.Close() + _ = os.Remove(tmpName) + return err + } + if err := tmp.Close(); err != nil { + _ = os.Remove(tmpName) + return err + } + if lm := resp.Header.Get("Last-Modified"); lm != "" { + if t, err := http.ParseTime(lm); err == nil { + _ = os.Chtimes(tmpName, t, t) + } + } + return os.Rename(tmpName, finalPath) +} + +// revalidateListing is the listing-cache analogue of revalidate. 
+func (c *Cache) revalidateListing(urlPath, accept string, mtime time.Time) { + target := c.upstream + urlPath + req, err := http.NewRequest(http.MethodGet, target, nil) + if err != nil { + return + } + if !mtime.IsZero() { + req.Header.Set("If-Modified-Since", mtime.UTC().Format(http.TimeFormat)) + } + if accept != "" { + req.Header.Set("Accept", accept) + } + if c.bearer != "" { + req.Header.Set("Authorization", "Bearer "+c.bearer) + } + resp, err := c.client.Do(req) + if err != nil { + return + } + defer resp.Body.Close() + switch resp.StatusCode { + case http.StatusNotModified: + return + case http.StatusOK: + if !c.responseCacheable(resp) { + return + } + path, ok := c.listingCachePathFor(urlPath, accept) + if !ok { + return + } + if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil { + return + } + tmp, err := os.CreateTemp(filepath.Dir(path), ".zddc-cache-tmp-*") + if err != nil { + return + } + tmpName := tmp.Name() + if _, err := io.Copy(tmp, resp.Body); err != nil { + _ = tmp.Close() + _ = os.Remove(tmpName) + return + } + _ = tmp.Close() + if lm := resp.Header.Get("Last-Modified"); lm != "" { + if t, err := http.ParseTime(lm); err == nil { + _ = os.Chtimes(tmpName, t, t) + } + } + _ = os.Rename(tmpName, path) + case http.StatusForbidden, http.StatusNotFound: + if path, ok := c.listingCachePathFor(urlPath, accept); ok { + _ = os.Remove(path) + } + } +} + // isHopByHop reports whether a header name is hop-by-hop per RFC 7230 // §6.1 — these must not be forwarded by a proxy. 
func isHopByHop(name string) bool { diff --git a/zddc/internal/cache/cache_test.go b/zddc/internal/cache/cache_test.go index f3e12f0..0617869 100644 --- a/zddc/internal/cache/cache_test.go +++ b/zddc/internal/cache/cache_test.go @@ -183,23 +183,96 @@ func TestServeHTTP_ProxyModeDoesNotPersist(t *testing.T) { } } -func TestServeHTTP_DirectoriesAreNeverCached(t *testing.T) { +func TestServeHTTP_DirectoryListingsCachedAsSidecar(t *testing.T) { + var hits int32 c, _ := newTestCache(t, "cache", func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&hits, 1) w.Header().Set("Content-Type", "text/html") _, _ = w.Write([]byte("listing")) }) + // First request: miss, body served + sidecar written. rec := httptest.NewRecorder() r := httptest.NewRequest(http.MethodGet, "/Project/", nil) + r.Header.Set("Accept", "text/html") c.ServeHTTP(rec, r) if rec.Code != http.StatusOK { t.Fatalf("status = %d", rec.Code) } - if got := rec.Header().Get(HeaderName); got != "proxy" { - t.Errorf("cache header = %q, want proxy (directories don't cache)", got) + if got := rec.Header().Get(HeaderName); got != "miss" { + t.Errorf("first cache header = %q, want miss", got) } - // No file or directory should have been created at the URL location. - if entries, _ := os.ReadDir(c.root); len(entries) > 0 { - t.Errorf("directory request created cache entries: %v", entries) + sidecar := filepath.Join(c.root, "Project", listingCachePrefix+"html") + if _, err := os.Stat(sidecar); err != nil { + t.Fatalf("expected listing sidecar: %v", err) + } + // Second request: hit. 
+ rec2 := httptest.NewRecorder() + r2 := httptest.NewRequest(http.MethodGet, "/Project/", nil) + r2.Header.Set("Accept", "text/html") + c.ServeHTTP(rec2, r2) + if got := rec2.Header().Get(HeaderName); got != "hit" { + t.Errorf("second cache header = %q, want hit", got) + } + if rec2.Body.String() != "listing" { + t.Errorf("body = %q", rec2.Body.String()) + } +} + +func TestServeHTTP_ListingFormatVariesByAccept(t *testing.T) { + c, _ := newTestCache(t, "cache", func(w http.ResponseWriter, r *http.Request) { + if strings.Contains(r.Header.Get("Accept"), "application/json") { + _, _ = w.Write([]byte(`[{"name":"foo"}]`)) + } else { + _, _ = w.Write([]byte("html")) + } + }) + // JSON request → JSON sidecar. + rec := httptest.NewRecorder() + rj := httptest.NewRequest(http.MethodGet, "/Project/", nil) + rj.Header.Set("Accept", "application/json") + c.ServeHTTP(rec, rj) + if !strings.Contains(rec.Body.String(), "foo") { + t.Errorf("json body = %q", rec.Body.String()) + } + // HTML request → HTML sidecar (separately). + rec2 := httptest.NewRecorder() + rh := httptest.NewRequest(http.MethodGet, "/Project/", nil) + rh.Header.Set("Accept", "text/html") + c.ServeHTTP(rec2, rh) + if !strings.Contains(rec2.Body.String(), "html") { + t.Errorf("html body = %q", rec2.Body.String()) + } + // Both sidecars exist. 
+ if _, err := os.Stat(filepath.Join(c.root, "Project", listingCachePrefix+"json")); err != nil { + t.Errorf("json sidecar missing: %v", err) + } + if _, err := os.Stat(filepath.Join(c.root, "Project", listingCachePrefix+"html")); err != nil { + t.Errorf("html sidecar missing: %v", err) + } +} + +func TestServeHTTP_ListingOfflineServesStale(t *testing.T) { + root := t.TempDir() + if err := os.MkdirAll(filepath.Join(root, "Project"), 0o755); err != nil { + t.Fatalf("mkdir: %v", err) + } + if err := os.WriteFile(filepath.Join(root, "Project", listingCachePrefix+"html"), []byte(""), 0o644); err != nil { + t.Fatalf("seed: %v", err) + } + c, err := New(config.Config{Root: root, Upstream: "http://127.0.0.1:1", Mode: "cache"}) + if err != nil { + t.Fatalf("New: %v", err) + } + c.client.Timeout = 200 * time.Millisecond + rec := httptest.NewRecorder() + r := httptest.NewRequest(http.MethodGet, "/Project/", nil) + r.Header.Set("Accept", "text/html") + c.ServeHTTP(rec, r) + if rec.Code != http.StatusOK { + t.Fatalf("offline listing = %d, want 200", rec.Code) + } + if !strings.Contains(rec.Body.String(), "") { + t.Errorf("body = %q", rec.Body.String()) } } diff --git a/zddc/internal/cache/walker.go b/zddc/internal/cache/walker.go new file mode 100644 index 0000000..62a85f4 --- /dev/null +++ b/zddc/internal/cache/walker.go @@ -0,0 +1,431 @@ +package cache + +import ( + "context" + "encoding/json" + "fmt" + "io" + "log/slog" + "net/http" + "net/url" + "os" + "path" + "path/filepath" + "strings" + "sync" + "time" +) + +// MirrorStateFile is the persisted scheduler state — last walk time +// per subtree. Lives at the cache root so state survives restarts. +const MirrorStateFile = ".zddc-mirror-state.json" + +// DefaultMirrorMinInterval is the gate between walks for the same +// subtree. Active mirrors stay current via repeated visits; idle +// mirrors don't generate any upstream traffic until someone hits them. 
const DefaultMirrorMinInterval = time.Hour

// DefaultMirrorParallelism caps how many concurrent file fetches a
// single walk fires. Bound is per-walk, not global — multiple
// subtrees walked at once each get their own budget. Conservative
// enough not to starve interactive requests sharing the same
// connection pool.
const DefaultMirrorParallelism = 4

// MirrorScheduler owns the per-subtree access-triggered walk policy.
// Built once at startup from cfg.MirrorSubtree[]; the cache layer
// installs Trigger as its onAccess hook.
type MirrorScheduler struct {
	cache       *Cache        // cache whose root, upstream and HTTP client the walks use
	subtrees    []string      // normalized: leading "/", no trailing "/"
	minInterval time.Duration // minimum gap between walks of one subtree
	parallelism int           // concurrent file fetches per walk

	mu        sync.Mutex              // guards state and inFlight
	state     map[string]subtreeState // last walk per subtree; mirrored to statePath
	statePath string                  // location of MirrorStateFile under the cache root
	inFlight  map[string]bool         // tracked separately from on-disk state — restarts forget
}

// subtreeState is the persisted per-subtree record.
type subtreeState struct {
	LastWalkAt time.Time `json:"last_walk_at"`
}

// persistedState is the JSON document stored in MirrorStateFile.
type persistedState struct {
	Subtrees map[string]subtreeState `json:"subtrees"`
}

// NewMirrorScheduler constructs a scheduler attached to the given
// cache. Returns nil scheduler (and nil error) when no subtrees are
// configured — caller checks for that.
+func NewMirrorScheduler(c *Cache, subtrees []string, minInterval time.Duration, parallelism int) (*MirrorScheduler, error) { + if len(subtrees) == 0 { + return nil, nil + } + if minInterval <= 0 { + minInterval = DefaultMirrorMinInterval + } + if parallelism <= 0 { + parallelism = DefaultMirrorParallelism + } + normalized := make([]string, 0, len(subtrees)) + for _, s := range subtrees { + s = strings.TrimSpace(s) + if s == "" || s == "/" { + s = "/" + } else { + if !strings.HasPrefix(s, "/") { + s = "/" + s + } + s = strings.TrimRight(s, "/") + } + normalized = append(normalized, s) + } + statePath := filepath.Join(c.root, MirrorStateFile) + state := loadMirrorState(statePath) + s := &MirrorScheduler{ + cache: c, + subtrees: normalized, + minInterval: minInterval, + parallelism: parallelism, + state: state, + statePath: statePath, + inFlight: make(map[string]bool), + } + c.onAccess = s.Trigger + return s, nil +} + +// Subtrees returns the configured subtree list (read-only). +func (s *MirrorScheduler) Subtrees() []string { + out := make([]string, len(s.subtrees)) + copy(out, s.subtrees) + return out +} + +// MinInterval returns the configured min walk interval. +func (s *MirrorScheduler) MinInterval() time.Duration { return s.minInterval } + +// Trigger inspects an incoming URL and, if it falls under a +// configured subtree AND a walk is due, kicks one in the background. +// Always non-blocking. Safe to call concurrently. +func (s *MirrorScheduler) Trigger(urlPath string) { + subtree := s.matchSubtree(urlPath) + if subtree == "" { + return + } + s.mu.Lock() + defer s.mu.Unlock() + if s.inFlight[subtree] { + return + } + st := s.state[subtree] + if !st.LastWalkAt.IsZero() && time.Since(st.LastWalkAt) < s.minInterval { + return + } + s.inFlight[subtree] = true + go s.runWalk(subtree) +} + +// runWalk does one walk-and-record cycle for subtree. Always called +// in a goroutine from Trigger. 
+func (s *MirrorScheduler) runWalk(subtree string) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute) + defer cancel() + slog.Info("mirror walk start", "subtree", subtree) + start := time.Now() + stats, err := s.walkSubtree(ctx, subtree) + dur := time.Since(start) + if err != nil { + slog.Warn("mirror walk failed", "subtree", subtree, "duration", dur, "err", err) + } else { + slog.Info("mirror walk done", "subtree", subtree, "duration", dur, + "dirs", stats.dirs, "files", stats.files, "fetched", stats.fetched, "purged", stats.purged) + } + s.mu.Lock() + defer s.mu.Unlock() + s.inFlight[subtree] = false + s.state[subtree] = subtreeState{LastWalkAt: time.Now()} + persistMirrorState(s.statePath, s.state) +} + +type walkStats struct { + dirs int + files int + fetched int + purged int +} + +// walkSubtree recursively walks the upstream listings under subtree, +// fetching missing/stale files and purging local files no longer in +// the upstream's filtered listing. Bounded parallelism via a +// semaphore. Errors fetching individual files are logged but don't +// abort the walk. +func (s *MirrorScheduler) walkSubtree(ctx context.Context, subtree string) (walkStats, error) { + sem := make(chan struct{}, s.parallelism) + var stats walkStats + var statsMu sync.Mutex + addFetched := func() { statsMu.Lock(); stats.fetched++; statsMu.Unlock() } + addPurged := func() { statsMu.Lock(); stats.purged++; statsMu.Unlock() } + addDirs := func() { statsMu.Lock(); stats.dirs++; statsMu.Unlock() } + addFiles := func() { statsMu.Lock(); stats.files++; statsMu.Unlock() } + + var fetchWG sync.WaitGroup + + var walkDir func(dirURL string) error + walkDir = func(dirURL string) error { + if !strings.HasSuffix(dirURL, "/") { + dirURL += "/" + } + addDirs() + entries, err := s.fetchListing(ctx, dirURL) + if err != nil { + return err + } + + // Build a set of names the upstream listing exposes so we can + // purge anything local-but-removed-upstream after walking. 
+ upstreamNames := make(map[string]bool, len(entries)) + for _, e := range entries { + name := strings.TrimSuffix(e.Name, "/") + upstreamNames[name] = true + } + + for _, e := range entries { + if ctx.Err() != nil { + return ctx.Err() + } + childURL := dirURL + url.PathEscape(strings.TrimSuffix(e.Name, "/")) + if e.IsDir { + childURL += "/" + if err := walkDir(childURL); err != nil { + slog.Debug("walk subdir failed", "url", childURL, "err", err) + } + continue + } + addFiles() + fetchWG.Add(1) + sem <- struct{}{} + go func(fileURL string) { + defer fetchWG.Done() + defer func() { <-sem }() + if s.fetchFileIfNeeded(ctx, fileURL) { + addFetched() + } + }(childURL) + } + + // Purge local files no longer present upstream. Only acts on + // regular files inside this directory — listing sidecars, + // dot-prefix internals, and subdirectories are left alone. + s.purgeOrphans(dirURL, upstreamNames, addPurged) + return nil + } + + err := walkDir(subtree + "/") + fetchWG.Wait() + return stats, err +} + +// purgeOrphans removes local cached files in dirURL that are absent +// from the upstream listing. Only operates on plain files; ignores +// dot-prefix entries (cache state) and subdirectories (they get their +// own pass during walkSubtree's recursion). +func (s *MirrorScheduler) purgeOrphans(dirURL string, upstreamNames map[string]bool, addPurged func()) { + rel := filepath.FromSlash(strings.Trim(dirURL, "/")) + localDir := filepath.Join(s.cache.root, rel) + entries, err := os.ReadDir(localDir) + if err != nil { + return + } + for _, e := range entries { + if e.IsDir() { + continue + } + name := e.Name() + if strings.HasPrefix(name, ".") { + continue + } + if upstreamNames[name] { + continue + } + _ = os.Remove(filepath.Join(localDir, name)) + addPurged() + slog.Info("purged orphan", "path", path.Join(dirURL, name)) + } +} + +// fetchListing GETs the upstream JSON listing for dirURL and parses +// the entries. 
Caches the JSON sidecar as a side-effect (so offline +// directory browsing works for walked subtrees). +func (s *MirrorScheduler) fetchListing(ctx context.Context, dirURL string) ([]listingEntry, error) { + target := s.cache.upstream + dirURL + req, err := http.NewRequestWithContext(ctx, http.MethodGet, target, nil) + if err != nil { + return nil, err + } + req.Header.Set("Accept", "application/json") + if s.cache.bearer != "" { + req.Header.Set("Authorization", "Bearer "+s.cache.bearer) + } + resp, err := s.cache.client.Do(req) + if err != nil { + return nil, fmt.Errorf("fetch listing %s: %w", target, err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("listing %s: status %d", target, resp.StatusCode) + } + bodyBytes, err := io.ReadAll(io.LimitReader(resp.Body, 16*1024*1024)) + if err != nil { + return nil, fmt.Errorf("read listing body: %w", err) + } + // Persist the JSON sidecar so the directory is browsable offline. + if path, ok := s.cache.listingCachePathFor(dirURL, "application/json"); ok { + if err := os.MkdirAll(filepath.Dir(path), 0o755); err == nil { + tmp, err := os.CreateTemp(filepath.Dir(path), ".zddc-cache-tmp-*") + if err == nil { + tmpName := tmp.Name() + if _, err := tmp.Write(bodyBytes); err == nil { + _ = tmp.Close() + if lm := resp.Header.Get("Last-Modified"); lm != "" { + if t, err := http.ParseTime(lm); err == nil { + _ = os.Chtimes(tmpName, t, t) + } + } + _ = os.Rename(tmpName, path) + s.cache.maybeWriteMarker() + } else { + _ = tmp.Close() + _ = os.Remove(tmpName) + } + } + } + } + var parsed []listingEntry + if err := json.Unmarshal(bodyBytes, &parsed); err != nil { + return nil, fmt.Errorf("parse listing %s: %w", target, err) + } + return parsed, nil +} + +// listingEntry mirrors the subset of zddc/internal/listing/types.go's +// FileInfo we need. Extra fields ignored. 
+type listingEntry struct { + Name string `json:"name"` + IsDir bool `json:"is_dir"` +} + +// fetchFileIfNeeded conditional-GETs a file URL. Returns true when +// the cache was updated (200 + write); false when no work was needed +// (304 or skipped). +func (s *MirrorScheduler) fetchFileIfNeeded(ctx context.Context, fileURL string) bool { + cachePath, ok := s.cache.cachePathFor(fileURL) + if !ok { + return false + } + target := s.cache.upstream + fileURL + req, err := http.NewRequestWithContext(ctx, http.MethodGet, target, nil) + if err != nil { + return false + } + if info, err := os.Stat(cachePath); err == nil { + req.Header.Set("If-Modified-Since", info.ModTime().UTC().Format(http.TimeFormat)) + } + if s.cache.bearer != "" { + req.Header.Set("Authorization", "Bearer "+s.cache.bearer) + } + resp, err := s.cache.client.Do(req) + if err != nil { + return false + } + defer resp.Body.Close() + if resp.StatusCode == http.StatusNotModified { + return false + } + if resp.StatusCode != http.StatusOK { + // 4xx → ACL revoked or upstream gone; remove local cache. + if resp.StatusCode == http.StatusForbidden || resp.StatusCode == http.StatusNotFound { + _ = os.Remove(cachePath) + } + return false + } + if !s.cache.responseCacheable(resp) { + return false + } + if err := s.cache.persistOnly(resp, fileURL); err != nil { + slog.Debug("walker persist failed", "url", fileURL, "err", err) + return false + } + s.cache.maybeWriteMarker() + return true +} + +// matchSubtree returns the longest configured subtree that's a prefix +// of urlPath, or "" if none match. "/" matches everything. 
+func (s *MirrorScheduler) matchSubtree(urlPath string) string { + if urlPath == "" { + urlPath = "/" + } + var best string + for _, st := range s.subtrees { + if st == "/" { + if len(best) == 0 { + best = "/" + } + continue + } + if strings.HasPrefix(urlPath, st+"/") || urlPath == st { + if len(st) > len(best) { + best = st + } + } + } + return best +} + +// loadMirrorState reads the persisted state from disk. Returns an +// empty (initialized) map on missing/corrupt input. +func loadMirrorState(path string) map[string]subtreeState { + state := make(map[string]subtreeState) + bytes, err := os.ReadFile(path) + if err != nil { + return state + } + var parsed persistedState + if err := json.Unmarshal(bytes, &parsed); err != nil { + return state + } + if parsed.Subtrees != nil { + state = parsed.Subtrees + } + return state +} + +// persistMirrorState writes the state file atomically (temp + rename). +// Best-effort: errors are logged but non-fatal. +func persistMirrorState(path string, state map[string]subtreeState) { + bytes, err := json.MarshalIndent(persistedState{Subtrees: state}, "", " ") + if err != nil { + slog.Debug("marshal mirror state", "err", err) + return + } + tmp, err := os.CreateTemp(filepath.Dir(path), ".zddc-mirror-state-tmp-*") + if err != nil { + slog.Debug("create mirror state tmp", "err", err) + return + } + tmpName := tmp.Name() + if _, err := tmp.Write(bytes); err != nil { + _ = tmp.Close() + _ = os.Remove(tmpName) + return + } + if err := tmp.Close(); err != nil { + _ = os.Remove(tmpName) + return + } + if err := os.Rename(tmpName, path); err != nil { + _ = os.Remove(tmpName) + } +} diff --git a/zddc/internal/cache/walker_test.go b/zddc/internal/cache/walker_test.go new file mode 100644 index 0000000..8d2bde6 --- /dev/null +++ b/zddc/internal/cache/walker_test.go @@ -0,0 +1,438 @@ +package cache + +import ( + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "strings" + "sync" + "testing" + "time" + + 
"codeberg.org/VARASYS/ZDDC/zddc/internal/config" +) + +// fakeUpstream returns a handler that serves a small fixed tree: +// +// /Vendors/Acme/ → JSON listing of [a.txt, b.txt, sub/] +// /Vendors/Acme/a.txt → "alpha" +// /Vendors/Acme/b.txt → "beta" +// /Vendors/Acme/sub/ → JSON listing of [c.txt] +// /Vendors/Acme/sub/c.txt → "charlie" +// /Vendors/Beta/ → JSON listing of [out-of-scope.txt] +// /Vendors/Beta/out-of-scope.txt → "should-not-be-fetched" +// +// hit counts every URL request so tests can assert which paths the +// walker visits. +func fakeUpstream() (http.HandlerFunc, *sync.Map) { + hits := &sync.Map{} + return func(w http.ResponseWriter, r *http.Request) { + old, _ := hits.LoadOrStore(r.URL.Path, int64(0)) + hits.Store(r.URL.Path, old.(int64)+1) + switch r.URL.Path { + case "/Vendors/Acme/": + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`[{"name":"a.txt","is_dir":false},{"name":"b.txt","is_dir":false},{"name":"sub/","is_dir":true}]`)) + case "/Vendors/Acme/a.txt": + _, _ = w.Write([]byte("alpha")) + case "/Vendors/Acme/b.txt": + _, _ = w.Write([]byte("beta")) + case "/Vendors/Acme/sub/": + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`[{"name":"c.txt","is_dir":false}]`)) + case "/Vendors/Acme/sub/c.txt": + _, _ = w.Write([]byte("charlie")) + case "/Vendors/Beta/": + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`[{"name":"out-of-scope.txt","is_dir":false}]`)) + case "/Vendors/Beta/out-of-scope.txt": + _, _ = w.Write([]byte("should-not-be-fetched")) + default: + http.NotFound(w, r) + } + }, hits +} + +func newWalkerTest(t *testing.T, subtrees []string, minInterval time.Duration) (*Cache, *MirrorScheduler, *httptest.Server, *sync.Map) { + t.Helper() + handler, hits := fakeUpstream() + upstream := httptest.NewServer(handler) + t.Cleanup(upstream.Close) + root := t.TempDir() + c, err := New(config.Config{Root: root, Upstream: upstream.URL, Mode: "mirror"}) + if err 
!= nil { + t.Fatalf("New: %v", err) + } + sched, err := NewMirrorScheduler(c, subtrees, minInterval, 0) + if err != nil { + t.Fatalf("NewMirrorScheduler: %v", err) + } + if sched == nil { + t.Fatal("expected scheduler, got nil") + } + return c, sched, upstream, hits +} + +// hitsCount returns the number of times urlPath was hit on upstream. +func hitsCount(hits *sync.Map, urlPath string) int64 { + v, ok := hits.Load(urlPath) + if !ok { + return 0 + } + return v.(int64) +} + +// waitForFile polls until the named file exists or the deadline +// passes. Used to wait on background walker writes without sleep loops. +func waitForFile(t *testing.T, path string, deadline time.Duration) { + t.Helper() + end := time.Now().Add(deadline) + for time.Now().Before(end) { + if _, err := os.Stat(path); err == nil { + return + } + time.Sleep(20 * time.Millisecond) + } + t.Fatalf("file did not appear within %v: %s", deadline, path) +} + +func TestNewMirrorScheduler_NoSubtrees(t *testing.T) { + root := t.TempDir() + c, _ := New(config.Config{Root: root, Upstream: "http://example.com", Mode: "cache"}) + sched, err := NewMirrorScheduler(c, nil, 0, 0) + if err != nil { + t.Fatalf("err: %v", err) + } + if sched != nil { + t.Errorf("expected nil scheduler for empty subtree list") + } +} + +func TestMirrorScheduler_NormalizesSubtrees(t *testing.T) { + root := t.TempDir() + c, _ := New(config.Config{Root: root, Upstream: "http://example.com", Mode: "mirror"}) + sched, _ := NewMirrorScheduler(c, []string{"Vendors/Acme/", "/Vendors/Beta", " Public ", "/"}, 0, 0) + got := sched.Subtrees() + want := []string{"/Vendors/Acme", "/Vendors/Beta", "/Public", "/"} + if len(got) != len(want) { + t.Fatalf("subtrees = %v, want %v", got, want) + } + for i := range got { + if got[i] != want[i] { + t.Errorf("subtree[%d] = %q, want %q", i, got[i], want[i]) + } + } +} + +func TestMatchSubtree(t *testing.T) { + c, _ := New(config.Config{Root: t.TempDir(), Upstream: "http://example.com", Mode: "mirror"}) + 
sched, _ := NewMirrorScheduler(c, []string{"/Vendors/Acme", "/Public"}, 0, 0) + cases := []struct { + url string + want string + }{ + {"/Vendors/Acme/foo.txt", "/Vendors/Acme"}, + {"/Vendors/Acme", "/Vendors/Acme"}, + {"/Vendors/Acme/sub/x.txt", "/Vendors/Acme"}, + {"/Vendors/Beta/x.txt", ""}, + {"/Public/file", "/Public"}, + {"/Other", ""}, + {"", ""}, + } + for _, tc := range cases { + if got := sched.matchSubtree(tc.url); got != tc.want { + t.Errorf("matchSubtree(%q) = %q, want %q", tc.url, got, tc.want) + } + } +} + +func TestMatchSubtree_RootMatchesEverything(t *testing.T) { + c, _ := New(config.Config{Root: t.TempDir(), Upstream: "http://example.com", Mode: "mirror"}) + sched, _ := NewMirrorScheduler(c, []string{"/"}, 0, 0) + for _, u := range []string{"/anything", "/Project/X", "/"} { + if got := sched.matchSubtree(u); got != "/" { + t.Errorf("matchSubtree(%q) = %q, want /", u, got) + } + } +} + +func TestTrigger_FetchesSubtree(t *testing.T) { + c, sched, _, hits := newWalkerTest(t, []string{"/Vendors/Acme"}, 5*time.Millisecond) + + // Trigger via a request URL under the subtree. + sched.Trigger("/Vendors/Acme/a.txt") + // Wait for the walker to drop all expected files. + waitForFile(t, filepath.Join(c.root, "Vendors", "Acme", "a.txt"), 2*time.Second) + waitForFile(t, filepath.Join(c.root, "Vendors", "Acme", "b.txt"), 2*time.Second) + waitForFile(t, filepath.Join(c.root, "Vendors", "Acme", "sub", "c.txt"), 2*time.Second) + + // JSON listing sidecars exist for the walked dirs. + waitForFile(t, filepath.Join(c.root, "Vendors", "Acme", listingCachePrefix+"json"), 2*time.Second) + waitForFile(t, filepath.Join(c.root, "Vendors", "Acme", "sub", listingCachePrefix+"json"), 2*time.Second) + + // Out-of-scope path was not visited. 
+ if got := hitsCount(hits, "/Vendors/Beta/"); got != 0 { + t.Errorf("walker hit out-of-scope /Vendors/Beta/ %d times", got) + } +} + +func TestTrigger_OutOfScopeIsNoOp(t *testing.T) { + _, sched, _, hits := newWalkerTest(t, []string{"/Vendors/Acme"}, 5*time.Millisecond) + sched.Trigger("/Other/path.txt") + time.Sleep(150 * time.Millisecond) + // No upstream calls should have happened. + count := 0 + hits.Range(func(_, _ interface{}) bool { count++; return true }) + if count != 0 { + t.Errorf("triggered %d upstream calls for out-of-scope URL", count) + } +} + +func TestTrigger_RateLimitsRapidAccesses(t *testing.T) { + _, sched, _, hits := newWalkerTest(t, []string{"/Vendors/Acme"}, 1*time.Hour) + + sched.Trigger("/Vendors/Acme/a.txt") + // Wait for first walk to complete. + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + sched.mu.Lock() + flight := sched.inFlight["/Vendors/Acme"] + st := sched.state["/Vendors/Acme"] + sched.mu.Unlock() + if !flight && !st.LastWalkAt.IsZero() { + break + } + time.Sleep(20 * time.Millisecond) + } + first := hitsCount(hits, "/Vendors/Acme/a.txt") + // Subsequent triggers within the min-interval are no-ops. + for i := 0; i < 5; i++ { + sched.Trigger("/Vendors/Acme/a.txt") + } + time.Sleep(100 * time.Millisecond) + second := hitsCount(hits, "/Vendors/Acme/a.txt") + if second != first { + t.Errorf("rate limit failed: hits went %d → %d", first, second) + } +} + +func TestTrigger_AfterIntervalElapsesWalksAgain(t *testing.T) { + _, sched, _, hits := newWalkerTest(t, []string{"/Vendors/Acme"}, 50*time.Millisecond) + + sched.Trigger("/Vendors/Acme/a.txt") + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + sched.mu.Lock() + flight := sched.inFlight["/Vendors/Acme"] + sched.mu.Unlock() + if !flight { + break + } + time.Sleep(20 * time.Millisecond) + } + first := hitsCount(hits, "/Vendors/Acme/a.txt") + + // Wait past min-interval, trigger again, walk should re-fire. 
+ time.Sleep(100 * time.Millisecond) + sched.Trigger("/Vendors/Acme/a.txt") + deadline = time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + if hitsCount(hits, "/Vendors/Acme/a.txt") > first { + break + } + time.Sleep(20 * time.Millisecond) + } + second := hitsCount(hits, "/Vendors/Acme/a.txt") + if second <= first { + t.Errorf("expected walk after interval; hits %d → %d", first, second) + } +} + +func TestTrigger_PurgesOrphanedFiles(t *testing.T) { + c, sched, _, _ := newWalkerTest(t, []string{"/Vendors/Acme"}, 5*time.Millisecond) + + // Pre-seed an orphan that's NOT in upstream's listing. + if err := os.MkdirAll(filepath.Join(c.root, "Vendors", "Acme"), 0o755); err != nil { + t.Fatalf("mkdir: %v", err) + } + orphan := filepath.Join(c.root, "Vendors", "Acme", "stale.txt") + if err := os.WriteFile(orphan, []byte("orphan"), 0o644); err != nil { + t.Fatalf("seed: %v", err) + } + + sched.Trigger("/Vendors/Acme/a.txt") + waitForFile(t, filepath.Join(c.root, "Vendors", "Acme", "a.txt"), 2*time.Second) + + // Wait for walk to fully complete (orphan purge happens after fetches). + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + sched.mu.Lock() + flight := sched.inFlight["/Vendors/Acme"] + sched.mu.Unlock() + if !flight { + break + } + time.Sleep(20 * time.Millisecond) + } + + if _, err := os.Stat(orphan); !os.IsNotExist(err) { + t.Errorf("orphan file not purged: %v", err) + } +} + +func TestPersistedState_SurvivesRestart(t *testing.T) { + c, sched, _, _ := newWalkerTest(t, []string{"/Vendors/Acme"}, 5*time.Millisecond) + + sched.Trigger("/Vendors/Acme/a.txt") + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + sched.mu.Lock() + flight := sched.inFlight["/Vendors/Acme"] + st := sched.state["/Vendors/Acme"] + sched.mu.Unlock() + if !flight && !st.LastWalkAt.IsZero() { + break + } + time.Sleep(20 * time.Millisecond) + } + + // State file should exist. 
+ statePath := filepath.Join(c.root, MirrorStateFile) + if _, err := os.Stat(statePath); err != nil { + t.Fatalf("state file missing: %v", err) + } + + // New scheduler against the same root reads the prior state and + // honors the rate-limit gate. + sched2, _ := NewMirrorScheduler(c, []string{"/Vendors/Acme"}, 1*time.Hour, 0) + st := sched2.state["/Vendors/Acme"] + if st.LastWalkAt.IsZero() { + t.Error("restart did not load prior LastWalkAt from state file") + } +} + +func TestTrigger_ConcurrentSameSubtreeDoesNotDoubleWalk(t *testing.T) { + _, sched, _, hits := newWalkerTest(t, []string{"/Vendors/Acme"}, 1*time.Hour) + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + sched.Trigger("/Vendors/Acme/a.txt") + }() + } + wg.Wait() + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + sched.mu.Lock() + flight := sched.inFlight["/Vendors/Acme"] + sched.mu.Unlock() + if !flight { + break + } + time.Sleep(20 * time.Millisecond) + } + // Each file should have been fetched exactly once even though we + // triggered 10 times concurrently. + if got := hitsCount(hits, "/Vendors/Acme/a.txt"); got != 1 { + t.Errorf("a.txt fetched %d times, want 1", got) + } +} + +func TestServeHTTP_KicksMirrorOnAccess(t *testing.T) { + // End-to-end through the cache layer's ServeHTTP, verifying the + // onAccess hook fires and the walker prefetches. 
+ upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/Vendors/Acme/": + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`[{"name":"only.txt","is_dir":false}]`)) + case "/Vendors/Acme/only.txt": + _, _ = w.Write([]byte("data")) + default: + http.NotFound(w, r) + } + })) + defer upstream.Close() + + root := t.TempDir() + c, err := New(config.Config{Root: root, Upstream: upstream.URL, Mode: "mirror"}) + if err != nil { + t.Fatalf("New: %v", err) + } + sched, err := NewMirrorScheduler(c, []string{"/Vendors/Acme"}, 5*time.Millisecond, 0) + if err != nil { + t.Fatalf("NewMirrorScheduler: %v", err) + } + + rec := httptest.NewRecorder() + c.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/Vendors/Acme/only.txt", nil)) + if rec.Code != http.StatusOK { + t.Fatalf("status = %d", rec.Code) + } + // The user's request itself fetched only.txt. Then the walker + // (kicked async) fetches the listing + only.txt. So only.txt may + // be fetched twice. The walker MUST have hit the listing. + waitForFile(t, filepath.Join(root, "Vendors", "Acme", listingCachePrefix+"json"), 2*time.Second) + waitForFile(t, filepath.Join(root, "Vendors", "Acme", "only.txt"), 2*time.Second) + + // Drain the walk before t.TempDir cleanup races with file writes. + waitForWalkDrain(t, sched, "/Vendors/Acme", 2*time.Second) +} + +// waitForWalkDrain blocks until no walk is in flight for subtree. +// Avoids TempDir cleanup races where the walker is still writing +// when the test finishes. 
+func waitForWalkDrain(t *testing.T, sched *MirrorScheduler, subtree string, deadline time.Duration) { + t.Helper() + end := time.Now().Add(deadline) + for time.Now().Before(end) { + sched.mu.Lock() + flight := sched.inFlight[subtree] + sched.mu.Unlock() + if !flight { + return + } + time.Sleep(20 * time.Millisecond) + } + t.Fatalf("walk for %s did not drain within %v", subtree, deadline) +} + +func TestPersistMirrorState_AtomicWrite(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, MirrorStateFile) + state := map[string]subtreeState{ + "/Vendors/Acme": {LastWalkAt: time.Date(2026, 5, 8, 12, 0, 0, 0, time.UTC)}, + } + persistMirrorState(path, state) + + got := loadMirrorState(path) + if got["/Vendors/Acme"].LastWalkAt.IsZero() { + t.Error("round-trip lost LastWalkAt") + } + // No leftover tmp files. + entries, _ := os.ReadDir(dir) + for _, e := range entries { + if strings.HasPrefix(e.Name(), ".zddc-mirror-state-tmp-") { + t.Errorf("leftover tmp: %s", e.Name()) + } + } +} + +func TestLoadMirrorState_MissingReturnsEmpty(t *testing.T) { + got := loadMirrorState(filepath.Join(t.TempDir(), "does-not-exist")) + if got == nil || len(got) != 0 { + t.Errorf("expected empty map, got %v", got) + } +} + +func TestLoadMirrorState_CorruptReturnsEmpty(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "garbage") + _ = os.WriteFile(path, []byte("not json"), 0o644) + got := loadMirrorState(path) + if got == nil || len(got) != 0 { + t.Errorf("corrupt should yield empty map, got %v", got) + } +} diff --git a/zddc/internal/config/config.go b/zddc/internal/config/config.go index 1422643..77826ff 100644 --- a/zddc/internal/config/config.go +++ b/zddc/internal/config/config.go @@ -34,10 +34,12 @@ type Config struct { // Root then becomes the cache directory rather than the served // data root. Master-mode flags (apps, archive, opa, etc.) are // ignored in client mode — see cmd/zddc-server/main.go. 
- Upstream string // --upstream / ZDDC_UPSTREAM — master URL (https://master.example.com); empty = run as master - Mode string // --mode / ZDDC_MODE — "proxy" (no disk persistence), "cache" (default; persist on access), "mirror" (cache + access-triggered subtree warmer; phase 3) - BearerFile string // --bearer-file / ZDDC_BEARER_FILE — path to a 0600 file containing the master-issued token to forward upstream - SkipTLSVerify bool // --skip-tls-verify / ZDDC_SKIP_TLS_VERIFY=1 — accept self-signed / untrusted upstream certs. Distinct from --no-auth; intended for dev/internal CA scenarios only. + Upstream string // --upstream / ZDDC_UPSTREAM — master URL (https://master.example.com); empty = run as master + Mode string // --mode / ZDDC_MODE — "proxy" (no disk persistence), "cache" (default; persist on access), "mirror" (cache + access-triggered subtree warmer) + BearerFile string // --bearer-file / ZDDC_BEARER_FILE — path to a 0600 file containing the master-issued token to forward upstream + SkipTLSVerify bool // --skip-tls-verify / ZDDC_SKIP_TLS_VERIFY=1 — accept self-signed / untrusted upstream certs. Distinct from --no-auth; intended for dev/internal CA scenarios only. + MirrorSubtree []string // --mirror-subtree / ZDDC_MIRROR_SUBTREE — comma-separated subtree URL paths the access-triggered walker keeps current. Default `/` when --mode=mirror and unset; ignored otherwise. + MirrorMinInterval time.Duration // --mirror-min-interval / ZDDC_MIRROR_MIN_INTERVAL — minimum gap between walks of the same subtree. Idle subtrees stay quiet; bumping this reduces upstream load on busy mirrors. Default 1h. OPAURL string // --opa-url / ZDDC_OPA_URL — policy decider endpoint: "internal" (default), "http(s)://..." (real OPA via HTTP), or "unix:///..." 
(OPA via Unix socket) OPAFailOpen bool // --opa-fail-open / ZDDC_OPA_FAIL_OPEN=1 — when external OPA is unreachable, allow instead of deny (default: fail closed) OPACacheTTL time.Duration // --opa-cache-ttl / ZDDC_OPA_CACHE_TTL — external mode only: per-decision cache TTL. Default 1s. Set 0s to disable. @@ -107,6 +109,10 @@ func Load(args []string) (Config, error) { "Path to a 0600 file containing the master-issued token forwarded as Authorization: Bearer to upstream. See /.tokens on the master to issue one. Ignored when --upstream is empty.") skipTLSVerifyFlag := fs.Bool("skip-tls-verify", os.Getenv("ZDDC_SKIP_TLS_VERIFY") == "1", "Accept self-signed / untrusted TLS certs from the upstream. Distinct from --no-auth. Intended for dev or internal-CA scenarios only.") + mirrorSubtreeFlag := fs.String("mirror-subtree", os.Getenv("ZDDC_MIRROR_SUBTREE"), + "Comma-separated URL subtrees the access-triggered mirror walker keeps current (e.g. /Vendors/Acme,/Public). Empty + --mode=mirror = walk \"/\" (full mirror). Ignored when --mode != mirror.") + mirrorMinIntervalFlag := fs.Duration("mirror-min-interval", parseDurationOrDefault(os.Getenv("ZDDC_MIRROR_MIN_INTERVAL"), time.Hour), + "Minimum gap between walks of the same mirror subtree. Default 1h. 
Active mirrors revalidate as a side effect of use; idle subtrees generate zero upstream traffic until next access.") opaURLFlag := fs.String("opa-url", getEnv("ZDDC_OPA_URL", "internal"), "Policy decider endpoint: \"internal\" (built-in Go evaluator, default), \"http(s)://host:port\", or \"unix:///path/to/socket\".") opaFailOpenFlag := fs.Bool("opa-fail-open", os.Getenv("ZDDC_OPA_FAIL_OPEN") == "1", @@ -179,6 +185,8 @@ func Load(args []string) (Config, error) { Mode: *modeFlag, BearerFile: *bearerFileFlag, SkipTLSVerify: *skipTLSVerifyFlag, + MirrorSubtree: parseCSV(*mirrorSubtreeFlag), + MirrorMinInterval: *mirrorMinIntervalFlag, OPAURL: *opaURLFlag, OPAFailOpen: *opaFailOpenFlag, OPACacheTTL: *opaCacheTTLFlag, @@ -304,6 +312,16 @@ func Load(args []string) (Config, error) { if strings.HasSuffix(cfg.Upstream, "/") { cfg.Upstream = strings.TrimRight(cfg.Upstream, "/") } + // Mirror mode: default subtree to "/" (full mirror) if the + // operator opted into mirror mode without specifying one. + if cfg.Mode == "mirror" && len(cfg.MirrorSubtree) == 0 { + cfg.MirrorSubtree = []string{"/"} + } + // Mirror subtrees are ignored in non-mirror modes — drop them + // rather than carry confusing dead config forward. + if cfg.Mode != "mirror" { + cfg.MirrorSubtree = nil + } } return cfg, nil @@ -336,6 +354,8 @@ func Usage(w io.Writer) { fs.String("mode", "cache", "Client mode: proxy / cache / mirror. Ignored when --upstream is empty.") fs.String("bearer-file", "", "Path to a 0600 file holding the master-issued bearer token forwarded to upstream. Ignored when --upstream is empty.") fs.Bool("skip-tls-verify", false, "Accept self-signed / untrusted upstream TLS certs. Distinct from --no-auth. Dev / internal-CA scenarios only.") + fs.String("mirror-subtree", "", "Comma-separated URL subtrees the mirror walker keeps current. Empty + --mode=mirror = full mirror (\"/\"). 
Ignored when --mode != mirror.") + fs.Duration("mirror-min-interval", time.Hour, "Min gap between walks of the same mirror subtree. Default 1h.") fs.String("opa-url", "internal", "Policy decider: \"internal\", \"http(s)://...\", or \"unix:///...\".") fs.Bool("opa-fail-open", false, "External OPA: allow on transport error (default: deny / fail closed).") fs.Duration("opa-cache-ttl", time.Second, "External OPA: per-decision cache TTL (default 1s; 0 disables).")