ZDDC/zddc/internal/cache/cache.go
ZDDC 8a049ca2a4 feat(client): outbox — offline write queue + replay with If-Unmodified-Since
PUT / POST / DELETE in client mode now work end-to-end. Online: the
cache layer forwards to upstream and (on success) drops any cached
entry for the path so the next read fetches fresh. PUT/DELETE include
If-Unmodified-Since derived from the cached file's mtime so the master
can reject conflicting writes with 412 Precondition Failed.

When upstream is unreachable, the request is captured in the outbox
at <root>/.zddc-outbox/<id>/ — directory per queued write, mode 0700,
containing meta.json (method, RawURI, Content-Type, base mtime,
queued-at) and body.bin (request body, capped at 256 MiB). The client
gets 202 Accepted + X-ZDDC-Cache: queued and a JSON envelope.

A background replay loop started by runClient processes the queue:
- 2xx → delete entry; drop cached path so next read fetches fresh
- 412 → rename to <id>.conflict-<RFC3339>/ for manual reconciliation
       (body + meta intact for inspection or re-submit)
- 4xx other → drop (retry won't help; logged at WARN)
- 5xx / transport error → leave for next pass

Replay schedule: eager at startup, then 30s while pending falling
back to 5min while idle. Loop honors graceful-shutdown context.
Disabled in --mode=proxy (proxy persists nothing by design — offline
writes return 503 instead of queueing).

Outbox IDs are <unix-nano-base16>-<hex-random> so lex-sort = queue
order; concurrent enqueues never collide. Conflict-rename appends a
4-char random suffix on the unlikely same-second collision.

The local cache is intentionally not updated for offline writes:
until upstream confirms, the user's reads still see the upstream-cached
version (or 503 if uncached). Trade-off: no "did my queued write
actually win?" ambiguity, at the cost of not seeing one's own
offline edits immediately. Phase 5 will surface .conflict-<ts>/
directories in browse views.

Tests (20 new in outbox_test.go, 5 new in cache_test.go covering
the write path): NewOutbox creates 0700 dir, Enqueue persists meta
+ body, Pending returns lex-sorted entries excluding conflicts,
Replay deletes on 2xx / renames on 412 / leaves on transport error
/ leaves on 5xx / drops on 4xx-other, IUS sent only for PUT/DELETE
with base mtime, query string preserved, ServeHTTP online write
forwards + evicts cache, ServeHTTP offline write queues with 202,
ServeHTTP offline + no outbox returns 503, ServeHTTP PUT sends IUS
from cached mtime, oversize body rejected, IDs lex-sortable,
RunReplayLoop stops on context cancel, concurrent Enqueue 30×
no collisions. Full suite + go vet clean.

Doc updates: zddc/README.md gains a "Writes (online + offline
outbox)" subsection covering both paths and replay outcomes;
"What client mode is NOT, yet" now lists only conflict UI and
multi-tenancy. AGENTS.md client-mode pipeline gains writes +
mirror-mode bullets. ARCHITECTURE.md adds a "Writes: outbox +
offline replay" subsection with the trade-off rationale and the
phase-5-deferred conflict UI hand-off.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 08:20:07 -05:00

897 lines
29 KiB
Go

// Package cache implements zddc-server's client mode: a downstream
// proxy/cache/mirror that runs the same binary against a master.
// Configured via cfg.Upstream (in main.go), the cache layer replaces
// the master-side dispatcher entirely — every incoming request is
// forwarded to the master with the local instance's bearer token, and
// (in cache or mirror mode) the response body is persisted under
// cfg.Root so subsequent requests serve from disk.
//
// The cache directory layout is intentionally a normal ZDDC root: a
// file fetched from `<master>/foo/bar.txt` is stored at `<root>/foo/
// bar.txt`. No sidecar metadata. The local file's mtime is set to the
// upstream's Last-Modified header so revalidation via
// If-Modified-Since reflects the master's notion of the file's age,
// not when the local cache happened to fetch it. Running
// `zddc-server --root <cache-dir>` without --upstream serves the
// cached files as a regular ZDDC — useful for portable offline
// snapshots and sanity-check inspection.
//
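// Scope: GET/HEAD are served through the cache, with Range requests,
// stale-while-revalidate, and offline fallback supported. Directory
// listings are cached as per-format sidecar files next to the cached
// files. Writes (PUT / POST / DELETE) are forwarded to upstream and,
// when upstream is unreachable, queued in the outbox for later replay.
// The mirror walker hooks in through the onAccess callback.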
package cache
import (
"crypto/tls"
"encoding/json"
"fmt"
"io"
"log/slog"
"net/http"
"net/url"
"os"
"path/filepath"
"strings"
"sync"
"time"
"codeberg.org/VARASYS/ZDDC/zddc/internal/config"
)
// Reserved on-disk names and the response header used by the cache layer.
const (
	// MarkerFile is the provenance file written into the cache root. It
	// records the upstream URL, the first-cached-at timestamp, and the
	// mode. It guards against accidentally pointing master mode at a
	// cache directory and gives ops/users a provenance trail.
	MarkerFile = ".zddc-upstream"

	// HeaderName is the response header that surfaces cache state to
	// the client (and the browser-side UI). Values set in this file:
	// hit, miss, proxy, offline, queued.
	HeaderName = "X-ZDDC-Cache"

	// listingCachePrefix is the basename prefix of directory-listing
	// sidecar files; the suffix is the format ("html" or "json"). The
	// name is dot-prefixed so that serving the cache root as a plain
	// master hides the sidecars via the existing dispatch dot-prefix
	// guard.
	listingCachePrefix = ".zddc-listing."
)
// Cache is the request handler installed in main.go when cfg.Upstream
// is non-empty. It is safe for concurrent ServeHTTP calls.
//
// New populates every field except onAccess and outbox, which it
// leaves nil; outbox is installed afterwards through SetOutbox.
type Cache struct {
root string // local cache directory (== cfg.Root in client mode)
upstream string // upstream master URL, no trailing slash
bearer string // forwarded as Authorization: Bearer to upstream; "" disables
mode string // "proxy" | "cache" | "mirror"
persist bool // mode != "proxy" — write responses to disk
// client is the shared HTTP client used for every upstream request.
// New configures it with a request timeout and with automatic
// redirect-following disabled.
client *http.Client
// markerOnce guards the one-time attempt to write MarkerFile (see
// maybeWriteMarker).
markerOnce sync.Once
// onAccess is invoked (when non-nil) after a request is dispatched.
// The walker scheduler installs this hook to kick mirror walks based
// on incoming traffic. Always called in a goroutine — must not
// assume it runs before the response completes. Nil in proxy/cache
// modes; set in mirror mode.
onAccess func(urlPath string)
// outbox holds the offline write queue. Set by main.go after
// construction (avoids a circular dep at New time, since Outbox
// needs a *Cache reference). Nil = writes when offline get 503.
outbox *Outbox
}
// SetOutbox wires the offline-write queue into the cache. main.go calls
// it once, after both the Cache and the Outbox have been constructed.
// Passing nil turns outbox-backed offline writes off, in which case
// writes attempted while upstream is unreachable get a 503.
func (c *Cache) SetOutbox(o *Outbox) {
	c.outbox = o
}
// New constructs a Cache from the loaded configuration.
//
// It validates that cfg.Upstream is an absolute http(s) URL, reads the
// bearer token from cfg.BearerFile (when configured; an empty file is
// an error), prepares the HTTP client honoring cfg.SkipTLSVerify, and
// ensures the cache root directory exists. An empty cfg.Mode defaults
// to "cache"; responses are persisted in every mode except "proxy".
//
// Returns an error when the upstream is missing or malformed, when the
// bearer file cannot be read or is empty, or when the cache root cannot
// be created.
func New(cfg config.Config) (*Cache, error) {
	if cfg.Upstream == "" {
		return nil, fmt.Errorf("cache.New: cfg.Upstream is empty")
	}
	upstream := strings.TrimRight(cfg.Upstream, "/")
	parsed, err := url.Parse(upstream)
	if err != nil {
		return nil, fmt.Errorf("cache.New: invalid upstream %q: %w", upstream, err)
	}
	// url.Parse happily accepts relative references such as
	// "master.example"; the transport can only talk http(s) to a named
	// host, so reject anything else up front rather than failing on
	// every request later.
	if (parsed.Scheme != "http" && parsed.Scheme != "https") || parsed.Host == "" {
		return nil, fmt.Errorf("cache.New: invalid upstream %q: want an absolute http(s) URL", upstream)
	}

	bearer := ""
	if cfg.BearerFile != "" {
		b, err := os.ReadFile(cfg.BearerFile)
		if err != nil {
			return nil, fmt.Errorf("cache.New: read bearer file: %w", err)
		}
		bearer = strings.TrimSpace(string(b))
		if bearer == "" {
			return nil, fmt.Errorf("cache.New: bearer file %q is empty", cfg.BearerFile)
		}
	}

	transport := &http.Transport{
		MaxIdleConns:          10,
		IdleConnTimeout:       30 * time.Second,
		TLSHandshakeTimeout:   10 * time.Second,
		ResponseHeaderTimeout: 30 * time.Second,
	}
	if cfg.SkipTLSVerify {
		// G402 / CWE-295: deliberate. Documented operator opt-in for
		// dev/internal-CA scenarios; never the default.
		transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} //nolint:gosec
		slog.Warn("--skip-tls-verify enabled: upstream TLS certificates will NOT be validated")
	}

	if err := os.MkdirAll(cfg.Root, 0o755); err != nil {
		return nil, fmt.Errorf("cache.New: create cache root %q: %w", cfg.Root, err)
	}

	mode := cfg.Mode
	if mode == "" {
		mode = "cache"
	}

	return &Cache{
		root:     cfg.Root,
		upstream: upstream,
		bearer:   bearer,
		mode:     mode,
		persist:  mode != "proxy",
		client: &http.Client{
			Transport: transport,
			Timeout:   60 * time.Second,
			// Don't follow redirects automatically — pass them through to
			// the client so the browser can update its address bar
			// (e.g. master's no-trailing-slash → trailing-slash 301).
			CheckRedirect: func(req *http.Request, via []*http.Request) error {
				return http.ErrUseLastResponse
			},
		},
	}, nil
}
// Mode reports the mode label this cache was configured with
// ("proxy", "cache" or "mirror"); intended for diagnostics.
func (c *Cache) Mode() string {
	return c.mode
}
// Upstream reports the upstream master URL (stored without a trailing
// slash); intended for diagnostics.
func (c *Cache) Upstream() string {
	return c.upstream
}
// ServeHTTP is the HTTP entry point of the cache layer; in client mode
// it stands in for the master-side dispatcher.
//
// Dispatch order:
//   - PUT / POST / DELETE go to handleWrite (live upstream first, outbox
//     on transport error).
//   - Anything other than GET / HEAD is answered with 405.
//   - Paths ending in "/" are directory listings (serveDirectory).
//   - Files are served from disk when persisting and cached, with a
//     background revalidation; otherwise the request is proxied (and
//     persisted on the way through when persisting).
func (c *Cache) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	method := r.Method

	// Writes never touch the read pipeline.
	if method == http.MethodPut || method == http.MethodPost || method == http.MethodDelete {
		c.handleWrite(w, r)
		return
	}

	// OPTIONS / PATCH and friends: the master doesn't accept them either.
	if method != http.MethodGet && method != http.MethodHead {
		w.Header().Set("Allow", "GET, HEAD, PUT, POST, DELETE")
		http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
		return
	}

	urlPath := r.URL.Path

	// Mirror walker hook: tell the scheduler this URL was touched so it
	// can decide whether to kick a subtree walk. Fired in a goroutine so
	// the user's request never waits on walk scheduling.
	if hook := c.onAccess; hook != nil {
		go hook(urlPath)
	}

	// Directory listings have their own sidecar cache, selected by the
	// Accept header (HTML for navigation, JSON for the inline-JS
	// listing fetcher).
	if strings.HasSuffix(urlPath, "/") {
		c.serveDirectory(w, r)
		return
	}

	// File request: a cached regular file wins when we are persisting.
	if c.persist {
		if diskPath, ok := c.cachePathFor(urlPath); ok {
			if fi, statErr := os.Stat(diskPath); statErr == nil && !fi.IsDir() {
				c.serveFromDisk(w, r, diskPath, fi, "hit")
				// Refresh in the background; the response is already out.
				go c.revalidate(urlPath, fi.ModTime())
				return
			}
		}
	}

	// Miss (or proxy mode): forward upstream, persisting when enabled.
	c.proxy(w, r, c.persist)
}
// serveDirectory answers a directory request. When persisting, the
// sidecar listing cache (chosen by the Accept header) is consulted
// first; a hit is served from disk and revalidated in the background.
// On a miss, or when not persisting, the request is proxied upstream.
func (c *Cache) serveDirectory(w http.ResponseWriter, r *http.Request) {
	if c.persist {
		accept := r.Header.Get("Accept")
		if sidecar, ok := c.listingCachePathFor(r.URL.Path, accept); ok {
			fi, statErr := os.Stat(sidecar)
			if statErr == nil && !fi.IsDir() {
				c.serveListingFromDisk(w, r, sidecar, fi, "hit")
				go c.revalidateListing(r.URL.Path, accept, fi.ModTime())
				return
			}
		}
	}
	c.proxyDirectory(w, r, c.persist)
}
// proxy forwards the request to upstream and relays the response to
// the client. When writeToCache is true and the response is a
// cacheable 200, the body is additionally persisted under the cache
// root while it streams.
//
// If upstream cannot be reached, a GET falls back to the cached copy
// (served with cache state "offline") when writeToCache is true and one
// exists; otherwise the client gets 503.
func (c *Cache) proxy(w http.ResponseWriter, r *http.Request, writeToCache bool) {
	outReq, buildErr := c.buildUpstreamRequest(r)
	if buildErr != nil {
		http.Error(w, "Bad Request: "+buildErr.Error(), http.StatusBadRequest)
		return
	}

	upResp, doErr := c.client.Do(outReq)
	if doErr != nil {
		// Network error: serve the cached copy stale if we have one.
		if writeToCache && r.Method == http.MethodGet {
			if diskPath, ok := c.cachePathFor(r.URL.Path); ok {
				if fi, statErr := os.Stat(diskPath); statErr == nil && !fi.IsDir() {
					c.serveFromDisk(w, r, diskPath, fi, "offline")
					return
				}
			}
		}
		slog.Warn("upstream fetch failed", "url", outReq.URL.String(), "err", doErr)
		w.Header().Set(HeaderName, "offline")
		http.Error(w, "Service Unavailable: upstream unreachable", http.StatusServiceUnavailable)
		return
	}
	defer upResp.Body.Close()

	// Relay upstream headers minus the hop-by-hop ones (RFC 7230 §6.1).
	// Go's transport drops most of those already, but Connection and
	// Transfer-Encoding can sneak through and confuse the client.
	out := w.Header()
	for name, values := range upResp.Header {
		if isHopByHop(name) {
			continue
		}
		for _, value := range values {
			out.Add(name, value)
		}
	}

	shouldPersist := writeToCache && upResp.StatusCode == http.StatusOK && c.responseCacheable(upResp)
	cacheState := "proxy"
	if shouldPersist {
		cacheState = "miss"
	}
	out.Set(HeaderName, cacheState)
	w.WriteHeader(upResp.StatusCode)

	switch {
	case r.Method == http.MethodHead, upResp.StatusCode == http.StatusNotModified:
		// No body to relay.
		return
	case !shouldPersist:
		_, _ = io.Copy(w, upResp.Body)
		return
	}

	// Stream the body to the client AND to a temp file in the cache; the
	// temp file is renamed into place only on success.
	if err := c.streamAndPersist(w, upResp, r.URL.Path); err != nil {
		// Mid-stream error: the client got a partial body (HTTP-normal)
		// and the cache write was already abandoned. Just log.
		slog.Debug("stream-and-persist error", "url", r.URL.Path, "err", err)
		return
	}
	c.maybeWriteMarker()
}
// buildUpstreamRequest creates the outbound read request for r. The
// method, path and query are preserved, the request inherits r's
// context, and it carries no body. Only the Range, If-Range, Accept and
// Accept-Encoding headers are copied from the client; a fixed
// User-Agent is set and the bearer token is attached when configured.
func (c *Cache) buildUpstreamRequest(r *http.Request) (*http.Request, error) {
	outReq, err := http.NewRequestWithContext(r.Context(), r.Method, c.upstream+r.URL.RequestURI(), nil)
	if err != nil {
		return nil, err
	}

	// Range / If-Range keep resumable and partial transfers working.
	for _, name := range [...]string{"Range", "If-Range", "Accept", "Accept-Encoding"} {
		if value := r.Header.Get(name); value != "" {
			outReq.Header.Set(name, value)
		}
	}

	outReq.Header.Set("User-Agent", "zddc-server-cache/0.1")
	if token := c.bearer; token != "" {
		outReq.Header.Set("Authorization", "Bearer "+token)
	}
	return outReq, nil
}
// responseCacheable reports whether the response body may be persisted
// to the cache. It returns false when:
//
//   - the status is anything other than 200 (a 206 covers only part of
//     the file; caching it would corrupt later non-range fetches);
//   - Cache-Control contains no-store or private;
//   - the body carries a Content-Encoding other than identity. The
//     client's Accept-Encoding is forwarded explicitly, which turns off
//     the transport's transparent gzip decoding, so such a body is still
//     compressed. Cached files are later served from disk without a
//     Content-Encoding header, so storing encoded bytes would hand
//     clients undecodable content.
func (c *Cache) responseCacheable(resp *http.Response) bool {
	if resp.StatusCode != http.StatusOK {
		return false
	}
	cc := strings.ToLower(resp.Header.Get("Cache-Control"))
	if strings.Contains(cc, "no-store") || strings.Contains(cc, "private") {
		return false
	}
	enc := strings.TrimSpace(resp.Header.Get("Content-Encoding"))
	if enc != "" && !strings.EqualFold(enc, "identity") {
		return false
	}
	return true
}
// streamAndPersist writes resp.Body simultaneously to the client and to
// a temp file next to the cache target, then renames the temp file into
// place. Before the rename the temp file's mtime is set to upstream's
// Last-Modified (when present and parseable) so later revalidations
// send an If-Modified-Since that upstream can compare against its own
// state.
//
// If the URL has no cache path, or the cache directory / temp file
// cannot be created, the body is still streamed to the client and the
// cache write is skipped. The temp file is removed on every failure
// path, including a failed rename, so aborted writes do not litter the
// cache directory.
//
// Returns the first error from copying or from the cache write.
func (c *Cache) streamAndPersist(w http.ResponseWriter, resp *http.Response, urlPath string) error {
	finalPath, ok := c.cachePathFor(urlPath)
	if !ok {
		_, err := io.Copy(w, resp.Body)
		return err
	}

	dir := filepath.Dir(finalPath)
	if err := os.MkdirAll(dir, 0o755); err != nil {
		if _, copyErr := io.Copy(w, resp.Body); copyErr != nil {
			return copyErr
		}
		return err
	}
	tmp, err := os.CreateTemp(dir, ".zddc-cache-tmp-*")
	if err != nil {
		if _, copyErr := io.Copy(w, resp.Body); copyErr != nil {
			return copyErr
		}
		return err
	}
	tmpName := tmp.Name()

	// Until the rename succeeds the temp file is garbage; remove it on
	// any early return. Removal is best-effort.
	committed := false
	defer func() {
		if !committed {
			_ = os.Remove(tmpName)
		}
	}()

	if _, err := io.Copy(io.MultiWriter(tmp, w), resp.Body); err != nil {
		_ = tmp.Close()
		return err
	}
	if err := tmp.Close(); err != nil {
		return err
	}
	if lm := resp.Header.Get("Last-Modified"); lm != "" {
		if t, err := http.ParseTime(lm); err == nil {
			// Best-effort: on failure the file keeps its fetch-time mtime.
			_ = os.Chtimes(tmpName, t, t)
		}
	}
	if err := os.Rename(tmpName, finalPath); err != nil {
		return err
	}
	committed = true
	return nil
}
// serveFromDisk sends the cached file at path to the client through
// http.ServeContent, which natively handles Range requests,
// If-Modified-Since and conditional GETs. info supplies the
// modification time; cacheState becomes the X-ZDDC-Cache header value.
// A file that cannot be opened yields a 500.
func (c *Cache) serveFromDisk(w http.ResponseWriter, r *http.Request, path string, info os.FileInfo, cacheState string) {
	file, openErr := os.Open(path)
	if openErr != nil {
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}
	defer file.Close()

	name := filepath.Base(path)
	w.Header().Set(HeaderName, cacheState)
	http.ServeContent(w, r, name, info.ModTime(), file)
}
// revalidate fires a conditional GET against upstream after a cache
// hit. urlPath is the decoded request path; mtime is the cached file's
// modification time, sent as If-Modified-Since when non-zero.
//
// Outcomes: 304 = no-op (cache is fresh). 200 = replace the cached copy
// if the response is cacheable. 403/404 = purge (ACL revoked or
// upstream deleted). Network errors are swallowed — staleness via
// offline is the documented behavior.
func (c *Cache) revalidate(urlPath string, mtime time.Time) {
	// urlPath is already percent-decoded, so it must be re-escaped before
	// being spliced into a URL: otherwise a "?" or "#" in a file name is
	// parsed as a query/fragment and a stray "%" makes the URL invalid,
	// and a 404 for the resulting wrong URL would purge a valid entry.
	target := c.upstream + (&url.URL{Path: urlPath}).EscapedPath()
	req, err := http.NewRequest(http.MethodGet, target, nil)
	if err != nil {
		return
	}
	if !mtime.IsZero() {
		req.Header.Set("If-Modified-Since", mtime.UTC().Format(http.TimeFormat))
	}
	if c.bearer != "" {
		req.Header.Set("Authorization", "Bearer "+c.bearer)
	}
	resp, err := c.client.Do(req)
	if err != nil {
		return
	}
	defer resp.Body.Close()

	switch resp.StatusCode {
	case http.StatusNotModified:
		return
	case http.StatusOK:
		if !c.responseCacheable(resp) {
			return
		}
		if err := c.persistOnly(resp, urlPath); err != nil {
			slog.Debug("revalidate persist error", "url", urlPath, "err", err)
		}
	case http.StatusForbidden, http.StatusNotFound:
		if path, ok := c.cachePathFor(urlPath); ok {
			// Best-effort purge; a missing file is fine.
			_ = os.Remove(path)
			slog.Info("purged cached entry after upstream 4xx", "url", urlPath, "status", resp.StatusCode)
		}
	}
}
// persistOnly writes resp.Body to the cache without forwarding it
// anywhere. Used by revalidate (the user's request was already served
// from disk; this just refreshes the cache in the background).
//
// The body goes to a temp file beside the target, gets upstream's
// Last-Modified as its mtime when that header parses, and is then
// renamed into place. When the URL has no cache path the body is
// drained and nil is returned. The temp file is removed on every
// failure path, including a failed rename.
func (c *Cache) persistOnly(resp *http.Response, urlPath string) error {
	finalPath, ok := c.cachePathFor(urlPath)
	if !ok {
		_, _ = io.Copy(io.Discard, resp.Body)
		return nil
	}

	dir := filepath.Dir(finalPath)
	if err := os.MkdirAll(dir, 0o755); err != nil {
		_, _ = io.Copy(io.Discard, resp.Body)
		return err
	}
	tmp, err := os.CreateTemp(dir, ".zddc-cache-tmp-*")
	if err != nil {
		_, _ = io.Copy(io.Discard, resp.Body)
		return err
	}
	tmpName := tmp.Name()

	// Until the rename succeeds the temp file is garbage; remove it on
	// any early return. Removal is best-effort.
	committed := false
	defer func() {
		if !committed {
			_ = os.Remove(tmpName)
		}
	}()

	if _, err := io.Copy(tmp, resp.Body); err != nil {
		_ = tmp.Close()
		return err
	}
	if err := tmp.Close(); err != nil {
		return err
	}
	if lm := resp.Header.Get("Last-Modified"); lm != "" {
		if t, err := http.ParseTime(lm); err == nil {
			// Best-effort: on failure the file keeps its fetch-time mtime.
			_ = os.Chtimes(tmpName, t, t)
		}
	}
	if err := os.Rename(tmpName, finalPath); err != nil {
		return err
	}
	committed = true
	return nil
}
// cachePathForURI maps a raw request URI (path plus optional query, as
// stored by the outbox) to its cached file. The query string is
// stripped — the cache is keyed by path only — and the path is
// percent-decoded so it matches the decoded paths the read pipeline
// uses as cache keys. If the path is not validly escaped it is used
// as-is.
func (c *Cache) cachePathForURI(rawURI string) (string, bool) {
	pathPart, _, _ := strings.Cut(rawURI, "?")
	if decoded, err := url.PathUnescape(pathPart); err == nil {
		pathPart = decoded
	}
	return c.cachePathFor(pathPart)
}
// cachePathFor maps a decoded URL path to a local filesystem path under
// the cache root. It returns ok=false for inputs that are unsafe to use
// as a cache file:
//
//   - the empty path, "/", or anything that resolves to the root itself;
//   - any path containing ".." or otherwise not local to the root;
//   - any path with a segment that is MarkerFile or starts with
//     ".zddc-" — the cache's own bookkeeping names (provenance marker,
//     listing sidecars, temp files, the outbox directory). Letting URLs
//     address those would allow a GET to read a queued write body and a
//     later revalidation to delete it.
//
// Containment is decided on the relative part alone, so the result does
// not depend on how the root was spelled ("./cache", "cache/", "/").
func (c *Cache) cachePathFor(urlPath string) (string, bool) {
	// Shared prefix of every internal file/directory name.
	const reservedPrefix = ".zddc-"

	if c.root == "" || urlPath == "" || urlPath == "/" {
		return "", false
	}
	if strings.Contains(urlPath, "..") {
		return "", false
	}
	for _, seg := range strings.Split(urlPath, "/") {
		if seg == MarkerFile || strings.HasPrefix(seg, reservedPrefix) {
			return "", false
		}
	}

	rel := filepath.FromSlash(strings.TrimPrefix(urlPath, "/"))
	if !filepath.IsLocal(rel) {
		return "", false
	}
	// "." (e.g. from "/.") would resolve to the root directory itself.
	if rel = filepath.Clean(rel); rel == "." {
		return "", false
	}
	return filepath.Join(c.root, rel), true
}
// maybeWriteMarker makes a single attempt per Cache instance to create
// the .zddc-upstream provenance file in the cache root, recording the
// upstream URL, the current UTC time and the mode. If a marker can
// already be stat'ed, it is left untouched. Best-effort: a write
// failure is ignored and never fails the request.
func (c *Cache) maybeWriteMarker() {
	c.markerOnce.Do(func() {
		markerPath := filepath.Join(c.root, MarkerFile)
		_, statErr := os.Stat(markerPath)
		if statErr == nil {
			// Already there (e.g. from an earlier run); keep it.
			return
		}
		firstCached := time.Now().UTC().Format(time.RFC3339)
		content := fmt.Sprintf("upstream: %s\nfirst_cached: %s\nmode: %s\n",
			c.upstream, firstCached, c.mode)
		_ = os.WriteFile(markerPath, []byte(content), 0o644)
	})
}
// handleWrite proxies a write request (PUT / POST / DELETE) to
// upstream. On a transport error, the request is captured in the
// outbox (if configured) and the client gets 202 Accepted with a
// JSON envelope describing the queued entry; without an outbox the
// client gets 503. Online HTTP errors (4xx/5xx from the master) are
// forwarded verbatim.
//
// PUT/DELETE include an If-Unmodified-Since precondition derived
// from the local cache file's mtime so the master can reject the
// write if its file changed since we observed it. POST never sends
// a precondition (POST semantics are application-defined).
//
// The request body is buffered in full (bounded by
// MaxOutboxBodyBytes) so it can be both sent upstream and queued.
// A body over the cap is rejected with 413; any other failure while
// reading the body is reported as 400.
func (c *Cache) handleWrite(w http.ResponseWriter, r *http.Request) {
	// Capture base mtime for PUT/DELETE so replay can use the same
	// precondition we'd send live.
	var baseModTime time.Time
	if r.Method == http.MethodPut || r.Method == http.MethodDelete {
		if path, ok := c.cachePathFor(r.URL.Path); ok {
			if info, err := os.Stat(path); err == nil && !info.IsDir() {
				baseModTime = info.ModTime()
			}
		}
	}

	// We may need to send the body to upstream AND then queue it on
	// network failure, so it is buffered up front.
	bodyBytes, bodyErr := readBoundedBody(r)
	if bodyErr != nil {
		// Only the size-cap error is a 413. io.ReadAll hands back the
		// reader's error unwrapped, so a direct type assertion suffices.
		status := http.StatusBadRequest
		if _, tooLarge := bodyErr.(*http.MaxBytesError); tooLarge {
			status = http.StatusRequestEntityTooLarge
		}
		http.Error(w, "Bad Request: "+bodyErr.Error(), status)
		return
	}

	upReq, err := c.buildUpstreamWriteRequest(r, bodyBytes, baseModTime)
	if err != nil {
		http.Error(w, "Bad Request: "+err.Error(), http.StatusBadRequest)
		return
	}
	resp, err := c.client.Do(upReq)
	if err != nil {
		// Transport error → queue if outbox is available.
		if c.outbox != nil {
			// The original body was consumed above; give Enqueue a
			// fresh reader over the buffered bytes.
			r.Body = io.NopCloser(bytesReader(bodyBytes))
			entry, qErr := c.outbox.Enqueue(r, baseModTime)
			if qErr != nil {
				slog.Warn("outbox enqueue failed", "err", qErr)
				w.Header().Set(HeaderName, "offline")
				http.Error(w, "Service Unavailable: upstream unreachable and outbox unavailable", http.StatusServiceUnavailable)
				return
			}
			w.Header().Set("Content-Type", "application/json; charset=utf-8")
			w.Header().Set(HeaderName, "queued")
			w.WriteHeader(http.StatusAccepted)
			// The status line is already out; an encode/write failure
			// here can only mean the client went away.
			_ = json.NewEncoder(w).Encode(map[string]any{
				"queued":     true,
				"outbox_id":  entry.ID,
				"method":     entry.Method,
				"uri":        entry.RawURI,
				"queued_at":  entry.QueuedAt,
				"base_mtime": entry.BaseModTime,
			})
			return
		}
		w.Header().Set(HeaderName, "offline")
		http.Error(w, "Service Unavailable: upstream unreachable", http.StatusServiceUnavailable)
		return
	}
	defer resp.Body.Close()

	// Forward upstream's response verbatim (minus hop-by-hop headers).
	for k, vv := range resp.Header {
		if isHopByHop(k) {
			continue
		}
		for _, v := range vv {
			w.Header().Add(k, v)
		}
	}
	w.Header().Set(HeaderName, "proxy")
	w.WriteHeader(resp.StatusCode)
	_, _ = io.Copy(w, resp.Body)

	// On a successful write, drop the cached entry so the next read
	// fetches fresh upstream content (which now includes the user's
	// change). For PUT we could be smarter (write the new body
	// directly to cache) but eviction is simplest.
	if c.persist && resp.StatusCode >= 200 && resp.StatusCode < 300 {
		if r.Method == http.MethodPut || r.Method == http.MethodDelete {
			if path, ok := c.cachePathFor(r.URL.Path); ok {
				// Best-effort; a missing file is fine.
				_ = os.Remove(path)
			}
		}
	}
}
// buildUpstreamWriteRequest constructs the outbound write request with
// the buffered body, Content-Type passthrough, bearer token, and (for
// PUT/DELETE with a non-zero base mtime) an If-Unmodified-Since
// precondition. The request inherits r's context.
//
// An empty body is sent as http.NoBody so the transport emits a
// definite zero length instead of treating the length as unknown (which
// makes it use chunked encoding for PUT/POST). For a non-empty body
// GetBody is set, so the transport can replay the payload when it has
// to retry the request on a fresh connection.
func (c *Cache) buildUpstreamWriteRequest(r *http.Request, body []byte, baseModTime time.Time) (*http.Request, error) {
	target := c.upstream + r.URL.RequestURI()

	var payload io.Reader = http.NoBody
	if len(body) > 0 {
		payload = bytesReader(body)
	}
	upReq, err := http.NewRequestWithContext(r.Context(), r.Method, target, payload)
	if err != nil {
		return nil, err
	}

	// NewRequest only infers length and GetBody for the standard
	// in-memory readers, so set both explicitly.
	upReq.ContentLength = int64(len(body))
	if len(body) > 0 {
		upReq.GetBody = func() (io.ReadCloser, error) {
			return io.NopCloser(bytesReader(body)), nil
		}
	}

	if ct := r.Header.Get("Content-Type"); ct != "" {
		upReq.Header.Set("Content-Type", ct)
	}
	if !baseModTime.IsZero() && (r.Method == http.MethodPut || r.Method == http.MethodDelete) {
		upReq.Header.Set("If-Unmodified-Since", baseModTime.UTC().Format(http.TimeFormat))
	}
	if c.bearer != "" {
		upReq.Header.Set("Authorization", "Bearer "+c.bearer)
	}
	return upReq, nil
}
// readBoundedBody reads the whole request body into memory, capped at
// MaxOutboxBodyBytes, and closes it. A body over the cap produces the
// error from http.MaxBytesReader, so an oversize write is never
// partially queued. Returns (nil, nil) when the request has no body
// (r.Body == nil).
func readBoundedBody(r *http.Request) ([]byte, error) {
	body := r.Body
	if body == nil {
		return nil, nil
	}
	defer body.Close()

	// No ResponseWriter is passed: the caller decides how to answer.
	return io.ReadAll(http.MaxBytesReader(nil, body, MaxOutboxBodyBytes))
}
// bytesReader wraps b in a minimal io.Reader positioned at the start of
// the slice. It is a small local stand-in for a bytes.Reader so this
// file does not need to import bytes. The slice is not copied.
func bytesReader(b []byte) *bytesReaderType {
	return &bytesReaderType{r: b}
}

// bytesReaderType is a forward-only reader over an in-memory slice.
type bytesReaderType struct {
	r []byte // backing data
	i int    // offset of the next unread byte
}

// Read copies as many unread bytes as fit into p and advances the
// offset. Once the data is exhausted it returns 0 and io.EOF.
func (b *bytesReaderType) Read(p []byte) (int, error) {
	if b.i < len(b.r) {
		copied := copy(p, b.r[b.i:])
		b.i += copied
		return copied, nil
	}
	return 0, io.EOF
}
// listingFormat reduces an Accept header to the listing format it
// selects: "json" when the header mentions application/json (matched
// case-insensitively, anywhere in the value), "html" for everything
// else including an empty header. Mirrors the master's content
// negotiation.
func listingFormat(accept string) string {
	format := "html"
	if lowered := strings.ToLower(accept); strings.Contains(lowered, "application/json") {
		format = "json"
	}
	return format
}
// listingCachePathFor returns the on-disk sidecar path for a cached
// directory listing. urlPath is the directory URL and must end in "/";
// accept is the request's Accept header, which selects the "html" or
// "json" sidecar. Returns ok=false when urlPath is not a directory URL,
// contains "..", or is otherwise not local to the cache root.
//
// Containment is decided on the relative part alone, so the result does
// not depend on how the root was spelled ("./cache", "cache/", ".").
func (c *Cache) listingCachePathFor(urlPath, accept string) (string, bool) {
	if c.root == "" || !strings.HasSuffix(urlPath, "/") {
		return "", false
	}
	if strings.Contains(urlPath, "..") {
		return "", false
	}
	// rel is empty for the root listing ("/"), which maps to the cache
	// root directory itself.
	rel := filepath.FromSlash(strings.Trim(urlPath, "/"))
	if rel != "" && !filepath.IsLocal(rel) {
		return "", false
	}
	dir := filepath.Join(c.root, rel)
	return filepath.Join(dir, listingCachePrefix+listingFormat(accept)), true
}
// serveListingFromDisk sends a cached directory listing to the client.
// It works like serveFromDisk, except that Content-Type is always
// derived from the sidecar's suffix (".json" → JSON, anything else →
// HTML) rather than trusted from whatever upstream sent when the
// listing was cached. A sidecar that cannot be opened yields a 500.
func (c *Cache) serveListingFromDisk(w http.ResponseWriter, r *http.Request, path string, info os.FileInfo, cacheState string) {
	file, openErr := os.Open(path)
	if openErr != nil {
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}
	defer file.Close()

	contentType := "text/html; charset=utf-8"
	if strings.HasSuffix(path, ".json") {
		contentType = "application/json; charset=utf-8"
	}
	hdr := w.Header()
	hdr.Set("Content-Type", contentType)
	hdr.Set(HeaderName, cacheState)
	http.ServeContent(w, r, filepath.Base(path), info.ModTime(), file)
}
// proxyDirectory is the directory-listing counterpart of proxy: the
// request is forwarded upstream and the response relayed, but the
// cache target is the listing sidecar chosen by the Accept header
// rather than the literal URL path. If upstream cannot be reached, a
// GET falls back to the cached sidecar (cache state "offline") when
// writeToCache is true and one exists; otherwise the client gets 503.
func (c *Cache) proxyDirectory(w http.ResponseWriter, r *http.Request, writeToCache bool) {
	outReq, buildErr := c.buildUpstreamRequest(r)
	if buildErr != nil {
		http.Error(w, "Bad Request: "+buildErr.Error(), http.StatusBadRequest)
		return
	}

	upResp, doErr := c.client.Do(outReq)
	if doErr != nil {
		// Network error: serve the cached listing stale if we have one.
		if writeToCache && r.Method == http.MethodGet {
			if sidecar, ok := c.listingCachePathFor(r.URL.Path, r.Header.Get("Accept")); ok {
				if fi, statErr := os.Stat(sidecar); statErr == nil && !fi.IsDir() {
					c.serveListingFromDisk(w, r, sidecar, fi, "offline")
					return
				}
			}
		}
		slog.Warn("upstream directory fetch failed", "url", outReq.URL.String(), "err", doErr)
		w.Header().Set(HeaderName, "offline")
		http.Error(w, "Service Unavailable: upstream unreachable", http.StatusServiceUnavailable)
		return
	}
	defer upResp.Body.Close()

	// Relay upstream headers minus the hop-by-hop ones.
	out := w.Header()
	for name, values := range upResp.Header {
		if isHopByHop(name) {
			continue
		}
		for _, value := range values {
			out.Add(name, value)
		}
	}

	shouldPersist := writeToCache && upResp.StatusCode == http.StatusOK && c.responseCacheable(upResp)
	cacheState := "proxy"
	if shouldPersist {
		cacheState = "miss"
	}
	out.Set(HeaderName, cacheState)
	w.WriteHeader(upResp.StatusCode)

	switch {
	case r.Method == http.MethodHead, upResp.StatusCode == http.StatusNotModified:
		// No body to relay.
		return
	case !shouldPersist:
		_, _ = io.Copy(w, upResp.Body)
		return
	}

	if err := c.streamAndPersistListing(w, upResp, r.URL.Path, r.Header.Get("Accept")); err != nil {
		slog.Debug("listing stream-and-persist error", "url", r.URL.Path, "err", err)
		return
	}
	c.maybeWriteMarker()
}
// streamAndPersistListing is streamAndPersist for directory listings:
// the body is streamed to the client and to a temp file, which is then
// renamed onto the sidecar resolved by listingCachePathFor
// (Accept-aware). The temp file's mtime is set to upstream's
// Last-Modified when that header parses.
//
// If no sidecar path can be resolved, or the directory / temp file
// cannot be created, the body is still streamed to the client and the
// cache write is skipped. The temp file is removed on every failure
// path, including a failed rename.
//
// Returns the first error from copying or from the cache write.
func (c *Cache) streamAndPersistListing(w http.ResponseWriter, resp *http.Response, urlPath, accept string) error {
	finalPath, ok := c.listingCachePathFor(urlPath, accept)
	if !ok {
		_, err := io.Copy(w, resp.Body)
		return err
	}

	dir := filepath.Dir(finalPath)
	if err := os.MkdirAll(dir, 0o755); err != nil {
		if _, copyErr := io.Copy(w, resp.Body); copyErr != nil {
			return copyErr
		}
		return err
	}
	tmp, err := os.CreateTemp(dir, ".zddc-cache-tmp-*")
	if err != nil {
		if _, copyErr := io.Copy(w, resp.Body); copyErr != nil {
			return copyErr
		}
		return err
	}
	tmpName := tmp.Name()

	// Until the rename succeeds the temp file is garbage; remove it on
	// any early return. Removal is best-effort.
	committed := false
	defer func() {
		if !committed {
			_ = os.Remove(tmpName)
		}
	}()

	if _, err := io.Copy(io.MultiWriter(tmp, w), resp.Body); err != nil {
		_ = tmp.Close()
		return err
	}
	if err := tmp.Close(); err != nil {
		return err
	}
	if lm := resp.Header.Get("Last-Modified"); lm != "" {
		if t, err := http.ParseTime(lm); err == nil {
			// Best-effort: on failure the file keeps its fetch-time mtime.
			_ = os.Chtimes(tmpName, t, t)
		}
	}
	if err := os.Rename(tmpName, finalPath); err != nil {
		return err
	}
	committed = true
	return nil
}
// revalidateListing is the listing-cache analogue of revalidate. It
// sends a conditional GET for the directory (decoded path urlPath) with
// the original Accept header and, when mtime is non-zero,
// If-Modified-Since.
//
// Outcomes: 304 = no-op. 200 = replace the sidecar if the response is
// cacheable. 403/404 = remove the sidecar. Network and filesystem
// errors are swallowed: the existing sidecar simply stays in place.
func (c *Cache) revalidateListing(urlPath, accept string, mtime time.Time) {
	// urlPath is already percent-decoded, so it must be re-escaped before
	// being spliced into a URL: otherwise a "?" or "#" in a directory
	// name is parsed as a query/fragment and a stray "%" makes the URL
	// invalid.
	target := c.upstream + (&url.URL{Path: urlPath}).EscapedPath()
	req, err := http.NewRequest(http.MethodGet, target, nil)
	if err != nil {
		return
	}
	if !mtime.IsZero() {
		req.Header.Set("If-Modified-Since", mtime.UTC().Format(http.TimeFormat))
	}
	if accept != "" {
		req.Header.Set("Accept", accept)
	}
	if c.bearer != "" {
		req.Header.Set("Authorization", "Bearer "+c.bearer)
	}
	resp, err := c.client.Do(req)
	if err != nil {
		return
	}
	defer resp.Body.Close()

	switch resp.StatusCode {
	case http.StatusNotModified:
		return
	case http.StatusOK:
		if !c.responseCacheable(resp) {
			return
		}
		path, ok := c.listingCachePathFor(urlPath, accept)
		if !ok {
			return
		}
		dir := filepath.Dir(path)
		if err := os.MkdirAll(dir, 0o755); err != nil {
			return
		}
		tmp, err := os.CreateTemp(dir, ".zddc-cache-tmp-*")
		if err != nil {
			return
		}
		tmpName := tmp.Name()
		if _, err := io.Copy(tmp, resp.Body); err != nil {
			_ = tmp.Close()
			_ = os.Remove(tmpName)
			return
		}
		// A failed close can mean the data never reached disk; do not
		// promote such a file over the existing sidecar.
		if err := tmp.Close(); err != nil {
			_ = os.Remove(tmpName)
			return
		}
		if lm := resp.Header.Get("Last-Modified"); lm != "" {
			if t, err := http.ParseTime(lm); err == nil {
				// Best-effort: on failure the file keeps its fetch-time mtime.
				_ = os.Chtimes(tmpName, t, t)
			}
		}
		if err := os.Rename(tmpName, path); err != nil {
			// Don't leave the orphaned temp file behind.
			_ = os.Remove(tmpName)
		}
	case http.StatusForbidden, http.StatusNotFound:
		if path, ok := c.listingCachePathFor(urlPath, accept); ok {
			// Best-effort purge; a missing sidecar is fine.
			_ = os.Remove(path)
		}
	}
}
// isHopByHop reports whether name is one of the hop-by-hop headers of
// RFC 7230 §6.1, which a proxy must not forward. The comparison is made
// on the canonical form of the name, so input case does not matter.
func isHopByHop(name string) bool {
	canonical := http.CanonicalHeaderKey(name)
	for _, hop := range [...]string{
		"Connection",
		"Keep-Alive",
		"Proxy-Authenticate",
		"Proxy-Authorization",
		"Te",
		"Trailer",
		"Transfer-Encoding",
		"Upgrade",
	} {
		if canonical == hop {
			return true
		}
	}
	return false
}