11package plugin
22
33import (
4+ "context"
45 "encoding/json"
6+ "errors"
57 "fmt"
68 "strconv"
79 "time"
810
911 "github.com/grafana/grafana-plugin-sdk-go/backend"
12+ "github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
1013 "github.com/grafana/grafana-plugin-sdk-go/backend/log"
1114 "github.com/grafana/grafana-plugin-sdk-go/data"
1215 "github.com/grafana/mqtt-datasource/pkg/mqtt"
@@ -24,6 +27,7 @@ type MQTTClient interface {
2427type MQTTDatasource struct {
2528 Client MQTTClient
2629 channelPrefix string
30+ closeCh chan struct {}
2731}
2832
2933func GetDatasourceSettings (s backend.DataSourceInstanceSettings ) (* mqtt.Options , error ) {
@@ -34,6 +38,112 @@ func GetDatasourceSettings(s backend.DataSourceInstanceSettings) (*mqtt.Options,
3438 return settings , nil
3539}
3640
41+ // Make sure SampleDatasource implements required interfaces.
42+ // This is important to do since otherwise we will only get a
43+ // not implemented error response from plugin in runtime.
44+ var (
45+ _ backend.QueryDataHandler = (* MQTTDatasource )(nil )
46+ _ backend.CheckHealthHandler = (* MQTTDatasource )(nil )
47+ _ backend.StreamHandler = (* MQTTDatasource )(nil )
48+ _ instancemgmt.InstanceDisposer = (* MQTTDatasource )(nil )
49+ )
50+
51+ // NewMQTTDatasource creates a new datasource instance.
52+ func NewMQTTDatasource (client MQTTClient , id int64 ) * MQTTDatasource {
53+ return & MQTTDatasource {
54+ Client : client ,
55+ channelPrefix : fmt .Sprintf ("ds/%d/" , id ),
56+ closeCh : make (chan struct {}),
57+ }
58+ }
59+
60+ // NewMQTTDatasource creates a new datasource instance.
61+ func NewMQTTInstance (s backend.DataSourceInstanceSettings ) (instancemgmt.Instance , error ) {
62+ settings , err := GetDatasourceSettings (s )
63+ if err != nil {
64+ return nil , err
65+ }
66+
67+ client , err := mqtt .NewClient (* settings )
68+ if err != nil {
69+ return nil , err
70+ }
71+
72+ return NewMQTTDatasource (client , s .ID ), nil
73+ }
74+
75+ // Dispose here tells plugin SDK that plugin wants to clean up resources
76+ // when a new instance created. As soon as datasource settings change detected
77+ // by SDK old datasource instance will be disposed and a new one will be created
78+ // using NewSampleDatasource factory function.
79+ func (ds * MQTTDatasource ) Dispose () {
80+ close (ds .closeCh )
81+ }
82+
83+ func (ds * MQTTDatasource ) QueryData (ctx context.Context , req * backend.QueryDataRequest ) (* backend.QueryDataResponse , error ) {
84+ response := backend .NewQueryDataResponse ()
85+
86+ for _ , q := range req .Queries {
87+ res := ds .Query (q )
88+ response .Responses [q .RefID ] = res
89+ }
90+
91+ return response , nil
92+ }
93+
94+ func (ds * MQTTDatasource ) CheckHealth (ctx context.Context , req * backend.CheckHealthRequest ) (* backend.CheckHealthResult , error ) {
95+ if ! ds .Client .IsConnected () {
96+ return & backend.CheckHealthResult {
97+ Status : backend .HealthStatusError ,
98+ Message : "MQTT Disconnected" ,
99+ }, nil
100+ }
101+
102+ return & backend.CheckHealthResult {
103+ Status : backend .HealthStatusOk ,
104+ Message : "MQTT Connected" ,
105+ }, nil
106+ }
107+
108+ func (ds * MQTTDatasource ) SubscribeStream (ctx context.Context , req * backend.SubscribeStreamRequest ) (* backend.SubscribeStreamResponse , error ) {
109+ ds .Client .Subscribe (req .Path )
110+
111+ bytes , err := data .FrameToJSON (ToFrame ([]mqtt.Message {}), true , false ) // only schema
112+ if err != nil {
113+ return nil , err
114+ }
115+ return & backend.SubscribeStreamResponse {
116+ Status : backend .SubscribeStreamStatusOK ,
117+ Data : bytes , // just the schema
118+ }, nil
119+ }
120+
121+ func (ds * MQTTDatasource ) RunStream (ctx context.Context , req * backend.RunStreamRequest , sender backend.StreamPacketSender ) error {
122+ defer ds .Client .Unsubscribe (req .Path )
123+
124+ for {
125+ select {
126+ case <- ds .closeCh :
127+ log .DefaultLogger .Info ("Datasource restart" )
128+ return errors .New ("datasource closed" )
129+ case <- ctx .Done ():
130+ backend .Logger .Info ("stop streaming (context canceled)" )
131+ return nil
132+ case message := <- ds .Client .Stream ():
133+ err := ds .SendMessage (message , req , sender )
134+ if err != nil {
135+ log .DefaultLogger .Error (fmt .Sprintf ("unable to send message: %s" , err .Error ()))
136+ }
137+ }
138+ }
139+ }
140+
141+ func (ds * MQTTDatasource ) PublishStream (ctx context.Context , req * backend.PublishStreamRequest ) (* backend.PublishStreamResponse , error ) {
142+ return & backend.PublishStreamResponse {
143+ Status : backend .PublishStreamStatusPermissionDenied , // ?? Unsupported
144+ }, nil
145+ }
146+
37147type queryModel struct {
38148 Topic string `json:"queryText"`
39149}
@@ -61,25 +171,6 @@ func ToFrame(messages []mqtt.Message) *data.Frame {
61171 return frame
62172}
63173
64- func NewMQTTDatasource (s backend.DataSourceInstanceSettings ) (* MQTTDatasource , error ) {
65- settings , err := GetDatasourceSettings (s )
66- if err != nil {
67- return nil , err
68- }
69-
70- client , err := mqtt .NewClient (* settings )
71- if err != nil {
72- return nil , err
73- }
74-
75- ds := MQTTDatasource {
76- Client : client ,
77- channelPrefix : fmt .Sprintf ("ds/%d/" , s .ID ),
78- }
79-
80- return & ds , nil
81- }
82-
83174func (m * MQTTDatasource ) Query (query backend.DataQuery ) backend.DataResponse {
84175 var qm queryModel
85176
@@ -133,8 +224,3 @@ func (m *MQTTDatasource) SendMessage(msg mqtt.StreamMessage, req *backend.RunStr
133224 log .DefaultLogger .Debug (fmt .Sprintf ("Sending message to client for topic %s" , msg .Topic ))
134225 return sender .Send (packet )
135226}
136-
137- func (m * MQTTDatasource ) Dispose () {
138- // Called before creating a a new instance to allow plugin authors
139- // to cleanup.
140- }
0 commit comments