5454from utils .memory_utils import build_memory_config
5555from utils .thread_utils import submit
5656
57+ # Import monitoring utilities
58+ from utils .monitoring import monitoring_manager
59+
5760logger = logging .getLogger (__name__ )
5861
5962
@@ -125,7 +128,8 @@ async def _stream_agent_chunks(
125128 user_id = user_id ,
126129 )
127130 # Always unregister the run to release resources
128- agent_run_manager .unregister_agent_run (agent_request .conversation_id , user_id )
131+ agent_run_manager .unregister_agent_run (
132+ agent_request .conversation_id , user_id )
129133
130134 # Schedule memory addition in background to avoid blocking SSE termination
131135 async def _add_memory_background ():
@@ -150,8 +154,10 @@ async def _add_memory_background():
150154 return
151155
152156 mem_messages_local = [
153- {"role" : MESSAGE_ROLE ["USER" ], "content" : agent_run_info .query },
154- {"role" : MESSAGE_ROLE ["ASSISTANT" ], "content" : final_answer_local },
157+ {"role" : MESSAGE_ROLE ["USER" ],
158+ "content" : agent_run_info .query },
159+ {"role" : MESSAGE_ROLE ["ASSISTANT" ],
160+ "content" : final_answer_local },
155161 ]
156162
157163 add_result_local = await add_memory_in_levels (
@@ -661,7 +667,7 @@ async def prepare_agent_run(
661667 agent_request : AgentRequest ,
662668 user_id : str ,
663669 tenant_id : str ,
664- language : str = LANGUAGE ["ZH" ],
670+ language : str = LANGUAGE ["ZH" ],
665671 allow_memory_search : bool = True ,
666672):
667673 """
@@ -783,7 +789,8 @@ def _memory_token(message_text: str) -> str:
783789 ):
784790 yield data_chunk
785791 except Exception as run_exc :
786- logger .error (f"Agent run error after memory failure: { str (run_exc )} " )
792+ logger .error (
793+ f"Agent run error after memory failure: { str (run_exc )} " )
787794 # Emit an error chunk and terminate the stream immediately
788795 error_payload = json .dumps (
789796 {"type" : "error" , "content" : str (run_exc )}, ensure_ascii = False )
@@ -802,23 +809,28 @@ def _memory_token(message_text: str) -> str:
802809
803810
804811# Helper function for run_agent_stream, used when user memory is disabled (no memory tokens)
812+ @monitoring_manager .monitor_endpoint ("agent_service.generate_stream_no_memory" , exclude_params = ["authorization" ])
805813async def generate_stream_no_memory (
806814 agent_request : AgentRequest ,
807815 user_id : str ,
808816 tenant_id : str ,
809- language : str = LANGUAGE ["ZH" ],
817+ language : str = LANGUAGE ["ZH" ],
810818):
811819 """Stream agent responses without any memory preprocessing tokens or fallback logic."""
812820
813821 # Prepare run info respecting memory disabled (honor provided user_id/tenant_id)
822+ monitoring_manager .add_span_event ("generate_stream_no_memory.started" )
814823 agent_run_info , memory_context = await prepare_agent_run (
815824 agent_request = agent_request ,
816825 user_id = user_id ,
817826 tenant_id = tenant_id ,
818827 language = language ,
819828 allow_memory_search = False ,
820829 )
830+ monitoring_manager .add_span_event ("generate_stream_no_memory.completed" )
821831
832+ monitoring_manager .add_span_event (
833+ "generate_stream_no_memory.streaming.started" )
822834 async for data_chunk in _stream_agent_chunks (
823835 agent_request = agent_request ,
824836 user_id = user_id ,
@@ -827,8 +839,11 @@ async def generate_stream_no_memory(
827839 memory_ctx = memory_context ,
828840 ):
829841 yield data_chunk
842+ monitoring_manager .add_span_event (
843+ "generate_stream_no_memory.streaming.completed" )
830844
831845
846+ @monitoring_manager .monitor_endpoint ("agent_service.run_agent_stream" , exclude_params = ["authorization" ])
832847async def run_agent_stream (
833848 agent_request : AgentRequest ,
834849 http_request : Request ,
@@ -841,49 +856,162 @@ async def run_agent_stream(
841856 Start an agent run and stream responses.
842857 If user_id or tenant_id is provided, authorization will be overridden. (Useful in northbound apis)
843858 """
859+ import time
860+
861+ # Add initial span attributes for tracking
862+ monitoring_manager .set_span_attributes (
863+ agent_id = agent_request .agent_id ,
864+ conversation_id = agent_request .conversation_id ,
865+ is_debug = agent_request .is_debug ,
866+ skip_user_save = skip_user_save ,
867+ has_override_user_id = user_id is not None ,
868+ has_override_tenant_id = tenant_id is not None ,
869+ query_length = len (agent_request .query ) if agent_request .query else 0 ,
870+ history_count = len (
871+ agent_request .history ) if agent_request .history else 0 ,
872+ minio_files_count = len (
873+ agent_request .minio_files ) if agent_request .minio_files else 0
874+ )
875+
876+ # Step 1: Resolve user tenant language
877+ resolve_start_time = time .time ()
878+ monitoring_manager .add_span_event ("user_resolution.started" )
844879
845- # Choose streaming strategy based on user's memory switch
846880 resolved_user_id , resolved_tenant_id , language = _resolve_user_tenant_language (
847881 authorization = authorization ,
848882 http_request = http_request ,
849883 user_id = user_id ,
850884 tenant_id = tenant_id ,
851885 )
852-
853- # Save user message only if not in debug mode (before streaming starts)
886+
887+ resolve_duration = time .time () - resolve_start_time
888+ monitoring_manager .add_span_event ("user_resolution.completed" , {
889+ "duration" : resolve_duration ,
890+ "user_id" : resolved_user_id ,
891+ "tenant_id" : resolved_tenant_id ,
892+ "language" : language
893+ })
894+ monitoring_manager .set_span_attributes (
895+ resolved_user_id = resolved_user_id ,
896+ resolved_tenant_id = resolved_tenant_id ,
897+ language = language ,
898+ user_resolution_duration = resolve_duration
899+ )
900+
901+ # Step 2: Save user message (if needed)
854902 if not agent_request .is_debug and not skip_user_save :
903+ save_start_time = time .time ()
904+ monitoring_manager .add_span_event ("user_message_save.started" )
905+
855906 save_messages (
856907 agent_request ,
857908 target = MESSAGE_ROLE ["USER" ],
858909 user_id = resolved_user_id ,
859910 tenant_id = resolved_tenant_id ,
860911 )
861-
912+
913+ save_duration = time .time () - save_start_time
914+ monitoring_manager .add_span_event ("user_message_save.completed" , {
915+ "duration" : save_duration
916+ })
917+ monitoring_manager .set_span_attributes (
918+ user_message_saved = True ,
919+ user_message_save_duration = save_duration
920+ )
921+ else :
922+ monitoring_manager .add_span_event ("user_message_save.skipped" , {
923+ "reason" : "debug_mode" if agent_request .is_debug else "skip_user_save_flag"
924+ })
925+ monitoring_manager .set_span_attributes (user_message_saved = False )
926+
927+ # Step 3: Build memory context
928+ memory_start_time = time .time ()
929+ monitoring_manager .add_span_event ("memory_context_build.started" )
930+
862931 memory_ctx_preview = build_memory_context (
863932 resolved_user_id , resolved_tenant_id , agent_request .agent_id
864933 )
865934
866- if memory_ctx_preview .user_config .memory_switch and not agent_request .is_debug :
935+ memory_duration = time .time () - memory_start_time
936+ memory_enabled = memory_ctx_preview .user_config .memory_switch
937+ monitoring_manager .add_span_event ("memory_context_build.completed" , {
938+ "duration" : memory_duration ,
939+ "memory_enabled" : memory_enabled ,
940+ "agent_share_option" : getattr (memory_ctx_preview .user_config , "agent_share_option" , "unknown" )
941+ })
942+ monitoring_manager .set_span_attributes (
943+ memory_enabled = memory_enabled ,
944+ memory_context_build_duration = memory_duration ,
945+ agent_share_option = getattr (
946+ memory_ctx_preview .user_config , "agent_share_option" , "unknown" )
947+ )
948+
949+ # Step 4: Choose streaming strategy
950+ strategy_start_time = time .time ()
951+ use_memory_stream = memory_enabled and not agent_request .is_debug
952+
953+ monitoring_manager .add_span_event ("streaming_strategy.selected" , {
954+ "strategy" : "with_memory" if use_memory_stream else "no_memory" ,
955+ "memory_enabled" : memory_enabled ,
956+ "is_debug" : agent_request .is_debug
957+ })
958+
959+ if use_memory_stream :
960+ monitoring_manager .add_span_event (
961+ "stream_generator.memory_stream.creating" )
867962 stream_gen = generate_stream_with_memory (
868963 agent_request ,
869964 user_id = resolved_user_id ,
870965 tenant_id = resolved_tenant_id ,
871966 language = language ,
872967 )
873968 else :
969+ monitoring_manager .add_span_event (
970+ "stream_generator.no_memory_stream.creating" )
874971 stream_gen = generate_stream_no_memory (
875972 agent_request ,
876973 user_id = resolved_user_id ,
877974 tenant_id = resolved_tenant_id ,
878975 language = language ,
879976 )
880977
881- return StreamingResponse (
978+ strategy_duration = time .time () - strategy_start_time
979+ monitoring_manager .add_span_event ("streaming_strategy.completed" , {
980+ "duration" : strategy_duration ,
981+ "selected_strategy" : "with_memory" if use_memory_stream else "no_memory"
982+ })
983+ monitoring_manager .set_span_attributes (
984+ streaming_strategy = (
985+ "with_memory" if use_memory_stream else "no_memory" ),
986+ strategy_selection_duration = strategy_duration
987+ )
988+
989+ # Step 5: Create streaming response
990+ response_start_time = time .time ()
991+ monitoring_manager .add_span_event ("streaming_response.creating" )
992+
993+ response = StreamingResponse (
882994 stream_gen ,
883995 media_type = "text/event-stream" ,
884996 headers = {"Cache-Control" : "no-cache" , "Connection" : "keep-alive" },
885997 )
886998
999+ response_duration = time .time () - response_start_time
1000+ monitoring_manager .add_span_event ("streaming_response.created" , {
1001+ "duration" : response_duration ,
1002+ "media_type" : "text/event-stream"
1003+ })
1004+ monitoring_manager .set_span_attributes (
1005+ response_creation_duration = response_duration ,
1006+ total_preparation_duration = (time .time () - resolve_start_time )
1007+ )
1008+
1009+ monitoring_manager .add_span_event ("run_agent_stream.preparation_completed" , {
1010+ "total_preparation_time" : time .time () - resolve_start_time
1011+ })
1012+
1013+ return response
1014+
8871015
8881016def stop_agent_tasks (conversation_id : int , user_id : str ):
8891017 """
0 commit comments