@@ -15,8 +15,10 @@ public class EasyModbus2Mqtt
15
15
private ModbusClient modbusClient = new ModbusClient ( ) ;
16
16
List < ReadOrder > readOrders = new List < ReadOrder > ( ) ;
17
17
string mqttBrokerAddress = "www.mqtt-dashboard.com" ;
18
+ int mqttBrokerPort = 1883 ;
18
19
string mqttRootTopic = "easymodbusclient" ;
19
20
uPLibrary . Networking . M2Mqtt . MqttClient mqttClient ;
21
+ public bool AutomaticReconnect { get ; set ; } = true ;
20
22
21
23
22
24
public EasyModbus2Mqtt ( )
@@ -114,14 +116,16 @@ public void AddReadOrder(FunctionCode functionCode, int quantity, int startingAd
114
116
115
117
public void start ( )
116
118
{
119
+
117
120
this . shouldStop = false ;
118
121
if ( mqttBrokerAddress == null )
119
122
throw new ArgumentOutOfRangeException ( "Mqtt Broker Address not initialized" ) ;
123
+ mqttClient = new uPLibrary . Networking . M2Mqtt . MqttClient ( mqttBrokerAddress , mqttBrokerPort , false , null , null , uPLibrary . Networking . M2Mqtt . MqttSslProtocols . None ) ;
124
+
125
+ string clientID = new Guid ( ) . ToString ( ) ;
126
+ mqttClient . Connect ( clientID ) ;
120
127
if ( ! modbusClient . Connected )
121
128
modbusClient . Connect ( ) ;
122
- mqttClient = new uPLibrary . Networking . M2Mqtt . MqttClient ( mqttBrokerAddress ) ;
123
- string clientID = new Guid ( ) . ToString ( ) ;
124
- mqttClient . Connect ( clientID ) ;
125
129
for ( int i = 0 ; i < readOrders . Count ; i ++ )
126
130
{
127
131
readOrders [ i ] . thread = new System . Threading . Thread ( new ParameterizedThreadStart ( ProcessData ) ) ;
@@ -137,7 +141,7 @@ public void publish(string[] topic, string[] payload, string mqttBrokerAddress)
137
141
mqttClient . Disconnect ( ) ;
138
142
if ( topic . Length != payload . Length )
139
143
throw new ArgumentOutOfRangeException ( "Array topic and payload must be the same size" ) ;
140
- mqttClient = new uPLibrary . Networking . M2Mqtt . MqttClient ( mqttBrokerAddress ) ;
144
+ mqttClient = new uPLibrary . Networking . M2Mqtt . MqttClient ( mqttBrokerAddress , mqttBrokerPort , false , null , null , uPLibrary . Networking . M2Mqtt . MqttSslProtocols . None ) ;
141
145
string clientID = Guid . NewGuid ( ) . ToString ( ) ;
142
146
if ( ! mqttClient . IsConnected )
143
147
mqttClient . Connect ( clientID ) ;
@@ -159,61 +163,92 @@ private void ProcessData(object param)
159
163
{
160
164
while ( ! shouldStop )
161
165
{
166
+
162
167
ReadOrder readOrder = ( ReadOrder ) param ;
163
168
lock ( lockProcessData )
164
169
{
165
-
166
- if ( readOrder . FunctionCode == FunctionCode . ReadCoils )
170
+ try
167
171
{
168
- bool [ ] value = modbusClient . ReadCoils ( readOrder . StartingAddress , readOrder . Quantity ) ;
169
- for ( int i = 0 ; i < value . Length ; i ++ )
172
+ if ( readOrder . FunctionCode == FunctionCode . ReadCoils )
170
173
{
171
- if ( readOrder . oldvalue [ i ] == null )
172
- mqttClient . Publish ( readOrder . Topic [ i ] , Encoding . UTF8 . GetBytes ( value [ i ] . ToString ( ) ) , MqttMsgBase . QOS_LEVEL_EXACTLY_ONCE , false ) ;
173
- else if ( ( bool ) readOrder . oldvalue [ i ] != value [ i ] )
174
- mqttClient . Publish ( readOrder . Topic [ i ] , Encoding . UTF8 . GetBytes ( value [ i ] . ToString ( ) ) , MqttMsgBase . QOS_LEVEL_EXACTLY_ONCE , false ) ;
175
- readOrder . oldvalue [ i ] = value [ i ] ;
176
- }
174
+ bool [ ] value = modbusClient . ReadCoils ( readOrder . StartingAddress , readOrder . Quantity ) ;
175
+ for ( int i = 0 ; i < value . Length ; i ++ )
176
+ {
177
+ if ( readOrder . oldvalue [ i ] == null )
178
+ mqttClient . Publish ( readOrder . Topic [ i ] , Encoding . UTF8 . GetBytes ( value [ i ] . ToString ( ) ) , MqttMsgBase . QOS_LEVEL_EXACTLY_ONCE , false ) ;
179
+ else if ( ( bool ) readOrder . oldvalue [ i ] != value [ i ] )
180
+ mqttClient . Publish ( readOrder . Topic [ i ] , Encoding . UTF8 . GetBytes ( value [ i ] . ToString ( ) ) , MqttMsgBase . QOS_LEVEL_EXACTLY_ONCE , false ) ;
181
+ readOrder . oldvalue [ i ] = value [ i ] ;
182
+ }
177
183
178
- }
179
- if ( readOrder . FunctionCode == FunctionCode . ReadDiscreteInputs )
180
- {
181
- bool [ ] value = modbusClient . ReadDiscreteInputs ( readOrder . StartingAddress , readOrder . Quantity ) ;
182
- for ( int i = 0 ; i < value . Length ; i ++ )
184
+ }
185
+ if ( readOrder . FunctionCode == FunctionCode . ReadDiscreteInputs )
183
186
{
184
- if ( readOrder . oldvalue [ i ] == null )
185
- mqttClient . Publish ( readOrder . Topic [ i ] , Encoding . UTF8 . GetBytes ( value [ i ] . ToString ( ) ) , MqttMsgBase . QOS_LEVEL_EXACTLY_ONCE , false ) ;
186
- else if ( ( bool ) readOrder . oldvalue [ i ] != value [ i ] )
187
- mqttClient . Publish ( readOrder . Topic [ i ] , Encoding . UTF8 . GetBytes ( value [ i ] . ToString ( ) ) , MqttMsgBase . QOS_LEVEL_EXACTLY_ONCE , false ) ;
188
- readOrder . oldvalue [ i ] = value [ i ] ;
187
+ bool [ ] value = modbusClient . ReadDiscreteInputs ( readOrder . StartingAddress , readOrder . Quantity ) ;
188
+ for ( int i = 0 ; i < value . Length ; i ++ )
189
+ {
190
+ if ( readOrder . oldvalue [ i ] == null )
191
+ mqttClient . Publish ( readOrder . Topic [ i ] , Encoding . UTF8 . GetBytes ( value [ i ] . ToString ( ) ) , MqttMsgBase . QOS_LEVEL_EXACTLY_ONCE , false ) ;
192
+ else if ( ( bool ) readOrder . oldvalue [ i ] != value [ i ] )
193
+ mqttClient . Publish ( readOrder . Topic [ i ] , Encoding . UTF8 . GetBytes ( value [ i ] . ToString ( ) ) , MqttMsgBase . QOS_LEVEL_EXACTLY_ONCE , false ) ;
194
+ readOrder . oldvalue [ i ] = value [ i ] ;
195
+ }
189
196
}
190
- }
191
- if ( readOrder . FunctionCode == FunctionCode . ReadHoldingRegisters )
192
- {
193
- int [ ] value = modbusClient . ReadHoldingRegisters ( readOrder . StartingAddress , readOrder . Quantity ) ;
194
- for ( int i = 0 ; i < value . Length ; i ++ )
197
+ if ( readOrder . FunctionCode == FunctionCode . ReadHoldingRegisters )
195
198
{
196
- if ( readOrder . oldvalue [ i ] == null )
197
- mqttClient . Publish ( readOrder . Topic [ i ] , Encoding . UTF8 . GetBytes ( value [ i ] . ToString ( ) ) , MqttMsgBase . QOS_LEVEL_EXACTLY_ONCE , false ) ;
198
- else if ( ( int ) readOrder . oldvalue [ i ] != value [ i ] )
199
- mqttClient . Publish ( readOrder . Topic [ i ] , Encoding . UTF8 . GetBytes ( value [ i ] . ToString ( ) ) , MqttMsgBase . QOS_LEVEL_EXACTLY_ONCE , false ) ;
200
- readOrder . oldvalue [ i ] = value [ i ] ;
199
+ int [ ] value = modbusClient . ReadHoldingRegisters ( readOrder . StartingAddress , readOrder . Quantity ) ;
200
+ for ( int i = 0 ; i < value . Length ; i ++ )
201
+ {
202
+ float scale = readOrder . Scale != null ? ( readOrder . Scale [ i ] == 0 ) ? 1 : readOrder . Scale [ i ] : 1 ;
203
+ if ( readOrder . oldvalue [ i ] == null )
204
+ {
205
+ mqttClient . Publish ( readOrder . Topic [ i ] , ( readOrder . Unit == null ? Encoding . UTF8 . GetBytes ( ( ( float ) value [ i ] * scale ) . ToString ( ) ) : Encoding . UTF8 . GetBytes ( ( ( float ) value [ i ] * scale ) + " " + readOrder . Unit [ i ] ) ) , MqttMsgBase . QOS_LEVEL_EXACTLY_ONCE , false ) ;
206
+ readOrder . oldvalue [ i ] = value [ i ] ;
207
+ }
208
+ else if ( ( ( int ) readOrder . oldvalue [ i ] != value [ i ] ) && ( readOrder . Hysteresis != null ? ( ( value [ i ] < ( int ) readOrder . oldvalue [ i ] - ( int ) readOrder . Hysteresis [ i ] ) | ( value [ i ] > ( int ) readOrder . oldvalue [ i ] + ( int ) readOrder . Hysteresis [ i ] ) ) : true ) )
209
+ {
210
+ mqttClient . Publish ( readOrder . Topic [ i ] , ( readOrder . Unit == null ? Encoding . UTF8 . GetBytes ( ( ( float ) value [ i ] * scale ) . ToString ( ) ) : Encoding . UTF8 . GetBytes ( ( ( float ) value [ i ] * scale ) . ToString ( ) + " " + readOrder . Unit [ i ] ) ) , MqttMsgBase . QOS_LEVEL_EXACTLY_ONCE , false ) ;
211
+ readOrder . oldvalue [ i ] = value [ i ] ;
212
+ }
213
+ }
201
214
}
202
- }
203
215
204
- if ( readOrder . FunctionCode == FunctionCode . ReadInputRegisters )
216
+ if ( readOrder . FunctionCode == FunctionCode . ReadInputRegisters )
217
+ {
218
+ int [ ] value = modbusClient . ReadInputRegisters ( readOrder . StartingAddress , readOrder . Quantity ) ;
219
+ for ( int i = 0 ; i < value . Length ; i ++ )
220
+ {
221
+ float scale = readOrder . Scale != null ? ( readOrder . Scale [ i ] == 0 ) ? 1 : readOrder . Scale [ i ] : 1 ;
222
+ if ( readOrder . oldvalue [ i ] == null )
223
+ {
224
+ mqttClient . Publish ( readOrder . Topic [ i ] , ( readOrder . Unit == null ? Encoding . UTF8 . GetBytes ( ( ( float ) value [ i ] * scale ) . ToString ( ) ) : Encoding . UTF8 . GetBytes ( ( ( float ) value [ i ] * scale ) . ToString ( ) + " " + readOrder . Unit [ i ] ) ) , MqttMsgBase . QOS_LEVEL_EXACTLY_ONCE , false ) ;
225
+ readOrder . oldvalue [ i ] = value [ i ] ;
226
+ }
227
+ else if ( ( ( int ) readOrder . oldvalue [ i ] != value [ i ] ) && ( readOrder . Hysteresis != null ? ( ( value [ i ] < ( int ) readOrder . oldvalue [ i ] - ( int ) readOrder . Hysteresis [ i ] ) | ( value [ i ] > ( int ) readOrder . oldvalue [ i ] + ( int ) readOrder . Hysteresis [ i ] ) ) : true ) )
228
+ {
229
+ mqttClient . Publish ( readOrder . Topic [ i ] , ( readOrder . Unit == null ? Encoding . UTF8 . GetBytes ( ( ( float ) value [ i ] * scale ) . ToString ( ) ) : Encoding . UTF8 . GetBytes ( ( ( float ) value [ i ] * scale ) . ToString ( ) + " " + readOrder . Unit [ i ] ) ) , MqttMsgBase . QOS_LEVEL_EXACTLY_ONCE , false ) ;
230
+ readOrder . oldvalue [ i ] = value [ i ] ;
231
+ }
232
+ }
233
+ }
234
+ }
235
+ catch ( Exception exc )
205
236
{
206
- int [ ] value = modbusClient . ReadInputRegisters ( readOrder . StartingAddress , readOrder . Quantity ) ;
207
- for ( int i = 0 ; i < value . Length ; i ++ )
237
+ modbusClient . Disconnect ( ) ;
238
+ Thread . Sleep ( 2000 ) ;
239
+ if ( ! AutomaticReconnect )
240
+ throw exc ;
241
+ if ( ! modbusClient . Connected )
208
242
{
209
- if ( readOrder . oldvalue [ i ] == null )
210
- mqttClient . Publish ( readOrder . Topic [ i ] , Encoding . UTF8 . GetBytes ( value [ i ] . ToString ( ) ) , MqttMsgBase . QOS_LEVEL_EXACTLY_ONCE , false ) ;
211
- else if ( ( int ) readOrder . oldvalue [ i ] != value [ i ] )
212
- mqttClient . Publish ( readOrder . Topic [ i ] , Encoding . UTF8 . GetBytes ( value [ i ] . ToString ( ) ) , MqttMsgBase . QOS_LEVEL_EXACTLY_ONCE , false ) ;
213
- readOrder . oldvalue [ i ] = value [ i ] ;
243
+ try
244
+ {
245
+
246
+ modbusClient . Connect ( ) ;
247
+ }
248
+ catch ( Exception ) { }
214
249
}
215
250
}
216
- }
251
+ }
217
252
System . Threading . Thread . Sleep ( readOrder . CylceTime ) ;
218
253
}
219
254
}
@@ -231,6 +266,18 @@ public string MqttBrokerAddress
231
266
}
232
267
}
233
268
269
+ public int MqttBrokerPort
270
+ {
271
+ get
272
+ {
273
+ return this . mqttBrokerPort ;
274
+ }
275
+ set
276
+ {
277
+ this . mqttBrokerPort = value ;
278
+ }
279
+ }
280
+
234
281
public string MqttRootTopic
235
282
{
236
283
get
@@ -396,13 +443,16 @@ public string SerialPort
396
443
397
444
public class ReadOrder
398
445
{
399
- public FunctionCode FunctionCode ; //Function Code to execute
400
- public int CylceTime = 500 ; //Polling intervall in ms
401
- public int StartingAddress ; //First Modbus Register to Read (0-based)
446
+ public FunctionCode FunctionCode ; //Function Code to execute
447
+ public int CylceTime = 500 ; //Polling intervall in ms
448
+ public int StartingAddress ; //First Modbus Register to Read (0-based)
402
449
public int Quantity ;
403
- public string [ ] Topic ; //Symbolnames can by replaced, by default we Push to the topic e.g. for Coils: /modbusclient/coils/1
450
+ public string [ ] Topic ; //Symbolnames can by replaced, by default we Push to the topic e.g. for Coils: /modbusclient/coils/1
451
+ public int [ ] Hysteresis ; //Values for 16-Bit Registers will be published of the dieffreence is greater than Hysteresis
452
+ public string [ ] Unit ; //Unit for Analog Values (Holding Registers and Input Registers)
453
+ public float [ ] Scale ; //Scale for Analog Values (Holding Registers and Input Registers)
404
454
internal System . Threading . Thread thread ;
405
- internal object [ ] oldvalue ;
455
+ internal object [ ] oldvalue ;
406
456
}
407
457
408
458
public enum FunctionCode
0 commit comments