Merged: changes from 2 commits.

backend/app/dlq/manager.py (4 changes: 2 additions & 2 deletions)

@@ -190,7 +190,7 @@ async def _process_messages(self) -> None:
             if not await self._validate_message(msg):
                 continue

-            start_time = asyncio.get_event_loop().time()
+            start_time = asyncio.get_running_loop().time()
             dlq_message = self._kafka_msg_to_message(msg)

             await self._record_message_metrics(dlq_message)
@@ -249,7 +249,7 @@ async def _process_message_with_tracing(self, msg: Message, dlq_message: DLQMess
     async def _commit_and_record_duration(self, start_time: float) -> None:
         """Commit offset and record processing duration."""
         await asyncio.to_thread(self.consumer.commit, asynchronous=False)
-        duration = asyncio.get_event_loop().time() - start_time
+        duration = asyncio.get_running_loop().time() - start_time
         self.metrics.record_dlq_processing_duration(duration, "process")

     async def _process_dlq_message(self, message: DLQMessage) -> None:
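For context on the swap: `asyncio.get_event_loop()` can implicitly create a new loop when none is running (a behavior deprecated in recent Python versions), while `asyncio.get_running_loop()` always returns the loop the current coroutine executes on, or raises `RuntimeError` outside one. A minimal sketch of the timing idiom these hunks use; a `consumer` with a blocking `commit(asynchronous=False)` is assumed from the surrounding code:

import asyncio

# Minimal sketch of the timing idiom in this PR. Inside a coroutine,
# get_running_loop() is guaranteed to return the active loop, and
# loop.time() is that loop's monotonic clock, suitable for durations.
async def timed_commit(consumer) -> float:
    loop = asyncio.get_running_loop()
    start = loop.time()
    # Run the blocking commit off the loop so other tasks keep progressing.
    await asyncio.to_thread(consumer.commit, asynchronous=False)
    return loop.time() - start
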
backend/app/events/admin_utils.py (2 changes: 1 addition & 1 deletion)

@@ -41,7 +41,7 @@ async def create_topic(self, topic: str, num_partitions: int = 1, replication_fa
             futures = self._admin.create_topics([new_topic], operation_timeout=30.0)

             # Wait for result - result() returns None on success, raises exception on failure
-            await asyncio.get_event_loop().run_in_executor(None, lambda: futures[topic].result(timeout=30.0))
+            await asyncio.get_running_loop().run_in_executor(None, lambda: futures[topic].result(timeout=30.0))
             self.logger.info(f"Topic {topic} created successfully")
             return True
         except Exception as e:
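The same pattern sketched in isolation: confluent-kafka's `AdminClient.create_topics()` returns `concurrent.futures.Future` objects whose blocking `.result()` would stall the event loop, so the wait is pushed onto the default executor. A hedged sketch; the topic settings are illustrative:

import asyncio
from confluent_kafka.admin import AdminClient, NewTopic

# Hedged sketch, assuming confluent-kafka's admin API: create_topics()
# returns a dict of topic name -> Future, and .result() blocks, so it is
# awaited via the default thread-pool executor.
async def create_topic(admin: AdminClient, name: str) -> None:
    futures = admin.create_topics([NewTopic(name, num_partitions=1)])
    await asyncio.get_running_loop().run_in_executor(
        None, lambda: futures[name].result(timeout=30.0)
    )
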
backend/app/events/event_store.py (36 changes: 18 additions & 18 deletions)

@@ -54,7 +54,7 @@ async def initialize(self) -> None:
         self.logger.info("Event store initialized with Beanie")

     async def store_event(self, event: BaseEvent) -> bool:
-        start = asyncio.get_event_loop().time()
+        start = asyncio.get_running_loop().time()
         try:
             now = datetime.now(timezone.utc)
             data = event.model_dump(exclude={"topic"})
@@ -71,7 +71,7 @@ async def store_event(self, event: BaseEvent) -> bool:
                 }
             )

-            duration = asyncio.get_event_loop().time() - start
+            duration = asyncio.get_running_loop().time() - start
             self.metrics.record_event_store_duration(duration, "store_single", "event_store")
             self.metrics.record_event_stored(event.event_type, "event_store")
             return True
@@ -84,7 +84,7 @@ async def store_event(self, event: BaseEvent) -> bool:
             return False

     async def store_batch(self, events: list[BaseEvent]) -> dict[str, int]:
-        start = asyncio.get_event_loop().time()
+        start = asyncio.get_running_loop().time()
         results = {"total": len(events), "stored": 0, "duplicates": 0, "failed": 0}
         if not events:
             return results
@@ -112,7 +112,7 @@ async def store_batch(self, events: list[BaseEvent]) -> dict[str, int]:
             else:
                 raise

-        duration = asyncio.get_event_loop().time() - start
+        duration = asyncio.get_running_loop().time() - start
         self.metrics.record_event_store_duration(duration, "store_batch", "event_store")
         add_span_attributes(**{"events.batch.count": len(events)})
         if results["stored"] > 0:
@@ -125,14 +125,14 @@ async def store_batch(self, events: list[BaseEvent]) -> dict[str, int]:
         return results

     async def get_event(self, event_id: str) -> BaseEvent | None:
-        start = asyncio.get_event_loop().time()
+        start = asyncio.get_running_loop().time()
         doc = await EventDocument.find_one({"event_id": event_id})
         if not doc:
             return None

         event = self.schema_registry.deserialize_json(_flatten_doc(doc))

-        duration = asyncio.get_event_loop().time() - start
+        duration = asyncio.get_running_loop().time() - start
         self.metrics.record_event_query_duration(duration, "get_by_id", "event_store")
         return event

@@ -144,7 +144,7 @@ async def get_events_by_type(
         limit: int = 100,
         offset: int = 0,
     ) -> list[BaseEvent]:
-        start = asyncio.get_event_loop().time()
+        start = asyncio.get_running_loop().time()
         query: dict[str, Any] = {"event_type": event_type}
         if tr := self._time_range(start_time, end_time):
             query["timestamp"] = tr
@@ -158,7 +158,7 @@ async def get_events_by_type(
         )
         events = [self.schema_registry.deserialize_json(_flatten_doc(doc)) for doc in docs]

-        duration = asyncio.get_event_loop().time() - start
+        duration = asyncio.get_running_loop().time() - start
         self.metrics.record_event_query_duration(duration, "get_by_type", "event_store")
         return events

@@ -167,15 +167,15 @@ async def get_execution_events(
         execution_id: str,
         event_types: list[EventType] | None = None,
     ) -> list[BaseEvent]:
-        start = asyncio.get_event_loop().time()
+        start = asyncio.get_running_loop().time()
         query: dict[str, Any] = {"$or": [{"payload.execution_id": execution_id}, {"aggregate_id": execution_id}]}
         if event_types:
             query["event_type"] = {"$in": event_types}

         docs = await EventDocument.find(query).sort([("timestamp", SortDirection.ASCENDING)]).to_list()
         events = [self.schema_registry.deserialize_json(_flatten_doc(doc)) for doc in docs]

-        duration = asyncio.get_event_loop().time() - start
+        duration = asyncio.get_running_loop().time() - start
         self.metrics.record_event_query_duration(duration, "get_execution_events", "event_store")
         return events

@@ -187,7 +187,7 @@ async def get_user_events(
         end_time: datetime | None = None,
         limit: int = 100,
     ) -> list[BaseEvent]:
-        start = asyncio.get_event_loop().time()
+        start = asyncio.get_running_loop().time()
         query: dict[str, Any] = {"metadata.user_id": str(user_id)}
         if event_types:
             query["event_type"] = {"$in": event_types}
@@ -197,7 +197,7 @@ async def get_user_events(
         docs = await EventDocument.find(query).sort([("timestamp", SortDirection.DESCENDING)]).limit(limit).to_list()
         events = [self.schema_registry.deserialize_json(_flatten_doc(doc)) for doc in docs]

-        duration = asyncio.get_event_loop().time() - start
+        duration = asyncio.get_running_loop().time() - start
         self.metrics.record_event_query_duration(duration, "get_user_events", "event_store")
         return events

@@ -208,7 +208,7 @@ async def get_security_events(
         user_id: str | None = None,
         limit: int = 100,
     ) -> list[BaseEvent]:
-        start = asyncio.get_event_loop().time()
+        start = asyncio.get_running_loop().time()
         query: dict[str, Any] = {"event_type": {"$in": self._SECURITY_TYPES}}
         if user_id:
             query["metadata.user_id"] = str(user_id)
@@ -218,20 +218,20 @@ async def get_security_events(
         docs = await EventDocument.find(query).sort([("timestamp", SortDirection.DESCENDING)]).limit(limit).to_list()
         events = [self.schema_registry.deserialize_json(_flatten_doc(doc)) for doc in docs]

-        duration = asyncio.get_event_loop().time() - start
+        duration = asyncio.get_running_loop().time() - start
         self.metrics.record_event_query_duration(duration, "get_security_events", "event_store")
         return events

     async def get_correlation_chain(self, correlation_id: str) -> list[BaseEvent]:
-        start = asyncio.get_event_loop().time()
+        start = asyncio.get_running_loop().time()
         docs = await (
             EventDocument.find({"metadata.correlation_id": str(correlation_id)})
             .sort([("timestamp", SortDirection.ASCENDING)])
             .to_list()
         )
         events = [self.schema_registry.deserialize_json(_flatten_doc(doc)) for doc in docs]

-        duration = asyncio.get_event_loop().time() - start
+        duration = asyncio.get_running_loop().time() - start
         self.metrics.record_event_query_duration(duration, "get_correlation_chain", "event_store")
         return events

@@ -242,7 +242,7 @@ async def replay_events(
         event_types: list[EventType] | None = None,
         callback: Callable[[BaseEvent], Awaitable[None]] | None = None,
     ) -> int:
-        start = asyncio.get_event_loop().time()
+        start = asyncio.get_running_loop().time()
         count = 0

         try:
@@ -258,7 +258,7 @@ async def replay_events(
                     await callback(event)
                 count += 1

-            duration = asyncio.get_event_loop().time() - start
+            duration = asyncio.get_running_loop().time() - start
             self.metrics.record_event_query_duration(duration, "replay_events", "event_store")
             self.logger.info(f"Replayed {count} events from {start_time} to {end_time}")
             return count
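The start/duration bookkeeping above repeats in every storage and query method. One possible consolidation, sketched here as an async context manager (not part of the PR; `record` stands in for the metrics callbacks):

import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, Callable

# Sketch only: centralize the loop-clock timing pattern so each method
# needs a single `async with` instead of paired start/duration lines.
@asynccontextmanager
async def timed(record: Callable[[float], None]) -> AsyncIterator[None]:
    loop = asyncio.get_running_loop()
    start = loop.time()
    try:
        yield
    finally:
        record(loop.time() - start)

A method body would then read, for example, `async with timed(lambda d: self.metrics.record_event_query_duration(d, "get_by_id", "event_store")): ...`, keeping the measurement even when the body raises.
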
backend/app/events/event_store_consumer.py (7 changes: 4 additions & 3 deletions)

@@ -40,7 +40,7 @@ def __init__(
         self.producer = producer  # For DLQ handling
         self._batch_buffer: list[BaseEvent] = []
         self._batch_lock = asyncio.Lock()
-        self._last_batch_time = asyncio.get_event_loop().time()
+        self._last_batch_time: float = 0.0
         self._batch_task: asyncio.Task[None] | None = None
         self._running = False

@@ -49,6 +49,7 @@ async def start(self) -> None:
         if self._running:
             return

+        self._last_batch_time = asyncio.get_running_loop().time()
         settings = get_settings()
         config = ConsumerConfig(
             bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS,
@@ -126,7 +127,7 @@ async def _batch_processor(self) -> None:
             await asyncio.sleep(1)

             async with self._batch_lock:
-                time_since_last_batch = asyncio.get_event_loop().time() - self._last_batch_time
+                time_since_last_batch = asyncio.get_running_loop().time() - self._last_batch_time

                 if self._batch_buffer and time_since_last_batch >= self.batch_timeout:
                     await self._flush_batch()
@@ -140,7 +141,7 @@ async def _flush_batch(self) -> None:

         batch = self._batch_buffer.copy()
         self._batch_buffer.clear()
-        self._last_batch_time = asyncio.get_event_loop().time()
+        self._last_batch_time = asyncio.get_running_loop().time()

         self.logger.info(f"Event store flushing batch of {len(batch)} events")
         with trace_span(
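The `__init__` change pairs with the new line in `start()`: a constructor can run before any event loop exists, where `get_running_loop()` would raise. A minimal sketch of the deferred-initialization pattern, with illustrative names:

import asyncio

# Sketch of the pattern above: loop-derived state gets a 0.0 sentinel in
# __init__ (no loop may exist yet) and is stamped with the real loop clock
# once start() runs inside the loop, where a running loop is guaranteed.
class BatchBuffer:
    def __init__(self) -> None:
        self._last_batch_time: float = 0.0  # sentinel; no loop yet

    async def start(self) -> None:
        self._last_batch_time = asyncio.get_running_loop().time()
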
backend/app/services/event_bus.py (2 changes: 1 addition & 1 deletion)

@@ -96,7 +96,7 @@ async def _initialize_kafka(self) -> None:
         self.consumer.subscribe([self._topic])

         # Store the executor function for sync operations
-        loop = asyncio.get_event_loop()
+        loop = asyncio.get_running_loop()
         self._executor = loop.run_in_executor

     async def stop(self) -> None:
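Capturing `loop.run_in_executor` at initialization ties `self._executor` to whichever loop ran `_initialize_kafka()`, so later calls must happen on that same loop. A hedged usage sketch; the blocking `consumer.poll(1.0)` call is an assumption:

import asyncio

# Sketch of the captured-executor pattern: the bound method remembers its
# loop, so this only works while that loop is the one running the caller.
class Bus:
    async def initialize(self) -> None:
        loop = asyncio.get_running_loop()
        self._executor = loop.run_in_executor

    async def poll_blocking(self, consumer):
        # Offload an assumed blocking poll without freezing the loop.
        return await self._executor(None, consumer.poll, 1.0)
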
backend/app/services/event_replay/replay_service.py (6 changes: 3 additions & 3 deletions)

@@ -67,7 +67,7 @@ async def start_replay(self, session_id: str) -> None:
         self.logger.info("Started replay session", extra={"session_id": session_id})

     async def _run_replay(self, session: ReplaySessionState) -> None:
-        start_time = asyncio.get_event_loop().time()
+        start_time = asyncio.get_running_loop().time()

         try:
             with trace_span(
@@ -119,7 +119,7 @@ async def _complete_session(self, session: ReplaySessionState, start_time: float
         session.status = ReplayStatus.COMPLETED
         session.completed_at = datetime.now(timezone.utc)

-        duration = asyncio.get_event_loop().time() - start_time
+        duration = asyncio.get_running_loop().time() - start_time
         self._metrics.record_replay_duration(duration, session.config.replay_type)

         await self._update_session_in_db(session)
@@ -281,7 +281,7 @@ async def _write_event_to_file(self, event: BaseEvent, file_path: str) -> None:
             self._file_locks[file_path] = asyncio.Lock()

         async with self._file_locks[file_path]:
-            loop = asyncio.get_event_loop()
+            loop = asyncio.get_running_loop()
             await loop.run_in_executor(None, self._write_to_file_sync, event, file_path)

     def _write_to_file_sync(self, event: BaseEvent, file_path: str) -> None:
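A standalone sketch of the file-writing pattern in `_write_event_to_file`: one `asyncio.Lock` per path serializes appends, while the blocking write itself runs on the default executor so the loop stays free. Names and the JSON-lines format are illustrative:

import asyncio
import json

# Sketch only: per-path locks prevent interleaved appends from concurrent
# tasks; the synchronous write is handed to the default executor.
_file_locks: dict[str, asyncio.Lock] = {}

def _append_line_sync(path: str, record: dict) -> None:
    with open(path, "a", encoding="utf-8") as f:
        f.write(json.dumps(record) + "\n")

async def append_record(path: str, record: dict) -> None:
    if path not in _file_locks:
        _file_locks[path] = asyncio.Lock()
    async with _file_locks[path]:
        await asyncio.get_running_loop().run_in_executor(
            None, _append_line_sync, path, record
        )
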
backend/app/services/notification_service.py (4 changes: 2 additions & 2 deletions)

@@ -858,7 +858,7 @@ async def _deliver_notification(self, notification: DomainNotification) -> None:
         assert subscription is not None

         # Send through channel
-        start_time = asyncio.get_event_loop().time()
+        start_time = asyncio.get_running_loop().time()
         try:
             handler = self._channel_handlers.get(notification.channel)
             if handler is None:
@@ -869,7 +869,7 @@ async def _deliver_notification(self, notification: DomainNotification) -> None:

             self.logger.debug(f"Using handler {handler.__name__} for channel {notification.channel}")
             await handler(notification, subscription)
-            delivery_time = asyncio.get_event_loop().time() - start_time
+            delivery_time = asyncio.get_running_loop().time() - start_time

             # Mark delivered
             await self.repository.update_notification(
backend/app/services/result_processor/resource_cleaner.py (10 changes: 5 additions & 5 deletions)

@@ -87,7 +87,7 @@ async def _delete_pod(self, pod_name: str, namespace: str) -> None:
             raise InvalidStateError("Kubernetes client not initialized")

         try:
-            loop = asyncio.get_event_loop()
+            loop = asyncio.get_running_loop()
             await loop.run_in_executor(None, self.v1.read_namespaced_pod, pod_name, namespace)

             await loop.run_in_executor(
@@ -134,7 +134,7 @@ async def _delete_labeled_resources(
     ) -> None:
         """Generic function to delete labeled resources"""
         try:
-            loop = asyncio.get_event_loop()
+            loop = asyncio.get_running_loop()
             label_selector = f"execution-id={execution_id}"

             resources = await loop.run_in_executor(None, partial(list_func, namespace, label_selector=label_selector))
@@ -179,7 +179,7 @@ async def _cleanup_orphaned_pods(
         if not self.v1:
             raise InvalidStateError("Kubernetes client not initialized")

-        loop = asyncio.get_event_loop()
+        loop = asyncio.get_running_loop()
         pods = await loop.run_in_executor(
             None, partial(self.v1.list_namespaced_pod, namespace, label_selector="app=integr8s")
         )
@@ -206,7 +206,7 @@ async def _cleanup_orphaned_configmaps(
         if not self.v1:
             raise InvalidStateError("Kubernetes client not initialized")

-        loop = asyncio.get_event_loop()
+        loop = asyncio.get_running_loop()
         configmaps = await loop.run_in_executor(
             None, partial(self.v1.list_namespaced_config_map, namespace, label_selector="app=integr8s")
         )
@@ -227,7 +227,7 @@ async def get_resource_usage(self, namespace: str = "default") -> CountDict:
         """Get current resource usage counts"""
         await self.initialize()

-        loop = asyncio.get_event_loop()
+        loop = asyncio.get_running_loop()
         label_selector = "app=integr8s"

         default_counts = {"pods": 0, "configmaps": 0, "network_policies": 0}
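`functools.partial` appears throughout this file because `loop.run_in_executor()` forwards only positional arguments; keyword arguments such as `label_selector` must be bound beforehand. A minimal sketch against the kubernetes client assumed by the code above:

import asyncio
from functools import partial

# Sketch of the partial() idiom: bind keyword arguments before handing the
# callable to the executor, since run_in_executor(executor, func, *args)
# accepts positional arguments only.
async def list_pods(v1, namespace: str):
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None, partial(v1.list_namespaced_pod, namespace, label_selector="app=integr8s")
    )
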
backend/tests/conftest.py (34 changes: 7 additions & 27 deletions)

@@ -21,7 +21,7 @@

 # IMPORTANT: avoid importing app.main at module import time because it
 # constructs the FastAPI app immediately (reading settings from .env).
-# We import lazily inside the fixture after test env vars are set.y
+# We import lazily inside the fixture after test env vars are set.
 # DO NOT import any app.* modules at import time here, as it would
 # construct global singletons (logger, settings) before we set test env.

@@ -66,25 +66,6 @@ def _compute_worker_id() -> str:

 @pytest.fixture(scope="session", autouse=True)
 def _test_env() -> None:
-    # Core toggles
-    os.environ.setdefault("TESTING", "true")
-    os.environ.setdefault("ENABLE_TRACING", "false")
-    os.environ.setdefault("OTEL_SDK_DISABLED", "true")
-    os.environ.setdefault("OTEL_METRICS_EXPORTER", "none")
-    os.environ.setdefault("OTEL_TRACES_EXPORTER", "none")
-
-    # External services - force localhost when running tests on host
-    os.environ["MONGODB_URL"] = os.environ.get(
-        "MONGODB_URL",
-        "mongodb://root:rootpassword@localhost:27017/?authSource=admin",
-    )
-    os.environ.setdefault("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
-    os.environ.setdefault("REDIS_HOST", "localhost")
-    os.environ.setdefault("REDIS_PORT", "6379")
-    os.environ.setdefault("SCHEMA_REGISTRY_URL", "http://localhost:8081")
-    os.environ.setdefault("RATE_LIMIT_ENABLED", "false")
-    os.environ.setdefault("SECRET_KEY", "test-secret-key-for-testing-only-32chars!!")
-
     # Isolation identifiers
     session_id = os.environ.get("PYTEST_SESSION_ID") or uuid.uuid4().hex[:8]
     worker_id = _compute_worker_id()

@@ -210,10 +191,9 @@ async def _http_login(client: httpx.AsyncClient, username: str, password: str) -
     return resp.json().get("csrf_token", "")


-# Session-scoped shared users for convenience
-@pytest.fixture(scope="session")
+@pytest.fixture
 def test_user_credentials():
-    uid = os.environ.get("PYTEST_SESSION_ID", uuid.uuid4().hex[:8])
+    uid = uuid.uuid4().hex[:8]
     return {
         "username": f"test_user_{uid}",
         "email": f"test_user_{uid}@example.com",
@@ -222,9 +202,9 @@ def test_user_credentials():
     }


-@pytest.fixture(scope="session")
+@pytest.fixture
 def test_admin_credentials():
-    uid = os.environ.get("PYTEST_SESSION_ID", uuid.uuid4().hex[:8])
+    uid = uuid.uuid4().hex[:8]
     return {
         "username": f"admin_user_{uid}",
         "email": f"admin_user_{uid}@example.com",
@@ -239,7 +219,7 @@ async def test_user(client: httpx.AsyncClient, test_user_credentials):
     creds = test_user_credentials
     r = await client.post("/api/v1/auth/register", json=creds)
     if r.status_code not in (200, 201, 400):
-        pytest.skip(f"Cannot create test user (status {r.status_code}).")
+        pytest.fail(f"Cannot create test user (status {r.status_code}): {r.text}")
     csrf = await _http_login(client, creds["username"], creds["password"])
     return {**creds, "csrf_token": csrf, "headers": {"X-CSRF-Token": csrf}}

@@ -250,7 +230,7 @@ async def test_admin(client: httpx.AsyncClient, test_admin_credentials):
     creds = test_admin_credentials
     r = await client.post("/api/v1/auth/register", json=creds)
     if r.status_code not in (200, 201, 400):
-        pytest.skip(f"Cannot create test admin (status {r.status_code}).")
+        pytest.fail(f"Cannot create test admin (status {r.status_code}): {r.text}")
     csrf = await _http_login(client, creds["username"], creds["password"])
     return {**creds, "csrf_token": csrf, "headers": {"X-CSRF-Token": csrf}}