Skip to content

Commit 81a732e

Browse files
committed
Refactor JsonService to use CosmosMemoryContext
Replaces usage of the generic memory store in JsonService with CosmosMemoryContext for all team configuration operations. Adds async methods to CosmosMemoryContext for querying, retrieving, and deleting items by key, supporting parameterized queries and proper partition key handling.
1 parent ba95221 commit 81a732e

File tree

2 files changed

+87
-26
lines changed

2 files changed

+87
-26
lines changed

src/backend/context/cosmos_memory_kernel.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,65 @@ async def get_item_by_id(
168168
logging.exception(f"Failed to retrieve item from Cosmos DB: {e}")
169169
return None
170170

171+
async def query_items_with_parameters(
172+
self, query: str, parameters: List[Dict[str, Any]], limit: int = 1000
173+
) -> List[Dict[str, Any]]:
174+
"""Query items from Cosmos DB with parameters and return raw dictionaries."""
175+
await self.ensure_initialized()
176+
177+
try:
178+
items = self._container.query_items(
179+
query=query,
180+
parameters=parameters,
181+
enable_cross_partition_query=True
182+
)
183+
result_list = []
184+
count = 0
185+
async for item in items:
186+
if count >= limit:
187+
break
188+
result_list.append(item)
189+
count += 1
190+
return result_list
191+
except Exception as e:
192+
logging.exception(f"Failed to query items from Cosmos DB: {e}")
193+
return []
194+
195+
async def get_async(self, key: str) -> Optional[Dict[str, Any]]:
196+
"""Get an item by its key/ID."""
197+
await self.ensure_initialized()
198+
199+
try:
200+
# Query by ID across all partitions since we don't know the partition key
201+
query = "SELECT * FROM c WHERE c.id=@id"
202+
parameters = [{"name": "@id", "value": key}]
203+
204+
items = self._container.query_items(
205+
query=query,
206+
parameters=parameters,
207+
enable_cross_partition_query=True
208+
)
209+
210+
async for item in items:
211+
return item
212+
return None
213+
except Exception as e:
214+
logging.exception(f"Failed to get item from Cosmos DB: {e}")
215+
return None
216+
217+
async def delete_async(self, key: str) -> None:
218+
"""Delete an item by its key/ID."""
219+
await self.ensure_initialized()
220+
221+
try:
222+
# First get the item to find its partition key
223+
item = await self.get_async(key)
224+
if item:
225+
partition_key = item.get("session_id", item.get("user_id", key))
226+
await self._container.delete_item(item=key, partition_key=partition_key)
227+
except Exception as e:
228+
logging.exception(f"Failed to delete item from Cosmos DB: {e}")
229+
171230
async def query_items(
172231
self,
173232
query: str,

src/backend/services/json_service.py

Lines changed: 28 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,15 @@
55
from typing import Dict, Any, List, Optional
66

77
from models.messages_kernel import TeamConfiguration, TeamAgent, StartingTask
8+
from context.cosmos_memory_kernel import CosmosMemoryContext
89

910

1011
class JsonService:
1112
"""Service for handling JSON team configuration operations."""
1213

13-
def __init__(self, memory_store):
14-
"""Initialize with memory store."""
15-
self.memory_store = memory_store
14+
def __init__(self, memory_context: CosmosMemoryContext):
15+
"""Initialize with memory context."""
16+
self.memory_context = memory_context
1617
self.logger = logging.getLogger(__name__)
1718

1819
async def validate_and_parse_team_config(
@@ -148,14 +149,15 @@ async def save_team_configuration(self, team_config: TeamConfiguration) -> str:
148149
The unique ID of the saved configuration
149150
"""
150151
try:
151-
# Convert to dictionary for storage
152+
# Convert to dictionary and add data_type for proper querying
152153
config_dict = team_config.model_dump()
153-
154-
# Add the full JSON string for storage (with proper datetime serialization)
155-
config_dict["json_data"] = json.dumps(config_dict, indent=2, default=str)
156-
157-
# Save to memory store using the config ID as the key
158-
await self.memory_store.upsert_async(team_config.id, config_dict)
154+
config_dict["data_type"] = "team_config"
155+
156+
# Use the cosmos memory context to save the team configuration
157+
await self.memory_context.upsert_async(
158+
collection_name=team_config.id,
159+
record=config_dict
160+
)
159161

160162
self.logger.info(
161163
"Successfully saved team configuration with ID: %s", team_config.id
@@ -180,12 +182,12 @@ async def get_team_configuration(
180182
TeamConfiguration object or None if not found
181183
"""
182184
try:
183-
# Get the specific configuration by its ID
184-
config_dict = await self.memory_store.get_async(config_id)
185-
185+
# Get the specific configuration using cosmos memory context
186+
config_dict = await self.memory_context.get_async(config_id)
187+
186188
if config_dict is None:
187189
return None
188-
190+
189191
# Verify the configuration belongs to the user
190192
if config_dict.get("user_id") != user_id:
191193
self.logger.warning(
@@ -214,13 +216,14 @@ async def get_all_team_configurations(
214216
List of TeamConfiguration objects
215217
"""
216218
try:
217-
# Query configurations using SQL with parameters
218-
query = "SELECT * FROM memory WHERE memory.user_id=@user_id"
219+
# Query configurations using SQL with parameters through cosmos memory context
220+
query = "SELECT * FROM c WHERE c.user_id=@user_id AND c.data_type=@data_type"
219221
parameters = [
220222
{"name": "@user_id", "value": user_id},
223+
{"name": "@data_type", "value": "team_config"},
221224
]
222-
223-
configs = await self.memory_store.query_items_with_parameters(
225+
226+
configs = await self.memory_context.query_items_with_parameters(
224227
query, parameters, limit=1000
225228
)
226229

@@ -254,25 +257,24 @@ async def delete_team_configuration(self, config_id: str, user_id: str) -> bool:
254257
"""
255258
try:
256259
# First, verify the configuration exists and belongs to the user
257-
config_dict = await self.memory_store.get_async(config_id)
258-
260+
config_dict = await self.memory_context.get_async(config_id)
261+
259262
if config_dict is None:
260263
self.logger.warning(
261264
"Team configuration not found for deletion: %s", config_id
262265
)
263266
return False
264-
267+
265268
# Verify the configuration belongs to the user
266269
if config_dict.get("user_id") != user_id:
267270
self.logger.warning(
268-
"Access denied: cannot delete config %s for user %s",
269-
config_id,
270-
user_id,
271+
"Access denied: cannot delete config %s for user %s",
272+
config_id, user_id
271273
)
272274
return False
273275

274-
# Delete the configuration
275-
await self.memory_store.delete_async(config_id)
276+
# Delete the configuration using cosmos memory context
277+
await self.memory_context.delete_async(config_id)
276278

277279
self.logger.info("Successfully deleted team configuration: %s", config_id)
278280
return True

0 commit comments

Comments
 (0)