diff --git a/backend/app/api/routes/admin/events.py b/backend/app/api/routes/admin/events.py
index 5af09a0d..681c7146 100644
--- a/backend/app/api/routes/admin/events.py
+++ b/backend/app/api/routes/admin/events.py
@@ -43,7 +43,7 @@ async def browse_events(request: EventBrowseRequest, service: FromDishka[AdminEv
     event_filter = EventFilterMapper.from_admin_pydantic(request.filters)
     result = await service.browse_events(
-        filter=event_filter,
+        event_filter=event_filter,
         skip=request.skip,
         limit=request.limit,
         sort_by=request.sort_by,
@@ -92,7 +92,7 @@ async def export_events_csv(
             end_time=end_time,
         )
     )
-    result = await service.export_events_csv_content(filter=export_filter, limit=limit)
+    result = await service.export_events_csv_content(event_filter=export_filter, limit=limit)
     return StreamingResponse(
         iter([result.content]),
         media_type=result.media_type,
@@ -128,7 +128,7 @@ async def export_events_json(
             end_time=end_time,
         )
     )
-    result = await service.export_events_json_content(filter=export_filter, limit=limit)
+    result = await service.export_events_json_content(event_filter=export_filter, limit=limit)
     return StreamingResponse(
         iter([result.content]),
         media_type=result.media_type,
diff --git a/backend/app/api/routes/dlq.py b/backend/app/api/routes/dlq.py
index 5da8e683..5123b3fd 100644
--- a/backend/app/api/routes/dlq.py
+++ b/backend/app/api/routes/dlq.py
@@ -10,6 +10,7 @@
 from app.dlq import RetryPolicy
 from app.dlq.manager import DLQManager
 from app.dlq.models import DLQMessageStatus
+from app.domain.enums.events import EventType
 from app.schemas_pydantic.dlq import (
     DLQBatchRetryResponse,
     DLQMessageDetail,
@@ -50,7 +51,7 @@ async def get_dlq_messages(
     repository: FromDishka[DLQRepository],
     status: DLQMessageStatus | None = Query(None),
     topic: str | None = None,
-    event_type: str | None = None,
+    event_type: EventType | None = Query(None),
     limit: int = Query(50, ge=1, le=1000),
     offset: int = Query(0, ge=0),
 ) -> DLQMessagesResponse:
diff --git a/backend/app/api/routes/events.py b/backend/app/api/routes/events.py
index ad053753..017625eb 100644
--- a/backend/app/api/routes/events.py
+++ b/backend/app/api/routes/events.py
@@ -325,7 +325,7 @@ async def replay_aggregate_events(
             user_id=admin.user_id,
         )
         await kafka_event_service.publish_event(
-            event_type=f"replay.{event.event_type}",
+            event_type=event.event_type,
             payload=event.payload,
             aggregate_id=aggregate_id,
             correlation_id=replay_correlation_id,
diff --git a/backend/app/api/routes/execution.py b/backend/app/api/routes/execution.py
index ef3e9a45..d714a5f5 100644
--- a/backend/app/api/routes/execution.py
+++ b/backend/app/api/routes/execution.py
@@ -10,10 +10,8 @@
 from app.core.exceptions import IntegrationException
 from app.core.tracing import EventAttributes, add_span_attributes
 from app.core.utils import get_client_ip
-from app.domain.enums.common import ErrorType
 from app.domain.enums.events import EventType
 from app.domain.enums.execution import ExecutionStatus
-from app.domain.enums.storage import ExecutionErrorType
 from app.domain.enums.user import UserRole
 from app.infrastructure.kafka.events.base import BaseEvent
 from app.infrastructure.kafka.events.metadata import EventMetadata
@@ -30,7 +28,6 @@
     ExecutionResponse,
     ExecutionResult,
     ResourceLimits,
-    ResourceUsage,
     RetryExecutionRequest,
 )
 from app.schemas_pydantic.user import UserResponse
@@ -54,35 +51,7 @@ async def get_execution_with_access(
     if domain_exec.user_id and domain_exec.user_id != current_user.user_id and current_user.role != UserRole.ADMIN:
         raise HTTPException(status_code=403, detail="Access denied")
-    # Map domain to Pydantic for dependency consumer
-    ru = None
-    if domain_exec.resource_usage is not None:
-        ru = ResourceUsage(**vars(domain_exec.resource_usage))
-    # Map error_type to public ErrorType in API model via mapper rules
-    error_type = (
-        (
-            ErrorType.SCRIPT_ERROR
-            if domain_exec.error_type == ExecutionErrorType.SCRIPT_ERROR
-            else ErrorType.SYSTEM_ERROR
-        )
-        if domain_exec.error_type is not None
-        else None
-    )
-    return ExecutionInDB(
-        execution_id=domain_exec.execution_id,
-        script=domain_exec.script,
-        status=domain_exec.status,
-        stdout=domain_exec.stdout,
-        stderr=domain_exec.stderr,
-        lang=domain_exec.lang,
-        lang_version=domain_exec.lang_version,
-        resource_usage=ru,
-        user_id=domain_exec.user_id,
-        exit_code=domain_exec.exit_code,
-        error_type=error_type,
-        created_at=domain_exec.created_at,
-        updated_at=domain_exec.updated_at,
-    )
+    return ExecutionInDB.model_validate(domain_exec)


 @router.post("/execute", response_model=ExecutionResponse)
@@ -268,24 +237,14 @@ async def retry_execution(
 async def get_execution_events(
     execution: Annotated[ExecutionInDB, Depends(get_execution_with_access)],
     event_service: FromDishka[EventService],
-    event_types: str | None = Query(None, description="Comma-separated event types to filter"),
+    event_types: list[EventType] | None = Query(None, description="Event types to filter"),
     limit: int = Query(100, ge=1, le=1000),
 ) -> list[ExecutionEventResponse]:
     """Get all events for an execution."""
-    event_type_list = None
-    if event_types:
-        event_type_list = [t.strip() for t in event_types.split(",")]
-
     events = await event_service.get_events_by_aggregate(
-        aggregate_id=execution.execution_id, event_types=event_type_list, limit=limit
+        aggregate_id=execution.execution_id, event_types=event_types, limit=limit
     )
-
-    return [
-        ExecutionEventResponse(
-            event_id=event.event_id, event_type=event.event_type, timestamp=event.timestamp, payload=event.payload
-        )
-        for event in events
-    ]
+    return [ExecutionEventResponse.model_validate(e) for e in events]


 @router.get("/user/executions", response_model=ExecutionListResponse)
@@ -336,7 +295,7 @@ async def get_k8s_resource_limits(
 ) -> ResourceLimits:
     try:
         limits = await execution_service.get_k8s_resource_limits()
-        return ResourceLimits(**vars(limits))
+        return ResourceLimits.model_validate(limits)
     except Exception as e:
         raise HTTPException(status_code=500, detail="Failed to retrieve resource limits") from e
diff --git a/backend/app/db/repositories/admin/admin_events_repository.py b/backend/app/db/repositories/admin/admin_events_repository.py
index f7f311f8..0ba68df4 100644
--- a/backend/app/db/repositories/admin/admin_events_repository.py
+++ b/backend/app/db/repositories/admin/admin_events_repository.py
@@ -58,14 +58,14 @@ def __init__(self, db: Database):
     async def browse_events(
         self,
-        filter: EventFilter,
+        event_filter: EventFilter,
         skip: int = 0,
         limit: int = 50,
         sort_by: str = EventFields.TIMESTAMP,
         sort_order: int = SortDirection.DESCENDING,
     ) -> EventBrowseResult:
         """Browse events with filters using domain models."""
-        query = EventFilterMapper.to_mongo_query(filter)
+        query = EventFilterMapper.to_mongo_query(event_filter)

         # Get total count
         total = await self.events_collection.count_documents(query)
@@ -191,9 +191,9 @@ async def get_event_stats(self, hours: int = 24) -> EventStatistics:

         return statistics

-    async def export_events_csv(self, filter: EventFilter) -> List[EventExportRow]:
+    async def export_events_csv(self, event_filter: EventFilter) -> List[EventExportRow]:
         """Export events as CSV data."""
-        query = EventFilterMapper.to_mongo_query(filter)
+        query = EventFilterMapper.to_mongo_query(event_filter)

         cursor = self.events_collection.find(query).sort(EventFields.TIMESTAMP, SortDirection.DESCENDING).limit(10000)
diff --git a/backend/app/db/repositories/dlq_repository.py b/backend/app/db/repositories/dlq_repository.py
index e873ca36..b1659a49 100644
--- a/backend/app/db/repositories/dlq_repository.py
+++ b/backend/app/db/repositories/dlq_repository.py
@@ -18,6 +18,7 @@
     TopicStatistic,
 )
 from app.dlq.manager import DLQManager
+from app.domain.enums.events import EventType
 from app.domain.events.event_models import CollectionNames
 from app.infrastructure.mappers.dlq_mapper import DLQMapper
@@ -105,18 +106,14 @@ async def get_dlq_stats(self) -> DLQStatistics:

     async def get_messages(
         self,
-        status: str | None = None,
+        status: DLQMessageStatus | None = None,
         topic: str | None = None,
-        event_type: str | None = None,
+        event_type: EventType | None = None,
         limit: int = 50,
         offset: int = 0,
     ) -> DLQMessageListResult:
-        # Create filter
-        filter = DLQMessageFilter(
-            status=DLQMessageStatus(status) if status else None, topic=topic, event_type=event_type
-        )
-
-        query = DLQMapper.filter_to_query(filter)
+        msg_filter = DLQMessageFilter(status=status, topic=topic, event_type=event_type)
+        query = DLQMapper.filter_to_query(msg_filter)

         total_count = await self.dlq_collection.count_documents(query)
         cursor = self.dlq_collection.find(query).sort(DLQFields.FAILED_AT, -1).skip(offset).limit(limit)
diff --git a/backend/app/db/repositories/event_repository.py b/backend/app/db/repositories/event_repository.py
index f26d3e21..82b2bb20 100644
--- a/backend/app/db/repositories/event_repository.py
+++ b/backend/app/db/repositories/event_repository.py
@@ -9,6 +9,7 @@
 from app.core.logging import logger
 from app.core.tracing import EventAttributes
 from app.core.tracing.utils import add_span_attributes
+from app.domain.enums.events import EventType
 from app.domain.enums.user import UserRole
 from app.domain.events import (
     ArchivedEvent,
@@ -114,11 +115,11 @@ async def get_events_by_type(
         return [self.mapper.from_mongo_document(doc) for doc in docs]

     async def get_events_by_aggregate(
-        self, aggregate_id: str, event_types: list[str] | None = None, limit: int = 100
+        self, aggregate_id: str, event_types: list[EventType] | None = None, limit: int = 100
     ) -> list[Event]:
         query: dict[str, Any] = {EventFields.AGGREGATE_ID: aggregate_id}
         if event_types:
-            query[EventFields.EVENT_TYPE] = {"$in": event_types}
+            query[EventFields.EVENT_TYPE] = {"$in": [t.value for t in event_types]}

         cursor = self._collection.find(query).sort(EventFields.TIMESTAMP, ASCENDING).limit(limit)
         docs = await cursor.to_list(length=limit)
diff --git a/backend/app/db/repositories/replay_repository.py b/backend/app/db/repositories/replay_repository.py
index 1bf86eea..fc21d3aa 100644
--- a/backend/app/db/repositories/replay_repository.py
+++ b/backend/app/db/repositories/replay_repository.py
@@ -5,6 +5,7 @@
 from app.core.database_context import Collection, Database
 from app.core.logging import logger
 from app.domain.admin.replay_updates import ReplaySessionUpdate
+from app.domain.enums.replay import ReplayStatus
 from app.domain.events.event_models import CollectionNames
 from app.domain.replay import ReplayFilter, ReplaySessionState
 from app.infrastructure.mappers import ReplayStateMapper
@@ -42,13 +43,13 @@ async def get_session(self, session_id: str) -> ReplaySessionState | None:
         return self._mapper.from_mongo_document(data) if data else None

     async def list_sessions(
-        self, status: str | None = None, user_id: str | None = None, limit: int = 100, skip: int = 0
+        self, status: ReplayStatus | None = None, user_id: str | None = None, limit: int = 100, skip: int = 0
     ) -> list[ReplaySessionState]:
         collection = self.replay_collection
-        query = {}
+        query: dict[str, object] = {}
         if status:
-            query["status"] = status
+            query["status"] = status.value
         if user_id:
             query["config.filter.user_id"] = user_id
@@ -58,15 +59,20 @@ async def list_sessions(
             sessions.append(self._mapper.from_mongo_document(doc))
         return sessions

-    async def update_session_status(self, session_id: str, status: str) -> bool:
+    async def update_session_status(self, session_id: str, status: ReplayStatus) -> bool:
         """Update the status of a replay session"""
-        result = await self.replay_collection.update_one({"session_id": session_id}, {"$set": {"status": status}})
+        result = await self.replay_collection.update_one({"session_id": session_id}, {"$set": {"status": status.value}})
         return result.modified_count > 0

     async def delete_old_sessions(self, cutoff_time: str) -> int:
         """Delete old completed/failed/cancelled sessions"""
+        terminal_statuses = [
+            ReplayStatus.COMPLETED.value,
+            ReplayStatus.FAILED.value,
+            ReplayStatus.CANCELLED.value,
+        ]
         result = await self.replay_collection.delete_many(
-            {"created_at": {"$lt": cutoff_time}, "status": {"$in": ["completed", "failed", "cancelled"]}}
+            {"created_at": {"$lt": cutoff_time}, "status": {"$in": terminal_statuses}}
         )
         return result.deleted_count
@@ -83,16 +89,16 @@ async def update_replay_session(self, session_id: str, updates: ReplaySessionUpd
         result = await self.replay_collection.update_one({"session_id": session_id}, {"$set": mongo_updates})
         return result.modified_count > 0

-    async def count_events(self, filter: ReplayFilter) -> int:
+    async def count_events(self, replay_filter: ReplayFilter) -> int:
         """Count events matching the given filter"""
-        query = filter.to_mongo_query()
+        query = replay_filter.to_mongo_query()
         return await self.events_collection.count_documents(query)

     async def fetch_events(
-        self, filter: ReplayFilter, batch_size: int = 100, skip: int = 0
+        self, replay_filter: ReplayFilter, batch_size: int = 100, skip: int = 0
     ) -> AsyncIterator[List[Dict[str, Any]]]:
         """Fetch events in batches based on filter"""
-        query = filter.to_mongo_query()
+        query = replay_filter.to_mongo_query()
         cursor = self.events_collection.find(query).sort("timestamp", 1).skip(skip)

         batch = []
diff --git a/backend/app/db/repositories/saga_repository.py b/backend/app/db/repositories/saga_repository.py
index e5bcf86c..eb82be2b 100644
--- a/backend/app/db/repositories/saga_repository.py
+++ b/backend/app/db/repositories/saga_repository.py
@@ -46,17 +46,17 @@ async def get_saga(self, saga_id: str) -> Saga | None:
         doc = await self.sagas.find_one({"saga_id": saga_id})
         return self.mapper.from_mongo(doc) if doc else None

-    async def get_sagas_by_execution(self, execution_id: str, state: str | None = None) -> list[Saga]:
+    async def get_sagas_by_execution(self, execution_id: str, state: SagaState | None = None) -> list[Saga]:
         query: dict[str, object] = {"execution_id": execution_id}
         if state:
-            query["state"] = state
+            query["state"] = state.value

         cursor = self.sagas.find(query).sort("created_at", DESCENDING)
         docs = await cursor.to_list(length=None)
         return [self.mapper.from_mongo(doc) for doc in docs]

-    async def list_sagas(self, filter: SagaFilter, limit: int = 100, skip: int = 0) -> SagaListResult:
-        query = self.filter_mapper.to_mongodb_query(filter)
+    async def list_sagas(self, saga_filter: SagaFilter, limit: int = 100, skip: int = 0) -> SagaListResult:
+        query = self.filter_mapper.to_mongodb_query(saga_filter)

         # Get total count
         total = await self.sagas.count_documents(query)
@@ -69,8 +69,8 @@ async def list_sagas(self, filter: SagaFilter, limit: int = 100, skip: int = 0)

         return SagaListResult(sagas=sagas, total=total, skip=skip, limit=limit)

-    async def update_saga_state(self, saga_id: str, state: str, error_message: str | None = None) -> bool:
-        update_data = {"state": state, "updated_at": datetime.now(timezone.utc)}
+    async def update_saga_state(self, saga_id: str, state: SagaState, error_message: str | None = None) -> bool:
+        update_data: dict[str, object] = {"state": state.value, "updated_at": datetime.now(timezone.utc)}
         if error_message:
             update_data["error_message"] = error_message
@@ -108,8 +108,8 @@ async def find_timed_out_sagas(
         docs = await cursor.to_list(length=limit)
         return [self.mapper.from_mongo(doc) for doc in docs]

-    async def get_saga_statistics(self, filter: SagaFilter | None = None) -> dict[str, object]:
-        query = self.filter_mapper.to_mongodb_query(filter) if filter else {}
+    async def get_saga_statistics(self, saga_filter: SagaFilter | None = None) -> dict[str, object]:
+        query = self.filter_mapper.to_mongodb_query(saga_filter) if saga_filter else {}

         # Basic counts
         total = await self.sagas.count_documents(query)
diff --git a/backend/app/db/repositories/sse_repository.py b/backend/app/db/repositories/sse_repository.py
index 8b578413..d2112cb3 100644
--- a/backend/app/db/repositories/sse_repository.py
+++ b/backend/app/db/repositories/sse_repository.py
@@ -1,41 +1,28 @@
+from datetime import datetime, timezone
+
 from app.core.database_context import Collection, Database
+from app.domain.enums.execution import ExecutionStatus
 from app.domain.events.event_models import CollectionNames
 from app.domain.execution import DomainExecution
-from app.domain.sse import SSEEventDomain, SSEExecutionStatusDomain
+from app.domain.sse import SSEExecutionStatusDomain
 from app.infrastructure.mappers import SSEMapper


 class SSERepository:
-    def __init__(self, database: Database):
+    def __init__(self, database: Database) -> None:
         self.db = database
         self.executions_collection: Collection = self.db.get_collection(CollectionNames.EXECUTIONS)
-        self.events_collection: Collection = self.db.get_collection(CollectionNames.EVENTS)
         self.mapper = SSEMapper()

     async def get_execution_status(self, execution_id: str) -> SSEExecutionStatusDomain | None:
-        execution = await self.executions_collection.find_one(
-            {"execution_id": execution_id}, {"status": 1, "execution_id": 1, "_id": 0}
-        )
-
-        if execution:
-            return self.mapper.to_execution_status(execution_id, execution.get("status", "unknown"))
-        return None
-
-    async def get_execution_events(self, execution_id: str, limit: int = 100, skip: int = 0) -> list[SSEEventDomain]:
-        cursor = (
-            self.events_collection.find({"aggregate_id": execution_id}).sort("timestamp", 1).skip(skip).limit(limit)
-        )
-
-        events: list[SSEEventDomain] = []
-        async for event in cursor:
-            events.append(self.mapper.event_from_mongo_document(event))
-        return events
-
-    async def get_execution_for_user(self, execution_id: str, user_id: str) -> DomainExecution | None:
-        doc = await self.executions_collection.find_one({"execution_id": execution_id, "user_id": user_id})
+        doc = await self.executions_collection.find_one({"execution_id": execution_id}, {"status": 1, "_id": 0})
         if not doc:
             return None
-        return self.mapper.execution_from_mongo_document(doc)
+        return SSEExecutionStatusDomain(
+            execution_id=execution_id,
+            status=ExecutionStatus(doc["status"]),
+            timestamp=datetime.now(timezone.utc).isoformat(),
+        )

     async def get_execution(self, execution_id: str) -> DomainExecution | None:
         doc = await self.executions_collection.find_one({"execution_id": execution_id})
diff --git a/backend/app/dlq/models.py b/backend/app/dlq/models.py
index 95523e61..2f577e14 100644
--- a/backend/app/dlq/models.py
+++ b/backend/app/dlq/models.py
@@ -3,6 +3,7 @@
 from typing import Any

 from app.core.utils import StringEnum
+from app.domain.enums.events import EventType
 from app.infrastructure.kafka.events import BaseEvent


@@ -89,9 +90,9 @@ def age_seconds(self) -> float:
         return (datetime.now(timezone.utc) - self.failed_at).total_seconds()

     @property
-    def event_type(self) -> str:
+    def event_type(self) -> EventType:
         """Get event type from the event."""
-        return str(self.event.event_type)
+        return self.event.event_type


 @dataclass
@@ -114,7 +115,7 @@ class DLQMessageFilter:

     status: DLQMessageStatus | None = None
     topic: str | None = None
-    event_type: str | None = None
+    event_type: EventType | None = None


 @dataclass
diff --git a/backend/app/domain/events/event_models.py b/backend/app/domain/events/event_models.py
index 99f0c2ba..35dbc6f6 100644
--- a/backend/app/domain/events/event_models.py
+++ b/backend/app/domain/events/event_models.py
@@ -3,6 +3,7 @@
 from typing import Any

 from app.core.utils import StringEnum
+from app.domain.enums.events import EventType
 from app.infrastructure.kafka.events.metadata import EventMetadata

 MongoQueryValue = str | dict[str, str | list[str] | float | datetime]
@@ -77,7 +78,7 @@ class Event:
     """Domain model for an event."""

     event_id: str
-    event_type: str
+    event_type: EventType
     event_version: str
     timestamp: datetime
     metadata: EventMetadata
diff --git a/backend/app/domain/events/query_builders.py b/backend/app/domain/events/query_builders.py
index 69ffd158..dc7549f3 100644
--- a/backend/app/domain/events/query_builders.py
+++ b/backend/app/domain/events/query_builders.py
@@ -51,9 +51,9 @@ def size(field: str) -> dict[str, str]:
         return {"$size": field}

     @staticmethod
-    def date_to_string(date_field: str, format: str = "%Y-%m-%d-%H") -> dict[str, Any]:
+    def date_to_string(date_field: str, date_format: str = "%Y-%m-%d-%H") -> dict[str, Any]:
         """Create a $dateToString expression."""
-        return {"$dateToString": {"format": format, "date": date_field}}
+        return {"$dateToString": {"format": date_format, "date": date_field}}


 class EventStatsAggregation:
diff --git a/backend/app/domain/execution/__init__.py b/backend/app/domain/execution/__init__.py
index 4b66b31c..fb725417 100644
--- a/backend/app/domain/execution/__init__.py
+++ b/backend/app/domain/execution/__init__.py
@@ -7,6 +7,7 @@
 from .models import (
     DomainExecution,
     ExecutionResultDomain,
+    LanguageInfoDomain,
     ResourceLimitsDomain,
     ResourceUsageDomain,
 )
@@ -14,6 +15,7 @@
 __all__ = [
     "DomainExecution",
     "ExecutionResultDomain",
+    "LanguageInfoDomain",
     "ResourceLimitsDomain",
     "ResourceUsageDomain",
     "ExecutionServiceError",
diff --git a/backend/app/domain/execution/models.py b/backend/app/domain/execution/models.py
index 482e9f39..ab49ff6a 100644
--- a/backend/app/domain/execution/models.py
+++ b/backend/app/domain/execution/models.py
@@ -64,6 +64,14 @@ def from_dict(data: dict[str, Any]) -> "ResourceUsageDomain":
         )


+@dataclass
+class LanguageInfoDomain:
+    """Language runtime information."""
+
+    versions: list[str]
+    file_ext: str
+
+
 @dataclass
 class ResourceLimitsDomain:
     """K8s resource limits configuration."""
@@ -73,4 +81,4 @@ class ResourceLimitsDomain:
     cpu_request: str
     memory_request: str
     execution_timeout: int
-    supported_runtimes: dict[str, list[str]]
+    supported_runtimes: dict[str, LanguageInfoDomain]
diff --git a/backend/app/domain/sse/models.py b/backend/app/domain/sse/models.py
index 576637c2..e4dfa5fe 100644
--- a/backend/app/domain/sse/models.py
+++ b/backend/app/domain/sse/models.py
@@ -3,6 +3,8 @@
 from dataclasses import dataclass
 from datetime import datetime

+from app.domain.enums.execution import ExecutionStatus
+

 @dataclass
 class ShutdownStatus:
@@ -31,7 +33,7 @@ class SSEHealthDomain:
 @dataclass
 class SSEExecutionStatusDomain:
     execution_id: str
-    status: str
+    status: ExecutionStatus
     timestamp: str

diff --git a/backend/app/domain/user/settings_models.py b/backend/app/domain/user/settings_models.py
index 66f2f715..171f1b17 100644
--- a/backend/app/domain/user/settings_models.py
+++ b/backend/app/domain/user/settings_models.py
@@ -20,14 +20,12 @@ class DomainNotificationSettings:

 @dataclass
 class DomainEditorSettings:
-    theme: str = "one-dark"
+    theme: str = "auto"
     font_size: int = 14
     tab_size: int = 4
     use_tabs: bool = False
     word_wrap: bool = True
     show_line_numbers: bool = True
-    # fixed, non-configurable editor attributes are omitted from domain
-    # as they are UI concerns


 @dataclass
@@ -55,24 +53,6 @@ class DomainUserSettingsUpdate:
     editor: Optional[DomainEditorSettings] = None
     custom_settings: Optional[Dict[str, Any]] = None

-    def to_update_dict(self) -> Dict[str, Any]:
-        out: Dict[str, Any] = {}
-        if self.theme is not None:
-            out["theme"] = self.theme
-        if self.timezone is not None:
-            out["timezone"] = self.timezone
-        if self.date_format is not None:
-            out["date_format"] = self.date_format
-        if self.time_format is not None:
-            out["time_format"] = self.time_format
-        if self.notifications is not None:
-            out["notifications"] = self.notifications
-        if self.editor is not None:
-            out["editor"] = self.editor
-        if self.custom_settings is not None:
-            out["custom_settings"] = self.custom_settings
-        return out
-

 @dataclass
 class DomainSettingChange:
@@ -94,7 +74,7 @@ class DomainSettingsEvent:

 @dataclass
 class DomainSettingsHistoryEntry:
     timestamp: datetime
-    event_type: str
+    event_type: EventType
     field: str
     old_value: Any
     new_value: Any
diff --git a/backend/app/infrastructure/mappers/execution_api_mapper.py b/backend/app/infrastructure/mappers/execution_api_mapper.py
index a72105a7..bf775fd0 100644
--- a/backend/app/infrastructure/mappers/execution_api_mapper.py
+++ b/backend/app/infrastructure/mappers/execution_api_mapper.py
@@ -1,10 +1,6 @@
 from __future__ import annotations

-from typing import Optional
-
-from app.domain.enums.common import ErrorType
-from app.domain.enums.storage import ExecutionErrorType
-from app.domain.execution import DomainExecution, ResourceUsageDomain
+from app.domain.execution import DomainExecution
 from app.schemas_pydantic.execution import ExecutionResponse, ExecutionResult
 from app.schemas_pydantic.execution import ResourceUsage as ResourceUsageSchema
@@ -20,18 +16,9 @@ def to_response(e: DomainExecution) -> ExecutionResponse:
     @staticmethod
     def to_result(e: DomainExecution) -> ExecutionResult:
         ru = None
-        if isinstance(e.resource_usage, ResourceUsageDomain):
+        if e.resource_usage is not None:
             ru = ResourceUsageSchema(**e.resource_usage.to_dict())

-        # Map domain ExecutionErrorType -> public ErrorType
-        def _map_error(t: Optional[ExecutionErrorType]) -> Optional[ErrorType]:
-            if t is None:
-                return None
-            if t == ExecutionErrorType.SCRIPT_ERROR:
-                return ErrorType.SCRIPT_ERROR
-            # TIMEOUT, RESOURCE_LIMIT, SYSTEM_ERROR, PERMISSION_DENIED -> SYSTEM_ERROR class
-            return ErrorType.SYSTEM_ERROR
-
         return ExecutionResult(
             execution_id=e.execution_id,
             status=e.status,
@@ -41,5 +28,5 @@ def _map_error(t: Optional[ExecutionErrorType]) -> Optional[ErrorType]:
             lang_version=e.lang_version,
             resource_usage=ru,
             exit_code=e.exit_code,
-            error_type=_map_error(e.error_type),
+            error_type=e.error_type,
         )
diff --git a/backend/app/infrastructure/mappers/saga_mapper.py b/backend/app/infrastructure/mappers/saga_mapper.py
index fe2ef1b6..ba631abb 100644
--- a/backend/app/infrastructure/mappers/saga_mapper.py
+++ b/backend/app/infrastructure/mappers/saga_mapper.py
@@ -214,31 +214,34 @@ def to_cancelled_event(
 class SagaFilterMapper:
     """Maps saga filters to MongoDB queries."""

-    def to_mongodb_query(self, filter: SagaFilter) -> dict[str, Any]:
+    def to_mongodb_query(self, saga_filter: SagaFilter | None) -> dict[str, Any]:
         """Convert filter to MongoDB query."""
         query: dict[str, Any] = {}

-        if filter.state:
-            query["state"] = filter.state.value
+        if not saga_filter:
+            return query

-        if filter.execution_ids:
-            query["execution_id"] = {"$in": filter.execution_ids}
+        if saga_filter.state:
+            query["state"] = saga_filter.state.value

-        if filter.saga_name:
-            query["saga_name"] = filter.saga_name
+        if saga_filter.execution_ids:
+            query["execution_id"] = {"$in": saga_filter.execution_ids}

-        if filter.error_status is not None:
-            if filter.error_status:
+        if saga_filter.saga_name:
+            query["saga_name"] = saga_filter.saga_name
+
+        if saga_filter.error_status is not None:
+            if saga_filter.error_status:
                 query["error_message"] = {"$ne": None}
             else:
                 query["error_message"] = None

-        if filter.created_after or filter.created_before:
+        if saga_filter.created_after or saga_filter.created_before:
             time_query: dict[str, Any] = {}
-            if filter.created_after:
-                time_query["$gte"] = filter.created_after
-            if filter.created_before:
-                time_query["$lte"] = filter.created_before
+            if saga_filter.created_after:
+                time_query["$gte"] = saga_filter.created_after
+            if saga_filter.created_before:
+                time_query["$lte"] = saga_filter.created_before

             query["created_at"] = time_query

         return query
diff --git a/backend/app/infrastructure/mappers/sse_mapper.py b/backend/app/infrastructure/mappers/sse_mapper.py
index aa8f6b3f..5a391e5e 100644
--- a/backend/app/infrastructure/mappers/sse_mapper.py
+++ b/backend/app/infrastructure/mappers/sse_mapper.py
@@ -1,46 +1,27 @@
 from datetime import datetime, timezone
-from typing import Any, Dict
+from typing import Any

 from app.domain.enums.execution import ExecutionStatus
 from app.domain.execution import DomainExecution, ResourceUsageDomain
-from app.domain.sse import SSEEventDomain, SSEExecutionStatusDomain


 class SSEMapper:
-    """Mapper for SSE-related domain models and MongoDB documents."""
+    """Mapper for SSE-related domain models from MongoDB documents."""

-    # Execution status (lightweight)
     @staticmethod
-    def to_execution_status(execution_id: str, status: str) -> SSEExecutionStatusDomain:
-        return SSEExecutionStatusDomain(
-            execution_id=execution_id,
-            status=status,
-            timestamp=datetime.now(timezone.utc).isoformat(),
-        )
-
-    # Execution events
-    @staticmethod
-    def event_from_mongo_document(doc: Dict[str, Any]) -> SSEEventDomain:
-        return SSEEventDomain(
-            aggregate_id=str(doc.get("aggregate_id", "")),
-            timestamp=doc["timestamp"],
-        )
-
-    # Executions
-    @staticmethod
-    def execution_from_mongo_document(doc: Dict[str, Any]) -> DomainExecution:
-        sv = doc.get("status")
+    def execution_from_mongo_document(doc: dict[str, Any]) -> DomainExecution:
+        resource_usage_data = doc.get("resource_usage")
         return DomainExecution(
             execution_id=str(doc.get("execution_id")),
             script=str(doc.get("script", "")),
-            status=ExecutionStatus(str(sv)),
+            status=ExecutionStatus(str(doc.get("status"))),
             stdout=doc.get("stdout"),
             stderr=doc.get("stderr"),
             lang=str(doc.get("lang", "python")),
             lang_version=str(doc.get("lang_version", "3.11")),
             created_at=doc.get("created_at", datetime.now(timezone.utc)),
             updated_at=doc.get("updated_at", datetime.now(timezone.utc)),
-            resource_usage=ResourceUsageDomain.from_dict(doc.get("resource_usage") or {}),
+            resource_usage=ResourceUsageDomain.from_dict(resource_usage_data) if resource_usage_data else None,
             user_id=doc.get("user_id"),
             exit_code=doc.get("exit_code"),
             error_type=doc.get("error_type"),
diff --git a/backend/app/runtime_registry.py b/backend/app/runtime_registry.py
index cf4dd441..7200d61d 100644
--- a/backend/app/runtime_registry.py
+++ b/backend/app/runtime_registry.py
@@ -1,5 +1,7 @@
 from typing import NamedTuple, TypedDict

+from app.domain.execution import LanguageInfoDomain
+

 class RuntimeConfig(NamedTuple):
     image: str  # Full Docker image reference
@@ -178,4 +180,7 @@ def _make_runtime_configs() -> dict[str, dict[str, RuntimeConfig]]:

 RUNTIME_REGISTRY: dict[str, dict[str, RuntimeConfig]] = _make_runtime_configs()

-SUPPORTED_RUNTIMES: dict[str, list[str]] = {lang: list(versions.keys()) for lang, versions in RUNTIME_REGISTRY.items()}
+SUPPORTED_RUNTIMES: dict[str, LanguageInfoDomain] = {
+    lang: LanguageInfoDomain(versions=spec["versions"], file_ext=spec["file_ext"])
+    for lang, spec in LANGUAGE_SPECS.items()
+}
diff --git a/backend/app/schemas_pydantic/dlq.py b/backend/app/schemas_pydantic/dlq.py
index b0e9f8e2..a52a37ae 100644
--- a/backend/app/schemas_pydantic/dlq.py
+++ b/backend/app/schemas_pydantic/dlq.py
@@ -4,6 +4,7 @@
 from pydantic import BaseModel

 from app.dlq import DLQMessageStatus, RetryStrategy
+from app.domain.enums.events import EventType


 class DLQStats(BaseModel):
@@ -20,7 +21,7 @@ class DLQMessageResponse(BaseModel):
     """Response model for a DLQ message."""

     event_id: str
-    event_type: str
+    event_type: EventType
     original_topic: str
     error: str
     retry_count: int
@@ -82,7 +83,7 @@ class DLQMessageDetail(BaseModel):

     event_id: str
     event: dict[str, Any]  # BaseEvent as dict
-    event_type: str
+    event_type: EventType
     original_topic: str
     error: str
     retry_count: int
diff --git a/backend/app/schemas_pydantic/execution.py b/backend/app/schemas_pydantic/execution.py
index ad91cf9a..74fc8b18 100644
--- a/backend/app/schemas_pydantic/execution.py
+++ b/backend/app/schemas_pydantic/execution.py
@@ -6,8 +6,9 @@

 from pydantic import BaseModel, ConfigDict, Field, model_validator

-from app.domain.enums.common import ErrorType
+from app.domain.enums.events import EventType
 from app.domain.enums.execution import ExecutionStatus
+from app.domain.enums.storage import ExecutionErrorType
 from app.settings import get_settings


@@ -37,9 +38,9 @@ class ExecutionInDB(ExecutionBase):
     resource_usage: ResourceUsage | None = None
     user_id: str | None = None
     exit_code: int | None = None
-    error_type: ErrorType | None = None
+    error_type: ExecutionErrorType | None = None

-    model_config = ConfigDict(populate_by_name=True)
+    model_config = ConfigDict(populate_by_name=True, from_attributes=True)


 class ExecutionUpdate(BaseModel):
@@ -50,7 +51,7 @@ class ExecutionUpdate(BaseModel):
     stderr: str | None = None
     resource_usage: ResourceUsage | None = None
     exit_code: int | None = None
-    error_type: ErrorType | None = None
+    error_type: ExecutionErrorType | None = None


 class ResourceUsage(BaseModel):
@@ -63,6 +64,8 @@
     clk_tck_hertz: int | None = Field(default=None, description="Clock ticks per second (usually 100)")
     peak_memory_kb: int | None = Field(default=None, description="Peak memory usage in KB")

+    model_config = ConfigDict(from_attributes=True)
+

 class ExecutionRequest(BaseModel):
     """Model for execution request."""
@@ -73,13 +76,13 @@

     @model_validator(mode="after")
     def validate_runtime_supported(self) -> "ExecutionRequest":  # noqa: D401
-        settings = get_settings()
-        runtimes = settings.SUPPORTED_RUNTIMES or {}
-        if self.lang not in runtimes:
+        runtimes = get_settings().SUPPORTED_RUNTIMES
+        if not (lang_info := runtimes.get(self.lang)):
             raise ValueError(f"Language '{self.lang}' not supported. Supported: {list(runtimes.keys())}")
-        versions = runtimes.get(self.lang, [])
-        if self.lang_version not in versions:
-            raise ValueError(f"Version '{self.lang_version}' not supported for {self.lang}. Supported: {versions}")
+        if self.lang_version not in lang_info.versions:
+            raise ValueError(
+                f"Version '{self.lang_version}' not supported for {self.lang}. Supported: {lang_info.versions}"
+            )
         return self


@@ -103,7 +106,16 @@ class ExecutionResult(BaseModel):
     lang_version: str
     resource_usage: ResourceUsage | None = None
     exit_code: int | None = None
-    error_type: ErrorType | None = None
+    error_type: ExecutionErrorType | None = None
+
+    model_config = ConfigDict(from_attributes=True)
+
+
+class LanguageInfo(BaseModel):
+    """Language runtime information."""
+
+    versions: list[str]
+    file_ext: str
+
+    model_config = ConfigDict(from_attributes=True)
@@ -116,7 +128,9 @@
     cpu_request: str
     memory_request: str
     execution_timeout: int
-    supported_runtimes: dict[str, list[str]]
+    supported_runtimes: dict[str, LanguageInfo]
+
+    model_config = ConfigDict(from_attributes=True)


 class ExampleScripts(BaseModel):
@@ -142,10 +156,12 @@ class ExecutionEventResponse(BaseModel):
     """Model for execution event response."""

     event_id: str
-    event_type: str
+    event_type: EventType
     timestamp: datetime
     payload: dict[str, Any]

+    model_config = ConfigDict(from_attributes=True)
+

 class ExecutionListResponse(BaseModel):
     """Model for paginated execution list."""
diff --git a/backend/app/schemas_pydantic/sse.py b/backend/app/schemas_pydantic/sse.py
index ed2b22fd..489156db 100644
--- a/backend/app/schemas_pydantic/sse.py
+++ b/backend/app/schemas_pydantic/sse.py
@@ -1,14 +1,115 @@
 from datetime import datetime
-from typing import Any, Dict
+from typing import Any, Dict, Literal, TypeVar

 from pydantic import BaseModel, Field

+from app.domain.enums.events import EventType
+from app.domain.enums.execution import ExecutionStatus
+from app.domain.enums.notification import NotificationSeverity, NotificationStatus
+from app.schemas_pydantic.execution import ExecutionResult, ResourceUsage

-class SSEEvent(BaseModel):
-    """Base model for SSE events."""
+# Control event types sent by SSE (not from Kafka)
+SSEControlEventType = Literal["connected", "heartbeat", "shutdown", "status", "error"]

-    event: str = Field(description="Event type")
-    data: str = Field(description="JSON-encoded event data")
+# Type variable for generic Redis message parsing
+T = TypeVar("T", bound=BaseModel)
+
+
+class SSEExecutionEventData(BaseModel):
+    """Typed model for SSE execution stream event payload.
+
+    This represents the JSON data sent inside each SSE message for execution streams.
+    All fields except event_type and execution_id are optional since different
+    event types carry different data.
+    """
+
+    # Always present - identifies the event
+    event_type: EventType | SSEControlEventType = Field(
+        description="Event type identifier (business event or control event)"
+    )
+    execution_id: str = Field(description="Execution ID this event relates to")
+
+    # Present in most events
+    timestamp: str | None = Field(default=None, description="ISO 8601 timestamp")
+
+    # Present in business events from Kafka
+    event_id: str | None = Field(default=None, description="Unique event identifier")
+    type: EventType | SSEControlEventType | None = Field(
+        default=None, description="Event type (legacy field, same as event_type)"
+    )
+
+    # Control event specific fields
+    connection_id: str | None = Field(default=None, description="SSE connection ID (connected event)")
+    message: str | None = Field(default=None, description="Human-readable message (heartbeat, shutdown)")
+    grace_period: int | None = Field(default=None, description="Shutdown grace period in seconds")
+    error: str | None = Field(default=None, description="Error message (error event)")
+
+    # Execution status
+    status: ExecutionStatus | None = Field(default=None, description="Current execution status")
+
+    # Execution output (completed/failed/timeout events)
+    stdout: str | None = Field(default=None, description="Standard output from execution")
+    stderr: str | None = Field(default=None, description="Standard error from execution")
+    exit_code: int | None = Field(default=None, description="Process exit code")
+    timeout_seconds: int | None = Field(default=None, description="Timeout duration in seconds")
+
+    # Resource usage metrics
+    resource_usage: ResourceUsage | None = Field(default=None, description="CPU/memory usage metrics")
+
+    # Full execution result (only for result_stored event)
+    result: ExecutionResult | None = Field(default=None, description="Complete execution result")
+
+
+class RedisSSEMessage(BaseModel):
+    """Message structure published to Redis for execution SSE delivery."""
+
+    event_type: EventType = Field(description="Event type from Kafka")
+    execution_id: str | None = Field(None, description="Execution ID")
+    data: Dict[str, Any] = Field(description="Full event data from BaseEvent.model_dump()")
+
+
+# Control event types for notification SSE stream
+SSENotificationControlEventType = Literal["connected", "heartbeat", "notification"]
+
+
+class SSENotificationEventData(BaseModel):
+    """Typed model for SSE notification stream event payload.
+
+    This represents the JSON data sent inside each SSE message for notification streams.
+ """ + + # Always present - identifies the event type + event_type: SSENotificationControlEventType = Field( + description="Event type identifier (connected, heartbeat, or notification)" + ) + + # Present in control events (connected, heartbeat) + user_id: str | None = Field(default=None, description="User ID for the notification stream") + timestamp: str | None = Field(default=None, description="ISO 8601 timestamp") + message: str | None = Field(default=None, description="Human-readable message") + + # Present only in notification events + notification_id: str | None = Field(default=None, description="Unique notification ID") + severity: NotificationSeverity | None = Field(default=None, description="Notification severity level") + status: NotificationStatus | None = Field(default=None, description="Notification delivery status") + tags: list[str] | None = Field(default=None, description="Notification tags") + subject: str | None = Field(default=None, description="Notification subject/title") + body: str | None = Field(default=None, description="Notification body content") + action_url: str | None = Field(default=None, description="Optional action URL") + created_at: str | None = Field(default=None, description="ISO 8601 creation timestamp") + + +class RedisNotificationMessage(BaseModel): + """Message structure published to Redis for notification SSE delivery.""" + + notification_id: str = Field(description="Unique notification ID") + severity: NotificationSeverity = Field(description="Notification severity level") + status: NotificationStatus = Field(description="Notification delivery status") + tags: list[str] = Field(default_factory=list, description="Notification tags") + subject: str = Field(description="Notification subject/title") + body: str = Field(description="Notification body content") + action_url: str = Field(default="", description="Optional action URL") + created_at: str = Field(description="ISO 8601 creation timestamp") class ShutdownStatusResponse(BaseModel): @@ -33,33 +134,3 @@ class SSEHealthResponse(BaseModel): max_connections_per_user: int = Field(description="Maximum connections allowed per user") shutdown: ShutdownStatusResponse = Field(description="Shutdown status information") timestamp: datetime = Field(description="Health check timestamp") - - -class ExecutionStreamEvent(BaseModel): - """Model for execution stream events.""" - - event_id: str | None = Field(None, description="Unique event identifier") - timestamp: datetime | None = Field(None, description="Event timestamp") - type: str | None = Field(None, description="Event type") - execution_id: str = Field(description="Execution ID") - status: str | None = Field(None, description="Execution status") - payload: Dict[str, Any] = Field(default_factory=dict, description="Event payload") - stdout: str | None = Field(None, description="Execution stdout") - stderr: str | None = Field(None, description="Execution stderr") - - -class NotificationStreamEvent(BaseModel): - """Model for notification stream events.""" - - message: str = Field(description="Notification message") - user_id: str = Field(description="User ID") - timestamp: datetime = Field(description="Event timestamp") - - -class HeartbeatEvent(BaseModel): - """Model for heartbeat events.""" - - timestamp: datetime = Field(description="Heartbeat timestamp") - execution_id: str | None = Field(None, description="Associated execution ID") - user_id: str | None = Field(None, description="Associated user ID") - message: str | None = Field(None, 
description="Optional heartbeat message") diff --git a/backend/app/schemas_pydantic/user_settings.py b/backend/app/schemas_pydantic/user_settings.py index 2066ca45..5fcc30d5 100644 --- a/backend/app/schemas_pydantic/user_settings.py +++ b/backend/app/schemas_pydantic/user_settings.py @@ -21,18 +21,12 @@ class NotificationSettings(BaseModel): class EditorSettings(BaseModel): """Code editor preferences""" - theme: str = "one-dark" + theme: str = "auto" font_size: int = 14 tab_size: int = 4 use_tabs: bool = False word_wrap: bool = True show_line_numbers: bool = True - # These are always on in the editor, not user-configurable - font_family: str = "Monaco, Consolas, 'Courier New', monospace" - auto_complete: bool = True - bracket_matching: bool = True - highlight_active_line: bool = True - default_language: str = "python" @field_validator("font_size") @classmethod @@ -97,7 +91,7 @@ class SettingsHistoryEntry(BaseModel): """Single entry in settings history""" timestamp: datetime - event_type: str + event_type: EventType field: str old_value: Any new_value: Any @@ -122,7 +116,7 @@ class RestoreSettingsRequest(BaseModel): class SettingsEvent(BaseModel): """Minimal event model for user settings service consumption.""" - event_type: str | EventType + event_type: EventType timestamp: datetime payload: Dict[str, Any] correlation_id: str | None = None diff --git a/backend/app/services/admin/admin_events_service.py b/backend/app/services/admin/admin_events_service.py index 2419f675..fe70a45f 100644 --- a/backend/app/services/admin/admin_events_service.py +++ b/backend/app/services/admin/admin_events_service.py @@ -59,14 +59,14 @@ def __init__(self, repository: AdminEventsRepository, replay_service: ReplayServ async def browse_events( self, *, - filter: EventFilter, + event_filter: EventFilter, skip: int, limit: int, sort_by: str, sort_order: int, ) -> EventBrowseResult: return await self._repo.browse_events( - filter=filter, skip=skip, limit=limit, sort_by=sort_by, sort_order=sort_order + event_filter=event_filter, skip=skip, limit=limit, sort_by=sort_by, sort_order=sort_order ) async def get_event_detail(self, event_id: str) -> EventDetail | None: @@ -180,12 +180,12 @@ async def get_replay_status(self, session_id: str) -> ReplaySessionStatusDetail status = await self._repo.get_replay_status_with_progress(session_id) return status - async def export_events_csv(self, filter: EventFilter) -> List[EventExportRow]: - rows = await self._repo.export_events_csv(filter) + async def export_events_csv(self, event_filter: EventFilter) -> List[EventExportRow]: + rows = await self._repo.export_events_csv(event_filter) return rows - async def export_events_csv_content(self, *, filter: EventFilter, limit: int) -> ExportResult: - rows = await self._repo.export_events_csv(filter) + async def export_events_csv_content(self, *, event_filter: EventFilter, limit: int) -> ExportResult: + rows = await self._repo.export_events_csv(event_filter) output = StringIO() writer = csv.DictWriter( output, @@ -216,8 +216,10 @@ async def export_events_csv_content(self, *, filter: EventFilter, limit: int) -> ) return ExportResult(file_name=filename, content=output.getvalue(), media_type="text/csv") - async def export_events_json_content(self, *, filter: EventFilter, limit: int) -> ExportResult: - result = await self._repo.browse_events(filter=filter, skip=0, limit=limit, sort_by="timestamp", sort_order=-1) + async def export_events_json_content(self, *, event_filter: EventFilter, limit: int) -> ExportResult: + result = await 
self._repo.browse_events( + event_filter=event_filter, skip=0, limit=limit, sort_by="timestamp", sort_order=-1 + ) event_mapper = EventMapper() events_data: list[dict[str, Any]] = [] for event in result.events: @@ -232,13 +234,13 @@ async def export_events_json_content(self, *, filter: EventFilter, limit: int) - "exported_at": datetime.now(timezone.utc).isoformat(), "total_events": len(events_data), "filters_applied": { - "event_types": filter.event_types, - "aggregate_id": filter.aggregate_id, - "correlation_id": filter.correlation_id, - "user_id": filter.user_id, - "service_name": filter.service_name, - "start_time": filter.start_time.isoformat() if filter.start_time else None, - "end_time": filter.end_time.isoformat() if filter.end_time else None, + "event_types": event_filter.event_types, + "aggregate_id": event_filter.aggregate_id, + "correlation_id": event_filter.correlation_id, + "user_id": event_filter.user_id, + "service_name": event_filter.service_name, + "start_time": event_filter.start_time.isoformat() if event_filter.start_time else None, + "end_time": event_filter.end_time.isoformat() if event_filter.end_time else None, }, "export_limit": limit, }, diff --git a/backend/app/services/event_replay/replay_service.py b/backend/app/services/event_replay/replay_service.py index 8d88f356..1f87dc49 100644 --- a/backend/app/services/event_replay/replay_service.py +++ b/backend/app/services/event_replay/replay_service.py @@ -187,7 +187,7 @@ async def _fetch_event_batches(self, session: ReplaySessionState) -> AsyncIterat max_events = session.config.max_events async for batch_docs in self._repository.fetch_events( - filter=session.config.filter, batch_size=session.config.batch_size + replay_filter=session.config.filter, batch_size=session.config.batch_size ): batch: List[BaseEvent] = [] for doc in batch_docs: diff --git a/backend/app/services/event_service.py b/backend/app/services/event_service.py index 506f1cee..28e0c61e 100644 --- a/backend/app/services/event_service.py +++ b/backend/app/services/event_service.py @@ -1,10 +1,11 @@ from datetime import datetime -from typing import Any, Dict, List +from typing import Any from pymongo import ASCENDING, DESCENDING from app.db.repositories.event_repository import EventRepository from app.domain.enums.common import SortOrder +from app.domain.enums.events import EventType from app.domain.enums.user import UserRole from app.domain.events import ( Event, @@ -33,7 +34,7 @@ async def get_execution_events( user_id: str, user_role: UserRole, include_system_events: bool = False, - ) -> List[Event] | None: + ) -> list[Event] | None: events = await self.repository.get_events_by_aggregate(aggregate_id=execution_id, limit=1000) if not events: return [] @@ -55,7 +56,7 @@ async def get_execution_events( async def get_user_events_paginated( self, user_id: str, - event_types: List[str] | None = None, + event_types: list[str] | None = None, start_time: datetime | None = None, end_time: datetime | None = None, limit: int = 100, @@ -117,7 +118,7 @@ async def get_events_by_correlation( user_role: UserRole, include_all_users: bool = False, limit: int = 100, - ) -> List[Event]: + ) -> list[Event]: events = await self.repository.get_events_by_correlation(correlation_id=correlation_id, limit=limit) if not include_all_users or user_role != UserRole.ADMIN: events = [e for e in events if (e.metadata and e.metadata.user_id == user_id)] @@ -156,7 +157,7 @@ async def aggregate_events( self, user_id: str, user_role: UserRole, - pipeline: List[Dict[str, Any]], + pipeline: 
list[dict[str, Any]], limit: int = 100, ) -> EventAggregationResult: user_filter = self._build_user_filter(user_id, user_role) @@ -172,7 +173,7 @@ async def list_event_types( self, user_id: str, user_role: UserRole, - ) -> List[str]: + ) -> list[str]: match = self._build_user_filter(user_id, user_role) return await self.repository.list_event_types(match=match) @@ -194,7 +195,7 @@ async def get_aggregate_replay_info(self, aggregate_id: str) -> EventReplayInfo async def get_events_by_aggregate( self, aggregate_id: str, - event_types: List[str] | None = None, + event_types: list[EventType] | None = None, limit: int = 100, ) -> list[Event]: return await self.repository.get_events_by_aggregate( diff --git a/backend/app/services/execution_service.py b/backend/app/services/execution_service.py index cd0204dc..9b2cebfc 100644 --- a/backend/app/services/execution_service.py +++ b/backend/app/services/execution_service.py @@ -10,7 +10,12 @@ from app.db.repositories.execution_repository import ExecutionRepository from app.domain.enums.events import EventType from app.domain.enums.execution import ExecutionStatus -from app.domain.execution import DomainExecution, ExecutionResultDomain, ResourceLimitsDomain, ResourceUsageDomain +from app.domain.execution import ( + DomainExecution, + ExecutionResultDomain, + ResourceLimitsDomain, + ResourceUsageDomain, +) from app.events.core import UnifiedProducer from app.events.event_store import EventStore from app.infrastructure.kafka.events.base import BaseEvent diff --git a/backend/app/services/kafka_event_service.py b/backend/app/services/kafka_event_service.py index 875a480b..1f32029a 100644 --- a/backend/app/services/kafka_event_service.py +++ b/backend/app/services/kafka_event_service.py @@ -29,7 +29,7 @@ def __init__(self, event_repository: EventRepository, kafka_producer: UnifiedPro async def publish_event( self, - event_type: str, + event_type: EventType, payload: Dict[str, Any], aggregate_id: str | None, correlation_id: str | None = None, @@ -81,15 +81,14 @@ async def publish_event( _ = await self.event_repository.store_event(event) # Get event class and create proper event instance - event_type_enum = EventType(event_type) - event_class = get_event_class_for_type(event_type_enum) + event_class = get_event_class_for_type(event_type) if not event_class: raise ValueError(f"No event class found for event type: {event_type}") # Create proper event instance with all required fields event_data = { "event_id": event.event_id, - "event_type": event_type_enum, + "event_type": event_type, "event_version": "1.0", "timestamp": timestamp, "aggregate_id": aggregate_id, @@ -138,7 +137,7 @@ async def publish_event( async def publish_execution_event( self, - event_type: str, + event_type: EventType, execution_id: str, status: str, metadata: EventMetadata | None = None, @@ -179,7 +178,7 @@ async def publish_execution_event( async def publish_pod_event( self, - event_type: str, + event_type: EventType, pod_name: str, execution_id: str, namespace: str = "integr8scode", diff --git a/backend/app/services/notification_service.py b/backend/app/services/notification_service.py index 68d68674..e5dfa937 100644 --- a/backend/app/services/notification_service.py +++ b/backend/app/services/notification_service.py @@ -2,7 +2,7 @@ from dataclasses import dataclass, field from datetime import UTC, datetime, timedelta from enum import auto -from typing import Awaitable, Callable, Mapping +from typing import Awaitable, Callable import httpx @@ -34,6 +34,7 @@ ExecutionTimeoutEvent, ) from 
app.infrastructure.kafka.mappings import get_topic_for_event +from app.schemas_pydantic.sse import RedisNotificationMessage from app.services.event_bus import EventBusManager from app.services.kafka_event_service import KafkaEventService from app.services.sse.redis_bus import SSERedisBus @@ -790,17 +791,17 @@ async def delete_notification(self, user_id: str, notification_id: str) -> bool: async def _publish_notification_sse(self, notification: DomainNotification) -> None: """Publish an in-app notification to the SSE bus for realtime delivery.""" - payload: Mapping[str, object] = { - "notification_id": notification.notification_id, - "severity": str(notification.severity), - "tags": list(notification.tags or []), - "subject": notification.subject, - "body": notification.body, - "action_url": notification.action_url or "", - "created_at": notification.created_at.isoformat(), - "status": str(notification.status), - } - await self.sse_bus.publish_notification(notification.user_id, payload) + message = RedisNotificationMessage( + notification_id=notification.notification_id, + severity=notification.severity, + status=notification.status, + tags=list(notification.tags or []), + subject=notification.subject, + body=notification.body, + action_url=notification.action_url or "", + created_at=notification.created_at.isoformat(), + ) + await self.sse_bus.publish_notification(notification.user_id, message) async def _should_skip_notification( self, notification: DomainNotification, subscription: DomainNotificationSubscription | None diff --git a/backend/app/services/sse/redis_bus.py b/backend/app/services/sse/redis_bus.py index bb3dae5a..e0d9fad8 100644 --- a/backend/app/services/sse/redis_bus.py +++ b/backend/app/services/sse/redis_bus.py @@ -1,30 +1,36 @@ from __future__ import annotations -import json -from typing import Mapping +from typing import Type, TypeVar import redis.asyncio as redis +from pydantic import BaseModel +from app.core.logging import logger from app.infrastructure.kafka.events.base import BaseEvent +from app.schemas_pydantic.sse import RedisNotificationMessage, RedisSSEMessage + +T = TypeVar("T", bound=BaseModel) class SSERedisSubscription: + """Subscription wrapper for Redis pubsub with typed message parsing.""" + def __init__(self, pubsub: redis.client.PubSub, channel: str) -> None: self._pubsub = pubsub self._channel = channel - async def get(self, timeout: float = 0.5) -> dict[str, object] | None: - """Get next message from the subscription with timeout seconds.""" - msg = await self._pubsub.get_message(ignore_subscribe_messages=True, timeout=timeout) + async def get(self, model: Type[T]) -> T | None: + """Get next typed message from the subscription.""" + msg = await self._pubsub.get_message(ignore_subscribe_messages=True, timeout=0.5) if not msg or msg.get("type") != "message": return None - data = msg.get("data") - if isinstance(data, (bytes, bytearray)): - data = data.decode("utf-8", errors="ignore") try: - parsed = json.loads(data) if isinstance(data, str) else data - return parsed if isinstance(parsed, dict) else None - except Exception: + return model.model_validate_json(msg["data"]) + except Exception as e: + logger.warning( + f"Failed to parse Redis message on channel {self._channel}: {e}", + extra={"channel": self._channel, "model": model.__name__}, + ) return None async def close(self) -> None: @@ -51,12 +57,12 @@ def _notif_channel(self, user_id: str) -> str: return f"{self._notif_prefix}{user_id}" async def publish_event(self, execution_id: str, event: BaseEvent) 
-> None: - payload: dict[str, object] = { - "event_type": str(event.event_type), - "execution_id": getattr(event, "execution_id", None), - "data": event.model_dump(mode="json"), - } - await self._redis.publish(self._exec_channel(execution_id), json.dumps(payload)) + message = RedisSSEMessage( + event_type=event.event_type, + execution_id=execution_id, + data=event.model_dump(mode="json"), + ) + await self._redis.publish(self._exec_channel(execution_id), message.model_dump_json()) async def open_subscription(self, execution_id: str) -> SSERedisSubscription: pubsub = self._redis.pubsub() @@ -64,9 +70,9 @@ async def open_subscription(self, execution_id: str) -> SSERedisSubscription: await pubsub.subscribe(channel) return SSERedisSubscription(pubsub, channel) - async def publish_notification(self, user_id: str, payload: Mapping[str, object]) -> None: - # Expect a JSON-serializable mapping - await self._redis.publish(self._notif_channel(user_id), json.dumps(dict(payload))) + async def publish_notification(self, user_id: str, notification: RedisNotificationMessage) -> None: + """Publish a typed notification message to Redis for SSE delivery.""" + await self._redis.publish(self._notif_channel(user_id), notification.model_dump_json()) async def open_notification_subscription(self, user_id: str) -> SSERedisSubscription: pubsub = self._redis.pubsub() diff --git a/backend/app/services/sse/sse_service.py b/backend/app/services/sse/sse_service.py index 63f6bc57..9824ec8a 100644 --- a/backend/app/services/sse/sse_service.py +++ b/backend/app/services/sse/sse_service.py @@ -1,5 +1,4 @@ import asyncio -import json from collections.abc import AsyncGenerator from datetime import datetime, timezone from typing import Any, Dict @@ -9,7 +8,13 @@ from app.db.repositories.sse_repository import SSERepository from app.domain.enums.events import EventType from app.domain.sse import SSEHealthDomain -from app.infrastructure.kafka.events.base import BaseEvent +from app.schemas_pydantic.execution import ExecutionResult +from app.schemas_pydantic.sse import ( + RedisNotificationMessage, + RedisSSEMessage, + SSEExecutionEventData, + SSENotificationEventData, +) from app.services.sse.kafka_redis_bridge import SSEKafkaRedisBridge from app.services.sse.redis_bus import SSERedisBus from app.services.sse.sse_shutdown_manager import SSEShutdownManager @@ -17,9 +22,7 @@ class SSEService: - # Only result_stored should terminate the stream; other terminal-ish - # execution events precede the final persisted result and must not close - # the connection prematurely. 
+ # Terminal event types that should close the SSE stream TERMINAL_EVENT_TYPES: set[EventType] = { EventType.RESULT_STORED, EventType.EXECUTION_FAILED, @@ -48,8 +51,13 @@ async def create_execution_stream(self, execution_id: str, user_id: str) -> Asyn shutdown_event = await self.shutdown_manager.register_connection(execution_id, connection_id) if shutdown_event is None: - yield self._format_event( - "error", {"error": "Server is shutting down", "timestamp": datetime.now(timezone.utc).isoformat()} + yield self._format_sse_event( + SSEExecutionEventData( + event_type="error", + execution_id=execution_id, + timestamp=datetime.now(timezone.utc).isoformat(), + error="Server is shutting down", + ) ) return @@ -57,13 +65,13 @@ async def create_execution_stream(self, execution_id: str, user_id: str) -> Asyn try: # Start opening subscription concurrently, then yield handshake sub_task = asyncio.create_task(self.sse_bus.open_subscription(execution_id)) - yield self._format_event( - "connected", - { - "execution_id": execution_id, - "timestamp": datetime.now(timezone.utc).isoformat(), - "connection_id": connection_id, - }, + yield self._format_sse_event( + SSEExecutionEventData( + event_type="connected", + execution_id=execution_id, + timestamp=datetime.now(timezone.utc).isoformat(), + connection_id=connection_id, + ) ) # Complete Redis subscription after handshake @@ -73,12 +81,14 @@ async def create_execution_stream(self, execution_id: str, user_id: str) -> Asyn initial_status = await self.repository.get_execution_status(execution_id) if initial_status: - payload = { - "execution_id": initial_status.execution_id, - "status": initial_status.status, - "timestamp": initial_status.timestamp, - } - yield self._format_event("status", payload) + yield self._format_sse_event( + SSEExecutionEventData( + event_type="status", + execution_id=initial_status.execution_id, + timestamp=initial_status.timestamp, + status=initial_status.status, + ) + ) self.metrics.record_sse_message_sent("executions", "status") async for event_data in self._stream_events_redis( @@ -105,97 +115,80 @@ async def _stream_events_redis( last_heartbeat = datetime.now(timezone.utc) while True: if shutdown_event.is_set(): - yield self._format_event( - "shutdown", - { - "message": "Server is shutting down", - "grace_period": 30, - "timestamp": datetime.now(timezone.utc).isoformat(), - }, + yield self._format_sse_event( + SSEExecutionEventData( + event_type="shutdown", + execution_id=execution_id, + timestamp=datetime.now(timezone.utc).isoformat(), + message="Server is shutting down", + grace_period=30, + ) ) break now = datetime.now(timezone.utc) if include_heartbeat and (now - last_heartbeat).total_seconds() >= self.heartbeat_interval: - yield self._format_event( - "heartbeat", - {"execution_id": execution_id, "timestamp": now.isoformat(), "message": "SSE connection active"}, + yield self._format_sse_event( + SSEExecutionEventData( + event_type="heartbeat", + execution_id=execution_id, + timestamp=now.isoformat(), + message="SSE connection active", + ) ) last_heartbeat = now - msg = await subscription.get(timeout=0.5) + msg: RedisSSEMessage | None = await subscription.get(RedisSSEMessage) if not msg: continue - # msg contains {'event_type': str, 'execution_id': str, 'data': {...}} - logger.info(f"Received Redis message for execution {execution_id}: {msg.get('event_type')}") - try: - raw_event_type = msg.get("event_type") - # Normalize to EventType when possible - try: - event_type = EventType(str(raw_event_type)) - except Exception: - 
event_type = None - data = msg.get("data", {}) - # Build SSE payload similar to _event_to_sse_format - sse_event: Dict[str, Any] = { - "event_id": data.get("event_id"), - "timestamp": data.get("timestamp"), - "type": str(event_type) if event_type is not None else str(raw_event_type), - "execution_id": execution_id, - } - if "status" in data: - sse_event["status"] = data["status"] - # Include stdout/stderr/exit_code if present - for key in ("stdout", "stderr", "exit_code", "timeout_seconds"): - if key in data: - sse_event[key] = data[key] - # Include resource_usage if present - if "resource_usage" in data: - sse_event["resource_usage"] = data["resource_usage"] - - # If this is result_stored, enrich with full execution result payload - if event_type == EventType.RESULT_STORED: - exec_domain = await self.repository.get_execution(execution_id) - if exec_domain: - ru_payload = None - if getattr(exec_domain, "resource_usage", None) is not None: - ru_obj = exec_domain.resource_usage - ru_payload = ru_obj.to_dict() if ru_obj and hasattr(ru_obj, "to_dict") else ru_obj - sse_event["result"] = { - "execution_id": exec_domain.execution_id, - "status": exec_domain.status, - "stdout": exec_domain.stdout, - "stderr": exec_domain.stderr, - "lang": exec_domain.lang, - "lang_version": exec_domain.lang_version, - "resource_usage": ru_payload, - "exit_code": exec_domain.exit_code, - "error_type": exec_domain.error_type, - } - yield self._format_event(str(event_type) if event_type is not None else str(raw_event_type), sse_event) + logger.info(f"Received Redis message for execution {execution_id}: {msg.event_type}") + try: + sse_event = await self._build_sse_event_from_redis(execution_id, msg) + yield self._format_sse_event(sse_event) # End on terminal event types - if event_type in self.TERMINAL_EVENT_TYPES: - logger.info(f"Terminal event for execution {execution_id}: {event_type}") + if msg.event_type in self.TERMINAL_EVENT_TYPES: + logger.info(f"Terminal event for execution {execution_id}: {msg.event_type}") break - except Exception: - # Ignore malformed messages + except Exception as e: + logger.warning( + f"Failed to process SSE message for execution {execution_id}: {e}", + extra={"execution_id": execution_id, "event_type": str(msg.event_type)}, + ) continue + async def _build_sse_event_from_redis(self, execution_id: str, msg: RedisSSEMessage) -> SSEExecutionEventData: + """Build typed SSE event from Redis message.""" + result: ExecutionResult | None = None + if msg.event_type == EventType.RESULT_STORED: + exec_domain = await self.repository.get_execution(execution_id) + if exec_domain: + result = ExecutionResult.model_validate(exec_domain) + + return SSEExecutionEventData.model_validate( + { + **msg.data, + "event_type": msg.event_type, + "execution_id": execution_id, + "type": msg.event_type, + "result": result, + } + ) + async def create_notification_stream(self, user_id: str) -> AsyncGenerator[Dict[str, Any], None]: subscription = None try: # Start opening subscription concurrently, then yield handshake sub_task = asyncio.create_task(self.sse_bus.open_notification_subscription(user_id)) - yield self._format_event( - "connected", - { - "message": "Connected to notification stream", - "user_id": user_id, - "timestamp": datetime.now(timezone.utc).isoformat(), - }, + yield self._format_notification_event( + SSENotificationEventData( + event_type="connected", + user_id=user_id, + timestamp=datetime.now(timezone.utc).isoformat(), + message="Connected to notification stream", + ) ) # Complete Redis 
subscription after handshake @@ -206,17 +199,32 @@ async def create_notification_stream(self, user_id: str) -> AsyncGenerator[Dict[ # Heartbeat now = datetime.now(timezone.utc) if (now - last_heartbeat).total_seconds() >= self.heartbeat_interval: - yield self._format_event( - "heartbeat", - {"timestamp": now.isoformat(), "user_id": user_id, "message": "Notification stream active"}, + yield self._format_notification_event( + SSENotificationEventData( + event_type="heartbeat", + user_id=user_id, + timestamp=now.isoformat(), + message="Notification stream active", + ) ) last_heartbeat = now # Forward notification messages as SSE data - msg = await subscription.get(timeout=0.5) - if msg: - # msg already contains the notification payload - yield self._format_event("notification", msg) + redis_msg = await subscription.get(RedisNotificationMessage) + if redis_msg: + yield self._format_notification_event( + SSENotificationEventData( + event_type="notification", + notification_id=redis_msg.notification_id, + severity=redis_msg.severity, + status=redis_msg.status, + tags=redis_msg.tags, + subject=redis_msg.subject, + body=redis_msg.body, + action_url=redis_msg.action_url, + created_at=redis_msg.created_at, + ) + ) finally: try: if subscription is not None: @@ -237,45 +245,10 @@ async def get_health_status(self) -> SSEHealthDomain: timestamp=datetime.now(timezone.utc), ) - async def _event_to_sse_format(self, event: BaseEvent, execution_id: str) -> Dict[str, Any]: - event_data = event.model_dump(mode="json") - - sse_event: Dict[str, Any] = { - "event_id": event.event_id, - "timestamp": event_data.get("timestamp"), - "type": str(event.event_type), - "execution_id": execution_id, - } - - if "status" in event_data: - sse_event["status"] = event_data["status"] - - if event.event_type == EventType.RESULT_STORED: - exec_domain = await self.repository.get_execution(execution_id) - if exec_domain: - ru_payload = None - if getattr(exec_domain, "resource_usage", None) is not None: - ru_obj = exec_domain.resource_usage - ru_payload = ru_obj.to_dict() if ru_obj and hasattr(ru_obj, "to_dict") else ru_obj - sse_event["result"] = { - "execution_id": exec_domain.execution_id, - "status": exec_domain.status, - "stdout": exec_domain.stdout, - "stderr": exec_domain.stderr, - "lang": exec_domain.lang, - "lang_version": exec_domain.lang_version, - "resource_usage": ru_payload, - "exit_code": exec_domain.exit_code, - "error_type": exec_domain.error_type, - } - - skip_fields = {"event_id", "timestamp", "event_type", "metadata", "payload"} - for key, value in event_data.items(): - if key not in skip_fields and key not in sse_event: - sse_event[key] = value - - return sse_event + def _format_sse_event(self, event: SSEExecutionEventData) -> Dict[str, Any]: + """Format typed SSE event for sse-starlette.""" + return {"data": event.model_dump_json(exclude_none=True)} - def _format_event(self, event_type: str, data: Dict[str, Any]) -> Dict[str, Any]: - data["event_type"] = event_type - return {"data": json.dumps(data)} + def _format_notification_event(self, event: SSENotificationEventData) -> Dict[str, Any]: + """Format typed notification SSE event for sse-starlette.""" + return {"data": event.model_dump_json(exclude_none=True)} diff --git a/backend/app/services/user_settings_service.py b/backend/app/services/user_settings_service.py index 93ac16e2..5e53d8d2 100644 --- a/backend/app/services/user_settings_service.py +++ b/backend/app/services/user_settings_service.py @@ -249,7 +249,7 @@ async def get_settings_history(self, 
user_id: str, limit: int = 50) -> List[Doma history.append( DomainSettingsHistoryEntry( timestamp=event.timestamp, - event_type=str(event.event_type), + event_type=event.event_type, field="/theme", old_value=event.payload.get("old_theme"), new_value=event.payload.get("new_theme"), @@ -266,7 +266,7 @@ async def get_settings_history(self, user_id: str, limit: int = 50) -> List[Doma history.append( DomainSettingsHistoryEntry( timestamp=event.timestamp, - event_type=str(event.event_type), + event_type=event.event_type, field=path, old_value=None, new_value=None, diff --git a/backend/app/settings.py b/backend/app/settings.py index 34b0383b..6e80b55f 100644 --- a/backend/app/settings.py +++ b/backend/app/settings.py @@ -3,6 +3,7 @@ from pydantic import Field from pydantic_settings import BaseSettings, SettingsConfigDict +from app.domain.execution import LanguageInfoDomain from app.runtime_registry import EXAMPLE_SCRIPTS as EXEC_EXAMPLE_SCRIPTS from app.runtime_registry import SUPPORTED_RUNTIMES as RUNTIME_MATRIX @@ -37,7 +38,7 @@ class Settings(BaseSettings): K8S_POD_EXECUTION_TIMEOUT: int = 300 # in seconds K8S_POD_PRIORITY_CLASS_NAME: str | None = None - SUPPORTED_RUNTIMES: dict[str, list[str]] = Field(default_factory=lambda: RUNTIME_MATRIX) + SUPPORTED_RUNTIMES: dict[str, LanguageInfoDomain] = Field(default_factory=lambda: RUNTIME_MATRIX) EXAMPLE_SCRIPTS: dict[str, str] = Field(default_factory=lambda: EXEC_EXAMPLE_SCRIPTS) diff --git a/backend/tests/integration/notifications/test_notification_sse.py b/backend/tests/integration/notifications/test_notification_sse.py index 1d4d0a05..c2fbb401 100644 --- a/backend/tests/integration/notifications/test_notification_sse.py +++ b/backend/tests/integration/notifications/test_notification_sse.py @@ -4,6 +4,7 @@ import pytest from app.domain.enums.notification import NotificationChannel, NotificationSeverity +from app.schemas_pydantic.sse import RedisNotificationMessage from app.services.notification_service import NotificationService from app.services.sse.redis_bus import SSERedisBus from tests.helpers.eventually import eventually @@ -34,13 +35,13 @@ async def test_in_app_notification_published_to_sse(scope) -> None: # type: ign ) # Receive published SSE payload - async def _recv(): - m = await sub.get(timeout=0.5) + async def _recv() -> RedisNotificationMessage: + m = await sub.get(RedisNotificationMessage) assert m is not None return m msg = await eventually(_recv, timeout=5.0, interval=0.1) # Basic shape assertions - assert msg.get("subject") == "Hello" - assert msg.get("body") == "World" - assert "notification_id" in msg + assert msg.subject == "Hello" + assert msg.body == "World" + assert msg.notification_id diff --git a/backend/tests/integration/services/sse/test_partitioned_event_router_integration.py b/backend/tests/integration/services/sse/test_partitioned_event_router_integration.py index fd36b59a..f4b604ac 100644 --- a/backend/tests/integration/services/sse/test_partitioned_event_router_integration.py +++ b/backend/tests/integration/services/sse/test_partitioned_event_router_integration.py @@ -8,6 +8,7 @@ from app.events.schema.schema_registry import SchemaRegistryManager from tests.helpers import make_execution_requested_event from app.infrastructure.kafka.events.pod import PodCreatedEvent +from app.schemas_pydantic.sse import RedisSSEMessage from app.services.sse.kafka_redis_bridge import SSEKafkaRedisBridge from app.services.sse.redis_bus import SSERedisBus from app.settings import Settings @@ -42,12 +43,12 @@ async def 
test_router_bridges_to_redis(redis_client) -> None: # type: ignore[va await handler(ev) async def _recv(): - m = await subscription.get(timeout=0.2) + m = await subscription.get(RedisSSEMessage) assert m is not None return m msg = await eventually(_recv, timeout=2.0, interval=0.05) - assert msg.get("event_type") == str(ev.event_type) + assert str(msg.event_type) == str(ev.event_type) @pytest.mark.asyncio diff --git a/backend/tests/integration/test_sse_routes.py b/backend/tests/integration/test_sse_routes.py index 010f8c5b..c6c297a9 100644 --- a/backend/tests/integration/test_sse_routes.py +++ b/backend/tests/integration/test_sse_routes.py @@ -6,7 +6,8 @@ import pytest from httpx import AsyncClient -from app.schemas_pydantic.sse import SSEHealthResponse +from app.domain.enums.notification import NotificationSeverity, NotificationStatus +from app.schemas_pydantic.sse import RedisNotificationMessage, SSEHealthResponse from app.infrastructure.kafka.events.pod import PodCreatedEvent from app.infrastructure.kafka.events.metadata import EventMetadata from app.services.sse.redis_bus import SSERedisBus @@ -73,7 +74,17 @@ async def _connected() -> None: await eventually(_connected, timeout=2.0, interval=0.05) # Publish a notification - await bus.publish_notification(user_id, {"subject": "Hello", "body": "World", "event_type": "notification"}) + notification = RedisNotificationMessage( + notification_id=f"notif-{uuid4().hex[:8]}", + severity=NotificationSeverity.MEDIUM, + status=NotificationStatus.PENDING, + tags=[], + subject="Hello", + body="World", + action_url="", + created_at="2024-01-01T00:00:00Z", + ) + await bus.publish_notification(user_id, notification) # Wait for collection to complete try: diff --git a/backend/tests/integration/test_user_settings_routes.py b/backend/tests/integration/test_user_settings_routes.py index 20b89dce..c6378351 100644 --- a/backend/tests/integration/test_user_settings_routes.py +++ b/backend/tests/integration/test_user_settings_routes.py @@ -129,7 +129,7 @@ async def test_get_user_settings(self, client: AsyncClient, test_user: Dict[str, assert settings.editor is not None assert isinstance(settings.editor.font_size, int) assert 8 <= settings.editor.font_size <= 32 - assert settings.editor.theme in ["one-dark", "monokai", "github", "dracula", "solarized", "vs", "vscode"] + assert settings.editor.theme in ["auto", "one-dark", "monokai", "github", "dracula", "solarized", "vs", "vscode"] assert isinstance(settings.editor.tab_size, int) assert settings.editor.tab_size in [2, 4, 8] assert isinstance(settings.editor.word_wrap, bool) diff --git a/backend/tests/unit/db/repositories/test_replay_repository.py b/backend/tests/unit/db/repositories/test_replay_repository.py index b9d269b4..8f8aaf2e 100644 --- a/backend/tests/unit/db/repositories/test_replay_repository.py +++ b/backend/tests/unit/db/repositories/test_replay_repository.py @@ -26,7 +26,7 @@ async def test_indexes_and_session_crud(repo: ReplayRepository) -> None: assert got and got.session_id == "s1" lst = await repo.list_sessions(limit=5) assert any(s.session_id == "s1" for s in lst) - assert await repo.update_session_status("s1", "running") is True + assert await repo.update_session_status("s1", ReplayStatus.RUNNING) is True session_update = ReplaySessionUpdate(status=ReplayStatus.COMPLETED) assert await repo.update_replay_session("s1", session_update) is True diff --git a/backend/tests/unit/db/repositories/test_saga_repository.py b/backend/tests/unit/db/repositories/test_saga_repository.py index 
2d548712..0e3c8f0b 100644 --- a/backend/tests/unit/db/repositories/test_saga_repository.py +++ b/backend/tests/unit/db/repositories/test_saga_repository.py @@ -3,6 +3,7 @@ import pytest from app.db.repositories.saga_repository import SagaRepository +from app.domain.enums.saga import SagaState from app.domain.saga.models import Saga, SagaFilter, SagaListResult pytestmark = pytest.mark.unit @@ -30,7 +31,7 @@ async def test_saga_crud_and_queries(repo: SagaRepository, db) -> None: # type: result = await repo.list_sagas(f, limit=2) assert isinstance(result, SagaListResult) - assert await repo.update_saga_state("s1", "completed") in (True, False) + assert await repo.update_saga_state("s1", SagaState.COMPLETED) in (True, False) # user execution ids await db.get_collection("executions").insert_many([ diff --git a/backend/tests/unit/db/repositories/test_sse_repository.py b/backend/tests/unit/db/repositories/test_sse_repository.py index 6810e39e..fde10f35 100644 --- a/backend/tests/unit/db/repositories/test_sse_repository.py +++ b/backend/tests/unit/db/repositories/test_sse_repository.py @@ -1,6 +1,7 @@ import pytest from app.db.repositories.sse_repository import SSERepository +from app.domain.enums.execution import ExecutionStatus pytestmark = pytest.mark.unit @@ -8,10 +9,11 @@ @pytest.mark.asyncio async def test_get_execution_status(db) -> None: # type: ignore[valid-type] repo = SSERepository(db) - # Insert execution await db.get_collection("executions").insert_one({"execution_id": "e1", "status": "running"}) status = await repo.get_execution_status("e1") - assert status and status.status == "running" and status.execution_id == "e1" + assert status is not None + assert status.status == ExecutionStatus.RUNNING + assert status.execution_id == "e1" @pytest.mark.asyncio @@ -21,33 +23,19 @@ async def test_get_execution_status_none(db) -> None: # type: ignore[valid-type @pytest.mark.asyncio -async def test_get_execution_events(db) -> None: # type: ignore[valid-type] - repo = SSERepository(db) - await db.get_collection("events").insert_one({"aggregate_id": "e1", "timestamp": 1, "event_type": "X"}) - events = await repo.get_execution_events("e1") - assert len(events) == 1 and events[0].aggregate_id == "e1" - - -@pytest.mark.asyncio -async def test_get_execution_for_user_and_plain(db) -> None: # type: ignore[valid-type] +async def test_get_execution(db) -> None: # type: ignore[valid-type] repo = SSERepository(db) await db.get_collection("executions").insert_one({ "execution_id": "e1", - "user_id": "u1", "status": "queued", "resource_usage": {} }) - doc = await repo.get_execution_for_user("e1", "u1") - assert doc and doc.user_id == "u1" - await db.get_collection("executions").insert_one({ - "execution_id": "e2", - "status": "queued", # Add required status field - "resource_usage": {} - }) - assert (await repo.get_execution("e2")) is not None + doc = await repo.get_execution("e1") + assert doc is not None + assert doc.execution_id == "e1" @pytest.mark.asyncio -async def test_get_execution_for_user_not_found(db) -> None: # type: ignore[valid-type] +async def test_get_execution_not_found(db) -> None: # type: ignore[valid-type] repo = SSERepository(db) - assert await repo.get_execution_for_user("e1", "uX") is None + assert await repo.get_execution("missing") is None diff --git a/backend/tests/unit/dlq/test_dlq_models.py b/backend/tests/unit/dlq/test_dlq_models.py index c36f03d7..16497625 100644 --- a/backend/tests/unit/dlq/test_dlq_models.py +++ b/backend/tests/unit/dlq/test_dlq_models.py @@ -1,19 +1,18 @@ 
-from datetime import datetime, timezone import json - -import pytest +from datetime import datetime, timezone from app.dlq import ( AgeStatistics, DLQFields, DLQMessageFilter, DLQMessageStatus, + DLQStatistics, EventTypeStatistic, RetryPolicy, RetryStrategy, TopicStatistic, - DLQStatistics, ) +from app.domain.enums.events import EventType from app.events.schema.schema_registry import SchemaRegistryManager from app.infrastructure.kafka.events.metadata import EventMetadata from app.infrastructure.kafka.events.user import UserLoggedInEvent @@ -22,6 +21,7 @@ def _make_event() -> UserLoggedInEvent: from app.domain.enums.auth import LoginMethod + return UserLoggedInEvent( user_id="u1", login_method=LoginMethod.PASSWORD, @@ -95,10 +95,11 @@ def test_retry_policy_bounds() -> None: def test_filter_and_stats_models() -> None: - f = DLQMessageFilter(status=DLQMessageStatus.PENDING, topic="t", event_type="X") + f = DLQMessageFilter(status=DLQMessageStatus.PENDING, topic="t", event_type=EventType.EXECUTION_REQUESTED) q = DLQMapper.filter_to_query(f) assert q[DLQFields.STATUS] == DLQMessageStatus.PENDING assert q[DLQFields.ORIGINAL_TOPIC] == "t" + assert q[DLQFields.EVENT_TYPE] == EventType.EXECUTION_REQUESTED ts = TopicStatistic(topic="t", count=2, avg_retry_count=1.5) es = EventTypeStatistic(event_type="X", count=3) @@ -106,4 +107,3 @@ def test_filter_and_stats_models() -> None: stats = DLQStatistics(by_status={"pending": 1}, by_topic=[ts], by_event_type=[es], age_stats=ages) assert stats.by_status["pending"] == 1 assert isinstance(stats.timestamp, datetime) - diff --git a/backend/tests/unit/infrastructure/mappers/test_dlq_mapper.py b/backend/tests/unit/infrastructure/mappers/test_dlq_mapper.py index 86866039..a7a7aaf7 100644 --- a/backend/tests/unit/infrastructure/mappers/test_dlq_mapper.py +++ b/backend/tests/unit/infrastructure/mappers/test_dlq_mapper.py @@ -5,8 +5,6 @@ from unittest.mock import MagicMock, patch import pytest -from confluent_kafka import Message - from app.dlq.models import ( DLQBatchRetryResult, DLQFields, @@ -16,8 +14,11 @@ DLQMessageUpdate, DLQRetryResult, ) -from tests.helpers import make_execution_requested_event +from app.domain.enums.events import EventType from app.infrastructure.mappers.dlq_mapper import DLQMapper +from confluent_kafka import Message + +from tests.helpers import make_execution_requested_event @pytest.fixture @@ -112,7 +113,7 @@ def test_from_mongo_document_full(self, sample_dlq_message): """Test creating DLQ message from MongoDB document with all fields.""" doc = DLQMapper.to_mongo_document(sample_dlq_message) - with patch('app.infrastructure.mappers.dlq_mapper.SchemaRegistryManager') as mock_registry: + with patch("app.infrastructure.mappers.dlq_mapper.SchemaRegistryManager") as mock_registry: mock_registry.return_value.deserialize_json.return_value = sample_dlq_message.event msg = DLQMapper.from_mongo_document(doc) @@ -136,7 +137,7 @@ def test_from_mongo_document_minimal(self, sample_event): DLQFields.FAILED_AT: datetime.now(timezone.utc), } - with patch('app.infrastructure.mappers.dlq_mapper.SchemaRegistryManager') as mock_registry: + with patch("app.infrastructure.mappers.dlq_mapper.SchemaRegistryManager") as mock_registry: mock_registry.return_value.deserialize_json.return_value = sample_event msg = DLQMapper.from_mongo_document(doc) @@ -157,7 +158,7 @@ def test_from_mongo_document_with_string_datetime(self, sample_event): DLQFields.CREATED_AT: now.isoformat(), } - with patch('app.infrastructure.mappers.dlq_mapper.SchemaRegistryManager') as 
mock_registry: + with patch("app.infrastructure.mappers.dlq_mapper.SchemaRegistryManager") as mock_registry: mock_registry.return_value.deserialize_json.return_value = sample_event msg = DLQMapper.from_mongo_document(doc) @@ -171,7 +172,7 @@ def test_from_mongo_document_missing_failed_at(self, sample_event): DLQFields.EVENT: sample_event.to_dict(), } - with patch('app.infrastructure.mappers.dlq_mapper.SchemaRegistryManager') as mock_registry: + with patch("app.infrastructure.mappers.dlq_mapper.SchemaRegistryManager") as mock_registry: mock_registry.return_value.deserialize_json.return_value = sample_event with pytest.raises(ValueError, match="Missing failed_at"): @@ -184,7 +185,7 @@ def test_from_mongo_document_invalid_failed_at(self, sample_event): DLQFields.FAILED_AT: None, } - with patch('app.infrastructure.mappers.dlq_mapper.SchemaRegistryManager') as mock_registry: + with patch("app.infrastructure.mappers.dlq_mapper.SchemaRegistryManager") as mock_registry: mock_registry.return_value.deserialize_json.return_value = sample_event with pytest.raises(ValueError, match="Missing failed_at"): @@ -197,7 +198,7 @@ def test_from_mongo_document_invalid_event(self): DLQFields.EVENT: "not a dict", } - with patch('app.infrastructure.mappers.dlq_mapper.SchemaRegistryManager'): + with patch("app.infrastructure.mappers.dlq_mapper.SchemaRegistryManager"): with pytest.raises(ValueError, match="Missing or invalid event data"): DLQMapper.from_mongo_document(doc) @@ -208,7 +209,7 @@ def test_from_mongo_document_invalid_datetime_type(self, sample_event): DLQFields.FAILED_AT: 12345, # Invalid type } - with patch('app.infrastructure.mappers.dlq_mapper.SchemaRegistryManager') as mock_registry: + with patch("app.infrastructure.mappers.dlq_mapper.SchemaRegistryManager") as mock_registry: mock_registry.return_value.deserialize_json.return_value = sample_event with pytest.raises(ValueError, match="Invalid datetime type"): @@ -321,11 +322,7 @@ def test_retry_result_to_dict_with_error(self): d = DLQMapper.retry_result_to_dict(result) - assert d == { - "event_id": "event-123", - "status": "failed", - "error": "Connection error" - } + assert d == {"event_id": "event-123", "status": "failed", "error": "Connection error"} def test_batch_retry_result_to_dict(self): """Test converting batch retry result to dictionary.""" @@ -403,14 +400,14 @@ def test_filter_to_query_full(self): f = DLQMessageFilter( status=DLQMessageStatus.PENDING, topic="test-topic", - event_type="execution_requested", + event_type=EventType.EXECUTION_REQUESTED, ) query = DLQMapper.filter_to_query(f) assert query[DLQFields.STATUS] == DLQMessageStatus.PENDING assert query[DLQFields.ORIGINAL_TOPIC] == "test-topic" - assert query[DLQFields.EVENT_TYPE] == "execution_requested" + assert query[DLQFields.EVENT_TYPE] == EventType.EXECUTION_REQUESTED def test_filter_to_query_empty(self): """Test converting empty DLQ message filter to MongoDB query.""" diff --git a/backend/tests/unit/infrastructure/mappers/test_execution_api_mapper.py b/backend/tests/unit/infrastructure/mappers/test_execution_api_mapper.py index e944c5d6..50740204 100644 --- a/backend/tests/unit/infrastructure/mappers/test_execution_api_mapper.py +++ b/backend/tests/unit/infrastructure/mappers/test_execution_api_mapper.py @@ -2,7 +2,6 @@ import pytest -from app.domain.enums.common import ErrorType from app.domain.enums.execution import ExecutionStatus from app.domain.enums.storage import ExecutionErrorType from app.domain.execution import DomainExecution, ResourceUsageDomain @@ -93,7 +92,7 @@ def 
test_to_result_without_resource_usage(self): assert result.lang == "javascript" assert result.lang_version == "20" assert result.exit_code == 1 - assert result.error_type == ErrorType.SCRIPT_ERROR + assert result.error_type == ExecutionErrorType.SCRIPT_ERROR assert result.resource_usage is None def test_to_result_with_script_error(self): @@ -106,7 +105,7 @@ def test_to_result_with_script_error(self): result = ExecutionApiMapper.to_result(execution) - assert result.error_type == ErrorType.SCRIPT_ERROR + assert result.error_type == ExecutionErrorType.SCRIPT_ERROR def test_to_result_with_timeout_error(self): """Test converting domain execution with timeout error.""" @@ -118,8 +117,7 @@ def test_to_result_with_timeout_error(self): result = ExecutionApiMapper.to_result(execution) - # TIMEOUT maps to SYSTEM_ERROR - assert result.error_type == ErrorType.SYSTEM_ERROR + assert result.error_type == ExecutionErrorType.TIMEOUT def test_to_result_with_resource_limit_error(self): """Test converting domain execution with resource limit error.""" @@ -131,8 +129,7 @@ def test_to_result_with_resource_limit_error(self): result = ExecutionApiMapper.to_result(execution) - # RESOURCE_LIMIT maps to SYSTEM_ERROR - assert result.error_type == ErrorType.SYSTEM_ERROR + assert result.error_type == ExecutionErrorType.RESOURCE_LIMIT def test_to_result_with_system_error(self): """Test converting domain execution with system error.""" @@ -144,8 +141,7 @@ def test_to_result_with_system_error(self): result = ExecutionApiMapper.to_result(execution) - # SYSTEM_ERROR maps to SYSTEM_ERROR - assert result.error_type == ErrorType.SYSTEM_ERROR + assert result.error_type == ExecutionErrorType.SYSTEM_ERROR def test_to_result_with_permission_denied_error(self): """Test converting domain execution with permission denied error.""" @@ -157,8 +153,7 @@ def test_to_result_with_permission_denied_error(self): result = ExecutionApiMapper.to_result(execution) - # PERMISSION_DENIED maps to SYSTEM_ERROR - assert result.error_type == ErrorType.SYSTEM_ERROR + assert result.error_type == ExecutionErrorType.PERMISSION_DENIED def test_to_result_with_no_error_type(self): """Test converting domain execution with no error type.""" @@ -172,19 +167,6 @@ def test_to_result_with_no_error_type(self): assert result.error_type is None - def test_to_result_with_invalid_resource_usage(self): - """Test converting domain execution with non-ResourceUsageDomain object.""" - execution = DomainExecution( - execution_id="exec-007", - status=ExecutionStatus.COMPLETED, - resource_usage="invalid", # Not a ResourceUsageDomain - ) - - result = ExecutionApiMapper.to_result(execution) - - # Should handle gracefully and set resource_usage to None - assert result.resource_usage is None - def test_to_result_minimal(self): """Test converting minimal domain execution to result.""" execution = DomainExecution( diff --git a/backend/tests/unit/services/sse/test_redis_bus.py b/backend/tests/unit/services/sse/test_redis_bus.py index 91ca3da6..c24598e3 100644 --- a/backend/tests/unit/services/sse/test_redis_bus.py +++ b/backend/tests/unit/services/sse/test_redis_bus.py @@ -6,8 +6,9 @@ pytestmark = pytest.mark.unit -from app.services.sse.redis_bus import SSERedisBus from app.domain.enums.events import EventType +from app.schemas_pydantic.sse import RedisNotificationMessage, RedisSSEMessage +from app.services.sse.redis_bus import SSERedisBus class _DummyEvent: @@ -76,14 +77,13 @@ async def test_publish_and_subscribe_round_trip() -> None: assert ch.endswith("exec-1") # Push to pubsub and 
read from subscription await r._pubsub.push(ch, payload) - msg = await sub.get(timeout=0.02) - assert msg and msg["event_type"] == str(EventType.EXECUTION_COMPLETED) - assert msg["execution_id"] == "exec-1" - assert isinstance(json.dumps(msg), str) + msg = await sub.get(RedisSSEMessage, timeout=0.02) + assert msg and msg.event_type == EventType.EXECUTION_COMPLETED + assert msg.execution_id == "exec-1" # Non-message / invalid JSON paths await r._pubsub.push(ch, b"not-json") - assert await sub.get(timeout=0.02) is None + assert await sub.get(RedisSSEMessage, timeout=0.02) is None # Close await sub.close() @@ -97,11 +97,22 @@ async def test_notifications_channels() -> None: nsub = await bus.open_notification_subscription("user-1") assert "sse:notif:user-1" in r._pubsub.subscribed - await bus.publish_notification("user-1", {"a": 1}) + notif = RedisNotificationMessage( + notification_id="n1", + severity="low", + status="pending", + tags=[], + subject="test", + body="body", + action_url="", + created_at="2025-01-01T00:00:00Z", + ) + await bus.publish_notification("user-1", notif) ch, payload = r.published[-1] assert ch.endswith("user-1") await r._pubsub.push(ch, payload) - got = await nsub.get(timeout=0.02) - assert got == {"a": 1} + got = await nsub.get(RedisNotificationMessage, timeout=0.02) + assert got is not None + assert got.notification_id == "n1" await nsub.close() diff --git a/backend/tests/unit/services/sse/test_shutdown_manager.py b/backend/tests/unit/services/sse/test_shutdown_manager.py index 721f076d..b39b76eb 100644 --- a/backend/tests/unit/services/sse/test_shutdown_manager.py +++ b/backend/tests/unit/services/sse/test_shutdown_manager.py @@ -27,7 +27,7 @@ async def on_shutdown(event, cid): # noqa: ANN001 done = await mgr.wait_for_shutdown(timeout=1.0) assert done is True status = mgr.get_shutdown_status() - assert status["phase"] == "complete" + assert status.phase == "complete" await asyncio.gather(t1, t2) @@ -46,7 +46,7 @@ async def test_shutdown_force_close_calls_router_stop_and_rejects_new(): assert router.stopped is True assert mgr.is_shutting_down() is True status = mgr.get_shutdown_status() - assert status["draining_connections"] == 0 + assert status.draining_connections == 0 # New connections should be rejected ev2 = await mgr.register_connection("e2", "c2") @@ -63,7 +63,7 @@ async def test_get_shutdown_status_transitions(): m = SSEShutdownManager(drain_timeout=0.01, notification_timeout=0.0, force_close_timeout=0.0) st0 = m.get_shutdown_status() - assert st0["phase"] == "ready" + assert st0.phase == "ready" await m.initiate_shutdown() st1 = m.get_shutdown_status() - assert st1["phase"] in ("draining", "complete", "closing", "notifying") + assert st1.phase in ("draining", "complete", "closing", "notifying") diff --git a/backend/tests/unit/services/sse/test_sse_service.py b/backend/tests/unit/services/sse/test_sse_service.py index b768ddfc..dc8c588c 100644 --- a/backend/tests/unit/services/sse/test_sse_service.py +++ b/backend/tests/unit/services/sse/test_sse_service.py @@ -1,27 +1,36 @@ import asyncio from datetime import datetime, timezone -from typing import Any +from typing import Any, Type, TypeVar import pytest +from pydantic import BaseModel pytestmark = pytest.mark.unit from app.domain.enums.events import EventType from app.domain.execution import DomainExecution, ResourceUsageDomain from app.domain.sse import ShutdownStatus, SSEHealthDomain +from app.schemas_pydantic.sse import 
RedisNotificationMessage, RedisSSEMessage from app.services.sse.sse_service import SSEService +T = TypeVar("T", bound=BaseModel)  # typed payload returned by the fake subscription's get() + class _FakeSubscription: def __init__(self) -> None: self._q: asyncio.Queue[dict[str, Any] | None] = asyncio.Queue() self.closed = False - async def get(self, timeout: float = 0.5): # noqa: ARG002 + async def get(self, model: Type[T], timeout: float = 0.5) -> T | None: try: - return await asyncio.wait_for(self._q.get(), timeout=timeout) + raw = await asyncio.wait_for(self._q.get(), timeout=timeout) + if raw is None: + return None + return model.model_validate(raw) except asyncio.TimeoutError: return None + except Exception: + return None async def push(self, msg: dict[str, Any]) -> None: self._q.put_nowait(msg) @@ -121,7 +130,7 @@ async def test_execution_stream_closes_on_failed_event() -> None: assert _decode(stat)["event_type"] == "status" # Push a failed event and ensure stream ends after yielding it - await bus.exec_sub.push({"event_type": str(EventType.EXECUTION_FAILED), "execution_id": "exec-1", "data": {}}) + await bus.exec_sub.push({"event_type": EventType.EXECUTION_FAILED, "execution_id": "exec-1", "data": {}}) failed = await agen.__anext__() assert _decode(failed)["event_type"] == str(EventType.EXECUTION_FAILED) @@ -153,7 +162,7 @@ async def test_execution_stream_result_stored_includes_result_payload() -> None: await agen.__anext__() # connected await agen.__anext__() # status - await bus.exec_sub.push({"event_type": str(EventType.RESULT_STORED), "execution_id": "exec-2", "data": {}}) + await bus.exec_sub.push({"event_type": EventType.RESULT_STORED, "execution_id": "exec-2", "data": {}}) evt = await agen.__anext__() data = _decode(evt) assert data["event_type"] == str(EventType.RESULT_STORED) @@ -180,8 +189,17 @@ async def test_notification_stream_connected_and_heartbeat_and_message() -> None hb = await agen.__anext__() assert _decode(hb)["event_type"] == "heartbeat" - # Push a notification payload - await bus.notif_sub.push({"notification_id": "n1", "subject": "s", "body": "b"}) + # Push a notification payload (must match RedisNotificationMessage schema) + await bus.notif_sub.push({ + "notification_id": "n1", + "severity": "low", + "status": "pending", + "tags": [], + "subject": "s", + "body": "b", + "action_url": "", + "created_at": "2025-01-01T00:00:00Z", + }) notif = await agen.__anext__() assert _decode(notif)["event_type"] == "notification" diff --git a/backend/tests/unit/services/sse/test_sse_shutdown_manager.py b/backend/tests/unit/services/sse/test_sse_shutdown_manager.py index 9ff7836b..825e98ad 100644 --- a/backend/tests/unit/services/sse/test_sse_shutdown_manager.py +++ b/backend/tests/unit/services/sse/test_sse_shutdown_manager.py @@ -31,7 +31,7 @@ async def test_register_unregister_and_shutdown_flow() -> None: from tests.helpers.eventually import eventually async def _is_notifying(): - return mgr.get_shutdown_status()["phase"] == "notifying" + return mgr.get_shutdown_status().phase == "notifying" await eventually(_is_notifying, timeout=1.0, interval=0.02) @@ -42,7 +42,7 @@ async def _is_notifying(): await mgr.unregister_connection("exec-1", "c2") await task - assert mgr.get_shutdown_status()["complete"] is True + assert mgr.get_shutdown_status().complete is True @pytest.mark.asyncio diff --git a/docs/reference/openapi.json b/docs/reference/openapi.json index 2e243585..e0053102 100644 --- a/docs/reference/openapi.json +++ b/docs/reference/openapi.json @@ -366,16 +366,19 @@ "schema": { "anyOf": [ { - "type": "string" + "type": "array", + 
"items": { + "$ref": "#/components/schemas/EventType" + } }, { "type": "null" } ], - "description": "Comma-separated event types to filter", + "description": "Event types to filter", "title": "Event Types" }, - "description": "Comma-separated event types to filter" + "description": "Event types to filter" }, { "name": "limit", @@ -1160,8 +1163,7 @@ "content": { "application/json": { "schema": { - "type": "object", - "title": "Response Liveness Api V1 Health Live Get" + "$ref": "#/components/schemas/LivenessResponse" } } } @@ -1183,8 +1185,7 @@ "content": { "application/json": { "schema": { - "type": "object", - "title": "Response Readiness Api V1 Health Ready Get" + "$ref": "#/components/schemas/ReadinessResponse" } } } @@ -2359,171 +2360,6 @@ } } }, - "/api/v1/admin/events/{event_id}": { - "get": { - "tags": [ - "admin-events" - ], - "summary": "Get Event Detail", - "operationId": "get_event_detail_api_v1_admin_events__event_id__get", - "parameters": [ - { - "name": "event_id", - "in": "path", - "required": true, - "schema": { - "type": "string", - "title": "Event Id" - } - } - ], - "responses": { - "200": { - "description": "Successful Response", - "content": { - "application/json": { - "schema": { - "$ref": "#/components/schemas/EventDetailResponse" - } - } - } - }, - "422": { - "description": "Validation Error", - "content": { - "application/json": { - "schema": { - "$ref": "#/components/schemas/HTTPValidationError" - } - } - } - } - } - }, - "delete": { - "tags": [ - "admin-events" - ], - "summary": "Delete Event", - "operationId": "delete_event_api_v1_admin_events__event_id__delete", - "parameters": [ - { - "name": "event_id", - "in": "path", - "required": true, - "schema": { - "type": "string", - "title": "Event Id" - } - } - ], - "responses": { - "200": { - "description": "Successful Response", - "content": { - "application/json": { - "schema": { - "$ref": "#/components/schemas/EventDeleteResponse" - } - } - } - }, - "422": { - "description": "Validation Error", - "content": { - "application/json": { - "schema": { - "$ref": "#/components/schemas/HTTPValidationError" - } - } - } - } - } - } - }, - "/api/v1/admin/events/replay": { - "post": { - "tags": [ - "admin-events" - ], - "summary": "Replay Events", - "operationId": "replay_events_api_v1_admin_events_replay_post", - "requestBody": { - "content": { - "application/json": { - "schema": { - "$ref": "#/components/schemas/EventReplayRequest" - } - } - }, - "required": true - }, - "responses": { - "200": { - "description": "Successful Response", - "content": { - "application/json": { - "schema": { - "$ref": "#/components/schemas/EventReplayResponse" - } - } - } - }, - "422": { - "description": "Validation Error", - "content": { - "application/json": { - "schema": { - "$ref": "#/components/schemas/HTTPValidationError" - } - } - } - } - } - } - }, - "/api/v1/admin/events/replay/{session_id}/status": { - "get": { - "tags": [ - "admin-events" - ], - "summary": "Get Replay Status", - "operationId": "get_replay_status_api_v1_admin_events_replay__session_id__status_get", - "parameters": [ - { - "name": "session_id", - "in": "path", - "required": true, - "schema": { - "type": "string", - "title": "Session Id" - } - } - ], - "responses": { - "200": { - "description": "Successful Response", - "content": { - "application/json": { - "schema": { - "$ref": "#/components/schemas/EventReplayStatusResponse" - } - } - } - }, - "422": { - "description": "Validation Error", - "content": { - "application/json": { - "schema": { - "$ref": 
"#/components/schemas/HTTPValidationError" - } - } - } - } - } - } - }, "/api/v1/admin/events/export/csv": { "get": { "tags": [ @@ -2799,6 +2635,171 @@ } } }, + "/api/v1/admin/events/{event_id}": { + "get": { + "tags": [ + "admin-events" + ], + "summary": "Get Event Detail", + "operationId": "get_event_detail_api_v1_admin_events__event_id__get", + "parameters": [ + { + "name": "event_id", + "in": "path", + "required": true, + "schema": { + "type": "string", + "title": "Event Id" + } + } + ], + "responses": { + "200": { + "description": "Successful Response", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/EventDetailResponse" + } + } + } + }, + "422": { + "description": "Validation Error", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/HTTPValidationError" + } + } + } + } + } + }, + "delete": { + "tags": [ + "admin-events" + ], + "summary": "Delete Event", + "operationId": "delete_event_api_v1_admin_events__event_id__delete", + "parameters": [ + { + "name": "event_id", + "in": "path", + "required": true, + "schema": { + "type": "string", + "title": "Event Id" + } + } + ], + "responses": { + "200": { + "description": "Successful Response", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/EventDeleteResponse" + } + } + } + }, + "422": { + "description": "Validation Error", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/HTTPValidationError" + } + } + } + } + } + } + }, + "/api/v1/admin/events/replay": { + "post": { + "tags": [ + "admin-events" + ], + "summary": "Replay Events", + "operationId": "replay_events_api_v1_admin_events_replay_post", + "requestBody": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/EventReplayRequest" + } + } + }, + "required": true + }, + "responses": { + "200": { + "description": "Successful Response", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/EventReplayResponse" + } + } + } + }, + "422": { + "description": "Validation Error", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/HTTPValidationError" + } + } + } + } + } + } + }, + "/api/v1/admin/events/replay/{session_id}/status": { + "get": { + "tags": [ + "admin-events" + ], + "summary": "Get Replay Status", + "operationId": "get_replay_status_api_v1_admin_events_replay__session_id__status_get", + "parameters": [ + { + "name": "session_id", + "in": "path", + "required": true, + "schema": { + "type": "string", + "title": "Session Id" + } + } + ], + "responses": { + "200": { + "description": "Successful Response", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/EventReplayStatusResponse" + } + } + } + }, + "422": { + "description": "Validation Error", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/HTTPValidationError" + } + } + } + } + } + } + }, "/api/v1/admin/settings/": { "get": { "tags": [ @@ -3141,8 +3142,7 @@ "content": { "application/json": { "schema": { - "type": "object", - "title": "Response Delete User Api V1 Admin Users User Id Delete" + "$ref": "#/components/schemas/DeleteUserResponse" } } } @@ -3281,8 +3281,7 @@ "content": { "application/json": { "schema": { - "type": "object", - "title": "Response Get User Rate Limits Api V1 Admin Users User Id Rate Limits Get" + "$ref": "#/components/schemas/UserRateLimitsResponse" } } } @@ -3333,8 +3332,7 @@ "content": { 
"application/json": { "schema": { - "type": "object", - "title": "Response Update User Rate Limits Api V1 Admin Users User Id Rate Limits Put" + "$ref": "#/components/schemas/RateLimitUpdateResponse" } } } @@ -4037,7 +4035,7 @@ "sagas" ], "summary": "Get Saga Status", - "description": "Get saga status by ID.\n\nArgs:\n saga_id: The saga identifier\n request: FastAPI request object\n saga_service: Saga service from DI\n auth_service: Auth service from DI\n \nReturns:\n Saga status response\n \nRaises:\n HTTPException: 404 if saga not found, 403 if access denied", + "description": "Get saga status by ID.\n\nArgs:\n saga_id: The saga identifier\n request: FastAPI request object\n saga_service: Saga service from DI\n auth_service: Auth service from DI\n\nReturns:\n Saga status response\n\nRaises:\n HTTPException: 404 if saga not found, 403 if access denied", "operationId": "get_saga_status_api_v1_sagas__saga_id__get", "parameters": [ { @@ -4080,7 +4078,7 @@ "sagas" ], "summary": "Get Execution Sagas", - "description": "Get all sagas for an execution.\n\nArgs:\n execution_id: The execution identifier\n request: FastAPI request object\n saga_service: Saga service from DI\n auth_service: Auth service from DI\n state: Optional state filter\n \nReturns:\n List of sagas for the execution\n \nRaises:\n HTTPException: 403 if access denied", + "description": "Get all sagas for an execution.\n\nArgs:\n execution_id: The execution identifier\n request: FastAPI request object\n saga_service: Saga service from DI\n auth_service: Auth service from DI\n state: Optional state filter\n\nReturns:\n List of sagas for the execution\n\nRaises:\n HTTPException: 403 if access denied", "operationId": "get_execution_sagas_api_v1_sagas_execution__execution_id__get", "parameters": [ { @@ -4141,7 +4139,7 @@ "sagas" ], "summary": "List Sagas", - "description": "List sagas accessible by the current user.\n\nArgs:\n request: FastAPI request object\n saga_service: Saga service from DI\n auth_service: Auth service from DI\n state: Optional state filter\n limit: Maximum number of results\n offset: Number of results to skip\n \nReturns:\n Paginated list of sagas", + "description": "List sagas accessible by the current user.\n\nArgs:\n request: FastAPI request object\n saga_service: Saga service from DI\n auth_service: Auth service from DI\n state: Optional state filter\n limit: Maximum number of results\n offset: Number of results to skip\n\nReturns:\n Paginated list of sagas", "operationId": "list_sagas_api_v1_sagas__get", "parameters": [ { @@ -4216,7 +4214,7 @@ "sagas" ], "summary": "Cancel Saga", - "description": "Cancel a running saga.\n\nArgs:\n saga_id: The saga identifier\n request: FastAPI request object\n saga_service: Saga service from DI\n auth_service: Auth service from DI\n \nReturns:\n Cancellation response with success status\n \nRaises:\n HTTPException: 404 if not found, 403 if denied, 400 if invalid state", + "description": "Cancel a running saga.\n\nArgs:\n saga_id: The saga identifier\n request: FastAPI request object\n saga_service: Saga service from DI\n auth_service: Auth service from DI\n\nReturns:\n Cancellation response with success status\n\nRaises:\n HTTPException: 404 if not found, 403 if denied, 400 if invalid state", "operationId": "cancel_saga_api_v1_sagas__saga_id__cancel_post", "parameters": [ { @@ -4560,8 +4558,7 @@ "title": "Event" }, "event_type": { - "type": "string", - "title": "Event Type" + "$ref": "#/components/schemas/EventType" }, "original_topic": { "type": "string", @@ -4714,8 
+4711,7 @@ "title": "Event Id" }, "event_type": { - "type": "string", - "title": "Event Type" + "$ref": "#/components/schemas/EventType" }, "original_topic": { "type": "string", @@ -4950,10 +4946,32 @@ "type": "object", "required": [ "message", - "execution_id" + "execution_id" + ], + "title": "DeleteResponse", + "description": "Model for execution deletion response." + }, + "DeleteUserResponse": { + "properties": { + "message": { + "type": "string", + "title": "Message" + }, + "deleted_counts": { + "additionalProperties": { + "type": "integer" + }, + "type": "object", + "title": "Deleted Counts" + } + }, + "type": "object", + "required": [ + "message", + "deleted_counts" ], - "title": "DeleteResponse", - "description": "Model for execution deletion response." + "title": "DeleteUserResponse", + "description": "Response model for user deletion." }, "DerivedCounts": { "properties": { @@ -4991,7 +5009,7 @@ "theme": { "type": "string", "title": "Theme", - "default": "one-dark" + "default": "auto" }, "font_size": { "type": "integer", @@ -5017,31 +5035,6 @@ "type": "boolean", "title": "Show Line Numbers", "default": true - }, - "font_family": { - "type": "string", - "title": "Font Family", - "default": "Monaco, Consolas, 'Courier New', monospace" - }, - "auto_complete": { - "type": "boolean", - "title": "Auto Complete", - "default": true - }, - "bracket_matching": { - "type": "boolean", - "title": "Bracket Matching", - "default": true - }, - "highlight_active_line": { - "type": "boolean", - "title": "Highlight Active Line", - "default": true - }, - "default_language": { - "type": "string", - "title": "Default Language", - "default": "python" } }, "type": "object", @@ -5061,16 +5054,6 @@ ], "title": "EndpointGroup" }, - "ErrorType": { - "type": "string", - "enum": [ - "script_error", - "system_error", - "success" - ], - "title": "ErrorType", - "description": "Classification of error types in execution platform." - }, "EventAggregationRequest": { "properties": { "pipeline": { @@ -6029,6 +6012,18 @@ "title": "ExampleScripts", "description": "Model for example scripts." }, + "ExecutionErrorType": { + "type": "string", + "enum": [ + "system_error", + "timeout", + "resource_limit", + "script_error", + "permission_denied" + ], + "title": "ExecutionErrorType", + "description": "Types of execution errors." + }, "ExecutionEventResponse": { "properties": { "event_id": { @@ -6036,8 +6031,7 @@ "title": "Event Id" }, "event_type": { - "type": "string", - "title": "Event Type" + "$ref": "#/components/schemas/EventType" }, "timestamp": { "type": "string", @@ -6244,7 +6238,7 @@ "error_type": { "anyOf": [ { - "$ref": "#/components/schemas/ErrorType" + "$ref": "#/components/schemas/ExecutionErrorType" }, { "type": "null" @@ -6388,6 +6382,55 @@ "type": "object", "title": "HTTPValidationError" }, + "LanguageInfo": { + "properties": { + "versions": { + "items": { + "type": "string" + }, + "type": "array", + "title": "Versions" + }, + "file_ext": { + "type": "string", + "title": "File Ext" + } + }, + "type": "object", + "required": [ + "versions", + "file_ext" + ], + "title": "LanguageInfo", + "description": "Language runtime information." 
+ }, + "LivenessResponse": { + "properties": { + "status": { + "type": "string", + "title": "Status", + "description": "Health status" + }, + "uptime_seconds": { + "type": "integer", + "title": "Uptime Seconds", + "description": "Server uptime in seconds" + }, + "timestamp": { + "type": "string", + "title": "Timestamp", + "description": "ISO timestamp of health check" + } + }, + "type": "object", + "required": [ + "status", + "uptime_seconds", + "timestamp" + ], + "title": "LivenessResponse", + "description": "Response model for liveness probe." + }, "LoginResponse": { "properties": { "message": { @@ -6949,6 +6992,55 @@ ], "title": "RateLimitRule" }, + "RateLimitRuleResponse": { + "properties": { + "endpoint_pattern": { + "type": "string", + "title": "Endpoint Pattern" + }, + "group": { + "type": "string", + "title": "Group" + }, + "requests": { + "type": "integer", + "title": "Requests" + }, + "window_seconds": { + "type": "integer", + "title": "Window Seconds" + }, + "algorithm": { + "type": "string", + "title": "Algorithm" + }, + "burst_multiplier": { + "type": "number", + "title": "Burst Multiplier", + "default": 1.5 + }, + "priority": { + "type": "integer", + "title": "Priority", + "default": 0 + }, + "enabled": { + "type": "boolean", + "title": "Enabled", + "default": true + } + }, + "type": "object", + "required": [ + "endpoint_pattern", + "group", + "requests", + "window_seconds", + "algorithm" + ], + "title": "RateLimitRuleResponse", + "description": "Response model for rate limit rule." + }, "RateLimitSummary": { "properties": { "bypass_rate_limit": { @@ -6988,6 +7080,50 @@ "type": "object", "title": "RateLimitSummary" }, + "RateLimitUpdateResponse": { + "properties": { + "user_id": { + "type": "string", + "title": "User Id" + }, + "updated": { + "type": "boolean", + "title": "Updated" + }, + "config": { + "$ref": "#/components/schemas/UserRateLimitConfigResponse" + } + }, + "type": "object", + "required": [ + "user_id", + "updated", + "config" + ], + "title": "RateLimitUpdateResponse", + "description": "Response model for rate limit update." + }, + "ReadinessResponse": { + "properties": { + "status": { + "type": "string", + "title": "Status", + "description": "Readiness status" + }, + "uptime_seconds": { + "type": "integer", + "title": "Uptime Seconds", + "description": "Server uptime in seconds" + } + }, + "type": "object", + "required": [ + "status", + "uptime_seconds" + ], + "title": "ReadinessResponse", + "description": "Response model for readiness probe." 
+ }, "ReplayAggregateResponse": { "properties": { "dry_run": { @@ -7580,10 +7716,7 @@ }, "supported_runtimes": { "additionalProperties": { - "items": { - "type": "string" - }, - "type": "array" + "$ref": "#/components/schemas/LanguageInfo" }, "type": "object", "title": "Supported Runtimes" @@ -7791,8 +7924,7 @@ "description": "Maximum connections allowed per user" }, "shutdown": { - "type": "object", - "title": "Shutdown", + "$ref": "#/components/schemas/ShutdownStatusResponse", "description": "Shutdown status information" }, "timestamp": { @@ -8212,8 +8344,7 @@ "title": "Timestamp" }, "event_type": { - "type": "string", - "title": "Event Type" + "$ref": "#/components/schemas/EventType" }, "field": { "type": "string", @@ -8281,6 +8412,57 @@ "title": "SettingsHistoryResponse", "description": "Response model for settings history" }, + "ShutdownStatusResponse": { + "properties": { + "phase": { + "type": "string", + "title": "Phase", + "description": "Current shutdown phase" + }, + "initiated": { + "type": "boolean", + "title": "Initiated", + "description": "Whether shutdown has been initiated" + }, + "complete": { + "type": "boolean", + "title": "Complete", + "description": "Whether shutdown is complete" + }, + "active_connections": { + "type": "integer", + "title": "Active Connections", + "description": "Number of active connections" + }, + "draining_connections": { + "type": "integer", + "title": "Draining Connections", + "description": "Number of connections being drained" + }, + "duration": { + "anyOf": [ + { + "type": "number" + }, + { + "type": "null" + } + ], + "title": "Duration", + "description": "Duration of shutdown in seconds" + } + }, + "type": "object", + "required": [ + "phase", + "initiated", + "complete", + "active_connections", + "draining_connections" + ], + "title": "ShutdownStatusResponse", + "description": "Response model for shutdown status." + }, "SortOrder": { "type": "string", "enum": [ @@ -8601,6 +8783,105 @@ ], "title": "UserRateLimit" }, + "UserRateLimitConfigResponse": { + "properties": { + "user_id": { + "type": "string", + "title": "User Id" + }, + "bypass_rate_limit": { + "type": "boolean", + "title": "Bypass Rate Limit" + }, + "global_multiplier": { + "type": "number", + "title": "Global Multiplier" + }, + "rules": { + "items": { + "$ref": "#/components/schemas/RateLimitRuleResponse" + }, + "type": "array", + "title": "Rules" + }, + "created_at": { + "anyOf": [ + { + "type": "string", + "format": "date-time" + }, + { + "type": "null" + } + ], + "title": "Created At" + }, + "updated_at": { + "anyOf": [ + { + "type": "string", + "format": "date-time" + }, + { + "type": "null" + } + ], + "title": "Updated At" + }, + "notes": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "title": "Notes" + } + }, + "type": "object", + "required": [ + "user_id", + "bypass_rate_limit", + "global_multiplier", + "rules" + ], + "title": "UserRateLimitConfigResponse", + "description": "Response model for user rate limit config." 
+ }, + "UserRateLimitsResponse": { + "properties": { + "user_id": { + "type": "string", + "title": "User Id" + }, + "rate_limit_config": { + "anyOf": [ + { + "$ref": "#/components/schemas/UserRateLimitConfigResponse" + }, + { + "type": "null" + } + ] + }, + "current_usage": { + "additionalProperties": { + "type": "object" + }, + "type": "object", + "title": "Current Usage" + } + }, + "type": "object", + "required": [ + "user_id", + "current_usage" + ], + "title": "UserRateLimitsResponse", + "description": "Response model for user rate limits with usage stats." + }, "UserResponse": { "properties": { "username": { diff --git a/frontend/package-lock.json b/frontend/package-lock.json index 241d03df..9cadc2d3 100644 --- a/frontend/package-lock.json +++ b/frontend/package-lock.json @@ -11,8 +11,11 @@ "@babel/runtime": "^7.27.6", "@codemirror/autocomplete": "^6.17.0", "@codemirror/commands": "^6.7.0", + "@codemirror/lang-go": "^6.0.1", + "@codemirror/lang-javascript": "^6.2.4", "@codemirror/lang-python": "^6.1.6", "@codemirror/language": "^6.10.2", + "@codemirror/legacy-modes": "^6.5.2", "@codemirror/state": "^6.4.1", "@codemirror/theme-one-dark": "^6.1.2", "@codemirror/view": "^6.34.1", @@ -46,6 +49,7 @@ "@babel/runtime": "^7.24.7", "@hey-api/openapi-ts": "0.89.2", "@playwright/test": "^1.52.0", + "@rollup/plugin-alias": "^6.0.0", "@rollup/plugin-typescript": "^12.1.2", "@sveltejs/vite-plugin-svelte": "^5.0.3", "@tailwindcss/forms": "^0.5.11", @@ -208,6 +212,32 @@ "@lezer/common": "^1.1.0" } }, + "node_modules/@codemirror/lang-go": { + "version": "6.0.1", + "resolved": "https://registry.npmjs.org/@codemirror/lang-go/-/lang-go-6.0.1.tgz", + "integrity": "sha512-7fNvbyNylvqCphW9HD6WFnRpcDjr+KXX/FgqXy5H5ZS0eC5edDljukm/yNgYkwTsgp2busdod50AOTIy6Jikfg==", + "dependencies": { + "@codemirror/autocomplete": "^6.0.0", + "@codemirror/language": "^6.6.0", + "@codemirror/state": "^6.0.0", + "@lezer/common": "^1.0.0", + "@lezer/go": "^1.0.0" + } + }, + "node_modules/@codemirror/lang-javascript": { + "version": "6.2.4", + "resolved": "https://registry.npmjs.org/@codemirror/lang-javascript/-/lang-javascript-6.2.4.tgz", + "integrity": "sha512-0WVmhp1QOqZ4Rt6GlVGwKJN3KW7Xh4H2q8ZZNGZaP6lRdxXJzmjm4FqvmOojVj6khWJHIb9sp7U/72W7xQgqAA==", + "dependencies": { + "@codemirror/autocomplete": "^6.0.0", + "@codemirror/language": "^6.6.0", + "@codemirror/lint": "^6.0.0", + "@codemirror/state": "^6.0.0", + "@codemirror/view": "^6.17.0", + "@lezer/common": "^1.0.0", + "@lezer/javascript": "^1.0.0" + } + }, "node_modules/@codemirror/lang-python": { "version": "6.2.1", "resolved": "https://registry.npmjs.org/@codemirror/lang-python/-/lang-python-6.2.1.tgz", @@ -233,6 +263,14 @@ "style-mod": "^4.0.0" } }, + "node_modules/@codemirror/legacy-modes": { + "version": "6.5.2", + "resolved": "https://registry.npmjs.org/@codemirror/legacy-modes/-/legacy-modes-6.5.2.tgz", + "integrity": "sha512-/jJbwSTazlQEDOQw2FJ8LEEKVS72pU0lx6oM54kGpL8t/NJ2Jda3CZ4pcltiKTdqYSRk3ug1B3pil1gsjA6+8Q==", + "dependencies": { + "@codemirror/language": "^6.0.0" + } + }, "node_modules/@codemirror/lint": { "version": "6.9.2", "resolved": "https://registry.npmjs.org/@codemirror/lint/-/lint-6.9.2.tgz", @@ -969,6 +1007,16 @@ "resolved": "https://registry.npmjs.org/@lezer/common/-/common-1.4.0.tgz", "integrity": "sha512-DVeMRoGrgn/k45oQNu189BoW4SZwgZFzJ1+1TV5j2NJ/KFC83oa/enRqZSGshyeMk5cPWMhsKs9nx+8o0unwGg==" }, + "node_modules/@lezer/go": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/@lezer/go/-/go-1.0.1.tgz", + "integrity": 
"sha512-xToRsYxwsgJNHTgNdStpcvmbVuKxTapV0dM0wey1geMMRc9aggoVyKgzYp41D2/vVOx+Ii4hmE206kvxIXBVXQ==", + "dependencies": { + "@lezer/common": "^1.2.0", + "@lezer/highlight": "^1.0.0", + "@lezer/lr": "^1.3.0" + } + }, "node_modules/@lezer/highlight": { "version": "1.2.3", "resolved": "https://registry.npmjs.org/@lezer/highlight/-/highlight-1.2.3.tgz", @@ -977,6 +1025,16 @@ "@lezer/common": "^1.3.0" } }, + "node_modules/@lezer/javascript": { + "version": "1.5.4", + "resolved": "https://registry.npmjs.org/@lezer/javascript/-/javascript-1.5.4.tgz", + "integrity": "sha512-vvYx3MhWqeZtGPwDStM2dwgljd5smolYD2lR2UyFcHfxbBQebqx8yjmFmxtJ/E6nN6u1D9srOiVWm3Rb4tmcUA==", + "dependencies": { + "@lezer/common": "^1.2.0", + "@lezer/highlight": "^1.1.3", + "@lezer/lr": "^1.3.0" + } + }, "node_modules/@lezer/lr": { "version": "1.4.5", "resolved": "https://registry.npmjs.org/@lezer/lr/-/lr-1.4.5.tgz", @@ -1046,6 +1104,23 @@ "resolved": "https://registry.npmjs.org/@polka/url/-/url-1.0.0-next.29.tgz", "integrity": "sha512-wwQAWhWSuHaag8c4q/KN/vCoeOJYshAIvMQwD4GpSb3OiZklFfvAgmj0VCBBImRpuF/aFgIRzllXlVX93Jevww==" }, + "node_modules/@rollup/plugin-alias": { + "version": "6.0.0", + "resolved": "https://registry.npmjs.org/@rollup/plugin-alias/-/plugin-alias-6.0.0.tgz", + "integrity": "sha512-tPCzJOtS7uuVZd+xPhoy5W4vThe6KWXNmsFCNktaAh5RTqcLiSfT4huPQIXkgJ6YCOjJHvecOAzQxLFhPxKr+g==", + "dev": true, + "engines": { + "node": ">=20.19.0" + }, + "peerDependencies": { + "rollup": ">=4.0.0" + }, + "peerDependenciesMeta": { + "rollup": { + "optional": true + } + } + }, "node_modules/@rollup/plugin-commonjs": { "version": "29.0.0", "resolved": "https://registry.npmjs.org/@rollup/plugin-commonjs/-/plugin-commonjs-29.0.0.tgz", diff --git a/frontend/package.json b/frontend/package.json index 906e8758..d7f70d76 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -18,8 +18,11 @@ "@babel/runtime": "^7.27.6", "@codemirror/autocomplete": "^6.17.0", "@codemirror/commands": "^6.7.0", + "@codemirror/lang-go": "^6.0.1", + "@codemirror/lang-javascript": "^6.2.4", "@codemirror/lang-python": "^6.1.6", "@codemirror/language": "^6.10.2", + "@codemirror/legacy-modes": "^6.5.2", "@codemirror/state": "^6.4.1", "@codemirror/theme-one-dark": "^6.1.2", "@codemirror/view": "^6.34.1", @@ -53,6 +56,7 @@ "@babel/runtime": "^7.24.7", "@hey-api/openapi-ts": "0.89.2", "@playwright/test": "^1.52.0", + "@rollup/plugin-alias": "^6.0.0", "@rollup/plugin-typescript": "^12.1.2", "@sveltejs/vite-plugin-svelte": "^5.0.3", "@tailwindcss/forms": "^0.5.11", diff --git a/frontend/rollup.config.js b/frontend/rollup.config.js index 8cf1e141..818837e3 100644 --- a/frontend/rollup.config.js +++ b/frontend/rollup.config.js @@ -6,12 +6,26 @@ import postcss from 'rollup-plugin-postcss'; import sveltePreprocess from 'svelte-preprocess'; import replace from '@rollup/plugin-replace'; import typescript from '@rollup/plugin-typescript'; +import alias from '@rollup/plugin-alias'; import dotenv from 'dotenv'; import fs from 'fs'; import https from 'https'; import path from 'path'; import json from '@rollup/plugin-json'; +// Path aliases - must match tsconfig.json paths +const projectRoot = path.resolve('.'); +const aliases = alias({ + entries: [ + { find: '$lib', replacement: path.resolve(projectRoot, 'src/lib') }, + { find: '$components', replacement: path.resolve(projectRoot, 'src/components') }, + { find: '$stores', replacement: path.resolve(projectRoot, 'src/stores') }, + { find: '$routes', replacement: path.resolve(projectRoot, 'src/routes') }, + { find: '$utils', 
replacement: path.resolve(projectRoot, 'src/utils') }, + { find: '$styles', replacement: path.resolve(projectRoot, 'src/styles') } + ] +}); + dotenv.config(); const production = !process.env.ROLLUP_WATCH; @@ -144,6 +158,8 @@ export default { '@codemirror/language', '@codemirror/autocomplete', '@codemirror/lang-python', + '@codemirror/lang-javascript', + '@codemirror/lang-go', '@codemirror/theme-one-dark', '@uiw/codemirror-theme-github' ] @@ -155,6 +171,7 @@ export default { } }, plugins: [ + aliases, replace({ 'process.env.VITE_BACKEND_URL': JSON.stringify(''), preventAssignment: true @@ -182,7 +199,7 @@ export default { // Prefer ES modules mainFields: ['svelte', 'module', 'browser', 'main'], exportConditions: ['svelte'], - extensions: ['.mjs', '.js', '.json', '.node', '.svelte'] + extensions: ['.mjs', '.js', '.ts', '.json', '.node', '.svelte'] }), commonjs(), !production && { diff --git a/frontend/src/App.svelte b/frontend/src/App.svelte index 12cfc63e..956d912b 100644 --- a/frontend/src/App.svelte +++ b/frontend/src/App.svelte @@ -1,29 +1,29 @@
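Note on the alias setup in rollup.config.js above: its comment states the aliases "must match tsconfig.json paths", but the matching tsconfig.json hunk is not part of this patch. Below is a minimal sketch of what that section would look like, assuming the six aliases are mirrored one-to-one; the `baseUrl` value and the `/*` wildcard form are assumptions, not taken from the diff:

```jsonc
// tsconfig.json (hypothetical excerpt; must stay in sync with the `entries`
// array passed to @rollup/plugin-alias in rollup.config.js)
{
  "compilerOptions": {
    "baseUrl": ".",
    "paths": {
      "$lib/*": ["src/lib/*"],
      "$components/*": ["src/components/*"],
      "$stores/*": ["src/stores/*"],
      "$routes/*": ["src/routes/*"],
      "$utils/*": ["src/utils/*"],
      "$styles/*": ["src/styles/*"]
    }
  }
}
```

With both sides in place, an import such as `import { client } from '$lib/api';` (a hypothetical module path) resolves for the TypeScript compiler via `paths` and for the bundle via the alias plugin; the `.ts` entry newly added to the resolver's `extensions` list is what allows the extensionless form of that import.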