Skip to content

Commit c5a11e3

Browse files
authored
🐛 Use conversation_id and user_id as the primary key of the agent run thread pool
2 parents 64af1eb + 8e4b20f commit c5a11e3

File tree

8 files changed

+371
-40
lines changed

8 files changed

+371
-40
lines changed

backend/agents/agent_run_manager.py

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,39 +21,46 @@ def __new__(cls):
2121

2222
def __init__(self):
2323
if not self._initialized:
24-
# conversation_id -> agent_run_info
25-
self.agent_runs: Dict[int, AgentRunInfo] = {}
24+
# user_id:conversation_id -> agent_run_info
25+
self.agent_runs: Dict[str, AgentRunInfo] = {}
2626
self._initialized = True
2727

28-
def register_agent_run(self, conversation_id: int, agent_run_info):
28+
def _get_run_key(self, conversation_id: int, user_id: str) -> str:
29+
"""Generate unique key for agent run using user_id and conversation_id"""
30+
return f"{user_id}:{conversation_id}"
31+
32+
def register_agent_run(self, conversation_id: int, agent_run_info, user_id: str):
2933
"""register agent run instance"""
3034
with self._lock:
31-
self.agent_runs[conversation_id] = agent_run_info
35+
run_key = self._get_run_key(conversation_id, user_id)
36+
self.agent_runs[run_key] = agent_run_info
3237
logger.info(
33-
f"register agent run instance, conversation_id: {conversation_id}")
38+
f"register agent run instance, user_id: {user_id}, conversation_id: {conversation_id}")
3439

35-
def unregister_agent_run(self, conversation_id: int):
40+
def unregister_agent_run(self, conversation_id: int, user_id: str):
3641
"""unregister agent run instance"""
3742
with self._lock:
38-
if conversation_id in self.agent_runs:
39-
del self.agent_runs[conversation_id]
43+
run_key = self._get_run_key(conversation_id, user_id)
44+
if run_key in self.agent_runs:
45+
del self.agent_runs[run_key]
4046
logger.info(
41-
f"unregister agent run instance, conversation_id: {conversation_id}")
47+
f"unregister agent run instance, user_id: {user_id}, conversation_id: {conversation_id}")
4248
else:
4349
logger.info(
44-
f"no agent run instance found for conversation_id: {conversation_id}")
50+
f"no agent run instance found for user_id: {user_id}, conversation_id: {conversation_id}")
4551

46-
def get_agent_run_info(self, conversation_id: int):
52+
def get_agent_run_info(self, conversation_id: int, user_id: str):
4753
"""get agent run instance"""
48-
return self.agent_runs.get(conversation_id)
54+
run_key = self._get_run_key(conversation_id, user_id)
55+
return self.agent_runs.get(run_key)
4956

50-
def stop_agent_run(self, conversation_id: int) -> bool:
51-
"""stop agent run for specified conversation_id"""
52-
agent_run_info = self.get_agent_run_info(conversation_id)
57+
def stop_agent_run(self, conversation_id: int, user_id: str) -> bool:
58+
"""stop agent run for specified conversation_id and user_id"""
59+
agent_run_info = self.get_agent_run_info(conversation_id, user_id)
5360
if agent_run_info is not None:
5461
agent_run_info.stop_event.set()
5562
logger.info(
56-
f"agent run stopped, conversation_id: {conversation_id}")
63+
f"agent run stopped, user_id: {user_id}, conversation_id: {conversation_id}")
5764
return True
5865
return False
5966

backend/apps/agent_app.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,12 @@ async def agent_run_api(agent_request: AgentRequest, http_request: Request, auth
4545

4646

4747
@router.get("/stop/{conversation_id}")
48-
async def agent_stop_api(conversation_id: int):
48+
async def agent_stop_api(conversation_id: int, authorization: Optional[str] = Header(None)):
4949
"""
5050
stop agent run and preprocess tasks for specified conversation_id
5151
"""
52-
if stop_agent_tasks(conversation_id).get("status") == "success":
52+
user_id, _ = get_current_user_id(authorization)
53+
if stop_agent_tasks(conversation_id, user_id).get("status") == "success":
5354
return {"status": "success", "message": "agent run and preprocess tasks stopped successfully"}
5455
else:
5556
raise HTTPException(status_code=HTTPStatus.BAD_REQUEST,

backend/services/agent_service.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ async def _stream_agent_chunks(
125125
user_id=user_id,
126126
)
127127
# Always unregister the run to release resources
128-
agent_run_manager.unregister_agent_run(agent_request.conversation_id)
128+
agent_run_manager.unregister_agent_run(agent_request.conversation_id, user_id)
129129

130130
# Schedule memory addition in background to avoid blocking SSE termination
131131
async def _add_memory_background():
@@ -681,7 +681,7 @@ async def prepare_agent_run(
681681
allow_memory_search=allow_memory_search,
682682
)
683683
agent_run_manager.register_agent_run(
684-
agent_request.conversation_id, agent_run_info)
684+
agent_request.conversation_id, agent_run_info, user_id)
685685
return agent_run_info, memory_context
686686

687687

@@ -885,13 +885,13 @@ async def run_agent_stream(
885885
)
886886

887887

888-
def stop_agent_tasks(conversation_id: int):
888+
def stop_agent_tasks(conversation_id: int, user_id: str):
889889
"""
890890
Stop agent run and preprocess tasks for the specified conversation_id.
891891
Matches the behavior of agent_app.agent_stop_api.
892892
"""
893893
# Stop agent run
894-
agent_stopped = agent_run_manager.stop_agent_run(conversation_id)
894+
agent_stopped = agent_run_manager.stop_agent_run(conversation_id, user_id)
895895

896896
# Stop preprocess tasks
897897
preprocess_stopped = preprocess_manager.stop_preprocess_tasks(
@@ -904,11 +904,11 @@ def stop_agent_tasks(conversation_id: int):
904904
if preprocess_stopped:
905905
message_parts.append("preprocess tasks")
906906

907-
message = f"successfully stopped {' and '.join(message_parts)} for conversation_id {conversation_id}"
907+
message = f"successfully stopped {' and '.join(message_parts)} for user_id {user_id}, conversation_id {conversation_id}"
908908
logging.info(message)
909909
return {"status": "success", "message": message}
910910
else:
911-
message = f"no running agent or preprocess tasks found for conversation_id {conversation_id}"
911+
message = f"no running agent or preprocess tasks found for user_id {user_id}, conversation_id {conversation_id}"
912912
logging.error(message)
913913
return {"status": "error", "message": message}
914914

backend/services/northbound_service.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ async def stop_chat(ctx: NorthboundContext, external_conversation_id: str) -> Di
217217
try:
218218
internal_id = await to_internal_conversation_id(external_conversation_id)
219219

220-
stop_result = stop_agent_tasks(internal_id)
220+
stop_result = stop_agent_tasks(internal_id, ctx.user_id)
221221
return {"message": stop_result.get("message", "success"), "data": external_conversation_id, "requestId": ctx.request_id}
222222
except Exception as e:
223223
raise Exception(f"Failed to stop chat for external conversation id {external_conversation_id}: {str(e)}")

0 commit comments

Comments
 (0)