Closed

Commits (44 total; changes shown from 13 commits)
f5fabe3
event repo update
HardMax71 Jan 6, 2026
4c0ea42
small bug fixes (domain vs doc inconsistency) + removed unused function
HardMax71 Jan 6, 2026
6ddf1e5
flaky test fix
HardMax71 Jan 6, 2026
4afcf95
idempotent logic in manager: skipping any already existing doc, adding on…
HardMax71 Jan 6, 2026
f92148f
idempotent logic in manager: atomic upserts
HardMax71 Jan 6, 2026
34fa313
idempotent logic in manager: atomic upserts
HardMax71 Jan 6, 2026
8c47240
idempotent logic in manager: terminal states ignored
HardMax71 Jan 6, 2026
45e7115
idempotent logic in manager: terminal states ignored
HardMax71 Jan 6, 2026
297f9ef
updated tests, trying to fix flakiness
HardMax71 Jan 7, 2026
3d52bf3
other flaky tests
HardMax71 Jan 7, 2026
1ac646f
fakeredis lib added, also removed exclusion of mypy+ruff for /tests
HardMax71 Jan 7, 2026
6d49185
further fixes
HardMax71 Jan 7, 2026
c16a8dd
further fixes
HardMax71 Jan 7, 2026
3266dcf
further fixes
HardMax71 Jan 7, 2026
8d80def
further fixes
HardMax71 Jan 7, 2026
fc63193
further fixes
HardMax71 Jan 7, 2026
8bd6494
further fixes
HardMax71 Jan 7, 2026
04edff1
further fixes
HardMax71 Jan 7, 2026
a382c06
further fixes, new lib for async comms to k8s
HardMax71 Jan 7, 2026
34a08ec
further fixes
HardMax71 Jan 8, 2026
87b078f
further fixes
HardMax71 Jan 8, 2026
522e0e5
init event schemas - from now only in fastapi server, removed from wo…
HardMax71 Jan 8, 2026
23ade68
fixes
HardMax71 Jan 8, 2026
2b2fa54
removed init beanie from all workers, now only main fastapi server do…
HardMax71 Jan 8, 2026
d4adcd2
removed EventStore from k8s worker
HardMax71 Jan 8, 2026
0ac6f8b
fix in test
HardMax71 Jan 8, 2026
06374a4
removed flaky test, also added module scoped init_beanie
HardMax71 Jan 8, 2026
6368e14
scope mismatch in e2e tests
HardMax71 Jan 8, 2026
54bb765
hcl stuff for deploy, also removed separate compose file for CI, now …
HardMax71 Jan 8, 2026
bcdb8ea
frontend e2e ci fix, also asgitransport fix (now testing against real…
HardMax71 Jan 8, 2026
506dcf1
further fixes
HardMax71 Jan 8, 2026
1b4a85b
dumb error fix
HardMax71 Jan 8, 2026
cf513f8
removed cleanup - test workers use separate dbs
HardMax71 Jan 8, 2026
8c53c26
further fixes
HardMax71 Jan 8, 2026
1017815
further fixes
HardMax71 Jan 8, 2026
b011e5e
further fixes
HardMax71 Jan 8, 2026
4a00f3c
further fixes
HardMax71 Jan 8, 2026
e2153bf
urllib3 2.6.2 -> 2.6.3
HardMax71 Jan 8, 2026
00e2f42
more tests
HardMax71 Jan 8, 2026
d1da7ed
mypy
HardMax71 Jan 8, 2026
d2bde04
invalid yamls
HardMax71 Jan 8, 2026
78498f6
fixes
HardMax71 Jan 8, 2026
5f5e025
fixes
HardMax71 Jan 8, 2026
229e473
fixes
HardMax71 Jan 8, 2026
10 changes: 4 additions & 6 deletions backend/app/api/routes/execution.py
@@ -163,12 +163,6 @@ async def cancel_execution(
cancel_request: CancelExecutionRequest,
event_service: FromDishka[KafkaEventService],
) -> CancelResponse:
# Handle terminal states
terminal_states = [ExecutionStatus.COMPLETED, ExecutionStatus.FAILED, ExecutionStatus.TIMEOUT]

if execution.status in terminal_states:
raise HTTPException(status_code=400, detail=f"Cannot cancel execution in {str(execution.status)} state")

# Handle idempotency - if already cancelled, return success
if execution.status == ExecutionStatus.CANCELLED:
return CancelResponse(
@@ -178,6 +172,10 @@ async def cancel_execution(
event_id="-1", # exact event_id unknown
)

# Reject cancellation for other terminal states
if execution.status.is_terminal:
raise HTTPException(status_code=400, detail=f"Cannot cancel execution in {execution.status} state")

settings = get_settings()
payload = {
"execution_id": execution.execution_id,
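Note on the reordered cancel flow above: an execution already in CANCELLED now returns a success response (idempotent), while every other terminal state is rejected with a 400 via the new ExecutionStatus.is_terminal property. A minimal sketch of the expected behaviour, assuming a hypothetical async test client and execution factory (fixture names and route path are illustrative, not taken from this PR):

    # Sketch only: `client`, `make_execution`, and the exact route path are assumptions.
    async def test_cancel_already_cancelled_is_idempotent(client, make_execution):
        ex = await make_execution(status=ExecutionStatus.CANCELLED)
        resp = await client.post(f"/api/v1/executions/{ex.execution_id}/cancel", json={})
        assert resp.status_code == 200  # treated as success, not an error

    async def test_cancel_rejected_for_completed(client, make_execution):
        ex = await make_execution(status=ExecutionStatus.COMPLETED)
        resp = await client.post(f"/api/v1/executions/{ex.execution_id}/cancel", json={})
        assert resp.status_code == 400  # other terminal states cannot be cancelled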
16 changes: 13 additions & 3 deletions backend/app/db/docs/replay.py
@@ -13,17 +13,27 @@
class ReplayFilter(BaseModel):
"""Replay filter configuration (embedded document).

Copied from domain/replay/models.py ReplayFilter.
Must match domain/replay/models.py ReplayFilter exactly.
"""

# Event selection filters
event_ids: List[str] | None = None
execution_id: str | None = None
correlation_id: str | None = None
aggregate_id: str | None = None
event_types: List[EventType] | None = None
exclude_event_types: List[EventType] | None = None

# Time range
start_time: datetime | None = None
end_time: datetime | None = None

# Metadata filters
user_id: str | None = None
service_name: str | None = None

# Escape hatch for complex queries
custom_query: Dict[str, Any] | None = None
exclude_event_types: List[EventType] | None = None

model_config = ConfigDict(from_attributes=True)

@@ -43,7 +53,7 @@ class ReplayConfig(BaseModel):
batch_size: int = Field(default=100, ge=1, le=1000)
max_events: int | None = Field(default=None, ge=1)

target_topics: Dict[str, str] | None = None # EventType -> topic mapping as strings
target_topics: Dict[EventType, str] | None = None
target_file_path: str | None = None

skip_errors: bool = True
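Since the embedded ReplayFilter above now has to match domain/replay/models.py exactly, a simple field-parity test can guard against drift. A sketch under the assumption that both models are pydantic models (the domain import path follows the docstring; if the domain side is a dataclass, dataclasses.fields would be used instead):

    from app.db.docs.replay import ReplayFilter as DocReplayFilter
    from app.domain.replay.models import ReplayFilter as DomainReplayFilter  # path assumed

    def test_replay_filter_stays_in_sync():
        # Fails loudly if a field is added to one model but not the other.
        assert set(DocReplayFilter.model_fields) == set(DomainReplayFilter.model_fields)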
124 changes: 13 additions & 111 deletions backend/app/db/repositories/event_repository.py
@@ -1,10 +1,10 @@
import logging
from dataclasses import asdict
from datetime import datetime, timedelta, timezone
from dataclasses import asdict, fields
from datetime import datetime, timezone
from typing import Any, Mapping

from beanie.odm.enums import SortDirection
from beanie.operators import GTE, LT, LTE, In, Not, Or, RegEx
from beanie.operators import GTE, LTE, In, Not, Or, RegEx

from app.core.tracing import EventAttributes
from app.core.tracing.utils import add_span_attributes
@@ -55,50 +55,12 @@ async def store_event(self, event: Event) -> str:
self.logger.debug(f"Stored event {event.event_id} of type {event.event_type}")
return event.event_id

async def store_events_batch(self, events: list[Event]) -> list[str]:
if not events:
return []
now = datetime.now(timezone.utc)
docs = []
for event in events:
data = asdict(event)
if not data.get("stored_at"):
data["stored_at"] = now
# Remove None values so EventDocument defaults can apply
data = {k: v for k, v in data.items() if v is not None}
docs.append(EventDocument(**data))
await EventDocument.insert_many(docs)
add_span_attributes(**{"events.batch.count": len(events)})
self.logger.info(f"Stored {len(events)} events in batch")
return [event.event_id for event in events]

async def get_event(self, event_id: str) -> Event | None:
doc = await EventDocument.find_one({"event_id": event_id})
if not doc:
return None
return Event(**doc.model_dump(exclude={"id", "revision_id"}))

async def get_events_by_type(
self,
event_type: str,
start_time: datetime | None = None,
end_time: datetime | None = None,
limit: int = 100,
skip: int = 0,
) -> list[Event]:
conditions = [
EventDocument.event_type == event_type,
*self._time_conditions(start_time, end_time),
]
docs = (
await EventDocument.find(*conditions)
.sort([("timestamp", SortDirection.DESCENDING)])
.skip(skip)
.limit(limit)
.to_list()
)
return [Event(**d.model_dump(exclude={"id", "revision_id"})) for d in docs]

async def get_events_by_aggregate(
self, aggregate_id: str, event_types: list[EventType] | None = None, limit: int = 100
) -> list[Event]:
@@ -125,30 +87,6 @@ async def get_events_by_correlation(self, correlation_id: str, limit: int = 100,
has_more=(skip + limit) < total_count,
)

async def get_events_by_user(
self,
user_id: str,
event_types: list[str] | None = None,
start_time: datetime | None = None,
end_time: datetime | None = None,
limit: int = 100,
skip: int = 0,
) -> list[Event]:
conditions = [
EventDocument.metadata.user_id == user_id,
In(EventDocument.event_type, event_types) if event_types else None,
*self._time_conditions(start_time, end_time),
]
conditions = [c for c in conditions if c is not None]
docs = (
await EventDocument.find(*conditions)
.sort([("timestamp", SortDirection.DESCENDING)])
.skip(skip)
.limit(limit)
.to_list()
)
return [Event(**d.model_dump(exclude={"id", "revision_id"})) for d in docs]

async def get_execution_events(
self, execution_id: str, limit: int = 100, skip: int = 0, exclude_system_events: bool = False
) -> EventListResult:
Expand Down Expand Up @@ -240,26 +178,6 @@ async def get_event_statistics(

return EventStatistics(total_events=0, events_by_type={}, events_by_service={}, events_by_hour=[])

async def cleanup_old_events(
self, older_than_days: int = 30, event_types: list[str] | None = None, dry_run: bool = False
) -> int:
cutoff_dt = datetime.now(timezone.utc) - timedelta(days=older_than_days)
conditions: list[Any] = [
LT(EventDocument.timestamp, cutoff_dt),
In(EventDocument.event_type, event_types) if event_types else None,
]
conditions = [c for c in conditions if c is not None]

if dry_run:
count = await EventDocument.find(*conditions).count()
self.logger.info(f"Would delete {count} events older than {older_than_days} days")
return count

result = await EventDocument.find(*conditions).delete()
deleted_count = result.deleted_count if result else 0
self.logger.info(f"Deleted {deleted_count} events older than {older_than_days} days")
return deleted_count

async def get_user_events_paginated(
self,
user_id: str,
@@ -290,9 +208,6 @@ async def get_user_events_paginated(
has_more=(skip + limit) < total_count,
)

async def count_events(self, *conditions: Any) -> int:
return await EventDocument.find(*conditions).count()

async def query_events(
self,
query: dict[str, Any],
@@ -338,15 +253,7 @@ async def delete_event_with_archival(

deleted_at = datetime.now(timezone.utc)
archived_doc = EventArchiveDocument(
event_id=doc.event_id,
event_type=doc.event_type,
event_version=doc.event_version,
timestamp=doc.timestamp,
metadata=doc.metadata,
payload=doc.payload,
aggregate_id=doc.aggregate_id,
stored_at=doc.stored_at,
ttl_expires_at=doc.ttl_expires_at,
**doc.model_dump(exclude={"id", "revision_id"}),
deleted_at=deleted_at,
deleted_by=deleted_by,
deletion_reason=deletion_reason,
@@ -360,9 +267,6 @@
deletion_reason=deletion_reason,
)

async def get_aggregate_events_for_replay(self, aggregate_id: str, limit: int = 10000) -> list[Event]:
return await self.get_events_by_aggregate(aggregate_id=aggregate_id, limit=limit)

async def get_aggregate_replay_info(self, aggregate_id: str) -> EventReplayInfo | None:
pipeline = [
{"$match": {"aggregate_id": aggregate_id}},
@@ -380,14 +284,12 @@ async def get_aggregate_replay_info(self, aggregate_id: str) -> EventReplayInfo
{"$project": {"_id": 0}},
]

async for doc in EventDocument.aggregate(pipeline):
events = [Event(**e) for e in doc["events"]]
return EventReplayInfo(
events=events,
event_count=doc["event_count"],
event_types=doc["event_types"],
start_time=doc["start_time"],
end_time=doc["end_time"],
)

return None
doc = await anext(EventDocument.aggregate(pipeline), None)
if not doc:
return None
# Only pass keys that Event dataclass accepts (filters out _id, revision_id, etc.)
event_keys = {f.name for f in fields(Event)}
return EventReplayInfo(
events=[Event(**{k: v for k, v in e.items() if k in event_keys}) for e in doc["events"]],
**{k: v for k, v in doc.items() if k != "events"},
)
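The rewritten get_aggregate_replay_info pulls a single aggregation result with anext(..., None) and only forwards keys that the Event dataclass declares. The key-filtering idea in isolation, as a standalone sketch (the two-field Event below is a simplified stand-in, not the project's real dataclass):

    from dataclasses import dataclass, fields

    @dataclass
    class Event:  # simplified stand-in for the project's Event dataclass
        event_id: str
        event_type: str

    raw = {"event_id": "e1", "event_type": "execution.started", "_id": "652f...", "revision_id": 2}
    allowed = {f.name for f in fields(Event)}
    event = Event(**{k: v for k, v in raw.items() if k in allowed})  # Mongo-only keys are dropped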
3 changes: 0 additions & 3 deletions backend/app/db/repositories/replay_repository.py
@@ -66,9 +66,6 @@ async def delete_old_sessions(self, cutoff_time: datetime) -> int:
).delete()
return result.deleted_count if result else 0

async def count_sessions(self, *conditions: Any) -> int:
return await ReplaySessionDocument.find(*conditions).count()

async def update_replay_session(self, session_id: str, updates: ReplaySessionUpdate) -> bool:
update_dict = {k: (v.value if hasattr(v, "value") else v) for k, v in asdict(updates).items() if v is not None}
if not update_dict:
30 changes: 21 additions & 9 deletions backend/app/dlq/manager.py
@@ -6,6 +6,7 @@

from confluent_kafka import Consumer, KafkaError, Message, Producer
from opentelemetry.trace import SpanKind
from pymongo.errors import DuplicateKeyError

from app.core.lifecycle import LifecycleEnabled
from app.core.metrics.context import get_dlq_metrics
@@ -251,8 +252,9 @@ async def _process_dlq_message(self, message: DLQMessage) -> None:
self.logger.info("Message filtered out", extra={"event_id": message.event_id})
return

# Store in MongoDB via Beanie
await self._store_message(message)
# Store in MongoDB via Beanie (returns False if already processed)
if not await self._store_message(message):
return

# Get retry policy for topic
retry_policy = self._retry_policies.get(message.original_topic, self.default_retry_policy)
@@ -275,18 +277,26 @@ async def _process_dlq_message(self, message: DLQMessage) -> None:
if retry_policy.strategy == RetryStrategy.IMMEDIATE:
await self._retry_message(message)

async def _store_message(self, message: DLQMessage) -> None:
# Ensure message has proper status and timestamps
async def _store_message(self, message: DLQMessage) -> bool:
"""Store message. Skip only if already terminal (DISCARDED/RETRIED)."""
existing = await DLQMessageDocument.find_one({"event_id": message.event_id})

if existing and existing.status in {DLQMessageStatus.DISCARDED, DLQMessageStatus.RETRIED}:
return False

message.status = DLQMessageStatus.PENDING
message.last_updated = datetime.now(timezone.utc)

doc = self._message_to_doc(message)

# Upsert using Beanie
existing = await DLQMessageDocument.find_one({"event_id": message.event_id})
if existing:
doc.id = existing.id
await doc.save()

try:
await doc.save()
except DuplicateKeyError:
return False # Lost race - Kafka will redeliver

return True

async def _update_message_status(self, event_id: str, update: DLQMessageUpdate) -> None:
doc = await DLQMessageDocument.find_one({"event_id": event_id})
@@ -467,11 +477,13 @@ def create_dlq_manager(
dlq_topic: KafkaTopic = KafkaTopic.DEAD_LETTER_QUEUE,
retry_topic_suffix: str = "-retry",
default_retry_policy: RetryPolicy | None = None,
group_id_suffix: str | None = None,
) -> DLQManager:
suffix = group_id_suffix or settings.KAFKA_GROUP_SUFFIX
consumer = Consumer(
{
"bootstrap.servers": settings.KAFKA_BOOTSTRAP_SERVERS,
"group.id": f"{GroupId.DLQ_MANAGER}.{settings.KAFKA_GROUP_SUFFIX}",
"group.id": f"{GroupId.DLQ_MANAGER}.{suffix}",
"enable.auto.commit": False,
"auto.offset.reset": "earliest",
"client.id": "dlq-manager-consumer",
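The idempotent _store_message above treats DuplicateKeyError as a lost race and relies on Kafka redelivery, which presupposes a unique index on event_id for DLQMessageDocument. That index is not part of this diff; a sketch of what the Beanie declaration could look like (illustrative subset, field and collection names assumed):

    import pymongo
    from beanie import Document

    class DLQMessageDocument(Document):  # illustrative subset of the real document
        event_id: str
        status: str

        class Settings:
            name = "dlq_messages"  # collection name assumed
            indexes = [
                pymongo.IndexModel([("event_id", pymongo.ASCENDING)], unique=True),
            ]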
11 changes: 11 additions & 0 deletions backend/app/domain/enums/execution.py
@@ -12,3 +12,14 @@ class ExecutionStatus(StringEnum):
TIMEOUT = "timeout"
CANCELLED = "cancelled"
ERROR = "error"

@property
def is_terminal(self) -> bool:
"""True if this status represents a final state (no further transitions)."""
return self in (
ExecutionStatus.COMPLETED,
ExecutionStatus.FAILED,
ExecutionStatus.TIMEOUT,
ExecutionStatus.CANCELLED,
ExecutionStatus.ERROR,
)
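A quick illustration of the new property (RUNNING is assumed to be one of the non-terminal enum members, which this hunk does not show):

    assert ExecutionStatus.COMPLETED.is_terminal
    assert ExecutionStatus.CANCELLED.is_terminal
    assert not ExecutionStatus.RUNNING.is_terminal  # assumed non-terminal member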
2 changes: 1 addition & 1 deletion backend/app/settings.py
@@ -162,4 +162,4 @@ class Settings(BaseSettings):

@lru_cache(maxsize=1)
def get_settings() -> Settings:
return Settings() # type: ignore[call-arg]
return Settings()
9 changes: 7 additions & 2 deletions backend/pyproject.toml
@@ -134,6 +134,7 @@ packages = ["app", "workers"]
[dependency-groups]
dev = [
"coverage==7.13.0",
"fakeredis>=2.33.0",
"hypothesis==6.103.4",
"iniconfig==2.0.0",
"matplotlib==3.10.8",
@@ -182,8 +183,12 @@ warn_unused_configs = true
disallow_untyped_defs = true
disallow_incomplete_defs = true
disable_error_code = ["import-untyped", "import-not-found"]
# TODO: REMOVE NEXT LINE
exclude = '(^tests/|/tests/)'
plugins = ["pydantic.mypy"]

[tool.pydantic-mypy]
init_forbid_extra = true
init_typed = true
warn_required_dynamic_aliases = true

# Pytest configuration
[tool.pytest.ini_options]
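With fakeredis added to the dev group, Redis-dependent code paths can be exercised without a live broker. A minimal fixture sketch, assuming pytest-asyncio (or anyio) is already configured for async tests; the wiring below is an assumption, not part of this PR:

    import fakeredis.aioredis
    import pytest

    @pytest.fixture
    async def redis_client():
        client = fakeredis.aioredis.FakeRedis()
        try:
            yield client
        finally:
            await client.aclose()

    async def test_redis_roundtrip(redis_client):
        await redis_client.set("key", "value")
        assert await redis_client.get("key") == b"value"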