2828db_url = "postgresql+psycopg://ai:ai@localhost:5532/ai"
2929
3030def get_agentic_rag_agent (
31- model_id : str = "gemini-2.0-flash-exp " ,
31+ model_id : str = "gemini-2.0-flash" ,
3232 user_id : Optional [str ] = None ,
3333 session_id : Optional [str ] = None ,
3434 debug_mode : bool = True ,
@@ -204,7 +204,7 @@ def _create_agent(self) -> Agent:
204204 self ._session_id = f"session_{ int (time .time ())} "
205205
206206 return get_agentic_rag_agent (
207- model_id = "gemini-2.0-flash-exp " ,
207+ model_id = "gemini-2.0-flash" ,
208208 session_id = self ._session_id ,
209209 user_id = None ,
210210 debug_mode = True
@@ -268,7 +268,7 @@ async def handle_upload(self, files: List[rx.UploadFile]):
268268
269269 @rx .event (background = True )
270270 async def process_question (self , form_data : dict ):
271- """Process a question using a fresh agent instance """
271+ """Process a question using streaming responses """
272272 if self .processing or not form_data .get ("question" ):
273273 return
274274
@@ -281,21 +281,52 @@ async def process_question(self, form_data: dict):
281281 await asyncio .sleep (0.1 )
282282
283283 try :
284- # Create fresh agent that will access the same vector store
285284 agent = self ._create_agent ()
286- response = await asyncio .to_thread (
287- agent .run ,
288- question
289- )
290- answer_content = response .content if response else "No response received"
285+ queue = asyncio .Queue ()
286+ loop = asyncio .get_running_loop ()
287+
288+ def run_stream ():
289+ """Run synchronous stream in a thread"""
290+ try :
291+ stream_response = agent .run (question , stream = True )
292+ for chunk in stream_response :
293+ if chunk .content :
294+ asyncio .run_coroutine_threadsafe (queue .put (chunk .content ), loop )
295+ asyncio .run_coroutine_threadsafe (queue .put (None ), loop )
296+ except Exception as e :
297+ error_msg = f"Error: { str (e )} "
298+ asyncio .run_coroutine_threadsafe (queue .put (error_msg ), loop )
299+
300+ # Start streaming in background thread
301+ loop .run_in_executor (None , run_stream )
302+
303+ answer_content = ""
304+ while True :
305+ chunk = await queue .get ()
306+ if chunk is None : # End of stream
307+ break
308+ if isinstance (chunk , str ) and chunk .startswith ("Error: " ):
309+ answer_content = chunk
310+ break
311+
312+ answer_content += chunk
313+ async with self :
314+ self .chats [self .current_chat ][- 1 ].answer = answer_content
315+ self .chats = self .chats # Trigger refresh
316+ yield
317+
291318 except Exception as e :
292319 answer_content = f"Error processing question: { str (e )} "
320+ async with self :
321+ self .chats [self .current_chat ][- 1 ].answer = answer_content
322+ self .chats = self .chats
323+ yield
293324
294- async with self :
295- self . chats [ self . current_chat ][ - 1 ]. answer = answer_content
296- self .chats = self . chats
297- self . processing = False
298- yield
325+ finally :
326+ async with self :
327+ self .processing = False
328+ yield
329+
299330
300331 def clear_knowledge_base (self ):
301332 """Clear knowledge base and reset state"""
0 commit comments