455 lines
14 KiB
Go
455 lines
14 KiB
Go
package cache
|
||
|
||
import (
|
||
"context"
|
||
"crypto/rand"
|
||
"encoding/hex"
|
||
"encoding/json"
|
||
"errors"
|
||
"fmt"
|
||
"io"
|
||
"log/slog"
|
||
"net/http"
|
||
"os"
|
||
"path/filepath"
|
||
"sort"
|
||
"sync"
|
||
"time"
|
||
)
|
||
|
||
// OutboxDir is the leaf directory under the cache root where queued
// offline writes are persisted. Each entry is itself a directory:
//
//	<cache-root>/.zddc-outbox/<id>/meta.json — request metadata
//	<cache-root>/.zddc-outbox/<id>/body.bin  — request body
//
// Conflicted replays (412 Precondition Failed) are renamed to
//
//	<cache-root>/.zddc-outbox/<id>.conflict-<timestamp>/
//
// where <timestamp> is the compact UTC form 20060102T150405Z written
// by markConflict. Entries whose body cannot be opened, or whose
// request cannot be built, are moved aside the same way.
//
// Operators clear the conflict directory after manually reconciling.
const OutboxDir = ".zddc-outbox"
|
||
|
||
// outboxConflictPrefix is the marker placed between an entry ID and
// the conflict timestamp when an entry directory is conflict-renamed.
// markConflict formats the timestamp as compact UTC
// ("20060102T150405Z"), which contains no ":" and is therefore
// filename-safe without any escaping. The resulting suffix is
// `.conflict-<timestamp>` and is grep-friendly across the whole tree.
// isConflictName treats any directory name containing this marker as
// a conflict entry.
const outboxConflictPrefix = ".conflict-"
|
||
|
||
// MaxOutboxBodyBytes caps the per-entry body size persisted to disk.
// Enqueue enforces it by reading the request body through
// http.MaxBytesReader, so an oversized body fails the enqueue.
// Larger writes still go through online — only the offline-queue path
// has this limit. Default 256 MiB matches MaxWriteBytes on master
// (MaxWriteBytes is not defined in this file — verify when changing).
const MaxOutboxBodyBytes = 256 * 1024 * 1024
|
||
|
||
// ReplayInterval governs how often the background loop attempts
// replay when the outbox is non-empty. Empty-outbox iterations sleep
// 10× longer (ReplayIdleInterval) so an idle client doesn't spin.
// Both values are consumed by RunReplayLoop.
const (
	ReplayInterval     = 30 * time.Second
	ReplayIdleInterval = 5 * time.Minute
)
|
||
|
||
// OutboxEntry is the persisted shape of a queued write; it is what
// Enqueue serializes into the entry's meta.json. The body
// lives in a sibling file (body.bin) so it can be streamed back to
// upstream at replay time without buffering the whole thing in
// memory.
type OutboxEntry struct {
	// ID is the directory name. Sortable by lexical order so replays
	// happen in queue order: <unix-nano>-<random>.
	ID string `json:"id"`

	// Method, RawURI, ContentType identify the request. RawURI is
	// taken from r.URL.RequestURI() at enqueue time and is appended to
	// the cache's upstream base at replay. ContentType mirrors the
	// request's Content-Type header and is left out of the JSON when
	// empty.
	Method      string `json:"method"`
	RawURI      string `json:"raw_uri"`
	ContentType string `json:"content_type,omitempty"`

	// BaseModTime is the local cache file's mtime at queue time. Used
	// at replay to send If-Unmodified-Since so the master rejects the
	// write (412) if its file changed since we observed it. Zero =
	// no precondition (e.g. PUT to a path with no prior cache).
	//
	// NOTE(review): encoding/json's omitempty never omits a struct
	// value such as time.Time, so a zero BaseModTime is still written
	// to meta.json. It decodes back to a zero time, which replayOne
	// treats as "no precondition", so behavior is unaffected.
	BaseModTime time.Time `json:"base_mod_time,omitempty"`

	// QueuedAt is the enqueue time; Enqueue stores it in UTC truncated
	// to whole seconds.
	QueuedAt time.Time `json:"queued_at"`
}
|
||
|
||
// Outbox owns the on-disk write-queue + the background replay loop.
// Constructed from cache.New when client mode is active; nil
// otherwise. Safe for concurrent Enqueue calls.
type Outbox struct {
	// cache is the owning Cache; replay reads its upstream, bearer and
	// client fields and calls cachePathForURI on it.
	cache *Cache
	// dir is the outbox directory (<cache root>/.zddc-outbox).
	dir string
	// mu serializes file creation: Enqueue holds it for the whole
	// call. NOTE(review): an earlier comment said replay also holds it
	// briefly per-entry, but none of the replay code in this file
	// acquires it — confirm whether that is intentional.
	mu sync.Mutex // serializes file creation; replay holds it briefly per-entry

	// wake is a non-blocking signal channel. Enqueue posts to it
	// (drop-on-full) so RunReplayLoop reacts to new entries within
	// ms, not after its idle-poll interval. Buffered=1 so a burst of
	// enqueues collapses to a single wake.
	wake chan struct{}
}
|
||
|
||
// NewOutbox opens (and creates if missing) the outbox directory under
|
||
// cfg.Root. Mode 0700 — single-user-trust at the FS layer.
|
||
func NewOutbox(c *Cache) (*Outbox, error) {
|
||
dir := filepath.Join(c.root, OutboxDir)
|
||
if err := os.MkdirAll(dir, 0o700); err != nil {
|
||
return nil, fmt.Errorf("create outbox dir: %w", err)
|
||
}
|
||
_ = os.Chmod(dir, 0o700)
|
||
return &Outbox{cache: c, dir: dir, wake: make(chan struct{}, 1)}, nil
|
||
}
|
||
|
||
// Dir exposes the on-disk path for tests / diagnostics.
|
||
func (o *Outbox) Dir() string { return o.dir }
|
||
|
||
// Enqueue persists the request to disk so it can be replayed when
|
||
// upstream is reachable again. Returns the created entry. The body
|
||
// reader is fully consumed; callers should not assume r.Body is
|
||
// readable afterwards.
|
||
//
|
||
// baseModTime is the local cache file's mtime at the moment we
|
||
// decided to queue (zero when none — e.g. PUT to a never-cached
|
||
// path). It becomes the If-Unmodified-Since precondition at replay.
|
||
func (o *Outbox) Enqueue(r *http.Request, baseModTime time.Time) (*OutboxEntry, error) {
|
||
o.mu.Lock()
|
||
defer o.mu.Unlock()
|
||
|
||
id, err := newOutboxID()
|
||
if err != nil {
|
||
return nil, fmt.Errorf("outbox id: %w", err)
|
||
}
|
||
entryDir := filepath.Join(o.dir, id)
|
||
if err := os.MkdirAll(entryDir, 0o700); err != nil {
|
||
return nil, fmt.Errorf("create entry dir: %w", err)
|
||
}
|
||
|
||
// Body first — failure cleans up the empty entry dir.
|
||
bodyPath := filepath.Join(entryDir, "body.bin")
|
||
bf, err := os.OpenFile(bodyPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o600)
|
||
if err != nil {
|
||
_ = os.RemoveAll(entryDir)
|
||
return nil, fmt.Errorf("create body file: %w", err)
|
||
}
|
||
limited := http.MaxBytesReader(nil, r.Body, MaxOutboxBodyBytes)
|
||
if _, err := io.Copy(bf, limited); err != nil {
|
||
_ = bf.Close()
|
||
_ = os.RemoveAll(entryDir)
|
||
return nil, fmt.Errorf("save body: %w", err)
|
||
}
|
||
if err := bf.Close(); err != nil {
|
||
_ = os.RemoveAll(entryDir)
|
||
return nil, err
|
||
}
|
||
|
||
entry := OutboxEntry{
|
||
ID: id,
|
||
Method: r.Method,
|
||
RawURI: r.URL.RequestURI(),
|
||
ContentType: r.Header.Get("Content-Type"),
|
||
BaseModTime: baseModTime,
|
||
QueuedAt: time.Now().UTC().Truncate(time.Second),
|
||
}
|
||
metaBytes, err := json.MarshalIndent(entry, "", " ")
|
||
if err != nil {
|
||
_ = os.RemoveAll(entryDir)
|
||
return nil, err
|
||
}
|
||
metaPath := filepath.Join(entryDir, "meta.json")
|
||
if err := writeFileAtomic(metaPath, metaBytes, 0o600); err != nil {
|
||
_ = os.RemoveAll(entryDir)
|
||
return nil, err
|
||
}
|
||
// Signal the replay loop. Non-blocking — buffered=1 collapses
|
||
// a burst of enqueues to a single wake. If the loop is busy
|
||
// replaying, our entry will be picked up on the next pass.
|
||
select {
|
||
case o.wake <- struct{}{}:
|
||
default:
|
||
}
|
||
return &entry, nil
|
||
}
|
||
|
||
// Pending returns the queued entries in lexical-ID order (which is
|
||
// queue order, since IDs lead with unix-nano). Entries that fail to
|
||
// parse are skipped. Conflict-renamed entries are excluded.
|
||
func (o *Outbox) Pending() ([]OutboxEntry, error) {
|
||
dirEntries, err := os.ReadDir(o.dir)
|
||
if err != nil {
|
||
if os.IsNotExist(err) {
|
||
return nil, nil
|
||
}
|
||
return nil, err
|
||
}
|
||
var ids []string
|
||
for _, e := range dirEntries {
|
||
if !e.IsDir() {
|
||
continue
|
||
}
|
||
name := e.Name()
|
||
if isConflictName(name) {
|
||
continue
|
||
}
|
||
ids = append(ids, name)
|
||
}
|
||
sort.Strings(ids)
|
||
out := make([]OutboxEntry, 0, len(ids))
|
||
for _, id := range ids {
|
||
entry, err := o.loadEntry(id)
|
||
if err != nil {
|
||
slog.Debug("outbox: skip unreadable entry", "id", id, "err", err)
|
||
continue
|
||
}
|
||
out = append(out, *entry)
|
||
}
|
||
return out, nil
|
||
}
|
||
|
||
// loadEntry reads + parses meta.json for the given ID.
|
||
func (o *Outbox) loadEntry(id string) (*OutboxEntry, error) {
|
||
metaPath := filepath.Join(o.dir, id, "meta.json")
|
||
bytes, err := os.ReadFile(metaPath)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
var entry OutboxEntry
|
||
if err := json.Unmarshal(bytes, &entry); err != nil {
|
||
return nil, err
|
||
}
|
||
return &entry, nil
|
||
}
|
||
|
||
// Replay iterates Pending() in order and fires each entry against
|
||
// upstream. Returns the count of successfully-replayed entries and
|
||
// the count of conflict-renamed entries. Network errors leave the
|
||
// entry in place for the next iteration.
|
||
func (o *Outbox) Replay(ctx context.Context) (replayed, conflicts int, err error) {
|
||
pending, err := o.Pending()
|
||
if err != nil {
|
||
return 0, 0, err
|
||
}
|
||
for _, entry := range pending {
|
||
if ctx.Err() != nil {
|
||
return replayed, conflicts, ctx.Err()
|
||
}
|
||
outcome, err := o.replayOne(ctx, entry)
|
||
switch outcome {
|
||
case replayDone:
|
||
replayed++
|
||
case replayConflict:
|
||
conflicts++
|
||
case replayDefer:
|
||
// transient — leave entry in place
|
||
if err != nil {
|
||
slog.Debug("outbox: defer entry", "id", entry.ID, "err", err)
|
||
}
|
||
}
|
||
}
|
||
return replayed, conflicts, nil
|
||
}
|
||
|
||
// replayOutcome classifies what replayOne did with a single entry so
// Replay can keep its counters.
type replayOutcome int

const (
	// replayDone: the entry is finished — either upstream accepted it
	// or it was dropped after a permanent rejection.
	replayDone replayOutcome = iota
	// replayConflict: the entry was moved aside via markConflict
	// (412 from upstream, or the entry could not be replayed at all).
	replayConflict
	// replayDefer: transient failure; the entry stays queued for a
	// later pass.
	replayDefer
)
|
||
|
||
// replayOne fires a single outbox entry. Outcomes:
|
||
//
|
||
// - 2xx → entry directory deleted (replayDone).
|
||
// - 412 Precondition Failed → entry renamed to <id>.conflict-<ts>/
|
||
// (replayConflict). Operator manually reconciles.
|
||
// - 4xx other than 412 → entry deleted (replayDone with a warn log).
|
||
// The master rejected the request for a reason other than
|
||
// concurrency; retrying won't help.
|
||
// - 5xx, network errors → entry left in place (replayDefer).
|
||
func (o *Outbox) replayOne(ctx context.Context, entry OutboxEntry) (replayOutcome, error) {
|
||
bodyPath := filepath.Join(o.dir, entry.ID, "body.bin")
|
||
bf, err := os.Open(bodyPath)
|
||
if err != nil {
|
||
// Body missing — entry is malformed. Move to conflict so we
|
||
// don't loop on it forever.
|
||
_ = o.markConflict(entry.ID)
|
||
return replayConflict, fmt.Errorf("open body: %w", err)
|
||
}
|
||
defer bf.Close()
|
||
|
||
target := o.cache.upstream + entry.RawURI
|
||
req, err := http.NewRequestWithContext(ctx, entry.Method, target, bf)
|
||
if err != nil {
|
||
_ = o.markConflict(entry.ID)
|
||
return replayConflict, err
|
||
}
|
||
if entry.ContentType != "" {
|
||
req.Header.Set("Content-Type", entry.ContentType)
|
||
}
|
||
if !entry.BaseModTime.IsZero() {
|
||
req.Header.Set("If-Unmodified-Since", entry.BaseModTime.UTC().Format(http.TimeFormat))
|
||
}
|
||
if o.cache.bearer != "" {
|
||
req.Header.Set("Authorization", "Bearer "+o.cache.bearer)
|
||
}
|
||
|
||
resp, err := o.cache.client.Do(req)
|
||
if err != nil {
|
||
// Transport / network error — defer.
|
||
return replayDefer, err
|
||
}
|
||
defer resp.Body.Close()
|
||
_, _ = io.Copy(io.Discard, resp.Body) // drain so connection reuses
|
||
|
||
switch {
|
||
case resp.StatusCode >= 200 && resp.StatusCode < 300:
|
||
// Success. For PUT, refresh the local cache from upstream so
|
||
// the user's view stays consistent. (We don't have the new
|
||
// body here; just remove the cached entry so next read
|
||
// fetches fresh.)
|
||
if entry.Method == http.MethodPut || entry.Method == http.MethodDelete {
|
||
if path, ok := o.cache.cachePathForURI(entry.RawURI); ok {
|
||
_ = os.Remove(path)
|
||
}
|
||
}
|
||
_ = os.RemoveAll(filepath.Join(o.dir, entry.ID))
|
||
slog.Info("outbox: replayed", "id", entry.ID, "method", entry.Method, "uri", entry.RawURI, "status", resp.StatusCode)
|
||
return replayDone, nil
|
||
|
||
case resp.StatusCode == http.StatusPreconditionFailed:
|
||
// Conflict — base version on master changed since we observed.
|
||
_ = o.markConflict(entry.ID)
|
||
slog.Warn("outbox: conflict", "id", entry.ID, "method", entry.Method, "uri", entry.RawURI)
|
||
return replayConflict, nil
|
||
|
||
case resp.StatusCode >= 400 && resp.StatusCode < 500:
|
||
// Permanent rejection (auth, bad request, etc). Drop the
|
||
// entry — retrying won't help.
|
||
_ = os.RemoveAll(filepath.Join(o.dir, entry.ID))
|
||
slog.Warn("outbox: dropped", "id", entry.ID, "method", entry.Method, "uri", entry.RawURI, "status", resp.StatusCode)
|
||
return replayDone, nil
|
||
|
||
default:
|
||
// 5xx — defer for next iteration.
|
||
return replayDefer, fmt.Errorf("upstream status %d", resp.StatusCode)
|
||
}
|
||
}
|
||
|
||
// markConflict renames the entry directory to <id>.conflict-<ts>/.
|
||
// Idempotent: if the rename target already exists, a counter is
|
||
// appended. Best-effort.
|
||
func (o *Outbox) markConflict(id string) error {
|
||
src := filepath.Join(o.dir, id)
|
||
ts := time.Now().UTC().Format("20060102T150405Z")
|
||
dst := filepath.Join(o.dir, id+outboxConflictPrefix+ts)
|
||
if _, err := os.Stat(dst); err == nil {
|
||
dst = dst + "-" + shortRandom()
|
||
}
|
||
return os.Rename(src, dst)
|
||
}
|
||
|
||
// RunReplayLoop is the background goroutine that drives Replay on a
|
||
// schedule. Stops when ctx is cancelled. Safe to call once per
|
||
// process; not designed for multiple concurrent loops over the same
|
||
// outbox.
|
||
//
|
||
// Wakes on three signals:
|
||
// - timer (ReplayInterval when entries pending, ReplayIdleInterval when idle)
|
||
// - Enqueue posting to o.wake (instant reaction to new offline writes)
|
||
// - context cancel (shutdown)
|
||
func (o *Outbox) RunReplayLoop(ctx context.Context) {
|
||
// Eagerly attempt replay at startup so a re-launched client
|
||
// catches up before any user request fires.
|
||
o.tryReplay(ctx)
|
||
for {
|
||
interval := ReplayIdleInterval
|
||
pending, err := o.Pending()
|
||
if err == nil && len(pending) > 0 {
|
||
interval = ReplayInterval
|
||
}
|
||
select {
|
||
case <-ctx.Done():
|
||
return
|
||
case <-time.After(interval):
|
||
o.tryReplay(ctx)
|
||
case <-o.wake:
|
||
// New entry arrived while we were idle. Replay
|
||
// immediately rather than waiting for the timer.
|
||
o.tryReplay(ctx)
|
||
}
|
||
}
|
||
}
|
||
|
||
// tryReplay is a single replay pass with a bounded timeout. Errors
|
||
// logged at debug; per-entry outcomes logged inside Replay/replayOne.
|
||
// The pass itself logs at info when it runs against >0 entries so an
|
||
// operator watching the log sees the replay loop is alive even if
|
||
// every attempt defers.
|
||
func (o *Outbox) tryReplay(ctx context.Context) {
|
||
tCtx, cancel := context.WithTimeout(ctx, 2*time.Minute)
|
||
defer cancel()
|
||
pending, _ := o.Pending()
|
||
if len(pending) > 0 {
|
||
slog.Info("outbox: replay attempt", "pending", len(pending))
|
||
}
|
||
replayed, conflicts, err := o.Replay(tCtx)
|
||
if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
|
||
slog.Debug("outbox: replay error", "err", err)
|
||
}
|
||
if replayed > 0 || conflicts > 0 {
|
||
slog.Info("outbox: replay pass complete", "replayed", replayed, "conflicts", conflicts)
|
||
}
|
||
}
|
||
|
||
// isConflictName reports whether the directory name is an outbox
|
||
// conflict-renamed entry (`<id>.conflict-<ts>` form).
|
||
func isConflictName(name string) bool {
|
||
for i := 0; i+len(outboxConflictPrefix) <= len(name); i++ {
|
||
if name[i:i+len(outboxConflictPrefix)] == outboxConflictPrefix {
|
||
return true
|
||
}
|
||
}
|
||
return false
|
||
}
|
||
|
||
// newOutboxID builds an entry ID of the form
// <unix-nano as 16 zero-padded hex digits>-<6 random hex digits>.
// Because the timestamp part is fixed-width, lexical order matches
// chronological order for the foreseeable lifetime of the outbox
// (until ~year 2262). The random suffix keeps IDs created within the
// same nanosecond distinct. An error is returned only if the random
// source fails.
func newOutboxID() (string, error) {
	suffix := make([]byte, 3)
	if _, err := rand.Read(suffix); err != nil {
		return "", err
	}
	stamp := time.Now().UnixNano()
	id := fmt.Sprintf("%016x-%s", stamp, hex.EncodeToString(suffix))
	return id, nil
}
|
||
|
||
// shortRandom returns four lowercase hex characters taken from two
// random bytes. markConflict uses it to disambiguate conflict
// directory names that would otherwise collide on the same
// timestamp. A failure of the random source is ignored, in which
// case the bytes keep their zero value.
func shortRandom() string {
	raw := make([]byte, 2)
	_, _ = rand.Read(raw)
	return hex.EncodeToString(raw)
}
|
||
|
||
// writeFileAtomic writes data to path via temp + rename in the same
// directory, so readers see either the previous file or the complete
// new one. mode is applied to the temp file before it is renamed
// into place.
//
// On any failure — including a failed final rename — the temp file is
// removed so no ".tmp-*" debris is left next to path, and the
// underlying error is returned unchanged.
func writeFileAtomic(path string, data []byte, mode os.FileMode) error {
	tmp, err := os.CreateTemp(filepath.Dir(path), ".tmp-*")
	if err != nil {
		return err
	}
	tmpName := tmp.Name()

	// discard removes the temp file and passes the cause through.
	discard := func(cause error) error {
		_ = os.Remove(tmpName)
		return cause
	}

	if _, err := tmp.Write(data); err != nil {
		_ = tmp.Close()
		return discard(err)
	}
	if err := tmp.Chmod(mode); err != nil {
		_ = tmp.Close()
		return discard(err)
	}
	if err := tmp.Close(); err != nil {
		return discard(err)
	}
	if err := os.Rename(tmpName, path); err != nil {
		// Previously the temp file was left behind on this path.
		return discard(err)
	}
	return nil
}
|