Skip to content

Commit 70f963b

Browse files
committed
maintain time.monotonic precision by using ns integer timestamps
1 parent cbe2177 commit 70f963b

File tree

1 file changed

+60
-35
lines changed

1 file changed

+60
-35
lines changed

adafruit_minimqtt/adafruit_minimqtt.py

Lines changed: 60 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151

5252
from .matcher import MQTTMatcher
5353

54-
__version__ = "0.0.0+auto.0"
54+
__version__ = "7.6.3"
5555
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_MiniMQTT.git"
5656

5757
# Client-specific variables
@@ -181,7 +181,7 @@ def __init__(
181181
self._is_connected = False
182182
self._msg_size_lim = MQTT_MSG_SZ_LIM
183183
self._pid = 0
184-
self._last_msg_sent_timestamp: float = 0
184+
self._last_msg_sent_timestamp: int = 0
185185
self.logger = NullLogger()
186186
"""An optional logging attribute that can be set with with a Logger
187187
to enable debug logging."""
@@ -197,7 +197,8 @@ def __init__(
197197
self._username = username
198198
self._password = password
199199
if (
200-
self._password and len(password.encode("utf-8")) > MQTT_TOPIC_LENGTH_LIMIT
200+
self._password and len(password.encode(
201+
"utf-8")) > MQTT_TOPIC_LENGTH_LIMIT
201202
): # [MQTT-3.1.3.5]
202203
raise MMQTTException("Password length is too large.")
203204

@@ -220,11 +221,12 @@ def __init__(
220221
self.client_id = client_id
221222
else:
222223
# assign a unique client_id
223-
time_int = int(self.get_monotonic_time() * 100) % 1000
224+
time_int = (self.get_monotonic_time() % 10000000000) // 10000000
224225
self.client_id = f"cpy{randint(0, time_int)}{randint(0, 99)}"
225226
# generated client_id's enforce spec.'s length rules
226227
if len(self.client_id.encode("utf-8")) > 23 or not self.client_id:
227-
raise ValueError("MQTT Client ID must be between 1 and 23 bytes")
228+
raise ValueError(
229+
"MQTT Client ID must be between 1 and 23 bytes")
228230

229231
# LWT
230232
self._lw_topic = None
@@ -246,14 +248,22 @@ def __init__(
246248

247249
def get_monotonic_time(self) -> float:
248250
"""
249-
Provide monotonic time in seconds. Based on underlying implementation
251+
Provide monotonic time in nanoseconds. Based on underlying implementation
250252
this might result in imprecise time, that will result in the library
251253
not being able to operate if running contiguously for more than 24 days or so.
254+
Keeping timestamps in nanosecond ints from monotonic_ns should preserve precision.
252255
"""
253256
if self.use_monotonic_ns:
254-
return time.monotonic_ns() / 1000000000
257+
return time.monotonic_ns()
255258

256-
return time.monotonic()
259+
return int(time.monotonic() * 1000000000)
260+
261+
def diff_ns(self, stamp):
262+
"""
263+
Taking timestamp differences using nanosecond ints before dividing
264+
should maintain precision.
265+
"""
266+
return (self.get_monotonic_time() - stamp)/1000000000
257267

258268
def __enter__(self):
259269
return self
@@ -307,7 +317,8 @@ def will_set(
307317
self.logger.debug("Setting last will properties")
308318
self._valid_qos(qos)
309319
if self._is_connected:
310-
raise MMQTTException("Last Will should only be called before connect().")
320+
raise MMQTTException(
321+
"Last Will should only be called before connect().")
311322
if payload is None:
312323
payload = ""
313324
if isinstance(payload, (int, float, str)):
@@ -331,7 +342,8 @@ def add_topic_callback(self, mqtt_topic: str, callback_method) -> None:
331342
If a callback is called for the topic, then any "on_message" callback will not be called.
332343
"""
333344
if mqtt_topic is None or callback_method is None:
334-
raise ValueError("MQTT topic and callback method must both be defined.")
345+
raise ValueError(
346+
"MQTT topic and callback method must both be defined.")
335347
self._on_message_filtered[mqtt_topic] = callback_method
336348

337349
def remove_topic_callback(self, mqtt_topic: str) -> None:
@@ -379,7 +391,8 @@ def username_pw_set(self, username: str, password: Optional[str] = None) -> None
379391
380392
"""
381393
if self._is_connected:
382-
raise MMQTTException("This method must be called before connect().")
394+
raise MMQTTException(
395+
"This method must be called before connect().")
383396
self._username = username
384397
if password is not None:
385398
self._password = password
@@ -508,7 +521,8 @@ def _connect(
508521
remaining_length += (
509522
2 + len(self._lw_topic.encode("utf-8")) + 2 + len(self._lw_msg)
510523
)
511-
var_header[7] |= 0x4 | (self._lw_qos & 0x1) << 3 | (self._lw_qos & 0x2) << 3
524+
var_header[7] |= 0x4 | (self._lw_qos & 0x1) << 3 | (
525+
self._lw_qos & 0x2) << 3
512526
var_header[7] |= self._lw_retain << 5
513527

514528
self._encode_remaining_length(fixed_header, remaining_length)
@@ -544,7 +558,7 @@ def _connect(
544558
return result
545559

546560
if op is None:
547-
if self.get_monotonic_time() - stamp > self._recv_timeout:
561+
if self.diff_ns(stamp) > self._recv_timeout:
548562
raise MMQTTException(
549563
f"No data received from broker for {self._recv_timeout} seconds."
550564
)
@@ -601,7 +615,7 @@ def ping(self) -> list[int]:
601615
rc = self._wait_for_msg()
602616
if rc:
603617
rcs.append(rc)
604-
if self.get_monotonic_time() - stamp > ping_timeout:
618+
if self.diff_ns(stamp) > ping_timeout:
605619
raise MMQTTException("PINGRESP not returned from broker.")
606620
return rcs
607621

@@ -637,7 +651,8 @@ def publish(
637651
else:
638652
raise MMQTTException("Invalid message data type.")
639653
if len(msg) > MQTT_MSG_MAX_SZ:
640-
raise MMQTTException(f"Message size larger than {MQTT_MSG_MAX_SZ} bytes.")
654+
raise MMQTTException(
655+
f"Message size larger than {MQTT_MSG_MAX_SZ} bytes.")
641656
assert (
642657
0 <= qos <= 1
643658
), "Quality of Service Level 2 is unsupported by this library."
@@ -684,11 +699,12 @@ def publish(
684699
rcv_pid = rcv_pid_buf[0] << 0x08 | rcv_pid_buf[1]
685700
if self._pid == rcv_pid:
686701
if self.on_publish is not None:
687-
self.on_publish(self, self.user_data, topic, rcv_pid)
702+
self.on_publish(
703+
self, self.user_data, topic, rcv_pid)
688704
return
689705

690706
if op is None:
691-
if self.get_monotonic_time() - stamp > self._recv_timeout:
707+
if self.diff_ns(stamp) > self._recv_timeout:
692708
raise MMQTTException(
693709
f"No data received from broker for {self._recv_timeout} seconds."
694710
)
@@ -728,8 +744,10 @@ def subscribe(self, topic: Optional[Union[tuple, str, list]], qos: int = 0) -> N
728744
self.logger.debug("Sending SUBSCRIBE to broker...")
729745
fixed_header = bytearray([MQTT_SUB])
730746
packet_length = 2 + (2 * len(topics)) + (1 * len(topics))
731-
packet_length += sum(len(topic.encode("utf-8")) for topic, qos in topics)
732-
self._encode_remaining_length(fixed_header, remaining_length=packet_length)
747+
packet_length += sum(len(topic.encode("utf-8"))
748+
for topic, qos in topics)
749+
self._encode_remaining_length(
750+
fixed_header, remaining_length=packet_length)
733751
self.logger.debug(f"Fixed Header: {fixed_header}")
734752
self._sock.send(fixed_header)
735753
self._pid = self._pid + 1 if self._pid < 0xFFFF else 1
@@ -752,7 +770,7 @@ def subscribe(self, topic: Optional[Union[tuple, str, list]], qos: int = 0) -> N
752770
while True:
753771
op = self._wait_for_msg()
754772
if op is None:
755-
if self.get_monotonic_time() - stamp > self._recv_timeout:
773+
if self.diff_ns(stamp) > self._recv_timeout:
756774
raise MMQTTException(
757775
f"No data received from broker for {self._recv_timeout} seconds."
758776
)
@@ -809,7 +827,8 @@ def unsubscribe(self, topic: Optional[Union[str, list]]) -> None:
809827
fixed_header = bytearray([MQTT_UNSUB])
810828
packet_length = 2 + (2 * len(topics))
811829
packet_length += sum(len(topic.encode("utf-8")) for topic in topics)
812-
self._encode_remaining_length(fixed_header, remaining_length=packet_length)
830+
self._encode_remaining_length(
831+
fixed_header, remaining_length=packet_length)
813832
self.logger.debug(f"Fixed Header: {fixed_header}")
814833
self._sock.send(fixed_header)
815834
self._pid = self._pid + 1 if self._pid < 0xFFFF else 1
@@ -830,7 +849,7 @@ def unsubscribe(self, topic: Optional[Union[str, list]]) -> None:
830849
stamp = self.get_monotonic_time()
831850
op = self._wait_for_msg()
832851
if op is None:
833-
if self.get_monotonic_time() - stamp > self._recv_timeout:
852+
if self.diff_ns(stamp) > self._recv_timeout:
834853
raise MMQTTException(
835854
f"No data received from broker for {self._recv_timeout} seconds."
836855
)
@@ -842,7 +861,8 @@ def unsubscribe(self, topic: Optional[Union[str, list]]) -> None:
842861
assert rc[1] == packet_id_bytes[0] and rc[2] == packet_id_bytes[1]
843862
for t in topics:
844863
if self.on_unsubscribe is not None:
845-
self.on_unsubscribe(self, self.user_data, t, self._pid)
864+
self.on_unsubscribe(
865+
self, self.user_data, t, self._pid)
846866
self._subscribed_topics.remove(t)
847867
return
848868

@@ -860,7 +880,8 @@ def _recompute_reconnect_backoff(self) -> None:
860880
self._reconnect_timeout = 2**self._reconnect_attempt
861881
# pylint: disable=consider-using-f-string
862882
self.logger.debug(
863-
"Reconnect timeout computed to {:.2f}".format(self._reconnect_timeout)
883+
"Reconnect timeout computed to {:.2f}".format(
884+
self._reconnect_timeout)
864885
)
865886

866887
if self._reconnect_timeout > self._reconnect_maximum_backoff:
@@ -935,7 +956,7 @@ def loop(self, timeout: float = 0) -> Optional[list[int]]:
935956

936957
while True:
937958
if (
938-
self.get_monotonic_time() - self._last_msg_sent_timestamp
959+
self.diff_ns(self._last_msg_sent_timestamp)
939960
>= self.keep_alive
940961
):
941962
# Handle KeepAlive by expecting a PINGREQ/PINGRESP from the server
@@ -945,22 +966,22 @@ def loop(self, timeout: float = 0) -> Optional[list[int]]:
945966
rcs.extend(self.ping())
946967
# ping() itself contains a _wait_for_msg() loop which might have taken a while,
947968
# so check here as well.
948-
if self.get_monotonic_time() - stamp > timeout:
949-
self.logger.debug(f"Loop timed out after {timeout} seconds")
969+
if self.diff_ns(stamp) > timeout:
970+
self.logger.debug(
971+
f"Loop timed out after {timeout} seconds")
950972
break
951973

952974
rc = self._wait_for_msg()
953975
if rc is not None:
954976
rcs.append(rc)
955-
if self.get_monotonic_time() - stamp > timeout:
977+
if self.diff_ns(stamp) > timeout:
956978
self.logger.debug(f"Loop timed out after {timeout} seconds")
957979
break
958980

959981
return rcs if rcs else None
960982

961983
def _wait_for_msg(self, timeout: Optional[float] = None) -> Optional[int]:
962984
# pylint: disable = too-many-return-statements
963-
964985
"""Reads and processes network events.
965986
Return the packet type or None if there is nothing to be received.
966987
@@ -985,12 +1006,14 @@ def _wait_for_msg(self, timeout: Optional[float] = None) -> Optional[int]:
9851006
# If we get here, it means that there is nothing to be received
9861007
return None
9871008
pkt_type = res[0] & MQTT_PKT_TYPE_MASK
988-
self.logger.debug(f"Got message type: {hex(pkt_type)} pkt: {hex(res[0])}")
1009+
self.logger.debug(
1010+
f"Got message type: {hex(pkt_type)} pkt: {hex(res[0])}")
9891011
if pkt_type == MQTT_PINGRESP:
9901012
self.logger.debug("Got PINGRESP")
9911013
sz = self._sock_exact_recv(1)[0]
9921014
if sz != 0x00:
993-
raise MMQTTException(f"Unexpected PINGRESP returned from broker: {sz}.")
1015+
raise MMQTTException(
1016+
f"Unexpected PINGRESP returned from broker: {sz}.")
9941017
return pkt_type
9951018

9961019
if pkt_type != MQTT_PUBLISH:
@@ -1019,7 +1042,8 @@ def _wait_for_msg(self, timeout: Optional[float] = None) -> Optional[int]:
10191042
# read message contents
10201043
raw_msg = self._sock_exact_recv(sz)
10211044
msg = raw_msg if self._use_binary_mode else str(raw_msg, "utf-8")
1022-
self.logger.debug("Receiving PUBLISH \nTopic: %s\nMsg: %s\n", topic, raw_msg)
1045+
self.logger.debug(
1046+
"Receiving PUBLISH \nTopic: %s\nMsg: %s\n", topic, raw_msg)
10231047
self._handle_on_message(topic, msg)
10241048
if res[0] & 0x06 == 0x02:
10251049
pkt = bytearray(b"\x40\x02\0\0")
@@ -1067,14 +1091,15 @@ def _sock_exact_recv(
10671091
recv_len = self._sock.recv_into(rc, bufsize)
10681092
to_read = bufsize - recv_len
10691093
if to_read < 0:
1070-
raise MMQTTException(f"negative number of bytes to read: {to_read}")
1094+
raise MMQTTException(
1095+
f"negative number of bytes to read: {to_read}")
10711096
read_timeout = timeout if timeout is not None else self.keep_alive
10721097
mv = mv[recv_len:]
10731098
while to_read > 0:
10741099
recv_len = self._sock.recv_into(mv, to_read)
10751100
to_read -= recv_len
10761101
mv = mv[recv_len:]
1077-
if self.get_monotonic_time() - stamp > read_timeout:
1102+
if self.diff_ns(stamp) > read_timeout:
10781103
raise MMQTTException(
10791104
f"Unable to receive {to_read} bytes within {read_timeout} seconds."
10801105
)
@@ -1094,7 +1119,7 @@ def _sock_exact_recv(
10941119
recv = self._sock.recv(to_read)
10951120
to_read -= len(recv)
10961121
rc += recv
1097-
if self.get_monotonic_time() - stamp > read_timeout:
1122+
if self.diff_ns(stamp) > read_timeout:
10981123
raise MMQTTException(
10991124
f"Unable to receive {to_read} bytes within {read_timeout} seconds."
11001125
)

0 commit comments

Comments
 (0)