@@ -162,18 +162,33 @@ async def _process_message_queue(self) -> None:
162162 except asyncio .QueueShutDown :
163163 break
164164
165- # Handle Awareness messages
165+ # Determine message type & subtype from header
166166 message_type = message [0 ]
167- if message_type == YMessageType .AWARENESS :
167+ sync_message_subtype = "*"
168+ if message_type == YMessageType .SYNC and len (message ) >= 2 :
169+ sync_message_subtype = message [1 ]
170+
171+ # Determine if message is invalid
172+ invalid_message_type = message_type not in YMessageType
173+ invalid_sync_message_type = message_type == YMessageType .SYNC and sync_message_subtype not in YSyncMessageSubtype
174+ invalid_message = invalid_message_type or invalid_sync_message_type
175+
176+ # Handle invalid messages by logging a warning and ignoring
177+ if invalid_message :
178+ self .log .warning (
179+ "Ignoring an unrecognized message with header "
180+ f"'{ message_type } ,{ sync_message_subtype } ' from client "
181+ "'{client_id}'. Messages must have one of the following "
182+ "headers: '0,0' (SyncStep1), '0,1' (SyncStep2), "
183+ "'0,2' (SyncUpdate), or '1,*' (AwarenessUpdate)."
184+ )
185+ # Handle Awareness messages
186+ elif message_type == YMessageType .AWARENESS :
168187 self .log .debug (f"Received AwarenessUpdate from '{ client_id } '." )
169188 self .handle_awareness_update (client_id , message )
170189 self .log .debug (f"Handled AwarenessUpdate from '{ client_id } '." )
171-
172190 # Handle Sync messages
173- sync_message_subtype = None
174- if message_type == YMessageType .SYNC and len (message ) >= 2 :
175- sync_message_subtype = message [1 ]
176- if sync_message_subtype == YSyncMessageSubtype .SYNC_STEP1 :
191+ elif sync_message_subtype == YSyncMessageSubtype .SYNC_STEP1 :
177192 self .log .info (f"Received SS1 from '{ client_id } '." )
178193 self .handle_sync_step1 (client_id , message )
179194 self .log .info (f"Handled SS1 from '{ client_id } '." )
@@ -185,20 +200,17 @@ async def _process_message_queue(self) -> None:
185200 self .log .info (f"Received SyncUpdate from '{ client_id } '." )
186201 self .handle_sync_update (client_id , message )
187202 self .log .info (f"Handled SyncUpdate from '{ client_id } '." )
188- else :
189- self .log .warning (
190- "Ignoring an unrecognized message with header "
191- f"'{ message_type } ,{ sync_message_subtype } ' from client "
192- "'{client_id}'. Messages must have one of the following "
193- "headers: '0,0' (SyncStep1), '0,1' (SyncStep2), "
194- "'0,2' (SyncUpdate), or '1,*' (AwarenessUpdate)."
195- )
196203
197204 # Finally, inform the asyncio Queue that the task was complete
198205 # This is required for `self._message_queue.join()` to unblock once
199206 # queue is empty in `self.stop()`.
200207 self ._message_queue .task_done ()
201208
209+ self .log .info (
210+ "Stopped `self._process_message_queue()` background task "
211+ f"for YRoom '{ self .room_id } '."
212+ )
213+
202214
203215 def handle_sync_step1 (self , client_id : str , message : bytes ) -> None :
204216 """
0 commit comments