Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions .github/workflows/backend-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,15 @@ jobs:
docker compose -f docker-compose.ci.yaml up -d --wait --wait-timeout 120
docker compose -f docker-compose.ci.yaml ps

- name: Create Kafka topics
timeout-minutes: 2
env:
KAFKA_BOOTSTRAP_SERVERS: localhost:9092
KAFKA_TOPIC_PREFIX: "ci.${{ github.run_id }}."
run: |
cd backend
uv run python -m scripts.create_topics

- name: Run integration tests
timeout-minutes: 10
env:
Expand All @@ -99,6 +108,7 @@ jobs:
MONGODB_PORT: 27017
MONGODB_URL: mongodb://root:[email protected]:27017/?authSource=admin
KAFKA_BOOTSTRAP_SERVERS: localhost:9092
KAFKA_TOPIC_PREFIX: "ci.${{ github.run_id }}."
SCHEMA_REGISTRY_URL: http://localhost:8081
REDIS_HOST: localhost
REDIS_PORT: 6379
Expand Down Expand Up @@ -174,13 +184,23 @@ jobs:
timeout 90 bash -c 'until sudo k3s kubectl cluster-info; do sleep 5; done'
kubectl create namespace integr8scode --dry-run=client -o yaml | kubectl apply -f -

- name: Create Kafka topics
timeout-minutes: 2
env:
KAFKA_BOOTSTRAP_SERVERS: localhost:9092
KAFKA_TOPIC_PREFIX: "ci.${{ github.run_id }}."
run: |
cd backend
uv run python -m scripts.create_topics

- name: Run E2E tests
timeout-minutes: 10
env:
MONGO_ROOT_USER: root
MONGO_ROOT_PASSWORD: rootpassword
MONGODB_URL: mongodb://root:[email protected]:27017/?authSource=admin
KAFKA_BOOTSTRAP_SERVERS: localhost:9092
KAFKA_TOPIC_PREFIX: "ci.${{ github.run_id }}."
SCHEMA_REGISTRY_URL: http://localhost:8081
REDIS_HOST: localhost
REDIS_PORT: 6379
Expand Down
6 changes: 5 additions & 1 deletion backend/.env.test
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ REDIS_DECODE_RESPONSES=true

# Kafka - use localhost for tests
KAFKA_BOOTSTRAP_SERVERS=localhost:9092
KAFKA_TOPIC_PREFIX=test.
SCHEMA_REGISTRY_URL=http://localhost:8081

# Security
Expand All @@ -31,9 +32,12 @@ CORS_ALLOWED_ORIGINS=["http://localhost:3000","https://localhost:3000"]
# Features
RATE_LIMIT_ENABLED=true
ENABLE_TRACING=false
OTEL_SDK_DISABLED=true

# OpenTelemetry - explicitly disabled for tests (no endpoint = NoOp meter)
OTEL_EXPORTER_OTLP_ENDPOINT=
OTEL_METRICS_EXPORTER=none
OTEL_TRACES_EXPORTER=none
OTEL_LOGS_EXPORTER=none

# Development
DEVELOPMENT_MODE=false
Expand Down
137 changes: 129 additions & 8 deletions backend/app/core/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,53 +5,174 @@
AdminServicesProvider,
AuthProvider,
BusinessServicesProvider,
ConnectionProvider,
CoordinatorProvider,
CoreServicesProvider,
DatabaseProvider,
DLQProcessorProvider,
EventProvider,
EventReplayProvider,
K8sWorkerProvider,
KafkaServicesProvider,
KubernetesProvider,
LoggingProvider,
MessagingProvider,
MetricsProvider,
PodMonitorProvider,
RedisProvider,
ResultProcessorProvider,
RepositoryProvider,
SagaOrchestratorProvider,
SettingsProvider,
SSEProvider,
UserServicesProvider,
)
from app.settings import Settings


def create_app_container() -> AsyncContainer:
def create_app_container(settings: Settings) -> AsyncContainer:
"""
Create the application DI container.

Args:
settings: Application settings (injected via from_context).
"""
return make_async_container(
SettingsProvider(),
LoggingProvider(),
DatabaseProvider(),
RedisProvider(),
CoreServicesProvider(),
MetricsProvider(),
RepositoryProvider(),
MessagingProvider(),
EventProvider(),
ConnectionProvider(),
KafkaServicesProvider(),
SSEProvider(),
AuthProvider(),
UserServicesProvider(),
AdminServicesProvider(),
BusinessServicesProvider(),
FastapiProvider(),
context={Settings: settings},
)


def create_result_processor_container() -> AsyncContainer:
def create_result_processor_container(settings: Settings) -> AsyncContainer:
"""
Create a minimal DI container for the ResultProcessor worker.
Includes only settings, database, event/kafka, and required repositories.

Args:
settings: Application settings (injected via from_context).
"""
return make_async_container(
SettingsProvider(),
LoggingProvider(),
DatabaseProvider(),
RedisProvider(),
CoreServicesProvider(),
MetricsProvider(),
RepositoryProvider(),
EventProvider(),
MessagingProvider(),
context={Settings: settings},
)


def create_coordinator_container(settings: Settings) -> AsyncContainer:
"""Create DI container for the ExecutionCoordinator worker."""
return make_async_container(
SettingsProvider(),
LoggingProvider(),
DatabaseProvider(),
RedisProvider(),
CoreServicesProvider(),
MetricsProvider(),
RepositoryProvider(),
MessagingProvider(),
EventProvider(),
CoordinatorProvider(),
context={Settings: settings},
)


def create_k8s_worker_container(settings: Settings) -> AsyncContainer:
"""Create DI container for the KubernetesWorker."""
return make_async_container(
SettingsProvider(),
LoggingProvider(),
DatabaseProvider(),
RedisProvider(),
CoreServicesProvider(),
MetricsProvider(),
RepositoryProvider(),
MessagingProvider(),
EventProvider(),
KubernetesProvider(),
K8sWorkerProvider(),
context={Settings: settings},
)


def create_pod_monitor_container(settings: Settings) -> AsyncContainer:
"""Create DI container for the PodMonitor worker."""
return make_async_container(
SettingsProvider(),
LoggingProvider(),
DatabaseProvider(),
CoreServicesProvider(),
ConnectionProvider(),
MetricsProvider(),
RepositoryProvider(),
MessagingProvider(),
EventProvider(),
KafkaServicesProvider(),
KubernetesProvider(),
PodMonitorProvider(),
context={Settings: settings},
)


def create_saga_orchestrator_container(settings: Settings) -> AsyncContainer:
"""Create DI container for the SagaOrchestrator worker."""
return make_async_container(
SettingsProvider(),
LoggingProvider(),
DatabaseProvider(),
RedisProvider(),
CoreServicesProvider(),
MetricsProvider(),
RepositoryProvider(),
MessagingProvider(),
EventProvider(),
SagaOrchestratorProvider(),
context={Settings: settings},
)


def create_event_replay_container(settings: Settings) -> AsyncContainer:
"""Create DI container for the EventReplay worker."""
return make_async_container(
SettingsProvider(),
LoggingProvider(),
DatabaseProvider(),
CoreServicesProvider(),
MetricsProvider(),
RepositoryProvider(),
MessagingProvider(),
ResultProcessorProvider(),
EventProvider(),
EventReplayProvider(),
context={Settings: settings},
)


def create_dlq_processor_container(settings: Settings) -> AsyncContainer:
"""Create DI container for the DLQ processor worker."""
return make_async_container(
SettingsProvider(),
LoggingProvider(),
DatabaseProvider(),
CoreServicesProvider(),
MetricsProvider(),
RepositoryProvider(),
EventProvider(),
DLQProcessorProvider(),
context={Settings: settings},
)
58 changes: 48 additions & 10 deletions backend/app/core/lifecycle.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,62 @@
from __future__ import annotations

from types import TracebackType
from typing import Optional, Self, Type
from typing import Self


class LifecycleEnabled:
async def start(self) -> None: # pragma: no cover
raise NotImplementedError
"""Base class for services with async lifecycle management.

async def stop(self) -> None: # pragma: no cover
raise NotImplementedError
Usage:
async with MyService() as service:
# service is running
# service is stopped

Subclasses override _on_start() and _on_stop() for their logic.
Base class handles idempotency and context manager protocol.

For internal component cleanup, use aclose() which follows Python's
standard async cleanup pattern (like aiofiles, aiohttp).
"""

def __init__(self) -> None:
self._lifecycle_started: bool = False

async def _on_start(self) -> None:
"""Override with startup logic. Called once on enter."""
pass

async def _on_stop(self) -> None:
"""Override with cleanup logic. Called once on exit."""
pass

async def aclose(self) -> None:
"""Close the service. For internal component cleanup.

Mirrors Python's standard aclose() pattern (like aiofiles, aiohttp).
Idempotent - safe to call multiple times.
"""
if not self._lifecycle_started:
return
self._lifecycle_started = False
await self._on_stop()

@property
def is_running(self) -> bool:
"""Check if service is currently running."""
return self._lifecycle_started

async def __aenter__(self) -> Self:
await self.start()
if self._lifecycle_started:
return self # Already started, idempotent
await self._on_start()
self._lifecycle_started = True
return self

async def __aexit__(
self,
exc_type: Optional[Type[BaseException]],
exc: Optional[BaseException],
tb: Optional[TracebackType],
exc_type: type[BaseException] | None,
exc: BaseException | None,
tb: TracebackType | None,
) -> None:
await self.stop()
await self.aclose()
Loading