Skip to content

Commit 50f79b8

Browse files
committed
fixes: rename parameters shadowing Python builtins (`filter` → `event_filter`/`saga_filter`/`replay_filter`/`msg_filter`, `format` → `date_format`); make `SagaFilterMapper.to_mongodb_query` accept `None`; tidy test imports
1 parent 1e40f7b commit 50f79b8

File tree

11 files changed

+89
-82
lines changed

11 files changed

+89
-82
lines changed

backend/app/api/routes/admin/events.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ async def browse_events(request: EventBrowseRequest, service: FromDishka[AdminEv
4343
event_filter = EventFilterMapper.from_admin_pydantic(request.filters)
4444

4545
result = await service.browse_events(
46-
filter=event_filter,
46+
event_filter=event_filter,
4747
skip=request.skip,
4848
limit=request.limit,
4949
sort_by=request.sort_by,
@@ -92,7 +92,7 @@ async def export_events_csv(
9292
end_time=end_time,
9393
)
9494
)
95-
result = await service.export_events_csv_content(filter=export_filter, limit=limit)
95+
result = await service.export_events_csv_content(event_filter=export_filter, limit=limit)
9696
return StreamingResponse(
9797
iter([result.content]),
9898
media_type=result.media_type,
@@ -128,7 +128,7 @@ async def export_events_json(
128128
end_time=end_time,
129129
)
130130
)
131-
result = await service.export_events_json_content(filter=export_filter, limit=limit)
131+
result = await service.export_events_json_content(event_filter=export_filter, limit=limit)
132132
return StreamingResponse(
133133
iter([result.content]),
134134
media_type=result.media_type,

backend/app/db/repositories/admin/admin_events_repository.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,14 +58,14 @@ def __init__(self, db: Database):
5858

5959
async def browse_events(
6060
self,
61-
filter: EventFilter,
61+
event_filter: EventFilter,
6262
skip: int = 0,
6363
limit: int = 50,
6464
sort_by: str = EventFields.TIMESTAMP,
6565
sort_order: int = SortDirection.DESCENDING,
6666
) -> EventBrowseResult:
6767
"""Browse events with filters using domain models."""
68-
query = EventFilterMapper.to_mongo_query(filter)
68+
query = EventFilterMapper.to_mongo_query(event_filter)
6969

7070
# Get total count
7171
total = await self.events_collection.count_documents(query)
@@ -191,9 +191,9 @@ async def get_event_stats(self, hours: int = 24) -> EventStatistics:
191191

192192
return statistics
193193

194-
async def export_events_csv(self, filter: EventFilter) -> List[EventExportRow]:
194+
async def export_events_csv(self, event_filter: EventFilter) -> List[EventExportRow]:
195195
"""Export events as CSV data."""
196-
query = EventFilterMapper.to_mongo_query(filter)
196+
query = EventFilterMapper.to_mongo_query(event_filter)
197197

198198
cursor = self.events_collection.find(query).sort(EventFields.TIMESTAMP, SortDirection.DESCENDING).limit(10000)
199199

backend/app/db/repositories/dlq_repository.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -112,10 +112,8 @@ async def get_messages(
112112
limit: int = 50,
113113
offset: int = 0,
114114
) -> DLQMessageListResult:
115-
# Create filter
116-
filter = DLQMessageFilter(status=status, topic=topic, event_type=event_type)
117-
118-
query = DLQMapper.filter_to_query(filter)
115+
msg_filter = DLQMessageFilter(status=status, topic=topic, event_type=event_type)
116+
query = DLQMapper.filter_to_query(msg_filter)
119117
total_count = await self.dlq_collection.count_documents(query)
120118

121119
cursor = self.dlq_collection.find(query).sort(DLQFields.FAILED_AT, -1).skip(offset).limit(limit)

backend/app/db/repositories/replay_repository.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -89,16 +89,16 @@ async def update_replay_session(self, session_id: str, updates: ReplaySessionUpd
8989
result = await self.replay_collection.update_one({"session_id": session_id}, {"$set": mongo_updates})
9090
return result.modified_count > 0
9191

92-
async def count_events(self, filter: ReplayFilter) -> int:
92+
async def count_events(self, replay_filter: ReplayFilter) -> int:
9393
"""Count events matching the given filter"""
94-
query = filter.to_mongo_query()
94+
query = replay_filter.to_mongo_query()
9595
return await self.events_collection.count_documents(query)
9696

9797
async def fetch_events(
98-
self, filter: ReplayFilter, batch_size: int = 100, skip: int = 0
98+
self, replay_filter: ReplayFilter, batch_size: int = 100, skip: int = 0
9999
) -> AsyncIterator[List[Dict[str, Any]]]:
100100
"""Fetch events in batches based on filter"""
101-
query = filter.to_mongo_query()
101+
query = replay_filter.to_mongo_query()
102102
cursor = self.events_collection.find(query).sort("timestamp", 1).skip(skip)
103103

104104
batch = []

backend/app/db/repositories/saga_repository.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,8 @@ async def get_sagas_by_execution(self, execution_id: str, state: SagaState | Non
5555
docs = await cursor.to_list(length=None)
5656
return [self.mapper.from_mongo(doc) for doc in docs]
5757

58-
async def list_sagas(self, filter: SagaFilter, limit: int = 100, skip: int = 0) -> SagaListResult:
59-
query = self.filter_mapper.to_mongodb_query(filter)
58+
async def list_sagas(self, saga_filter: SagaFilter, limit: int = 100, skip: int = 0) -> SagaListResult:
59+
query = self.filter_mapper.to_mongodb_query(saga_filter)
6060

6161
# Get total count
6262
total = await self.sagas.count_documents(query)
@@ -108,8 +108,8 @@ async def find_timed_out_sagas(
108108
docs = await cursor.to_list(length=limit)
109109
return [self.mapper.from_mongo(doc) for doc in docs]
110110

111-
async def get_saga_statistics(self, filter: SagaFilter | None = None) -> dict[str, object]:
112-
query = self.filter_mapper.to_mongodb_query(filter) if filter else {}
111+
async def get_saga_statistics(self, saga_filter: SagaFilter | None = None) -> dict[str, object]:
112+
query = self.filter_mapper.to_mongodb_query(saga_filter) if saga_filter else {}
113113

114114
# Basic counts
115115
total = await self.sagas.count_documents(query)

backend/app/domain/events/query_builders.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,9 @@ def size(field: str) -> dict[str, str]:
5151
return {"$size": field}
5252

5353
@staticmethod
54-
def date_to_string(date_field: str, format: str = "%Y-%m-%d-%H") -> dict[str, Any]:
54+
def date_to_string(date_field: str, date_format: str = "%Y-%m-%d-%H") -> dict[str, Any]:
5555
"""Create a $dateToString expression."""
56-
return {"$dateToString": {"format": format, "date": date_field}}
56+
return {"$dateToString": {"format": date_format, "date": date_field}}
5757

5858

5959
class EventStatsAggregation:

backend/app/infrastructure/mappers/saga_mapper.py

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -214,31 +214,34 @@ def to_cancelled_event(
214214
class SagaFilterMapper:
215215
"""Maps saga filters to MongoDB queries."""
216216

217-
def to_mongodb_query(self, filter: SagaFilter) -> dict[str, Any]:
217+
def to_mongodb_query(self, saga_filter: SagaFilter | None) -> dict[str, Any]:
218218
"""Convert filter to MongoDB query."""
219219
query: dict[str, Any] = {}
220220

221-
if filter.state:
222-
query["state"] = filter.state.value
221+
if not saga_filter:
222+
return query
223223

224-
if filter.execution_ids:
225-
query["execution_id"] = {"$in": filter.execution_ids}
224+
if saga_filter.state:
225+
query["state"] = saga_filter.state.value
226226

227-
if filter.saga_name:
228-
query["saga_name"] = filter.saga_name
227+
if saga_filter.execution_ids:
228+
query["execution_id"] = {"$in": saga_filter.execution_ids}
229229

230-
if filter.error_status is not None:
231-
if filter.error_status:
230+
if saga_filter.saga_name:
231+
query["saga_name"] = saga_filter.saga_name
232+
233+
if saga_filter.error_status is not None:
234+
if saga_filter.error_status:
232235
query["error_message"] = {"$ne": None}
233236
else:
234237
query["error_message"] = None
235238

236-
if filter.created_after or filter.created_before:
239+
if saga_filter.created_after or saga_filter.created_before:
237240
time_query: dict[str, Any] = {}
238-
if filter.created_after:
239-
time_query["$gte"] = filter.created_after
240-
if filter.created_before:
241-
time_query["$lte"] = filter.created_before
241+
if saga_filter.created_after:
242+
time_query["$gte"] = saga_filter.created_after
243+
if saga_filter.created_before:
244+
time_query["$lte"] = saga_filter.created_before
242245
query["created_at"] = time_query
243246

244247
return query

backend/app/services/admin/admin_events_service.py

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -59,14 +59,14 @@ def __init__(self, repository: AdminEventsRepository, replay_service: ReplayServ
5959
async def browse_events(
6060
self,
6161
*,
62-
filter: EventFilter,
62+
event_filter: EventFilter,
6363
skip: int,
6464
limit: int,
6565
sort_by: str,
6666
sort_order: int,
6767
) -> EventBrowseResult:
6868
return await self._repo.browse_events(
69-
filter=filter, skip=skip, limit=limit, sort_by=sort_by, sort_order=sort_order
69+
event_filter=event_filter, skip=skip, limit=limit, sort_by=sort_by, sort_order=sort_order
7070
)
7171

7272
async def get_event_detail(self, event_id: str) -> EventDetail | None:
@@ -180,12 +180,12 @@ async def get_replay_status(self, session_id: str) -> ReplaySessionStatusDetail
180180
status = await self._repo.get_replay_status_with_progress(session_id)
181181
return status
182182

183-
async def export_events_csv(self, filter: EventFilter) -> List[EventExportRow]:
184-
rows = await self._repo.export_events_csv(filter)
183+
async def export_events_csv(self, event_filter: EventFilter) -> List[EventExportRow]:
184+
rows = await self._repo.export_events_csv(event_filter)
185185
return rows
186186

187-
async def export_events_csv_content(self, *, filter: EventFilter, limit: int) -> ExportResult:
188-
rows = await self._repo.export_events_csv(filter)
187+
async def export_events_csv_content(self, *, event_filter: EventFilter, limit: int) -> ExportResult:
188+
rows = await self._repo.export_events_csv(event_filter)
189189
output = StringIO()
190190
writer = csv.DictWriter(
191191
output,
@@ -216,8 +216,10 @@ async def export_events_csv_content(self, *, filter: EventFilter, limit: int) ->
216216
)
217217
return ExportResult(file_name=filename, content=output.getvalue(), media_type="text/csv")
218218

219-
async def export_events_json_content(self, *, filter: EventFilter, limit: int) -> ExportResult:
220-
result = await self._repo.browse_events(filter=filter, skip=0, limit=limit, sort_by="timestamp", sort_order=-1)
219+
async def export_events_json_content(self, *, event_filter: EventFilter, limit: int) -> ExportResult:
220+
result = await self._repo.browse_events(
221+
event_filter=event_filter, skip=0, limit=limit, sort_by="timestamp", sort_order=-1
222+
)
221223
event_mapper = EventMapper()
222224
events_data: list[dict[str, Any]] = []
223225
for event in result.events:
@@ -232,13 +234,13 @@ async def export_events_json_content(self, *, filter: EventFilter, limit: int) -
232234
"exported_at": datetime.now(timezone.utc).isoformat(),
233235
"total_events": len(events_data),
234236
"filters_applied": {
235-
"event_types": filter.event_types,
236-
"aggregate_id": filter.aggregate_id,
237-
"correlation_id": filter.correlation_id,
238-
"user_id": filter.user_id,
239-
"service_name": filter.service_name,
240-
"start_time": filter.start_time.isoformat() if filter.start_time else None,
241-
"end_time": filter.end_time.isoformat() if filter.end_time else None,
237+
"event_types": event_filter.event_types,
238+
"aggregate_id": event_filter.aggregate_id,
239+
"correlation_id": event_filter.correlation_id,
240+
"user_id": event_filter.user_id,
241+
"service_name": event_filter.service_name,
242+
"start_time": event_filter.start_time.isoformat() if event_filter.start_time else None,
243+
"end_time": event_filter.end_time.isoformat() if event_filter.end_time else None,
242244
},
243245
"export_limit": limit,
244246
},

backend/tests/unit/dlq/test_dlq_models.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,18 @@
1-
from datetime import datetime, timezone
21
import json
3-
4-
import pytest
2+
from datetime import datetime, timezone
53

64
from app.dlq import (
75
AgeStatistics,
86
DLQFields,
97
DLQMessageFilter,
108
DLQMessageStatus,
9+
DLQStatistics,
1110
EventTypeStatistic,
1211
RetryPolicy,
1312
RetryStrategy,
1413
TopicStatistic,
15-
DLQStatistics,
1614
)
15+
from app.domain.enums.events import EventType
1716
from app.events.schema.schema_registry import SchemaRegistryManager
1817
from app.infrastructure.kafka.events.metadata import EventMetadata
1918
from app.infrastructure.kafka.events.user import UserLoggedInEvent
@@ -22,6 +21,7 @@
2221

2322
def _make_event() -> UserLoggedInEvent:
2423
from app.domain.enums.auth import LoginMethod
24+
2525
return UserLoggedInEvent(
2626
user_id="u1",
2727
login_method=LoginMethod.PASSWORD,
@@ -95,15 +95,15 @@ def test_retry_policy_bounds() -> None:
9595

9696

9797
def test_filter_and_stats_models() -> None:
98-
f = DLQMessageFilter(status=DLQMessageStatus.PENDING, topic="t", event_type="X")
98+
f = DLQMessageFilter(status=DLQMessageStatus.PENDING, topic="t", event_type=EventType.EXECUTION_REQUESTED)
9999
q = DLQMapper.filter_to_query(f)
100100
assert q[DLQFields.STATUS] == DLQMessageStatus.PENDING
101101
assert q[DLQFields.ORIGINAL_TOPIC] == "t"
102+
assert q[DLQFields.EVENT_TYPE] == EventType.EXECUTION_REQUESTED
102103

103104
ts = TopicStatistic(topic="t", count=2, avg_retry_count=1.5)
104105
es = EventTypeStatistic(event_type="X", count=3)
105106
ages = AgeStatistics(min_age_seconds=1, max_age_seconds=10, avg_age_seconds=5)
106107
stats = DLQStatistics(by_status={"pending": 1}, by_topic=[ts], by_event_type=[es], age_stats=ages)
107108
assert stats.by_status["pending"] == 1
108109
assert isinstance(stats.timestamp, datetime)
109-

Comments (0)