Merged · Changes from all 25 commits
6 changes: 3 additions & 3 deletions backend/app/api/routes/admin/events.py
@@ -43,7 +43,7 @@ async def browse_events(request: EventBrowseRequest, service: FromDishka[AdminEv
     event_filter = EventFilterMapper.from_admin_pydantic(request.filters)
 
     result = await service.browse_events(
-        filter=event_filter,
+        event_filter=event_filter,
         skip=request.skip,
         limit=request.limit,
         sort_by=request.sort_by,
@@ -92,7 +92,7 @@ async def export_events_csv(
             end_time=end_time,
         )
     )
-    result = await service.export_events_csv_content(filter=export_filter, limit=limit)
+    result = await service.export_events_csv_content(event_filter=export_filter, limit=limit)
     return StreamingResponse(
         iter([result.content]),
         media_type=result.media_type,
@@ -128,7 +128,7 @@ async def export_events_json(
             end_time=end_time,
         )
     )
-    result = await service.export_events_json_content(filter=export_filter, limit=limit)
+    result = await service.export_events_json_content(event_filter=export_filter, limit=limit)
     return StreamingResponse(
         iter([result.content]),
         media_type=result.media_type,
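Note on the rename above: `filter` shadows Python's builtin of the same name, so the function body (and most linters) lose access to `filter()`. A minimal sketch of the pattern with a stand-in `EventFilter` dataclass, not the project's real model:

```python
from dataclasses import dataclass


@dataclass
class EventFilter:  # stand-in; the real model lives in the domain layer
    event_type: str | None = None
    user_id: str | None = None


async def browse_events(event_filter: EventFilter, skip: int = 0, limit: int = 50) -> dict:
    # With the parameter renamed, the builtin `filter` remains usable here.
    active = list(filter(None, [event_filter.event_type, event_filter.user_id]))
    return {"criteria": active, "skip": skip, "limit": limit}
```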
3 changes: 2 additions & 1 deletion backend/app/api/routes/dlq.py
@@ -10,6 +10,7 @@
 from app.dlq import RetryPolicy
 from app.dlq.manager import DLQManager
 from app.dlq.models import DLQMessageStatus
+from app.domain.enums.events import EventType
 from app.schemas_pydantic.dlq import (
     DLQBatchRetryResponse,
     DLQMessageDetail,
@@ -50,7 +51,7 @@ async def get_dlq_messages(
     repository: FromDishka[DLQRepository],
     status: DLQMessageStatus | None = Query(None),
     topic: str | None = None,
-    event_type: str | None = None,
+    event_type: EventType | None = Query(None),
     limit: int = Query(50, ge=1, le=1000),
     offset: int = Query(0, ge=0),
 ) -> DLQMessagesResponse:
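The typed query parameter above leans on FastAPI's enum handling: declaring the parameter as a `str`-backed enum makes the framework validate the incoming value and return a 422 for anything unknown, and the allowed values show up in the OpenAPI docs. A self-contained sketch with a stand-in `EventType` (the real enum lives in `app.domain.enums.events`):

```python
from enum import Enum

from fastapi import FastAPI, Query


class EventType(str, Enum):  # stand-in for app.domain.enums.events.EventType
    EXECUTION_REQUESTED = "execution.requested"
    EXECUTION_COMPLETED = "execution.completed"


app = FastAPI()


@app.get("/dlq/messages")
async def get_dlq_messages(event_type: EventType | None = Query(None)) -> dict:
    # FastAPI has already validated event_type; unknown values never reach here.
    return {"event_type": event_type.value if event_type else None}
```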
2 changes: 1 addition & 1 deletion backend/app/api/routes/events.py
@@ -325,7 +325,7 @@ async def replay_aggregate_events(
             user_id=admin.user_id,
         )
         await kafka_event_service.publish_event(
-            event_type=f"replay.{event.event_type}",
+            event_type=event.event_type,
             payload=event.payload,
             aggregate_id=aggregate_id,
             correlation_id=replay_correlation_id,
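The fix above stops rewriting the event type during replay: consumers subscribe to the original `event_type`, so publishing `replay.execution.completed` would route replayed events past every existing handler. The replay remains traceable through the dedicated correlation id. A sketch of the intent with a stubbed publisher standing in for the real `kafka_event_service`:

```python
class StubEventPublisher:
    """Stand-in for the project's Kafka event service."""

    async def publish_event(self, *, event_type: str, payload: dict, aggregate_id: str, correlation_id: str) -> None:
        print(f"publish type={event_type} aggregate={aggregate_id} correlation={correlation_id}")


async def replay_event(event, publisher: StubEventPublisher, aggregate_id: str, replay_correlation_id: str) -> None:
    await publisher.publish_event(
        event_type=event.event_type,  # keep the original type so normal consumers match
        payload=event.payload,
        aggregate_id=aggregate_id,
        correlation_id=replay_correlation_id,  # marks the message as part of a replay
    )
```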
51 changes: 5 additions & 46 deletions backend/app/api/routes/execution.py
@@ -10,10 +10,8 @@
 from app.core.exceptions import IntegrationException
 from app.core.tracing import EventAttributes, add_span_attributes
 from app.core.utils import get_client_ip
-from app.domain.enums.common import ErrorType
 from app.domain.enums.events import EventType
 from app.domain.enums.execution import ExecutionStatus
-from app.domain.enums.storage import ExecutionErrorType
 from app.domain.enums.user import UserRole
 from app.infrastructure.kafka.events.base import BaseEvent
 from app.infrastructure.kafka.events.metadata import EventMetadata
@@ -30,7 +28,6 @@
     ExecutionResponse,
     ExecutionResult,
     ResourceLimits,
-    ResourceUsage,
     RetryExecutionRequest,
 )
 from app.schemas_pydantic.user import UserResponse
@@ -54,35 +51,7 @@ async def get_execution_with_access(
     if domain_exec.user_id and domain_exec.user_id != current_user.user_id and current_user.role != UserRole.ADMIN:
         raise HTTPException(status_code=403, detail="Access denied")
 
-    # Map domain to Pydantic for dependency consumer
-    ru = None
-    if domain_exec.resource_usage is not None:
-        ru = ResourceUsage(**vars(domain_exec.resource_usage))
-    # Map error_type to public ErrorType in API model via mapper rules
-    error_type = (
-        (
-            ErrorType.SCRIPT_ERROR
-            if domain_exec.error_type == ExecutionErrorType.SCRIPT_ERROR
-            else ErrorType.SYSTEM_ERROR
-        )
-        if domain_exec.error_type is not None
-        else None
-    )
-    return ExecutionInDB(
-        execution_id=domain_exec.execution_id,
-        script=domain_exec.script,
-        status=domain_exec.status,
-        stdout=domain_exec.stdout,
-        stderr=domain_exec.stderr,
-        lang=domain_exec.lang,
-        lang_version=domain_exec.lang_version,
-        resource_usage=ru,
-        user_id=domain_exec.user_id,
-        exit_code=domain_exec.exit_code,
-        error_type=error_type,
-        created_at=domain_exec.created_at,
-        updated_at=domain_exec.updated_at,
-    )
+    return ExecutionInDB.model_validate(domain_exec)
 
 
 @router.post("/execute", response_model=ExecutionResponse)
@@ -268,24 +237,14 @@ async def retry_execution(
 async def get_execution_events(
     execution: Annotated[ExecutionInDB, Depends(get_execution_with_access)],
     event_service: FromDishka[EventService],
-    event_types: str | None = Query(None, description="Comma-separated event types to filter"),
+    event_types: list[EventType] | None = Query(None, description="Event types to filter"),
     limit: int = Query(100, ge=1, le=1000),
 ) -> list[ExecutionEventResponse]:
     """Get all events for an execution."""
-    event_type_list = None
-    if event_types:
-        event_type_list = [t.strip() for t in event_types.split(",")]
-
     events = await event_service.get_events_by_aggregate(
-        aggregate_id=execution.execution_id, event_types=event_type_list, limit=limit
+        aggregate_id=execution.execution_id, event_types=event_types, limit=limit
     )
-
-    return [
-        ExecutionEventResponse(
-            event_id=event.event_id, event_type=event.event_type, timestamp=event.timestamp, payload=event.payload
-        )
-        for event in events
-    ]
+    return [ExecutionEventResponse.model_validate(e) for e in events]
 
 
 @router.get("/user/executions", response_model=ExecutionListResponse)
@@ -336,7 +295,7 @@ async def get_k8s_resource_limits(
 ) -> ResourceLimits:
     try:
         limits = await execution_service.get_k8s_resource_limits()
-        return ResourceLimits(**vars(limits))
+        return ResourceLimits.model_validate(limits)
     except Exception as e:
         raise HTTPException(status_code=500, detail="Failed to retrieve resource limits") from e

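The `model_validate(...)` calls above replace roughly thirty lines of hand-written field mapping. This presumably relies on Pydantic v2's `from_attributes` mode, which reads fields straight off a non-dict object; the real `ExecutionInDB` would also need a validator or mapper rule covering the `error_type` translation the old code did inline. A minimal sketch with stand-in models:

```python
from dataclasses import dataclass

from pydantic import BaseModel, ConfigDict


@dataclass
class DomainExecution:  # stand-in domain object
    execution_id: str
    status: str
    exit_code: int | None = None


class ExecutionInDB(BaseModel):  # stand-in API model
    model_config = ConfigDict(from_attributes=True)

    execution_id: str
    status: str
    exit_code: int | None = None


domain = DomainExecution(execution_id="exec-1", status="completed", exit_code=0)
print(ExecutionInDB.model_validate(domain))  # reads attributes, not dict keys
```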
8 changes: 4 additions & 4 deletions backend/app/db/repositories/admin/admin_events_repository.py
@@ -58,14 +58,14 @@ def __init__(self, db: Database):

     async def browse_events(
         self,
-        filter: EventFilter,
+        event_filter: EventFilter,
         skip: int = 0,
         limit: int = 50,
         sort_by: str = EventFields.TIMESTAMP,
         sort_order: int = SortDirection.DESCENDING,
     ) -> EventBrowseResult:
         """Browse events with filters using domain models."""
-        query = EventFilterMapper.to_mongo_query(filter)
+        query = EventFilterMapper.to_mongo_query(event_filter)
 
         # Get total count
         total = await self.events_collection.count_documents(query)
@@ -191,9 +191,9 @@ async def get_event_stats(self, hours: int = 24) -> EventStatistics:

         return statistics
 
-    async def export_events_csv(self, filter: EventFilter) -> List[EventExportRow]:
+    async def export_events_csv(self, event_filter: EventFilter) -> List[EventExportRow]:
         """Export events as CSV data."""
-        query = EventFilterMapper.to_mongo_query(filter)
+        query = EventFilterMapper.to_mongo_query(event_filter)
 
         cursor = self.events_collection.find(query).sort(EventFields.TIMESTAMP, SortDirection.DESCENDING).limit(10000)

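For reference, the shape a mapper like `EventFilterMapper.to_mongo_query` typically takes: each optional filter field contributes one query clause, and the `event_filter` rename keeps both the mapper and its call sites clear of the builtin shadow. The fields and document keys below are illustrative, not the project's actual `EventFilter`:

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass
class EventFilter:  # illustrative fields only
    event_type: str | None = None
    user_id: str | None = None
    start_time: datetime | None = None


def to_mongo_query(event_filter: EventFilter) -> dict:
    query: dict[str, object] = {}
    if event_filter.event_type:
        query["event_type"] = event_filter.event_type
    if event_filter.user_id:
        query["user_id"] = event_filter.user_id  # hypothetical document key
    if event_filter.start_time:
        query["timestamp"] = {"$gte": event_filter.start_time}
    return query
```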
13 changes: 5 additions & 8 deletions backend/app/db/repositories/dlq_repository.py
@@ -18,6 +18,7 @@
     TopicStatistic,
 )
 from app.dlq.manager import DLQManager
+from app.domain.enums.events import EventType
 from app.domain.events.event_models import CollectionNames
 from app.infrastructure.mappers.dlq_mapper import DLQMapper

@@ -105,18 +106,14 @@ async def get_dlq_stats(self) -> DLQStatistics:

     async def get_messages(
         self,
-        status: str | None = None,
+        status: DLQMessageStatus | None = None,
         topic: str | None = None,
-        event_type: str | None = None,
+        event_type: EventType | None = None,
         limit: int = 50,
         offset: int = 0,
     ) -> DLQMessageListResult:
-        # Create filter
-        filter = DLQMessageFilter(
-            status=DLQMessageStatus(status) if status else None, topic=topic, event_type=event_type
-        )
-
-        query = DLQMapper.filter_to_query(filter)
+        msg_filter = DLQMessageFilter(status=status, topic=topic, event_type=event_type)
+        query = DLQMapper.filter_to_query(msg_filter)
         total_count = await self.dlq_collection.count_documents(query)
 
         cursor = self.dlq_collection.find(query).sort(DLQFields.FAILED_AT, -1).skip(offset).limit(limit)
5 changes: 3 additions & 2 deletions backend/app/db/repositories/event_repository.py
@@ -9,6 +9,7 @@
 from app.core.logging import logger
 from app.core.tracing import EventAttributes
 from app.core.tracing.utils import add_span_attributes
+from app.domain.enums.events import EventType
 from app.domain.enums.user import UserRole
 from app.domain.events import (
     ArchivedEvent,
@@ -114,11 +115,11 @@ async def get_events_by_type(
         return [self.mapper.from_mongo_document(doc) for doc in docs]
 
     async def get_events_by_aggregate(
-        self, aggregate_id: str, event_types: list[str] | None = None, limit: int = 100
+        self, aggregate_id: str, event_types: list[EventType] | None = None, limit: int = 100
     ) -> list[Event]:
         query: dict[str, Any] = {EventFields.AGGREGATE_ID: aggregate_id}
         if event_types:
-            query[EventFields.EVENT_TYPE] = {"$in": event_types}
+            query[EventFields.EVENT_TYPE] = {"$in": [t.value for t in event_types]}
 
         cursor = self._collection.find(query).sort(EventFields.TIMESTAMP, ASCENDING).limit(limit)
         docs = await cursor.to_list(length=limit)
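The `.value` unwrapping above makes the `$in` clause independent of how the enum type serializes: the documents store plain strings, so the query compares plain strings. A sketch with a stand-in enum:

```python
from enum import Enum


class EventType(str, Enum):  # stand-in for the domain enum
    REQUESTED = "execution.requested"
    COMPLETED = "execution.completed"


def aggregate_events_query(aggregate_id: str, event_types: list[EventType] | None) -> dict:
    query: dict[str, object] = {"aggregate_id": aggregate_id}
    if event_types:
        # Compare against the raw strings persisted in MongoDB.
        query["event_type"] = {"$in": [t.value for t in event_types]}
    return query


print(aggregate_events_query("exec-1", [EventType.COMPLETED]))
```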
26 changes: 16 additions & 10 deletions backend/app/db/repositories/replay_repository.py
@@ -5,6 +5,7 @@
 from app.core.database_context import Collection, Database
 from app.core.logging import logger
 from app.domain.admin.replay_updates import ReplaySessionUpdate
+from app.domain.enums.replay import ReplayStatus
 from app.domain.events.event_models import CollectionNames
 from app.domain.replay import ReplayFilter, ReplaySessionState
 from app.infrastructure.mappers import ReplayStateMapper
@@ -42,13 +43,13 @@ async def get_session(self, session_id: str) -> ReplaySessionState | None:
         return self._mapper.from_mongo_document(data) if data else None
 
     async def list_sessions(
-        self, status: str | None = None, user_id: str | None = None, limit: int = 100, skip: int = 0
+        self, status: ReplayStatus | None = None, user_id: str | None = None, limit: int = 100, skip: int = 0
     ) -> list[ReplaySessionState]:
         collection = self.replay_collection
 
-        query = {}
+        query: dict[str, object] = {}
         if status:
-            query["status"] = status
+            query["status"] = status.value
         if user_id:
             query["config.filter.user_id"] = user_id

@@ -58,15 +59,20 @@ def list_sessions(
             sessions.append(self._mapper.from_mongo_document(doc))
         return sessions
 
-    async def update_session_status(self, session_id: str, status: str) -> bool:
+    async def update_session_status(self, session_id: str, status: ReplayStatus) -> bool:
         """Update the status of a replay session"""
-        result = await self.replay_collection.update_one({"session_id": session_id}, {"$set": {"status": status}})
+        result = await self.replay_collection.update_one({"session_id": session_id}, {"$set": {"status": status.value}})
         return result.modified_count > 0
 
     async def delete_old_sessions(self, cutoff_time: str) -> int:
         """Delete old completed/failed/cancelled sessions"""
+        terminal_statuses = [
+            ReplayStatus.COMPLETED.value,
+            ReplayStatus.FAILED.value,
+            ReplayStatus.CANCELLED.value,
+        ]
         result = await self.replay_collection.delete_many(
-            {"created_at": {"$lt": cutoff_time}, "status": {"$in": ["completed", "failed", "cancelled"]}}
+            {"created_at": {"$lt": cutoff_time}, "status": {"$in": terminal_statuses}}
         )
         return result.deleted_count

@@ -83,16 +89,16 @@ async def update_replay_session(self, session_id: str, updates: ReplaySessionUpd
         result = await self.replay_collection.update_one({"session_id": session_id}, {"$set": mongo_updates})
         return result.modified_count > 0
 
-    async def count_events(self, filter: ReplayFilter) -> int:
+    async def count_events(self, replay_filter: ReplayFilter) -> int:
         """Count events matching the given filter"""
-        query = filter.to_mongo_query()
+        query = replay_filter.to_mongo_query()
         return await self.events_collection.count_documents(query)
 
     async def fetch_events(
-        self, filter: ReplayFilter, batch_size: int = 100, skip: int = 0
+        self, replay_filter: ReplayFilter, batch_size: int = 100, skip: int = 0
     ) -> AsyncIterator[List[Dict[str, Any]]]:
         """Fetch events in batches based on filter"""
-        query = filter.to_mongo_query()
+        query = replay_filter.to_mongo_query()
         cursor = self.events_collection.find(query).sort("timestamp", 1).skip(skip)
 
         batch = []
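Deriving the terminal list from `ReplayStatus` (rather than hard-coding `"completed"` and friends) keeps the cleanup query in lockstep with the status model. A sketch assuming a str-backed enum of roughly this shape; the member names beyond the three used above are guesses:

```python
from enum import Enum


class ReplayStatus(str, Enum):  # assumed shape of app.domain.enums.replay.ReplayStatus
    SCHEDULED = "scheduled"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


TERMINAL_STATUSES = [ReplayStatus.COMPLETED.value, ReplayStatus.FAILED.value, ReplayStatus.CANCELLED.value]


def old_sessions_query(cutoff_time: str) -> dict:
    # Renaming a status in the enum now propagates here automatically.
    return {"created_at": {"$lt": cutoff_time}, "status": {"$in": TERMINAL_STATUSES}}
```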
16 changes: 8 additions & 8 deletions backend/app/db/repositories/saga_repository.py
@@ -46,17 +46,17 @@ async def get_saga(self, saga_id: str) -> Saga | None:
         doc = await self.sagas.find_one({"saga_id": saga_id})
         return self.mapper.from_mongo(doc) if doc else None
 
-    async def get_sagas_by_execution(self, execution_id: str, state: str | None = None) -> list[Saga]:
+    async def get_sagas_by_execution(self, execution_id: str, state: SagaState | None = None) -> list[Saga]:
         query: dict[str, object] = {"execution_id": execution_id}
         if state:
-            query["state"] = state
+            query["state"] = state.value
 
         cursor = self.sagas.find(query).sort("created_at", DESCENDING)
         docs = await cursor.to_list(length=None)
         return [self.mapper.from_mongo(doc) for doc in docs]
 
-    async def list_sagas(self, filter: SagaFilter, limit: int = 100, skip: int = 0) -> SagaListResult:
-        query = self.filter_mapper.to_mongodb_query(filter)
+    async def list_sagas(self, saga_filter: SagaFilter, limit: int = 100, skip: int = 0) -> SagaListResult:
+        query = self.filter_mapper.to_mongodb_query(saga_filter)
 
         # Get total count
         total = await self.sagas.count_documents(query)
@@ -69,8 +69,8 @@ async def list_sagas(self, filter: SagaFilter, limit: int = 100, skip: int = 0)

         return SagaListResult(sagas=sagas, total=total, skip=skip, limit=limit)
 
-    async def update_saga_state(self, saga_id: str, state: str, error_message: str | None = None) -> bool:
-        update_data = {"state": state, "updated_at": datetime.now(timezone.utc)}
+    async def update_saga_state(self, saga_id: str, state: SagaState, error_message: str | None = None) -> bool:
+        update_data: dict[str, object] = {"state": state.value, "updated_at": datetime.now(timezone.utc)}
 
         if error_message:
             update_data["error_message"] = error_message
@@ -108,8 +108,8 @@ async def find_timed_out_sagas(
         docs = await cursor.to_list(length=limit)
         return [self.mapper.from_mongo(doc) for doc in docs]
 
-    async def get_saga_statistics(self, filter: SagaFilter | None = None) -> dict[str, object]:
-        query = self.filter_mapper.to_mongodb_query(filter) if filter else {}
+    async def get_saga_statistics(self, saga_filter: SagaFilter | None = None) -> dict[str, object]:
+        query = self.filter_mapper.to_mongodb_query(saga_filter) if saga_filter else {}
 
         # Basic counts
         total = await self.sagas.count_documents(query)
35 changes: 11 additions & 24 deletions backend/app/db/repositories/sse_repository.py
@@ -1,41 +1,28 @@
+from datetime import datetime, timezone
+
 from app.core.database_context import Collection, Database
+from app.domain.enums.execution import ExecutionStatus
 from app.domain.events.event_models import CollectionNames
 from app.domain.execution import DomainExecution
-from app.domain.sse import SSEEventDomain, SSEExecutionStatusDomain
+from app.domain.sse import SSEExecutionStatusDomain
 from app.infrastructure.mappers import SSEMapper
 
 
 class SSERepository:
-    def __init__(self, database: Database):
+    def __init__(self, database: Database) -> None:
         self.db = database
         self.executions_collection: Collection = self.db.get_collection(CollectionNames.EXECUTIONS)
         self.events_collection: Collection = self.db.get_collection(CollectionNames.EVENTS)
         self.mapper = SSEMapper()
 
     async def get_execution_status(self, execution_id: str) -> SSEExecutionStatusDomain | None:
-        execution = await self.executions_collection.find_one(
-            {"execution_id": execution_id}, {"status": 1, "execution_id": 1, "_id": 0}
-        )
-
-        if execution:
-            return self.mapper.to_execution_status(execution_id, execution.get("status", "unknown"))
-        return None
-
-    async def get_execution_events(self, execution_id: str, limit: int = 100, skip: int = 0) -> list[SSEEventDomain]:
-        cursor = (
-            self.events_collection.find({"aggregate_id": execution_id}).sort("timestamp", 1).skip(skip).limit(limit)
-        )
-
-        events: list[SSEEventDomain] = []
-        async for event in cursor:
-            events.append(self.mapper.event_from_mongo_document(event))
-        return events
-
-    async def get_execution_for_user(self, execution_id: str, user_id: str) -> DomainExecution | None:
-        doc = await self.executions_collection.find_one({"execution_id": execution_id, "user_id": user_id})
+        doc = await self.executions_collection.find_one({"execution_id": execution_id}, {"status": 1, "_id": 0})
         if not doc:
             return None
-        return self.mapper.execution_from_mongo_document(doc)
+        return SSEExecutionStatusDomain(
+            execution_id=execution_id,
+            status=ExecutionStatus(doc["status"]),
+            timestamp=datetime.now(timezone.utc).isoformat(),
+        )
 
     async def get_execution(self, execution_id: str) -> DomainExecution | None:
         doc = await self.executions_collection.find_one({"execution_id": execution_id})
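The rewritten `get_execution_status` above projects only the `status` field and builds the domain object directly instead of mapping the full document, while the unused `get_execution_events` and `get_execution_for_user` helpers are dropped. A condensed, self-contained version of that pattern (Motor-style async collection assumed, stand-in types):

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum


class ExecutionStatus(str, Enum):  # stand-in for the domain enum
    QUEUED = "queued"
    RUNNING = "running"
    COMPLETED = "completed"


@dataclass
class SSEExecutionStatusDomain:
    execution_id: str
    status: ExecutionStatus
    timestamp: str


async def get_execution_status(collection, execution_id: str) -> SSEExecutionStatusDomain | None:
    # Fetch only what the SSE stream needs: one field, no full-document mapping.
    doc = await collection.find_one({"execution_id": execution_id}, {"status": 1, "_id": 0})
    if not doc:
        return None
    return SSEExecutionStatusDomain(
        execution_id=execution_id,
        status=ExecutionStatus(doc["status"]),
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
```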