@@ -197,8 +197,7 @@ def __init__(
197
197
self ._username = username
198
198
self ._password = password
199
199
if (
200
- self ._password and len (password .encode (
201
- "utf-8" )) > MQTT_TOPIC_LENGTH_LIMIT
200
+ self ._password and len (password .encode ("utf-8" )) > MQTT_TOPIC_LENGTH_LIMIT
202
201
): # [MQTT-3.1.3.5]
203
202
raise MMQTTException ("Password length is too large." )
204
203
@@ -225,8 +224,7 @@ def __init__(
225
224
self .client_id = f"cpy{ randint (0 , time_int )} { randint (0 , 99 )} "
226
225
# generated client_id's enforce spec.'s length rules
227
226
if len (self .client_id .encode ("utf-8" )) > 23 or not self .client_id :
228
- raise ValueError (
229
- "MQTT Client ID must be between 1 and 23 bytes" )
227
+ raise ValueError ("MQTT Client ID must be between 1 and 23 bytes" )
230
228
231
229
# LWT
232
230
self ._lw_topic = None
@@ -263,7 +261,7 @@ def diff_ns(self, stamp):
263
261
Taking timestamp differences using nanosecond ints before dividing
264
262
should maintain precision.
265
263
"""
266
- return (self .get_monotonic_time () - stamp )/ 1000000000
264
+ return (self .get_monotonic_time () - stamp ) / 1000000000
267
265
268
266
def __enter__ (self ):
269
267
return self
@@ -317,8 +315,7 @@ def will_set(
317
315
self .logger .debug ("Setting last will properties" )
318
316
self ._valid_qos (qos )
319
317
if self ._is_connected :
320
- raise MMQTTException (
321
- "Last Will should only be called before connect()." )
318
+ raise MMQTTException ("Last Will should only be called before connect()." )
322
319
if payload is None :
323
320
payload = ""
324
321
if isinstance (payload , (int , float , str )):
@@ -342,8 +339,7 @@ def add_topic_callback(self, mqtt_topic: str, callback_method) -> None:
342
339
If a callback is called for the topic, then any "on_message" callback will not be called.
343
340
"""
344
341
if mqtt_topic is None or callback_method is None :
345
- raise ValueError (
346
- "MQTT topic and callback method must both be defined." )
342
+ raise ValueError ("MQTT topic and callback method must both be defined." )
347
343
self ._on_message_filtered [mqtt_topic ] = callback_method
348
344
349
345
def remove_topic_callback (self , mqtt_topic : str ) -> None :
@@ -391,8 +387,7 @@ def username_pw_set(self, username: str, password: Optional[str] = None) -> None
391
387
392
388
"""
393
389
if self ._is_connected :
394
- raise MMQTTException (
395
- "This method must be called before connect()." )
390
+ raise MMQTTException ("This method must be called before connect()." )
396
391
self ._username = username
397
392
if password is not None :
398
393
self ._password = password
@@ -521,8 +516,7 @@ def _connect(
521
516
remaining_length += (
522
517
2 + len (self ._lw_topic .encode ("utf-8" )) + 2 + len (self ._lw_msg )
523
518
)
524
- var_header [7 ] |= 0x4 | (self ._lw_qos & 0x1 ) << 3 | (
525
- self ._lw_qos & 0x2 ) << 3
519
+ var_header [7 ] |= 0x4 | (self ._lw_qos & 0x1 ) << 3 | (self ._lw_qos & 0x2 ) << 3
526
520
var_header [7 ] |= self ._lw_retain << 5
527
521
528
522
self ._encode_remaining_length (fixed_header , remaining_length )
@@ -651,8 +645,7 @@ def publish(
651
645
else :
652
646
raise MMQTTException ("Invalid message data type." )
653
647
if len (msg ) > MQTT_MSG_MAX_SZ :
654
- raise MMQTTException (
655
- f"Message size larger than { MQTT_MSG_MAX_SZ } bytes." )
648
+ raise MMQTTException (f"Message size larger than { MQTT_MSG_MAX_SZ } bytes." )
656
649
assert (
657
650
0 <= qos <= 1
658
651
), "Quality of Service Level 2 is unsupported by this library."
@@ -699,8 +692,7 @@ def publish(
699
692
rcv_pid = rcv_pid_buf [0 ] << 0x08 | rcv_pid_buf [1 ]
700
693
if self ._pid == rcv_pid :
701
694
if self .on_publish is not None :
702
- self .on_publish (
703
- self , self .user_data , topic , rcv_pid )
695
+ self .on_publish (self , self .user_data , topic , rcv_pid )
704
696
return
705
697
706
698
if op is None :
@@ -744,10 +736,8 @@ def subscribe(self, topic: Optional[Union[tuple, str, list]], qos: int = 0) -> N
744
736
self .logger .debug ("Sending SUBSCRIBE to broker..." )
745
737
fixed_header = bytearray ([MQTT_SUB ])
746
738
packet_length = 2 + (2 * len (topics )) + (1 * len (topics ))
747
- packet_length += sum (len (topic .encode ("utf-8" ))
748
- for topic , qos in topics )
749
- self ._encode_remaining_length (
750
- fixed_header , remaining_length = packet_length )
739
+ packet_length += sum (len (topic .encode ("utf-8" )) for topic , qos in topics )
740
+ self ._encode_remaining_length (fixed_header , remaining_length = packet_length )
751
741
self .logger .debug (f"Fixed Header: { fixed_header } " )
752
742
self ._sock .send (fixed_header )
753
743
self ._pid = self ._pid + 1 if self ._pid < 0xFFFF else 1
@@ -827,8 +817,7 @@ def unsubscribe(self, topic: Optional[Union[str, list]]) -> None:
827
817
fixed_header = bytearray ([MQTT_UNSUB ])
828
818
packet_length = 2 + (2 * len (topics ))
829
819
packet_length += sum (len (topic .encode ("utf-8" )) for topic in topics )
830
- self ._encode_remaining_length (
831
- fixed_header , remaining_length = packet_length )
820
+ self ._encode_remaining_length (fixed_header , remaining_length = packet_length )
832
821
self .logger .debug (f"Fixed Header: { fixed_header } " )
833
822
self ._sock .send (fixed_header )
834
823
self ._pid = self ._pid + 1 if self ._pid < 0xFFFF else 1
@@ -861,8 +850,7 @@ def unsubscribe(self, topic: Optional[Union[str, list]]) -> None:
861
850
assert rc [1 ] == packet_id_bytes [0 ] and rc [2 ] == packet_id_bytes [1 ]
862
851
for t in topics :
863
852
if self .on_unsubscribe is not None :
864
- self .on_unsubscribe (
865
- self , self .user_data , t , self ._pid )
853
+ self .on_unsubscribe (self , self .user_data , t , self ._pid )
866
854
self ._subscribed_topics .remove (t )
867
855
return
868
856
@@ -880,8 +868,7 @@ def _recompute_reconnect_backoff(self) -> None:
880
868
self ._reconnect_timeout = 2 ** self ._reconnect_attempt
881
869
# pylint: disable=consider-using-f-string
882
870
self .logger .debug (
883
- "Reconnect timeout computed to {:.2f}" .format (
884
- self ._reconnect_timeout )
871
+ "Reconnect timeout computed to {:.2f}" .format (self ._reconnect_timeout )
885
872
)
886
873
887
874
if self ._reconnect_timeout > self ._reconnect_maximum_backoff :
@@ -955,10 +942,7 @@ def loop(self, timeout: float = 0) -> Optional[list[int]]:
955
942
rcs = []
956
943
957
944
while True :
958
- if (
959
- self .diff_ns (self ._last_msg_sent_timestamp )
960
- >= self .keep_alive
961
- ):
945
+ if self .diff_ns (self ._last_msg_sent_timestamp ) >= self .keep_alive :
962
946
# Handle KeepAlive by expecting a PINGREQ/PINGRESP from the server
963
947
self .logger .debug (
964
948
"KeepAlive period elapsed - requesting a PINGRESP from the server..."
@@ -967,8 +951,7 @@ def loop(self, timeout: float = 0) -> Optional[list[int]]:
967
951
# ping() itself contains a _wait_for_msg() loop which might have taken a while,
968
952
# so check here as well.
969
953
if self .diff_ns (stamp ) > timeout :
970
- self .logger .debug (
971
- f"Loop timed out after { timeout } seconds" )
954
+ self .logger .debug (f"Loop timed out after { timeout } seconds" )
972
955
break
973
956
974
957
rc = self ._wait_for_msg ()
@@ -1006,14 +989,12 @@ def _wait_for_msg(self, timeout: Optional[float] = None) -> Optional[int]:
1006
989
# If we get here, it means that there is nothing to be received
1007
990
return None
1008
991
pkt_type = res [0 ] & MQTT_PKT_TYPE_MASK
1009
- self .logger .debug (
1010
- f"Got message type: { hex (pkt_type )} pkt: { hex (res [0 ])} " )
992
+ self .logger .debug (f"Got message type: { hex (pkt_type )} pkt: { hex (res [0 ])} " )
1011
993
if pkt_type == MQTT_PINGRESP :
1012
994
self .logger .debug ("Got PINGRESP" )
1013
995
sz = self ._sock_exact_recv (1 )[0 ]
1014
996
if sz != 0x00 :
1015
- raise MMQTTException (
1016
- f"Unexpected PINGRESP returned from broker: { sz } ." )
997
+ raise MMQTTException (f"Unexpected PINGRESP returned from broker: { sz } ." )
1017
998
return pkt_type
1018
999
1019
1000
if pkt_type != MQTT_PUBLISH :
@@ -1042,8 +1023,7 @@ def _wait_for_msg(self, timeout: Optional[float] = None) -> Optional[int]:
1042
1023
# read message contents
1043
1024
raw_msg = self ._sock_exact_recv (sz )
1044
1025
msg = raw_msg if self ._use_binary_mode else str (raw_msg , "utf-8" )
1045
- self .logger .debug (
1046
- "Receiving PUBLISH \n Topic: %s\n Msg: %s\n " , topic , raw_msg )
1026
+ self .logger .debug ("Receiving PUBLISH \n Topic: %s\n Msg: %s\n " , topic , raw_msg )
1047
1027
self ._handle_on_message (topic , msg )
1048
1028
if res [0 ] & 0x06 == 0x02 :
1049
1029
pkt = bytearray (b"\x40 \x02 \0 \0 " )
@@ -1091,8 +1071,7 @@ def _sock_exact_recv(
1091
1071
recv_len = self ._sock .recv_into (rc , bufsize )
1092
1072
to_read = bufsize - recv_len
1093
1073
if to_read < 0 :
1094
- raise MMQTTException (
1095
- f"negative number of bytes to read: { to_read } " )
1074
+ raise MMQTTException (f"negative number of bytes to read: { to_read } " )
1096
1075
read_timeout = timeout if timeout is not None else self .keep_alive
1097
1076
mv = mv [recv_len :]
1098
1077
while to_read > 0 :
0 commit comments