
Commit 577a184

fixed test suite (extracted infra tests, removed global patching of params, removed mocks in favor of the actual implementations; instances are provided by DI); added AsyncExitStack for lifecycle control instead of manual start/stop calls
1 parent f14264f · commit 577a184

125 files changed: +2210 additions, −5931 deletions


.github/workflows/tests.yml

Lines changed: 3 additions & 0 deletions
@@ -139,9 +139,12 @@ jobs:
           MONGODB_PORT: 27017
           # Explicit URL with default credentials
           MONGODB_URL: mongodb://root:[email protected]:27017/?authSource=admin
+          # Optional isolation for Schema Registry subjects (fresh registry in CI, but safe to set)
+          SCHEMA_SUBJECT_PREFIX: "ci.${{ github.run_id }}."
         run: |
           cd backend
           echo "Using BACKEND_BASE_URL=$BACKEND_BASE_URL"
+          echo "Using SCHEMA_SUBJECT_PREFIX=$SCHEMA_SUBJECT_PREFIX"
           echo "MongoDB connection will use default CI credentials"
           python -m pytest tests/integration tests/unit -v --cov=app --cov-branch --cov-report=xml --cov-report=term --cov-report=term-missing
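
The backend code that consumes this variable is not part of this diff, so the sketch below is only an assumption about how a subject prefix like this is typically applied (the "-value" suffix follows Confluent's default subject-naming convention; the helper name is hypothetical):

# Hypothetical sketch (not from this commit): prefixing Avro subject names so
# parallel CI runs cannot collide on a shared Schema Registry.
import os

def subject_for(event_type: str) -> str:
    prefix = os.environ.get("SCHEMA_SUBJECT_PREFIX", "")  # "ci.<run_id>." in CI
    return f"{prefix}{event_type}-value"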

backend/.env

Lines changed: 0 additions & 1 deletion
@@ -44,7 +44,6 @@ TRACING_SERVICE_VERSION=1.0.0
 TRACING_SAMPLING_RATE=1.0
 
 # Dead Letter Queue Configuration
-KAFKA_DLQ_TOPIC=dead-letter-queue
 DLQ_RETRY_MAX_ATTEMPTS=5
 DLQ_RETRY_BASE_DELAY_SECONDS=60.0
 DLQ_RETRY_MAX_DELAY_SECONDS=3600.0

backend/app/core/dishka_lifespan.py

Lines changed: 11 additions & 11 deletions
@@ -1,4 +1,4 @@
-from contextlib import asynccontextmanager
+from contextlib import AsyncExitStack, asynccontextmanager
 from typing import AsyncGenerator
 
 import redis.asyncio as redis
@@ -10,6 +10,7 @@
 from app.core.startup import initialize_metrics_context, initialize_rate_limits
 from app.core.tracing import init_tracing
 from app.db.schema.schema_manager import SchemaManager
+from app.events.event_store_consumer import EventStoreConsumer
 from app.events.schema.schema_registry import SchemaRegistryManager, initialize_event_schemas
 from app.services.sse.kafka_redis_bridge import SSEKafkaRedisBridge
 from app.settings import get_settings
@@ -80,15 +81,14 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
 
     # Rate limit middleware added during app creation; service resolved lazily at runtime
 
-    # Start SSE Kafka→Redis bridge to ensure consumers are running before any events are published
-    _ = await container.get(SSEKafkaRedisBridge)
-    logger.info("SSE Kafka→Redis bridge started with consumer pool")
+    # Acquire long-lived services and manage lifecycle via AsyncExitStack
+    sse_bridge = await container.get(SSEKafkaRedisBridge)
+    event_store_consumer = await container.get(EventStoreConsumer)
 
-    # All services initialized by dishka providers
-    logger.info("All services initialized by dishka providers")
-
-    try:
+    async with AsyncExitStack() as stack:
+        await stack.enter_async_context(sse_bridge)
+        logger.info("SSE Kafka→Redis bridge started with consumer pool")
+        await stack.enter_async_context(event_store_consumer)
+        logger.info("EventStoreConsumer started - events will be persisted to MongoDB")
+        logger.info("All services initialized by DI and managed by AsyncExitStack")
         yield
-    finally:
-        # Dishka automatically handles cleanup of all resources!
-        logger.info("Application shutdown complete")

backend/app/core/k8s_clients.py

Lines changed: 42 additions & 0 deletions
@@ -0,0 +1,42 @@
+from dataclasses import dataclass
+
+from kubernetes import client as k8s_client
+from kubernetes import config as k8s_config
+
+from app.core.logging import logger
+
+
+@dataclass(frozen=True)
+class K8sClients:
+    api_client: k8s_client.ApiClient
+    v1: k8s_client.CoreV1Api
+    apps_v1: k8s_client.AppsV1Api
+    networking_v1: k8s_client.NetworkingV1Api
+
+
+def create_k8s_clients(kubeconfig_path: str | None = None, in_cluster: bool | None = None) -> K8sClients:
+    if in_cluster:
+        k8s_config.load_incluster_config()
+    elif kubeconfig_path:
+        k8s_config.load_kube_config(config_file=kubeconfig_path)
+    else:
+        k8s_config.load_kube_config()
+
+    configuration = k8s_client.Configuration.get_default_copy()
+    logger.info(f"Kubernetes API host: {configuration.host}")
+    logger.info(f"SSL CA configured: {configuration.ssl_ca_cert is not None}")
+
+    api_client = k8s_client.ApiClient(configuration)
+    return K8sClients(
+        api_client=api_client,
+        v1=k8s_client.CoreV1Api(api_client),
+        apps_v1=k8s_client.AppsV1Api(api_client),
+        networking_v1=k8s_client.NetworkingV1Api(api_client),
+    )
+
+
+def close_k8s_clients(clients: K8sClients) -> None:
+    close = getattr(clients.api_client, "close", None)
+    if callable(close):
+        close()
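
A usage sketch for the new module (the pod-listing call is an illustration, not code from this commit):

from app.core.k8s_clients import close_k8s_clients, create_k8s_clients

clients = create_k8s_clients()  # falls back to the default kubeconfig location
try:
    pods = clients.v1.list_namespaced_pod(namespace="default")
    print([p.metadata.name for p in pods.items])
finally:
    close_k8s_clients(clients)

In the commit itself this create/close pair is wrapped by the dishka provider added in backend/app/core/providers.py below, so callers never manage the client lifetime by hand.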

backend/app/core/lifecycle.py

Lines changed: 24 additions & 0 deletions
@@ -0,0 +1,24 @@
+from __future__ import annotations
+
+from types import TracebackType
+from typing import Optional, Self, Type
+
+
+class LifecycleEnabled:
+    async def start(self) -> None:  # pragma: no cover
+        raise NotImplementedError
+
+    async def stop(self) -> None:  # pragma: no cover
+        raise NotImplementedError
+
+    async def __aenter__(self) -> Self:
+        await self.start()
+        return self
+
+    async def __aexit__(
+        self,
+        exc_type: Optional[Type[BaseException]],
+        exc: Optional[BaseException],
+        tb: Optional[TracebackType],
+    ) -> None:
+        await self.stop()
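
Any class with start()/stop() gains async-with support by inheriting this base, which is what lets the lifespan hand instances straight to AsyncExitStack. A hypothetical subclass:

from app.core.lifecycle import LifecycleEnabled

class HeartbeatService(LifecycleEnabled):  # illustrative, not in the codebase
    async def start(self) -> None:
        print("heartbeat started")

    async def stop(self) -> None:
        print("heartbeat stopped")

async def run() -> None:
    # __aenter__ calls start(); __aexit__ calls stop() even if the body raises
    async with HeartbeatService():
        ...

Note that __aexit__ returns None, so exceptions raised inside the block still propagate after stop() completes.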

backend/app/core/providers.py

Lines changed: 25 additions & 15 deletions
@@ -9,6 +9,8 @@
     DatabaseConfig,
     create_database_connection,
 )
+from app.core.k8s_clients import K8sClients, close_k8s_clients, create_k8s_clients
+from app.core.logging import logger
 from app.core.metrics import (
     CoordinatorMetrics,
     DatabaseMetrics,
@@ -126,6 +128,9 @@ async def get_redis_client(self, settings: Settings) -> AsyncIterator[redis.Redi
         )
         # Test connection
         await client.ping()
+        logger.info(
+            f"Redis connected: {settings.REDIS_HOST}:{settings.REDIS_PORT}/{settings.REDIS_DB}"
+        )
         try:
             yield client
         finally:
@@ -221,24 +226,34 @@ async def get_event_store_consumer(
         event_store: EventStore,
         schema_registry: SchemaRegistryManager,
         kafka_producer: UnifiedProducer
-    ) -> AsyncIterator[EventStoreConsumer]:
+    ) -> EventStoreConsumer:
         topics = get_all_topics()
-        consumer = create_event_store_consumer(
+        return create_event_store_consumer(
             event_store=event_store,
             topics=list(topics),
             schema_registry_manager=schema_registry,
             producer=kafka_producer
         )
-        await consumer.start()
+
+    @provide
+    async def get_event_bus_manager(self) -> AsyncIterator[EventBusManager]:
+        manager = EventBusManager()
         try:
-            yield consumer
+            yield manager
         finally:
-            await consumer.stop()
+            await manager.close()
+
+
+class KubernetesProvider(Provider):
+    scope = Scope.APP
 
     @provide
-    def get_event_bus_manager(self) -> EventBusManager:
-        # Don't start the event bus here - let it start lazily when needed
-        return EventBusManager()
+    async def get_k8s_clients(self, settings: Settings) -> AsyncIterator[K8sClients]:
+        clients = create_k8s_clients()
+        try:
+            yield clients
+        finally:
+            close_k8s_clients(clients)
 
 
 class ConnectionProvider(Provider):
@@ -306,18 +321,13 @@ async def get_sse_kafka_redis_bridge(
         settings: Settings,
         event_metrics: EventMetrics,
         sse_redis_bus: SSERedisBus,
-    ) -> AsyncIterator[SSEKafkaRedisBridge]:
-        router = create_sse_kafka_redis_bridge(
+    ) -> SSEKafkaRedisBridge:
+        return create_sse_kafka_redis_bridge(
             schema_registry=schema_registry,
             settings=settings,
             event_metrics=event_metrics,
             sse_bus=sse_redis_bus,
         )
-        await router.start()
-        try:
-            yield router
-        finally:
-            await router.stop()
 
     @provide
     def get_sse_repository(
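
The pattern worth noting in this file: resources dishka should own keep the AsyncIterator shape (the container runs the finally block at shutdown), while services whose start/stop ordering matters are now returned bare and started by the lifespan. A simplified sketch with stand-in types (only Provider, Scope, and provide come from dishka; the rest is illustrative):

from typing import AsyncIterator

from dishka import Provider, Scope, provide

class OwnedResource:  # stand-in, e.g. a client with a close()
    async def close(self) -> None: ...

class OrderedService:  # stand-in, e.g. the SSE bridge
    async def start(self) -> None: ...
    async def stop(self) -> None: ...

class ExampleProvider(Provider):
    scope = Scope.APP

    @provide
    async def owned(self) -> AsyncIterator[OwnedResource]:
        resource = OwnedResource()
        try:
            yield resource          # dishka finalizes this when the container closes
        finally:
            await resource.close()

    @provide
    def ordered(self) -> OrderedService:
        return OrderedService()     # the lifespan enters/exits it via AsyncExitStack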

backend/app/dlq/manager.py

Lines changed: 6 additions & 3 deletions
@@ -7,6 +7,7 @@
 from motor.motor_asyncio import AsyncIOMotorCollection, AsyncIOMotorDatabase
 from opentelemetry.trace import SpanKind
 
+from app.core.lifecycle import LifecycleEnabled
 from app.core.logging import logger
 from app.core.metrics.context import get_dlq_metrics
 from app.core.tracing import EventAttributes
@@ -26,7 +27,7 @@
 from app.settings import get_settings
 
 
-class DLQManager:
+class DLQManager(LifecycleEnabled):
     def __init__(
         self,
         database: AsyncIOMotorDatabase,
@@ -69,7 +70,8 @@ async def start(self) -> None:
         if self._running:
             return
 
-        self.consumer.subscribe([self.dlq_topic])
+        topic_name = f"{get_settings().KAFKA_TOPIC_PREFIX}{str(self.dlq_topic)}"
+        self.consumer.subscribe([topic_name])
 
         self._running = True
 
@@ -399,10 +401,11 @@ def create_dlq_manager(
     retry_topic_suffix: str = "-retry",
     default_retry_policy: RetryPolicy | None = None,
 ) -> DLQManager:
+
     settings = get_settings()
     consumer = Consumer({
         'bootstrap.servers': settings.KAFKA_BOOTSTRAP_SERVERS,
-        'group.id': GroupId.DLQ_MANAGER,
+        'group.id': f"{GroupId.DLQ_MANAGER}.{settings.KAFKA_GROUP_SUFFIX}",
         'enable.auto.commit': False,
         'auto.offset.reset': 'earliest',
         'client.id': 'dlq-manager-consumer'
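
KAFKA_TOPIC_PREFIX and KAFKA_GROUP_SUFFIX recur through the rest of the commit (the consumer, producer, and event store consumer below); the net effect is a per-run namespace. Roughly, with illustrative values (the literal "dead-letter-queue" matches the topic name removed from backend/.env above; the GroupId.DLQ_MANAGER string value is an assumption):

topic_prefix = "test.abc123."                 # per-run KAFKA_TOPIC_PREFIX
group_suffix = "abc123"                       # per-run KAFKA_GROUP_SUFFIX

topic = f"{topic_prefix}dead-letter-queue"    # -> "test.abc123.dead-letter-queue"
group = f"dlq-manager.{group_suffix}"         # -> "dlq-manager.abc123"

Two runs with different prefixes and suffixes see disjoint topics and consumer groups, so concurrent test sessions cannot steal each other's messages or offsets.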

backend/app/events/core/consumer.py

Lines changed: 3 additions & 1 deletion
@@ -14,6 +14,7 @@
 from app.domain.enums.kafka import KafkaTopic
 from app.events.schema.schema_registry import SchemaRegistryManager
 from app.infrastructure.kafka.events.base import BaseEvent
+from app.settings import get_settings
 
 from .dispatcher import EventDispatcher
 from .types import ConsumerConfig, ConsumerMetrics, ConsumerState
@@ -37,6 +38,7 @@ def __init__(
         self._event_metrics = get_event_metrics()  # Singleton for Kafka metrics
         self._error_callback: "Callable[[Exception, BaseEvent], Awaitable[None]] | None" = None
         self._consume_task: asyncio.Task[None] | None = None
+        self._topic_prefix = get_settings().KAFKA_TOPIC_PREFIX
 
     async def start(self, topics: list[KafkaTopic]) -> None:
         self._state = (
@@ -49,7 +51,7 @@ async def start(self, topics: list[KafkaTopic]) -> None:
             consumer_config['stats_cb'] = self._handle_stats
 
         self._consumer = Consumer(consumer_config)
-        topic_strings = [str(topic) for topic in topics]
+        topic_strings = [f"{self._topic_prefix}{str(topic)}" for topic in topics]
         self._consumer.subscribe(topic_strings)
         self._running = True
         self._consume_task = asyncio.create_task(self._consume_loop())

backend/app/events/core/producer.py

Lines changed: 10 additions & 4 deletions
@@ -7,20 +7,22 @@
 from confluent_kafka import Message, Producer
 from confluent_kafka.error import KafkaError
 
+from app.core.lifecycle import LifecycleEnabled
 from app.core.logging import logger
 from app.core.metrics.context import get_event_metrics
 from app.domain.enums.kafka import KafkaTopic
 from app.events.schema.schema_registry import SchemaRegistryManager
 from app.infrastructure.kafka.events import BaseEvent
 from app.infrastructure.mappers.dlq_mapper import DLQMapper
+from app.settings import get_settings
 
 from .types import ProducerConfig, ProducerMetrics, ProducerState
 
 DeliveryCallback: TypeAlias = Callable[[KafkaError | None, Message], None]
 StatsCallback: TypeAlias = Callable[[dict[str, Any]], None]
 
 
-class UnifiedProducer:
+class UnifiedProducer(LifecycleEnabled):
     def __init__(
         self,
         config: ProducerConfig,
@@ -36,6 +38,8 @@ def __init__(
         self._metrics = ProducerMetrics()
         self._event_metrics = get_event_metrics()  # Singleton for Kafka metrics
         self._poll_task: asyncio.Task | None = None
+        # Topic prefix (for tests/local isolation); cached on init
+        self._topic_prefix = get_settings().KAFKA_TOPIC_PREFIX
 
     @property
     def is_running(self) -> bool:
@@ -190,7 +194,7 @@ async def produce(
         # Serialize value
         serialized_value = self._schema_registry.serialize_event(event_to_produce)
 
-        topic = str(event_to_produce.topic)
+        topic = f"{self._topic_prefix}{str(event_to_produce.topic)}"
         self._producer.produce(
             topic=topic,
             value=serialized_value,
@@ -257,7 +261,7 @@ async def send_to_dlq(
 
         # Send to DLQ topic
         self._producer.produce(
-            topic=str(KafkaTopic.DEAD_LETTER_QUEUE),
+            topic=f"{self._topic_prefix}{str(KafkaTopic.DEAD_LETTER_QUEUE)}",
            value=serialized_value,
            key=original_event.event_id.encode() if original_event.event_id else None,
            headers=[
@@ -269,7 +273,9 @@
         )
 
         # Record metrics
-        self._event_metrics.record_kafka_message_produced(str(KafkaTopic.DEAD_LETTER_QUEUE))
+        self._event_metrics.record_kafka_message_produced(
+            f"{self._topic_prefix}{str(KafkaTopic.DEAD_LETTER_QUEUE)}"
+        )
         self._metrics.messages_sent += 1
 
         logger.warning(

backend/app/events/event_store_consumer.py

Lines changed: 3 additions & 2 deletions
@@ -2,6 +2,7 @@
 
 from opentelemetry.trace import SpanKind
 
+from app.core.lifecycle import LifecycleEnabled
 from app.core.logging import logger
 from app.core.tracing.utils import trace_span
 from app.domain.enums.events import EventType
@@ -13,7 +14,7 @@
 from app.settings import get_settings
 
 
-class EventStoreConsumer:
+class EventStoreConsumer(LifecycleEnabled):
     """Consumes events from Kafka and stores them in MongoDB."""
 
     def __init__(
@@ -49,7 +50,7 @@ async def start(self) -> None:
         settings = get_settings()
         config = ConsumerConfig(
             bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS,
-            group_id=self.group_id,
+            group_id=f"{self.group_id}.{settings.KAFKA_GROUP_SUFFIX}",
             enable_auto_commit=False,
             max_poll_records=self.batch_size
         )
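
Given the commit message's emphasis on fixing the test suite, here is a conftest-level sketch of how these knobs would typically be set per test session (hypothetical, not part of this commit):

# conftest.py (hypothetical): one unique Kafka namespace per test session.
import os
import uuid

run_id = uuid.uuid4().hex[:8]
os.environ.setdefault("KAFKA_TOPIC_PREFIX", f"test.{run_id}.")
os.environ.setdefault("KAFKA_GROUP_SUFFIX", run_id)
os.environ.setdefault("SCHEMA_SUBJECT_PREFIX", f"test.{run_id}.")
# This must run before get_settings() is first called: the producer and
# consumer classes above cache KAFKA_TOPIC_PREFIX at construction time.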
