ZDDC/zddc/internal/cache/cache.go
ZDDC 1402864c4c fix(cache): track background revalidation goroutines; drain on shutdown + in tests
Root cause of the flaky cache tests (TestServeHTTP_DirectoryListingsCachedAsSidecar
and the other hit-path tests, ~1-in-many under parallel load): on a cache
hit, ServeHTTP launches `go c.revalidate(...)` / `go c.revalidateListing(...)`,
which write into the cache root (MkdirAll + CreateTemp + Rename). Those
goroutines outlive the request — and in tests, the test — so they race
t.TempDir's RemoveAll cleanup, recreating the dir or dropping a temp file
mid-removal. testing then reports "TempDir RemoveAll cleanup: ... directory
not empty" and marks the test failed (with a 0.00s body, no assertion line).
It only surfaced under the full parallel suite / -count because the timing
has to collide.

Fix: track these background goroutines in a sync.WaitGroup via a goBackground
helper, and expose Wait(). newTestCache registers t.Cleanup(c.Wait) — cleanups
fire LIFO and t.TempDir registered its RemoveAll first, so the drain runs
before it (upstream Close was registered earliest, so it runs last and stays
up while goroutines finish). runClient also calls cacheLayer.Wait() after
srv.Shutdown so in-flight sidecar writes complete on graceful shutdown rather
than being abandoned.

Verified: cache package at -count=200 reliably failed before, passes clean
after (0 failures, 0 cleanup errors); full `go test ./...` + vet green.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 16:21:37 -05:00

926 lines
30 KiB
Go

// Package cache implements zddc-server's client mode: a downstream
// proxy/cache/mirror that runs the same binary against a master.
// Configured via cfg.Upstream (in main.go), the cache layer replaces
// the master-side dispatcher entirely — every incoming request is
// forwarded to the master with the local instance's bearer token, and
// (in cache or mirror mode) the response body is persisted under
// cfg.Root so subsequent requests serve from disk.
//
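// The cache directory layout is intentionally a normal ZDDC root: a
// file fetched from `<master>/foo/bar.txt` is stored at
// `<root>/foo/bar.txt`. Cached files carry no per-file sidecar
// metadata; the only extra files are the MarkerFile provenance marker
// and the dot-prefixed directory-listing sidecars.
// The local file's mtime is set to the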
// upstream's Last-Modified header so revalidation via
// If-Modified-Since reflects the master's notion of the file's age,
// not when the local cache happened to fetch it. Running
// `zddc-server --root <cache-dir>` without --upstream serves the
// cached files as a regular ZDDC — useful for portable offline
// snapshots and sanity-check inspection.
//
// Scope: GET/HEAD go through the read pipeline, with Range requests,
// stale-while-revalidate, and offline fallback supported. Directory
// listings are cached as dot-prefixed sidecar files, one per format
// (HTML / JSON). Writes (PUT / POST / DELETE) are forwarded to the
// master and, when it is unreachable and an outbox is configured,
// queued for later replay.
package cache
import (
"crypto/tls"
"encoding/json"
"fmt"
"io"
"log/slog"
"net/http"
"net/url"
"os"
"path/filepath"
"strings"
"sync"
"time"
"codeberg.org/VARASYS/ZDDC/zddc/internal/config"
)
// Names reserved by the cache layer: the provenance marker kept in the
// cache root, the response header that reports cache state, and the
// basename prefix of directory-listing sidecars.
const (
	// MarkerFile is the basename of the provenance file in the cache
	// root. It records the upstream URL and the first-cached-at
	// timestamp, which prevents accidentally pointing master mode at a
	// cache directory and gives ops/users provenance.
	MarkerFile = ".zddc-upstream"

	// HeaderName is the response header that surfaces cache state to
	// the client (and the browser-side UI). Values set by this file:
	// hit, miss, proxy, offline, and queued (a write parked in the
	// outbox).
	HeaderName = "X-ZDDC-Cache"

	// listingCachePrefix is the basename prefix of directory-listing
	// sidecar files; the suffix is the format ("html" or "json"). It is
	// dot-prefixed so the local "serve cache root as plain master" mode
	// hides the sidecars via the existing dispatch dot-prefix guard.
	listingCachePrefix = ".zddc-listing."
)
// Cache is the request handler installed in main.go when cfg.Upstream
// is non-empty. It is safe for concurrent ServeHTTP calls.
//
// New fills in the configuration fields (root through client); outbox
// is attached afterwards via SetOutbox. Within this file, the only
// members mutated while requests are being served are markerOnce and wg.
type Cache struct {
root string // local cache directory (== cfg.Root in client mode)
upstream string // upstream master URL, no trailing slash
bearer string // forwarded as Authorization: Bearer to upstream; "" disables
mode string // "proxy" | "cache" | "mirror"
persist bool // mode != "proxy" — write responses to disk
// client performs every upstream request (reads, revalidations and
// writes). New configures it with a 60s overall timeout and with
// redirect following disabled so 3xx responses pass through.
client *http.Client
// markerOnce makes maybeWriteMarker attempt the MarkerFile write at
// most once per Cache instance.
markerOnce sync.Once
// wg tracks background goroutines (cache revalidation on hits,
// mirror-walk hooks) so Wait() can drain them. Without this they
// outlive the request — fine in production until a graceful
// shutdown wants them finished, and in tests they race t.TempDir
// cleanup by writing into the cache root after the test returns.
wg sync.WaitGroup
// onAccess is invoked (when non-nil) after a request is dispatched.
// The walker scheduler installs this hook to kick mirror walks based
// on incoming traffic. Always called in a goroutine — must not
// assume it runs before the response completes. Nil in proxy/cache
// modes; set in mirror mode.
onAccess func(urlPath string)
// outbox holds the offline write queue. Set by main.go after
// construction (avoids a circular dep at New time, since Outbox
// needs a *Cache reference). Nil = writes when offline get 503.
outbox *Outbox
}
// SetOutbox wires the offline write queue into the cache. main.go calls
// it once, after both the Cache and the Outbox have been built. Passing
// nil leaves outbox-backed offline writes disabled.
func (c *Cache) SetOutbox(o *Outbox) {
	c.outbox = o
}
// goBackground starts fn on its own goroutine and registers it with
// c.wg, so Wait can drain whatever background work is still in flight:
// the revalidation kicked off by a cache hit and the mirror-walk access
// hook. Such work must never hold up the user's response, but it should
// not outlive a graceful shutdown either (or, in tests, race the
// t.TempDir cleanup by writing into the cache root after the test
// returns).
func (c *Cache) goBackground(fn func()) {
	// Add before spawning so a concurrent Wait cannot miss the task.
	c.wg.Add(1)
	go func(task func()) {
		defer c.wg.Done()
		task()
	}(fn)
}
// Wait returns once every goroutine started through goBackground has
// finished. It is meant for graceful shutdown; tests call it before the
// temp-dir cleanup runs.
func (c *Cache) Wait() {
	c.wg.Wait()
}
// New constructs a Cache from the loaded configuration.
//
// It validates the upstream URL, reads the bearer file (if configured),
// prepares the HTTP client honoring SkipTLSVerify, and ensures the cache
// root exists. An empty cfg.Mode defaults to "cache".
//
// Errors are returned when cfg.Upstream is empty or is not an absolute
// http(s) URL with a host, when the bearer file cannot be read or is
// blank, or when the cache root cannot be created.
func New(cfg config.Config) (*Cache, error) {
	if cfg.Upstream == "" {
		return nil, fmt.Errorf("cache.New: cfg.Upstream is empty")
	}
	upstream := strings.TrimRight(cfg.Upstream, "/")
	u, err := url.Parse(upstream)
	if err != nil {
		return nil, fmt.Errorf("cache.New: invalid upstream %q: %w", upstream, err)
	}
	// url.Parse accepts almost any string ("localhost:8080" parses with
	// scheme "localhost"). The transport below only speaks http and
	// https, so anything else would fail on every single request; reject
	// it here instead.
	if (u.Scheme != "http" && u.Scheme != "https") || u.Host == "" {
		return nil, fmt.Errorf("cache.New: invalid upstream %q: want an absolute http(s) URL", upstream)
	}

	bearer := ""
	if cfg.BearerFile != "" {
		b, err := os.ReadFile(cfg.BearerFile)
		if err != nil {
			return nil, fmt.Errorf("cache.New: read bearer file: %w", err)
		}
		bearer = strings.TrimSpace(string(b))
		if bearer == "" {
			return nil, fmt.Errorf("cache.New: bearer file %q is empty", cfg.BearerFile)
		}
	}

	transport := &http.Transport{
		MaxIdleConns:          10,
		IdleConnTimeout:       30 * time.Second,
		TLSHandshakeTimeout:   10 * time.Second,
		ResponseHeaderTimeout: 30 * time.Second,
	}
	if cfg.SkipTLSVerify {
		// G402 / CWE-295: deliberate. Documented operator opt-in for
		// dev/internal-CA scenarios; never the default.
		transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} //nolint:gosec
		slog.Warn("--skip-tls-verify enabled: upstream TLS certificates will NOT be validated")
	}

	if err := os.MkdirAll(cfg.Root, 0o755); err != nil {
		return nil, fmt.Errorf("cache.New: create cache root %q: %w", cfg.Root, err)
	}

	mode := cfg.Mode
	if mode == "" {
		mode = "cache"
	}

	return &Cache{
		root:     cfg.Root,
		upstream: upstream,
		bearer:   bearer,
		mode:     mode,
		persist:  mode != "proxy",
		client: &http.Client{
			Transport: transport,
			Timeout:   60 * time.Second,
			// Don't follow redirects automatically — pass them through to
			// the client so the browser can update its address bar
			// (e.g. master's no-trailing-slash → trailing-slash 301).
			CheckRedirect: func(req *http.Request, via []*http.Request) error {
				return http.ErrUseLastResponse
			},
		},
	}, nil
}
// Mode reports the configured mode label; intended for diagnostics.
func (c *Cache) Mode() string {
	return c.mode
}
// Upstream reports the upstream master URL; intended for diagnostics.
func (c *Cache) Upstream() string {
	return c.upstream
}
// ServeHTTP is the cache layer's HTTP entry point; in client mode it
// replaces the master-side dispatcher.
//
// Routing, in order:
//   - PUT / POST / DELETE go to handleWrite (upstream live; on a
//     transport error the write may be queued in the outbox).
//   - Anything other than GET / HEAD is answered with 405.
//   - Paths ending in "/" are directory listings (serveDirectory).
//   - Other paths are served from disk when persisting and a cached
//     file exists, with a background revalidation; otherwise they are
//     proxied upstream and optionally persisted on the way through.
func (c *Cache) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	method := r.Method
	if method == http.MethodPut || method == http.MethodPost || method == http.MethodDelete {
		c.handleWrite(w, r)
		return
	}
	if method != http.MethodGet && method != http.MethodHead {
		// OPTIONS/PATCH and friends: the master doesn't accept them either.
		w.Header().Set("Allow", "GET, HEAD, PUT, POST, DELETE")
		http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
		return
	}

	reqPath := r.URL.Path

	// Mirror walker hook: tell the scheduler this URL was touched so it
	// can decide whether to kick a subtree walk. It is dispatched on a
	// tracked background goroutine, so the request never blocks on walk
	// scheduling.
	if c.onAccess != nil {
		c.goBackground(func() { c.onAccess(reqPath) })
	}

	// Directory listings consult the sidecar listing cache (HTML vs
	// JSON chosen from the Accept header) before falling back to proxy.
	if strings.HasSuffix(reqPath, "/") {
		c.serveDirectory(w, r)
		return
	}

	// File request: when persisting, a regular file already on disk is
	// a hit.
	if c.persist {
		if diskPath, ok := c.cachePathFor(reqPath); ok {
			if info, err := os.Stat(diskPath); err == nil && !info.IsDir() {
				c.serveFromDisk(w, r, diskPath, info, "hit")
				// Refresh in the background; never block the user response.
				modTime := info.ModTime()
				c.goBackground(func() { c.revalidate(reqPath, modTime) })
				return
			}
		}
	}

	// Miss (or proxy mode): forward upstream, persisting when enabled.
	c.proxy(w, r, c.persist)
}
// serveDirectory answers a directory request. When persisting, the
// sidecar listing selected by the Accept header is served if it exists
// and is then revalidated in the background. Otherwise the request is
// proxied upstream via proxyDirectory.
func (c *Cache) serveDirectory(w http.ResponseWriter, r *http.Request) {
	if !c.persist {
		c.proxyDirectory(w, r, c.persist)
		return
	}

	dirURL, accept := r.URL.Path, r.Header.Get("Accept")
	if sidecar, ok := c.listingCachePathFor(dirURL, accept); ok {
		if info, err := os.Stat(sidecar); err == nil && !info.IsDir() {
			c.serveListingFromDisk(w, r, sidecar, info, "hit")
			stamp := info.ModTime()
			c.goBackground(func() { c.revalidateListing(dirURL, accept, stamp) })
			return
		}
	}
	c.proxyDirectory(w, r, c.persist)
}
// proxy forwards the request to upstream and relays the response to the
// client. When writeToCache is true and the response is a cacheable
// 200, the body is also persisted under the cache root.
//
// If upstream cannot be reached, a GET with writeToCache set is answered
// from an existing cached copy (X-ZDDC-Cache: offline); otherwise the
// client gets 503.
func (c *Cache) proxy(w http.ResponseWriter, r *http.Request, writeToCache bool) {
	upReq, err := c.buildUpstreamRequest(r)
	if err != nil {
		http.Error(w, "Bad Request: "+err.Error(), http.StatusBadRequest)
		return
	}

	resp, err := c.client.Do(upReq)
	if err != nil {
		// Network error: serve a cached copy stale if there is one.
		canFallBack := writeToCache && r.Method == http.MethodGet
		if canFallBack {
			if stale, ok := c.cachePathFor(r.URL.Path); ok {
				info, statErr := os.Stat(stale)
				if statErr == nil && !info.IsDir() {
					c.serveFromDisk(w, r, stale, info, "offline")
					return
				}
			}
		}
		slog.Warn("upstream fetch failed", "url", upReq.URL.String(), "err", err)
		w.Header().Set(HeaderName, "offline")
		http.Error(w, "Service Unavailable: upstream unreachable", http.StatusServiceUnavailable)
		return
	}
	defer resp.Body.Close()

	// Relay upstream's headers minus the hop-by-hop ones (RFC 7230
	// §6.1). Go's transport already drops most, but Connection and
	// Transfer-Encoding can sneak through and confuse the client.
	out := w.Header()
	for name, values := range resp.Header {
		if isHopByHop(name) {
			continue
		}
		for _, v := range values {
			out.Add(name, v)
		}
	}

	persist := writeToCache && resp.StatusCode == http.StatusOK && c.responseCacheable(resp)
	state := "proxy"
	if persist {
		state = "miss"
	}
	out.Set(HeaderName, state)
	w.WriteHeader(resp.StatusCode)

	switch {
	case r.Method == http.MethodHead, resp.StatusCode == http.StatusNotModified:
		// No body to relay.
		return
	case !persist:
		_, _ = io.Copy(w, resp.Body)
		return
	}

	// Stream the body to the client AND to a temp file in the cache;
	// the temp file is renamed into place only on success.
	if err := c.streamAndPersist(w, resp, r.URL.Path); err != nil {
		// Mid-stream error: the client got a partial body (HTTP-normal)
		// and the cache write was already abandoned. Just log.
		slog.Debug("stream-and-persist error", "url", r.URL.Path, "err", err)
		return
	}
	c.maybeWriteMarker()
}
// buildUpstreamRequest creates the outbound read request for r. It
// keeps the method, path and query, and is bound to r's context. Only
// the Range, If-Range, Accept and Accept-Encoding request headers are
// copied; a fixed User-Agent is set and the bearer token is attached
// when one is configured.
func (c *Cache) buildUpstreamRequest(r *http.Request) (*http.Request, error) {
	upReq, err := http.NewRequestWithContext(r.Context(), r.Method, c.upstream+r.URL.RequestURI(), nil)
	if err != nil {
		return nil, err
	}

	// Range / If-Range keep resumable and partial transfers working.
	for _, name := range [...]string{"Range", "If-Range", "Accept", "Accept-Encoding"} {
		if v := r.Header.Get(name); v != "" {
			upReq.Header.Set(name, v)
		}
	}
	upReq.Header.Set("User-Agent", "zddc-server-cache/0.1")
	if c.bearer != "" {
		upReq.Header.Set("Authorization", "Bearer "+c.bearer)
	}
	return upReq, nil
}
// responseCacheable reports whether resp's body may be persisted in the
// cache. It returns false when:
//   - Cache-Control contains no-store or private;
//   - the status is anything other than 200 (e.g. a 206 partial body,
//     which would corrupt later non-range fetches if stored as the
//     whole file);
//   - the body carries a Content-Encoding other than identity.
//
// The last rule exists because the client's Accept-Encoding is
// forwarded upstream, so the body read here can be compressed (e.g.
// gzip). Cached files are later served straight from disk without a
// Content-Encoding header, so storing an encoded body would hand
// clients undecodable bytes. Such responses are still relayed to the
// client, just not persisted.
func (c *Cache) responseCacheable(resp *http.Response) bool {
	cc := strings.ToLower(resp.Header.Get("Cache-Control"))
	if strings.Contains(cc, "no-store") || strings.Contains(cc, "private") {
		return false
	}
	if resp.StatusCode != http.StatusOK {
		return false
	}
	ce := strings.TrimSpace(resp.Header.Get("Content-Encoding"))
	if ce != "" && !strings.EqualFold(ce, "identity") {
		return false
	}
	return true
}
// streamAndPersist copies resp.Body to the client and, at the same
// time, into a temp file next to the cache target. The temp file is
// renamed over the target only after the whole body has been written
// and the file closed. Its mtime is set to upstream's Last-Modified
// (when present and parseable) so later revalidations send an
// If-Modified-Since that upstream can compare against its own state.
//
// When the cache target cannot be resolved or prepared, the body is
// still delivered to the client. The returned error is the copy error
// if the delivery failed, otherwise the cache-side error. On every
// failure path, including a failed final rename, the temp file is
// removed so aborted writes do not pile up in the cache directory.
func (c *Cache) streamAndPersist(w http.ResponseWriter, resp *http.Response, urlPath string) error {
	finalPath, ok := c.cachePathFor(urlPath)
	if !ok {
		_, err := io.Copy(w, resp.Body)
		return err
	}

	dir := filepath.Dir(finalPath)
	var tmp *os.File
	err := os.MkdirAll(dir, 0o755)
	if err == nil {
		tmp, err = os.CreateTemp(dir, ".zddc-cache-tmp-*")
	}
	if err != nil {
		// The cache is unusable for this entry; the client must still
		// get its response.
		if _, copyErr := io.Copy(w, resp.Body); copyErr != nil {
			return copyErr
		}
		return err
	}

	tmpName := tmp.Name()
	// Until the rename succeeds the temp file is garbage.
	committed := false
	defer func() {
		if !committed {
			_ = os.Remove(tmpName)
		}
	}()

	if _, err := io.Copy(io.MultiWriter(tmp, w), resp.Body); err != nil {
		_ = tmp.Close()
		return err
	}
	if err := tmp.Close(); err != nil {
		return err
	}
	if lm := resp.Header.Get("Last-Modified"); lm != "" {
		if t, err := http.ParseTime(lm); err == nil {
			// Best effort: a failed Chtimes only costs a spurious refetch.
			_ = os.Chtimes(tmpName, t, t)
		}
	}
	if err := os.Rename(tmpName, finalPath); err != nil {
		return err
	}
	committed = true
	return nil
}
// serveFromDisk answers the request from the cached file at path using
// http.ServeContent, which natively handles Range requests,
// If-Modified-Since and conditional GETs. info supplies the
// modification time handed to ServeContent, and cacheState is the
// X-ZDDC-Cache value to surface. If the file cannot be opened the
// client gets a 500 and no cache-state header.
func (c *Cache) serveFromDisk(w http.ResponseWriter, r *http.Request, path string, info os.FileInfo, cacheState string) {
	file, openErr := os.Open(path)
	if openErr != nil {
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}
	defer file.Close()

	name, modTime := filepath.Base(path), info.ModTime()
	w.Header().Set(HeaderName, cacheState)
	http.ServeContent(w, r, name, modTime, file)
}
// revalidate fires a conditional GET against upstream after a cache
// hit and reconciles the cached entry with the answer:
//   - 304: no-op, the cache is fresh.
//   - 200: the cached file is replaced (if the response is cacheable).
//   - 403 / 404: the cached file is purged (ACL revoked or upstream
//     deleted).
//
// Network errors are swallowed; serving stale while offline is the
// documented behavior.
//
// urlPath is the decoded request path (r.URL.Path) and mtime is the
// cached file's modification time, sent as If-Modified-Since when
// non-zero.
func (c *Cache) revalidate(urlPath string, mtime time.Time) {
	// urlPath is already decoded, so it must be re-escaped before it is
	// spliced into a URL. Otherwise a file name containing '?' or '#'
	// would address a different upstream resource (and a resulting 404
	// would purge this entry), and a literal '%' would fail to parse.
	target := c.upstream + (&url.URL{Path: urlPath}).EscapedPath()
	req, err := http.NewRequest(http.MethodGet, target, nil)
	if err != nil {
		return
	}
	if !mtime.IsZero() {
		req.Header.Set("If-Modified-Since", mtime.UTC().Format(http.TimeFormat))
	}
	if c.bearer != "" {
		req.Header.Set("Authorization", "Bearer "+c.bearer)
	}

	resp, err := c.client.Do(req)
	if err != nil {
		return
	}
	defer resp.Body.Close()

	switch resp.StatusCode {
	case http.StatusNotModified:
		return
	case http.StatusOK:
		if !c.responseCacheable(resp) {
			return
		}
		if err := c.persistOnly(resp, urlPath); err != nil {
			slog.Debug("revalidate persist error", "url", urlPath, "err", err)
		}
	case http.StatusForbidden, http.StatusNotFound:
		if path, ok := c.cachePathFor(urlPath); ok {
			// Best effort: the entry may already be gone.
			_ = os.Remove(path)
			slog.Info("purged cached entry after upstream 4xx", "url", urlPath, "status", resp.StatusCode)
		}
	}
}
// persistOnly writes resp.Body into the cache without forwarding it
// anywhere. revalidate uses it: the user's request was already served
// from disk, so this only refreshes the cached copy in the background.
//
// The body goes to a temp file next to the target and is renamed into
// place once fully written and closed. Its mtime is set to upstream's
// Last-Modified when that header parses. When urlPath has no cache
// target the body is drained and nil is returned. On every failure
// path, including a failed final rename, the temp file is removed so
// aborted refreshes do not pile up in the cache directory.
func (c *Cache) persistOnly(resp *http.Response, urlPath string) error {
	finalPath, ok := c.cachePathFor(urlPath)
	if !ok {
		_, _ = io.Copy(io.Discard, resp.Body)
		return nil
	}

	dir := filepath.Dir(finalPath)
	if err := os.MkdirAll(dir, 0o755); err != nil {
		_, _ = io.Copy(io.Discard, resp.Body)
		return err
	}
	tmp, err := os.CreateTemp(dir, ".zddc-cache-tmp-*")
	if err != nil {
		_, _ = io.Copy(io.Discard, resp.Body)
		return err
	}

	tmpName := tmp.Name()
	// Until the rename succeeds the temp file is garbage.
	committed := false
	defer func() {
		if !committed {
			_ = os.Remove(tmpName)
		}
	}()

	if _, err := io.Copy(tmp, resp.Body); err != nil {
		_ = tmp.Close()
		return err
	}
	if err := tmp.Close(); err != nil {
		return err
	}
	if lm := resp.Header.Get("Last-Modified"); lm != "" {
		if t, err := http.ParseTime(lm); err == nil {
			// Best effort: a failed Chtimes only costs a spurious refetch.
			_ = os.Chtimes(tmpName, t, t)
		}
	}
	if err := os.Rename(tmpName, finalPath); err != nil {
		return err
	}
	committed = true
	return nil
}
// cachePathForURI resolves a raw request URI to its cache path. The
// cache is keyed by path only, so everything from the first "?" onward
// is dropped before delegating to cachePathFor. The outbox uses it to
// map a queued write back to its cached file.
func (c *Cache) cachePathForURI(rawURI string) (string, bool) {
	pathOnly, _, _ := strings.Cut(rawURI, "?")
	return c.cachePathFor(pathOnly)
}
// cachePathFor maps a URL path to a local filesystem path under the
// cache root. It returns ok=false for inputs that must not be written:
//   - the empty path and "/";
//   - any path containing ".." (a deliberately blunt traversal guard
//     that also rejects names such as "a..b");
//   - anything that would resolve outside the cache root;
//   - a basename equal to MarkerFile, so URLs cannot collide with the
//     provenance marker.
//
// The root is cleaned before the containment check, so a configured
// root such as "cache/", "./cache" or "." behaves like its cleaned
// form. Previously such roots made every path look like an escape and
// nothing was ever cached.
//
// NOTE(review): directory-listing sidecar names and in-flight temp-file
// names are not reserved here — confirm whether file URLs with those
// basenames should be refused as well.
func (c *Cache) cachePathFor(urlPath string) (string, bool) {
	if urlPath == "" || urlPath == "/" {
		return "", false
	}
	if strings.Contains(urlPath, "..") {
		return "", false
	}

	root := filepath.Clean(c.root)
	abs := filepath.Join(root, filepath.FromSlash(strings.TrimPrefix(urlPath, "/")))

	// Belt and braces on top of the ".." rejection: the joined path must
	// still sit inside (or be) the root.
	rel, err := filepath.Rel(root, abs)
	if err != nil || rel == ".." || strings.HasPrefix(rel, ".."+string(filepath.Separator)) {
		return "", false
	}

	// Don't let URLs collide with internal markers.
	if filepath.Base(abs) == MarkerFile {
		return "", false
	}
	return abs, true
}
// maybeWriteMarker drops the .zddc-upstream provenance file into the
// cache root the first time the cache stores anything. The attempt is
// made at most once per Cache, and an existing marker is left alone.
// It is best-effort: a write error is ignored and never fails the
// request.
func (c *Cache) maybeWriteMarker() {
	c.markerOnce.Do(func() {
		markerPath := filepath.Join(c.root, MarkerFile)
		_, statErr := os.Stat(markerPath)
		if statErr == nil {
			// Already present from an earlier run; keep its first_cached.
			return
		}
		stamp := time.Now().UTC().Format(time.RFC3339)
		content := fmt.Sprintf("upstream: %s\nfirst_cached: %s\nmode: %s\n",
			c.upstream, stamp, c.mode)
		_ = os.WriteFile(markerPath, []byte(content), 0o644)
	})
}
// handleWrite proxies a write request (PUT / POST / DELETE) to
// upstream. On a transport error the request is captured in the outbox
// (if configured) and the client gets 202 Accepted with a JSON envelope
// describing the queued entry; without an outbox the client gets 503.
// Online HTTP errors (4xx/5xx from the master) are forwarded verbatim.
//
// PUT/DELETE include an If-Unmodified-Since precondition derived from
// the local cache file's mtime so the master can reject the write if
// its file changed since we observed it. POST never sends a
// precondition (POST semantics are application-defined).
//
// The request body is buffered in full and capped at
// MaxOutboxBodyBytes. A larger body is rejected with 413, and any other
// failure to read the body is a 400.
func (c *Cache) handleWrite(w http.ResponseWriter, r *http.Request) {
	isPutOrDelete := r.Method == http.MethodPut || r.Method == http.MethodDelete

	// Capture base mtime for PUT/DELETE so replay can use the same
	// precondition we'd send live.
	var baseModTime time.Time
	if isPutOrDelete {
		if path, ok := c.cachePathFor(r.URL.Path); ok {
			if info, err := os.Stat(path); err == nil && !info.IsDir() {
				baseModTime = info.ModTime()
			}
		}
	}

	// The body may have to be sent upstream AND then queued after a
	// network failure, so it is buffered (bounded) up front.
	bodyBytes, bodyErr := readBoundedBody(r)
	if bodyErr != nil {
		// io.ReadAll hands back the reader's error unwrapped, so a plain
		// type assertion is enough to tell "too large" apart from a
		// broken upload.
		if _, tooLarge := bodyErr.(*http.MaxBytesError); tooLarge {
			http.Error(w, "Request Entity Too Large: "+bodyErr.Error(), http.StatusRequestEntityTooLarge)
			return
		}
		http.Error(w, "Bad Request: "+bodyErr.Error(), http.StatusBadRequest)
		return
	}

	upReq, err := c.buildUpstreamWriteRequest(r, bodyBytes, baseModTime)
	if err != nil {
		http.Error(w, "Bad Request: "+err.Error(), http.StatusBadRequest)
		return
	}

	resp, err := c.client.Do(upReq)
	if err != nil {
		// Transport error → queue if outbox is available.
		if c.outbox == nil {
			w.Header().Set(HeaderName, "offline")
			http.Error(w, "Service Unavailable: upstream unreachable", http.StatusServiceUnavailable)
			return
		}
		// Rewind: the original body was consumed by the buffering above.
		r.Body = io.NopCloser(bytesReader(bodyBytes))
		entry, qErr := c.outbox.Enqueue(r, baseModTime)
		if qErr != nil {
			slog.Warn("outbox enqueue failed", "err", qErr)
			w.Header().Set(HeaderName, "offline")
			http.Error(w, "Service Unavailable: upstream unreachable and outbox unavailable", http.StatusServiceUnavailable)
			return
		}
		w.Header().Set("Content-Type", "application/json; charset=utf-8")
		w.Header().Set(HeaderName, "queued")
		w.WriteHeader(http.StatusAccepted)
		// The status line is already out; an encode/write failure here
		// can only mean the client went away, so it is ignored.
		_ = json.NewEncoder(w).Encode(map[string]any{
			"queued":     true,
			"outbox_id":  entry.ID,
			"method":     entry.Method,
			"uri":        entry.RawURI,
			"queued_at":  entry.QueuedAt,
			"base_mtime": entry.BaseModTime,
		})
		return
	}
	defer resp.Body.Close()

	// Forward upstream's response verbatim (minus hop-by-hop headers).
	for k, vv := range resp.Header {
		if isHopByHop(k) {
			continue
		}
		for _, v := range vv {
			w.Header().Add(k, v)
		}
	}
	w.Header().Set(HeaderName, "proxy")
	w.WriteHeader(resp.StatusCode)
	_, _ = io.Copy(w, resp.Body)

	// On a successful write, drop the cached entry so the next read
	// fetches fresh upstream content (which now includes the user's
	// change). For PUT we could be smarter (write the new body directly
	// to cache) but eviction is simplest.
	succeeded := resp.StatusCode >= 200 && resp.StatusCode < 300
	if c.persist && succeeded && isPutOrDelete {
		if path, ok := c.cachePathFor(r.URL.Path); ok {
			// Best effort: the entry may not exist.
			_ = os.Remove(path)
		}
	}
}
// buildUpstreamWriteRequest creates the outbound write request for r:
// same method, path and query, bound to r's context, carrying the
// buffered body with an explicit Content-Length. The Content-Type is
// passed through, the bearer token is attached when configured, and
// PUT/DELETE with a non-zero baseModTime get an If-Unmodified-Since
// precondition.
func (c *Cache) buildUpstreamWriteRequest(r *http.Request, body []byte, baseModTime time.Time) (*http.Request, error) {
	upReq, err := http.NewRequestWithContext(r.Context(), r.Method, c.upstream+r.URL.RequestURI(), bytesReader(body))
	if err != nil {
		return nil, err
	}
	// The body reader is a custom type, so the length is set explicitly
	// rather than left for net/http to infer.
	upReq.ContentLength = int64(len(body))

	hdr := upReq.Header
	if ct := r.Header.Get("Content-Type"); ct != "" {
		hdr.Set("Content-Type", ct)
	}
	switch r.Method {
	case http.MethodPut, http.MethodDelete:
		if !baseModTime.IsZero() {
			hdr.Set("If-Unmodified-Since", baseModTime.UTC().Format(http.TimeFormat))
		}
	}
	if c.bearer != "" {
		hdr.Set("Authorization", "Bearer "+c.bearer)
	}
	return upReq, nil
}
// readBoundedBody reads the whole request body, capped at
// MaxOutboxBodyBytes, and closes it. A body over the cap yields an
// error instead of a truncated slice, so an oversized write is never
// partially queued. A request with no body (r.Body == nil) returns
// nil, nil.
func readBoundedBody(r *http.Request) ([]byte, error) {
	body := r.Body
	if body == nil {
		return nil, nil
	}
	defer body.Close()
	return io.ReadAll(http.MaxBytesReader(nil, body, MaxOutboxBodyBytes))
}
// bytesReader wraps b in a minimal io.Reader positioned at its start.
// It is a small local stand-in for bytes.NewReader, so this file does
// not need to import bytes.
func bytesReader(b []byte) *bytesReaderType {
	return &bytesReaderType{r: b}
}

// bytesReaderType is a forward-only reader over an in-memory slice.
type bytesReaderType struct {
	r []byte // backing data; never modified by Read
	i int    // offset of the next unread byte
}

// Read copies the next unread bytes into p and advances the offset. It
// returns 0, io.EOF once the data is exhausted.
func (b *bytesReaderType) Read(p []byte) (int, error) {
	if remaining := len(b.r) - b.i; remaining <= 0 {
		return 0, io.EOF
	}
	n := copy(p, b.r[b.i:])
	b.i += n
	return n, nil
}
// listingFormat reduces an Accept header to the listing format it
// selects: "json" when the header mentions application/json (matched
// case-insensitively, anywhere in the value), "html" otherwise. This
// mirrors the master's content negotiation.
func listingFormat(accept string) string {
	format := "html"
	if lowered := strings.ToLower(accept); strings.Contains(lowered, "application/json") {
		format = "json"
	}
	return format
}
// listingCachePathFor returns the on-disk sidecar path for a cached
// directory listing. urlPath is the directory URL and must end in "/";
// accept is the request's Accept header, which selects the HTML or JSON
// sidecar. It returns ok=false when urlPath is not a directory URL,
// contains "..", or would resolve outside the cache root.
//
// The root is cleaned before the containment check, so a configured
// root such as "cache/", "./cache" or "." behaves like its cleaned
// form. Previously such roots failed the prefix comparison and listings
// were never cached.
func (c *Cache) listingCachePathFor(urlPath, accept string) (string, bool) {
	if !strings.HasSuffix(urlPath, "/") {
		return "", false
	}
	if strings.Contains(urlPath, "..") {
		return "", false
	}

	root := filepath.Clean(c.root)
	dir := filepath.Join(root, filepath.FromSlash(strings.Trim(urlPath, "/")))

	// Belt and braces on top of the ".." rejection: the directory must
	// still sit inside (or be) the root.
	rel, err := filepath.Rel(root, dir)
	if err != nil || rel == ".." || strings.HasPrefix(rel, ".."+string(filepath.Separator)) {
		return "", false
	}
	return filepath.Join(dir, listingCachePrefix+listingFormat(accept)), true
}
// serveListingFromDisk answers a directory request from the cached
// listing sidecar at path. It works like serveFromDisk, except that the
// Content-Type is always derived from the sidecar's suffix (".json" →
// JSON, anything else → HTML) rather than trusted from upstream, whose
// headers may have been gzip-stripped or otherwise massaged by the time
// the listing was cached. cacheState is the X-ZDDC-Cache value to
// surface. If the sidecar cannot be opened the client gets a 500.
func (c *Cache) serveListingFromDisk(w http.ResponseWriter, r *http.Request, path string, info os.FileInfo, cacheState string) {
	sidecar, openErr := os.Open(path)
	if openErr != nil {
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}
	defer sidecar.Close()

	contentType := "text/html; charset=utf-8"
	if strings.HasSuffix(path, ".json") {
		contentType = "application/json; charset=utf-8"
	}
	hdr := w.Header()
	hdr.Set("Content-Type", contentType)
	hdr.Set(HeaderName, cacheState)
	http.ServeContent(w, r, filepath.Base(path), info.ModTime(), sidecar)
}
// proxyDirectory is the directory-listing counterpart of proxy(): the
// cache target is the listing sidecar selected by the Accept header
// rather than the literal URL path. If upstream cannot be reached, a
// GET with writeToCache set is answered from an existing sidecar
// (X-ZDDC-Cache: offline); otherwise the client gets 503.
func (c *Cache) proxyDirectory(w http.ResponseWriter, r *http.Request, writeToCache bool) {
	upReq, err := c.buildUpstreamRequest(r)
	if err != nil {
		http.Error(w, "Bad Request: "+err.Error(), http.StatusBadRequest)
		return
	}

	accept := r.Header.Get("Accept")

	resp, err := c.client.Do(upReq)
	if err != nil {
		// Network error: serve the cached listing stale if there is one.
		canFallBack := writeToCache && r.Method == http.MethodGet
		if canFallBack {
			if sidecar, ok := c.listingCachePathFor(r.URL.Path, accept); ok {
				info, statErr := os.Stat(sidecar)
				if statErr == nil && !info.IsDir() {
					c.serveListingFromDisk(w, r, sidecar, info, "offline")
					return
				}
			}
		}
		slog.Warn("upstream directory fetch failed", "url", upReq.URL.String(), "err", err)
		w.Header().Set(HeaderName, "offline")
		http.Error(w, "Service Unavailable: upstream unreachable", http.StatusServiceUnavailable)
		return
	}
	defer resp.Body.Close()

	// Relay upstream's headers minus the hop-by-hop ones.
	out := w.Header()
	for name, values := range resp.Header {
		if isHopByHop(name) {
			continue
		}
		for _, v := range values {
			out.Add(name, v)
		}
	}

	persist := writeToCache && resp.StatusCode == http.StatusOK && c.responseCacheable(resp)
	state := "proxy"
	if persist {
		state = "miss"
	}
	out.Set(HeaderName, state)
	w.WriteHeader(resp.StatusCode)

	switch {
	case r.Method == http.MethodHead, resp.StatusCode == http.StatusNotModified:
		// No body to relay.
		return
	case !persist:
		_, _ = io.Copy(w, resp.Body)
		return
	}

	if err := c.streamAndPersistListing(w, resp, r.URL.Path, accept); err != nil {
		slog.Debug("listing stream-and-persist error", "url", r.URL.Path, "err", err)
		return
	}
	c.maybeWriteMarker()
}
// streamAndPersistListing is streamAndPersist for directory listings:
// the cache target is the sidecar resolved by listingCachePathFor from
// urlPath and the Accept header. The body is copied to the client and
// into a temp file at the same time; the temp file is renamed over the
// sidecar only after a complete write, with its mtime set to upstream's
// Last-Modified when that header parses.
//
// When the sidecar cannot be resolved or prepared, the body is still
// delivered to the client. The returned error is the copy error if the
// delivery failed, otherwise the cache-side error. On every failure
// path, including a failed final rename, the temp file is removed so
// aborted writes do not pile up in the cache directory.
func (c *Cache) streamAndPersistListing(w http.ResponseWriter, resp *http.Response, urlPath, accept string) error {
	finalPath, ok := c.listingCachePathFor(urlPath, accept)
	if !ok {
		_, err := io.Copy(w, resp.Body)
		return err
	}

	dir := filepath.Dir(finalPath)
	var tmp *os.File
	err := os.MkdirAll(dir, 0o755)
	if err == nil {
		tmp, err = os.CreateTemp(dir, ".zddc-cache-tmp-*")
	}
	if err != nil {
		// The cache is unusable for this listing; the client must still
		// get its response.
		if _, copyErr := io.Copy(w, resp.Body); copyErr != nil {
			return copyErr
		}
		return err
	}

	tmpName := tmp.Name()
	// Until the rename succeeds the temp file is garbage.
	committed := false
	defer func() {
		if !committed {
			_ = os.Remove(tmpName)
		}
	}()

	if _, err := io.Copy(io.MultiWriter(tmp, w), resp.Body); err != nil {
		_ = tmp.Close()
		return err
	}
	if err := tmp.Close(); err != nil {
		return err
	}
	if lm := resp.Header.Get("Last-Modified"); lm != "" {
		if t, err := http.ParseTime(lm); err == nil {
			// Best effort: a failed Chtimes only costs a spurious refetch.
			_ = os.Chtimes(tmpName, t, t)
		}
	}
	if err := os.Rename(tmpName, finalPath); err != nil {
		return err
	}
	committed = true
	return nil
}
// revalidateListing is the listing-cache analogue of revalidate: a
// conditional GET for the directory URL, sent with the original Accept
// header so upstream returns the same format as the cached sidecar.
//   - 304: no-op, the sidecar is fresh.
//   - 200: the sidecar is replaced via temp file + rename (if the
//     response is cacheable).
//   - 403 / 404: the sidecar is purged.
//
// Network and filesystem errors are swallowed; the stale sidecar simply
// stays in place. A temp file that could not be completed or renamed is
// removed.
//
// urlPath is the decoded request path (r.URL.Path) and mtime is the
// sidecar's modification time, sent as If-Modified-Since when non-zero.
func (c *Cache) revalidateListing(urlPath, accept string, mtime time.Time) {
	// urlPath is already decoded, so it must be re-escaped before it is
	// spliced into a URL. Otherwise a directory name containing '?' or
	// '#' would address a different upstream resource (and a resulting
	// 404 would purge this sidecar), and a literal '%' would fail to
	// parse.
	target := c.upstream + (&url.URL{Path: urlPath}).EscapedPath()
	req, err := http.NewRequest(http.MethodGet, target, nil)
	if err != nil {
		return
	}
	if !mtime.IsZero() {
		req.Header.Set("If-Modified-Since", mtime.UTC().Format(http.TimeFormat))
	}
	if accept != "" {
		req.Header.Set("Accept", accept)
	}
	if c.bearer != "" {
		req.Header.Set("Authorization", "Bearer "+c.bearer)
	}

	resp, err := c.client.Do(req)
	if err != nil {
		return
	}
	defer resp.Body.Close()

	switch resp.StatusCode {
	case http.StatusNotModified:
		return
	case http.StatusOK:
		if !c.responseCacheable(resp) {
			return
		}
		path, ok := c.listingCachePathFor(urlPath, accept)
		if !ok {
			return
		}
		dir := filepath.Dir(path)
		if err := os.MkdirAll(dir, 0o755); err != nil {
			return
		}
		tmp, err := os.CreateTemp(dir, ".zddc-cache-tmp-*")
		if err != nil {
			return
		}
		tmpName := tmp.Name()
		if _, err := io.Copy(tmp, resp.Body); err != nil {
			_ = tmp.Close()
			_ = os.Remove(tmpName)
			return
		}
		// A failed Close can mean buffered data never reached disk;
		// don't promote a possibly truncated listing.
		if err := tmp.Close(); err != nil {
			_ = os.Remove(tmpName)
			return
		}
		if lm := resp.Header.Get("Last-Modified"); lm != "" {
			if t, err := http.ParseTime(lm); err == nil {
				_ = os.Chtimes(tmpName, t, t)
			}
		}
		if err := os.Rename(tmpName, path); err != nil {
			_ = os.Remove(tmpName)
		}
	case http.StatusForbidden, http.StatusNotFound:
		if path, ok := c.listingCachePathFor(urlPath, accept); ok {
			// Best effort: the sidecar may already be gone.
			_ = os.Remove(path)
		}
	}
}
// isHopByHop reports whether name is one of the hop-by-hop headers of
// RFC 7230 §6.1, which a proxy must not forward. The comparison is made
// on the canonical form of name, so it is case-insensitive.
func isHopByHop(name string) bool {
	switch http.CanonicalHeaderKey(name) {
	case "Connection", "Keep-Alive":
		return true
	case "Proxy-Authenticate", "Proxy-Authorization":
		return true
	case "Te", "Trailer", "Transfer-Encoding", "Upgrade":
		return true
	default:
		return false
	}
}