@@ -168,35 +168,36 @@ async def _process_message_queue(self) -> None:
168168 self .log .debug (f"Received AwarenessUpdate from '{ client_id } '." )
169169 self .handle_awareness_update (client_id , message )
170170 self .log .debug (f"Handled AwarenessUpdate from '{ client_id } '." )
171- continue
172171
173172 # Handle Sync messages
174- assert message_type == YMessageType .SYNC
175- message_subtype = message [1 ] if len (message ) >= 2 else None
176- if message_subtype == YSyncMessageSubtype .SYNC_STEP1 :
173+ sync_message_subtype = None
174+ if message_type == YMessageType .SYNC and len (message ) >= 2 :
175+ sync_message_subtype = message [1 ]
176+ if sync_message_subtype == YSyncMessageSubtype .SYNC_STEP1 :
177177 self .log .info (f"Received SS1 from '{ client_id } '." )
178178 self .handle_sync_step1 (client_id , message )
179179 self .log .info (f"Handled SS1 from '{ client_id } '." )
180- continue
181- elif message_subtype == YSyncMessageSubtype .SYNC_STEP2 :
180+ elif sync_message_subtype == YSyncMessageSubtype .SYNC_STEP2 :
182181 self .log .info (f"Received SS2 from '{ client_id } '." )
183182 self .handle_sync_step2 (client_id , message )
184183 self .log .info (f"Handled SS2 from '{ client_id } '." )
185- continue
186- elif message_subtype == YSyncMessageSubtype .SYNC_UPDATE :
184+ elif sync_message_subtype == YSyncMessageSubtype .SYNC_UPDATE :
187185 self .log .info (f"Received SyncUpdate from '{ client_id } '." )
188186 self .handle_sync_update (client_id , message )
189187 self .log .info (f"Handled SyncUpdate from '{ client_id } '." )
190- continue
191188 else :
192189 self .log .warning (
193190 "Ignoring an unrecognized message with header "
194- f"'{ message_type } ,{ message_subtype } ' from client "
191+ f"'{ message_type } ,{ sync_message_subtype } ' from client "
195192 "'{client_id}'. Messages must have one of the following "
196193 "headers: '0,0' (SyncStep1), '0,1' (SyncStep2), "
197194 "'0,2' (SyncUpdate), or '1,*' (AwarenessUpdate)."
198195 )
199- continue
196+
197+ # Finally, inform the asyncio Queue that the task was complete
198+ # This is required for `self._message_queue.join()` to unblock once
199+ # queue is empty in `self.stop()`.
200+ self ._message_queue .task_done ()
200201
201202
202203 def handle_sync_step1 (self , client_id : str , message : bytes ) -> None :
0 commit comments