@@ -23,17 +23,16 @@ import (
2323 "sync"
2424
2525 "github.com/go-logr/logr"
26- corev1 "k8s.io/api/core/v1"
2726 "k8s.io/apimachinery/pkg/runtime"
28- corev1client "k8s.io/client-go/kubernetes/typed/core /v1"
27+ eventsv1client "k8s.io/client-go/kubernetes/typed/events /v1"
2928 "k8s.io/client-go/rest"
30- "k8s.io/client-go/tools/record "
29+ "k8s.io/client-go/tools/events "
3130)
3231
3332// EventBroadcasterProducer makes an event broadcaster, returning
3433// whether or not the broadcaster should be stopped with the Provider,
3534// or not (e.g. if it's shared, it shouldn't be stopped with the Provider).
36- type EventBroadcasterProducer func () (caster record .EventBroadcaster , stopWithProvider bool )
35+ type EventBroadcasterProducer func () (caster events .EventBroadcaster , stopWithProvider bool )
3736
3837// Provider is a recorder.Provider that records events to the k8s API server
3938// and to a logr Logger.
@@ -45,11 +44,11 @@ type Provider struct {
4544 scheme * runtime.Scheme
4645 // logger is the logger to use when logging diagnostic event info
4746 logger logr.Logger
48- evtClient corev1client. EventInterface
47+ evtClient eventsv1client. EventsV1Interface
4948 makeBroadcaster EventBroadcasterProducer
5049
5150 broadcasterOnce sync.Once
52- broadcaster record .EventBroadcaster
51+ broadcaster events .EventBroadcaster
5352 stopBroadcaster bool
5453}
5554
@@ -89,45 +88,48 @@ func (p *Provider) Stop(shutdownCtx context.Context) {
8988
9089// getBroadcaster ensures that a broadcaster is started for this
9190// provider, and returns it. It's threadsafe.
92- func (p * Provider ) getBroadcaster () record .EventBroadcaster {
91+ func (p * Provider ) getBroadcaster () events .EventBroadcaster {
9392 // NB(directxman12): this can technically still leak if something calls
9493 // "getBroadcaster" (i.e. Emits an Event) but never calls Start, but if we
9594 // create the broadcaster in start, we could race with other things that
9695 // are started at the same time & want to emit events. The alternative is
9796 // silently swallowing events and more locking, but that seems suboptimal.
9897
9998 p .broadcasterOnce .Do (func () {
100- broadcaster , stop := p .makeBroadcaster ()
101- broadcaster .StartRecordingToSink (& corev1client.EventSinkImpl {Interface : p .evtClient })
102- broadcaster .StartEventWatcher (
103- func (e * corev1.Event ) {
99+ if p .broadcaster == nil {
100+ p .broadcaster = events .NewBroadcaster (& events.EventSinkImpl {Interface : p .evtClient })
101+ }
102+ // TODO(clebs): figure out how to manage the context/channel that StartRecordingToSink needs inside the provider.
103+ p .broadcaster .StartRecordingToSink (nil )
104+
105+ // TODO(clebs): figure out if we still need this and how the change would make sense.
106+ p .broadcaster .StartEventWatcher (
107+ func (e runtime.Object ) {
104108 p .logger .V (1 ).Info (e .Message , "type" , e .Type , "object" , e .InvolvedObject , "reason" , e .Reason )
105109 })
106- p .broadcaster = broadcaster
107- p .stopBroadcaster = stop
108110 })
109111
110112 return p .broadcaster
111113}
112114
113115// NewProvider create a new Provider instance.
114- func NewProvider (config * rest.Config , httpClient * http.Client , scheme * runtime.Scheme , logger logr.Logger , makeBroadcaster EventBroadcasterProducer ) (* Provider , error ) {
116+ func NewProvider (config * rest.Config , httpClient * http.Client , scheme * runtime.Scheme , logger logr.Logger , broadcaster events. EventBroadcaster , stopWithProvider bool ) (* Provider , error ) {
115117 if httpClient == nil {
116118 panic ("httpClient must not be nil" )
117119 }
118120
119- corev1Client , err := corev1client .NewForConfigAndClient (config , httpClient )
121+ eventsv1Client , err := eventsv1client .NewForConfigAndClient (config , httpClient )
120122 if err != nil {
121123 return nil , fmt .Errorf ("failed to init client: %w" , err )
122124 }
123125
124- p := & Provider {scheme : scheme , logger : logger , makeBroadcaster : makeBroadcaster , evtClient : corev1Client . Events ( "" ) }
126+ p := & Provider {scheme : scheme , logger : logger , broadcaster : broadcaster , stopBroadcaster : stopWithProvider , evtClient : eventsv1Client }
125127 return p , nil
126128}
127129
128- // GetEventRecorderFor returns an event recorder that broadcasts to this provider's
130+ // GetEventRecorder returns an event recorder that broadcasts to this provider's
129131// broadcaster. All events will be associated with a component of the given name.
130- func (p * Provider ) GetEventRecorderFor (name string ) record .EventRecorder {
132+ func (p * Provider ) GetEventRecorder (name string ) events .EventRecorder {
131133 return & lazyRecorder {
132134 prov : p ,
133135 name : name ,
@@ -141,41 +143,23 @@ type lazyRecorder struct {
141143 name string
142144
143145 recOnce sync.Once
144- rec record .EventRecorder
146+ rec events .EventRecorder
145147}
146148
147149// ensureRecording ensures that a concrete recorder is populated for this recorder.
148150func (l * lazyRecorder ) ensureRecording () {
149151 l .recOnce .Do (func () {
150152 broadcaster := l .prov .getBroadcaster ()
151- l .rec = broadcaster .NewRecorder (l .prov .scheme , corev1. EventSource { Component : l .name } )
153+ l .rec = broadcaster .NewRecorder (l .prov .scheme , l .name )
152154 })
153155}
154156
155- func (l * lazyRecorder ) Event (object runtime.Object , eventtype , reason , message string ) {
156- l .ensureRecording ()
157-
158- l .prov .lock .RLock ()
159- if ! l .prov .stopped {
160- l .rec .Event (object , eventtype , reason , message )
161- }
162- l .prov .lock .RUnlock ()
163- }
164- func (l * lazyRecorder ) Eventf (object runtime.Object , eventtype , reason , messageFmt string , args ... interface {}) {
165- l .ensureRecording ()
166-
167- l .prov .lock .RLock ()
168- if ! l .prov .stopped {
169- l .rec .Eventf (object , eventtype , reason , messageFmt , args ... )
170- }
171- l .prov .lock .RUnlock ()
172- }
173- func (l * lazyRecorder ) AnnotatedEventf (object runtime.Object , annotations map [string ]string , eventtype , reason , messageFmt string , args ... interface {}) {
157+ func (l * lazyRecorder ) Eventf (regarding runtime.Object , related runtime.Object , eventtype , reason , action , note string , args ... interface {}) {
174158 l .ensureRecording ()
175159
176160 l .prov .lock .RLock ()
177161 if ! l .prov .stopped {
178- l .rec .AnnotatedEventf ( object , annotations , eventtype , reason , messageFmt , args ... )
162+ l .rec .Eventf ( regarding , related , eventtype , reason , action , note , args ... )
179163 }
180164 l .prov .lock .RUnlock ()
181165}
0 commit comments