Skip to content

Commit bc1c94f

Browse files
committed
Merge remote-tracking branch 'origin/develop' into ljy/dev_0923_frontend
# Conflicts: # frontend/app/[locale]/setup/agents/components/agent/SubAgentPool.tsx # frontend/app/[locale]/setup/agents/components/tool/ToolPool.tsx
2 parents 1e45691 + 07de4fd commit bc1c94f

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

56 files changed

+4913
-571
lines changed

backend/apps/agent_app.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,16 @@
2222
)
2323
from utils.auth_utils import get_current_user_info, get_current_user_id
2424

25+
# Import monitoring utilities
26+
from utils.monitoring import monitoring_manager
27+
2528
router = APIRouter(prefix="/agent")
2629
logger = logging.getLogger("agent_app")
2730

2831

2932
# Define API route
3033
@router.post("/run")
34+
@monitoring_manager.monitor_endpoint("agent.run", exclude_params=["authorization"])
3135
async def agent_run_api(agent_request: AgentRequest, http_request: Request, authorization: str = Header(None)):
3236
"""
3337
Agent execution API endpoint

backend/apps/base_app.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@
2323
from apps.voice_app import router as voice_router
2424
from consts.const import IS_SPEED_MODE
2525

26+
# Import monitoring utilities
27+
from utils.monitoring import monitoring_manager
28+
2629
# Create logger instance
2730
logger = logging.getLogger("base_app")
2831
app = FastAPI(root_path="/api")
@@ -61,6 +64,9 @@
6164
app.include_router(tenant_config_router)
6265
app.include_router(remote_mcp_router)
6366

67+
# Initialize monitoring for the application
68+
monitoring_manager.setup_fastapi_app(app)
69+
6470

6571
# Global exception handler for HTTP exceptions
6672
@app.exception_handler(HTTPException)

backend/apps/me_model_managment_app.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,7 @@
55
from fastapi.responses import JSONResponse
66

77
from consts.exceptions import TimeoutException, NotFoundException, MEConnectionException
8-
from consts.model import ModelConnectStatusEnum
9-
from services.me_model_management_service import get_me_models_impl
8+
from services.me_model_management_service import get_me_models_impl, check_me_variable_set
109
from services.model_health_service import check_me_connectivity_impl
1110

1211
router = APIRouter(prefix="/me")
@@ -23,6 +22,15 @@ async def get_me_models(
2322
Get list of models from model engine API
2423
"""
2524
try:
25+
# Pre-check ME environment variables; return empty list if not configured
26+
if not await check_me_variable_set():
27+
return JSONResponse(
28+
status_code=HTTPStatus.OK,
29+
content={
30+
"message": "Retrieve skipped",
31+
"data": []
32+
}
33+
)
2634
filtered_result = await get_me_models_impl(timeout=timeout, type=type)
2735
return JSONResponse(
2836
status_code=HTTPStatus.OK,
@@ -48,12 +56,21 @@ async def check_me_connectivity(timeout: int = Query(default=2, description="Tim
4856
Health check from model engine API
4957
"""
5058
try:
59+
# Pre-check ME environment variables; return not connected if not configured
60+
if not await check_me_variable_set():
61+
return JSONResponse(
62+
status_code=HTTPStatus.OK,
63+
content={
64+
"connectivity": False,
65+
"message": "ModelEngine platform necessary environment variables not configured. Healthcheck skipped.",
66+
}
67+
)
5168
await check_me_connectivity_impl(timeout)
5269
return JSONResponse(
5370
status_code=HTTPStatus.OK,
5471
content={
5572
"connectivity": True,
56-
"message": "ModelEngine model connect successfully.",
73+
"message": "ModelEngine platform connect successfully.",
5774
}
5875
)
5976
except MEConnectionException as e:

backend/consts/const.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,4 +241,19 @@
241241

242242
# Deep Thinking Constants
243243
THINK_START_PATTERN = "<think>"
244-
THINK_END_PATTERN = "</think>"
244+
THINK_END_PATTERN = "</think>"
245+
246+
247+
# Telemetry and Monitoring Configuration
248+
ENABLE_TELEMETRY = os.getenv("ENABLE_TELEMETRY", "false").lower() == "true"
249+
SERVICE_NAME = os.getenv("SERVICE_NAME", "nexent-backend")
250+
JAEGER_ENDPOINT = os.getenv(
251+
"JAEGER_ENDPOINT", "http://localhost:14268/api/traces")
252+
PROMETHEUS_PORT = int(os.getenv("PROMETHEUS_PORT", "8000"))
253+
TELEMETRY_SAMPLE_RATE = float(os.getenv("TELEMETRY_SAMPLE_RATE", "1.0"))
254+
255+
# Performance monitoring thresholds
256+
LLM_SLOW_REQUEST_THRESHOLD_SECONDS = float(
257+
os.getenv("LLM_SLOW_REQUEST_THRESHOLD_SECONDS", "5.0"))
258+
LLM_SLOW_TOKEN_RATE_THRESHOLD = float(
259+
os.getenv("LLM_SLOW_TOKEN_RATE_THRESHOLD", "10.0")) # tokens per second

backend/services/agent_service.py

Lines changed: 140 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@
5454
from utils.memory_utils import build_memory_config
5555
from utils.thread_utils import submit
5656

57+
# Import monitoring utilities
58+
from utils.monitoring import monitoring_manager
59+
5760
logger = logging.getLogger(__name__)
5861

5962

@@ -125,7 +128,8 @@ async def _stream_agent_chunks(
125128
user_id=user_id,
126129
)
127130
# Always unregister the run to release resources
128-
agent_run_manager.unregister_agent_run(agent_request.conversation_id, user_id)
131+
agent_run_manager.unregister_agent_run(
132+
agent_request.conversation_id, user_id)
129133

130134
# Schedule memory addition in background to avoid blocking SSE termination
131135
async def _add_memory_background():
@@ -150,8 +154,10 @@ async def _add_memory_background():
150154
return
151155

152156
mem_messages_local = [
153-
{"role": MESSAGE_ROLE["USER"], "content": agent_run_info.query},
154-
{"role": MESSAGE_ROLE["ASSISTANT"], "content": final_answer_local},
157+
{"role": MESSAGE_ROLE["USER"],
158+
"content": agent_run_info.query},
159+
{"role": MESSAGE_ROLE["ASSISTANT"],
160+
"content": final_answer_local},
155161
]
156162

157163
add_result_local = await add_memory_in_levels(
@@ -661,7 +667,7 @@ async def prepare_agent_run(
661667
agent_request: AgentRequest,
662668
user_id: str,
663669
tenant_id: str,
664-
language: str=LANGUAGE["ZH"],
670+
language: str = LANGUAGE["ZH"],
665671
allow_memory_search: bool = True,
666672
):
667673
"""
@@ -783,7 +789,8 @@ def _memory_token(message_text: str) -> str:
783789
):
784790
yield data_chunk
785791
except Exception as run_exc:
786-
logger.error(f"Agent run error after memory failure: {str(run_exc)}")
792+
logger.error(
793+
f"Agent run error after memory failure: {str(run_exc)}")
787794
# Emit an error chunk and terminate the stream immediately
788795
error_payload = json.dumps(
789796
{"type": "error", "content": str(run_exc)}, ensure_ascii=False)
@@ -802,23 +809,28 @@ def _memory_token(message_text: str) -> str:
802809

803810

804811
# Helper function for run_agent_stream, used when user memory is disabled (no memory tokens)
812+
@monitoring_manager.monitor_endpoint("agent_service.generate_stream_no_memory", exclude_params=["authorization"])
805813
async def generate_stream_no_memory(
806814
agent_request: AgentRequest,
807815
user_id: str,
808816
tenant_id: str,
809-
language: str=LANGUAGE["ZH"],
817+
language: str = LANGUAGE["ZH"],
810818
):
811819
"""Stream agent responses without any memory preprocessing tokens or fallback logic."""
812820

813821
# Prepare run info respecting memory disabled (honor provided user_id/tenant_id)
822+
monitoring_manager.add_span_event("generate_stream_no_memory.started")
814823
agent_run_info, memory_context = await prepare_agent_run(
815824
agent_request=agent_request,
816825
user_id=user_id,
817826
tenant_id=tenant_id,
818827
language=language,
819828
allow_memory_search=False,
820829
)
830+
monitoring_manager.add_span_event("generate_stream_no_memory.completed")
821831

832+
monitoring_manager.add_span_event(
833+
"generate_stream_no_memory.streaming.started")
822834
async for data_chunk in _stream_agent_chunks(
823835
agent_request=agent_request,
824836
user_id=user_id,
@@ -827,8 +839,11 @@ async def generate_stream_no_memory(
827839
memory_ctx=memory_context,
828840
):
829841
yield data_chunk
842+
monitoring_manager.add_span_event(
843+
"generate_stream_no_memory.streaming.completed")
830844

831845

846+
@monitoring_manager.monitor_endpoint("agent_service.run_agent_stream", exclude_params=["authorization"])
832847
async def run_agent_stream(
833848
agent_request: AgentRequest,
834849
http_request: Request,
@@ -841,49 +856,162 @@ async def run_agent_stream(
841856
Start an agent run and stream responses.
842857
If user_id or tenant_id is provided, authorization will be overridden. (Useful in northbound apis)
843858
"""
859+
import time
860+
861+
# Add initial span attributes for tracking
862+
monitoring_manager.set_span_attributes(
863+
agent_id=agent_request.agent_id,
864+
conversation_id=agent_request.conversation_id,
865+
is_debug=agent_request.is_debug,
866+
skip_user_save=skip_user_save,
867+
has_override_user_id=user_id is not None,
868+
has_override_tenant_id=tenant_id is not None,
869+
query_length=len(agent_request.query) if agent_request.query else 0,
870+
history_count=len(
871+
agent_request.history) if agent_request.history else 0,
872+
minio_files_count=len(
873+
agent_request.minio_files) if agent_request.minio_files else 0
874+
)
875+
876+
# Step 1: Resolve user tenant language
877+
resolve_start_time = time.time()
878+
monitoring_manager.add_span_event("user_resolution.started")
844879

845-
# Choose streaming strategy based on user's memory switch
846880
resolved_user_id, resolved_tenant_id, language = _resolve_user_tenant_language(
847881
authorization=authorization,
848882
http_request=http_request,
849883
user_id=user_id,
850884
tenant_id=tenant_id,
851885
)
852-
853-
# Save user message only if not in debug mode (before streaming starts)
886+
887+
resolve_duration = time.time() - resolve_start_time
888+
monitoring_manager.add_span_event("user_resolution.completed", {
889+
"duration": resolve_duration,
890+
"user_id": resolved_user_id,
891+
"tenant_id": resolved_tenant_id,
892+
"language": language
893+
})
894+
monitoring_manager.set_span_attributes(
895+
resolved_user_id=resolved_user_id,
896+
resolved_tenant_id=resolved_tenant_id,
897+
language=language,
898+
user_resolution_duration=resolve_duration
899+
)
900+
901+
# Step 2: Save user message (if needed)
854902
if not agent_request.is_debug and not skip_user_save:
903+
save_start_time = time.time()
904+
monitoring_manager.add_span_event("user_message_save.started")
905+
855906
save_messages(
856907
agent_request,
857908
target=MESSAGE_ROLE["USER"],
858909
user_id=resolved_user_id,
859910
tenant_id=resolved_tenant_id,
860911
)
861-
912+
913+
save_duration = time.time() - save_start_time
914+
monitoring_manager.add_span_event("user_message_save.completed", {
915+
"duration": save_duration
916+
})
917+
monitoring_manager.set_span_attributes(
918+
user_message_saved=True,
919+
user_message_save_duration=save_duration
920+
)
921+
else:
922+
monitoring_manager.add_span_event("user_message_save.skipped", {
923+
"reason": "debug_mode" if agent_request.is_debug else "skip_user_save_flag"
924+
})
925+
monitoring_manager.set_span_attributes(user_message_saved=False)
926+
927+
# Step 3: Build memory context
928+
memory_start_time = time.time()
929+
monitoring_manager.add_span_event("memory_context_build.started")
930+
862931
memory_ctx_preview = build_memory_context(
863932
resolved_user_id, resolved_tenant_id, agent_request.agent_id
864933
)
865934

866-
if memory_ctx_preview.user_config.memory_switch and not agent_request.is_debug:
935+
memory_duration = time.time() - memory_start_time
936+
memory_enabled = memory_ctx_preview.user_config.memory_switch
937+
monitoring_manager.add_span_event("memory_context_build.completed", {
938+
"duration": memory_duration,
939+
"memory_enabled": memory_enabled,
940+
"agent_share_option": getattr(memory_ctx_preview.user_config, "agent_share_option", "unknown")
941+
})
942+
monitoring_manager.set_span_attributes(
943+
memory_enabled=memory_enabled,
944+
memory_context_build_duration=memory_duration,
945+
agent_share_option=getattr(
946+
memory_ctx_preview.user_config, "agent_share_option", "unknown")
947+
)
948+
949+
# Step 4: Choose streaming strategy
950+
strategy_start_time = time.time()
951+
use_memory_stream = memory_enabled and not agent_request.is_debug
952+
953+
monitoring_manager.add_span_event("streaming_strategy.selected", {
954+
"strategy": "with_memory" if use_memory_stream else "no_memory",
955+
"memory_enabled": memory_enabled,
956+
"is_debug": agent_request.is_debug
957+
})
958+
959+
if use_memory_stream:
960+
monitoring_manager.add_span_event(
961+
"stream_generator.memory_stream.creating")
867962
stream_gen = generate_stream_with_memory(
868963
agent_request,
869964
user_id=resolved_user_id,
870965
tenant_id=resolved_tenant_id,
871966
language=language,
872967
)
873968
else:
969+
monitoring_manager.add_span_event(
970+
"stream_generator.no_memory_stream.creating")
874971
stream_gen = generate_stream_no_memory(
875972
agent_request,
876973
user_id=resolved_user_id,
877974
tenant_id=resolved_tenant_id,
878975
language=language,
879976
)
880977

881-
return StreamingResponse(
978+
strategy_duration = time.time() - strategy_start_time
979+
monitoring_manager.add_span_event("streaming_strategy.completed", {
980+
"duration": strategy_duration,
981+
"selected_strategy": "with_memory" if use_memory_stream else "no_memory"
982+
})
983+
monitoring_manager.set_span_attributes(
984+
streaming_strategy=(
985+
"with_memory" if use_memory_stream else "no_memory"),
986+
strategy_selection_duration=strategy_duration
987+
)
988+
989+
# Step 5: Create streaming response
990+
response_start_time = time.time()
991+
monitoring_manager.add_span_event("streaming_response.creating")
992+
993+
response = StreamingResponse(
882994
stream_gen,
883995
media_type="text/event-stream",
884996
headers={"Cache-Control": "no-cache", "Connection": "keep-alive"},
885997
)
886998

999+
response_duration = time.time() - response_start_time
1000+
monitoring_manager.add_span_event("streaming_response.created", {
1001+
"duration": response_duration,
1002+
"media_type": "text/event-stream"
1003+
})
1004+
monitoring_manager.set_span_attributes(
1005+
response_creation_duration=response_duration,
1006+
total_preparation_duration=(time.time() - resolve_start_time)
1007+
)
1008+
1009+
monitoring_manager.add_span_event("run_agent_stream.preparation_completed", {
1010+
"total_preparation_time": time.time() - resolve_start_time
1011+
})
1012+
1013+
return response
1014+
8871015

8881016
def stop_agent_tasks(conversation_id: int, user_id: str):
8891017
"""

backend/services/config_sync_service.py

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,6 @@
2626
logger = logging.getLogger("config_sync_service")
2727

2828

29-
def _parse_optional_int(value) -> Optional[int]:
30-
try:
31-
return int(value)
32-
except (ValueError, TypeError):
33-
return None
34-
35-
3629
def handle_model_config(tenant_id: str, user_id: str, config_key: str, model_id: Optional[int], tenant_config_dict: dict) -> None:
3730
"""
3831
Handle model configuration updates, deletions, and settings operations
@@ -56,7 +49,8 @@ def handle_model_config(tenant_id: str, user_id: str, config_key: str, model_id:
5649
user_id, tenant_id, config_key, model_id)
5750
return
5851

59-
current_model_id = _parse_optional_int(tenant_config_dict.get(config_key))
52+
current_model_id = tenant_config_dict.get(config_key)
53+
current_model_id = int(current_model_id) if str(current_model_id).isdigit() else None
6054

6155
if current_model_id == model_id:
6256
tenant_config_manager.update_single_config(tenant_id, config_key)

0 commit comments

Comments
 (0)