Skip to content

Commit b6358e8

Browse files
committed
Add agent message support to database and services
Introduces methods for adding, updating, and retrieving agent messages in the database layer (CosmosDBClient and DatabaseBase). Implements conversion and persistence logic for agent messages in PlanService. Also adds a session_id field to MPlan for partitioning.
1 parent d233db2 commit b6358e8

File tree

4 files changed

+115
-13
lines changed

4 files changed

+115
-13
lines changed

src/backend/common/database/cosmosdb.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
from .database_base import DatabaseBase
2525
from ..models.messages_kernel import (
26+
AgentMessageData,
2627
BaseDataModel,
2728
Plan,
2829
Step,
@@ -479,4 +480,23 @@ async def get_mplan(self, plan_id: str) -> Optional[messages.MPlan]:
479480
{"name": "@data_type", "value": DataType.m_plan},
480481
]
481482
results = await self.query_items(query, parameters, messages.MPlan)
482-
return results[0] if results else None
483+
return results[0] if results else None
484+
485+
486+
async def add_agent_message(self, message: messages.AgentMessageResponse) -> None:
487+
"""Add an agent message to the database."""
488+
await self.add_item(message)
489+
490+
async def update_agent_message(self, message: messages.AgentMessageResponse) -> None:
491+
"""Update an agent message in the database."""
492+
await self.update_item(message)
493+
494+
async def get_agent_messages(self, plan_id: str) -> List[messages.AgentMessageResponse]:
495+
"""Retrieve an agent message by message_id."""
496+
query = "SELECT * FROM c WHERE c.plan_id=@plan_id AND c.data_type=@data_type"
497+
parameters = [
498+
{"name": "@plan_id", "value": plan_id},
499+
{"name": "@data_type", "value": DataType.m_plan_message},
500+
]
501+
502+
return await self.query_items(query, parameters, messages.AgentMessageResponse)

src/backend/common/database/database_base.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from typing import Any, Dict, List, Optional, Type
55
import v3.models.messages as messages
66
from ..models.messages_kernel import (
7+
AgentMessageData,
78
BaseDataModel,
89
Plan,
910
Step,
@@ -213,5 +214,19 @@ async def update_mplan(self, mplan: messages.MPlan) -> None:
213214

214215
@abstractmethod
215216
async def get_mplan(self, plan_id: str) -> Optional[messages.MPlan]:
216-
"""Retrieve a mplan configuration by mplan_id."""
217+
"""Retrieve a mplan configuration by plan_id."""
218+
pass
219+
220+
@abstractmethod
221+
async def add_agent_message(self, message: AgentMessageData) -> None:
222+
pass
223+
224+
@abstractmethod
225+
async def update_agent_message(self, message: AgentMessageResponse) -> None:
226+
"""Update an agent message in the database."""
227+
pass
228+
229+
@abstractmethod
230+
async def get_agent_messages(self, plan_id: str) -> Optional[AgentMessageResponse]:
231+
"""Retrieve an agent message by message_id."""
217232
pass

src/backend/v3/common/services/plan_service.py

Lines changed: 77 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,6 @@
1313
import uuid
1414
from semantic_kernel.kernel_pydantic import Field
1515

16-
class MPlanExpanded(MPlan):
17-
session_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
1816

1917
logger = logging.getLogger(__name__)
2018
def build_agent_message_from_user_clarification(
@@ -39,6 +37,70 @@ def build_agent_message_from_user_clarification(
3937
next_steps = [] # intentionally empty
4038
)
4139

40+
def build_agent_message_from_agent_message_response(
41+
agent_response: messages.AgentMessageResponse,
42+
user_id: str,
43+
) -> AgentMessageData:
44+
"""
45+
Convert a messages.AgentMessageResponse into common.models.messages_kernel.AgentMessageData.
46+
This is defensive: it tolerates missing fields and different timestamp formats.
47+
"""
48+
# Robust timestamp parsing (accepts seconds or ms or missing)
49+
50+
51+
# Raw data serialization
52+
raw = getattr(agent_response, "raw_data", None)
53+
try:
54+
if raw is None:
55+
# try asdict if it's a dataclass-like
56+
try:
57+
raw_str = json.dumps(asdict(agent_response))
58+
except Exception:
59+
raw_str = json.dumps({k: getattr(agent_response, k) for k in dir(agent_response) if not k.startswith("_")})
60+
elif isinstance(raw, (dict, list)):
61+
raw_str = json.dumps(raw)
62+
else:
63+
raw_str = str(raw)
64+
except Exception:
65+
raw_str = json.dumps({"raw": str(raw)})
66+
67+
# Steps / next_steps defaulting
68+
steps = getattr(agent_response, "steps", []) or []
69+
next_steps = getattr(agent_response, "next_steps", []) or []
70+
71+
# Agent name and type
72+
agent_name = getattr(agent_response, "agent", "") or getattr(agent_response, "agent_name", "") or getattr(agent_response, "source", "")
73+
# Try to infer agent_type, fallback to AI_AGENT
74+
agent_type_raw = getattr(agent_response, "agent_type", None)
75+
if isinstance(agent_type_raw, AgentMessageType):
76+
agent_type = agent_type_raw
77+
else:
78+
# Normalize common strings
79+
agent_type_str = str(agent_type_raw or "").lower()
80+
if "human" in agent_type_str:
81+
agent_type = AgentMessageType.HUMAN_AGENT
82+
else:
83+
agent_type = AgentMessageType.AI_AGENT
84+
85+
# Content
86+
content = getattr(agent_response, "content", "") or getattr(agent_response, "text", "") or ""
87+
88+
# plan_id / user_id fallback
89+
plan_id_val = getattr(agent_response, "plan_id", "") or ""
90+
user_id_val = getattr(agent_response, "user_id", "") or user_id
91+
92+
return AgentMessageData(
93+
plan_id=plan_id_val,
94+
user_id=user_id_val,
95+
m_plan_id=getattr(agent_response, "m_plan_id", ""),
96+
agent=agent_name,
97+
agent_type=agent_type,
98+
content=content,
99+
raw_data=raw_str,
100+
steps=list(steps),
101+
next_steps=list(next_steps),
102+
)
103+
42104

43105
class PlanService:
44106

@@ -118,7 +180,18 @@ async def handle_agent_messages(agent_message: messages.AgentMessageResponse, us
118180
Raises:
119181
ValueError on invalid state
120182
"""
121-
return True
183+
try:
184+
agent_msg = build_agent_message_from_agent_message_response(agent_message, user_id)
185+
186+
# Persist if your database layer supports it.
187+
# Look for or implement something like: memory_store.add_agent_message(agent_msg)
188+
memory_store = await DatabaseFactory.get_database(user_id=user_id)
189+
await memory_store.add_agent_message(agent_msg)
190+
return True
191+
except Exception as e:
192+
logger.exception("Failed to handle human clarification -> agent message: %s", e)
193+
return False
194+
122195

123196
@staticmethod
124197
async def handle_human_clarification(human_feedback: messages.UserClarificationResponse, user_id: str) -> bool:
@@ -141,14 +214,7 @@ async def handle_human_clarification(human_feedback: messages.UserClarificationR
141214
# Persist if your database layer supports it.
142215
# Look for or implement something like: memory_store.add_agent_message(agent_msg)
143216
memory_store = await DatabaseFactory.get_database(user_id=user_id)
144-
if hasattr(memory_store, "add_agent_message"):
145-
await memory_store.add_agent_message(agent_msg)
146-
else:
147-
# Fallback: log or ignore if persistence not yet implemented
148-
logging.debug("add_agent_message not implemented; skipping persistence")
149-
150-
# Optionally emit over websocket if you have a broadcaster:
151-
# await websocket_manager.broadcast(WebsocketMessageType.AGENT_MESSAGE, agent_msg.model_dump())
217+
await memory_store.add_agent_message(agent_msg)
152218

153219
return True
154220
except Exception as e:

src/backend/v3/models/models.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ class MPlan(BaseModel):
2525
"""model of a plan"""
2626
id: str = Field(default_factory=lambda: str(uuid.uuid4()))
2727
data_type: Literal[DataType.m_plan] = Field(DataType.m_plan, Literal=True)
28+
session_id: str = Field(default_factory=lambda: str(uuid.uuid4())) # db partition key
2829
user_id: str = ""
2930
team_id: str = ""
3031
plan_id: str = ""

0 commit comments

Comments
 (0)