36
36
from random import randint
37
37
38
38
from adafruit_connection_manager import get_connection_manager
39
+ from adafruit_ticks import ticks_ms , ticks_diff
39
40
40
41
try :
41
42
from typing import List , Optional , Tuple , Type , Union
@@ -181,7 +182,7 @@ def __init__(
181
182
self ._is_connected = False
182
183
self ._msg_size_lim = MQTT_MSG_SZ_LIM
183
184
self ._pid = 0
184
- self ._last_msg_sent_timestamp_ns : int = 0
185
+ self ._last_msg_sent_timestamp : int = 0
185
186
self .logger = NullLogger ()
186
187
"""An optional logging attribute that can be set with with a Logger
187
188
to enable debug logging."""
@@ -220,7 +221,7 @@ def __init__(
220
221
self .client_id = client_id
221
222
else :
222
223
# assign a unique client_id
223
- time_int = ( self . get_monotonic_time () % 10000000000 ) // 10000000
224
+ time_int = int ( ticks_ms () / 10 ) % 1000
224
225
self .client_id = f"cpy{ randint (0 , time_int )} { randint (0 , 99 )} "
225
226
# generated client_id's enforce spec.'s length rules
226
227
if len (self .client_id .encode ("utf-8" )) > 23 or not self .client_id :
@@ -246,22 +247,14 @@ def __init__(
246
247
247
248
def get_monotonic_time (self ) -> float :
248
249
"""
249
- Provide monotonic time in nanoseconds . Based on underlying implementation
250
+ Provide monotonic time in seconds . Based on underlying implementation
250
251
this might result in imprecise time, that will result in the library
251
252
not being able to operate if running contiguously for more than 24 days or so.
252
- Keeping timestamps in nanosecond ints from monotonic_ns should preserve precision.
253
253
"""
254
254
if self .use_monotonic_ns :
255
- return time .monotonic_ns ()
255
+ return time .monotonic_ns () / 1000000000
256
256
257
- return int (time .monotonic () * 1000000000 )
258
-
259
- def diff_ns (self , stamp_ns ):
260
- """
261
- Taking timestamp differences using nanosecond ints before dividing
262
- should maintain precision.
263
- """
264
- return (self .get_monotonic_time () - stamp_ns ) / 1000000000
257
+ return time .monotonic ()
265
258
266
259
def __enter__ (self ):
267
260
return self
@@ -534,9 +527,9 @@ def _connect(
534
527
if self ._username is not None :
535
528
self ._send_str (self ._username )
536
529
self ._send_str (self ._password )
537
- self ._last_msg_sent_timestamp_ns = self . get_monotonic_time ()
530
+ self ._last_msg_sent_timestamp = ticks_ms ()
538
531
self .logger .debug ("Receiving CONNACK packet from broker" )
539
- stamp_ns = self . get_monotonic_time ()
532
+ stamp = ticks_ms ()
540
533
while True :
541
534
op = self ._wait_for_msg ()
542
535
if op == 32 :
@@ -552,7 +545,7 @@ def _connect(
552
545
return result
553
546
554
547
if op is None :
555
- if self . diff_ns ( stamp_ns ) > self ._recv_timeout :
548
+ if ticks_diff ( ticks_ms (), stamp ) / 1000 > self ._recv_timeout :
556
549
raise MMQTTException (
557
550
f"No data received from broker for { self ._recv_timeout } seconds."
558
551
)
@@ -589,7 +582,7 @@ def disconnect(self) -> None:
589
582
self ._connection_manager .close_socket (self ._sock )
590
583
self ._is_connected = False
591
584
self ._subscribed_topics = []
592
- self ._last_msg_sent_timestamp_ns = 0
585
+ self ._last_msg_sent_timestamp = 0
593
586
if self .on_disconnect is not None :
594
587
self .on_disconnect (self , self .user_data , 0 )
595
588
@@ -602,14 +595,14 @@ def ping(self) -> list[int]:
602
595
self .logger .debug ("Sending PINGREQ" )
603
596
self ._sock .send (MQTT_PINGREQ )
604
597
ping_timeout = self .keep_alive
605
- stamp_ns = self . get_monotonic_time ()
606
- self ._last_msg_sent_timestamp_ns = stamp_ns
598
+ stamp = ticks_ms ()
599
+ self ._last_msg_sent_timestamp = stamp
607
600
rc , rcs = None , []
608
601
while rc != MQTT_PINGRESP :
609
602
rc = self ._wait_for_msg ()
610
603
if rc :
611
604
rcs .append (rc )
612
- if self . diff_ns ( stamp_ns ) > ping_timeout :
605
+ if ticks_diff ( ticks_ms (), stamp ) > ping_timeout * 1000 :
613
606
raise MMQTTException ("PINGRESP not returned from broker." )
614
607
return rcs
615
608
@@ -678,11 +671,11 @@ def publish(
678
671
self ._sock .send (pub_hdr_fixed )
679
672
self ._sock .send (pub_hdr_var )
680
673
self ._sock .send (msg )
681
- self ._last_msg_sent_timestamp_ns = self . get_monotonic_time ()
674
+ self ._last_msg_sent_timestamp = ticks_ms ()
682
675
if qos == 0 and self .on_publish is not None :
683
676
self .on_publish (self , self .user_data , topic , self ._pid )
684
677
if qos == 1 :
685
- stamp_ns = self . get_monotonic_time ()
678
+ stamp = ticks_ms ()
686
679
while True :
687
680
op = self ._wait_for_msg ()
688
681
if op == 0x40 :
@@ -696,7 +689,7 @@ def publish(
696
689
return
697
690
698
691
if op is None :
699
- if self . diff_ns ( stamp_ns ) > self ._recv_timeout :
692
+ if ticks_diff ( ticks_ms (), stamp ) / 1000 > self ._recv_timeout :
700
693
raise MMQTTException (
701
694
f"No data received from broker for { self ._recv_timeout } seconds."
702
695
)
@@ -755,12 +748,12 @@ def subscribe(self, topic: Optional[Union[tuple, str, list]], qos: int = 0) -> N
755
748
self .logger .debug (f"SUBSCRIBING to topic { t } with QoS { q } " )
756
749
self .logger .debug (f"payload: { payload } " )
757
750
self ._sock .send (payload )
758
- stamp_ns = self . get_monotonic_time ()
759
- self ._last_msg_sent_timestamp_ns = stamp_ns
751
+ stamp = ticks_ms ()
752
+ self ._last_msg_sent_timestamp = stamp
760
753
while True :
761
754
op = self ._wait_for_msg ()
762
755
if op is None :
763
- if self . diff_ns ( stamp_ns ) > self ._recv_timeout :
756
+ if ticks_diff ( ticks_ms (), stamp ) / 1000 > self ._recv_timeout :
764
757
raise MMQTTException (
765
758
f"No data received from broker for { self ._recv_timeout } seconds."
766
759
)
@@ -832,13 +825,13 @@ def unsubscribe(self, topic: Optional[Union[str, list]]) -> None:
832
825
for t in topics :
833
826
self .logger .debug (f"UNSUBSCRIBING from topic { t } " )
834
827
self ._sock .send (payload )
835
- self ._last_msg_sent_timestamp_ns = self . get_monotonic_time ()
828
+ self ._last_msg_sent_timestamp = ticks_ms ()
836
829
self .logger .debug ("Waiting for UNSUBACK..." )
837
830
while True :
838
- stamp_ns = self . get_monotonic_time ()
831
+ stamp = ticks_ms ()
839
832
op = self ._wait_for_msg ()
840
833
if op is None :
841
- if self . diff_ns ( stamp_ns ) > self ._recv_timeout :
834
+ if ticks_diff ( ticks_ms (), stamp ) / 1000 > self ._recv_timeout :
842
835
raise MMQTTException (
843
836
f"No data received from broker for { self ._recv_timeout } seconds."
844
837
)
@@ -938,26 +931,29 @@ def loop(self, timeout: float = 0) -> Optional[list[int]]:
938
931
self ._connected ()
939
932
self .logger .debug (f"waiting for messages for { timeout } seconds" )
940
933
941
- stamp_ns = self . get_monotonic_time ()
934
+ stamp = ticks_ms ()
942
935
rcs = []
943
936
944
937
while True :
945
- if self .diff_ns (self ._last_msg_sent_timestamp_ns ) >= self .keep_alive :
938
+ if (
939
+ ticks_diff (ticks_ms (), self ._last_msg_sent_timestamp ) / 1000
940
+ >= self .keep_alive
941
+ ):
946
942
# Handle KeepAlive by expecting a PINGREQ/PINGRESP from the server
947
943
self .logger .debug (
948
944
"KeepAlive period elapsed - requesting a PINGRESP from the server..."
949
945
)
950
946
rcs .extend (self .ping ())
951
947
# ping() itself contains a _wait_for_msg() loop which might have taken a while,
952
948
# so check here as well.
953
- if self . diff_ns ( stamp_ns ) > timeout :
949
+ if ticks_diff ( ticks_ms (), stamp ) / 1000 > timeout :
954
950
self .logger .debug (f"Loop timed out after { timeout } seconds" )
955
951
break
956
952
957
953
rc = self ._wait_for_msg ()
958
954
if rc is not None :
959
955
rcs .append (rc )
960
- if self . diff_ns ( stamp_ns ) > timeout :
956
+ if ticks_diff ( ticks_ms (), stamp ) / 1000 > timeout :
961
957
self .logger .debug (f"Loop timed out after { timeout } seconds" )
962
958
break
963
959
@@ -1063,7 +1059,7 @@ def _sock_exact_recv(
1063
1059
:param float timeout: timeout, in seconds. Defaults to keep_alive
1064
1060
:return: byte array
1065
1061
"""
1066
- stamp_ns = self . get_monotonic_time ()
1062
+ stamp = ticks_ms ()
1067
1063
if not self ._backwards_compatible_sock :
1068
1064
# CPython/Socketpool Impl.
1069
1065
rc = bytearray (bufsize )
@@ -1078,7 +1074,7 @@ def _sock_exact_recv(
1078
1074
recv_len = self ._sock .recv_into (mv , to_read )
1079
1075
to_read -= recv_len
1080
1076
mv = mv [recv_len :]
1081
- if self . diff_ns ( stamp_ns ) > read_timeout :
1077
+ if ticks_diff ( ticks_ms (), stamp ) / 1000 > read_timeout :
1082
1078
raise MMQTTException (
1083
1079
f"Unable to receive { to_read } bytes within { read_timeout } seconds."
1084
1080
)
@@ -1098,7 +1094,7 @@ def _sock_exact_recv(
1098
1094
recv = self ._sock .recv (to_read )
1099
1095
to_read -= len (recv )
1100
1096
rc += recv
1101
- if self . diff_ns ( stamp_ns ) > read_timeout :
1097
+ if ticks_diff ( ticks_ms (), stamp ) / 1000 > read_timeout :
1102
1098
raise MMQTTException (
1103
1099
f"Unable to receive { to_read } bytes within { read_timeout } seconds."
1104
1100
)
0 commit comments