--mode mirror layers an access-triggered walker on top of the cache
pipeline. When an incoming request's URL falls under one of the
configured --mirror-subtree paths, the scheduler kicks off a recursive
walk of that subtree iff (a) no walk for that subtree is in flight and
(b) now - last_walk_at >= --mirror-min-interval (default 1h). Walks
run in a goroutine; the user's request never blocks on scheduling.
Why access-triggered: a naive "walk on a fixed timer" design would have
many vendor mirrors, most of them idle most of the time, polling the
master in a thundering herd. Demand-triggering means idle mirrors
generate zero upstream traffic until someone hits them; active
mirrors stay current as a side effect of normal use.
The walk:
1. Recursively fetches JSON listings under the subtree, persisting
each at <dir>/.zddc-listing.json so directory browsing works
offline for walked subtrees.
2. For each file, fires a conditional If-Modified-Since GET (bounded
parallelism; default 4 concurrent): 304 is a no-op, 200 overwrites
the cached copy, and 403/404 deletes the local copy.
3. After enumeration, per-directory orphan purge: local files absent
from upstream's filtered listing are removed (handles upstream
deletes + ACL revocations).
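Steps 2 and 3 reduce to two operations: a status-code decision per file, and a set difference per directory. A sketch of both follows. fileAction and orphans are hypothetical helper names. The real code inlines this logic in fetchFileIfNeeded and purgeOrphans, and purgeOrphans additionally skips dot-prefixed files and subdirectories.

```go
package main

import (
	"fmt"
	"net/http"
	"sort"
)

// fileAction is a hypothetical name for what step 2 does with a
// conditional GET's status code.
func fileAction(status int) string {
	switch status {
	case http.StatusNotModified:
		return "keep" // 304: cached copy is current
	case http.StatusOK:
		return "overwrite" // 200: newer content upstream
	case http.StatusForbidden, http.StatusNotFound:
		return "remove" // 403/404: ACL revoked or deleted upstream
	default:
		return "keep" // other statuses leave the cached copy alone
	}
}

// orphans implements step 3's set difference: local names that the
// upstream listing no longer exposes, in sorted order.
func orphans(local, upstream []string) []string {
	seen := make(map[string]bool, len(upstream))
	for _, n := range upstream {
		seen[n] = true
	}
	var out []string
	for _, n := range local {
		if !seen[n] {
			out = append(out, n)
		}
	}
	sort.Strings(out)
	return out
}

func main() {
	fmt.Println(fileAction(304), fileAction(200), fileAction(403), fileAction(500))
	// keep overwrite remove keep
	fmt.Println(orphans([]string{"a.pdf", "b.pdf", "c.pdf"}, []string{"a.pdf", "c.pdf"}))
	// [b.pdf]
}
```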
State persists at <root>/.zddc-mirror-state.json as
{subtrees: {<path>: {last_walk_at}}}. In-flight tracking is in-memory
only — a crash mid-walk lets the next access retry without manual
cleanup. Subtree path matching is longest-prefix-wins; "/" is a
catch-all (full mirror, the default when --mode=mirror is set without
explicit --mirror-subtree).
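Longest-prefix matching works on whole path segments, so a subtree of /docs does not capture /docsx. Below is a standalone restatement as a free function. It reuses the scheduler's name matchSubtree, but this version takes the subtree list as a parameter and assumes the subtrees are already normalized.

```go
package main

import (
	"fmt"
	"strings"
)

// matchSubtree returns the longest configured subtree containing
// urlPath, matching whole path segments only ("/docs" matches
// "/docs/a" but not "/docsx"). "/" acts as a catch-all. Subtrees are
// assumed already normalized (leading "/", no trailing "/").
func matchSubtree(subtrees []string, urlPath string) string {
	best := ""
	for _, st := range subtrees {
		ok := st == "/" || urlPath == st || strings.HasPrefix(urlPath, st+"/")
		if ok && len(st) > len(best) {
			best = st
		}
	}
	return best
}

func main() {
	subtrees := []string{"/", "/docs", "/docs/internal"}
	fmt.Println(matchSubtree(subtrees, "/docs/internal/a.pdf")) // /docs/internal
	fmt.Println(matchSubtree(subtrees, "/docsx/b.pdf"))         // /
	fmt.Printf("%q\n", matchSubtree([]string{"/docs"}, "/other")) // "" (out of scope)
}
```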
The cache layer also gained directory-listing caching (independent of
mirror mode but enabled by it). Directories are now stored at
<dir>/.zddc-listing.<html|json> sidecars, varied by Accept header.
Hit/miss/offline semantics mirror the file pipeline. Phase 2's
limitation that directories were always proxied live (no offline browse)
is now resolved for any directory the user has visited or that mirror
mode has walked.
Mirror scope falls out of auth: the walker uses the local instance's
bearer, so it sees exactly what the user can see at upstream. Admin
bearer → full mirror; vendor bearer → vendor's permitted subtree;
no code distinguishes the cases.
New flags (also as ZDDC_* env vars), ignored when --mode != mirror:
- --mirror-subtree <csv>: subtrees to mirror, comma-separated
(repeatable); when empty with --mode=mirror, defaults to "/" (full mirror)
- --mirror-min-interval <duration>: minimum time between walks of the
same subtree; default 1h
Tests (15 new in walker_test.go, 3 new in cache_test.go): subtree
normalization, longest-prefix matching, root-as-catch-all, walk
fetches all files in scope, out-of-scope URLs are no-ops,
rate-limiting prevents double-walks within min-interval, walks re-fire
after interval elapses, orphan purge removes local-only files,
state file survives restart, concurrent triggers don't double-walk,
end-to-end ServeHTTP-kicks-mirror-on-access, listing format varies
by Accept, listing offline serves stale, persisted state atomic
write + corrupt-input handling. Full suite + go vet clean.
Doc updates: zddc/README.md flags table gains the two new entries
plus a "Mirror mode (access-triggered subtree walker)" subsection
with trigger semantics and properties; the "What client mode is NOT,
yet" list shrinks accordingly. AGENTS.md env-var table gains the
two new entries. ARCHITECTURE.md "Master + proxy/cache/mirror"
section now documents the walker scheduler / walk algorithm / state
file in a "Mirror walker (access-triggered)" subsection.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
431 lines
12 KiB
Go
package cache

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"net/url"
	"os"
	"path"
	"path/filepath"
	"strings"
	"sync"
	"time"
)

// MirrorStateFile is the persisted scheduler state: last walk time
// per subtree. Lives at the cache root so state survives restarts.
const MirrorStateFile = ".zddc-mirror-state.json"

// DefaultMirrorMinInterval is the gate between walks for the same
// subtree. Active mirrors stay current via repeated visits; idle
// mirrors don't generate any upstream traffic until someone hits them.
const DefaultMirrorMinInterval = time.Hour

// DefaultMirrorParallelism caps how many concurrent file fetches a
// single walk fires. The bound is per-walk, not global: multiple
// subtrees walked at once each get their own budget. Conservative
// enough not to starve interactive requests sharing the same
// connection pool.
const DefaultMirrorParallelism = 4

// MirrorScheduler owns the per-subtree access-triggered walk policy.
// Built once at startup from cfg.MirrorSubtree[]; the cache layer
// installs Trigger as its onAccess hook.
type MirrorScheduler struct {
	cache       *Cache
	subtrees    []string // normalized: leading "/", no trailing "/"
	minInterval time.Duration
	parallelism int

	mu        sync.Mutex
	state     map[string]subtreeState
	statePath string
	inFlight  map[string]bool // tracked separately from on-disk state; restarts forget
}

type subtreeState struct {
	LastWalkAt time.Time `json:"last_walk_at"`
}

type persistedState struct {
	Subtrees map[string]subtreeState `json:"subtrees"`
}

// NewMirrorScheduler constructs a scheduler attached to the given
// cache. Returns a nil scheduler (and nil error) when no subtrees are
// configured; the caller checks for that.
func NewMirrorScheduler(c *Cache, subtrees []string, minInterval time.Duration, parallelism int) (*MirrorScheduler, error) {
	if len(subtrees) == 0 {
		return nil, nil
	}
	if minInterval <= 0 {
		minInterval = DefaultMirrorMinInterval
	}
	if parallelism <= 0 {
		parallelism = DefaultMirrorParallelism
	}
	normalized := make([]string, 0, len(subtrees))
	for _, s := range subtrees {
		// path.Clean on a rooted path yields a leading "/", no trailing
		// "/", and collapses "//", "." and ".." segments. Empty input,
		// "/", and slash-only input (e.g. "///") all become "/" (full
		// mirror); previously "///" normalized to "".
		normalized = append(normalized, path.Clean("/"+strings.TrimSpace(s)))
	}
	statePath := filepath.Join(c.root, MirrorStateFile)
	state := loadMirrorState(statePath)
	s := &MirrorScheduler{
		cache:       c,
		subtrees:    normalized,
		minInterval: minInterval,
		parallelism: parallelism,
		state:       state,
		statePath:   statePath,
		inFlight:    make(map[string]bool),
	}
	c.onAccess = s.Trigger
	return s, nil
}

// Subtrees returns a copy of the configured subtree list.
func (s *MirrorScheduler) Subtrees() []string {
	out := make([]string, len(s.subtrees))
	copy(out, s.subtrees)
	return out
}

// MinInterval returns the configured minimum walk interval.
func (s *MirrorScheduler) MinInterval() time.Duration { return s.minInterval }

// Trigger inspects an incoming URL and, if it falls under a
// configured subtree AND a walk is due, kicks one in the background.
// Always non-blocking. Safe to call concurrently.
func (s *MirrorScheduler) Trigger(urlPath string) {
	subtree := s.matchSubtree(urlPath)
	if subtree == "" {
		return
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.inFlight[subtree] {
		return
	}
	st := s.state[subtree]
	if !st.LastWalkAt.IsZero() && time.Since(st.LastWalkAt) < s.minInterval {
		return
	}
	s.inFlight[subtree] = true
	go s.runWalk(subtree)
}

// runWalk does one walk-and-record cycle for subtree. Always called
// in a goroutine from Trigger.
func (s *MirrorScheduler) runWalk(subtree string) {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
	defer cancel()
	slog.Info("mirror walk start", "subtree", subtree)
	start := time.Now()
	stats, err := s.walkSubtree(ctx, subtree)
	dur := time.Since(start)
	if err != nil {
		slog.Warn("mirror walk failed", "subtree", subtree, "duration", dur, "err", err)
	} else {
		slog.Info("mirror walk done", "subtree", subtree, "duration", dur,
			"dirs", stats.dirs, "files", stats.files, "fetched", stats.fetched, "purged", stats.purged)
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	s.inFlight[subtree] = false
	s.state[subtree] = subtreeState{LastWalkAt: time.Now()}
	persistMirrorState(s.statePath, s.state)
}

type walkStats struct {
	dirs    int
	files   int
	fetched int
	purged  int
}

// walkSubtree recursively walks the upstream listings under subtree,
// fetching missing/stale files and purging local files no longer in
// the upstream's filtered listing. Bounded parallelism via a
// semaphore. Errors fetching individual files are logged but don't
// abort the walk.
func (s *MirrorScheduler) walkSubtree(ctx context.Context, subtree string) (walkStats, error) {
	sem := make(chan struct{}, s.parallelism)
	var stats walkStats
	var statsMu sync.Mutex
	addFetched := func() { statsMu.Lock(); stats.fetched++; statsMu.Unlock() }
	addPurged := func() { statsMu.Lock(); stats.purged++; statsMu.Unlock() }
	addDirs := func() { statsMu.Lock(); stats.dirs++; statsMu.Unlock() }
	addFiles := func() { statsMu.Lock(); stats.files++; statsMu.Unlock() }

	var fetchWG sync.WaitGroup

	var walkDir func(dirURL string) error
	walkDir = func(dirURL string) error {
		if !strings.HasSuffix(dirURL, "/") {
			dirURL += "/"
		}
		addDirs()
		entries, err := s.fetchListing(ctx, dirURL)
		if err != nil {
			return err
		}

		// Build a set of names the upstream listing exposes so we can
		// purge anything local-but-removed-upstream after walking.
		upstreamNames := make(map[string]bool, len(entries))
		for _, e := range entries {
			name := strings.TrimSuffix(e.Name, "/")
			upstreamNames[name] = true
		}

		for _, e := range entries {
			if ctx.Err() != nil {
				return ctx.Err()
			}
			name := strings.TrimSuffix(e.Name, "/")
			// Skip names that would loop ("." as a directory) or climb
			// out of the subtree (".."), plus empty or multi-segment names.
			if name == "" || name == "." || name == ".." || strings.Contains(name, "/") {
				continue
			}
			childURL := dirURL + url.PathEscape(name)
			if e.IsDir {
				childURL += "/"
				if err := walkDir(childURL); err != nil {
					slog.Debug("walk subdir failed", "url", childURL, "err", err)
				}
				continue
			}
			addFiles()
			fetchWG.Add(1)
			sem <- struct{}{} // blocks while s.parallelism fetches are running
			go func(fileURL string) {
				defer fetchWG.Done()
				defer func() { <-sem }()
				if s.fetchFileIfNeeded(ctx, fileURL) {
					addFetched()
				}
			}(childURL)
		}

		// Purge local files no longer present upstream. Only acts on
		// regular files inside this directory; listing sidecars,
		// dot-prefix internals, and subdirectories are left alone.
		s.purgeOrphans(dirURL, upstreamNames, addPurged)
		return nil
	}

	// walkDir appends the trailing "/" itself, so passing subtree as-is
	// avoids requesting "//" when subtree is "/" (full mirror).
	err := walkDir(subtree)
	fetchWG.Wait()
	return stats, err
}

// purgeOrphans removes local cached files in dirURL that are absent
// from the upstream listing. Only operates on plain files; ignores
// dot-prefix entries (cache state) and subdirectories (they get their
// own pass during walkSubtree's recursion).
func (s *MirrorScheduler) purgeOrphans(dirURL string, upstreamNames map[string]bool, addPurged func()) {
	// dirURL is percent-encoded (walkDir escapes each segment) while
	// upstreamNames holds decoded names. Decode before mapping to disk,
	// assuming the cache stores files under their decoded names.
	// path.Clean on the rooted path keeps ".." from escaping the root.
	decoded, err := url.PathUnescape(dirURL)
	if err != nil {
		return
	}
	rel := filepath.FromSlash(strings.TrimPrefix(path.Clean(decoded), "/"))
	localDir := filepath.Join(s.cache.root, rel)
	entries, err := os.ReadDir(localDir)
	if err != nil {
		return
	}
	for _, e := range entries {
		name := e.Name()
		if e.IsDir() || strings.HasPrefix(name, ".") || upstreamNames[name] {
			continue
		}
		// Only count and log a purge when the removal actually succeeded.
		if err := os.Remove(filepath.Join(localDir, name)); err != nil {
			slog.Debug("purge orphan failed", "path", path.Join(decoded, name), "err", err)
			continue
		}
		addPurged()
		slog.Info("purged orphan", "path", path.Join(decoded, name))
	}
}

// fetchListing GETs the upstream JSON listing for dirURL and parses
// the entries. Caches the JSON sidecar as a side effect (so offline
// directory browsing works for walked subtrees).
func (s *MirrorScheduler) fetchListing(ctx context.Context, dirURL string) ([]listingEntry, error) {
	target := s.cache.upstream + dirURL
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, target, nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Accept", "application/json")
	if s.cache.bearer != "" {
		req.Header.Set("Authorization", "Bearer "+s.cache.bearer)
	}
	resp, err := s.cache.client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("fetch listing %s: %w", target, err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("listing %s: status %d", target, resp.StatusCode)
	}
	bodyBytes, err := io.ReadAll(io.LimitReader(resp.Body, 16*1024*1024))
	if err != nil {
		return nil, fmt.Errorf("read listing body: %w", err)
	}
	// Parse before persisting so a malformed body (or one truncated at
	// the 16 MiB cap) never overwrites a good sidecar.
	var parsed []listingEntry
	if err := json.Unmarshal(bodyBytes, &parsed); err != nil {
		return nil, fmt.Errorf("parse listing %s: %w", target, err)
	}
	// Persist the JSON sidecar (temp + rename) so the directory is
	// browsable offline. Best-effort: failures don't fail the walk.
	if sidecar, ok := s.cache.listingCachePathFor(dirURL, "application/json"); ok {
		if err := os.MkdirAll(filepath.Dir(sidecar), 0o755); err == nil {
			if tmp, err := os.CreateTemp(filepath.Dir(sidecar), ".zddc-cache-tmp-*"); err == nil {
				tmpName := tmp.Name()
				_, werr := tmp.Write(bodyBytes)
				cerr := tmp.Close()
				if werr == nil && cerr == nil {
					if lm := resp.Header.Get("Last-Modified"); lm != "" {
						if t, err := http.ParseTime(lm); err == nil {
							_ = os.Chtimes(tmpName, t, t)
						}
					}
					if err := os.Rename(tmpName, sidecar); err == nil {
						s.cache.maybeWriteMarker()
					} else {
						_ = os.Remove(tmpName)
					}
				} else {
					_ = os.Remove(tmpName)
				}
			}
		}
	}
	return parsed, nil
}

// listingEntry mirrors the subset of zddc/internal/listing/types.go's
// FileInfo we need. Extra fields are ignored.
type listingEntry struct {
	Name  string `json:"name"`
	IsDir bool   `json:"is_dir"`
}

// fetchFileIfNeeded conditional-GETs a file URL. Returns true when
// the cache was updated (200 + write); false when no work was needed
// (304 or skipped) or the fetch failed.
func (s *MirrorScheduler) fetchFileIfNeeded(ctx context.Context, fileURL string) bool {
	cachePath, ok := s.cache.cachePathFor(fileURL)
	if !ok {
		return false
	}
	target := s.cache.upstream + fileURL
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, target, nil)
	if err != nil {
		return false
	}
	if info, err := os.Stat(cachePath); err == nil {
		req.Header.Set("If-Modified-Since", info.ModTime().UTC().Format(http.TimeFormat))
	}
	if s.cache.bearer != "" {
		req.Header.Set("Authorization", "Bearer "+s.cache.bearer)
	}
	resp, err := s.cache.client.Do(req)
	if err != nil {
		return false
	}
	defer resp.Body.Close()
	if resp.StatusCode == http.StatusNotModified {
		return false
	}
	if resp.StatusCode != http.StatusOK {
		// 403 (ACL revoked) or 404 (gone upstream): remove the local
		// copy. Other statuses (e.g. 5xx) leave the cached copy in place.
		if resp.StatusCode == http.StatusForbidden || resp.StatusCode == http.StatusNotFound {
			_ = os.Remove(cachePath)
		}
		return false
	}
	if !s.cache.responseCacheable(resp) {
		return false
	}
	if err := s.cache.persistOnly(resp, fileURL); err != nil {
		slog.Debug("walker persist failed", "url", fileURL, "err", err)
		return false
	}
	s.cache.maybeWriteMarker()
	return true
}

// matchSubtree returns the longest configured subtree that's a prefix
// of urlPath, or "" if none match. "/" matches everything.
func (s *MirrorScheduler) matchSubtree(urlPath string) string {
	if urlPath == "" {
		urlPath = "/"
	}
	var best string
	for _, st := range s.subtrees {
		if st == "/" {
			if len(best) == 0 {
				best = "/"
			}
			continue
		}
		if strings.HasPrefix(urlPath, st+"/") || urlPath == st {
			if len(st) > len(best) {
				best = st
			}
		}
	}
	return best
}

// loadMirrorState reads the persisted state from disk. Returns an
// empty (initialized) map on missing/corrupt input.
func loadMirrorState(path string) map[string]subtreeState {
	state := make(map[string]subtreeState)
	data, err := os.ReadFile(path)
	if err != nil {
		return state
	}
	var parsed persistedState
	if err := json.Unmarshal(data, &parsed); err != nil {
		return state
	}
	if parsed.Subtrees != nil {
		state = parsed.Subtrees
	}
	return state
}

// persistMirrorState writes the state file atomically (temp + rename).
// Best-effort: errors are logged but non-fatal.
func persistMirrorState(path string, state map[string]subtreeState) {
	data, err := json.MarshalIndent(persistedState{Subtrees: state}, "", " ")
	if err != nil {
		slog.Debug("marshal mirror state", "err", err)
		return
	}
	tmp, err := os.CreateTemp(filepath.Dir(path), ".zddc-mirror-state-tmp-*")
	if err != nil {
		slog.Debug("create mirror state tmp", "err", err)
		return
	}
	tmpName := tmp.Name()
	if _, err := tmp.Write(data); err != nil {
		_ = tmp.Close()
		_ = os.Remove(tmpName)
		slog.Debug("write mirror state", "err", err)
		return
	}
	if err := tmp.Close(); err != nil {
		_ = os.Remove(tmpName)
		slog.Debug("close mirror state tmp", "err", err)
		return
	}
	if err := os.Rename(tmpName, path); err != nil {
		_ = os.Remove(tmpName)
		slog.Debug("rename mirror state", "err", err)
	}
}