Skip to content

Commit 0ba4373

Browse files
committed
client-go/tools/cache: goroutine leak checking
Several tests leaked goroutines. All of those get fixed where possible without API changes. Goleak is used to prevent regressions. One new test specifically covers shutdown of an informer and its event handlers.
1 parent e3c5840 commit 0ba4373

File tree

6 files changed

+128
-27
lines changed

6 files changed

+128
-27
lines changed

staging/src/k8s.io/client-go/tools/cache/controller_test.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import (
3939
func Example() {
4040
// source simulates an apiserver object endpoint.
4141
source := fcache.NewFakeControllerSource()
42+
defer source.Shutdown()
4243

4344
// This will hold the downstream state, as we know it.
4445
downstream := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
@@ -128,6 +129,7 @@ func Example() {
128129
func ExampleNewInformer() {
129130
// source simulates an apiserver object endpoint.
130131
source := fcache.NewFakeControllerSource()
132+
defer source.Shutdown()
131133

132134
// Let's do threadsafe output to get predictable test results.
133135
deletionCounter := make(chan string, 1000)
@@ -189,7 +191,7 @@ func TestHammerController(t *testing.T) {
189191
// race detector.
190192

191193
// source simulates an apiserver object endpoint.
192-
source := fcache.NewFakeControllerSource()
194+
source := newFakeControllerSource(t)
193195

194196
// Let's do threadsafe output to get predictable test results.
195197
outputSetLock := sync.Mutex{}
@@ -300,7 +302,7 @@ func TestUpdate(t *testing.T) {
300302
// call to update.
301303

302304
// source simulates an apiserver object endpoint.
303-
source := fcache.NewFakeControllerSource()
305+
source := newFakeControllerSource(t)
304306

305307
const (
306308
FROM = "from"
@@ -410,7 +412,7 @@ func TestUpdate(t *testing.T) {
410412

411413
func TestPanicPropagated(t *testing.T) {
412414
// source simulates an apiserver object endpoint.
413-
source := fcache.NewFakeControllerSource()
415+
source := newFakeControllerSource(t)
414416

415417
// Make a controller that just panic if the AddFunc is called.
416418
_, controller := NewInformer(
@@ -456,7 +458,7 @@ func TestPanicPropagated(t *testing.T) {
456458

457459
func TestTransformingInformer(t *testing.T) {
458460
// source simulates an apiserver object endpoint.
459-
source := fcache.NewFakeControllerSource()
461+
source := newFakeControllerSource(t)
460462

461463
makePod := func(name, generation string) *v1.Pod {
462464
return &v1.Pod{
@@ -578,7 +580,7 @@ func TestTransformingInformer(t *testing.T) {
578580

579581
func TestTransformingInformerRace(t *testing.T) {
580582
// source simulates an apiserver object endpoint.
581-
source := fcache.NewFakeControllerSource()
583+
source := newFakeControllerSource(t)
582584

583585
label := "to-be-transformed"
584586
makePod := func(name string) *v1.Pod {

staging/src/k8s.io/client-go/tools/cache/main_test.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,27 @@ limitations under the License.
1717
package cache
1818

1919
import (
20-
"os"
2120
"testing"
21+
22+
"go.uber.org/goleak"
2223
)
2324

2425
func TestMain(m *testing.M) {
25-
os.Exit(m.Run())
26+
options := []goleak.Option{
27+
// These tests run goroutines which get stuck in Pop.
28+
// This cannot be fixed without modifying the API.
29+
goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.TestFIFO_addReplace.func1"),
30+
goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.TestFIFO_addUpdate.func1"),
31+
goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.TestDeltaFIFO_addReplace.func1"),
32+
goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.TestDeltaFIFO_addUpdate.func1"),
33+
34+
// TODO: fix the following tests by adding WithContext APIs and cancellation.
35+
goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.TestTransformingInformerRace.func3"),
36+
// Created by k8s.io/client-go/tools/cache.TestReflectorListAndWatch, cannot filter on that (https://github.com/uber-go/goleak/issues/135):
37+
goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.(*Reflector).ListAndWatch"),
38+
goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.(*Reflector).startResync"),
39+
// ???
40+
goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.(*DeltaFIFO).Close"),
41+
}
42+
goleak.VerifyTestMain(m, options...)
2643
}

staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go

Lines changed: 70 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,13 @@ import (
2929
"github.com/google/go-cmp/cmp"
3030
"github.com/google/go-cmp/cmp/cmpopts"
3131
"github.com/stretchr/testify/assert"
32+
"github.com/stretchr/testify/require"
33+
3234
v1 "k8s.io/api/core/v1"
3335
"k8s.io/apimachinery/pkg/api/meta"
3436
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3537
"k8s.io/apimachinery/pkg/util/sets"
3638
"k8s.io/apimachinery/pkg/util/wait"
37-
fcache "k8s.io/client-go/tools/cache/testing"
3839
testingclock "k8s.io/utils/clock/testing"
3940
)
4041

@@ -123,7 +124,7 @@ func isRegistered(i SharedInformer, h ResourceEventHandlerRegistration) bool {
123124
func TestIndexer(t *testing.T) {
124125
assert := assert.New(t)
125126
// source simulates an apiserver object endpoint.
126-
source := fcache.NewFakeControllerSource()
127+
source := newFakeControllerSource(t)
127128
pod1 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", Labels: map[string]string{"a": "a-val", "b": "b-val1"}}}
128129
pod2 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", Labels: map[string]string{"b": "b-val2"}}}
129130
pod3 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod3", Labels: map[string]string{"a": "a-val2"}}}
@@ -197,7 +198,7 @@ func TestIndexer(t *testing.T) {
197198

198199
func TestListenerResyncPeriods(t *testing.T) {
199200
// source simulates an apiserver object endpoint.
200-
source := fcache.NewFakeControllerSource()
201+
source := newFakeControllerSource(t)
201202
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
202203
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2"}})
203204

@@ -284,7 +285,7 @@ func TestListenerResyncPeriods(t *testing.T) {
284285

285286
func TestResyncCheckPeriod(t *testing.T) {
286287
// source simulates an apiserver object endpoint.
287-
source := fcache.NewFakeControllerSource()
288+
source := newFakeControllerSource(t)
288289

289290
// create the shared informer and resync every 12 hours
290291
informer := NewSharedInformer(source, &v1.Pod{}, 12*time.Hour).(*sharedIndexInformer)
@@ -356,7 +357,7 @@ func TestResyncCheckPeriod(t *testing.T) {
356357

357358
// verify that https://github.com/kubernetes/kubernetes/issues/59822 is fixed
358359
func TestSharedInformerInitializationRace(t *testing.T) {
359-
source := fcache.NewFakeControllerSource()
360+
source := newFakeControllerSource(t)
360361
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
361362
listener := newTestListener("raceListener", 0)
362363

@@ -371,7 +372,7 @@ func TestSharedInformerInitializationRace(t *testing.T) {
371372
// resync and no resync see the expected state.
372373
func TestSharedInformerWatchDisruption(t *testing.T) {
373374
// source simulates an apiserver object endpoint.
374-
source := fcache.NewFakeControllerSource()
375+
source := newFakeControllerSource(t)
375376

376377
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", UID: "pod1", ResourceVersion: "1"}})
377378
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", UID: "pod2", ResourceVersion: "2"}})
@@ -446,7 +447,7 @@ func TestSharedInformerWatchDisruption(t *testing.T) {
446447
}
447448

448449
func TestSharedInformerErrorHandling(t *testing.T) {
449-
source := fcache.NewFakeControllerSource()
450+
source := newFakeControllerSource(t)
450451
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
451452
source.ListError = fmt.Errorf("Access Denied")
452453

@@ -474,7 +475,7 @@ func TestSharedInformerErrorHandling(t *testing.T) {
474475
// TestSharedInformerStartRace is a regression test to ensure there is no race between
475476
// Run and SetWatchErrorHandler, and Run and SetTransform.
476477
func TestSharedInformerStartRace(t *testing.T) {
477-
source := fcache.NewFakeControllerSource()
478+
source := newFakeControllerSource(t)
478479
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
479480
stop := make(chan struct{})
480481
go func() {
@@ -500,7 +501,7 @@ func TestSharedInformerStartRace(t *testing.T) {
500501

501502
func TestSharedInformerTransformer(t *testing.T) {
502503
// source simulates an apiserver object endpoint.
503-
source := fcache.NewFakeControllerSource()
504+
source := newFakeControllerSource(t)
504505

505506
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", UID: "pod1", ResourceVersion: "1"}})
506507
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", UID: "pod2", ResourceVersion: "2"}})
@@ -531,7 +532,7 @@ func TestSharedInformerTransformer(t *testing.T) {
531532
}
532533

533534
func TestSharedInformerRemoveHandler(t *testing.T) {
534-
source := fcache.NewFakeControllerSource()
535+
source := newFakeControllerSource(t)
535536
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
536537

537538
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second)
@@ -569,12 +570,12 @@ func TestSharedInformerRemoveHandler(t *testing.T) {
569570
}
570571

571572
func TestSharedInformerRemoveForeignHandler(t *testing.T) {
572-
source := fcache.NewFakeControllerSource()
573+
source := newFakeControllerSource(t)
573574
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
574575

575576
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
576577

577-
source2 := fcache.NewFakeControllerSource()
578+
source2 := newFakeControllerSource(t)
578579
source2.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
579580

580581
informer2 := NewSharedInformer(source2, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
@@ -651,7 +652,7 @@ func TestSharedInformerRemoveForeignHandler(t *testing.T) {
651652
}
652653

653654
func TestSharedInformerMultipleRegistration(t *testing.T) {
654-
source := fcache.NewFakeControllerSource()
655+
source := newFakeControllerSource(t)
655656
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
656657

657658
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
@@ -719,7 +720,7 @@ func TestSharedInformerMultipleRegistration(t *testing.T) {
719720
}
720721

721722
func TestRemovingRemovedSharedInformer(t *testing.T) {
722-
source := fcache.NewFakeControllerSource()
723+
source := newFakeControllerSource(t)
723724
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
724725

725726
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
@@ -751,7 +752,7 @@ func TestRemovingRemovedSharedInformer(t *testing.T) {
751752
// listeners without tripping it up. There are not really many assertions in this
752753
// test. Meant to be run with -race to find race conditions
753754
func TestSharedInformerHandlerAbuse(t *testing.T) {
754-
source := fcache.NewFakeControllerSource()
755+
source := newFakeControllerSource(t)
755756
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
756757

757758
ctx, cancel := context.WithCancel(context.Background())
@@ -865,7 +866,7 @@ func TestSharedInformerHandlerAbuse(t *testing.T) {
865866
}
866867

867868
func TestStateSharedInformer(t *testing.T) {
868-
source := fcache.NewFakeControllerSource()
869+
source := newFakeControllerSource(t)
869870
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
870871

871872
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
@@ -914,7 +915,7 @@ func TestStateSharedInformer(t *testing.T) {
914915
}
915916

916917
func TestAddOnStoppedSharedInformer(t *testing.T) {
917-
source := fcache.NewFakeControllerSource()
918+
source := newFakeControllerSource(t)
918919
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
919920

920921
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
@@ -947,7 +948,7 @@ func TestAddOnStoppedSharedInformer(t *testing.T) {
947948
}
948949

949950
func TestRemoveOnStoppedSharedInformer(t *testing.T) {
950-
source := fcache.NewFakeControllerSource()
951+
source := newFakeControllerSource(t)
951952
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
952953

953954
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
@@ -976,7 +977,7 @@ func TestRemoveOnStoppedSharedInformer(t *testing.T) {
976977

977978
func TestRemoveWhileActive(t *testing.T) {
978979
// source simulates an apiserver object endpoint.
979-
source := fcache.NewFakeControllerSource()
980+
source := newFakeControllerSource(t)
980981

981982
// create the shared informer and resync every 12 hours
982983
informer := NewSharedInformer(source, &v1.Pod{}, 0).(*sharedIndexInformer)
@@ -1012,7 +1013,7 @@ func TestRemoveWhileActive(t *testing.T) {
10121013

10131014
func TestAddWhileActive(t *testing.T) {
10141015
// source simulates an apiserver object endpoint.
1015-
source := fcache.NewFakeControllerSource()
1016+
source := newFakeControllerSource(t)
10161017

10171018
// create the shared informer and resync every 12 hours
10181019
informer := NewSharedInformer(source, &v1.Pod{}, 0).(*sharedIndexInformer)
@@ -1072,3 +1073,52 @@ func TestAddWhileActive(t *testing.T) {
10721073
return
10731074
}
10741075
}
1076+
1077+
// TestShutdown depends on goleak.VerifyTestMain in main_test.go to verify that
1078+
// all goroutines really have stopped in the different scenarios.
1079+
func TestShutdown(t *testing.T) {
1080+
t.Run("no-context", func(t *testing.T) {
1081+
source := newFakeControllerSource(t)
1082+
stop := make(chan struct{})
1083+
defer close(stop)
1084+
1085+
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second)
1086+
handler, err := informer.AddEventHandler(ResourceEventHandlerFuncs{
1087+
AddFunc: func(_ any) {},
1088+
})
1089+
require.NoError(t, err)
1090+
defer func() {
1091+
assert.NoError(t, informer.RemoveEventHandler(handler))
1092+
}()
1093+
go informer.Run(stop)
1094+
require.Eventually(t, informer.HasSynced, time.Minute, time.Millisecond, "informer has synced")
1095+
})
1096+
1097+
t.Run("no-context-later", func(t *testing.T) {
1098+
source := newFakeControllerSource(t)
1099+
stop := make(chan struct{})
1100+
defer close(stop)
1101+
1102+
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second)
1103+
go informer.Run(stop)
1104+
require.Eventually(t, informer.HasSynced, time.Minute, time.Millisecond, "informer has synced")
1105+
1106+
handler, err := informer.AddEventHandler(ResourceEventHandlerFuncs{
1107+
AddFunc: func(_ any) {},
1108+
})
1109+
require.NoError(t, err)
1110+
assert.NoError(t, informer.RemoveEventHandler(handler))
1111+
})
1112+
1113+
t.Run("no-run", func(t *testing.T) {
1114+
source := newFakeControllerSource(t)
1115+
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second)
1116+
_, err := informer.AddEventHandler(ResourceEventHandlerFuncs{
1117+
AddFunc: func(_ any) {},
1118+
})
1119+
require.NoError(t, err)
1120+
1121+
// At this point, neither informer nor handler have any goroutines running
1122+
// and it doesn't matter that nothing gets stopped or removed.
1123+
})
1124+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
Copyright 2024 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package cache
18+
19+
import (
20+
"testing"
21+
22+
fcache "k8s.io/client-go/tools/cache/testing"
23+
)
24+
25+
func newFakeControllerSource(tb testing.TB) *fcache.FakeControllerSource {
26+
source := fcache.NewFakeControllerSource()
27+
tb.Cleanup(source.Shutdown)
28+
return source
29+
}

staging/src/k8s.io/code-generator/examples/go.sum

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

staging/src/k8s.io/component-helpers/go.sum

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)