4 changes: 2 additions & 2 deletions backend/app/api/routes/dlq.py
@@ -72,9 +72,9 @@ async def get_dlq_message(event_id: str, repository: FromDishka[DLQRepository])
         raise HTTPException(status_code=404, detail="Message not found")

     return DLQMessageDetail(
-        event_id=message.event_id,
+        event_id=message.event.event_id,
         event=message.event.model_dump(),
-        event_type=message.event_type,
+        event_type=message.event.event_type,
         original_topic=message.original_topic,
         error=message.error,
         retry_count=message.retry_count,
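The route change swaps the flat `message.event_id` / `message.event_type` accessors for the nested `message.event.*` path. A minimal sketch of that mapping, using a plain function and a plain dict in place of the project's `DLQMessageDetail` schema:

```python
# Sketch only: shows where the identity fields come from after this change.
# The real handler returns a DLQMessageDetail; a dict stands in for it here.
from typing import Any


def to_dlq_detail(message: Any) -> dict[str, Any]:
    return {
        "event_id": message.event.event_id,      # was: message.event_id
        "event_type": message.event.event_type,  # was: message.event_type
        "event": message.event.model_dump(),
        "original_topic": message.original_topic,
        "error": message.error,
        "retry_count": message.retry_count,
    }
```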
3 changes: 1 addition & 2 deletions backend/app/api/routes/events.py
@@ -12,8 +12,7 @@
 from app.core.utils import get_client_ip
 from app.domain.enums.common import SortOrder
 from app.domain.events.event_models import EventFilter
-from app.domain.events.typed import BaseEvent
-from app.infrastructure.kafka.events.metadata import AvroEventMetadata as EventMetadata
+from app.domain.events.typed import BaseEvent, EventMetadata
 from app.schemas_pydantic.events import (
     DeleteEventResponse,
     EventAggregationRequest,
3 changes: 1 addition & 2 deletions backend/app/api/routes/execution.py
@@ -12,9 +12,8 @@
 from app.domain.enums.events import EventType
 from app.domain.enums.execution import ExecutionStatus
 from app.domain.enums.user import UserRole
+from app.domain.events.typed import BaseEvent, EventMetadata
 from app.domain.exceptions import DomainError
-from app.infrastructure.kafka.events.base import BaseEvent
-from app.infrastructure.kafka.events.metadata import AvroEventMetadata as EventMetadata
 from app.schemas_pydantic.execution import (
     CancelExecutionRequest,
     CancelResponse,
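Both routes (events.py and execution.py) now pull `BaseEvent` and `EventMetadata` from `app.domain.events.typed` instead of the Kafka infrastructure package, where `EventMetadata` was an alias for `AvroEventMetadata`. A hedged sketch of what such a domain module could expose; the class bodies and field names below are assumptions for illustration, not the project's actual definitions:

```python
# Hypothetical shape of app/domain/events/typed.py; field names are assumed.
# The point is the dependency direction: API routes import only domain types,
# and Kafka/Avro specifics stay behind the infrastructure boundary.
from pydantic import BaseModel, Field


class EventMetadata(BaseModel):
    """Domain-level event metadata (fields assumed for illustration)."""
    correlation_id: str | None = None
    user_id: str | None = None


class BaseEvent(BaseModel):
    """Domain-level base event; concrete events extend it with payload fields."""
    event_id: str
    event_type: str
    metadata: EventMetadata = Field(default_factory=EventMetadata)
```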
41 changes: 12 additions & 29 deletions backend/app/db/docs/dlq.py
@@ -1,32 +1,25 @@
 from datetime import datetime, timezone
-from typing import Any

 from beanie import Document, Indexed
 from pydantic import ConfigDict, Field
 from pymongo import ASCENDING, DESCENDING, IndexModel

 from app.dlq.models import DLQMessageStatus
-from app.domain.enums.events import EventType
+from app.domain.events.typed import DomainEvent


 class DLQMessageDocument(Document):
-    """Unified DLQ message document for the entire system.
-
-    Copied from DLQMessage dataclass.
-    """
+    """Unified DLQ message document. Access event_id/event_type via event.event_id, event.event_type."""

-    # Core fields - always required
-    event: dict[str, Any]  # The original event as dict (BaseEvent serialized)
-    event_id: Indexed(str, unique=True)  # type: ignore[valid-type]
-    event_type: EventType  # Indexed via Settings.indexes
-    original_topic: Indexed(str)  # type: ignore[valid-type]
-    error: str  # Error message from the failure
-    retry_count: Indexed(int)  # type: ignore[valid-type]
-    failed_at: Indexed(datetime)  # type: ignore[valid-type]
-    status: DLQMessageStatus  # Indexed via Settings.indexes
-    producer_id: str  # ID of the producer that sent to DLQ
+    model_config = ConfigDict(from_attributes=True)

-    # Optional fields
+    event: DomainEvent  # Discriminated union - contains event_id, event_type
+    original_topic: Indexed(str) = ""  # type: ignore[valid-type]
+    error: str = "Unknown error"
+    retry_count: Indexed(int) = 0  # type: ignore[valid-type]
+    failed_at: Indexed(datetime) = Field(default_factory=lambda: datetime.now(timezone.utc))  # type: ignore[valid-type]
+    status: DLQMessageStatus = DLQMessageStatus.PENDING
+    producer_id: str = "unknown"
     created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
     last_updated: datetime | None = None
     next_retry_at: Indexed(datetime) | None = None  # type: ignore[valid-type]
@@ -36,25 +29,15 @@ class DLQMessageDocument(Document):
     dlq_offset: int | None = None
     dlq_partition: int | None = None
     last_error: str | None = None
-
-    # Kafka message headers (optional)
     headers: dict[str, str] = Field(default_factory=dict)
-
-    model_config = ConfigDict(from_attributes=True)
-
     class Settings:
         name = "dlq_messages"
         use_state_management = True
         indexes = [
-            IndexModel([("event_type", ASCENDING)], name="idx_dlq_event_type"),
+            IndexModel([("event.event_id", ASCENDING)], unique=True, name="idx_dlq_event_id"),
+            IndexModel([("event.event_type", ASCENDING)], name="idx_dlq_event_type"),
             IndexModel([("status", ASCENDING)], name="idx_dlq_status"),
             IndexModel([("failed_at", DESCENDING)], name="idx_dlq_failed_desc"),
             # TTL index - auto-delete after 7 days
             IndexModel([("created_at", ASCENDING)], name="idx_dlq_created_ttl", expireAfterSeconds=7 * 24 * 3600),
         ]
-
-    @property
-    def age_seconds(self) -> float:
-        """Get message age in seconds since failure."""
-        failed_at: datetime = self.failed_at
-        return (datetime.now(timezone.utc) - failed_at).total_seconds()
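The document now embeds the event itself as a `DomainEvent` discriminated union, so `event_id` and `event_type` live inside the nested subdocument. A hedged sketch of the pattern, with made-up concrete event classes standing in for the project's real union in `app.domain.events.typed`:

```python
# Sketch of a Pydantic discriminated union embedded as a nested field, as the new
# DLQMessageDocument does. The concrete event classes below are invented for
# illustration; only the overall pattern mirrors this PR.
from typing import Annotated, Literal, Union

from pydantic import BaseModel, Field


class ExecutionRequested(BaseModel):
    event_type: Literal["execution_requested"] = "execution_requested"
    event_id: str
    script: str


class ExecutionCompleted(BaseModel):
    event_type: Literal["execution_completed"] = "execution_completed"
    event_id: str
    exit_code: int


# Pydantic picks the concrete class from the "event_type" field when validating.
DomainEvent = Annotated[
    Union[ExecutionRequested, ExecutionCompleted],
    Field(discriminator="event_type"),
]


class DLQMessageSketch(BaseModel):
    """Stand-in for DLQMessageDocument: the event is one nested, typed field."""
    event: DomainEvent
    error: str = "Unknown error"
```

Validating `{"event": {"event_type": "execution_completed", "event_id": "e-1", "exit_code": 0}}` against this model yields an `ExecutionCompleted` instance on `event`, which is why the route change above can rely on `message.event.event_id` and `message.event.event_type` regardless of the concrete event class.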
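Because the identity fields are nested, the unique index moves from the flat `event_id` field to the dotted path `event.event_id` (and likewise `event.event_type`), as the updated `Settings.indexes` show. A usage sketch of a lookup through that index with Beanie; the helper function and calling style are illustrative only:

```python
# Usage sketch: Beanie accepts Mongo-style dict filters, so the nested identity
# field is queried with a dotted path backed by the idx_dlq_event_id unique index.
from app.db.docs.dlq import DLQMessageDocument


async def find_dlq_entry(event_id: str) -> DLQMessageDocument | None:
    return await DLQMessageDocument.find_one({"event.event_id": event_id})
```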