Skip to content

Commit 7c9332d

Browse files
authored
Merge pull request #7 from grafana/minor_cleanups
Minor cleanups
2 parents e3d2153 + fc5be30 commit 7c9332d

File tree

5 files changed

+78
-88
lines changed

5 files changed

+78
-88
lines changed

pkg/mqtt/client.go

Lines changed: 31 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -86,45 +86,45 @@ func (c *Client) Stream() chan StreamMessage {
8686
return c.stream
8787
}
8888

89-
func (c *Client) HandleMessage(client paho.Client, msg paho.Message) {
89+
func (c *Client) HandleMessage(_ paho.Client, msg paho.Message) {
9090
log.DefaultLogger.Debug(fmt.Sprintf("Received MQTT Message for topic %s", msg.Topic()))
9191
topic, ok := c.topics.Load(msg.Topic())
92+
if !ok {
93+
return
94+
}
95+
96+
// store message for query
97+
message := Message{
98+
Timestamp: time.Now(),
99+
Value: string(msg.Payload()),
100+
}
101+
topic.messages = append(topic.messages, message)
102+
103+
// limit the size of the retained messages
104+
if len(topic.messages) > 1000 {
105+
topic.messages = topic.messages[1:]
106+
}
107+
108+
c.topics.Store(topic)
92109

93-
if ok {
94-
// store message for query
95-
message := Message{
96-
Timestamp: time.Now(),
97-
Value: string(msg.Payload()),
98-
}
99-
topic.messages = append(topic.messages, message)
100-
101-
// limit the size of the retained messages
102-
if len(topic.messages) > 1000 {
103-
topic.messages = topic.messages[1:]
104-
}
105-
106-
c.topics.Store(topic)
107-
108-
streamMessage := StreamMessage{Topic: msg.Topic(), Value: string(msg.Payload())}
109-
select {
110-
case c.stream <- streamMessage:
111-
default:
112-
// don't block if nothing is reading from the channel
113-
}
110+
streamMessage := StreamMessage{Topic: msg.Topic(), Value: string(msg.Payload())}
111+
select {
112+
case c.stream <- streamMessage:
113+
default:
114+
// don't block if nothing is reading from the channel
114115
}
115116
}
116117

117118
func (c *Client) Subscribe(t string) {
118-
_, ok := c.topics.Load(t)
119-
120-
if !ok {
121-
log.DefaultLogger.Debug(fmt.Sprintf("Subscribing to MQTT topic: %s", t))
122-
topic := Topic{
123-
path: t,
124-
}
125-
c.topics.Store(&topic)
126-
c.client.Subscribe(t, 0, c.HandleMessage)
119+
if _, ok := c.topics.Load(t); ok {
120+
return
121+
}
122+
log.DefaultLogger.Debug(fmt.Sprintf("Subscribing to MQTT topic: %s", t))
123+
topic := Topic{
124+
path: t,
127125
}
126+
c.topics.Store(&topic)
127+
c.client.Subscribe(t, 0, c.HandleMessage)
128128
}
129129

130130
func (c *Client) Unsubscribe(t string) {

pkg/plugin/datasource.go

Lines changed: 41 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package plugin
33
import (
44
"context"
55
"encoding/json"
6-
"errors"
76
"fmt"
87
"time"
98

@@ -14,6 +13,29 @@ import (
1413
"github.com/grafana/mqtt-datasource/pkg/mqtt"
1514
)
1615

16+
// NewMQTTDatasource creates a new datasource instance.
17+
func NewMQTTInstance(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
18+
settings, err := getDatasourceSettings(s)
19+
if err != nil {
20+
return nil, err
21+
}
22+
23+
client, err := mqtt.NewClient(*settings)
24+
if err != nil {
25+
return nil, err
26+
}
27+
28+
return NewMQTTDatasource(client, s.UID), nil
29+
}
30+
31+
func getDatasourceSettings(s backend.DataSourceInstanceSettings) (*mqtt.Options, error) {
32+
settings := &mqtt.Options{}
33+
if err := json.Unmarshal(s.JSONData, settings); err != nil {
34+
return nil, err
35+
}
36+
return settings, nil
37+
}
38+
1739
type MQTTClient interface {
1840
Stream() chan mqtt.StreamMessage
1941
IsConnected() bool
@@ -26,18 +48,9 @@ type MQTTClient interface {
2648
type MQTTDatasource struct {
2749
Client MQTTClient
2850
channelPrefix string
29-
closeCh chan struct{}
3051
}
3152

32-
func GetDatasourceSettings(s backend.DataSourceInstanceSettings) (*mqtt.Options, error) {
33-
settings := &mqtt.Options{}
34-
if err := json.Unmarshal(s.JSONData, settings); err != nil {
35-
return nil, err
36-
}
37-
return settings, nil
38-
}
39-
40-
// Make sure SampleDatasource implements required interfaces.
53+
// Make sure MQTTDatasource implements required interfaces.
4154
// This is important to do since otherwise we will only get a
4255
// not implemented error response from plugin in runtime.
4356
var (
@@ -48,38 +61,22 @@ var (
4861
)
4962

5063
// NewMQTTDatasource creates a new datasource instance.
51-
func NewMQTTDatasource(client MQTTClient, id int64) *MQTTDatasource {
64+
func NewMQTTDatasource(client MQTTClient, uid string) *MQTTDatasource {
5265
return &MQTTDatasource{
5366
Client: client,
54-
channelPrefix: fmt.Sprintf("ds/%d/", id),
55-
closeCh: make(chan struct{}),
67+
channelPrefix: fmt.Sprintf("ds/%s/", uid),
5668
}
5769
}
5870

59-
// NewMQTTDatasource creates a new datasource instance.
60-
func NewMQTTInstance(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
61-
settings, err := GetDatasourceSettings(s)
62-
if err != nil {
63-
return nil, err
64-
}
65-
66-
client, err := mqtt.NewClient(*settings)
67-
if err != nil {
68-
return nil, err
69-
}
70-
71-
return NewMQTTDatasource(client, s.ID), nil
72-
}
73-
7471
// Dispose here tells plugin SDK that plugin wants to clean up resources
7572
// when a new instance created. As soon as datasource settings change detected
7673
// by SDK old datasource instance will be disposed and a new one will be created
77-
// using NewSampleDatasource factory function.
74+
// using NewMQTTDatasource factory function.
7875
func (ds *MQTTDatasource) Dispose() {
79-
close(ds.closeCh)
76+
// Nothing to clean up yet.
8077
}
8178

82-
func (ds *MQTTDatasource) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
79+
func (ds *MQTTDatasource) QueryData(_ context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
8380
response := backend.NewQueryDataResponse()
8481

8582
for _, q := range req.Queries {
@@ -90,7 +87,7 @@ func (ds *MQTTDatasource) QueryData(ctx context.Context, req *backend.QueryDataR
9087
return response, nil
9188
}
9289

93-
func (ds *MQTTDatasource) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
90+
func (ds *MQTTDatasource) CheckHealth(_ context.Context, _ *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
9491
if !ds.Client.IsConnected() {
9592
return &backend.CheckHealthResult{
9693
Status: backend.HealthStatusError,
@@ -104,22 +101,18 @@ func (ds *MQTTDatasource) CheckHealth(ctx context.Context, req *backend.CheckHea
104101
}, nil
105102
}
106103

107-
func (ds *MQTTDatasource) SubscribeStream(ctx context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) {
108-
ds.Client.Subscribe(req.Path)
109-
104+
func (ds *MQTTDatasource) SubscribeStream(_ context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) {
110105
return &backend.SubscribeStreamResponse{
111106
Status: backend.SubscribeStreamStatusOK,
112107
}, nil
113108
}
114109

115110
func (ds *MQTTDatasource) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error {
111+
ds.Client.Subscribe(req.Path)
116112
defer ds.Client.Unsubscribe(req.Path)
117113

118114
for {
119115
select {
120-
case <-ds.closeCh:
121-
log.DefaultLogger.Info("Datasource restart")
122-
return errors.New("datasource closed")
123116
case <-ctx.Done():
124117
backend.Logger.Info("stop streaming (context canceled)")
125118
return nil
@@ -135,17 +128,17 @@ func (ds *MQTTDatasource) RunStream(ctx context.Context, req *backend.RunStreamR
135128
}
136129
}
137130

138-
func (ds *MQTTDatasource) PublishStream(ctx context.Context, req *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) {
131+
func (ds *MQTTDatasource) PublishStream(_ context.Context, _ *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) {
139132
return &backend.PublishStreamResponse{
140-
Status: backend.PublishStreamStatusPermissionDenied, // ?? Unsupported
133+
Status: backend.PublishStreamStatusPermissionDenied,
141134
}, nil
142135
}
143136

144137
type queryModel struct {
145138
Topic string `json:"queryText"`
146139
}
147140

148-
func (m *MQTTDatasource) Query(query backend.DataQuery) backend.DataResponse {
141+
func (ds *MQTTDatasource) Query(query backend.DataQuery) backend.DataResponse {
149142
var qm queryModel
150143

151144
response := backend.DataResponse{}
@@ -155,10 +148,10 @@ func (m *MQTTDatasource) Query(query backend.DataQuery) backend.DataResponse {
155148
return response
156149
}
157150

158-
// ensure the client is subscribed to the topic
159-
m.Client.Subscribe(qm.Topic)
151+
// ensure the client is subscribed to the topic.
152+
ds.Client.Subscribe(qm.Topic)
160153

161-
messages, ok := m.Client.Messages(qm.Topic)
154+
messages, ok := ds.Client.Messages(qm.Topic)
162155
if !ok {
163156
return response
164157
}
@@ -167,16 +160,16 @@ func (m *MQTTDatasource) Query(query backend.DataQuery) backend.DataResponse {
167160

168161
if qm.Topic != "" {
169162
frame.SetMeta(&data.FrameMeta{
170-
Channel: m.channelPrefix + qm.Topic,
163+
Channel: ds.channelPrefix + qm.Topic,
171164
})
172165
}
173166

174167
response.Frames = append(response.Frames, frame)
175168
return response
176169
}
177170

178-
func (m *MQTTDatasource) SendMessage(msg mqtt.StreamMessage, req *backend.RunStreamRequest, sender *backend.StreamSender) error {
179-
if !m.Client.IsSubscribed(req.Path) {
171+
func (ds *MQTTDatasource) SendMessage(msg mqtt.StreamMessage, req *backend.RunStreamRequest, sender *backend.StreamSender) error {
172+
if !ds.Client.IsSubscribed(req.Path) {
180173
return nil
181174
}
182175

pkg/plugin/datasource_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ func TestCheckHealthHandler(t *testing.T) {
1515
ds := plugin.NewMQTTDatasource(&fakeMQTTClient{
1616
connected: true,
1717
subscribed: false,
18-
}, 5)
18+
}, "xyz")
1919

2020
res, _ := ds.CheckHealth(
2121
context.Background(),
@@ -30,7 +30,7 @@ func TestCheckHealthHandler(t *testing.T) {
3030
ds := plugin.NewMQTTDatasource(&fakeMQTTClient{
3131
connected: false,
3232
subscribed: false,
33-
}, 5)
33+
}, "xyz")
3434

3535
res, _ := ds.CheckHealth(
3636
context.Background(),
@@ -63,6 +63,6 @@ func (c *fakeMQTTClient) Stream() chan mqtt.StreamMessage {
6363
return make(chan mqtt.StreamMessage)
6464
}
6565

66-
func (c *fakeMQTTClient) Subscribe(topic string) {}
66+
func (c *fakeMQTTClient) Subscribe(_ string) {}
6767

68-
func (c *fakeMQTTClient) Unsubscribe(topic string) {}
68+
func (c *fakeMQTTClient) Unsubscribe(_ string) {}

pkg/plugin/message.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ func ToFrame(topic string, messages []mqtt.Message) *data.Frame {
3838

3939
func jsonMessagesToFrame(topic string, messages []mqtt.Message) *data.Frame {
4040
count := len(messages)
41-
if count < 1 {
41+
if count == 0 {
4242
return nil
4343
}
4444

@@ -66,7 +66,7 @@ func jsonMessagesToFrame(topic string, messages []mqtt.Message) *data.Frame {
6666
fields[key] = field
6767
keys = append(keys, key)
6868
}
69-
sort.Strings(keys) // keeys stable field order
69+
sort.Strings(keys) // keys stable field order.
7070

7171
// Add rows 1...n
7272
for row, m := range messages {

pkg/plugin/message_test.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,6 @@ func TestJSONValuesMessage(t *testing.T) {
4242
})
4343
numFields := len(values) + 1
4444
require.NotNil(t, frame)
45-
str, err := frame.StringTable(numFields, 1)
46-
require.NoError(t, err)
47-
fmt.Printf("FRAME: %s", str)
4845
require.Equal(t, numFields, len(frame.Fields))
4946
v, ok := frame.Fields[0].ConcreteAt(0)
5047
require.Equal(t, true, ok)

0 commit comments

Comments
 (0)