@@ -63,14 +63,14 @@ func New(config *Config, logger Logger, metrics Metrics) *MQTT {
6363 client := mqtt .NewClient (options )
6464
6565 if token := client .Connect (); token .Wait () && token .Error () != nil {
66- logger .Errorf ("cannot connect to MQTT, HostName : %v, Port : %v, error : %v" , config .Hostname , config .Port , token .Error ())
66+ logger .Errorf ("cannot connect to MQTT, host : %v, port : %v, error: %v" , config .Hostname , config .Port , token .Error ())
6767
6868 return & MQTT {Client : client , config : config , logger : logger }
6969 }
7070
7171 msg := make (map [string ]chan * pubsub.Message )
7272
73- logger .Debugf ("connected to MQTT, HostName : %v, Port : %v" , config .Hostname , config .Port )
73+ logger .Debugf ("connected to MQTT, host : %v, port : %v" , config .Hostname , config .Port )
7474
7575 return & MQTT {Client : client , config : config , logger : logger , msgChanMap : msg , mu : new (sync.RWMutex ), metrics : metrics }
7676}
@@ -82,15 +82,13 @@ func getDefaultClient(config *Config, logger Logger, metrics Metrics) *MQTT {
8282 clientID = getClientID (config .ClientID )
8383 )
8484
85- logger .Debugf ("using %v clientID for this session" , clientID )
86-
8785 opts := mqtt .NewClientOptions ()
8886 opts .AddBroker (fmt .Sprintf ("tcp://%s:%d" , host , port ))
8987 opts .SetClientID (clientID )
9088 client := mqtt .NewClient (opts )
9189
9290 if token := client .Connect (); token .Wait () && token .Error () != nil {
93- logger .Errorf ("cannot connect to MQTT, HostName : %v, Port : %v, error : %v" , host , port , token .Error ())
91+ logger .Errorf ("cannot connect to MQTT, host : %v, port : %v, error: %v" , host , port , token .Error ())
9492
9593 return & MQTT {Client : client , config : config , logger : logger }
9694 }
@@ -102,6 +100,7 @@ func getDefaultClient(config *Config, logger Logger, metrics Metrics) *MQTT {
102100 msg := make (map [string ]chan * pubsub.Message )
103101
104102 logger .Debugf ("connected to MQTT, HostName : %v, Port : %v" , config .Hostname , config .Port )
103+ logger .Debugf ("using %v clientID for this MQTT session" , clientID )
105104
106105 return & MQTT {Client : client , config : config , logger : logger , msgChanMap : msg , mu : new (sync.RWMutex ), metrics : metrics }
107106}
@@ -174,9 +173,13 @@ func (m *MQTT) Subscribe(ctx context.Context, topic string) (*pubsub.Message, er
174173 token := m .Client .Subscribe (topic , m .config .QoS , handler )
175174
176175 if token .Wait () && token .Error () != nil {
176+ m .logger .Errorf ("error getting a message from MQTT, err: %v" , token .Error ())
177+
177178 return nil , token .Error ()
178179 }
179180
181+ m .logger .Debugf ("received mqtt message %v on topic %v" , string (messg .Value ), topic )
182+
180183 m .metrics .IncrementCounter (ctx , "app_pubsub_subscribe_success_count" , "topic" , topic )
181184
182185 // blocks if there are no messages in the channel
@@ -196,6 +199,8 @@ func (m *MQTT) Publish(ctx context.Context, topic string, message []byte) error
196199 return token .Error ()
197200 }
198201
202+ m .logger .Debugf ("published message %v on topic %v" , string (message ), topic )
203+
199204 m .metrics .IncrementCounter (ctx , "app_pubsub_publish_success_count" , "topic" , topic )
200205
201206 return nil
0 commit comments