ZDDC/zddc/internal/archive/watcher.go
2026-06-11 13:32:31 -05:00

136 lines
3.1 KiB
Go

package archive
import (
"context"
"log/slog"
"os"
"path/filepath"
"strings"
"sync"
"time"
"codeberg.org/VARASYS/ZDDC/zddc/internal/zddc"
"github.com/fsnotify/fsnotify"
)
// Watcher watches fsRoot for filesystem changes and updates the archive index
// and the zddc ACL policy cache.
//
// Create one with NewWatcher and run it with Start.
type Watcher struct {
// fsRoot is the root of the watched tree; it is walked by Start and passed
// to Index.UpdateFromDir on every index update.
fsRoot string
// idx is the archive index refreshed when a watched directory changes.
idx *Index
// watcher is the underlying fsnotify watcher that delivers events.
watcher *fsnotify.Watcher
// Debounce: pending dir → timer
// mu guards pending.
mu sync.Mutex
// pending maps a directory path to the timer of its scheduled index update.
pending map[string]*time.Timer
}
// NewWatcher builds a Watcher for the tree rooted at fsRoot that keeps idx up
// to date. Nothing is watched until Start is called. The error, if any, comes
// from creating the underlying fsnotify watcher.
func NewWatcher(fsRoot string, idx *Index) (*Watcher, error) {
	fsw, err := fsnotify.NewWatcher()
	if err != nil {
		return nil, err
	}

	result := new(Watcher)
	result.fsRoot = fsRoot
	result.idx = idx
	result.watcher = fsw
	result.pending = map[string]*time.Timer{}
	return result, nil
}
// Start registers every directory under fsRoot with the underlying fsnotify
// watcher, then processes events until ctx is cancelled.
//
// It returns a non-nil error only when a directory cannot be registered during
// the initial walk. Otherwise it blocks until ctx is cancelled and returns nil.
// In both cases the fsnotify watcher has been closed by the time Start
// returns, so a Watcher cannot be started a second time.
func (w *Watcher) Start(ctx context.Context) error {
	// fsnotify watches are not recursive, so every directory is added
	// individually.
	walkErr := filepath.WalkDir(w.fsRoot, func(path string, d os.DirEntry, err error) error {
		if err != nil {
			// Best effort: an unreadable entry is skipped instead of aborting
			// the walk, but it is reported so the gap is visible.
			slog.Warn("archive watcher: skipping unreadable path", "path", path, "err", err)
			return nil
		}
		if !d.IsDir() {
			return nil
		}
		return w.watcher.Add(path)
	})
	if walkErr != nil {
		// Release the watcher's OS resources; nothing else will close it.
		_ = w.watcher.Close()
		return walkErr
	}
	defer w.watcher.Close()

	// The loop runs on the caller's goroutine so that no goroutine outlives
	// Start and the watcher is closed before Start returns.
	for {
		select {
		case <-ctx.Done():
			return nil
		case event, ok := <-w.watcher.Events:
			if !ok {
				// Channel closed underneath us; still honour the contract of
				// blocking until cancellation.
				<-ctx.Done()
				return nil
			}
			w.handleEvent(event)
		case err, ok := <-w.watcher.Errors:
			if !ok {
				<-ctx.Done()
				return nil
			}
			slog.Warn("fsnotify error", "err", err)
		}
	}
}
// handleEvent reacts to a single fsnotify event:
//
//   - a newly created directory is registered with the watcher, together with
//     any subdirectories it already contains;
//   - an event on a ".zddc" file invalidates the zddc policy cache for its
//     directory and the global scan cache;
//   - events on other dot-files are ignored;
//   - any other event whose parent directory name parses as a transmittal
//     folder schedules a debounced index update for that parent directory.
func (w *Watcher) handleEvent(event fsnotify.Event) {
	path := event.Name
	base := filepath.Base(path)

	// New directory created — register it (and whatever it already holds).
	if event.Has(fsnotify.Create) {
		if info, err := os.Stat(path); err == nil && info.IsDir() {
			w.watchNewDir(path)
		}
	}

	// .zddc file changed — invalidate ACL policy cache for the chain rooted
	// here AND the global scan cache (which lists every directory containing
	// a .zddc file). The scan cache is fsRoot-keyed and only changes when
	// .zddc files are added/removed, but distinguishing modify from
	// create/remove in fsnotify is fragile so we just always invalidate.
	if base == ".zddc" {
		zddc.InvalidateCache(filepath.Dir(path))
		zddc.InvalidateScanCache()
		return
	}

	// Skip dot-files
	if strings.HasPrefix(base, ".") {
		return
	}

	// For transmittal folder events, schedule a debounced index update
	dirPath := filepath.Dir(path)
	if _, _, _, _, ok := zddc.ParseTransmittalFolder(filepath.Base(dirPath)); ok {
		w.scheduleIndexUpdate(dirPath)
	}
}

// watchNewDir registers dir and every directory beneath it with the fsnotify
// watcher. fsnotify watches are not recursive, and a directory that is moved
// in (or created with its parents in one go) produces a single Create event,
// so its existing subdirectories must be added explicitly. Entries created
// before a watch was in place generate no events of their own; therefore any
// directory found here whose name parses as a transmittal folder gets an
// index update scheduled. Failures are logged and otherwise ignored.
func (w *Watcher) watchNewDir(dir string) {
	if err := w.watcher.Add(dir); err != nil {
		slog.Warn("archive watcher: cannot watch new directory", "dir", dir, "err", err)
		return
	}
	_ = filepath.WalkDir(dir, func(p string, d os.DirEntry, err error) error {
		if err != nil || !d.IsDir() {
			// Best effort: unreadable entries and plain files are skipped.
			return nil
		}
		if p != dir {
			if addErr := w.watcher.Add(p); addErr != nil {
				slog.Warn("archive watcher: cannot watch new directory", "dir", p, "err", addErr)
				return filepath.SkipDir
			}
		}
		if _, _, _, _, ok := zddc.ParseTransmittalFolder(filepath.Base(p)); ok {
			w.scheduleIndexUpdate(p)
		}
		return nil
	})
}
// scheduleIndexUpdate debounces an index update for dirPath: the update runs
// once two seconds have passed without another call for the same dirPath.
// A failed update is logged and not retried.
func (w *Watcher) scheduleIndexUpdate(dirPath string) {
	const debounce = 2 * time.Second

	w.mu.Lock()
	defer w.mu.Unlock()

	// Stop reports true only while the timer is still waiting. In that case
	// the callback has not started and the deadline can simply be pushed back.
	if t, ok := w.pending[dirPath]; ok && t.Stop() {
		t.Reset(debounce)
		return
	}

	// Either no update is pending, or the previous timer has already fired and
	// its callback is running (or blocked on w.mu). Resetting a fired AfterFunc
	// timer would run the callback a second time outside the map's
	// bookkeeping, so a fresh timer is installed instead.
	var timer *time.Timer
	timer = time.AfterFunc(debounce, func() {
		w.mu.Lock()
		// Only remove our own entry; a newer timer may have replaced it.
		if w.pending[dirPath] == timer {
			delete(w.pending, dirPath)
		}
		w.mu.Unlock()
		if err := w.idx.UpdateFromDir(w.fsRoot, dirPath); err != nil {
			slog.Warn("archive index update failed", "dir", dirPath, "err", err)
		}
	})
	w.pending[dirPath] = timer
}