diff --git a/zddc/internal/cache/outbox.go b/zddc/internal/cache/outbox.go index 9b85052..47b2cc5 100644 --- a/zddc/internal/cache/outbox.go +++ b/zddc/internal/cache/outbox.go @@ -79,6 +79,12 @@ type Outbox struct { cache *Cache dir string mu sync.Mutex // serializes file creation; replay holds it briefly per-entry + + // wake is a non-blocking signal channel. Enqueue posts to it + // (drop-on-full) so RunReplayLoop reacts to new entries within + // ms, not after its idle-poll interval. Buffered=1 so a burst of + // enqueues collapses to a single wake. + wake chan struct{} } // NewOutbox opens (and creates if missing) the outbox directory under @@ -89,7 +95,7 @@ func NewOutbox(c *Cache) (*Outbox, error) { return nil, fmt.Errorf("create outbox dir: %w", err) } _ = os.Chmod(dir, 0o700) - return &Outbox{cache: c, dir: dir}, nil + return &Outbox{cache: c, dir: dir, wake: make(chan struct{}, 1)}, nil } // Dir exposes the on-disk path for tests / diagnostics. @@ -152,6 +158,13 @@ func (o *Outbox) Enqueue(r *http.Request, baseModTime time.Time) (*OutboxEntry, _ = os.RemoveAll(entryDir) return nil, err } + // Signal the replay loop. Non-blocking — buffered=1 collapses + // a burst of enqueues to a single wake. If the loop is busy + // replaying, our entry will be picked up on the next pass. + select { + case o.wake <- struct{}{}: + default: + } return &entry, nil } @@ -336,6 +349,11 @@ func (o *Outbox) markConflict(id string) error { // schedule. Stops when ctx is cancelled. Safe to call once per // process; not designed for multiple concurrent loops over the same // outbox. +// +// Wakes on three signals: +// - timer (ReplayInterval when entries pending, ReplayIdleInterval when idle) +// - Enqueue posting to o.wake (instant reaction to new offline writes) +// - context cancel (shutdown) func (o *Outbox) RunReplayLoop(ctx context.Context) { // Eagerly attempt replay at startup so a re-launched client // catches up before any user request fires. @@ -351,15 +369,26 @@ func (o *Outbox) RunReplayLoop(ctx context.Context) { return case <-time.After(interval): o.tryReplay(ctx) + case <-o.wake: + // New entry arrived while we were idle. Replay + // immediately rather than waiting for the timer. + o.tryReplay(ctx) } } } // tryReplay is a single replay pass with a bounded timeout. Errors // logged at debug; per-entry outcomes logged inside Replay/replayOne. +// The pass itself logs at info when it runs against >0 entries so an +// operator watching the log sees the replay loop is alive even if +// every attempt defers. func (o *Outbox) tryReplay(ctx context.Context) { tCtx, cancel := context.WithTimeout(ctx, 2*time.Minute) defer cancel() + pending, _ := o.Pending() + if len(pending) > 0 { + slog.Info("outbox: replay attempt", "pending", len(pending)) + } replayed, conflicts, err := o.Replay(tCtx) if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { slog.Debug("outbox: replay error", "err", err) diff --git a/zddc/internal/cache/walker.go b/zddc/internal/cache/walker.go index 62a85f4..afdfd17 100644 --- a/zddc/internal/cache/walker.go +++ b/zddc/internal/cache/walker.go @@ -172,13 +172,23 @@ func (s *MirrorScheduler) walkSubtree(ctx context.Context, subtree string) (walk var fetchWG sync.WaitGroup - var walkDir func(dirURL string) error - walkDir = func(dirURL string) error { + // walkDir keeps two parallel views of the directory's path: + // dirURL — URL-encoded form, used to compose HTTP requests + // dirPath — decoded form, used as the filesystem cache key. + // Filenames with spaces / parens / etc. need URL escaping for + // HTTP but must NOT land on disk percent-encoded — otherwise the + // orphan-purge pass below can't match local entries against the + // upstream's (decoded) listing names. + var walkDir func(dirURL, dirPath string) error + walkDir = func(dirURL, dirPath string) error { if !strings.HasSuffix(dirURL, "/") { dirURL += "/" } + if !strings.HasSuffix(dirPath, "/") { + dirPath += "/" + } addDirs() - entries, err := s.fetchListing(ctx, dirURL) + entries, err := s.fetchListing(ctx, dirURL, dirPath) if err != nil { return err } @@ -195,10 +205,13 @@ func (s *MirrorScheduler) walkSubtree(ctx context.Context, subtree string) (walk if ctx.Err() != nil { return ctx.Err() } - childURL := dirURL + url.PathEscape(strings.TrimSuffix(e.Name, "/")) + name := strings.TrimSuffix(e.Name, "/") + childURL := dirURL + url.PathEscape(name) + childPath := dirPath + name if e.IsDir { childURL += "/" - if err := walkDir(childURL); err != nil { + childPath += "/" + if err := walkDir(childURL, childPath); err != nil { slog.Debug("walk subdir failed", "url", childURL, "err", err) } continue @@ -206,33 +219,34 @@ func (s *MirrorScheduler) walkSubtree(ctx context.Context, subtree string) (walk addFiles() fetchWG.Add(1) sem <- struct{}{} - go func(fileURL string) { + go func(fileURL, filePath string) { defer fetchWG.Done() defer func() { <-sem }() - if s.fetchFileIfNeeded(ctx, fileURL) { + if s.fetchFileIfNeeded(ctx, fileURL, filePath) { addFetched() } - }(childURL) + }(childURL, childPath) } // Purge local files no longer present upstream. Only acts on // regular files inside this directory — listing sidecars, // dot-prefix internals, and subdirectories are left alone. - s.purgeOrphans(dirURL, upstreamNames, addPurged) + s.purgeOrphans(dirPath, upstreamNames, addPurged) return nil } - err := walkDir(subtree + "/") + err := walkDir(subtree+"/", subtree+"/") fetchWG.Wait() return stats, err } -// purgeOrphans removes local cached files in dirURL that are absent -// from the upstream listing. Only operates on plain files; ignores -// dot-prefix entries (cache state) and subdirectories (they get their -// own pass during walkSubtree's recursion). -func (s *MirrorScheduler) purgeOrphans(dirURL string, upstreamNames map[string]bool, addPurged func()) { - rel := filepath.FromSlash(strings.Trim(dirURL, "/")) +// purgeOrphans removes local cached files in dirPath (decoded URL +// path) that are absent from the upstream listing. Only operates on +// plain files; ignores dot-prefix entries (cache state) and +// subdirectories (they get their own pass during walkSubtree's +// recursion). +func (s *MirrorScheduler) purgeOrphans(dirPath string, upstreamNames map[string]bool, addPurged func()) { + rel := filepath.FromSlash(strings.Trim(dirPath, "/")) localDir := filepath.Join(s.cache.root, rel) entries, err := os.ReadDir(localDir) if err != nil { @@ -251,14 +265,14 @@ func (s *MirrorScheduler) purgeOrphans(dirURL string, upstreamNames map[string]b } _ = os.Remove(filepath.Join(localDir, name)) addPurged() - slog.Info("purged orphan", "path", path.Join(dirURL, name)) + slog.Info("purged orphan", "path", path.Join(dirPath, name)) } } -// fetchListing GETs the upstream JSON listing for dirURL and parses -// the entries. Caches the JSON sidecar as a side-effect (so offline -// directory browsing works for walked subtrees). -func (s *MirrorScheduler) fetchListing(ctx context.Context, dirURL string) ([]listingEntry, error) { +// fetchListing GETs the upstream JSON listing for dirURL (encoded) +// and parses the entries. Caches the JSON sidecar at dirPath +// (decoded) as a side-effect so the directory is browsable offline. +func (s *MirrorScheduler) fetchListing(ctx context.Context, dirURL, dirPath string) ([]listingEntry, error) { target := s.cache.upstream + dirURL req, err := http.NewRequestWithContext(ctx, http.MethodGet, target, nil) if err != nil { @@ -281,7 +295,9 @@ func (s *MirrorScheduler) fetchListing(ctx context.Context, dirURL string) ([]li return nil, fmt.Errorf("read listing body: %w", err) } // Persist the JSON sidecar so the directory is browsable offline. - if path, ok := s.cache.listingCachePathFor(dirURL, "application/json"); ok { + // listingCachePathFor expects a decoded directory URL (keyed by + // FS path), so dirPath rather than dirURL here. + if path, ok := s.cache.listingCachePathFor(dirPath, "application/json"); ok { if err := os.MkdirAll(filepath.Dir(path), 0o755); err == nil { tmp, err := os.CreateTemp(filepath.Dir(path), ".zddc-cache-tmp-*") if err == nil { @@ -316,11 +332,12 @@ type listingEntry struct { IsDir bool `json:"is_dir"` } -// fetchFileIfNeeded conditional-GETs a file URL. Returns true when -// the cache was updated (200 + write); false when no work was needed -// (304 or skipped). -func (s *MirrorScheduler) fetchFileIfNeeded(ctx context.Context, fileURL string) bool { - cachePath, ok := s.cache.cachePathFor(fileURL) +// fetchFileIfNeeded conditional-GETs a file URL. fileURL is the +// URL-encoded HTTP target; filePath is the decoded URL path used as +// the on-disk cache key. Returns true when the cache was updated +// (200 + write); false when no work was needed (304 or skipped). +func (s *MirrorScheduler) fetchFileIfNeeded(ctx context.Context, fileURL, filePath string) bool { + cachePath, ok := s.cache.cachePathFor(filePath) if !ok { return false } @@ -353,8 +370,10 @@ func (s *MirrorScheduler) fetchFileIfNeeded(ctx context.Context, fileURL string) if !s.cache.responseCacheable(resp) { return false } - if err := s.cache.persistOnly(resp, fileURL); err != nil { - slog.Debug("walker persist failed", "url", fileURL, "err", err) + // persistOnly resolves the disk path via cachePathFor too — pass + // the decoded filePath, not the encoded fileURL. + if err := s.cache.persistOnly(resp, filePath); err != nil { + slog.Debug("walker persist failed", "path", filePath, "err", err) return false } s.cache.maybeWriteMarker() diff --git a/zddc/internal/handler/fileapi.go b/zddc/internal/handler/fileapi.go index 25c2e38..fc4004b 100644 --- a/zddc/internal/handler/fileapi.go +++ b/zddc/internal/handler/fileapi.go @@ -12,6 +12,7 @@ import ( "os" "path/filepath" "strings" + "time" "codeberg.org/VARASYS/ZDDC/zddc/internal/config" "codeberg.org/VARASYS/ZDDC/zddc/internal/policy" @@ -239,7 +240,18 @@ func fileETagOnDisk(absPath string) (string, error) { // exist, the wildcard "*" form fails (per RFC) but a specific ETag is // treated as a no-current-file hit (412). This distinguishes // create-new from update-existing semantically. +// +// Also honors If-Unmodified-Since (RFC 7232 §3.4): the request fails +// with 412 if the current file's mtime is strictly later than the +// header value. Used by the cache layer's offline-write outbox to +// detect concurrent modifications without ETag round-trips — the +// cached file's mtime (set from upstream's Last-Modified) becomes the +// base for the precondition. Either header (or both) can be present; +// both must pass. func checkIfMatch(w http.ResponseWriter, r *http.Request, absPath string) bool { + if !checkIfUnmodifiedSince(w, r, absPath) { + return false + } header := strings.TrimSpace(r.Header.Get("If-Match")) if header == "" { return true @@ -264,6 +276,40 @@ func checkIfMatch(w http.ResponseWriter, r *http.Request, absPath string) bool { return true } +// checkIfUnmodifiedSince evaluates the RFC 7232 §3.4 precondition. +// Returns true (pass) when the header is absent or unparseable, when +// the target file does not exist, or when the file's current mtime +// is at or before the header value. Returns false (fail) and writes a +// 412 response when the file has been modified after the header time. +// +// mtime comparison uses the file's mod time truncated to whole +// seconds — HTTP-Date format has 1-second resolution, so a finer +// comparison would spuriously fail on filesystems that retain ns +// precision. "After" therefore means strictly greater than the +// header value at second resolution. +func checkIfUnmodifiedSince(w http.ResponseWriter, r *http.Request, absPath string) bool { + header := strings.TrimSpace(r.Header.Get("If-Unmodified-Since")) + if header == "" { + return true + } + since, err := http.ParseTime(header) + if err != nil { + // Per RFC 7232: if the header value is unparseable, ignore. + return true + } + info, err := os.Stat(absPath) + if err != nil { + // Missing file → no resource to compare against. Pass. + return true + } + current := info.ModTime().Truncate(time.Second) + if current.After(since.Truncate(time.Second)) { + http.Error(w, "Precondition Failed — If-Unmodified-Since: file modified at "+current.UTC().Format(http.TimeFormat), http.StatusPreconditionFailed) + return false + } + return true +} + func serveFilePut(cfg config.Config, w http.ResponseWriter, r *http.Request) { abs, cleanURL, ok, status, msg := resolveTargetPath(cfg, r.URL.Path) if !ok {