Skip to content

Commit e18ff00

Browse files
qiujian16 and claude authored
🐛 Add CSR to local store and pass context through broker (#168)
This commit addresses two related issues:

1. Adds newly created CSRs to the watcher store to ensure the gRPC driver can retrieve them when the lister is not yet synced.
2. Passes context through RegisterService and the handler chain to enable contextual logging throughout the gRPC broker.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Signed-off-by: Jian Qiu <[email protected]>
Co-authored-by: Claude <[email protected]>
1 parent 8266e17 commit e18ff00

File tree

10 files changed

+43
-39
lines changed

10 files changed

+43
-39
lines changed

pkg/cloudevents/clients/csr/client.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,12 @@ func (c *CSRClient) Create(ctx context.Context, csr *certificatev1.CertificateSi
6767
return nil, cloudeventserrors.ToStatusError(common.CSRGR, csr.Name, err)
6868
}
6969

70+
// we need to add to the store here since grpc driver may call this when it cannot
71+
// get from lister.
72+
if err := c.watcherStore.Add(csr); err != nil {
73+
return nil, errors.NewInternalError(err)
74+
}
75+
7076
return csr.DeepCopy(), nil
7177
}
7278

pkg/cloudevents/server/grpc/broker.go

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import (
2828
"open-cluster-management.io/sdk-go/pkg/cloudevents/server"
2929
)
3030

31-
type resourceHandler func(res *cloudevents.Event) error
31+
type resourceHandler func(ctx context.Context, res *cloudevents.Event) error
3232

3333
// subscriber defines a subscriber that can receive and handle resource spec.
3434
type subscriber struct {
@@ -60,9 +60,9 @@ func NewGRPCBroker() *GRPCBroker {
6060
return broker
6161
}
6262

63-
func (bkr *GRPCBroker) RegisterService(t types.CloudEventsDataType, service server.Service) {
63+
func (bkr *GRPCBroker) RegisterService(ctx context.Context, t types.CloudEventsDataType, service server.Service) {
6464
bkr.services[t] = service
65-
service.RegisterHandler(bkr)
65+
service.RegisterHandler(ctx, bkr)
6666
}
6767

6868
func (bkr *GRPCBroker) Subscribers() sets.Set[string] {
@@ -122,7 +122,9 @@ func (bkr *GRPCBroker) Publish(ctx context.Context, pubReq *pbv1.PublishRequest)
122122

123123
// register registers a subscriber and return client id and error channel.
124124
func (bkr *GRPCBroker) register(
125-
clusterName string, dataType types.CloudEventsDataType, handler resourceHandler) (string, <-chan error) {
125+
ctx context.Context, clusterName string, dataType types.CloudEventsDataType, handler resourceHandler) (string, <-chan error) {
126+
logger := klog.FromContext(ctx)
127+
126128
bkr.mu.Lock()
127129
defer bkr.mu.Unlock()
128130

@@ -135,18 +137,19 @@ func (bkr *GRPCBroker) register(
135137
errChan: errChan,
136138
}
137139

138-
klog.V(4).Infof("register a subscriber %s (cluster name = %s)", id, clusterName)
140+
logger.V(4).Info("register a subscriber", "id", id)
139141
metrics.IncGRPCCESubscribersMetric(clusterName, dataType.String())
140142

141143
return id, errChan
142144
}
143145

144146
// unregister a subscriber by id
145-
func (bkr *GRPCBroker) unregister(id string) {
147+
func (bkr *GRPCBroker) unregister(ctx context.Context, id string) {
146148
bkr.mu.Lock()
147149
defer bkr.mu.Unlock()
148150

149-
klog.V(10).Infof("unregister subscriber %s", id)
151+
logger := klog.FromContext(ctx)
152+
logger.V(4).Info("unregister subscriber", "id", id)
150153
if sub, exists := bkr.subscribers[id]; exists {
151154
close(sub.errChan)
152155
delete(bkr.subscribers, id)
@@ -171,6 +174,9 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv
171174
ctx, cancel := context.WithCancel(subServer.Context())
172175
defer cancel()
173176

177+
logger := klog.FromContext(ctx).WithValues("clusterName", subReq.ClusterName)
178+
ctx = klog.NewContext(ctx, logger)
179+
174180
// TODO make the channel size configurable
175181
eventCh := make(chan *pbv1.CloudEvent, 100)
176182

@@ -189,7 +195,7 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv
189195
return
190196
case evt := <-hearbeater.Heartbeat():
191197
if err := subServer.Send(evt); err != nil {
192-
klog.Errorf("failed to send heartbeat: %v", err)
198+
logger.Error(err, "failed to send heartbeat")
193199
// Unblock producers (handler select) and exit heartbeat ticker.
194200
cancel()
195201
select {
@@ -200,7 +206,7 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv
200206
}
201207
case evt := <-eventCh:
202208
if err := subServer.Send(evt); err != nil {
203-
klog.Errorf("failed to send event: %v", err)
209+
logger.Error(err, "failed to send event")
204210
// Unblock producers (handler select) and exit heartbeat ticker.
205211
cancel()
206212
select {
@@ -213,16 +219,16 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv
213219
}
214220
}()
215221

216-
subscriberID, errChan := bkr.register(subReq.ClusterName, *dataType, func(evt *cloudevents.Event) error {
222+
subscriberID, errChan := bkr.register(ctx, subReq.ClusterName, *dataType, func(ctx context.Context, evt *cloudevents.Event) error {
217223
// WARNING: don't use "pbEvt, err := pb.ToProto(evt)" to convert cloudevent to protobuf
218224
pbEvt := &pbv1.CloudEvent{}
219-
if err := grpcprotocol.WritePBMessage(context.TODO(), binding.ToMessage(evt), pbEvt); err != nil {
225+
if err := grpcprotocol.WritePBMessage(ctx, binding.ToMessage(evt), pbEvt); err != nil {
220226
// return the error to requeue the event if converting to protobuf fails (e.g., due to invalid cloudevent).
221227
return fmt.Errorf("failed to convert cloudevent to protobuf for resource(%s): %v", evt.ID(), err)
222228
}
223229

224230
// send the cloudevent to the subscriber
225-
klog.V(4).Infof("sending the event to spec subscribers, %s", evt.Context)
231+
logger.V(4).Info("sending the event to spec subscribers", "eventContext", evt.Context)
226232
select {
227233
case eventCh <- pbEvt:
228234
case <-ctx.Done():
@@ -238,12 +244,12 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv
238244
case err := <-errChan:
239245
// When reaching this point, an unrecoverable error occurred while sending the event,
240246
// such as the connection being closed. Unregister the subscriber to trigger agent reconnection.
241-
klog.Errorf("unregister subscriber %s, error= %v", subscriberID, err)
242-
bkr.unregister(subscriberID)
247+
logger.Error(err, "unregister subscriber", "id", subscriberID)
248+
bkr.unregister(ctx, subscriberID)
243249
return err
244250
case err := <-sendErrCh:
245-
klog.Errorf("failed to send event, unregister subscriber %s, error=%v", subscriberID, err)
246-
bkr.unregister(subscriberID)
251+
logger.Error(err, "failed to send event, unregister subscriber", "id", subscriberID)
252+
bkr.unregister(ctx, subscriberID)
247253
return err
248254
case <-ctx.Done():
249255
// The context of the stream has been canceled or completed.
@@ -252,7 +258,7 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv
252258
// - The server closed the stream, potentially due to a shutdown.
253259
// Regardless of the reason, unregister the subscriber and stop processing.
254260
// No error is returned here because the stream closure is expected.
255-
bkr.unregister(subscriberID)
261+
bkr.unregister(ctx, subscriberID)
256262
return nil
257263
}
258264
}
@@ -307,7 +313,7 @@ func (bkr *GRPCBroker) respondResyncSpecRequest(ctx context.Context, eventDataTy
307313
lastResourceVersion := findResourceVersion(obj.ID(), resourceVersions.Versions)
308314
currentResourceVersion, err := cloudeventstypes.ToInteger(obj.Extensions()[types.ExtensionResourceVersion])
309315
if err != nil {
310-
log.V(4).Info("ignore the obj %v since it has a invalid resourceVersion, %v", obj, err)
316+
log.V(4).Info("ignore the obj since it has a invalid resourceVersion", "object", obj, "error", err)
311317
continue
312318
}
313319

@@ -383,7 +389,7 @@ func (bkr *GRPCBroker) handleRes(
383389

384390
for _, subscriber := range bkr.subscribers {
385391
if subscriber.clusterName == clusterName && subscriber.dataType == t {
386-
if err := subscriber.handler(evt); err != nil {
392+
if err := subscriber.handler(ctx, evt); err != nil {
387393
// check if the error is recoverable. For unrecoverable errors,
388394
// such as a connection closed by an intermediate proxy, push
389395
// the error to subscriber's error channel to unregister the subscriber.

pkg/cloudevents/server/grpc/broker_heartbeat_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ func TestGRPCBroker_Subscribe_HeartbeatIntegration(t *testing.T) {
104104
}
105105

106106
svc := &testService{evts: make(map[string]*cloudevents.Event)}
107-
broker.RegisterService(dataType, svc)
107+
broker.RegisterService(context.Background(), dataType, svc)
108108

109109
mockServer := newMockSubscribeServer()
110110
defer mockServer.Close()
@@ -162,7 +162,7 @@ func TestGRPCBroker_Subscribe_SendError(t *testing.T) {
162162
}
163163

164164
svc := &testService{evts: make(map[string]*cloudevents.Event)}
165-
broker.RegisterService(dataType, svc)
165+
broker.RegisterService(context.Background(), dataType, svc)
166166

167167
mockServer := newMockSubscribeServer()
168168
defer mockServer.Close()
@@ -196,7 +196,7 @@ func TestGRPCBroker_Subscribe_EventAndHeartbeatSeparation(t *testing.T) {
196196
}
197197

198198
svc := &testService{evts: make(map[string]*cloudevents.Event)}
199-
broker.RegisterService(dataType, svc)
199+
broker.RegisterService(context.Background(), dataType, svc)
200200

201201
mockServer := newMockSubscribeServer()
202202
defer mockServer.Close()
@@ -317,7 +317,7 @@ func TestGRPCBroker_Subscribe_ChannelBlocking(t *testing.T) {
317317
}
318318

319319
svc := &testService{evts: make(map[string]*cloudevents.Event)}
320-
broker.RegisterService(dataType, svc)
320+
broker.RegisterService(context.Background(), dataType, svc)
321321

322322
// Create a slow mock server that simulates slow processing
323323
slowMockServer := &slowMockSubscribeServer{}

pkg/cloudevents/server/grpc/broker_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func (s *testService) HandleStatusUpdate(ctx context.Context, evt *cloudevents.E
4949
}
5050

5151
// RegisterHandler register the handler to the service.
52-
func (s *testService) RegisterHandler(handler server.EventHandler) {
52+
func (s *testService) RegisterHandler(_ context.Context, handler server.EventHandler) {
5353
s.handler = handler
5454
}
5555

@@ -67,7 +67,7 @@ func TestServer(t *testing.T) {
6767
pbv1.RegisterCloudEventServiceServer(grpcServer, grpcEventServer)
6868

6969
svc := &testService{evts: make(map[string]*cloudevents.Event)}
70-
grpcEventServer.RegisterService(dataType, svc)
70+
grpcEventServer.RegisterService(context.Background(), dataType, svc)
7171

7272
ctx, cancel := context.WithCancel(context.Background())
7373
defer cancel()

pkg/cloudevents/server/interface.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ type AgentEventServer interface {
1515
EventHandler
1616

1717
// RegisterService registers a backend service with a certain data type.
18-
RegisterService(t types.CloudEventsDataType, service Service)
18+
RegisterService(ctx context.Context, t types.CloudEventsDataType, service Service)
1919

2020
// Subscribers returns all current subscribers who subscribe to this server.
2121
Subscribers() sets.Set[string]

pkg/cloudevents/server/store.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,5 @@ type Service interface {
2121
HandleStatusUpdate(ctx context.Context, evt *cloudevents.Event) error
2222

2323
// RegisterHandler register the handler to the service.
24-
RegisterHandler(handler EventHandler)
24+
RegisterHandler(ctx context.Context, handler EventHandler)
2525
}

pkg/server/grpc/metrics/metrics_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func startBufServer(t *testing.T) *grpc.Server {
4444
)
4545

4646
grpcBroker := cegrpc.NewGRPCBroker()
47-
grpcBroker.RegisterService(payload.ManifestBundleEventDataType, newMockWorkService())
47+
grpcBroker.RegisterService(context.Background(), payload.ManifestBundleEventDataType, newMockWorkService())
4848
pbv1.RegisterCloudEventServiceServer(server, grpcBroker)
4949

5050
RegisterGRPCMetrics(promMiddleware, cemetrics.CloudEventsGRPCMetrics()...)
@@ -219,7 +219,7 @@ func (s *mockWorkService) HandleStatusUpdate(ctx context.Context, evt *cloudeven
219219
return nil
220220
}
221221

222-
func (s *mockWorkService) RegisterHandler(handler server.EventHandler) {}
222+
func (s *mockWorkService) RegisterHandler(ctx context.Context, handler server.EventHandler) {}
223223

224224
var testCloudEventJSON = `{
225225
"specversion": "1.0",

test/integration/cloudevents/broker/services/resource.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func (s *ResourceService) HandleStatusUpdate(ctx context.Context, evt *cloudeven
6262
return s.statusHandler(resource)
6363
}
6464

65-
func (s *ResourceService) RegisterHandler(handler server.EventHandler) {
65+
func (s *ResourceService) RegisterHandler(_ context.Context, handler server.EventHandler) {
6666
s.handler = handler
6767
}
6868

test/integration/cloudevents/manifestworkclients_watch_test.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -185,18 +185,10 @@ var _ = ginkgo.Describe("ManifestWork Clients Test - Watch Only", func() {
185185
gomega.Eventually(func() error {
186186
workClient := agentClient.ManifestWorks(clusterName)
187187

188-
work, err := workClient.Get(ctx, workName, metav1.GetOptions{})
189-
gomega.Expect(err).ToNot(gomega.HaveOccurred())
190-
ginkgo.By(fmt.Sprintf("resourceVersion is %s", work.ResourceVersion))
191-
192188
if err := util.AddWorkFinalizer(ctx, workClient, workName); err != nil {
193189
return err
194190
}
195191

196-
work, err = workClient.Get(ctx, workName, metav1.GetOptions{})
197-
gomega.Expect(err).ToNot(gomega.HaveOccurred())
198-
ginkgo.By(fmt.Sprintf("resourceVersion is %s after patch", work.ResourceVersion))
199-
200192
return util.UpdateWorkStatus(ctx, workClient, workName, util.WorkCreatedCondition)
201193
}, 10*time.Second, 1*time.Second).Should(gomega.Succeed())
202194

test/integration/cloudevents/suite_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ var _ = ginkgo.BeforeSuite(func(done ginkgo.Done) {
136136

137137
// start the grpc broker
138138
grpcBroker := cloudeventsgrpc.NewGRPCBroker()
139-
grpcBroker.RegisterService(payload.ManifestBundleEventDataType, service)
139+
grpcBroker.RegisterService(context.Background(), payload.ManifestBundleEventDataType, service)
140140

141141
opt := sdkgrpc.NewGRPCServerOptions()
142142
opt.ClientCAFile = caFile

0 commit comments

Comments
 (0)