diff --git a/.env.test b/.env.test
new file mode 100644
index 0000000..3b00692
--- /dev/null
+++ b/.env.test
@@ -0,0 +1,5 @@
+ENABLE_UPLOAD=1
+BUCKET_REGION="test-region"
+BUCKET_NAME="test-bucket"
+BUCKET_ACCESS_KEY_ID="test-id"
+BUCKET_ACCESS_KEY_SECRET="test-secret"
diff --git a/.gitignore b/.gitignore
index 126d2cd..ea9e685 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,6 @@
 .venv
 __pycache__
+.env.local
+
 
 data/
diff --git a/README.md b/README.md
index f98812e..25e3ab3 100644
--- a/README.md
+++ b/README.md
@@ -29,6 +29,21 @@ export DATABASE_URL="postgresql://oc4ids_datastore@localhost/oc4ids_datastore"
 alembic upgrade head
 ```
 
+### S3 environment variables
+
+To enable files to be uploaded to S3-compatible storage, the following environment variables must be set:
+
+- `ENABLE_UPLOAD`: 1 to enable, 0 to disable
+- `BUCKET_REGION`: region of the storage bucket
+- `BUCKET_NAME`: name of the storage bucket
+- `BUCKET_ACCESS_KEY_ID`: access key ID for the bucket
+- `BUCKET_ACCESS_KEY_SECRET`: secret access key for the bucket
+
+To make this easier, the project uses [`python-dotenv`](https://github.com/theskumar/python-dotenv) to load environment variables from a config file.
+For local development, create a file called `.env.local`, which will be used by default.
+You can change which file is loaded by setting the environment variable `APP_ENV`.
+For example, the tests set `APP_ENV=test`, which loads variables from `.env.test`.
+
 ### Run app
 
 ```
diff --git a/oc4ids_datastore_pipeline/__init__.py b/oc4ids_datastore_pipeline/__init__.py
index c3be62a..4082e16 100644
--- a/oc4ids_datastore_pipeline/__init__.py
+++ b/oc4ids_datastore_pipeline/__init__.py
@@ -1,9 +1,18 @@
 import logging
+import os
 import time
 
+from dotenv import find_dotenv, load_dotenv
+
 logging.basicConfig(
     level=logging.INFO,
     format="%(asctime)s:%(levelname)s:%(name)s:%(message)s",
     datefmt="%Y-%m-%dT%H:%M:%S",
 )
 logging.Formatter.converter = time.gmtime
+
+logger = logging.getLogger(__name__)
+
+APP_ENV = os.environ.get("APP_ENV", "local")
+logger.info(f"Loading {APP_ENV} environment variables")
+load_dotenv(find_dotenv(f".env.{APP_ENV}", usecwd=True))
diff --git a/oc4ids_datastore_pipeline/pipeline.py b/oc4ids_datastore_pipeline/pipeline.py
index aabe607..f21b03f 100644
--- a/oc4ids_datastore_pipeline/pipeline.py
+++ b/oc4ids_datastore_pipeline/pipeline.py
@@ -19,6 +19,7 @@
     fetch_registered_datasets,
     get_license_name_from_url,
 )
+from oc4ids_datastore_pipeline.storage import delete_files_for_dataset, upload_files
 
 logger = logging.getLogger(__name__)
 
@@ -83,7 +84,7 @@ def save_dataset_metadata(
     dataset_name: str,
     source_url: str,
     json_data: dict[str, Any],
-    json_url: str,
+    json_url: Optional[str],
     csv_url: Optional[str],
     xlsx_url: Optional[str],
 ) -> None:
@@ -114,13 +115,16 @@ def process_dataset(dataset_name: str, dataset_url: str) -> None:
             f"data/{dataset_name}/{dataset_name}.json", json_data
         )
         csv_path, xlsx_path = transform_to_csv_and_xlsx(json_path)
+        json_public_url, csv_public_url, xlsx_public_url = upload_files(
+            dataset_name, json_path=json_path, csv_path=csv_path, xlsx_path=xlsx_path
+        )
         save_dataset_metadata(
             dataset_name=dataset_name,
             source_url=dataset_url,
             json_data=json_data,
-            json_url=json_path,
-            csv_url=csv_path,
-            xlsx_url=xlsx_path,
+            json_url=json_public_url,
+            csv_url=csv_public_url,
+            xlsx_url=xlsx_public_url,
         )
         logger.info(f"Processed dataset {dataset_name}")
     except Exception as e:
@@ -133,6 +137,7 @@ def process_deleted_datasets(registered_datasets: dict[str, str]) -> None:
     for dataset_id in deleted_datasets:
         logger.info(f"Dataset {dataset_id} is no longer in the registry, deleting")
         delete_dataset(dataset_id)
+        
delete_files_for_dataset(dataset_id) def process_registry() -> None: diff --git a/oc4ids_datastore_pipeline/storage.py b/oc4ids_datastore_pipeline/storage.py new file mode 100644 index 0000000..978995a --- /dev/null +++ b/oc4ids_datastore_pipeline/storage.py @@ -0,0 +1,111 @@ +import logging +import os +import zipfile +from pathlib import Path +from typing import Any, Optional + +import boto3 +import botocore + +logger = logging.getLogger(__name__) + + +BUCKET_REGION = os.environ.get("BUCKET_REGION") +BUCKET_NAME = os.environ.get("BUCKET_NAME") +BUCKET_ACCESS_KEY_ID = os.environ.get("BUCKET_ACCESS_KEY_ID") +BUCKET_ACCESS_KEY_SECRET = os.environ.get("BUCKET_ACCESS_KEY_SECRET") + + +def _get_client() -> Any: + session = boto3.session.Session() + return session.client( + "s3", + endpoint_url=f"https://{BUCKET_REGION}.digitaloceanspaces.com/", + config=botocore.config.Config(s3={"addressing_style": "virtual"}), + region_name=BUCKET_REGION, + aws_access_key_id=BUCKET_ACCESS_KEY_ID, + aws_secret_access_key=BUCKET_ACCESS_KEY_SECRET, + ) + + +def _upload_file(local_path: str, bucket_path: str, content_type: str) -> Optional[str]: + try: + logger.info(f"Uploading file {local_path}") + client = _get_client() + client.upload_file( + local_path, + BUCKET_NAME, + bucket_path, + ExtraArgs={"ACL": "public-read", "ContentType": content_type}, + ) + public_url = ( + f"https://{BUCKET_NAME}.{BUCKET_REGION}.digitaloceanspaces.com/" + + bucket_path + ) + logger.info(f"Uploaded to {public_url}") + return public_url + except Exception as e: + logger.warning(f"Failed to upload {local_path} with error {e}") + return None + + +def _upload_json(dataset_id: str, json_path: str) -> Optional[str]: + return _upload_file( + local_path=json_path, + bucket_path=f"{dataset_id}/{dataset_id}.json", + content_type="application/json", + ) + + +def _upload_csv(dataset_id: str, csv_path: str) -> Optional[str]: + try: + directory = Path(csv_path) + zip_file_path = f"{csv_path}_csv.zip" + with zipfile.ZipFile(zip_file_path, mode="w") as archive: + for file_path in directory.rglob("*"): + archive.write(file_path, arcname=file_path.relative_to(directory)) + except Exception as e: + logger.warning(f"Failed to zip {csv_path} with error {e}") + return None + return _upload_file( + local_path=zip_file_path, + bucket_path=f"{dataset_id}/{dataset_id}_csv.zip", + content_type="application/zip", + ) + + +def _upload_xlsx(dataset_id: str, xlsx_path: str) -> Optional[str]: + return _upload_file( + local_path=xlsx_path, + bucket_path=f"{dataset_id}/{dataset_id}.xlsx", + content_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", # noqa: E501 + ) + + +def upload_files( + dataset_id: str, + json_path: Optional[str] = None, + csv_path: Optional[str] = None, + xlsx_path: Optional[str] = None, +) -> tuple[Optional[str], Optional[str], Optional[str]]: + if not bool(int(os.environ.get("ENABLE_UPLOAD", "0"))): + logger.info("Upload is disabled, skipping") + return None, None, None + json_public_url = _upload_json(dataset_id, json_path) if json_path else None + csv_public_url = _upload_csv(dataset_id, csv_path) if csv_path else None + xlsx_public_url = _upload_xlsx(dataset_id, xlsx_path) if xlsx_path else None + return json_public_url, csv_public_url, xlsx_public_url + + +def delete_files_for_dataset(dataset_id: str) -> None: + logger.info(f"Deleting files for dataset {dataset_id}") + try: + client = _get_client() + response = client.list_objects_v2(Bucket=BUCKET_NAME, Prefix=dataset_id) + if "Contents" in response: + 
objects_to_delete = [{"Key": obj["Key"]} for obj in response["Contents"]] + client.delete_objects( + Bucket=BUCKET_NAME, Delete={"Objects": objects_to_delete} + ) + except Exception as e: + logger.warning(f"Failed to delete files with error {e}") diff --git a/pyproject.toml b/pyproject.toml index c453dfb..d9661cf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,6 +9,7 @@ version = "0.1.0" readme = "README.md" dependencies = [ "alembic", + "boto3", "flattentool", "libcoveoc4ids", "psycopg2", @@ -25,6 +26,8 @@ dev = [ "mypy", "pytest", "pytest-mock", + "python-dotenv", + "types-boto3", "types-requests", ] diff --git a/requirements_dev.txt b/requirements_dev.txt index 965d8b0..9a073e7 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -16,6 +16,14 @@ backports-datetime-fromisoformat==2.0.3 # via flattentool black==25.1.0 # via oc4ids-datastore-pipeline (pyproject.toml) +boto3==1.36.13 + # via oc4ids-datastore-pipeline (pyproject.toml) +botocore==1.36.13 + # via + # boto3 + # s3transfer +botocore-stubs==1.36.12 + # via types-boto3 btrees==6.1 # via zodb cattrs==24.1.2 @@ -53,6 +61,10 @@ iniconfig==2.0.0 # via pytest isort==6.0.0 # via oc4ids-datastore-pipeline (pyproject.toml) +jmespath==1.0.1 + # via + # boto3 + # botocore json-merge-patch==0.2 # via ocdsextensionregistry jsonref==1.1.0 @@ -125,6 +137,10 @@ pytest==8.3.4 # pytest-mock pytest-mock==3.14.0 # via oc4ids-datastore-pipeline (pyproject.toml) +python-dateutil==2.9.0.post0 + # via botocore +python-dotenv==1.0.1 + # via oc4ids-datastore-pipeline (pyproject.toml) pytz==2025.1 # via flattentool referencing==0.36.2 @@ -150,10 +166,13 @@ rpds-py==0.22.3 # via # jsonschema # referencing +s3transfer==0.11.2 + # via boto3 schema==0.7.7 # via flattentool six==1.17.0 # via + # python-dateutil # rfc3339-validator # url-normalize sqlalchemy==2.0.37 @@ -162,8 +181,14 @@ sqlalchemy==2.0.37 # oc4ids-datastore-pipeline (pyproject.toml) transaction==5.0 # via zodb +types-awscrt==0.23.9 + # via botocore-stubs +types-boto3==1.36.13 + # via oc4ids-datastore-pipeline (pyproject.toml) types-requests==2.32.0.20241016 # via oc4ids-datastore-pipeline (pyproject.toml) +types-s3transfer==0.11.2 + # via types-boto3 typing-extensions==4.12.2 # via # alembic @@ -174,6 +199,7 @@ url-normalize==1.4.3 # via requests-cache urllib3==2.3.0 # via + # botocore # requests # requests-cache # types-requests diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..f555d5c --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,3 @@ +import os + +os.environ["APP_ENV"] = "test" diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 4a1718d..8419600 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -111,11 +111,15 @@ def test_process_deleted_datasets(mocker: MockerFixture) -> None: patch_delete_dataset = mocker.patch( "oc4ids_datastore_pipeline.pipeline.delete_dataset" ) + patch_delete_files_for_dataset = mocker.patch( + "oc4ids_datastore_pipeline.pipeline.delete_files_for_dataset" + ) registered_datasets = {"test_dataset": "https://test_dataset.json"} process_deleted_datasets(registered_datasets) patch_delete_dataset.assert_called_once_with("old_dataset") + patch_delete_files_for_dataset.assert_called_once_with("old_dataset") def test_process_dataset_catches_exception(mocker: MockerFixture) -> None: diff --git a/tests/test_storage.py b/tests/test_storage.py new file mode 100644 index 0000000..2b84a5b --- /dev/null +++ b/tests/test_storage.py @@ -0,0 +1,216 @@ +import os +import tempfile +from typing import 
Any +from unittest.mock import MagicMock + +import pytest +from pytest_mock import MockerFixture + +from oc4ids_datastore_pipeline.storage import delete_files_for_dataset, upload_files + + +@pytest.fixture(autouse=True) +def mock_client(mocker: MockerFixture) -> Any: + os.environ["ENABLE_UPLOAD"] = "1" + mock_boto3_client = MagicMock() + patch_boto3_client = mocker.patch( + "oc4ids_datastore_pipeline.storage.boto3.session.Session.client" + ) + patch_boto3_client.return_value = mock_boto3_client + return mock_boto3_client + + +def test_upload_files_upload_disabled(mock_client: MagicMock) -> None: + os.environ["ENABLE_UPLOAD"] = "0" + + json_public_url, csv_public_url, xlsx_public_url = upload_files( + "test_dataset", + json_path="dataset.json", + csv_path="dataset_csv.zip", + xlsx_path="dataset.xlsx", + ) + + mock_client.assert_not_called() + assert json_public_url is None + assert csv_public_url is None + assert xlsx_public_url is None + + +def test_upload_files_nothing_to_upload(mock_client: MagicMock) -> None: + json_public_url, csv_public_url, xlsx_public_url = upload_files("test_dataset") + + mock_client.assert_not_called() + assert json_public_url is None + assert csv_public_url is None + assert xlsx_public_url is None + + +def test_upload_files_json(mock_client: MagicMock) -> None: + json_public_url, csv_public_url, xlsx_public_url = upload_files( + "test_dataset", json_path="data/test_dataset/test_dataset.json" + ) + + mock_client.upload_file.assert_called_once_with( + "data/test_dataset/test_dataset.json", + "test-bucket", + "test_dataset/test_dataset.json", + ExtraArgs={"ACL": "public-read", "ContentType": "application/json"}, + ) + assert ( + json_public_url + == "https://test-bucket.test-region.digitaloceanspaces.com/test_dataset/test_dataset.json" # noqa: E501 + ) + assert csv_public_url is None + assert xlsx_public_url is None + + +def test_upload_files_json_catches_upload_exception(mock_client: MagicMock) -> None: + mock_client.upload_file.side_effect = [Exception("Mock exception"), None, None] + + with tempfile.TemporaryDirectory() as csv_dir: + json_public_url, csv_public_url, xlsx_public_url = upload_files( + "test_dataset", + json_path="data/test_dataset/test_dataset.json", + csv_path=csv_dir, + xlsx_path="data/test_dataset/test_dataset.xlsx", + ) + assert json_public_url is None + assert ( + csv_public_url + == "https://test-bucket.test-region.digitaloceanspaces.com/test_dataset/test_dataset_csv.zip" # noqa: E501 + ) + assert ( + xlsx_public_url + == "https://test-bucket.test-region.digitaloceanspaces.com/test_dataset/test_dataset.xlsx" # noqa: E501 + ) + + +def test_upload_files_csv(mock_client: MagicMock) -> None: + with tempfile.TemporaryDirectory() as csv_dir: + json_public_url, csv_public_url, xlsx_public_url = upload_files( + "test_dataset", csv_path=csv_dir + ) + + mock_client.upload_file.assert_called_once_with( + f"{csv_dir}_csv.zip", + "test-bucket", + "test_dataset/test_dataset_csv.zip", + ExtraArgs={"ACL": "public-read", "ContentType": "application/zip"}, + ) + assert json_public_url is None + assert ( + csv_public_url + == "https://test-bucket.test-region.digitaloceanspaces.com/test_dataset/test_dataset_csv.zip" # noqa: E501 + ) + assert xlsx_public_url is None + + +def test_upload_files_csv_catches_zip_exception() -> None: + json_public_url, csv_public_url, xlsx_public_url = upload_files( + "test_dataset", + json_path="data/test_dataset/test_dataset.json", + csv_path="non/existent/directory", + xlsx_path="data/test_dataset/test_dataset.xlsx", + ) + assert ( 
+ json_public_url + == "https://test-bucket.test-region.digitaloceanspaces.com/test_dataset/test_dataset.json" # noqa: E501 + ) + assert csv_public_url is None + assert ( + xlsx_public_url + == "https://test-bucket.test-region.digitaloceanspaces.com/test_dataset/test_dataset.xlsx" # noqa: E501 + ) + + +def test_upload_files_csv_catches_upload_exception(mock_client: MagicMock) -> None: + mock_client.upload_file.side_effect = [None, Exception("Mock exception"), None] + + with tempfile.TemporaryDirectory() as csv_dir: + json_public_url, csv_public_url, xlsx_public_url = upload_files( + "test_dataset", + json_path="data/test_dataset/test_dataset.json", + csv_path=csv_dir, + xlsx_path="data/test_dataset/test_dataset.xlsx", + ) + assert ( + json_public_url + == "https://test-bucket.test-region.digitaloceanspaces.com/test_dataset/test_dataset.json" # noqa: E501 + ) + assert csv_public_url is None + assert ( + xlsx_public_url + == "https://test-bucket.test-region.digitaloceanspaces.com/test_dataset/test_dataset.xlsx" # noqa: E501 + ) + + +def test_upload_files_xlsx(mock_client: MagicMock) -> None: + json_public_url, csv_public_url, xlsx_public_url = upload_files( + "test_dataset", xlsx_path="data/test_dataset/test_dataset.xlsx" + ) + + mock_client.upload_file.assert_called_once_with( + "data/test_dataset/test_dataset.xlsx", + "test-bucket", + "test_dataset/test_dataset.xlsx", + ExtraArgs={ + "ACL": "public-read", + "ContentType": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", # noqa: E501 + }, + ) + assert json_public_url is None + assert csv_public_url is None + assert ( + xlsx_public_url + == "https://test-bucket.test-region.digitaloceanspaces.com/test_dataset/test_dataset.xlsx" # noqa: E501 + ) + + +def test_upload_files_xlsx_catches_upload_exception(mock_client: MagicMock) -> None: + mock_client.upload_file.side_effect = [None, None, Exception("Mock exception")] + + with tempfile.TemporaryDirectory() as csv_dir: + json_public_url, csv_public_url, xlsx_public_url = upload_files( + "test_dataset", + json_path="data/test_dataset/test_dataset.json", + csv_path=csv_dir, + xlsx_path="data/test_dataset/test_dataset.xlsx", + ) + assert ( + json_public_url + == "https://test-bucket.test-region.digitaloceanspaces.com/test_dataset/test_dataset.json" # noqa: E501 + ) + assert ( + csv_public_url + == "https://test-bucket.test-region.digitaloceanspaces.com/test_dataset/test_dataset_csv.zip" # noqa: E501 + ) + assert xlsx_public_url is None + + +def test_delete_files_for_dataset(mock_client: MagicMock) -> None: + mock_client.list_objects_v2.return_value = { + "Contents": [ + {"Key": "test_dataset/test_dataset.json"}, + {"Key": "test_dataset/test_dataset_csv.zip"}, + {"Key": "test_dataset/test_dataset.xlsx"}, + ] + } + + delete_files_for_dataset("test_dataset") + + mock_client.delete_objects.assert_called_once_with( + Bucket="test-bucket", + Delete={ + "Objects": [ + {"Key": "test_dataset/test_dataset.json"}, + {"Key": "test_dataset/test_dataset_csv.zip"}, + {"Key": "test_dataset/test_dataset.xlsx"}, + ] + }, + ) + + +def test_delete_files_for_dataset_catches_exception(mock_client: MagicMock) -> None: + mock_client.list_objects_v2.side_effect = Exception("Mock exception") + + delete_files_for_dataset("test_dataset")
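
For reviewers, a minimal sketch (not part of the diff) of how the new storage module is exercised end to end, assuming the `.env.test` values committed above; the dataset name and file paths are invented for illustration:

```
# Illustrative sketch only. The dataset name and paths are made up; the
# environment values come from .env.test above.
import os

# Select the env file before importing the package, as tests/conftest.py does;
# the package __init__ then runs load_dotenv(find_dotenv(".env.test", usecwd=True)).
os.environ["APP_ENV"] = "test"

from oc4ids_datastore_pipeline.storage import upload_files  # noqa: E402

# With ENABLE_UPLOAD=1, each path that is provided is uploaded and its public
# URL returned; an individual failure is logged and yields None instead of
# aborting the run (with the fake test credentials every URL falls back to None).
json_url, csv_url, xlsx_url = upload_files(
    "example_dataset",
    json_path="data/example_dataset/example_dataset.json",
    csv_path="data/example_dataset/example_dataset_csv",
    xlsx_path="data/example_dataset/example_dataset.xlsx",
)

# Successful uploads produce URLs of the form:
# https://<BUCKET_NAME>.<BUCKET_REGION>.digitaloceanspaces.com/<dataset_id>/<file>
print(json_url, csv_url, xlsx_url)
```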