package archive

import (
	"context"
	"log/slog"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"codeberg.org/VARASYS/ZDDC/zddc/internal/zddc"
	"github.com/fsnotify/fsnotify"
)

// Watcher watches fsRoot for filesystem changes and updates the archive index
// and the zddc ACL policy cache.
type Watcher struct {
	fsRoot  string
	idx     *Index
	watcher *fsnotify.Watcher

	// Debounce state: directory path → timer for its pending index update.
	// mu guards pending.
	mu      sync.Mutex
	pending map[string]*time.Timer
}

// NewWatcher creates a new Watcher for fsRoot that feeds idx. It only
// allocates the underlying fsnotify watcher; call Start to register
// directories and begin processing events.
func NewWatcher(fsRoot string, idx *Index) (*Watcher, error) {
	w, err := fsnotify.NewWatcher()
	if err != nil {
		return nil, err
	}
	return &Watcher{
		fsRoot:  fsRoot,
		idx:     idx,
		watcher: w,
		pending: make(map[string]*time.Timer),
	}, nil
}

// Start registers the entire directory tree under fsRoot and then processes
// events until ctx is cancelled or the fsnotify channels are closed.
//
// The underlying fsnotify watcher is closed before Start returns, on both the
// success and the error path, so a Watcher cannot be started a second time.
// Start returns a non-nil error only when the initial registration fails.
func (w *Watcher) Start(ctx context.Context) error {
	if err := w.addTree(w.fsRoot); err != nil {
		// The event loop never runs on this path, so release the fsnotify
		// resources here. The registration error is the one worth
		// reporting; a failure to close adds nothing the caller can act on.
		_ = w.watcher.Close()
		return err
	}
	defer w.watcher.Close()

	for {
		select {
		case <-ctx.Done():
			return nil
		case event, ok := <-w.watcher.Events:
			if !ok {
				return nil
			}
			w.handleEvent(event)
		case err, ok := <-w.watcher.Errors:
			if !ok {
				return nil
			}
			slog.Warn("fsnotify error", "err", err)
		}
	}
}

// addTree registers root and every directory beneath it with the fsnotify
// watcher. Entries that cannot be read and non-directories are skipped. The
// walk stops at the first directory that fails to register and that error is
// returned.
func (w *Watcher) addTree(root string) error {
	return filepath.WalkDir(root, func(path string, d os.DirEntry, err error) error {
		if err != nil || !d.IsDir() {
			return nil
		}
		return w.watcher.Add(path)
	})
}

// handleEvent reacts to a single fsnotify event:
//   - a newly created directory is registered together with its subtree;
//   - any event on a ".zddc" file invalidates the ACL policy cache for the
//     directory containing it;
//   - other dot-files are ignored;
//   - an event on an entry whose parent directory name matches
//     transmittalFolderRE schedules a debounced index update for that parent.
func (w *Watcher) handleEvent(event fsnotify.Event) {
	path := event.Name
	base := filepath.Base(path)

	// New directory created — register it. fsnotify watches are not
	// recursive, so subdirectories that already exist by the time this event
	// is handled (mkdir -p, a tree moved in) are registered as well.
	if event.Has(fsnotify.Create) {
		if info, err := os.Stat(path); err == nil && info.IsDir() {
			// Add path itself first: os.Stat follows symlinks, while the
			// walk in addTree does not descend through a symlinked root.
			err := w.watcher.Add(path)
			if err == nil {
				err = w.addTree(path)
			}
			if err != nil {
				slog.Warn("watching new directory failed", "dir", path, "err", err)
			}
		}
	}

	// .zddc file changed — invalidate ACL policy cache.
	if base == ".zddc" {
		zddc.InvalidateCache(filepath.Dir(path))
		return
	}

	// Skip dot-files.
	if strings.HasPrefix(base, ".") {
		return
	}

	// For transmittal folder events, schedule a debounced index update.
	dirPath := filepath.Dir(path)
	if transmittalFolderRE.MatchString(filepath.Base(dirPath)) {
		w.scheduleIndexUpdate(dirPath)
	}
}

// scheduleIndexUpdate debounces an index update for dirPath. Each call pushes
// the update 2 seconds into the future; when the timer fires, the pending
// entry is removed and the index is updated from dirPath. A failed update is
// logged and otherwise ignored.
func (w *Watcher) scheduleIndexUpdate(dirPath string) {
	const delay = 2 * time.Second

	w.mu.Lock()
	defer w.mu.Unlock()

	if t, ok := w.pending[dirPath]; ok {
		t.Reset(delay)
		return
	}

	w.pending[dirPath] = time.AfterFunc(delay, func() {
		// Drop the entry before the update runs, so events that arrive
		// while it is running start a fresh timer.
		w.mu.Lock()
		delete(w.pending, dirPath)
		w.mu.Unlock()

		if err := w.idx.UpdateFromDir(w.fsRoot, dirPath); err != nil {
			slog.Warn("archive index update failed", "dir", dirPath, "err", err)
		}
	})
}