Skip to content

Commit b6f1477

Browse files
committed
grpc transport v2
Signed-off-by: Wei Liu <[email protected]>
1 parent e18ff00 commit b6f1477

File tree

16 files changed

+1729
-51
lines changed

16 files changed

+1729
-51
lines changed

pkg/cloudevents/generic/clients/baseclient.go

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -89,14 +89,16 @@ func (c *baseClient) connect(ctx context.Context) error {
8989
metrics.IncreaseClientReconnectedCounter(c.clientID)
9090
c.setClientReady(true)
9191
c.sendReceiverSignal(restartReceiverSignal)
92-
c.sendReconnectedSignal()
9392
}
9493

9594
select {
9695
case <-ctx.Done():
9796
if c.receiverChan != nil {
9897
close(c.receiverChan)
9998
}
99+
if c.reconnectedChan != nil {
100+
close(c.reconnectedChan)
101+
}
100102
return
101103
case err, ok := <-c.transport.ErrorChan():
102104
if !ok {
@@ -164,14 +166,32 @@ func (c *baseClient) subscribe(ctx context.Context, receive receiveFn) {
164166
return
165167
}
166168

169+
// send subscription request before starting to receive events
170+
if err := c.transport.Subscribe(ctx); err != nil {
171+
runtime.HandleErrorWithContext(ctx, err, "failed to subscribe")
172+
return
173+
}
174+
167175
c.receiverChan = make(chan int)
168176

169177
// start a go routine to handle cloudevents subscription
170178
go func() {
171-
receiverCtx, receiverCancel := context.WithCancel(context.TODO())
179+
receiverCtx, receiverCancel := context.WithCancel(ctx)
172180
startReceiving := true
181+
subscribed := true
173182

174183
for {
184+
if !subscribed {
185+
// resubscribe before restarting the receiver
186+
if err := c.transport.Subscribe(ctx); err != nil {
187+
runtime.HandleError(fmt.Errorf("failed to resubscribe, %v", err))
188+
continue
189+
}
190+
subscribed = true
191+
// notify the client caller to resync the resources
192+
c.sendReconnectedSignal(ctx)
193+
}
194+
175195
if startReceiving {
176196
go func() {
177197
if err := c.transport.Receive(receiverCtx, func(evt cloudevents.Event) {
@@ -202,8 +222,9 @@ func (c *baseClient) subscribe(ctx context.Context, receive receiveFn) {
202222
case restartReceiverSignal:
203223
logger.V(2).Info("restart the cloudevents receiver")
204224
// rebuild the receiver context and restart receiving
205-
receiverCtx, receiverCancel = context.WithCancel(context.TODO())
225+
receiverCtx, receiverCancel = context.WithCancel(ctx)
206226
startReceiving = true
227+
subscribed = false
207228
case stopReceiverSignal:
208229
logger.V(2).Info("stop the cloudevents receiver")
209230
receiverCancel()
@@ -224,10 +245,16 @@ func (c *baseClient) sendReceiverSignal(signal int) {
224245
}
225246
}
226247

227-
func (c *baseClient) sendReconnectedSignal() {
248+
func (c *baseClient) sendReconnectedSignal(ctx context.Context) {
228249
c.RLock()
229250
defer c.RUnlock()
230-
c.reconnectedChan <- struct{}{}
251+
select {
252+
case c.reconnectedChan <- struct{}{}:
253+
// Signal sent successfully
254+
default:
255+
// No receiver listening on reconnectedChan, that's okay - don't block
256+
klog.FromContext(ctx).Info("reconnected signal not sent, no receiver listening")
257+
}
231258
}
232259

233260
func (c *baseClient) isClientReady() bool {

pkg/cloudevents/generic/options/builder/optionsbuilder.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options"
88
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc"
99
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt"
10+
grpcv2 "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/v2/grpc"
1011
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
1112
)
1213

@@ -57,7 +58,7 @@ func BuildCloudEventsSourceOptions(config any,
5758
case *mqtt.MQTTOptions:
5859
return mqtt.NewSourceOptions(config, clientId, sourceId), nil
5960
case *grpc.GRPCOptions:
60-
return grpc.NewSourceOptions(config, sourceId, dataType), nil
61+
return grpcv2.NewSourceOptions(config, sourceId, dataType), nil
6162
default:
6263
return nil, fmt.Errorf("unsupported client configuration type %T", config)
6364
}
@@ -70,7 +71,7 @@ func BuildCloudEventsAgentOptions(config any,
7071
case *mqtt.MQTTOptions:
7172
return mqtt.NewAgentOptions(config, clusterName, clientId), nil
7273
case *grpc.GRPCOptions:
73-
return grpc.NewAgentOptions(config, clusterName, clientId, dataType), nil
74+
return grpcv2.NewAgentOptions(config, clusterName, clientId, dataType), nil
7475
default:
7576
return nil, fmt.Errorf("unsupported client configuration type %T", config)
7677
}

pkg/cloudevents/generic/options/builder/optionsbuilder_test.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
package builder
22

33
import (
4-
"encoding/json"
54
"os"
6-
"strings"
5+
"reflect"
76
"testing"
87
"time"
98

@@ -31,10 +30,11 @@ url: grpc
3130
)
3231

3332
type buildingCloudEventsOptionTestCase struct {
34-
name string
35-
configType string
36-
configFile *os.File
37-
expectedOptions any
33+
name string
34+
configType string
35+
configFile *os.File
36+
expectedOptions any
37+
expectedTransportType string
3838
}
3939

4040
func TestBuildCloudEventsSourceOptions(t *testing.T) {
@@ -56,6 +56,7 @@ func TestBuildCloudEventsSourceOptions(t *testing.T) {
5656
Timeout: 60 * time.Second,
5757
},
5858
},
59+
expectedTransportType: "*mqtt.mqttSourceTransport",
5960
},
6061
{
6162
name: "grpc config",
@@ -72,6 +73,7 @@ func TestBuildCloudEventsSourceOptions(t *testing.T) {
7273
},
7374
},
7475
},
76+
expectedTransportType: "*grpc.grpcTransport",
7577
},
7678
}
7779

@@ -107,10 +109,9 @@ func assertOptions(t *testing.T, c buildingCloudEventsOptionTestCase) {
107109
t.Errorf("unexpected error %v", err)
108110
}
109111

110-
optionsRaw, _ := json.Marshal(options)
111-
expectedRaw, _ := json.Marshal(c.expectedOptions)
112+
tt := reflect.TypeOf(options.CloudEventsTransport)
112113

113-
if !strings.Contains(string(optionsRaw), string(expectedRaw)) {
114-
t.Errorf("the results %v\n does not contain the original options %v\n", string(optionsRaw), string(expectedRaw))
114+
if tt.String() != c.expectedTransportType {
115+
t.Errorf("expected %s, but got %s", c.expectedTransportType, tt)
115116
}
116117
}

pkg/cloudevents/generic/options/fake/fakeoptions.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,10 @@ func (c *EventChan) Connect(ctx context.Context) error {
4343
return nil
4444
}
4545

46+
func (c *EventChan) Subscribe(ctx context.Context) error {
47+
return nil
48+
}
49+
4650
func (c *EventChan) Send(ctx context.Context, evt cloudevents.Event) error {
4751
select {
4852
case c.evtChan <- evt:

pkg/cloudevents/generic/options/grpc/agentoptions.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ type grpcAgentTransport struct {
2020
dataType types.CloudEventsDataType
2121
}
2222

23+
// Deprecated: using v2.grpc.NewAgentOptions instead
2324
func NewAgentOptions(grpcOptions *GRPCOptions,
2425
clusterName, agentID string, dataType types.CloudEventsDataType) *options.CloudEventsAgentOptions {
2526
return &options.CloudEventsAgentOptions{
@@ -82,6 +83,12 @@ func (o *grpcAgentTransport) Send(ctx context.Context, evt cloudevents.Event) er
8283
return nil
8384
}
8485

86+
func (o *grpcAgentTransport) Subscribe(ctx context.Context) error {
87+
// Subscription is handled by the cloudevents client during receiver startup.
88+
// No action needed here.
89+
return nil
90+
}
91+
8592
func (o *grpcAgentTransport) Receive(ctx context.Context, fn options.ReceiveHandlerFn) error {
8693
return o.cloudEventsClient.StartReceiver(ctx, fn)
8794
}

pkg/cloudevents/generic/options/grpc/sourceoptions.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,12 @@ func (o *gRPCSourceTransport) Send(ctx context.Context, evt cloudevents.Event) e
7878
return nil
7979
}
8080

81+
func (o *gRPCSourceTransport) Subscribe(ctx context.Context) error {
82+
// Subscription is handled by the cloudevents client during receiver startup.
83+
// No action needed here.
84+
return nil
85+
}
86+
8187
func (o *gRPCSourceTransport) Receive(ctx context.Context, fn options.ReceiveHandlerFn) error {
8288
return o.cloudEventsClient.StartReceiver(ctx, fn)
8389
}

pkg/cloudevents/generic/options/mqtt/agentoptions.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,14 @@ func (o *mqttAgentTransport) Send(ctx context.Context, evt cloudevents.Event) er
138138
return nil
139139
}
140140

141+
func (o *mqttAgentTransport) Subscribe(ctx context.Context) error {
142+
// Subscription is handled by the cloudevents client during receiver startup.
143+
// No action needed here.
144+
// TODO: consider implementing native subscription logic in v2 to decouple from
145+
// the CloudEvents SDK.
146+
return nil
147+
}
148+
141149
func (o *mqttAgentTransport) Receive(ctx context.Context, fn options.ReceiveHandlerFn) error {
142150
return o.cloudEventsClient.StartReceiver(ctx, fn)
143151
}

pkg/cloudevents/generic/options/mqtt/sourceoptions.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,14 @@ func (o *mqttSourceTransport) Send(ctx context.Context, evt cloudevents.Event) e
132132
return nil
133133
}
134134

135+
func (o *mqttSourceTransport) Subscribe(ctx context.Context) error {
136+
// Subscription is handled by the cloudevents client during receiver startup.
137+
// No action needed here.
138+
// TODO: consider implementing native subscription logic in v2 to decouple from
139+
// the CloudEvents SDK.
140+
return nil
141+
}
142+
135143
func (o *mqttSourceTransport) Receive(ctx context.Context, fn options.ReceiveHandlerFn) error {
136144
return o.cloudEventsClient.StartReceiver(ctx, fn)
137145
}

pkg/cloudevents/generic/options/options.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,17 @@ type CloudEventTransport interface {
2121
// Connect establishes a connection to the event transport.
2222
// This method should be called before Send or Receive.
2323
// Returns an error if the connection cannot be established.
24-
// TODO remove the dataType
2524
Connect(ctx context.Context) error
2625

2726
// Send transmits a CloudEvent through the transport.
2827
// Returns an error if the event cannot be send.
2928
Send(ctx context.Context, evt cloudevents.Event) error
3029

30+
// Subscribe sends a subscription request to the transport to subscribe topics/services.
31+
// This is a non-blocking method that should be called after Connect and before Receive.
32+
// Returns an error if the subscription request cannot be sent.
33+
Subscribe(ctx context.Context) error
34+
3135
// Receive starts receiving events and invokes the provided handler for each event.
3236
// This is a BLOCKING call that runs an event loop until the context is cancelled.
3337
// The handler function is called synchronously for each received event.
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package grpc
2+
3+
import (
4+
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options"
5+
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc"
6+
pbv1 "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/protobuf/v1"
7+
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
8+
)
9+
10+
func NewAgentOptions(grpcOptions *grpc.GRPCOptions,
11+
clusterName, agentID string, dataType types.CloudEventsDataType) *options.CloudEventsAgentOptions {
12+
return &options.CloudEventsAgentOptions{
13+
CloudEventsTransport: &grpcTransport{
14+
opts: grpcOptions,
15+
errorChan: make(chan error),
16+
getSubscriptionRequest: func() *pbv1.SubscriptionRequest {
17+
return &pbv1.SubscriptionRequest{
18+
// TODO: Update this code to determine the subscription source for the agent client.
19+
// Currently, the grpc agent client is not utilized, and the 'Source' field serves
20+
// as a placeholder with all the sources.
21+
Source: types.SourceAll,
22+
ClusterName: clusterName,
23+
DataType: dataType.String(),
24+
}
25+
},
26+
},
27+
AgentID: agentID,
28+
ClusterName: clusterName,
29+
}
30+
}
31+
32+
func NewSourceOptions(gRPCOptions *grpc.GRPCOptions,
33+
sourceID string, dataType types.CloudEventsDataType) *options.CloudEventsSourceOptions {
34+
return &options.CloudEventsSourceOptions{
35+
CloudEventsTransport: &grpcTransport{
36+
opts: gRPCOptions,
37+
errorChan: make(chan error),
38+
getSubscriptionRequest: func() *pbv1.SubscriptionRequest {
39+
return &pbv1.SubscriptionRequest{
40+
Source: sourceID,
41+
DataType: dataType.String(),
42+
}
43+
},
44+
},
45+
SourceID: sourceID,
46+
}
47+
}

0 commit comments

Comments
 (0)