Skip to content

Commit 053ec57

Browse files
committed
grpc transport v2
Signed-off-by: Wei Liu <[email protected]>
1 parent dd75001 commit 053ec57

File tree

4 files changed

+30
-9
lines changed

4 files changed

+30
-9
lines changed

pkg/cloudevents/generic/options/v2/grpc/options.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ func NewAgentOptions(grpcOptions *grpc.GRPCOptions,
1212
return &options.CloudEventsAgentOptions{
1313
CloudEventsTransport: &grpcTransport{
1414
opts: grpcOptions,
15-
closeChan: make(chan struct{}),
1615
errorChan: make(chan error),
1716
getSubscriptionRequest: func() *pbv1.SubscriptionRequest {
1817
return &pbv1.SubscriptionRequest{
@@ -35,7 +34,6 @@ func NewSourceOptions(gRPCOptions *grpc.GRPCOptions,
3534
return &options.CloudEventsSourceOptions{
3635
CloudEventsTransport: &grpcTransport{
3736
opts: gRPCOptions,
38-
closeChan: make(chan struct{}),
3937
errorChan: make(chan error),
4038
getSubscriptionRequest: func() *pbv1.SubscriptionRequest {
4139
return &pbv1.SubscriptionRequest{

pkg/cloudevents/generic/options/v2/grpc/transport.go

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package grpc
33
import (
44
"context"
55
"fmt"
6+
"sync"
67

78
cloudevents "github.com/cloudevents/sdk-go/v2"
89
"github.com/cloudevents/sdk-go/v2/binding"
@@ -26,13 +27,17 @@ type grpcTransport struct {
2627
client pbv1.CloudEventServiceClient
2728
subClient pbv1.CloudEventService_SubscribeClient
2829

30+
mu sync.Mutex
2931
closeChan chan struct{}
3032
errorChan chan error
3133

3234
getSubscriptionRequest func() *pbv1.SubscriptionRequest
3335
}
3436

3537
func (t *grpcTransport) Connect(ctx context.Context) error {
38+
t.mu.Lock()
39+
defer t.mu.Unlock()
40+
3641
logger := klog.FromContext(ctx)
3742
conn, err := t.opts.Dialer.Dial()
3843
if err != nil {
@@ -41,6 +46,9 @@ func (t *grpcTransport) Connect(ctx context.Context) error {
4146

4247
t.client = pbv1.NewCloudEventServiceClient(conn)
4348

49+
// Initialize closeChan to support reconnect cycles
50+
t.closeChan = make(chan struct{})
51+
4452
// Start a goroutine to monitor the gRPC connection state changes
4553
go t.monitorConnectionState(ctx, conn)
4654

@@ -106,8 +114,19 @@ func (t *grpcTransport) ErrorChan() <-chan error {
106114
}
107115

108116
func (t *grpcTransport) Close(ctx context.Context) error {
117+
t.mu.Lock()
118+
defer t.mu.Unlock()
119+
109120
klog.FromContext(ctx).Info("close grpc transport")
110-
close(t.closeChan)
121+
122+
// Guard against double-close panic
123+
select {
124+
case <-t.closeChan:
125+
// Already closed
126+
default:
127+
close(t.closeChan)
128+
}
129+
111130
return t.opts.Dialer.Close()
112131
}
113132

@@ -163,6 +182,9 @@ func (t *grpcTransport) monitorConnectionState(ctx context.Context, conn *grpc.C
163182
if !conn.WaitForStateChange(ctx, state) {
164183
// the ctx is closed, stop this watch
165184
logger.Info("stop watching grpc connection state")
185+
if err := conn.Close(); err != nil {
186+
logger.Error(err, "failed to close gRPC connection")
187+
}
166188
return
167189
}
168190

pkg/cloudevents/server/grpc/broker.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ func (bkr *GRPCBroker) Publish(ctx context.Context, pubReq *pbv1.PublishRequest)
9090
return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("failed to parse cloud event type %s, %v", evt.Type(), err))
9191
}
9292

93-
logger.V(4).Info("receive the event with grpc broker", "event", evt.Context)
93+
logger.V(4).Info("receive the event with grpc broker", "eventContext", evt.Context)
9494

9595
// handler resync request
9696
if eventType.Action == types.ResyncRequestAction {
@@ -134,7 +134,7 @@ func (bkr *GRPCBroker) register(
134134
handler: handler,
135135
}
136136

137-
logger.V(4).Info("register a subscriber", "id", id)
137+
logger.V(4).Info("register a subscriber", "id", id, "clusterName", clusterName, "dataType", dataType)
138138
metrics.IncGRPCCESubscribersMetric(clusterName, dataType.String())
139139

140140
return id
@@ -146,8 +146,8 @@ func (bkr *GRPCBroker) unregister(ctx context.Context, id string) {
146146
defer bkr.mu.Unlock()
147147

148148
logger := klog.FromContext(ctx)
149-
logger.V(4).Info("unregister subscriber", "id", id)
150149
if sub, exists := bkr.subscribers[id]; exists {
150+
logger.V(4).Info("unregister subscriber", "id", id, "clusterName", sub.clusterName, "dataType", sub.dataType)
151151
delete(bkr.subscribers, id)
152152
metrics.DecGRPCCESubscribersMetric(sub.clusterName, sub.dataType.String())
153153
}
@@ -308,7 +308,7 @@ func (bkr *GRPCBroker) respondResyncSpecRequest(ctx context.Context, eventDataTy
308308
lastResourceVersion := findResourceVersion(obj.ID(), resourceVersions.Versions)
309309
currentResourceVersion, err := cloudeventstypes.ToInteger(obj.Extensions()[types.ExtensionResourceVersion])
310310
if err != nil {
311-
log.V(4).Info("ignore the obj since it has a invalid resourceVersion", "object", obj, "error", err)
311+
log.V(4).Info("ignore the event since it has a invalid resourceVersion", "eventContext", obj.Context, "error", err)
312312
continue
313313
}
314314

@@ -378,7 +378,8 @@ func (bkr *GRPCBroker) handleRes(
378378
// by verifying the resource consumer name is in the subscriber list, ensuring the
379379
// event will be only processed when the consumer is subscribed to the current broker.
380380
if !bkr.IsConsumerSubscribed(clusterName) {
381-
log.V(4).Info("skip the event since the agent is not subscribed.")
381+
log.V(4).Info("skip the event since the agent is not subscribed.",
382+
"eventContext", evt.Context, "clusterName", clusterName)
382383
return nil
383384
}
384385

test/integration/cloudevents/cloudevetns_grpc_v2_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ var _ = ginkgo.Describe("CloudEvents Clients Test - GRPC V2", runCloudeventsClie
1616

1717
// The GRPC test simulates there is a server between the source and agent, the GRPC source client
1818
// sends/receives events to/from server, then server forward the events to agent via GRPC broker.
19-
func GetGRPCSourceOptionsV2(ctx context.Context, sourceID string) (*options.CloudEventsSourceOptions, string) {
19+
func GetGRPCSourceOptionsV2(_ context.Context, sourceID string) (*options.CloudEventsSourceOptions, string) {
2020
return grpcv2.NewSourceOptions(
2121
util.NewGRPCSourceOptions(grpcServerHost), sourceID, payload.ManifestBundleEventDataType),
2222
constants.ConfigTypeGRPC

0 commit comments

Comments
 (0)