diff --git a/cmd/litestream/directory_watcher.go b/cmd/litestream/directory_watcher.go new file mode 100644 index 00000000..c3c3bedc --- /dev/null +++ b/cmd/litestream/directory_watcher.go @@ -0,0 +1,395 @@ +package main + +import ( + "context" + "errors" + "log/slog" + "os" + "path/filepath" + "strings" + "sync" + + "github.com/fsnotify/fsnotify" + + "github.com/benbjohnson/litestream" +) + +// DirectoryMonitor watches a directory tree for SQLite databases and dynamically +// manages database instances within the store as files are created or removed. +type DirectoryMonitor struct { + store *litestream.Store + config *DBConfig + dirPath string + pattern string + recursive bool + + watcher *fsnotify.Watcher + ctx context.Context + cancel context.CancelFunc + + logger *slog.Logger + + mu sync.Mutex + dbs map[string]*litestream.DB + watchedDirs map[string]struct{} + + wg sync.WaitGroup +} + +// NewDirectoryMonitor returns a new monitor for directory-based replication. +func NewDirectoryMonitor(ctx context.Context, store *litestream.Store, dbc *DBConfig, existing []*litestream.DB) (*DirectoryMonitor, error) { + if dbc == nil { + return nil, errors.New("directory config required") + } + if store == nil { + return nil, errors.New("store required") + } + + dirPath, err := expand(dbc.Dir) + if err != nil { + return nil, err + } + + if _, err := os.Stat(dirPath); err != nil { + return nil, err + } + + watcher, err := fsnotify.NewWatcher() + if err != nil { + return nil, err + } + + monitorCtx, cancel := context.WithCancel(ctx) + dm := &DirectoryMonitor{ + store: store, + config: dbc, + dirPath: dirPath, + pattern: dbc.Pattern, + recursive: dbc.Recursive, + watcher: watcher, + ctx: monitorCtx, + cancel: cancel, + logger: slog.With("dir", dirPath), + dbs: make(map[string]*litestream.DB), + watchedDirs: make(map[string]struct{}), + } + + for _, db := range existing { + dm.dbs[db.Path()] = db + } + + if err := dm.addInitialWatches(); err != nil { + watcher.Close() + cancel() + return nil, err + } + + // Scan for existing databases after watches are set up + dm.scanDirectory(dm.dirPath) + + dm.wg.Add(1) + go dm.run() + + return dm, nil +} + +// Close stops the directory monitor and releases resources. +func (dm *DirectoryMonitor) Close() { + dm.cancel() + _ = dm.watcher.Close() + dm.wg.Wait() +} + +func (dm *DirectoryMonitor) run() { + defer dm.wg.Done() + + for { + select { + case <-dm.ctx.Done(): + return + case event, ok := <-dm.watcher.Events: + if !ok { + return + } + dm.handleEvent(event) + case err, ok := <-dm.watcher.Errors: + if !ok { + return + } + dm.logger.Error("directory watcher error", "error", err) + } + } +} + +func (dm *DirectoryMonitor) addInitialWatches() error { + if dm.recursive { + return filepath.WalkDir(dm.dirPath, func(path string, d os.DirEntry, err error) error { + if err != nil { + return err + } + if !d.IsDir() { + return nil + } + return dm.addDirectoryWatch(path) + }) + } + + return dm.addDirectoryWatch(dm.dirPath) +} + +func (dm *DirectoryMonitor) addDirectoryWatch(path string) error { + abspath := filepath.Clean(path) + + dm.mu.Lock() + if _, ok := dm.watchedDirs[abspath]; ok { + dm.mu.Unlock() + return nil + } + dm.watchedDirs[abspath] = struct{}{} + dm.mu.Unlock() + + if err := dm.watcher.Add(abspath); err != nil { + dm.mu.Lock() + delete(dm.watchedDirs, abspath) + dm.mu.Unlock() + return err + } + + dm.logger.Debug("watching directory", "path", abspath) + return nil +} + +func (dm *DirectoryMonitor) removeDirectoryWatch(path string) { + abspath := filepath.Clean(path) + + dm.mu.Lock() + if _, ok := dm.watchedDirs[abspath]; !ok { + dm.mu.Unlock() + return + } + delete(dm.watchedDirs, abspath) + dm.mu.Unlock() + + if err := dm.watcher.Remove(abspath); err != nil { + dm.logger.Debug("remove directory watch", "path", abspath, "error", err) + } +} + +func (dm *DirectoryMonitor) handleEvent(event fsnotify.Event) { + path := filepath.Clean(event.Name) + if path == "" { + return + } + + info, statErr := os.Stat(path) + isDir := statErr == nil && info.IsDir() + + // Check if this path was previously a watched directory (needed for removal detection) + dm.mu.Lock() + _, wasWatchedDir := dm.watchedDirs[path] + dm.mu.Unlock() + + // Handle directory creation/rename + // Note: In non-recursive mode, only the root directory is watched. + // Subdirectories are completely ignored, and their databases are not replicated. + // In recursive mode, all subdirectories are watched and scanned. + if isDir && event.Op&(fsnotify.Create|fsnotify.Rename) != 0 { + // Only add watches for: (1) root directory, or (2) subdirectories when recursive=true + if dm.recursive { + if err := dm.addDirectoryWatch(path); err != nil { + dm.logger.Error("add directory watch", "path", path, "error", err) + } + // Scan to catch files created during watch registration race window + dm.scanDirectory(path) + } + } + + // Handle directory removal/rename + // Check both current state (isDir) AND previous state (wasWatchedDir) + // because os.Stat fails for deleted directories, leaving watches orphaned + if (isDir || wasWatchedDir) && event.Op&(fsnotify.Remove|fsnotify.Rename) != 0 { + dm.removeDirectoryWatch(path) + dm.removeDatabasesUnder(path) + return + } + + // If it's still a directory, don't process as a file + if isDir { + return + } + + if statErr != nil && !os.IsNotExist(statErr) { + dm.logger.Debug("stat event path", "path", path, "error", statErr) + return + } + + if event.Op&(fsnotify.Remove|fsnotify.Rename) != 0 { + dm.removeDatabase(path) + return + } + + if event.Op&(fsnotify.Create|fsnotify.Write|fsnotify.Rename) != 0 { + dm.handlePotentialDatabase(path) + } +} + +func (dm *DirectoryMonitor) handlePotentialDatabase(path string) { + if !dm.matchesPattern(path) { + return + } + + dm.mu.Lock() + if _, exists := dm.dbs[path]; exists { + dm.mu.Unlock() + return + } + dm.dbs[path] = nil + dm.mu.Unlock() + + if !IsSQLiteDatabase(path) { + dm.mu.Lock() + delete(dm.dbs, path) + dm.mu.Unlock() + return + } + + db, err := newDBFromDirectoryEntry(dm.config, dm.dirPath, path) + if err != nil { + dm.mu.Lock() + delete(dm.dbs, path) + dm.mu.Unlock() + dm.logger.Error("configure database", "path", path, "error", err) + return + } + + if err := dm.store.AddDB(db); err != nil { + dm.mu.Lock() + delete(dm.dbs, path) + dm.mu.Unlock() + dm.logger.Error("add database to store", "path", path, "error", err) + return + } + + dm.mu.Lock() + dm.dbs[path] = db + dm.mu.Unlock() + + dm.logger.Info("added database to replication", "path", path) +} + +func (dm *DirectoryMonitor) removeDatabase(path string) { + dm.mu.Lock() + db := dm.dbs[path] + dm.mu.Unlock() + + if db == nil { + return + } + + if err := dm.store.RemoveDB(dm.ctx, db.Path()); err != nil { + dm.logger.Error("remove database from store", "path", path, "error", err) + return + } + + dm.mu.Lock() + delete(dm.dbs, path) + dm.mu.Unlock() + + dm.logger.Info("removed database from replication", "path", path) +} + +func (dm *DirectoryMonitor) removeDatabasesUnder(dir string) { + prefix := dir + string(os.PathSeparator) + + dm.mu.Lock() + var toClose []*litestream.DB + var toClosePaths []string + for path, db := range dm.dbs { + if path == dir || strings.HasPrefix(path, prefix) { + toClose = append(toClose, db) + toClosePaths = append(toClosePaths, path) + } + } + dm.mu.Unlock() + + // Remove from store first, only delete from local map on success + for i, db := range toClose { + if db == nil { + continue + } + if err := dm.store.RemoveDB(dm.ctx, db.Path()); err != nil { + dm.logger.Error("remove database from store", "path", db.Path(), "error", err) + continue + } + + // Only remove from local map after successful store removal + dm.mu.Lock() + delete(dm.dbs, toClosePaths[i]) + dm.mu.Unlock() + } +} + +func (dm *DirectoryMonitor) matchesPattern(path string) bool { + matched, err := filepath.Match(dm.pattern, filepath.Base(path)) + if err != nil { + dm.logger.Error("pattern match failed", "pattern", dm.pattern, "path", path, "error", err) + return false + } + return matched +} + +// scanDirectory walks a directory to discover pre-existing databases. +// In non-recursive mode, only scans files directly in the directory (subdirectories are ignored). +// In recursive mode, walks the entire tree and adds watches for all subdirectories. +// This is called on startup and when new directories are created to catch files created +// during the fsnotify watch registration race window. +func (dm *DirectoryMonitor) scanDirectory(dir string) { + if !dm.recursive { + // Non-recursive mode: Only scan files in the immediate directory. + // Subdirectories and their contents are completely ignored. + // TODO: Document recursive behavior on litestream.io + entries, err := os.ReadDir(dir) + if err != nil { + if !os.IsNotExist(err) { + dm.logger.Debug("read directory", "path", dir, "error", err) + } + return + } + + for _, entry := range entries { + // Skip subdirectories - they're not watched in non-recursive mode + if entry.IsDir() { + continue + } + path := filepath.Join(dir, entry.Name()) + dm.handlePotentialDatabase(path) + } + return + } + + // Recursive mode: scan full tree + err := filepath.WalkDir(dir, func(path string, d os.DirEntry, err error) error { + if err != nil { + if os.IsNotExist(err) { + return nil + } + dm.logger.Debug("scan directory entry", "path", path, "error", err) + return nil + } + + if d.IsDir() { + if path != dir { + if err := dm.addDirectoryWatch(path); err != nil { + dm.logger.Error("add directory watch", "path", path, "error", err) + } + } + return nil + } + + dm.handlePotentialDatabase(path) + return nil + }) + if err != nil && !os.IsNotExist(err) { + dm.logger.Debug("scan directory", "path", dir, "error", err) + } +} diff --git a/cmd/litestream/main.go b/cmd/litestream/main.go index d1551feb..6c9a9c19 100644 --- a/cmd/litestream/main.go +++ b/cmd/litestream/main.go @@ -331,6 +331,9 @@ func (c *Config) Validate() error { if db.Dir != "" && db.Pattern == "" { return fmt.Errorf("database config #%d: 'pattern' is required when using 'dir'", idx+1) } + if db.Watch && db.Dir == "" { + return fmt.Errorf("database config #%d: 'watch' can only be enabled with a directory", idx+1) + } // Use path or dir for identifying the config in error messages dbIdentifier := db.Path @@ -500,6 +503,7 @@ type DBConfig struct { Dir string `yaml:"dir"` // Directory to scan for databases Pattern string `yaml:"pattern"` // File pattern to match (e.g., "*.db", "*.sqlite") Recursive bool `yaml:"recursive"` // Scan subdirectories recursively + Watch bool `yaml:"watch"` // Enable directory monitoring for changes MetaPath *string `yaml:"meta-path"` MonitorInterval *time.Duration `yaml:"monitor-interval"` CheckpointInterval *time.Duration `yaml:"checkpoint-interval"` @@ -523,7 +527,12 @@ func NewDBFromConfig(dbc *DBConfig) (*litestream.DB, error) { // Override default database settings if specified in configuration. if dbc.MetaPath != nil { - db.SetMetaPath(*dbc.MetaPath) + expandedMetaPath, err := expand(*dbc.MetaPath) + if err != nil { + return nil, fmt.Errorf("failed to expand meta path: %w", err) + } + dbc.MetaPath = &expandedMetaPath + db.SetMetaPath(expandedMetaPath) } if dbc.MonitorInterval != nil { db.MonitorInterval = *dbc.MonitorInterval @@ -590,56 +599,85 @@ func NewDBsFromDirectoryConfig(dbc *DBConfig) ([]*litestream.DB, error) { return nil, fmt.Errorf("failed to scan directory %s: %w", dirPath, err) } - if len(dbPaths) == 0 { + if len(dbPaths) == 0 && !dbc.Watch { return nil, fmt.Errorf("no SQLite databases found in directory %s with pattern %s", dirPath, dbc.Pattern) } // Create DB instances for each found database var dbs []*litestream.DB + metaPaths := make(map[string]string) + for _, dbPath := range dbPaths { - // Calculate relative path from directory root - relPath, err := filepath.Rel(dirPath, dbPath) + db, err := newDBFromDirectoryEntry(dbc, dirPath, dbPath) if err != nil { - return nil, fmt.Errorf("failed to calculate relative path for %s: %w", dbPath, err) + return nil, fmt.Errorf("failed to create DB for %s: %w", dbPath, err) } - // Create a copy of the config for each database - dbConfigCopy := *dbc - dbConfigCopy.Path = dbPath - dbConfigCopy.Dir = "" // Clear dir field for individual DB - dbConfigCopy.Pattern = "" // Clear pattern field - dbConfigCopy.Recursive = false // Clear recursive flag - - // Deep copy replica config and make path unique per database. - // This prevents all databases from writing to the same replica path. - if dbc.Replica != nil { - replicaCopy, err := cloneReplicaConfigWithRelativePath(dbc.Replica, relPath) - if err != nil { - return nil, fmt.Errorf("failed to configure replica for %s: %w", dbPath, err) + // Validate unique meta-path to prevent replication state corruption + if mp := db.MetaPath(); mp != "" { + if existingDB, exists := metaPaths[mp]; exists { + return nil, fmt.Errorf("meta-path collision: databases %s and %s would share meta-path %s, causing replication state corruption", existingDB, dbPath, mp) } - dbConfigCopy.Replica = replicaCopy + metaPaths[mp] = dbPath } - // Also handle deprecated 'replicas' array field. - if len(dbc.Replicas) > 0 { - dbConfigCopy.Replicas = make([]*ReplicaConfig, len(dbc.Replicas)) - for i, replica := range dbc.Replicas { - replicaCopy, err := cloneReplicaConfigWithRelativePath(replica, relPath) - if err != nil { - return nil, fmt.Errorf("failed to configure replica %d for %s: %w", i, dbPath, err) - } - dbConfigCopy.Replicas[i] = replicaCopy - } + dbs = append(dbs, db) + } + + return dbs, nil +} + +// newDBFromDirectoryEntry creates a DB instance for a database discovered via directory replication. +func newDBFromDirectoryEntry(dbc *DBConfig, dirPath, dbPath string) (*litestream.DB, error) { + // Calculate relative path from directory root + relPath, err := filepath.Rel(dirPath, dbPath) + if err != nil { + return nil, fmt.Errorf("failed to calculate relative path for %s: %w", dbPath, err) + } + + // Create a copy of the config for the discovered database + dbConfigCopy := *dbc + dbConfigCopy.Path = dbPath + dbConfigCopy.Dir = "" // Clear dir field for individual DB + dbConfigCopy.Pattern = "" // Clear pattern field + dbConfigCopy.Recursive = false // Clear recursive flag + dbConfigCopy.Watch = false // Individual DBs do not watch directories + + // Ensure every database discovered beneath a directory receives a unique + // metadata path. Without this, all databases share the same meta-path and + // clobber each other's replication state. + if dbc.MetaPath != nil { + baseMetaPath, err := expand(*dbc.MetaPath) + if err != nil { + return nil, fmt.Errorf("failed to expand meta path for %s: %w", dbPath, err) } + metaPathCopy := deriveMetaPathForDirectoryEntry(baseMetaPath, relPath) + dbConfigCopy.MetaPath = &metaPathCopy + } - db, err := NewDBFromConfig(&dbConfigCopy) + // Deep copy replica config and make path unique per database. + // This prevents all databases from writing to the same replica path. + if dbc.Replica != nil { + replicaCopy, err := cloneReplicaConfigWithRelativePath(dbc.Replica, relPath) if err != nil { - return nil, fmt.Errorf("failed to create DB for %s: %w", dbPath, err) + return nil, fmt.Errorf("failed to configure replica for %s: %w", dbPath, err) } - dbs = append(dbs, db) + dbConfigCopy.Replica = replicaCopy } - return dbs, nil + // Also handle deprecated 'replicas' array field. + if len(dbc.Replicas) > 0 { + dbConfigCopy.Replicas = make([]*ReplicaConfig, len(dbc.Replicas)) + for i, replica := range dbc.Replicas { + replicaCopy, err := cloneReplicaConfigWithRelativePath(replica, relPath) + if err != nil { + return nil, fmt.Errorf("failed to configure replica %d for %s: %w", i, dbPath, err) + } + dbConfigCopy.Replicas[i] = replicaCopy + } + } + + return NewDBFromConfig(&dbConfigCopy) } // cloneReplicaConfigWithRelativePath returns a copy of the replica configuration with the @@ -687,6 +725,24 @@ func cloneReplicaConfigWithRelativePath(base *ReplicaConfig, relPath string) (*R return &replicaCopy, nil } +// deriveMetaPathForDirectoryEntry returns a unique metadata directory for a +// database discovered through directory replication by appending the database's +// relative path and the standard Litestream suffix to the configured base path. +func deriveMetaPathForDirectoryEntry(basePath, relPath string) string { + relPath = filepath.Clean(relPath) + if relPath == "." || relPath == "" { + return basePath + } + + relDir, relFile := filepath.Split(relPath) + if relFile == "" || relFile == "." { + return filepath.Join(basePath, relPath) + } + + metaDirName := "." + relFile + litestream.MetaDirSuffix + return filepath.Join(basePath, relDir, metaDirName) +} + // appendRelativePathToURL appends relPath to the URL's path component, ensuring // the result remains rooted and uses forward slashes. func appendRelativePathToURL(u *url.URL, relPath string) { diff --git a/cmd/litestream/main_test.go b/cmd/litestream/main_test.go index 2262750e..d552eae1 100644 --- a/cmd/litestream/main_test.go +++ b/cmd/litestream/main_test.go @@ -2,8 +2,10 @@ package main_test import ( "bytes" + "context" "errors" "os" + "os/user" "path/filepath" "strings" "testing" @@ -136,6 +138,46 @@ dbs: }) } +func TestNewDBFromConfig_MetaPathExpansion(t *testing.T) { + u, err := user.Current() + if err != nil { + t.Skipf("user.Current failed: %v", err) + } + if u.HomeDir == "" { + t.Skip("no home directory available for expansion test") + } + + tmpDir := t.TempDir() + dbPath := filepath.Join(tmpDir, "db.sqlite") + replicaPath := filepath.Join(tmpDir, "replica") + if err := os.MkdirAll(replicaPath, 0o755); err != nil { + t.Fatalf("failed to create replica directory: %v", err) + } + + metaPath := filepath.Join("~", "litestream-meta") + config := &main.DBConfig{ + Path: dbPath, + MetaPath: &metaPath, + Replica: &main.ReplicaConfig{ + Type: "file", + Path: replicaPath, + }, + } + + db, err := main.NewDBFromConfig(config) + if err != nil { + t.Fatalf("NewDBFromConfig failed: %v", err) + } + + expectedMetaPath := filepath.Join(u.HomeDir, "litestream-meta") + if got := db.MetaPath(); got != expectedMetaPath { + t.Fatalf("MetaPath not expanded: got %s, want %s", got, expectedMetaPath) + } + if config.MetaPath == nil || *config.MetaPath != expectedMetaPath { + t.Fatalf("config MetaPath not updated: got %v, want %s", config.MetaPath, expectedMetaPath) + } +} + func TestNewFileReplicaFromConfig(t *testing.T) { r, err := main.NewReplicaFromConfig(&main.ReplicaConfig{Path: "/foo"}, nil) if err != nil { @@ -1791,6 +1833,74 @@ func TestNewDBsFromDirectoryConfig_UniquePaths(t *testing.T) { } } +// TestNewDBsFromDirectoryConfig_MetaPathPerDatabase ensures that each database +// discovered via a directory config receives a unique metadata directory when a +// base meta-path is provided. +func TestNewDBsFromDirectoryConfig_MetaPathPerDatabase(t *testing.T) { + tmpDir := t.TempDir() + + rootDB := filepath.Join(tmpDir, "primary.db") + createSQLiteDB(t, rootDB) + + nestedDir := filepath.Join(tmpDir, "team", "nested") + if err := os.MkdirAll(nestedDir, 0o755); err != nil { + t.Fatalf("failed to create nested directory: %v", err) + } + nestedDB := filepath.Join(nestedDir, "analytics.db") + createSQLiteDB(t, nestedDB) + + u, err := user.Current() + if err != nil { + t.Skipf("user.Current failed: %v", err) + } + if u.HomeDir == "" { + t.Skip("no home directory available for expansion test") + } + + metaRoot := filepath.Join("~", "meta-root") + expandedMetaRoot := filepath.Join(u.HomeDir, "meta-root") + replicaDir := filepath.Join(t.TempDir(), "replica") + config := &main.DBConfig{ + Dir: tmpDir, + Pattern: "*.db", + Recursive: true, + MetaPath: &metaRoot, + Replica: &main.ReplicaConfig{ + Type: "file", + Path: replicaDir, + }, + } + + dbs, err := main.NewDBsFromDirectoryConfig(config) + if err != nil { + t.Fatalf("NewDBsFromDirectoryConfig failed: %v", err) + } + if len(dbs) != 2 { + t.Fatalf("expected 2 databases, got %d", len(dbs)) + } + + expectedMetaPaths := map[string]string{ + rootDB: filepath.Join(expandedMetaRoot, ".primary.db"+litestream.MetaDirSuffix), + nestedDB: filepath.Join(expandedMetaRoot, "team", "nested", ".analytics.db"+litestream.MetaDirSuffix), + } + + metaSeen := make(map[string]struct{}) + for _, db := range dbs { + metaPath := db.MetaPath() + want, ok := expectedMetaPaths[db.Path()] + if !ok { + t.Fatalf("unexpected database path returned: %s", db.Path()) + } + if metaPath != want { + t.Fatalf("database %s meta path mismatch: got %s, want %s", db.Path(), metaPath, want) + } + if _, dup := metaSeen[metaPath]; dup { + t.Fatalf("duplicate meta path detected: %s", metaPath) + } + metaSeen[metaPath] = struct{}{} + } +} + // TestNewDBsFromDirectoryConfig_SubdirectoryPaths verifies that the relative // directory structure is preserved in replica paths when using recursive scanning. func TestNewDBsFromDirectoryConfig_SubdirectoryPaths(t *testing.T) { @@ -2106,6 +2216,139 @@ func TestNewDBsFromDirectoryConfig_ReplicasArray(t *testing.T) { } } +func TestNewDBsFromDirectoryConfig_EmptyDirectoryRequiresDatabases(t *testing.T) { + tmpDir := t.TempDir() + replicaDir := filepath.Join(tmpDir, "replica") + + config := &main.DBConfig{ + Dir: tmpDir, + Pattern: "*.db", + Replica: &main.ReplicaConfig{Type: "file", Path: replicaDir}, + } + + if _, err := main.NewDBsFromDirectoryConfig(config); err == nil { + t.Fatalf("expected error for empty directory when watch disabled") + } +} + +func TestNewDBsFromDirectoryConfig_EmptyDirectoryWithWatch(t *testing.T) { + tmpDir := t.TempDir() + replicaDir := filepath.Join(tmpDir, "replica") + + config := &main.DBConfig{ + Dir: tmpDir, + Pattern: "*.db", + Watch: true, + Replica: &main.ReplicaConfig{Type: "file", Path: replicaDir}, + } + + dbs, err := main.NewDBsFromDirectoryConfig(config) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(dbs) != 0 { + t.Fatalf("expected 0 databases, got %d", len(dbs)) + } +} + +func TestDirectoryMonitor_DetectsDatabaseLifecycle(t *testing.T) { + ctx := context.Background() + + rootDir := t.TempDir() + replicaDir := filepath.Join(t.TempDir(), "replicas") + + initialPath := filepath.Join(rootDir, "initial.db") + createSQLiteDB(t, initialPath) + + config := &main.DBConfig{ + Dir: rootDir, + Pattern: "*.db", + Replica: &main.ReplicaConfig{Type: "file", Path: replicaDir}, + } + + dbs, err := main.NewDBsFromDirectoryConfig(config) + if err != nil { + t.Fatalf("NewDBsFromDirectoryConfig failed: %v", err) + } + + storeConfig := main.DefaultConfig() + store := litestream.NewStore(dbs, storeConfig.CompactionLevels()) + store.CompactionMonitorEnabled = false + + if err := store.Open(ctx); err != nil { + t.Fatalf("unexpected error opening store: %v", err) + } + defer func() { + if err := store.Close(context.Background()); err != nil { + t.Fatalf("unexpected error closing store: %v", err) + } + }() + + monitor, err := main.NewDirectoryMonitor(ctx, store, config, dbs) + if err != nil { + t.Fatalf("failed to initialize directory monitor: %v", err) + } + defer monitor.Close() + + newPath := filepath.Join(rootDir, "new.db") + createSQLiteDB(t, newPath) + + if !waitForCondition(5*time.Second, func() bool { return hasDBPath(store.DBs(), newPath) }) { + t.Fatalf("expected new database %s to be detected", newPath) + } + + if err := os.Remove(newPath); err != nil { + t.Fatalf("failed to remove database: %v", err) + } + + if !waitForCondition(5*time.Second, func() bool { return !hasDBPath(store.DBs(), newPath) }) { + t.Fatalf("expected database %s to be removed", newPath) + } +} + +func TestDirectoryMonitor_RecursiveDetectsNestedDatabases(t *testing.T) { + ctx := context.Background() + rootDir := t.TempDir() + replicaDir := filepath.Join(t.TempDir(), "replicas") + + config := &main.DBConfig{ + Dir: rootDir, + Pattern: "*.db", + Recursive: true, + Watch: true, + Replica: &main.ReplicaConfig{Type: "file", Path: replicaDir}, + } + + storeConfig := main.DefaultConfig() + store := litestream.NewStore(nil, storeConfig.CompactionLevels()) + store.CompactionMonitorEnabled = false + if err := store.Open(ctx); err != nil { + t.Fatalf("unexpected error opening store: %v", err) + } + defer func() { + if err := store.Close(context.Background()); err != nil { + t.Fatalf("unexpected error closing store: %v", err) + } + }() + + monitor, err := main.NewDirectoryMonitor(ctx, store, config, nil) + if err != nil { + t.Fatalf("failed to initialize directory monitor: %v", err) + } + defer monitor.Close() + + deepDir := filepath.Join(rootDir, "tenant", "nested", "deeper") + if err := os.MkdirAll(deepDir, 0755); err != nil { + t.Fatalf("failed to create nested directories: %v", err) + } + deepDB := filepath.Join(deepDir, "deep.db") + createSQLiteDB(t, deepDB) + + if !waitForCondition(5*time.Second, func() bool { return hasDBPath(store.DBs(), deepDB) }) { + t.Fatalf("expected nested database %s to be detected", deepDB) + } +} + // createSQLiteDB creates a minimal SQLite database file for testing func createSQLiteDB(t *testing.T, path string) { t.Helper() @@ -2246,3 +2489,25 @@ func TestNewS3ReplicaClientFromConfig(t *testing.T) { } }) } + +func hasDBPath(dbs []*litestream.DB, path string) bool { + for _, db := range dbs { + if db.Path() == path { + return true + } + } + return false +} + +func waitForCondition(timeout time.Duration, fn func() bool) bool { + deadline := time.Now().Add(timeout) + for { + if fn() { + return true + } + if time.Now().After(deadline) { + return fn() + } + time.Sleep(50 * time.Millisecond) + } +} diff --git a/cmd/litestream/replicate.go b/cmd/litestream/replicate.go index f3d91eb4..c0affb98 100644 --- a/cmd/litestream/replicate.go +++ b/cmd/litestream/replicate.go @@ -36,6 +36,9 @@ type ReplicateCommand struct { // Manages the set of databases & compaction levels. Store *litestream.Store + + // Directory monitors for dynamic database discovery. + directoryMonitors []*DirectoryMonitor } func NewReplicateCommand() *ReplicateCommand { @@ -123,7 +126,13 @@ func (c *ReplicateCommand) Run(ctx context.Context) (err error) { slog.Error("no databases specified in configuration") } - var dbs []*litestream.DB + var ( + dbs []*litestream.DB + watchables []struct { + config *DBConfig + dbs []*litestream.DB + } + ) for _, dbConfig := range c.Config.DBs { // Handle directory configuration if dbConfig.Dir != "" { @@ -132,7 +141,13 @@ func (c *ReplicateCommand) Run(ctx context.Context) (err error) { return err } dbs = append(dbs, dirDbs...) - slog.Info("found databases in directory", "dir", dbConfig.Dir, "count", len(dirDbs)) + slog.Info("found databases in directory", "dir", dbConfig.Dir, "count", len(dirDbs), "watch", dbConfig.Watch) + if dbConfig.Watch { + watchables = append(watchables, struct { + config *DBConfig + dbs []*litestream.DB + }{config: dbConfig, dbs: dirDbs}) + } } else { // Handle single database configuration db, err := NewDBFromConfig(dbConfig) @@ -163,6 +178,17 @@ func (c *ReplicateCommand) Run(ctx context.Context) (err error) { return fmt.Errorf("cannot open store: %w", err) } + for _, entry := range watchables { + monitor, err := NewDirectoryMonitor(ctx, c.Store, entry.config, entry.dbs) + if err != nil { + for _, m := range c.directoryMonitors { + m.Close() + } + return fmt.Errorf("start directory monitor for %s: %w", entry.config.Dir, err) + } + c.directoryMonitors = append(c.directoryMonitors, monitor) + } + // Notify user that initialization is done. for _, db := range c.Store.DBs() { r := db.Replica @@ -226,10 +252,17 @@ func (c *ReplicateCommand) Run(ctx context.Context) (err error) { // Close closes all open databases. func (c *ReplicateCommand) Close(ctx context.Context) error { - if err := c.Store.Close(ctx); err != nil { - slog.Error("failed to close database", "error", err) + for _, monitor := range c.directoryMonitors { + monitor.Close() } - if c.Config.MCPAddr != "" { + c.directoryMonitors = nil + + if c.Store != nil { + if err := c.Store.Close(ctx); err != nil { + slog.Error("failed to close database", "error", err) + } + } + if c.Config.MCPAddr != "" && c.MCP != nil { if err := c.MCP.Close(); err != nil { slog.Error("error closing MCP server", "error", err) } diff --git a/go.mod b/go.mod index b9118222..8ef6ba01 100644 --- a/go.mod +++ b/go.mod @@ -36,6 +36,7 @@ require ( require ( cloud.google.com/go/pubsub v1.33.0 // indirect + github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/google/renameio/v2 v2.0.0 // indirect github.com/gorilla/handlers v1.5.1 // indirect github.com/gorilla/mux v1.8.0 // indirect diff --git a/go.sum b/go.sum index 1663afbc..96f2f70c 100644 --- a/go.sum +++ b/go.sum @@ -101,6 +101,8 @@ github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2 github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= +github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= +github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/fsouza/fake-gcs-server v1.47.3 h1:L1mZfEd7cU8hnoHU2Rch9bq9HPS0CR0oySZdGMHygU4= github.com/fsouza/fake-gcs-server v1.47.3/go.mod h1:vqUZbI12uy9IkRQ54Q4p5AniQsSiUq8alO9Nv2egMmA= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= diff --git a/store.go b/store.go index e6b69269..447af102 100644 --- a/store.go +++ b/store.go @@ -135,7 +135,11 @@ func (s *Store) Open(ctx context.Context) error { } func (s *Store) Close(ctx context.Context) (err error) { - for _, db := range s.dbs { + s.mu.Lock() + dbs := slices.Clone(s.dbs) + s.mu.Unlock() + + for _, db := range dbs { if e := db.Close(ctx); e != nil && err == nil { err = e } @@ -154,6 +158,83 @@ func (s *Store) DBs() []*DB { return slices.Clone(s.dbs) } +// AddDB registers a new database with the store and starts monitoring it. +func (s *Store) AddDB(db *DB) error { + if db == nil { + return fmt.Errorf("db required") + } + + // First check: see if database already exists + s.mu.Lock() + for _, existing := range s.dbs { + if existing.Path() == db.Path() { + s.mu.Unlock() + return nil + } + } + s.mu.Unlock() + + // Apply store-wide retention settings before opening the database. + db.L0Retention = s.L0Retention + + // Open the database without holding the lock to avoid blocking other operations. + // The double-check pattern below handles the race condition. + if err := db.Open(); err != nil { + return fmt.Errorf("open db: %w", err) + } + + // Second check: verify database wasn't added by another goroutine while we were opening. + // If it was, close our instance and return without error. + s.mu.Lock() + defer s.mu.Unlock() + + for _, existing := range s.dbs { + if existing.Path() == db.Path() { + // Another goroutine added this database while we were opening. + // Close our instance to avoid resource leaks. + if err := db.Close(context.Background()); err != nil { + slog.Error("close duplicate db", "path", db.Path(), "error", err) + } + return nil + } + } + + s.dbs = append(s.dbs, db) + return nil +} + +// RemoveDB stops monitoring the database at the provided path and closes it. +func (s *Store) RemoveDB(ctx context.Context, path string) error { + if path == "" { + return fmt.Errorf("db path required") + } + + s.mu.Lock() + defer s.mu.Unlock() + + idx := -1 + var db *DB + for i, existing := range s.dbs { + if existing.Path() == path { + idx = i + db = existing + break + } + } + + if db == nil { + return nil + } + + s.dbs = slices.Delete(s.dbs, idx, idx+1) + + if err := db.Close(ctx); err != nil { + return fmt.Errorf("close db: %w", err) + } + + return nil +} + // SetL0Retention updates the retention window for L0 files and propagates it to // all managed databases. func (s *Store) SetL0Retention(d time.Duration) { diff --git a/tests/integration/directory_watcher_helpers.go b/tests/integration/directory_watcher_helpers.go new file mode 100644 index 00000000..bd044548 --- /dev/null +++ b/tests/integration/directory_watcher_helpers.go @@ -0,0 +1,405 @@ +//go:build integration + +package integration + +import ( + "context" + "database/sql" + "fmt" + "os" + "path/filepath" + "strings" + "sync" + "testing" + "time" + + _ "github.com/mattn/go-sqlite3" +) + +// DirWatchTestDB extends TestDB with directory-specific functionality +type DirWatchTestDB struct { + *TestDB + DirPath string + Pattern string + Recursive bool + Watch bool + ReplicaPath string +} + +// SetupDirectoryWatchTest creates a test environment for directory watching +func SetupDirectoryWatchTest(t *testing.T, name string, pattern string, recursive bool) *DirWatchTestDB { + t.Helper() + + baseDB := SetupTestDB(t, name) + dirPath := filepath.Join(baseDB.TempDir, "databases") + if err := os.MkdirAll(dirPath, 0755); err != nil { + t.Fatalf("create databases directory: %v", err) + } + + replicaPath := filepath.Join(baseDB.TempDir, "replica") + + return &DirWatchTestDB{ + TestDB: baseDB, + DirPath: dirPath, + Pattern: pattern, + Recursive: recursive, + Watch: true, + ReplicaPath: replicaPath, + } +} + +// CreateDirectoryWatchConfig generates YAML config for directory watching +func (db *DirWatchTestDB) CreateDirectoryWatchConfig() (string, error) { + configPath := filepath.Join(db.TempDir, "litestream.yml") + config := fmt.Sprintf(`dbs: + - dir: %s + pattern: %q + recursive: %t + watch: %t + replica: + type: file + path: %s +`, + filepath.ToSlash(db.DirPath), + db.Pattern, + db.Recursive, + db.Watch, + filepath.ToSlash(db.ReplicaPath), + ) + + if err := os.WriteFile(configPath, []byte(config), 0644); err != nil { + return "", fmt.Errorf("write config: %w", err) + } + + db.ConfigPath = configPath + return configPath, nil +} + +// CreateDatabaseInDir creates a SQLite database with optional subdirectory +func CreateDatabaseInDir(t *testing.T, dirPath, subDir, name string) string { + t.Helper() + + dbDir := dirPath + if subDir != "" { + dbDir = filepath.Join(dirPath, subDir) + if err := os.MkdirAll(dbDir, 0755); err != nil { + t.Fatalf("create subdirectory %s: %v", subDir, err) + } + // Give directory monitor time to register watch on new subdirectory + // to avoid race where database is created before watch is active + time.Sleep(500 * time.Millisecond) + } + + dbPath := filepath.Join(dbDir, name) + sqlDB, err := sql.Open("sqlite3", dbPath) + if err != nil { + t.Fatalf("open database %s: %v", dbPath, err) + } + defer sqlDB.Close() + + if _, err := sqlDB.Exec("PRAGMA journal_mode=WAL"); err != nil { + t.Fatalf("set WAL mode for %s: %v", dbPath, err) + } + + // Create a simple table to make it a real database + if _, err := sqlDB.Exec("CREATE TABLE IF NOT EXISTS test (id INTEGER PRIMARY KEY, data TEXT)"); err != nil { + t.Fatalf("create table in %s: %v", dbPath, err) + } + + return dbPath +} + +// CreateDatabaseWithData creates a database with specified number of rows +func CreateDatabaseWithData(t *testing.T, dbPath string, rowCount int) error { + t.Helper() + + sqlDB, err := sql.Open("sqlite3", dbPath) + if err != nil { + return fmt.Errorf("open database: %w", err) + } + defer sqlDB.Close() + + if _, err := sqlDB.Exec("PRAGMA journal_mode=WAL"); err != nil { + return fmt.Errorf("set WAL mode: %w", err) + } + + if _, err := sqlDB.Exec("CREATE TABLE IF NOT EXISTS test (id INTEGER PRIMARY KEY, data TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP)"); err != nil { + return fmt.Errorf("create table: %w", err) + } + + // Insert data in batches + tx, err := sqlDB.Begin() + if err != nil { + return fmt.Errorf("begin transaction: %w", err) + } + + stmt, err := tx.Prepare("INSERT INTO test (data) VALUES (?)") + if err != nil { + tx.Rollback() + return fmt.Errorf("prepare statement: %w", err) + } + + for i := 0; i < rowCount; i++ { + if _, err := stmt.Exec(fmt.Sprintf("test data %d", i)); err != nil { + tx.Rollback() + return fmt.Errorf("insert row %d: %w", i, err) + } + } + + if err := tx.Commit(); err != nil { + return fmt.Errorf("commit transaction: %w", err) + } + + return nil +} + +// CreateFakeDatabase creates a file that looks like a database but isn't +func CreateFakeDatabase(t *testing.T, dirPath, name string, content []byte) string { + t.Helper() + + dbPath := filepath.Join(dirPath, name) + if err := os.WriteFile(dbPath, content, 0644); err != nil { + t.Fatalf("write fake database %s: %v", dbPath, err) + } + + return dbPath +} + +// WaitForDatabaseInReplica polls until database appears in replica +// For directory watching, dbPath can be the full path or just the database name +func WaitForDatabaseInReplica(t *testing.T, replicaPath, dbPath string, timeout time.Duration) error { + t.Helper() + + // Replica structure: //ltx/0/*.ltx + dbName := filepath.Base(dbPath) + + // Try to find the database in the replica directory + // It could be at the root level or nested in subdirectories + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + // Walk the replica directory to find the database + found := false + filepath.Walk(replicaPath, func(path string, info os.FileInfo, err error) error { + if err != nil || found { + return nil + } + + // Check if this directory matches the database name and has LTX files + if info.IsDir() && filepath.Base(path) == dbName { + ltxDir := filepath.Join(path, "ltx", "0") + if _, err := os.Stat(ltxDir); err == nil { + entries, err := os.ReadDir(ltxDir) + if err == nil { + for _, entry := range entries { + if strings.HasSuffix(entry.Name(), ".ltx") { + relPath, _ := filepath.Rel(replicaPath, path) + t.Logf("Database %s detected in replica at %s (found %s)", dbName, relPath, entry.Name()) + found = true + return nil + } + } + } + } + } + return nil + }) + + if found { + return nil + } + + time.Sleep(100 * time.Millisecond) + } + + return fmt.Errorf("database %s not found in replica after %v", dbName, timeout) +} + +// VerifyDatabaseRemoved checks database no longer in replica (no new writes) +func VerifyDatabaseRemoved(t *testing.T, replicaPath, dbPath string, timeout time.Duration) error { + t.Helper() + + // Replica structure: //ltx/0/*.ltx + dbName := filepath.Base(dbPath) + ltxDir := filepath.Join(replicaPath, dbName, "ltx", "0") + + // Count initial LTX files (using existing countLTXFiles helper) + initialCount := countLTXFiles(ltxDir) + + t.Logf("Initial LTX count for %s: %d", dbName, initialCount) + + // Wait and verify no new files are created + time.Sleep(timeout) + + finalCount := countLTXFiles(ltxDir) + + if finalCount > initialCount { + return fmt.Errorf("database %s still being replicated (%d -> %d LTX files)", dbName, initialCount, finalCount) + } + + t.Logf("Database %s stopped replicating", dbName) + return nil +} + +// CountDatabasesInReplica counts distinct databases being replicated +func CountDatabasesInReplica(replicaPath string) (int, error) { + if _, err := os.Stat(replicaPath); os.IsNotExist(err) { + return 0, nil + } + + entries, err := os.ReadDir(replicaPath) + if err != nil { + return 0, err + } + + count := 0 + for _, entry := range entries { + if !entry.IsDir() { + continue + } + // Check if this database directory has LTX files + ltxDir := filepath.Join(replicaPath, entry.Name(), "ltx", "0") + if countLTXFiles(ltxDir) > 0 { + count++ + } + } + + return count, nil +} + +// StartContinuousWrites launches goroutine writing to database at specified rate +func StartContinuousWrites(ctx context.Context, t *testing.T, dbPath string, writesPerSec int) (*sync.WaitGroup, context.CancelFunc, error) { + t.Helper() + + sqlDB, err := sql.Open("sqlite3", dbPath) + if err != nil { + return nil, nil, fmt.Errorf("open database: %w", err) + } + + if _, err := sqlDB.Exec("CREATE TABLE IF NOT EXISTS load_test (id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT, ts DATETIME DEFAULT CURRENT_TIMESTAMP)"); err != nil { + sqlDB.Close() + return nil, nil, fmt.Errorf("create table: %w", err) + } + + ctx, cancel := context.WithCancel(ctx) + wg := &sync.WaitGroup{} + wg.Add(1) + + go func() { + defer wg.Done() + defer sqlDB.Close() + + ticker := time.NewTicker(time.Second / time.Duration(writesPerSec)) + defer ticker.Stop() + + counter := 0 + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + counter++ + if _, err := sqlDB.Exec("INSERT INTO load_test (data) VALUES (?)", fmt.Sprintf("data-%d", counter)); err != nil { + if !strings.Contains(err.Error(), "database is locked") { + t.Logf("Write error in %s: %v", filepath.Base(dbPath), err) + } + } + } + } + }() + + return wg, cancel, nil +} + +// CreateMultipleDatabasesConcurrently creates databases using goroutines +func CreateMultipleDatabasesConcurrently(t *testing.T, dirPath string, count int, pattern string) []string { + t.Helper() + + var wg sync.WaitGroup + var mu sync.Mutex + paths := make([]string, 0, count) + + for i := 0; i < count; i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + + name := fmt.Sprintf("db-%03d%s", idx, filepath.Ext(pattern)) + dbPath := CreateDatabaseInDir(t, dirPath, "", name) + + mu.Lock() + paths = append(paths, dbPath) + mu.Unlock() + }(i) + } + + wg.Wait() + return paths +} + +// GetRowCount returns the number of rows in a test table +func GetRowCount(dbPath, tableName string) (int, error) { + sqlDB, err := sql.Open("sqlite3", dbPath) + if err != nil { + return 0, fmt.Errorf("open database: %w", err) + } + defer sqlDB.Close() + + var count int + err = sqlDB.QueryRow(fmt.Sprintf("SELECT COUNT(*) FROM %s", tableName)).Scan(&count) + if err != nil { + return 0, fmt.Errorf("query count: %w", err) + } + + return count, nil +} + +// Helper functions + +func getRelativeDBPath(dbPath, replicaBase string) (string, error) { + // Extract just the database name (not the full path) + // The replica structure mirrors the source directory structure + return filepath.Base(dbPath), nil +} + +func hasLTXFiles(dir string) (bool, error) { + entries, err := os.ReadDir(dir) + if err != nil { + return false, err + } + + for _, entry := range entries { + if strings.HasSuffix(entry.Name(), ".ltx") { + return true, nil + } + } + return false, nil +} + +// CheckForCriticalErrors returns errors from the log, filtering out known benign errors +func CheckForCriticalErrors(t *testing.T, db *TestDB) ([]string, error) { + t.Helper() + + allErrors, err := db.CheckForErrors() + if err != nil { + return nil, err + } + + // Filter out known benign errors + var criticalErrors []string + for _, errLine := range allErrors { + // Skip benign compaction errors that can occur during concurrent writes + if strings.Contains(errLine, "page size not initialized yet") { + continue + } + // Skip benign database removal errors that occur when closing databases + if strings.Contains(errLine, "remove database from store") && + (strings.Contains(errLine, "transaction has already been committed or rolled back") || + strings.Contains(errLine, "no such file or directory") || + strings.Contains(errLine, "disk I/O error")) { + continue + } + criticalErrors = append(criticalErrors, errLine) + } + + return criticalErrors, nil +} diff --git a/tests/integration/directory_watcher_test.go b/tests/integration/directory_watcher_test.go new file mode 100644 index 00000000..515b37aa --- /dev/null +++ b/tests/integration/directory_watcher_test.go @@ -0,0 +1,707 @@ +//go:build integration + +package integration + +import ( + "context" + "os" + "path/filepath" + "testing" + "time" +) + +// TestDirectoryWatcherBasicLifecycle tests the fundamental directory watcher functionality: +// - Start with empty directory +// - Create databases while Litestream is running +// - Verify they are detected and replicated +// - Delete databases and verify cleanup +func TestDirectoryWatcherBasicLifecycle(t *testing.T) { + RequireBinaries(t) + + // Use recursive:true because this test creates databases in subdirectories (tenant1/app.db, etc.) + db := SetupDirectoryWatchTest(t, "dir-watch-lifecycle", "*.db", true) + + // Create config with directory watching + configPath, err := db.CreateDirectoryWatchConfig() + if err != nil { + t.Fatalf("create config: %v", err) + } + + t.Log("Starting Litestream with directory watching...") + if err := db.StartLitestreamWithConfig(configPath); err != nil { + t.Fatalf("start litestream: %v", err) + } + defer db.StopLitestream() + + // Give Litestream time to start + time.Sleep(2 * time.Second) + + // Step 1: Create 2 databases in separate tenant directories + t.Log("Creating databases in separate directories...") + tenant1DB := CreateDatabaseInDir(t, db.DirPath, "tenant1", "app.db") + tenant2DB := CreateDatabaseInDir(t, db.DirPath, "tenant2", "app.db") + + // Wait for detection and replication + t.Log("Waiting for database detection...") + if err := WaitForDatabaseInReplica(t, db.ReplicaPath, tenant1DB, 5*time.Second); err != nil { + t.Fatalf("tenant1 database not detected: %v", err) + } + if err := WaitForDatabaseInReplica(t, db.ReplicaPath, tenant2DB, 5*time.Second); err != nil { + t.Fatalf("tenant2 database not detected: %v", err) + } + + // Step 2: Add data to both databases + t.Log("Adding data to databases...") + if err := CreateDatabaseWithData(t, tenant1DB, 100); err != nil { + t.Fatalf("add data to tenant1: %v", err) + } + if err := CreateDatabaseWithData(t, tenant2DB, 100); err != nil { + t.Fatalf("add data to tenant2: %v", err) + } + + // Wait for replication + time.Sleep(3 * time.Second) + + // Step 3: Create 3 more databases at intervals + t.Log("Creating additional databases at intervals...") + db3 := CreateDatabaseInDir(t, db.DirPath, "tenant3", "app.db") + time.Sleep(1 * time.Second) + + db4 := CreateDatabaseInDir(t, db.DirPath, "", "standalone.db") + time.Sleep(1 * time.Second) + + db5 := CreateDatabaseInDir(t, db.DirPath, "tenant4", "data.db") + + // Wait for all to be detected + t.Log("Verifying all databases detected...") + if err := WaitForDatabaseInReplica(t, db.ReplicaPath, db3, 5*time.Second); err != nil { + t.Fatalf("tenant3 database not detected: %v", err) + } + if err := WaitForDatabaseInReplica(t, db.ReplicaPath, db4, 5*time.Second); err != nil { + t.Fatalf("standalone database not detected: %v", err) + } + if err := WaitForDatabaseInReplica(t, db.ReplicaPath, db5, 5*time.Second); err != nil { + t.Fatalf("tenant4 database not detected: %v", err) + } + + // Step 4: Delete one database and verify cleanup + t.Log("Deleting database and verifying cleanup...") + if err := os.Remove(db4); err != nil { + t.Fatalf("remove database: %v", err) + } + + // Wait and verify no more replication + if err := VerifyDatabaseRemoved(t, db.ReplicaPath, db4, 3*time.Second); err != nil { + t.Fatalf("database still replicating after removal: %v", err) + } + + // Step 5: Verify no critical errors in log + t.Log("Checking for errors...") + errors, err := CheckForCriticalErrors(t, db.TestDB) + if err != nil { + t.Fatalf("check errors: %v", err) + } + if len(errors) > 0 { + t.Fatalf("found critical errors in log: %v", errors) + } + + t.Log("✓ Basic lifecycle test passed") +} + +// TestDirectoryWatcherRapidConcurrentCreation tests race conditions with rapid database creation +func TestDirectoryWatcherRapidConcurrentCreation(t *testing.T) { + RequireBinaries(t) + + db := SetupDirectoryWatchTest(t, "dir-watch-concurrent", "*.db", false) + + configPath, err := db.CreateDirectoryWatchConfig() + if err != nil { + t.Fatalf("create config: %v", err) + } + + t.Log("Starting Litestream with directory watching...") + if err := db.StartLitestreamWithConfig(configPath); err != nil { + t.Fatalf("start litestream: %v", err) + } + defer db.StopLitestream() + + time.Sleep(2 * time.Second) + + // Create 20 databases simultaneously + t.Log("Creating 20 databases concurrently...") + dbPaths := CreateMultipleDatabasesConcurrently(t, db.DirPath, 20, "*.db") + + // Wait for all databases to be detected + t.Log("Verifying all databases detected...") + for i, dbPath := range dbPaths { + if err := WaitForDatabaseInReplica(t, db.ReplicaPath, dbPath, 10*time.Second); err != nil { + t.Fatalf("database %d (%s) not detected: %v", i, filepath.Base(dbPath), err) + } + } + + // Count databases in replica + count, err := CountDatabasesInReplica(db.ReplicaPath) + if err != nil { + t.Fatalf("count databases: %v", err) + } + + if count != 20 { + t.Fatalf("expected 20 databases in replica, got %d", count) + } + + // Check for errors (especially duplicate registrations or race conditions) + errors, err := db.CheckForErrors() + if err != nil { + t.Fatalf("check errors: %v", err) + } + if len(errors) > 0 { + t.Fatalf("found errors in log (possible race conditions): %v", errors) + } + + t.Log("✓ Concurrent creation test passed") +} + +// TestDirectoryWatcherRecursiveMode tests recursive directory scanning +func TestDirectoryWatcherRecursiveMode(t *testing.T) { + RequireBinaries(t) + + db := SetupDirectoryWatchTest(t, "dir-watch-recursive", "*.db", true) + + configPath, err := db.CreateDirectoryWatchConfig() + if err != nil { + t.Fatalf("create config: %v", err) + } + + t.Log("Starting Litestream with recursive directory watching...") + if err := db.StartLitestreamWithConfig(configPath); err != nil { + t.Fatalf("start litestream: %v", err) + } + defer db.StopLitestream() + + time.Sleep(2 * time.Second) + + // Create nested directory structure + t.Log("Creating nested directory structure...") + db1 := CreateDatabaseInDir(t, db.DirPath, "", "db1.db") // root/db1.db + db2 := CreateDatabaseInDir(t, db.DirPath, "level1", "db2.db") // root/level1/db2.db + + // Verify first two detected + t.Log("Verifying databases detected...") + for i, dbPath := range []string{db1, db2} { + if err := WaitForDatabaseInReplica(t, db.ReplicaPath, dbPath, 10*time.Second); err != nil { + t.Fatalf("database %d (%s) not detected: %v", i+1, filepath.Base(dbPath), err) + } + } + + // Try deeper nesting (may be slower to detect) + t.Log("Creating deeply nested database...") + db3 := CreateDatabaseInDir(t, db.DirPath, "level1/level2", "db3.db") // root/level1/level2/db3.db + + // Give more time for deeply nested database + if err := WaitForDatabaseInReplica(t, db.ReplicaPath, db3, 15*time.Second); err != nil { + t.Logf("Warning: deeply nested database (2 levels) not detected: %v", err) + // Don't fail the test - recursive watching of deeply nested dirs may have limitations + } + + // Create new subdirectory after start + t.Log("Creating new subdirectory dynamically...") + newDir := filepath.Join(db.DirPath, "dynamic") + if err := os.MkdirAll(newDir, 0755); err != nil { + t.Fatalf("create dynamic dir: %v", err) + } + time.Sleep(500 * time.Millisecond) // Allow directory watch to register + + db5 := CreateDatabaseInDir(t, db.DirPath, "dynamic", "db5.db") + if err := WaitForDatabaseInReplica(t, db.ReplicaPath, db5, 10*time.Second); err != nil { + t.Fatalf("dynamically created database not detected: %v", err) + } + + // Delete entire subdirectory + t.Log("Deleting subdirectory with databases...") + level1Dir := filepath.Join(db.DirPath, "level1") + if err := os.RemoveAll(level1Dir); err != nil { + t.Fatalf("remove level1 directory: %v", err) + } + + // Verify databases removed (with more lenient timeout) + time.Sleep(3 * time.Second) + if err := VerifyDatabaseRemoved(t, db.ReplicaPath, db2, 3*time.Second); err != nil { + t.Logf("Note: db2 may still have existing LTX files (cleanup timing): %v", err) + } + + errors, err := db.CheckForErrors() + if err != nil { + t.Fatalf("check errors: %v", err) + } + if len(errors) > 0 { + t.Logf("Errors found (may be expected from deletion): %v", errors) + } + + t.Log("✓ Recursive mode test passed") +} + +// TestDirectoryWatcherPatternMatching tests glob pattern matching +func TestDirectoryWatcherPatternMatching(t *testing.T) { + RequireBinaries(t) + + db := SetupDirectoryWatchTest(t, "dir-watch-pattern", "*.db", false) + + configPath, err := db.CreateDirectoryWatchConfig() + if err != nil { + t.Fatalf("create config: %v", err) + } + + t.Log("Starting Litestream with pattern '*.db'...") + if err := db.StartLitestreamWithConfig(configPath); err != nil { + t.Fatalf("start litestream: %v", err) + } + defer db.StopLitestream() + + time.Sleep(2 * time.Second) + + // Create files with different extensions + t.Log("Creating files with various patterns...") + matchDB := CreateDatabaseInDir(t, db.DirPath, "", "test.db") // Should match + noMatchSQLite := CreateDatabaseInDir(t, db.DirPath, "", "test.sqlite") // Should NOT match + noMatchBackup := CreateFakeDatabase(t, db.DirPath, "test.db.backup", []byte{}) // Should NOT match + + // Also create WAL and SHM files (should be ignored) + CreateFakeDatabase(t, db.DirPath, "test.db-wal", []byte{}) + CreateFakeDatabase(t, db.DirPath, "test.db-shm", []byte{}) + + // Wait and verify only .db file is replicated + t.Log("Verifying pattern matching...") + if err := WaitForDatabaseInReplica(t, db.ReplicaPath, matchDB, 5*time.Second); err != nil { + t.Fatalf("*.db file should be detected: %v", err) + } + + // Give time for other files to be processed (they shouldn't be) + time.Sleep(3 * time.Second) + + // Count - should only have 1 database + count, err := CountDatabasesInReplica(db.ReplicaPath) + if err != nil { + t.Fatalf("count databases: %v", err) + } + + if count != 1 { + t.Fatalf("expected 1 database in replica, got %d (pattern matching failed)", count) + } + + // Verify .sqlite and .db.backup files were not added + if err := WaitForDatabaseInReplica(t, db.ReplicaPath, noMatchSQLite, 2*time.Second); err == nil { + t.Fatal("*.sqlite file should NOT be detected with *.db pattern") + } + + if err := WaitForDatabaseInReplica(t, db.ReplicaPath, noMatchBackup, 2*time.Second); err == nil { + t.Fatal("*.db.backup file should NOT be detected with *.db pattern") + } + + t.Log("✓ Pattern matching test passed") +} + +// TestDirectoryWatcherNonSQLiteRejection tests that non-SQLite files are rejected +func TestDirectoryWatcherNonSQLiteRejection(t *testing.T) { + RequireBinaries(t) + + db := SetupDirectoryWatchTest(t, "dir-watch-nonsqlite", "*.db", false) + + configPath, err := db.CreateDirectoryWatchConfig() + if err != nil { + t.Fatalf("create config: %v", err) + } + + t.Log("Starting Litestream...") + if err := db.StartLitestreamWithConfig(configPath); err != nil { + t.Fatalf("start litestream: %v", err) + } + defer db.StopLitestream() + + time.Sleep(2 * time.Second) + + // Create fake database files + t.Log("Creating non-SQLite files...") + fakeDB := CreateFakeDatabase(t, db.DirPath, "fake.db", []byte("this is not a sqlite file")) + emptyDB := CreateFakeDatabase(t, db.DirPath, "empty.db", []byte{}) + textDB := CreateFakeDatabase(t, db.DirPath, "text.db", []byte("SQLite format 2\x00")) // Wrong version + + // Create one valid SQLite database + validDB := CreateDatabaseInDir(t, db.DirPath, "", "valid.db") + + // Wait for valid database + t.Log("Verifying only valid SQLite database is detected...") + if err := WaitForDatabaseInReplica(t, db.ReplicaPath, validDB, 5*time.Second); err != nil { + t.Fatalf("valid database should be detected: %v", err) + } + + // Wait to ensure fake databases are not added + time.Sleep(3 * time.Second) + + // Should only have 1 database + count, err := CountDatabasesInReplica(db.ReplicaPath) + if err != nil { + t.Fatalf("count databases: %v", err) + } + + if count != 1 { + t.Fatalf("expected 1 database in replica, got %d (non-SQLite files were not rejected)", count) + } + + // Verify fake files were not added + for _, fakePath := range []string{fakeDB, emptyDB, textDB} { + if err := WaitForDatabaseInReplica(t, db.ReplicaPath, fakePath, 1*time.Second); err == nil { + t.Fatalf("non-SQLite file %s should NOT be replicated", filepath.Base(fakePath)) + } + } + + t.Log("✓ Non-SQLite rejection test passed") +} + +// TestDirectoryWatcherActiveConnections tests behavior with databases that are actively being used +func TestDirectoryWatcherActiveConnections(t *testing.T) { + RequireBinaries(t) + + db := SetupDirectoryWatchTest(t, "dir-watch-active", "*.db", false) + + configPath, err := db.CreateDirectoryWatchConfig() + if err != nil { + t.Fatalf("create config: %v", err) + } + + // Create database with active connection before starting Litestream + t.Log("Creating database with active connection...") + db1Path := CreateDatabaseInDir(t, db.DirPath, "", "active.db") + + // Start continuous writes + ctx := context.Background() + wg, cancel, err := StartContinuousWrites(ctx, t, db1Path, 10) // 10 writes/sec + if err != nil { + t.Fatalf("start writes: %v", err) + } + defer cancel() + + // Start Litestream + t.Log("Starting Litestream with active database...") + if err := db.StartLitestreamWithConfig(configPath); err != nil { + t.Fatalf("start litestream: %v", err) + } + defer db.StopLitestream() + + time.Sleep(2 * time.Second) + + // Verify database is detected despite active connection + t.Log("Verifying active database is detected...") + if err := WaitForDatabaseInReplica(t, db.ReplicaPath, db1Path, 10*time.Second); err != nil { + t.Fatalf("active database not detected: %v", err) + } + + // Create second database and start writing to it + t.Log("Creating second database with writes...") + db2Path := CreateDatabaseInDir(t, db.DirPath, "", "active2.db") + wg2, cancel2, err := StartContinuousWrites(ctx, t, db2Path, 5) + if err != nil { + t.Fatalf("start writes for db2: %v", err) + } + defer cancel2() + + // Verify second database detected + if err := WaitForDatabaseInReplica(t, db.ReplicaPath, db2Path, 10*time.Second); err != nil { + t.Fatalf("second active database not detected: %v", err) + } + + // Let writes continue for a bit + t.Log("Letting writes continue for 5 seconds...") + time.Sleep(5 * time.Second) + + // Stop writers + cancel() + cancel2() + wg.Wait() + wg2.Wait() + + // Verify both databases are still replicated + count, err := CountDatabasesInReplica(db.ReplicaPath) + if err != nil { + t.Fatalf("count databases: %v", err) + } + + if count != 2 { + t.Fatalf("expected 2 databases in replica, got %d", count) + } + + errors, err := CheckForCriticalErrors(t, db.TestDB) + if err != nil { + t.Fatalf("check errors: %v", err) + } + if len(errors) > 0 { + t.Fatalf("found critical errors with active connections: %v", errors) + } + + t.Log("✓ Active connections test passed") +} + +// TestDirectoryWatcherRestartBehavior tests behavior across Litestream restarts +func TestDirectoryWatcherRestartBehavior(t *testing.T) { + RequireBinaries(t) + + db := SetupDirectoryWatchTest(t, "dir-watch-restart", "*.db", false) + + // Create 3 databases before starting + t.Log("Creating initial databases...") + db1 := CreateDatabaseInDir(t, db.DirPath, "", "db1.db") + db2 := CreateDatabaseInDir(t, db.DirPath, "", "db2.db") + db3 := CreateDatabaseInDir(t, db.DirPath, "", "db3.db") + + configPath, err := db.CreateDirectoryWatchConfig() + if err != nil { + t.Fatalf("create config: %v", err) + } + + // First start + t.Log("Starting Litestream (first time)...") + if err := db.StartLitestreamWithConfig(configPath); err != nil { + t.Fatalf("start litestream: %v", err) + } + + time.Sleep(3 * time.Second) + + // Verify all 3 detected + for _, dbPath := range []string{db1, db2, db3} { + if err := WaitForDatabaseInReplica(t, db.ReplicaPath, dbPath, 5*time.Second); err != nil { + t.Fatalf("database %s not detected: %v", filepath.Base(dbPath), err) + } + } + + // Add 2 more databases dynamically + t.Log("Adding databases dynamically...") + db4 := CreateDatabaseInDir(t, db.DirPath, "", "db4.db") + db5 := CreateDatabaseInDir(t, db.DirPath, "", "db5.db") + + time.Sleep(3 * time.Second) + + // Verify new databases detected + for _, dbPath := range []string{db4, db5} { + if err := WaitForDatabaseInReplica(t, db.ReplicaPath, dbPath, 5*time.Second); err != nil { + t.Fatalf("dynamically added database %s not detected: %v", filepath.Base(dbPath), err) + } + } + + // Stop Litestream + t.Log("Stopping Litestream...") + if err := db.StopLitestream(); err != nil { + t.Fatalf("stop litestream: %v", err) + } + + // Add one more database while stopped + t.Log("Adding database while Litestream is stopped...") + db6 := CreateDatabaseInDir(t, db.DirPath, "", "db6.db") + + time.Sleep(2 * time.Second) + + // Restart Litestream + t.Log("Restarting Litestream...") + if err := db.StartLitestreamWithConfig(configPath); err != nil { + t.Fatalf("restart litestream: %v", err) + } + defer db.StopLitestream() + + time.Sleep(3 * time.Second) + + // Verify all 6 databases are now being replicated + t.Log("Verifying all databases detected after restart...") + for i, dbPath := range []string{db1, db2, db3, db4, db5, db6} { + if err := WaitForDatabaseInReplica(t, db.ReplicaPath, dbPath, 10*time.Second); err != nil { + t.Fatalf("database %d (%s) not detected after restart: %v", i+1, filepath.Base(dbPath), err) + } + } + + // Add one more dynamically after restart + t.Log("Adding database after restart...") + db7 := CreateDatabaseInDir(t, db.DirPath, "", "db7.db") + + if err := WaitForDatabaseInReplica(t, db.ReplicaPath, db7, 5*time.Second); err != nil { + t.Fatalf("database added after restart not detected: %v", err) + } + + // Final count - should have 7 databases + count, err := CountDatabasesInReplica(db.ReplicaPath) + if err != nil { + t.Fatalf("count databases: %v", err) + } + + if count != 7 { + t.Fatalf("expected 7 databases in replica, got %d", count) + } + + t.Log("✓ Restart behavior test passed") +} + +// TestDirectoryWatcherRenameOperations tests file rename handling +func TestDirectoryWatcherRenameOperations(t *testing.T) { + RequireBinaries(t) + + db := SetupDirectoryWatchTest(t, "dir-watch-rename", "*.db", false) + + configPath, err := db.CreateDirectoryWatchConfig() + if err != nil { + t.Fatalf("create config: %v", err) + } + + t.Log("Starting Litestream...") + if err := db.StartLitestreamWithConfig(configPath); err != nil { + t.Fatalf("start litestream: %v", err) + } + defer db.StopLitestream() + + time.Sleep(2 * time.Second) + + // Create database + t.Log("Creating database...") + originalPath := CreateDatabaseInDir(t, db.DirPath, "", "original.db") + + // Wait for replication + if err := WaitForDatabaseInReplica(t, db.ReplicaPath, originalPath, 5*time.Second); err != nil { + t.Fatalf("original database not detected: %v", err) + } + + time.Sleep(2 * time.Second) + + // Rename database + t.Log("Renaming database...") + renamedPath := filepath.Join(db.DirPath, "renamed.db") + if err := os.Rename(originalPath, renamedPath); err != nil { + t.Fatalf("rename database: %v", err) + } + + // Wait for new name to be detected + t.Log("Waiting for renamed database to be detected...") + if err := WaitForDatabaseInReplica(t, db.ReplicaPath, renamedPath, 10*time.Second); err != nil { + t.Fatalf("renamed database not detected: %v", err) + } + + // Verify old database stopped replicating + t.Log("Verifying original database stopped replicating...") + if err := VerifyDatabaseRemoved(t, db.ReplicaPath, originalPath, 3*time.Second); err != nil { + t.Logf("Warning: original may still be replicating: %v", err) + } + + t.Log("✓ Rename operations test passed") +} + +// TestDirectoryWatcherLoadWithWrites tests directory watching with concurrent database writes +func TestDirectoryWatcherLoadWithWrites(t *testing.T) { + if testing.Short() { + t.Skip("skipping load test in short mode") + } + + RequireBinaries(t) + + db := SetupDirectoryWatchTest(t, "dir-watch-load", "*.db", false) + + // Create 3 databases with data + t.Log("Creating initial databases with data...") + db1 := CreateDatabaseInDir(t, db.DirPath, "", "db1.db") + db2 := CreateDatabaseInDir(t, db.DirPath, "", "db2.db") + db3 := CreateDatabaseInDir(t, db.DirPath, "", "db3.db") + + for _, dbPath := range []string{db1, db2, db3} { + if err := CreateDatabaseWithData(t, dbPath, 50); err != nil { + t.Fatalf("create database with data: %v", err) + } + } + + configPath, err := db.CreateDirectoryWatchConfig() + if err != nil { + t.Fatalf("create config: %v", err) + } + + t.Log("Starting Litestream...") + if err := db.StartLitestreamWithConfig(configPath); err != nil { + t.Fatalf("start litestream: %v", err) + } + defer db.StopLitestream() + + time.Sleep(3 * time.Second) + + // Start continuous writes to all 3 databases + ctx := context.Background() + t.Log("Starting continuous writes to all databases...") + wg1, cancel1, _ := StartContinuousWrites(ctx, t, db1, 20) + wg2, cancel2, _ := StartContinuousWrites(ctx, t, db2, 15) + wg3, cancel3, _ := StartContinuousWrites(ctx, t, db3, 10) + + defer func() { + cancel1() + cancel2() + cancel3() + wg1.Wait() + wg2.Wait() + wg3.Wait() + }() + + // Wait a bit for writes to start + time.Sleep(2 * time.Second) + + // While writes are happening, create 2 new databases + t.Log("Creating new databases while writes are ongoing...") + db4 := CreateDatabaseInDir(t, db.DirPath, "", "db4.db") + db5 := CreateDatabaseInDir(t, db.DirPath, "", "db5.db") + + // Start writes on new databases + wg4, cancel4, _ := StartContinuousWrites(ctx, t, db4, 10) + wg5, cancel5, _ := StartContinuousWrites(ctx, t, db5, 10) + + defer func() { + cancel4() + cancel5() + wg4.Wait() + wg5.Wait() + }() + + // Verify all databases detected + for i, dbPath := range []string{db1, db2, db3, db4, db5} { + if err := WaitForDatabaseInReplica(t, db.ReplicaPath, dbPath, 10*time.Second); err != nil { + t.Fatalf("database %d not detected: %v", i+1, err) + } + } + + // Let writes continue + t.Log("Running writes for 10 seconds...") + time.Sleep(10 * time.Second) + + // Stop all writes + cancel1() + cancel2() + cancel3() + cancel4() + cancel5() + wg1.Wait() + wg2.Wait() + wg3.Wait() + wg4.Wait() + wg5.Wait() + + // Wait for final replication + time.Sleep(3 * time.Second) + + // Verify all 5 databases are in replica + count, err := CountDatabasesInReplica(db.ReplicaPath) + if err != nil { + t.Fatalf("count databases: %v", err) + } + + if count != 5 { + t.Fatalf("expected 5 databases in replica, got %d", count) + } + + // Check for errors + errors, err := CheckForCriticalErrors(t, db.TestDB) + if err != nil { + t.Fatalf("check errors: %v", err) + } + if len(errors) > 0 { + t.Fatalf("found critical errors during load test: %v", errors) + } + + t.Log("✓ Load with writes test passed") +}