@@ -64,6 +64,7 @@ def create_client(
6464 clean_session = None ,
6565 on_connect : Optional [Callable ] = None ,
6666 on_disconnect : Optional [Callable ] = None ,
67+ on_subscribe : Optional [Callable ] = None ,
6768 on_message : Optional [Callable ] = None ,
6869 userdata : Optional [dict ] = None ,
6970 port : int = config .getint ("mqtt" , "broker_port" , fallback = 1883 ),
@@ -113,6 +114,9 @@ def default_on_connect(client: Client, userdata, flags, rc: int, properties=None
113114 if on_disconnect :
114115 client .on_disconnect = on_disconnect
115116
117+ if on_subscribe :
118+ client .on_subscribe = on_subscribe
119+
116120 if last_will is not None :
117121 client .will_set (** last_will )
118122
@@ -277,6 +281,11 @@ def _callback(client: Client, userdata: dict, message):
277281 def on_connect (client : Client , userdata : dict , * args ):
278282 client .subscribe (userdata ["topics" ])
279283
284+ def on_subscribe (client , userdata , mid , granted_qos , properties = None ):
285+ sub_ready .set ()
286+
287+ sub_ready = threading .Event ()
288+
280289 userdata = {
281290 "topics" : [(topic , mqtt_kwargs .pop ("qos" , QOS .EXACTLY_ONCE )) for topic in topics ],
282291 "name" : name ,
@@ -286,10 +295,13 @@ def on_connect(client: Client, userdata: dict, *args):
286295 last_will = last_will ,
287296 on_connect = on_connect ,
288297 on_message = wrap_callback (callback ),
298+ on_subscribe = on_subscribe ,
289299 userdata = userdata ,
290300 ** mqtt_kwargs ,
291301 )
292302
303+ if not sub_ready .wait (timeout = 5 ):
304+ raise RuntimeError ("MQTT subscribe timeout" )
293305 else :
294306 # user provided a client
295307 for topic in topics :
0 commit comments