Commit e5f43ef

motor -> pymongo

1 parent a5949f5 commit e5f43ef

21 files changed: +81 -68 lines
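Every hunk below is the same mechanical migration. Under PyMongo's async API, aggregate(), list_indexes(), and watch() are coroutines that must be awaited to obtain the cursor or change stream that Motor used to return synchronously, while on sessions the awaits swap sides: start_session() is no longer awaited, but start_transaction() now is.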

backend/app/core/database_context.py

Lines changed: 2 additions & 2 deletions
@@ -167,8 +167,8 @@ async def session(self) -> AsyncIterator[DBSession]:
             async with connection.session() as session:
                 await collection.insert_one(doc, session=session)
         """
-        async with await self.client.start_session() as session:
-            async with session.start_transaction():
+        async with self.client.start_session() as session:
+            async with await session.start_transaction():
                 yield session
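The session hunk is the one place the awaits trade places: Motor awaited start_session() and used start_transaction() directly as a context manager; PyMongo's async API does the opposite. A minimal standalone sketch of the new idiom (the URI and names are illustrative, and transactions require a replica set or mongos):

import asyncio

from pymongo import AsyncMongoClient


async def main() -> None:
    # Illustrative URI; transactions need a replica set or sharded cluster.
    client: AsyncMongoClient = AsyncMongoClient("mongodb://localhost:27017")
    collection = client["app"]["docs"]
    # start_session() is a plain method in PyMongo's async API...
    async with client.start_session() as session:
        # ...while start_transaction() is the coroutine, so it is awaited.
        async with await session.start_transaction():
            await collection.insert_one({"k": "v"}, session=session)
    await client.close()


asyncio.run(main())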

backend/app/db/repositories/admin/admin_events_repository.py

Lines changed: 8 additions & 5 deletions
@@ -120,7 +120,8 @@ async def get_event_stats(self, hours: int = 24) -> EventStatistics:

         # Get overview statistics
         overview_pipeline = EventStatsAggregation.build_overview_pipeline(start_time)
-        overview_result = await self.events_collection.aggregate(overview_pipeline).to_list(1)
+        overview_cursor = await self.events_collection.aggregate(overview_pipeline)
+        overview_result = await overview_cursor.to_list(1)

         stats = (
             overview_result[0]
@@ -140,19 +141,20 @@ async def get_event_stats(self, hours: int = 24) -> EventStatistics:

         # Get event types with counts
         type_pipeline = EventStatsAggregation.build_event_types_pipeline(start_time)
-        top_types = await self.events_collection.aggregate(type_pipeline).to_list(10)
+        type_cursor = await self.events_collection.aggregate(type_pipeline)
+        top_types = await type_cursor.to_list(10)
         events_by_type = {t["_id"]: t["count"] for t in top_types}

         # Get events by hour
         hourly_pipeline = EventStatsAggregation.build_hourly_events_pipeline(start_time)
-        hourly_cursor = self.events_collection.aggregate(hourly_pipeline)
+        hourly_cursor = await self.events_collection.aggregate(hourly_pipeline)
         events_by_hour: list[HourlyEventCount | dict[str, Any]] = [
             HourlyEventCount(hour=doc["_id"], count=doc["count"]) async for doc in hourly_cursor
         ]

         # Get top users
         user_pipeline = EventStatsAggregation.build_top_users_pipeline(start_time)
-        top_users_cursor = self.events_collection.aggregate(user_pipeline)
+        top_users_cursor = await self.events_collection.aggregate(user_pipeline)
         top_users = [
             UserEventCount(user_id=doc["_id"], event_count=doc["count"])
             async for doc in top_users_cursor
@@ -175,7 +177,8 @@ async def get_event_stats(self, hours: int = 24) -> EventStatistics:
             {"$group": {"_id": None, "avg_duration": {"$avg": "$resource_usage.execution_time_wall_seconds"}}},
         ]

-        exec_result = await executions_collection.aggregate(exec_pipeline).to_list(1)
+        exec_cursor = await executions_collection.aggregate(exec_pipeline)
+        exec_result = await exec_cursor.to_list(1)
         avg_processing_time = (
             exec_result[0]["avg_duration"] if exec_result and exec_result[0].get("avg_duration") else 0
         )
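The recurring fix in this file: Motor created the command cursor synchronously, so aggregate(...).to_list(...) could be chained under a single await; PyMongo's async aggregate() is itself a coroutine, so the cursor is awaited first and to_list() second. A minimal sketch of the two-step form, with a hypothetical collection and pipeline:

from pymongo.asynchronous.collection import AsyncCollection


async def top_event_types(events: AsyncCollection, limit: int = 10) -> dict[str, int]:
    pipeline = [
        {"$group": {"_id": "$event_type", "count": {"$sum": 1}}},
        {"$sort": {"count": -1}},
    ]
    # Await aggregate() to obtain the AsyncCommandCursor...
    cursor = await events.aggregate(pipeline)
    # ...then await to_list() to drain up to `limit` documents from it.
    top = await cursor.to_list(limit)
    return {t["_id"]: t["count"] for t in top}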

backend/app/db/repositories/dlq_repository.py

Lines changed: 6 additions & 5 deletions
@@ -35,7 +35,7 @@ async def get_dlq_stats(self) -> DLQStatistics:
         ]

         status_results = []
-        async for doc in self.dlq_collection.aggregate(status_pipeline):
+        async for doc in await self.dlq_collection.aggregate(status_pipeline):
             status_results.append(doc)

         # Convert status results to dict
@@ -58,7 +58,7 @@ async def get_dlq_stats(self) -> DLQStatistics:
         ]

         by_topic: List[TopicStatistic] = []
-        async for doc in self.dlq_collection.aggregate(topic_pipeline):
+        async for doc in await self.dlq_collection.aggregate(topic_pipeline):
             by_topic.append(
                 TopicStatistic(topic=doc["_id"], count=doc["count"], avg_retry_count=round(doc["avg_retry_count"], 2))
             )
@@ -71,7 +71,7 @@ async def get_dlq_stats(self) -> DLQStatistics:
         ]

         by_event_type: List[EventTypeStatistic] = []
-        async for doc in self.dlq_collection.aggregate(event_type_pipeline):
+        async for doc in await self.dlq_collection.aggregate(event_type_pipeline):
             if doc["_id"]:  # Skip null event types
                 by_event_type.append(EventTypeStatistic(event_type=doc["_id"], count=doc["count"]))

@@ -94,7 +94,8 @@ async def get_dlq_stats(self) -> DLQStatistics:
             },
         ]

-        age_result = await self.dlq_collection.aggregate(age_pipeline).to_list(1)
+        age_cursor = await self.dlq_collection.aggregate(age_pipeline)
+        age_result = await age_cursor.to_list(1)
         age_stats_data = age_result[0] if age_result else {}
         age_stats = AgeStatistics(
             min_age_seconds=age_stats_data.get("min_age", 0.0),
@@ -148,7 +149,7 @@ async def get_topics_summary(self) -> list[DLQTopicSummary]:
         ]

         topics = []
-        async for result in self.dlq_collection.aggregate(pipeline):
+        async for result in await self.dlq_collection.aggregate(pipeline):
             status_counts: dict[str, int] = {}
             for status in result["statuses"]:
                 status_counts[status] = status_counts.get(status, 0) + 1
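Where the results are consumed with async for, no intermediate variable is needed: the awaited aggregate() call can sit directly in the loop header, because awaiting it yields the async-iterable cursor. A short sketch with an illustrative status pipeline:

from pymongo.asynchronous.collection import AsyncCollection


async def count_by_status(dlq: AsyncCollection) -> dict[str, int]:
    counts: dict[str, int] = {}
    # The await resolves to the cursor; async for then iterates it.
    async for doc in await dlq.aggregate(
        [{"$group": {"_id": "$status", "count": {"$sum": 1}}}]
    ):
        counts[doc["_id"]] = doc["count"]
    return counts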

backend/app/db/repositories/event_repository.py

Lines changed: 9 additions & 5 deletions
@@ -234,7 +234,8 @@ async def get_event_statistics(
             ]
         )

-        result = await self._collection.aggregate(pipeline).to_list(length=1)
+        cursor = await self._collection.aggregate(pipeline)
+        result = await cursor.to_list(length=1)

         if result:
             stats = result[0]
@@ -296,7 +297,8 @@ async def get_event_statistics_filtered(
             ]
         )

-        result = await self._collection.aggregate(pipeline).to_list(length=1)
+        cursor = await self._collection.aggregate(pipeline)
+        result = await cursor.to_list(length=1)
         if result:
             stats = result[0]
             return EventStatistics(
@@ -321,7 +323,9 @@ async def stream_events(
         if filters:
             pipeline.append({"$match": filters})

-        async with self._collection.watch(pipeline, start_after=start_after, full_document="updateLookup") as stream:
+        async with await self._collection.watch(
+            pipeline, start_after=start_after, full_document="updateLookup"
+        ) as stream:
             async for change in stream:
                 if change["operationType"] in ["insert", "update", "replace"]:
                     yield change["fullDocument"]
@@ -438,7 +442,7 @@ async def aggregate_events(self, pipeline: list[dict[str, object]], limit: int =
         pipeline.append({"$limit": limit})

         results = []
-        async for doc in self._collection.aggregate(pipeline):
+        async for doc in await self._collection.aggregate(pipeline):
             if "_id" in doc and isinstance(doc["_id"], dict):
                 doc["_id"] = str(doc["_id"])
             results.append(doc)
@@ -451,7 +455,7 @@ async def list_event_types(self, match: Mapping[str, object] = MappingProxyType(
         pipeline.append({"$match": dict(match)})
         pipeline.extend([{"$group": {"_id": f"${EventFields.EVENT_TYPE}"}}, {"$sort": {"_id": 1}}])
         event_types: list[str] = []
-        async for doc in self._collection.aggregate(pipeline):
+        async for doc in await self._collection.aggregate(pipeline):
             event_types.append(doc["_id"])
         return event_types
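watch() follows the same rule: in PyMongo's async API it is a coroutine, so the change stream must be awaited before it can serve as the async context manager Motor handed back directly. A sketch of the pattern (names and the filter are illustrative; change streams require a replica set):

from pymongo.asynchronous.collection import AsyncCollection


async def tail_documents(events: AsyncCollection) -> None:
    # Await watch() to open the change stream, then enter and iterate it.
    async with await events.watch(
        [{"$match": {"operationType": {"$in": ["insert", "update", "replace"]}}}],
        full_document="updateLookup",
    ) as stream:
        async for change in stream:
            print(change["fullDocument"])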

backend/app/db/repositories/execution_repository.py

Lines changed: 4 additions & 3 deletions
@@ -130,9 +130,10 @@ async def get_executions(
         executions: list[DomainExecution] = []
         async for doc in cursor:
             sv = doc.get("status")
+            resource_usage_data = doc.get("resource_usage")
             executions.append(
                 DomainExecution(
-                    execution_id=doc.get("execution_id"),
+                    execution_id=str(doc.get("execution_id", "")),
                     script=doc.get("script", ""),
                     status=ExecutionStatus(str(sv)),
                     stdout=doc.get("stdout"),
@@ -142,8 +143,8 @@ async def get_executions(
                     created_at=doc.get("created_at", datetime.now(timezone.utc)),
                     updated_at=doc.get("updated_at", datetime.now(timezone.utc)),
                     resource_usage=(
-                        ResourceUsageDomain.from_dict(doc.get("resource_usage"))
-                        if doc.get("resource_usage") is not None
+                        ResourceUsageDomain.from_dict(dict(resource_usage_data))
+                        if resource_usage_data is not None
                         else None
                     ),
                     user_id=doc.get("user_id"),
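This file's changes are typing hardening rather than driver mechanics: execution_id is coerced to str, and the resource_usage mapping is read once and copied to a plain dict before being handed to from_dict. A minimal sketch of that defensive pattern, with hypothetical names:

from typing import Any, Mapping


def normalize_usage(doc: Mapping[str, Any]) -> dict[str, Any] | None:
    # Bind the lookup once, then coerce the driver's Mapping to a plain dict
    # so downstream code does not depend on PyMongo's document type.
    raw = doc.get("resource_usage")
    return dict(raw) if raw is not None else None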

backend/app/db/repositories/notification_repository.py

Lines changed: 4 additions & 2 deletions
@@ -25,7 +25,8 @@ def __init__(self, database: Database):

     async def create_indexes(self) -> None:
         # Create indexes if only _id exists
-        notif_indexes = await self.notifications_collection.list_indexes().to_list(None)
+        notif_cursor = await self.notifications_collection.list_indexes()
+        notif_indexes = await notif_cursor.to_list(None)
         if len(notif_indexes) <= 1:
             await self.notifications_collection.create_indexes(
                 [
@@ -38,7 +39,8 @@ async def create_indexes(self) -> None:
                 ]
             )

-        subs_indexes = await self.subscriptions_collection.list_indexes().to_list(None)
+        subs_cursor = await self.subscriptions_collection.list_indexes()
+        subs_indexes = await subs_cursor.to_list(None)
         if len(subs_indexes) <= 1:
             await self.subscriptions_collection.create_indexes(
                 [
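list_indexes() changes exactly like aggregate(): it now returns a coroutine, so the cursor is awaited first and then drained with to_list(). A sketch of the idempotent index-creation guard, where a fresh collection carries only the implicit _id index (hence the <= 1 check); the index itself is illustrative:

from pymongo import ASCENDING, IndexModel
from pymongo.asynchronous.collection import AsyncCollection


async def ensure_indexes(collection: AsyncCollection) -> None:
    cursor = await collection.list_indexes()
    existing = await cursor.to_list(None)
    if len(existing) <= 1:  # only the implicit _id index is present
        await collection.create_indexes([IndexModel([("created_at", ASCENDING)])])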

backend/app/db/repositories/saga_repository.py

Lines changed: 3 additions & 3 deletions
@@ -93,7 +93,7 @@ async def count_sagas_by_state(self) -> dict[str, int]:
         pipeline = [{"$group": {"_id": "$state", "count": {"$sum": 1}}}]

         result = {}
-        async for doc in self.sagas.aggregate(pipeline):
+        async for doc in await self.sagas.aggregate(pipeline):
             result[doc["_id"]] = doc["count"]

         return result
@@ -123,7 +123,7 @@ async def get_saga_statistics(self, saga_filter: SagaFilter | None = None) -> di
         state_pipeline = [{"$match": query}, {"$group": {"_id": "$state", "count": {"$sum": 1}}}]

         states = {}
-        async for doc in self.sagas.aggregate(state_pipeline):
+        async for doc in await self.sagas.aggregate(state_pipeline):
             states[doc["_id"]] = doc["count"]

         # Average duration for completed sagas
@@ -134,7 +134,7 @@ async def get_saga_statistics(self, saga_filter: SagaFilter | None = None) -> di
         ]

         avg_duration = 0.0
-        async for doc in self.sagas.aggregate(duration_pipeline):
+        async for doc in await self.sagas.aggregate(duration_pipeline):
             # Convert milliseconds to seconds
             avg_duration = doc["avg_duration"] / 1000.0 if doc["avg_duration"] else 0.0

backend/app/dlq/manager.py

Lines changed: 1 addition & 1 deletion
@@ -334,7 +334,7 @@ async def _update_queue_metrics(self) -> None:
             {"$group": {"_id": f"${DLQFields.ORIGINAL_TOPIC}", "count": {"$sum": 1}}},
         ]

-        async for result in self.dlq_collection.aggregate(pipeline):
+        async for result in await self.dlq_collection.aggregate(pipeline):
             # Note: OpenTelemetry doesn't have direct gauge set, using delta tracking
             self.metrics.update_dlq_queue_size(result["_id"], result["count"])

backend/app/events/event_store.py

Lines changed: 3 additions & 2 deletions
@@ -68,7 +68,8 @@ async def initialize(self) -> None:
             ),
         ]

-        existing = await self.collection.list_indexes().to_list(None)
+        indexes_cursor = await self.collection.list_indexes()
+        existing = await indexes_cursor.to_list(None)
         if len(existing) <= 1:
             await self.collection.create_indexes(event_indexes)
             logger.info(f"Created {len(event_indexes)} indexes for events collection")
@@ -300,7 +301,7 @@ async def get_event_stats(
             ]
         )

-        cursor = self.collection.aggregate(pipeline)
+        cursor = await self.collection.aggregate(pipeline)
         stats: Dict[str, Any] = {"total_events": 0, "event_types": {}, "start_time": start_time, "end_time": end_time}
         async for r in cursor:
             et = r["_id"]

backend/tests/conftest.py

Lines changed: 3 additions & 3 deletions
@@ -11,7 +11,7 @@
 from dishka import AsyncContainer
 from dotenv import load_dotenv
 from httpx import ASGITransport
-from pymongo.asynchronous.database import AsyncDatabase as AsyncIOMotorDatabase
+from app.core.database_context import Database
 import redis.asyncio as redis

 # Load test environment variables BEFORE any app imports
@@ -175,8 +175,8 @@ async def scope(app_container: AsyncContainer):  # type: ignore[valid-type]


 @pytest_asyncio.fixture(scope="function")
-async def db(scope) -> AsyncGenerator[AsyncIOMotorDatabase, None]:  # type: ignore[valid-type]
-    database: AsyncIOMotorDatabase = await scope.get(AsyncIOMotorDatabase)
+async def db(scope) -> AsyncGenerator[Database, None]:  # type: ignore[valid-type]
+    database: Database = await scope.get(Database)
     yield database
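The test fixture now depends on the app-owned Database name instead of importing the driver type under a Motor-flavored alias. The commit does not show how app.core.database_context defines it; a plausible sketch, assuming it is a straight re-export of PyMongo's async database type:

# Hypothetical: how app.core.database_context might define the alias.
from pymongo.asynchronous.database import AsyncDatabase

Database = AsyncDatabase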
