475 lines
15 KiB
Go
475 lines
15 KiB
Go
package cache
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"log/slog"
|
|
"net/http"
|
|
"net/url"
|
|
"os"
|
|
"path"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// Mirror scheduler artifact names and defaults.
const (
	// MirrorStateFile is the file name of the persisted scheduler state,
	// i.e. the last walk time of each subtree. The file is placed at the
	// cache root so the state survives restarts.
	MirrorStateFile = ".zddc-mirror-state.json"

	// DefaultMirrorMinInterval is the minimum time between two walks of
	// the same subtree. Walks are triggered by access, so an active
	// mirror stays current through repeated visits while an idle mirror
	// produces no upstream traffic until someone hits it.
	DefaultMirrorMinInterval = time.Hour

	// DefaultMirrorParallelism caps the number of concurrent file
	// fetches fired by a single walk. The bound is per walk, not global:
	// subtrees walked at the same time each get their own budget. The
	// value is kept conservative so walks do not starve interactive
	// requests sharing the same connection pool.
	DefaultMirrorParallelism = 4
)
|
|
|
|
// MirrorScheduler owns the per-subtree access-triggered walk policy.
|
|
// Built once at startup from cfg.MirrorSubtree[]; the cache layer
|
|
// installs Trigger as its onAccess hook.
|
|
type MirrorScheduler struct {
|
|
cache *Cache
|
|
subtrees []string // normalized: leading "/", no trailing "/"
|
|
minInterval time.Duration
|
|
parallelism int
|
|
|
|
mu sync.Mutex
|
|
state map[string]subtreeState
|
|
statePath string
|
|
inFlight map[string]bool // tracked separately from on-disk state — restarts forget
|
|
}
|
|
|
|
// subtreeState is the per-subtree record kept in the scheduler state
// and serialized into the state file.
type subtreeState struct {
	// LastWalkAt is the time recorded when the most recent walk of the
	// subtree completed; the zero value means no walk is on record.
	LastWalkAt time.Time `json:"last_walk_at"`
}
|
|
|
|
type persistedState struct {
|
|
Subtrees map[string]subtreeState `json:"subtrees"`
|
|
}
|
|
|
|
// NewMirrorScheduler constructs a scheduler attached to the given
|
|
// cache. Returns nil scheduler (and nil error) when no subtrees are
|
|
// configured — caller checks for that.
|
|
func NewMirrorScheduler(c *Cache, subtrees []string, minInterval time.Duration, parallelism int) (*MirrorScheduler, error) {
|
|
if len(subtrees) == 0 {
|
|
return nil, nil
|
|
}
|
|
if minInterval <= 0 {
|
|
minInterval = DefaultMirrorMinInterval
|
|
}
|
|
if parallelism <= 0 {
|
|
parallelism = DefaultMirrorParallelism
|
|
}
|
|
normalized := make([]string, 0, len(subtrees))
|
|
for _, s := range subtrees {
|
|
s = strings.TrimSpace(s)
|
|
if s == "" || s == "/" {
|
|
s = "/"
|
|
} else {
|
|
if !strings.HasPrefix(s, "/") {
|
|
s = "/" + s
|
|
}
|
|
s = strings.TrimRight(s, "/")
|
|
}
|
|
normalized = append(normalized, s)
|
|
}
|
|
statePath := filepath.Join(c.root, MirrorStateFile)
|
|
state := loadMirrorState(statePath)
|
|
s := &MirrorScheduler{
|
|
cache: c,
|
|
subtrees: normalized,
|
|
minInterval: minInterval,
|
|
parallelism: parallelism,
|
|
state: state,
|
|
statePath: statePath,
|
|
inFlight: make(map[string]bool),
|
|
}
|
|
c.onAccess = s.Trigger
|
|
return s, nil
|
|
}
|
|
|
|
// Subtrees returns the configured subtree list (read-only).
|
|
func (s *MirrorScheduler) Subtrees() []string {
|
|
out := make([]string, len(s.subtrees))
|
|
copy(out, s.subtrees)
|
|
return out
|
|
}
|
|
|
|
// MinInterval returns the configured min walk interval.
|
|
func (s *MirrorScheduler) MinInterval() time.Duration { return s.minInterval }
|
|
|
|
// Trigger inspects an incoming URL and, if it falls under a
|
|
// configured subtree AND a walk is due, kicks one in the background.
|
|
// Always non-blocking. Safe to call concurrently.
|
|
func (s *MirrorScheduler) Trigger(urlPath string) {
|
|
subtree := s.matchSubtree(urlPath)
|
|
if subtree == "" {
|
|
return
|
|
}
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
if s.inFlight[subtree] {
|
|
return
|
|
}
|
|
st := s.state[subtree]
|
|
if !st.LastWalkAt.IsZero() && time.Since(st.LastWalkAt) < s.minInterval {
|
|
return
|
|
}
|
|
s.inFlight[subtree] = true
|
|
go s.runWalk(subtree)
|
|
}
|
|
|
|
// runWalk does one walk-and-record cycle for subtree. Always called
|
|
// in a goroutine from Trigger.
|
|
func (s *MirrorScheduler) runWalk(subtree string) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
|
|
defer cancel()
|
|
slog.Info("mirror walk start", "subtree", subtree)
|
|
start := time.Now()
|
|
stats, err := s.walkSubtree(ctx, subtree)
|
|
dur := time.Since(start)
|
|
if err != nil {
|
|
slog.Warn("mirror walk failed", "subtree", subtree, "duration", dur, "err", err)
|
|
} else {
|
|
slog.Info("mirror walk done", "subtree", subtree, "duration", dur,
|
|
"dirs", stats.dirs, "files", stats.files, "fetched", stats.fetched, "purged", stats.purged)
|
|
}
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.inFlight[subtree] = false
|
|
s.state[subtree] = subtreeState{LastWalkAt: time.Now()}
|
|
persistMirrorState(s.statePath, s.state)
|
|
}
|
|
|
|
// walkStats collects the counters reported at the end of a walk:
// directories listed, files seen in listings, files actually fetched
// into the cache, and local orphans purged.
type walkStats struct {
	dirs, files, fetched, purged int
}
|
|
|
|
// walkSubtree recursively walks the upstream listings under subtree,
|
|
// fetching missing/stale files and purging local files no longer in
|
|
// the upstream's filtered listing. Bounded parallelism via a
|
|
// semaphore. Errors fetching individual files are logged but don't
|
|
// abort the walk.
|
|
func (s *MirrorScheduler) walkSubtree(ctx context.Context, subtree string) (walkStats, error) {
|
|
sem := make(chan struct{}, s.parallelism)
|
|
var stats walkStats
|
|
var statsMu sync.Mutex
|
|
addFetched := func() { statsMu.Lock(); stats.fetched++; statsMu.Unlock() }
|
|
addPurged := func() { statsMu.Lock(); stats.purged++; statsMu.Unlock() }
|
|
addDirs := func() { statsMu.Lock(); stats.dirs++; statsMu.Unlock() }
|
|
addFiles := func() { statsMu.Lock(); stats.files++; statsMu.Unlock() }
|
|
|
|
var fetchWG sync.WaitGroup
|
|
|
|
// walkDir keeps two parallel views of the directory's path:
|
|
// dirURL — URL-encoded form, used to compose HTTP requests
|
|
// dirPath — decoded form, used as the filesystem cache key.
|
|
// Filenames with spaces / parens / etc. need URL escaping for
|
|
// HTTP but must NOT land on disk percent-encoded — otherwise the
|
|
// orphan-purge pass below can't match local entries against the
|
|
// upstream's (decoded) listing names.
|
|
var walkDir func(dirURL, dirPath string) error
|
|
walkDir = func(dirURL, dirPath string) error {
|
|
if !strings.HasSuffix(dirURL, "/") {
|
|
dirURL += "/"
|
|
}
|
|
if !strings.HasSuffix(dirPath, "/") {
|
|
dirPath += "/"
|
|
}
|
|
addDirs()
|
|
entries, err := s.fetchListing(ctx, dirURL, dirPath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Build a set of names the upstream listing exposes so we can
|
|
// purge anything local-but-removed-upstream after walking.
|
|
upstreamNames := make(map[string]bool, len(entries))
|
|
for _, e := range entries {
|
|
name := strings.TrimSuffix(e.Name, "/")
|
|
upstreamNames[name] = true
|
|
}
|
|
|
|
for _, e := range entries {
|
|
if ctx.Err() != nil {
|
|
return ctx.Err()
|
|
}
|
|
name := strings.TrimSuffix(e.Name, "/")
|
|
// Defense-in-depth against a hostile or compromised
|
|
// upstream listing: drop any entry name that could
|
|
// steer the walker — and the orphan-purge pass below —
|
|
// outside the cache root. A healthy master's listing
|
|
// pipeline (internal/listing/listing.go) already filters
|
|
// these, so this only fires under MITM or upstream
|
|
// compromise. An empty / "."/".." / slash-bearing name
|
|
// has no legitimate use as a child entry.
|
|
if name == "" || name == "." || name == ".." ||
|
|
strings.ContainsAny(name, "/\\") {
|
|
slog.Warn("walker: dropping unsafe upstream listing entry",
|
|
"dir", dirPath, "name", e.Name)
|
|
continue
|
|
}
|
|
childURL := dirURL + url.PathEscape(name)
|
|
childPath := dirPath + name
|
|
if e.IsDir {
|
|
childURL += "/"
|
|
childPath += "/"
|
|
if err := walkDir(childURL, childPath); err != nil {
|
|
slog.Debug("walk subdir failed", "url", childURL, "err", err)
|
|
}
|
|
continue
|
|
}
|
|
addFiles()
|
|
fetchWG.Add(1)
|
|
sem <- struct{}{}
|
|
go func(fileURL, filePath string) {
|
|
defer fetchWG.Done()
|
|
defer func() { <-sem }()
|
|
if s.fetchFileIfNeeded(ctx, fileURL, filePath) {
|
|
addFetched()
|
|
}
|
|
}(childURL, childPath)
|
|
}
|
|
|
|
// Purge local files no longer present upstream. Only acts on
|
|
// regular files inside this directory — listing sidecars,
|
|
// dot-prefix internals, and subdirectories are left alone.
|
|
s.purgeOrphans(dirPath, upstreamNames, addPurged)
|
|
return nil
|
|
}
|
|
|
|
err := walkDir(subtree+"/", subtree+"/")
|
|
fetchWG.Wait()
|
|
return stats, err
|
|
}
|
|
|
|
// purgeOrphans removes local cached files in dirPath (decoded URL
|
|
// path) that are absent from the upstream listing. Only operates on
|
|
// plain files; ignores dot-prefix entries (cache state) and
|
|
// subdirectories (they get their own pass during walkSubtree's
|
|
// recursion).
|
|
func (s *MirrorScheduler) purgeOrphans(dirPath string, upstreamNames map[string]bool, addPurged func()) {
|
|
rel := filepath.FromSlash(strings.Trim(dirPath, "/"))
|
|
localDir := filepath.Join(s.cache.root, rel)
|
|
// Belt-and-suspenders containment check: even though walkDir
|
|
// filters unsafe names before recursing, refuse to operate on
|
|
// any localDir that filepath.Join resolved outside the cache
|
|
// root. Without this, an upstream listing with a `..` entry
|
|
// that slipped past the walker's filter could steer os.Remove
|
|
// at a parent of cache.root.
|
|
if localDir != s.cache.root && !strings.HasPrefix(localDir, s.cache.root+string(filepath.Separator)) {
|
|
slog.Warn("walker: refusing purge outside cache root",
|
|
"dirPath", dirPath, "resolved", localDir)
|
|
return
|
|
}
|
|
entries, err := os.ReadDir(localDir)
|
|
if err != nil {
|
|
return
|
|
}
|
|
for _, e := range entries {
|
|
if e.IsDir() {
|
|
continue
|
|
}
|
|
name := e.Name()
|
|
if strings.HasPrefix(name, ".") {
|
|
continue
|
|
}
|
|
if upstreamNames[name] {
|
|
continue
|
|
}
|
|
_ = os.Remove(filepath.Join(localDir, name))
|
|
addPurged()
|
|
slog.Info("purged orphan", "path", path.Join(dirPath, name))
|
|
}
|
|
}
|
|
|
|
// fetchListing GETs the upstream JSON listing for dirURL (encoded)
|
|
// and parses the entries. Caches the JSON sidecar at dirPath
|
|
// (decoded) as a side-effect so the directory is browsable offline.
|
|
func (s *MirrorScheduler) fetchListing(ctx context.Context, dirURL, dirPath string) ([]listingEntry, error) {
|
|
target := s.cache.upstream + dirURL
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, target, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
req.Header.Set("Accept", "application/json")
|
|
if s.cache.bearer != "" {
|
|
req.Header.Set("Authorization", "Bearer "+s.cache.bearer)
|
|
}
|
|
resp, err := s.cache.client.Do(req)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("fetch listing %s: %w", target, err)
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode != http.StatusOK {
|
|
return nil, fmt.Errorf("listing %s: status %d", target, resp.StatusCode)
|
|
}
|
|
bodyBytes, err := io.ReadAll(io.LimitReader(resp.Body, 16*1024*1024))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("read listing body: %w", err)
|
|
}
|
|
// Persist the JSON sidecar so the directory is browsable offline.
|
|
// listingCachePathFor expects a decoded directory URL (keyed by
|
|
// FS path), so dirPath rather than dirURL here.
|
|
if path, ok := s.cache.listingCachePathFor(dirPath, "application/json"); ok {
|
|
if err := os.MkdirAll(filepath.Dir(path), 0o755); err == nil {
|
|
tmp, err := os.CreateTemp(filepath.Dir(path), ".zddc-cache-tmp-*")
|
|
if err == nil {
|
|
tmpName := tmp.Name()
|
|
if _, err := tmp.Write(bodyBytes); err == nil {
|
|
_ = tmp.Close()
|
|
if lm := resp.Header.Get("Last-Modified"); lm != "" {
|
|
if t, err := http.ParseTime(lm); err == nil {
|
|
_ = os.Chtimes(tmpName, t, t)
|
|
}
|
|
}
|
|
_ = os.Rename(tmpName, path)
|
|
s.cache.maybeWriteMarker()
|
|
} else {
|
|
_ = tmp.Close()
|
|
_ = os.Remove(tmpName)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
var parsed []listingEntry
|
|
if err := json.Unmarshal(bodyBytes, &parsed); err != nil {
|
|
return nil, fmt.Errorf("parse listing %s: %w", target, err)
|
|
}
|
|
return parsed, nil
|
|
}
|
|
|
|
// listingEntry is one element of an upstream JSON listing. It carries
// only the subset of zddc/internal/listing/types.go's FileInfo that the
// walker needs; any other fields in the JSON are ignored on decode.
type listingEntry struct {
	// Name is the entry's name as reported by the listing.
	Name string `json:"name"`
	// IsDir reports whether the entry is a directory.
	IsDir bool `json:"is_dir"`
}
|
|
|
|
// fetchFileIfNeeded conditional-GETs a file URL. fileURL is the
|
|
// URL-encoded HTTP target; filePath is the decoded URL path used as
|
|
// the on-disk cache key. Returns true when the cache was updated
|
|
// (200 + write); false when no work was needed (304 or skipped).
|
|
func (s *MirrorScheduler) fetchFileIfNeeded(ctx context.Context, fileURL, filePath string) bool {
|
|
cachePath, ok := s.cache.cachePathFor(filePath)
|
|
if !ok {
|
|
return false
|
|
}
|
|
target := s.cache.upstream + fileURL
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, target, nil)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
if info, err := os.Stat(cachePath); err == nil {
|
|
req.Header.Set("If-Modified-Since", info.ModTime().UTC().Format(http.TimeFormat))
|
|
}
|
|
if s.cache.bearer != "" {
|
|
req.Header.Set("Authorization", "Bearer "+s.cache.bearer)
|
|
}
|
|
resp, err := s.cache.client.Do(req)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode == http.StatusNotModified {
|
|
return false
|
|
}
|
|
if resp.StatusCode != http.StatusOK {
|
|
// 4xx → ACL revoked or upstream gone; remove local cache.
|
|
if resp.StatusCode == http.StatusForbidden || resp.StatusCode == http.StatusNotFound {
|
|
_ = os.Remove(cachePath)
|
|
}
|
|
return false
|
|
}
|
|
if !s.cache.responseCacheable(resp) {
|
|
return false
|
|
}
|
|
// persistOnly resolves the disk path via cachePathFor too — pass
|
|
// the decoded filePath, not the encoded fileURL.
|
|
if err := s.cache.persistOnly(resp, filePath); err != nil {
|
|
slog.Debug("walker persist failed", "path", filePath, "err", err)
|
|
return false
|
|
}
|
|
s.cache.maybeWriteMarker()
|
|
return true
|
|
}
|
|
|
|
// matchSubtree returns the longest configured subtree that's a prefix
|
|
// of urlPath, or "" if none match. "/" matches everything.
|
|
func (s *MirrorScheduler) matchSubtree(urlPath string) string {
|
|
if urlPath == "" {
|
|
urlPath = "/"
|
|
}
|
|
var best string
|
|
for _, st := range s.subtrees {
|
|
if st == "/" {
|
|
if len(best) == 0 {
|
|
best = "/"
|
|
}
|
|
continue
|
|
}
|
|
if strings.HasPrefix(urlPath, st+"/") || urlPath == st {
|
|
if len(st) > len(best) {
|
|
best = st
|
|
}
|
|
}
|
|
}
|
|
return best
|
|
}
|
|
|
|
// loadMirrorState reads the persisted state from disk. Returns an
|
|
// empty (initialized) map on missing/corrupt input.
|
|
func loadMirrorState(path string) map[string]subtreeState {
|
|
state := make(map[string]subtreeState)
|
|
bytes, err := os.ReadFile(path)
|
|
if err != nil {
|
|
return state
|
|
}
|
|
var parsed persistedState
|
|
if err := json.Unmarshal(bytes, &parsed); err != nil {
|
|
return state
|
|
}
|
|
if parsed.Subtrees != nil {
|
|
state = parsed.Subtrees
|
|
}
|
|
return state
|
|
}
|
|
|
|
// persistMirrorState writes the state file atomically (temp + rename).
|
|
// Best-effort: errors are logged but non-fatal.
|
|
func persistMirrorState(path string, state map[string]subtreeState) {
|
|
bytes, err := json.MarshalIndent(persistedState{Subtrees: state}, "", " ")
|
|
if err != nil {
|
|
slog.Debug("marshal mirror state", "err", err)
|
|
return
|
|
}
|
|
tmp, err := os.CreateTemp(filepath.Dir(path), ".zddc-mirror-state-tmp-*")
|
|
if err != nil {
|
|
slog.Debug("create mirror state tmp", "err", err)
|
|
return
|
|
}
|
|
tmpName := tmp.Name()
|
|
if _, err := tmp.Write(bytes); err != nil {
|
|
_ = tmp.Close()
|
|
_ = os.Remove(tmpName)
|
|
return
|
|
}
|
|
if err := tmp.Close(); err != nil {
|
|
_ = os.Remove(tmpName)
|
|
return
|
|
}
|
|
if err := os.Rename(tmpName, path); err != nil {
|
|
_ = os.Remove(tmpName)
|
|
}
|
|
}
|