@@ -31,7 +31,7 @@ class SensorsClient():
31
31
def __init__ (self ):
32
32
"""Initialize the bus and sensor info and start monitoring sensor states"""
33
33
self .sensorMutex = RLock ()
34
- self .digitalMutex = RLock ()
34
+ self .realTimeMutex = RLock ()
35
35
self .exiting = Event ()
36
36
self .onDataChanged = None
37
37
self .systemData = []
@@ -63,6 +63,19 @@ def SetDataChanged(self, onDataChanged=None):
63
63
"""
64
64
self .onDataChanged = onDataChanged
65
65
66
+ def QueueRealTimeData (self , name , data ):
67
+ """Add real-time data to queue to be sent on thread
68
+
69
+ Args:
70
+ name: The name to use for the data
71
+ data: The data to send
72
+ """
73
+ with self .realTimeMutex :
74
+ if name not in self .currentRealTimeData :
75
+ self .currentRealTimeData [name ] = data
76
+ else :
77
+ self .queuedRealTimeData [name ] = data
78
+
66
79
def OnSensorChange (self , device , value ):
67
80
"""Callback that is called when digital sensor data has changed
68
81
@@ -71,14 +84,11 @@ def OnSensorChange(self, device, value):
71
84
value: The new data value
72
85
"""
73
86
debug ('OnSensorChange: {}, {}' .format (device , value ))
74
- with self .digitalMutex :
87
+ with self .realTimeMutex :
75
88
data = {'name' : device ['description' ], 'value' : value , 'type' : 'digital_sensor' , 'unit' : 'd' }
76
89
if 'args' in device :
77
90
data ['args' ] = device ['args' ]
78
- if device ['name' ] not in self .currentRealTimeData :
79
- self .currentRealTimeData [device ['name' ]] = data
80
- else :
81
- self .queuedRealTimeData [device ['name' ]] = data
91
+ self .QueueRealTimeData (device ['name' ], data )
82
92
83
93
def OnPluginChange (self , data ):
84
94
"""Callback that is called when digital sensor data has changed
@@ -87,14 +97,26 @@ def OnPluginChange(self, data):
87
97
data: The new data value
88
98
"""
89
99
debug ('OnPluginChange: {}' .format (data ))
90
- with self .digitalMutex :
91
- if data ['id' ] not in self .currentRealTimeData :
92
- self .currentRealTimeData [data ['id' ]] = data
93
- else :
94
- self .queuedRealTimeData [data ['id' ]] = data
100
+ self .QueueRealTimeData (data ['id' ], data )
101
+ with self .realTimeMutex :
95
102
if not self .realTimeMonitorRunning :
96
103
ThreadPool .Submit (self .RealTimeMonitor )
97
104
105
+ def OnGpioStateChange (self , channel , value ):
106
+ """Send updated pin state when it has changed
107
+
108
+ Args:
109
+ channel: The pin number
110
+ value: The new value for the pin
111
+ """
112
+ debug ('OnGpioStateChange: channel {}, value {}' .format (channel , value ))
113
+ data = []
114
+ cayennemqtt .DataChannel .add_unique (data , cayennemqtt .SYS_GPIO , channel , cayennemqtt .VALUE , value )
115
+ if not self .realTimeMonitorRunning :
116
+ self .onDataChanged (data )
117
+ else :
118
+ self .QueueRealTimeData (data [0 ]['channel' ], data [0 ])
119
+
98
120
def InitCallbacks (self ):
99
121
"""Set callback function for any digital devices that support them"""
100
122
devices = manager .getDeviceList ()
@@ -164,26 +186,33 @@ def RealTimeMonitor(self):
164
186
if not self .exiting .wait (0.5 ):
165
187
if datetime .now () > nextTime :
166
188
nextTime = datetime .now () + timedelta (seconds = REAL_TIME_FREQUENCY )
167
- data = []
168
- with self .digitalMutex :
169
- if self .currentRealTimeData :
170
- for name , item in self .currentRealTimeData .items ():
171
- cayennemqtt .DataChannel .add_unique (data , cayennemqtt .DEV_SENSOR , name , value = item ['value' ], name = item ['name' ], type = item ['type' ], unit = item ['unit' ])
172
- try :
173
- cayennemqtt .DataChannel .add_unique (data , cayennemqtt .SYS_GPIO , item ['args' ]['channel' ], cayennemqtt .VALUE , item ['value' ])
174
- except :
175
- pass
176
- if name in self .queuedRealTimeData and self .queuedRealTimeData [name ]['value' ] == item ['value' ]:
177
- del self .queuedRealTimeData [name ]
178
- self .currentRealTimeData = self .queuedRealTimeData
179
- self .queuedRealTimeData = {}
180
- if data :
181
- self .onDataChanged (data )
189
+ self .SendRealTimeData ()
182
190
except :
183
191
exception ('Monitoring real-time changes failed' )
184
192
debug ('Monitoring real-time changes finished' )
185
193
self .realTimeMonitorRunning = False
186
194
195
+ def SendRealTimeData (self ):
196
+ """Send real-time data via callback"""
197
+ data = []
198
+ with self .realTimeMutex :
199
+ if self .currentRealTimeData :
200
+ for name , item in self .currentRealTimeData .items ():
201
+ if cayennemqtt .SYS_GPIO in name :
202
+ data .append (item )
203
+ else :
204
+ cayennemqtt .DataChannel .add_unique (data , cayennemqtt .DEV_SENSOR , name , value = item ['value' ], name = item ['name' ], type = item ['type' ], unit = item ['unit' ])
205
+ try :
206
+ cayennemqtt .DataChannel .add_unique (data , cayennemqtt .SYS_GPIO , item ['args' ]['channel' ], cayennemqtt .VALUE , item ['value' ])
207
+ except :
208
+ pass
209
+ if name in self .queuedRealTimeData and self .queuedRealTimeData [name ]['value' ] == item ['value' ]:
210
+ del self .queuedRealTimeData [name ]
211
+ self .currentRealTimeData = self .queuedRealTimeData
212
+ self .queuedRealTimeData = {}
213
+ if data :
214
+ self .onDataChanged (data )
215
+
187
216
def MonitorSensors (self ):
188
217
"""Check sensor states for changes"""
189
218
if self .exiting .is_set ():
@@ -446,15 +475,20 @@ def GpioCommand(self, command, channel, value):
446
475
String containing command specific return value on success, or 'failure' on failure
447
476
"""
448
477
info ('GpioCommand {}, channel {}, value {}' .format (command , channel , value ))
478
+ result = 'failure'
449
479
if command == 'function' :
480
+ old_state = self .gpio .digitalRead (channel )
450
481
if value .lower () in ('in' , 'input' ):
451
- return str (self .gpio .setFunctionString (channel , 'in' ))
482
+ result = str (self .gpio .setFunctionString (channel , 'in' ))
452
483
elif value .lower () in ('out' , 'output' ):
453
- return str (self .gpio .setFunctionString (channel , 'out' ))
484
+ result = str (self .gpio .setFunctionString (channel , 'out' ))
485
+ new_state = self .gpio .digitalRead (channel )
486
+ if new_state != old_state :
487
+ self .OnGpioStateChange (channel , new_state )
454
488
elif command in ('value' , '' ):
455
489
return self .gpio .digitalWrite (channel , int (value ))
456
490
debug ('GPIO command failed' )
457
- return 'failure'
491
+ return result
458
492
459
493
def SensorCommand (self , command , sensorId , channel , value ):
460
494
"""Execute sensor/actuator command
0 commit comments