Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
728743c
fixes
HardMax71 Jan 8, 2026
e1fcdd9
fixes
HardMax71 Jan 8, 2026
9c2b6e8
fixes
HardMax71 Jan 8, 2026
884fb04
fixes
HardMax71 Jan 8, 2026
118bd6d
fixes
HardMax71 Jan 9, 2026
277cabf
fixes
HardMax71 Jan 9, 2026
6027ac4
fixes
HardMax71 Jan 9, 2026
7cc31fe
fixes (removed duplicate logins)
HardMax71 Jan 9, 2026
4abcb71
flaky test fix
HardMax71 Jan 9, 2026
aa3c8ac
xdist-group
HardMax71 Jan 9, 2026
5489e39
pyproject fix
HardMax71 Jan 9, 2026
93a79e6
durations=0 for tests: checking which ones take most time
HardMax71 Jan 9, 2026
bc944e1
optimizations of sse routes tests
HardMax71 Jan 9, 2026
48aa71f
optimizations of sse routes tests
HardMax71 Jan 9, 2026
824d686
security service added to DI, simplified settings passing
HardMax71 Jan 9, 2026
e88e606
more DI for settings obj, also removed random bootstrap_servers being…
HardMax71 Jan 9, 2026
72a7733
mypy
HardMax71 Jan 9, 2026
2cb4d4d
removed env block from toml file, deps updated accordingly
HardMax71 Jan 9, 2026
9a22a4c
Otel transient error fix
HardMax71 Jan 9, 2026
b3cdef8
Otel transient error fix + DI for metrics
HardMax71 Jan 9, 2026
4d78cc1
other fixes
HardMax71 Jan 9, 2026
57fd0b1
other fixes
HardMax71 Jan 9, 2026
8120957
mypy fixes part 1
HardMax71 Jan 9, 2026
a3907bb
mypy fixes part 2
HardMax71 Jan 9, 2026
47d1215
mypy fixes part 3
HardMax71 Jan 9, 2026
8cde784
mypy fixes part 5; also added csrf middleware
HardMax71 Jan 9, 2026
5796a27
less fixtures, passing directly xx_user fixture as logged in user of …
HardMax71 Jan 9, 2026
726e2f9
settings fixes (lifespan now reads from DI)
HardMax71 Jan 9, 2026
f1109d5
new sse event (subscribed), updated tests, dlq retry msgs moved to ma…
HardMax71 Jan 10, 2026
4452603
fixes
HardMax71 Jan 10, 2026
a202287
fixes
HardMax71 Jan 10, 2026
c6837fc
fixes
HardMax71 Jan 10, 2026
e67ef8a
fixes
HardMax71 Jan 10, 2026
1016bf7
single source of truth regarding loading Settings - DI (not get_setti…
HardMax71 Jan 10, 2026
d375e5e
fixes + parallel execution
HardMax71 Jan 10, 2026
2f1a020
non-flaky tests
HardMax71 Jan 10, 2026
225c737
fixes
HardMax71 Jan 10, 2026
a5a1f99
fixes
HardMax71 Jan 10, 2026
9ff0f12
fixes
HardMax71 Jan 10, 2026
2f0ee35
fixes
HardMax71 Jan 10, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions backend/.env.test
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ PROJECT_NAME=integr8scode
DATABASE_NAME=integr8scode_test
API_V1_STR=/api/v1
SECRET_KEY=test-secret-key-for-testing-only-32chars!!
ENVIRONMENT=testing
TESTING=true

# MongoDB - use localhost for tests
Expand All @@ -27,17 +26,13 @@ SCHEMA_REGISTRY_URL=http://localhost:8081

# Security
SECURE_COOKIES=true
CORS_ALLOWED_ORIGINS=["http://localhost:3000","https://localhost:3000"]

# Features
RATE_LIMIT_ENABLED=true
ENABLE_TRACING=false

# OpenTelemetry - explicitly disabled for tests (no endpoint = NoOp meter)
# OpenTelemetry - explicitly disabled for tests
OTEL_EXPORTER_OTLP_ENDPOINT=
OTEL_METRICS_EXPORTER=none
OTEL_TRACES_EXPORTER=none
OTEL_LOGS_EXPORTER=none

# Development
DEVELOPMENT_MODE=false
Expand Down
38 changes: 22 additions & 16 deletions backend/app/core/dishka_lifespan.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,25 +44,31 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
# Metrics setup moved to app creation to allow middleware registration
logger.info("Lifespan start: tracing and services initialization")

# Initialize tracing
instrumentation_report = init_tracing(
service_name=settings.TRACING_SERVICE_NAME,
logger=logger,
service_version=settings.TRACING_SERVICE_VERSION,
sampling_rate=settings.TRACING_SAMPLING_RATE,
enable_console_exporter=settings.TESTING,
adaptive_sampling=settings.TRACING_ADAPTIVE_SAMPLING,
)

if instrumentation_report.has_failures():
logger.warning(
"Some instrumentation libraries failed to initialize",
extra={"instrumentation_summary": instrumentation_report.get_summary()},
# Initialize tracing only when enabled (avoid exporter retries in tests)
if settings.ENABLE_TRACING and not settings.TESTING:
instrumentation_report = init_tracing(
service_name=settings.TRACING_SERVICE_NAME,
logger=logger,
service_version=settings.TRACING_SERVICE_VERSION,
sampling_rate=settings.TRACING_SAMPLING_RATE,
enable_console_exporter=settings.TESTING,
adaptive_sampling=settings.TRACING_ADAPTIVE_SAMPLING,
)

if instrumentation_report.has_failures():
logger.warning(
"Some instrumentation libraries failed to initialize",
extra={"instrumentation_summary": instrumentation_report.get_summary()},
)
else:
logger.info(
"Distributed tracing initialized successfully",
extra={"instrumentation_summary": instrumentation_report.get_summary()},
)
else:
logger.info(
"Distributed tracing initialized successfully",
extra={"instrumentation_summary": instrumentation_report.get_summary()},
"Distributed tracing disabled",
extra={"testing": settings.TESTING, "enable_tracing": settings.ENABLE_TRACING},
)

# Initialize schema registry once at startup
Expand Down
2 changes: 1 addition & 1 deletion backend/app/core/metrics/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def _create_meter(self, config: MetricsConfig, meter_name: str) -> Meter:
"""
# If tracing/metrics disabled or no OTLP endpoint configured, use NoOp meter to avoid threads/network
settings = get_settings()
if not settings.ENABLE_TRACING or not config.otlp_endpoint:
if settings.TESTING or not settings.ENABLE_TRACING:
return NoOpMeterProvider().get_meter(meter_name)

resource = Resource.create(
Expand Down
9 changes: 3 additions & 6 deletions backend/app/core/middlewares/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,8 @@ def setup_metrics(app: FastAPI, logger: logging.Logger) -> None:
"""Set up OpenTelemetry metrics with OTLP exporter."""
settings = get_settings()
# Fast opt-out for tests or when explicitly disabled
if settings.TESTING or os.getenv("OTEL_SDK_DISABLED", "").lower() in {"1", "true", "yes"}:
logger.info("OpenTelemetry metrics disabled (TESTING/OTEL_SDK_DISABLED)")
if settings.TESTING:
logger.info("OpenTelemetry metrics disabled (TESTING)")
return

# Configure OpenTelemetry resource
Expand All @@ -137,10 +137,7 @@ def setup_metrics(app: FastAPI, logger: logging.Logger) -> None:

# Configure OTLP exporter (sends to OpenTelemetry Collector or compatible backend)
# Default endpoint is localhost:4317 for gRPC
otlp_exporter = OTLPMetricExporter(
endpoint=os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "localhost:4317"),
insecure=True, # Use insecure for local development
)
otlp_exporter = OTLPMetricExporter(endpoint=settings.OTEL_EXPORTER_OTLP_ENDPOINT, insecure=True)

# Create metric reader with 60 second export interval
metric_reader = PeriodicExportingMetricReader(
Expand Down
2 changes: 1 addition & 1 deletion backend/app/core/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ async def get_kafka_producer(
self, settings: Settings, schema_registry: SchemaRegistryManager, logger: logging.Logger
) -> AsyncIterator[UnifiedProducer]:
config = ProducerConfig(bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS)
async with UnifiedProducer(config, schema_registry, logger) as producer:
async with UnifiedProducer(config, schema_registry, logger, settings=settings) as producer:
yield producer

@provide
Expand Down
5 changes: 3 additions & 2 deletions backend/app/events/core/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from app.domain.enums.kafka import KafkaTopic
from app.events.schema.schema_registry import SchemaRegistryManager
from app.infrastructure.kafka.events import BaseEvent
from app.settings import get_settings
from app.settings import Settings, get_settings

from .types import ProducerConfig, ProducerMetrics, ProducerState

Expand All @@ -34,6 +34,7 @@ def __init__(
schema_registry_manager: SchemaRegistryManager,
logger: logging.Logger,
stats_callback: StatsCallback | None = None,
settings: Settings | None = None,
):
super().__init__()
self._config = config
Expand All @@ -46,7 +47,7 @@ def __init__(
self._event_metrics = get_event_metrics() # Singleton for Kafka metrics
self._poll_task: asyncio.Task[None] | None = None
# Topic prefix (for tests/local isolation); cached on init
self._topic_prefix = get_settings().KAFKA_TOPIC_PREFIX
self._topic_prefix = (settings or get_settings()).KAFKA_TOPIC_PREFIX

@property
def is_running(self) -> bool:
Expand Down
3 changes: 1 addition & 2 deletions backend/app/events/schema/schema_registry.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import json
import logging
import os
import struct
from functools import lru_cache
from typing import Any, Dict, Type, TypeVar
Expand Down Expand Up @@ -60,7 +59,7 @@ def __init__(self, settings: Settings, logger: logging.Logger, schema_registry_u
self.namespace = "com.integr8scode.events"
# Optional per-session/worker subject prefix for tests/local isolation
# e.g., "test.<session>.<worker>." -> subjects become "test.x.y.ExecutionRequestedEvent-value"
self.subject_prefix = os.getenv("SCHEMA_SUBJECT_PREFIX", "")
self.subject_prefix = settings.SCHEMA_SUBJECT_PREFIX

config = {"url": self.url}
if settings.SCHEMA_REGISTRY_AUTH:
Expand Down
11 changes: 3 additions & 8 deletions backend/app/services/sse/kafka_redis_bridge.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from __future__ import annotations

import logging
import os

from app.core.lifecycle import LifecycleEnabled
from app.core.metrics.events import EventMetrics
Expand Down Expand Up @@ -62,13 +61,9 @@ async def _on_stop(self) -> None:
self.logger.info("SSE Kafka→Redis bridge stopped")

async def _create_consumer(self, consumer_index: int) -> UnifiedConsumer:
suffix = os.environ.get("KAFKA_GROUP_SUFFIX", "")
group_id = "sse-bridge-pool"
if suffix:
group_id = f"{group_id}.{suffix}"
client_id = f"sse-bridge-{consumer_index}"
if suffix:
client_id = f"{client_id}-{suffix}"
suffix = self.settings.KAFKA_GROUP_SUFFIX
group_id = f"sse-bridge-pool-{suffix}"
client_id = f"sse-bridge-{consumer_index}-{suffix}"

config = ConsumerConfig(
bootstrap_servers=self.settings.KAFKA_BOOTSTRAP_SERVERS,
Expand Down
1 change: 1 addition & 0 deletions backend/app/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class Settings(BaseSettings):
SCHEMA_BASE_PATH: str = "app/schemas_avro"
SCHEMA_AVRO_PATH: str = "app/schemas_avro"
SCHEMA_CONFIG_PATH: str | None = None
SCHEMA_SUBJECT_PREFIX: str = ""

# OpenTelemetry / Jaeger Configuration
ENABLE_TRACING: bool = True
Expand Down
2 changes: 1 addition & 1 deletion backend/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ log_cli = false
log_cli_level = "ERROR"
log_level = "ERROR"
addopts = "-n 4 --dist loadfile --tb=short -q --no-header -q"
env = ["OTEL_SDK_DISABLED=true"]
env = ["TESTING=true"]

# Coverage configuration
[tool.coverage.run]
Expand Down
21 changes: 16 additions & 5 deletions backend/scripts/seed_users.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,33 @@

Environment Variables:
MONGODB_URL: Connection string (default: mongodb://mongo:27017/integr8scode)
DATABASE_NAME: Database name for the application (default: integr8scode_db)
DEFAULT_USER_PASSWORD: Default user password (default: user123)
ADMIN_USER_PASSWORD: Admin user password (default: admin123)
"""

import asyncio
import os
from datetime import datetime, timezone
from typing import Any

from bson import ObjectId
from passlib.context import CryptContext
from pydantic_settings import BaseSettings, SettingsConfigDict
from pymongo.asynchronous.database import AsyncDatabase
from pymongo.asynchronous.mongo_client import AsyncMongoClient

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


class SeedSettings(BaseSettings):
model_config = SettingsConfigDict(case_sensitive=False)

mongodb_url: str = "mongodb://mongo:27017/integr8scode"
database_name: str = "integr8scode_db"
default_user_password: str = "user123"
admin_user_password: str = "admin123"


async def upsert_user(
db: AsyncDatabase[dict[str, Any]],
username: str,
Expand Down Expand Up @@ -69,8 +79,9 @@ async def upsert_user(


async def seed_users() -> None:
mongodb_url = os.getenv("MONGODB_URL", "mongodb://mongo:27017/integr8scode")
db_name = os.getenv("DATABASE_NAME", "integr8scode_db")
settings = SeedSettings()
mongodb_url = settings.mongodb_url
db_name = settings.database_name

print(f"Connecting to MongoDB (database: {db_name})...")

Expand All @@ -82,7 +93,7 @@ async def seed_users() -> None:
db,
username="user",
email="user@integr8scode.com",
password=os.getenv("DEFAULT_USER_PASSWORD", "user123"),
password=settings.default_user_password,
role="user",
is_superuser=False,
)
Expand All @@ -92,7 +103,7 @@ async def seed_users() -> None:
db,
username="admin",
email="admin@integr8scode.com",
password=os.getenv("ADMIN_USER_PASSWORD", "admin123"),
password=settings.admin_user_password,
role="admin",
is_superuser=True,
)
Expand Down
83 changes: 23 additions & 60 deletions backend/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,77 +12,40 @@
from app.settings import Settings
from dishka import AsyncContainer
from httpx import ASGITransport
from pydantic_settings import SettingsConfigDict


class TestSettings(Settings):
"""Test configuration - loads from .env.test instead of .env"""

model_config = SettingsConfigDict(
env_file=".env.test",
env_file_encoding="utf-8",
case_sensitive=True,
extra="ignore",
)


# ===== Worker-specific isolation for pytest-xdist =====
def _compute_worker_id() -> str:
return os.environ.get("PYTEST_XDIST_WORKER", "gw0")


def _setup_worker_env() -> None:
"""Set worker-specific environment variables for pytest-xdist isolation.

Must be called BEFORE TestSettings is instantiated so env vars are picked up.
"""
session_id = os.environ.get("PYTEST_SESSION_ID") or uuid.uuid4().hex[:8]
worker_id = _compute_worker_id()
os.environ["PYTEST_SESSION_ID"] = session_id

# Unique database name per worker
os.environ["DATABASE_NAME"] = f"integr8scode_test_{session_id}_{worker_id}"

# Distribute Redis DBs across workers (0-15)
try:
worker_num = int(worker_id[2:]) if worker_id.startswith("gw") else 0
os.environ["REDIS_DB"] = str(worker_num % 16)
except Exception:
os.environ.setdefault("REDIS_DB", "0")

# Unique Kafka consumer group per worker
os.environ["KAFKA_GROUP_SUFFIX"] = f"{session_id}.{worker_id}"

# Unique Schema Registry prefix per worker
os.environ["SCHEMA_SUBJECT_PREFIX"] = f"test.{session_id}.{worker_id}."

# Disable OpenTelemetry exporters to prevent "otel-collector:4317" retry noise
os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = ""
os.environ["OTEL_METRICS_EXPORTER"] = "none"
os.environ["OTEL_TRACES_EXPORTER"] = "none"
os.environ["OTEL_LOGS_EXPORTER"] = "none"


# Set up worker env at module load time (before any Settings instantiation)
_setup_worker_env()
_WORKER_ID = os.environ.get("PYTEST_XDIST_WORKER", "gw0")


# ===== Settings fixture =====
@pytest.fixture(scope="session")
def test_settings() -> Settings:
"""Provide TestSettings for tests that need to create their own components."""
return TestSettings()
"""Provide test settings with a unique Kafka topic prefix for isolation."""
base = Settings(_env_file=".env.test", _env_file_encoding="utf-8")
session_id = uuid.uuid4().hex[:8]
base_prefix = f"{base.KAFKA_TOPIC_PREFIX.rstrip('.')}."
worker_num = sum(_WORKER_ID.encode()) % 16
unique_prefix = f"{base_prefix}{session_id}.{_WORKER_ID}."
return base.model_copy(
update={
"DATABASE_NAME": f"integr8scode_test_{session_id}_{_WORKER_ID}",
"REDIS_DB": worker_num % 16,
"KAFKA_GROUP_SUFFIX": f"{session_id}.{_WORKER_ID}",
"SCHEMA_SUBJECT_PREFIX": f"test.{session_id}.{_WORKER_ID}.",
"KAFKA_TOPIC_PREFIX": unique_prefix,
}
)


# ===== App fixture =====
@pytest_asyncio.fixture(scope="session")
async def app():
"""Create FastAPI app with TestSettings.
async def app(test_settings: Settings):
"""Create FastAPI app with test settings.

Session-scoped to avoid Pydantic schema validator memory issues when
FastAPI recreates OpenAPI schemas hundreds of times with pytest-xdist.
"""
application = create_app(settings=TestSettings())
application = create_app(settings=test_settings)

yield application

Expand All @@ -101,10 +64,10 @@ async def app_container(app):
async def client(app) -> AsyncGenerator[httpx.AsyncClient, None]:
"""HTTP client for testing API endpoints."""
async with httpx.AsyncClient(
transport=ASGITransport(app=app),
base_url="https://test",
timeout=30.0,
follow_redirects=True,
transport=ASGITransport(app=app),
base_url="https://test",
timeout=30.0,
follow_redirects=True,
) as c:
yield c

Expand Down
8 changes: 3 additions & 5 deletions backend/tests/integration/events/test_admin_utils.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
import logging
import os

import pytest
from app.events.admin_utils import AdminUtils
from app.settings import Settings

_test_logger = logging.getLogger("test.events.admin_utils")


@pytest.mark.kafka
@pytest.mark.asyncio
async def test_admin_utils_real_topic_checks() -> None:
prefix = os.environ.get("KAFKA_TOPIC_PREFIX", "test.")
topic = f"{prefix}adminutils.{os.environ.get('PYTEST_SESSION_ID', 'sid')}"
async def test_admin_utils_real_topic_checks(test_settings: Settings) -> None:
topic = f"{test_settings.KAFKA_TOPIC_PREFIX}adminutils.{test_settings.KAFKA_GROUP_SUFFIX}"
au = AdminUtils(logger=_test_logger)

# Ensure topic exists (idempotent)
Expand Down
Loading
Loading