Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
2d63bb2
Refactor session and team management in backend
Fr4nc3 Sep 10, 2025
ce02065
Add async agent message handlers and improve logging
Fr4nc3 Sep 10, 2025
48a6926
Merge branch 'macae-v3-dev-v2-vip' into macae-v3-fr-dev-92
Fr4nc3 Sep 10, 2025
128bb0b
Refactor plan page state and improve streaming parsing
Fr4nc3 Sep 10, 2025
d53e031
Merge branch 'macae-v3-dev-v2-vip' into macae-v3-fr-dev-92
Fr4nc3 Sep 10, 2025
3c72b0d
Add plan approval and mplan management features
Fr4nc3 Sep 10, 2025
867166f
Add user_id parameter to message handlers
Fr4nc3 Sep 10, 2025
427450a
Add fields to AgentMessage models for plan messaging
Fr4nc3 Sep 11, 2025
2b9c1be
Improve human clarification handling in API
Fr4nc3 Sep 11, 2025
78495c1
Add agent message endpoint and update AgentMessageResponse model
Fr4nc3 Sep 11, 2025
3bff1f2
Merge branch 'macae-v3-dev-marktayl' into macae-v3-fr-dev-92
Fr4nc3 Sep 11, 2025
9804be5
Merge branch 'macae-v3-dev-v2-vip' into macae-v3-fr-dev-92
Fr4nc3 Sep 11, 2025
a61d821
Refactor AgentMessageResponse field defaults and order
Fr4nc3 Sep 11, 2025
2271392
Refactor agent message handling and validation logic
Fr4nc3 Sep 11, 2025
d233db2
Improve agent message parsing for multiple formats
Fr4nc3 Sep 11, 2025
b6358e8
Add agent message support to database and services
Fr4nc3 Sep 11, 2025
7ab7196
Add agent message persistence to PlanPage
Fr4nc3 Sep 12, 2025
6354ec2
Merge branch 'macae-v3-dev-v2-vip' into macae-v3-fr-dev-92
Fr4nc3 Sep 12, 2025
4b9658c
Refactor plan_service.py for readability and formatting
Fr4nc3 Sep 12, 2025
2cbc49e
Merge branch 'macae-v3-dev-v2-vip' into macae-v3-fr-dev-92
Fr4nc3 Sep 12, 2025
f141f5d
Add OpenAPI docstrings to API endpoints
Fr4nc3 Sep 12, 2025
377365f
Refactor agent message handling and plan retrieval
Fr4nc3 Sep 12, 2025
c4120fd
Fix indentation for HTTPException in upload_team_config
Fr4nc3 Sep 12, 2025
fd0dccc
Refactor plan data handling and WebSocket params
Fr4nc3 Sep 12, 2025
b930096
Remove unused getPlanWithSteps method and update PlanPage
Fr4nc3 Sep 12, 2025
4be82af
Add final message handling and AgentMessageResponse type
Fr4nc3 Sep 12, 2025
3132e5b
Update messages.py
Fr4nc3 Sep 12, 2025
58672c1
Refactor get_team_by_id and improve plan retrieval error handling
Fr4nc3 Sep 12, 2025
a04a1c4
Pass planData to agent message processing in PlanPage
Fr4nc3 Sep 12, 2025
5893f74
Refactor AgentMessageResponse model and usage
Fr4nc3 Sep 12, 2025
9112a94
Simplify agent message content before processing
Fr4nc3 Sep 12, 2025
5007810
Merge branch 'macae-v3-dev-v2-vip' into macae-v3-fr-dev-92
Fr4nc3 Sep 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 83 additions & 63 deletions src/backend/common/database/cosmosdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from azure.cosmos.aio import CosmosClient
from azure.cosmos.aio._database import DatabaseProxy
from azure.cosmos.exceptions import CosmosResourceExistsError
from pytest import Session
import v3.models.messages as messages

from common.models.messages_kernel import (
AgentMessage,
Expand All @@ -23,8 +23,8 @@

from .database_base import DatabaseBase
from ..models.messages_kernel import (
AgentMessageData,
BaseDataModel,
Session,
Plan,
Step,
AgentMessage,
Expand All @@ -38,7 +38,6 @@ class CosmosDBClient(DatabaseBase):
"""CosmosDB implementation of the database interface."""

MODEL_CLASS_MAPPING = {
DataType.session: Session,
DataType.plan: Plan,
DataType.step: Step,
DataType.agent_message: AgentMessage,
Expand Down Expand Up @@ -190,29 +189,6 @@ async def delete_item(self, item_id: str, partition_key: str) -> None:
self.logger.error("Failed to delete item from CosmosDB: %s", str(e))
raise

# Session Operations
async def add_session(self, session: Session) -> None:
"""Add a session to CosmosDB."""
await self.add_item(session)

async def get_session(self, session_id: str) -> Optional[Session]:
"""Retrieve a session by session_id."""
query = "SELECT * FROM c WHERE c.id=@id AND c.data_type=@data_type"
parameters = [
{"name": "@id", "value": session_id},
{"name": "@data_type", "value": DataType.session},
]
results = await self.query_items(query, parameters, Session)
return results[0] if results else None

async def get_all_sessions(self) -> List[Session]:
"""Retrieve all sessions for the user."""
query = "SELECT * FROM c WHERE c.user_id=@user_id AND c.data_type=@data_type"
parameters = [
{"name": "@user_id", "value": self.user_id},
{"name": "@data_type", "value": DataType.session},
]
return await self.query_items(query, parameters, Session)

# Plan Operations
async def add_plan(self, plan: Plan) -> None:
Expand All @@ -223,17 +199,6 @@ async def update_plan(self, plan: Plan) -> None:
"""Update a plan in CosmosDB."""
await self.update_item(plan)

async def get_plan_by_session(self, session_id: str) -> Optional[Plan]:
"""Retrieve a plan by session_id."""
query = (
"SELECT * FROM c WHERE c.session_id=@session_id AND c.data_type=@data_type"
)
parameters = [
{"name": "@session_id", "value": session_id},
{"name": "@data_type", "value": DataType.plan},
]
results = await self.query_items(query, parameters, Plan)
return results[0] if results else None

async def get_plan_by_plan_id(self, plan_id: str) -> Optional[Plan]:
"""Retrieve a plan by plan_id."""
Expand Down Expand Up @@ -272,7 +237,7 @@ async def get_all_plans_by_team_id(self, team_id: str) -> List[Plan]:

async def get_all_plans_by_team_id_status(self, team_id: str, status: str) -> List[Plan]:
"""Retrieve all plans for a specific team."""
query = "SELECT * FROM c WHERE c.team_id=@team_id AND c.data_type=@data_type and c.user_id=@user_id and c.overall_status=@status"
query = "SELECT * FROM c WHERE c.team_id=@team_id AND c.data_type=@data_type and c.user_id=@user_id and c.overall_status=@status ORDER BY c._ts DESC"
parameters = [
{"name": "@user_id", "value": self.user_id},
{"name": "@team_id", "value": team_id},
Expand Down Expand Up @@ -328,7 +293,7 @@ async def get_team(self, team_id: str) -> Optional[TeamConfiguration]:
teams = await self.query_items(query, parameters, TeamConfiguration)
return teams[0] if teams else None

async def get_team_by_id(self, id: str) -> Optional[TeamConfiguration]:
async def get_team_by_id(self, team_id: str) -> Optional[TeamConfiguration]:
"""Retrieve a specific team configuration by its document id.

Args:
Expand All @@ -337,9 +302,9 @@ async def get_team_by_id(self, id: str) -> Optional[TeamConfiguration]:
Returns:
TeamConfiguration object or None if not found
"""
query = "SELECT * FROM c WHERE c.id=@id AND c.data_type=@data_type"
query = "SELECT * FROM c WHERE c.team_id=@team_id AND c.data_type=@data_type"
parameters = [
{"name": "@id", "value": id},
{"name": "@team_id", "value": team_id},
{"name": "@data_type", "value": DataType.team_config},
]
teams = await self.query_items(query, parameters, TeamConfiguration)
Expand Down Expand Up @@ -383,27 +348,6 @@ async def delete_team(self, team_id: str) -> bool:
logging.exception(f"Failed to delete team from Cosmos DB: {e}")
return False

async def get_data_by_type_and_session_id(
self, data_type: str, session_id: str
) -> List[BaseDataModel]:
"""Query the Cosmos DB for documents with the matching data_type, session_id and user_id."""
await self._ensure_initialized()
if self.container is None:
return []

model_class = self.MODEL_CLASS_MAPPING.get(data_type, BaseDataModel)
try:
query = "SELECT * FROM c WHERE c.session_id=@session_id AND c.user_id=@user_id AND c.data_type=@data_type ORDER BY c._ts ASC"
parameters = [
{"name": "@session_id", "value": session_id},
{"name": "@data_type", "value": data_type},
{"name": "@user_id", "value": self.user_id},
]
return await self.query_items(query, parameters, model_class)
except Exception as e:
logging.exception(f"Failed to query data by type from Cosmos DB: {e}")
return []

# Data Management Operations
async def get_data_by_type(self, data_type: str) -> List[BaseDataModel]:
"""Retrieve all data of a specific type."""
Expand Down Expand Up @@ -470,13 +414,89 @@ async def get_current_team(self, user_id: str) -> Optional[UserCurrentTeam]:
teams = await self.query_items(query, parameters, UserCurrentTeam)
return teams[0] if teams else None



async def delete_current_team(self, user_id: str) -> bool:
"""Delete the current team for a user."""
query = "SELECT c.id, c.session_id FROM c WHERE c.user_id=@user_id AND c.data_type=@data_type"

params = [
{"name": "@user_id", "value": user_id},
{"name": "@data_type", "value": DataType.user_current_team},
]
items = self.container.query_items(query=query, parameters=params)
print("Items to delete:", items)
if items:
async for doc in items:
try:
await self.container.delete_item(doc["id"], partition_key=doc["session_id"])
except Exception as e:
self.logger.warning("Failed deleting current team doc %s: %s", doc.get("id"), e)

return True

async def set_current_team(self, current_team: UserCurrentTeam) -> None:
"""Set the current team for a user."""
await self._ensure_initialized()
await self.add_item(current_team)


async def update_current_team(self, current_team: UserCurrentTeam) -> None:
"""Update the current team for a user."""
await self._ensure_initialized()
await self.update_item(current_team)

async def delete_plan_by_plan_id(self, plan_id: str) -> bool:
"""Delete a plan by its ID."""
query = "SELECT c.id, c.session_id FROM c WHERE c.id=@plan_id "

params = [
{"name": "@plan_id", "value": plan_id},
]
items = self.container.query_items(query=query, parameters=params)
print("Items to delete planid:", items)
if items:
async for doc in items:
try:
await self.container.delete_item(doc["id"], partition_key=doc["session_id"])
except Exception as e:
self.logger.warning("Failed deleting current team doc %s: %s", doc.get("id"), e)

return True

async def add_mplan(self, mplan: messages.MPlan) -> None:
"""Add a team configuration to the database."""
await self.add_item(mplan)

async def update_mplan(self, mplan: messages.MPlan) -> None:
"""Update a team configuration in the database."""
await self.update_item(mplan)


async def get_mplan(self, plan_id: str) -> Optional[messages.MPlan]:
"""Retrieve a mplan configuration by mplan_id."""
query = "SELECT * FROM c WHERE c.plan_id=@plan_id AND c.data_type=@data_type"
parameters = [
{"name": "@plan_id", "value": plan_id},
{"name": "@data_type", "value": DataType.m_plan},
]
results = await self.query_items(query, parameters, messages.MPlan)
return results[0] if results else None


async def add_agent_message(self, message: AgentMessageData) -> None:
"""Add an agent message to the database."""
await self.add_item(message)

async def update_agent_message(self, message: AgentMessageData) -> None:
"""Update an agent message in the database."""
await self.update_item(message)

async def get_agent_messages(self, plan_id: str) -> List[AgentMessageData]:
"""Retrieve an agent message by message_id."""
query = "SELECT * FROM c WHERE c.plan_id=@plan_id AND c.data_type=@data_type ORDER BY c._ts ASC"
parameters = [
{"name": "@plan_id", "value": plan_id},
{"name": "@data_type", "value": DataType.m_plan_message},
]

return await self.query_items(query, parameters, AgentMessageData)
71 changes: 43 additions & 28 deletions src/backend/common/database/database_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Type

import v3.models.messages as messages
from ..models.messages_kernel import (
AgentMessageData,
BaseDataModel,
Session,
Plan,
Step,
TeamConfiguration,
Expand Down Expand Up @@ -59,21 +59,6 @@ async def delete_item(self, item_id: str, partition_key: str) -> None:
"""Delete an item from the database."""
pass

# Session Operations
@abstractmethod
async def add_session(self, session: Session) -> None:
"""Add a session to the database."""
pass

@abstractmethod
async def get_session(self, session_id: str) -> Optional[Session]:
"""Retrieve a session by session_id."""
pass

@abstractmethod
async def get_all_sessions(self) -> List[Session]:
"""Retrieve all sessions for the user."""
pass

# Plan Operations
@abstractmethod
Expand All @@ -86,11 +71,6 @@ async def update_plan(self, plan: Plan) -> None:
"""Update a plan in the database."""
pass

@abstractmethod
async def get_plan_by_session(self, session_id: str) -> Optional[Plan]:
"""Retrieve a plan by session_id."""
pass

@abstractmethod
async def get_plan_by_plan_id(self, plan_id: str) -> Optional[Plan]:
"""Retrieve a plan by plan_id."""
Expand Down Expand Up @@ -118,11 +98,7 @@ async def get_all_plans_by_team_id_status(
"""Retrieve all plans for a specific team."""
pass

@abstractmethod
async def get_data_by_type_and_session_id(
self, data_type: str, session_id: str
) -> List[BaseDataModel]:
pass


# Step Operations
@abstractmethod
Expand Down Expand Up @@ -162,7 +138,7 @@ async def get_team(self, team_id: str) -> Optional[TeamConfiguration]:
pass

@abstractmethod
async def get_team_by_id(self, id: str) -> Optional[TeamConfiguration]:
async def get_team_by_id(self, team_id: str) -> Optional[TeamConfiguration]:
"""Retrieve a team configuration by internal id."""
pass

Expand Down Expand Up @@ -207,6 +183,11 @@ async def get_current_team(self, user_id: str) -> Optional[UserCurrentTeam]:
"""Retrieve the current team for a user."""
pass

@abstractmethod
async def delete_current_team(self, user_id: str) -> Optional[UserCurrentTeam]:
"""Retrieve the current team for a user."""
pass

@abstractmethod
async def set_current_team(self, current_team: UserCurrentTeam) -> None:
pass
Expand All @@ -215,3 +196,37 @@ async def set_current_team(self, current_team: UserCurrentTeam) -> None:
async def update_current_team(self, current_team: UserCurrentTeam) -> None:
"""Update the current team for a user."""
pass

@abstractmethod
async def delete_plan_by_plan_id(self, plan_id: str) -> bool:
"""Retrieve the current team for a user."""
pass

@abstractmethod
async def add_mplan(self, mplan: messages.MPlan) -> None:
"""Add a team configuration to the database."""
pass

@abstractmethod
async def update_mplan(self, mplan: messages.MPlan) -> None:
"""Update a team configuration in the database."""
pass

@abstractmethod
async def get_mplan(self, plan_id: str) -> Optional[messages.MPlan]:
"""Retrieve a mplan configuration by plan_id."""
pass

@abstractmethod
async def add_agent_message(self, message: AgentMessageData) -> None:
pass

@abstractmethod
async def update_agent_message(self, message: AgentMessageData) -> None:
"""Update an agent message in the database."""
pass

@abstractmethod
async def get_agent_messages(self, plan_id: str) -> Optional[AgentMessageData]:
"""Retrieve an agent message by message_id."""
pass
Loading
Loading