package cache

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"net/url"
	"os"
	"path"
	"path/filepath"
	"strings"
	"sync"
	"time"
)

// MirrorStateFile is the name of the persisted scheduler state file,
// which records the last walk time per subtree. It lives at the cache
// root so the state survives restarts.
const MirrorStateFile = ".zddc-mirror-state.json"

// DefaultMirrorMinInterval is the minimum gap between two walks of the
// same subtree. Active mirrors stay current via repeated visits; idle
// mirrors don't generate any upstream traffic until someone hits them.
const DefaultMirrorMinInterval = time.Hour

// DefaultMirrorParallelism caps how many concurrent file fetches a
// single walk fires. The bound is per-walk, not global — multiple
// subtrees walked at once each get their own budget. Conservative
// enough not to starve interactive requests sharing the same
// connection pool.
const DefaultMirrorParallelism = 4

// MirrorScheduler owns the per-subtree access-triggered walk policy.
// Built once at startup from cfg.MirrorSubtree[]; NewMirrorScheduler
// installs Trigger as the cache's onAccess hook.
type MirrorScheduler struct {
	cache       *Cache
	subtrees    []string // normalized: leading "/", no trailing "/"
	minInterval time.Duration
	parallelism int

	// mu guards state and inFlight.
	mu        sync.Mutex
	state     map[string]subtreeState
	statePath string
	inFlight  map[string]bool // tracked separately from on-disk state — restarts forget
}

// subtreeState is the persisted per-subtree record.
type subtreeState struct {
	LastWalkAt time.Time `json:"last_walk_at"`
}

// persistedState is the top-level JSON shape of the state file,
// keyed by normalized subtree path.
type persistedState struct {
	Subtrees map[string]subtreeState `json:"subtrees"`
}

// NewMirrorScheduler constructs a scheduler attached to the given
// cache. Returns a nil scheduler (and nil error) when no subtrees are
// configured — the caller must check for that.
func NewMirrorScheduler(c *Cache, subtrees []string, minInterval time.Duration, parallelism int) (*MirrorScheduler, error) { if len(subtrees) == 0 { return nil, nil } if minInterval <= 0 { minInterval = DefaultMirrorMinInterval } if parallelism <= 0 { parallelism = DefaultMirrorParallelism } normalized := make([]string, 0, len(subtrees)) for _, s := range subtrees { s = strings.TrimSpace(s) if s == "" || s == "/" { s = "/" } else { if !strings.HasPrefix(s, "/") { s = "/" + s } s = strings.TrimRight(s, "/") } normalized = append(normalized, s) } statePath := filepath.Join(c.root, MirrorStateFile) state := loadMirrorState(statePath) s := &MirrorScheduler{ cache: c, subtrees: normalized, minInterval: minInterval, parallelism: parallelism, state: state, statePath: statePath, inFlight: make(map[string]bool), } c.onAccess = s.Trigger return s, nil } // Subtrees returns the configured subtree list (read-only). func (s *MirrorScheduler) Subtrees() []string { out := make([]string, len(s.subtrees)) copy(out, s.subtrees) return out } // MinInterval returns the configured min walk interval. func (s *MirrorScheduler) MinInterval() time.Duration { return s.minInterval } // Trigger inspects an incoming URL and, if it falls under a // configured subtree AND a walk is due, kicks one in the background. // Always non-blocking. Safe to call concurrently. func (s *MirrorScheduler) Trigger(urlPath string) { subtree := s.matchSubtree(urlPath) if subtree == "" { return } s.mu.Lock() defer s.mu.Unlock() if s.inFlight[subtree] { return } st := s.state[subtree] if !st.LastWalkAt.IsZero() && time.Since(st.LastWalkAt) < s.minInterval { return } s.inFlight[subtree] = true go s.runWalk(subtree) } // runWalk does one walk-and-record cycle for subtree. Always called // in a goroutine from Trigger. 
func (s *MirrorScheduler) runWalk(subtree string) { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute) defer cancel() slog.Info("mirror walk start", "subtree", subtree) start := time.Now() stats, err := s.walkSubtree(ctx, subtree) dur := time.Since(start) if err != nil { slog.Warn("mirror walk failed", "subtree", subtree, "duration", dur, "err", err) } else { slog.Info("mirror walk done", "subtree", subtree, "duration", dur, "dirs", stats.dirs, "files", stats.files, "fetched", stats.fetched, "purged", stats.purged) } s.mu.Lock() defer s.mu.Unlock() s.inFlight[subtree] = false s.state[subtree] = subtreeState{LastWalkAt: time.Now()} persistMirrorState(s.statePath, s.state) } type walkStats struct { dirs int files int fetched int purged int } // walkSubtree recursively walks the upstream listings under subtree, // fetching missing/stale files and purging local files no longer in // the upstream's filtered listing. Bounded parallelism via a // semaphore. Errors fetching individual files are logged but don't // abort the walk. func (s *MirrorScheduler) walkSubtree(ctx context.Context, subtree string) (walkStats, error) { sem := make(chan struct{}, s.parallelism) var stats walkStats var statsMu sync.Mutex addFetched := func() { statsMu.Lock(); stats.fetched++; statsMu.Unlock() } addPurged := func() { statsMu.Lock(); stats.purged++; statsMu.Unlock() } addDirs := func() { statsMu.Lock(); stats.dirs++; statsMu.Unlock() } addFiles := func() { statsMu.Lock(); stats.files++; statsMu.Unlock() } var fetchWG sync.WaitGroup // walkDir keeps two parallel views of the directory's path: // dirURL — URL-encoded form, used to compose HTTP requests // dirPath — decoded form, used as the filesystem cache key. // Filenames with spaces / parens / etc. need URL escaping for // HTTP but must NOT land on disk percent-encoded — otherwise the // orphan-purge pass below can't match local entries against the // upstream's (decoded) listing names. 
var walkDir func(dirURL, dirPath string) error walkDir = func(dirURL, dirPath string) error { if !strings.HasSuffix(dirURL, "/") { dirURL += "/" } if !strings.HasSuffix(dirPath, "/") { dirPath += "/" } addDirs() entries, err := s.fetchListing(ctx, dirURL, dirPath) if err != nil { return err } // Build a set of names the upstream listing exposes so we can // purge anything local-but-removed-upstream after walking. upstreamNames := make(map[string]bool, len(entries)) for _, e := range entries { name := strings.TrimSuffix(e.Name, "/") upstreamNames[name] = true } for _, e := range entries { if ctx.Err() != nil { return ctx.Err() } name := strings.TrimSuffix(e.Name, "/") // Defense-in-depth against a hostile or compromised // upstream listing: drop any entry name that could // steer the walker — and the orphan-purge pass below — // outside the cache root. A healthy master's listing // pipeline (internal/listing/listing.go) already filters // these, so this only fires under MITM or upstream // compromise. An empty / "."/".." / slash-bearing name // has no legitimate use as a child entry. if name == "" || name == "." || name == ".." || strings.ContainsAny(name, "/\\") { slog.Warn("walker: dropping unsafe upstream listing entry", "dir", dirPath, "name", e.Name) continue } childURL := dirURL + url.PathEscape(name) childPath := dirPath + name if e.IsDir { childURL += "/" childPath += "/" if err := walkDir(childURL, childPath); err != nil { slog.Debug("walk subdir failed", "url", childURL, "err", err) } continue } addFiles() fetchWG.Add(1) sem <- struct{}{} go func(fileURL, filePath string) { defer fetchWG.Done() defer func() { <-sem }() if s.fetchFileIfNeeded(ctx, fileURL, filePath) { addFetched() } }(childURL, childPath) } // Purge local files no longer present upstream. Only acts on // regular files inside this directory — listing sidecars, // dot-prefix internals, and subdirectories are left alone. 
s.purgeOrphans(dirPath, upstreamNames, addPurged) return nil } err := walkDir(subtree+"/", subtree+"/") fetchWG.Wait() return stats, err } // purgeOrphans removes local cached files in dirPath (decoded URL // path) that are absent from the upstream listing. Only operates on // plain files; ignores dot-prefix entries (cache state) and // subdirectories (they get their own pass during walkSubtree's // recursion). func (s *MirrorScheduler) purgeOrphans(dirPath string, upstreamNames map[string]bool, addPurged func()) { rel := filepath.FromSlash(strings.Trim(dirPath, "/")) localDir := filepath.Join(s.cache.root, rel) // Belt-and-suspenders containment check: even though walkDir // filters unsafe names before recursing, refuse to operate on // any localDir that filepath.Join resolved outside the cache // root. Without this, an upstream listing with a `..` entry // that slipped past the walker's filter could steer os.Remove // at a parent of cache.root. if localDir != s.cache.root && !strings.HasPrefix(localDir, s.cache.root+string(filepath.Separator)) { slog.Warn("walker: refusing purge outside cache root", "dirPath", dirPath, "resolved", localDir) return } entries, err := os.ReadDir(localDir) if err != nil { return } for _, e := range entries { if e.IsDir() { continue } name := e.Name() if strings.HasPrefix(name, ".") { continue } if upstreamNames[name] { continue } _ = os.Remove(filepath.Join(localDir, name)) addPurged() slog.Info("purged orphan", "path", path.Join(dirPath, name)) } } // fetchListing GETs the upstream JSON listing for dirURL (encoded) // and parses the entries. Caches the JSON sidecar at dirPath // (decoded) as a side-effect so the directory is browsable offline. 
func (s *MirrorScheduler) fetchListing(ctx context.Context, dirURL, dirPath string) ([]listingEntry, error) { target := s.cache.upstream + dirURL req, err := http.NewRequestWithContext(ctx, http.MethodGet, target, nil) if err != nil { return nil, err } req.Header.Set("Accept", "application/json") if s.cache.bearer != "" { req.Header.Set("Authorization", "Bearer "+s.cache.bearer) } resp, err := s.cache.client.Do(req) if err != nil { return nil, fmt.Errorf("fetch listing %s: %w", target, err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { return nil, fmt.Errorf("listing %s: status %d", target, resp.StatusCode) } bodyBytes, err := io.ReadAll(io.LimitReader(resp.Body, 16*1024*1024)) if err != nil { return nil, fmt.Errorf("read listing body: %w", err) } // Persist the JSON sidecar so the directory is browsable offline. // listingCachePathFor expects a decoded directory URL (keyed by // FS path), so dirPath rather than dirURL here. if path, ok := s.cache.listingCachePathFor(dirPath, "application/json"); ok { if err := os.MkdirAll(filepath.Dir(path), 0o755); err == nil { tmp, err := os.CreateTemp(filepath.Dir(path), ".zddc-cache-tmp-*") if err == nil { tmpName := tmp.Name() if _, err := tmp.Write(bodyBytes); err == nil { _ = tmp.Close() if lm := resp.Header.Get("Last-Modified"); lm != "" { if t, err := http.ParseTime(lm); err == nil { _ = os.Chtimes(tmpName, t, t) } } _ = os.Rename(tmpName, path) s.cache.maybeWriteMarker() } else { _ = tmp.Close() _ = os.Remove(tmpName) } } } } var parsed []listingEntry if err := json.Unmarshal(bodyBytes, &parsed); err != nil { return nil, fmt.Errorf("parse listing %s: %w", target, err) } return parsed, nil } // listingEntry mirrors the subset of zddc/internal/listing/types.go's // FileInfo we need. Extra fields ignored. type listingEntry struct { Name string `json:"name"` IsDir bool `json:"is_dir"` } // fetchFileIfNeeded conditional-GETs a file URL. 
fileURL is the // URL-encoded HTTP target; filePath is the decoded URL path used as // the on-disk cache key. Returns true when the cache was updated // (200 + write); false when no work was needed (304 or skipped). func (s *MirrorScheduler) fetchFileIfNeeded(ctx context.Context, fileURL, filePath string) bool { cachePath, ok := s.cache.cachePathFor(filePath) if !ok { return false } target := s.cache.upstream + fileURL req, err := http.NewRequestWithContext(ctx, http.MethodGet, target, nil) if err != nil { return false } if info, err := os.Stat(cachePath); err == nil { req.Header.Set("If-Modified-Since", info.ModTime().UTC().Format(http.TimeFormat)) } if s.cache.bearer != "" { req.Header.Set("Authorization", "Bearer "+s.cache.bearer) } resp, err := s.cache.client.Do(req) if err != nil { return false } defer resp.Body.Close() if resp.StatusCode == http.StatusNotModified { return false } if resp.StatusCode != http.StatusOK { // 4xx → ACL revoked or upstream gone; remove local cache. if resp.StatusCode == http.StatusForbidden || resp.StatusCode == http.StatusNotFound { _ = os.Remove(cachePath) } return false } if !s.cache.responseCacheable(resp) { return false } // persistOnly resolves the disk path via cachePathFor too — pass // the decoded filePath, not the encoded fileURL. if err := s.cache.persistOnly(resp, filePath); err != nil { slog.Debug("walker persist failed", "path", filePath, "err", err) return false } s.cache.maybeWriteMarker() return true } // matchSubtree returns the longest configured subtree that's a prefix // of urlPath, or "" if none match. "/" matches everything. func (s *MirrorScheduler) matchSubtree(urlPath string) string { if urlPath == "" { urlPath = "/" } var best string for _, st := range s.subtrees { if st == "/" { if len(best) == 0 { best = "/" } continue } if strings.HasPrefix(urlPath, st+"/") || urlPath == st { if len(st) > len(best) { best = st } } } return best } // loadMirrorState reads the persisted state from disk. 
Returns an // empty (initialized) map on missing/corrupt input. func loadMirrorState(path string) map[string]subtreeState { state := make(map[string]subtreeState) bytes, err := os.ReadFile(path) if err != nil { return state } var parsed persistedState if err := json.Unmarshal(bytes, &parsed); err != nil { return state } if parsed.Subtrees != nil { state = parsed.Subtrees } return state } // persistMirrorState writes the state file atomically (temp + rename). // Best-effort: errors are logged but non-fatal. func persistMirrorState(path string, state map[string]subtreeState) { bytes, err := json.MarshalIndent(persistedState{Subtrees: state}, "", " ") if err != nil { slog.Debug("marshal mirror state", "err", err) return } tmp, err := os.CreateTemp(filepath.Dir(path), ".zddc-mirror-state-tmp-*") if err != nil { slog.Debug("create mirror state tmp", "err", err) return } tmpName := tmp.Name() if _, err := tmp.Write(bytes); err != nil { _ = tmp.Close() _ = os.Remove(tmpName) return } if err := tmp.Close(); err != nil { _ = os.Remove(tmpName) return } if err := os.Rename(tmpName, path); err != nil { _ = os.Remove(tmpName) } }