@@ -23,13 +23,20 @@ import (
2323 "sync"
2424
2525 "github.com/go-logr/logr"
26+ corev1 "k8s.io/api/core/v1"
2627 "k8s.io/apimachinery/pkg/runtime"
27- eventsv1client "k8s.io/client-go/kubernetes/typed/events /v1"
28+ corev1client "k8s.io/client-go/kubernetes/typed/core /v1"
2829 "k8s.io/client-go/rest"
2930 "k8s.io/client-go/tools/events"
3031 "k8s.io/client-go/tools/record"
3132)
3233
34+ // EventBroadcasterProducer makes an event broadcaster, returning
35+ // whether or not the broadcaster should be stopped with the Provider,
36+ // or not (e.g. if it's shared, it shouldn't be stopped with the Provider).
37+ // This producer currently produces both a
38+ type EventBroadcasterProducer func () (deprecatedCaster record.EventBroadcaster , caster events.EventBroadcaster , stopWithProvider bool )
39+
3340// Provider is a recorder.Provider that records events to the k8s API server
3441// and to a logr Logger.
3542type Provider struct {
@@ -39,12 +46,15 @@ type Provider struct {
3946 // scheme to specify when creating a recorder
4047 scheme * runtime.Scheme
4148 // logger is the logger to use when logging diagnostic event info
42- logger logr.Logger
43- evtClient eventsv1client.EventsV1Interface
49+ logger logr.Logger
50+ evtClient corev1client.EventInterface
51+ makeBroadcaster EventBroadcasterProducer
4452
4553 broadcasterOnce sync.Once
4654 broadcaster events.EventBroadcaster
47- stopBroadcaster bool
55+ // Deprecated: will be removed in a future release. Use the broadcaster above instead.
56+ deprecatedBroadcaster record.EventBroadcaster
57+ stopBroadcaster bool
4858}
4959
5060// NB(directxman12): this manually implements Stop instead of Being a runnable because we need to
@@ -65,10 +75,11 @@ func (p *Provider) Stop(shutdownCtx context.Context) {
6575 // almost certainly already been started (e.g. by leader election). We
6676 // need to invoke this to ensure that we don't inadvertently race with
6777 // an invocation of getBroadcaster.
68- broadcaster := p .getBroadcaster ()
78+ deprecatedBroadcaster , broadcaster := p .getBroadcaster ()
6979 if p .stopBroadcaster {
7080 p .lock .Lock ()
7181 broadcaster .Shutdown ()
82+ deprecatedBroadcaster .Shutdown ()
7283 p .stopped = true
7384 p .lock .Unlock ()
7485 }
@@ -83,47 +94,52 @@ func (p *Provider) Stop(shutdownCtx context.Context) {
8394
8495// getBroadcaster ensures that a broadcaster is started for this
8596// provider, and returns it. It's threadsafe.
86- func (p * Provider ) getBroadcaster () events.EventBroadcaster {
97+ func (p * Provider ) getBroadcaster () (record. EventBroadcaster , events.EventBroadcaster ) {
8798 // NB(directxman12): this can technically still leak if something calls
8899 // "getBroadcaster" (i.e. Emits an Event) but never calls Start, but if we
89100 // create the broadcaster in start, we could race with other things that
90101 // are started at the same time & want to emit events. The alternative is
91102 // silently swallowing events and more locking, but that seems suboptimal.
92103
93104 p .broadcasterOnce .Do (func () {
94- if p .broadcaster == nil {
95- p .broadcaster = events .NewBroadcaster (& events.EventSinkImpl {Interface : p .evtClient })
96- }
105+ p .deprecatedBroadcaster , p .broadcaster , p .stopBroadcaster = p .makeBroadcaster ()
106+
107+ // init old broadcaster
108+ p .deprecatedBroadcaster .StartRecordingToSink (& corev1client.EventSinkImpl {Interface : p .evtClient })
109+ p .deprecatedBroadcaster .StartEventWatcher (
110+ func (e * corev1.Event ) {
111+ p .logger .V (1 ).Info (e .Message , "type" , e .Type , "object" , e .InvolvedObject , "reason" , e .Reason )
112+ })
113+
114+ // init new broadcaster
97115 // TODO(clebs): figure out how to manage the context/channel that StartRecordingToSink needs inside the provider.
98116 _ = p .broadcaster .StartRecordingToSinkWithContext (context .TODO ())
99117 })
100118
101- return p .broadcaster
119+ return p .deprecatedBroadcaster , p . broadcaster
102120}
103121
104122// NewProvider create a new Provider instance.
105- func NewProvider (config * rest.Config , httpClient * http.Client , scheme * runtime.Scheme , logger logr.Logger , broadcaster events. EventBroadcaster , stopWithProvider bool ) (* Provider , error ) {
123+ func NewProvider (config * rest.Config , httpClient * http.Client , scheme * runtime.Scheme , logger logr.Logger , makeBroadcaster EventBroadcasterProducer ) (* Provider , error ) {
106124 if httpClient == nil {
107125 panic ("httpClient must not be nil" )
108126 }
109127
110- eventsv1Client , err := eventsv1client .NewForConfigAndClient (config , httpClient )
128+ corev1Client , err := corev1client .NewForConfigAndClient (config , httpClient )
111129 if err != nil {
112130 return nil , fmt .Errorf ("failed to init client: %w" , err )
113131 }
114132
115- p := & Provider {scheme : scheme , logger : logger , broadcaster : broadcaster , stopBroadcaster : stopWithProvider , evtClient : eventsv1Client }
133+ p := & Provider {scheme : scheme , logger : logger , makeBroadcaster : makeBroadcaster , evtClient : corev1Client . Events ( "" ) }
116134 return p , nil
117135}
118136
119137// GetEventRecorderFor returns an event recorder that broadcasts to this provider's
120138// broadcaster. All events will be associated with a component of the given name.
121139func (p * Provider ) GetEventRecorderFor (name string ) record.EventRecorder {
122- return & oldRecorder {
123- newRecorder : & lazyRecorder {
124- prov : p ,
125- name : name ,
126- },
140+ return & deprecatedRecorder {
141+ prov : p ,
142+ name : name ,
127143 }
128144}
129145
@@ -149,7 +165,7 @@ type lazyRecorder struct {
149165// ensureRecording ensures that a concrete recorder is populated for this recorder.
150166func (l * lazyRecorder ) ensureRecording () {
151167 l .recOnce .Do (func () {
152- broadcaster := l .prov .getBroadcaster ()
168+ _ , broadcaster := l .prov .getBroadcaster ()
153169 l .rec = broadcaster .NewRecorder (l .prov .scheme , l .name )
154170 })
155171}
@@ -164,21 +180,50 @@ func (l *lazyRecorder) Eventf(regarding runtime.Object, related runtime.Object,
164180 l .prov .lock .RUnlock ()
165181}
166182
167- // oldRecorder is a wrapper around the events.EventRecorder that implements the old record.EventRecorder API.
168- // This is a temporary solution to support both the old and new events APIs without duplicating everything.
169- // Internally it calls the new events API from the old API funcs and no longer supported parameters are ignored (e.g. annotations).
170- type oldRecorder struct {
171- newRecorder * lazyRecorder
183+ // deprecatedRecorder implements the old events API during the tranisiton and will be removed in a future release.
184+ // Deprecated: will be removed in a future release.
185+ type deprecatedRecorder struct {
186+ prov * Provider
187+ name string
188+
189+ recOnce sync.Once
190+ rec record.EventRecorder
172191}
173192
174- func (l * oldRecorder ) Event (object runtime.Object , eventtype , reason , message string ) {
175- l .newRecorder .Eventf (object , nil , eventtype , reason , "no action" , message )
193+ // ensureRecording ensures that a concrete recorder is populated for this recorder.
194+ func (l * deprecatedRecorder ) ensureRecording () {
195+ l .recOnce .Do (func () {
196+ deprecatedBroadcaster , _ := l .prov .getBroadcaster ()
197+ l .rec = deprecatedBroadcaster .NewRecorder (l .prov .scheme , corev1.EventSource {Component : l .name })
198+ })
176199}
177200
178- func (l * oldRecorder ) Eventf (object runtime.Object , eventtype , reason , messageFmt string , args ... any ) {
179- l .newRecorder .Eventf (object , nil , eventtype , reason , "no action" , messageFmt , args ... )
201+ func (l * deprecatedRecorder ) Event (object runtime.Object , eventtype , reason , message string ) {
202+ l .ensureRecording ()
203+
204+ l .prov .lock .RLock ()
205+ if ! l .prov .stopped {
206+ l .rec .Event (object , eventtype , reason , message )
207+ }
208+ l .prov .lock .RUnlock ()
209+ }
210+
211+ func (l * deprecatedRecorder ) Eventf (object runtime.Object , eventtype , reason , messageFmt string , args ... any ) {
212+ l .ensureRecording ()
213+
214+ l .prov .lock .RLock ()
215+ if ! l .prov .stopped {
216+ l .rec .Eventf (object , eventtype , reason , messageFmt , args ... )
217+ }
218+ l .prov .lock .RUnlock ()
180219}
181220
182- func (l * oldRecorder ) AnnotatedEventf (object runtime.Object , annotations map [string ]string , eventtype , reason , messageFmt string , args ... any ) {
183- l .newRecorder .Eventf (object , nil , eventtype , reason , "no action" , messageFmt , args ... )
221+ func (l * deprecatedRecorder ) AnnotatedEventf (object runtime.Object , annotations map [string ]string , eventtype , reason , messageFmt string , args ... any ) {
222+ l .ensureRecording ()
223+
224+ l .prov .lock .RLock ()
225+ if ! l .prov .stopped {
226+ l .rec .AnnotatedEventf (object , annotations , eventtype , reason , messageFmt , args ... )
227+ }
228+ l .prov .lock .RUnlock ()
184229}
0 commit comments