Skip to content

Commit c15b93d

Browse files
committed
mqtt-gateway v0.5.0
1 parent d5dae45 commit c15b93d

File tree

6 files changed

+113
-105
lines changed

6 files changed

+113
-105
lines changed

cmd/gateway/gateway.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,10 @@ func main() {
7272
if err := configSet.register(gw); err != nil {
7373
logger.Fatal(err)
7474
}
75+
// start listening
76+
if err := gw.Listen(); err != nil {
77+
logger.Fatal(err)
78+
}
7579

7680
sig := make(chan os.Signal, 1)
7781
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)

gateway/cs.go

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,10 +76,21 @@ func (cs *CS) handler(wg *sync.WaitGroup, hndCh <-chan *hndMsg, pubCh chan<- *pu
7676
defer wg.Done()
7777

7878
for msg := range hndCh {
79-
if value, err := msg.fn(msg.value); err != nil {
80-
errCh <- &errMsg{topic: msg.topic.String(), err: err}
81-
} else {
82-
pubCh <- &pubMsg{topic: msg.topic.noCommand(), value: value}
79+
80+
topic, err := parseTopic(msg.topic)
81+
if err != nil {
82+
errCh <- &errMsg{topic: msg.topic, err: err}
83+
break
84+
}
85+
86+
value, err := msg.fn(msg.value)
87+
if err != nil {
88+
errCh <- &errMsg{topic: msg.topic, err: err}
89+
break
90+
}
91+
92+
if topic.isCommand() {
93+
pubCh <- &pubMsg{topic: topic.noCommand(), value: value}
8394
}
8495
}
8596
}

gateway/gateway.go

Lines changed: 74 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
11
// Package gateway provides a pico-cs MQTT broker gateway.
22
package gateway
33

4+
// use paho mqtt 3.1 broker instead the mqtt 5 version github.com/eclipse/paho.golang/paho
5+
// because couldn't get the retain message handling work properly which is an essential part
6+
// of this gateway
7+
48
import (
5-
"context"
69
"encoding/json"
710
"fmt"
811
"io"
912
"log"
10-
"net/url"
1113
"sync"
12-
"time"
1314

14-
"github.com/eclipse/paho.golang/autopaho"
15-
"github.com/eclipse/paho.golang/paho"
15+
MQTT "github.com/eclipse/paho.mqtt.golang"
1616
"golang.org/x/exp/maps"
1717
)
1818

@@ -21,17 +21,19 @@ const defChanSize = 100
2121
type hndFn func(payload any) (any, error)
2222

2323
type pubMsg struct {
24-
topic string
25-
value any
24+
topic string
25+
retain bool
26+
value any
2627
}
2728

2829
type errMsg struct {
29-
topic string
30-
err error
30+
topic string
31+
retain bool
32+
err error
3133
}
3234

3335
type hndMsg struct {
34-
topic topic
36+
topic string
3537
fn hndFn
3638
value any
3739
}
@@ -44,8 +46,10 @@ type subscription struct {
4446

4547
// Gateway represents a MQTT broker gateway.
4648
type Gateway struct {
47-
config *Config
48-
connectionManager *autopaho.ConnectionManager
49+
config *Config
50+
client MQTT.Client
51+
52+
listening bool
4953

5054
mu sync.RWMutex
5155
csMap map[string]*CS
@@ -88,35 +92,24 @@ func New(config *Config) (*Gateway, error) {
8892
wg: new(sync.WaitGroup),
8993
}
9094

91-
pahoConfig := autopaho.ClientConfig{
92-
BrokerUrls: []*url.URL{{Scheme: "tcp", Host: config.address()}},
93-
OnConnectError: func(err error) { log.Println(err) },
94-
ClientConfig: paho.ClientConfig{
95-
Router: paho.NewSingleHandlerRouter(gw.handler),
96-
},
97-
}
98-
99-
pahoConfig.SetUsernamePassword(config.Username, []byte(config.Password))
100-
101-
connectionManager, err := autopaho.NewConnection(context.Background(), pahoConfig)
102-
//cancel()
103-
if err != nil {
104-
return nil, err
95+
// MQTT:
96+
// starting with a clean seesion without client id as receiving
97+
// retained messages should be enough initializing the
98+
// command stations
99+
opts := MQTT.NewClientOptions()
100+
opts.AddBroker(config.address())
101+
opts.SetUsername(config.Username)
102+
opts.SetPassword(config.Password)
103+
opts.SetAutoReconnect(true)
104+
opts.SetCleanSession(true)
105+
opts.SetDefaultPublishHandler(gw.handler)
106+
107+
client := MQTT.NewClient(opts)
108+
if token := client.Connect(); token.Wait() && token.Error() != nil {
109+
return nil, token.Error()
105110
}
111+
gw.client = client
106112

107-
gw.connectionManager = connectionManager
108-
109-
// don't wait forever in case of connection issues like invalid host or port.
110-
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
111-
defer cancel()
112-
113-
if err := connectionManager.AwaitConnection(ctx); err != nil {
114-
return nil, err
115-
}
116-
117-
if err := gw.subscribeBroker(); err != nil {
118-
return nil, err
119-
}
120113
gw.logger.Printf("connected to broker %s", config.address())
121114

122115
// start go routines
@@ -126,6 +119,11 @@ func New(config *Config) (*Gateway, error) {
126119
return gw, nil
127120
}
128121

122+
const (
123+
defaultQoS = 1
124+
wait = 250 // waiting time for client disconnect in ms
125+
)
126+
129127
// Close closes the gateway and the MQTT connection.
130128
func (gw *Gateway) Close() error {
131129
gw.mu.RLock()
@@ -147,8 +145,20 @@ func (gw *Gateway) Close() error {
147145
gw.wg.Wait()
148146
gw.logger.Printf("disconnect from broker %s", gw.config.address())
149147
gw.unsubscribeBroker() // ignore error
148+
gw.client.Disconnect(wait)
149+
return nil
150+
}
150151

151-
return gw.connectionManager.Disconnect(context.Background())
152+
// Listen starts the gateway listening to subscriptions.
153+
func (gw *Gateway) Listen() error {
154+
// separated to start listen after subscriptions not to miss retained messages
155+
gw.mu.Lock()
156+
defer gw.mu.Unlock()
157+
if gw.listening {
158+
return fmt.Errorf("gateway is already listening")
159+
}
160+
// subscribe
161+
return gw.subscribeBroker()
152162
}
153163

154164
// CSList returns the list of command stations assigned to this gateway.
@@ -166,25 +176,15 @@ func (gw *Gateway) LocoList() []*Loco {
166176
}
167177

168178
func (gw *Gateway) subscribeBroker() error {
169-
sub := &paho.Subscribe{
170-
Subscriptions: map[string]paho.SubscribeOptions{
171-
gw.subTopic: {QoS: 1}, //QoS 1: at least once
172-
},
173-
}
174-
if suback, err := gw.connectionManager.Subscribe(context.Background(), sub); err != nil {
175-
gw.logger.Printf("subscribe suback %v error %s", suback, err)
176-
return err
179+
if token := gw.client.Subscribe(gw.subTopic, defaultQoS, gw.handler); token.Wait() && token.Error() != nil {
180+
return token.Error()
177181
}
178182
return nil
179183
}
180184

181185
func (gw *Gateway) unsubscribeBroker() error {
182-
unsub := &paho.Unsubscribe{
183-
Topics: []string{gw.subTopic},
184-
}
185-
if unsuback, err := gw.connectionManager.Unsubscribe(context.Background(), unsub); err != nil {
186-
gw.logger.Printf("unsubscribe unsuback %v error %s", unsuback, err)
187-
return err
186+
if token := gw.client.Unsubscribe(gw.subTopic); token.Wait() && token.Error() != nil {
187+
return token.Error()
188188
}
189189
return nil
190190
}
@@ -238,13 +238,21 @@ func (gw *Gateway) unsubscribe(owner any, topic string) {
238238
}
239239
}
240240

241-
func (gw *Gateway) handler(p *paho.Publish) {
242-
topic, err := parseTopic(p.Topic)
241+
func (gw *Gateway) handler(client MQTT.Client, msg MQTT.Message) {
242+
topic, err := parseTopic(msg.Topic())
243243
if err != nil {
244-
gw.errCh <- &errMsg{topic: p.Topic, err: err}
244+
gw.errCh <- &errMsg{topic: msg.Topic(), err: err}
245245
return
246246
}
247247

248+
var value any
249+
if err := json.Unmarshal(msg.Payload(), &value); err != nil {
250+
gw.errCh <- &errMsg{topic: msg.Topic(), err: err}
251+
return
252+
}
253+
254+
gw.logger.Printf("receive: topic %s retained %t value %v\n", msg.Topic(), msg.Retained(), value)
255+
248256
gw.subMu.RLock()
249257
defer gw.subMu.RUnlock()
250258

@@ -253,16 +261,8 @@ func (gw *Gateway) handler(p *paho.Publish) {
253261
return // nothing to do
254262
}
255263

256-
var value any
257-
if err := json.Unmarshal(p.Payload, &value); err != nil {
258-
gw.errCh <- &errMsg{topic: p.Topic, err: err}
259-
return
260-
}
261-
262-
// log.Printf("unmarshall payload %[1]v %[1]s value %[2]T %[2]v\n", p.Payload, payload)
263-
264264
for _, subscription := range subscriptions {
265-
subscription.hndCh <- &hndMsg{topic: topic, fn: subscription.fn, value: value}
265+
subscription.hndCh <- &hndMsg{topic: msg.Topic(), fn: subscription.fn, value: value}
266266
}
267267
}
268268

@@ -275,23 +275,17 @@ func (gw *Gateway) publish(wg *sync.WaitGroup, pubCh <-chan *pubMsg, errCh chan<
275275
continue // nothing to publish
276276
}
277277

278-
gw.logger.Printf("publish: topic %s value %v", msg.topic, msg.value)
278+
gw.logger.Printf("publish: topic %s retain %t value %v\n", msg.topic, msg.retain, msg.value)
279279

280280
payload, err := json.Marshal(msg.value)
281281
if err != nil {
282282
errCh <- &errMsg{topic: msg.topic, err: err}
283283
continue
284284
}
285285

286-
publish := &paho.Publish{
287-
QoS: 1, // QoS == 1
288-
Retain: true, // retain msg, so that new joiners will get the latest message
289-
Topic: msg.topic,
290-
Payload: payload,
291-
}
292-
293-
if _, err := gw.connectionManager.Publish(context.Background(), publish); err != nil {
294-
errCh <- &errMsg{topic: msg.topic, err: err}
286+
token := gw.client.Publish(msg.topic, defaultQoS, msg.retain, payload)
287+
if token.Wait() && token.Error() != nil {
288+
errCh <- &errMsg{topic: msg.topic, err: token.Error()}
295289
}
296290
}
297291
}
@@ -307,24 +301,18 @@ func (gw *Gateway) publishError(wg *sync.WaitGroup, errCh <-chan *errMsg) {
307301

308302
for msg := range errCh {
309303

310-
gw.logger.Printf("publish error: %s", msg.err)
304+
gw.logger.Printf("publish: topic %s retain %t error %s\n", msg.topic, msg.retain, msg.err)
311305

312306
payload, err := json.Marshal(&errPayload{Topic: msg.topic, Error: msg.err.Error()})
313307
if err != nil {
314308
// hm, we can only log...
315309
gw.logger.Printf("publish error: topic %s err %s", msg.topic, err)
316310
}
317311

318-
publish := &paho.Publish{
319-
QoS: 1, // QoS == 1
320-
Retain: false,
321-
Topic: gw.errorTopic,
322-
Payload: payload,
323-
}
324-
325-
if _, err := gw.connectionManager.Publish(context.Background(), publish); err != nil {
312+
token := gw.client.Publish(gw.errorTopic, defaultQoS, msg.retain, payload)
313+
if token.Wait() && token.Error() != nil {
326314
// hm, we can only log...
327-
gw.logger.Printf("publish error: topic %s error %s", msg.topic, err)
315+
gw.logger.Printf("publish error: topic %s err %s", msg.topic, token.Error())
328316
}
329317
}
330318
}

gateway/topic.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,11 @@ func (t topic) noRoot() string {
6565
return joinTopic(t[1:]...)
6666
}
6767

68+
// isCommand returns true if the topic is a command topic.
69+
func (t topic) isCommand() bool {
70+
return len(t) == maxNumPart
71+
}
72+
6873
// noCommand returns topic without command part.
6974
func (t topic) noCommand() string {
7075
l := len(t)

go.mod

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,17 @@ go 1.19
66
// replace github.com/pico-cs/go-client/client => ../go-client/client
77

88
require (
9-
github.com/eclipse/paho.golang v0.10.0
9+
github.com/eclipse/paho.mqtt.golang v1.4.2
1010
github.com/pico-cs/go-client v0.1.12
11-
golang.org/x/exp v0.0.0-20221208044002-44028be4359e
11+
golang.org/x/exp v0.0.0-20221211133740-4296e2f59697
1212
gopkg.in/yaml.v3 v3.0.1
1313
)
1414

1515
require (
1616
github.com/creack/goselect v0.1.2 // indirect
1717
github.com/gorilla/websocket v1.5.0 // indirect
1818
go.bug.st/serial v1.4.1 // indirect
19+
golang.org/x/net v0.4.0 // indirect
1920
golang.org/x/sync v0.1.0 // indirect
2021
golang.org/x/sys v0.3.0 // indirect
2122
)

go.sum

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,32 @@
11
github.com/creack/goselect v0.1.2 h1:2DNy14+JPjRBgPzAd1thbQp4BSIihxcBf0IXhQXDRa0=
22
github.com/creack/goselect v0.1.2/go.mod h1:a/NhLweNvqIYMuxcMOuWY516Cimucms3DglDzQP3hKY=
33
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
4-
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
5-
github.com/eclipse/paho.golang v0.10.0 h1:oUGPjRwWcZQRgDD9wVDV7y7i7yBSxts3vcvcNJo8B4Q=
6-
github.com/eclipse/paho.golang v0.10.0/go.mod h1:rhrV37IEwauUyx8FHrvmXOKo+QRKng5ncoN1vJiJMcs=
7-
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
8-
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
4+
github.com/eclipse/paho.mqtt.golang v1.4.2 h1:66wOzfUHSSI1zamx7jR6yMEI5EuHnT1G6rNA5PM12m4=
5+
github.com/eclipse/paho.mqtt.golang v1.4.2/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA=
96
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
107
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
118
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
129
github.com/pico-cs/go-client v0.1.12 h1:5pI6Fbv69c9Lus12eGZNG8KkXgU2v5ERO5Km/PoeZRs=
1310
github.com/pico-cs/go-client v0.1.12/go.mod h1:cTadZ2cRaR3RxGFbodrMZm9XlMmnLTaE9BW8fyvNaYA=
1411
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
15-
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
16-
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
1712
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
18-
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
1913
go.bug.st/serial v1.4.1 h1:AwYUNixVf90XymNeJaUkMrPp+GZQe3RMFQmpVdHIUK8=
2014
go.bug.st/serial v1.4.1/go.mod h1:z8CesKorE90Qr/oRSJiEuvzYRKol9r/anJZEb5kt304=
21-
golang.org/x/exp v0.0.0-20221208044002-44028be4359e h1:lTjJJUAuWTLRn0pXoNLiVZIFYOIpvmg3MxmZxgO09bM=
22-
golang.org/x/exp v0.0.0-20221208044002-44028be4359e/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
23-
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
15+
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
16+
golang.org/x/exp v0.0.0-20221211133740-4296e2f59697 h1:8C7UQbJ2XKm4YTE6T3cINHaDUWdnmYLECqz/oHFNsCU=
17+
golang.org/x/exp v0.0.0-20221211133740-4296e2f59697/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
18+
golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
19+
golang.org/x/net v0.4.0 h1:Q5QPcMlvfxFTAPV0+07Xz/MpK9NTXu2VDUuy0FeMfaU=
20+
golang.org/x/net v0.4.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE=
21+
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
2422
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
2523
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
24+
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
25+
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
2626
golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ=
2727
golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
28-
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
28+
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
2929
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
3030
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
31-
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
3231
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
3332
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

0 commit comments

Comments
 (0)