Skip to content

Commit 738dfcf

Browse files
authored
Merge pull request kubernetes#90825 from dopelsunce/master
Fix race condition between Pop and Close FIFO queue
2 parents fa785a5 + d8b9095 commit 738dfcf

File tree

4 files changed

+75
-14
lines changed

4 files changed

+75
-14
lines changed

staging/src/k8s.io/client-go/tools/cache/delta_fifo.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -183,8 +183,7 @@ type DeltaFIFO struct {
183183
// Indication the queue is closed.
184184
// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
185185
// Currently, not used to gate any of CRED operations.
186-
closed bool
187-
closedLock sync.Mutex
186+
closed bool
188187

189188
// emitDeltaTypeReplaced is whether to emit the Replaced or Sync
190189
// DeltaType when Replace() is called (to preserve backwards compat).
@@ -204,8 +203,8 @@ var (
204203

205204
// Close the queue.
206205
func (f *DeltaFIFO) Close() {
207-
f.closedLock.Lock()
208-
defer f.closedLock.Unlock()
206+
f.lock.Lock()
207+
defer f.lock.Unlock()
209208
f.closed = true
210209
f.cond.Broadcast()
211210
}
@@ -447,8 +446,8 @@ func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err err
447446

448447
// IsClosed checks if the queue is closed
449448
func (f *DeltaFIFO) IsClosed() bool {
450-
f.closedLock.Lock()
451-
defer f.closedLock.Unlock()
449+
f.lock.Lock()
450+
defer f.lock.Unlock()
452451
return f.closed
453452
}
454453

@@ -472,7 +471,7 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
472471
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
473472
// When Close() is called, the f.closed is set and the condition is broadcasted.
474473
// Which causes this loop to continue and return from the Pop().
475-
if f.IsClosed() {
474+
if f.closed {
476475
return nil, ErrFIFOClosed
477476
}
478477

staging/src/k8s.io/client-go/tools/cache/delta_fifo_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package cache
1919
import (
2020
"fmt"
2121
"reflect"
22+
"runtime"
2223
"testing"
2324
"time"
2425
)
@@ -645,3 +646,36 @@ func TestDeltaFIFO_HasSynced(t *testing.T) {
645646
}
646647
}
647648
}
649+
650+
// TestDeltaFIFO_PopShouldUnblockWhenClosed checks that any blocking Pop on an empty queue
651+
// should unblock and return after Close is called.
652+
func TestDeltaFIFO_PopShouldUnblockWhenClosed(t *testing.T) {
653+
f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
654+
KeyFunction: testFifoObjectKeyFunc,
655+
KnownObjects: literalListerGetter(func() []testFifoObject {
656+
return []testFifoObject{mkFifoObj("foo", 5)}
657+
}),
658+
})
659+
660+
c := make(chan struct{})
661+
const jobs = 10
662+
for i := 0; i < jobs; i++ {
663+
go func() {
664+
f.Pop(func(obj interface{}) error {
665+
return nil
666+
})
667+
c <- struct{}{}
668+
}()
669+
}
670+
671+
runtime.Gosched()
672+
f.Close()
673+
674+
for i := 0; i < jobs; i++ {
675+
select {
676+
case <-c:
677+
case <-time.After(500 * time.Millisecond):
678+
t.Fatalf("timed out waiting for Pop to return after Close")
679+
}
680+
}
681+
}

staging/src/k8s.io/client-go/tools/cache/fifo.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -128,8 +128,7 @@ type FIFO struct {
128128
// Indication the queue is closed.
129129
// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
130130
// Currently, not used to gate any of CRED operations.
131-
closed bool
132-
closedLock sync.Mutex
131+
closed bool
133132
}
134133

135134
var (
@@ -138,8 +137,8 @@ var (
138137

139138
// Close the queue.
140139
func (f *FIFO) Close() {
141-
f.closedLock.Lock()
142-
defer f.closedLock.Unlock()
140+
f.lock.Lock()
141+
defer f.lock.Unlock()
143142
f.closed = true
144143
f.cond.Broadcast()
145144
}
@@ -262,8 +261,8 @@ func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
262261

263262
// IsClosed checks if the queue is closed
264263
func (f *FIFO) IsClosed() bool {
265-
f.closedLock.Lock()
266-
defer f.closedLock.Unlock()
264+
f.lock.Lock()
265+
defer f.lock.Unlock()
267266
if f.closed {
268267
return true
269268
}
@@ -284,7 +283,7 @@ func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
284283
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
285284
// When Close() is called, the f.closed is set and the condition is broadcasted.
286285
// Which causes this loop to continue and return from the Pop().
287-
if f.IsClosed() {
286+
if f.closed {
288287
return nil, ErrFIFOClosed
289288
}
290289

staging/src/k8s.io/client-go/tools/cache/fifo_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package cache
1919
import (
2020
"fmt"
2121
"reflect"
22+
"runtime"
2223
"testing"
2324
"time"
2425
)
@@ -278,3 +279,31 @@ func TestFIFO_HasSynced(t *testing.T) {
278279
}
279280
}
280281
}
282+
283+
// TestFIFO_PopShouldUnblockWhenClosed checks that any blocking Pop on an empty queue
284+
// should unblock and return after Close is called.
285+
func TestFIFO_PopShouldUnblockWhenClosed(t *testing.T) {
286+
f := NewFIFO(testFifoObjectKeyFunc)
287+
288+
c := make(chan struct{})
289+
const jobs = 10
290+
for i := 0; i < jobs; i++ {
291+
go func() {
292+
f.Pop(func(obj interface{}) error {
293+
return nil
294+
})
295+
c <- struct{}{}
296+
}()
297+
}
298+
299+
runtime.Gosched()
300+
f.Close()
301+
302+
for i := 0; i < jobs; i++ {
303+
select {
304+
case <-c:
305+
case <-time.After(500 * time.Millisecond):
306+
t.Fatalf("timed out waiting for Pop to return after Close")
307+
}
308+
}
309+
}

0 commit comments

Comments
 (0)