Skip to content

Commit b139a82

Browse files
authored
fix: resolve data race in service discovery map access (#5408)
1 parent bdddf1f commit b139a82

File tree

2 files changed

+69
-3
lines changed

2 files changed

+69
-3
lines changed

core/discov/internal/registry.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -433,16 +433,16 @@ func (c *cluster) setupWatch(cli EtcdClient, key watchKey, rev int64) (context.C
433433
}
434434

435435
ctx, cancel := context.WithCancel(cli.Ctx())
436+
437+
c.lock.Lock()
436438
if watcher, ok := c.watchers[key]; ok {
437439
watcher.cancel = cancel
438440
} else {
439441
val := newWatchValue()
440442
val.cancel = cancel
441-
442-
c.lock.Lock()
443443
c.watchers[key] = val
444-
c.lock.Unlock()
445444
}
445+
c.lock.Unlock()
446446

447447
rch = cli.Watch(clientv3.WithRequireLeader(ctx), wkey, ops...)
448448

core/discov/internal/registry_test.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -477,6 +477,72 @@ func TestRegistry_Unmonitor(t *testing.T) {
477477
assert.Nil(t, watchVals)
478478
}
479479

480+
// TestCluster_ConcurrentMonitor tests the race condition fix in setupWatch
481+
// This test specifically covers the scenario from issue #5394 where:
482+
// - addListener() writes to the watchers map (with lock)
483+
// - setupWatch() reads from the watchers map (now with lock after fix)
484+
// Run this test with the -race flag so the race detector can report any unsynchronized map access it observes.
485+
func TestCluster_ConcurrentMonitor(t *testing.T) {
486+
ctrl := gomock.NewController(t)
487+
defer ctrl.Finish()
488+
489+
cli := NewMockEtcdClient(ctrl)
490+
cli.EXPECT().Ctx().Return(context.Background()).AnyTimes()
491+
cli.EXPECT().Watch(gomock.Any(), gomock.Any(), gomock.Any()).Return(make(chan clientv3.WatchResponse)).AnyTimes()
492+
493+
c := &cluster{
494+
endpoints: []string{"localhost:2379"},
495+
key: "test-cluster",
496+
watchers: make(map[watchKey]*watchValue),
497+
watchGroup: threading.NewRoutineGroup(),
498+
done: make(chan lang.PlaceholderType),
499+
lock: sync.RWMutex{},
500+
}
501+
502+
// Spawn multiple concurrent operations that simulate the race condition:
503+
// - Some goroutines call addListener (write to map)
504+
// - Some goroutines call setupWatch (read from map, and insert when the key is missing)
505+
var wg sync.WaitGroup
506+
numGoroutines := 20
507+
wg.Add(numGoroutines)
508+
509+
keys := []watchKey{
510+
{key: "key-0", exactMatch: false},
511+
{key: "key-1", exactMatch: false},
512+
{key: "key-2", exactMatch: false},
513+
}
514+
515+
for i := 0; i < numGoroutines; i++ {
516+
idx := i
517+
go func() {
518+
defer wg.Done()
519+
key := keys[idx%len(keys)]
520+
521+
if idx%2 == 0 {
522+
// Half the goroutines add listeners (write operation)
523+
c.addListener(key, &mockListener{})
524+
} else {
525+
// The other half set up watches (map lookup, plus an insert when the key is absent)
526+
_, _ = c.setupWatch(cli, key, 0)
527+
}
528+
}()
529+
}
530+
531+
// Wait for all goroutines to complete
532+
wg.Wait()
533+
534+
// Verify that watchers were correctly added
535+
c.lock.RLock()
536+
assert.True(t, len(c.watchers) > 0, "watchers should be added")
537+
for _, watcher := range c.watchers {
538+
assert.NotNil(t, watcher, "watcher should not be nil")
539+
}
540+
c.lock.RUnlock()
541+
542+
// Clean up
543+
close(c.done)
544+
}
545+
480546
type mockListener struct {
481547
}
482548

0 commit comments

Comments
 (0)