Skip to content

Commit f4ea903

Browse files
authored
Merge pull request kubernetes#124635 from pohly/event-broadcaster-shutdown-fix
client-go/tools/record: fix and test Broadcaster shutdown + logging
2 parents fad52ae + ff779f1 commit f4ea903

File tree

3 files changed

+89
-25
lines changed

3 files changed

+89
-25
lines changed

staging/src/k8s.io/client-go/tools/record/event.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -203,8 +203,8 @@ func NewBroadcaster(opts ...BroadcasterOption) EventBroadcaster {
203203
// - The context was nil.
204204
// - The context was context.Background() to begin with.
205205
//
206-
// Both cases get checked here.
207-
haveCtxCancelation := ctx.Done() == nil
206+
// Both cases get checked here: we have cancelation if (and only if) there is a channel.
207+
haveCtxCancelation := ctx.Done() != nil
208208

209209
eventBroadcaster.cancelationCtx, eventBroadcaster.cancel = context.WithCancel(ctx)
210210

staging/src/k8s.io/client-go/tools/record/event_test.go

Lines changed: 84 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,17 @@ package record
1919
import (
2020
"context"
2121
"encoding/json"
22+
stderrors "errors"
2223
"fmt"
2324
"net/http"
2425
"strconv"
2526
"sync"
2627
"testing"
2728
"time"
2829

30+
"github.com/stretchr/testify/assert"
31+
"go.uber.org/goleak"
32+
2933
v1 "k8s.io/api/core/v1"
3034
"k8s.io/apimachinery/pkg/api/errors"
3135
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -34,6 +38,8 @@ import (
3438
"k8s.io/client-go/kubernetes/scheme"
3539
restclient "k8s.io/client-go/rest"
3640
ref "k8s.io/client-go/tools/reference"
41+
"k8s.io/klog/v2"
42+
"k8s.io/klog/v2/ktesting"
3743
"k8s.io/utils/clock"
3844
testclocks "k8s.io/utils/clock/testing"
3945
)
@@ -104,13 +110,38 @@ func OnPatchFactory(testCache map[string]*v1.Event, patchEvent chan<- *v1.Event)
104110
}
105111
}
106112

113+
// newBroadcasterForTests creates a new broadcaster which produces per-test log
114+
// output if StartStructuredLogging is used. Will be shut down automatically
115+
// after the test.
116+
func newBroadcasterForTests(tb testing.TB) EventBroadcaster {
117+
_, ctx := ktesting.NewTestContext(tb)
118+
caster := NewBroadcaster(WithSleepDuration(0), WithContext(ctx))
119+
tb.Cleanup(caster.Shutdown)
120+
return caster
121+
}
122+
123+
func TestBroadcasterShutdown(t *testing.T) {
124+
_, ctx := ktesting.NewTestContext(t)
125+
ctx, cancel := context.WithCancelCause(ctx)
126+
127+
// Start a broadcaster with background activity.
128+
caster := NewBroadcaster(WithContext(ctx))
129+
caster.StartStructuredLogging(0)
130+
131+
// Stop it.
132+
cancel(stderrors.New("time to stop"))
133+
134+
// Ensure that the broadcaster goroutine is not left running.
135+
goleak.VerifyNone(t)
136+
}
137+
107138
func TestNonRacyShutdown(t *testing.T) {
108139
// Attempt to simulate previously racy conditions, and ensure that no race
109140
// occurs: Nominally, calling "Eventf" *followed by* shutdown from the same
110141
// thread should be a safe operation, but it's not if we launch recorder.Action
111142
// in a goroutine.
112143

113-
caster := NewBroadcasterForTests(0)
144+
caster := newBroadcasterForTests(t)
114145
clock := testclocks.NewFakeClock(time.Now())
115146
recorder := recorderWithFakeClock(t, v1.EventSource{Component: "eventTest"}, caster, clock)
116147

@@ -151,14 +182,15 @@ func TestEventf(t *testing.T) {
151182
t.Fatal(err)
152183
}
153184
table := []struct {
154-
obj k8sruntime.Object
155-
eventtype string
156-
reason string
157-
messageFmt string
158-
elements []interface{}
159-
expect *v1.Event
160-
expectLog string
161-
expectUpdate bool
185+
obj k8sruntime.Object
186+
eventtype string
187+
reason string
188+
messageFmt string
189+
elements []interface{}
190+
expect *v1.Event
191+
expectLog string
192+
expectStructuredLog string
193+
expectUpdate bool
162194
}{
163195
{
164196
obj: testRef,
@@ -186,7 +218,9 @@ func TestEventf(t *testing.T) {
186218
Count: 1,
187219
Type: v1.EventTypeNormal,
188220
},
189-
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
221+
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
222+
expectStructuredLog: `INFO Event occurred object="baz/foo" fieldPath="spec.containers[2]" kind="Pod" apiVersion="v1" type="Normal" reason="Started" message="some verbose message: 1"
223+
`,
190224
expectUpdate: false,
191225
},
192226
{
@@ -214,7 +248,9 @@ func TestEventf(t *testing.T) {
214248
Count: 1,
215249
Type: v1.EventTypeNormal,
216250
},
217-
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:""}): type: 'Normal' reason: 'Killed' some other verbose message: 1`,
251+
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:""}): type: 'Normal' reason: 'Killed' some other verbose message: 1`,
252+
expectStructuredLog: `INFO Event occurred object="baz/foo" fieldPath="" kind="Pod" apiVersion="v1" type="Normal" reason="Killed" message="some other verbose message: 1"
253+
`,
218254
expectUpdate: false,
219255
},
220256
{
@@ -243,7 +279,9 @@ func TestEventf(t *testing.T) {
243279
Count: 2,
244280
Type: v1.EventTypeNormal,
245281
},
246-
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
282+
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
283+
expectStructuredLog: `INFO Event occurred object="baz/foo" fieldPath="spec.containers[2]" kind="Pod" apiVersion="v1" type="Normal" reason="Started" message="some verbose message: 1"
284+
`,
247285
expectUpdate: true,
248286
},
249287
{
@@ -272,7 +310,9 @@ func TestEventf(t *testing.T) {
272310
Count: 1,
273311
Type: v1.EventTypeNormal,
274312
},
275-
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
313+
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
314+
expectStructuredLog: `INFO Event occurred object="baz/foo" fieldPath="spec.containers[3]" kind="Pod" apiVersion="v1" type="Normal" reason="Started" message="some verbose message: 1"
315+
`,
276316
expectUpdate: false,
277317
},
278318
{
@@ -301,7 +341,9 @@ func TestEventf(t *testing.T) {
301341
Count: 3,
302342
Type: v1.EventTypeNormal,
303343
},
304-
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
344+
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"bar", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[2]"}): type: 'Normal' reason: 'Started' some verbose message: 1`,
345+
expectStructuredLog: `INFO Event occurred object="baz/foo" fieldPath="spec.containers[2]" kind="Pod" apiVersion="v1" type="Normal" reason="Started" message="some verbose message: 1"
346+
`,
305347
expectUpdate: true,
306348
},
307349
{
@@ -330,7 +372,9 @@ func TestEventf(t *testing.T) {
330372
Count: 1,
331373
Type: v1.EventTypeNormal,
332374
},
333-
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`,
375+
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`,
376+
expectStructuredLog: `INFO Event occurred object="baz/foo" fieldPath="spec.containers[3]" kind="Pod" apiVersion="v1" type="Normal" reason="Stopped" message="some verbose message: 1"
377+
`,
334378
expectUpdate: false,
335379
},
336380
{
@@ -359,7 +403,9 @@ func TestEventf(t *testing.T) {
359403
Count: 2,
360404
Type: v1.EventTypeNormal,
361405
},
362-
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`,
406+
expectLog: `Event(v1.ObjectReference{Kind:"Pod", Namespace:"baz", Name:"foo", UID:"differentUid", APIVersion:"v1", ResourceVersion:"", FieldPath:"spec.containers[3]"}): type: 'Normal' reason: 'Stopped' some verbose message: 1`,
407+
expectStructuredLog: `INFO Event occurred object="baz/foo" fieldPath="spec.containers[3]" kind="Pod" apiVersion="v1" type="Normal" reason="Stopped" message="some verbose message: 1"
408+
`,
363409
expectUpdate: true,
364410
},
365411
}
@@ -377,23 +423,36 @@ func TestEventf(t *testing.T) {
377423
},
378424
OnPatch: OnPatchFactory(testCache, patchEvent),
379425
}
380-
eventBroadcaster := NewBroadcasterForTests(0)
426+
logger := ktesting.NewLogger(t, ktesting.NewConfig(ktesting.BufferLogs(true)))
427+
logSink := logger.GetSink().(ktesting.Underlier)
428+
ctx := klog.NewContext(context.Background(), logger)
429+
eventBroadcaster := NewBroadcaster(WithSleepDuration(0), WithContext(ctx))
430+
defer eventBroadcaster.Shutdown()
381431
sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents)
382432

383433
clock := testclocks.NewFakeClock(time.Now())
384434
recorder := recorderWithFakeClock(t, v1.EventSource{Component: "eventTest"}, eventBroadcaster, clock)
385435
for index, item := range table {
386436
clock.Step(1 * time.Second)
437+
//nolint:logcheck // Intentionally testing StartLogging here.
387438
logWatcher := eventBroadcaster.StartLogging(func(formatter string, args ...interface{}) {
388439
if e, a := item.expectLog, fmt.Sprintf(formatter, args...); e != a {
389440
t.Errorf("Expected '%v', got '%v'", e, a)
390441
}
391442
logCalled <- struct{}{}
392443
})
444+
oldEnd := len(logSink.GetBuffer().String())
445+
structuredLogWatcher := eventBroadcaster.StartStructuredLogging(0)
393446
recorder.Eventf(item.obj, item.eventtype, item.reason, item.messageFmt, item.elements...)
394447

395448
<-logCalled
396449

450+
// We don't get notified by the structured test logger directly.
451+
// Instead, we periodically check what new output it has produced.
452+
assert.EventuallyWithT(t, func(t *assert.CollectT) {
453+
assert.Equal(t, item.expectStructuredLog, logSink.GetBuffer().String()[oldEnd:], "new structured log output")
454+
}, time.Minute, time.Millisecond)
455+
397456
// validate event
398457
if item.expectUpdate {
399458
actualEvent := <-patchEvent
@@ -403,6 +462,7 @@ func TestEventf(t *testing.T) {
403462
validateEvent(strconv.Itoa(index), actualEvent, item.expect, t)
404463
}
405464
logWatcher.Stop()
465+
structuredLogWatcher.Stop()
406466
}
407467
sinkWatcher.Stop()
408468
}
@@ -561,8 +621,9 @@ func TestLotsOfEvents(t *testing.T) {
561621
},
562622
}
563623

564-
eventBroadcaster := NewBroadcasterForTests(0)
624+
eventBroadcaster := newBroadcasterForTests(t)
565625
sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents)
626+
//nolint:logcheck // Intentionally using StartLogging here to get notified.
566627
logWatcher := eventBroadcaster.StartLogging(func(formatter string, args ...interface{}) {
567628
loggerCalled <- struct{}{}
568629
})
@@ -658,7 +719,7 @@ func TestEventfNoNamespace(t *testing.T) {
658719
},
659720
OnPatch: OnPatchFactory(testCache, patchEvent),
660721
}
661-
eventBroadcaster := NewBroadcasterForTests(0)
722+
eventBroadcaster := newBroadcasterForTests(t)
662723
sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents)
663724

664725
clock := testclocks.NewFakeClock(time.Now())
@@ -953,7 +1014,7 @@ func TestMultiSinkCache(t *testing.T) {
9531014
OnPatch: OnPatchFactory(testCache2, patchEvent2),
9541015
}
9551016

956-
eventBroadcaster := NewBroadcasterForTests(0)
1017+
eventBroadcaster := newBroadcasterForTests(t)
9571018
clock := testclocks.NewFakeClock(time.Now())
9581019
recorder := recorderWithFakeClock(t, v1.EventSource{Component: "eventTest"}, eventBroadcaster, clock)
9591020

@@ -971,6 +1032,9 @@ func TestMultiSinkCache(t *testing.T) {
9711032
validateEvent(strconv.Itoa(index), actualEvent, item.expect, t)
9721033
}
9731034
}
1035+
// Stop before creating more events, otherwise the On* callbacks above
1036+
// get stuck writing to the channel that we don't read from anymore.
1037+
sinkWatcher.Stop()
9741038

9751039
// Another StartRecordingToSink call should start to record events with new clean cache.
9761040
sinkWatcher2 := eventBroadcaster.StartRecordingToSink(&testEvents2)
@@ -988,6 +1052,5 @@ func TestMultiSinkCache(t *testing.T) {
9881052
}
9891053
}
9901054

991-
sinkWatcher.Stop()
9921055
sinkWatcher2.Stop()
9931056
}

staging/src/k8s.io/client-go/tools/record/main_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,11 @@ limitations under the License.
1717
package record
1818

1919
import (
20-
"os"
2120
"testing"
21+
22+
"go.uber.org/goleak"
2223
)
2324

2425
func TestMain(m *testing.M) {
25-
os.Exit(m.Run())
26+
goleak.VerifyTestMain(m)
2627
}

0 commit comments

Comments
 (0)