5 changes: 2 additions & 3 deletions backend/app/api/routes/admin/events.py
@@ -1,4 +1,3 @@
from dataclasses import asdict
from datetime import datetime
from typing import Annotated

@@ -206,8 +205,8 @@ async def get_replay_status(session_id: str, service: FromDishka[AdminEventsServ
execution_results = status.execution_results
return EventReplayStatusResponse(
**{
**asdict(session),
"status": session.status.value,
**session.model_dump(),
"status": session.status,
"estimated_completion": estimated_completion,
"execution_results": execution_results,
}
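Worth noting on the hunk above: the replay session state is now a Pydantic model, so `model_dump()` replaces `dataclasses.asdict` and the enum no longer needs manual unwrapping via `.value`. A minimal sketch of the pattern, with simplified stand-ins for `ReplaySessionState` and `EventReplayStatusResponse` (field names are illustrative, not the real schemas):

```python
from enum import Enum

from pydantic import BaseModel


class ReplayStatus(str, Enum):
    RUNNING = "running"
    COMPLETED = "completed"


class SessionState(BaseModel):
    """Simplified stand-in for the domain ReplaySessionState."""

    session_id: str
    status: ReplayStatus
    total_events: int = 0


class StatusResponse(BaseModel):
    """Simplified stand-in for EventReplayStatusResponse."""

    session_id: str
    status: ReplayStatus
    total_events: int
    estimated_completion: str | None = None


state = SessionState(session_id="abc", status=ReplayStatus.RUNNING)

# model_dump() returns a plain dict (enum members intact in python mode),
# and the response model re-validates them -- no manual .value needed.
response = StatusResponse(**{**state.model_dump(), "estimated_completion": None})
print(response.model_dump_json())
```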
8 changes: 6 additions & 2 deletions backend/app/api/routes/events.py
@@ -12,6 +12,7 @@
from app.core.utils import get_client_ip
from app.domain.enums.common import SortOrder
from app.domain.events.event_models import EventFilter
from app.domain.events.typed import BaseEvent
from app.infrastructure.kafka.events.metadata import AvroEventMetadata as EventMetadata
from app.schemas_pydantic.events import (
DeleteEventResponse,
@@ -296,7 +297,7 @@ async def delete_event(
"admin_email": admin.email,
"event_type": result.event_type,
"aggregate_id": result.aggregate_id,
"correlation_id": result.correlation_id,
"correlation_id": result.metadata.correlation_id,
},
)

@@ -345,9 +346,12 @@ async def replay_aggregate_events(
service_version=settings.SERVICE_VERSION,
user_id=admin.user_id,
)
# Extract payload fields (exclude base event fields + event_type discriminator)
base_fields = set(BaseEvent.model_fields.keys()) | {"event_type"}
extra_fields = {k: v for k, v in event.model_dump().items() if k not in base_fields}
await kafka_event_service.publish_event(
event_type=event.event_type,
payload=event.payload,
payload=extra_fields,
aggregate_id=aggregate_id,
correlation_id=replay_correlation_id,
metadata=meta,
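The replay route no longer reads a `payload` attribute; it rebuilds the payload by dumping the typed event and dropping everything declared on `BaseEvent` plus the `event_type` discriminator. A sketch of that extraction, assuming a Pydantic `BaseEvent`; `ExecutionStartedEvent` and its fields are made up for illustration:

```python
from datetime import datetime, timezone
from typing import Literal

from pydantic import BaseModel, Field


class BaseEvent(BaseModel):
    """Simplified stand-in for app.domain.events.typed.BaseEvent."""

    event_id: str
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    aggregate_id: str | None = None


class ExecutionStartedEvent(BaseEvent):
    """Hypothetical concrete event -- field names are illustrative."""

    event_type: Literal["execution.started"] = "execution.started"
    execution_id: str
    pod_name: str


event = ExecutionStartedEvent(event_id="e-1", execution_id="x-1", pod_name="runner-0")

# Everything declared on BaseEvent (plus the event_type discriminator) is
# envelope data; whatever remains after model_dump() is the event-specific
# payload to republish.
base_fields = set(BaseEvent.model_fields.keys()) | {"event_type"}
payload = {k: v for k, v in event.model_dump().items() if k not in base_fields}
assert payload == {"execution_id": "x-1", "pod_name": "runner-0"}
```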
4 changes: 2 additions & 2 deletions backend/app/api/routes/health.py
@@ -15,7 +15,7 @@ class LivenessResponse(BaseModel):

status: str = Field(description="Health status")
uptime_seconds: int = Field(description="Server uptime in seconds")
timestamp: str = Field(description="ISO timestamp of health check")
timestamp: datetime = Field(description="Timestamp of health check")


class ReadinessResponse(BaseModel):
@@ -31,7 +31,7 @@ async def liveness() -> LivenessResponse:
return LivenessResponse(
status="ok",
uptime_seconds=int(time.time() - _START_TIME),
timestamp=datetime.now(timezone.utc).isoformat(),
timestamp=datetime.now(timezone.utc),
)


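Switching `timestamp` from `str` to `datetime` lets Pydantic/FastAPI handle ISO 8601 serialization instead of the handler calling `.isoformat()` itself. A quick sketch with a trimmed-down copy of the response model:

```python
from datetime import datetime, timezone

from pydantic import BaseModel, Field


class LivenessResponse(BaseModel):
    """Trimmed-down copy of the route's response model."""

    status: str = Field(description="Health status")
    timestamp: datetime = Field(description="Timestamp of health check")


resp = LivenessResponse(status="ok", timestamp=datetime.now(timezone.utc))

# Pydantic serializes datetime fields to ISO 8601 strings in JSON output,
# so the handler can pass the datetime through instead of pre-formatting it.
print(resp.model_dump_json())
```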
4 changes: 1 addition & 3 deletions backend/app/api/routes/replay.py
@@ -1,5 +1,3 @@
from dataclasses import asdict

from dishka import FromDishka
from dishka.integrations.fastapi import DishkaRoute
from fastapi import APIRouter, Depends, Query
@@ -65,7 +63,7 @@ async def list_replay_sessions(
limit: int = Query(100, ge=1, le=1000),
) -> list[SessionSummary]:
return [
SessionSummary.model_validate({**asdict(s), **asdict(s)["config"]})
SessionSummary.model_validate({**s.model_dump(), **s.model_dump()["config"]})
for s in service.list_sessions(status=status, limit=limit)
]

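`list_replay_sessions` flattens the nested replay config into the summary by merging the dumped session with its `config` sub-dict, now via `model_dump()` instead of `asdict`. A small sketch of the flattening; the `ReplayConfig`/`SessionSummary` fields shown here are hypothetical, not the real schemas:

```python
from pydantic import BaseModel


class ReplayConfig(BaseModel):
    """Hypothetical subset of the real replay config."""

    speed_multiplier: float = 1.0
    target_topic: str = "replays"


class SessionState(BaseModel):
    """Hypothetical subset of ReplaySessionState."""

    session_id: str
    status: str
    config: ReplayConfig


class SessionSummary(BaseModel):
    """Hypothetical subset of the SessionSummary API schema."""

    session_id: str
    status: str
    speed_multiplier: float
    target_topic: str


s = SessionState(session_id="r-1", status="running", config=ReplayConfig())

# Dump once, then merge the nested config dict over the top-level fields;
# extra keys (like the original "config") are ignored during validation.
dumped = s.model_dump()
summary = SessionSummary.model_validate({**dumped, **dumped["config"]})
```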
39 changes: 12 additions & 27 deletions backend/app/db/docs/event.py
@@ -1,37 +1,20 @@
from datetime import datetime, timedelta, timezone
from typing import Any
from uuid import uuid4

import pymongo
from beanie import Document, Indexed
from pydantic import BaseModel, ConfigDict, Field
from pydantic import ConfigDict, Field
from pymongo import ASCENDING, DESCENDING, IndexModel

from app.domain.enums.common import Environment
from app.domain.enums.events import EventType


# Pydantic model required here because Beanie embedded documents must be Pydantic BaseModel subclasses.
# This is NOT an API schema - it defines the MongoDB subdocument structure.
class EventMetadata(BaseModel):
"""Event metadata embedded document for Beanie storage."""

model_config = ConfigDict(from_attributes=True)

service_name: str
service_version: str
correlation_id: str = Field(default_factory=lambda: str(uuid4()))
user_id: str | None = None
ip_address: str | None = None
user_agent: str | None = None
environment: Environment = Environment.PRODUCTION
from app.domain.events.typed import EventMetadata


class EventDocument(Document):
"""Event document for event browsing/admin system.

Uses payload dict for flexible event data storage.
This is separate from EventStoreDocument which uses flat structure for Kafka events.
Uses extra='allow' for flexible event data storage - event-specific fields
are stored directly at document level (no payload wrapper needed).
"""

event_id: Indexed(str, unique=True) = Field(default_factory=lambda: str(uuid4())) # type: ignore[valid-type]
@@ -40,10 +23,12 @@ class EventDocument(Document):
timestamp: Indexed(datetime) = Field(default_factory=lambda: datetime.now(timezone.utc)) # type: ignore[valid-type]
aggregate_id: Indexed(str) | None = None # type: ignore[valid-type]
metadata: EventMetadata
payload: dict[str, Any] = Field(default_factory=dict)
stored_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
ttl_expires_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc) + timedelta(days=30))

# Most event types have execution_id (sparse-indexed)
execution_id: str | None = None

model_config = ConfigDict(from_attributes=True, extra="allow")

class Settings:
@@ -56,9 +41,9 @@ class Settings:
IndexModel([("metadata.correlation_id", ASCENDING)], name="idx_meta_correlation"),
IndexModel([("metadata.user_id", ASCENDING), ("timestamp", DESCENDING)], name="idx_meta_user_ts"),
IndexModel([("metadata.service_name", ASCENDING), ("timestamp", DESCENDING)], name="idx_meta_service_ts"),
# Payload sparse indexes
IndexModel([("payload.execution_id", ASCENDING)], name="idx_payload_execution", sparse=True),
IndexModel([("payload.pod_name", ASCENDING)], name="idx_payload_pod", sparse=True),
# Event-specific field indexes (sparse - only exist on relevant event types)
IndexModel([("execution_id", ASCENDING)], name="idx_execution_id", sparse=True),
IndexModel([("pod_name", ASCENDING)], name="idx_pod_name", sparse=True),
# TTL index (expireAfterSeconds=0 means use ttl_expires_at value directly)
IndexModel([("ttl_expires_at", ASCENDING)], name="idx_ttl", expireAfterSeconds=0),
# Additional compound indexes for query optimization
@@ -77,7 +62,7 @@ class Settings:
("event_type", pymongo.TEXT),
("metadata.service_name", pymongo.TEXT),
("metadata.user_id", pymongo.TEXT),
("payload", pymongo.TEXT),
("execution_id", pymongo.TEXT),
],
name="idx_text_search",
language_override="none",
@@ -90,6 +75,7 @@ class EventArchiveDocument(Document):
"""Archived event with deletion metadata.

Mirrors EventDocument structure with additional archive metadata.
Uses extra='allow' for event-specific fields.
"""

event_id: Indexed(str, unique=True) # type: ignore[valid-type]
@@ -98,7 +84,6 @@
timestamp: Indexed(datetime) # type: ignore[valid-type]
aggregate_id: str | None = None
metadata: EventMetadata
payload: dict[str, Any] = Field(default_factory=dict)
stored_at: datetime | None = None
ttl_expires_at: datetime | None = None

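The document model drops the `payload` wrapper: with `extra="allow"`, event-specific fields live at the top level of the Mongo document, which is what lets the sparse indexes target them directly. A trimmed-down sketch of the idea under those assumptions (index names follow the diff; `pod_name` is one of the undeclared extra fields):

```python
from datetime import datetime, timezone

from beanie import Document
from pydantic import ConfigDict, Field
from pymongo import ASCENDING, IndexModel


class EventDocument(Document):
    """Trimmed-down sketch: event-specific fields live at the top level."""

    event_id: str
    event_type: str
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    execution_id: str | None = None  # present on most, not all, event types

    model_config = ConfigDict(extra="allow")

    class Settings:
        name = "events"
        indexes = [
            # sparse: documents without the field are simply skipped
            IndexModel([("execution_id", ASCENDING)], name="idx_execution_id", sparse=True),
            IndexModel([("pod_name", ASCENDING)], name="idx_pod_name", sparse=True),
        ]


# Undeclared keys such as pod_name are kept on the model and persisted as
# top-level Mongo fields -- no payload wrapper, so the sparse indexes apply.
# (Actually inserting the document would also require init_beanie().)
doc = EventDocument(
    event_id="e-1",
    event_type="execution.started",
    execution_id="x-1",
    pod_name="runner-0",  # accepted because extra="allow"
)
```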
64 changes: 15 additions & 49 deletions backend/app/db/repositories/admin/admin_events_repository.py
@@ -1,4 +1,3 @@
from dataclasses import asdict
from datetime import datetime, timedelta, timezone
from typing import Any

@@ -14,9 +13,8 @@
from app.domain.admin import ReplaySessionData, ReplaySessionStatusDetail
from app.domain.admin.replay_updates import ReplaySessionUpdate
from app.domain.enums.replay import ReplayStatus
from app.domain.events import EventMetadata as DomainEventMetadata
from app.domain.events.event_models import (
Event,
from app.domain.events import (
DomainEvent,
EventBrowseResult,
EventDetail,
EventExportRow,
@@ -25,9 +23,10 @@
EventSummary,
HourlyEventCount,
UserEventCount,
domain_event_adapter,
)
from app.domain.events.query_builders import EventStatsAggregation
from app.domain.replay.models import ReplayConfig, ReplayFilter, ReplaySessionState
from app.domain.replay.models import ReplayFilter, ReplaySessionState


class AdminEventsRepository:
@@ -58,15 +57,7 @@ async def browse_events(
total = await query.count()

docs = await query.sort([(sort_by, sort_order)]).skip(skip).limit(limit).to_list()
events = [
Event(
**{
**d.model_dump(exclude={"id", "revision_id"}),
"metadata": DomainEventMetadata(**d.metadata.model_dump()),
}
)
for d in docs
]
events = [domain_event_adapter.validate_python(d, from_attributes=True) for d in docs]

return EventBrowseResult(events=events, total=total, skip=skip, limit=limit)

@@ -75,12 +66,7 @@ async def get_event_detail(self, event_id: str) -> EventDetail | None:
if not doc:
return None

event = Event(
**{
**doc.model_dump(exclude={"id", "revision_id"}),
"metadata": DomainEventMetadata(**doc.metadata.model_dump()),
}
)
event = domain_event_adapter.validate_python(doc, from_attributes=True)

related_query = {"metadata.correlation_id": doc.metadata.correlation_id, "event_id": {"$ne": event_id}}
related_docs = await (
@@ -178,7 +164,7 @@ async def export_events_csv(self, event_filter: EventFilter) -> list[EventExport
EventExportRow(
event_id=doc.event_id,
event_type=str(doc.event_type),
timestamp=doc.timestamp.isoformat(),
timestamp=doc.timestamp,
correlation_id=doc.metadata.correlation_id or "",
aggregate_id=doc.aggregate_id or "",
user_id=doc.metadata.user_id or "",
Expand All @@ -189,47 +175,33 @@ async def export_events_csv(self, event_filter: EventFilter) -> list[EventExport
for doc in docs
]

async def archive_event(self, event: Event, deleted_by: str) -> bool:
async def archive_event(self, event: DomainEvent, deleted_by: str) -> bool:
archive_doc = EventArchiveDocument(
event_id=event.event_id,
event_type=event.event_type,
event_version=event.event_version,
timestamp=event.timestamp,
aggregate_id=event.aggregate_id,
metadata=event.metadata,
payload=event.payload,
stored_at=event.stored_at,
ttl_expires_at=event.ttl_expires_at,
**event.model_dump(),
deleted_at=datetime.now(timezone.utc),
deleted_by=deleted_by,
)
await archive_doc.insert()
return True

async def create_replay_session(self, session: ReplaySessionState) -> str:
data = asdict(session)
data["config"] = session.config.model_dump()
doc = ReplaySessionDocument(**data)
doc = ReplaySessionDocument(**session.model_dump())
await doc.insert()
return session.session_id

async def get_replay_session(self, session_id: str) -> ReplaySessionState | None:
doc = await ReplaySessionDocument.find_one({"session_id": session_id})
if not doc:
return None
data = doc.model_dump(exclude={"id", "revision_id"})
data["config"] = ReplayConfig.model_validate(data["config"])
return ReplaySessionState(**data)
return ReplaySessionState.model_validate(doc, from_attributes=True)

async def update_replay_session(self, session_id: str, updates: ReplaySessionUpdate) -> bool:
update_dict = {k: (v.value if hasattr(v, "value") else v) for k, v in asdict(updates).items() if v is not None}
update_dict = updates.model_dump(exclude_none=True)
if not update_dict:
return False

doc = await ReplaySessionDocument.find_one({"session_id": session_id})
if not doc:
return False

await doc.set(update_dict)
return True

@@ -276,19 +248,15 @@ async def get_replay_status_with_progress(self, session_id: str) -> ReplaySessio
original_query = doc.config.filter.custom_query
original_events = await EventDocument.find(original_query).limit(10).to_list()

execution_ids = set()
for event in original_events:
exec_id = event.payload.get("execution_id") or event.aggregate_id
if exec_id:
execution_ids.add(exec_id)
execution_ids = {event.execution_id for event in original_events if event.execution_id}

for exec_id in list(execution_ids)[:10]:
exec_doc = await ExecutionDocument.find_one({"execution_id": exec_id})
if exec_doc:
execution_results.append(
{
"execution_id": exec_doc.execution_id,
"status": exec_doc.status.value if exec_doc.status else None,
"status": exec_doc.status if exec_doc.status else None,
"stdout": exec_doc.stdout,
"stderr": exec_doc.stderr,
"exit_code": exec_doc.exit_code,
Expand All @@ -300,9 +268,7 @@ async def get_replay_status_with_progress(self, session_id: str) -> ReplaySessio
)

# Convert document to domain
data = doc.model_dump(exclude={"id", "revision_id"})
data["config"] = ReplayConfig.model_validate(data["config"])
session = ReplaySessionState(**data)
session = ReplaySessionState.model_validate(doc, from_attributes=True)

return ReplaySessionStatusDetail(
session=session,
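The repository now maps Beanie documents straight to typed domain events via `domain_event_adapter.validate_python(..., from_attributes=True)`. The adapter itself is not shown in this diff; the sketch below assumes it is a Pydantic `TypeAdapter` over a discriminated union keyed on `event_type`, with made-up event classes and a fake document object:

```python
from typing import Annotated, Literal, Union

from pydantic import BaseModel, Field, TypeAdapter


class BaseEvent(BaseModel):
    event_id: str
    aggregate_id: str | None = None


class ExecutionStartedEvent(BaseEvent):  # made-up event type for illustration
    event_type: Literal["execution.started"] = "execution.started"
    execution_id: str


class UserLoggedInEvent(BaseEvent):  # made-up event type for illustration
    event_type: Literal["user.logged_in"] = "user.logged_in"
    user_id: str


DomainEvent = Annotated[
    Union[ExecutionStartedEvent, UserLoggedInEvent],
    Field(discriminator="event_type"),
]
domain_event_adapter = TypeAdapter(DomainEvent)


class FakeDoc:
    """Stands in for a Beanie EventDocument; fields are plain attributes."""

    event_id = "e-1"
    aggregate_id = "agg-1"
    event_type = "execution.started"
    execution_id = "x-1"


# from_attributes lets the adapter read ODM documents via attribute access,
# and the event_type discriminator picks the concrete event class.
event = domain_event_adapter.validate_python(FakeDoc(), from_attributes=True)
assert isinstance(event, ExecutionStartedEvent)
```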
13 changes: 6 additions & 7 deletions backend/app/db/repositories/admin/admin_user_repository.py
@@ -1,5 +1,4 @@
import re
from dataclasses import asdict
from datetime import datetime, timezone

from beanie.odm.operators.find import BaseFindOperator
@@ -24,9 +23,9 @@ def __init__(self, security_service: SecurityService) -> None:
self.security_service = security_service

async def create_user(self, create_data: DomainUserCreate) -> User:
doc = UserDocument(**asdict(create_data))
doc = UserDocument(**create_data.model_dump())
await doc.insert()
return User(**doc.model_dump(exclude={"id", "revision_id"}))
return User.model_validate(doc, from_attributes=True)

async def list_users(
self, limit: int = 100, offset: int = 0, search: str | None = None, role: UserRole | None = None
@@ -48,27 +47,27 @@ async def list_users(
query = UserDocument.find(*conditions)
total = await query.count()
docs = await query.skip(offset).limit(limit).to_list()
users = [User(**doc.model_dump(exclude={"id", "revision_id"})) for doc in docs]
users = [User.model_validate(doc, from_attributes=True) for doc in docs]
return UserListResult(users=users, total=total, offset=offset, limit=limit)

async def get_user_by_id(self, user_id: str) -> User | None:
doc = await UserDocument.find_one({"user_id": user_id})
return User(**doc.model_dump(exclude={"id", "revision_id"})) if doc else None
return User.model_validate(doc, from_attributes=True) if doc else None

async def update_user(self, user_id: str, update_data: UserUpdate) -> User | None:
doc = await UserDocument.find_one({"user_id": user_id})
if not doc:
return None

update_dict = {k: v for k, v in asdict(update_data).items() if v is not None}
update_dict = update_data.model_dump(exclude_none=True)
# Handle password hashing
if "password" in update_dict:
update_dict["hashed_password"] = self.security_service.get_password_hash(update_dict.pop("password"))

if update_dict:
update_dict["updated_at"] = datetime.now(timezone.utc)
await doc.set(update_dict)
return User(**doc.model_dump(exclude={"id", "revision_id"}))
return User.model_validate(doc, from_attributes=True)

async def delete_user(self, user_id: str, cascade: bool = True) -> dict[str, int]:
deleted_counts = {}
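The user repository follows the same pattern throughout: `model_validate(doc, from_attributes=True)` for document-to-domain mapping and `model_dump(exclude_none=True)` for partial updates. A self-contained sketch with simplified stand-ins for the real models:

```python
from pydantic import BaseModel


class UserDocument(BaseModel):
    """Stand-in for the Beanie UserDocument (simplified fields)."""

    user_id: str
    email: str
    full_name: str | None = None
    hashed_password: str = ""


class User(BaseModel):
    """Simplified stand-in for the domain User model."""

    user_id: str
    email: str
    full_name: str | None = None


class UserUpdate(BaseModel):
    """Partial-update model; unset fields default to None."""

    email: str | None = None
    full_name: str | None = None
    password: str | None = None


doc = UserDocument(user_id="u-1", email="a@example.com")

# model_validate(..., from_attributes=True) replaces the old
# User(**doc.model_dump(exclude={"id", "revision_id"})) round-trip.
user = User.model_validate(doc, from_attributes=True)

# exclude_none=True keeps only the fields the caller actually set,
# mirroring the old asdict() + "is not None" filter.
update_dict = UserUpdate(full_name="Ada Lovelace").model_dump(exclude_none=True)
assert update_dict == {"full_name": "Ada Lovelace"}
```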