Skip to content

Commit c3bb59d

Browse files
committed
Fix etcd3 watcher flake
1 parent 4bcaa51 commit c3bb59d

File tree

2 files changed

+6
-29
lines changed

2 files changed

+6
-29
lines changed

staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -226,9 +226,13 @@ func isCancelError(err error) bool {
226226

227227
func (wc *watchChan) run(initialEventsEndBookmarkRequired, forceInitialEvents bool) {
228228
watchClosedCh := make(chan struct{})
229-
go wc.startWatching(watchClosedCh, initialEventsEndBookmarkRequired, forceInitialEvents)
230-
231229
var resultChanWG sync.WaitGroup
230+
231+
resultChanWG.Add(1)
232+
go func() {
233+
defer resultChanWG.Done()
234+
wc.startWatching(watchClosedCh, initialEventsEndBookmarkRequired, forceInitialEvents)
235+
}()
232236
wc.processEvents(&resultChanWG)
233237

234238
select {

staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"context"
2121
"errors"
2222
"fmt"
23-
"sync"
2423
"testing"
2524
"time"
2625

@@ -168,32 +167,6 @@ func TestWatchErrorEventIsBlockingFurtherEvent(t *testing.T) {
168167
// impact.
169168
// =======================================================================
170169

171-
func TestWatchErrResultNotBlockAfterCancel(t *testing.T) {
172-
origCtx, store, _ := testSetup(t)
173-
ctx, cancel := context.WithCancel(origCtx)
174-
w := store.watcher.createWatchChan(ctx, "/abc", 0, false, false, storage.Everything)
175-
// make resultChan and errChan blocking to ensure ordering.
176-
w.resultChan = make(chan watch.Event)
177-
// The event flow goes like:
178-
// - first we send an error, it should block on resultChan.
179-
// - Then we cancel ctx. The blocking on resultChan should be freed up
180-
// and run() goroutine should return.
181-
var wg sync.WaitGroup
182-
wg.Add(1)
183-
go func() {
184-
w.run(false, true)
185-
wg.Done()
186-
}()
187-
wg.Add(1)
188-
go func() {
189-
w.sendError(fmt.Errorf("some error"))
190-
wg.Done()
191-
}()
192-
cancel()
193-
// Ensure that both run() and sendError() don't hung forever.
194-
wg.Wait()
195-
}
196-
197170
// TestWatchErrorIncorrectConfiguration checks if an error
198171
// will be returned when the storage hasn't been properly
199172
// initialised for watch requests

0 commit comments

Comments
 (0)