PUT / POST / DELETE in client mode now work end-to-end. Online: the
cache layer forwards to upstream and (on success) drops any cached
entry for the path so the next read fetches fresh. PUT/DELETE include
If-Unmodified-Since derived from the cached file's mtime so the master
can reject conflicting writes with 412 Precondition Failed.
When upstream is unreachable, the request is captured in the outbox
at <root>/.zddc-outbox/<id>/ — directory per queued write, mode 0700,
containing meta.json (method, RawURI, Content-Type, base mtime,
queued-at) and body.bin (request body, capped at 256 MiB). The client
gets 202 Accepted + X-ZDDC-Cache: queued and a JSON envelope.
A background replay loop started by runClient processes the queue:
- 2xx → delete entry; drop cached path so next read fetches fresh
- 412 → rename to <id>.conflict-<RFC3339>/ for manual reconciliation
(body + meta intact for inspection or re-submit)
- 4xx other → drop (retry won't help; logged at WARN)
- 5xx / transport error → leave for next pass
Replay schedule: eager at startup, then every 30s while entries are
pending, falling back to every 5min while idle. The loop honors the
graceful-shutdown context.
Disabled in --mode=proxy (proxy persists nothing by design — offline
writes return 503 instead of queueing).
Outbox IDs are <unix-nano-base16>-<hex-random> so lex-sort = queue
order; concurrent enqueues never collide. Conflict-rename appends a
4-char random suffix on the unlikely same-second collision.
The local cache is intentionally not updated for offline writes:
until upstream confirms the write, the user's reads still see the
upstream-cached version (or 503 if uncached). Trade-off: no "did my queued write
actually win?" ambiguity, at the cost of not seeing one's own
offline edits immediately. Phase 5 will surface .conflict-<ts>/
directories in browse views.
Tests (20 new in outbox_test.go, 5 new in cache_test.go covering
the write path): NewOutbox creates 0700 dir, Enqueue persists meta
+ body, Pending returns lex-sorted entries excluding conflicts,
Replay deletes on 2xx / renames on 412 / leaves on transport error
/ leaves on 5xx / drops on 4xx-other, IUS sent only for PUT/DELETE
with base mtime, query string preserved, ServeHTTP online write
forwards + evicts cache, ServeHTTP offline write queues with 202,
ServeHTTP offline + no outbox returns 503, ServeHTTP PUT sends IUS
from cached mtime, oversize body rejected, IDs lex-sortable,
RunReplayLoop stops on context cancel, concurrent Enqueue 30×
no collisions. Full suite + go vet clean.
Doc updates: zddc/README.md gains a "Writes (online + offline
outbox)" subsection covering both paths and replay outcomes;
"What client mode is NOT, yet" now lists only conflict UI and
multi-tenancy. AGENTS.md client-mode pipeline gains writes +
mirror-mode bullets. ARCHITECTURE.md adds a "Writes: outbox +
offline replay" subsection with the trade-off rationale and the
phase-5-deferred conflict UI hand-off.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
897 lines
29 KiB
Go
897 lines
29 KiB
Go
// Package cache implements zddc-server's client mode: a downstream
|
|
// proxy/cache/mirror that runs the same binary against a master.
|
|
// Configured via cfg.Upstream (in main.go), the cache layer replaces
|
|
// the master-side dispatcher entirely — every incoming request is
|
|
// forwarded to the master with the local instance's bearer token, and
|
|
// (in cache or mirror mode) the response body is persisted under
|
|
// cfg.Root so subsequent requests serve from disk.
|
|
//
|
|
// The cache directory layout is intentionally a normal ZDDC root: a
|
|
// file fetched from `<master>/foo/bar.txt` is stored at `<root>/foo/
|
|
// bar.txt`. No sidecar metadata. The local file's mtime is set to the
|
|
// upstream's Last-Modified header so revalidation via
|
|
// If-Modified-Since reflects the master's notion of the file's age,
|
|
// not when the local cache happened to fetch it. Running
|
|
// `zddc-server --root <cache-dir>` without --upstream serves the
|
|
// cached files as a regular ZDDC — useful for portable offline
|
|
// snapshots and sanity-check inspection.
|
|
//
|
|
// Scope: GET/HEAD are served through the cache; Range requests, stale-
// while-revalidate, and offline fallback are supported. Directory
// listings are cached as per-format sidecar files inside the cached
// directory. Writes (PUT / POST / DELETE) are forwarded to upstream
// and, when upstream is unreachable and an outbox is configured,
// queued for later replay.
|
|
package cache
|
|
|
|
import (
|
|
"crypto/tls"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"log/slog"
|
|
"net/http"
|
|
"net/url"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"codeberg.org/VARASYS/ZDDC/zddc/internal/config"
|
|
)
|
|
|
|
// Well-known names used by the cache layer.
const (
	// MarkerFile is the basename of the provenance file kept in the
	// cache root; it records the upstream URL and the first-cached-at
	// timestamp. It guards against accidentally pointing master mode
	// at a cache directory and gives ops/users provenance.
	MarkerFile = ".zddc-upstream"

	// HeaderName is the response header through which the cache state
	// is surfaced to the client (and the browser-side UI). Values:
	// hit, revalidated, miss, proxy, offline, queued.
	HeaderName = "X-ZDDC-Cache"

	// listingCachePrefix is the basename prefix of directory-listing
	// sidecar files; the format ("html" or "json") is appended as the
	// suffix. It is dot-prefixed so that serving the cache root as a
	// plain master hides the sidecars via the dispatch dot-prefix guard.
	listingCachePrefix = ".zddc-listing."
)
|
|
|
|
// Cache is the request handler installed in main.go when cfg.Upstream
|
|
// is non-empty. It is safe for concurrent ServeHTTP calls.
|
|
type Cache struct {
|
|
root string // local cache directory (== cfg.Root in client mode)
|
|
upstream string // upstream master URL, no trailing slash
|
|
bearer string // forwarded as Authorization: Bearer to upstream; "" disables
|
|
mode string // "proxy" | "cache" | "mirror"
|
|
persist bool // mode != "proxy" — write responses to disk
|
|
client *http.Client
|
|
|
|
markerOnce sync.Once
|
|
|
|
// onAccess is invoked (when non-nil) after a request is dispatched.
|
|
// The walker scheduler installs this hook to kick mirror walks based
|
|
// on incoming traffic. Always called in a goroutine — must not
|
|
// assume it runs before the response completes. Nil in proxy/cache
|
|
// modes; set in mirror mode.
|
|
onAccess func(urlPath string)
|
|
|
|
// outbox holds the offline write queue. Set by main.go after
|
|
// construction (avoids a circular dep at New time, since Outbox
|
|
// needs a *Cache reference). Nil = writes when offline get 503.
|
|
outbox *Outbox
|
|
}
|
|
|
|
// SetOutbox installs the offline-write queue. Called once by main.go
|
|
// after both Cache and Outbox are built. nil disables outbox-backed
|
|
// offline writes.
|
|
func (c *Cache) SetOutbox(o *Outbox) { c.outbox = o }
|
|
|
|
// New constructs a Cache from the loaded configuration. Validates
|
|
// upstream URL, reads the bearer-file (if configured), prepares the
|
|
// HTTP client honoring SkipTLSVerify, and ensures the cache root
|
|
// exists.
|
|
func New(cfg config.Config) (*Cache, error) {
|
|
if cfg.Upstream == "" {
|
|
return nil, fmt.Errorf("cache.New: cfg.Upstream is empty")
|
|
}
|
|
upstream := strings.TrimRight(cfg.Upstream, "/")
|
|
if _, err := url.Parse(upstream); err != nil {
|
|
return nil, fmt.Errorf("cache.New: invalid upstream %q: %w", upstream, err)
|
|
}
|
|
|
|
bearer := ""
|
|
if cfg.BearerFile != "" {
|
|
b, err := os.ReadFile(cfg.BearerFile)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cache.New: read bearer file: %w", err)
|
|
}
|
|
bearer = strings.TrimSpace(string(b))
|
|
if bearer == "" {
|
|
return nil, fmt.Errorf("cache.New: bearer file %q is empty", cfg.BearerFile)
|
|
}
|
|
}
|
|
|
|
transport := &http.Transport{
|
|
MaxIdleConns: 10,
|
|
IdleConnTimeout: 30 * time.Second,
|
|
TLSHandshakeTimeout: 10 * time.Second,
|
|
ResponseHeaderTimeout: 30 * time.Second,
|
|
}
|
|
if cfg.SkipTLSVerify {
|
|
// G402 / CWE-295: deliberate. Documented operator opt-in for
|
|
// dev/internal-CA scenarios; never the default.
|
|
transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} //nolint:gosec
|
|
slog.Warn("--skip-tls-verify enabled: upstream TLS certificates will NOT be validated")
|
|
}
|
|
|
|
if err := os.MkdirAll(cfg.Root, 0o755); err != nil {
|
|
return nil, fmt.Errorf("cache.New: create cache root %q: %w", cfg.Root, err)
|
|
}
|
|
|
|
mode := cfg.Mode
|
|
if mode == "" {
|
|
mode = "cache"
|
|
}
|
|
|
|
return &Cache{
|
|
root: cfg.Root,
|
|
upstream: upstream,
|
|
bearer: bearer,
|
|
mode: mode,
|
|
persist: mode != "proxy",
|
|
client: &http.Client{
|
|
Transport: transport,
|
|
Timeout: 60 * time.Second,
|
|
// Don't follow redirects automatically — pass them through to
|
|
// the client so the browser can update its address bar
|
|
// (e.g. master's no-trailing-slash → trailing-slash 301).
|
|
CheckRedirect: func(req *http.Request, via []*http.Request) error {
|
|
return http.ErrUseLastResponse
|
|
},
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
// Mode returns the configured mode label for diagnostics.
|
|
func (c *Cache) Mode() string { return c.mode }
|
|
|
|
// Upstream returns the upstream master URL for diagnostics.
|
|
func (c *Cache) Upstream() string { return c.upstream }
|
|
|
|
// ServeHTTP is the cache layer's HTTP entry point. Replaces the
|
|
// master-side dispatcher in client mode.
|
|
func (c *Cache) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|
// Writes (PUT / POST / DELETE) flow through handleWrite — try
|
|
// upstream live; on transport error, queue in the outbox for
|
|
// replay. Phase 4 supports PUT/POST/DELETE; OPTIONS/PATCH still
|
|
// 405 since the master doesn't accept them anyway.
|
|
switch r.Method {
|
|
case http.MethodPut, http.MethodPost, http.MethodDelete:
|
|
c.handleWrite(w, r)
|
|
return
|
|
case http.MethodGet, http.MethodHead:
|
|
// fall through to read pipeline
|
|
default:
|
|
w.Header().Set("Allow", "GET, HEAD, PUT, POST, DELETE")
|
|
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
|
|
// Mirror walker hook: notify the scheduler that this URL was
|
|
// touched so it can decide whether to kick a subtree walk. Runs
|
|
// after we've started serving — the user's request never blocks
|
|
// on walk scheduling.
|
|
if c.onAccess != nil {
|
|
go c.onAccess(r.URL.Path)
|
|
}
|
|
|
|
// Directory listings: try sidecar listing-cache, fall back to
|
|
// proxy. The Accept header determines which sidecar (HTML vs
|
|
// JSON) to consult — the master serves both formats and the
|
|
// browser asks for HTML for navigation, JSON for the inline-JS
|
|
// listing fetcher.
|
|
if strings.HasSuffix(r.URL.Path, "/") {
|
|
c.serveDirectory(w, r)
|
|
return
|
|
}
|
|
|
|
// File request — try cache first when persisting.
|
|
if c.persist {
|
|
if path, ok := c.cachePathFor(r.URL.Path); ok {
|
|
info, err := os.Stat(path)
|
|
if err == nil && !info.IsDir() {
|
|
c.serveFromDisk(w, r, path, info, "hit")
|
|
// Background revalidate; never block the user response.
|
|
go c.revalidate(r.URL.Path, info.ModTime())
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Miss (or proxy mode) → forward to upstream and (optionally)
|
|
// persist on the way through.
|
|
c.proxy(w, r, c.persist)
|
|
}
|
|
|
|
// serveDirectory handles directory requests. Tries the sidecar
|
|
// listing cache first (selected by Accept), revalidates in
|
|
// background on a hit, falls back to upstream proxy on miss.
|
|
func (c *Cache) serveDirectory(w http.ResponseWriter, r *http.Request) {
|
|
if c.persist {
|
|
if path, ok := c.listingCachePathFor(r.URL.Path, r.Header.Get("Accept")); ok {
|
|
info, err := os.Stat(path)
|
|
if err == nil && !info.IsDir() {
|
|
c.serveListingFromDisk(w, r, path, info, "hit")
|
|
go c.revalidateListing(r.URL.Path, r.Header.Get("Accept"), info.ModTime())
|
|
return
|
|
}
|
|
}
|
|
}
|
|
c.proxyDirectory(w, r, c.persist)
|
|
}
|
|
|
|
// proxy forwards the request to upstream and serves the response back
|
|
// to the client. When writeToCache is true and the response is a
|
|
// cacheable 200, the body is also persisted under cfg.Root.
|
|
func (c *Cache) proxy(w http.ResponseWriter, r *http.Request, writeToCache bool) {
|
|
upReq, err := c.buildUpstreamRequest(r)
|
|
if err != nil {
|
|
http.Error(w, "Bad Request: "+err.Error(), http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
resp, err := c.client.Do(upReq)
|
|
if err != nil {
|
|
// Network error. If we have a cached copy, serve it stale.
|
|
if writeToCache && r.Method == http.MethodGet {
|
|
if path, ok := c.cachePathFor(r.URL.Path); ok {
|
|
if info, sErr := os.Stat(path); sErr == nil && !info.IsDir() {
|
|
c.serveFromDisk(w, r, path, info, "offline")
|
|
return
|
|
}
|
|
}
|
|
}
|
|
slog.Warn("upstream fetch failed", "url", upReq.URL.String(), "err", err)
|
|
w.Header().Set(HeaderName, "offline")
|
|
http.Error(w, "Service Unavailable: upstream unreachable", http.StatusServiceUnavailable)
|
|
return
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
// Forward upstream response headers. Skip hop-by-hop headers (RFC
|
|
// 7230 §6.1) — Go's transport already drops most, but Connection
|
|
// and Transfer-Encoding can sneak through and confuse the client.
|
|
for k, vv := range resp.Header {
|
|
if isHopByHop(k) {
|
|
continue
|
|
}
|
|
for _, v := range vv {
|
|
w.Header().Add(k, v)
|
|
}
|
|
}
|
|
|
|
cacheable := writeToCache && resp.StatusCode == http.StatusOK && c.responseCacheable(resp)
|
|
if cacheable {
|
|
w.Header().Set(HeaderName, "miss")
|
|
} else if writeToCache {
|
|
w.Header().Set(HeaderName, "proxy")
|
|
} else {
|
|
w.Header().Set(HeaderName, "proxy")
|
|
}
|
|
|
|
w.WriteHeader(resp.StatusCode)
|
|
|
|
if r.Method == http.MethodHead || resp.StatusCode == http.StatusNotModified {
|
|
return
|
|
}
|
|
|
|
if !cacheable {
|
|
_, _ = io.Copy(w, resp.Body)
|
|
return
|
|
}
|
|
|
|
// Stream body to client AND to a tmp file in the cache; rename
|
|
// atomically only on success.
|
|
if err := c.streamAndPersist(w, resp, r.URL.Path); err != nil {
|
|
// Mid-stream error: the client got a partial body (HTTP-normal),
|
|
// and we already abandoned the cache write. Just log.
|
|
slog.Debug("stream-and-persist error", "url", r.URL.Path, "err", err)
|
|
} else {
|
|
c.maybeWriteMarker()
|
|
}
|
|
}
|
|
|
|
// buildUpstreamRequest constructs the outbound request preserving the
|
|
// path, query, Range, and Accept headers. Adds the bearer if configured.
|
|
func (c *Cache) buildUpstreamRequest(r *http.Request) (*http.Request, error) {
|
|
target := c.upstream + r.URL.RequestURI()
|
|
upReq, err := http.NewRequestWithContext(r.Context(), r.Method, target, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// Preserve the Range header for resumable / partial transfers.
|
|
if v := r.Header.Get("Range"); v != "" {
|
|
upReq.Header.Set("Range", v)
|
|
}
|
|
if v := r.Header.Get("If-Range"); v != "" {
|
|
upReq.Header.Set("If-Range", v)
|
|
}
|
|
if v := r.Header.Get("Accept"); v != "" {
|
|
upReq.Header.Set("Accept", v)
|
|
}
|
|
if v := r.Header.Get("Accept-Encoding"); v != "" {
|
|
upReq.Header.Set("Accept-Encoding", v)
|
|
}
|
|
upReq.Header.Set("User-Agent", "zddc-server-cache/0.1")
|
|
if c.bearer != "" {
|
|
upReq.Header.Set("Authorization", "Bearer "+c.bearer)
|
|
}
|
|
return upReq, nil
|
|
}
|
|
|
|
// responseCacheable reports whether the response body should be
|
|
// persisted. Honors Cache-Control: no-store / private and refuses to
|
|
// cache responses without a content body (ranges, 204, etc.).
|
|
func (c *Cache) responseCacheable(resp *http.Response) bool {
|
|
cc := resp.Header.Get("Cache-Control")
|
|
low := strings.ToLower(cc)
|
|
if strings.Contains(low, "no-store") || strings.Contains(low, "private") {
|
|
return false
|
|
}
|
|
// Don't cache partial-content responses — the server returned 206
|
|
// for a Range request, which means the body covers only part of
|
|
// the file. Caching that partial body would corrupt subsequent
|
|
// non-range fetches.
|
|
if resp.StatusCode != http.StatusOK {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
// streamAndPersist writes resp.Body simultaneously to the client and
|
|
// to a temp file in the cache. Renames the temp atomically on success.
|
|
// Sets the local file's mtime to upstream's Last-Modified (if
|
|
// present) so subsequent revalidations send If-Modified-Since with a
|
|
// timestamp upstream can compare against its own state.
|
|
func (c *Cache) streamAndPersist(w http.ResponseWriter, resp *http.Response, urlPath string) error {
|
|
finalPath, ok := c.cachePathFor(urlPath)
|
|
if !ok {
|
|
_, err := io.Copy(w, resp.Body)
|
|
return err
|
|
}
|
|
if err := os.MkdirAll(filepath.Dir(finalPath), 0o755); err != nil {
|
|
_, copyErr := io.Copy(w, resp.Body)
|
|
if copyErr != nil {
|
|
return copyErr
|
|
}
|
|
return err
|
|
}
|
|
tmp, err := os.CreateTemp(filepath.Dir(finalPath), ".zddc-cache-tmp-*")
|
|
if err != nil {
|
|
_, copyErr := io.Copy(w, resp.Body)
|
|
if copyErr != nil {
|
|
return copyErr
|
|
}
|
|
return err
|
|
}
|
|
tmpName := tmp.Name()
|
|
mw := io.MultiWriter(tmp, w)
|
|
if _, err := io.Copy(mw, resp.Body); err != nil {
|
|
_ = tmp.Close()
|
|
_ = os.Remove(tmpName)
|
|
return err
|
|
}
|
|
if err := tmp.Close(); err != nil {
|
|
_ = os.Remove(tmpName)
|
|
return err
|
|
}
|
|
if lm := resp.Header.Get("Last-Modified"); lm != "" {
|
|
if t, err := http.ParseTime(lm); err == nil {
|
|
_ = os.Chtimes(tmpName, t, t)
|
|
}
|
|
}
|
|
return os.Rename(tmpName, finalPath)
|
|
}
|
|
|
|
// serveFromDisk serves a cached file via http.ServeContent (which
|
|
// handles Range requests, If-Modified-Since, and conditional GETs
|
|
// natively). cacheState is the X-ZDDC-Cache value to surface.
|
|
func (c *Cache) serveFromDisk(w http.ResponseWriter, r *http.Request, path string, info os.FileInfo, cacheState string) {
|
|
f, err := os.Open(path)
|
|
if err != nil {
|
|
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
defer f.Close()
|
|
w.Header().Set(HeaderName, cacheState)
|
|
http.ServeContent(w, r, filepath.Base(path), info.ModTime(), f)
|
|
}
|
|
|
|
// revalidate fires a conditional GET against upstream after a cache
|
|
// hit. 304 = no-op (cache is fresh). 200 = update cache. 403/404 =
|
|
// purge (ACL revoked or upstream deleted). Network errors are
|
|
// swallowed — staleness via offline is the documented behavior.
|
|
func (c *Cache) revalidate(urlPath string, mtime time.Time) {
|
|
target := c.upstream + urlPath
|
|
req, err := http.NewRequest(http.MethodGet, target, nil)
|
|
if err != nil {
|
|
return
|
|
}
|
|
if !mtime.IsZero() {
|
|
req.Header.Set("If-Modified-Since", mtime.UTC().Format(http.TimeFormat))
|
|
}
|
|
if c.bearer != "" {
|
|
req.Header.Set("Authorization", "Bearer "+c.bearer)
|
|
}
|
|
resp, err := c.client.Do(req)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer resp.Body.Close()
|
|
switch resp.StatusCode {
|
|
case http.StatusNotModified:
|
|
return
|
|
case http.StatusOK:
|
|
if !c.responseCacheable(resp) {
|
|
return
|
|
}
|
|
if err := c.persistOnly(resp, urlPath); err != nil {
|
|
slog.Debug("revalidate persist error", "url", urlPath, "err", err)
|
|
}
|
|
case http.StatusForbidden, http.StatusNotFound:
|
|
if path, ok := c.cachePathFor(urlPath); ok {
|
|
_ = os.Remove(path)
|
|
slog.Info("purged cached entry after upstream 4xx", "url", urlPath, "status", resp.StatusCode)
|
|
}
|
|
}
|
|
}
|
|
|
|
// persistOnly writes resp.Body to the cache without forwarding it
|
|
// anywhere. Used by revalidate (the user's request was already served
|
|
// from disk; we just refresh the cache in the background).
|
|
func (c *Cache) persistOnly(resp *http.Response, urlPath string) error {
|
|
finalPath, ok := c.cachePathFor(urlPath)
|
|
if !ok {
|
|
_, _ = io.Copy(io.Discard, resp.Body)
|
|
return nil
|
|
}
|
|
if err := os.MkdirAll(filepath.Dir(finalPath), 0o755); err != nil {
|
|
_, _ = io.Copy(io.Discard, resp.Body)
|
|
return err
|
|
}
|
|
tmp, err := os.CreateTemp(filepath.Dir(finalPath), ".zddc-cache-tmp-*")
|
|
if err != nil {
|
|
_, _ = io.Copy(io.Discard, resp.Body)
|
|
return err
|
|
}
|
|
tmpName := tmp.Name()
|
|
if _, err := io.Copy(tmp, resp.Body); err != nil {
|
|
_ = tmp.Close()
|
|
_ = os.Remove(tmpName)
|
|
return err
|
|
}
|
|
if err := tmp.Close(); err != nil {
|
|
_ = os.Remove(tmpName)
|
|
return err
|
|
}
|
|
if lm := resp.Header.Get("Last-Modified"); lm != "" {
|
|
if t, err := http.ParseTime(lm); err == nil {
|
|
_ = os.Chtimes(tmpName, t, t)
|
|
}
|
|
}
|
|
return os.Rename(tmpName, finalPath)
|
|
}
|
|
|
|
// cachePathForURI is cachePathFor with the query string stripped —
|
|
// the cache is keyed by path only. Used by the outbox to map a
|
|
// queued write back to its cached file.
|
|
func (c *Cache) cachePathForURI(rawURI string) (string, bool) {
|
|
if i := strings.Index(rawURI, "?"); i >= 0 {
|
|
rawURI = rawURI[:i]
|
|
}
|
|
return c.cachePathFor(rawURI)
|
|
}
|
|
|
|
// cachePathFor maps a URL path to a local filesystem path under the
|
|
// cache root. Returns ok=false on inputs that would escape the root,
|
|
// reserve a marker filename, or otherwise be unsafe to write.
|
|
func (c *Cache) cachePathFor(urlPath string) (string, bool) {
|
|
if urlPath == "" || urlPath == "/" {
|
|
return "", false
|
|
}
|
|
if strings.Contains(urlPath, "..") {
|
|
return "", false
|
|
}
|
|
clean := filepath.FromSlash(strings.TrimPrefix(urlPath, "/"))
|
|
abs := filepath.Join(c.root, clean)
|
|
if !strings.HasPrefix(abs, c.root+string(filepath.Separator)) && abs != c.root {
|
|
return "", false
|
|
}
|
|
// Don't let URLs collide with internal markers.
|
|
if filepath.Base(abs) == MarkerFile {
|
|
return "", false
|
|
}
|
|
return abs, true
|
|
}
|
|
|
|
// maybeWriteMarker writes the .zddc-upstream provenance file once,
|
|
// the first time the cache stores anything. Best-effort: an error
|
|
// here doesn't fail the request.
|
|
func (c *Cache) maybeWriteMarker() {
|
|
c.markerOnce.Do(func() {
|
|
marker := filepath.Join(c.root, MarkerFile)
|
|
if _, err := os.Stat(marker); err == nil {
|
|
return
|
|
}
|
|
body := fmt.Sprintf("upstream: %s\nfirst_cached: %s\nmode: %s\n",
|
|
c.upstream, time.Now().UTC().Format(time.RFC3339), c.mode)
|
|
_ = os.WriteFile(marker, []byte(body), 0o644)
|
|
})
|
|
}
|
|
|
|
// handleWrite proxies a write request (PUT / POST / DELETE) to
|
|
// upstream. On a transport error, the request is captured in the
|
|
// outbox (if configured) and the client gets 202 Accepted with a
|
|
// JSON envelope describing the queued entry. Online HTTP errors
|
|
// (4xx/5xx from the master) are forwarded verbatim.
|
|
//
|
|
// PUT/DELETE include an If-Unmodified-Since precondition derived
|
|
// from the local cache file's mtime so the master can reject the
|
|
// write if its file changed since we observed it. POST never sends
|
|
// a precondition (POST semantics are application-defined).
|
|
func (c *Cache) handleWrite(w http.ResponseWriter, r *http.Request) {
|
|
// Capture base mtime for PUT/DELETE so replay can use the same
|
|
// precondition we'd send live.
|
|
var baseModTime time.Time
|
|
if r.Method == http.MethodPut || r.Method == http.MethodDelete {
|
|
if path, ok := c.cachePathFor(r.URL.Path); ok {
|
|
if info, err := os.Stat(path); err == nil && !info.IsDir() {
|
|
baseModTime = info.ModTime()
|
|
}
|
|
}
|
|
}
|
|
|
|
// We may need to send the body to upstream AND then queue it on
|
|
// network failure. Buffer up to MaxOutboxBodyBytes so we can
|
|
// rewind. Larger requests stream straight through (no offline
|
|
// recovery for those; the user just gets 503).
|
|
bodyBytes, bodyErr := readBoundedBody(r)
|
|
if bodyErr != nil {
|
|
http.Error(w, "Bad Request: "+bodyErr.Error(), http.StatusRequestEntityTooLarge)
|
|
return
|
|
}
|
|
|
|
upReq, err := c.buildUpstreamWriteRequest(r, bodyBytes, baseModTime)
|
|
if err != nil {
|
|
http.Error(w, "Bad Request: "+err.Error(), http.StatusBadRequest)
|
|
return
|
|
}
|
|
resp, err := c.client.Do(upReq)
|
|
if err != nil {
|
|
// Transport error → queue if outbox is available.
|
|
if c.outbox != nil {
|
|
r.Body = io.NopCloser(bytesReader(bodyBytes))
|
|
entry, qErr := c.outbox.Enqueue(r, baseModTime)
|
|
if qErr != nil {
|
|
slog.Warn("outbox enqueue failed", "err", qErr)
|
|
w.Header().Set(HeaderName, "offline")
|
|
http.Error(w, "Service Unavailable: upstream unreachable and outbox unavailable", http.StatusServiceUnavailable)
|
|
return
|
|
}
|
|
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
|
w.Header().Set(HeaderName, "queued")
|
|
w.WriteHeader(http.StatusAccepted)
|
|
_ = json.NewEncoder(w).Encode(map[string]any{
|
|
"queued": true,
|
|
"outbox_id": entry.ID,
|
|
"method": entry.Method,
|
|
"uri": entry.RawURI,
|
|
"queued_at": entry.QueuedAt,
|
|
"base_mtime": entry.BaseModTime,
|
|
})
|
|
return
|
|
}
|
|
w.Header().Set(HeaderName, "offline")
|
|
http.Error(w, "Service Unavailable: upstream unreachable", http.StatusServiceUnavailable)
|
|
return
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
// Forward upstream's response verbatim.
|
|
for k, vv := range resp.Header {
|
|
if isHopByHop(k) {
|
|
continue
|
|
}
|
|
for _, v := range vv {
|
|
w.Header().Add(k, v)
|
|
}
|
|
}
|
|
w.Header().Set(HeaderName, "proxy")
|
|
w.WriteHeader(resp.StatusCode)
|
|
_, _ = io.Copy(w, resp.Body)
|
|
|
|
// On a successful write, drop the cached entry so the next read
|
|
// fetches fresh upstream content (which now includes the user's
|
|
// change). For PUT we could be smarter (write the new body
|
|
// directly to cache) but eviction is simplest.
|
|
if c.persist && resp.StatusCode >= 200 && resp.StatusCode < 300 {
|
|
if r.Method == http.MethodPut || r.Method == http.MethodDelete {
|
|
if path, ok := c.cachePathFor(r.URL.Path); ok {
|
|
_ = os.Remove(path)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// buildUpstreamWriteRequest constructs the outbound write request
|
|
// with the buffered body, content-type passthrough, bearer, and
|
|
// (for PUT/DELETE with a base mtime) an If-Unmodified-Since
|
|
// precondition.
|
|
func (c *Cache) buildUpstreamWriteRequest(r *http.Request, body []byte, baseModTime time.Time) (*http.Request, error) {
|
|
target := c.upstream + r.URL.RequestURI()
|
|
upReq, err := http.NewRequestWithContext(r.Context(), r.Method, target, bytesReader(body))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if ct := r.Header.Get("Content-Type"); ct != "" {
|
|
upReq.Header.Set("Content-Type", ct)
|
|
}
|
|
if !baseModTime.IsZero() && (r.Method == http.MethodPut || r.Method == http.MethodDelete) {
|
|
upReq.Header.Set("If-Unmodified-Since", baseModTime.UTC().Format(http.TimeFormat))
|
|
}
|
|
if c.bearer != "" {
|
|
upReq.Header.Set("Authorization", "Bearer "+c.bearer)
|
|
}
|
|
upReq.ContentLength = int64(len(body))
|
|
return upReq, nil
|
|
}
|
|
|
|
// readBoundedBody slurps the request body up to MaxOutboxBodyBytes.
|
|
// Errors out (413) if the body exceeds the cap so we never partial-
|
|
// queue. Returns nil for empty bodies.
|
|
func readBoundedBody(r *http.Request) ([]byte, error) {
|
|
if r.Body == nil {
|
|
return nil, nil
|
|
}
|
|
defer r.Body.Close()
|
|
limited := http.MaxBytesReader(nil, r.Body, MaxOutboxBodyBytes)
|
|
return io.ReadAll(limited)
|
|
}
|
|
|
|
// bytesReader wraps b in a minimal sequential reader, sparing this
// file an import of package bytes.
func bytesReader(b []byte) *bytesReaderType {
	return &bytesReaderType{r: b}
}

// bytesReaderType is a forward-only io.Reader over an in-memory slice.
type bytesReaderType struct {
	r []byte // underlying data
	i int    // offset of the next unread byte
}

// Read copies the next unread bytes into p and advances the offset. It
// returns io.EOF once all data has been consumed.
func (b *bytesReaderType) Read(p []byte) (int, error) {
	if len(b.r)-b.i <= 0 {
		return 0, io.EOF
	}
	copied := copy(p, b.r[b.i:])
	b.i += copied
	return copied, nil
}
|
|
|
|
// listingFormat reduces an Accept header to the listing format it
// selects: "json" when the header mentions application/json
// (case-insensitively), "html" for everything else. This mirrors the
// master's content negotiation.
func listingFormat(accept string) string {
	format := "html"
	if strings.Contains(strings.ToLower(accept), "application/json") {
		format = "json"
	}
	return format
}
|
|
|
|
// listingCachePathFor returns the on-disk sidecar path for a cached
|
|
// directory listing. urlPath is the directory URL (always ending in
|
|
// "/"); accept is the request's Accept header.
|
|
func (c *Cache) listingCachePathFor(urlPath, accept string) (string, bool) {
|
|
if !strings.HasSuffix(urlPath, "/") {
|
|
return "", false
|
|
}
|
|
if strings.Contains(urlPath, "..") {
|
|
return "", false
|
|
}
|
|
rel := filepath.FromSlash(strings.Trim(urlPath, "/"))
|
|
dir := filepath.Join(c.root, rel)
|
|
if !strings.HasPrefix(dir, c.root) {
|
|
return "", false
|
|
}
|
|
return filepath.Join(dir, listingCachePrefix+listingFormat(accept)), true
|
|
}
|
|
|
|
// serveListingFromDisk serves a cached directory listing. Like
|
|
// serveFromDisk but always sets Content-Type from the format suffix
|
|
// (the master's headers may have been gzip-stripped or otherwise
|
|
// massaged by the time we cached them).
|
|
func (c *Cache) serveListingFromDisk(w http.ResponseWriter, r *http.Request, path string, info os.FileInfo, cacheState string) {
|
|
f, err := os.Open(path)
|
|
if err != nil {
|
|
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
defer f.Close()
|
|
if strings.HasSuffix(path, ".json") {
|
|
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
|
} else {
|
|
w.Header().Set("Content-Type", "text/html; charset=utf-8")
|
|
}
|
|
w.Header().Set(HeaderName, cacheState)
|
|
http.ServeContent(w, r, filepath.Base(path), info.ModTime(), f)
|
|
}
|
|
|
|
// proxyDirectory is proxy() with directory-specific persistence: the
|
|
// listing sidecar (selected by Accept) is the cache target instead of
|
|
// the literal URL path. Falls back to serving stale on network error.
|
|
func (c *Cache) proxyDirectory(w http.ResponseWriter, r *http.Request, writeToCache bool) {
|
|
upReq, err := c.buildUpstreamRequest(r)
|
|
if err != nil {
|
|
http.Error(w, "Bad Request: "+err.Error(), http.StatusBadRequest)
|
|
return
|
|
}
|
|
resp, err := c.client.Do(upReq)
|
|
if err != nil {
|
|
if writeToCache && r.Method == http.MethodGet {
|
|
if path, ok := c.listingCachePathFor(r.URL.Path, r.Header.Get("Accept")); ok {
|
|
if info, sErr := os.Stat(path); sErr == nil && !info.IsDir() {
|
|
c.serveListingFromDisk(w, r, path, info, "offline")
|
|
return
|
|
}
|
|
}
|
|
}
|
|
slog.Warn("upstream directory fetch failed", "url", upReq.URL.String(), "err", err)
|
|
w.Header().Set(HeaderName, "offline")
|
|
http.Error(w, "Service Unavailable: upstream unreachable", http.StatusServiceUnavailable)
|
|
return
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
for k, vv := range resp.Header {
|
|
if isHopByHop(k) {
|
|
continue
|
|
}
|
|
for _, v := range vv {
|
|
w.Header().Add(k, v)
|
|
}
|
|
}
|
|
cacheable := writeToCache && resp.StatusCode == http.StatusOK && c.responseCacheable(resp)
|
|
if cacheable {
|
|
w.Header().Set(HeaderName, "miss")
|
|
} else {
|
|
w.Header().Set(HeaderName, "proxy")
|
|
}
|
|
w.WriteHeader(resp.StatusCode)
|
|
if r.Method == http.MethodHead || resp.StatusCode == http.StatusNotModified {
|
|
return
|
|
}
|
|
if !cacheable {
|
|
_, _ = io.Copy(w, resp.Body)
|
|
return
|
|
}
|
|
if err := c.streamAndPersistListing(w, resp, r.URL.Path, r.Header.Get("Accept")); err != nil {
|
|
slog.Debug("listing stream-and-persist error", "url", r.URL.Path, "err", err)
|
|
} else {
|
|
c.maybeWriteMarker()
|
|
}
|
|
}
|
|
|
|
// streamAndPersistListing is streamAndPersist for directory listings,
|
|
// resolving the cache target via listingCachePathFor (Accept-aware).
|
|
func (c *Cache) streamAndPersistListing(w http.ResponseWriter, resp *http.Response, urlPath, accept string) error {
|
|
finalPath, ok := c.listingCachePathFor(urlPath, accept)
|
|
if !ok {
|
|
_, err := io.Copy(w, resp.Body)
|
|
return err
|
|
}
|
|
if err := os.MkdirAll(filepath.Dir(finalPath), 0o755); err != nil {
|
|
_, copyErr := io.Copy(w, resp.Body)
|
|
if copyErr != nil {
|
|
return copyErr
|
|
}
|
|
return err
|
|
}
|
|
tmp, err := os.CreateTemp(filepath.Dir(finalPath), ".zddc-cache-tmp-*")
|
|
if err != nil {
|
|
_, copyErr := io.Copy(w, resp.Body)
|
|
if copyErr != nil {
|
|
return copyErr
|
|
}
|
|
return err
|
|
}
|
|
tmpName := tmp.Name()
|
|
mw := io.MultiWriter(tmp, w)
|
|
if _, err := io.Copy(mw, resp.Body); err != nil {
|
|
_ = tmp.Close()
|
|
_ = os.Remove(tmpName)
|
|
return err
|
|
}
|
|
if err := tmp.Close(); err != nil {
|
|
_ = os.Remove(tmpName)
|
|
return err
|
|
}
|
|
if lm := resp.Header.Get("Last-Modified"); lm != "" {
|
|
if t, err := http.ParseTime(lm); err == nil {
|
|
_ = os.Chtimes(tmpName, t, t)
|
|
}
|
|
}
|
|
return os.Rename(tmpName, finalPath)
|
|
}
|
|
|
|
// revalidateListing is the listing-cache analogue of revalidate.
|
|
func (c *Cache) revalidateListing(urlPath, accept string, mtime time.Time) {
|
|
target := c.upstream + urlPath
|
|
req, err := http.NewRequest(http.MethodGet, target, nil)
|
|
if err != nil {
|
|
return
|
|
}
|
|
if !mtime.IsZero() {
|
|
req.Header.Set("If-Modified-Since", mtime.UTC().Format(http.TimeFormat))
|
|
}
|
|
if accept != "" {
|
|
req.Header.Set("Accept", accept)
|
|
}
|
|
if c.bearer != "" {
|
|
req.Header.Set("Authorization", "Bearer "+c.bearer)
|
|
}
|
|
resp, err := c.client.Do(req)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer resp.Body.Close()
|
|
switch resp.StatusCode {
|
|
case http.StatusNotModified:
|
|
return
|
|
case http.StatusOK:
|
|
if !c.responseCacheable(resp) {
|
|
return
|
|
}
|
|
path, ok := c.listingCachePathFor(urlPath, accept)
|
|
if !ok {
|
|
return
|
|
}
|
|
if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil {
|
|
return
|
|
}
|
|
tmp, err := os.CreateTemp(filepath.Dir(path), ".zddc-cache-tmp-*")
|
|
if err != nil {
|
|
return
|
|
}
|
|
tmpName := tmp.Name()
|
|
if _, err := io.Copy(tmp, resp.Body); err != nil {
|
|
_ = tmp.Close()
|
|
_ = os.Remove(tmpName)
|
|
return
|
|
}
|
|
_ = tmp.Close()
|
|
if lm := resp.Header.Get("Last-Modified"); lm != "" {
|
|
if t, err := http.ParseTime(lm); err == nil {
|
|
_ = os.Chtimes(tmpName, t, t)
|
|
}
|
|
}
|
|
_ = os.Rename(tmpName, path)
|
|
case http.StatusForbidden, http.StatusNotFound:
|
|
if path, ok := c.listingCachePathFor(urlPath, accept); ok {
|
|
_ = os.Remove(path)
|
|
}
|
|
}
|
|
}
|
|
|
|
// isHopByHop reports whether name is one of the hop-by-hop headers of
// RFC 7230 §6.1, which a proxy must not forward. The comparison is
// done on the canonical form of the header name.
func isHopByHop(name string) bool {
	hopByHop := [...]string{
		"Connection",
		"Keep-Alive",
		"Proxy-Authenticate",
		"Proxy-Authorization",
		"Te",
		"Trailer",
		"Transfer-Encoding",
		"Upgrade",
	}
	canonical := http.CanonicalHeaderKey(name)
	for _, h := range hopByHop {
		if canonical == h {
			return true
		}
	}
	return false
}
|