Skip to content

Commit da32c0c

Browse files
authored
♻️ Refactor move some business logic from agent_app.py to agent_service.py …
2 parents bae8504 + 665fef8 commit da32c0c

File tree

7 files changed

+430
-402
lines changed

7 files changed

+430
-402
lines changed

backend/apps/agent_app.py

Lines changed: 10 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,13 @@
22
from typing import Optional
33

44
from fastapi import HTTPException, APIRouter, Header, Request, Body
5-
from fastapi.responses import StreamingResponse, JSONResponse
6-
from nexent.core.agents.run_agent import agent_run
7-
8-
from database.agent_db import delete_related_agent
9-
from utils.auth_utils import get_current_user_info, get_current_user_id
10-
from agents.create_agent_info import create_agent_run_info
5+
from fastapi.responses import JSONResponse
116
from consts.model import AgentRequest, AgentInfoRequest, AgentIDRequest, ConversationResponse, AgentImportRequest
127
from services.agent_service import get_agent_info_impl, \
138
get_creating_sub_agent_info_impl, update_agent_info_impl, delete_agent_impl, export_agent_impl, import_agent_impl, \
14-
list_all_agent_info_impl, insert_related_agent_impl
15-
from services.conversation_management_service import save_conversation_user, save_conversation_assistant
16-
from services.memory_config_service import build_memory_context
17-
from utils.config_utils import config_manager
18-
from utils.thread_utils import submit
19-
from agents.agent_run_manager import agent_run_manager
20-
from agents.preprocess_manager import preprocess_manager
9+
list_all_agent_info_impl, insert_related_agent_impl, run_agent_stream, stop_agent_tasks
10+
from database.agent_db import delete_related_agent
11+
from utils.auth_utils import get_current_user_info, get_current_user_id
2112

2213

2314
router = APIRouter(prefix="/agent")
@@ -30,43 +21,10 @@ async def agent_run_api(agent_request: AgentRequest, http_request: Request, auth
3021
"""
3122
Agent execution API endpoint
3223
"""
33-
user_id, tenant_id, language = get_current_user_info(authorization, http_request)
34-
memory_context = build_memory_context(user_id, tenant_id, agent_request.agent_id)
35-
36-
agent_run_info = await create_agent_run_info(agent_id=agent_request.agent_id,
37-
minio_files=agent_request.minio_files,
38-
query=agent_request.query,
39-
history=agent_request.history,
40-
authorization=authorization,
41-
language=language)
42-
43-
agent_run_manager.register_agent_run(agent_request.conversation_id, agent_run_info)
44-
# Save user message only if not in debug mode
45-
if not agent_request.is_debug:
46-
submit(save_conversation_user, agent_request, authorization)
47-
48-
async def generate():
49-
messages = []
50-
try:
51-
async for chunk in agent_run(agent_run_info, memory_context):
52-
messages.append(chunk)
53-
yield f"data: {chunk}\n\n"
54-
except Exception as e:
55-
raise HTTPException(status_code=500, detail=f"Agent run error: {str(e)}")
56-
finally:
57-
# Save assistant message only if not in debug mode
58-
if not agent_request.is_debug:
59-
submit(save_conversation_assistant, agent_request, messages, authorization)
60-
# Unregister agent run instance for both debug and non-debug modes
61-
agent_run_manager.unregister_agent_run(agent_request.conversation_id)
62-
63-
return StreamingResponse(
64-
generate(),
65-
media_type="text/event-stream",
66-
headers={
67-
"Cache-Control": "no-cache",
68-
"Connection": "keep-alive"
69-
}
24+
return await run_agent_stream(
25+
agent_request=agent_request,
26+
http_request=http_request,
27+
authorization=authorization
7028
)
7129

7230

@@ -75,21 +33,8 @@ async def agent_stop_api(conversation_id: int):
7533
"""
7634
stop agent run and preprocess tasks for specified conversation_id
7735
"""
78-
# Stop agent run
79-
agent_stopped = agent_run_manager.stop_agent_run(conversation_id)
80-
81-
# Stop preprocess tasks
82-
preprocess_stopped = preprocess_manager.stop_preprocess_tasks(conversation_id)
83-
84-
if agent_stopped or preprocess_stopped:
85-
message_parts = []
86-
if agent_stopped:
87-
message_parts.append("agent run")
88-
if preprocess_stopped:
89-
message_parts.append("preprocess tasks")
90-
91-
message = f"successfully stopped {' and '.join(message_parts)} for conversation_id {conversation_id}"
92-
return {"status": "success", "message": message}
36+
if stop_agent_tasks(conversation_id).get("status") == "success":
37+
return {"status": "success", "message": "agent run and preprocess tasks stopped successfully"}
9338
else:
9439
raise HTTPException(status_code=404, detail=f"no running agent or preprocess tasks found for conversation_id {conversation_id}")
9540

backend/apps/conversation_management_app.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22
from typing import Dict, Any, Optional
33

44
from fastapi import HTTPException, APIRouter, Header, Request
5-
from fastapi.encoders import jsonable_encoder
6-
from pydantic import BaseModel
75

86
from consts.model import ConversationResponse, ConversationRequest, RenameRequest, GenerateTitleRequest, OpinionRequest, MessageIdRequest
97
from services.conversation_management_service import (

backend/database/agent_db.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,20 @@ def search_agent_info_by_agent_id(agent_id: int, tenant_id: str):
2525

2626
return agent_dict
2727

28+
def search_agent_id_by_agent_name(agent_name: str, tenant_id: str):
29+
"""
30+
Search agent id by agent name
31+
"""
32+
with get_db_session() as session:
33+
agent = session.query(AgentInfo).filter(
34+
AgentInfo.name == agent_name,
35+
AgentInfo.tenant_id == tenant_id,
36+
AgentInfo.delete_flag != 'Y').first()
37+
if not agent:
38+
raise ValueError("agent not found")
39+
return agent.agent_id
40+
41+
2842
def search_blank_sub_agent_by_main_agent_id(tenant_id: str):
2943
"""
3044
Search blank sub agent by main agent id

backend/services/agent_service.py

Lines changed: 135 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,22 +3,30 @@
33
import logging
44
from collections import deque
55

6-
from fastapi import Header
7-
from fastapi.responses import JSONResponse
6+
from fastapi import Header, Request, HTTPException
7+
from fastapi.responses import JSONResponse, StreamingResponse
8+
from consts.model import AgentRequest
89
from agents.create_agent_info import create_tool_config_list
910
from consts.model import AgentInfoRequest, ExportAndImportAgentInfo, ExportAndImportDataFormat, ToolInstanceInfoRequest, MCPInfo
1011
from database.agent_db import create_agent, query_all_enabled_tool_instances, \
1112
search_blank_sub_agent_by_main_agent_id, \
1213
search_tools_for_sub_agent, search_agent_info_by_agent_id, update_agent, delete_agent_by_id, query_all_tools, \
1314
create_or_update_tool_by_tool_info, check_tool_is_available, query_all_agent_info_by_tenant_id, \
14-
query_sub_agents_id_list, insert_related_agent, delete_all_related_agent
15+
query_sub_agents_id_list, insert_related_agent, delete_all_related_agent, search_agent_id_by_agent_name
1516
from database.remote_mcp_db import get_mcp_server_by_name_and_tenant, check_mcp_name_exists
1617
from services.remote_mcp_service import add_remote_mcp_server_list
1718
from services.tool_configuration_service import update_tool_list
19+
from services.conversation_management_service import save_conversation_user, save_conversation_assistant
1820

19-
from utils.auth_utils import get_current_user_id
21+
from utils.auth_utils import get_current_user_info
2022
from utils.memory_utils import build_memory_config
23+
from utils.thread_utils import submit
2124
from nexent.memory.memory_service import clear_memory
25+
from nexent.core.agents.run_agent import agent_run
26+
from services.memory_config_service import build_memory_context
27+
from agents.create_agent_info import create_agent_run_info
28+
from agents.agent_run_manager import agent_run_manager
29+
from agents.preprocess_manager import preprocess_manager
2230

2331

2432
logger = logging.getLogger("agent_service")
@@ -69,7 +77,7 @@ def get_agent_info_impl(agent_id: int, tenant_id: str):
6977

7078

7179
def get_creating_sub_agent_info_impl(authorization: str = Header(None)):
72-
user_id, tenant_id = get_current_user_id(authorization)
80+
user_id, tenant_id, _ = get_current_user_info(authorization)
7381

7482
try:
7583
sub_agent_id = get_creating_sub_agent_id_service(tenant_id, user_id)
@@ -100,7 +108,7 @@ def get_creating_sub_agent_info_impl(authorization: str = Header(None)):
100108
"sub_agent_id_list": query_sub_agents_id_list(main_agent_id=sub_agent_id, tenant_id=tenant_id)}
101109

102110
def update_agent_info_impl(request: AgentInfoRequest, authorization: str = Header(None)):
103-
user_id, tenant_id = get_current_user_id(authorization)
111+
user_id, tenant_id, _ = get_current_user_info(authorization)
104112

105113
try:
106114
update_agent(request.agent_id, request, tenant_id, user_id)
@@ -109,7 +117,7 @@ def update_agent_info_impl(request: AgentInfoRequest, authorization: str = Heade
109117
raise ValueError(f"Failed to update agent info: {str(e)}")
110118

111119
async def delete_agent_impl(agent_id: int, authorization: str = Header(None)):
112-
user_id, tenant_id = get_current_user_id(authorization)
120+
user_id, tenant_id, _ = get_current_user_info(authorization)
113121

114122
try:
115123
delete_agent_by_id(agent_id, tenant_id, user_id)
@@ -183,7 +191,7 @@ async def export_agent_impl(agent_id: int, authorization: str = Header(None)) ->
183191
This function recursively finds all managed sub-agents and exports the detailed configuration of each agent (including tools, prompts, etc.) as a dictionary, and finally returns it as a formatted JSON string for frontend download and backup.
184192
"""
185193

186-
user_id, tenant_id = get_current_user_id(authorization)
194+
user_id, tenant_id, _ = get_current_user_info(authorization)
187195

188196
export_agent_dict = {}
189197
search_list = deque([agent_id])
@@ -251,7 +259,7 @@ async def import_agent_impl(agent_info: ExportAndImportDataFormat, authorization
251259
"""
252260
Import agent using DFS
253261
"""
254-
user_id, tenant_id = get_current_user_id(authorization)
262+
user_id, tenant_id, _ = get_current_user_info(authorization)
255263
agent_id = agent_info.agent_id
256264

257265
# First, add MCP servers if any
@@ -459,4 +467,121 @@ def insert_related_agent_impl(parent_agent_id, child_agent_id, tenant_id):
459467
return JSONResponse(
460468
status_code=400,
461469
content={"message":"Failed to insert relation", "status": "error"}
462-
)
470+
)
471+
472+
473+
# Helper function for run_agent_stream, used to prepare context for an agent run
474+
async def prepare_agent_run(agent_request: AgentRequest, http_request: Request, authorization: str):
475+
"""
476+
Prepare for an agent run by creating context and run info, and registering the run.
477+
"""
478+
user_id, tenant_id, language = get_current_user_info(authorization, http_request)
479+
480+
memory_context = build_memory_context(user_id, tenant_id, agent_request.agent_id)
481+
agent_run_info = await create_agent_run_info(agent_id=agent_request.agent_id,
482+
minio_files=agent_request.minio_files,
483+
query=agent_request.query,
484+
history=agent_request.history,
485+
authorization=authorization,
486+
language=language)
487+
agent_run_manager.register_agent_run(agent_request.conversation_id, agent_run_info)
488+
return agent_run_info, memory_context
489+
490+
491+
# Helper function for run_agent_stream, used to save messages for either user or assistant
492+
def save_messages(agent_request, target:str, messages=None, authorization=None):
493+
if target == "user":
494+
if messages is not None:
495+
raise ValueError("Messages should be None when saving for user.")
496+
submit(save_conversation_user, agent_request, authorization)
497+
elif target == "assistant":
498+
if messages is None:
499+
raise ValueError("Messages cannot be None when saving for assistant.")
500+
submit(save_conversation_assistant, agent_request, messages, authorization)
501+
502+
503+
# Helper function for run_agent_stream, used to generate stream response
504+
async def generate_stream(agent_run_info, memory_context, agent_request: AgentRequest, authorization: str):
505+
messages = []
506+
try:
507+
async for chunk in agent_run(agent_run_info, memory_context):
508+
messages.append(chunk)
509+
yield f"data: {chunk}\n\n"
510+
except Exception as e:
511+
logger.error(f"Agent run error: {str(e)}")
512+
raise HTTPException(status_code=500, detail=f"Agent run error: {str(e)}")
513+
finally:
514+
# Save assistant message only if not in debug mode
515+
if not agent_request.is_debug:
516+
save_messages(agent_request, target="assistant", messages=messages, authorization=authorization)
517+
# Unregister agent run instance for both debug and non-debug modes
518+
agent_run_manager.unregister_agent_run(agent_request.conversation_id)
519+
520+
521+
async def run_agent_stream(agent_request: AgentRequest, http_request: Request, authorization: str):
522+
"""
523+
Start an agent run and stream responses, using explicit user/tenant context.
524+
Mirrors the logic of agent_app.agent_run_api but reusable by services.
525+
"""
526+
agent_run_info, memory_context = await prepare_agent_run(
527+
agent_request=agent_request,
528+
http_request=http_request,
529+
authorization=authorization
530+
)
531+
532+
# Save user message only if not in debug mode
533+
if not agent_request.is_debug:
534+
save_messages(
535+
agent_request,
536+
target="user",
537+
authorization=authorization
538+
)
539+
540+
return StreamingResponse(
541+
generate_stream(agent_run_info, memory_context, agent_request, authorization),
542+
media_type="text/event-stream",
543+
headers={
544+
"Cache-Control": "no-cache",
545+
"Connection": "keep-alive"
546+
}
547+
)
548+
549+
550+
def stop_agent_tasks(conversation_id: int):
551+
"""
552+
Stop agent run and preprocess tasks for the specified conversation_id.
553+
Matches the behavior of agent_app.agent_stop_api.
554+
"""
555+
# Stop agent run
556+
agent_stopped = agent_run_manager.stop_agent_run(conversation_id)
557+
558+
# Stop preprocess tasks
559+
preprocess_stopped = preprocess_manager.stop_preprocess_tasks(conversation_id)
560+
561+
if agent_stopped or preprocess_stopped:
562+
message_parts = []
563+
if agent_stopped:
564+
message_parts.append("agent run")
565+
if preprocess_stopped:
566+
message_parts.append("preprocess tasks")
567+
568+
message = f"successfully stopped {' and '.join(message_parts)} for conversation_id {conversation_id}"
569+
logging.info(message)
570+
return {"status": "success", "message": message}
571+
else:
572+
message = f"no running agent or preprocess tasks found for conversation_id {conversation_id}"
573+
logging.error(message)
574+
return {"status": "error", "message": message}
575+
576+
577+
def get_agent_id_by_name(agent_name: str, tenant_id: str) -> int:
578+
"""
579+
Resolve unique agent id by its unique name under the same tenant.
580+
"""
581+
if not agent_name:
582+
raise HTTPException(status_code=400, detail="agent_name required")
583+
try:
584+
return search_agent_id_by_agent_name(agent_name, tenant_id)
585+
except Exception as _:
586+
logger.error(f"Failed to find agent id with '{agent_name}' in tenant {tenant_id}")
587+
raise HTTPException(status_code=404, detail="agent not found")

sdk/nexent/vector_database/elasticsearch_core.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ def __init__(
5959
api_key=self.api_key,
6060
verify_certs=verify_certs,
6161
ssl_show_warn=ssl_show_warn,
62-
timeout=20,
62+
request_timeout=20,
6363
max_retries=3, # Reduce retries for faster failure detection
6464
retry_on_timeout=True,
6565
retry_on_status=[502, 503, 504], # Retry on these status codes,

0 commit comments

Comments
 (0)