88from fastapi import Header , Request
99from fastapi .responses import JSONResponse , StreamingResponse
1010from nexent .core .agents .run_agent import agent_run
11- from nexent .memory .memory_service import clear_memory
11+ from nexent .memory .memory_service import clear_memory , add_memory_in_levels
1212
1313from agents .agent_run_manager import agent_run_manager
1414from agents .create_agent_info import create_agent_run_info , create_tool_config_list
1515from agents .preprocess_manager import preprocess_manager
16- from consts .exceptions import AgentRunException , MemoryPreparationException
16+ from consts .exceptions import MemoryPreparationException
1717from consts .model import (
1818 AgentInfoRequest ,
1919 AgentRequest ,
@@ -79,7 +79,8 @@ def _resolve_user_tenant_language(
7979
# Strong references to in-flight background memory tasks. The event loop keeps
# only weak references to tasks, so a fire-and-forget task with no other
# reference may be garbage-collected before it completes.
_BACKGROUND_MEMORY_TASKS = set()


def _resolve_memory_levels(memory_ctx):
    """Return the set of memory levels the current user config allows writing to."""
    user_config = memory_ctx.user_config
    levels = {"agent", "user_agent"}
    if user_config.agent_share_option == "never":
        levels.discard("agent")
    if memory_ctx.agent_id in getattr(user_config, "disable_agent_ids", []):
        levels.discard("agent")
    if memory_ctx.agent_id in getattr(user_config, "disable_user_agent_ids", []):
        levels.discard("user_agent")
    return levels


async def _record_memory_in_background(memory_ctx, agent_run_info, final_answer):
    """Add the finished user/assistant exchange to memory; errors are logged, not raised."""
    try:
        # Skip if memory recording is disabled
        if not getattr(memory_ctx.user_config, "memory_switch", False):
            return
        if not final_answer:
            return

        levels = _resolve_memory_levels(memory_ctx)
        if not levels:
            return

        mem_messages = [
            {"role": "user", "content": agent_run_info.query},
            {"role": "assistant", "content": final_answer},
        ]
        add_result = await add_memory_in_levels(
            messages=mem_messages,
            memory_config=memory_ctx.memory_config,
            tenant_id=memory_ctx.tenant_id,
            user_id=memory_ctx.user_id,
            agent_id=memory_ctx.agent_id,
            memory_levels=list(levels),
        )
        items = add_result.get("results", [])
        logger.info(f"Memory addition completed: {items}")
    except Exception as bg_e:
        logger.error(
            f"Unexpected error during background memory addition: {bg_e}")


async def _stream_agent_chunks(
    agent_request: "AgentRequest",
    user_id: str,
    tenant_id: str,
    agent_run_info,
    memory_ctx,
):
    """
    Run the agent and yield each produced chunk as an SSE ``data:`` line.

    While streaming, the content of the last chunk whose JSON ``type`` is
    ``"final_answer"`` is remembered. If the agent run raises, a single
    ``{"type": "error", ...}`` chunk is yielded and the stream ends.

    On exit (normal end, error, or early close by the consumer):
      * collected chunks are saved as the assistant message unless
        ``agent_request.is_debug`` is set,
      * the run is unregistered from ``agent_run_manager``,
      * if a final answer was captured, memory addition is scheduled as a
        background task so it does not delay the end of the stream.

    Args:
        agent_request: Request being served; ``is_debug`` and
            ``conversation_id`` are read here.
        user_id: User the assistant messages are saved for.
        tenant_id: Tenant the assistant messages are saved for.
        agent_run_info: Run description passed to ``agent_run``; its ``query``
            becomes the user turn of the stored memory.
        memory_ctx: Memory context supplying the user config, memory config
            and ids used for memory addition.

    Yields:
        str: ``"data: <chunk>\\n\\n"`` for every chunk, plus one error chunk
        if the run fails.
    """

    local_messages = []
    captured_final_answer = None
    try:
        async for chunk in agent_run(agent_run_info):
            local_messages.append(chunk)
            # Sniff the final answer as it streams by so memory addition can
            # start right after the stream ends. Chunks that are not JSON
            # objects simply carry no final answer.
            try:
                data = json.loads(chunk)
            except (TypeError, ValueError):
                data = None
            if isinstance(data, dict) and data.get("type") == "final_answer":
                captured_final_answer = data.get("content")
            yield f"data: {chunk}\n\n"
    except Exception as run_exc:
        logger.error(f"Agent run error: {str(run_exc)}")
        # Emit an error chunk; the stream then ends by falling out of the
        # generator. No `return` inside a `finally` here: that would swallow
        # GeneratorExit/CancelledError raised while suspended at the yield.
        error_payload = json.dumps(
            {"type": "error", "content": str(run_exc)}, ensure_ascii=False)
        yield f"data: {error_payload}\n\n"
    finally:
        try:
            # Persist assistant messages for non-debug runs
            if not agent_request.is_debug:
                save_messages(
                    agent_request,
                    target="assistant",
                    messages=local_messages,
                    tenant_id=tenant_id,
                    user_id=user_id,
                )
        finally:
            # Always unregister the run to release resources, even if
            # persisting the messages failed.
            agent_run_manager.unregister_agent_run(
                agent_request.conversation_id)

        # Schedule memory addition in background to avoid blocking SSE
        # termination. Without a captured final answer there is nothing to
        # store, so no task is created.
        # NOTE(review): this also runs for debug requests - confirm intended.
        if captured_final_answer:
            background = _record_memory_in_background(
                memory_ctx, agent_run_info, captured_final_answer)
            try:
                task = asyncio.create_task(background)
            except Exception as schedule_err:
                # Close the never-started coroutine to avoid a
                # "coroutine was never awaited" warning.
                background.close()
                logger.error(
                    f"Failed to schedule background memory addition: {schedule_err}")
            else:
                _BACKGROUND_MEMORY_TASKS.add(task)
                task.add_done_callback(_BACKGROUND_MEMORY_TASKS.discard)
175+
113176
114177def get_enable_tool_id_by_agent_id (agent_id : int , tenant_id : str ):
115178 all_tool_instance = query_all_enabled_tool_instances (
@@ -592,21 +655,14 @@ def insert_related_agent_impl(parent_agent_id, child_agent_id, tenant_id):
592655# Helper function for run_agent_stream, used to prepare context for an agent run
593656async def prepare_agent_run (
594657 agent_request : AgentRequest ,
595- http_request : Request ,
596- authorization : str ,
597- user_id : str = None ,
598- tenant_id : str = None ,
658+ user_id : str ,
659+ tenant_id : str ,
660+ language : str = "zh" ,
599661 allow_memory_search : bool = True ,
600662):
601663 """
602664 Prepare for an agent run by creating context and run info, and registering the run.
603665 """
604- user_id , tenant_id , language = _resolve_user_tenant_language (
605- authorization = authorization ,
606- http_request = http_request ,
607- user_id = user_id ,
608- tenant_id = tenant_id ,
609- )
610666
611667 memory_context = build_memory_context (
612668 user_id , tenant_id , agent_request .agent_id )
@@ -615,7 +671,8 @@ async def prepare_agent_run(
615671 minio_files = agent_request .minio_files ,
616672 query = agent_request .query ,
617673 history = agent_request .history ,
618- authorization = authorization ,
674+ tenant_id = tenant_id ,
675+ user_id = user_id ,
619676 language = language ,
620677 allow_memory_search = allow_memory_search ,
621678 )
@@ -625,26 +682,25 @@ async def prepare_agent_run(
625682
626683
627684# Helper function for run_agent_stream, used to save messages for either user or assistant
def save_messages(agent_request, target: str, user_id: str, tenant_id: str, messages=None):
    """
    Submit a conversation-save job for either the user or the assistant side.

    Args:
        agent_request: The request whose conversation is being saved.
        target: ``"user"`` or ``"assistant"``.
        user_id: User the messages belong to.
        tenant_id: Tenant the messages belong to.
        messages: Must be ``None`` for ``"user"``; required for ``"assistant"``.

    Raises:
        ValueError: If ``messages`` does not match the rule for ``target``, or
            if ``target`` is neither ``"user"`` nor ``"assistant"``.
    """
    if target == "user":
        if messages is not None:
            raise ValueError("Messages should be None when saving for user.")
        submit(save_conversation_user, agent_request, user_id, tenant_id)
    elif target == "assistant":
        if messages is None:
            raise ValueError(
                "Messages cannot be None when saving for assistant.")
        submit(save_conversation_assistant,
               agent_request, messages, user_id, tenant_id)
    else:
        # A misspelled target used to be ignored silently, dropping the save.
        raise ValueError(f"Unknown save target: {target!r}")
639696
640697
641698# Helper function for run_agent_stream, used to generate stream response with memory preprocess tokens
642699async def generate_stream_with_memory (
643700 agent_request : AgentRequest ,
644- http_request : Request ,
645- authorization : str ,
646- user_id : str = None ,
647- tenant_id : str = None ,
701+ user_id : str ,
702+ tenant_id : str ,
703+ language : str = "zh" ,
648704):
649705 # Prepare preprocess task tracking (simulate preprocess flow)
650706 task_id = str (uuid .uuid4 ())
@@ -674,15 +730,8 @@ def _memory_token(message_text: str) -> str:
674730
675731 memory_enabled = False
676732 try :
677- # Decide whether to emit memory tokens based on user's memory switch
678- user_id_preview , tenant_id_preview , _ = _resolve_user_tenant_language (
679- authorization = authorization ,
680- http_request = http_request ,
681- user_id = user_id ,
682- tenant_id = tenant_id ,
683- )
684733 memory_context_preview = build_memory_context (
685- user_id_preview , tenant_id_preview , agent_request .agent_id
734+ user_id , tenant_id , agent_request .agent_id
686735 )
687736 memory_enabled = bool (memory_context_preview .user_config .memory_switch )
688737
@@ -694,10 +743,9 @@ def _memory_token(message_text: str) -> str:
694743 try :
695744 agent_run_info , memory_context = await prepare_agent_run (
696745 agent_request = agent_request ,
697- http_request = http_request ,
698- authorization = authorization ,
699746 user_id = user_id ,
700747 tenant_id = tenant_id ,
748+ language = language ,
701749 allow_memory_search = True ,
702750 )
703751 except Exception as prep_err :
@@ -709,7 +757,11 @@ def _memory_token(message_text: str) -> str:
709757 yield f"data: { _memory_token (msg_done )} \n \n "
710758
711759 async for data_chunk in _stream_agent_chunks (
712- agent_request , authorization , agent_run_info , memory_context
760+ agent_request = agent_request ,
761+ user_id = user_id ,
762+ tenant_id = tenant_id ,
763+ agent_run_info = agent_run_info ,
764+ memory_ctx = memory_context ,
713765 ):
714766 yield data_chunk
715767
@@ -722,18 +774,24 @@ def _memory_token(message_text: str) -> str:
722774 # Fallback to the no-memory streaming path, which internally handles
723775 async for data_chunk in generate_stream_no_memory (
724776 agent_request ,
725- http_request ,
726- authorization ,
727777 user_id = user_id ,
728778 tenant_id = tenant_id ,
729779 ):
730780 yield data_chunk
731781 except Exception as run_exc :
732782 logger .error (f"Agent run error after memory failure: { str (run_exc )} " )
733- raise AgentRunException (f"Agent run error: { str (run_exc )} " )
783+ # Emit an error chunk and terminate the stream immediately
784+ error_payload = json .dumps (
785+ {"type" : "error" , "content" : str (run_exc )}, ensure_ascii = False )
786+ yield f"data: { error_payload } \n \n "
787+ return
734788 except Exception as e :
735789 logger .error (f"Generate stream with memory error: { str (e )} " )
736- raise AgentRunException (f"Generate stream with memory error: { str (e )} " )
790+ # Emit an error chunk and terminate the stream immediately
791+ error_payload = json .dumps (
792+ {"type" : "error" , "content" : str (e )}, ensure_ascii = False )
793+ yield f"data: { error_payload } \n \n "
794+ return
737795 finally :
738796 # Always unregister preprocess task
739797 preprocess_manager .unregister_preprocess_task (task_id )
@@ -742,25 +800,27 @@ def _memory_token(message_text: str) -> str:
742800# Helper function for run_agent_stream, used when user memory is disabled (no memory tokens)
async def generate_stream_no_memory(
    agent_request: AgentRequest,
    user_id: str,
    tenant_id: str,
    language: str = "zh",
):
    """
    Stream an agent run with memory search turned off.

    The run is prepared through ``prepare_agent_run`` with
    ``allow_memory_search=False``, and every item produced by
    ``_stream_agent_chunks`` is relayed unchanged. No memory preprocessing
    tokens are emitted and there is no fallback logic in this function.
    """
    # Build the run context for the given user/tenant without memory search
    run_info, mem_ctx = await prepare_agent_run(
        agent_request=agent_request,
        user_id=user_id,
        tenant_id=tenant_id,
        language=language,
        allow_memory_search=False,
    )

    chunk_stream = _stream_agent_chunks(
        agent_request=agent_request,
        user_id=user_id,
        tenant_id=tenant_id,
        agent_run_info=run_info,
        memory_ctx=mem_ctx,
    )
    async for sse_chunk in chunk_stream:
        yield sse_chunk
766826
@@ -776,37 +836,38 @@ async def run_agent_stream(
776836 Start an agent run and stream responses.
777837 If user_id or tenant_id is provided, authorization will be overridden. (Useful in northbound apis)
778838 """
779- # Save user message only if not in debug mode (before streaming starts)
780- if not agent_request .is_debug :
781- save_messages (agent_request , target = "user" ,
782- authorization = authorization )
783839
784840 # Choose streaming strategy based on user's memory switch
785- resolved_user_id , resolved_tenant_id , _ = _resolve_user_tenant_language (
841+ resolved_user_id , resolved_tenant_id , language = _resolve_user_tenant_language (
786842 authorization = authorization ,
787843 http_request = http_request ,
788844 user_id = user_id ,
789845 tenant_id = tenant_id ,
790846 )
847+
848+ # Save user message only if not in debug mode (before streaming starts)
849+ if not agent_request .is_debug :
850+ save_messages (agent_request , target = "user" ,
851+ user_id = resolved_user_id ,
852+ tenant_id = resolved_tenant_id )
853+
791854 memory_ctx_preview = build_memory_context (
792855 resolved_user_id , resolved_tenant_id , agent_request .agent_id
793856 )
794857
795- if memory_ctx_preview .user_config .memory_switch :
858+ if memory_ctx_preview .user_config .memory_switch and not agent_request . is_debug :
796859 stream_gen = generate_stream_with_memory (
797860 agent_request ,
798- http_request ,
799- authorization ,
800861 user_id = resolved_user_id ,
801862 tenant_id = resolved_tenant_id ,
863+ language = language ,
802864 )
803865 else :
804866 stream_gen = generate_stream_no_memory (
805867 agent_request ,
806- http_request ,
807- authorization ,
808868 user_id = resolved_user_id ,
809869 tenant_id = resolved_tenant_id ,
870+ language = language ,
810871 )
811872
812873 return StreamingResponse (
0 commit comments