@@ -34,7 +34,11 @@ import (
34
34
"k8s.io/apimachinery/pkg/util/json"
35
35
"k8s.io/apimachinery/pkg/util/strategicpatch"
36
36
"k8s.io/apimachinery/pkg/util/wait"
37
+ clientset "k8s.io/client-go/kubernetes"
38
+ "k8s.io/client-go/kubernetes/scheme"
39
+ typedv1core "k8s.io/client-go/kubernetes/typed/core/v1"
37
40
typedv1beta1 "k8s.io/client-go/kubernetes/typed/events/v1beta1"
41
+ "k8s.io/client-go/tools/record"
38
42
"k8s.io/client-go/tools/record/util"
39
43
"k8s.io/klog/v2"
40
44
)
@@ -314,3 +318,56 @@ func (e *eventBroadcasterImpl) StartRecordingToSink(stopCh <-chan struct{}) {
314
318
stopWatcher ()
315
319
}()
316
320
}
321
+
322
+ type eventBroadcasterAdapterImpl struct {
323
+ coreClient typedv1core.EventsGetter
324
+ coreBroadcaster record.EventBroadcaster
325
+ v1beta1Client typedv1beta1.EventsGetter
326
+ v1beta1Broadcaster EventBroadcaster
327
+ }
328
+
329
+ // NewEventBroadcasterAdapter creates a wrapper around new and legacy broadcasters to simplify
330
+ // migration of individual components to the new Event API.
331
+ func NewEventBroadcasterAdapter (client clientset.Interface ) EventBroadcasterAdapter {
332
+ eventClient := & eventBroadcasterAdapterImpl {}
333
+ if _ , err := client .Discovery ().ServerResourcesForGroupVersion (v1beta1 .SchemeGroupVersion .String ()); err == nil {
334
+ eventClient .v1beta1Client = client .EventsV1beta1 ()
335
+ eventClient .v1beta1Broadcaster = NewBroadcaster (& EventSinkImpl {Interface : eventClient .v1beta1Client .Events ("" )})
336
+ }
337
+ // Even though there can soon exist cases when coreBroadcaster won't really be needed,
338
+ // we create it unconditionally because its overhead is minor and will simplify using usage
339
+ // patterns of this library in all components.
340
+ eventClient .coreClient = client .CoreV1 ()
341
+ eventClient .coreBroadcaster = record .NewBroadcaster ()
342
+ return eventClient
343
+ }
344
+
345
+ // StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink.
346
+ func (e * eventBroadcasterAdapterImpl ) StartRecordingToSink (stopCh <- chan struct {}) {
347
+ if e .v1beta1Broadcaster != nil && e .v1beta1Client != nil {
348
+ e .v1beta1Broadcaster .StartRecordingToSink (stopCh )
349
+ }
350
+ if e .coreBroadcaster != nil && e .coreClient != nil {
351
+ e .coreBroadcaster .StartRecordingToSink (& typedv1core.EventSinkImpl {Interface : e .coreClient .Events ("" )})
352
+ }
353
+ }
354
+
355
+ func (e * eventBroadcasterAdapterImpl ) NewRecorder (name string ) EventRecorder {
356
+ if e .v1beta1Broadcaster != nil && e .v1beta1Client != nil {
357
+ return e .v1beta1Broadcaster .NewRecorder (scheme .Scheme , name )
358
+ }
359
+ return record .NewEventRecorderAdapter (e .DeprecatedNewLegacyRecorder (name ))
360
+ }
361
+
362
+ func (e * eventBroadcasterAdapterImpl ) DeprecatedNewLegacyRecorder (name string ) record.EventRecorder {
363
+ return e .coreBroadcaster .NewRecorder (scheme .Scheme , corev1.EventSource {Component : name })
364
+ }
365
+
366
+ func (e * eventBroadcasterAdapterImpl ) Shutdown () {
367
+ if e .coreBroadcaster != nil {
368
+ e .coreBroadcaster .Shutdown ()
369
+ }
370
+ if e .v1beta1Broadcaster != nil {
371
+ e .v1beta1Broadcaster .Shutdown ()
372
+ }
373
+ }
0 commit comments