
Commit 28e3766

✨ Add performance monitor module
1 parent 6b01e89 commit 28e3766

19 files changed: +1802 additions, -44 deletions

backend/apps/agent_app.py

Lines changed: 4 additions & 0 deletions
@@ -22,12 +22,16 @@
 )
 from utils.auth_utils import get_current_user_info, get_current_user_id

+# Import monitoring utilities
+from utils.monitoring import monitoring_manager
+
 router = APIRouter(prefix="/agent")
 logger = logging.getLogger("agent_app")


 # Define API route
 @router.post("/run")
+@monitoring_manager.monitor_endpoint("agent.run", exclude_params=["authorization"])
 async def agent_run_api(agent_request: AgentRequest, http_request: Request, authorization: str = Header(None)):
     """
     Agent execution API endpoint
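Note: monitoring_manager and its monitor_endpoint decorator come from the new utils/monitoring.py, which is part of this commit but not shown in the hunks above. Purely as orientation, a decorator with the call shape used here might look like the sketch below; the logging-based body and the async-generator branch are assumptions inferred from how the decorator is applied in this diff, not the actual implementation.

# Hypothetical sketch only -- not the utils/monitoring.py shipped in this commit.
import functools
import inspect
import logging
import time

logger = logging.getLogger("monitoring")


def monitor_endpoint(name: str, exclude_params=None):
    """Time a coroutine or async-generator endpoint under a named span."""
    excluded = set(exclude_params or [])

    def decorator(func):
        def _log(start, kwargs):
            # Drop sensitive parameters such as "authorization" before recording
            safe = {k: v for k, v in kwargs.items() if k not in excluded}
            logger.info("span=%s duration=%.3fs params=%s",
                        name, time.time() - start, sorted(safe))

        if inspect.isasyncgenfunction(func):
            @functools.wraps(func)
            async def gen_wrapper(*args, **kwargs):
                start = time.time()
                try:
                    async for chunk in func(*args, **kwargs):
                        yield chunk
                finally:
                    _log(start, kwargs)
            return gen_wrapper

        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            start = time.time()
            try:
                return await func(*args, **kwargs)
            finally:
                _log(start, kwargs)
        return wrapper

    return decorator

The async-generator branch is included because the same decorator is also applied to streaming generator functions in backend/services/agent_service.py further down.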

backend/apps/base_app.py

Lines changed: 6 additions & 0 deletions
@@ -23,6 +23,9 @@
 from apps.voice_app import router as voice_router
 from consts.const import IS_SPEED_MODE

+# Import monitoring utilities
+from utils.monitoring import monitoring_manager
+
 # Create logger instance
 logger = logging.getLogger("base_app")
 app = FastAPI(root_path="/api")
@@ -61,6 +64,9 @@
 app.include_router(tenant_config_router)
 app.include_router(remote_mcp_router)

+# Initialize monitoring for the application
+monitoring_manager.setup_fastapi_app(app)
+

 # Global exception handler for HTTP exceptions
 @app.exception_handler(HTTPException)
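setup_fastapi_app is likewise defined in the new monitoring module and not shown here. A minimal stand-in, assuming it gates instrumentation on the ENABLE_TELEMETRY flag added in consts/const.py and attaches a per-request timing middleware, could look like this (illustrative only, not the real code):

# Hypothetical sketch only -- stands in for monitoring_manager.setup_fastapi_app.
import time

from fastapi import FastAPI, Request

from consts.const import ENABLE_TELEMETRY


def setup_fastapi_app(app: FastAPI) -> None:
    """Attach per-request timing to the app when telemetry is enabled."""
    if not ENABLE_TELEMETRY:
        # Telemetry is opt-in (ENABLE_TELEMETRY defaults to "false"), so leave
        # the application untouched when it is disabled.
        return

    @app.middleware("http")
    async def _timing_middleware(request: Request, call_next):
        start = time.time()
        response = await call_next(request)
        # Expose handling time; a real setup would export it to Prometheus/Jaeger.
        response.headers["X-Process-Time"] = f"{time.time() - start:.3f}"
        return response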

backend/consts/const.py

Lines changed: 16 additions & 1 deletion
@@ -241,4 +241,19 @@

 # Deep Thinking Constants
 THINK_START_PATTERN = "<think>"
-THINK_END_PATTERN = "</think>"
+THINK_END_PATTERN = "</think>"
+
+
+# Telemetry and Monitoring Configuration
+ENABLE_TELEMETRY = os.getenv("ENABLE_TELEMETRY", "false").lower() == "true"
+SERVICE_NAME = os.getenv("SERVICE_NAME", "nexent-backend")
+JAEGER_ENDPOINT = os.getenv(
+    "JAEGER_ENDPOINT", "http://localhost:14268/api/traces")
+PROMETHEUS_PORT = int(os.getenv("PROMETHEUS_PORT", "8000"))
+TELEMETRY_SAMPLE_RATE = float(os.getenv("TELEMETRY_SAMPLE_RATE", "1.0"))
+
+# Performance monitoring thresholds
+LLM_SLOW_REQUEST_THRESHOLD_SECONDS = float(
+    os.getenv("LLM_SLOW_REQUEST_THRESHOLD_SECONDS", "5.0"))
+LLM_SLOW_TOKEN_RATE_THRESHOLD = float(
+    os.getenv("LLM_SLOW_TOKEN_RATE_THRESHOLD", "10.0"))  # tokens per second

backend/services/agent_service.py

Lines changed: 140 additions & 12 deletions
@@ -54,6 +54,9 @@
 from utils.memory_utils import build_memory_config
 from utils.thread_utils import submit

+# Import monitoring utilities
+from utils.monitoring import monitoring_manager
+
 logger = logging.getLogger(__name__)

@@ -125,7 +128,8 @@ async def _stream_agent_chunks(
         user_id=user_id,
     )
     # Always unregister the run to release resources
-    agent_run_manager.unregister_agent_run(agent_request.conversation_id, user_id)
+    agent_run_manager.unregister_agent_run(
+        agent_request.conversation_id, user_id)

     # Schedule memory addition in background to avoid blocking SSE termination
     async def _add_memory_background():
@@ -150,8 +154,10 @@ async def _add_memory_background():
             return

         mem_messages_local = [
-            {"role": MESSAGE_ROLE["USER"], "content": agent_run_info.query},
-            {"role": MESSAGE_ROLE["ASSISTANT"], "content": final_answer_local},
+            {"role": MESSAGE_ROLE["USER"],
+             "content": agent_run_info.query},
+            {"role": MESSAGE_ROLE["ASSISTANT"],
+             "content": final_answer_local},
         ]

         add_result_local = await add_memory_in_levels(
@@ -661,7 +667,7 @@ async def prepare_agent_run(
     agent_request: AgentRequest,
     user_id: str,
     tenant_id: str,
-    language: str=LANGUAGE["ZH"],
+    language: str = LANGUAGE["ZH"],
     allow_memory_search: bool = True,
 ):
     """
@@ -783,7 +789,8 @@ def _memory_token(message_text: str) -> str:
         ):
             yield data_chunk
     except Exception as run_exc:
-        logger.error(f"Agent run error after memory failure: {str(run_exc)}")
+        logger.error(
+            f"Agent run error after memory failure: {str(run_exc)}")
         # Emit an error chunk and terminate the stream immediately
         error_payload = json.dumps(
             {"type": "error", "content": str(run_exc)}, ensure_ascii=False)
@@ -802,23 +809,28 @@ def _memory_token(message_text: str) -> str:


 # Helper function for run_agent_stream, used when user memory is disabled (no memory tokens)
+@monitoring_manager.monitor_endpoint("agent_service.generate_stream_no_memory", exclude_params=["authorization"])
 async def generate_stream_no_memory(
     agent_request: AgentRequest,
     user_id: str,
     tenant_id: str,
-    language: str=LANGUAGE["ZH"],
+    language: str = LANGUAGE["ZH"],
 ):
     """Stream agent responses without any memory preprocessing tokens or fallback logic."""

     # Prepare run info respecting memory disabled (honor provided user_id/tenant_id)
+    monitoring_manager.add_span_event("generate_stream_no_memory.started")
     agent_run_info, memory_context = await prepare_agent_run(
         agent_request=agent_request,
         user_id=user_id,
         tenant_id=tenant_id,
         language=language,
         allow_memory_search=False,
     )
+    monitoring_manager.add_span_event("generate_stream_no_memory.completed")

+    monitoring_manager.add_span_event(
+        "generate_stream_no_memory.streaming.started")
     async for data_chunk in _stream_agent_chunks(
         agent_request=agent_request,
         user_id=user_id,
@@ -827,8 +839,11 @@ async def generate_stream_no_memory(
         memory_ctx=memory_context,
     ):
         yield data_chunk
+    monitoring_manager.add_span_event(
+        "generate_stream_no_memory.streaming.completed")


+@monitoring_manager.monitor_endpoint("agent_service.run_agent_stream", exclude_params=["authorization"])
 async def run_agent_stream(
     agent_request: AgentRequest,
     http_request: Request,
@@ -841,49 +856,162 @@ async def run_agent_stream(
     Start an agent run and stream responses.
     If user_id or tenant_id is provided, authorization will be overridden. (Useful in northbound apis)
     """
+    import time
+
+    # Add initial span attributes for tracking
+    monitoring_manager.set_span_attributes(
+        agent_id=agent_request.agent_id,
+        conversation_id=agent_request.conversation_id,
+        is_debug=agent_request.is_debug,
+        skip_user_save=skip_user_save,
+        has_override_user_id=user_id is not None,
+        has_override_tenant_id=tenant_id is not None,
+        query_length=len(agent_request.query) if agent_request.query else 0,
+        history_count=len(
+            agent_request.history) if agent_request.history else 0,
+        minio_files_count=len(
+            agent_request.minio_files) if agent_request.minio_files else 0
+    )
+
+    # Step 1: Resolve user tenant language
+    resolve_start_time = time.time()
+    monitoring_manager.add_span_event("user_resolution.started")

-    # Choose streaming strategy based on user's memory switch
     resolved_user_id, resolved_tenant_id, language = _resolve_user_tenant_language(
         authorization=authorization,
         http_request=http_request,
         user_id=user_id,
         tenant_id=tenant_id,
     )
-
-    # Save user message only if not in debug mode (before streaming starts)
+
+    resolve_duration = time.time() - resolve_start_time
+    monitoring_manager.add_span_event("user_resolution.completed", {
+        "duration": resolve_duration,
+        "user_id": resolved_user_id,
+        "tenant_id": resolved_tenant_id,
+        "language": language
+    })
+    monitoring_manager.set_span_attributes(
+        resolved_user_id=resolved_user_id,
+        resolved_tenant_id=resolved_tenant_id,
+        language=language,
+        user_resolution_duration=resolve_duration
+    )
+
+    # Step 2: Save user message (if needed)
     if not agent_request.is_debug and not skip_user_save:
+        save_start_time = time.time()
+        monitoring_manager.add_span_event("user_message_save.started")
+
         save_messages(
             agent_request,
             target=MESSAGE_ROLE["USER"],
             user_id=resolved_user_id,
             tenant_id=resolved_tenant_id,
         )
-
+
+        save_duration = time.time() - save_start_time
+        monitoring_manager.add_span_event("user_message_save.completed", {
+            "duration": save_duration
+        })
+        monitoring_manager.set_span_attributes(
+            user_message_saved=True,
+            user_message_save_duration=save_duration
+        )
+    else:
+        monitoring_manager.add_span_event("user_message_save.skipped", {
+            "reason": "debug_mode" if agent_request.is_debug else "skip_user_save_flag"
+        })
+        monitoring_manager.set_span_attributes(user_message_saved=False)
+
+    # Step 3: Build memory context
+    memory_start_time = time.time()
+    monitoring_manager.add_span_event("memory_context_build.started")
+
     memory_ctx_preview = build_memory_context(
         resolved_user_id, resolved_tenant_id, agent_request.agent_id
     )

-    if memory_ctx_preview.user_config.memory_switch and not agent_request.is_debug:
+    memory_duration = time.time() - memory_start_time
+    memory_enabled = memory_ctx_preview.user_config.memory_switch
+    monitoring_manager.add_span_event("memory_context_build.completed", {
+        "duration": memory_duration,
+        "memory_enabled": memory_enabled,
+        "agent_share_option": getattr(memory_ctx_preview.user_config, "agent_share_option", "unknown")
+    })
+    monitoring_manager.set_span_attributes(
+        memory_enabled=memory_enabled,
+        memory_context_build_duration=memory_duration,
+        agent_share_option=getattr(
+            memory_ctx_preview.user_config, "agent_share_option", "unknown")
+    )
+
+    # Step 4: Choose streaming strategy
+    strategy_start_time = time.time()
+    use_memory_stream = memory_enabled and not agent_request.is_debug
+
+    monitoring_manager.add_span_event("streaming_strategy.selected", {
+        "strategy": "with_memory" if use_memory_stream else "no_memory",
+        "memory_enabled": memory_enabled,
+        "is_debug": agent_request.is_debug
+    })
+
+    if use_memory_stream:
+        monitoring_manager.add_span_event(
+            "stream_generator.memory_stream.creating")
         stream_gen = generate_stream_with_memory(
             agent_request,
             user_id=resolved_user_id,
             tenant_id=resolved_tenant_id,
             language=language,
         )
     else:
+        monitoring_manager.add_span_event(
+            "stream_generator.no_memory_stream.creating")
         stream_gen = generate_stream_no_memory(
             agent_request,
             user_id=resolved_user_id,
             tenant_id=resolved_tenant_id,
             language=language,
         )

-    return StreamingResponse(
+    strategy_duration = time.time() - strategy_start_time
+    monitoring_manager.add_span_event("streaming_strategy.completed", {
+        "duration": strategy_duration,
+        "selected_strategy": "with_memory" if use_memory_stream else "no_memory"
+    })
+    monitoring_manager.set_span_attributes(
+        streaming_strategy=(
+            "with_memory" if use_memory_stream else "no_memory"),
+        strategy_selection_duration=strategy_duration
+    )
+
+    # Step 5: Create streaming response
+    response_start_time = time.time()
+    monitoring_manager.add_span_event("streaming_response.creating")
+
+    response = StreamingResponse(
         stream_gen,
         media_type="text/event-stream",
         headers={"Cache-Control": "no-cache", "Connection": "keep-alive"},
     )

+    response_duration = time.time() - response_start_time
+    monitoring_manager.add_span_event("streaming_response.created", {
+        "duration": response_duration,
+        "media_type": "text/event-stream"
+    })
+    monitoring_manager.set_span_attributes(
+        response_creation_duration=response_duration,
+        total_preparation_duration=(time.time() - resolve_start_time)
+    )
+
+    monitoring_manager.add_span_event("run_agent_stream.preparation_completed", {
+        "total_preparation_time": time.time() - resolve_start_time
+    })
+
+    return response
+

 def stop_agent_tasks(conversation_id: int, user_id: str):
     """
