Skip to content

Commit 18a7341

Browse files
committed
Fix race condition causing negative WaitGroup counter in delta watches
1 parent 6f7ed21 commit 18a7341

File tree

2 files changed

+47
-5
lines changed

2 files changed

+47
-5
lines changed

pkg/cache/v3/simple.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -563,8 +563,9 @@ func (cache *snapshotCache) respondSOTWWatches(ctx context.Context, info *status
563563
// sending them in the correct order. Go's default implementation
564564
// of maps are randomized order when ranged over.
565565
if cache.ads {
566-
info.orderResponseWatches()
567-
for _, key := range info.orderedWatches {
566+
// Create a local copy of ordered watches to avoid race conditions
567+
orderedWatches := info.getOrderedWatches()
568+
for _, key := range orderedWatches {
568569
err := respond(info.watches[key.ID], key.ID)
569570
if err != nil {
570571
return err
@@ -600,11 +601,12 @@ func (cache *snapshotCache) respondDeltaWatches(ctx context.Context, info *statu
600601
// of maps are randomized order when ranged over.
601602
if cache.ads {
602603
start := time.Now()
603-
info.orderResponseDeltaWatches()
604+
// Create a local copy of ordered watches to avoid race conditions
605+
orderedWatches := info.getOrderedDeltaWatches()
604606
// Use a buffered channel to safely collect ids to delete from multiple goroutines.
605-
toDeleteCh := make(chan int64, len(info.orderedDeltaWatches))
607+
toDeleteCh := make(chan int64, len(orderedWatches))
606608
wg := sync.WaitGroup{}
607-
for _, k := range info.orderedDeltaWatches {
609+
for _, k := range orderedWatches {
608610
wg.Add(1)
609611
watch := info.deltaWatches[k.ID]
610612
// One goroutine for each client request awaiting response

pkg/cache/v3/status.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,26 @@ func (info *statusInfo) orderResponseWatches() {
196196
sort.Sort(info.orderedWatches)
197197
}
198198

199+
// getOrderedWatches creates and returns a local copy of ordered watch keys.
200+
// This avoids race conditions by not using a shared struct field.
201+
func (info *statusInfo) getOrderedWatches() keys {
202+
orderedWatches := make(keys, len(info.watches))
203+
204+
var index int
205+
for id, watch := range info.watches {
206+
orderedWatches[index] = key{
207+
ID: id,
208+
TypeURL: watch.Request.GetTypeUrl(),
209+
}
210+
index++
211+
}
212+
213+
// Sort our list which we can use in the SetSnapshot functions.
214+
// This is only run when we enable ADS on the cache.
215+
sort.Sort(orderedWatches)
216+
return orderedWatches
217+
}
218+
199219
// orderResponseDeltaWatches will track a list of delta watch keys and order them if
200220
// true is passed.
201221
func (info *statusInfo) orderResponseDeltaWatches() {
@@ -214,3 +234,23 @@ func (info *statusInfo) orderResponseDeltaWatches() {
214234
// This is only run when we enable ADS on the cache.
215235
sort.Sort(info.orderedDeltaWatches)
216236
}
237+
238+
// getOrderedDeltaWatches creates and returns a local copy of ordered delta watch keys.
239+
// This avoids race conditions by not using a shared struct field.
240+
func (info *statusInfo) getOrderedDeltaWatches() keys {
241+
orderedWatches := make(keys, len(info.deltaWatches))
242+
243+
var index int
244+
for id, deltaWatch := range info.deltaWatches {
245+
orderedWatches[index] = key{
246+
ID: id,
247+
TypeURL: deltaWatch.Request.GetTypeUrl(),
248+
}
249+
index++
250+
}
251+
252+
// Sort our list which we can use in the SetSnapshot functions.
253+
// This is only run when we enable ADS on the cache.
254+
sort.Sort(orderedWatches)
255+
return orderedWatches
256+
}

0 commit comments

Comments
 (0)