Skip to content

Commit 95e07d9

Browse files
committed
Fix new API not broadcasting events and add integration test
Signed-off-by: Borja Clemente <[email protected]>
1 parent bc36d51 commit 95e07d9

File tree

4 files changed

+107
-8
lines changed

4 files changed

+107
-8
lines changed

pkg/cluster/cluster.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,6 @@ func New(config *rest.Config, opts ...Option) (Cluster, error) {
230230
// Create the recorder provider to inject event recorders for the components.
231231
// TODO(directxman12): the log for the event provider should have a context (name, tags, etc) specific
232232
// to the particular controller that it's being injected into, rather than a generic one like is here.
233-
// Stop the broadcaster with the provider only if the broadcaster is externally given (aka non-nil).
234233
recorderProvider, err := options.newRecorderProvider(config, options.HTTPClient, options.Scheme, options.Logger.WithName("events"), options.makeBroadcaster)
235234
if err != nil {
236235
return nil, err

pkg/internal/recorder/recorder.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424

2525
"github.com/go-logr/logr"
2626
corev1 "k8s.io/api/core/v1"
27+
eventsv1 "k8s.io/api/events/v1"
2728
"k8s.io/apimachinery/pkg/runtime"
2829
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
2930
"k8s.io/client-go/rest"
@@ -34,7 +35,7 @@ import (
3435
// EventBroadcasterProducer makes an event broadcaster, returning
3536
// whether or not the broadcaster should be stopped with the Provider,
3637
// or not (e.g. if it's shared, it shouldn't be stopped with the Provider).
37-
// This producer currently produces both a
38+
// This producer currently produces both an old API and a new API broadcaster.
3839
type EventBroadcasterProducer func() (deprecatedCaster record.EventBroadcaster, caster events.EventBroadcaster, stopWithProvider bool)
3940

4041
// Provider is a recorder.Provider that records events to the k8s API server
@@ -52,6 +53,7 @@ type Provider struct {
5253

5354
broadcasterOnce sync.Once
5455
broadcaster events.EventBroadcaster
56+
stopCh chan struct{}
5557
// Deprecated: will be removed in a future release. Use the broadcaster above instead.
5658
deprecatedBroadcaster record.EventBroadcaster
5759
stopBroadcaster bool
@@ -79,6 +81,7 @@ func (p *Provider) Stop(shutdownCtx context.Context) {
7981
if p.stopBroadcaster {
8082
p.lock.Lock()
8183
broadcaster.Shutdown()
84+
close(p.stopCh)
8285
deprecatedBroadcaster.Shutdown()
8386
p.stopped = true
8487
p.lock.Unlock()
@@ -104,12 +107,22 @@ func (p *Provider) getBroadcaster() (record.EventBroadcaster, events.EventBroadc
104107
p.broadcasterOnce.Do(func() {
105108
p.deprecatedBroadcaster, p.broadcaster, p.stopBroadcaster = p.makeBroadcaster()
106109

107-
// init deprecated broadcaster (new broadcaster does not need to be initialised)
110+
// init deprecated broadcaster
108111
p.deprecatedBroadcaster.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: p.evtClient})
109112
p.deprecatedBroadcaster.StartEventWatcher(
110113
func(e *corev1.Event) {
111114
p.logger.V(1).Info(e.Message, "type", e.Type, "object", e.InvolvedObject, "reason", e.Reason)
112115
})
116+
117+
// init new broadcaster
118+
p.stopCh = make(chan struct{})
119+
p.broadcaster.StartRecordingToSink(p.stopCh)
120+
_, _ = p.broadcaster.StartEventWatcher(func(event runtime.Object) {
121+
e, isEvt := event.(*eventsv1.Event)
122+
if isEvt {
123+
p.logger.V(1).Info(e.Note, "type", e.Type, "object", e.Related, "action", e.Action, "reason", e.Reason)
124+
}
125+
})
113126
})
114127

115128
return p.deprecatedBroadcaster, p.broadcaster

pkg/internal/recorder/recorder_integration_test.go

Lines changed: 90 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,14 @@ import (
2121

2222
appsv1 "k8s.io/api/apps/v1"
2323
corev1 "k8s.io/api/core/v1"
24+
eventsv1 "k8s.io/api/events/v1"
2425
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2526
"k8s.io/apimachinery/pkg/watch"
27+
"k8s.io/client-go/kubernetes"
2628
"k8s.io/client-go/kubernetes/scheme"
2729
ref "k8s.io/client-go/tools/reference"
2830
"sigs.k8s.io/controller-runtime/pkg/controller"
31+
"sigs.k8s.io/controller-runtime/pkg/envtest"
2932
"sigs.k8s.io/controller-runtime/pkg/handler"
3033
"sigs.k8s.io/controller-runtime/pkg/manager"
3134
"sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -36,14 +39,14 @@ import (
3639
)
3740

3841
var _ = Describe("recorder", func() {
39-
Describe("recorder", func() {
42+
Describe("deprecated recorder", func() {
4043
It("should publish events", func(ctx SpecContext) {
4144
By("Creating the Manager")
4245
cm, err := manager.New(cfg, manager.Options{})
4346
Expect(err).NotTo(HaveOccurred())
4447

4548
By("Creating the Controller")
46-
recorder := cm.GetEventRecorderFor("test-recorder") //nolint:staticcheck
49+
recorder := cm.GetEventRecorderFor("test-deprecated-recorder") //nolint:staticcheck
4750
instance, err := controller.New("foo-controller", cm, controller.Options{
4851
Reconciler: reconcile.Func(
4952
func(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
@@ -66,7 +69,7 @@ var _ = Describe("recorder", func() {
6669
}()
6770

6871
deployment := &appsv1.Deployment{
69-
ObjectMeta: metav1.ObjectMeta{Name: "deployment-name"},
72+
ObjectMeta: metav1.ObjectMeta{Name: "deprecated-deployment-name"},
7073
Spec: appsv1.DeploymentSpec{
7174
Selector: &metav1.LabelSelector{
7275
MatchLabels: map[string]string{"foo": "bar"},
@@ -108,4 +111,88 @@ var _ = Describe("recorder", func() {
108111
Expect(evt.Message).To(Equal("test-msg"))
109112
})
110113
})
114+
115+
Describe("recorder", func() {
116+
It("should publish events", func(ctx SpecContext) {
117+
By("Creating the Manager")
118+
// this test needs its own env for now to not interfere with the previous one.
119+
// Once the deprecated API is removed this can be removed.
120+
testenv := &envtest.Environment{}
121+
122+
cfg, err := testenv.Start()
123+
Expect(err).NotTo(HaveOccurred())
124+
defer testenv.Stop() //nolint:errcheck
125+
126+
clientset, err := kubernetes.NewForConfig(cfg)
127+
Expect(err).NotTo(HaveOccurred())
128+
129+
cm, err := manager.New(cfg, manager.Options{})
130+
Expect(err).NotTo(HaveOccurred())
131+
132+
By("Creating the Controller")
133+
recorder := cm.GetEventRecorder("test-recorder")
134+
instance, err := controller.New("bar-controller", cm, controller.Options{
135+
Reconciler: reconcile.Func(
136+
func(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
137+
dp, err := clientset.AppsV1().Deployments(request.Namespace).Get(ctx, request.Name, metav1.GetOptions{})
138+
Expect(err).NotTo(HaveOccurred())
139+
recorder.Eventf(dp, nil, corev1.EventTypeNormal, "test-reason", "test-action", "test-msg")
140+
return reconcile.Result{}, nil
141+
}),
142+
})
143+
Expect(err).NotTo(HaveOccurred())
144+
145+
By("Watching Resources")
146+
err = instance.Watch(source.Kind(cm.GetCache(), &appsv1.Deployment{}, &handler.TypedEnqueueRequestForObject[*appsv1.Deployment]{}))
147+
Expect(err).NotTo(HaveOccurred())
148+
149+
By("Starting the Manager")
150+
go func() {
151+
defer GinkgoRecover()
152+
Expect(cm.Start(ctx)).NotTo(HaveOccurred())
153+
}()
154+
155+
deployment := &appsv1.Deployment{
156+
ObjectMeta: metav1.ObjectMeta{Name: "deployment-name"},
157+
Spec: appsv1.DeploymentSpec{
158+
Selector: &metav1.LabelSelector{
159+
MatchLabels: map[string]string{"foo": "bar"},
160+
},
161+
Template: corev1.PodTemplateSpec{
162+
ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"foo": "bar"}},
163+
Spec: corev1.PodSpec{
164+
Containers: []corev1.Container{
165+
{
166+
Name: "nginx",
167+
Image: "nginx",
168+
},
169+
},
170+
},
171+
},
172+
},
173+
}
174+
175+
By("Invoking Reconciling")
176+
deployment, err = clientset.AppsV1().Deployments("default").Create(ctx, deployment, metav1.CreateOptions{})
177+
Expect(err).NotTo(HaveOccurred())
178+
179+
By("Validate event is published as expected")
180+
evtWatcher, err := clientset.EventsV1().Events("default").Watch(ctx, metav1.ListOptions{})
181+
Expect(err).NotTo(HaveOccurred())
182+
183+
resultEvent := <-evtWatcher.ResultChan()
184+
185+
Expect(resultEvent.Type).To(Equal(watch.Added))
186+
evt, isEvent := resultEvent.Object.(*eventsv1.Event)
187+
Expect(isEvent).To(BeTrue())
188+
189+
dpRef, err := ref.GetReference(scheme.Scheme, deployment)
190+
Expect(err).NotTo(HaveOccurred())
191+
192+
Expect(evt.Regarding).To(Equal(*dpRef))
193+
Expect(evt.Type).To(Equal(corev1.EventTypeNormal))
194+
Expect(evt.Reason).To(Equal("test-reason"))
195+
Expect(evt.Note).To(Equal("test-msg"))
196+
})
197+
})
111198
})

pkg/recorder/recorder.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,11 @@ import (
2727

2828
// Provider knows how to generate new event recorders with given name.
2929
type Provider interface {
30-
// GetEventRecorder returns a EventRecorder with given name.
30+
// GetEventRecorderFor returns an EventRecorder for the old events API.
3131
//
3232
// Deprecated: this uses the old events API and will be removed in a future release. Please use GetEventRecorder instead.
3333
GetEventRecorderFor(name string) record.EventRecorder
34-
// GetEventRecorder returns an EventRecorder for the old events API.
34+
// GetEventRecorder returns a EventRecorder with given name.
3535
// The old API is not 100% supported anymore, use the new one whenever possible.
3636
GetEventRecorder(name string) events.EventRecorder
3737
}

0 commit comments

Comments
 (0)