Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/cloudevents/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,7 @@ const (
ConfigTypeMQTT = "mqtt"
ConfigTypeGRPC = "grpc"
)

// GRPCSubscriptionIDKey is the key for the gRPC subscription ID.
// This ID is generated by the gRPC server after the client subscribes to it.
const GRPCSubscriptionIDKey = "subscription-id"
6 changes: 3 additions & 3 deletions pkg/cloudevents/generic/clients/agentclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestAgentResync(t *testing.T) {

go func() {
transport := agent.(*CloudEventAgentClient[*generictesting.MockResource]).transport
err = transport.Receive(ctx, func(event cloudevents.Event) {
err = transport.Receive(ctx, func(ctx context.Context, event cloudevents.Event) {
select {
case eventChan <- receiveEvent{event: event}:
case <-ctx.Done():
Expand Down Expand Up @@ -155,7 +155,7 @@ func TestAgentPublish(t *testing.T) {
stop := make(chan bool)
go func() {
cloudEventsClient := agent.(*CloudEventAgentClient[*generictesting.MockResource]).transport
err = cloudEventsClient.Receive(ctx, func(event cloudevents.Event) {
err = cloudEventsClient.Receive(ctx, func(ctx context.Context, event cloudevents.Event) {
select {
case eventChan <- receiveEvent{event: event}:
case <-ctx.Done():
Expand Down Expand Up @@ -325,7 +325,7 @@ func TestStatusResyncResponse(t *testing.T) {

go func() {
cloudEventsClient := agent.(*CloudEventAgentClient[*generictesting.MockResource]).transport
_ = cloudEventsClient.Receive(ctx, func(event cloudevents.Event) {
_ = cloudEventsClient.Receive(ctx, func(ctx context.Context, event cloudevents.Event) {
mutex.Lock()
defer mutex.Unlock()
receivedEvents = append(receivedEvents, event)
Expand Down
80 changes: 68 additions & 12 deletions pkg/cloudevents/generic/clients/baseclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (c *baseClient) connect(ctx context.Context) error {
// TODO enhance the cloudevents SKD to avoid wrapping the error type to distinguish the net connection
// errors
if err != nil {
// failed to reconnect, try agin
// failed to reconnect, try again
runtime.HandleErrorWithContext(ctx, err, "the cloudevents client reconnect failed")
<-wait.RealTimer(DelayFn()).C()
continue
Expand All @@ -89,14 +89,11 @@ func (c *baseClient) connect(ctx context.Context) error {
metrics.IncreaseClientReconnectedCounter(c.clientID)
c.setClientReady(true)
c.sendReceiverSignal(restartReceiverSignal)
c.sendReconnectedSignal()
}

select {
case <-ctx.Done():
if c.receiverChan != nil {
close(c.receiverChan)
}
c.closeChannels()
return
case err, ok := <-c.transport.ErrorChan():
if !ok {
Expand Down Expand Up @@ -164,22 +161,52 @@ func (c *baseClient) subscribe(ctx context.Context, receive receiveFn) {
return
}

// send subscription request before starting to receive events
if err := c.transport.Subscribe(ctx); err != nil {
runtime.HandleErrorWithContext(ctx, err, "failed to subscribe")
return
}

c.receiverChan = make(chan int)

// start a go routine to handle cloudevents subscription
go func() {
receiverCtx, receiverCancel := context.WithCancel(context.TODO())
receiverCtx, receiverCancel := context.WithCancel(ctx)
startReceiving := true
subscribed := true

for {
if !subscribed {
// resubscribe before restarting the receiver
if err := c.transport.Subscribe(ctx); err != nil {
if ctx.Err() != nil {
receiverCancel()
return
}

runtime.HandleError(fmt.Errorf("failed to resubscribe, %v", err))
select {
case <-ctx.Done():
receiverCancel()
return
case <-wait.RealTimer(DelayFn()).C():
}
continue
}
subscribed = true
// notify the client caller to resync the resources
c.sendReconnectedSignal(ctx)
}

if startReceiving {
go func() {
if err := c.transport.Receive(receiverCtx, func(evt cloudevents.Event) {
if err := c.transport.Receive(receiverCtx, func(ctx context.Context, evt cloudevents.Event) {
logger := klog.FromContext(ctx)
logger.V(2).Info("Received event", "event", evt.Context)
if logger.V(5).Enabled() {
logger.V(5).Info("Received event", "event", evt.String())
}
receive(receiverCtx, evt)
receive(ctx, evt)
}); err != nil {
runtime.HandleError(fmt.Errorf("failed to receive cloudevents, %v", err))
}
Expand All @@ -191,7 +218,7 @@ func (c *baseClient) subscribe(ctx context.Context, receive receiveFn) {
case <-ctx.Done():
receiverCancel()
return
case signal, ok := <-c.receiverChan:
case signal, ok := <-c.getReceiverChan():
if !ok {
// receiver channel is closed, stop the receiver
receiverCancel()
Expand All @@ -202,8 +229,9 @@ func (c *baseClient) subscribe(ctx context.Context, receive receiveFn) {
case restartReceiverSignal:
logger.V(2).Info("restart the cloudevents receiver")
// rebuild the receiver context and restart receiving
receiverCtx, receiverCancel = context.WithCancel(context.TODO())
receiverCtx, receiverCancel = context.WithCancel(ctx)
startReceiving = true
subscribed = false
case stopReceiverSignal:
logger.V(2).Info("stop the cloudevents receiver")
receiverCancel()
Expand All @@ -224,10 +252,32 @@ func (c *baseClient) sendReceiverSignal(signal int) {
}
}

func (c *baseClient) sendReconnectedSignal() {
func (c *baseClient) closeChannels() {
c.Lock()
defer c.Unlock()

if c.receiverChan != nil {
close(c.receiverChan)
c.receiverChan = nil
}
if c.reconnectedChan != nil {
close(c.reconnectedChan)
c.reconnectedChan = nil
}
}

func (c *baseClient) sendReconnectedSignal(ctx context.Context) {
c.RLock()
defer c.RUnlock()
c.reconnectedChan <- struct{}{}
if c.reconnectedChan != nil {
select {
case c.reconnectedChan <- struct{}{}:
// Signal sent successfully
default:
// No receiver listening on reconnectedChan, that's okay - don't block
klog.FromContext(ctx).Info("reconnected signal not sent, no receiver listening")
}
}
}

func (c *baseClient) isClientReady() bool {
Expand All @@ -241,3 +291,9 @@ func (c *baseClient) setClientReady(ready bool) {
defer c.Unlock()
c.clientReady = ready
}

func (c *baseClient) getReceiverChan() chan int {
c.RLock()
defer c.RUnlock()
return c.receiverChan
}
6 changes: 3 additions & 3 deletions pkg/cloudevents/generic/clients/sourceclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestSourceResync(t *testing.T) {
eventChan := make(chan receiveEvent)
stop := make(chan bool)
go func() {
err = source.transport.Receive(ctx, func(event cloudevents.Event) {
err = source.transport.Receive(ctx, func(ctx context.Context, event cloudevents.Event) {
select {
case eventChan <- receiveEvent{event: event}:
case <-ctx.Done():
Expand Down Expand Up @@ -134,7 +134,7 @@ func TestSourcePublish(t *testing.T) {
eventChan := make(chan receiveEvent)
stop := make(chan bool)
go func() {
err = source.transport.Receive(ctx, func(event cloudevents.Event) {
err = source.transport.Receive(ctx, func(ctx context.Context, event cloudevents.Event) {
select {
case eventChan <- receiveEvent{event: event}:
case <-ctx.Done():
Expand Down Expand Up @@ -332,7 +332,7 @@ func TestSpecResyncResponse(t *testing.T) {
stop := make(chan bool)
mutex := &sync.Mutex{}
go func() {
_ = source.transport.Receive(ctx, func(event cloudevents.Event) {
_ = source.transport.Receive(ctx, func(ctx context.Context, event cloudevents.Event) {
mutex.Lock()
defer mutex.Unlock()
receivedEvents = append(receivedEvents, event)
Expand Down
5 changes: 3 additions & 2 deletions pkg/cloudevents/generic/options/builder/optionsbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt"
grpcv2 "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/v2/grpc"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
)

Expand Down Expand Up @@ -57,7 +58,7 @@ func BuildCloudEventsSourceOptions(config any,
case *mqtt.MQTTOptions:
return mqtt.NewSourceOptions(config, clientId, sourceId), nil
case *grpc.GRPCOptions:
return grpc.NewSourceOptions(config, sourceId, dataType), nil
return grpcv2.NewSourceOptions(config, sourceId, dataType), nil
default:
return nil, fmt.Errorf("unsupported client configuration type %T", config)
}
Expand All @@ -70,7 +71,7 @@ func BuildCloudEventsAgentOptions(config any,
case *mqtt.MQTTOptions:
return mqtt.NewAgentOptions(config, clusterName, clientId), nil
case *grpc.GRPCOptions:
return grpc.NewAgentOptions(config, clusterName, clientId, dataType), nil
return grpcv2.NewAgentOptions(config, clusterName, clientId, dataType), nil
default:
return nil, fmt.Errorf("unsupported client configuration type %T", config)
}
Expand Down
21 changes: 11 additions & 10 deletions pkg/cloudevents/generic/options/builder/optionsbuilder_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package builder

import (
"encoding/json"
"os"
"strings"
"reflect"
"testing"
"time"

Expand Down Expand Up @@ -31,10 +30,11 @@ url: grpc
)

type buildingCloudEventsOptionTestCase struct {
name string
configType string
configFile *os.File
expectedOptions any
name string
configType string
configFile *os.File
expectedOptions any
expectedTransportType string
}

func TestBuildCloudEventsSourceOptions(t *testing.T) {
Expand All @@ -56,6 +56,7 @@ func TestBuildCloudEventsSourceOptions(t *testing.T) {
Timeout: 60 * time.Second,
},
},
expectedTransportType: "*mqtt.mqttSourceTransport",
},
{
name: "grpc config",
Expand All @@ -72,6 +73,7 @@ func TestBuildCloudEventsSourceOptions(t *testing.T) {
},
},
},
expectedTransportType: "*grpc.grpcTransport",
},
}

Expand Down Expand Up @@ -107,10 +109,9 @@ func assertOptions(t *testing.T, c buildingCloudEventsOptionTestCase) {
t.Errorf("unexpected error %v", err)
}

optionsRaw, _ := json.Marshal(options)
expectedRaw, _ := json.Marshal(c.expectedOptions)
tt := reflect.TypeOf(options.CloudEventsTransport)

if !strings.Contains(string(optionsRaw), string(expectedRaw)) {
t.Errorf("the results %v\n does not contain the original options %v\n", string(optionsRaw), string(expectedRaw))
if tt.String() != c.expectedTransportType {
t.Errorf("expected %s, but got %s", c.expectedTransportType, tt)
}
}
6 changes: 5 additions & 1 deletion pkg/cloudevents/generic/options/fake/fakeoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ func (c *EventChan) Connect(ctx context.Context) error {
return nil
}

func (c *EventChan) Subscribe(ctx context.Context) error {
return nil
}

func (c *EventChan) Send(ctx context.Context, evt cloudevents.Event) error {
select {
case c.evtChan <- evt:
Expand All @@ -61,7 +65,7 @@ func (c *EventChan) Receive(ctx context.Context, fn options.ReceiveHandlerFn) er
if !ok {
return nil
}
fn(e)
fn(ctx, e)
case <-ctx.Done():
return ctx.Err()
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/cloudevents/generic/options/grpc/agentoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type grpcAgentTransport struct {
dataType types.CloudEventsDataType
}

// Deprecated: use v2.grpc.NewAgentOptions instead
func NewAgentOptions(grpcOptions *GRPCOptions,
clusterName, agentID string, dataType types.CloudEventsDataType) *options.CloudEventsAgentOptions {
return &options.CloudEventsAgentOptions{
Expand Down Expand Up @@ -82,6 +83,12 @@ func (o *grpcAgentTransport) Send(ctx context.Context, evt cloudevents.Event) er
return nil
}

func (o *grpcAgentTransport) Subscribe(ctx context.Context) error {
// Subscription is handled by the cloudevents client during receiver startup.
// No action needed here.
return nil
}

func (o *grpcAgentTransport) Receive(ctx context.Context, fn options.ReceiveHandlerFn) error {
return o.cloudEventsClient.StartReceiver(ctx, fn)
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/cloudevents/generic/options/grpc/sourceoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ func (o *gRPCSourceTransport) Send(ctx context.Context, evt cloudevents.Event) e
return nil
}

func (o *gRPCSourceTransport) Subscribe(ctx context.Context) error {
// Subscription is handled by the cloudevents client during receiver startup.
// No action needed here.
return nil
}

func (o *gRPCSourceTransport) Receive(ctx context.Context, fn options.ReceiveHandlerFn) error {
return o.cloudEventsClient.StartReceiver(ctx, fn)
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/cloudevents/generic/options/mqtt/agentoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,14 @@ func (o *mqttAgentTransport) Send(ctx context.Context, evt cloudevents.Event) er
return nil
}

func (o *mqttAgentTransport) Subscribe(ctx context.Context) error {
// Subscription is handled by the cloudevents client during receiver startup.
// No action needed here.
// TODO: consider implementing native subscription logic in v2 to decouple from
// the CloudEvents SDK.
return nil
}

func (o *mqttAgentTransport) Receive(ctx context.Context, fn options.ReceiveHandlerFn) error {
return o.cloudEventsClient.StartReceiver(ctx, fn)
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/cloudevents/generic/options/mqtt/sourceoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,14 @@ func (o *mqttSourceTransport) Send(ctx context.Context, evt cloudevents.Event) e
return nil
}

func (o *mqttSourceTransport) Subscribe(ctx context.Context) error {
// Subscription is handled by the cloudevents client during receiver startup.
// No action needed here.
// TODO: consider implementing native subscription logic in v2 to decouple from
// the CloudEvents SDK.
return nil
}

func (o *mqttSourceTransport) Receive(ctx context.Context, fn options.ReceiveHandlerFn) error {
return o.cloudEventsClient.StartReceiver(ctx, fn)
}
Expand Down
Loading