Skip to content

Commit 28f45ca

Browse files
committed
mqtt transport v2
Signed-off-by: Wei Liu <[email protected]>
1 parent 2afa7af commit 28f45ca

File tree

2 files changed

+18
-19
lines changed

2 files changed

+18
-19
lines changed

pkg/cloudevents/generic/options/v2/mqtt/transport.go

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,11 @@ func (t *mqttTransport) Connect(ctx context.Context) error {
5858
ClientID: t.clientID,
5959
Conn: tcpConn,
6060
OnClientError: func(err error) {
61-
t.errorChan <- err
61+
select {
62+
case t.errorChan <- err:
63+
default:
64+
logger.Error(err, "mqtt client error")
65+
}
6266
},
6367
}
6468

@@ -199,29 +203,18 @@ func (t *mqttTransport) Close(ctx context.Context) error {
199203
}
200204

201205
// Guard against double-close panic - check if already closed
202-
alreadyClosed := false
203206
if t.closeChan != nil {
204207
select {
205208
case <-t.closeChan:
206-
// Already closed
207-
alreadyClosed = true
209+
// already closed
208210
default:
209211
close(t.closeChan)
210212
}
211213
}
212214

213-
// Only close msgChan if not already closed
214-
if !alreadyClosed && t.msgChan != nil {
215-
close(t.msgChan)
216-
}
217-
218215
t.subscribed = false
219216

220-
if !alreadyClosed {
221-
return t.client.Disconnect(&paho.Disconnect{ReasonCode: 0})
222-
}
223-
224-
return nil
217+
return t.client.Disconnect(&paho.Disconnect{ReasonCode: 0})
225218
}
226219

227220
func (t *mqttTransport) getCloseChan() chan struct{} {

pkg/cloudevents/generic/options/v2/mqtt/transport_test.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ func setupTestBroker(t *testing.T) (*mochimqtt.Server, func()) {
8585
}
8686

8787
// createTestMQTTOptions creates MQTTOptions for testing
88-
func createTestMQTTOptions(topic string) *mqtt.MQTTOptions {
88+
func createTestMQTTOptions() *mqtt.MQTTOptions {
8989
return &mqtt.MQTTOptions{
9090
KeepAlive: 60,
9191
PubQoS: 1,
@@ -99,7 +99,7 @@ func createTestMQTTOptions(topic string) *mqtt.MQTTOptions {
9999

100100
// createTestTransport creates a transport for testing
101101
func createTestTransport(clientID, pubTopic, subTopic string) *mqttTransport {
102-
opts := createTestMQTTOptions(pubTopic)
102+
opts := createTestMQTTOptions()
103103
return newTransport(
104104
clientID,
105105
opts,
@@ -206,7 +206,9 @@ func TestTransportSend(t *testing.T) {
206206
event.SetID("test-event-1")
207207
event.SetSource("test-source")
208208
event.SetType("test.type")
209-
event.SetData(cloudevents.ApplicationJSON, map[string]string{"key": "value"})
209+
if err := event.SetData(cloudevents.ApplicationJSON, map[string]string{"key": "value"}); err != nil {
210+
t.Fatalf("failed to set data: %v", err)
211+
}
210212

211213
// Send the event
212214
if err := transport.Send(ctx, event); err != nil {
@@ -290,7 +292,9 @@ func TestTransportSubscribeAndReceive(t *testing.T) {
290292
event.SetID("test-event-123")
291293
event.SetSource("test-source")
292294
event.SetType("test.type.pubsub")
293-
event.SetData(cloudevents.ApplicationJSON, map[string]string{"message": "hello"})
295+
if err := event.SetData(cloudevents.ApplicationJSON, map[string]string{"message": "hello"}); err != nil {
296+
t.Fatalf("failed to set data: %v", err)
297+
}
294298

295299
if err := sender.Send(ctx, event); err != nil {
296300
t.Fatalf("failed to send: %v", err)
@@ -369,7 +373,9 @@ func TestTransportNoMessageLoss(t *testing.T) {
369373
event.SetID(fmt.Sprintf("event-%d", i))
370374
event.SetSource("test-source")
371375
event.SetType("test.type")
372-
event.SetData(cloudevents.ApplicationJSON, map[string]int{"seq": i})
376+
if err := event.SetData(cloudevents.ApplicationJSON, map[string]int{"seq": i}); err != nil {
377+
t.Fatalf("failed to set data: %v", err)
378+
}
373379

374380
if err := sender.Send(ctx, event); err != nil {
375381
t.Fatalf("failed to send event %d: %v", i, err)

0 commit comments

Comments
 (0)