Skip to content

Commit 3a1a65e

Browse files
committed
Implement priority queue for sending messages
1 parent 05c86e3 commit 3a1a65e

File tree

7 files changed

+62
-13
lines changed

7 files changed

+62
-13
lines changed

plugwise/constants.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,11 @@
136136
# Default sleep between sending messages
137137
SLEEP_TIME = 150 / 1000
138138

139+
# Message priority levels
140+
PRIORITY_HIGH = 1
141+
PRIORITY_LOW = 2
142+
PRIORITY_MEDIUM = 3
143+
139144
# Max seconds the internal clock of plugwise nodes
140145
# are allowed to drift in seconds
141146
MAX_TIME_DRIFT = 30

plugwise/controller.py

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@
1212
"""
1313

1414
from datetime import datetime, timedelta
15+
from queue import Empty, PriorityQueue
1516
import logging
16-
import queue
1717
import threading
1818
import time
1919

@@ -22,6 +22,7 @@
2222
from .constants import (
2323
MESSAGE_RETRY,
2424
MESSAGE_TIME_OUT,
25+
PRIORITY_MEDIUM,
2526
REQUEST_FAILED,
2627
REQUEST_SUCCESS,
2728
SLEEP_TIME,
@@ -77,7 +78,7 @@ def connect_to_stick(self, callback=None) -> bool:
7778
if self.connection.connect():
7879
_LOGGER.debug("Starting message controller threads...")
7980
# send daemon
80-
self._send_message_queue = queue.Queue()
81+
self._send_message_queue = PriorityQueue()
8182
self._run_send_message_thread = True
8283
self._send_message_thread = threading.Thread(
8384
None, self._send_message_loop, "send_messages_thread", (), {}
@@ -96,20 +97,32 @@ def connect_to_stick(self, callback=None) -> bool:
9697
_LOGGER.warning("Failed to connect to USB stick")
9798
return self.connection.is_connected()
9899

99-
def send(self, request: NodeRequest, callback=None, retry_counter=0):
100+
def send(
101+
self,
102+
request: NodeRequest,
103+
callback=None,
104+
retry_counter=0,
105+
priority=PRIORITY_MEDIUM,
106+
):
100107
"""Queue request message to be sent into Plugwise Zigbee network."""
101108
_LOGGER.debug(
102-
"Queue %s to be send with retry counter %s",
109+
"Queue %s to be send with retry counter %s and priority %s",
103110
request.__class__.__name__,
104111
str(retry_counter),
112+
str(priority),
105113
)
106114
self._send_message_queue.put(
107-
[
108-
request,
109-
callback,
115+
(
116+
priority,
110117
retry_counter,
111-
None,
112-
]
118+
datetime.now(),
119+
[
120+
request,
121+
callback,
122+
retry_counter,
123+
None,
124+
],
125+
)
113126
)
114127

115128
def resend(self, seq_id):
@@ -180,8 +193,10 @@ def _send_message_loop(self):
180193
"""Daemon to send messages waiting in queue."""
181194
while self._run_send_message_thread:
182195
try:
183-
request_set = self._send_message_queue.get(block=True, timeout=1)
184-
except queue.Empty:
196+
_prio, _retry, _dt, request_set = self._send_message_queue.get(
197+
block=True, timeout=1
198+
)
199+
except Empty:
185200
time.sleep(SLEEP_TIME)
186201
else:
187202
# Calc next seq_id based last received ack message

plugwise/nodes/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
FEATURE_RELAY,
99
FEATURE_RSSI_IN,
1010
FEATURE_RSSI_OUT,
11+
PRIORITY_LOW,
1112
UTF8_DECODE,
1213
)
1314
from ..messages.requests import NodeFeaturesRequest, NodeInfoRequest, NodePingRequest
@@ -155,6 +156,8 @@ def _request_info(self, callback=None):
155156
self.message_sender(
156157
NodeInfoRequest(self._mac),
157158
callback,
159+
0,
160+
PRIORITY_LOW,
158161
)
159162

160163
def _request_features(self, callback=None):

plugwise/nodes/circle.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
FEATURE_RSSI_OUT,
1818
MAX_TIME_DRIFT,
1919
MESSAGE_TIME_OUT,
20+
PRIORITY_HIGH,
21+
PRIORITY_LOW,
2022
PULSES_PER_KW_SECOND,
2123
RELAY_SWITCHED_OFF,
2224
RELAY_SWITCHED_ON,
@@ -163,13 +165,17 @@ def _request_calibration(self, callback=None):
163165
self.message_sender(
164166
CircleCalibrationRequest(self._mac),
165167
callback,
168+
0,
169+
PRIORITY_HIGH,
166170
)
167171

168172
def _request_switch(self, state, callback=None):
169173
"""Request to switch relay state and request state info"""
170174
self.message_sender(
171175
CircleSwitchRelayRequest(self._mac, state),
172176
callback,
177+
0,
178+
PRIORITY_HIGH,
173179
)
174180

175181
def _request_power_update(self, callback=None):
@@ -358,21 +364,31 @@ def request_power_buffer(self, log_address=None, callback=None):
358364
# Only request last 2 power buffer logs
359365
self.message_sender(
360366
CirclePowerBufferRequest(self._mac, log_address - 1),
367+
None,
368+
0,
369+
PRIORITY_LOW,
361370
)
362371
self.message_sender(
363372
CirclePowerBufferRequest(self._mac, log_address),
364373
callback,
374+
0,
375+
PRIORITY_LOW,
365376
)
366377
else:
367378
# Collect power history info of today and yesterday
368379
# Each request contains 4 hours except last request
369380
for req_log_address in range(log_address - 13, log_address):
370381
self.message_sender(
371382
CirclePowerBufferRequest(self._mac, req_log_address),
383+
None,
384+
0,
385+
PRIORITY_LOW,
372386
)
373387
self.message_sender(
374388
CirclePowerBufferRequest(self._mac, log_address),
375389
callback,
390+
0,
391+
PRIORITY_LOW,
376392
)
377393

378394
def _response_power_buffer(self, message):
@@ -452,6 +468,8 @@ def get_clock(self, callback=None):
452468
self.message_sender(
453469
CircleClockGetRequest(self._mac),
454470
callback,
471+
0,
472+
PRIORITY_LOW,
455473
)
456474

457475
def set_clock(self, callback=None):

plugwise/nodes/circle_plus.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
from datetime import datetime
33
import logging
44

5-
from ..constants import MAX_TIME_DRIFT, UTF8_DECODE
5+
from ..constants import MAX_TIME_DRIFT, PRIORITY_LOW, UTF8_DECODE
66
from ..messages.requests import (
77
CirclePlusRealTimeClockGetRequest,
88
CirclePlusRealTimeClockSetRequest,
@@ -92,6 +92,8 @@ def get_real_time_clock(self, callback=None):
9292
self.message_sender(
9393
CirclePlusRealTimeClockGetRequest(self._mac),
9494
callback,
95+
0,
96+
PRIORITY_LOW,
9597
)
9698

9799
def _response_realtime_clock(self, message):

plugwise/nodes/sed.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
FEATURE_PING,
1111
FEATURE_RSSI_IN,
1212
FEATURE_RSSI_OUT,
13+
PRIORITY_HIGH,
1314
SED_AWAKE_BUTTON,
1415
SED_AWAKE_FIRST,
1516
SED_AWAKE_MAINTENANCE,
@@ -90,7 +91,7 @@ def _process_awake_response(self, message):
9091
request_message.__class__.__name__,
9192
self.mac,
9293
)
93-
self.message_sender(request_message, callback, -1)
94+
self.message_sender(request_message, callback, -1, PRIORITY_HIGH)
9495
self._SED_requests = {}
9596
else:
9697
if message.awake_type.value == SED_AWAKE_STATE:

plugwise/stick.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
NODE_TYPE_SENSE,
2424
NODE_TYPE_STEALTH,
2525
NODE_TYPE_SWITCH,
26+
PRIORITY_LOW,
2627
STATE_ACTIONS,
2728
UTF8_DECODE,
2829
WATCHDOG_DEAMON,
@@ -656,6 +657,7 @@ def _update_loop(self):
656657
NodePingRequest(bytes(mac, UTF8_DECODE)),
657658
None,
658659
-1,
660+
PRIORITY_LOW,
659661
)
660662
_discover_counter = 0
661663
else:
@@ -679,6 +681,7 @@ def auto_update(self, timer=None):
679681
"""Configure auto update polling daemon for power usage and availability state."""
680682
if timer:
681683
self._auto_update_timer = timer
684+
self._auto_update_manually = True
682685
elif timer == 0:
683686
self._auto_update_timer = 0
684687
self._run_update_thread = False
@@ -764,6 +767,8 @@ def discover_node(self, mac: str, callback=None, force_discover=False) -> bool:
764767
self.msg_controller.send(
765768
NodeInfoRequest(bytes(mac, UTF8_DECODE)),
766769
callback,
770+
0,
771+
PRIORITY_LOW,
767772
)
768773
elif force_discover:
769774
self.msg_controller.send(

0 commit comments

Comments
 (0)