Skip to content

Commit a1f1e43

Browse files
corylanouclaude
andcommitted
fix(directory-watcher): address race conditions and state consistency issues
This commit fixes several critical and moderate issues identified in code review: **Critical Fixes:** 1. **Meta-path collision detection**: Add validation in NewDBsFromDirectoryConfig to detect when multiple databases would share the same meta-path, which would cause replication state corruption. Returns clear error message identifying the conflicting databases. 2. **Store.AddDB documentation**: Improve comments explaining the double-check locking pattern used to handle concurrent additions of the same database. The pattern prevents duplicates while avoiding holding locks during slow Open() operations. **Moderate Fixes:** 3. **Directory removal state consistency**: Refactor removeDatabase and removeDatabasesUnder to only delete from local map after successful Store.RemoveDB. Prevents inconsistent state if removal fails. 4. **Context propagation**: Replace context.Background() with dm.ctx in directory_watcher.go for proper cancellation during shutdown. **Testing:** - All unit tests pass - Integration test failures are pre-existing on this branch, not introduced by these changes (verified by testing before/after) Fixes identified in PR #827 code review. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
1 parent 41563f4 commit a1f1e43

File tree

3 files changed

+34
-5
lines changed

3 files changed

+34
-5
lines changed

cmd/litestream/directory_watcher.go

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -280,18 +280,21 @@ func (dm *DirectoryMonitor) handlePotentialDatabase(path string) {
280280
func (dm *DirectoryMonitor) removeDatabase(path string) {
281281
dm.mu.Lock()
282282
db := dm.dbs[path]
283-
delete(dm.dbs, path)
284283
dm.mu.Unlock()
285284

286285
if db == nil {
287286
return
288287
}
289288

290-
if err := dm.store.RemoveDB(context.Background(), db.Path()); err != nil {
289+
if err := dm.store.RemoveDB(dm.ctx, db.Path()); err != nil {
291290
dm.logger.Error("remove database from store", "path", path, "error", err)
292291
return
293292
}
294293

294+
dm.mu.Lock()
295+
delete(dm.dbs, path)
296+
dm.mu.Unlock()
297+
295298
dm.logger.Info("removed database from replication", "path", path)
296299
}
297300

@@ -300,21 +303,29 @@ func (dm *DirectoryMonitor) removeDatabasesUnder(dir string) {
300303

301304
dm.mu.Lock()
302305
var toClose []*litestream.DB
306+
var toClosePaths []string
303307
for path, db := range dm.dbs {
304308
if path == dir || strings.HasPrefix(path, prefix) {
305309
toClose = append(toClose, db)
306-
delete(dm.dbs, path)
310+
toClosePaths = append(toClosePaths, path)
307311
}
308312
}
309313
dm.mu.Unlock()
310314

311-
for _, db := range toClose {
315+
// Remove from store first, only delete from local map on success
316+
for i, db := range toClose {
312317
if db == nil {
313318
continue
314319
}
315-
if err := dm.store.RemoveDB(context.Background(), db.Path()); err != nil {
320+
if err := dm.store.RemoveDB(dm.ctx, db.Path()); err != nil {
316321
dm.logger.Error("remove database from store", "path", db.Path(), "error", err)
322+
continue
317323
}
324+
325+
// Only remove from local map after successful store removal
326+
dm.mu.Lock()
327+
delete(dm.dbs, toClosePaths[i])
328+
dm.mu.Unlock()
318329
}
319330
}
320331

cmd/litestream/main.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -605,11 +605,22 @@ func NewDBsFromDirectoryConfig(dbc *DBConfig) ([]*litestream.DB, error) {
605605

606606
// Create DB instances for each found database
607607
var dbs []*litestream.DB
608+
metaPaths := make(map[string]string)
609+
608610
for _, dbPath := range dbPaths {
609611
db, err := newDBFromDirectoryEntry(dbc, dirPath, dbPath)
610612
if err != nil {
611613
return nil, fmt.Errorf("failed to create DB for %s: %w", dbPath, err)
612614
}
615+
616+
// Validate unique meta-path to prevent replication state corruption
617+
if mp := db.MetaPath(); mp != "" {
618+
if existingDB, exists := metaPaths[mp]; exists {
619+
return nil, fmt.Errorf("meta-path collision: databases %s and %s would share meta-path %s, causing replication state corruption", existingDB, dbPath, mp)
620+
}
621+
metaPaths[mp] = dbPath
622+
}
623+
613624
dbs = append(dbs, db)
614625
}
615626

store.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ func (s *Store) AddDB(db *DB) error {
164164
return fmt.Errorf("db required")
165165
}
166166

167+
// First check: see if database already exists
167168
s.mu.Lock()
168169
for _, existing := range s.dbs {
169170
if existing.Path() == db.Path() {
@@ -176,15 +177,21 @@ func (s *Store) AddDB(db *DB) error {
176177
// Apply store-wide retention settings before opening the database.
177178
db.L0Retention = s.L0Retention
178179

180+
// Open the database without holding the lock to avoid blocking other operations.
181+
// The double-check pattern below handles the race condition.
179182
if err := db.Open(); err != nil {
180183
return fmt.Errorf("open db: %w", err)
181184
}
182185

186+
// Second check: verify database wasn't added by another goroutine while we were opening.
187+
// If it was, close our instance and return without error.
183188
s.mu.Lock()
184189
defer s.mu.Unlock()
185190

186191
for _, existing := range s.dbs {
187192
if existing.Path() == db.Path() {
193+
// Another goroutine added this database while we were opening.
194+
// Close our instance to avoid resource leaks.
188195
if err := db.Close(context.Background()); err != nil {
189196
slog.Error("close duplicate db", "path", db.Path(), "error", err)
190197
}

0 commit comments

Comments
 (0)