diff --git a/pkg/cloudevents/generic/options/builder/optionsbuilder.go b/pkg/cloudevents/generic/options/builder/optionsbuilder.go index d0cfff2f..f120b00a 100644 --- a/pkg/cloudevents/generic/options/builder/optionsbuilder.go +++ b/pkg/cloudevents/generic/options/builder/optionsbuilder.go @@ -8,6 +8,7 @@ import ( "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt" grpcv2 "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/v2/grpc" + mqttv2 "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/v2/mqtt" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" ) @@ -56,7 +57,7 @@ func BuildCloudEventsSourceOptions(config any, clientId, sourceId string, dataType types.CloudEventsDataType) (*options.CloudEventsSourceOptions, error) { switch config := config.(type) { case *mqtt.MQTTOptions: - return mqtt.NewSourceOptions(config, clientId, sourceId), nil + return mqttv2.NewSourceOptions(config, clientId, sourceId), nil case *grpc.GRPCOptions: return grpcv2.NewSourceOptions(config, sourceId, dataType), nil default: @@ -69,7 +70,7 @@ func BuildCloudEventsAgentOptions(config any, clusterName, clientId string, dataType types.CloudEventsDataType) (*options.CloudEventsAgentOptions, error) { switch config := config.(type) { case *mqtt.MQTTOptions: - return mqtt.NewAgentOptions(config, clusterName, clientId), nil + return mqttv2.NewAgentOptions(config, clusterName, clientId), nil case *grpc.GRPCOptions: return grpcv2.NewAgentOptions(config, clusterName, clientId, dataType), nil default: diff --git a/pkg/cloudevents/generic/options/builder/optionsbuilder_test.go b/pkg/cloudevents/generic/options/builder/optionsbuilder_test.go index f8c33aef..951b4e16 100644 --- a/pkg/cloudevents/generic/options/builder/optionsbuilder_test.go +++ b/pkg/cloudevents/generic/options/builder/optionsbuilder_test.go @@ -56,7 +56,7 @@ func TestBuildCloudEventsSourceOptions(t *testing.T) { Timeout: 60 * time.Second, }, }, - expectedTransportType: "*mqtt.mqttSourceTransport", + expectedTransportType: "*mqtt.mqttTransport", }, { name: "grpc config", diff --git a/pkg/cloudevents/generic/options/mqtt/agentoptions.go b/pkg/cloudevents/generic/options/mqtt/agentoptions.go index 93347045..34a25c30 100644 --- a/pkg/cloudevents/generic/options/mqtt/agentoptions.go +++ b/pkg/cloudevents/generic/options/mqtt/agentoptions.go @@ -24,6 +24,7 @@ type mqttAgentTransport struct { agentID string } +// Deprecated: use v2.mqtt.NewAgentOptions instead func NewAgentOptions(mqttOptions *MQTTOptions, clusterName, agentID string) *options.CloudEventsAgentOptions { mqttAgentOptions := &mqttAgentTransport{ MQTTOptions: *mqttOptions, @@ -40,8 +41,6 @@ func NewAgentOptions(mqttOptions *MQTTOptions, clusterName, agentID string) *opt } func (o *mqttAgentTransport) WithContext(ctx context.Context, evtCtx cloudevents.EventContext) (context.Context, error) { - logger := klog.FromContext(ctx) - topic, err := getAgentPubTopic(ctx) if err != nil { return nil, err @@ -51,59 +50,18 @@ func (o *mqttAgentTransport) WithContext(ctx context.Context, evtCtx cloudevents return cloudeventscontext.WithTopic(ctx, string(*topic)), nil } - eventType, err := types.ParseCloudEventsType(evtCtx.GetType()) - if err != nil { - return nil, fmt.Errorf("unsupported event type %s, %v", eventType, err) - } - - originalSource, err := evtCtx.GetExtension(types.ExtensionOriginalSource) - if err != nil { - return nil, err - } - - // agent request to sync resource spec from all sources - if eventType.Action == types.ResyncRequestAction && originalSource == types.SourceAll { - if len(o.Topics.AgentBroadcast) == 0 { - logger.Info("the agent broadcast topic not set, fall back to the agent events topic") - - // TODO after supporting multiple sources, we should list each source - eventsTopic := replaceLast(o.Topics.AgentEvents, "+", o.clusterName) - return cloudeventscontext.WithTopic(ctx, eventsTopic), nil - } - - resyncTopic := strings.Replace(o.Topics.AgentBroadcast, "+", o.clusterName, 1) - return cloudeventscontext.WithTopic(ctx, resyncTopic), nil - } - - topicSource, err := getSourceFromEventsTopic(o.Topics.AgentEvents) + pubTopic, err := AgentPubTopic(ctx, &o.MQTTOptions, o.clusterName, evtCtx) if err != nil { return nil, err } - // agent publishes status events or spec resync events - eventsTopic := replaceLast(o.Topics.AgentEvents, "+", o.clusterName) - eventsTopic = replaceLast(eventsTopic, "+", topicSource) - return cloudeventscontext.WithTopic(ctx, eventsTopic), nil + return cloudeventscontext.WithTopic(ctx, pubTopic), nil } func (o *mqttAgentTransport) Connect(ctx context.Context) error { - subscribe := &paho.Subscribe{ - Subscriptions: []paho.SubscribeOptions{ - { - // TODO support multiple sources, currently the client require the source events topic has a sourceID, in - // the future, client may need a source list, it will subscribe to each source - // receiving the sources events - Topic: replaceLast(o.Topics.SourceEvents, "+", o.clusterName), QoS: byte(o.SubQoS), - }, - }, - } - - // receiving status resync events from all sources - if len(o.Topics.SourceBroadcast) != 0 { - subscribe.Subscriptions = append(subscribe.Subscriptions, paho.SubscribeOptions{ - Topic: o.Topics.SourceBroadcast, - QoS: byte(o.SubQoS), - }) + subscribe, err := AgentSubscribe(&o.MQTTOptions, o.clusterName) + if err != nil { + return err } protocol, err := o.GetCloudEventsProtocol( @@ -157,3 +115,67 @@ func (o *mqttAgentTransport) Close(ctx context.Context) error { func (o *mqttAgentTransport) ErrorChan() <-chan error { return o.errorChan } + +func AgentPubTopic(ctx context.Context, o *MQTTOptions, clusterName string, evtCtx cloudevents.EventContext) (string, error) { + logger := klog.FromContext(ctx) + + ceType := evtCtx.GetType() + eventType, err := types.ParseCloudEventsType(ceType) + if err != nil { + return "", fmt.Errorf("unsupported event type %q, %v", ceType, err) + } + + originalSourceVal, err := evtCtx.GetExtension(types.ExtensionOriginalSource) + if err != nil { + return "", err + } + + originalSource, ok := originalSourceVal.(string) + if !ok { + return "", fmt.Errorf("originalsource extension must be a string, got %T", originalSourceVal) + } + + // agent request to sync resource spec from all sources + if eventType.Action == types.ResyncRequestAction && originalSource == types.SourceAll { + if len(o.Topics.AgentBroadcast) == 0 { + logger.Info("the agent broadcast topic not set, fall back to the agent events topic") + + // TODO after supporting multiple sources, we should list each source + return replaceLast(o.Topics.AgentEvents, "+", clusterName), nil + } + + return strings.Replace(o.Topics.AgentBroadcast, "+", clusterName, 1), nil + } + + topicSource, err := getSourceFromEventsTopic(o.Topics.AgentEvents) + if err != nil { + return "", err + } + + // agent publishes status events or spec resync events + eventsTopic := replaceLast(o.Topics.AgentEvents, "+", clusterName) + return replaceLast(eventsTopic, "+", topicSource), nil +} + +func AgentSubscribe(o *MQTTOptions, clusterName string) (*paho.Subscribe, error) { + subscribe := &paho.Subscribe{ + Subscriptions: []paho.SubscribeOptions{ + { + // TODO support multiple sources, currently the client require the source events topic has a sourceID, in + // the future, client may need a source list, it will subscribe to each source + // receiving the sources events + Topic: replaceLast(o.Topics.SourceEvents, "+", clusterName), QoS: byte(o.SubQoS), + }, + }, + } + + // receiving status resync events from all sources + if len(o.Topics.SourceBroadcast) != 0 { + subscribe.Subscriptions = append(subscribe.Subscriptions, paho.SubscribeOptions{ + Topic: o.Topics.SourceBroadcast, + QoS: byte(o.SubQoS), + }) + } + + return subscribe, nil +} diff --git a/pkg/cloudevents/generic/options/mqtt/logger.go b/pkg/cloudevents/generic/options/mqtt/logger.go index aef10be2..04d79ad6 100644 --- a/pkg/cloudevents/generic/options/mqtt/logger.go +++ b/pkg/cloudevents/generic/options/mqtt/logger.go @@ -2,6 +2,7 @@ package mqtt import ( "fmt" + "github.com/eclipse/paho.golang/paho/log" "k8s.io/klog/v2" ) @@ -14,8 +15,13 @@ type PahoDebugLogger struct { logger klog.Logger } -var _ log.Logger = &PahoErrorLogger{} -var _ log.Logger = &PahoDebugLogger{} +func NewPahoErrorLogger(logger klog.Logger) log.Logger { + return &PahoErrorLogger{logger: logger} +} + +func NewPahoDebugLogger(logger klog.Logger) log.Logger { + return &PahoDebugLogger{logger: logger} +} func (l *PahoErrorLogger) Println(v ...interface{}) { l.logger.Error(fmt.Errorf("get err %s", fmt.Sprint(v...)), "MQTT error message") diff --git a/pkg/cloudevents/generic/options/mqtt/options.go b/pkg/cloudevents/generic/options/mqtt/options.go index fe8b6a4c..36aab305 100644 --- a/pkg/cloudevents/generic/options/mqtt/options.go +++ b/pkg/cloudevents/generic/options/mqtt/options.go @@ -4,13 +4,14 @@ import ( "context" "crypto/tls" "fmt" - "k8s.io/klog/v2" "net" "os" "regexp" "strings" "time" + "k8s.io/klog/v2" + cloudeventsmqtt "github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2" "github.com/eclipse/paho.golang/packets" "github.com/eclipse/paho.golang/paho" @@ -235,8 +236,8 @@ func (o *MQTTOptions) GetCloudEventsProtocol( opts := []cloudeventsmqtt.Option{ cloudeventsmqtt.WithConnect(o.GetMQTTConnectOption(clientID)), - cloudeventsmqtt.WithDebugLogger(&PahoDebugLogger{logger: logger}), - cloudeventsmqtt.WithErrorLogger(&PahoErrorLogger{logger: logger}), + cloudeventsmqtt.WithDebugLogger(NewPahoDebugLogger(logger)), + cloudeventsmqtt.WithErrorLogger(NewPahoErrorLogger(logger)), } opts = append(opts, clientOpts...) return cloudeventsmqtt.New(ctx, config, opts...) diff --git a/pkg/cloudevents/generic/options/mqtt/sourceoptions.go b/pkg/cloudevents/generic/options/mqtt/sourceoptions.go index 7fd71c82..a2f3050c 100644 --- a/pkg/cloudevents/generic/options/mqtt/sourceoptions.go +++ b/pkg/cloudevents/generic/options/mqtt/sourceoptions.go @@ -23,6 +23,7 @@ type mqttSourceTransport struct { clientID string } +// Deprecated: use v2.mqtt.NewSourceOptions instead func NewSourceOptions(mqttOptions *MQTTOptions, clientID, sourceID string) *options.CloudEventsSourceOptions { mqttSourceOptions := &mqttSourceTransport{ MQTTOptions: *mqttOptions, @@ -47,58 +48,20 @@ func (o *mqttSourceTransport) WithContext(ctx context.Context, evtCtx cloudevent return cloudeventscontext.WithTopic(ctx, string(*topic)), nil } - eventType, err := types.ParseCloudEventsType(evtCtx.GetType()) - if err != nil { - return nil, fmt.Errorf("unsupported event type %s, %v", eventType, err) - } - - clusterName, err := evtCtx.GetExtension(types.ExtensionClusterName) + pubTopic, err := SourcePubTopic(ctx, &o.MQTTOptions, o.sourceID, evtCtx) if err != nil { return nil, err } - if eventType.Action == types.ResyncRequestAction && clusterName == types.ClusterAll { - // source request to get resources status from all agents - if len(o.Topics.SourceBroadcast) == 0 { - return nil, fmt.Errorf("the source broadcast topic not set") - } - - resyncTopic := strings.Replace(o.Topics.SourceBroadcast, "+", o.sourceID, 1) - return cloudeventscontext.WithTopic(ctx, resyncTopic), nil - } - - // source publishes spec events or status resync events - eventsTopic := strings.Replace(o.Topics.SourceEvents, "+", fmt.Sprintf("%s", clusterName), 1) - return cloudeventscontext.WithTopic(ctx, eventsTopic), nil + return cloudeventscontext.WithTopic(ctx, pubTopic), nil } func (o *mqttSourceTransport) Connect(ctx context.Context) error { - topicSource, err := getSourceFromEventsTopic(o.Topics.AgentEvents) + subscribe, err := SourceSubscribe(&o.MQTTOptions, o.sourceID) if err != nil { return err } - if topicSource != o.sourceID { - return fmt.Errorf("the topic source %q does not match with the client sourceID %q", - o.Topics.AgentEvents, o.sourceID) - } - - subscribe := &paho.Subscribe{ - Subscriptions: []paho.SubscribeOptions{ - { - Topic: o.Topics.AgentEvents, QoS: byte(o.SubQoS), - }, - }, - } - - if len(o.Topics.AgentBroadcast) != 0 { - // receiving spec resync events from all agents - subscribe.Subscriptions = append(subscribe.Subscriptions, paho.SubscribeOptions{ - Topic: o.Topics.AgentBroadcast, - QoS: byte(o.SubQoS), - }) - } - protocol, err := o.GetCloudEventsProtocol( ctx, o.clientID, @@ -151,3 +114,65 @@ func (o *mqttSourceTransport) Close(ctx context.Context) error { func (o *mqttSourceTransport) ErrorChan() <-chan error { return o.errorChan } + +func SourcePubTopic(ctx context.Context, o *MQTTOptions, sourceID string, evtCtx cloudevents.EventContext) (string, error) { + ceType := evtCtx.GetType() + eventType, err := types.ParseCloudEventsType(ceType) + if err != nil { + return "", fmt.Errorf("unsupported event type %q, %v", ceType, err) + } + + clusterNameVal, err := evtCtx.GetExtension(types.ExtensionClusterName) + if err != nil { + return "", err + } + + clusterName, ok := clusterNameVal.(string) + if !ok { + return "", fmt.Errorf("clustername extension must be a string, got %T", clusterNameVal) + } + + if eventType.Action == types.ResyncRequestAction && clusterName == types.ClusterAll { + // source request to get resources status from all agents + if len(o.Topics.SourceBroadcast) == 0 { + return "", fmt.Errorf("the source broadcast topic not set") + } + + resyncTopic := strings.Replace(o.Topics.SourceBroadcast, "+", sourceID, 1) + return resyncTopic, nil + } + + // source publishes spec events or status resync events + eventsTopic := strings.Replace(o.Topics.SourceEvents, "+", clusterName, 1) + return eventsTopic, nil +} + +func SourceSubscribe(o *MQTTOptions, sourceID string) (*paho.Subscribe, error) { + topicSource, err := getSourceFromEventsTopic(o.Topics.AgentEvents) + if err != nil { + return nil, err + } + + if topicSource != sourceID { + return nil, fmt.Errorf("the topic source %q does not match the client sourceID %q", + topicSource, sourceID) + } + + subscribe := &paho.Subscribe{ + Subscriptions: []paho.SubscribeOptions{ + { + Topic: o.Topics.AgentEvents, QoS: byte(o.SubQoS), + }, + }, + } + + if len(o.Topics.AgentBroadcast) != 0 { + // receiving spec resync events from all agents + subscribe.Subscriptions = append(subscribe.Subscriptions, paho.SubscribeOptions{ + Topic: o.Topics.AgentBroadcast, + QoS: byte(o.SubQoS), + }) + } + + return subscribe, nil +} diff --git a/pkg/cloudevents/generic/options/v2/mqtt/options.go b/pkg/cloudevents/generic/options/v2/mqtt/options.go new file mode 100644 index 00000000..d2302aea --- /dev/null +++ b/pkg/cloudevents/generic/options/v2/mqtt/options.go @@ -0,0 +1,44 @@ +package mqtt + +import ( + "context" + "fmt" + + v2 "github.com/cloudevents/sdk-go/v2" + "github.com/eclipse/paho.golang/paho" + "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options" + "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt" +) + +func NewAgentOptions(opts *mqtt.MQTTOptions, clusterName, agentID string) *options.CloudEventsAgentOptions { + return &options.CloudEventsAgentOptions{ + CloudEventsTransport: newTransport( + fmt.Sprintf("%s-client", agentID), + opts, + func(ctx context.Context, e v2.Event) (string, error) { + return mqtt.AgentPubTopic(ctx, opts, clusterName, e.Context) + }, + func() (*paho.Subscribe, error) { + return mqtt.AgentSubscribe(opts, clusterName) + }, + ), + AgentID: agentID, + ClusterName: clusterName, + } +} + +func NewSourceOptions(opts *mqtt.MQTTOptions, clientID, sourceID string) *options.CloudEventsSourceOptions { + return &options.CloudEventsSourceOptions{ + CloudEventsTransport: newTransport( + clientID, + opts, + func(ctx context.Context, e v2.Event) (string, error) { + return mqtt.SourcePubTopic(ctx, opts, sourceID, e.Context) + }, + func() (*paho.Subscribe, error) { + return mqtt.SourceSubscribe(opts, sourceID) + }, + ), + SourceID: sourceID, + } +} diff --git a/pkg/cloudevents/generic/options/v2/mqtt/transport.go b/pkg/cloudevents/generic/options/v2/mqtt/transport.go new file mode 100644 index 00000000..9ec7d470 --- /dev/null +++ b/pkg/cloudevents/generic/options/v2/mqtt/transport.go @@ -0,0 +1,228 @@ +package mqtt + +import ( + "context" + "fmt" + "sync" + + cloudeventsmqtt "github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2" + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/eclipse/paho.golang/paho" + "k8s.io/klog/v2" + "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options" + "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt" +) + +type pubTopicGetter func(context.Context, cloudevents.Event) (string, error) +type subscribeGetter func() (*paho.Subscribe, error) + +type mqttTransport struct { + opts *mqtt.MQTTOptions + + mu sync.RWMutex + subscribed bool + closeChan chan struct{} + errorChan chan error + msgChan chan *paho.Publish + + clientID string + + getPublishTopic pubTopicGetter + getSubscribe subscribeGetter + + client *paho.Client +} + +func newTransport(clientID string, opts *mqtt.MQTTOptions, pubTopicGetter pubTopicGetter, subscribeGetter subscribeGetter) *mqttTransport { + return &mqttTransport{ + opts: opts, + clientID: clientID, + errorChan: make(chan error, 1), + getPublishTopic: pubTopicGetter, + getSubscribe: subscribeGetter, + } +} + +func (t *mqttTransport) Connect(ctx context.Context) error { + t.mu.Lock() + defer t.mu.Unlock() + + logger := klog.FromContext(ctx) + tcpConn, err := t.opts.Dialer.Dial() + if err != nil { + return err + } + + config := paho.ClientConfig{ + ClientID: t.clientID, + Conn: tcpConn, + OnClientError: func(err error) { + select { + case t.errorChan <- err: + default: + logger.Error(err, "mqtt client error") + } + }, + } + + t.client = paho.NewClient(config) + t.client.SetDebugLogger(mqtt.NewPahoDebugLogger(logger)) + t.client.SetErrorLogger(mqtt.NewPahoErrorLogger(logger)) + + connAck, err := t.client.Connect(ctx, t.opts.GetMQTTConnectOption(t.clientID)) + if err != nil { + return err + } + if connAck.ReasonCode != 0 { + return fmt.Errorf("failed to establish the connection: %s", connAck.String()) + } + + // Initialize closeChan and msgChan to support reconnect cycles + t.closeChan = make(chan struct{}) + // TODO consider to make the channel size configurable + t.msgChan = make(chan *paho.Publish, 100) + + logger.Info("mqtt is connected", "brokerHost", t.opts.Dialer.BrokerHost) + + return nil +} + +func (t *mqttTransport) Send(ctx context.Context, evt cloudevents.Event) error { + t.mu.RLock() + defer t.mu.RUnlock() + + if t.client == nil { + return fmt.Errorf("transport not connected") + } + + topic, err := t.getPublishTopic(ctx, evt) + if err != nil { + return err + } + + msg := &paho.Publish{ + QoS: byte(t.opts.PubQoS), + Topic: topic, + } + + if err := cloudeventsmqtt.WritePubMessage(ctx, (*binding.EventMessage)(&evt), msg); err != nil { + return err + } + + if _, err := t.client.Publish(ctx, msg); err != nil { + return err + } + + return nil +} + +func (t *mqttTransport) Subscribe(ctx context.Context) error { + t.mu.Lock() + defer t.mu.Unlock() + + if t.client == nil { + return fmt.Errorf("transport not connected") + } + + if t.subscribed { + return fmt.Errorf("transport has already subscribed") + } + + subscribe, err := t.getSubscribe() + if err != nil { + return err + } + + t.client.AddOnPublishReceived(func(m paho.PublishReceived) (bool, error) { + select { + case t.msgChan <- m.Packet: + return true, nil + case <-t.getCloseChan(): + // Only drop messages if we're shutting down + return false, fmt.Errorf("transport closed") + } + // No default case - will block until channel has space (no message loss) + }) + + if _, err := t.client.Subscribe(ctx, subscribe); err != nil { + return err + } + + t.subscribed = true + + logger := klog.FromContext(ctx) + for _, sub := range subscribe.Subscriptions { + logger.Info("subscribed to mqtt broker", "topic", sub.Topic, "QoS", sub.QoS) + } + + return nil +} + +// Receive starts receiving events and invokes the provided handler for each event. +// This is a BLOCKING call that runs an event loop until the context is cancelled. +func (t *mqttTransport) Receive(ctx context.Context, handleFn options.ReceiveHandlerFn) error { + t.mu.RLock() + if !t.subscribed { + t.mu.RUnlock() + return fmt.Errorf("transport not subscribed") + } + t.mu.RUnlock() + + logger := klog.FromContext(ctx) + for { + select { + case <-ctx.Done(): + return nil + case <-t.getCloseChan(): + return nil + case m, ok := <-t.msgChan: + if !ok { + return nil + } + evt, err := binding.ToEvent(ctx, cloudeventsmqtt.NewMessage(m)) + if err != nil { + logger.Error(err, "invalid event") + continue + } + + handleFn(ctx, *evt) + } + } +} + +func (t *mqttTransport) ErrorChan() <-chan error { + return t.errorChan +} + +func (t *mqttTransport) Close(ctx context.Context) error { + t.mu.Lock() + defer t.mu.Unlock() + klog.FromContext(ctx).Info("close mqtt transport") + + if t.client == nil { + // no client, do nothing + return nil + } + + // Guard against double-close panic - check if already closed + if t.closeChan != nil { + select { + case <-t.closeChan: + // already closed + default: + close(t.closeChan) + } + } + + t.subscribed = false + + return t.client.Disconnect(&paho.Disconnect{ReasonCode: 0}) +} + +func (t *mqttTransport) getCloseChan() chan struct{} { + t.mu.RLock() + defer t.mu.RUnlock() + + return t.closeChan +} diff --git a/pkg/cloudevents/generic/options/v2/mqtt/transport_test.go b/pkg/cloudevents/generic/options/v2/mqtt/transport_test.go new file mode 100644 index 00000000..42cf57df --- /dev/null +++ b/pkg/cloudevents/generic/options/v2/mqtt/transport_test.go @@ -0,0 +1,658 @@ +package mqtt + +import ( + "bytes" + "context" + "fmt" + "sync" + "sync/atomic" + "testing" + "time" + + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/eclipse/paho.golang/paho" + mochimqtt "github.com/mochi-mqtt/server/v2" + "github.com/mochi-mqtt/server/v2/listeners" + "github.com/mochi-mqtt/server/v2/packets" + + "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt" +) + +const ( + testBrokerHost = "127.0.0.1:11883" + testTimeout = 10 * time.Second +) + +// allowAllHook allows all connections for testing +type allowAllHook struct { + mochimqtt.HookBase +} + +func (h *allowAllHook) ID() string { + return "allow-all-auth" +} + +func (h *allowAllHook) Provides(b byte) bool { + return bytes.Contains([]byte{ + mochimqtt.OnConnectAuthenticate, + mochimqtt.OnACLCheck, + }, []byte{b}) +} + +func (h *allowAllHook) OnConnectAuthenticate(cl *mochimqtt.Client, pk packets.Packet) bool { + return true +} + +func (h *allowAllHook) OnACLCheck(cl *mochimqtt.Client, topic string, write bool) bool { + return true +} + +// setupTestBroker creates and starts a test MQTT broker +func setupTestBroker(t *testing.T) (*mochimqtt.Server, func()) { + broker := mochimqtt.New(&mochimqtt.Options{}) + + // Allow all connections + err := broker.AddHook(new(allowAllHook), nil) + if err != nil { + t.Fatalf("failed to add auth hook: %v", err) + } + + err = broker.AddListener(listeners.NewTCP(listeners.Config{ + ID: "test-mqtt-broker", + Address: testBrokerHost, + })) + if err != nil { + t.Fatalf("failed to add listener: %v", err) + } + + go func() { + err := broker.Serve() + if err != nil { + t.Logf("broker serve error: %v", err) + } + }() + + // Wait for broker to be ready + time.Sleep(100 * time.Millisecond) + + cleanup := func() { + // Add a delay before broker shutdown to ensure all MQTT client operations + // complete. This prevents a race condition where broker shutdown tries to + // close listener clients while client deletion is still in progress, which + // can cause a deadlock in the mochi-mqtt server's client management. + time.Sleep(200 * time.Millisecond) + if err := broker.Close(); err != nil { + t.Logf("failed to close broker: %v", err) + } + } + + return broker, cleanup +} + +// createTestMQTTOptions creates MQTTOptions for testing +func createTestMQTTOptions() *mqtt.MQTTOptions { + return &mqtt.MQTTOptions{ + KeepAlive: 60, + PubQoS: 1, + SubQoS: 1, + Dialer: &mqtt.MQTTDialer{ + BrokerHost: testBrokerHost, + Timeout: 5 * time.Second, + }, + } +} + +// createTestTransport creates a transport for testing +func createTestTransport(clientID, pubTopic, subTopic string) *mqttTransport { + opts := createTestMQTTOptions() + return newTransport( + clientID, + opts, + func(ctx context.Context, evt cloudevents.Event) (string, error) { + return pubTopic, nil + }, + func() (*paho.Subscribe, error) { + return &paho.Subscribe{ + Subscriptions: []paho.SubscribeOptions{ + {Topic: subTopic, QoS: byte(opts.SubQoS)}, + }, + }, nil + }, + ) +} + +// TestTransportConnectDisconnect tests basic connect and disconnect +func TestTransportConnectDisconnect(t *testing.T) { + _, cleanup := setupTestBroker(t) + defer cleanup() + + transport := createTestTransport("test-client-1", "test/pub", "test/sub") + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + // Test connect + if err := transport.Connect(ctx); err != nil { + t.Fatalf("failed to connect: %v", err) + } + + // Verify channels are initialized + if transport.msgChan == nil { + t.Fatal("msgChan should be initialized after Connect") + } + if transport.closeChan == nil { + t.Fatal("closeChan should be initialized after Connect") + } + if transport.client == nil { + t.Fatal("client should be initialized after Connect") + } + + // Test disconnect + if err := transport.Close(ctx); err != nil { + t.Fatalf("failed to close: %v", err) + } +} + +// TestTransportReconnect tests reconnection capability +func TestTransportReconnect(t *testing.T) { + _, cleanup := setupTestBroker(t) + defer cleanup() + + transport := createTestTransport("test-client-reconnect", "test/pub", "test/sub") + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + // First connection + if err := transport.Connect(ctx); err != nil { + t.Fatalf("first connect failed: %v", err) + } + + // Close + if err := transport.Close(ctx); err != nil { + t.Fatalf("close failed: %v", err) + } + + // Reconnect + if err := transport.Connect(ctx); err != nil { + t.Fatalf("reconnect failed: %v", err) + } + + // Verify new channels are created + if transport.msgChan == nil { + t.Fatal("msgChan should be re-initialized after reconnect") + } + if transport.closeChan == nil { + t.Fatal("closeChan should be re-initialized after reconnect") + } + + if err := transport.Close(ctx); err != nil { + t.Fatalf("final close failed: %v", err) + } +} + +// TestTransportSend tests sending messages +func TestTransportSend(t *testing.T) { + _, cleanup := setupTestBroker(t) + defer cleanup() + + transport := createTestTransport("test-sender", "test/send/topic", "test/receive/topic") + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + if err := transport.Connect(ctx); err != nil { + t.Fatalf("failed to connect: %v", err) + } + defer transport.Close(ctx) + + // Create a test event + event := cloudevents.NewEvent() + event.SetID("test-event-1") + event.SetSource("test-source") + event.SetType("test.type") + if err := event.SetData(cloudevents.ApplicationJSON, map[string]string{"key": "value"}); err != nil { + t.Fatalf("failed to set data: %v", err) + } + + // Send the event + if err := transport.Send(ctx, event); err != nil { + t.Fatalf("failed to send event: %v", err) + } +} + +// TestTransportSendWithoutConnect tests error when sending without connecting +func TestTransportSendWithoutConnect(t *testing.T) { + transport := createTestTransport("test-sender-no-connect", "test/send", "test/receive") + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + event := cloudevents.NewEvent() + event.SetID("test-event") + event.SetSource("test-source") + event.SetType("test.type") + + // Should fail because not connected + err := transport.Send(ctx, event) + if err == nil { + t.Fatal("expected error when sending without connect, got nil") + } + if err.Error() != "transport not connected" { + t.Fatalf("unexpected error message: %v", err) + } +} + +// TestTransportSubscribeAndReceive tests subscribing and receiving messages +func TestTransportSubscribeAndReceive(t *testing.T) { + _, cleanup := setupTestBroker(t) + defer cleanup() + + topic := "test/pubsub/topic" + sender := createTestTransport("test-sender", topic, topic) + receiver := createTestTransport("test-receiver", topic, topic) + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + // Connect both transports + if err := sender.Connect(ctx); err != nil { + t.Fatalf("sender connect failed: %v", err) + } + defer sender.Close(ctx) + + if err := receiver.Connect(ctx); err != nil { + t.Fatalf("receiver connect failed: %v", err) + } + defer receiver.Close(ctx) + + // Subscribe receiver + if err := receiver.Subscribe(ctx); err != nil { + t.Fatalf("subscribe failed: %v", err) + } + + // Give subscription time to complete + time.Sleep(100 * time.Millisecond) + + // Channel to signal when message is received + received := make(chan cloudevents.Event, 1) + receivedCount := atomic.Int32{} + + // Start receiving in background + go func() { + err := receiver.Receive(ctx, func(ctx context.Context, evt cloudevents.Event) { + receivedCount.Add(1) + received <- evt + }) + if err != nil && ctx.Err() == nil { + t.Logf("receive error: %v", err) + } + }() + + // Wait a bit for receiver to be ready + time.Sleep(100 * time.Millisecond) + + // Send an event + event := cloudevents.NewEvent() + event.SetID("test-event-123") + event.SetSource("test-source") + event.SetType("test.type.pubsub") + if err := event.SetData(cloudevents.ApplicationJSON, map[string]string{"message": "hello"}); err != nil { + t.Fatalf("failed to set data: %v", err) + } + + if err := sender.Send(ctx, event); err != nil { + t.Fatalf("failed to send: %v", err) + } + + // Wait for the message + select { + case evt := <-received: + if evt.ID() != "test-event-123" { + t.Fatalf("unexpected event ID: got %s, want test-event-123", evt.ID()) + } + if evt.Type() != "test.type.pubsub" { + t.Fatalf("unexpected event type: got %s, want test.type.pubsub", evt.Type()) + } + case <-time.After(5 * time.Second): + t.Fatal("timeout waiting for message") + } + + // Verify only one message was received + if count := receivedCount.Load(); count != 1 { + t.Fatalf("expected 1 message, got %d", count) + } +} + +// TestTransportNoMessageLoss tests that messages are not lost under load +func TestTransportNoMessageLoss(t *testing.T) { + _, cleanup := setupTestBroker(t) + defer cleanup() + + topic := "test/no-loss/topic" + sender := createTestTransport("test-sender-load", topic, topic) + receiver := createTestTransport("test-receiver-load", topic, topic) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + // Connect both + if err := sender.Connect(ctx); err != nil { + t.Fatalf("sender connect failed: %v", err) + } + defer sender.Close(ctx) + + if err := receiver.Connect(ctx); err != nil { + t.Fatalf("receiver connect failed: %v", err) + } + defer receiver.Close(ctx) + + // Subscribe + if err := receiver.Subscribe(ctx); err != nil { + t.Fatalf("subscribe failed: %v", err) + } + + time.Sleep(100 * time.Millisecond) + + // Track received messages + receivedIDs := sync.Map{} + receivedCount := atomic.Int32{} + const totalMessages = 100 + + // Start receiver + go func() { + err := receiver.Receive(ctx, func(ctx context.Context, evt cloudevents.Event) { + receivedIDs.Store(evt.ID(), true) + receivedCount.Add(1) + }) + if err != nil && ctx.Err() == nil { + t.Logf("receive error: %v", err) + } + }() + + time.Sleep(100 * time.Millisecond) + + // Send many messages rapidly + for i := 0; i < totalMessages; i++ { + event := cloudevents.NewEvent() + event.SetID(fmt.Sprintf("event-%d", i)) + event.SetSource("test-source") + event.SetType("test.type") + if err := event.SetData(cloudevents.ApplicationJSON, map[string]int{"seq": i}); err != nil { + t.Fatalf("failed to set data: %v", err) + } + + if err := sender.Send(ctx, event); err != nil { + t.Fatalf("failed to send event %d: %v", i, err) + } + } + + // Wait for all messages to be received + deadline := time.Now().Add(10 * time.Second) + for time.Now().Before(deadline) { + if receivedCount.Load() >= totalMessages { + break + } + time.Sleep(100 * time.Millisecond) + } + + // Verify all messages received + finalCount := receivedCount.Load() + if finalCount != totalMessages { + t.Fatalf("message loss detected: sent %d, received %d", totalMessages, finalCount) + } + + // Verify all IDs are unique and present + for i := 0; i < totalMessages; i++ { + expectedID := fmt.Sprintf("event-%d", i) + if _, ok := receivedIDs.Load(expectedID); !ok { + t.Fatalf("message %s not received", expectedID) + } + } +} + +// TestTransportSubscribeWithoutConnect tests error when subscribing without connecting +func TestTransportSubscribeWithoutConnect(t *testing.T) { + transport := createTestTransport("test-sub-no-connect", "test/pub", "test/sub") + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + err := transport.Subscribe(ctx) + if err == nil { + t.Fatal("expected error when subscribing without connect, got nil") + } + if err.Error() != "transport not connected" { + t.Fatalf("unexpected error message: %v", err) + } +} + +// TestTransportDoubleSubscribe tests error on double subscribe +func TestTransportDoubleSubscribe(t *testing.T) { + _, cleanup := setupTestBroker(t) + defer cleanup() + + transport := createTestTransport("test-double-sub", "test/pub", "test/sub") + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + if err := transport.Connect(ctx); err != nil { + t.Fatalf("connect failed: %v", err) + } + defer transport.Close(ctx) + + // First subscribe should succeed + if err := transport.Subscribe(ctx); err != nil { + t.Fatalf("first subscribe failed: %v", err) + } + + // Second subscribe should fail + err := transport.Subscribe(ctx) + if err == nil { + t.Fatal("expected error on double subscribe, got nil") + } + if err.Error() != "transport has already subscribed" { + t.Fatalf("unexpected error message: %v", err) + } +} + +// TestTransportCloseWhileReceiving tests graceful shutdown during receive +func TestTransportCloseWhileReceiving(t *testing.T) { + _, cleanup := setupTestBroker(t) + defer cleanup() + + topic := "test/close/topic" + transport := createTestTransport("test-close-receive", topic, topic) + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + if err := transport.Connect(ctx); err != nil { + t.Fatalf("connect failed: %v", err) + } + + if err := transport.Subscribe(ctx); err != nil { + t.Fatalf("subscribe failed: %v", err) + } + + receiveDone := make(chan error, 1) + + // Start receiving + go func() { + err := transport.Receive(ctx, func(ctx context.Context, evt cloudevents.Event) { + // Handler should not be called after close + }) + receiveDone <- err + }() + + // Wait a bit for receive loop to start + time.Sleep(100 * time.Millisecond) + + // Close transport + if err := transport.Close(ctx); err != nil { + t.Fatalf("close failed: %v", err) + } + + // Receive should exit gracefully + select { + case err := <-receiveDone: + if err != nil { + t.Logf("receive exited with error (expected): %v", err) + } + case <-time.After(2 * time.Second): + t.Fatal("receive did not exit after close") + } +} + +// TestTransportDoubleClose tests that double close is safe +func TestTransportDoubleClose(t *testing.T) { + _, cleanup := setupTestBroker(t) + defer cleanup() + + transport := createTestTransport("test-double-close", "test/pub", "test/sub") + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + if err := transport.Connect(ctx); err != nil { + t.Fatalf("connect failed: %v", err) + } + + // First close + if err := transport.Close(ctx); err != nil { + t.Fatalf("first close failed: %v", err) + } + + // Second close should not panic + if err := transport.Close(ctx); err != nil { + t.Logf("second close returned error (acceptable): %v", err) + } +} + +// TestTransportContextCancellation tests that receive exits on context cancellation +func TestTransportContextCancellation(t *testing.T) { + _, cleanup := setupTestBroker(t) + defer cleanup() + + topic := "test/cancel/topic" + transport := createTestTransport("test-cancel", topic, topic) + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + if err := transport.Connect(ctx); err != nil { + t.Fatalf("connect failed: %v", err) + } + defer transport.Close(ctx) + + if err := transport.Subscribe(ctx); err != nil { + t.Fatalf("subscribe failed: %v", err) + } + + receiveCtx, receiveCancel := context.WithCancel(ctx) + receiveDone := make(chan error, 1) + + // Start receiving + go func() { + err := transport.Receive(receiveCtx, func(ctx context.Context, evt cloudevents.Event) { + // Handler + }) + receiveDone <- err + }() + + // Wait a bit for receive loop to start + time.Sleep(100 * time.Millisecond) + + // Cancel the receive context + receiveCancel() + + // Receive should exit + select { + case err := <-receiveDone: + if err != nil { + t.Logf("receive exited with error (expected): %v", err) + } + case <-time.After(2 * time.Second): + t.Fatal("receive did not exit after context cancellation") + } +} + +// TestTransportReceiveInvalidEvent tests handling of invalid events +func TestTransportReceiveInvalidEvent(t *testing.T) { + _, cleanup := setupTestBroker(t) + defer cleanup() + + topic := "test/invalid/topic" + receiver := createTestTransport("test-receiver-invalid", topic, topic) + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + if err := receiver.Connect(ctx); err != nil { + t.Fatalf("connect failed: %v", err) + } + defer receiver.Close(ctx) + + if err := receiver.Subscribe(ctx); err != nil { + t.Fatalf("subscribe failed: %v", err) + } + + handlerCalled := atomic.Int32{} + + go func() { + err := receiver.Receive(ctx, func(ctx context.Context, evt cloudevents.Event) { + handlerCalled.Add(1) + }) + if err != nil && ctx.Err() == nil { + t.Logf("receive error: %v", err) + } + }() + + // Wait and verify handler is not called for invalid events + time.Sleep(200 * time.Millisecond) + + if count := handlerCalled.Load(); count != 0 { + t.Fatalf("handler should not be called for invalid events, got %d calls", count) + } +} + +// TestTransportErrorChan tests the error channel +func TestTransportErrorChan(t *testing.T) { + transport := createTestTransport("test-error-chan", "test/pub", "test/sub") + + errorChan := transport.ErrorChan() + if errorChan == nil { + t.Fatal("ErrorChan should not return nil") + } + + // Verify it's the same channel + if transport.ErrorChan() != errorChan { + t.Fatal("ErrorChan should return the same channel") + } +} + +// TestTransportReceiveWithoutSubscribe tests error when receiving without subscribing +func TestTransportReceiveWithoutSubscribe(t *testing.T) { + _, cleanup := setupTestBroker(t) + defer cleanup() + + transport := createTestTransport("test-receive-no-sub", "test/pub", "test/sub") + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + if err := transport.Connect(ctx); err != nil { + t.Fatalf("connect failed: %v", err) + } + defer transport.Close(ctx) + + // Try to receive without subscribing + err := transport.Receive(ctx, func(ctx context.Context, evt cloudevents.Event) {}) + if err == nil { + t.Fatal("expected error when receiving without subscribe, got nil") + } + if err.Error() != "transport not subscribed" { + t.Fatalf("unexpected error message: %v", err) + } +} diff --git a/test/integration/cloudevents/certrotation_mqtt_test.go b/test/integration/cloudevents/certrotation_mqtt_test.go index 2a80675f..f52f9c98 100644 --- a/test/integration/cloudevents/certrotation_mqtt_test.go +++ b/test/integration/cloudevents/certrotation_mqtt_test.go @@ -10,6 +10,7 @@ import ( "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/cert" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt" + mqttv2 "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/v2/mqtt" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" "open-cluster-management.io/sdk-go/test/integration/cloudevents/util" ) @@ -18,7 +19,7 @@ var _ = ginkgo.Describe("CloudEvents Certificate Rotation Test - MQTT", runCloud func GetMQTTAgentOptions(_ context.Context, agentID, clusterName, clientCertFile, clientKeyFile string) *options.CloudEventsAgentOptions { mqttOptions := newTLSMQTTOptions(certPool, mqttTLSBrokerHost, clientCertFile, clientKeyFile) - return mqtt.NewAgentOptions(mqttOptions, clusterName, agentID) + return mqttv2.NewAgentOptions(mqttOptions, clusterName, agentID) } func newTLSMQTTOptions(certPool *x509.CertPool, brokerHost, clientCertFile, clientKeyFile string) *mqtt.MQTTOptions { diff --git a/test/integration/cloudevents/cloudevents_resync_test.go b/test/integration/cloudevents/cloudevents_resync_test.go index aa6c9a84..5d1be96a 100644 --- a/test/integration/cloudevents/cloudevents_resync_test.go +++ b/test/integration/cloudevents/cloudevents_resync_test.go @@ -4,9 +4,10 @@ import ( "context" "encoding/json" "fmt" + "time" + jsonpatch "github.com/evanphx/json-patch/v5" "k8s.io/apimachinery/pkg/types" - "time" "github.com/onsi/ginkgo" "github.com/onsi/gomega" @@ -20,6 +21,7 @@ import ( "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/agent/codec" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt" + mqttv2 "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/v2/mqtt" "open-cluster-management.io/sdk-go/test/integration/cloudevents/agent" "open-cluster-management.io/sdk-go/test/integration/cloudevents/source" "open-cluster-management.io/sdk-go/test/integration/cloudevents/store" @@ -55,7 +57,7 @@ var _ = ginkgo.Describe("CloudEvents Clients Test - RESYNC", func() { sourceCloudEventsClient, err = source.StartResourceSourceClient( ctx, - mqtt.NewSourceOptions( + mqttv2.NewSourceOptions( util.NewMQTTSourceOptions(mqttBrokerHost, sourceID), fmt.Sprintf("%s-client", sourceID), sourceID, diff --git a/test/integration/cloudevents/cloudevetns_mqtt_test.go b/test/integration/cloudevents/cloudevetns_mqtt_test.go index ed497a8a..a04cc6d1 100644 --- a/test/integration/cloudevents/cloudevetns_mqtt_test.go +++ b/test/integration/cloudevents/cloudevetns_mqtt_test.go @@ -8,14 +8,14 @@ import ( "open-cluster-management.io/sdk-go/pkg/cloudevents/constants" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options" - "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt" + mqttv2 "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/v2/mqtt" "open-cluster-management.io/sdk-go/test/integration/cloudevents/util" ) var _ = ginkgo.Describe("CloudEvents Clients Test - MQTT", runCloudeventsClientPubSubTest(GetMQTTSourceOptions)) func GetMQTTSourceOptions(_ context.Context, sourceID string) (*options.CloudEventsSourceOptions, string) { - return mqtt.NewSourceOptions( + return mqttv2.NewSourceOptions( util.NewMQTTSourceOptions(mqttBrokerHost, sourceID), fmt.Sprintf("%s-client", sourceID), sourceID,