From 3130898160f4f6813499cba89da956a0243b7b76 Mon Sep 17 00:00:00 2001
From: Tilly Woodfield <22456167+tillywoodfield@users.noreply.github.com>
Date: Wed, 5 Feb 2025 16:55:14 +0200
Subject: [PATCH 1/7] feat: upload files to s3-compatible storage bucket

---
 .gitignore                            |  2 +
 README.md                             |  4 ++
 oc4ids_datastore_pipeline/__init__.py |  4 ++
 oc4ids_datastore_pipeline/pipeline.py | 13 ++--
 oc4ids_datastore_pipeline/storage.py  | 88 +++++++++++++++++++++++++++
 pyproject.toml                        |  3 +
 requirements_dev.txt                  | 26 ++++++++
 tests/test_storage.py                 |  1 +
 8 files changed, 137 insertions(+), 4 deletions(-)
 create mode 100644 oc4ids_datastore_pipeline/storage.py
 create mode 100644 tests/test_storage.py

diff --git a/.gitignore b/.gitignore
index 126d2cd..84ffe10 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,6 @@
 .venv
 __pycache__
 
+.env
+
 data/
diff --git a/README.md b/README.md
index f98812e..f1a1f9d 100644
--- a/README.md
+++ b/README.md
@@ -29,6 +29,10 @@ export DATABASE_URL="postgresql://oc4ids_datastore@localhost/oc4ids_datastore"
 alembic upgrade head
 ```
 
+### S3 environment variables
+
+
+
 ### Run app
 
 ```
diff --git a/oc4ids_datastore_pipeline/__init__.py b/oc4ids_datastore_pipeline/__init__.py
index c3be62a..5f8d98b 100644
--- a/oc4ids_datastore_pipeline/__init__.py
+++ b/oc4ids_datastore_pipeline/__init__.py
@@ -1,9 +1,13 @@
 import logging
 import time
 
+from dotenv import load_dotenv
+
 logging.basicConfig(
     level=logging.INFO,
     format="%(asctime)s:%(levelname)s:%(name)s:%(message)s",
     datefmt="%Y-%m-%dT%H:%M:%S",
 )
 logging.Formatter.converter = time.gmtime
+
+load_dotenv()
diff --git a/oc4ids_datastore_pipeline/pipeline.py b/oc4ids_datastore_pipeline/pipeline.py
index aabe607..e8a50f8 100644
--- a/oc4ids_datastore_pipeline/pipeline.py
+++ b/oc4ids_datastore_pipeline/pipeline.py
@@ -19,6 +19,7 @@
     fetch_registered_datasets,
     get_license_name_from_url,
 )
+from oc4ids_datastore_pipeline.storage import upload_files
 
 logger = logging.getLogger(__name__)
 
@@ -83,7 +84,7 @@ def save_dataset_metadata(
     dataset_name: str,
     source_url: str,
     json_data: dict[str, Any],
-    json_url: str,
+    json_url: Optional[str],
     csv_url: Optional[str],
     xlsx_url: Optional[str],
 ) -> None:
@@ -114,13 +115,16 @@ def process_dataset(dataset_name: str, dataset_url: str) -> None:
             f"data/{dataset_name}/{dataset_name}.json", json_data
         )
         csv_path, xlsx_path = transform_to_csv_and_xlsx(json_path)
+        json_public_url, csv_public_url, xlsx_public_url = upload_files(
+            json_path=json_path, csv_path=csv_path, xlsx_path=xlsx_path
+        )
         save_dataset_metadata(
             dataset_name=dataset_name,
             source_url=dataset_url,
             json_data=json_data,
-            json_url=json_path,
-            csv_url=csv_path,
-            xlsx_url=xlsx_path,
+            json_url=json_public_url,
+            csv_url=csv_public_url,
+            xlsx_url=xlsx_public_url,
         )
         logger.info(f"Processed dataset {dataset_name}")
     except Exception as e:
@@ -133,6 +137,7 @@ def process_deleted_datasets(registered_datasets: dict[str, str]) -> None:
     for dataset_id in deleted_datasets:
         logger.info(f"Dataset {dataset_id} is no longer in the registry, deleting")
         delete_dataset(dataset_id)
+        # TODO: Delete stored files
 
 
 def process_registry() -> None:
diff --git a/oc4ids_datastore_pipeline/storage.py b/oc4ids_datastore_pipeline/storage.py
new file mode 100644
index 0000000..e79d7ad
--- /dev/null
+++ b/oc4ids_datastore_pipeline/storage.py
@@ -0,0 +1,88 @@
+import logging
+import os
+import zipfile
+from pathlib import Path
+from typing import Any, Optional
+
+import boto3
+import botocore
+
+logger = logging.getLogger(__name__)
+
+
+# TODO:
+ENABLE_UPLOAD = os.environ.get("ENABLE_UPLOAD", "0")
+BUCKET_REGION = os.environ.get("BUCKET_REGION")
+BUCKET_NAME = os.environ.get("BUCKET_NAME")
+BUCKET_ACCESS_KEY_ID = os.environ.get("BUCKET_ACCESS_KEY_ID")
+BUCKET_ACCESS_KEY_SECRET = os.environ.get("BUCKET_ACCESS_KEY_SECRET")
+
+
+def _get_client() -> Any:
+    session = boto3.session.Session()
+    return session.client(
+        "s3",
+        endpoint_url=f"https://{BUCKET_REGION}.digitaloceanspaces.com/",
+        config=botocore.config.Config(s3={"addressing_style": "virtual"}),
+        region_name=BUCKET_REGION,
+        aws_access_key_id=BUCKET_ACCESS_KEY_ID,
+        aws_secret_access_key=BUCKET_ACCESS_KEY_SECRET,
+    )
+
+
+def upload_file(local_path: str, content_type: str) -> str:
+    bucket_path = os.path.relpath(local_path, "data")
+    logger.info(f"Uploading file {local_path}")
+    client = _get_client()
+    client.upload_file(
+        local_path,
+        BUCKET_NAME,
+        bucket_path,
+        ExtraArgs={"ACL": "public-read", "ContentType": content_type},
+    )
+    return (
+        f"https://{BUCKET_NAME}.{BUCKET_REGION}.digitaloceanspaces.com/" + bucket_path
+    )
+
+
+def upload_json(json_path: str) -> str:
+    return upload_file(json_path, content_type="application/json")
+
+
+def upload_csv(csv_path: str) -> str:
+    directory = Path(csv_path)
+    zip_file_path = f"{csv_path}_csv.zip"
+    with zipfile.ZipFile(zip_file_path, mode="w") as archive:
+        for file_path in directory.rglob("*"):
+            archive.write(file_path, arcname=file_path.relative_to(directory))
+    return upload_file(zip_file_path, content_type="application/zip")
+
+
+def upload_xlsx(xlsx_path: str) -> str:
+    return upload_file(
+        xlsx_path,
+        content_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",  # noqa: E501
+    )
+
+
+def upload_files(
+    json_path: Optional[str] = None,
+    csv_path: Optional[str] = None,
+    xlsx_path: Optional[str] = None,
+) -> tuple[Optional[str], Optional[str], Optional[str]]:
+    # TODO: Option to delete local files once uploaded?
+    # TODO: Exception handling
+    if not bool(int(ENABLE_UPLOAD)):
+        logger.info("Upload is disabled, skipping")
+        return None, None, None
+    logger.info("Uploading files")
+    if json_path:
+        json_public_url = upload_json(json_path)
+        logger.info(f"Uploaded JSON file to {json_public_url}")
+    if csv_path:
+        csv_public_url = upload_csv(csv_path)
+        logger.info(f"Uploaded CSV zip file to {csv_public_url}")
+    if xlsx_path:
+        xlsx_public_url = upload_xlsx(xlsx_path)
+        logger.info(f"Uploaded XLSX file to {xlsx_public_url}")
+    return json_public_url, csv_public_url, xlsx_public_url
diff --git a/pyproject.toml b/pyproject.toml
index c453dfb..d9661cf 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -9,6 +9,7 @@ version = "0.1.0"
 readme = "README.md"
 dependencies = [
     "alembic",
+    "boto3",
     "flattentool",
     "libcoveoc4ids",
     "psycopg2",
@@ -25,6 +26,8 @@ dev = [
     "mypy",
     "pytest",
     "pytest-mock",
+    "python-dotenv",
+    "types-boto3",
     "types-requests",
 ]
 
diff --git a/requirements_dev.txt b/requirements_dev.txt
index 965d8b0..9a073e7 100644
--- a/requirements_dev.txt
+++ b/requirements_dev.txt
@@ -16,6 +16,14 @@ backports-datetime-fromisoformat==2.0.3
     # via flattentool
 black==25.1.0
     # via oc4ids-datastore-pipeline (pyproject.toml)
+boto3==1.36.13
+    # via oc4ids-datastore-pipeline (pyproject.toml)
+botocore==1.36.13
+    # via
+    #   boto3
+    #   s3transfer
+botocore-stubs==1.36.12
+    # via types-boto3
 btrees==6.1
     # via zodb
 cattrs==24.1.2
@@ -53,6 +61,10 @@ iniconfig==2.0.0
     # via pytest
 isort==6.0.0
     # via oc4ids-datastore-pipeline (pyproject.toml)
+jmespath==1.0.1
+    # via
+    #   boto3
+    #   botocore
 json-merge-patch==0.2
     # via ocdsextensionregistry
 jsonref==1.1.0
@@ -125,6 +137,10 @@ pytest==8.3.4
     #   pytest-mock
 pytest-mock==3.14.0
     # via oc4ids-datastore-pipeline (pyproject.toml)
+python-dateutil==2.9.0.post0
+    # via botocore
+python-dotenv==1.0.1
+    # via oc4ids-datastore-pipeline (pyproject.toml)
 pytz==2025.1
     # via flattentool
 referencing==0.36.2
@@ -150,10 +166,13 @@ rpds-py==0.22.3
     # via
     #   jsonschema
     #   referencing
+s3transfer==0.11.2
+    # via boto3
 schema==0.7.7
     # via flattentool
 six==1.17.0
     # via
+    #   python-dateutil
     #   rfc3339-validator
     #   url-normalize
 sqlalchemy==2.0.37
     # via
     #   alembic
     #   oc4ids-datastore-pipeline (pyproject.toml)
 transaction==5.0
     # via zodb
+types-awscrt==0.23.9
+    # via botocore-stubs
+types-boto3==1.36.13
+    # via oc4ids-datastore-pipeline (pyproject.toml)
 types-requests==2.32.0.20241016
     # via oc4ids-datastore-pipeline (pyproject.toml)
+types-s3transfer==0.11.2
+    # via types-boto3
 typing-extensions==4.12.2
     # via
     #   alembic
 url-normalize==1.4.3
     # via requests-cache
 urllib3==2.3.0
     # via
+    #   botocore
     #   requests
     #   requests-cache
     #   types-requests
diff --git a/tests/test_storage.py b/tests/test_storage.py
new file mode 100644
index 0000000..553be87
--- /dev/null
+++ b/tests/test_storage.py
@@ -0,0 +1 @@
+# TODO: Implement tests

From 210e9e983ef96ea696a36b636abb385676a3a1d1 Mon Sep 17 00:00:00 2001
From: Tilly Woodfield <22456167+tillywoodfield@users.noreply.github.com>
Date: Thu, 6 Feb 2025 11:39:56 +0200
Subject: [PATCH 2/7] feat: exception handling for file uploads

---
 .env.test                             |   5 +
 oc4ids_datastore_pipeline/__init__.py |   4 -
 oc4ids_datastore_pipeline/pipeline.py |   4 +-
 oc4ids_datastore_pipeline/storage.py  |  87 +++++++------
 tests/conftest.py                     |   3 +
 tests/test_storage.py                 | 170 +++++++++++++++++++++++++-
 6 files changed, 230 insertions(+), 43 deletions(-)
 create mode 100644 .env.test
 create mode 100644 tests/conftest.py

diff --git a/.env.test b/.env.test
new file mode 100644
index 0000000..3b00692
--- /dev/null
+++ b/.env.test
@@ -0,0 +1,5 @@
+ENABLE_UPLOAD=1
+BUCKET_REGION="test-region"
+BUCKET_NAME="test-bucket"
+BUCKET_ACCESS_KEY_ID="test-id"
+BUCKET_ACCESS_KEY_SECRET="test-secret"
diff --git a/oc4ids_datastore_pipeline/__init__.py b/oc4ids_datastore_pipeline/__init__.py
index 5f8d98b..c3be62a 100644
--- a/oc4ids_datastore_pipeline/__init__.py
+++ b/oc4ids_datastore_pipeline/__init__.py
@@ -1,13 +1,9 @@
 import logging
 import time
 
-from dotenv import load_dotenv
-
 logging.basicConfig(
     level=logging.INFO,
     format="%(asctime)s:%(levelname)s:%(name)s:%(message)s",
     datefmt="%Y-%m-%dT%H:%M:%S",
 )
 logging.Formatter.converter = time.gmtime
-
-load_dotenv()
diff --git a/oc4ids_datastore_pipeline/pipeline.py b/oc4ids_datastore_pipeline/pipeline.py
index e8a50f8..05988e9 100644
--- a/oc4ids_datastore_pipeline/pipeline.py
+++ b/oc4ids_datastore_pipeline/pipeline.py
@@ -7,6 +7,7 @@
 
 import flattentool
 import requests
+from dotenv import load_dotenv
 from libcoveoc4ids.api import oc4ids_json_output
 
 from oc4ids_datastore_pipeline.database import (
@@ -116,7 +117,7 @@ def process_dataset(dataset_name: str, dataset_url: str) -> None:
         )
         csv_path, xlsx_path = transform_to_csv_and_xlsx(json_path)
         json_public_url, csv_public_url, xlsx_public_url = upload_files(
-            json_path=json_path, csv_path=csv_path, xlsx_path=xlsx_path
+            dataset_name, json_path=json_path, csv_path=csv_path, xlsx_path=xlsx_path
         )
         save_dataset_metadata(
             dataset_name=dataset_name,
@@ -148,4 +149,5 @@ def process_registry() -> None:
 
 
 def run() -> None:
+    load_dotenv()
     process_registry()
diff --git a/oc4ids_datastore_pipeline/storage.py b/oc4ids_datastore_pipeline/storage.py
index e79d7ad..f26d22c 100644
--- a/oc4ids_datastore_pipeline/storage.py
+++ b/oc4ids_datastore_pipeline/storage.py
@@ -10,8 +10,6 @@ logger = logging.getLogger(__name__)
 
 
-# TODO:
-ENABLE_UPLOAD = os.environ.get("ENABLE_UPLOAD", "0")
 BUCKET_REGION = os.environ.get("BUCKET_REGION")
 BUCKET_NAME = os.environ.get("BUCKET_NAME")
 BUCKET_ACCESS_KEY_ID = os.environ.get("BUCKET_ACCESS_KEY_ID")
 BUCKET_ACCESS_KEY_SECRET = os.environ.get("BUCKET_ACCESS_KEY_SECRET")
@@ -30,8 +28,7 @@ def _get_client() -> Any:
     )
 
 
-def upload_file(local_path: str, content_type: str) -> str:
-    bucket_path = os.path.relpath(local_path, "data")
+def _upload_file(local_path: str, bucket_path: str, content_type: str) -> str:
     logger.info(f"Uploading file {local_path}")
     client = _get_client()
     client.upload_file(
@@ -40,49 +37,65 @@ def upload_file(local_path: str, content_type: str) -> str:
         bucket_path,
         ExtraArgs={"ACL": "public-read", "ContentType": content_type},
     )
-    return (
+    public_url = (
         f"https://{BUCKET_NAME}.{BUCKET_REGION}.digitaloceanspaces.com/" + bucket_path
     )
-
-
-def upload_json(json_path: str) -> str:
-    return upload_file(json_path, content_type="application/json")
-
-
-def upload_csv(csv_path: str) -> str:
-    directory = Path(csv_path)
-    zip_file_path = f"{csv_path}_csv.zip"
-    with zipfile.ZipFile(zip_file_path, mode="w") as archive:
-        for file_path in directory.rglob("*"):
-            archive.write(file_path, arcname=file_path.relative_to(directory))
-    return upload_file(zip_file_path, content_type="application/zip")
-
-
-def upload_xlsx(xlsx_path: str) -> str:
-    return upload_file(
-        xlsx_path,
-        content_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",  # noqa: E501
-    )
+    logger.info(f"Uploaded to {public_url}")
+    return public_url
+
+
+def _upload_json(dataset_id: str, json_path: str) -> Optional[str]:
+    try:
+        return _upload_file(
+            local_path=json_path,
+            bucket_path=f"{dataset_id}/{dataset_id}.json",
+            content_type="application/json",
+        )
+    except Exception as e:
+        logger.warning(f"Failed to upload {json_path} with error {e}")
+        return None
+
+
+def _upload_csv(dataset_id: str, csv_path: str) -> Optional[str]:
+    try:
+        directory = Path(csv_path)
+        zip_file_path = f"{csv_path}_csv.zip"
+        with zipfile.ZipFile(zip_file_path, mode="w") as archive:
+            for file_path in directory.rglob("*"):
+                archive.write(file_path, arcname=file_path.relative_to(directory))
+        return _upload_file(
+            local_path=zip_file_path,
+            bucket_path=f"{dataset_id}/{dataset_id}_csv.zip",
+            content_type="application/zip",
+        )
+    except Exception as e:
+        logger.warning(f"Failed to upload {csv_path} with error {e}")
+        return None
+
+
+def _upload_xlsx(dataset_id: str, xlsx_path: str) -> Optional[str]:
+    try:
+        return _upload_file(
+            local_path=xlsx_path,
+            bucket_path=f"{dataset_id}/{dataset_id}.xlsx",
+            content_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",  # noqa: E501
+        )
+    except Exception as e:
+        logger.warning(f"Failed to upload {xlsx_path} with error {e}")
+        return None
 
 
 def upload_files(
+    dataset_id: str,
     json_path: Optional[str] = None,
     csv_path: Optional[str] = None,
     xlsx_path: Optional[str] = None,
 ) -> tuple[Optional[str], Optional[str], Optional[str]]:
     # TODO: Option to delete local files once uploaded?
-    # TODO: Exception handling
-    if not bool(int(ENABLE_UPLOAD)):
+    if not bool(int(os.environ.get("ENABLE_UPLOAD", "0"))):
         logger.info("Upload is disabled, skipping")
         return None, None, None
-    logger.info("Uploading files")
-    if json_path:
-        json_public_url = upload_json(json_path)
-        logger.info(f"Uploaded JSON file to {json_public_url}")
-    if csv_path:
-        csv_public_url = upload_csv(csv_path)
-        logger.info(f"Uploaded CSV zip file to {csv_public_url}")
-    if xlsx_path:
-        xlsx_public_url = upload_xlsx(xlsx_path)
-        logger.info(f"Uploaded XLSX file to {xlsx_public_url}")
+    json_public_url = _upload_json(dataset_id, json_path) if json_path else None
+    csv_public_url = _upload_csv(dataset_id, csv_path) if csv_path else None
+    xlsx_public_url = _upload_xlsx(dataset_id, xlsx_path) if xlsx_path else None
     return json_public_url, csv_public_url, xlsx_public_url
diff --git a/tests/conftest.py b/tests/conftest.py
new file mode 100644
index 0000000..c2d98f5
--- /dev/null
+++ b/tests/conftest.py
@@ -0,0 +1,3 @@
+from dotenv import find_dotenv, load_dotenv
+
+load_dotenv(find_dotenv(".env.test"))
diff --git a/tests/test_storage.py b/tests/test_storage.py
index 553be87..9c3c526 100644
--- a/tests/test_storage.py
+++ b/tests/test_storage.py
@@ -1 +1,169 @@
-# TODO: Implement tests
+import os
+import tempfile
+from typing import Any
+from unittest.mock import MagicMock
+
+import pytest
+from pytest_mock import MockerFixture
+
+from oc4ids_datastore_pipeline.storage import upload_files
+
+
+@pytest.fixture(autouse=True)
+def mock_client(mocker: MockerFixture) -> Any:
+    os.environ["ENABLE_UPLOAD"] = "1"
+    mock_boto3_client = MagicMock()
+    patch_boto3_client = mocker.patch(
+        "oc4ids_datastore_pipeline.storage.boto3.session.Session.client"
+    )
+    patch_boto3_client.return_value = mock_boto3_client
+    return mock_boto3_client
+
+
+def test_upload_files_upload_disabled(mock_client: MagicMock) -> None:
+    os.environ["ENABLE_UPLOAD"] = "0"
+
+    json_public_url, csv_public_url, xlsx_public_url = upload_files(
+        "test_dataset",
+        json_path="dataset.json",
+        csv_path="dataset_csv.zip",
+        xlsx_path="dataset.xlsx",
+    )
+
+    mock_client.assert_not_called()
+    assert json_public_url is None
+    assert csv_public_url is None
+    assert xlsx_public_url is None
+
+
+def test_upload_files_nothing_to_upload(mock_client: MagicMock) -> None:
+    json_public_url, csv_public_url, xlsx_public_url = upload_files("test_dataset")
+
+    mock_client.assert_not_called()
+    assert json_public_url is None
+    assert csv_public_url is None
+    assert xlsx_public_url is None
+
+
+def test_upload_files_json(mock_client: MagicMock) -> None:
+    json_public_url, csv_public_url, xlsx_public_url = upload_files(
+        "test_dataset", json_path="data/test_dataset/test_dataset.json"
+    )
+
+    mock_client.upload_file.assert_called_once_with(
+        "data/test_dataset/test_dataset.json",
+        "test-bucket",
+        "test_dataset/test_dataset.json",
+        ExtraArgs={"ACL": "public-read", "ContentType": "application/json"},
+    )
+    assert (
+        json_public_url
+        == "https://test-bucket.test-region.digitaloceanspaces.com/test_dataset/test_dataset.json"  # noqa: E501
+    )
+    assert csv_public_url is None
+    assert xlsx_public_url is None
+
+
+def test_upload_files_json_catches_exception(mock_client: MagicMock) -> None:
+    mock_client.upload_file.side_effect = [Exception("Mock exception"), None, None]
+
+    with tempfile.TemporaryDirectory() as csv_dir:
+        json_public_url, csv_public_url, xlsx_public_url = upload_files(
+            "test_dataset",
+            json_path="data/test_dataset/test_dataset.json",
+            csv_path=csv_dir,
+            xlsx_path="data/test_dataset/test_dataset.xlsx",
+        )
+        assert json_public_url is None
+        assert (
+            csv_public_url
+            == "https://test-bucket.test-region.digitaloceanspaces.com/test_dataset/test_dataset_csv.zip"  # noqa: E501
+        )
+        assert (
+            xlsx_public_url
+            == "https://test-bucket.test-region.digitaloceanspaces.com/test_dataset/test_dataset.xlsx"  # noqa: E501
+        )
+
+
+def test_upload_files_csv(mock_client: MagicMock) -> None:
+    with tempfile.TemporaryDirectory() as csv_dir:
+        json_public_url, csv_public_url, xlsx_public_url = upload_files(
+            "test_dataset", csv_path=csv_dir
+        )
+
+        mock_client.upload_file.assert_called_once_with(
+            f"{csv_dir}_csv.zip",
+            "test-bucket",
+            "test_dataset/test_dataset_csv.zip",
+            ExtraArgs={"ACL": "public-read", "ContentType": "application/zip"},
+        )
+        assert json_public_url is None
+        assert (
+            csv_public_url
+            == "https://test-bucket.test-region.digitaloceanspaces.com/test_dataset/test_dataset_csv.zip"  # noqa: E501
+        )
+        assert xlsx_public_url is None
+
+
+def test_upload_files_csv_catches_exception(mock_client: MagicMock) -> None:
+    mock_client.upload_file.side_effect = [None, Exception("Mock exception"), None]
+
+    with tempfile.TemporaryDirectory() as csv_dir:
+        json_public_url, csv_public_url, xlsx_public_url = upload_files(
+            "test_dataset",
+            json_path="data/test_dataset/test_dataset.json",
+            csv_path=csv_dir,
+            xlsx_path="data/test_dataset/test_dataset.xlsx",
+        )
+        assert (
+            json_public_url
+            == "https://test-bucket.test-region.digitaloceanspaces.com/test_dataset/test_dataset.json"  # noqa: E501
+        )
+        assert csv_public_url is None
+        assert (
+            xlsx_public_url
+            == "https://test-bucket.test-region.digitaloceanspaces.com/test_dataset/test_dataset.xlsx"  # noqa: E501
+        )
+
+
+def test_upload_files_xlsx(mock_client: MagicMock) -> None:
+    json_public_url, csv_public_url, xlsx_public_url = upload_files(
+        "test_dataset", xlsx_path="data/test_dataset/test_dataset.xlsx"
+    )
+
+    mock_client.upload_file.assert_called_once_with(
+        "data/test_dataset/test_dataset.xlsx",
+        "test-bucket",
+        "test_dataset/test_dataset.xlsx",
+        ExtraArgs={
+            "ACL": "public-read",
+            "ContentType": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",  # noqa: E501
+        },
+    )
+    assert json_public_url is None
+    assert csv_public_url is None
+    assert (
+        xlsx_public_url
+        == "https://test-bucket.test-region.digitaloceanspaces.com/test_dataset/test_dataset.xlsx"  # noqa: E501
+    )
+
+
+def test_upload_files_xlsx_catches_exception(mock_client: MagicMock) -> None:
+    mock_client.upload_file.side_effect = [None, None, Exception("Mock exception")]
+
+    with tempfile.TemporaryDirectory() as csv_dir:
+        json_public_url, csv_public_url, xlsx_public_url = upload_files(
+            "test_dataset",
+            json_path="data/test_dataset/test_dataset.json",
+            csv_path=csv_dir,
+            xlsx_path="data/test_dataset/test_dataset.xlsx",
+        )
+        assert (
+            json_public_url
+            == "https://test-bucket.test-region.digitaloceanspaces.com/test_dataset/test_dataset.json"  # noqa: E501
+        )
+        assert (
+            csv_public_url
+            == "https://test-bucket.test-region.digitaloceanspaces.com/test_dataset/test_dataset_csv.zip"  # noqa: E501
+        )
+        assert xlsx_public_url is None

From 3ff74f7a1d54b19618f0da9503cf23856593a3d1 Mon Sep 17 00:00:00 2001
From: Tilly Woodfield <22456167+tillywoodfield@users.noreply.github.com>
Date: Thu, 6 Feb 2025 11:46:26 +0200
Subject: [PATCH 3/7] feat: refactor exception handling

---
 oc4ids_datastore_pipeline/storage.py | 71 +++++++++++++---------------
 tests/test_storage.py                | 24 ++++++++--
 2 files changed, 55 insertions(+), 40 deletions(-)

diff --git a/oc4ids_datastore_pipeline/storage.py b/oc4ids_datastore_pipeline/storage.py
index f26d22c..ca4f6e6 100644
--- a/oc4ids_datastore_pipeline/storage.py
+++ b/oc4ids_datastore_pipeline/storage.py
@@ -28,34 +28,35 @@ def _get_client() -> Any:
     )
 
 
-def _upload_file(local_path: str, bucket_path: str, content_type: str) -> str:
-    logger.info(f"Uploading file {local_path}")
-    client = _get_client()
-    client.upload_file(
-        local_path,
-        BUCKET_NAME,
-        bucket_path,
-        ExtraArgs={"ACL": "public-read", "ContentType": content_type},
-    )
-    public_url = (
-        f"https://{BUCKET_NAME}.{BUCKET_REGION}.digitaloceanspaces.com/" + bucket_path
-    )
-    logger.info(f"Uploaded to {public_url}")
-    return public_url
-
-
-def _upload_json(dataset_id: str, json_path: str) -> Optional[str]:
+def _upload_file(local_path: str, bucket_path: str, content_type: str) -> Optional[str]:
     try:
-        return _upload_file(
-            local_path=json_path,
-            bucket_path=f"{dataset_id}/{dataset_id}.json",
-            content_type="application/json",
+        logger.info(f"Uploading file {local_path}")
+        client = _get_client()
+        client.upload_file(
+            local_path,
+            BUCKET_NAME,
+            bucket_path,
+            ExtraArgs={"ACL": "public-read", "ContentType": content_type},
         )
+        public_url = (
+            f"https://{BUCKET_NAME}.{BUCKET_REGION}.digitaloceanspaces.com/"
+            + bucket_path
+        )
+        logger.info(f"Uploaded to {public_url}")
+        return public_url
     except Exception as e:
-        logger.warning(f"Failed to upload {json_path} with error {e}")
+        logger.warning(f"Failed to upload {local_path} with error {e}")
         return None
 
 
+def _upload_json(dataset_id: str, json_path: str) -> Optional[str]:
+    return _upload_file(
+        local_path=json_path,
+        bucket_path=f"{dataset_id}/{dataset_id}.json",
+        content_type="application/json",
+    )
+
+
 def _upload_csv(dataset_id: str, csv_path: str) -> Optional[str]:
     try:
         directory = Path(csv_path)
@@ -63,26 +64,22 @@ def _upload_csv(dataset_id: str, csv_path: str) -> Optional[str]:
         with zipfile.ZipFile(zip_file_path, mode="w") as archive:
             for file_path in directory.rglob("*"):
                 archive.write(file_path, arcname=file_path.relative_to(directory))
-        return _upload_file(
-            local_path=zip_file_path,
-            bucket_path=f"{dataset_id}/{dataset_id}_csv.zip",
-            content_type="application/zip",
-        )
     except Exception as e:
-        logger.warning(f"Failed to upload {csv_path} with error {e}")
+        logger.warning(f"Failed to zip {csv_path} with error {e}")
         return None
+    return _upload_file(
+        local_path=zip_file_path,
+        bucket_path=f"{dataset_id}/{dataset_id}_csv.zip",
+        content_type="application/zip",
+    )
 
 
 def _upload_xlsx(dataset_id: str, xlsx_path: str) -> Optional[str]:
-    try:
-        return _upload_file(
-            local_path=xlsx_path,
-            bucket_path=f"{dataset_id}/{dataset_id}.xlsx",
-            content_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",  # noqa: E501
-        )
-    except Exception as e:
-        logger.warning(f"Failed to upload {xlsx_path} with error {e}")
-        return None
+    return _upload_file(
+        local_path=xlsx_path,
+        bucket_path=f"{dataset_id}/{dataset_id}.xlsx",
+        content_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",  # noqa: E501
+    )
 
 
 def upload_files(
diff --git a/tests/test_storage.py b/tests/test_storage.py
index 9c3c526..3122478 100644
--- a/tests/test_storage.py
+++ b/tests/test_storage.py
@@ -64,7 +64,7 @@ def test_upload_files_json(mock_client: MagicMock) -> None:
     assert xlsx_public_url is None
 
 
-def test_upload_files_json_catches_exception(mock_client: MagicMock) -> None:
+def test_upload_files_json_catches_upload_exception(mock_client: MagicMock) -> None:
     mock_client.upload_file.side_effect = [Exception("Mock exception"), None, None]
 
     with tempfile.TemporaryDirectory() as csv_dir:
@@ -105,7 +105,25 @@ def test_upload_files_csv(mock_client: MagicMock) -> None:
     assert xlsx_public_url is None
 
 
-def test_upload_files_csv_catches_exception(mock_client: MagicMock) -> None:
+def test_upload_files_csv_catches_zip_exception() -> None:
+    json_public_url, csv_public_url, xlsx_public_url = upload_files(
+        "test_dataset",
+        json_path="data/test_dataset/test_dataset.json",
+        csv_path="non/existent/directory",
+        xlsx_path="data/test_dataset/test_dataset.xlsx",
+    )
+    assert (
+        json_public_url
+        == "https://test-bucket.test-region.digitaloceanspaces.com/test_dataset/test_dataset.json"  # noqa: E501
+    )
+    assert csv_public_url is None
+    assert (
+        xlsx_public_url
+        == "https://test-bucket.test-region.digitaloceanspaces.com/test_dataset/test_dataset.xlsx"  # noqa: E501
+    )
+
+
+def test_upload_files_csv_catches_upload_exception(mock_client: MagicMock) -> None:
     mock_client.upload_file.side_effect = [None, Exception("Mock exception"), None]
 
     with tempfile.TemporaryDirectory() as csv_dir:
@@ -148,7 +166,7 @@ def test_upload_files_xlsx(mock_client: MagicMock) -> None:
     )
 
 
-def test_upload_files_xlsx_catches_exception(mock_client: MagicMock) -> None:
+def test_upload_files_xlsx_catches_upload_exception(mock_client: MagicMock) -> None:
     mock_client.upload_file.side_effect = [None, None, Exception("Mock exception")]
 
     with tempfile.TemporaryDirectory() as csv_dir:

From 394052f413f2ff4dbf97687ce83d8536792ea10c Mon Sep 17 00:00:00 2001
From: Tilly Woodfield <22456167+tillywoodfield@users.noreply.github.com>
Date: Thu, 6 Feb 2025 12:07:57 +0200
Subject: [PATCH 4/7] fix: refactor environment loading

---
 .gitignore                            | 2 +-
 oc4ids_datastore_pipeline/__init__.py | 9 +++++++++
 oc4ids_datastore_pipeline/pipeline.py | 2 --
 tests/conftest.py                     | 4 ++--
 4 files changed, 12 insertions(+), 5 deletions(-)

diff --git a/.gitignore b/.gitignore
index 84ffe10..ea9e685 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,6 +1,6 @@
 .venv
 __pycache__
 
-.env
+.env.local
 
 data/
diff --git a/oc4ids_datastore_pipeline/__init__.py b/oc4ids_datastore_pipeline/__init__.py
index c3be62a..a6d83c4 100644
--- a/oc4ids_datastore_pipeline/__init__.py
+++ b/oc4ids_datastore_pipeline/__init__.py
@@ -1,9 +1,18 @@
 import logging
+import os
 import time
 
+from dotenv import find_dotenv, load_dotenv
+
 logging.basicConfig(
     level=logging.INFO,
     format="%(asctime)s:%(levelname)s:%(name)s:%(message)s",
     datefmt="%Y-%m-%dT%H:%M:%S",
 )
 logging.Formatter.converter = time.gmtime
+
+logger = logging.getLogger(__name__)
+
+APP_ENV = os.environ.get("APP_ENV", "local")
+logger.info(f"Loading {APP_ENV} environment variables")
+load_dotenv(find_dotenv(f".env.{APP_ENV}"))
diff --git a/oc4ids_datastore_pipeline/pipeline.py b/oc4ids_datastore_pipeline/pipeline.py
index 05988e9..8e509c6 100644
--- a/oc4ids_datastore_pipeline/pipeline.py
+++ b/oc4ids_datastore_pipeline/pipeline.py
@@ -7,7 +7,6 @@
 
 import flattentool
 import requests
-from dotenv import load_dotenv
 from libcoveoc4ids.api import oc4ids_json_output
 
 from oc4ids_datastore_pipeline.database import (
@@ -149,5 +148,4 @@ def process_registry() -> None:
 
 
 def run() -> None:
-    load_dotenv()
     process_registry()
diff --git a/tests/conftest.py b/tests/conftest.py
index c2d98f5..f555d5c 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1,3 +1,3 @@
-from dotenv import find_dotenv, load_dotenv
+import os
 
-load_dotenv(find_dotenv(".env.test"))
+os.environ["APP_ENV"] = "test"

From 03a6d4db0939fabfbeb86b5ba90ee81c7f00b14e Mon Sep 17 00:00:00 2001
From: Tilly Woodfield <22456167+tillywoodfield@users.noreply.github.com>
Date: Thu, 6 Feb 2025 12:22:37 +0200
Subject: [PATCH 5/7] feat: delete files for datasets removed from the registry

---
 oc4ids_datastore_pipeline/pipeline.py |  4 ++--
 oc4ids_datastore_pipeline/storage.py  | 15 ++++++++++++++-
 tests/test_pipeline.py                |  4 ++++
 tests/test_storage.py                 | 31 ++++++++++++++++++++++++++-
 4 files changed, 50 insertions(+), 4 deletions(-)

diff --git a/oc4ids_datastore_pipeline/pipeline.py b/oc4ids_datastore_pipeline/pipeline.py
index 8e509c6..f21b03f 100644
--- a/oc4ids_datastore_pipeline/pipeline.py
+++ b/oc4ids_datastore_pipeline/pipeline.py
@@ -19,7 +19,7 @@
     fetch_registered_datasets,
     get_license_name_from_url,
 )
-from oc4ids_datastore_pipeline.storage import upload_files
+from oc4ids_datastore_pipeline.storage import delete_files_for_dataset, upload_files
 
 logger = logging.getLogger(__name__)
 
@@ -137,7 +137,7 @@ def process_deleted_datasets(registered_datasets: dict[str, str]) -> None:
     for dataset_id in deleted_datasets:
         logger.info(f"Dataset {dataset_id} is no longer in the registry, deleting")
         delete_dataset(dataset_id)
-        # TODO: Delete stored files
+        delete_files_for_dataset(dataset_id)
 
 
 def process_registry() -> None:
diff --git a/oc4ids_datastore_pipeline/storage.py b/oc4ids_datastore_pipeline/storage.py
index ca4f6e6..978995a 100644
--- a/oc4ids_datastore_pipeline/storage.py
+++ b/oc4ids_datastore_pipeline/storage.py
@@ -88,7 +88,6 @@ def upload_files(
     csv_path: Optional[str] = None,
     xlsx_path: Optional[str] = None,
 ) -> tuple[Optional[str], Optional[str], Optional[str]]:
-    # TODO: Option to delete local files once uploaded?
     if not bool(int(os.environ.get("ENABLE_UPLOAD", "0"))):
         logger.info("Upload is disabled, skipping")
         return None, None, None
@@ -96,3 +95,17 @@ def upload_files(
     csv_public_url = _upload_csv(dataset_id, csv_path) if csv_path else None
     xlsx_public_url = _upload_xlsx(dataset_id, xlsx_path) if xlsx_path else None
     return json_public_url, csv_public_url, xlsx_public_url
+
+
+def delete_files_for_dataset(dataset_id: str) -> None:
+    logger.info(f"Deleting files for dataset {dataset_id}")
+    try:
+        client = _get_client()
+        response = client.list_objects_v2(Bucket=BUCKET_NAME, Prefix=dataset_id)
+        if "Contents" in response:
+            objects_to_delete = [{"Key": obj["Key"]} for obj in response["Contents"]]
+            client.delete_objects(
+                Bucket=BUCKET_NAME, Delete={"Objects": objects_to_delete}
+            )
+    except Exception as e:
+        logger.warning(f"Failed to delete files with error {e}")
diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py
index 4a1718d..8419600 100644
--- a/tests/test_pipeline.py
+++ b/tests/test_pipeline.py
@@ -111,11 +111,15 @@ def test_process_deleted_datasets(mocker: MockerFixture) -> None:
     patch_delete_dataset = mocker.patch(
         "oc4ids_datastore_pipeline.pipeline.delete_dataset"
     )
+    patch_delete_files_for_dataset = mocker.patch(
+        "oc4ids_datastore_pipeline.pipeline.delete_files_for_dataset"
+    )
     registered_datasets = {"test_dataset": "https://test_dataset.json"}
 
     process_deleted_datasets(registered_datasets)
 
     patch_delete_dataset.assert_called_once_with("old_dataset")
+    patch_delete_files_for_dataset.assert_called_once_with("old_dataset")
 
 
 def test_process_dataset_catches_exception(mocker: MockerFixture) -> None:
diff --git a/tests/test_storage.py b/tests/test_storage.py
index 3122478..2b84a5b 100644
--- a/tests/test_storage.py
+++ b/tests/test_storage.py
@@ -6,7 +6,7 @@
 import pytest
 from pytest_mock import MockerFixture
 
-from oc4ids_datastore_pipeline.storage import upload_files
+from oc4ids_datastore_pipeline.storage import delete_files_for_dataset, upload_files
 
 
 @pytest.fixture(autouse=True)
@@ -185,3 +185,32 @@ def test_upload_files_xlsx_catches_upload_exception(mock_client: MagicMock) -> None:
         == "https://test-bucket.test-region.digitaloceanspaces.com/test_dataset/test_dataset_csv.zip"  # noqa: E501
     )
     assert xlsx_public_url is None
+
+
+def test_delete_files_for_dataset(mock_client: MagicMock) -> None:
+    mock_client.list_objects_v2.return_value = {
+        "Contents": [
+            {"Key": "test_dataset/test_dataset.json"},
+            {"Key": "test_dataset/test_dataset_csv.zip"},
+            {"Key": "test_dataset/test_dataset.xlsx"},
+        ]
+    }
+
+    delete_files_for_dataset("test_dataset")
+
+    mock_client.delete_objects.assert_called_once_with(
+        Bucket="test-bucket",
+        Delete={
+            "Objects": [
+                {"Key": "test_dataset/test_dataset.json"},
+                {"Key": "test_dataset/test_dataset_csv.zip"},
+                {"Key": "test_dataset/test_dataset.xlsx"},
+            ]
+        },
+    )
+
+
+def test_delete_files_for_dataset_catches_exception(mock_client: MagicMock) -> None:
+    mock_client.list_objects_v2.side_effect = Exception("Mock exception")
+
+    delete_files_for_dataset("test_dataset")

From bec0de6ef5febde484416e985c9f9861016399f1 Mon Sep 17 00:00:00 2001
From: Tilly Woodfield <22456167+tillywoodfield@users.noreply.github.com>
Date: Thu, 6 Feb 2025 12:29:53 +0200
Subject: [PATCH 6/7] docs: document environment variables for uploading files

---
 README.md | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)

diff --git a/README.md b/README.md
index f1a1f9d..25e3ab3 100644
--- a/README.md
+++ b/README.md
@@ -31,7 +31,18 @@ alembic upgrade head
 ```
 
 ### S3 environment variables
 
-
+To enable files to be uploaded to S3-compatible storage, the following environment variables must be set:
+
+- `ENABLE_UPLOAD`: 1 to enable, 0 to disable
+- `BUCKET_REGION`:
+- `BUCKET_NAME`
+- `BUCKET_ACCESS_KEY_ID`
+- `BUCKET_ACCESS_KEY_SECRET`
+
+To make this easier, the project uses [`python-dotenv`](https://github.com/theskumar/python-dotenv) to load environment variables from a config file.
+For local development, create a file called `.env.local`, which will be used by default.
+You can change which file is loaded by setting the environment variable `APP_ENV`.
+For example, the tests set `APP_ENV=test`, which loads variables from `.env.test`.
 
 ### Run app

From 59b8da62dcd01965e08b0293b3f1581692e38f39 Mon Sep 17 00:00:00 2001
From: Tilly Woodfield <22456167+tillywoodfield@users.noreply.github.com>
Date: Thu, 6 Feb 2025 13:24:27 +0200
Subject: [PATCH 7/7] test: fix env loading in CI runner

---
 oc4ids_datastore_pipeline/__init__.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/oc4ids_datastore_pipeline/__init__.py b/oc4ids_datastore_pipeline/__init__.py
index a6d83c4..4082e16 100644
--- a/oc4ids_datastore_pipeline/__init__.py
+++ b/oc4ids_datastore_pipeline/__init__.py
@@ -15,4 +15,4 @@
 
 APP_ENV = os.environ.get("APP_ENV", "local")
 logger.info(f"Loading {APP_ENV} environment variables")
-load_dotenv(find_dotenv(f".env.{APP_ENV}"))
+load_dotenv(find_dotenv(f".env.{APP_ENV}", usecwd=True))
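
As a quick orientation to the API these patches introduce, the sketch below shows how `upload_files` could be exercised by hand. It is illustrative only and not part of the patch series: the dataset name, paths, region, bucket and credentials are placeholders, and it assumes the environment variables documented in PATCH 6/7 together with the `upload_files` signature from PATCH 2/7 onwards.

```
# Sketch only: placeholder bucket values, not real configuration.
# upload_files() checks ENABLE_UPLOAD when it is called, but the BUCKET_* values
# are read when storage.py is imported, so the environment must be populated
# first (normally by .env.local, loaded in oc4ids_datastore_pipeline/__init__.py).
import os

os.environ.update(
    {
        "ENABLE_UPLOAD": "1",
        "BUCKET_REGION": "example-region",          # placeholder
        "BUCKET_NAME": "example-bucket",            # placeholder
        "BUCKET_ACCESS_KEY_ID": "example-key-id",   # placeholder
        "BUCKET_ACCESS_KEY_SECRET": "example-key-secret",  # placeholder
    }
)

from oc4ids_datastore_pipeline.storage import upload_files

json_url, csv_url, xlsx_url = upload_files(
    "example_dataset",
    json_path="data/example_dataset/example_dataset.json",
)
# On a successful upload, json_url is
# "https://example-bucket.example-region.digitaloceanspaces.com/example_dataset/example_dataset.json",
# while csv_url and xlsx_url are None because no CSV or XLSX paths were passed.
```

With `ENABLE_UPLOAD` unset or set to `0`, the same call logs "Upload is disabled, skipping" and returns `(None, None, None)`; if an individual upload fails, the exception is caught and that file's URL is returned as `None`.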