Skip to content

Commit 31b09d8

Browse files
committed
fixes
1 parent 9cc16c4 commit 31b09d8

File tree

15 files changed

+295
-436
lines changed

15 files changed

+295
-436
lines changed

backend/app/api/routes/admin/events.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@
1010

1111
from app.api.dependencies import admin_user
1212
from app.core.correlation import CorrelationContext
13-
from app.domain.admin import ReplayQuery
1413
from app.domain.enums.events import EventType
14+
from app.domain.replay import ReplayFilter
1515
from app.domain.events.event_models import EventFilter
1616
from app.schemas_pydantic.admin_events import (
1717
EventBrowseRequest,
@@ -153,7 +153,7 @@ async def replay_events(
153153
) -> EventReplayResponse:
154154
try:
155155
replay_correlation_id = f"replay_{CorrelationContext.get_correlation_id()}"
156-
rq = ReplayQuery(
156+
replay_filter = ReplayFilter(
157157
event_ids=request.event_ids,
158158
correlation_id=request.correlation_id,
159159
aggregate_id=request.aggregate_id,
@@ -162,7 +162,7 @@ async def replay_events(
162162
)
163163
try:
164164
result = await service.prepare_or_schedule_replay(
165-
replay_query=rq,
165+
replay_filter=replay_filter,
166166
dry_run=request.dry_run,
167167
replay_correlation_id=replay_correlation_id,
168168
target_service=request.target_service,

backend/app/db/docs/__init__.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
from app.db.docs.event import (
1010
EventArchiveDocument,
1111
EventDocument,
12-
EventStoreDocument,
1312
)
1413
from app.db.docs.execution import ExecutionDocument, ResourceUsage
1514
from app.db.docs.notification import (
@@ -44,7 +43,6 @@
4443
SagaDocument,
4544
DLQMessageDocument,
4645
EventDocument,
47-
EventStoreDocument,
4846
EventArchiveDocument,
4947
ReplaySessionDocument,
5048
ResourceAllocationDocument,
@@ -74,7 +72,6 @@
7472
"DLQMessageDocument",
7573
# Event
7674
"EventDocument",
77-
"EventStoreDocument",
7875
"EventArchiveDocument",
7976
# Replay
8077
"ReplaySessionDocument",

backend/app/db/docs/event.py

Lines changed: 8 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from datetime import datetime, timedelta, timezone
2-
from typing import Any, Dict
2+
from typing import Any
33
from uuid import uuid4
44

55
import pymongo
@@ -28,10 +28,10 @@ class EventMetadata(BaseModel):
2828

2929

3030
class EventDocument(Document):
31-
"""Event document as stored in database.
31+
"""Event document for event browsing/admin system.
3232
33-
Copied from EventInDB schema. Uses extra="allow" to store
34-
additional fields from polymorphic BaseEvent subclasses.
33+
Uses payload dict for flexible event data storage.
34+
This is separate from EventStoreDocument which uses flat structure for Kafka events.
3535
"""
3636

3737
event_id: Indexed(str, unique=True) = Field(default_factory=lambda: str(uuid4())) # type: ignore[valid-type]
@@ -40,7 +40,7 @@ class EventDocument(Document):
4040
timestamp: Indexed(datetime) = Field(default_factory=lambda: datetime.now(timezone.utc)) # type: ignore[valid-type]
4141
aggregate_id: Indexed(str) | None = None # type: ignore[valid-type]
4242
metadata: EventMetadata
43-
payload: Dict[str, Any] = Field(default_factory=dict)
43+
payload: dict[str, Any] = Field(default_factory=dict)
4444
stored_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
4545
ttl_expires_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc) + timedelta(days=30))
4646

@@ -86,68 +86,10 @@ class Settings:
8686
]
8787

8888

89-
class EventStoreDocument(Document):
90-
"""Event store document for permanent event storage.
91-
92-
Same structure as EventDocument but in event_store collection.
93-
Uses extra="allow" to store additional fields from polymorphic events.
94-
No TTL index since this is permanent storage.
95-
"""
96-
97-
event_id: Indexed(str, unique=True) = Field(default_factory=lambda: str(uuid4())) # type: ignore[valid-type]
98-
event_type: EventType # Indexed via Settings.indexes
99-
event_version: str = "1.0"
100-
timestamp: Indexed(datetime) = Field(default_factory=lambda: datetime.now(timezone.utc)) # type: ignore[valid-type]
101-
aggregate_id: Indexed(str) | None = None # type: ignore[valid-type]
102-
metadata: EventMetadata
103-
payload: Dict[str, Any] = Field(default_factory=dict)
104-
stored_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
105-
ttl_expires_at: datetime | None = None
106-
107-
model_config = ConfigDict(from_attributes=True, extra="allow")
108-
109-
class Settings:
110-
name = "event_store"
111-
use_state_management = True
112-
indexes = [
113-
# Compound indexes for common query patterns
114-
IndexModel([("event_type", ASCENDING), ("timestamp", DESCENDING)], name="idx_event_type_ts"),
115-
IndexModel([("aggregate_id", ASCENDING), ("timestamp", DESCENDING)], name="idx_aggregate_ts"),
116-
IndexModel([("metadata.correlation_id", ASCENDING)], name="idx_meta_correlation"),
117-
IndexModel([("metadata.user_id", ASCENDING), ("timestamp", DESCENDING)], name="idx_meta_user_ts"),
118-
IndexModel([("metadata.service_name", ASCENDING), ("timestamp", DESCENDING)], name="idx_meta_service_ts"),
119-
# Payload sparse indexes
120-
IndexModel([("payload.execution_id", ASCENDING)], name="idx_payload_execution", sparse=True),
121-
IndexModel([("payload.pod_name", ASCENDING)], name="idx_payload_pod", sparse=True),
122-
# Additional compound indexes for query optimization
123-
IndexModel([("event_type", ASCENDING), ("aggregate_id", ASCENDING)], name="idx_events_type_agg"),
124-
IndexModel([("aggregate_id", ASCENDING), ("timestamp", ASCENDING)], name="idx_events_agg_ts"),
125-
IndexModel([("event_type", ASCENDING), ("timestamp", ASCENDING)], name="idx_events_type_ts_asc"),
126-
IndexModel([("metadata.user_id", ASCENDING), ("timestamp", ASCENDING)], name="idx_events_user_ts"),
127-
IndexModel([("metadata.user_id", ASCENDING), ("event_type", ASCENDING)], name="idx_events_user_type"),
128-
IndexModel(
129-
[("event_type", ASCENDING), ("metadata.user_id", ASCENDING), ("timestamp", DESCENDING)],
130-
name="idx_events_type_user_ts",
131-
),
132-
# Text search index
133-
IndexModel(
134-
[
135-
("event_type", pymongo.TEXT),
136-
("metadata.service_name", pymongo.TEXT),
137-
("metadata.user_id", pymongo.TEXT),
138-
("payload", pymongo.TEXT),
139-
],
140-
name="idx_text_search",
141-
language_override="none",
142-
default_language="english",
143-
),
144-
]
145-
146-
14789
class EventArchiveDocument(Document):
14890
"""Archived event with deletion metadata.
14991
150-
Uses extra="allow" to preserve all fields from polymorphic events.
92+
Mirrors EventDocument structure with additional archive metadata.
15193
"""
15294

15395
event_id: Indexed(str, unique=True) # type: ignore[valid-type]
@@ -156,13 +98,14 @@ class EventArchiveDocument(Document):
15698
timestamp: Indexed(datetime) # type: ignore[valid-type]
15799
aggregate_id: str | None = None
158100
metadata: EventMetadata
159-
payload: Dict[str, Any] = Field(default_factory=dict)
101+
payload: dict[str, Any] = Field(default_factory=dict)
160102
stored_at: datetime | None = None
161103
ttl_expires_at: datetime | None = None
162104

163105
# Archive metadata
164106
deleted_at: Indexed(datetime) = Field(default_factory=lambda: datetime.now(timezone.utc)) # type: ignore[valid-type]
165107
deleted_by: str | None = None
108+
deletion_reason: str | None = None
166109

167110
model_config = ConfigDict(from_attributes=True, extra="allow")
168111

0 commit comments

Comments
 (0)