Skip to content

Commit f936f69

Browse files
authored
Merge pull request kubernetes#120729 from pohly/events-context
k8s.io/client-go/tools/[events|record]: support context
2 parents 56f3304 + 5dc540f commit f936f69

File tree

12 files changed

+353
-134
lines changed

12 files changed

+353
-134
lines changed

hack/logcheck.conf

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ structured k8s.io/apiserver/pkg/server/options/encryptionconfig/.*
2929
# TODO next: contextual k8s.io/kubernetes/pkg/scheduler/.*
3030
# A few files involved in startup migrated already to contextual
3131
# We can't enable contextual logcheck until all are migrated
32+
contextual k8s.io/client-go/tools/events/.*
33+
contextual k8s.io/client-go/tools/record/.*
3234
contextual k8s.io/dynamic-resource-allocation/.*
3335
contextual k8s.io/kubernetes/cmd/kube-scheduler/.*
3436
contextual k8s.io/kubernetes/pkg/controller/.*

staging/src/k8s.io/client-go/tools/events/event_broadcaster.go

Lines changed: 68 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -81,27 +81,27 @@ type EventSinkImpl struct {
8181
}
8282

8383
// Create takes the representation of a event and creates it. Returns the server's representation of the event, and an error, if there is any.
84-
func (e *EventSinkImpl) Create(event *eventsv1.Event) (*eventsv1.Event, error) {
84+
func (e *EventSinkImpl) Create(ctx context.Context, event *eventsv1.Event) (*eventsv1.Event, error) {
8585
if event.Namespace == "" {
8686
return nil, fmt.Errorf("can't create an event with empty namespace")
8787
}
88-
return e.Interface.Events(event.Namespace).Create(context.TODO(), event, metav1.CreateOptions{})
88+
return e.Interface.Events(event.Namespace).Create(ctx, event, metav1.CreateOptions{})
8989
}
9090

9191
// Update takes the representation of a event and updates it. Returns the server's representation of the event, and an error, if there is any.
92-
func (e *EventSinkImpl) Update(event *eventsv1.Event) (*eventsv1.Event, error) {
92+
func (e *EventSinkImpl) Update(ctx context.Context, event *eventsv1.Event) (*eventsv1.Event, error) {
9393
if event.Namespace == "" {
9494
return nil, fmt.Errorf("can't update an event with empty namespace")
9595
}
96-
return e.Interface.Events(event.Namespace).Update(context.TODO(), event, metav1.UpdateOptions{})
96+
return e.Interface.Events(event.Namespace).Update(ctx, event, metav1.UpdateOptions{})
9797
}
9898

9999
// Patch applies the patch and returns the patched event, and an error, if there is any.
100-
func (e *EventSinkImpl) Patch(event *eventsv1.Event, data []byte) (*eventsv1.Event, error) {
100+
func (e *EventSinkImpl) Patch(ctx context.Context, event *eventsv1.Event, data []byte) (*eventsv1.Event, error) {
101101
if event.Namespace == "" {
102102
return nil, fmt.Errorf("can't patch an event with empty namespace")
103103
}
104-
return e.Interface.Events(event.Namespace).Patch(context.TODO(), event.Name, types.StrategicMergePatchType, data, metav1.PatchOptions{})
104+
return e.Interface.Events(event.Namespace).Patch(ctx, event.Name, types.StrategicMergePatchType, data, metav1.PatchOptions{})
105105
}
106106

107107
// NewBroadcaster Creates a new event broadcaster.
@@ -124,13 +124,13 @@ func (e *eventBroadcasterImpl) Shutdown() {
124124
}
125125

126126
// refreshExistingEventSeries refresh events TTL
127-
func (e *eventBroadcasterImpl) refreshExistingEventSeries() {
127+
func (e *eventBroadcasterImpl) refreshExistingEventSeries(ctx context.Context) {
128128
// TODO: Investigate whether lock contention won't be a problem
129129
e.mu.Lock()
130130
defer e.mu.Unlock()
131131
for isomorphicKey, event := range e.eventCache {
132132
if event.Series != nil {
133-
if recordedEvent, retry := recordEvent(e.sink, event); !retry {
133+
if recordedEvent, retry := recordEvent(ctx, e.sink, event); !retry {
134134
if recordedEvent != nil {
135135
e.eventCache[isomorphicKey] = recordedEvent
136136
}
@@ -142,15 +142,15 @@ func (e *eventBroadcasterImpl) refreshExistingEventSeries() {
142142
// finishSeries checks if a series has ended and either:
143143
// - write final count to the apiserver
144144
// - delete a singleton event (i.e. series field is nil) from the cache
145-
func (e *eventBroadcasterImpl) finishSeries() {
145+
func (e *eventBroadcasterImpl) finishSeries(ctx context.Context) {
146146
// TODO: Investigate whether lock contention won't be a problem
147147
e.mu.Lock()
148148
defer e.mu.Unlock()
149149
for isomorphicKey, event := range e.eventCache {
150150
eventSerie := event.Series
151151
if eventSerie != nil {
152152
if eventSerie.LastObservedTime.Time.Before(time.Now().Add(-finishTime)) {
153-
if _, retry := recordEvent(e.sink, event); !retry {
153+
if _, retry := recordEvent(ctx, e.sink, event); !retry {
154154
delete(e.eventCache, isomorphicKey)
155155
}
156156
}
@@ -161,13 +161,13 @@ func (e *eventBroadcasterImpl) finishSeries() {
161161
}
162162

163163
// NewRecorder returns an EventRecorder that records events with the given event source.
164-
func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, reportingController string) EventRecorder {
164+
func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, reportingController string) EventRecorderLogger {
165165
hostname, _ := os.Hostname()
166166
reportingInstance := reportingController + "-" + hostname
167-
return &recorderImpl{scheme, reportingController, reportingInstance, e.Broadcaster, clock.RealClock{}}
167+
return &recorderImplLogger{recorderImpl: &recorderImpl{scheme, reportingController, reportingInstance, e.Broadcaster, clock.RealClock{}}, logger: klog.Background()}
168168
}
169169

170-
func (e *eventBroadcasterImpl) recordToSink(event *eventsv1.Event, clock clock.Clock) {
170+
func (e *eventBroadcasterImpl) recordToSink(ctx context.Context, event *eventsv1.Event, clock clock.Clock) {
171171
// Make a copy before modification, because there could be multiple listeners.
172172
eventCopy := event.DeepCopy()
173173
go func() {
@@ -197,48 +197,53 @@ func (e *eventBroadcasterImpl) recordToSink(event *eventsv1.Event, clock clock.C
197197
}()
198198
if evToRecord != nil {
199199
// TODO: Add a metric counting the number of recording attempts
200-
e.attemptRecording(evToRecord)
200+
e.attemptRecording(ctx, evToRecord)
201201
// We don't want the new recorded Event to be reflected in the
202202
// client's cache because server-side mutations could mess with the
203203
// aggregation mechanism used by the client.
204204
}
205205
}()
206206
}
207207

208-
func (e *eventBroadcasterImpl) attemptRecording(event *eventsv1.Event) *eventsv1.Event {
208+
func (e *eventBroadcasterImpl) attemptRecording(ctx context.Context, event *eventsv1.Event) {
209209
tries := 0
210210
for {
211-
if recordedEvent, retry := recordEvent(e.sink, event); !retry {
212-
return recordedEvent
211+
if _, retry := recordEvent(ctx, e.sink, event); !retry {
212+
return
213213
}
214214
tries++
215215
if tries >= maxTriesPerEvent {
216-
klog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
217-
return nil
216+
klog.FromContext(ctx).Error(nil, "Unable to write event (retry limit exceeded!)", "event", event)
217+
return
218218
}
219219
// Randomize sleep so that various clients won't all be
220-
// synced up if the master goes down.
221-
time.Sleep(wait.Jitter(e.sleepDuration, 0.25))
220+
// synced up if the master goes down. Give up when
221+
// the context is canceled.
222+
select {
223+
case <-ctx.Done():
224+
return
225+
case <-time.After(wait.Jitter(e.sleepDuration, 0.25)):
226+
}
222227
}
223228
}
224229

225-
func recordEvent(sink EventSink, event *eventsv1.Event) (*eventsv1.Event, bool) {
230+
func recordEvent(ctx context.Context, sink EventSink, event *eventsv1.Event) (*eventsv1.Event, bool) {
226231
var newEvent *eventsv1.Event
227232
var err error
228233
isEventSeries := event.Series != nil
229234
if isEventSeries {
230235
patch, patchBytesErr := createPatchBytesForSeries(event)
231236
if patchBytesErr != nil {
232-
klog.Errorf("Unable to calculate diff, no merge is possible: %v", patchBytesErr)
237+
klog.FromContext(ctx).Error(patchBytesErr, "Unable to calculate diff, no merge is possible")
233238
return nil, false
234239
}
235-
newEvent, err = sink.Patch(event, patch)
240+
newEvent, err = sink.Patch(ctx, event, patch)
236241
}
237242
// Update can fail because the event may have been removed and it no longer exists.
238243
if !isEventSeries || (isEventSeries && util.IsKeyNotFoundError(err)) {
239244
// Making sure that ResourceVersion is empty on creation
240245
event.ResourceVersion = ""
241-
newEvent, err = sink.Create(event)
246+
newEvent, err = sink.Create(ctx, event)
242247
}
243248
if err == nil {
244249
return newEvent, false
@@ -248,7 +253,7 @@ func recordEvent(sink EventSink, event *eventsv1.Event) (*eventsv1.Event, bool)
248253
switch err.(type) {
249254
case *restclient.RequestConstructionError:
250255
// We will construct the request the same next time, so don't keep trying.
251-
klog.Errorf("Unable to construct event '%#v': '%v' (will not retry!)", event, err)
256+
klog.FromContext(ctx).Error(err, "Unable to construct event (will not retry!)", "event", event)
252257
return nil, false
253258
case *errors.StatusError:
254259
if errors.IsAlreadyExists(err) {
@@ -260,9 +265,9 @@ func recordEvent(sink EventSink, event *eventsv1.Event) (*eventsv1.Event, bool)
260265
if isEventSeries {
261266
return nil, true
262267
}
263-
klog.V(5).Infof("Server rejected event '%#v': '%v' (will not retry!)", event, err)
268+
klog.FromContext(ctx).V(5).Info("Server rejected event (will not retry!)", "event", event, "err", err)
264269
} else {
265-
klog.Errorf("Server rejected event '%#v': '%v' (will not retry!)", event, err)
270+
klog.FromContext(ctx).Error(err, "Server rejected event (will not retry!)", "event", event)
266271
}
267272
return nil, false
268273
case *errors.UnexpectedObjectError:
@@ -271,7 +276,7 @@ func recordEvent(sink EventSink, event *eventsv1.Event) (*eventsv1.Event, bool)
271276
default:
272277
// This case includes actual http transport errors. Go ahead and retry.
273278
}
274-
klog.Errorf("Unable to write event: '%v' (may retry after sleeping)", err)
279+
klog.FromContext(ctx).Error(err, "Unable to write event (may retry after sleeping)")
275280
return nil, true
276281
}
277282

@@ -307,29 +312,38 @@ func getKey(event *eventsv1.Event) eventKey {
307312
// StartStructuredLogging starts sending events received from this EventBroadcaster to the structured logging function.
308313
// The return value can be ignored or used to stop recording, if desired.
309314
// TODO: this function should also return an error.
315+
//
316+
// Deprecated: use StartLogging instead.
310317
func (e *eventBroadcasterImpl) StartStructuredLogging(verbosity klog.Level) func() {
311-
stopWatcher, err := e.StartEventWatcher(
318+
logger := klog.Background().V(int(verbosity))
319+
stopWatcher, err := e.StartLogging(logger)
320+
if err != nil {
321+
logger.Error(err, "Failed to start event watcher")
322+
return func() {}
323+
}
324+
return stopWatcher
325+
}
326+
327+
// StartLogging starts sending events received from this EventBroadcaster to the structured logger.
328+
// To adjust verbosity, use the logger's V method (i.e. pass `logger.V(3)` instead of `logger`).
329+
// The returned function can be ignored or used to stop recording, if desired.
330+
func (e *eventBroadcasterImpl) StartLogging(logger klog.Logger) (func(), error) {
331+
return e.StartEventWatcher(
312332
func(obj runtime.Object) {
313333
event, ok := obj.(*eventsv1.Event)
314334
if !ok {
315-
klog.Errorf("unexpected type, expected eventsv1.Event")
335+
logger.Error(nil, "unexpected type, expected eventsv1.Event")
316336
return
317337
}
318-
klog.V(verbosity).InfoS("Event occurred", "object", klog.KRef(event.Regarding.Namespace, event.Regarding.Name), "kind", event.Regarding.Kind, "apiVersion", event.Regarding.APIVersion, "type", event.Type, "reason", event.Reason, "action", event.Action, "note", event.Note)
338+
logger.Info("Event occurred", "object", klog.KRef(event.Regarding.Namespace, event.Regarding.Name), "kind", event.Regarding.Kind, "apiVersion", event.Regarding.APIVersion, "type", event.Type, "reason", event.Reason, "action", event.Action, "note", event.Note)
319339
})
320-
if err != nil {
321-
klog.Errorf("failed to start event watcher: '%v'", err)
322-
return func() {}
323-
}
324-
return stopWatcher
325340
}
326341

327342
// StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
328343
// The return value is used to stop recording
329344
func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(event runtime.Object)) (func(), error) {
330345
watcher, err := e.Watch()
331346
if err != nil {
332-
klog.Errorf("Unable start event watcher: '%v' (will not retry!)", err)
333347
return nil, err
334348
}
335349
go func() {
@@ -345,37 +359,42 @@ func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(event runtime
345359
return watcher.Stop, nil
346360
}
347361

348-
func (e *eventBroadcasterImpl) startRecordingEvents(stopCh <-chan struct{}) error {
362+
func (e *eventBroadcasterImpl) startRecordingEvents(ctx context.Context) error {
349363
eventHandler := func(obj runtime.Object) {
350364
event, ok := obj.(*eventsv1.Event)
351365
if !ok {
352-
klog.Errorf("unexpected type, expected eventsv1.Event")
366+
klog.FromContext(ctx).Error(nil, "unexpected type, expected eventsv1.Event")
353367
return
354368
}
355-
e.recordToSink(event, clock.RealClock{})
369+
e.recordToSink(ctx, event, clock.RealClock{})
356370
}
357371
stopWatcher, err := e.StartEventWatcher(eventHandler)
358372
if err != nil {
359373
return err
360374
}
361375
go func() {
362-
<-stopCh
376+
<-ctx.Done()
363377
stopWatcher()
364378
}()
365379
return nil
366380
}
367381

368382
// StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink.
383+
// Deprecated: use StartRecordingToSinkWithContext instead.
369384
func (e *eventBroadcasterImpl) StartRecordingToSink(stopCh <-chan struct{}) {
370-
go wait.Until(e.refreshExistingEventSeries, refreshTime, stopCh)
371-
go wait.Until(e.finishSeries, finishTime, stopCh)
372-
err := e.startRecordingEvents(stopCh)
385+
err := e.StartRecordingToSinkWithContext(wait.ContextForChannel(stopCh))
373386
if err != nil {
374-
klog.Errorf("unexpected type, expected eventsv1.Event")
375-
return
387+
klog.Background().Error(err, "Failed to start recording to sink")
376388
}
377389
}
378390

391+
// StartRecordingToSinkWithContext starts sending events received from the specified eventBroadcaster to the given sink.
392+
func (e *eventBroadcasterImpl) StartRecordingToSinkWithContext(ctx context.Context) error {
393+
go wait.UntilWithContext(ctx, e.refreshExistingEventSeries, refreshTime)
394+
go wait.UntilWithContext(ctx, e.finishSeries, finishTime)
395+
return e.startRecordingEvents(ctx)
396+
}
397+
379398
type eventBroadcasterAdapterImpl struct {
380399
coreClient typedv1core.EventsGetter
381400
coreBroadcaster record.EventBroadcaster
@@ -409,14 +428,14 @@ func (e *eventBroadcasterAdapterImpl) StartRecordingToSink(stopCh <-chan struct{
409428
}
410429
}
411430

412-
func (e *eventBroadcasterAdapterImpl) NewRecorder(name string) EventRecorder {
431+
func (e *eventBroadcasterAdapterImpl) NewRecorder(name string) EventRecorderLogger {
413432
if e.eventsv1Broadcaster != nil && e.eventsv1Client != nil {
414433
return e.eventsv1Broadcaster.NewRecorder(scheme.Scheme, name)
415434
}
416435
return record.NewEventRecorderAdapter(e.DeprecatedNewLegacyRecorder(name))
417436
}
418437

419-
func (e *eventBroadcasterAdapterImpl) DeprecatedNewLegacyRecorder(name string) record.EventRecorder {
438+
func (e *eventBroadcasterAdapterImpl) DeprecatedNewLegacyRecorder(name string) record.EventRecorderLogger {
420439
return e.coreBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: name})
421440
}
422441

staging/src/k8s.io/client-go/tools/events/event_broadcaster_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
eventsv1 "k8s.io/api/events/v1"
2626
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2727
"k8s.io/client-go/kubernetes/fake"
28+
"k8s.io/klog/v2/ktesting"
2829
)
2930

3031
func TestRecordEventToSink(t *testing.T) {
@@ -78,11 +79,12 @@ func TestRecordEventToSink(t *testing.T) {
7879

7980
for _, tc := range testCases {
8081
t.Run(tc.name, func(t *testing.T) {
82+
_, ctx := ktesting.NewTestContext(t)
8183
kubeClient := fake.NewSimpleClientset()
8284
eventSink := &EventSinkImpl{Interface: kubeClient.EventsV1()}
8385

8486
for _, ev := range tc.eventsToRecord {
85-
recordEvent(eventSink, &ev)
87+
recordEvent(ctx, eventSink, &ev)
8688
}
8789

8890
recordedEvents, err := kubeClient.EventsV1().Events(metav1.NamespaceDefault).List(context.TODO(), metav1.ListOptions{})

staging/src/k8s.io/client-go/tools/events/event_recorder.go

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,24 +40,45 @@ type recorderImpl struct {
4040
clock clock.Clock
4141
}
4242

43+
var _ EventRecorder = &recorderImpl{}
44+
4345
func (recorder *recorderImpl) Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{}) {
46+
recorder.eventf(klog.Background(), regarding, related, eventtype, reason, action, note, args...)
47+
}
48+
49+
type recorderImplLogger struct {
50+
*recorderImpl
51+
logger klog.Logger
52+
}
53+
54+
var _ EventRecorderLogger = &recorderImplLogger{}
55+
56+
func (recorder *recorderImplLogger) Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{}) {
57+
recorder.eventf(recorder.logger, regarding, related, eventtype, reason, action, note, args...)
58+
}
59+
60+
func (recorder *recorderImplLogger) WithLogger(logger klog.Logger) EventRecorderLogger {
61+
return &recorderImplLogger{recorderImpl: recorder.recorderImpl, logger: logger}
62+
}
63+
64+
func (recorder *recorderImpl) eventf(logger klog.Logger, regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{}) {
4465
timestamp := metav1.MicroTime{Time: time.Now()}
4566
message := fmt.Sprintf(note, args...)
4667
refRegarding, err := reference.GetReference(recorder.scheme, regarding)
4768
if err != nil {
48-
klog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", regarding, err, eventtype, reason, message)
69+
logger.Error(err, "Could not construct reference, will not report event", "object", regarding, "eventType", eventtype, "reason", reason, "message", message)
4970
return
5071
}
5172

5273
var refRelated *v1.ObjectReference
5374
if related != nil {
5475
refRelated, err = reference.GetReference(recorder.scheme, related)
5576
if err != nil {
56-
klog.V(9).Infof("Could not construct reference to: '%#v' due to: '%v'.", related, err)
77+
logger.V(9).Info("Could not construct reference", "object", related, "err", err)
5778
}
5879
}
5980
if !util.ValidateEventType(eventtype) {
60-
klog.Errorf("Unsupported event type: '%v'", eventtype)
81+
logger.Error(nil, "Unsupported event type", "eventType", eventtype)
6182
return
6283
}
6384
event := recorder.makeEvent(refRegarding, refRelated, timestamp, eventtype, reason, message, recorder.reportingController, recorder.reportingInstance, action)

0 commit comments

Comments
 (0)