ZDDC/zddc/internal/cache/walker.go
2026-06-11 13:32:31 -05:00

475 lines
15 KiB
Go

package cache
import (
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"net/http"
"net/url"
"os"
"path"
"path/filepath"
"strings"
"sync"
"time"
)
// MirrorStateFile is the file name of the persisted scheduler state —
// the last walk time per subtree. NewMirrorScheduler joins it onto the
// cache root, so the state survives restarts. The name is dot-prefixed,
// which also keeps it out of purgeOrphans' reach (that pass skips
// dot-prefixed entries).
const MirrorStateFile = ".zddc-mirror-state.json"
// DefaultMirrorMinInterval is the minimum gap between two walks of the
// same subtree; NewMirrorScheduler falls back to it when given a
// non-positive interval. Walks are access-triggered: active mirrors
// stay current via repeated visits, while idle mirrors don't generate
// any upstream traffic until someone hits them.
const DefaultMirrorMinInterval = time.Hour
// DefaultMirrorParallelism caps how many concurrent file fetches a
// single walk fires; NewMirrorScheduler falls back to it when given a
// non-positive parallelism. The bound is per-walk, not global — each
// walkSubtree call creates its own semaphore, so multiple subtrees
// walked at once each get their own budget. Conservative enough not to
// starve interactive requests sharing the same connection pool.
const DefaultMirrorParallelism = 4
// MirrorScheduler owns the per-subtree access-triggered walk policy.
// Built once at startup from cfg.MirrorSubtree[]; NewMirrorScheduler
// installs Trigger as the cache's onAccess hook.
//
// mu guards state and inFlight. In the code visible here the remaining
// fields are set by NewMirrorScheduler and only read afterwards.
type MirrorScheduler struct {
// cache supplies the cache root, upstream base URL, bearer token and
// HTTP client used by the walker.
cache *Cache
subtrees []string // normalized: leading "/", no trailing "/"
// minInterval is the minimum time between two walks of one subtree.
minInterval time.Duration
// parallelism bounds concurrent file fetches within a single walk.
parallelism int
// mu serializes access to state and inFlight (see Trigger / runWalk).
mu sync.Mutex
// state maps subtree → last walk time; loaded from and persisted to
// statePath.
state map[string]subtreeState
// statePath is the on-disk location of the persisted state file.
statePath string
inFlight map[string]bool // tracked separately from on-disk state — restarts forget
}
// subtreeState is the per-subtree scheduler record. LastWalkAt is the
// time the most recent walk finished (runWalk stamps it whether the
// walk succeeded or failed); the zero value means "never walked".
type subtreeState struct {
LastWalkAt time.Time `json:"last_walk_at"`
}
// persistedState is the JSON envelope of the mirror state file:
// a single "subtrees" object keyed by normalized subtree path.
type persistedState struct {
Subtrees map[string]subtreeState `json:"subtrees"`
}
// NewMirrorScheduler constructs a scheduler attached to the given
// cache, loads any persisted walk state from the cache root, and
// installs the scheduler's Trigger method as the cache's onAccess hook.
//
// Each configured subtree is normalized to a cleaned, slash-rooted path
// without a trailing slash: "docs/" becomes "/docs", "a//b" becomes
// "/a/b", and "", "/" or "//" all become "/". A non-positive
// minInterval or parallelism falls back to DefaultMirrorMinInterval /
// DefaultMirrorParallelism.
//
// Returns nil scheduler (and nil error) when no subtrees are
// configured — caller checks for that. Returns an error when subtrees
// are configured but c is nil.
func NewMirrorScheduler(c *Cache, subtrees []string, minInterval time.Duration, parallelism int) (*MirrorScheduler, error) {
	if len(subtrees) == 0 {
		return nil, nil
	}
	if c == nil {
		return nil, fmt.Errorf("mirror scheduler: nil cache")
	}
	if minInterval <= 0 {
		minInterval = DefaultMirrorMinInterval
	}
	if parallelism <= 0 {
		parallelism = DefaultMirrorParallelism
	}

	normalized := make([]string, 0, len(subtrees))
	for _, raw := range subtrees {
		// path.Clean on a slash-rooted input always yields a rooted
		// path with no trailing slash (or exactly "/"), so inputs made
		// only of slashes can no longer collapse to the empty string —
		// an empty subtree would never be selected by matchSubtree.
		normalized = append(normalized, path.Clean("/"+strings.TrimSpace(raw)))
	}

	statePath := filepath.Join(c.root, MirrorStateFile)
	sched := &MirrorScheduler{
		cache:       c,
		subtrees:    normalized,
		minInterval: minInterval,
		parallelism: parallelism,
		state:       loadMirrorState(statePath),
		statePath:   statePath,
		inFlight:    make(map[string]bool),
	}
	c.onAccess = sched.Trigger
	return sched, nil
}
// Subtrees returns a copy of the configured (normalized) subtree list,
// so callers cannot mutate the scheduler's own slice.
func (s *MirrorScheduler) Subtrees() []string {
	snapshot := make([]string, 0, len(s.subtrees))
	snapshot = append(snapshot, s.subtrees...)
	return snapshot
}
// MinInterval reports the minimum time the scheduler waits between two
// walks of the same subtree.
func (s *MirrorScheduler) MinInterval() time.Duration {
	return s.minInterval
}
// Trigger is the cache's on-access hook. It maps urlPath to the
// configured subtree that contains it and, when that subtree has no
// walk in flight and its last walk is older than the minimum interval
// (or it was never walked), starts a walk in a background goroutine.
// It never waits for the walk itself and is safe for concurrent use.
func (s *MirrorScheduler) Trigger(urlPath string) {
	subtree := s.matchSubtree(urlPath)
	if subtree == "" {
		// Not under any mirrored subtree.
		return
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	busy := s.inFlight[subtree]
	last := s.state[subtree].LastWalkAt
	walkedRecently := !last.IsZero() && time.Since(last) < s.minInterval
	if busy || walkedRecently {
		return
	}

	// Mark in-flight before launching so a concurrent Trigger for the
	// same subtree cannot start a second walk.
	s.inFlight[subtree] = true
	go s.runWalk(subtree)
}
// runWalk performs a single walk of subtree under a 30-minute timeout,
// logs the outcome, then — under the scheduler lock — clears the
// in-flight flag, stamps the subtree's last-walk time (on success and
// on failure alike) and persists the state file. It is started as a
// goroutine by Trigger.
func (s *MirrorScheduler) runWalk(subtree string) {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
	defer cancel()

	slog.Info("mirror walk start", "subtree", subtree)
	began := time.Now()
	stats, walkErr := s.walkSubtree(ctx, subtree)
	elapsed := time.Since(began)

	if walkErr == nil {
		slog.Info("mirror walk done", "subtree", subtree, "duration", elapsed,
			"dirs", stats.dirs, "files", stats.files, "fetched", stats.fetched, "purged", stats.purged)
	} else {
		slog.Warn("mirror walk failed", "subtree", subtree, "duration", elapsed, "err", walkErr)
	}

	s.mu.Lock()
	defer s.mu.Unlock()
	s.inFlight[subtree] = false
	s.state[subtree] = subtreeState{LastWalkAt: time.Now()}
	persistMirrorState(s.statePath, s.state)
}
// walkStats holds the counters reported for one walkSubtree run.
type walkStats struct {
// dirs counts directories visited (incremented before the listing fetch).
dirs int
// files counts file entries seen in upstream listings.
files int
// fetched counts files for which fetchFileIfNeeded reported a cache update.
fetched int
// purged counts purges reported by purgeOrphans.
purged int
}
// walkSubtree recursively walks the upstream listings under subtree,
// fetching missing/stale files and purging local files no longer in
// the upstream's filtered listing. File fetches run concurrently,
// bounded by a per-walk semaphore of size s.parallelism. Errors
// fetching individual files or listing a subdirectory don't abort the
// walk; a failure to list the subtree root, or cancellation of ctx,
// does.
//
// subtree is the normalized, decoded subtree path ("/" or "/a/b"). The
// walk root is derived from it so that the root subtree "/" starts at
// "/" rather than "//", and the root request URL is percent-encoded
// segment by segment, the same way child entries are.
//
// The returned stats are complete once the function returns: it waits
// for all in-flight fetches before returning.
func (s *MirrorScheduler) walkSubtree(ctx context.Context, subtree string) (walkStats, error) {
	sem := make(chan struct{}, s.parallelism)

	var (
		stats   walkStats
		statsMu sync.Mutex
		fetchWG sync.WaitGroup
	)
	// bump increments one stats counter under statsMu; fetch goroutines
	// and the walking goroutine both update stats.
	bump := func(counter *int) {
		statsMu.Lock()
		*counter++
		statsMu.Unlock()
	}
	addPurged := func() { bump(&stats.purged) }

	// Derive the walk root in both forms. Trimming first keeps the root
	// subtree "/" from becoming "//".
	rootPath := strings.TrimSuffix(subtree, "/") + "/"
	rootURL := "/"
	if trimmed := strings.Trim(subtree, "/"); trimmed != "" {
		segs := strings.Split(trimmed, "/")
		for i, seg := range segs {
			segs[i] = url.PathEscape(seg)
		}
		rootURL = "/" + strings.Join(segs, "/") + "/"
	}

	// walkDir keeps two parallel views of the directory's path:
	// dirURL — URL-encoded form, used to compose HTTP requests
	// dirPath — decoded form, used as the filesystem cache key.
	// Filenames with spaces / parens / etc. need URL escaping for
	// HTTP but must NOT land on disk percent-encoded — otherwise the
	// orphan-purge pass below can't match local entries against the
	// upstream's (decoded) listing names.
	var walkDir func(dirURL, dirPath string) error
	walkDir = func(dirURL, dirPath string) error {
		if !strings.HasSuffix(dirURL, "/") {
			dirURL += "/"
		}
		if !strings.HasSuffix(dirPath, "/") {
			dirPath += "/"
		}
		bump(&stats.dirs)

		entries, err := s.fetchListing(ctx, dirURL, dirPath)
		if err != nil {
			return err
		}

		// Build a set of names the upstream listing exposes so we can
		// purge anything local-but-removed-upstream after walking.
		upstreamNames := make(map[string]bool, len(entries))
		for _, e := range entries {
			upstreamNames[strings.TrimSuffix(e.Name, "/")] = true
		}

		for _, e := range entries {
			if err := ctx.Err(); err != nil {
				return err
			}
			name := strings.TrimSuffix(e.Name, "/")
			// Defense-in-depth against a hostile or compromised
			// upstream listing: drop any entry name that could
			// steer the walker — and the orphan-purge pass below —
			// outside the cache root. A healthy master's listing
			// pipeline (internal/listing/listing.go) already filters
			// these, so this only fires under MITM or upstream
			// compromise. An empty / "."/".." / slash-bearing name
			// has no legitimate use as a child entry.
			if name == "" || name == "." || name == ".." ||
				strings.ContainsAny(name, "/\\") {
				slog.Warn("walker: dropping unsafe upstream listing entry",
					"dir", dirPath, "name", e.Name)
				continue
			}

			childURL := dirURL + url.PathEscape(name)
			childPath := dirPath + name
			if e.IsDir {
				childURL += "/"
				childPath += "/"
				if err := walkDir(childURL, childPath); err != nil {
					// A cancelled/expired context ends the whole walk;
					// any other subdirectory failure is local to it.
					if ctxErr := ctx.Err(); ctxErr != nil {
						return ctxErr
					}
					slog.Debug("walk subdir failed", "url", childURL, "err", err)
				}
				continue
			}

			bump(&stats.files)
			// Acquire a fetch slot, but don't sit in the queue once the
			// walk has been cancelled.
			select {
			case sem <- struct{}{}:
			case <-ctx.Done():
				return ctx.Err()
			}
			fetchWG.Add(1)
			go func(fileURL, filePath string) {
				defer fetchWG.Done()
				defer func() { <-sem }()
				if s.fetchFileIfNeeded(ctx, fileURL, filePath) {
					bump(&stats.fetched)
				}
			}(childURL, childPath)
		}

		// Purge local files no longer present upstream. Only acts on
		// regular files inside this directory — dot-prefixed entries
		// and subdirectories are left alone. Reached only when the
		// whole listing was processed, never after cancellation.
		s.purgeOrphans(dirPath, upstreamNames, addPurged)
		return nil
	}

	err := walkDir(rootURL, rootPath)
	fetchWG.Wait()
	return stats, err
}
// purgeOrphans removes local cached files in dirPath (decoded URL
// path) that are absent from the upstream listing. Only operates on
// non-directory entries; ignores dot-prefix entries (cache state) and
// subdirectories (they get their own pass during walkSubtree's
// recursion).
//
// upstreamNames is the set of entry names the upstream listing exposes
// for this directory. addPurged is invoked once per file that was
// actually removed; a removal that fails is logged and not counted.
func (s *MirrorScheduler) purgeOrphans(dirPath string, upstreamNames map[string]bool, addPurged func()) {
	if s.cache.root == "" {
		// No usable cache root; never fall back to the working directory.
		return
	}
	sep := string(filepath.Separator)
	// filepath.Join returns a cleaned path, so the containment check
	// below must compare against the cleaned root as well — otherwise a
	// root spelled "./cache" or "/x/" would never match its own children.
	root := filepath.Clean(s.cache.root)
	localDir := filepath.Join(root, filepath.FromSlash(strings.Trim(dirPath, "/")))

	// Belt-and-suspenders containment check: even though walkDir
	// filters unsafe names before recursing, refuse to operate on
	// any localDir that filepath.Join resolved outside the cache
	// root. Without this, an upstream listing with a `..` entry
	// that slipped past the walker's filter could steer os.Remove
	// at a parent of cache.root.
	if localDir != root && !strings.HasPrefix(localDir, strings.TrimSuffix(root, sep)+sep) {
		slog.Warn("walker: refusing purge outside cache root",
			"dirPath", dirPath, "resolved", localDir)
		return
	}

	entries, err := os.ReadDir(localDir)
	if err != nil {
		// A directory that was never cached locally has nothing to purge.
		if !os.IsNotExist(err) {
			slog.Debug("walker: purge could not read directory", "path", dirPath, "err", err)
		}
		return
	}
	for _, e := range entries {
		name := e.Name()
		if e.IsDir() || strings.HasPrefix(name, ".") || upstreamNames[name] {
			continue
		}
		if err := os.Remove(filepath.Join(localDir, name)); err != nil {
			slog.Warn("walker: purge orphan failed", "path", path.Join(dirPath, name), "err", err)
			continue
		}
		addPurged()
		slog.Info("purged orphan", "path", path.Join(dirPath, name))
	}
}
// fetchListing GETs the upstream JSON listing for dirURL (encoded)
// and parses the entries. As a side-effect it caches the JSON sidecar
// at dirPath (decoded) so the directory is browsable offline.
//
// The body is validated before it is cached: a non-200 response, a
// body larger than 16 MiB, or a body that does not parse as a JSON
// array of entries returns an error and leaves any existing sidecar
// untouched. Failing to write the sidecar is logged at debug level and
// does not fail the call.
func (s *MirrorScheduler) fetchListing(ctx context.Context, dirURL, dirPath string) ([]listingEntry, error) {
	const maxListingBytes = 16 * 1024 * 1024

	target := s.cache.upstream + dirURL
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, target, nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Accept", "application/json")
	if s.cache.bearer != "" {
		req.Header.Set("Authorization", "Bearer "+s.cache.bearer)
	}

	resp, err := s.cache.client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("fetch listing %s: %w", target, err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("listing %s: status %d", target, resp.StatusCode)
	}

	// Read one byte past the cap so an oversized body is reported as
	// such instead of being silently truncated.
	body, err := io.ReadAll(io.LimitReader(resp.Body, maxListingBytes+1))
	if err != nil {
		return nil, fmt.Errorf("read listing body: %w", err)
	}
	if len(body) > maxListingBytes {
		return nil, fmt.Errorf("listing %s: body exceeds %d bytes", target, maxListingBytes)
	}

	var parsed []listingEntry
	if err := json.Unmarshal(body, &parsed); err != nil {
		return nil, fmt.Errorf("parse listing %s: %w", target, err)
	}

	// Persist the JSON sidecar so the directory is browsable offline.
	// listingCachePathFor expects a decoded directory URL (keyed by
	// FS path), so dirPath rather than dirURL here.
	if sidecar, ok := s.cache.listingCachePathFor(dirPath, "application/json"); ok {
		if err := writeListingSidecar(sidecar, body, resp.Header.Get("Last-Modified")); err != nil {
			slog.Debug("walker: persist listing sidecar failed", "path", dirPath, "err", err)
		} else {
			s.cache.maybeWriteMarker()
		}
	}
	return parsed, nil
}

// writeListingSidecar atomically writes body to dst (temp file in the
// same directory, then rename), creating the directory if needed. When
// lastModified parses as an HTTP date the file's mtime is set to it.
// The temp file is removed on any failure.
func writeListingSidecar(dst string, body []byte, lastModified string) error {
	dir := filepath.Dir(dst)
	if err := os.MkdirAll(dir, 0o755); err != nil {
		return err
	}
	tmp, err := os.CreateTemp(dir, ".zddc-cache-tmp-*")
	if err != nil {
		return err
	}
	tmpName := tmp.Name()
	if _, err := tmp.Write(body); err != nil {
		_ = tmp.Close()        // already failing; the write error is the one reported
		_ = os.Remove(tmpName) // best-effort cleanup
		return err
	}
	if err := tmp.Close(); err != nil {
		_ = os.Remove(tmpName) // best-effort cleanup
		return err
	}
	if lastModified != "" {
		if t, err := http.ParseTime(lastModified); err == nil {
			// Best-effort: a sidecar with the wrong mtime is still usable.
			_ = os.Chtimes(tmpName, t, t)
		}
	}
	if err := os.Rename(tmpName, dst); err != nil {
		_ = os.Remove(tmpName) // best-effort cleanup
		return err
	}
	return nil
}
// listingEntry mirrors the subset of zddc/internal/listing/types.go's
// FileInfo we need. Extra fields in the upstream JSON are ignored by
// the decoder.
type listingEntry struct {
// Name is the entry's name within its directory; the walker strips a
// trailing "/" before using it.
Name string `json:"name"`
// IsDir marks entries the walker recurses into instead of fetching.
IsDir bool `json:"is_dir"`
}
// fetchFileIfNeeded issues a GET for one file, made conditional with
// If-Modified-Since when a local copy already exists. fileURL is the
// URL-encoded HTTP target; filePath is the decoded URL path used as
// the on-disk cache key.
//
// It reports true only when a 200 response was persisted to the cache.
// Every other outcome reports false: no cache path for filePath,
// request or transport failure, 304, a non-cacheable response, or a
// persist error. A 403 or 404 additionally removes the local copy.
func (s *MirrorScheduler) fetchFileIfNeeded(ctx context.Context, fileURL, filePath string) bool {
	diskPath, ok := s.cache.cachePathFor(filePath)
	if !ok {
		return false
	}

	upstreamURL := s.cache.upstream + fileURL
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, upstreamURL, nil)
	if err != nil {
		return false
	}
	if fi, statErr := os.Stat(diskPath); statErr == nil {
		req.Header.Set("If-Modified-Since", fi.ModTime().UTC().Format(http.TimeFormat))
	}
	if token := s.cache.bearer; token != "" {
		req.Header.Set("Authorization", "Bearer "+token)
	}

	resp, err := s.cache.client.Do(req)
	if err != nil {
		return false
	}
	defer resp.Body.Close()

	switch resp.StatusCode {
	case http.StatusOK:
		// Fresh content; handled below.
	case http.StatusForbidden, http.StatusNotFound:
		// ACL revoked or upstream gone; remove local cache.
		_ = os.Remove(diskPath)
		return false
	default:
		// 304 Not Modified and every other status: nothing to update.
		return false
	}

	if !s.cache.responseCacheable(resp) {
		return false
	}
	// persistOnly resolves the disk path via cachePathFor too — pass
	// the decoded filePath, not the encoded fileURL.
	if err := s.cache.persistOnly(resp, filePath); err != nil {
		slog.Debug("walker persist failed", "path", filePath, "err", err)
		return false
	}
	s.cache.maybeWriteMarker()
	return true
}
// matchSubtree picks the configured subtree that contains urlPath,
// preferring the longest one, and returns "" when none does. A subtree
// contains urlPath when urlPath equals it or continues it with "/";
// the root subtree "/" contains every path but loses to any other
// match. An empty urlPath is treated as "/".
func (s *MirrorScheduler) matchSubtree(urlPath string) string {
	if urlPath == "" {
		urlPath = "/"
	}
	longest := ""
	for _, candidate := range s.subtrees {
		switch {
		case candidate == "/":
			// Root is only a fallback when nothing else matched so far.
			if longest == "" {
				longest = "/"
			}
		case urlPath == candidate || strings.HasPrefix(urlPath, candidate+"/"):
			if len(candidate) > len(longest) {
				longest = candidate
			}
		}
	}
	return longest
}
// loadMirrorState reads the persisted scheduler state from path. It
// always returns a usable (non-nil) map: a missing or unreadable file,
// invalid JSON, or a file without a "subtrees" object all yield an
// empty map.
func loadMirrorState(path string) map[string]subtreeState {
	raw, err := os.ReadFile(path)
	if err != nil {
		return make(map[string]subtreeState)
	}
	var onDisk persistedState
	if json.Unmarshal(raw, &onDisk) != nil || onDisk.Subtrees == nil {
		return make(map[string]subtreeState)
	}
	return onDisk.Subtrees
}
// persistMirrorState writes the state file atomically: the JSON is
// written to a temp file in the same directory as path and then
// renamed over path. Best-effort: every failure (marshal, temp-file
// creation, write, close, rename) is logged at debug level and is
// non-fatal; the temp file is removed when a later step fails.
func persistMirrorState(path string, state map[string]subtreeState) {
	data, err := json.MarshalIndent(persistedState{Subtrees: state}, "", " ")
	if err != nil {
		slog.Debug("marshal mirror state", "err", err)
		return
	}
	tmp, err := os.CreateTemp(filepath.Dir(path), ".zddc-mirror-state-tmp-*")
	if err != nil {
		slog.Debug("create mirror state tmp", "err", err)
		return
	}
	tmpName := tmp.Name()
	if _, err := tmp.Write(data); err != nil {
		slog.Debug("write mirror state tmp", "err", err)
		_ = tmp.Close()        // already failing; the write error is the one logged
		_ = os.Remove(tmpName) // best-effort cleanup
		return
	}
	if err := tmp.Close(); err != nil {
		slog.Debug("close mirror state tmp", "err", err)
		_ = os.Remove(tmpName) // best-effort cleanup
		return
	}
	if err := os.Rename(tmpName, path); err != nil {
		slog.Debug("rename mirror state", "err", err)
		_ = os.Remove(tmpName) // best-effort cleanup
	}
}