Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pkg/cloudevents/clients/csr/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ func (c *CSRClient) Create(ctx context.Context, csr *certificatev1.CertificateSi
return nil, cloudeventserrors.ToStatusError(common.CSRGR, csr.Name, err)
}

// we need to add to the store here since grpc driver may call this when it cannot
// get from lister.
if err := c.watcherStore.Add(csr); err != nil {
return nil, errors.NewInternalError(err)
}

return csr.DeepCopy(), nil
}

Expand Down
44 changes: 25 additions & 19 deletions pkg/cloudevents/server/grpc/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"open-cluster-management.io/sdk-go/pkg/cloudevents/server"
)

type resourceHandler func(res *cloudevents.Event) error
type resourceHandler func(ctx context.Context, res *cloudevents.Event) error

// subscriber defines a subscriber that can receive and handle resource spec.
type subscriber struct {
Expand Down Expand Up @@ -60,9 +60,9 @@ func NewGRPCBroker() *GRPCBroker {
return broker
}

func (bkr *GRPCBroker) RegisterService(t types.CloudEventsDataType, service server.Service) {
func (bkr *GRPCBroker) RegisterService(ctx context.Context, t types.CloudEventsDataType, service server.Service) {
bkr.services[t] = service
service.RegisterHandler(bkr)
service.RegisterHandler(ctx, bkr)
}

func (bkr *GRPCBroker) Subscribers() sets.Set[string] {
Expand Down Expand Up @@ -122,7 +122,9 @@ func (bkr *GRPCBroker) Publish(ctx context.Context, pubReq *pbv1.PublishRequest)

// register registers a subscriber and return client id and error channel.
func (bkr *GRPCBroker) register(
clusterName string, dataType types.CloudEventsDataType, handler resourceHandler) (string, <-chan error) {
ctx context.Context, clusterName string, dataType types.CloudEventsDataType, handler resourceHandler) (string, <-chan error) {
logger := klog.FromContext(ctx)

bkr.mu.Lock()
defer bkr.mu.Unlock()

Expand All @@ -135,18 +137,19 @@ func (bkr *GRPCBroker) register(
errChan: errChan,
}

klog.V(4).Infof("register a subscriber %s (cluster name = %s)", id, clusterName)
logger.V(4).Info("register a subscriber", "id", id)
metrics.IncGRPCCESubscribersMetric(clusterName, dataType.String())

return id, errChan
}

// unregister a subscriber by id
func (bkr *GRPCBroker) unregister(id string) {
func (bkr *GRPCBroker) unregister(ctx context.Context, id string) {
bkr.mu.Lock()
defer bkr.mu.Unlock()

klog.V(10).Infof("unregister subscriber %s", id)
logger := klog.FromContext(ctx)
logger.V(4).Info("unregister subscriber", "id", id)
if sub, exists := bkr.subscribers[id]; exists {
close(sub.errChan)
delete(bkr.subscribers, id)
Expand All @@ -171,6 +174,9 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv
ctx, cancel := context.WithCancel(subServer.Context())
defer cancel()

logger := klog.FromContext(ctx).WithValues("clusterName", subReq.ClusterName)
ctx = klog.NewContext(ctx, logger)

// TODO make the channel size configurable
eventCh := make(chan *pbv1.CloudEvent, 100)

Expand All @@ -189,7 +195,7 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv
return
case evt := <-hearbeater.Heartbeat():
if err := subServer.Send(evt); err != nil {
klog.Errorf("failed to send heartbeat: %v", err)
logger.Error(err, "failed to send heartbeat")
// Unblock producers (handler select) and exit heartbeat ticker.
cancel()
select {
Expand All @@ -200,7 +206,7 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv
}
case evt := <-eventCh:
if err := subServer.Send(evt); err != nil {
klog.Errorf("failed to send event: %v", err)
logger.Error(err, "failed to send event")
// Unblock producers (handler select) and exit heartbeat ticker.
cancel()
select {
Expand All @@ -213,16 +219,16 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv
}
}()

subscriberID, errChan := bkr.register(subReq.ClusterName, *dataType, func(evt *cloudevents.Event) error {
subscriberID, errChan := bkr.register(ctx, subReq.ClusterName, *dataType, func(ctx context.Context, evt *cloudevents.Event) error {
// WARNING: don't use "pbEvt, err := pb.ToProto(evt)" to convert cloudevent to protobuf
pbEvt := &pbv1.CloudEvent{}
if err := grpcprotocol.WritePBMessage(context.TODO(), binding.ToMessage(evt), pbEvt); err != nil {
if err := grpcprotocol.WritePBMessage(ctx, binding.ToMessage(evt), pbEvt); err != nil {
// return the error to requeue the event if converting to protobuf fails (e.g., due to invalid cloudevent).
return fmt.Errorf("failed to convert cloudevent to protobuf for resource(%s): %v", evt.ID(), err)
}

// send the cloudevent to the subscriber
klog.V(4).Infof("sending the event to spec subscribers, %s", evt.Context)
logger.V(4).Info("sending the event to spec subscribers", "eventContext", evt.Context)
select {
case eventCh <- pbEvt:
case <-ctx.Done():
Expand All @@ -238,12 +244,12 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv
case err := <-errChan:
// When reaching this point, an unrecoverable error occurred while sending the event,
// such as the connection being closed. Unregister the subscriber to trigger agent reconnection.
klog.Errorf("unregister subscriber %s, error= %v", subscriberID, err)
bkr.unregister(subscriberID)
logger.Error(err, "unregister subscriber", "id", subscriberID)
bkr.unregister(ctx, subscriberID)
return err
case err := <-sendErrCh:
klog.Errorf("failed to send event, unregister subscriber %s, error=%v", subscriberID, err)
bkr.unregister(subscriberID)
logger.Error(err, "failed to send event, unregister subscriber", "id", subscriberID)
bkr.unregister(ctx, subscriberID)
return err
case <-ctx.Done():
// The context of the stream has been canceled or completed.
Expand All @@ -252,7 +258,7 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv
// - The server closed the stream, potentially due to a shutdown.
// Regardless of the reason, unregister the subscriber and stop processing.
// No error is returned here because the stream closure is expected.
bkr.unregister(subscriberID)
bkr.unregister(ctx, subscriberID)
return nil
}
}
Expand Down Expand Up @@ -307,7 +313,7 @@ func (bkr *GRPCBroker) respondResyncSpecRequest(ctx context.Context, eventDataTy
lastResourceVersion := findResourceVersion(obj.ID(), resourceVersions.Versions)
currentResourceVersion, err := cloudeventstypes.ToInteger(obj.Extensions()[types.ExtensionResourceVersion])
if err != nil {
log.V(4).Info("ignore the obj %v since it has a invalid resourceVersion, %v", obj, err)
log.V(4).Info("ignore the obj since it has a invalid resourceVersion", "object", obj, "error", err)
continue
}

Expand Down Expand Up @@ -383,7 +389,7 @@ func (bkr *GRPCBroker) handleRes(

for _, subscriber := range bkr.subscribers {
if subscriber.clusterName == clusterName && subscriber.dataType == t {
if err := subscriber.handler(evt); err != nil {
if err := subscriber.handler(ctx, evt); err != nil {
// check if the error is recoverable. For unrecoverable errors,
// such as a connection closed by an intermediate proxy, push
// the error to subscriber's error channel to unregister the subscriber.
Expand Down
8 changes: 4 additions & 4 deletions pkg/cloudevents/server/grpc/broker_heartbeat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func TestGRPCBroker_Subscribe_HeartbeatIntegration(t *testing.T) {
}

svc := &testService{evts: make(map[string]*cloudevents.Event)}
broker.RegisterService(dataType, svc)
broker.RegisterService(context.Background(), dataType, svc)

mockServer := newMockSubscribeServer()
defer mockServer.Close()
Expand Down Expand Up @@ -162,7 +162,7 @@ func TestGRPCBroker_Subscribe_SendError(t *testing.T) {
}

svc := &testService{evts: make(map[string]*cloudevents.Event)}
broker.RegisterService(dataType, svc)
broker.RegisterService(context.Background(), dataType, svc)

mockServer := newMockSubscribeServer()
defer mockServer.Close()
Expand Down Expand Up @@ -196,7 +196,7 @@ func TestGRPCBroker_Subscribe_EventAndHeartbeatSeparation(t *testing.T) {
}

svc := &testService{evts: make(map[string]*cloudevents.Event)}
broker.RegisterService(dataType, svc)
broker.RegisterService(context.Background(), dataType, svc)

mockServer := newMockSubscribeServer()
defer mockServer.Close()
Expand Down Expand Up @@ -317,7 +317,7 @@ func TestGRPCBroker_Subscribe_ChannelBlocking(t *testing.T) {
}

svc := &testService{evts: make(map[string]*cloudevents.Event)}
broker.RegisterService(dataType, svc)
broker.RegisterService(context.Background(), dataType, svc)

// Create a slow mock server that simulates slow processing
slowMockServer := &slowMockSubscribeServer{}
Expand Down
4 changes: 2 additions & 2 deletions pkg/cloudevents/server/grpc/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (s *testService) HandleStatusUpdate(ctx context.Context, evt *cloudevents.E
}

// RegisterHandler register the handler to the service.
func (s *testService) RegisterHandler(handler server.EventHandler) {
func (s *testService) RegisterHandler(_ context.Context, handler server.EventHandler) {
s.handler = handler
}

Expand All @@ -67,7 +67,7 @@ func TestServer(t *testing.T) {
pbv1.RegisterCloudEventServiceServer(grpcServer, grpcEventServer)

svc := &testService{evts: make(map[string]*cloudevents.Event)}
grpcEventServer.RegisterService(dataType, svc)
grpcEventServer.RegisterService(context.Background(), dataType, svc)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
2 changes: 1 addition & 1 deletion pkg/cloudevents/server/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type AgentEventServer interface {
EventHandler

// RegisterService registers a backend service with a certain data type.
RegisterService(t types.CloudEventsDataType, service Service)
RegisterService(ctx context.Context, t types.CloudEventsDataType, service Service)

// Subscribers returns all current subscribers who subscribe to this server.
Subscribers() sets.Set[string]
Expand Down
2 changes: 1 addition & 1 deletion pkg/cloudevents/server/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@ type Service interface {
HandleStatusUpdate(ctx context.Context, evt *cloudevents.Event) error

// RegisterHandler register the handler to the service.
RegisterHandler(handler EventHandler)
RegisterHandler(ctx context.Context, handler EventHandler)
}
4 changes: 2 additions & 2 deletions pkg/server/grpc/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func startBufServer(t *testing.T) *grpc.Server {
)

grpcBroker := cegrpc.NewGRPCBroker()
grpcBroker.RegisterService(payload.ManifestBundleEventDataType, newMockWorkService())
grpcBroker.RegisterService(context.Background(), payload.ManifestBundleEventDataType, newMockWorkService())
pbv1.RegisterCloudEventServiceServer(server, grpcBroker)

RegisterGRPCMetrics(promMiddleware, cemetrics.CloudEventsGRPCMetrics()...)
Expand Down Expand Up @@ -219,7 +219,7 @@ func (s *mockWorkService) HandleStatusUpdate(ctx context.Context, evt *cloudeven
return nil
}

func (s *mockWorkService) RegisterHandler(handler server.EventHandler) {}
func (s *mockWorkService) RegisterHandler(ctx context.Context, handler server.EventHandler) {}

var testCloudEventJSON = `{
"specversion": "1.0",
Expand Down
2 changes: 1 addition & 1 deletion test/integration/cloudevents/broker/services/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (s *ResourceService) HandleStatusUpdate(ctx context.Context, evt *cloudeven
return s.statusHandler(resource)
}

func (s *ResourceService) RegisterHandler(handler server.EventHandler) {
func (s *ResourceService) RegisterHandler(_ context.Context, handler server.EventHandler) {
s.handler = handler
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,18 +185,10 @@ var _ = ginkgo.Describe("ManifestWork Clients Test - Watch Only", func() {
gomega.Eventually(func() error {
workClient := agentClient.ManifestWorks(clusterName)

work, err := workClient.Get(ctx, workName, metav1.GetOptions{})
gomega.Expect(err).ToNot(gomega.HaveOccurred())
ginkgo.By(fmt.Sprintf("resourceVersion is %s", work.ResourceVersion))

if err := util.AddWorkFinalizer(ctx, workClient, workName); err != nil {
return err
}

work, err = workClient.Get(ctx, workName, metav1.GetOptions{})
gomega.Expect(err).ToNot(gomega.HaveOccurred())
ginkgo.By(fmt.Sprintf("resourceVersion is %s after patch", work.ResourceVersion))

return util.UpdateWorkStatus(ctx, workClient, workName, util.WorkCreatedCondition)
}, 10*time.Second, 1*time.Second).Should(gomega.Succeed())

Expand Down
2 changes: 1 addition & 1 deletion test/integration/cloudevents/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ var _ = ginkgo.BeforeSuite(func(done ginkgo.Done) {

// start the grpc broker
grpcBroker := cloudeventsgrpc.NewGRPCBroker()
grpcBroker.RegisterService(payload.ManifestBundleEventDataType, service)
grpcBroker.RegisterService(context.Background(), payload.ManifestBundleEventDataType, service)

opt := sdkgrpc.NewGRPCServerOptions()
opt.ClientCAFile = caFile
Expand Down