Skip to content

Commit a41bfeb

Browse files
committed
grpc transport v2
Signed-off-by: Wei Liu <[email protected]>
1 parent b6f1477 commit a41bfeb

File tree

5 files changed

+90
-54
lines changed

5 files changed

+90
-54
lines changed

pkg/cloudevents/constants/constants.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,7 @@ const (
44
ConfigTypeMQTT = "mqtt"
55
ConfigTypeGRPC = "grpc"
66
)
7+
8+
// GRPCSubscriptionIDKey is the key for the gRPC subscription ID.
9+
// This ID is generated by the gRPC server after the client subscribes to it.
10+
const GRPCSubscriptionIDKey = "subscription-id"

pkg/cloudevents/generic/clients/baseclient.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ func (c *baseClient) connect(ctx context.Context) error {
7979
// TODO enhance the cloudevents SKD to avoid wrapping the error type to distinguish the net connection
8080
// errors
8181
if err != nil {
82-
// failed to reconnect, try agin
82+
// failed to reconnect, try again
8383
runtime.HandleErrorWithContext(ctx, err, "the cloudevents client reconnect failed")
8484
<-wait.RealTimer(DelayFn()).C()
8585
continue

pkg/cloudevents/generic/options/v2/grpc/options.go

Lines changed: 16 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -10,20 +10,16 @@ import (
1010
func NewAgentOptions(grpcOptions *grpc.GRPCOptions,
1111
clusterName, agentID string, dataType types.CloudEventsDataType) *options.CloudEventsAgentOptions {
1212
return &options.CloudEventsAgentOptions{
13-
CloudEventsTransport: &grpcTransport{
14-
opts: grpcOptions,
15-
errorChan: make(chan error),
16-
getSubscriptionRequest: func() *pbv1.SubscriptionRequest {
17-
return &pbv1.SubscriptionRequest{
18-
// TODO: Update this code to determine the subscription source for the agent client.
19-
// Currently, the grpc agent client is not utilized, and the 'Source' field serves
20-
// as a placeholder with all the sources.
21-
Source: types.SourceAll,
22-
ClusterName: clusterName,
23-
DataType: dataType.String(),
24-
}
25-
},
26-
},
13+
CloudEventsTransport: newTransport(grpcOptions, func() *pbv1.SubscriptionRequest {
14+
return &pbv1.SubscriptionRequest{
15+
// TODO: Update this code to determine the subscription source for the agent client.
16+
// Currently, the grpc agent client is not utilized, and the 'Source' field serves
17+
// as a placeholder with all the sources.
18+
Source: types.SourceAll,
19+
ClusterName: clusterName,
20+
DataType: dataType.String(),
21+
}
22+
}),
2723
AgentID: agentID,
2824
ClusterName: clusterName,
2925
}
@@ -32,16 +28,12 @@ func NewAgentOptions(grpcOptions *grpc.GRPCOptions,
3228
func NewSourceOptions(gRPCOptions *grpc.GRPCOptions,
3329
sourceID string, dataType types.CloudEventsDataType) *options.CloudEventsSourceOptions {
3430
return &options.CloudEventsSourceOptions{
35-
CloudEventsTransport: &grpcTransport{
36-
opts: gRPCOptions,
37-
errorChan: make(chan error),
38-
getSubscriptionRequest: func() *pbv1.SubscriptionRequest {
39-
return &pbv1.SubscriptionRequest{
40-
Source: sourceID,
41-
DataType: dataType.String(),
42-
}
43-
},
44-
},
31+
CloudEventsTransport: newTransport(gRPCOptions, func() *pbv1.SubscriptionRequest {
32+
return &pbv1.SubscriptionRequest{
33+
Source: sourceID,
34+
DataType: dataType.String(),
35+
}
36+
}),
4537
SourceID: sourceID,
4638
}
4739
}

pkg/cloudevents/generic/options/v2/grpc/transport.go

Lines changed: 43 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,13 @@ import (
99
"github.com/cloudevents/sdk-go/v2/binding"
1010

1111
"google.golang.org/grpc"
12+
"google.golang.org/grpc/codes"
1213
"google.golang.org/grpc/connectivity"
14+
"google.golang.org/grpc/status"
1315

1416
"k8s.io/klog/v2"
1517

18+
"open-cluster-management.io/sdk-go/pkg/cloudevents/constants"
1619
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options"
1720
grpcoptions "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc"
1821
pbv1 "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/protobuf/v1"
@@ -21,17 +24,28 @@ import (
2124
"open-cluster-management.io/sdk-go/pkg/cloudevents/server/grpc/heartbeat"
2225
)
2326

27+
type subscriptionRequestGetter func() *pbv1.SubscriptionRequest
28+
2429
type grpcTransport struct {
2530
opts *grpcoptions.GRPCOptions
2631

2732
client pbv1.CloudEventServiceClient
2833
subClient pbv1.CloudEventService_SubscribeClient
34+
subID string
2935

3036
mu sync.Mutex
3137
closeChan chan struct{}
3238
errorChan chan error
3339

34-
getSubscriptionRequest func() *pbv1.SubscriptionRequest
40+
getSubscriptionRequest subscriptionRequestGetter
41+
}
42+
43+
func newTransport(grpcOptions *grpcoptions.GRPCOptions, getter subscriptionRequestGetter) *grpcTransport {
44+
return &grpcTransport{
45+
opts: grpcOptions,
46+
errorChan: make(chan error),
47+
getSubscriptionRequest: getter,
48+
}
3549
}
3650

3751
func (t *grpcTransport) Connect(ctx context.Context) error {
@@ -72,25 +86,37 @@ func (t *grpcTransport) Send(ctx context.Context, evt cloudevents.Event) error {
7286

7387
func (t *grpcTransport) Subscribe(ctx context.Context) error {
7488
logger := klog.FromContext(ctx)
75-
subscribeOption := t.getSubscriptionRequest()
76-
subClient, err := t.client.Subscribe(ctx, subscribeOption)
89+
subOption := t.getSubscriptionRequest()
90+
subClient, err := t.client.Subscribe(ctx, subOption)
7791
if err != nil {
7892
return err
7993
}
8094

81-
logger.Info("subscribed to grpc server", "subscribeOption", subscribeOption)
95+
// Wait for server to send subscription-id
96+
header, err := subClient.Header()
97+
if err != nil {
98+
return err
99+
}
100+
101+
values := header.Get(constants.GRPCSubscriptionIDKey)
102+
if len(values) != 1 {
103+
return fmt.Errorf("unexpected subscription-id")
104+
}
105+
t.subID = values[0]
82106
t.subClient = subClient
107+
108+
logger.Info("subscribed to grpc server", "subID", t.subID, "subOption", subOption)
83109
return nil
84110
}
85111

86112
func (t *grpcTransport) Receive(ctx context.Context, handleFn options.ReceiveHandlerFn) error {
87113
logger := klog.FromContext(ctx)
88-
subscribeOption := t.getSubscriptionRequest()
114+
subOption := t.getSubscriptionRequest()
89115
subCtx, cancel := context.WithCancel(ctx)
90116
healthChecker := heartbeat.NewHealthChecker(t.opts.ServerHealthinessTimeout, t.errorChan)
91117

92118
// start to receive the events from stream
93-
logger.Info("receiving events", "subscribeOption", subscribeOption)
119+
logger.Info("receiving events", "subID", t.subID, "subOption", subOption)
94120
go t.startEventsReceiver(subCtx, healthChecker.Input(), handleFn)
95121

96122
// start to watch the stream heartbeat
@@ -105,7 +131,7 @@ func (t *grpcTransport) Receive(ctx context.Context, handleFn options.ReceiveHan
105131
// ensure the event receiver and heartbeat watcher are done
106132
cancel()
107133

108-
logger.Info("stop receiving events", "subscribeOption", subscribeOption)
134+
logger.Info("stop receiving events", "subID", t.subID, "subOption", subOption)
109135
return nil
110136
}
111137

@@ -137,10 +163,16 @@ func (t *grpcTransport) startEventsReceiver(ctx context.Context,
137163
for {
138164
pbEvt, err := t.subClient.Recv()
139165
if err != nil {
166+
st, ok := status.FromError(err)
167+
if ok && st.Code() == codes.Canceled {
168+
// canceled by transport, return directly.
169+
return
170+
}
171+
140172
select {
141-
case t.errorChan <- fmt.Errorf("subscribe stream failed: %w", err):
173+
case t.errorChan <- fmt.Errorf("subscribe (subID=%s) stream failed: %w", t.subID, err):
142174
default:
143-
logger.Error(err, "subscribe stream failed")
175+
logger.Error(err, "subscribe stream failed", "subID", t.subID)
144176
}
145177
return
146178
}
@@ -183,7 +215,7 @@ func (t *grpcTransport) monitorConnectionState(ctx context.Context, conn *grpc.C
183215
// the ctx is closed, stop this watch
184216
logger.Info("stop watching grpc connection state")
185217
if err := conn.Close(); err != nil {
186-
logger.Error(err, "failed to close gRPC connection")
218+
logger.Error(err, "failed to close grpc connection")
187219
}
188220
return
189221
}
@@ -208,7 +240,7 @@ func (t *grpcTransport) monitorConnectionState(ctx context.Context, conn *grpc.C
208240
if newState != connectivity.Shutdown {
209241
// don't close the connection if it's already shutdown
210242
if err := conn.Close(); err != nil {
211-
logger.Error(err, "failed to close gRPC connection")
243+
logger.Error(err, "failed to close grpc connection")
212244
}
213245
}
214246

pkg/cloudevents/server/grpc/broker.go

Lines changed: 26 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"sync"
88
"time"
99

10+
"open-cluster-management.io/sdk-go/pkg/cloudevents/constants"
1011
"open-cluster-management.io/sdk-go/pkg/cloudevents/server/grpc/heartbeat"
1112
"open-cluster-management.io/sdk-go/pkg/cloudevents/server/grpc/metrics"
1213

@@ -17,6 +18,7 @@ import (
1718
cloudeventstypes "github.com/cloudevents/sdk-go/v2/types"
1819
"github.com/google/uuid"
1920
"google.golang.org/grpc/codes"
21+
"google.golang.org/grpc/metadata"
2022
"google.golang.org/grpc/status"
2123
"google.golang.org/protobuf/types/known/emptypb"
2224
"k8s.io/apimachinery/pkg/util/sets"
@@ -28,7 +30,7 @@ import (
2830
"open-cluster-management.io/sdk-go/pkg/cloudevents/server"
2931
)
3032

31-
type resourceHandler func(ctx context.Context, res *cloudevents.Event) error
33+
type resourceHandler func(ctx context.Context, subID string, res *cloudevents.Event) error
3234

3335
// subscriber defines a subscriber that can receive and handle resource spec.
3436
type subscriber struct {
@@ -215,7 +217,7 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv
215217
}
216218
}()
217219

218-
subscriberID := bkr.register(ctx, subReq.ClusterName, *dataType, func(ctx context.Context, evt *cloudevents.Event) error {
220+
subID := bkr.register(ctx, subReq.ClusterName, *dataType, func(ctx context.Context, subID string, evt *cloudevents.Event) error {
219221
// convert the cloudevents.Event to pbv1.CloudEvent
220222
// WARNING: don't use "pbEvt, err := pb.ToProto(evt)" to convert cloudevent to protobuf
221223
pbEvt := &pbv1.CloudEvent{}
@@ -224,7 +226,7 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv
224226
}
225227

226228
// send the cloudevent to the subscriber
227-
logger.V(4).Info("sending the event to spec subscribers", "eventContext", evt.Context)
229+
logger.V(4).Info("sending the event to spec subscribers", "subID", subID, "eventContext", evt.Context)
228230
select {
229231
case eventCh <- pbEvt:
230232
case <-ctx.Done():
@@ -239,12 +241,19 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv
239241
return nil
240242
})
241243

244+
// Send initial metadata to signal registration is complete
245+
if err := subServer.SendHeader(metadata.Pairs(constants.GRPCSubscriptionIDKey, subID)); err != nil {
246+
logger.Error(err, "failed to send subscription header, , unregister subscriber", "subID", subID)
247+
bkr.unregister(ctx, subID)
248+
return err
249+
}
250+
242251
go heartbeater.Start(ctx)
243252

244253
select {
245254
case err := <-sendErrCh:
246-
logger.Error(err, "failed to send event, unregister subscriber", "id", subscriberID)
247-
bkr.unregister(ctx, subscriberID)
255+
logger.Error(err, "failed to send event, unregister subscriber", "subID", subID)
256+
bkr.unregister(ctx, subID)
248257
return err
249258
case <-ctx.Done():
250259
// The context of the stream has been canceled or completed.
@@ -253,7 +262,7 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv
253262
// - The server closed the stream, potentially due to a shutdown.
254263
// Regardless of the reason, unregister the subscriber and stop processing.
255264
// No error is returned here because the stream closure is expected.
256-
bkr.unregister(ctx, subscriberID)
265+
bkr.unregister(ctx, subID)
257266
return nil
258267
}
259268
}
@@ -291,13 +300,14 @@ func (bkr *GRPCBroker) respondResyncSpecRequest(ctx context.Context, eventDataTy
291300
}
292301

293302
if len(objs) == 0 {
294-
log.V(4).Info("there are is no objs from the list, do nothing")
303+
log.V(4).Info("no objs from the lister, do nothing", "eventContext", evt.Context)
295304
return nil
296305
}
297306

298307
for _, obj := range objs {
299308
// respond with the deleting resource regardless of the resource version
300309
if _, ok := obj.Extensions()[types.ExtensionDeletionTimestamp]; ok {
310+
log.V(4).Info("respond spec resync request", "eventContext", evt.Context)
301311
err = bkr.handleRes(ctx, obj, eventDataType, "delete_request")
302312
if err != nil {
303313
log.Error(err, "failed to handle resync spec request")
@@ -315,6 +325,7 @@ func (bkr *GRPCBroker) respondResyncSpecRequest(ctx context.Context, eventDataTy
315325
// the version of the work is not maintained on source or the source's work is newer than agent, send
316326
// the newer work to agent
317327
if currentResourceVersion == 0 || int64(currentResourceVersion) > lastResourceVersion {
328+
log.V(4).Info("respond spec resync request", "eventContext", evt.Context)
318329
err := bkr.handleRes(ctx, obj, eventDataType, "update_request")
319330
if err != nil {
320331
log.Error(err, "failed to handle resync spec request")
@@ -341,6 +352,7 @@ func (bkr *GRPCBroker) respondResyncSpecRequest(ctx context.Context, eventDataTy
341352
NewEvent()
342353

343354
// send a delete event for the current resource
355+
log.V(4).Info("respond spec resync request", "eventContext", evt.Context)
344356
err := bkr.handleRes(ctx, &obj, eventDataType, "delete_request")
345357
if err != nil {
346358
log.Error(err, "failed to handle delete request")
@@ -374,20 +386,16 @@ func (bkr *GRPCBroker) handleRes(
374386
}
375387
clusterName := fmt.Sprintf("%s", clusterNameValue)
376388

377-
// checks if the event should be processed by the current instance
378-
// by verifying the resource consumer name is in the subscriber list, ensuring the
379-
// event will be only processed when the consumer is subscribed to the current broker.
380-
if !bkr.IsConsumerSubscribed(clusterName) {
381-
log.V(4).Info("skip the event since the agent is not subscribed.",
382-
"eventContext", evt.Context, "clusterName", clusterName)
383-
return nil
384-
}
385-
386-
for _, subscriber := range bkr.subscribers {
389+
for id, subscriber := range bkr.subscribers {
390+
// checks if the event should be processed by the current instance
391+
// by verifying the resource consumer name is in the subscriber list, ensuring the
392+
// event will be only processed when the consumer is subscribed to the current broker.
387393
if subscriber.clusterName == clusterName && subscriber.dataType == t {
388-
if err := subscriber.handler(ctx, evt); err != nil {
394+
if err := subscriber.handler(ctx, id, evt); err != nil {
389395
return err
390396
}
397+
} else {
398+
log.V(4).Info("skip the event since the agent is not subscribed.", "subID", id, "eventContext", evt.Context)
391399
}
392400
}
393401
return nil

0 commit comments

Comments
 (0)