Skip to content

Commit 27a68ae

Browse files
committed
k8s.io/client-go/tools: support context for event recording
Using StartRecordingToSinkWithContext instead of StartRecordingToSink and StartLogging instead of StartStructuredLogging has several advantages: - Spawned goroutines no longer get stuck for extended periods of time during shutdown when passing in a context that gets canceled. - Log output can be directed towards a specific logger instead of the global default, for example one which writes to a testing.T instance. - The new methods return an error when something went wrong instead of merely recording the error. That last point is the reason for deprecating the old methods instead of merely adding new alternatives. Setting a context when constructing an EventBroadcaster makes calling Shutdown optional. It can also be used to specify the logger. Both EventRecorder interfaces in tools/events and tools/record now have a WithLogger helper. Using that method is optional, but recommended to support contextual logging properly. Without it, errors that occur while emitting an event are not associated with the caller.
1 parent f7dacb6 commit 27a68ae

File tree

12 files changed

+339
-120
lines changed

12 files changed

+339
-120
lines changed

hack/logcheck.conf

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ structured k8s.io/apiserver/pkg/server/options/encryptionconfig/.*
2929
# TODO next: contextual k8s.io/kubernetes/pkg/scheduler/.*
3030
# A few files involved in startup migrated already to contextual
3131
# We can't enable contextual logcheck until all are migrated
32+
contextual k8s.io/client-go/tools/events/.*
33+
contextual k8s.io/client-go/tools/record/.*
3234
contextual k8s.io/dynamic-resource-allocation/.*
3335
contextual k8s.io/kubernetes/cmd/kube-scheduler/.*
3436
contextual k8s.io/kubernetes/pkg/controller/.*

staging/src/k8s.io/client-go/tools/events/event_broadcaster.go

Lines changed: 60 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -124,13 +124,13 @@ func (e *eventBroadcasterImpl) Shutdown() {
124124
}
125125

126126
// refreshExistingEventSeries refresh events TTL
127-
func (e *eventBroadcasterImpl) refreshExistingEventSeries() {
127+
func (e *eventBroadcasterImpl) refreshExistingEventSeries(ctx context.Context) {
128128
// TODO: Investigate whether lock contention won't be a problem
129129
e.mu.Lock()
130130
defer e.mu.Unlock()
131131
for isomorphicKey, event := range e.eventCache {
132132
if event.Series != nil {
133-
if recordedEvent, retry := recordEvent(e.sink, event); !retry {
133+
if recordedEvent, retry := recordEvent(ctx, e.sink, event); !retry {
134134
if recordedEvent != nil {
135135
e.eventCache[isomorphicKey] = recordedEvent
136136
}
@@ -142,15 +142,15 @@ func (e *eventBroadcasterImpl) refreshExistingEventSeries() {
142142
// finishSeries checks if a series has ended and either:
143143
// - write final count to the apiserver
144144
// - delete a singleton event (i.e. series field is nil) from the cache
145-
func (e *eventBroadcasterImpl) finishSeries() {
145+
func (e *eventBroadcasterImpl) finishSeries(ctx context.Context) {
146146
// TODO: Investigate whether lock contention won't be a problem
147147
e.mu.Lock()
148148
defer e.mu.Unlock()
149149
for isomorphicKey, event := range e.eventCache {
150150
eventSerie := event.Series
151151
if eventSerie != nil {
152152
if eventSerie.LastObservedTime.Time.Before(time.Now().Add(-finishTime)) {
153-
if _, retry := recordEvent(e.sink, event); !retry {
153+
if _, retry := recordEvent(ctx, e.sink, event); !retry {
154154
delete(e.eventCache, isomorphicKey)
155155
}
156156
}
@@ -161,13 +161,13 @@ func (e *eventBroadcasterImpl) finishSeries() {
161161
}
162162

163163
// NewRecorder returns an EventRecorder that records events with the given event source.
164-
func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, reportingController string) EventRecorder {
164+
func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, reportingController string) EventRecorderLogger {
165165
hostname, _ := os.Hostname()
166166
reportingInstance := reportingController + "-" + hostname
167-
return &recorderImpl{scheme, reportingController, reportingInstance, e.Broadcaster, clock.RealClock{}}
167+
return &recorderImplLogger{recorderImpl: &recorderImpl{scheme, reportingController, reportingInstance, e.Broadcaster, clock.RealClock{}}, logger: klog.Background()}
168168
}
169169

170-
func (e *eventBroadcasterImpl) recordToSink(event *eventsv1.Event, clock clock.Clock) {
170+
func (e *eventBroadcasterImpl) recordToSink(ctx context.Context, event *eventsv1.Event, clock clock.Clock) {
171171
// Make a copy before modification, because there could be multiple listeners.
172172
eventCopy := event.DeepCopy()
173173
go func() {
@@ -197,39 +197,44 @@ func (e *eventBroadcasterImpl) recordToSink(event *eventsv1.Event, clock clock.C
197197
}()
198198
if evToRecord != nil {
199199
// TODO: Add a metric counting the number of recording attempts
200-
e.attemptRecording(evToRecord)
200+
e.attemptRecording(ctx, evToRecord)
201201
// We don't want the new recorded Event to be reflected in the
202202
// client's cache because server-side mutations could mess with the
203203
// aggregation mechanism used by the client.
204204
}
205205
}()
206206
}
207207

208-
func (e *eventBroadcasterImpl) attemptRecording(event *eventsv1.Event) *eventsv1.Event {
208+
func (e *eventBroadcasterImpl) attemptRecording(ctx context.Context, event *eventsv1.Event) {
209209
tries := 0
210210
for {
211-
if recordedEvent, retry := recordEvent(e.sink, event); !retry {
212-
return recordedEvent
211+
if _, retry := recordEvent(ctx, e.sink, event); !retry {
212+
return
213213
}
214214
tries++
215215
if tries >= maxTriesPerEvent {
216-
klog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
217-
return nil
216+
klog.FromContext(ctx).Error(nil, "Unable to write event (retry limit exceeded!)", "event", event)
217+
return
218218
}
219219
// Randomize sleep so that various clients won't all be
220-
// synced up if the master goes down.
221-
time.Sleep(wait.Jitter(e.sleepDuration, 0.25))
220+
// synced up if the master goes down. Give up when
221+
// the context is canceled.
222+
select {
223+
case <-ctx.Done():
224+
return
225+
case <-time.After(wait.Jitter(e.sleepDuration, 0.25)):
226+
}
222227
}
223228
}
224229

225-
func recordEvent(sink EventSink, event *eventsv1.Event) (*eventsv1.Event, bool) {
230+
func recordEvent(ctx context.Context, sink EventSink, event *eventsv1.Event) (*eventsv1.Event, bool) {
226231
var newEvent *eventsv1.Event
227232
var err error
228233
isEventSeries := event.Series != nil
229234
if isEventSeries {
230235
patch, patchBytesErr := createPatchBytesForSeries(event)
231236
if patchBytesErr != nil {
232-
klog.Errorf("Unable to calculate diff, no merge is possible: %v", patchBytesErr)
237+
klog.FromContext(ctx).Error(patchBytesErr, "Unable to calculate diff, no merge is possible")
233238
return nil, false
234239
}
235240
newEvent, err = sink.Patch(event, patch)
@@ -248,7 +253,7 @@ func recordEvent(sink EventSink, event *eventsv1.Event) (*eventsv1.Event, bool)
248253
switch err.(type) {
249254
case *restclient.RequestConstructionError:
250255
// We will construct the request the same next time, so don't keep trying.
251-
klog.Errorf("Unable to construct event '%#v': '%v' (will not retry!)", event, err)
256+
klog.FromContext(ctx).Error(err, "Unable to construct event (will not retry!)", "event", event)
252257
return nil, false
253258
case *errors.StatusError:
254259
if errors.IsAlreadyExists(err) {
@@ -260,9 +265,9 @@ func recordEvent(sink EventSink, event *eventsv1.Event) (*eventsv1.Event, bool)
260265
if isEventSeries {
261266
return nil, true
262267
}
263-
klog.V(5).Infof("Server rejected event '%#v': '%v' (will not retry!)", event, err)
268+
klog.FromContext(ctx).V(5).Info("Server rejected event (will not retry!)", "event", event, "err", err)
264269
} else {
265-
klog.Errorf("Server rejected event '%#v': '%v' (will not retry!)", event, err)
270+
klog.FromContext(ctx).Error(err, "Server rejected event (will not retry!)", "event", event)
266271
}
267272
return nil, false
268273
case *errors.UnexpectedObjectError:
@@ -271,7 +276,7 @@ func recordEvent(sink EventSink, event *eventsv1.Event) (*eventsv1.Event, bool)
271276
default:
272277
// This case includes actual http transport errors. Go ahead and retry.
273278
}
274-
klog.Errorf("Unable to write event: '%v' (may retry after sleeping)", err)
279+
klog.FromContext(ctx).Error(err, "Unable to write event (may retry after sleeping)")
275280
return nil, true
276281
}
277282

@@ -307,29 +312,38 @@ func getKey(event *eventsv1.Event) eventKey {
307312
// StartStructuredLogging starts sending events received from this EventBroadcaster to the structured logging function.
308313
// The return value can be ignored or used to stop recording, if desired.
309314
// TODO: this function should also return an error.
315+
//
316+
// Deprecated: use StartLogging instead.
310317
func (e *eventBroadcasterImpl) StartStructuredLogging(verbosity klog.Level) func() {
311-
stopWatcher, err := e.StartEventWatcher(
318+
logger := klog.Background().V(int(verbosity))
319+
stopWatcher, err := e.StartLogging(logger)
320+
if err != nil {
321+
logger.Error(err, "Failed to start event watcher")
322+
return func() {}
323+
}
324+
return stopWatcher
325+
}
326+
327+
// StartLogging starts sending events received from this EventBroadcaster to the structured logger.
328+
// To adjust verbosity, use the logger's V method (i.e. pass `logger.V(3)` instead of `logger`).
329+
// The returned function can be ignored or used to stop recording, if desired.
330+
func (e *eventBroadcasterImpl) StartLogging(logger klog.Logger) (func(), error) {
331+
return e.StartEventWatcher(
312332
func(obj runtime.Object) {
313333
event, ok := obj.(*eventsv1.Event)
314334
if !ok {
315-
klog.Errorf("unexpected type, expected eventsv1.Event")
335+
logger.Error(nil, "unexpected type, expected eventsv1.Event")
316336
return
317337
}
318-
klog.V(verbosity).InfoS("Event occurred", "object", klog.KRef(event.Regarding.Namespace, event.Regarding.Name), "kind", event.Regarding.Kind, "apiVersion", event.Regarding.APIVersion, "type", event.Type, "reason", event.Reason, "action", event.Action, "note", event.Note)
338+
logger.Info("Event occurred", "object", klog.KRef(event.Regarding.Namespace, event.Regarding.Name), "kind", event.Regarding.Kind, "apiVersion", event.Regarding.APIVersion, "type", event.Type, "reason", event.Reason, "action", event.Action, "note", event.Note)
319339
})
320-
if err != nil {
321-
klog.Errorf("failed to start event watcher: '%v'", err)
322-
return func() {}
323-
}
324-
return stopWatcher
325340
}
326341

327342
// StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
328343
// The return value is used to stop recording
329344
func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(event runtime.Object)) (func(), error) {
330345
watcher, err := e.Watch()
331346
if err != nil {
332-
klog.Errorf("Unable start event watcher: '%v' (will not retry!)", err)
333347
return nil, err
334348
}
335349
go func() {
@@ -345,37 +359,42 @@ func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(event runtime
345359
return watcher.Stop, nil
346360
}
347361

348-
func (e *eventBroadcasterImpl) startRecordingEvents(stopCh <-chan struct{}) error {
362+
func (e *eventBroadcasterImpl) startRecordingEvents(ctx context.Context) error {
349363
eventHandler := func(obj runtime.Object) {
350364
event, ok := obj.(*eventsv1.Event)
351365
if !ok {
352-
klog.Errorf("unexpected type, expected eventsv1.Event")
366+
klog.FromContext(ctx).Error(nil, "unexpected type, expected eventsv1.Event")
353367
return
354368
}
355-
e.recordToSink(event, clock.RealClock{})
369+
e.recordToSink(ctx, event, clock.RealClock{})
356370
}
357371
stopWatcher, err := e.StartEventWatcher(eventHandler)
358372
if err != nil {
359373
return err
360374
}
361375
go func() {
362-
<-stopCh
376+
<-ctx.Done()
363377
stopWatcher()
364378
}()
365379
return nil
366380
}
367381

368382
// StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink.
383+
// Deprecated: use StartRecordingToSinkWithContext instead.
369384
func (e *eventBroadcasterImpl) StartRecordingToSink(stopCh <-chan struct{}) {
370-
go wait.Until(e.refreshExistingEventSeries, refreshTime, stopCh)
371-
go wait.Until(e.finishSeries, finishTime, stopCh)
372-
err := e.startRecordingEvents(stopCh)
385+
err := e.StartRecordingToSinkWithContext(wait.ContextForChannel(stopCh))
373386
if err != nil {
374-
klog.Errorf("unexpected type, expected eventsv1.Event")
375-
return
387+
klog.Background().Error(err, "Failed to start recording to sink")
376388
}
377389
}
378390

391+
// StartRecordingToSinkWithContext starts sending events received from the specified eventBroadcaster to the given sink.
392+
func (e *eventBroadcasterImpl) StartRecordingToSinkWithContext(ctx context.Context) error {
393+
go wait.UntilWithContext(ctx, e.refreshExistingEventSeries, refreshTime)
394+
go wait.UntilWithContext(ctx, e.finishSeries, finishTime)
395+
return e.startRecordingEvents(ctx)
396+
}
397+
379398
type eventBroadcasterAdapterImpl struct {
380399
coreClient typedv1core.EventsGetter
381400
coreBroadcaster record.EventBroadcaster
@@ -409,14 +428,14 @@ func (e *eventBroadcasterAdapterImpl) StartRecordingToSink(stopCh <-chan struct{
409428
}
410429
}
411430

412-
func (e *eventBroadcasterAdapterImpl) NewRecorder(name string) EventRecorder {
431+
func (e *eventBroadcasterAdapterImpl) NewRecorder(name string) EventRecorderLogger {
413432
if e.eventsv1Broadcaster != nil && e.eventsv1Client != nil {
414433
return e.eventsv1Broadcaster.NewRecorder(scheme.Scheme, name)
415434
}
416435
return record.NewEventRecorderAdapter(e.DeprecatedNewLegacyRecorder(name))
417436
}
418437

419-
func (e *eventBroadcasterAdapterImpl) DeprecatedNewLegacyRecorder(name string) record.EventRecorder {
438+
func (e *eventBroadcasterAdapterImpl) DeprecatedNewLegacyRecorder(name string) record.EventRecorderLogger {
420439
return e.coreBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: name})
421440
}
422441

staging/src/k8s.io/client-go/tools/events/event_broadcaster_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
eventsv1 "k8s.io/api/events/v1"
2626
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2727
"k8s.io/client-go/kubernetes/fake"
28+
"k8s.io/klog/v2/ktesting"
2829
)
2930

3031
func TestRecordEventToSink(t *testing.T) {
@@ -78,11 +79,12 @@ func TestRecordEventToSink(t *testing.T) {
7879

7980
for _, tc := range testCases {
8081
t.Run(tc.name, func(t *testing.T) {
82+
_, ctx := ktesting.NewTestContext(t)
8183
kubeClient := fake.NewSimpleClientset()
8284
eventSink := &EventSinkImpl{Interface: kubeClient.EventsV1()}
8385

8486
for _, ev := range tc.eventsToRecord {
85-
recordEvent(eventSink, &ev)
87+
recordEvent(ctx, eventSink, &ev)
8688
}
8789

8890
recordedEvents, err := kubeClient.EventsV1().Events(metav1.NamespaceDefault).List(context.TODO(), metav1.ListOptions{})

staging/src/k8s.io/client-go/tools/events/event_recorder.go

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,24 +40,45 @@ type recorderImpl struct {
4040
clock clock.Clock
4141
}
4242

43+
var _ EventRecorder = &recorderImpl{}
44+
4345
func (recorder *recorderImpl) Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{}) {
46+
recorder.eventf(klog.Background(), regarding, related, eventtype, reason, action, note, args...)
47+
}
48+
49+
type recorderImplLogger struct {
50+
*recorderImpl
51+
logger klog.Logger
52+
}
53+
54+
var _ EventRecorderLogger = &recorderImplLogger{}
55+
56+
func (recorder *recorderImplLogger) Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{}) {
57+
recorder.eventf(recorder.logger, regarding, related, eventtype, reason, action, note, args...)
58+
}
59+
60+
func (recorder *recorderImplLogger) WithLogger(logger klog.Logger) EventRecorderLogger {
61+
return &recorderImplLogger{recorderImpl: recorder.recorderImpl, logger: logger}
62+
}
63+
64+
func (recorder *recorderImpl) eventf(logger klog.Logger, regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{}) {
4465
timestamp := metav1.MicroTime{Time: time.Now()}
4566
message := fmt.Sprintf(note, args...)
4667
refRegarding, err := reference.GetReference(recorder.scheme, regarding)
4768
if err != nil {
48-
klog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", regarding, err, eventtype, reason, message)
69+
logger.Error(err, "Could not construct reference, will not report event", "object", regarding, "eventType", eventtype, "reason", reason, "message", message)
4970
return
5071
}
5172

5273
var refRelated *v1.ObjectReference
5374
if related != nil {
5475
refRelated, err = reference.GetReference(recorder.scheme, related)
5576
if err != nil {
56-
klog.V(9).Infof("Could not construct reference to: '%#v' due to: '%v'.", related, err)
77+
logger.V(9).Info("Could not construct reference", "object", related, "err", err)
5778
}
5879
}
5980
if !util.ValidateEventType(eventtype) {
60-
klog.Errorf("Unsupported event type: '%v'", eventtype)
81+
logger.Error(nil, "Unsupported event type", "eventType", eventtype)
6182
return
6283
}
6384
event := recorder.makeEvent(refRegarding, refRelated, timestamp, eventtype, reason, message, recorder.reportingController, recorder.reportingInstance, action)

0 commit comments

Comments
 (0)