Skip to content

Commit 2799070

Browse files
committed
Support both events APIs
Signed-off-by: Borja Clemente <[email protected]>
1 parent 7e63d7a commit 2799070

File tree

6 files changed

+166
-47
lines changed

6 files changed

+166
-47
lines changed

pkg/cluster/cluster.go

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,10 @@ import (
2525
"k8s.io/apimachinery/pkg/api/meta"
2626
"k8s.io/apimachinery/pkg/runtime"
2727
"k8s.io/client-go/kubernetes/scheme"
28+
eventsv1client "k8s.io/client-go/kubernetes/typed/events/v1"
2829
"k8s.io/client-go/rest"
2930
"k8s.io/client-go/tools/events"
31+
"k8s.io/client-go/tools/record"
3032

3133
"sigs.k8s.io/controller-runtime/pkg/cache"
3234
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -126,10 +128,16 @@ type Options struct {
126128
//
127129
// Deprecated: using this may cause goroutine leaks if the lifetime of your manager or controllers
128130
// is shorter than the lifetime of your process.
129-
EventBroadcaster events.EventBroadcaster
131+
EventBroadcaster record.EventBroadcaster
132+
133+
// makeBroadcaster allows deferring the creation of the broadcaster to
134+
// avoid leaking goroutines if we never call Start on this manager. It also
135+
// returns whether or not this is a "owned" broadcaster, and as such should be
136+
// stopped with the manager.
137+
makeBroadcaster intrec.EventBroadcasterProducer
130138

131139
// Dependency injection for testing
132-
newRecorderProvider func(config *rest.Config, httpClient *http.Client, scheme *runtime.Scheme, logger logr.Logger, broadcaster events.EventBroadcaster, stopWithProvider bool) (*intrec.Provider, error)
140+
newRecorderProvider func(config *rest.Config, httpClient *http.Client, scheme *runtime.Scheme, logger logr.Logger, makeBroadcaster intrec.EventBroadcasterProducer) (*intrec.Provider, error)
133141
}
134142

135143
// Option can be used to manipulate Options.
@@ -223,7 +231,7 @@ func New(config *rest.Config, opts ...Option) (Cluster, error) {
223231
// TODO(directxman12): the log for the event provider should have a context (name, tags, etc) specific
224232
// to the particular controller that it's being injected into, rather than a generic one like is here.
225233
// Stop the broadcaster with the provider only if the broadcaster is externally given (aka non-nil).
226-
recorderProvider, err := options.newRecorderProvider(config, options.HTTPClient, options.Scheme, options.Logger.WithName("events"), options.EventBroadcaster, options.EventBroadcaster == nil)
234+
recorderProvider, err := options.newRecorderProvider(config, options.HTTPClient, options.Scheme, options.Logger.WithName("events"), options.makeBroadcaster)
227235
if err != nil {
228236
return nil, err
229237
}
@@ -276,6 +284,27 @@ func setOptionsDefaults(options Options, config *rest.Config) (Options, error) {
276284
options.newRecorderProvider = intrec.NewProvider
277285
}
278286

287+
// This is duplicated with pkg/manager, we need it here to provide
288+
// the user with an EventBroadcaster and there for the Leader election
289+
evtCl, err := eventsv1client.NewForConfigAndClient(config, options.HTTPClient)
290+
if err != nil {
291+
return options, err
292+
}
293+
294+
// This is duplicated with pkg/manager, we need it here to provide
295+
// the user with an EventBroadcaster and there for the Leader election
296+
if options.EventBroadcaster == nil {
297+
// defer initialization to avoid leaking by default
298+
options.makeBroadcaster = func() (record.EventBroadcaster, events.EventBroadcaster, bool) {
299+
return record.NewBroadcaster(), events.NewBroadcaster(&events.EventSinkImpl{Interface: evtCl}), true
300+
}
301+
} else {
302+
// keep supporting the options.EventBroadcaster in the old API, but do not introduce it for the new one.
303+
options.makeBroadcaster = func() (record.EventBroadcaster, events.EventBroadcaster, bool) {
304+
return options.EventBroadcaster, events.NewBroadcaster(&events.EventSinkImpl{Interface: evtCl}), false
305+
}
306+
}
307+
279308
if options.Logger.GetSink() == nil {
280309
options.Logger = logf.RuntimeLog.WithName("cluster")
281310
}

pkg/cluster/cluster_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ import (
2929
"k8s.io/apimachinery/pkg/api/meta"
3030
"k8s.io/apimachinery/pkg/runtime"
3131
"k8s.io/client-go/rest"
32-
"k8s.io/client-go/tools/events"
3332
"sigs.k8s.io/controller-runtime/pkg/cache"
3433
"sigs.k8s.io/controller-runtime/pkg/client"
3534
intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder"
@@ -87,7 +86,7 @@ var _ = Describe("cluster.Cluster", func() {
8786

8887
It("should return an error it can't create a recorder.Provider", func() {
8988
c, err := New(cfg, func(o *Options) {
90-
o.newRecorderProvider = func(_ *rest.Config, _ *http.Client, _ *runtime.Scheme, _ logr.Logger, _ events.EventBroadcaster, _ bool) (*intrec.Provider, error) {
89+
o.newRecorderProvider = func(_ *rest.Config, _ *http.Client, _ *runtime.Scheme, _ logr.Logger, _ intrec.EventBroadcasterProducer) (*intrec.Provider, error) {
9190
return nil, fmt.Errorf("expected error")
9291
}
9392
})

pkg/internal/recorder/recorder.go

Lines changed: 75 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,20 @@ import (
2323
"sync"
2424

2525
"github.com/go-logr/logr"
26+
corev1 "k8s.io/api/core/v1"
2627
"k8s.io/apimachinery/pkg/runtime"
27-
eventsv1client "k8s.io/client-go/kubernetes/typed/events/v1"
28+
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
2829
"k8s.io/client-go/rest"
2930
"k8s.io/client-go/tools/events"
3031
"k8s.io/client-go/tools/record"
3132
)
3233

34+
// EventBroadcasterProducer makes an event broadcaster, returning
35+
// whether or not the broadcaster should be stopped with the Provider,
36+
// or not (e.g. if it's shared, it shouldn't be stopped with the Provider).
37+
// This producer currently produces both a
38+
type EventBroadcasterProducer func() (deprecatedCaster record.EventBroadcaster, caster events.EventBroadcaster, stopWithProvider bool)
39+
3340
// Provider is a recorder.Provider that records events to the k8s API server
3441
// and to a logr Logger.
3542
type Provider struct {
@@ -39,12 +46,15 @@ type Provider struct {
3946
// scheme to specify when creating a recorder
4047
scheme *runtime.Scheme
4148
// logger is the logger to use when logging diagnostic event info
42-
logger logr.Logger
43-
evtClient eventsv1client.EventsV1Interface
49+
logger logr.Logger
50+
evtClient corev1client.EventInterface
51+
makeBroadcaster EventBroadcasterProducer
4452

4553
broadcasterOnce sync.Once
4654
broadcaster events.EventBroadcaster
47-
stopBroadcaster bool
55+
// Deprecated: will be removed in a future release. Use the broadcaster above instead.
56+
deprecatedBroadcaster record.EventBroadcaster
57+
stopBroadcaster bool
4858
}
4959

5060
// NB(directxman12): this manually implements Stop instead of Being a runnable because we need to
@@ -65,10 +75,11 @@ func (p *Provider) Stop(shutdownCtx context.Context) {
6575
// almost certainly already been started (e.g. by leader election). We
6676
// need to invoke this to ensure that we don't inadvertently race with
6777
// an invocation of getBroadcaster.
68-
broadcaster := p.getBroadcaster()
78+
deprecatedBroadcaster, broadcaster := p.getBroadcaster()
6979
if p.stopBroadcaster {
7080
p.lock.Lock()
7181
broadcaster.Shutdown()
82+
deprecatedBroadcaster.Shutdown()
7283
p.stopped = true
7384
p.lock.Unlock()
7485
}
@@ -83,47 +94,52 @@ func (p *Provider) Stop(shutdownCtx context.Context) {
8394

8495
// getBroadcaster ensures that a broadcaster is started for this
8596
// provider, and returns it. It's threadsafe.
86-
func (p *Provider) getBroadcaster() events.EventBroadcaster {
97+
func (p *Provider) getBroadcaster() (record.EventBroadcaster, events.EventBroadcaster) {
8798
// NB(directxman12): this can technically still leak if something calls
8899
// "getBroadcaster" (i.e. Emits an Event) but never calls Start, but if we
89100
// create the broadcaster in start, we could race with other things that
90101
// are started at the same time & want to emit events. The alternative is
91102
// silently swallowing events and more locking, but that seems suboptimal.
92103

93104
p.broadcasterOnce.Do(func() {
94-
if p.broadcaster == nil {
95-
p.broadcaster = events.NewBroadcaster(&events.EventSinkImpl{Interface: p.evtClient})
96-
}
105+
p.deprecatedBroadcaster, p.broadcaster, p.stopBroadcaster = p.makeBroadcaster()
106+
107+
// init old broadcaster
108+
p.deprecatedBroadcaster.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: p.evtClient})
109+
p.deprecatedBroadcaster.StartEventWatcher(
110+
func(e *corev1.Event) {
111+
p.logger.V(1).Info(e.Message, "type", e.Type, "object", e.InvolvedObject, "reason", e.Reason)
112+
})
113+
114+
// init new broadcaster
97115
// TODO(clebs): figure out how to manage the context/channel that StartRecordingToSink needs inside the provider.
98116
_ = p.broadcaster.StartRecordingToSinkWithContext(context.TODO())
99117
})
100118

101-
return p.broadcaster
119+
return p.deprecatedBroadcaster, p.broadcaster
102120
}
103121

104122
// NewProvider create a new Provider instance.
105-
func NewProvider(config *rest.Config, httpClient *http.Client, scheme *runtime.Scheme, logger logr.Logger, broadcaster events.EventBroadcaster, stopWithProvider bool) (*Provider, error) {
123+
func NewProvider(config *rest.Config, httpClient *http.Client, scheme *runtime.Scheme, logger logr.Logger, makeBroadcaster EventBroadcasterProducer) (*Provider, error) {
106124
if httpClient == nil {
107125
panic("httpClient must not be nil")
108126
}
109127

110-
eventsv1Client, err := eventsv1client.NewForConfigAndClient(config, httpClient)
128+
corev1Client, err := corev1client.NewForConfigAndClient(config, httpClient)
111129
if err != nil {
112130
return nil, fmt.Errorf("failed to init client: %w", err)
113131
}
114132

115-
p := &Provider{scheme: scheme, logger: logger, broadcaster: broadcaster, stopBroadcaster: stopWithProvider, evtClient: eventsv1Client}
133+
p := &Provider{scheme: scheme, logger: logger, makeBroadcaster: makeBroadcaster, evtClient: corev1Client.Events("")}
116134
return p, nil
117135
}
118136

119137
// GetEventRecorderFor returns an event recorder that broadcasts to this provider's
120138
// broadcaster. All events will be associated with a component of the given name.
121139
func (p *Provider) GetEventRecorderFor(name string) record.EventRecorder {
122-
return &oldRecorder{
123-
newRecorder: &lazyRecorder{
124-
prov: p,
125-
name: name,
126-
},
140+
return &deprecatedRecorder{
141+
prov: p,
142+
name: name,
127143
}
128144
}
129145

@@ -149,7 +165,7 @@ type lazyRecorder struct {
149165
// ensureRecording ensures that a concrete recorder is populated for this recorder.
150166
func (l *lazyRecorder) ensureRecording() {
151167
l.recOnce.Do(func() {
152-
broadcaster := l.prov.getBroadcaster()
168+
_, broadcaster := l.prov.getBroadcaster()
153169
l.rec = broadcaster.NewRecorder(l.prov.scheme, l.name)
154170
})
155171
}
@@ -164,21 +180,50 @@ func (l *lazyRecorder) Eventf(regarding runtime.Object, related runtime.Object,
164180
l.prov.lock.RUnlock()
165181
}
166182

167-
// oldRecorder is a wrapper around the events.EventRecorder that implements the old record.EventRecorder API.
168-
// This is a temporary solution to support both the old and new events APIs without duplicating everything.
169-
// Internally it calls the new events API from the old API funcs and no longer supported parameters are ignored (e.g. annotations).
170-
type oldRecorder struct {
171-
newRecorder *lazyRecorder
183+
// deprecatedRecorder implements the old events API during the tranisiton and will be removed in a future release.
184+
// Deprecated: will be removed in a future release.
185+
type deprecatedRecorder struct {
186+
prov *Provider
187+
name string
188+
189+
recOnce sync.Once
190+
rec record.EventRecorder
172191
}
173192

174-
func (l *oldRecorder) Event(object runtime.Object, eventtype, reason, message string) {
175-
l.newRecorder.Eventf(object, nil, eventtype, reason, "no action", message)
193+
// ensureRecording ensures that a concrete recorder is populated for this recorder.
194+
func (l *deprecatedRecorder) ensureRecording() {
195+
l.recOnce.Do(func() {
196+
deprecatedBroadcaster, _ := l.prov.getBroadcaster()
197+
l.rec = deprecatedBroadcaster.NewRecorder(l.prov.scheme, corev1.EventSource{Component: l.name})
198+
})
176199
}
177200

178-
func (l *oldRecorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...any) {
179-
l.newRecorder.Eventf(object, nil, eventtype, reason, "no action", messageFmt, args...)
201+
func (l *deprecatedRecorder) Event(object runtime.Object, eventtype, reason, message string) {
202+
l.ensureRecording()
203+
204+
l.prov.lock.RLock()
205+
if !l.prov.stopped {
206+
l.rec.Event(object, eventtype, reason, message)
207+
}
208+
l.prov.lock.RUnlock()
209+
}
210+
211+
func (l *deprecatedRecorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...any) {
212+
l.ensureRecording()
213+
214+
l.prov.lock.RLock()
215+
if !l.prov.stopped {
216+
l.rec.Eventf(object, eventtype, reason, messageFmt, args...)
217+
}
218+
l.prov.lock.RUnlock()
180219
}
181220

182-
func (l *oldRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...any) {
183-
l.newRecorder.Eventf(object, nil, eventtype, reason, "no action", messageFmt, args...)
221+
func (l *deprecatedRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...any) {
222+
l.ensureRecording()
223+
224+
l.prov.lock.RLock()
225+
if !l.prov.stopped {
226+
l.rec.AnnotatedEventf(object, annotations, eventtype, reason, messageFmt, args...)
227+
}
228+
l.prov.lock.RUnlock()
184229
}

pkg/internal/recorder/recorder_test.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,23 @@ import (
2121
. "github.com/onsi/ginkgo/v2"
2222
. "github.com/onsi/gomega"
2323
"k8s.io/client-go/kubernetes/scheme"
24+
eventsv1client "k8s.io/client-go/kubernetes/typed/events/v1"
25+
"k8s.io/client-go/tools/events"
26+
"k8s.io/client-go/tools/record"
2427
"sigs.k8s.io/controller-runtime/pkg/internal/recorder"
2528
)
2629

2730
var _ = Describe("recorder.Provider", func() {
31+
evtCl, err := eventsv1client.NewForConfigAndClient(cfg, httpClient)
32+
Expect(err).NotTo(HaveOccurred())
33+
34+
makeBroadcaster := func() (record.EventBroadcaster, events.EventBroadcaster, bool) {
35+
return record.NewBroadcaster(), events.NewBroadcaster(&events.EventSinkImpl{Interface: evtCl}), true
36+
}
37+
2838
Describe("NewProvider", func() {
2939
It("should return a provider instance and a nil error.", func() {
30-
provider, err := recorder.NewProvider(cfg, httpClient, scheme.Scheme, logr.Discard(), nil, true)
40+
provider, err := recorder.NewProvider(cfg, httpClient, scheme.Scheme, logr.Discard(), makeBroadcaster)
3141
Expect(provider).NotTo(BeNil())
3242
Expect(err).NotTo(HaveOccurred())
3343
})
@@ -36,14 +46,14 @@ var _ = Describe("recorder.Provider", func() {
3646
// Invalid the config
3747
cfg1 := *cfg
3848
cfg1.Host = "invalid host"
39-
_, err := recorder.NewProvider(&cfg1, httpClient, scheme.Scheme, logr.Discard(), nil, true)
49+
_, err := recorder.NewProvider(&cfg1, httpClient, scheme.Scheme, logr.Discard(), makeBroadcaster)
4050
Expect(err).To(HaveOccurred())
4151
Expect(err.Error()).To(ContainSubstring("failed to init client"))
4252
})
4353
})
4454
Describe("GetEventRecorderFor", func() {
4555
It("should return a recorder instance.", func() {
46-
provider, err := recorder.NewProvider(cfg, httpClient, scheme.Scheme, logr.Discard(), nil, true)
56+
provider, err := recorder.NewProvider(cfg, httpClient, scheme.Scheme, logr.Discard(), makeBroadcaster)
4757
Expect(err).NotTo(HaveOccurred())
4858

4959
recorder := provider.GetEventRecorderFor("test")

0 commit comments

Comments
 (0)