1616import json
1717import logging
1818import os
19+ import time
1920import uuid
2021from typing import Any , Dict , List , Optional , Tuple
2122
@@ -230,6 +231,43 @@ async def register_message_queue(self, language, chat_id, message_id, message_qu
230231 )
231232 return trace_id
232233
234+ async def _stream_message_content (
235+ self , message : Dict [str , Any ], websocket : WebSocket , chunk_size : int = 5 , delay : float = 0.01
236+ ) -> None :
237+ """
238+ Stream message content in small chunks to simulate typing effect.
239+
240+ Args:
241+ message: The message dict with type="message"
242+ websocket: WebSocket connection to send chunks
243+ chunk_size: Number of characters per chunk
244+ delay: Delay in seconds between chunks
245+ """
246+ content = message .get ("data" , "" )
247+ if not content :
248+ # If no content, send the original message
249+ await websocket .send_text (json .dumps (message ))
250+ return
251+
252+ # Split content into chunks
253+ chunks = [content [i : i + chunk_size ] for i in range (0 , len (content ), chunk_size )]
254+
255+ for i , chunk in enumerate (chunks ):
256+ # Create a chunk message with same structure but partial content
257+ chunk_message = {
258+ "type" : "message" ,
259+ "id" : message .get ("id" ),
260+ "data" : chunk ,
261+ "timestamp" : message .get ("timestamp" , int (time .time ())),
262+ }
263+
264+ await websocket .send_text (json .dumps (chunk_message ))
265+ logger .debug (f"Sent message chunk { i + 1 } /{ len (chunks )} : { len (chunk )} chars" )
266+
267+ # Add delay between chunks (except for the last one)
268+ if i < len (chunks ) - 1 :
269+ await asyncio .sleep (delay )
270+
233271 async def _consume_messages_from_queue (
234272 self , chat_id : str , message_id : str , trace_id : str , message_queue : AgentMessageQueue , websocket : WebSocket
235273 ) -> List [AgentToolCallResultResponse ]:
@@ -256,9 +294,14 @@ async def _consume_messages_from_queue(
256294 if isinstance (message , dict ) and message .get ("type" ) == "tool_call_result" :
257295 tool_call_results .append (message )
258296
259- # Send message to WebSocket (preserve original functionality)
260- await websocket .send_text (json .dumps (message ))
261- logger .debug (f"Sent message to WebSocket: { message .get ('type' , 'unknown' )} " )
297+ # Special handling for type="message" - stream it in chunks
298+ if isinstance (message , dict ) and message .get ("type" ) == "message" :
299+ await self ._stream_message_content (message , websocket )
300+ logger .debug (f"Streamed message content: { message .get ('type' , 'unknown' )} " )
301+ else :
302+ # Send other message types normally (start, stop, tool_call_result, etc.)
303+ await websocket .send_text (json .dumps (message ))
304+ logger .debug (f"Sent message to WebSocket: { message .get ('type' , 'unknown' )} " )
262305
263306 return tool_call_results
264307
0 commit comments