Skip to content

Commit 09f8970

Browse files
committed
Added hub
Signed-off-by: Vishal Rana <[email protected]>
1 parent d280fe0 commit 09f8970

File tree

3 files changed

+129
-2
lines changed

3 files changed

+129
-2
lines changed

Gopkg.lock

Lines changed: 14 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Gopkg.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,3 +44,7 @@
4444
[prune]
4545
go-tests = true
4646
unused-packages = true
47+
48+
[[constraint]]
49+
name = "github.com/eclipse/paho.mqtt.golang"
50+
version = "1.1.1"

hub/hub.go

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
package hub
2+
3+
import (
4+
"fmt"
5+
"strings"
6+
"time"
7+
8+
mqtt "github.com/eclipse/paho.mqtt.golang"
9+
)
10+
11+
type (
12+
Hub struct {
13+
Options
14+
accountID string
15+
apiKey string
16+
client mqtt.Client
17+
}
18+
19+
Options struct {
20+
DeviceID string
21+
MessageHandler MessageHandler
22+
}
23+
24+
ConnectHandler func()
25+
26+
MessageHandler func(topic string, message []byte)
27+
)
28+
29+
func New(accountID, apiKey string) *Hub {
30+
return NewWithOptions(accountID, apiKey, Options{})
31+
}
32+
33+
func NewWithOptions(accountID, apiKey string, options Options) *Hub {
34+
h := &Hub{
35+
accountID: accountID,
36+
apiKey: apiKey,
37+
}
38+
h.Options = options
39+
return h
40+
}
41+
42+
func (h *Hub) normalizeTopic(topic string) string {
43+
return fmt.Sprintf("%s/%s", h.accountID, topic)
44+
}
45+
46+
func (h *Hub) denormalizeTopic(topic string) string {
47+
return strings.TrimPrefix(topic, h.accountID+"/")
48+
}
49+
50+
func (h *Hub) Connect() error {
51+
return h.ConnectWithHandler(nil)
52+
}
53+
54+
func (h *Hub) ConnectWithHandler(handler ConnectHandler) error {
55+
o := mqtt.NewClientOptions().
56+
AddBroker("tcp://hub.labstack.com:1883").
57+
SetUsername(h.accountID).
58+
SetPassword(h.apiKey)
59+
if h.DeviceID != "" {
60+
o.SetClientID(h.DeviceID)
61+
}
62+
if handler != nil {
63+
o.OnConnect = func(_ mqtt.Client) {
64+
handler()
65+
}
66+
}
67+
h.client = mqtt.NewClient(o)
68+
t := h.client.Connect()
69+
t.Wait()
70+
return t.Error()
71+
}
72+
73+
func (h *Hub) Publish(topic string, message []byte) error {
74+
t := h.client.Publish(h.normalizeTopic(topic), 0, false, message)
75+
t.Wait()
76+
return t.Error()
77+
}
78+
79+
func (h *Hub) Subscribe(topic string) error {
80+
return h.SubscribeWithHandler(topic, nil)
81+
}
82+
83+
func (h *Hub) SubscribeWithHandler(topic string, handler MessageHandler) error {
84+
t := h.client.Subscribe(h.normalizeTopic(topic), 0, func(_ mqtt.Client, m mqtt.Message) {
85+
topic := h.denormalizeTopic(m.Topic())
86+
if handler != nil {
87+
handler(topic, m.Payload())
88+
}
89+
if h.MessageHandler != nil {
90+
h.MessageHandler(topic, m.Payload())
91+
}
92+
})
93+
t.Wait()
94+
return t.Error()
95+
}
96+
97+
func (h *Hub) Unsubscribe(topic string) error {
98+
t := h.client.Unsubscribe(h.normalizeTopic(topic))
99+
t.Wait()
100+
return t.Error()
101+
}
102+
103+
func (h *Hub) Disconnect() {
104+
h.client.Disconnect(1000)
105+
}
106+
107+
func (h *Hub) Run() {
108+
for {
109+
time.Sleep(time.Second)
110+
}
111+
}

0 commit comments

Comments
 (0)