diff --git a/.github/workflows/tests/conftest.py b/.github/workflows/tests/conftest.py
index 2f384328..e5cedd45 100644
--- a/.github/workflows/tests/conftest.py
+++ b/.github/workflows/tests/conftest.py
@@ -1,44 +1,47 @@
-import pytest
 import os
+from typing import Any, Generator
+
 import psycopg2
 import psycopg2.extensions
+import pytest
 
 
-@pytest.fixture(scope='session')
-def raster_endpoint():
-    return os.getenv('RASTER_ENDPOINT', "http://127.0.0.1/raster")
+@pytest.fixture(scope="session")
+def raster_endpoint() -> str:
+    return os.getenv("RASTER_ENDPOINT", "http://127.0.0.1/raster")
 
 
-@pytest.fixture(scope='session')
-def vector_endpoint():
-    return os.getenv('VECTOR_ENDPOINT', "http://127.0.0.1/vector")
+@pytest.fixture(scope="session")
+def vector_endpoint() -> str:
+    return os.getenv("VECTOR_ENDPOINT", "http://127.0.0.1/vector")
 
 
-@pytest.fixture(scope='session')
-def stac_endpoint():
-    return os.getenv('STAC_ENDPOINT', "http://127.0.0.1/stac")
+@pytest.fixture(scope="session")
+def stac_endpoint() -> str:
+    return os.getenv("STAC_ENDPOINT", "http://127.0.0.1/stac")
 
 
-@pytest.fixture(scope='session')
-def db_connection():
+@pytest.fixture(scope="session")
+def db_connection() -> Generator[Any, None, None]:
     """Create database connection for testing."""
     # Require all database connection parameters to be explicitly set
-    required_vars = ['PGHOST', 'PGPORT', 'PGDATABASE', 'PGUSER', 'PGPASSWORD']
+    required_vars = ["PGHOST", "PGPORT", "PGDATABASE", "PGUSER", "PGPASSWORD"]
     missing_vars = [var for var in required_vars if not os.getenv(var)]
 
     if missing_vars:
-        pytest.fail(f"Required environment variables not set: {', '.join(missing_vars)}")
-
-    connection_params = {
-        'host': os.getenv('PGHOST'),
-        'port': int(os.getenv('PGPORT')),
-        'database': os.getenv('PGDATABASE'),
-        'user': os.getenv('PGUSER'),
-        'password': os.getenv('PGPASSWORD')
-    }
+        pytest.fail(
+            f"Required environment variables not set: {', '.join(missing_vars)}"
+        )
 
+    # All required vars are guaranteed to exist due to check above
     try:
-        conn = psycopg2.connect(**connection_params)
+        conn = psycopg2.connect(
+            host=os.environ["PGHOST"],
+            port=int(os.environ["PGPORT"]),
+            database=os.environ["PGDATABASE"],
+            user=os.environ["PGUSER"],
+            password=os.environ["PGPASSWORD"],
+        )
         conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
         yield conn
         conn.close()
diff --git a/.github/workflows/tests/test_autoscaling.py b/.github/workflows/tests/test_autoscaling.py
index 463bc14c..83cdedf2 100644
--- a/.github/workflows/tests/test_autoscaling.py
+++ b/.github/workflows/tests/test_autoscaling.py
@@ -140,7 +140,7 @@ def make_request(url: str, timeout: int = 10) -> bool:
     """Make a single HTTP request and return success status."""
     try:
         response = requests.get(url, timeout=timeout)
-        return response.status_code == 200
+        return bool(response.status_code == 200)
     except requests.RequestException:
         return False
 
diff --git a/.github/workflows/tests/test_notifications.py b/.github/workflows/tests/test_notifications.py
index 4c44a427..9223d26c 100644
--- a/.github/workflows/tests/test_notifications.py
+++ b/.github/workflows/tests/test_notifications.py
@@ -3,11 +3,12 @@
 import json
 import subprocess
 import time
+from typing import Any
 
 import pytest
 
 
-def test_eoapi_notifier_deployment():
+def test_eoapi_notifier_deployment() -> None:
     """Test that eoapi-notifier deployment is running."""
     # Check if eoapi-notifier deployment exists and is ready
     result = subprocess.run(
@@ -38,7 +39,7 @@
     )
 
 
-def test_cloudevents_sink_exists():
+def test_cloudevents_sink_exists() -> None:
     """Test that Knative CloudEvents sink service exists and is accessible."""
     # Check if Knative service exists
     result = subprocess.run(
@@ -64,7 +65,7 @@
     )
 
 
-def test_notification_configuration():
+def test_notification_configuration() -> None:
     """Test that eoapi-notifier is configured correctly."""
     # Get the configmap for eoapi-notifier
     result = subprocess.run(
@@ -94,7 +95,7 @@
     )
 
 
-def test_cloudevents_sink_logs_show_startup():
+def test_cloudevents_sink_logs_show_startup() -> None:
     """Test that Knative CloudEvents sink started successfully."""
     # Get Knative CloudEvents sink pod logs
     result = subprocess.run(
@@ -123,7 +124,7 @@
     )
 
 
-def test_eoapi_notifier_logs_show_connection():
+def test_eoapi_notifier_logs_show_connection() -> None:
     """Test that eoapi-notifier connects to database successfully."""
     # Give some time for the notifier to start
     time.sleep(5)
@@ -150,7 +151,7 @@
     assert "Authentication failed" not in logs, "Should not have auth errors"
 
 
-def test_database_notification_triggers_exist(db_connection):
+def test_database_notification_triggers_exist(db_connection: Any) -> None:
     """Test that pgstac notification triggers are installed."""
     with db_connection.cursor() as cur:
         # Check if the notification function exists
@@ -180,7 +181,7 @@
     )
 
 
-def test_end_to_end_notification_flow(db_connection):
+def test_end_to_end_notification_flow(db_connection: Any) -> None:
     """Test complete flow: database → eoapi-notifier → Knative CloudEvents sink."""
 
     # Skip if notifications not enabled
@@ -269,7 +270,7 @@
             cursor.execute("SELECT pgstac.delete_item(%s);", (test_item_id,))
 
 
-def test_k_sink_injection():
+def test_k_sink_injection() -> None:
     """Test that SinkBinding injects K_SINK into eoapi-notifier deployment."""
     # Check if eoapi-notifier deployment exists
     result = subprocess.run(
diff --git a/.github/workflows/tests/test_pgstac_notifications.py b/.github/workflows/tests/test_pgstac_notifications.py
index 2a1ab5f2..5d0b650c 100644
--- a/.github/workflows/tests/test_pgstac_notifications.py
+++ b/.github/workflows/tests/test_pgstac_notifications.py
@@ -1,41 +1,53 @@
 """Test pgstac notification triggers."""
+
 import json
 import os
-import psycopg2
-import psycopg2.extensions
-import pytest
-import time
 import subprocess
-from datetime import datetime, timezone
-
+import time
+from typing import Any, Generator
+
+import pytest
 
 
-@pytest.fixture(scope='session')
-def notifications_enabled():
+@pytest.fixture(scope="session")
+def notifications_enabled() -> bool:
     """Check if notifications are enabled in the deployment config by checking Helm values."""
     try:
         # Get release name from environment or default
-        release_name = os.getenv('RELEASE_NAME', 'eoapi')
-        namespace = os.getenv('NAMESPACE', 'eoapi')
+        release_name = os.getenv("RELEASE_NAME", "eoapi")
+        namespace = os.getenv("NAMESPACE", "eoapi")
 
         # Check if notifications are enabled in Helm values
-        result = subprocess.run([
-            'helm', 'get', 'values', release_name,
-            '-n', namespace,
-            '-o', 'json'
-        ], capture_output=True, text=True, check=True)
+        result = subprocess.run(
+            [
+                "helm",
+                "get",
+                "values",
+                release_name,
+                "-n",
+                namespace,
+                "-o",
+                "json",
+            ],
+            capture_output=True,
+            text=True,
+            check=True,
+        )
 
         # Parse JSON and check notifications.sources.pgstac value
         values = json.loads(result.stdout)
-        return values.get('notifications', {}).get('sources', {}).get('pgstac', False)
+        return bool(
+            values.get("notifications", {})
+            .get("sources", {})
+            .get("pgstac", False)
+        )
     except (subprocess.CalledProcessError, json.JSONDecodeError, Exception):
         # If we can't check the Helm values, assume notifications are disabled
         return False
 
 
 @pytest.fixture
-def notification_listener(db_connection):
+def notification_listener(db_connection: Any) -> Generator[Any, None, None]:
     """Set up notification listener for pgstac_items_change."""
     cursor = db_connection.cursor()
     cursor.execute("LISTEN pgstac_items_change;")
@@ -44,10 +56,14 @@
     cursor.close()
 
 
-def test_notification_triggers_exist(db_connection, notifications_enabled):
+def test_notification_triggers_exist(
+    db_connection: Any, notifications_enabled: bool
+) -> None:
     """Test that notification triggers and function are properly installed."""
     if not notifications_enabled:
-        pytest.skip("PgSTAC notifications not enabled - set notifications.sources.pgstac=true to test")
+        pytest.skip(
+            "PgSTAC notifications not enabled - set notifications.sources.pgstac=true to test"
+        )
 
     cursor = db_connection.cursor()
@@ -58,32 +74,43 @@
             WHERE proname = 'notify_items_change_func'
         );
     """)
-    assert cursor.fetchone()[0], "notify_items_change_func function should exist"
+    assert cursor.fetchone()[0], (
+        "notify_items_change_func function should exist"
+    )
 
     # Check that all three triggers exist
     trigger_names = [
-        'notify_items_change_insert',
-        'notify_items_change_update',
-        'notify_items_change_delete'
+        "notify_items_change_insert",
+        "notify_items_change_update",
+        "notify_items_change_delete",
     ]
 
     for trigger_name in trigger_names:
-        cursor.execute("""
+        cursor.execute(
+            """
             SELECT EXISTS(
                 SELECT 1 FROM pg_trigger
                 WHERE tgname = %s
                 AND tgrelid = 'pgstac.items'::regclass
             );
-        """, (trigger_name,))
-        assert cursor.fetchone()[0], f"Trigger {trigger_name} should exist on pgstac.items"
+        """,
+            (trigger_name,),
+        )
+        assert cursor.fetchone()[0], (
+            f"Trigger {trigger_name} should exist on pgstac.items"
+        )
 
     cursor.close()
 
 
-def test_insert_notification(db_connection, notification_listener, notifications_enabled):
+def test_insert_notification(
+    db_connection: Any, notification_listener: Any, notifications_enabled: bool
+) -> None:
     """Test that INSERT operations trigger notifications."""
     if not notifications_enabled:
-        pytest.skip("PgSTAC notifications not enabled - set notifications.sources.pgstac=true to test")
+        pytest.skip(
+            "PgSTAC notifications not enabled - set notifications.sources.pgstac=true to test"
+        )
 
     cursor = db_connection.cursor()
@@ -97,16 +124,18 @@
     # Insert a test item using pgstac.create_item
     test_item_id = f"test-item-{int(time.time())}"
-    item_data = json.dumps({
-        "id": test_item_id,
-        "type": "Feature",
-        "stac_version": "1.0.0",
-        "collection": test_collection_id,
-        "geometry": {"type": "Point", "coordinates": [0, 0]},
-        "bbox": [0, 0, 0, 0],
-        "properties": {"datetime": "2020-01-01T00:00:00Z"},
-        "assets": {}
-    })
+    item_data = json.dumps(
+        {
+            "id": test_item_id,
+            "type": "Feature",
+            "stac_version": "1.0.0",
+            "collection": test_collection_id,
+            "geometry": {"type": "Point", "coordinates": [0, 0]},
+            "bbox": [0, 0, 0, 0],
+            "properties": {"datetime": "2020-01-01T00:00:00Z"},
+            "assets": {},
+        }
+    )
 
     cursor.execute("SELECT pgstac.create_item(%s);", (item_data,))
@@ -140,10 +169,14 @@
     cursor.close()
 
 
-def test_update_notification(db_connection, notification_listener, notifications_enabled):
+def test_update_notification(
+    db_connection: Any, notification_listener: Any, notifications_enabled: bool
+) -> None:
     """Test that UPDATE operations trigger notifications."""
     if not notifications_enabled:
-        pytest.skip("PgSTAC notifications not enabled - set notifications.sources.pgstac=true to test")
+        pytest.skip(
+            "PgSTAC notifications not enabled - set notifications.sources.pgstac=true to test"
+        )
 
     cursor = db_connection.cursor()
@@ -156,16 +189,18 @@
     # Insert a test item first using pgstac.create_item
     test_item_id = f"test-item-update-{int(time.time())}"
-    item_data = json.dumps({
-        "id": test_item_id,
-        "type": "Feature",
-        "stac_version": "1.0.0",
-        "collection": test_collection_id,
-        "geometry": {"type": "Point", "coordinates": [0, 0]},
-        "bbox": [0, 0, 0, 0],
-        "properties": {"datetime": "2020-01-01T00:00:00Z"},
-        "assets": {}
-    })
+    item_data = json.dumps(
+        {
+            "id": test_item_id,
+            "type": "Feature",
+            "stac_version": "1.0.0",
+            "collection": test_collection_id,
+            "geometry": {"type": "Point", "coordinates": [0, 0]},
+            "bbox": [0, 0, 0, 0],
+            "properties": {"datetime": "2020-01-01T00:00:00Z"},
+            "assets": {},
+        }
+    )
 
     cursor.execute("SELECT pgstac.create_item(%s);", (item_data,))
@@ -175,16 +210,18 @@
         db_connection.notifies.pop(0)
 
     # Update the item using pgstac.update_item
-    updated_item_data = json.dumps({
-        "id": test_item_id,
-        "type": "Feature",
-        "stac_version": "1.0.0",
-        "collection": test_collection_id,
-        "geometry": {"type": "Point", "coordinates": [0, 0]},
-        "bbox": [0, 0, 0, 0],
-        "properties": {"datetime": "2020-01-01T00:00:00Z", "updated": True},
-        "assets": {}
-    })
+    updated_item_data = json.dumps(
+        {
+            "id": test_item_id,
+            "type": "Feature",
+            "stac_version": "1.0.0",
+            "collection": test_collection_id,
+            "geometry": {"type": "Point", "coordinates": [0, 0]},
+            "bbox": [0, 0, 0, 0],
+            "properties": {"datetime": "2020-01-01T00:00:00Z", "updated": True},
+            "assets": {},
+        }
+    )
 
     cursor.execute("SELECT pgstac.update_item(%s);", (updated_item_data,))
@@ -201,7 +238,13 @@
 
     # Parse the notification payload - PgSTAC update uses DELETE+INSERT, so accept both
     payload = json.loads(notify.payload)
-    assert payload["operation"] in ["DELETE", "INSERT", "UPDATE"], f"Operation should be DELETE, INSERT, or UPDATE, got {payload['operation']}"
+    assert payload["operation"] in [
+        "DELETE",
+        "INSERT",
+        "UPDATE",
+    ], (
+        f"Operation should be DELETE, INSERT, or UPDATE, got {payload['operation']}"
+    )
     assert "items" in payload
     assert len(payload["items"]) == 1
     assert payload["items"][0]["id"] == test_item_id
@@ -218,10 +261,14 @@
     cursor.close()
 
 
-def test_delete_notification(db_connection, notification_listener, notifications_enabled):
+def test_delete_notification(
+    db_connection: Any, notification_listener: Any, notifications_enabled: bool
+) -> None:
     """Test that DELETE operations trigger notifications."""
     if not notifications_enabled:
-        pytest.skip("PgSTAC notifications not enabled - set notifications.sources.pgstac=true to test")
+        pytest.skip(
+            "PgSTAC notifications not enabled - set notifications.sources.pgstac=true to test"
+        )
 
     cursor = db_connection.cursor()
@@ -234,16 +281,18 @@
     # Insert a test item first using pgstac.create_item
     test_item_id = f"test-item-delete-{int(time.time())}"
-    item_data = json.dumps({
-        "id": test_item_id,
-        "type": "Feature",
-        "stac_version": "1.0.0",
-        "collection": test_collection_id,
-        "geometry": {"type": "Point", "coordinates": [0, 0]},
-        "bbox": [0, 0, 0, 0],
-        "properties": {"datetime": "2020-01-01T00:00:00Z"},
-        "assets": {}
-    })
+    item_data = json.dumps(
+        {
+            "id": test_item_id,
+            "type": "Feature",
+            "stac_version": "1.0.0",
+            "collection": test_collection_id,
+            "geometry": {"type": "Point", "coordinates": [0, 0]},
+            "bbox": [0, 0, 0, 0],
+            "properties": {"datetime": "2020-01-01T00:00:00Z"},
+            "assets": {},
+        }
+    )
 
     cursor.execute("SELECT pgstac.create_item(%s);", (item_data,))
@@ -282,10 +331,14 @@
     cursor.close()
 
 
-def test_bulk_operations_notification(db_connection, notification_listener, notifications_enabled):
+def test_bulk_operations_notification(
+    db_connection: Any, notification_listener: Any, notifications_enabled: bool
+) -> None:
     """Test that bulk operations send notifications with multiple items."""
     if not notifications_enabled:
-        pytest.skip("PgSTAC notifications not enabled - set notifications.sources.pgstac=true to test")
+        pytest.skip(
+            "PgSTAC notifications not enabled - set notifications.sources.pgstac=true to test"
+        )
 
     cursor = db_connection.cursor()
@@ -300,16 +353,18 @@ def test_bulk_operations_notification(db_connection, notification_listener, noti
     test_items = [f"bulk-item-{i}-{int(time.time())}" for i in range(3)]
 
     for item_id in test_items:
-        item_data = json.dumps({
-            "id": item_id,
-            "type": "Feature",
-            "stac_version": "1.0.0",
-            "collection": test_collection_id,
-            "geometry": {"type": "Point", "coordinates": [0, 0]},
-            "bbox": [0, 0, 0, 0],
-            "properties": {"datetime": "2020-01-01T00:00:00Z"},
-            "assets": {}
-        })
+        item_data = json.dumps(
+            {
+                "id": item_id,
+                "type": "Feature",
+                "stac_version": "1.0.0",
+                "collection": test_collection_id,
+                "geometry": {"type": "Point", "coordinates": [0, 0]},
+                "bbox": [0, 0, 0, 0],
+                "properties": {"datetime": "2020-01-01T00:00:00Z"},
+                "assets": {},
+            }
+        )
 
         cursor.execute("SELECT pgstac.create_item(%s);", (item_data,))
@@ -318,7 +373,9 @@
     start_time = time.time()
     notifications_received = 0
 
-    while time.time() - start_time < timeout and notifications_received < len(test_items):
+    while time.time() - start_time < timeout and notifications_received < len(
+        test_items
+    ):
         db_connection.poll()
         while db_connection.notifies:
             notify = db_connection.notifies.pop(0)
@@ -329,7 +386,9 @@
             assert "items" in payload
             notifications_received += len(payload["items"])
 
-    assert notifications_received >= len(test_items), f"Should have received notifications for all {len(test_items)} items"
+    assert notifications_received >= len(test_items), (
+        f"Should have received notifications for all {len(test_items)} items"
+    )
 
     # Cleanup
     for item_id in test_items:
diff --git a/.github/workflows/tests/test_raster.py b/.github/workflows/tests/test_raster.py
index 8daae4f5..b5308cb5 100644
--- a/.github/workflows/tests/test_raster.py
+++ b/.github/workflows/tests/test_raster.py
@@ -1,7 +1,9 @@
 """test EOapi."""
-import httpx
+
 import os
 
+import httpx
+
 # better timeouts
 timeout = httpx.Timeout(15.0, connect=60.0)
 if bool(os.getenv("IGNORE_SSL_VERIFICATION", False)):
@@ -10,7 +12,7 @@
 client = httpx.Client(timeout=timeout)
 
 
-def test_raster_api(raster_endpoint):
+def test_raster_api(raster_endpoint: str) -> None:
     """test api."""
     resp = client.get(
         f"{raster_endpoint}/healthz", headers={"Accept-Encoding": "br, gzip"}
     )
@@ -19,9 +21,12 @@
     assert resp.headers["content-type"] == "application/json"
 
 
-def test_mosaic_api(raster_endpoint):
+def test_mosaic_api(raster_endpoint: str) -> None:
     """test mosaic."""
-    query = {"collections": ["noaa-emergency-response"], "filter-lang": "cql-json"}
+    query = {
+        "collections": ["noaa-emergency-response"],
+        "filter-lang": "cql-json",
+    }
     resp = client.post(f"{raster_endpoint}/searches/register", json=query)
     assert resp.headers["content-type"] == "application/json"
     assert resp.status_code == 200
@@ -30,7 +35,9 @@
 
     searchid = resp.json()["id"]
 
-    resp = client.get(f"{raster_endpoint}/searches/{searchid}/point/-85.6358,36.1624/assets")
+    resp = client.get(
+        f"{raster_endpoint}/searches/{searchid}/point/-85.6358,36.1624/assets"
+    )
     assert resp.status_code == 200
     assert len(resp.json()) == 1
     assert list(resp.json()[0]) == ["id", "bbox", "assets", "collection"]
@@ -56,7 +63,7 @@
     assert "content-encoding" not in resp.headers
 
 
-def test_mosaic_collection_api(raster_endpoint):
+def test_mosaic_collection_api(raster_endpoint: str) -> None:
     """test mosaic collection."""
     resp = client.get(
         f"{raster_endpoint}/collections/noaa-emergency-response/point/-85.6358,36.1624/assets"
@@ -86,56 +93,92 @@
     assert "content-encoding" not in resp.headers
 
 
-def test_mosaic_search(raster_endpoint):
+def test_mosaic_search(raster_endpoint: str) -> None:
     """test mosaic."""
     # register some fake mosaic
     searches = [
         {
-            "filter": {"op": "=", "args": [{"property": "collection"}, "collection1"]},
+            "filter": {
+                "op": "=",
+                "args": [{"property": "collection"}, "collection1"],
+            },
             "metadata": {"owner": "vincent"},
         },
         {
-            "filter": {"op": "=", "args": [{"property": "collection"}, "collection2"]},
+            "filter": {
+                "op": "=",
+                "args": [{"property": "collection"}, "collection2"],
+            },
             "metadata": {"owner": "vincent"},
         },
         {
-            "filter": {"op": "=", "args": [{"property": "collection"}, "collection3"]},
+            "filter": {
+                "op": "=",
+                "args": [{"property": "collection"}, "collection3"],
+            },
             "metadata": {"owner": "vincent"},
         },
         {
-            "filter": {"op": "=", "args": [{"property": "collection"}, "collection4"]},
+            "filter": {
+                "op": "=",
+                "args": [{"property": "collection"}, "collection4"],
+            },
             "metadata": {"owner": "vincent"},
         },
         {
-            "filter": {"op": "=", "args": [{"property": "collection"}, "collection5"]},
+            "filter": {
+                "op": "=",
+                "args": [{"property": "collection"}, "collection5"],
+            },
             "metadata": {"owner": "vincent"},
         },
         {
-            "filter": {"op": "=", "args": [{"property": "collection"}, "collection6"]},
+            "filter": {
+                "op": "=",
+                "args": [{"property": "collection"}, "collection6"],
+            },
             "metadata": {"owner": "vincent"},
         },
         {
-            "filter": {"op": "=", "args": [{"property": "collection"}, "collection7"]},
+            "filter": {
+                "op": "=",
+                "args": [{"property": "collection"}, "collection7"],
+            },
             "metadata": {"owner": "vincent"},
         },
         {
-            "filter": {"op": "=", "args": [{"property": "collection"}, "collection8"]},
+            "filter": {
+                "op": "=",
+                "args": [{"property": "collection"}, "collection8"],
+            },
             "metadata": {"owner": "sean"},
         },
        {
-            "filter": {"op": "=", "args": [{"property": "collection"}, "collection9"]},
+            "filter": {
+                "op": "=",
+                "args": [{"property": "collection"}, "collection9"],
+            },
             "metadata": {"owner": "sean"},
         },
         {
-            "filter": {"op": "=", "args": [{"property": "collection"}, "collection10"]},
+            "filter": {
+                "op": "=",
+                "args": [{"property": "collection"}, "collection10"],
+            },
             "metadata": {"owner": "drew"},
         },
         {
-            "filter": {"op": "=", "args": [{"property": "collection"}, "collection11"]},
+            "filter": {
+                "op": "=",
+                "args": [{"property": "collection"}, "collection11"],
+            },
             "metadata": {"owner": "drew"},
         },
         {
-            "filter": {"op": "=", "args": [{"property": "collection"}, "collection12"]},
+            "filter": {
+                "op": "=",
+                "args": [{"property": "collection"}, "collection12"],
+            },
             "metadata": {"owner": "drew"},
         },
     ]
@@ -160,7 +203,10 @@
     assert len(links) == 2
     assert links[0]["rel"] == "self"
     assert links[1]["rel"] == "next"
-    assert links[1]["href"] == f"{raster_endpoint}/searches/list?limit=10&offset=10"
+    assert (
+        links[1]["href"]
+        == f"{raster_endpoint}/searches/list?limit=10&offset=10"
+    )
 
     resp = client.get(
         f"{raster_endpoint}/searches/list", params={"limit": 1, "offset": 1}
@@ -173,38 +219,54 @@
     links = resp.json()["links"]
     assert len(links) == 3
     assert links[0]["rel"] == "self"
-    assert links[0]["href"] == f"{raster_endpoint}/searches/list?limit=1&offset=1"
+    assert (
+        links[0]["href"] == f"{raster_endpoint}/searches/list?limit=1&offset=1"
+    )
     assert links[1]["rel"] == "next"
-    assert links[1]["href"] == f"{raster_endpoint}/searches/list?limit=1&offset=2"
+    assert (
+        links[1]["href"] == f"{raster_endpoint}/searches/list?limit=1&offset=2"
+    )
     assert links[2]["rel"] == "prev"
-    assert links[2]["href"] == f"{raster_endpoint}/searches/list?limit=1&offset=0"
+    assert (
+        links[2]["href"] == f"{raster_endpoint}/searches/list?limit=1&offset=0"
+    )
 
     # Filter on mosaic metadata
-    resp = client.get(f"{raster_endpoint}/searches/list", params={"owner": "vincent"})
+    resp = client.get(
+        f"{raster_endpoint}/searches/list", params={"owner": "vincent"}
+    )
     assert resp.status_code == 200
     assert resp.json()["context"]["matched"] == 7
     assert resp.json()["context"]["limit"] == 10
     assert resp.json()["context"]["returned"] == 7
 
     # sortBy
-    resp = client.get(f"{raster_endpoint}/searches/list", params={"sortby": "lastused"})
+    resp = client.get(
+        f"{raster_endpoint}/searches/list", params={"sortby": "lastused"}
+    )
     assert resp.status_code == 200
 
-    resp = client.get(f"{raster_endpoint}/searches/list", params={"sortby": "usecount"})
+    resp = client.get(
+        f"{raster_endpoint}/searches/list", params={"sortby": "usecount"}
+    )
     assert resp.status_code == 200
 
-    resp = client.get(f"{raster_endpoint}/searches/list", params={"sortby": "-owner"})
+    resp = client.get(
+        f"{raster_endpoint}/searches/list", params={"sortby": "-owner"}
+    )
     assert resp.status_code == 200
     assert (
         "owner" not in resp.json()["searches"][0]["search"]["metadata"]
     )  # some mosaic don't have owners
 
-    resp = client.get(f"{raster_endpoint}/searches/list", params={"sortby": "owner"})
+    resp = client.get(
+        f"{raster_endpoint}/searches/list", params={"sortby": "owner"}
+    )
     assert resp.status_code == 200
     assert "owner" in resp.json()["searches"][0]["search"]["metadata"]
 
 
-def test_item(raster_endpoint):
+def test_item(raster_endpoint: str) -> None:
     """test stac endpoints."""
     resp = client.get(
         f"{raster_endpoint}/collections/noaa-emergency-response/items/20200307aC0853300w361200/assets",
diff --git a/.github/workflows/tests/test_stac.py b/.github/workflows/tests/test_stac.py
index fc96768b..ea67c9e2 100644
--- a/.github/workflows/tests/test_stac.py
+++ b/.github/workflows/tests/test_stac.py
@@ -1,7 +1,9 @@
 """test EOapi."""
-import httpx
+
 import os
 
+import httpx
+
 timeout = httpx.Timeout(15.0, connect=60.0)
 if bool(os.getenv("IGNORE_SSL_VERIFICATION", False)):
     client = httpx.Client(timeout=timeout, verify=False)
@@ -9,7 +11,7 @@
 client = httpx.Client(timeout=timeout)
 
 
-def test_stac_api(stac_endpoint):
+def test_stac_api(stac_endpoint: str) -> None:
     """test stac."""
     # Ping
     assert client.get(f"{stac_endpoint}/_mgmt/ping").status_code == 200
@@ -24,7 +26,7 @@
         assert link["href"].startswith(stac_endpoint.split("://")[1])
 
     # viewer
-    #assert client.get(f"{stac_endpoint}/index.html").status_code == 200
+    # assert client.get(f"{stac_endpoint}/index.html").status_code == 200
     assert client.get(f"{stac_endpoint}/index.html").status_code == 404
 
     # Collections
@@ -42,7 +44,9 @@
         assert link["href"].startswith(stac_endpoint.split("://")[1])
 
     # items
-    resp = client.get(f"{stac_endpoint}/collections/noaa-emergency-response/items")
+    resp = client.get(
+        f"{stac_endpoint}/collections/noaa-emergency-response/items"
+    )
     assert resp.status_code == 200
     items = resp.json()
     # Verify item links have correct base path
@@ -60,17 +64,18 @@
     assert item["id"] == "20200307aC0853300w361200"
 
 
-def test_stac_to_raster(stac_endpoint):
+def test_stac_to_raster(stac_endpoint: str) -> None:
     """test link to raster api."""
     # tilejson
     resp = client.get(
         f"{stac_endpoint}/collections/noaa-emergency-response/items/20200307aC0853300w361200/tilejson.json",
         params={"assets": "cog"},
     )
-    #assert resp.status_code == 307
+    # assert resp.status_code == 307
     assert resp.status_code == 404
 
-def test_stac_custom_path(stac_endpoint):
+
+def test_stac_custom_path(stac_endpoint: str) -> None:
     """test stac with custom ingress path."""
     # If we're using a custom path (e.g., /api instead of /stac)
     base_path = stac_endpoint.split("://")[1]
@@ -83,8 +88,9 @@
     # All links should use the custom path
     for link in landing["links"]:
         if link["href"].startswith("/"):
-            assert link["href"].startswith(base_path), \
+            assert link["href"].startswith(base_path), (
                 f"Link {link['href']} doesn't start with {base_path}"
+            )
 
     # Collections should also use the custom path
     resp = client.get(f"{stac_endpoint}/collections")
@@ -94,11 +100,14 @@
     for collection in collections:
         for link in collection["links"]:
             if link["href"].startswith("/"):
-                assert link["href"].startswith(base_path), \
+                assert link["href"].startswith(base_path), (
                     f"Collection link {link['href']} doesn't start with {base_path}"
+                )
 
     # Test a specific item
-    resp = client.get(f"{stac_endpoint}/collections/noaa-emergency-response/items")
+    resp = client.get(
+        f"{stac_endpoint}/collections/noaa-emergency-response/items"
+    )
     assert resp.status_code == 200
 
     items = resp.json()
@@ -106,13 +115,14 @@
     for feature in items["features"]:
         for link in feature["links"]:
             if link["href"].startswith("/"):
-                assert link["href"].startswith(base_path), \
+                assert link["href"].startswith(base_path), (
                     f"Item link {link['href']} doesn't start with {base_path}"
+                )
 
     # viewer
     resp = client.get(
         f"{stac_endpoint}/collections/noaa-emergency-response/items/20200307aC0853300w361200/viewer",
         params={"assets": "cog"},
     )
-    #assert resp.status_code == 307
+    # assert resp.status_code == 307
     assert resp.status_code == 404
diff --git a/.github/workflows/tests/test_vector.py b/.github/workflows/tests/test_vector.py
index 0216db34..7fe25eac 100644
--- a/.github/workflows/tests/test_vector.py
+++ b/.github/workflows/tests/test_vector.py
@@ -1,7 +1,8 @@
-import httpx
 import os
 import time
 
+import httpx
+
 timeout = httpx.Timeout(15.0, connect=60.0)
 if bool(os.getenv("IGNORE_SSL_VERIFICATION", False)):
     client = httpx.Client(timeout=timeout, verify=False)
@@ -9,7 +10,7 @@
 client = httpx.Client(timeout=timeout)
 
 
-def test_vector_api(vector_endpoint):
+def test_vector_api(vector_endpoint: str) -> None:
     """test vector."""
     # landing
     resp = client.get(f"{vector_endpoint}/")
@@ -51,11 +52,18 @@
 
         elapsed_time = time.time() - start_time
         if elapsed_time > total_timeout:
-            print(f"Timeout exceeded after {elapsed_time:.1f}s. Expected 7 collections, got {current_count}")
+            print(
+                f"Timeout exceeded after {elapsed_time:.1f}s. Expected 7 collections, got {current_count}"
+            )
             if "collections" in collections_data:
-                available_collections = [c.get("id", "unknown") for c in collections_data["collections"]]
+                available_collections = [
+                    c.get("id", "unknown")
+                    for c in collections_data["collections"]
+                ]
                 print(f"Available collections: {available_collections}")
-            assert False, f"Expected 7 collections but found {current_count} after {elapsed_time:.1f}s timeout"
+            assert False, (
+                f"Expected 7 collections but found {current_count} after {elapsed_time:.1f}s timeout"
+            )
 
         time.sleep(10)
 
     resp = client.get(f"{vector_endpoint}/collections")
@@ -64,8 +72,13 @@
     matched_count = collections_data.get("numberMatched", 0)
     returned_count = collections_data.get("numberReturned", 0)
 
-    assert matched_count == 7, f"Expected 7 matched collections, got {matched_count}. Available: {[c.get('id', 'unknown') for c in collections_data.get('collections', [])]}"
-    assert returned_count == 7, f"Expected 7 returned collections, got {returned_count}"
+    assert matched_count == 7, (
+        f"Expected 7 matched collections, got {matched_count}. "
+        f"Available: {[c.get('id', 'unknown') for c in collections_data.get('collections', [])]}"
+    )
+    assert returned_count == 7, (
+        f"Expected 7 returned collections, got {returned_count}"
+    )
 
     collections = resp.json()["collections"]
     ids = [c["id"] for c in collections]
@@ -84,9 +97,7 @@
     assert resp.json()["itemType"] == "feature"
 
     # items
-    resp = client.get(
-        f"{vector_endpoint}/collections/public.my_data/items"
-    )
+    resp = client.get(f"{vector_endpoint}/collections/public.my_data/items")
     assert resp.status_code == 200
     assert resp.headers["content-type"] == "application/geo+json"
     items = resp.json()["features"]
@@ -111,15 +122,15 @@
     assert len(items) == 6
 
     # item
-    resp = client.get(
-        f"{vector_endpoint}/collections/public.my_data/items/1"
-    )
+    resp = client.get(f"{vector_endpoint}/collections/public.my_data/items/1")
     assert resp.status_code == 200
     item = resp.json()
     assert item["id"] == 1
 
     # OGC Tiles
-    resp = client.get(f"{vector_endpoint}/collections/public.my_data/tiles/WebMercatorQuad/0/0/0")
+    resp = client.get(
+        f"{vector_endpoint}/collections/public.my_data/tiles/WebMercatorQuad/0/0/0"
+    )
     assert resp.status_code == 200
 
     resp = client.get(
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 5d213e4b..84d5ffe7 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -34,6 +34,16 @@ repos:
     hooks:
       - id: actionlint
 
+  # Python type checking for test files
+  - repo: https://github.com/pre-commit/mirrors-mypy
+    rev: v1.11.2
+    hooks:
+      - id: mypy
+        name: mypy strict mode for tests
+        args: ['--strict', '--ignore-missing-imports']
+        files: ^\.github/workflows/tests/.*\.py$
+        additional_dependencies: ['types-psycopg2', 'httpx', 'pytest', 'types-requests']
+
   # Fast Helm syntax check only
   - repo: local
     hooks:
diff --git a/CHANGELOG.md b/CHANGELOG.md
index a47c8a24..929f2868 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -23,6 +23,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Make integration tests fail properly
 - Temporarily skip VRT driver in GDALg to avoid https://github.com/OSGeo/gdal/issues/12645
 - Consistent naming of behavior field
+- Made all Python tests comply with mypy strict validation
 
 ## [0.7.13] - 2025-11-04