11import logging
2- import time
32import os
3+ import time
44from queue import Queue
55
66import paho .mqtt .client as mqtt_client
77
8-
9- FORMAT = ('%(asctime)-15s %(threadName)-15s'
10- ' %(levelname)-8s %(module)-15s:%(lineno)-8s %(message)s' )
8+ FORMAT = (
9+ "%(asctime)-15s %(threadName)-15s"
10+ " %(levelname)-8s %(module)-15s:%(lineno)-8s %(message)s"
11+ )
1112logging .basicConfig (format = FORMAT )
1213_logger = logging .getLogger ()
13- log_level = logging .DEBUG if os .environ .get ('DEBUG' ) in ['1' , 'true' , 'True' ] else logging .WARNING
14+ log_level = (
15+ logging .DEBUG
16+ if os .environ .get ("DEBUG" ) in ["1" , "true" , "True" ]
17+ else logging .WARNING
18+ )
1419_logger .setLevel (log_level )
1520
1621
17- IOT_ENDPOINT_URL = ' http://localhost:4566'
22+ IOT_ENDPOINT_URL = " http://localhost:4566"
1823
1924NUM_MESSAGES = 10
20- TOPIC_NAME = ' /test-topic'
25+ TOPIC_NAME = " /test-topic"
2126
2227recv_queue = Queue ()
2328
2429
2530def get_endpoint ():
2631 import boto3
27- endpoint = boto3 .client ('iot' , endpoint_url = IOT_ENDPOINT_URL ).describe_endpoint ()
28- host , port = endpoint ['endpointAddress' ].split (':' )
32+
33+ endpoint = boto3 .client ("iot" , endpoint_url = IOT_ENDPOINT_URL ).describe_endpoint ()
34+ host , port = endpoint ["endpointAddress" ].split (":" )
2935 return host , int (port )
3036
3137
@@ -36,20 +42,24 @@ def _on_connect(client, *args):
3642 def on_message (client , userdata , message : mqtt_client .MQTTMessage ):
3743 recv_queue .put (message )
3844
39- mqtt = mqtt_client .Client ("mqtt_subscriber" )
45+ mqtt = mqtt_client .Client (
46+ mqtt_client .CallbackAPIVersion .VERSION1 , "mqtt_subscriber"
47+ )
4048 mqtt .enable_logger (_logger )
4149 mqtt .on_connect = _on_connect
4250 mqtt .on_message = on_message
4351 mqtt .loop_start ()
44- mqtt ._thread .name = ' mqtt_thread_subscriber' # noqa
52+ mqtt ._thread .name = " mqtt_thread_subscriber" # noqa
4553 return mqtt
4654
4755
4856def create_publisher ():
49- mqtt_publisher = mqtt_client .Client ("mqtt_publisher" )
57+ mqtt_publisher = mqtt_client .Client (
58+ mqtt_client .CallbackAPIVersion .VERSION1 , "mqtt_publisher"
59+ )
5060 mqtt_publisher .enable_logger (_logger )
5161 mqtt_publisher .loop_start ()
52- mqtt_publisher ._thread .name = f' mqtt_thread_publisher' # noqa
62+ mqtt_publisher ._thread .name = " mqtt_thread_publisher"
5363 return mqtt_publisher
5464
5565
@@ -59,11 +69,7 @@ def publish_messages(endpoint_host: str, endpoint_port: int):
5969 # sleep 2 to let broker connack
6070 time .sleep (2 )
6171 for i in range (NUM_MESSAGES ):
62- publisher .publish (
63- topic = TOPIC_NAME ,
64- payload = f"TEST MESSAGE { i } " ,
65- qos = 0
66- )
72+ publisher .publish (topic = TOPIC_NAME , payload = f"TEST MESSAGE { i } " , qos = 0 )
6773 print (f"{ NUM_MESSAGES } messages published" )
6874 publisher .disconnect ()
6975 publisher .loop_stop ()
@@ -72,7 +78,9 @@ def publish_messages(endpoint_host: str, endpoint_port: int):
7278
7379def main ():
7480 endpoint_host , endpoint_port = get_endpoint ()
75- _logger .debug ("Trying to connect to MQTT endpoint %s:%s" , endpoint_host , endpoint_port )
81+ _logger .debug (
82+ "Trying to connect to MQTT endpoint %s:%s" , endpoint_host , endpoint_port
83+ )
7684 mqtt_subscriber = create_subscriber ()
7785 mqtt_subscriber .connect (host = endpoint_host , port = endpoint_port )
7886 time .sleep (2 ) # sleep 2 to let broker connack and suback
@@ -90,5 +98,5 @@ def main():
9098 mqtt_subscriber .loop_stop ()
9199
92100
93- if __name__ == ' __main__' :
101+ if __name__ == " __main__" :
94102 main ()
0 commit comments