Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 26 additions & 23 deletions .github/workflows/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,47 @@
import pytest
import os
from typing import Any, Generator

import psycopg2
import psycopg2.extensions
import pytest


@pytest.fixture(scope='session')
def raster_endpoint():
return os.getenv('RASTER_ENDPOINT', "http://127.0.0.1/raster")
@pytest.fixture(scope="session")
def raster_endpoint() -> str:
return os.getenv("RASTER_ENDPOINT", "http://127.0.0.1/raster")


@pytest.fixture(scope='session')
def vector_endpoint():
return os.getenv('VECTOR_ENDPOINT', "http://127.0.0.1/vector")
@pytest.fixture(scope="session")
def vector_endpoint() -> str:
return os.getenv("VECTOR_ENDPOINT", "http://127.0.0.1/vector")


@pytest.fixture(scope='session')
def stac_endpoint():
return os.getenv('STAC_ENDPOINT', "http://127.0.0.1/stac")
@pytest.fixture(scope="session")
def stac_endpoint() -> str:
return os.getenv("STAC_ENDPOINT", "http://127.0.0.1/stac")


@pytest.fixture(scope='session')
def db_connection():
@pytest.fixture(scope="session")
def db_connection() -> Generator[Any, None, None]:
"""Create database connection for testing."""
# Require all database connection parameters to be explicitly set
required_vars = ['PGHOST', 'PGPORT', 'PGDATABASE', 'PGUSER', 'PGPASSWORD']
required_vars = ["PGHOST", "PGPORT", "PGDATABASE", "PGUSER", "PGPASSWORD"]
missing_vars = [var for var in required_vars if not os.getenv(var)]

if missing_vars:
pytest.fail(f"Required environment variables not set: {', '.join(missing_vars)}")

connection_params = {
'host': os.getenv('PGHOST'),
'port': int(os.getenv('PGPORT')),
'database': os.getenv('PGDATABASE'),
'user': os.getenv('PGUSER'),
'password': os.getenv('PGPASSWORD')
}
pytest.fail(
f"Required environment variables not set: {', '.join(missing_vars)}"
)

# All required vars are guaranteed to exist due to check above
try:
conn = psycopg2.connect(**connection_params)
conn = psycopg2.connect(
host=os.environ["PGHOST"],
port=int(os.environ["PGPORT"]),
database=os.environ["PGDATABASE"],
user=os.environ["PGUSER"],
password=os.environ["PGPASSWORD"],
)
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
yield conn
conn.close()
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/tests/test_autoscaling.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ def make_request(url: str, timeout: int = 10) -> bool:
"""Make a single HTTP request and return success status."""
try:
response = requests.get(url, timeout=timeout)
return response.status_code == 200
return bool(response.status_code == 200)
except requests.RequestException:
return False

Expand Down
17 changes: 9 additions & 8 deletions .github/workflows/tests/test_notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
import json
import subprocess
import time
from typing import Any

import pytest


def test_eoapi_notifier_deployment():
def test_eoapi_notifier_deployment() -> None:
"""Test that eoapi-notifier deployment is running."""
# Check if eoapi-notifier deployment exists and is ready
result = subprocess.run(
Expand Down Expand Up @@ -38,7 +39,7 @@ def test_eoapi_notifier_deployment():
)


def test_cloudevents_sink_exists():
def test_cloudevents_sink_exists() -> None:
"""Test that Knative CloudEvents sink service exists and is accessible."""
# Check if Knative service exists
result = subprocess.run(
Expand All @@ -64,7 +65,7 @@ def test_cloudevents_sink_exists():
)


def test_notification_configuration():
def test_notification_configuration() -> None:
"""Test that eoapi-notifier is configured correctly."""
# Get the configmap for eoapi-notifier
result = subprocess.run(
Expand Down Expand Up @@ -94,7 +95,7 @@ def test_notification_configuration():
)


def test_cloudevents_sink_logs_show_startup():
def test_cloudevents_sink_logs_show_startup() -> None:
"""Test that Knative CloudEvents sink started successfully."""
# Get Knative CloudEvents sink pod logs
result = subprocess.run(
Expand Down Expand Up @@ -123,7 +124,7 @@ def test_cloudevents_sink_logs_show_startup():
)


def test_eoapi_notifier_logs_show_connection():
def test_eoapi_notifier_logs_show_connection() -> None:
"""Test that eoapi-notifier connects to database successfully."""
# Give some time for the notifier to start
time.sleep(5)
Expand All @@ -150,7 +151,7 @@ def test_eoapi_notifier_logs_show_connection():
assert "Authentication failed" not in logs, "Should not have auth errors"


def test_database_notification_triggers_exist(db_connection):
def test_database_notification_triggers_exist(db_connection: Any) -> None:
"""Test that pgstac notification triggers are installed."""
with db_connection.cursor() as cur:
# Check if the notification function exists
Expand Down Expand Up @@ -180,7 +181,7 @@ def test_database_notification_triggers_exist(db_connection):
)


def test_end_to_end_notification_flow(db_connection):
def test_end_to_end_notification_flow(db_connection: Any) -> None:
"""Test complete flow: database → eoapi-notifier → Knative CloudEvents sink."""

# Skip if notifications not enabled
Expand Down Expand Up @@ -269,7 +270,7 @@ def test_end_to_end_notification_flow(db_connection):
cursor.execute("SELECT pgstac.delete_item(%s);", (test_item_id,))


def test_k_sink_injection():
def test_k_sink_injection() -> None:
"""Test that SinkBinding injects K_SINK into eoapi-notifier deployment."""
# Check if eoapi-notifier deployment exists
result = subprocess.run(
Expand Down
Loading