Skip to content

Commit c33e94f

Browse files
committed
grpc transport v2
Signed-off-by: Wei Liu <[email protected]>
1 parent e18ff00 commit c33e94f

File tree

18 files changed

+1835
-74
lines changed

18 files changed

+1835
-74
lines changed

pkg/cloudevents/constants/constants.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,7 @@ const (
44
ConfigTypeMQTT = "mqtt"
55
ConfigTypeGRPC = "grpc"
66
)
7+
8+
// GRPCSubscriptionIDKey is the key for the gRPC subscription ID.
9+
// This ID is generated by the gRPC server after the client subscribes to it.
10+
const GRPCSubscriptionIDKey = "subscription-id"

pkg/cloudevents/generic/clients/baseclient.go

Lines changed: 48 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ func (c *baseClient) connect(ctx context.Context) error {
7979
// TODO enhance the cloudevents SKD to avoid wrapping the error type to distinguish the net connection
8080
// errors
8181
if err != nil {
82-
// failed to reconnect, try agin
82+
// failed to reconnect, try again
8383
runtime.HandleErrorWithContext(ctx, err, "the cloudevents client reconnect failed")
8484
<-wait.RealTimer(DelayFn()).C()
8585
continue
@@ -89,14 +89,11 @@ func (c *baseClient) connect(ctx context.Context) error {
8989
metrics.IncreaseClientReconnectedCounter(c.clientID)
9090
c.setClientReady(true)
9191
c.sendReceiverSignal(restartReceiverSignal)
92-
c.sendReconnectedSignal()
9392
}
9493

9594
select {
9695
case <-ctx.Done():
97-
if c.receiverChan != nil {
98-
close(c.receiverChan)
99-
}
96+
c.closeChannels()
10097
return
10198
case err, ok := <-c.transport.ErrorChan():
10299
if !ok {
@@ -164,14 +161,33 @@ func (c *baseClient) subscribe(ctx context.Context, receive receiveFn) {
164161
return
165162
}
166163

164+
// send subscription request before starting to receive events
165+
if err := c.transport.Subscribe(ctx); err != nil {
166+
runtime.HandleErrorWithContext(ctx, err, "failed to subscribe")
167+
return
168+
}
169+
167170
c.receiverChan = make(chan int)
168171

169172
// start a go routine to handle cloudevents subscription
170173
go func() {
171-
receiverCtx, receiverCancel := context.WithCancel(context.TODO())
174+
receiverCtx, receiverCancel := context.WithCancel(ctx)
172175
startReceiving := true
176+
subscribed := true
173177

174178
for {
179+
if !subscribed {
180+
// resubscribe before restarting the receiver
181+
if err := c.transport.Subscribe(ctx); err != nil {
182+
runtime.HandleError(fmt.Errorf("failed to resubscribe, %v", err))
183+
<-wait.RealTimer(DelayFn()).C()
184+
continue
185+
}
186+
subscribed = true
187+
// notify the client caller to resync the resources
188+
c.sendReconnectedSignal(ctx)
189+
}
190+
175191
if startReceiving {
176192
go func() {
177193
if err := c.transport.Receive(receiverCtx, func(evt cloudevents.Event) {
@@ -202,8 +218,9 @@ func (c *baseClient) subscribe(ctx context.Context, receive receiveFn) {
202218
case restartReceiverSignal:
203219
logger.V(2).Info("restart the cloudevents receiver")
204220
// rebuild the receiver context and restart receiving
205-
receiverCtx, receiverCancel = context.WithCancel(context.TODO())
221+
receiverCtx, receiverCancel = context.WithCancel(ctx)
206222
startReceiving = true
223+
subscribed = false
207224
case stopReceiverSignal:
208225
logger.V(2).Info("stop the cloudevents receiver")
209226
receiverCancel()
@@ -224,10 +241,32 @@ func (c *baseClient) sendReceiverSignal(signal int) {
224241
}
225242
}
226243

227-
func (c *baseClient) sendReconnectedSignal() {
244+
func (c *baseClient) closeChannels() {
245+
c.Lock()
246+
defer c.Unlock()
247+
248+
if c.receiverChan != nil {
249+
close(c.receiverChan)
250+
c.receiverChan = nil
251+
}
252+
if c.reconnectedChan != nil {
253+
close(c.reconnectedChan)
254+
c.reconnectedChan = nil
255+
}
256+
}
257+
258+
func (c *baseClient) sendReconnectedSignal(ctx context.Context) {
228259
c.RLock()
229260
defer c.RUnlock()
230-
c.reconnectedChan <- struct{}{}
261+
if c.reconnectedChan != nil {
262+
select {
263+
case c.reconnectedChan <- struct{}{}:
264+
// Signal sent successfully
265+
default:
266+
// No receiver listening on reconnectedChan, that's okay - don't block
267+
klog.FromContext(ctx).Info("reconnected signal not sent, no receiver listening")
268+
}
269+
}
231270
}
232271

233272
func (c *baseClient) isClientReady() bool {

pkg/cloudevents/generic/options/builder/optionsbuilder.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options"
88
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc"
99
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt"
10+
grpcv2 "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/v2/grpc"
1011
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
1112
)
1213

@@ -57,7 +58,7 @@ func BuildCloudEventsSourceOptions(config any,
5758
case *mqtt.MQTTOptions:
5859
return mqtt.NewSourceOptions(config, clientId, sourceId), nil
5960
case *grpc.GRPCOptions:
60-
return grpc.NewSourceOptions(config, sourceId, dataType), nil
61+
return grpcv2.NewSourceOptions(config, sourceId, dataType), nil
6162
default:
6263
return nil, fmt.Errorf("unsupported client configuration type %T", config)
6364
}
@@ -70,7 +71,7 @@ func BuildCloudEventsAgentOptions(config any,
7071
case *mqtt.MQTTOptions:
7172
return mqtt.NewAgentOptions(config, clusterName, clientId), nil
7273
case *grpc.GRPCOptions:
73-
return grpc.NewAgentOptions(config, clusterName, clientId, dataType), nil
74+
return grpcv2.NewAgentOptions(config, clusterName, clientId, dataType), nil
7475
default:
7576
return nil, fmt.Errorf("unsupported client configuration type %T", config)
7677
}

pkg/cloudevents/generic/options/builder/optionsbuilder_test.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
package builder
22

33
import (
4-
"encoding/json"
54
"os"
6-
"strings"
5+
"reflect"
76
"testing"
87
"time"
98

@@ -31,10 +30,11 @@ url: grpc
3130
)
3231

3332
type buildingCloudEventsOptionTestCase struct {
34-
name string
35-
configType string
36-
configFile *os.File
37-
expectedOptions any
33+
name string
34+
configType string
35+
configFile *os.File
36+
expectedOptions any
37+
expectedTransportType string
3838
}
3939

4040
func TestBuildCloudEventsSourceOptions(t *testing.T) {
@@ -56,6 +56,7 @@ func TestBuildCloudEventsSourceOptions(t *testing.T) {
5656
Timeout: 60 * time.Second,
5757
},
5858
},
59+
expectedTransportType: "*mqtt.mqttSourceTransport",
5960
},
6061
{
6162
name: "grpc config",
@@ -72,6 +73,7 @@ func TestBuildCloudEventsSourceOptions(t *testing.T) {
7273
},
7374
},
7475
},
76+
expectedTransportType: "*grpc.grpcTransport",
7577
},
7678
}
7779

@@ -107,10 +109,9 @@ func assertOptions(t *testing.T, c buildingCloudEventsOptionTestCase) {
107109
t.Errorf("unexpected error %v", err)
108110
}
109111

110-
optionsRaw, _ := json.Marshal(options)
111-
expectedRaw, _ := json.Marshal(c.expectedOptions)
112+
tt := reflect.TypeOf(options.CloudEventsTransport)
112113

113-
if !strings.Contains(string(optionsRaw), string(expectedRaw)) {
114-
t.Errorf("the results %v\n does not contain the original options %v\n", string(optionsRaw), string(expectedRaw))
114+
if tt.String() != c.expectedTransportType {
115+
t.Errorf("expected %s, but got %s", c.expectedTransportType, tt)
115116
}
116117
}

pkg/cloudevents/generic/options/fake/fakeoptions.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,10 @@ func (c *EventChan) Connect(ctx context.Context) error {
4343
return nil
4444
}
4545

46+
func (c *EventChan) Subscribe(ctx context.Context) error {
47+
return nil
48+
}
49+
4650
func (c *EventChan) Send(ctx context.Context, evt cloudevents.Event) error {
4751
select {
4852
case c.evtChan <- evt:

pkg/cloudevents/generic/options/grpc/agentoptions.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ type grpcAgentTransport struct {
2020
dataType types.CloudEventsDataType
2121
}
2222

23+
// Deprecated: use v2.grpc.NewAgentOptions instead
2324
func NewAgentOptions(grpcOptions *GRPCOptions,
2425
clusterName, agentID string, dataType types.CloudEventsDataType) *options.CloudEventsAgentOptions {
2526
return &options.CloudEventsAgentOptions{
@@ -82,6 +83,12 @@ func (o *grpcAgentTransport) Send(ctx context.Context, evt cloudevents.Event) er
8283
return nil
8384
}
8485

86+
func (o *grpcAgentTransport) Subscribe(ctx context.Context) error {
87+
// Subscription is handled by the cloudevents client during receiver startup.
88+
// No action needed here.
89+
return nil
90+
}
91+
8592
func (o *grpcAgentTransport) Receive(ctx context.Context, fn options.ReceiveHandlerFn) error {
8693
return o.cloudEventsClient.StartReceiver(ctx, fn)
8794
}

pkg/cloudevents/generic/options/grpc/sourceoptions.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,12 @@ func (o *gRPCSourceTransport) Send(ctx context.Context, evt cloudevents.Event) e
7878
return nil
7979
}
8080

81+
func (o *gRPCSourceTransport) Subscribe(ctx context.Context) error {
82+
// Subscription is handled by the cloudevents client during receiver startup.
83+
// No action needed here.
84+
return nil
85+
}
86+
8187
func (o *gRPCSourceTransport) Receive(ctx context.Context, fn options.ReceiveHandlerFn) error {
8288
return o.cloudEventsClient.StartReceiver(ctx, fn)
8389
}

pkg/cloudevents/generic/options/mqtt/agentoptions.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,14 @@ func (o *mqttAgentTransport) Send(ctx context.Context, evt cloudevents.Event) er
138138
return nil
139139
}
140140

141+
func (o *mqttAgentTransport) Subscribe(ctx context.Context) error {
142+
// Subscription is handled by the cloudevents client during receiver startup.
143+
// No action needed here.
144+
// TODO: consider implementing native subscription logic in v2 to decouple from
145+
// the CloudEvents SDK.
146+
return nil
147+
}
148+
141149
func (o *mqttAgentTransport) Receive(ctx context.Context, fn options.ReceiveHandlerFn) error {
142150
return o.cloudEventsClient.StartReceiver(ctx, fn)
143151
}

pkg/cloudevents/generic/options/mqtt/sourceoptions.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,14 @@ func (o *mqttSourceTransport) Send(ctx context.Context, evt cloudevents.Event) e
132132
return nil
133133
}
134134

135+
func (o *mqttSourceTransport) Subscribe(ctx context.Context) error {
136+
// Subscription is handled by the cloudevents client during receiver startup.
137+
// No action needed here.
138+
// TODO: consider implementing native subscription logic in v2 to decouple from
139+
// the CloudEvents SDK.
140+
return nil
141+
}
142+
135143
func (o *mqttSourceTransport) Receive(ctx context.Context, fn options.ReceiveHandlerFn) error {
136144
return o.cloudEventsClient.StartReceiver(ctx, fn)
137145
}

pkg/cloudevents/generic/options/options.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,17 @@ type CloudEventTransport interface {
2121
// Connect establishes a connection to the event transport.
2222
// This method should be called before Send or Receive.
2323
// Returns an error if the connection cannot be established.
24-
// TODO remove the dataType
2524
Connect(ctx context.Context) error
2625

2726
// Send transmits a CloudEvent through the transport.
2827
// Returns an error if the event cannot be send.
2928
Send(ctx context.Context, evt cloudevents.Event) error
3029

30+
// Subscribe sends a subscription request to the transport to subscribe topics/services.
31+
// This is a non-blocking method that should be called after Connect and before Receive.
32+
// Returns an error if the subscription request cannot be sent.
33+
Subscribe(ctx context.Context) error
34+
3135
// Receive starts receiving events and invokes the provided handler for each event.
3236
// This is a BLOCKING call that runs an event loop until the context is cancelled.
3337
// The handler function is called synchronously for each received event.

0 commit comments

Comments
 (0)