@@ -55,33 +55,42 @@ def __init__(self, feed_url, authenticator):
5555 self .associated_gql_device_id = None
5656
5757 self ._mqtt_client : gmqtt .Client = None
58+ self ._valid_auth = True
5859 self ._device_uuid : str = None
59- self ._fetching_uuid = False
6060 self ._subscription_attempts = 0
61- self ._fetched_uuid = asyncio .Event ()
6261 self ._subscription_topics = set ()
6362 self ._reconnect_task = None
63+ self ._connect_task = None
64+ self ._connected_at_least_once = False
6465 self ._processed_messages = set ()
6566
6667 async def start (self ):
6768 self .should_stop = False
68- await self .fetch_mqtt_device_uuid ()
69+ self . _device_uuid = self .authenticator . user_account . get_selected_device_uuid ()
6970 await self ._connect ()
7071
7172 async def stop (self ):
7273 self .should_stop = True
7374 await self ._stop_mqtt_client ()
7475 if self ._reconnect_task is not None and not self ._reconnect_task .done ():
7576 self ._reconnect_task .cancel ()
77+ if self ._connect_task is not None and not self ._connect_task .done ():
78+ self ._connect_task .cancel ()
7679 self ._reset ()
7780
7881 async def restart (self ):
79- await self .stop ()
80- await self .start ()
82+ try :
83+ await self .stop ()
84+ await self .start ()
85+ except Exception as e :
86+ self .logger .exception (e , True , f"Error when restarting mqtt feed: { e } " )
8187
8288 def _reset (self ):
89+ self ._connected_at_least_once = False
8390 self ._subscription_attempts = 0
8491 self ._subscription_topics = set ()
92+ self ._connect_task = None
93+ self ._valid_auth = True
8594
8695 async def _stop_mqtt_client (self ):
8796 if self .is_connected ():
@@ -90,6 +99,9 @@ async def _stop_mqtt_client(self):
9099 def is_connected (self ):
91100 return self ._mqtt_client is not None and self ._mqtt_client .is_connected
92101
102+ def can_connect (self ):
103+ return self ._valid_auth
104+
93105 async def register_feed_callback (self , channel_type , callback , identifier = None ):
94106 topic = self ._build_topic (channel_type , identifier )
95107 try :
@@ -98,57 +110,14 @@ async def register_feed_callback(self, channel_type, callback, identifier=None):
98110 self .feed_callbacks [topic ] = [callback ]
99111 if topic not in self ._subscription_topics :
100112 self ._subscription_topics .add (topic )
101- self ._subscribe ((topic , ))
102-
103- async def fetch_mqtt_device_uuid (self ):
104- if self ._fetching_uuid :
105- self .logger .info (f"Waiting for feed UUID fetching" )
106- await asyncio .wait_for (self ._fetched_uuid .wait (), self .DEVICE_CREATE_TIMEOUT + 2 )
107- else :
108- await self ._fetch_mqtt_device_uuid ()
113+ if self ._valid_auth :
114+ self ._subscribe ((topic , ))
115+ else :
116+ self .logger .error (f"Can't subscribe to { channel_type .name } feed, invalid authentication" )
109117
110118 def remove_device_details (self ):
111- self ._fetched_uuid .clear ()
112119 self ._device_uuid = None
113120
114- async def _fetch_mqtt_device_uuid (self ):
115- try :
116- self ._fetching_uuid = True
117- if device_uuid := self .authenticator .user_account .get_selected_device_uuid ():
118- self ._device_uuid = device_uuid
119- self .logger .debug ("Using fetched mqtt device id" )
120- else :
121- await self ._poll_mqtt_device_uuid ()
122- self .logger .debug ("Successfully waited for mqtt device id" )
123- except Exception as e :
124- self .logger .exception (e , True , f"Error when fetching device id: { e } " )
125- raise
126- finally :
127- self ._fetching_uuid = False
128- self ._fetched_uuid .set ()
129-
130- async def _poll_mqtt_device_uuid (self ):
131- t0 = time .time ()
132- while time .time () - t0 < self .DEVICE_CREATE_TIMEOUT :
133- # loop until the device uuid is available
134- # used when a new gql device is created its uuid is not instantly filled
135- try :
136- device_data = await self .authenticator .fetch_device (self .authenticator .user_account .gql_device_id )
137- if device_data is None :
138- raise errors .RequestError (f"Error when fetching mqtt device uuid: can't find content with id: "
139- f"{ self .authenticator .user_account .gql_device_id } " )
140- elif device_uuid := device_data ["uuid" ]:
141- self ._device_uuid = device_uuid
142- return
143- # retry soon
144- await asyncio .sleep (self .DEVICE_CREATION_REFRESH_DELAY )
145- # should never happen unless there is a real issue
146- except errors .RequestError :
147- raise
148- raise errors .RequestError (
149- f"Timeout when fetching mqtt device uuid: no uuid to be found after { self .DEVICE_CREATE_TIMEOUT } seconds"
150- )
151-
152121 @staticmethod
153122 def _build_topic (channel_type , identifier ):
154123 return f"{ channel_type .value } /{ identifier } "
@@ -180,6 +149,9 @@ def _should_process(self, parsed_message):
180149 return True
181150
182151 async def send (self , message , channel_type , identifier , ** kwargs ):
152+ if not self ._valid_auth :
153+ self .logger .warning (f"Can't send { channel_type .name } , invalid feed authentication." )
154+ return
183155 topic = self ._build_topic (channel_type , identifier )
184156 self .logger .debug (f"Sending message on topic: { topic } , message: { message } " )
185157 self ._mqtt_client .publish (
@@ -241,8 +213,12 @@ async def _reconnect(self, client):
241213 await asyncio .sleep (delay )
242214
243215 def _on_disconnect (self , client , packet , exc = None ):
244- self .logger .info (f"Disconnected, client_id: { client ._client_id } " )
245- self ._try_reconnect_if_necessary (client )
216+ if self ._connected_at_least_once :
217+ self .logger .info (f"Disconnected, client_id: { client ._client_id } " )
218+ self ._try_reconnect_if_necessary (client )
219+ else :
220+ if self ._connect_task is not None and not self ._connect_task .done ():
221+ self ._connect_task .cancel ()
246222
247223 def _on_subscribe (self , client , mid , qos , properties ):
248224 # from https://github.com/wialon/gmqtt/blob/master/examples/resubscription.py#L28
@@ -282,12 +258,25 @@ def _update_client_config(self, client):
282258 client .set_config (default_config )
283259
284260 async def _connect (self ):
261+ if self ._device_uuid is None :
262+ self ._valid_auth = False
263+ raise errors .DeviceError ("mqtt device uuid is None, impossible to connect client" )
285264 self ._mqtt_client = gmqtt .Client (self .__class__ .__name__ )
286265 self ._update_client_config (self ._mqtt_client )
287266 self ._register_callbacks (self ._mqtt_client )
288267 self ._mqtt_client .set_auth_credentials (self ._device_uuid , None )
289268 self .logger .debug (f"Connecting client" )
290- await self ._mqtt_client .connect (self .feed_url , self .mqtt_broker_port , version = self .MQTT_VERSION )
269+ self ._connect_task = asyncio .create_task (
270+ self ._mqtt_client .connect (self .feed_url , self .mqtt_broker_port , version = self .MQTT_VERSION )
271+ )
272+ try :
273+ await self ._connect_task
274+ self ._connected_at_least_once = True
275+ except asyncio .CancelledError :
276+ # got cancelled by on_disconnect, can't connect
277+ self .logger .error (f"Can't connect to server, please check your device uuid. "
278+ f"Current mqtt uuid is: { self ._device_uuid } " )
279+ self ._valid_auth = False
291280
292281 def _subscribe (self , topics ):
293282 if not topics :
0 commit comments