Skip to content

Commit 8a9ab47

Browse files
committed
better types
1 parent d223c36 commit 8a9ab47

File tree

34 files changed

+480
-551
lines changed

34 files changed

+480
-551
lines changed

backend/app/api/routes/events.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ async def replay_aggregate_events(
325325
user_id=admin.user_id,
326326
)
327327
await kafka_event_service.publish_event(
328-
event_type=f"replay.{event.event_type}",
328+
event_type=event.event_type,
329329
payload=event.payload,
330330
aggregate_id=aggregate_id,
331331
correlation_id=replay_correlation_id,

backend/app/api/routes/execution.py

Lines changed: 5 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
from dataclasses import asdict
21
from datetime import datetime, timezone
32
from typing import Annotated
43
from uuid import uuid4
@@ -11,10 +10,8 @@
1110
from app.core.exceptions import IntegrationException
1211
from app.core.tracing import EventAttributes, add_span_attributes
1312
from app.core.utils import get_client_ip
14-
from app.domain.enums.common import ErrorType
1513
from app.domain.enums.events import EventType
1614
from app.domain.enums.execution import ExecutionStatus
17-
from app.domain.enums.storage import ExecutionErrorType
1815
from app.domain.enums.user import UserRole
1916
from app.infrastructure.kafka.events.base import BaseEvent
2017
from app.infrastructure.kafka.events.metadata import EventMetadata
@@ -31,7 +28,6 @@
3128
ExecutionResponse,
3229
ExecutionResult,
3330
ResourceLimits,
34-
ResourceUsage,
3531
RetryExecutionRequest,
3632
)
3733
from app.schemas_pydantic.user import UserResponse
@@ -55,35 +51,7 @@ async def get_execution_with_access(
5551
if domain_exec.user_id and domain_exec.user_id != current_user.user_id and current_user.role != UserRole.ADMIN:
5652
raise HTTPException(status_code=403, detail="Access denied")
5753

58-
# Map domain to Pydantic for dependency consumer
59-
ru = None
60-
if domain_exec.resource_usage is not None:
61-
ru = ResourceUsage(**vars(domain_exec.resource_usage))
62-
# Map error_type to public ErrorType in API model via mapper rules
63-
error_type = (
64-
(
65-
ErrorType.SCRIPT_ERROR
66-
if domain_exec.error_type == ExecutionErrorType.SCRIPT_ERROR
67-
else ErrorType.SYSTEM_ERROR
68-
)
69-
if domain_exec.error_type is not None
70-
else None
71-
)
72-
return ExecutionInDB(
73-
execution_id=domain_exec.execution_id,
74-
script=domain_exec.script,
75-
status=domain_exec.status,
76-
stdout=domain_exec.stdout,
77-
stderr=domain_exec.stderr,
78-
lang=domain_exec.lang,
79-
lang_version=domain_exec.lang_version,
80-
resource_usage=ru,
81-
user_id=domain_exec.user_id,
82-
exit_code=domain_exec.exit_code,
83-
error_type=error_type,
84-
created_at=domain_exec.created_at,
85-
updated_at=domain_exec.updated_at,
86-
)
54+
return ExecutionInDB.model_validate(domain_exec)
8755

8856

8957
@router.post("/execute", response_model=ExecutionResponse)
@@ -269,24 +237,14 @@ async def retry_execution(
269237
async def get_execution_events(
270238
execution: Annotated[ExecutionInDB, Depends(get_execution_with_access)],
271239
event_service: FromDishka[EventService],
272-
event_types: str | None = Query(None, description="Comma-separated event types to filter"),
240+
event_types: list[EventType] | None = Query(None, description="Event types to filter"),
273241
limit: int = Query(100, ge=1, le=1000),
274242
) -> list[ExecutionEventResponse]:
275243
"""Get all events for an execution."""
276-
event_type_list = None
277-
if event_types:
278-
event_type_list = [t.strip() for t in event_types.split(",")]
279-
280244
events = await event_service.get_events_by_aggregate(
281-
aggregate_id=execution.execution_id, event_types=event_type_list, limit=limit
245+
aggregate_id=execution.execution_id, event_types=event_types, limit=limit
282246
)
283-
284-
return [
285-
ExecutionEventResponse(
286-
event_id=event.event_id, event_type=event.event_type, timestamp=event.timestamp, payload=event.payload
287-
)
288-
for event in events
289-
]
247+
return [ExecutionEventResponse.model_validate(e) for e in events]
290248

291249

292250
@router.get("/user/executions", response_model=ExecutionListResponse)
@@ -337,7 +295,7 @@ async def get_k8s_resource_limits(
337295
) -> ResourceLimits:
338296
try:
339297
limits = await execution_service.get_k8s_resource_limits()
340-
return ResourceLimits(**asdict(limits))
298+
return ResourceLimits.model_validate(limits)
341299
except Exception as e:
342300
raise HTTPException(status_code=500, detail="Failed to retrieve resource limits") from e
343301

backend/app/db/repositories/event_repository.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from app.core.logging import logger
1010
from app.core.tracing import EventAttributes
1111
from app.core.tracing.utils import add_span_attributes
12+
from app.domain.enums.events import EventType
1213
from app.domain.enums.user import UserRole
1314
from app.domain.events import (
1415
ArchivedEvent,
@@ -114,11 +115,11 @@ async def get_events_by_type(
114115
return [self.mapper.from_mongo_document(doc) for doc in docs]
115116

116117
async def get_events_by_aggregate(
117-
self, aggregate_id: str, event_types: list[str] | None = None, limit: int = 100
118+
self, aggregate_id: str, event_types: list[EventType] | None = None, limit: int = 100
118119
) -> list[Event]:
119120
query: dict[str, Any] = {EventFields.AGGREGATE_ID: aggregate_id}
120121
if event_types:
121-
query[EventFields.EVENT_TYPE] = {"$in": event_types}
122+
query[EventFields.EVENT_TYPE] = {"$in": [t.value for t in event_types]}
122123

123124
cursor = self._collection.find(query).sort(EventFields.TIMESTAMP, ASCENDING).limit(limit)
124125
docs = await cursor.to_list(length=limit)

backend/app/db/repositories/sse_repository.py

Lines changed: 12 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,30 @@
1+
from datetime import datetime, timezone
2+
13
from app.core.database_context import Collection, Database
4+
from app.domain.enums.execution import ExecutionStatus
25
from app.domain.events.event_models import CollectionNames
36
from app.domain.execution import DomainExecution
4-
from app.domain.sse import SSEEventDomain, SSEExecutionStatusDomain
7+
from app.domain.sse import SSEExecutionStatusDomain
58
from app.infrastructure.mappers import SSEMapper
69

710

811
class SSERepository:
9-
def __init__(self, database: Database):
12+
def __init__(self, database: Database) -> None:
1013
self.db = database
1114
self.executions_collection: Collection = self.db.get_collection(CollectionNames.EXECUTIONS)
12-
self.events_collection: Collection = self.db.get_collection(CollectionNames.EVENTS)
1315
self.mapper = SSEMapper()
1416

1517
async def get_execution_status(self, execution_id: str) -> SSEExecutionStatusDomain | None:
16-
execution = await self.executions_collection.find_one(
17-
{"execution_id": execution_id}, {"status": 1, "execution_id": 1, "_id": 0}
18-
)
19-
20-
if execution:
21-
return self.mapper.to_execution_status(execution_id, execution.get("status", "unknown"))
22-
return None
23-
24-
async def get_execution_events(self, execution_id: str, limit: int = 100, skip: int = 0) -> list[SSEEventDomain]:
25-
cursor = (
26-
self.events_collection.find({"aggregate_id": execution_id}).sort("timestamp", 1).skip(skip).limit(limit)
18+
doc = await self.executions_collection.find_one(
19+
{"execution_id": execution_id}, {"status": 1, "_id": 0}
2720
)
28-
29-
events: list[SSEEventDomain] = []
30-
async for event in cursor:
31-
events.append(self.mapper.event_from_mongo_document(event))
32-
return events
33-
34-
async def get_execution_for_user(self, execution_id: str, user_id: str) -> DomainExecution | None:
35-
doc = await self.executions_collection.find_one({"execution_id": execution_id, "user_id": user_id})
3621
if not doc:
3722
return None
38-
return self.mapper.execution_from_mongo_document(doc)
23+
return SSEExecutionStatusDomain(
24+
execution_id=execution_id,
25+
status=ExecutionStatus(doc["status"]),
26+
timestamp=datetime.now(timezone.utc).isoformat(),
27+
)
3928

4029
async def get_execution(self, execution_id: str) -> DomainExecution | None:
4130
doc = await self.executions_collection.find_one({"execution_id": execution_id})

backend/app/dlq/models.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from typing import Any
44

55
from app.core.utils import StringEnum
6+
from app.domain.enums.events import EventType
67
from app.infrastructure.kafka.events import BaseEvent
78

89

@@ -89,9 +90,9 @@ def age_seconds(self) -> float:
8990
return (datetime.now(timezone.utc) - self.failed_at).total_seconds()
9091

9192
@property
92-
def event_type(self) -> str:
93+
def event_type(self) -> EventType:
9394
"""Get event type from the event."""
94-
return str(self.event.event_type)
95+
return self.event.event_type
9596

9697

9798
@dataclass

backend/app/domain/events/event_models.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from typing import Any
44

55
from app.core.utils import StringEnum
6+
from app.domain.enums.events import EventType
67
from app.infrastructure.kafka.events.metadata import EventMetadata
78

89
MongoQueryValue = str | dict[str, str | list[str] | float | datetime]
@@ -77,7 +78,7 @@ class Event:
7778
"""Domain model for an event."""
7879

7980
event_id: str
80-
event_type: str
81+
event_type: EventType
8182
event_version: str
8283
timestamp: datetime
8384
metadata: EventMetadata

backend/app/domain/sse/models.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
from dataclasses import dataclass
44
from datetime import datetime
55

6+
from app.domain.enums.execution import ExecutionStatus
7+
68

79
@dataclass
810
class ShutdownStatus:
@@ -31,7 +33,7 @@ class SSEHealthDomain:
3133
@dataclass
3234
class SSEExecutionStatusDomain:
3335
execution_id: str
34-
status: str
36+
status: ExecutionStatus
3537
timestamp: str
3638

3739

backend/app/domain/user/settings_models.py

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@ class DomainEditorSettings:
2626
use_tabs: bool = False
2727
word_wrap: bool = True
2828
show_line_numbers: bool = True
29-
# fixed, non-configurable editor attributes are omitted from domain
30-
# as they are UI concerns
3129

3230

3331
@dataclass
@@ -55,24 +53,6 @@ class DomainUserSettingsUpdate:
5553
editor: Optional[DomainEditorSettings] = None
5654
custom_settings: Optional[Dict[str, Any]] = None
5755

58-
def to_update_dict(self) -> Dict[str, Any]:
59-
out: Dict[str, Any] = {}
60-
if self.theme is not None:
61-
out["theme"] = self.theme
62-
if self.timezone is not None:
63-
out["timezone"] = self.timezone
64-
if self.date_format is not None:
65-
out["date_format"] = self.date_format
66-
if self.time_format is not None:
67-
out["time_format"] = self.time_format
68-
if self.notifications is not None:
69-
out["notifications"] = self.notifications
70-
if self.editor is not None:
71-
out["editor"] = self.editor
72-
if self.custom_settings is not None:
73-
out["custom_settings"] = self.custom_settings
74-
return out
75-
7656

7757
@dataclass
7858
class DomainSettingChange:
@@ -94,7 +74,7 @@ class DomainSettingsEvent:
9474
@dataclass
9575
class DomainSettingsHistoryEntry:
9676
timestamp: datetime
97-
event_type: str
77+
event_type: EventType
9878
field: str
9979
old_value: Any
10080
new_value: Any
Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,6 @@
11
from __future__ import annotations
22

3-
from typing import Optional
4-
5-
from app.domain.enums.common import ErrorType
6-
from app.domain.enums.storage import ExecutionErrorType
7-
from app.domain.execution import DomainExecution, ResourceUsageDomain
3+
from app.domain.execution import DomainExecution
84
from app.schemas_pydantic.execution import ExecutionResponse, ExecutionResult
95
from app.schemas_pydantic.execution import ResourceUsage as ResourceUsageSchema
106

@@ -20,18 +16,9 @@ def to_response(e: DomainExecution) -> ExecutionResponse:
2016
@staticmethod
2117
def to_result(e: DomainExecution) -> ExecutionResult:
2218
ru = None
23-
if isinstance(e.resource_usage, ResourceUsageDomain):
19+
if e.resource_usage is not None:
2420
ru = ResourceUsageSchema(**e.resource_usage.to_dict())
2521

26-
# Map domain ExecutionErrorType -> public ErrorType
27-
def _map_error(t: Optional[ExecutionErrorType]) -> Optional[ErrorType]:
28-
if t is None:
29-
return None
30-
if t == ExecutionErrorType.SCRIPT_ERROR:
31-
return ErrorType.SCRIPT_ERROR
32-
# TIMEOUT, RESOURCE_LIMIT, SYSTEM_ERROR, PERMISSION_DENIED -> SYSTEM_ERROR class
33-
return ErrorType.SYSTEM_ERROR
34-
3522
return ExecutionResult(
3623
execution_id=e.execution_id,
3724
status=e.status,
@@ -41,5 +28,5 @@ def _map_error(t: Optional[ExecutionErrorType]) -> Optional[ErrorType]:
4128
lang_version=e.lang_version,
4229
resource_usage=ru,
4330
exit_code=e.exit_code,
44-
error_type=_map_error(e.error_type),
31+
error_type=e.error_type,
4532
)

backend/app/infrastructure/mappers/sse_mapper.py

Lines changed: 6 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,46 +1,27 @@
11
from datetime import datetime, timezone
2-
from typing import Any, Dict
2+
from typing import Any
33

44
from app.domain.enums.execution import ExecutionStatus
55
from app.domain.execution import DomainExecution, ResourceUsageDomain
6-
from app.domain.sse import SSEEventDomain, SSEExecutionStatusDomain
76

87

98
class SSEMapper:
10-
"""Mapper for SSE-related domain models and MongoDB documents."""
9+
"""Mapper for SSE-related domain models from MongoDB documents."""
1110

12-
# Execution status (lightweight)
1311
@staticmethod
14-
def to_execution_status(execution_id: str, status: str) -> SSEExecutionStatusDomain:
15-
return SSEExecutionStatusDomain(
16-
execution_id=execution_id,
17-
status=status,
18-
timestamp=datetime.now(timezone.utc).isoformat(),
19-
)
20-
21-
# Execution events
22-
@staticmethod
23-
def event_from_mongo_document(doc: Dict[str, Any]) -> SSEEventDomain:
24-
return SSEEventDomain(
25-
aggregate_id=str(doc.get("aggregate_id", "")),
26-
timestamp=doc["timestamp"],
27-
)
28-
29-
# Executions
30-
@staticmethod
31-
def execution_from_mongo_document(doc: Dict[str, Any]) -> DomainExecution:
32-
sv = doc.get("status")
12+
def execution_from_mongo_document(doc: dict[str, Any]) -> DomainExecution:
13+
resource_usage_data = doc.get("resource_usage")
3314
return DomainExecution(
3415
execution_id=str(doc.get("execution_id")),
3516
script=str(doc.get("script", "")),
36-
status=ExecutionStatus(str(sv)),
17+
status=ExecutionStatus(str(doc.get("status"))),
3718
stdout=doc.get("stdout"),
3819
stderr=doc.get("stderr"),
3920
lang=str(doc.get("lang", "python")),
4021
lang_version=str(doc.get("lang_version", "3.11")),
4122
created_at=doc.get("created_at", datetime.now(timezone.utc)),
4223
updated_at=doc.get("updated_at", datetime.now(timezone.utc)),
43-
resource_usage=ResourceUsageDomain.from_dict(doc.get("resource_usage") or {}),
24+
resource_usage=ResourceUsageDomain.from_dict(resource_usage_data) if resource_usage_data else None,
4425
user_id=doc.get("user_id"),
4526
exit_code=doc.get("exit_code"),
4627
error_type=doc.get("error_type"),

0 commit comments

Comments
 (0)