Skip to content

Commit 6395632

Browse files
committed
mqtt transport v2
Signed-off-by: Wei Liu <[email protected]>
1 parent 28f45ca commit 6395632

File tree

4 files changed

+9
-6
lines changed

4 files changed

+9
-6
lines changed

pkg/cloudevents/generic/options/v2/mqtt/transport.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ func newTransport(clientID string, opts *mqtt.MQTTOptions, pubTopicGetter pubTop
3838
return &mqttTransport{
3939
opts: opts,
4040
clientID: clientID,
41-
errorChan: make(chan error),
41+
errorChan: make(chan error, 1),
4242
getPublishTopic: pubTopicGetter,
4343
getSubscribe: subscribeGetter,
4444
}

test/integration/cloudevents/certrotation_mqtt_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options"
1111
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/cert"
1212
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt"
13+
mqttv2 "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/v2/mqtt"
1314
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
1415
"open-cluster-management.io/sdk-go/test/integration/cloudevents/util"
1516
)
@@ -18,7 +19,7 @@ var _ = ginkgo.Describe("CloudEvents Certificate Rotation Test - MQTT", runCloud
1819

1920
func GetMQTTAgentOptions(_ context.Context, agentID, clusterName, clientCertFile, clientKeyFile string) *options.CloudEventsAgentOptions {
2021
mqttOptions := newTLSMQTTOptions(certPool, mqttTLSBrokerHost, clientCertFile, clientKeyFile)
21-
return mqtt.NewAgentOptions(mqttOptions, clusterName, agentID)
22+
return mqttv2.NewAgentOptions(mqttOptions, clusterName, agentID)
2223
}
2324

2425
func newTLSMQTTOptions(certPool *x509.CertPool, brokerHost, clientCertFile, clientKeyFile string) *mqtt.MQTTOptions {

test/integration/cloudevents/cloudevents_resync_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7+
"time"
8+
79
jsonpatch "github.com/evanphx/json-patch/v5"
810
"k8s.io/apimachinery/pkg/types"
9-
"time"
1011

1112
"github.com/onsi/ginkgo"
1213
"github.com/onsi/gomega"
@@ -20,6 +21,7 @@ import (
2021
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/agent/codec"
2122
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
2223
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt"
24+
mqttv2 "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/v2/mqtt"
2325
"open-cluster-management.io/sdk-go/test/integration/cloudevents/agent"
2426
"open-cluster-management.io/sdk-go/test/integration/cloudevents/source"
2527
"open-cluster-management.io/sdk-go/test/integration/cloudevents/store"
@@ -55,7 +57,7 @@ var _ = ginkgo.Describe("CloudEvents Clients Test - RESYNC", func() {
5557

5658
sourceCloudEventsClient, err = source.StartResourceSourceClient(
5759
ctx,
58-
mqtt.NewSourceOptions(
60+
mqttv2.NewSourceOptions(
5961
util.NewMQTTSourceOptions(mqttBrokerHost, sourceID),
6062
fmt.Sprintf("%s-client", sourceID),
6163
sourceID,

test/integration/cloudevents/cloudevetns_mqtt_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,14 @@ import (
88

99
"open-cluster-management.io/sdk-go/pkg/cloudevents/constants"
1010
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options"
11-
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt"
11+
mqttv2 "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/v2/mqtt"
1212
"open-cluster-management.io/sdk-go/test/integration/cloudevents/util"
1313
)
1414

1515
var _ = ginkgo.Describe("CloudEvents Clients Test - MQTT", runCloudeventsClientPubSubTest(GetMQTTSourceOptions))
1616

1717
func GetMQTTSourceOptions(_ context.Context, sourceID string) (*options.CloudEventsSourceOptions, string) {
18-
return mqtt.NewSourceOptions(
18+
return mqttv2.NewSourceOptions(
1919
util.NewMQTTSourceOptions(mqttBrokerHost, sourceID),
2020
fmt.Sprintf("%s-client", sourceID),
2121
sourceID,

0 commit comments

Comments
 (0)