-
Notifications
You must be signed in to change notification settings - Fork 1.2k
⚠️ Migration to the new events API #3262
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 12 commits
a460c4f
032b80c
c91e239
fa9c081
709d552
047d2b2
be84c84
03d8e1a
adfebed
c0481cb
70be4c0
f06675e
ad8a528
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,16 +24,19 @@ import ( | |
|
||
"github.com/go-logr/logr" | ||
corev1 "k8s.io/api/core/v1" | ||
eventsv1 "k8s.io/api/events/v1" | ||
"k8s.io/apimachinery/pkg/runtime" | ||
corev1client "k8s.io/client-go/kubernetes/typed/core/v1" | ||
"k8s.io/client-go/rest" | ||
"k8s.io/client-go/tools/events" | ||
"k8s.io/client-go/tools/record" | ||
) | ||
|
||
// EventBroadcasterProducer makes an event broadcaster, returning | ||
// whether or not the broadcaster should be stopped with the Provider, | ||
// or not (e.g. if it's shared, it shouldn't be stopped with the Provider). | ||
type EventBroadcasterProducer func() (caster record.EventBroadcaster, stopWithProvider bool) | ||
// This producer currently produces both an old API and a new API broadcaster. | ||
type EventBroadcasterProducer func() (deprecatedCaster record.EventBroadcaster, caster events.EventBroadcaster, stopWithProvider bool) | ||
|
||
// Provider is a recorder.Provider that records events to the k8s API server | ||
// and to a logr Logger. | ||
|
@@ -48,9 +51,12 @@ type Provider struct { | |
evtClient corev1client.EventInterface | ||
makeBroadcaster EventBroadcasterProducer | ||
|
||
broadcasterOnce sync.Once | ||
broadcaster record.EventBroadcaster | ||
stopBroadcaster bool | ||
broadcasterOnce sync.Once | ||
broadcaster events.EventBroadcaster | ||
cancelSinkRecordingFunc context.CancelFunc | ||
// Deprecated: will be removed in a future release. Use the broadcaster above instead. | ||
deprecatedBroadcaster record.EventBroadcaster | ||
stopBroadcaster bool | ||
} | ||
|
||
// NB(directxman12): this manually implements Stop instead of Being a runnable because we need to | ||
|
@@ -71,10 +77,12 @@ func (p *Provider) Stop(shutdownCtx context.Context) { | |
// almost certainly already been started (e.g. by leader election). We | ||
// need to invoke this to ensure that we don't inadvertently race with | ||
// an invocation of getBroadcaster. | ||
broadcaster := p.getBroadcaster() | ||
deprecatedBroadcaster, broadcaster := p.getBroadcaster() | ||
if p.stopBroadcaster { | ||
p.lock.Lock() | ||
broadcaster.Shutdown() | ||
p.cancelSinkRecordingFunc() | ||
deprecatedBroadcaster.Shutdown() | ||
p.stopped = true | ||
p.lock.Unlock() | ||
} | ||
|
@@ -89,25 +97,42 @@ func (p *Provider) Stop(shutdownCtx context.Context) { | |
|
||
// getBroadcaster ensures that a broadcaster is started for this | ||
// provider, and returns it. It's threadsafe. | ||
func (p *Provider) getBroadcaster() record.EventBroadcaster { | ||
func (p *Provider) getBroadcaster() (record.EventBroadcaster, events.EventBroadcaster) { | ||
// NB(directxman12): this can technically still leak if something calls | ||
// "getBroadcaster" (i.e. Emits an Event) but never calls Start, but if we | ||
// create the broadcaster in start, we could race with other things that | ||
// are started at the same time & want to emit events. The alternative is | ||
// silently swallowing events and more locking, but that seems suboptimal. | ||
|
||
p.broadcasterOnce.Do(func() { | ||
broadcaster, stop := p.makeBroadcaster() | ||
broadcaster.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: p.evtClient}) | ||
broadcaster.StartEventWatcher( | ||
p.deprecatedBroadcaster, p.broadcaster, p.stopBroadcaster = p.makeBroadcaster() | ||
|
||
// init deprecated broadcaster | ||
p.deprecatedBroadcaster.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: p.evtClient}) | ||
p.deprecatedBroadcaster.StartEventWatcher( | ||
func(e *corev1.Event) { | ||
p.logger.V(1).Info(e.Message, "type", e.Type, "object", e.InvolvedObject, "reason", e.Reason) | ||
}) | ||
p.broadcaster = broadcaster | ||
p.stopBroadcaster = stop | ||
|
||
// init new broadcaster | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
p.cancelSinkRecordingFunc = cancel | ||
if err := p.broadcaster.StartRecordingToSinkWithContext(ctx); err != nil { | ||
p.logger.Error(err, "error starting recording for broadcaster") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry for the back and forth on this. I thought a bit more about it. Can you please check if we can return the error here (and below) I think otherwise a controller just silently won't produce events if this fails and that seems bad There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I checked this out and we can propagate the errors by using a closure on the
We could probably refactor the provider to move the broadcaster initialisation to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll take another look next week. |
||
} | ||
|
||
_, err := p.broadcaster.StartEventWatcher(func(event runtime.Object) { | ||
e, isEvt := event.(*eventsv1.Event) | ||
if isEvt { | ||
p.logger.V(1).Info(e.Note, "type", e.Type, "object", e.Related, "action", e.Action, "reason", e.Reason) | ||
} | ||
}) | ||
if err != nil { | ||
p.logger.Error(err, "error starting event watcher for broadcaster") | ||
} | ||
}) | ||
|
||
return p.broadcaster | ||
return p.deprecatedBroadcaster, p.broadcaster | ||
} | ||
|
||
// NewProvider create a new Provider instance. | ||
|
@@ -128,6 +153,15 @@ func NewProvider(config *rest.Config, httpClient *http.Client, scheme *runtime.S | |
// GetEventRecorderFor returns an event recorder that broadcasts to this provider's | ||
// broadcaster. All events will be associated with a component of the given name. | ||
func (p *Provider) GetEventRecorderFor(name string) record.EventRecorder { | ||
return &deprecatedRecorder{ | ||
prov: p, | ||
name: name, | ||
} | ||
} | ||
|
||
// GetEventRecorder returns an event recorder that broadcasts to this provider's | ||
// broadcaster. All events will be associated with a component of the given name. | ||
func (p *Provider) GetEventRecorder(name string) events.EventRecorder { | ||
return &lazyRecorder{ | ||
prov: p, | ||
name: name, | ||
|
@@ -141,18 +175,47 @@ type lazyRecorder struct { | |
name string | ||
|
||
recOnce sync.Once | ||
rec record.EventRecorder | ||
rec events.EventRecorder | ||
} | ||
|
||
// ensureRecording ensures that a concrete recorder is populated for this recorder. | ||
func (l *lazyRecorder) ensureRecording() { | ||
l.recOnce.Do(func() { | ||
broadcaster := l.prov.getBroadcaster() | ||
l.rec = broadcaster.NewRecorder(l.prov.scheme, corev1.EventSource{Component: l.name}) | ||
_, broadcaster := l.prov.getBroadcaster() | ||
l.rec = broadcaster.NewRecorder(l.prov.scheme, l.name) | ||
}) | ||
} | ||
|
||
func (l *lazyRecorder) Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...any) { | ||
l.ensureRecording() | ||
|
||
l.prov.lock.RLock() | ||
if !l.prov.stopped { | ||
l.rec.Eventf(regarding, related, eventtype, reason, action, note, args...) | ||
} | ||
l.prov.lock.RUnlock() | ||
} | ||
|
||
// deprecatedRecorder implements the old events API during the tranisiton and will be removed in a future release. | ||
// | ||
// Deprecated: will be removed in a future release. | ||
type deprecatedRecorder struct { | ||
prov *Provider | ||
name string | ||
|
||
recOnce sync.Once | ||
rec record.EventRecorder | ||
} | ||
|
||
// ensureRecording ensures that a concrete recorder is populated for this recorder. | ||
func (l *deprecatedRecorder) ensureRecording() { | ||
l.recOnce.Do(func() { | ||
deprecatedBroadcaster, _ := l.prov.getBroadcaster() | ||
l.rec = deprecatedBroadcaster.NewRecorder(l.prov.scheme, corev1.EventSource{Component: l.name}) | ||
}) | ||
} | ||
|
||
func (l *lazyRecorder) Event(object runtime.Object, eventtype, reason, message string) { | ||
func (l *deprecatedRecorder) Event(object runtime.Object, eventtype, reason, message string) { | ||
l.ensureRecording() | ||
|
||
l.prov.lock.RLock() | ||
|
@@ -161,7 +224,8 @@ func (l *lazyRecorder) Event(object runtime.Object, eventtype, reason, message s | |
} | ||
l.prov.lock.RUnlock() | ||
} | ||
func (l *lazyRecorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) { | ||
|
||
func (l *deprecatedRecorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...any) { | ||
l.ensureRecording() | ||
|
||
l.prov.lock.RLock() | ||
|
@@ -170,7 +234,8 @@ func (l *lazyRecorder) Eventf(object runtime.Object, eventtype, reason, messageF | |
} | ||
l.prov.lock.RUnlock() | ||
} | ||
func (l *lazyRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) { | ||
|
||
func (l *deprecatedRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...any) { | ||
l.ensureRecording() | ||
|
||
l.prov.lock.RLock() | ||
|
Uh oh!
There was an error while loading. Please reload this page.