4 changes: 1 addition & 3 deletions backend/.env.test
@@ -22,6 +22,7 @@ REDIS_DECODE_RESPONSES=true

# Kafka - use localhost for tests
KAFKA_BOOTSTRAP_SERVERS=localhost:9092
KAFKA_TOPIC_PREFIX=test.
SCHEMA_REGISTRY_URL=http://localhost:8081

# Security
@@ -31,9 +32,6 @@ CORS_ALLOWED_ORIGINS=["http://localhost:3000","https://localhost:3000"]
# Features
RATE_LIMIT_ENABLED=true
ENABLE_TRACING=false
OTEL_SDK_DISABLED=true
OTEL_METRICS_EXPORTER=none
OTEL_TRACES_EXPORTER=none

# Development
DEVELOPMENT_MODE=false
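KAFKA_TOPIC_PREFIX is the value the Kafka-facing code uses when it builds concrete topic names; a minimal sketch under an assumed helper name (mirroring the DLQManager.start() change further down), where the test value "test." turns "dead_letter_queue" into "test.dead_letter_queue":

from app.settings import Settings

def prefixed_topic(settings: Settings, topic: str) -> str:
    # Hypothetical helper for illustration; the real code inlines this f-string.
    return f"{settings.KAFKA_TOPIC_PREFIX}{topic}"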
14 changes: 11 additions & 3 deletions backend/app/core/container.py
@@ -16,11 +16,15 @@
SettingsProvider,
UserServicesProvider,
)
from app.settings import Settings


def create_app_container() -> AsyncContainer:
def create_app_container(settings: Settings) -> AsyncContainer:
"""
Create the application DI container.

Args:
settings: Application settings (injected via from_context).
"""
return make_async_container(
SettingsProvider(),
@@ -36,13 +40,16 @@ def create_app_container() -> AsyncContainer:
AdminServicesProvider(),
BusinessServicesProvider(),
FastapiProvider(),
context={Settings: settings},
)


def create_result_processor_container() -> AsyncContainer:
def create_result_processor_container(settings: Settings) -> AsyncContainer:
"""
Create a minimal DI container for the ResultProcessor worker.
Includes only settings, database, event/kafka, and required repositories.

Args:
settings: Application settings (injected via from_context).
"""
return make_async_container(
SettingsProvider(),
@@ -54,4 +61,5 @@ def create_result_processor_container() -> AsyncContainer:
EventProvider(),
MessagingProvider(),
ResultProcessorProvider(),
context={Settings: settings},
)
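A hypothetical call-site sketch of the new factory signature: the caller builds or loads Settings once and hands it to the container, instead of the container reaching for a global get_settings().

from app.core.container import create_app_container
from app.settings import Settings

settings = Settings()  # assumed construction; load it however the app or test suite does
container = create_app_container(settings)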
18 changes: 8 additions & 10 deletions backend/app/core/providers.py
Expand Up @@ -2,7 +2,7 @@
from typing import AsyncIterator

import redis.asyncio as redis
from dishka import Provider, Scope, provide
from dishka import Provider, Scope, from_context, provide
from pymongo.asynchronous.mongo_client import AsyncMongoClient

from app.core.database_context import Database
@@ -69,15 +69,13 @@
from app.services.sse.sse_service import SSEService
from app.services.sse.sse_shutdown_manager import SSEShutdownManager, create_sse_shutdown_manager
from app.services.user_settings_service import UserSettingsService
from app.settings import Settings, get_settings
from app.settings import Settings


class SettingsProvider(Provider):
scope = Scope.APP
"""Provides Settings from context (passed to make_async_container)."""

@provide
def get_settings(self) -> Settings:
return get_settings()
settings = from_context(provides=Settings, scope=Scope.APP)


class LoggingProvider(Provider):
Expand Down Expand Up @@ -161,9 +159,9 @@ async def get_kafka_producer(

@provide
async def get_dlq_manager(
self, schema_registry: SchemaRegistryManager, logger: logging.Logger
self, settings: Settings, schema_registry: SchemaRegistryManager, logger: logging.Logger
) -> AsyncIterator[DLQManager]:
manager = create_dlq_manager(schema_registry, logger)
manager = create_dlq_manager(settings, schema_registry, logger)
await manager.start()
try:
yield manager
@@ -190,8 +188,8 @@ class EventProvider(Provider):
scope = Scope.APP

@provide
def get_schema_registry(self, logger: logging.Logger) -> SchemaRegistryManager:
return create_schema_registry_manager(logger)
def get_schema_registry(self, settings: Settings, logger: logging.Logger) -> SchemaRegistryManager:
return create_schema_registry_manager(settings, logger)

@provide
async def get_event_store(self, schema_registry: SchemaRegistryManager, logger: logging.Logger) -> EventStore:
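For reference, a self-contained sketch of the dishka from_context pattern adopted above, using a stand-in Config class rather than project code: the provider declares the type, and the concrete instance is supplied through the context argument of make_async_container.

from dishka import Provider, Scope, from_context, make_async_container

class Config:
    # Stand-in for app.settings.Settings.
    KAFKA_TOPIC_PREFIX = "test."

class ConfigProvider(Provider):
    config = from_context(provides=Config, scope=Scope.APP)

container = make_async_container(ConfigProvider(), context={Config: Config()})
# Later, inside async code: cfg = await container.get(Config)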
13 changes: 8 additions & 5 deletions backend/app/dlq/manager.py
@@ -21,12 +21,13 @@
)
from app.domain.enums.kafka import GroupId, KafkaTopic
from app.events.schema.schema_registry import SchemaRegistryManager
from app.settings import get_settings
from app.settings import Settings


class DLQManager(LifecycleEnabled):
def __init__(
self,
settings: Settings,
consumer: Consumer,
producer: Producer,
schema_registry: SchemaRegistryManager,
@@ -35,6 +36,7 @@ def __init__(
retry_topic_suffix: str = "-retry",
default_retry_policy: RetryPolicy | None = None,
):
self.settings = settings
self.metrics = get_dlq_metrics()
self.schema_registry = schema_registry
self.logger = logger
@@ -147,7 +149,7 @@ async def start(self) -> None:
if self._running:
return

topic_name = f"{get_settings().KAFKA_TOPIC_PREFIX}{str(self.dlq_topic)}"
topic_name = f"{self.settings.KAFKA_TOPIC_PREFIX}{str(self.dlq_topic)}"
self.consumer.subscribe([topic_name])

self._running = True
@@ -190,7 +192,7 @@ async def _process_messages(self) -> None:
if not await self._validate_message(msg):
continue

start_time = asyncio.get_event_loop().time()
start_time = asyncio.get_running_loop().time()
dlq_message = self._kafka_msg_to_message(msg)

await self._record_message_metrics(dlq_message)
@@ -249,7 +251,7 @@ async def _process_message_with_tracing(self, msg: Message, dlq_message: DLQMess
async def _commit_and_record_duration(self, start_time: float) -> None:
"""Commit offset and record processing duration."""
await asyncio.to_thread(self.consumer.commit, asynchronous=False)
duration = asyncio.get_event_loop().time() - start_time
duration = asyncio.get_running_loop().time() - start_time
self.metrics.record_dlq_processing_duration(duration, "process")

async def _process_dlq_message(self, message: DLQMessage) -> None:
@@ -469,13 +471,13 @@ async def retry_message_manually(self, event_id: str) -> bool:


def create_dlq_manager(
settings: Settings,
schema_registry: SchemaRegistryManager,
logger: logging.Logger,
dlq_topic: KafkaTopic = KafkaTopic.DEAD_LETTER_QUEUE,
retry_topic_suffix: str = "-retry",
default_retry_policy: RetryPolicy | None = None,
) -> DLQManager:
settings = get_settings()
consumer = Consumer(
{
"bootstrap.servers": settings.KAFKA_BOOTSTRAP_SERVERS,
@@ -499,6 +501,7 @@ def create_dlq_manager(
if default_retry_policy is None:
default_retry_policy = RetryPolicy(topic="default", strategy=RetryStrategy.EXPONENTIAL_BACKOFF)
return DLQManager(
settings=settings,
consumer=consumer,
producer=producer,
schema_registry=schema_registry,
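With Settings injected, create_dlq_manager no longer touches global state, so a test can hand it a purpose-built configuration. A sketch assuming the signatures shown in this diff and that create_schema_registry_manager lives alongside SchemaRegistryManager:

import logging

from app.dlq.manager import DLQManager, create_dlq_manager
from app.events.schema.schema_registry import create_schema_registry_manager
from app.settings import Settings

async def build_test_manager() -> DLQManager:
    settings = Settings()  # e.g. loaded from .env.test, with KAFKA_TOPIC_PREFIX="test."
    logger = logging.getLogger("dlq-test")
    schema_registry = create_schema_registry_manager(settings, logger)
    manager = create_dlq_manager(settings, schema_registry, logger)
    await manager.start()  # subscribes to f"{settings.KAFKA_TOPIC_PREFIX}{dlq_topic}"
    return manager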
2 changes: 1 addition & 1 deletion backend/app/events/admin_utils.py
@@ -41,7 +41,7 @@ async def create_topic(self, topic: str, num_partitions: int = 1, replication_fa
futures = self._admin.create_topics([new_topic], operation_timeout=30.0)

# Wait for result - result() returns None on success, raises exception on failure
await asyncio.get_event_loop().run_in_executor(None, lambda: futures[topic].result(timeout=30.0))
await asyncio.get_running_loop().run_in_executor(None, lambda: futures[topic].result(timeout=30.0))
self.logger.info(f"Topic {topic} created successfully")
return True
except Exception as e:
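asyncio.get_running_loop() must be called from a running coroutine and raises otherwise, whereas get_event_loop() could silently create a fresh loop in some contexts; a standalone sketch of the same off-loop blocking-wait pattern:

import asyncio

def blocking_wait() -> str:
    # Stands in for futures[topic].result(timeout=30.0).
    return "ok"

async def wait_for_topic() -> str:
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, blocking_wait)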
36 changes: 18 additions & 18 deletions backend/app/events/event_store.py
@@ -54,7 +54,7 @@ async def initialize(self) -> None:
self.logger.info("Event store initialized with Beanie")

async def store_event(self, event: BaseEvent) -> bool:
start = asyncio.get_event_loop().time()
start = asyncio.get_running_loop().time()
try:
now = datetime.now(timezone.utc)
data = event.model_dump(exclude={"topic"})
@@ -71,7 +71,7 @@ async def store_event(self, event: BaseEvent) -> bool:
}
)

duration = asyncio.get_event_loop().time() - start
duration = asyncio.get_running_loop().time() - start
self.metrics.record_event_store_duration(duration, "store_single", "event_store")
self.metrics.record_event_stored(event.event_type, "event_store")
return True
@@ -84,7 +84,7 @@ async def store_event(self, event: BaseEvent) -> bool:
return False

async def store_batch(self, events: list[BaseEvent]) -> dict[str, int]:
start = asyncio.get_event_loop().time()
start = asyncio.get_running_loop().time()
results = {"total": len(events), "stored": 0, "duplicates": 0, "failed": 0}
if not events:
return results
@@ -112,7 +112,7 @@ async def store_batch(self, events: list[BaseEvent]) -> dict[str, int]:
else:
raise

duration = asyncio.get_event_loop().time() - start
duration = asyncio.get_running_loop().time() - start
self.metrics.record_event_store_duration(duration, "store_batch", "event_store")
add_span_attributes(**{"events.batch.count": len(events)})
if results["stored"] > 0:
@@ -125,14 +125,14 @@ async def store_batch(self, events: list[BaseEvent]) -> dict[str, int]:
return results

async def get_event(self, event_id: str) -> BaseEvent | None:
start = asyncio.get_event_loop().time()
start = asyncio.get_running_loop().time()
doc = await EventDocument.find_one({"event_id": event_id})
if not doc:
return None

event = self.schema_registry.deserialize_json(_flatten_doc(doc))

duration = asyncio.get_event_loop().time() - start
duration = asyncio.get_running_loop().time() - start
self.metrics.record_event_query_duration(duration, "get_by_id", "event_store")
return event

@@ -144,7 +144,7 @@ async def get_events_by_type(
limit: int = 100,
offset: int = 0,
) -> list[BaseEvent]:
start = asyncio.get_event_loop().time()
start = asyncio.get_running_loop().time()
query: dict[str, Any] = {"event_type": event_type}
if tr := self._time_range(start_time, end_time):
query["timestamp"] = tr
@@ -158,7 +158,7 @@ async def get_events_by_type(
)
events = [self.schema_registry.deserialize_json(_flatten_doc(doc)) for doc in docs]

duration = asyncio.get_event_loop().time() - start
duration = asyncio.get_running_loop().time() - start
self.metrics.record_event_query_duration(duration, "get_by_type", "event_store")
return events

@@ -167,15 +167,15 @@ async def get_execution_events(
execution_id: str,
event_types: list[EventType] | None = None,
) -> list[BaseEvent]:
start = asyncio.get_event_loop().time()
start = asyncio.get_running_loop().time()
query: dict[str, Any] = {"$or": [{"payload.execution_id": execution_id}, {"aggregate_id": execution_id}]}
if event_types:
query["event_type"] = {"$in": event_types}

docs = await EventDocument.find(query).sort([("timestamp", SortDirection.ASCENDING)]).to_list()
events = [self.schema_registry.deserialize_json(_flatten_doc(doc)) for doc in docs]

duration = asyncio.get_event_loop().time() - start
duration = asyncio.get_running_loop().time() - start
self.metrics.record_event_query_duration(duration, "get_execution_events", "event_store")
return events

@@ -187,7 +187,7 @@ async def get_user_events(
end_time: datetime | None = None,
limit: int = 100,
) -> list[BaseEvent]:
start = asyncio.get_event_loop().time()
start = asyncio.get_running_loop().time()
query: dict[str, Any] = {"metadata.user_id": str(user_id)}
if event_types:
query["event_type"] = {"$in": event_types}
@@ -197,7 +197,7 @@ async def get_user_events(
docs = await EventDocument.find(query).sort([("timestamp", SortDirection.DESCENDING)]).limit(limit).to_list()
events = [self.schema_registry.deserialize_json(_flatten_doc(doc)) for doc in docs]

duration = asyncio.get_event_loop().time() - start
duration = asyncio.get_running_loop().time() - start
self.metrics.record_event_query_duration(duration, "get_user_events", "event_store")
return events

@@ -208,7 +208,7 @@ async def get_security_events(
user_id: str | None = None,
limit: int = 100,
) -> list[BaseEvent]:
start = asyncio.get_event_loop().time()
start = asyncio.get_running_loop().time()
query: dict[str, Any] = {"event_type": {"$in": self._SECURITY_TYPES}}
if user_id:
query["metadata.user_id"] = str(user_id)
@@ -218,20 +218,20 @@ async def get_security_events(
docs = await EventDocument.find(query).sort([("timestamp", SortDirection.DESCENDING)]).limit(limit).to_list()
events = [self.schema_registry.deserialize_json(_flatten_doc(doc)) for doc in docs]

duration = asyncio.get_event_loop().time() - start
duration = asyncio.get_running_loop().time() - start
self.metrics.record_event_query_duration(duration, "get_security_events", "event_store")
return events

async def get_correlation_chain(self, correlation_id: str) -> list[BaseEvent]:
start = asyncio.get_event_loop().time()
start = asyncio.get_running_loop().time()
docs = await (
EventDocument.find({"metadata.correlation_id": str(correlation_id)})
.sort([("timestamp", SortDirection.ASCENDING)])
.to_list()
)
events = [self.schema_registry.deserialize_json(_flatten_doc(doc)) for doc in docs]

duration = asyncio.get_event_loop().time() - start
duration = asyncio.get_running_loop().time() - start
self.metrics.record_event_query_duration(duration, "get_correlation_chain", "event_store")
return events

@@ -242,7 +242,7 @@ async def replay_events(
event_types: list[EventType] | None = None,
callback: Callable[[BaseEvent], Awaitable[None]] | None = None,
) -> int:
start = asyncio.get_event_loop().time()
start = asyncio.get_running_loop().time()
count = 0

try:
@@ -258,7 +258,7 @@ async def replay_events(
await callback(event)
count += 1

duration = asyncio.get_event_loop().time() - start
duration = asyncio.get_running_loop().time() - start
self.metrics.record_event_query_duration(duration, "replay_events", "event_store")
self.logger.info(f"Replayed {count} events from {start_time} to {end_time}")
return count
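Every change above is the same substitution inside the store's duration-measurement pattern; a condensed, standalone sketch:

import asyncio

async def timed_query() -> float:
    # loop.time() is monotonic, so the difference stays a valid duration
    # even if the wall clock is adjusted mid-query.
    start = asyncio.get_running_loop().time()
    await asyncio.sleep(0)  # stand-in for the actual store or query call
    return asyncio.get_running_loop().time() - start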
7 changes: 4 additions & 3 deletions backend/app/events/event_store_consumer.py
@@ -40,7 +40,7 @@ def __init__(
self.producer = producer # For DLQ handling
self._batch_buffer: list[BaseEvent] = []
self._batch_lock = asyncio.Lock()
self._last_batch_time = asyncio.get_event_loop().time()
self._last_batch_time: float = 0.0
self._batch_task: asyncio.Task[None] | None = None
self._running = False

@@ -49,6 +49,7 @@ async def start(self) -> None:
if self._running:
return

self._last_batch_time = asyncio.get_running_loop().time()
settings = get_settings()
config = ConsumerConfig(
bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS,
@@ -126,7 +127,7 @@ async def _batch_processor(self) -> None:
await asyncio.sleep(1)

async with self._batch_lock:
time_since_last_batch = asyncio.get_event_loop().time() - self._last_batch_time
time_since_last_batch = asyncio.get_running_loop().time() - self._last_batch_time

if self._batch_buffer and time_since_last_batch >= self.batch_timeout:
await self._flush_batch()
@@ -140,7 +141,7 @@ async def _flush_batch(self) -> None:

batch = self._batch_buffer.copy()
self._batch_buffer.clear()
self._last_batch_time = asyncio.get_event_loop().time()
self._last_batch_time = asyncio.get_running_loop().time()

self.logger.info(f"Event store flushing batch of {len(batch)} events")
with trace_span(
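The consumer now defers reading the loop clock until start(): __init__ may run before any event loop exists, where get_running_loop() would raise RuntimeError. A reduced sketch of the pattern:

import asyncio

class BatchTimer:
    def __init__(self) -> None:
        # Safe to construct outside an event loop; the real clock value is set in start().
        self._last_batch_time: float = 0.0

    async def start(self) -> None:
        self._last_batch_time = asyncio.get_running_loop().time()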