Skip to content

Commit 83cbf0e

Browse files
committed
Change token.Wait to token.WaitTimeout.
1 parent 2b4070b commit 83cbf0e

File tree

4 files changed

+10
-5
lines changed

4 files changed

+10
-5
lines changed

cmd/chirpstack-gateway-bridge/cmd/root.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ func init() {
6262
viper.SetDefault("integration.mqtt.state_retained", true)
6363
viper.SetDefault("integration.mqtt.keep_alive", 30*time.Second)
6464
viper.SetDefault("integration.mqtt.max_reconnect_interval", time.Minute)
65+
viper.SetDefault("integration.mqtt.max_token_wait", time.Second)
6566

6667
viper.SetDefault("integration.mqtt.auth.generic.servers", []string{"tcp://127.0.0.1:1883"})
6768
viper.SetDefault("integration.mqtt.auth.generic.clean_session", true)

internal/config/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ type Config struct {
6363
KeepAlive time.Duration `mapstructure:"keep_alive"`
6464
MaxReconnectInterval time.Duration `mapstructure:"max_reconnect_interval"`
6565
TerminateOnConnectError bool `mapstructure:"terminate_on_connect_error"`
66+
MaxTokenWait time.Duration `mapstructure:"max_token_wait"`
6667

6768
Auth struct {
6869
Type string `mapstructure:"type"`

internal/integration/mqtt/backend.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ type Backend struct {
4141
gatewaysSubscribed map[lorawan.EUI64]struct{}
4242
terminateOnConnectError bool
4343
stateRetained bool
44+
maxTokenWait time.Duration
4445

4546
qos uint8
4647
eventTopicTemplate *template.Template
@@ -62,6 +63,7 @@ func NewBackend(conf config.Config) (*Backend, error) {
6263
gateways: make(map[lorawan.EUI64]struct{}),
6364
gatewaysSubscribed: make(map[lorawan.EUI64]struct{}),
6465
stateRetained: conf.Integration.MQTT.StateRetained,
66+
maxTokenWait: conf.Integration.MQTT.MaxTokenWait,
6567
}
6668

6769
switch conf.Integration.MQTT.Auth.Type {
@@ -283,7 +285,7 @@ func (b *Backend) subscribeGateway(gatewayID lorawan.EUI64) error {
283285
"qos": b.qos,
284286
}).Info("integration/mqtt: subscribing to topic")
285287

286-
if token := b.conn.Subscribe(topic.String(), b.qos, b.handleCommand); token.Wait() && token.Error() != nil {
288+
if token := b.conn.Subscribe(topic.String(), b.qos, b.handleCommand); token.WaitTimeout(b.maxTokenWait) && token.Error() != nil {
287289
return errors.Wrap(token.Error(), "subscribe topic error")
288290
}
289291
return nil
@@ -298,7 +300,7 @@ func (b *Backend) unsubscribeGateway(gatewayID lorawan.EUI64) error {
298300
"topic": topic.String(),
299301
}).Info("integration/mqtt: unsubscribing from topic")
300302

301-
if token := b.conn.Unsubscribe(topic.String()); token.Wait() && token.Error() != nil {
303+
if token := b.conn.Unsubscribe(topic.String()); token.WaitTimeout(b.maxTokenWait) && token.Error() != nil {
302304
return errors.Wrap(token.Error(), "unsubscribe topic error")
303305
}
304306

@@ -351,7 +353,7 @@ func (b *Backend) PublishState(gatewayID lorawan.EUI64, state string, v proto.Me
351353
"state": state,
352354
"gateway_id": gatewayID,
353355
}).Info("integration/mqtt: publishing state")
354-
if token := b.conn.Publish(topic.String(), b.qos, b.stateRetained, bytes); token.Wait() && token.Error() != nil {
356+
if token := b.conn.Publish(topic.String(), b.qos, b.stateRetained, bytes); token.WaitTimeout(b.maxTokenWait) && token.Error() != nil {
355357
return token.Error()
356358
}
357359
return nil
@@ -366,7 +368,7 @@ func (b *Backend) connect() error {
366368
}
367369

368370
b.conn = paho.NewClient(b.clientOpts)
369-
if token := b.conn.Connect(); token.Wait() && token.Error() != nil {
371+
if token := b.conn.Connect(); token.WaitTimeout(b.maxTokenWait) && token.Error() != nil {
370372
return token.Error()
371373
}
372374

@@ -659,7 +661,7 @@ func (b *Backend) publishEvent(gatewayID lorawan.EUI64, event string, fields log
659661
fields["event"] = event
660662

661663
log.WithFields(fields).Info("integration/mqtt: publishing event")
662-
if token := b.conn.Publish(topic.String(), b.qos, false, bytes); token.Wait() && token.Error() != nil {
664+
if token := b.conn.Publish(topic.String(), b.qos, false, bytes); token.WaitTimeout(b.maxTokenWait) && token.Error() != nil {
663665
return token.Error()
664666
}
665667
return nil

internal/integration/mqtt/backend_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ func (ts *MQTTBackendTestSuite) SetupSuite() {
6464
conf.Integration.MQTT.Auth.Generic.Password = password
6565
conf.Integration.MQTT.Auth.Generic.CleanSession = true
6666
conf.Integration.MQTT.Auth.Generic.ClientID = ts.gatewayID.String()
67+
conf.Integration.MQTT.MaxTokenWait = time.Second
6768

6869
var err error
6970
ts.backend, err = NewBackend(conf)

0 commit comments

Comments
 (0)