ZDDC/zddc/internal/cache/cache.go
ZDDC ca00904f1e feat(client): cache mode — on-demand fetch + persist + offline fallback
zddc-server can now run as a downstream client of another zddc-server.
Set --upstream <url> and the master-side machinery (archive index, apps
server, watcher, OPA decider, ACL middleware, token store) is bypassed
entirely; cmd/zddc-server/main.go short-circuits to runClient(cfg)
which uses zddc/internal/cache/Cache as the entire request handler.

Three modes via --mode <proxy|cache|mirror>:
- proxy: forward upstream live, no disk persistence
- cache (default): persist responses on access; subsequent hits serve
  from disk + background If-Modified-Since revalidate
- mirror: accepted but currently behaves like cache; the access-
  triggered walker lands in phase 3
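The mode list above boils down to one switch: does this mode write responses to disk? The sketch below is a hypothetical illustration, not zddc-server's config code. `clientMode` and `parseMode` are invented names. It encodes only what the list states: proxy never persists, cache and mirror both persist, and an empty value means the `cache` default.

```go
package main

import "fmt"

// clientMode is a hypothetical summary of what a --mode value implies.
type clientMode struct {
	Name    string
	Persist bool // store cacheable 200 responses under the cache root
}

// parseMode (hypothetical) maps a --mode value to its behavior.
// An empty string selects the documented default, "cache".
func parseMode(s string) (clientMode, error) {
	switch s {
	case "proxy":
		return clientMode{Name: "proxy", Persist: false}, nil
	case "", "cache":
		return clientMode{Name: "cache", Persist: true}, nil
	case "mirror":
		// Accepted, but behaves like cache until the phase-3 walker lands.
		return clientMode{Name: "mirror", Persist: true}, nil
	default:
		return clientMode{}, fmt.Errorf("unknown --mode %q (want proxy|cache|mirror)", s)
	}
}
```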

Cache directory layout is intentionally a normal ZDDC root: a file
fetched from <master>/foo/bar.txt is stored at <root>/foo/bar.txt with
no sidecar metadata. The local file's mtime is set to the upstream's
Last-Modified header so revalidation reflects the master's notion of
file age, not local fetch time. Running zddc-server --root <cache-dir>
without --upstream serves the cached files as a plain master — useful
for portable offline snapshots. A small .zddc-upstream marker is
written once on first persist for provenance.

Pipeline (GET/HEAD only — writes deferred):
- Hit → http.ServeContent serves directly (range-aware, 304-aware) +
  background revalidate (304 no-op, 200 overwrite, 403/404 purge)
- Miss → forward to upstream with the configured bearer; tee response
  body to client + tmp-file atomically renamed into the cache
- Network error + cached → serve stale + X-ZDDC-Cache: offline
- Network error + no cache → 503 + X-ZDDC-Cache: offline
- Directories always proxy live (no listing cache yet — phase 3)
- Cache-Control: no-store / private and non-200 responses bypass cache

Range requests work end-to-end (Range/If-Range headers forwarded on
miss; http.ServeContent handles them natively on hit). Hop-by-hop
headers per RFC 7230 §6.1 are dropped from forwarded responses.

New flags (also as ZDDC_* env vars), all ignored when --upstream is
empty (so master deployments are untouched):
- --upstream <url>
- --mode proxy|cache|mirror (default cache)
- --bearer-file <path> (0600 file with the master-issued token)
- --skip-tls-verify (separate from --no-auth; for self-signed dev)

Validation: --upstream must be http(s)://...; trailing / is trimmed.
Mode validated to one of the three known values. The startup
no-root-.zddc check is skipped in client mode (the cache directory
starts empty by design). The plain-HTTP-on-non-loopback check is also
skipped (the local instance never reads the email header to decide
anything; auth is forwarded to upstream as a Bearer).

Tests: zddc/internal/cache/cache_test.go runs httptest.NewServer as
the upstream and covers miss-then-hit, proxy-mode-no-persist,
directory-never-cached, HEAD-no-body, offline-with-cache,
offline-no-cache → 503, bearer forwarding, query-string preservation,
no-store bypass, path-traversal rejection, error-status forwarding,
revalidate-on-403/404/200/304, range-on-hit, concurrent-same-URL,
cache-path boundary cases. 23 new tests, full suite + go vet clean.

Live two-instance smoke verified: master at 127.0.0.1:18443, client
at :18444 with --mode cache, miss→hit→hit transitions work, file
materialises under cache root with parent dirs created, marker file
written once, range-on-hit returns 206, master sees background 304s
on every hit, killing master leaves cached files serving from disk
and never-cached files returning 503 + offline header.

Doc updates: zddc/README.md gains a "Client mode" section with the
modes table, flag reference, pipeline summary, two-instance recipe,
and explicit list of phase-2 limitations; AGENTS.md adds the four
new env vars to the reference table and a "Client mode" subsection
with smoke-test recipe and a pointer to the cache package;
ARCHITECTURE.md adds "Master + proxy/cache/mirror" before "Bearer
token issuance," covering the topology, the persist/warm switches,
the cache-IS-a-ZDDC-root invariant, the request pipeline, and the
v1-out-of-scope multi-tenancy note; CLAUDE.md's zddc/ entry
expanded to mention both deployment shapes so future agents pick it
up by default.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 07:57:14 -05:00


// Package cache implements zddc-server's client mode: a downstream
// proxy/cache/mirror that runs the same binary against a master.
// Configured via cfg.Upstream (in main.go), the cache layer replaces
// the master-side dispatcher entirely — every incoming request is
// forwarded to the master with the local instance's bearer token, and
// (in cache or mirror mode) the response body is persisted under
// cfg.Root so subsequent requests serve from disk.
//
// The cache directory layout is intentionally a normal ZDDC root: a
// file fetched from `<master>/foo/bar.txt` is stored at
// `<root>/foo/bar.txt`, with no sidecar metadata. The local file's
// mtime is set to the upstream's Last-Modified header so revalidation
// via If-Modified-Since reflects the master's notion of the file's
// age, not when the local cache happened to fetch it. Running
// `zddc-server --root <cache-dir>` without --upstream serves the
// cached files as a regular ZDDC root, which is useful for portable
// offline snapshots and sanity-check inspection.
//
// Phase 2 scope: GET/HEAD only. Range requests, stale-while-revalidate,
// and offline fallback are supported. Directory listings are always
// proxied live (no listing cache yet); writes (PUT/POST/DELETE) and
// the mirror walker land in later phases.
package cache

import (
	"crypto/tls"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"net/url"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"codeberg.org/VARASYS/ZDDC/zddc/internal/config"
)

// MarkerFile records the upstream URL and first-cached-at timestamp
// in the cache root. It prevents accidentally pointing master mode at
// a cache directory and provides provenance for ops/users.
const MarkerFile = ".zddc-upstream"

// HeaderName is the response header that surfaces cache state to the
// client (and the browser-side UI). Values: hit, revalidated, miss,
// proxy, offline.
const HeaderName = "X-ZDDC-Cache"

// Cache is the request handler installed in main.go when cfg.Upstream
// is non-empty. It is safe for concurrent ServeHTTP calls.
type Cache struct {
	root       string // local cache directory (== cfg.Root in client mode)
	upstream   string // upstream master URL, no trailing slash
	bearer     string // forwarded as Authorization: Bearer to upstream; "" disables
	mode       string // "proxy" | "cache" | "mirror"
	persist    bool   // mode != "proxy": write responses to disk
	client     *http.Client
	markerOnce sync.Once
}

// New constructs a Cache from the loaded configuration. It validates
// the upstream URL, reads the bearer file (if configured), prepares the
// HTTP client honoring SkipTLSVerify, and ensures the cache root
// exists.
func New(cfg config.Config) (*Cache, error) {
	if cfg.Upstream == "" {
		return nil, fmt.Errorf("cache.New: cfg.Upstream is empty")
	}
	upstream := strings.TrimRight(cfg.Upstream, "/")
	if _, err := url.Parse(upstream); err != nil {
		return nil, fmt.Errorf("cache.New: invalid upstream %q: %w", upstream, err)
	}

	bearer := ""
	if cfg.BearerFile != "" {
		b, err := os.ReadFile(cfg.BearerFile)
		if err != nil {
			return nil, fmt.Errorf("cache.New: read bearer file: %w", err)
		}
		bearer = strings.TrimSpace(string(b))
		if bearer == "" {
			return nil, fmt.Errorf("cache.New: bearer file %q is empty", cfg.BearerFile)
		}
	}

	transport := &http.Transport{
		MaxIdleConns:          10,
		IdleConnTimeout:       30 * time.Second,
		TLSHandshakeTimeout:   10 * time.Second,
		ResponseHeaderTimeout: 30 * time.Second,
	}
	if cfg.SkipTLSVerify {
		// G402 / CWE-295: deliberate. Documented operator opt-in for
		// dev/internal-CA scenarios; never the default.
		transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} //nolint:gosec
		slog.Warn("--skip-tls-verify enabled: upstream TLS certificates will NOT be validated")
	}

	// Clean the root so cachePathFor's prefix check also works for
	// inputs like "./cache" or "cache/": filepath.Join cleans its
	// result, so an uncleaned root would never match as a prefix.
	root := filepath.Clean(cfg.Root)
	if err := os.MkdirAll(root, 0o755); err != nil {
		return nil, fmt.Errorf("cache.New: create cache root %q: %w", root, err)
	}

	mode := cfg.Mode
	if mode == "" {
		mode = "cache"
	}

	return &Cache{
		root:     root,
		upstream: upstream,
		bearer:   bearer,
		mode:     mode,
		persist:  mode != "proxy",
		client: &http.Client{
			Transport: transport,
			Timeout:   60 * time.Second,
			// Don't follow redirects automatically: pass them through to
			// the client so the browser can update its address bar
			// (e.g. master's no-trailing-slash → trailing-slash 301).
			CheckRedirect: func(req *http.Request, via []*http.Request) error {
				return http.ErrUseLastResponse
			},
		},
	}, nil
}

// Mode returns the configured mode label for diagnostics.
func (c *Cache) Mode() string { return c.mode }

// Upstream returns the upstream master URL for diagnostics.
func (c *Cache) Upstream() string { return c.upstream }

// ServeHTTP is the cache layer's HTTP entry point. It replaces the
// master-side dispatcher in client mode.
func (c *Cache) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	// Phase 2: read-only. Writes are deferred to the outbox phase.
	// On a miss, HEAD is forwarded upstream as HEAD; on a hit,
	// http.ServeContent emits the same headers without a body.
	if r.Method != http.MethodGet && r.Method != http.MethodHead {
		w.Header().Set("Allow", "GET, HEAD")
		http.Error(w, "Method Not Allowed: writes are not yet supported in client mode", http.StatusMethodNotAllowed)
		return
	}

	// Directory listings are always proxied live in v1. The cache
	// directory's own filesystem listing would be inaccurate (it only
	// contains visited files), and full listing-cache support belongs
	// with the mirror walker in phase 3.
	if strings.HasSuffix(r.URL.Path, "/") {
		c.proxy(w, r, false /* writeToCache */)
		return
	}

	// File request: try the cache first when persisting.
	if c.persist {
		if path, ok := c.cachePathFor(r.URL.Path); ok {
			info, err := os.Stat(path)
			if err == nil && !info.IsDir() {
				c.serveFromDisk(w, r, path, info, "hit")
				// Background revalidate; never block the user response.
				go c.revalidate(r.URL.Path, info.ModTime())
				return
			}
		}
	}

	// Miss (or proxy mode): forward to upstream and, when persisting,
	// store the response on the way through.
	c.proxy(w, r, c.persist)
}

// proxy forwards the request to upstream and serves the response back
// to the client. When writeToCache is true and the response is a
// cacheable 200, the body is also persisted under cfg.Root.
func (c *Cache) proxy(w http.ResponseWriter, r *http.Request, writeToCache bool) {
	upReq, err := c.buildUpstreamRequest(r)
	if err != nil {
		http.Error(w, "Bad Request: "+err.Error(), http.StatusBadRequest)
		return
	}
	resp, err := c.client.Do(upReq)
	if err != nil {
		// Network error. If we have a cached copy, serve it stale.
		if writeToCache && r.Method == http.MethodGet {
			if path, ok := c.cachePathFor(r.URL.Path); ok {
				if info, sErr := os.Stat(path); sErr == nil && !info.IsDir() {
					c.serveFromDisk(w, r, path, info, "offline")
					return
				}
			}
		}
		slog.Warn("upstream fetch failed", "url", upReq.URL.String(), "err", err)
		w.Header().Set(HeaderName, "offline")
		http.Error(w, "Service Unavailable: upstream unreachable", http.StatusServiceUnavailable)
		return
	}
	defer resp.Body.Close()

	// Forward upstream response headers, skipping hop-by-hop headers
	// (RFC 7230 §6.1). Go's transport already drops most of them, but
	// Connection and Transfer-Encoding can sneak through and confuse
	// the client.
	for k, vv := range resp.Header {
		if isHopByHop(k) {
			continue
		}
		for _, v := range vv {
			w.Header().Add(k, v)
		}
	}

	cacheable := writeToCache && resp.StatusCode == http.StatusOK && c.responseCacheable(resp)
	if cacheable {
		w.Header().Set(HeaderName, "miss")
	} else {
		w.Header().Set(HeaderName, "proxy")
	}
	w.WriteHeader(resp.StatusCode)
	if r.Method == http.MethodHead || resp.StatusCode == http.StatusNotModified {
		return
	}
	if !cacheable {
		_, _ = io.Copy(w, resp.Body)
		return
	}

	// Stream the body to the client AND to a temp file in the cache;
	// rename atomically only on success.
	if err := c.streamAndPersist(w, resp, r.URL.Path); err != nil {
		// The error may come from the cache side (mkdir, temp file,
		// rename), in which case the client still received the body,
		// or from a broken stream, in which case the client got a
		// partial body (HTTP-normal) and the temp file was discarded.
		// Either way, just log.
		slog.Debug("stream-and-persist error", "url", r.URL.Path, "err", err)
	} else {
		c.maybeWriteMarker()
	}
}

// buildUpstreamRequest constructs the outbound request, preserving the
// path and query plus the Range, If-Range, Accept, and Accept-Encoding
// headers. It sets a fixed User-Agent and adds the bearer if configured.
func (c *Cache) buildUpstreamRequest(r *http.Request) (*http.Request, error) {
	target := c.upstream + r.URL.RequestURI()
	upReq, err := http.NewRequestWithContext(r.Context(), r.Method, target, nil)
	if err != nil {
		return nil, err
	}
	// Range/If-Range keep resumable and partial transfers working;
	// Accept/Accept-Encoding preserve the client's content negotiation.
	for _, h := range []string{"Range", "If-Range", "Accept", "Accept-Encoding"} {
		if v := r.Header.Get(h); v != "" {
			upReq.Header.Set(h, v)
		}
	}
	upReq.Header.Set("User-Agent", "zddc-server-cache/0.1")
	if c.bearer != "" {
		upReq.Header.Set("Authorization", "Bearer "+c.bearer)
	}
	return upReq, nil
}
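
// responseCacheable reports whether the response body should be
// persisted. It honors Cache-Control: no-store / private, refuses
// anything other than a full 200 (a 206 covers only part of the file;
// 204 and 3xx carry no file content), and refuses content-encoded
// bodies.
func (c *Cache) responseCacheable(resp *http.Response) bool {
	low := strings.ToLower(resp.Header.Get("Cache-Control"))
	if strings.Contains(low, "no-store") || strings.Contains(low, "private") {
		return false
	}
	// Don't cache partial-content responses: a 206 for a Range request
	// covers only part of the file, and caching that partial body would
	// corrupt subsequent non-range fetches.
	if resp.StatusCode != http.StatusOK {
		return false
	}
	// The client's Accept-Encoding is forwarded upstream, so the body
	// may arrive compressed (e.g. Content-Encoding: gzip). Storing those
	// bytes as the plain file would make later disk hits serve
	// compressed data without the matching header.
	if ce := resp.Header.Get("Content-Encoding"); ce != "" && !strings.EqualFold(ce, "identity") {
		return false
	}
	return true
}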

// streamAndPersist writes resp.Body simultaneously to the client and
// to a temp file in the cache. Renames the temp atomically on success.
// Sets the local file's mtime to upstream's Last-Modified (if
// present) so subsequent revalidations send If-Modified-Since with a
// timestamp upstream can compare against its own state.
func (c *Cache) streamAndPersist(w http.ResponseWriter, resp *http.Response, urlPath string) error {
	finalPath, ok := c.cachePathFor(urlPath)
	if !ok {
		_, err := io.Copy(w, resp.Body)
		return err
	}
	if err := os.MkdirAll(filepath.Dir(finalPath), 0o755); err != nil {
		_, copyErr := io.Copy(w, resp.Body)
		if copyErr != nil {
			return copyErr
		}
		return err
	}
	tmp, err := os.CreateTemp(filepath.Dir(finalPath), ".zddc-cache-tmp-*")
	if err != nil {
		_, copyErr := io.Copy(w, resp.Body)
		if copyErr != nil {
			return copyErr
		}
		return err
	}
	tmpName := tmp.Name()

	mw := io.MultiWriter(tmp, w)
	if _, err := io.Copy(mw, resp.Body); err != nil {
		_ = tmp.Close()
		_ = os.Remove(tmpName)
		return err
	}
	if err := tmp.Close(); err != nil {
		_ = os.Remove(tmpName)
		return err
	}
	if lm := resp.Header.Get("Last-Modified"); lm != "" {
		if t, err := http.ParseTime(lm); err == nil {
			_ = os.Chtimes(tmpName, t, t)
		}
	}
	return os.Rename(tmpName, finalPath)
}

// serveFromDisk serves a cached file via http.ServeContent, which
// handles Range requests and conditional GETs (If-Modified-Since,
// If-Range) natively. cacheState is the X-ZDDC-Cache value to surface.
func (c *Cache) serveFromDisk(w http.ResponseWriter, r *http.Request, path string, info os.FileInfo, cacheState string) {
	f, err := os.Open(path)
	if err != nil {
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}
	defer f.Close()
	w.Header().Set(HeaderName, cacheState)
	http.ServeContent(w, r, filepath.Base(path), info.ModTime(), f)
}

// revalidate fires a conditional GET against upstream after a cache
// hit. 304 = no-op (cache is fresh). 200 = update cache. 403/404 =
// purge (ACL revoked or upstream deleted). Network errors are
// swallowed; serving stale content while offline is the documented
// behavior.
func (c *Cache) revalidate(urlPath string, mtime time.Time) {
	// urlPath is the decoded path; re-escape it so names containing
	// '?', '%', or spaces reach upstream as the same resource.
	target := c.upstream + (&url.URL{Path: urlPath}).EscapedPath()
	req, err := http.NewRequest(http.MethodGet, target, nil)
	if err != nil {
		return
	}
	if !mtime.IsZero() {
		req.Header.Set("If-Modified-Since", mtime.UTC().Format(http.TimeFormat))
	}
	if c.bearer != "" {
		req.Header.Set("Authorization", "Bearer "+c.bearer)
	}
	resp, err := c.client.Do(req)
	if err != nil {
		return
	}
	defer resp.Body.Close()

	switch resp.StatusCode {
	case http.StatusNotModified:
		return
	case http.StatusOK:
		if !c.responseCacheable(resp) {
			return
		}
		if err := c.persistOnly(resp, urlPath); err != nil {
			slog.Debug("revalidate persist error", "url", urlPath, "err", err)
		}
	case http.StatusForbidden, http.StatusNotFound:
		if path, ok := c.cachePathFor(urlPath); ok {
			_ = os.Remove(path)
			slog.Info("purged cached entry after upstream 4xx", "url", urlPath, "status", resp.StatusCode)
		}
	}
}

// persistOnly writes resp.Body to the cache without forwarding it
// anywhere. Used by revalidate (the user's request was already served
// from disk; we just refresh the cache in the background).
func (c *Cache) persistOnly(resp *http.Response, urlPath string) error {
	finalPath, ok := c.cachePathFor(urlPath)
	if !ok {
		_, _ = io.Copy(io.Discard, resp.Body)
		return nil
	}
	if err := os.MkdirAll(filepath.Dir(finalPath), 0o755); err != nil {
		_, _ = io.Copy(io.Discard, resp.Body)
		return err
	}
	tmp, err := os.CreateTemp(filepath.Dir(finalPath), ".zddc-cache-tmp-*")
	if err != nil {
		_, _ = io.Copy(io.Discard, resp.Body)
		return err
	}
	tmpName := tmp.Name()

	if _, err := io.Copy(tmp, resp.Body); err != nil {
		_ = tmp.Close()
		_ = os.Remove(tmpName)
		return err
	}
	if err := tmp.Close(); err != nil {
		_ = os.Remove(tmpName)
		return err
	}
	if lm := resp.Header.Get("Last-Modified"); lm != "" {
		if t, err := http.ParseTime(lm); err == nil {
			_ = os.Chtimes(tmpName, t, t)
		}
	}
	return os.Rename(tmpName, finalPath)
}

// cachePathFor maps a URL path to a local filesystem path under the
// cache root. It returns ok=false for inputs that would escape the
// root, collide with the marker filename, or otherwise be unsafe to
// write.
func (c *Cache) cachePathFor(urlPath string) (string, bool) {
	if urlPath == "" || urlPath == "/" {
		return "", false
	}
	// Conservative: reject any path containing "..", which also refuses
	// legitimate names such as "a..b.txt".
	if strings.Contains(urlPath, "..") {
		return "", false
	}
	clean := filepath.FromSlash(strings.TrimPrefix(urlPath, "/"))
	abs := filepath.Join(c.root, clean)
	if !strings.HasPrefix(abs, c.root+string(filepath.Separator)) && abs != c.root {
		return "", false
	}
	// Don't let URLs collide with internal markers.
	if filepath.Base(abs) == MarkerFile {
		return "", false
	}
	return abs, true
}

// maybeWriteMarker writes the .zddc-upstream provenance file once,
// the first time the cache stores anything. Best-effort: an error
// here doesn't fail the request.
func (c *Cache) maybeWriteMarker() {
	c.markerOnce.Do(func() {
		marker := filepath.Join(c.root, MarkerFile)
		if _, err := os.Stat(marker); err == nil {
			return
		}
		body := fmt.Sprintf("upstream: %s\nfirst_cached: %s\nmode: %s\n",
			c.upstream, time.Now().UTC().Format(time.RFC3339), c.mode)
		_ = os.WriteFile(marker, []byte(body), 0o644)
	})
}

// isHopByHop reports whether a header name is hop-by-hop per RFC 7230
// §6.1; these must not be forwarded by a proxy.
func isHopByHop(name string) bool {
	switch http.CanonicalHeaderKey(name) {
	case "Connection",
		"Keep-Alive",
		"Proxy-Authenticate",
		"Proxy-Authorization",
		"Te",
		"Trailer",
		"Transfer-Encoding",
		"Upgrade":
		return true
	}
	return false
}