Skip to content

Commit 86a01a4

Browse files
authored
2 events system (domain, avro) -> single one (#84)
* 2 events systems (domain, avro) -> a single one * removed time-bound fields from BaseEvent * beanie: JSONs -> well-typed ODM objects * mongo aggregate for pipelines in repos * teardown 40s timeout removed + fixed conftests to clean up created DBs * request timeout added/updated: 40s for prod, 5s for tests * docs updates * removed config, passing params directly
1 parent 2790997 commit 86a01a4

File tree

133 files changed

+1655
-3046
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below for content that may be hidden.

133 files changed

+1655
-3046
lines changed

backend/.env

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ EVENT_RETENTION_DAYS=30
2424
KAFKA_CONSUMER_GROUP_ID=integr8scode-backend
2525
KAFKA_AUTO_OFFSET_RESET=earliest
2626
KAFKA_ENABLE_AUTO_COMMIT=true
27-
KAFKA_SESSION_TIMEOUT_MS=30000
27+
KAFKA_SESSION_TIMEOUT_MS=45000
28+
KAFKA_HEARTBEAT_INTERVAL_MS=10000
29+
KAFKA_REQUEST_TIMEOUT_MS=40000
2830
KAFKA_MAX_POLL_RECORDS=500
2931

3032
# WebSocket Configuration

backend/.env.test

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,13 @@ KAFKA_TOPIC_PREFIX=test.
2525
SCHEMA_SUBJECT_PREFIX=test.
2626
SCHEMA_REGISTRY_URL=http://localhost:8081
2727

28+
# Reduce consumer pool and timeouts for faster test startup/teardown
29+
# https://github.com/aio-libs/aiokafka/issues/773
30+
SSE_CONSUMER_POOL_SIZE=1
31+
KAFKA_SESSION_TIMEOUT_MS=6000
32+
KAFKA_HEARTBEAT_INTERVAL_MS=2000
33+
KAFKA_REQUEST_TIMEOUT_MS=5000
34+
2835
# Security
2936
SECURE_COOKIES=true
3037
BCRYPT_ROUNDS=4
@@ -33,7 +40,9 @@ BCRYPT_ROUNDS=4
3340
RATE_LIMIT_ENABLED=true
3441
ENABLE_TRACING=false
3542

36-
# OpenTelemetry - explicitly disabled for tests
43+
# OpenTelemetry - disabled for tests
44+
# Empty endpoint prevents OTLP exporter creation in setup_metrics()
45+
# OTEL_SDK_DISABLED=true (set via pytest-env) provides additional safety
3746
OTEL_EXPORTER_OTLP_ENDPOINT=
3847

3948
# Development

backend/app/api/routes/dlq.py

Lines changed: 4 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,3 @@
1-
from datetime import datetime, timezone
2-
from typing import List
3-
41
from dishka import FromDishka
52
from dishka.integrations.fastapi import DishkaRoute
63
from fastapi import APIRouter, Depends, HTTPException, Query
@@ -31,19 +28,7 @@
3128
@router.get("/stats", response_model=DLQStats)
3229
async def get_dlq_statistics(repository: FromDishka[DLQRepository]) -> DLQStats:
3330
stats = await repository.get_dlq_stats()
34-
return DLQStats(
35-
by_status=stats.by_status,
36-
by_topic=[{"topic": t.topic, "count": t.count, "avg_retry_count": t.avg_retry_count} for t in stats.by_topic],
37-
by_event_type=[{"event_type": e.event_type, "count": e.count} for e in stats.by_event_type],
38-
age_stats={
39-
"min_age": stats.age_stats.min_age_seconds,
40-
"max_age": stats.age_stats.max_age_seconds,
41-
"avg_age": stats.age_stats.avg_age_seconds,
42-
}
43-
if stats.age_stats
44-
else {},
45-
timestamp=stats.timestamp,
46-
)
31+
return DLQStats.model_validate(stats, from_attributes=True)
4732

4833

4934
@router.get("/messages", response_model=DLQMessagesResponse)
@@ -70,27 +55,7 @@ async def get_dlq_message(event_id: str, repository: FromDishka[DLQRepository])
7055
message = await repository.get_message_by_id(event_id)
7156
if not message:
7257
raise HTTPException(status_code=404, detail="Message not found")
73-
74-
return DLQMessageDetail(
75-
event_id=message.event_id,
76-
event=message.event.model_dump(),
77-
event_type=message.event_type,
78-
original_topic=message.original_topic,
79-
error=message.error,
80-
retry_count=message.retry_count,
81-
failed_at=message.failed_at or datetime(1970, 1, 1, tzinfo=timezone.utc),
82-
status=DLQMessageStatus(message.status),
83-
created_at=message.created_at,
84-
last_updated=message.last_updated,
85-
next_retry_at=message.next_retry_at,
86-
retried_at=message.retried_at,
87-
discarded_at=message.discarded_at,
88-
discard_reason=message.discard_reason,
89-
producer_id=message.producer_id,
90-
dlq_offset=message.dlq_offset,
91-
dlq_partition=message.dlq_partition,
92-
last_error=message.last_error,
93-
)
58+
return DLQMessageDetail.model_validate(message, from_attributes=True)
9459

9560

9661
@router.post("/retry", response_model=DLQBatchRetryResponse)
@@ -141,7 +106,7 @@ async def discard_dlq_message(
141106
return MessageResponse(message=f"Message {event_id} discarded")
142107

143108

144-
@router.get("/topics", response_model=List[DLQTopicSummaryResponse])
145-
async def get_dlq_topics(repository: FromDishka[DLQRepository]) -> List[DLQTopicSummaryResponse]:
109+
@router.get("/topics", response_model=list[DLQTopicSummaryResponse])
110+
async def get_dlq_topics(repository: FromDishka[DLQRepository]) -> list[DLQTopicSummaryResponse]:
146111
topics = await repository.get_topics_summary()
147112
return [DLQTopicSummaryResponse.model_validate(topic) for topic in topics]

backend/app/api/routes/events.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,7 @@
1212
from app.core.utils import get_client_ip
1313
from app.domain.enums.common import SortOrder
1414
from app.domain.events.event_models import EventFilter
15-
from app.domain.events.typed import BaseEvent
16-
from app.infrastructure.kafka.events.metadata import AvroEventMetadata as EventMetadata
15+
from app.domain.events.typed import BaseEvent, EventMetadata
1716
from app.schemas_pydantic.events import (
1817
DeleteEventResponse,
1918
EventAggregationRequest,

backend/app/api/routes/execution.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,8 @@
1212
from app.domain.enums.events import EventType
1313
from app.domain.enums.execution import ExecutionStatus
1414
from app.domain.enums.user import UserRole
15+
from app.domain.events.typed import BaseEvent, EventMetadata
1516
from app.domain.exceptions import DomainError
16-
from app.infrastructure.kafka.events.base import BaseEvent
17-
from app.infrastructure.kafka.events.metadata import AvroEventMetadata as EventMetadata
1817
from app.schemas_pydantic.execution import (
1918
CancelExecutionRequest,
2019
CancelResponse,

backend/app/core/providers.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
from app.dlq.manager import DLQManager, create_dlq_manager
4444
from app.domain.enums.kafka import KafkaTopic
4545
from app.domain.saga.models import SagaConfig
46-
from app.events.core import ProducerConfig, UnifiedProducer
46+
from app.events.core import UnifiedProducer
4747
from app.events.event_store import EventStore, create_event_store
4848
from app.events.event_store_consumer import EventStoreConsumer, create_event_store_consumer
4949
from app.events.schema.schema_registry import SchemaRegistryManager
@@ -160,8 +160,7 @@ class MessagingProvider(Provider):
160160
async def get_kafka_producer(
161161
self, settings: Settings, schema_registry: SchemaRegistryManager, logger: logging.Logger
162162
) -> AsyncIterator[UnifiedProducer]:
163-
config = ProducerConfig(bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS)
164-
async with UnifiedProducer(config, schema_registry, logger, settings=settings) as producer:
163+
async with UnifiedProducer(schema_registry, logger, settings) as producer:
165164
yield producer
166165

167166
@provide

backend/app/db/docs/dlq.py

Lines changed: 12 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,25 @@
11
from datetime import datetime, timezone
2-
from typing import Any
32

43
from beanie import Document, Indexed
54
from pydantic import ConfigDict, Field
65
from pymongo import ASCENDING, DESCENDING, IndexModel
76

87
from app.dlq.models import DLQMessageStatus
9-
from app.domain.enums.events import EventType
8+
from app.domain.events.typed import DomainEvent
109

1110

1211
class DLQMessageDocument(Document):
13-
"""Unified DLQ message document for the entire system.
12+
"""Unified DLQ message document. Access event_id/event_type via event.event_id, event.event_type."""
1413

15-
Copied from DLQMessage dataclass.
16-
"""
17-
18-
# Core fields - always required
19-
event: dict[str, Any] # The original event as dict (BaseEvent serialized)
20-
event_id: Indexed(str, unique=True) # type: ignore[valid-type]
21-
event_type: EventType # Indexed via Settings.indexes
22-
original_topic: Indexed(str) # type: ignore[valid-type]
23-
error: str # Error message from the failure
24-
retry_count: Indexed(int) # type: ignore[valid-type]
25-
failed_at: Indexed(datetime) # type: ignore[valid-type]
26-
status: DLQMessageStatus # Indexed via Settings.indexes
27-
producer_id: str # ID of the producer that sent to DLQ
14+
model_config = ConfigDict(from_attributes=True)
2815

29-
# Optional fields
16+
event: DomainEvent # Discriminated union - contains event_id, event_type
17+
original_topic: Indexed(str) = "" # type: ignore[valid-type]
18+
error: str = "Unknown error"
19+
retry_count: Indexed(int) = 0 # type: ignore[valid-type]
20+
failed_at: Indexed(datetime) = Field(default_factory=lambda: datetime.now(timezone.utc)) # type: ignore[valid-type]
21+
status: DLQMessageStatus = DLQMessageStatus.PENDING
22+
producer_id: str = "unknown"
3023
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
3124
last_updated: datetime | None = None
3225
next_retry_at: Indexed(datetime) | None = None # type: ignore[valid-type]
@@ -36,25 +29,15 @@ class DLQMessageDocument(Document):
3629
dlq_offset: int | None = None
3730
dlq_partition: int | None = None
3831
last_error: str | None = None
39-
40-
# Kafka message headers (optional)
4132
headers: dict[str, str] = Field(default_factory=dict)
4233

43-
model_config = ConfigDict(from_attributes=True)
44-
4534
class Settings:
4635
name = "dlq_messages"
4736
use_state_management = True
4837
indexes = [
49-
IndexModel([("event_type", ASCENDING)], name="idx_dlq_event_type"),
38+
IndexModel([("event.event_id", ASCENDING)], unique=True, name="idx_dlq_event_id"),
39+
IndexModel([("event.event_type", ASCENDING)], name="idx_dlq_event_type"),
5040
IndexModel([("status", ASCENDING)], name="idx_dlq_status"),
5141
IndexModel([("failed_at", DESCENDING)], name="idx_dlq_failed_desc"),
52-
# TTL index - auto-delete after 7 days
5342
IndexModel([("created_at", ASCENDING)], name="idx_dlq_created_ttl", expireAfterSeconds=7 * 24 * 3600),
5443
]
55-
56-
@property
57-
def age_seconds(self) -> float:
58-
"""Get message age in seconds since failure."""
59-
failed_at: datetime = self.failed_at
60-
return (datetime.now(timezone.utc) - failed_at).total_seconds()

0 commit comments

Comments (0)