2 changes: 1 addition & 1 deletion backend/app/api/routes/events.py
@@ -347,7 +347,7 @@ async def replay_aggregate_events(
)
await kafka_event_service.publish_event(
event_type=event.event_type,
payload=event.payload,
payload=event.model_extra or {},
aggregate_id=aggregate_id,
correlation_id=replay_correlation_id,
metadata=meta,
15 changes: 7 additions & 8 deletions backend/app/db/docs/event.py
@@ -30,8 +30,8 @@ class EventMetadata(BaseModel):
class EventDocument(Document):
"""Event document for event browsing/admin system.

Uses payload dict for flexible event data storage.
This is separate from EventStoreDocument which uses flat structure for Kafka events.
Uses extra='allow' for flexible event data storage - event-specific fields
are stored directly at document level (no payload wrapper needed).
"""

event_id: Indexed(str, unique=True) = Field(default_factory=lambda: str(uuid4())) # type: ignore[valid-type]
@@ -40,7 +40,6 @@ class EventDocument(Document):
timestamp: Indexed(datetime) = Field(default_factory=lambda: datetime.now(timezone.utc)) # type: ignore[valid-type]
aggregate_id: Indexed(str) | None = None # type: ignore[valid-type]
metadata: EventMetadata
payload: dict[str, Any] = Field(default_factory=dict)
stored_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
ttl_expires_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc) + timedelta(days=30))

@@ -56,9 +55,9 @@ class Settings:
IndexModel([("metadata.correlation_id", ASCENDING)], name="idx_meta_correlation"),
IndexModel([("metadata.user_id", ASCENDING), ("timestamp", DESCENDING)], name="idx_meta_user_ts"),
IndexModel([("metadata.service_name", ASCENDING), ("timestamp", DESCENDING)], name="idx_meta_service_ts"),
# Payload sparse indexes
IndexModel([("payload.execution_id", ASCENDING)], name="idx_payload_execution", sparse=True),
IndexModel([("payload.pod_name", ASCENDING)], name="idx_payload_pod", sparse=True),
# Event-specific field indexes (sparse - only exist on relevant event types)
IndexModel([("execution_id", ASCENDING)], name="idx_execution_id", sparse=True),
IndexModel([("pod_name", ASCENDING)], name="idx_pod_name", sparse=True),
# TTL index (expireAfterSeconds=0 means use ttl_expires_at value directly)
IndexModel([("ttl_expires_at", ASCENDING)], name="idx_ttl", expireAfterSeconds=0),
# Additional compound indexes for query optimization
@@ -77,7 +76,7 @@ class Settings:
("event_type", pymongo.TEXT),
("metadata.service_name", pymongo.TEXT),
("metadata.user_id", pymongo.TEXT),
("payload", pymongo.TEXT),
("execution_id", pymongo.TEXT),
],
name="idx_text_search",
language_override="none",
@@ -90,6 +89,7 @@ class EventArchiveDocument(Document):
"""Archived event with deletion metadata.

Mirrors EventDocument structure with additional archive metadata.
Uses extra='allow' for event-specific fields.
"""

event_id: Indexed(str, unique=True) # type: ignore[valid-type]
@@ -98,7 +98,6 @@
timestamp: Indexed(datetime) # type: ignore[valid-type]
aggregate_id: str | None = None
metadata: EventMetadata
payload: dict[str, Any] = Field(default_factory=dict)
stored_at: datetime | None = None
ttl_expires_at: datetime | None = None

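For context on the flattened storage model: with Pydantic v2's `extra="allow"`, fields that are not declared on the model are accepted at construction time, surfaced via `model_extra`, and included in `model_dump()`, so event-specific data ends up at the top level of the stored document rather than inside a `payload` dict. A minimal sketch using plain Pydantic (not the project's Beanie documents; field names are illustrative):

```python
from datetime import datetime, timezone

from pydantic import BaseModel, ConfigDict, Field


class FlatEvent(BaseModel):
    """Toy stand-in for EventDocument: undeclared fields are kept flat."""

    model_config = ConfigDict(extra="allow")

    event_type: str
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))


# Extra keyword arguments are accepted instead of being rejected.
evt = FlatEvent(event_type="execution.started", execution_id="abc-123", pod_name="runner-0")

print(evt.execution_id)   # "abc-123" -- attribute access works for extras
print(evt.model_extra)    # {'execution_id': 'abc-123', 'pod_name': 'runner-0'}
print(evt.model_dump())   # declared and extra fields in one flat dict
```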
12 changes: 2 additions & 10 deletions backend/app/db/repositories/admin/admin_events_repository.py
@@ -191,15 +191,7 @@ async def export_events_csv(self, event_filter: EventFilter) -> list[EventExport

async def archive_event(self, event: Event, deleted_by: str) -> bool:
archive_doc = EventArchiveDocument(
event_id=event.event_id,
event_type=event.event_type,
event_version=event.event_version,
timestamp=event.timestamp,
aggregate_id=event.aggregate_id,
metadata=event.metadata,
payload=event.payload,
stored_at=event.stored_at,
ttl_expires_at=event.ttl_expires_at,
**vars(event),
deleted_at=datetime.now(timezone.utc),
deleted_by=deleted_by,
)
@@ -278,7 +270,7 @@ async def get_replay_status_with_progress(self, session_id: str) -> ReplaySessio

execution_ids = set()
for event in original_events:
exec_id = event.payload.get("execution_id") or event.aggregate_id
exec_id = (event.model_extra or {}).get("execution_id") or event.aggregate_id
if exec_id:
execution_ids.add(exec_id)

28 changes: 6 additions & 22 deletions backend/app/db/repositories/event_repository.py
@@ -1,5 +1,4 @@
import logging
from dataclasses import asdict
from datetime import datetime, timedelta, timezone
from typing import Any, Mapping

@@ -37,12 +36,8 @@ def _build_time_filter(self, start_time: datetime | None, end_time: datetime | N
return {key: value for key, value in {"$gte": start_time, "$lte": end_time}.items() if value is not None}

async def store_event(self, event: Event) -> str:
data = asdict(event)
if not data.get("stored_at"):
data["stored_at"] = datetime.now(timezone.utc)
# Remove None values so EventDocument defaults can apply (e.g., ttl_expires_at)
data = {k: v for k, v in data.items() if v is not None}

data = {k: v for k, v in vars(event).items() if v is not None}
data.setdefault("stored_at", datetime.now(timezone.utc))
doc = EventDocument(**data)
add_span_attributes(
**{
@@ -61,11 +56,8 @@ async def store_events_batch(self, events: list[Event]) -> list[str]:
now = datetime.now(timezone.utc)
docs = []
for event in events:
data = asdict(event)
if not data.get("stored_at"):
data["stored_at"] = now
# Remove None values so EventDocument defaults can apply
data = {k: v for k, v in data.items() if v is not None}
data = {k: v for k, v in vars(event).items() if v is not None}
data.setdefault("stored_at", now)
docs.append(EventDocument(**data))
await EventDocument.insert_many(docs)
add_span_attributes(**{"events.batch.count": len(events)})
@@ -154,7 +146,7 @@ async def get_execution_events(
) -> EventListResult:
conditions: list[Any] = [
Or(
EventDocument.payload["execution_id"] == execution_id,
{"execution_id": execution_id},
EventDocument.aggregate_id == execution_id,
),
Not(RegEx(EventDocument.metadata.service_name, "^system-")) if exclude_system_events else None,
@@ -338,15 +330,7 @@ async def delete_event_with_archival(

deleted_at = datetime.now(timezone.utc)
archived_doc = EventArchiveDocument(
event_id=doc.event_id,
event_type=doc.event_type,
event_version=doc.event_version,
timestamp=doc.timestamp,
metadata=doc.metadata,
payload=doc.payload,
aggregate_id=doc.aggregate_id,
stored_at=doc.stored_at,
ttl_expires_at=doc.ttl_expires_at,
**doc.model_dump(exclude={"id", "revision_id"}),
deleted_at=deleted_at,
deleted_by=deleted_by,
deletion_reason=deletion_reason,
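Because `execution_id` is no longer a declared field on `EventDocument`, it cannot be addressed as a typed attribute in queries; the repository falls back to a raw Mongo filter, which Beanie accepts alongside typed conditions. A hedged sketch of that lookup pattern (the helper function itself is illustrative, not part of the change):

```python
from beanie.operators import Or

from app.db.docs import EventDocument


async def find_events_for_execution(execution_id: str) -> list[EventDocument]:
    # execution_id now lives flat on the document (extra="allow"), so it is
    # matched with a plain dict filter instead of EventDocument.payload["execution_id"].
    return await EventDocument.find(
        Or(
            {"execution_id": execution_id},               # flat, undeclared field
            EventDocument.aggregate_id == execution_id,   # declared field
        )
    ).to_list()
```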
4 changes: 1 addition & 3 deletions backend/app/db/repositories/replay_repository.py
@@ -91,9 +91,7 @@ async def fetch_events(

batch = []
async for doc in cursor:
# Merge payload to top level for schema_registry deserialization
d = doc.model_dump(exclude={"id", "revision_id", "stored_at", "ttl_expires_at"})
batch.append({**{k: v for k, v in d.items() if k != "payload"}, **d.get("payload", {})})
batch.append(doc.model_dump(exclude={"id", "revision_id", "stored_at", "ttl_expires_at"}))
if len(batch) >= batch_size:
yield batch
batch = []
16 changes: 11 additions & 5 deletions backend/app/db/repositories/user_settings_repository.py
@@ -1,14 +1,13 @@
import logging
from dataclasses import asdict
from datetime import datetime
from typing import List

from beanie.odm.enums import SortDirection
from beanie.operators import GT, LTE, In

from app.db.docs import EventDocument, UserSettingsDocument, UserSettingsSnapshotDocument
from app.domain.enums.events import EventType
from app.domain.user.settings_models import DomainUserSettings
from app.domain.user.settings_models import DomainUserSettings, DomainUserSettingsChangedEvent


class UserSettingsRepository:
@@ -32,11 +31,11 @@ async def create_snapshot(self, settings: DomainUserSettings) -> None:
async def get_settings_events(
self,
user_id: str,
event_types: List[EventType],
event_types: list[EventType],
since: datetime | None = None,
until: datetime | None = None,
limit: int | None = None,
) -> List[EventDocument]:
) -> list[DomainUserSettingsChangedEvent]:
aggregate_id = f"user_settings_{user_id}"
conditions = [
EventDocument.aggregate_id == aggregate_id,
@@ -50,7 +49,14 @@ async def get_settings_events(
if limit:
find_query = find_query.limit(limit)

return await find_query.to_list()
docs = await find_query.to_list()
return [
DomainUserSettingsChangedEvent(**{
**e.model_dump(exclude={"id", "revision_id", "metadata"}),
"correlation_id": e.metadata.correlation_id,
})
for e in docs
]

async def count_events_since_snapshot(self, user_id: str) -> int:
aggregate_id = f"user_settings_{user_id}"
28 changes: 25 additions & 3 deletions backend/app/dlq/manager.py
@@ -324,9 +324,7 @@ async def _retry_message(self, message: DLQMessage) -> None:
"dlq_retry_timestamp": datetime.now(timezone.utc).isoformat(),
}
hdrs = inject_trace_context(hdrs)
from typing import cast

kafka_headers = cast(list[tuple[str, str | bytes]], [(k, v.encode()) for k, v in hdrs.items()])
kafka_headers: list[tuple[str, str | bytes]] = [(k, v.encode()) for k, v in hdrs.items()]

# Get the original event
event = message.event
@@ -490,6 +488,30 @@ async def retry_messages_batch(self, event_ids: list[str]) -> DLQBatchRetryResul

return DLQBatchRetryResult(total=len(event_ids), successful=successful, failed=failed, details=details)

async def discard_message_manually(self, event_id: str, reason: str) -> bool:
"""Manually discard a DLQ message with state validation.

Args:
event_id: The event ID to discard
reason: Reason for discarding

Returns:
True if discarded, False if not found or in terminal state
"""
doc = await DLQMessageDocument.find_one({"event_id": event_id})
if not doc:
self.logger.error("Message not found in DLQ", extra={"event_id": event_id})
return False

# Guard against invalid states (terminal states)
if doc.status in {DLQMessageStatus.DISCARDED, DLQMessageStatus.RETRIED}:
self.logger.info("Skipping manual discard", extra={"event_id": event_id, "status": str(doc.status)})
return False

message = self._doc_to_message(doc)
await self._discard_message(message, reason)
return True


def create_dlq_manager(
settings: Settings,
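A possible call site for the new manual-discard path, e.g. from an admin route; the handler and the `dlq_manager` parameter are hypothetical, only `discard_message_manually` comes from the change above:

```python
async def discard_dlq_message(event_id: str, reason: str, dlq_manager) -> dict:
    """Hypothetical admin handler wrapping DLQManager.discard_message_manually."""
    discarded = await dlq_manager.discard_message_manually(event_id=event_id, reason=reason)
    if not discarded:
        # Message not found, or already in a terminal state (RETRIED / DISCARDED).
        return {"event_id": event_id, "discarded": False}
    return {"event_id": event_id, "discarded": True, "reason": reason}
```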
6 changes: 3 additions & 3 deletions backend/app/domain/events/event_models.py
@@ -2,6 +2,7 @@
from datetime import datetime
from typing import Any

from pydantic import ConfigDict
from pydantic.dataclasses import dataclass

from app.core.utils import StringEnum
@@ -40,16 +41,15 @@ class CollectionNames(StringEnum):
DLQ_MESSAGES = "dlq_messages"


@dataclass
@dataclass(config=ConfigDict(extra="allow"))
class Event:
"""Domain model for an event."""
"""Domain model for an event. Uses extra='allow' to store event-specific fields flat."""

event_id: str
event_type: EventType
event_version: str
timestamp: datetime
metadata: EventMetadata
payload: dict[str, Any]
aggregate_id: str | None = None
stored_at: datetime | None = None
ttl_expires_at: datetime | None = None
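In practice the flattening changes how `Event` instances are built: event-specific values are passed as top-level keyword arguments instead of a `payload` dict, and the repositories spread them back out via `vars(event)` / `model_extra`. A rough construction sketch (module paths, the `EventType.EXECUTION_STARTED` member, and the `EventMetadata` fields are all assumed for illustration):

```python
from datetime import datetime, timezone
from uuid import uuid4

from app.domain.enums.events import EventType          # path taken from the diff above
from app.domain.events.event_models import Event        # EventMetadata import path assumed
from app.domain.events.event_models import EventMetadata

# Previously: Event(..., payload={"execution_id": "abc-123", "pod_name": "runner-0"})
event = Event(
    event_id=str(uuid4()),
    event_type=EventType.EXECUTION_STARTED,             # assumed enum member
    event_version="1.0",
    timestamp=datetime.now(timezone.utc),
    metadata=EventMetadata(service_name="api", correlation_id=str(uuid4())),  # fields assumed
    execution_id="abc-123",                              # event-specific field, kept flat
    pod_name="runner-0",                                 # event-specific field, kept flat
)
```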
2 changes: 1 addition & 1 deletion backend/app/domain/replay/models.py
@@ -53,7 +53,7 @@ def to_mongo_query(self) -> Dict[str, Any]:
query["event_id"] = {"$in": self.event_ids}

if self.execution_id:
query["payload.execution_id"] = str(self.execution_id)
query["execution_id"] = str(self.execution_id)

if self.correlation_id:
query["metadata.correlation_id"] = self.correlation_id
4 changes: 2 additions & 2 deletions backend/app/domain/user/__init__.py
@@ -12,9 +12,9 @@
CachedSettings,
DomainEditorSettings,
DomainNotificationSettings,
DomainSettingsEvent,
DomainSettingsHistoryEntry,
DomainUserSettings,
DomainUserSettingsChangedEvent,
DomainUserSettingsUpdate,
)
from .user_models import (
@@ -37,7 +37,7 @@
"CSRFValidationError",
"DomainEditorSettings",
"DomainNotificationSettings",
"DomainSettingsEvent",
"DomainUserSettingsChangedEvent",
"DomainSettingsHistoryEntry",
"DomainUserCreate",
"DomainUserSettings",