4 changes: 3 additions & 1 deletion backend/.env.test
@@ -33,7 +33,9 @@ BCRYPT_ROUNDS=4
RATE_LIMIT_ENABLED=true
ENABLE_TRACING=false

# OpenTelemetry - explicitly disabled for tests
# OpenTelemetry - disabled for tests
# Empty endpoint prevents OTLP exporter creation in setup_metrics()
# OTEL_SDK_DISABLED=true (set via pytest-env) provides additional safety
OTEL_EXPORTER_OTLP_ENDPOINT=

# Development
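Note on the OTel lines above: the empty endpoint only helps if the metrics setup skips exporter creation when no endpoint is configured. A minimal sketch of such a guard, assuming a setup_metrics() shaped roughly like the one the comment refers to (the body below is illustrative, not the project's actual implementation):

import os

from opentelemetry import metrics
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader


def setup_metrics() -> None:
    # OTEL_SDK_DISABLED=true (set via pytest-env) short-circuits everything.
    if os.getenv("OTEL_SDK_DISABLED", "").lower() == "true":
        return
    endpoint = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "")
    if not endpoint:
        # Empty endpoint: register a provider without an OTLP exporter, as .env.test expects.
        metrics.set_meter_provider(MeterProvider())
        return
    reader = PeriodicExportingMetricReader(OTLPMetricExporter(endpoint=endpoint))
    metrics.set_meter_provider(MeterProvider(metric_readers=[reader]))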
43 changes: 4 additions & 39 deletions backend/app/api/routes/dlq.py
@@ -1,6 +1,3 @@
from datetime import datetime, timezone
from typing import List

from dishka import FromDishka
from dishka.integrations.fastapi import DishkaRoute
from fastapi import APIRouter, Depends, HTTPException, Query
@@ -31,19 +28,7 @@
@router.get("/stats", response_model=DLQStats)
async def get_dlq_statistics(repository: FromDishka[DLQRepository]) -> DLQStats:
stats = await repository.get_dlq_stats()
return DLQStats(
by_status=stats.by_status,
by_topic=[{"topic": t.topic, "count": t.count, "avg_retry_count": t.avg_retry_count} for t in stats.by_topic],
by_event_type=[{"event_type": e.event_type, "count": e.count} for e in stats.by_event_type],
age_stats={
"min_age": stats.age_stats.min_age_seconds,
"max_age": stats.age_stats.max_age_seconds,
"avg_age": stats.age_stats.avg_age_seconds,
}
if stats.age_stats
else {},
timestamp=stats.timestamp,
)
return DLQStats.model_validate(stats, from_attributes=True)


@router.get("/messages", response_model=DLQMessagesResponse)
@@ -70,27 +55,7 @@ async def get_dlq_message(event_id: str, repository: FromDishka[DLQRepository])
message = await repository.get_message_by_id(event_id)
if not message:
raise HTTPException(status_code=404, detail="Message not found")

return DLQMessageDetail(
event_id=message.event_id,
event=message.event.model_dump(),
event_type=message.event_type,
original_topic=message.original_topic,
error=message.error,
retry_count=message.retry_count,
failed_at=message.failed_at or datetime(1970, 1, 1, tzinfo=timezone.utc),
status=DLQMessageStatus(message.status),
created_at=message.created_at,
last_updated=message.last_updated,
next_retry_at=message.next_retry_at,
retried_at=message.retried_at,
discarded_at=message.discarded_at,
discard_reason=message.discard_reason,
producer_id=message.producer_id,
dlq_offset=message.dlq_offset,
dlq_partition=message.dlq_partition,
last_error=message.last_error,
)
return DLQMessageDetail.model_validate(message, from_attributes=True)


@router.post("/retry", response_model=DLQBatchRetryResponse)
@@ -141,7 +106,7 @@ async def discard_dlq_message(
return MessageResponse(message=f"Message {event_id} discarded")


@router.get("/topics", response_model=List[DLQTopicSummaryResponse])
async def get_dlq_topics(repository: FromDishka[DLQRepository]) -> List[DLQTopicSummaryResponse]:
@router.get("/topics", response_model=list[DLQTopicSummaryResponse])
async def get_dlq_topics(repository: FromDishka[DLQRepository]) -> list[DLQTopicSummaryResponse]:
topics = await repository.get_topics_summary()
return [DLQTopicSummaryResponse.model_validate(topic) for topic in topics]
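Both route handlers above now delegate field mapping to Pydantic v2's model_validate(..., from_attributes=True), which reads attributes off the repository object by name instead of copying them field by field. A standalone sketch of the pattern (these two models are illustrative, not the project's schemas):

from dataclasses import dataclass

from pydantic import BaseModel


@dataclass
class StatsRecord:  # stand-in for a domain/repository result
    total: int
    pending: int


class StatsResponse(BaseModel):  # stand-in for an API response schema
    total: int
    pending: int


record = StatsRecord(total=42, pending=7)
# Pydantic matches attribute names, so the schema stays in sync without manual mapping.
response = StatsResponse.model_validate(record, from_attributes=True)
assert response.total == 42 and response.pending == 7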
3 changes: 1 addition & 2 deletions backend/app/api/routes/events.py
@@ -12,8 +12,7 @@
from app.core.utils import get_client_ip
from app.domain.enums.common import SortOrder
from app.domain.events.event_models import EventFilter
from app.domain.events.typed import BaseEvent
from app.infrastructure.kafka.events.metadata import AvroEventMetadata as EventMetadata
from app.domain.events.typed import BaseEvent, EventMetadata
from app.schemas_pydantic.events import (
DeleteEventResponse,
EventAggregationRequest,
3 changes: 1 addition & 2 deletions backend/app/api/routes/execution.py
@@ -12,9 +12,8 @@
from app.domain.enums.events import EventType
from app.domain.enums.execution import ExecutionStatus
from app.domain.enums.user import UserRole
from app.domain.events.typed import BaseEvent, EventMetadata
from app.domain.exceptions import DomainError
from app.infrastructure.kafka.events.base import BaseEvent
from app.infrastructure.kafka.events.metadata import AvroEventMetadata as EventMetadata
from app.schemas_pydantic.execution import (
CancelExecutionRequest,
CancelResponse,
41 changes: 12 additions & 29 deletions backend/app/db/docs/dlq.py
@@ -1,32 +1,25 @@
from datetime import datetime, timezone
from typing import Any

from beanie import Document, Indexed
from pydantic import ConfigDict, Field
from pymongo import ASCENDING, DESCENDING, IndexModel

from app.dlq.models import DLQMessageStatus
from app.domain.enums.events import EventType
from app.domain.events.typed import DomainEvent


class DLQMessageDocument(Document):
"""Unified DLQ message document for the entire system.
"""Unified DLQ message document. Access event_id/event_type via event.event_id, event.event_type."""

Copied from DLQMessage dataclass.
"""

# Core fields - always required
event: dict[str, Any] # The original event as dict (BaseEvent serialized)
event_id: Indexed(str, unique=True) # type: ignore[valid-type]
event_type: EventType # Indexed via Settings.indexes
original_topic: Indexed(str) # type: ignore[valid-type]
error: str # Error message from the failure
retry_count: Indexed(int) # type: ignore[valid-type]
failed_at: Indexed(datetime) # type: ignore[valid-type]
status: DLQMessageStatus # Indexed via Settings.indexes
producer_id: str # ID of the producer that sent to DLQ
model_config = ConfigDict(from_attributes=True)

# Optional fields
event: DomainEvent # Discriminated union - contains event_id, event_type
original_topic: Indexed(str) = "" # type: ignore[valid-type]
error: str = "Unknown error"
retry_count: Indexed(int) = 0 # type: ignore[valid-type]
failed_at: Indexed(datetime) = Field(default_factory=lambda: datetime.now(timezone.utc)) # type: ignore[valid-type]
status: DLQMessageStatus = DLQMessageStatus.PENDING
producer_id: str = "unknown"
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
last_updated: datetime | None = None
next_retry_at: Indexed(datetime) | None = None # type: ignore[valid-type]
@@ -36,25 +29,15 @@ class DLQMessageDocument(Document):
dlq_offset: int | None = None
dlq_partition: int | None = None
last_error: str | None = None

# Kafka message headers (optional)
headers: dict[str, str] = Field(default_factory=dict)

model_config = ConfigDict(from_attributes=True)

class Settings:
name = "dlq_messages"
use_state_management = True
indexes = [
IndexModel([("event_type", ASCENDING)], name="idx_dlq_event_type"),
IndexModel([("event.event_id", ASCENDING)], unique=True, name="idx_dlq_event_id"),
IndexModel([("event.event_type", ASCENDING)], name="idx_dlq_event_type"),
IndexModel([("status", ASCENDING)], name="idx_dlq_status"),
IndexModel([("failed_at", DESCENDING)], name="idx_dlq_failed_desc"),
# TTL index - auto-delete after 7 days
IndexModel([("created_at", ASCENDING)], name="idx_dlq_created_ttl", expireAfterSeconds=7 * 24 * 3600),
]

@property
def age_seconds(self) -> float:
"""Get message age in seconds since failure."""
failed_at: datetime = self.failed_at
return (datetime.now(timezone.utc) - failed_at).total_seconds()
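Since event_id and event_type now live inside the nested event payload (a DomainEvent discriminated union), lookups follow the dotted paths that Settings.indexes covers. A small sketch of what queries against the new layout could look like, assuming an initialized Beanie context; the helper functions are hypothetical:

from app.db.docs.dlq import DLQMessageDocument
from app.dlq.models import DLQMessageStatus


async def find_dlq_message(event_id: str) -> DLQMessageDocument | None:
    # Dotted-path filter served by the unique idx_dlq_event_id index.
    return await DLQMessageDocument.find_one({"event.event_id": event_id})


async def count_pending(event_type: str) -> int:
    # event.event_type and status are both indexed per Settings.indexes above.
    return await DLQMessageDocument.find(
        {"event.event_type": event_type, "status": DLQMessageStatus.PENDING}
    ).count()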
118 changes: 79 additions & 39 deletions backend/app/db/repositories/admin/admin_events_repository.py
@@ -3,6 +3,7 @@

from beanie.odm.enums import SortDirection
from beanie.operators import GTE, LTE, In, Text
from monggregate import Pipeline, S

from app.db.docs import (
EventArchiveDocument,
@@ -25,7 +26,6 @@
UserEventCount,
domain_event_adapter,
)
from app.domain.events.query_builders import EventStatsAggregation
from app.domain.replay.models import ReplayFilter, ReplaySessionState


@@ -62,7 +62,7 @@ async def browse_events(
return EventBrowseResult(events=events, total=total, skip=skip, limit=limit)

async def get_event_detail(self, event_id: str) -> EventDetail | None:
doc = await EventDocument.find_one({"event_id": event_id})
doc = await EventDocument.find_one(EventDocument.event_id == event_id)
if not doc:
return None

@@ -86,7 +86,7 @@ async def get_event_detail(self, event_id: str) -> EventDetail | None:
return EventDetail(event=event, related_events=related_events, timeline=timeline)

async def delete_event(self, event_id: str) -> bool:
doc = await EventDocument.find_one({"event_id": event_id})
doc = await EventDocument.find_one(EventDocument.event_id == event_id)
if not doc:
return False
await doc.delete()
@@ -95,9 +95,29 @@ async def delete_event(self, event_id: str) -> bool:
async def get_event_stats(self, hours: int = 24) -> EventStatistics:
start_time = datetime.now(timezone.utc) - timedelta(hours=hours)

overview_pipeline = EventStatsAggregation.build_overview_pipeline(start_time)
overview_result = await EventDocument.aggregate(overview_pipeline).to_list()

# Overview stats pipeline
# Note: monggregate doesn't have S.add_to_set - use raw dict syntax
overview_pipeline = (
Pipeline()
.match({EventDocument.timestamp: {"$gte": start_time}})
.group(
by=None,
query={
"total_events": S.sum(1),
"event_types": {"$addToSet": S.field(EventDocument.event_type)},
"unique_users": {"$addToSet": S.field(EventDocument.metadata.user_id)},
"services": {"$addToSet": S.field(EventDocument.metadata.service_name)},
},
)
.project(
_id=0,
total_events=1,
event_type_count={"$size": "$event_types"},
unique_user_count={"$size": "$unique_users"},
service_count={"$size": "$services"},
)
)
overview_result = await EventDocument.aggregate(overview_pipeline.export()).to_list()
stats = (
overview_result[0]
if overview_result
@@ -106,41 +126,61 @@ async def get_event_stats(self, hours: int = 24) -> EventStatistics:

error_count = await EventDocument.find(
{
"timestamp": {"$gte": start_time},
"event_type": {"$regex": "failed|error|timeout", "$options": "i"},
EventDocument.timestamp: {"$gte": start_time},
EventDocument.event_type: {"$regex": "failed|error|timeout", "$options": "i"},
}
).count()

error_rate = (error_count / stats["total_events"] * 100) if stats["total_events"] > 0 else 0

type_pipeline = EventStatsAggregation.build_event_types_pipeline(start_time)
top_types = await EventDocument.aggregate(type_pipeline).to_list()
# Event types pipeline
type_pipeline = (
Pipeline()
.match({EventDocument.timestamp: {"$gte": start_time}})
.group(by=EventDocument.event_type, query={"count": S.sum(1)})
.sort(by="count", descending=True)
.limit(10)
)
top_types = await EventDocument.aggregate(type_pipeline.export()).to_list()
events_by_type = {t["_id"]: t["count"] for t in top_types}

hourly_pipeline = EventStatsAggregation.build_hourly_events_pipeline(start_time)
hourly_result = await EventDocument.aggregate(hourly_pipeline).to_list()
events_by_hour: list[HourlyEventCount | dict[str, Any]] = [
HourlyEventCount(hour=doc["_id"], count=doc["count"]) for doc in hourly_result
]

user_pipeline = EventStatsAggregation.build_top_users_pipeline(start_time)
top_users_result = await EventDocument.aggregate(user_pipeline).to_list()
top_users = [
UserEventCount(user_id=doc["_id"], event_count=doc["count"]) for doc in top_users_result if doc["_id"]
]

exec_pipeline: list[dict[str, Any]] = [
{
"$match": {
"created_at": {"$gte": start_time},
"status": "completed",
"resource_usage.execution_time_wall_seconds": {"$exists": True},
}
},
{"$group": {"_id": None, "avg_duration": {"$avg": "$resource_usage.execution_time_wall_seconds"}}},
]

exec_result = await ExecutionDocument.aggregate(exec_pipeline).to_list()
# Hourly events pipeline - project renames _id->hour
hourly_pipeline = (
Pipeline()
.match({EventDocument.timestamp: {"$gte": start_time}})
.group(
by={"$dateToString": {"format": "%Y-%m-%d-%H", "date": S.field(EventDocument.timestamp)}},
query={"count": S.sum(1)},
)
.sort(by="_id")
.project(_id=0, hour="$_id", count=1)
)
hourly_result = await EventDocument.aggregate(hourly_pipeline.export()).to_list()
events_by_hour: list[HourlyEventCount | dict[str, Any]] = [HourlyEventCount(**doc) for doc in hourly_result]

# Top users pipeline - project renames _id->user_id, count->event_count
user_pipeline = (
Pipeline()
.match({EventDocument.timestamp: {"$gte": start_time}})
.group(by=EventDocument.metadata.user_id, query={"count": S.sum(1)})
.sort(by="count", descending=True)
.limit(10)
.project(_id=0, user_id="$_id", event_count="$count")
)
top_users_result = await EventDocument.aggregate(user_pipeline.export()).to_list()
top_users = [UserEventCount(**doc) for doc in top_users_result if doc["user_id"]]

# Execution duration pipeline
exec_time_field = S.field(ExecutionDocument.resource_usage.execution_time_wall_seconds)
exec_pipeline = (
Pipeline()
.match({
ExecutionDocument.created_at: {"$gte": start_time},
ExecutionDocument.status: "completed",
ExecutionDocument.resource_usage.execution_time_wall_seconds: {"$exists": True},
})
.group(by=None, query={"avg_duration": S.avg(exec_time_field)})
)
exec_result = await ExecutionDocument.aggregate(exec_pipeline.export()).to_list()
avg_processing_time = (
exec_result[0]["avg_duration"] if exec_result and exec_result[0].get("avg_duration") else 0
)
@@ -190,7 +230,7 @@ async def create_replay_session(self, session: ReplaySessionState) -> str:
return session.session_id

async def get_replay_session(self, session_id: str) -> ReplaySessionState | None:
doc = await ReplaySessionDocument.find_one({"session_id": session_id})
doc = await ReplaySessionDocument.find_one(ReplaySessionDocument.session_id == session_id)
if not doc:
return None
return ReplaySessionState.model_validate(doc, from_attributes=True)
@@ -199,14 +239,14 @@ async def update_replay_session(self, session_id: str, updates: ReplaySessionUpd
update_dict = updates.model_dump(exclude_none=True)
if not update_dict:
return False
doc = await ReplaySessionDocument.find_one({"session_id": session_id})
doc = await ReplaySessionDocument.find_one(ReplaySessionDocument.session_id == session_id)
if not doc:
return False
await doc.set(update_dict)
return True

async def get_replay_status_with_progress(self, session_id: str) -> ReplaySessionStatusDetail | None:
doc = await ReplaySessionDocument.find_one({"session_id": session_id})
doc = await ReplaySessionDocument.find_one(ReplaySessionDocument.session_id == session_id)
if not doc:
return None

@@ -251,7 +291,7 @@ async def get_replay_status_with_progress(self, session_id: str) -> ReplaySessio
execution_ids = {event.execution_id for event in original_events if event.execution_id}

for exec_id in list(execution_ids)[:10]:
exec_doc = await ExecutionDocument.find_one({"execution_id": exec_id})
exec_doc = await ExecutionDocument.find_one(ExecutionDocument.execution_id == exec_id)
if exec_doc:
execution_results.append(
{
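The find_one calls in this file (and in the settings repository below) switch from raw dict filters to Beanie's typed query expressions. A minimal side-by-side sketch of the two styles (the helper functions are illustrative):

from app.db.docs import EventDocument


async def get_event_raw(event_id: str) -> EventDocument | None:
    # Old style: raw dict filter, the field name is an unchecked string.
    return await EventDocument.find_one({"event_id": event_id})


async def get_event_typed(event_id: str) -> EventDocument | None:
    # New style: the field reference is resolved against the document model.
    return await EventDocument.find_one(EventDocument.event_id == event_id)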
@@ -16,7 +16,7 @@ def __init__(self, logger: logging.Logger):
self.logger = logger

async def get_system_settings(self) -> SystemSettings:
doc = await SystemSettingsDocument.find_one({"settings_id": "global"})
doc = await SystemSettingsDocument.find_one(SystemSettingsDocument.settings_id == "global")
if not doc:
self.logger.info("System settings not found, creating defaults")
doc = SystemSettingsDocument(
@@ -40,7 +40,7 @@ async def update_system_settings(
updated_by: str,
user_id: str,
) -> SystemSettings:
doc = await SystemSettingsDocument.find_one({"settings_id": "global"})
doc = await SystemSettingsDocument.find_one(SystemSettingsDocument.settings_id == "global")
if not doc:
doc = SystemSettingsDocument(settings_id="global")

@@ -68,7 +68,7 @@ async def update_system_settings(
)

async def reset_system_settings(self, username: str, user_id: str) -> SystemSettings:
doc = await SystemSettingsDocument.find_one({"settings_id": "global"})
doc = await SystemSettingsDocument.find_one(SystemSettingsDocument.settings_id == "global")
if doc:
await doc.delete()
