5 changes: 5 additions & 0 deletions .env.test
@@ -0,0 +1,5 @@
ENABLE_UPLOAD=1
BUCKET_REGION="test-region"
BUCKET_NAME="test-bucket"
BUCKET_ACCESS_KEY_ID="test-id"
BUCKET_ACCESS_KEY_SECRET="test-secret"
2 changes: 2 additions & 0 deletions .gitignore
@@ -1,4 +1,6 @@
.venv
__pycache__

.env.local

data/
15 changes: 15 additions & 0 deletions README.md
@@ -29,6 +29,21 @@ export DATABASE_URL="postgresql://oc4ids_datastore@localhost/oc4ids_datastore"
alembic upgrade head
```

### S3 environment variables

To enable files to be uploaded to S3-compatible storage, the following environment variables must be set:

- `ENABLE_UPLOAD`: set to `1` to enable uploads, or `0` to disable them
- `BUCKET_REGION`: region of the bucket
- `BUCKET_NAME`: name of the bucket
- `BUCKET_ACCESS_KEY_ID`: access key ID for the bucket
- `BUCKET_ACCESS_KEY_SECRET`: secret access key for the bucket

To make this easier, the project uses [`python-dotenv`](https://github.com/theskumar/python-dotenv) to load environment variables from a config file.
For local development, create a file called `.env.local`, which is loaded by default.
You can change which file is loaded by setting the environment variable `APP_ENV`.
For example, the tests set `APP_ENV=test`, which loads variables from `.env.test`.
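
A minimal `.env.local` might look like this (placeholder values; substitute your own bucket credentials):

```
ENABLE_UPLOAD=1
BUCKET_REGION="your-region"
BUCKET_NAME="your-bucket"
BUCKET_ACCESS_KEY_ID="your-access-key-id"
BUCKET_ACCESS_KEY_SECRET="your-access-key-secret"
```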

### Run app

```
9 changes: 9 additions & 0 deletions oc4ids_datastore_pipeline/__init__.py
@@ -1,9 +1,18 @@
import logging
import os
import time

from dotenv import find_dotenv, load_dotenv

logging.basicConfig(
level=logging.INFO,
format="%(asctime)s:%(levelname)s:%(name)s:%(message)s",
datefmt="%Y-%m-%dT%H:%M:%S",
)
logging.Formatter.converter = time.gmtime

logger = logging.getLogger(__name__)

APP_ENV = os.environ.get("APP_ENV", "local")
logger.info(f"Loading {APP_ENV} environment variables")
load_dotenv(find_dotenv(f".env.{APP_ENV}", usecwd=True))
13 changes: 9 additions & 4 deletions oc4ids_datastore_pipeline/pipeline.py
@@ -19,6 +19,7 @@
fetch_registered_datasets,
get_license_name_from_url,
)
from oc4ids_datastore_pipeline.storage import delete_files_for_dataset, upload_files

logger = logging.getLogger(__name__)

@@ -83,7 +84,7 @@ def save_dataset_metadata(
dataset_name: str,
source_url: str,
json_data: dict[str, Any],
json_url: str,
json_url: Optional[str],
csv_url: Optional[str],
xlsx_url: Optional[str],
) -> None:
@@ -114,13 +115,16 @@ def process_dataset(dataset_name: str, dataset_url: str) -> None:
f"data/{dataset_name}/{dataset_name}.json", json_data
)
csv_path, xlsx_path = transform_to_csv_and_xlsx(json_path)
json_public_url, csv_public_url, xlsx_public_url = upload_files(
dataset_name, json_path=json_path, csv_path=csv_path, xlsx_path=xlsx_path
)
save_dataset_metadata(
dataset_name=dataset_name,
source_url=dataset_url,
json_data=json_data,
json_url=json_path,
csv_url=csv_path,
xlsx_url=xlsx_path,
json_url=json_public_url,
csv_url=csv_public_url,
xlsx_url=xlsx_public_url,
)
logger.info(f"Processed dataset {dataset_name}")
except Exception as e:
@@ -133,6 +137,7 @@ def process_deleted_datasets(registered_datasets: dict[str, str]) -> None:
for dataset_id in deleted_datasets:
logger.info(f"Dataset {dataset_id} is no longer in the registry, deleting")
delete_dataset(dataset_id)
delete_files_for_dataset(dataset_id)


def process_registry() -> None:
111 changes: 111 additions & 0 deletions oc4ids_datastore_pipeline/storage.py
@@ -0,0 +1,111 @@
import logging
import os
import zipfile
from pathlib import Path
from typing import Any, Optional

import boto3
import botocore

logger = logging.getLogger(__name__)


BUCKET_REGION = os.environ.get("BUCKET_REGION")
BUCKET_NAME = os.environ.get("BUCKET_NAME")
BUCKET_ACCESS_KEY_ID = os.environ.get("BUCKET_ACCESS_KEY_ID")
BUCKET_ACCESS_KEY_SECRET = os.environ.get("BUCKET_ACCESS_KEY_SECRET")


def _get_client() -> Any:
session = boto3.session.Session()
return session.client(
"s3",
endpoint_url=f"https://{BUCKET_REGION}.digitaloceanspaces.com/",
Contributor: The whole endpoint URL could be an env var? Allows changing provider easily.

Contributor: Ah, but then maybe the addressing_style config below should be configurable too? Maybe then it gets confusing.

Contributor (Author): Yeah, I also ran into an issue later on where the boto3 client doesn't return the public URL of the uploaded file, so we have to construct that ourselves, and it can differ depending on the provider. It's easier to have all the parts rather than a single URL, so this ended up being the nicest way I could find.

config=botocore.config.Config(s3={"addressing_style": "virtual"}),
region_name=BUCKET_REGION,
aws_access_key_id=BUCKET_ACCESS_KEY_ID,
aws_secret_access_key=BUCKET_ACCESS_KEY_SECRET,
)


def _upload_file(local_path: str, bucket_path: str, content_type: str) -> Optional[str]:
try:
logger.info(f"Uploading file {local_path}")
client = _get_client()
client.upload_file(
local_path,
BUCKET_NAME,
bucket_path,
ExtraArgs={"ACL": "public-read", "ContentType": content_type},
)
public_url = (
f"https://{BUCKET_NAME}.{BUCKET_REGION}.digitaloceanspaces.com/"
+ bucket_path
)
logger.info(f"Uploaded to {public_url}")
return public_url
except Exception as e:
logger.warning(f"Failed to upload {local_path} with error {e}")
return None


def _upload_json(dataset_id: str, json_path: str) -> Optional[str]:
return _upload_file(
local_path=json_path,
bucket_path=f"{dataset_id}/{dataset_id}.json",
content_type="application/json",
)


def _upload_csv(dataset_id: str, csv_path: str) -> Optional[str]:
try:
directory = Path(csv_path)
zip_file_path = f"{csv_path}_csv.zip"
with zipfile.ZipFile(zip_file_path, mode="w") as archive:
for file_path in directory.rglob("*"):
archive.write(file_path, arcname=file_path.relative_to(directory))
except Exception as e:
logger.warning(f"Failed to zip {csv_path} with error {e}")
return None
return _upload_file(
local_path=zip_file_path,
bucket_path=f"{dataset_id}/{dataset_id}_csv.zip",
content_type="application/zip",
)


def _upload_xlsx(dataset_id: str, xlsx_path: str) -> Optional[str]:
return _upload_file(
local_path=xlsx_path,
bucket_path=f"{dataset_id}/{dataset_id}.xlsx",
content_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", # noqa: E501
)


def upload_files(
dataset_id: str,
json_path: Optional[str] = None,
csv_path: Optional[str] = None,
xlsx_path: Optional[str] = None,
) -> tuple[Optional[str], Optional[str], Optional[str]]:
if not bool(int(os.environ.get("ENABLE_UPLOAD", "0"))):
logger.info("Upload is disabled, skipping")
return None, None, None
json_public_url = _upload_json(dataset_id, json_path) if json_path else None
csv_public_url = _upload_csv(dataset_id, csv_path) if csv_path else None
xlsx_public_url = _upload_xlsx(dataset_id, xlsx_path) if xlsx_path else None
return json_public_url, csv_public_url, xlsx_public_url


def delete_files_for_dataset(dataset_id: str) -> None:
logger.info(f"Deleting files for dataset {dataset_id}")
try:
client = _get_client()
response = client.list_objects_v2(Bucket=BUCKET_NAME, Prefix=dataset_id)
if "Contents" in response:
objects_to_delete = [{"Key": obj["Key"]} for obj in response["Contents"]]
client.delete_objects(
Bucket=BUCKET_NAME, Delete={"Objects": objects_to_delete}
)
except Exception as e:
logger.warning(f"Failed to delete files with error {e}")
3 changes: 3 additions & 0 deletions pyproject.toml
@@ -9,6 +9,7 @@ version = "0.1.0"
readme = "README.md"
dependencies = [
"alembic",
"boto3",
"flattentool",
"libcoveoc4ids",
"psycopg2",
@@ -25,6 +26,8 @@ dev = [
"mypy",
"pytest",
"pytest-mock",
"python-dotenv",
"types-boto3",
"types-requests",
]

26 changes: 26 additions & 0 deletions requirements_dev.txt
@@ -16,6 +16,14 @@ backports-datetime-fromisoformat==2.0.3
# via flattentool
black==25.1.0
# via oc4ids-datastore-pipeline (pyproject.toml)
boto3==1.36.13
# via oc4ids-datastore-pipeline (pyproject.toml)
botocore==1.36.13
# via
# boto3
# s3transfer
botocore-stubs==1.36.12
# via types-boto3
btrees==6.1
# via zodb
cattrs==24.1.2
@@ -53,6 +61,10 @@ iniconfig==2.0.0
# via pytest
isort==6.0.0
# via oc4ids-datastore-pipeline (pyproject.toml)
jmespath==1.0.1
# via
# boto3
# botocore
json-merge-patch==0.2
# via ocdsextensionregistry
jsonref==1.1.0
@@ -125,6 +137,10 @@ pytest==8.3.4
# pytest-mock
pytest-mock==3.14.0
# via oc4ids-datastore-pipeline (pyproject.toml)
python-dateutil==2.9.0.post0
# via botocore
python-dotenv==1.0.1
# via oc4ids-datastore-pipeline (pyproject.toml)
pytz==2025.1
# via flattentool
referencing==0.36.2
@@ -150,10 +166,13 @@ rpds-py==0.22.3
# via
# jsonschema
# referencing
s3transfer==0.11.2
# via boto3
schema==0.7.7
# via flattentool
six==1.17.0
# via
# python-dateutil
# rfc3339-validator
# url-normalize
sqlalchemy==2.0.37
@@ -162,8 +181,14 @@
# oc4ids-datastore-pipeline (pyproject.toml)
transaction==5.0
# via zodb
types-awscrt==0.23.9
# via botocore-stubs
types-boto3==1.36.13
# via oc4ids-datastore-pipeline (pyproject.toml)
types-requests==2.32.0.20241016
# via oc4ids-datastore-pipeline (pyproject.toml)
types-s3transfer==0.11.2
# via types-boto3
typing-extensions==4.12.2
# via
# alembic
@@ -174,6 +199,7 @@ url-normalize==1.4.3
# via requests-cache
urllib3==2.3.0
# via
# botocore
# requests
# requests-cache
# types-requests
3 changes: 3 additions & 0 deletions tests/conftest.py
@@ -0,0 +1,3 @@
import os

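# APP_ENV must be set before oc4ids_datastore_pipeline is imported, so that
# load_dotenv in the package __init__ picks up .env.test for the test run.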
os.environ["APP_ENV"] = "test"
4 changes: 4 additions & 0 deletions tests/test_pipeline.py
@@ -111,11 +111,15 @@ def test_process_deleted_datasets(mocker: MockerFixture) -> None:
patch_delete_dataset = mocker.patch(
"oc4ids_datastore_pipeline.pipeline.delete_dataset"
)
patch_delete_files_for_dataset = mocker.patch(
"oc4ids_datastore_pipeline.pipeline.delete_files_for_dataset"
)

registered_datasets = {"test_dataset": "https://test_dataset.json"}
process_deleted_datasets(registered_datasets)

patch_delete_dataset.assert_called_once_with("old_dataset")
patch_delete_files_for_dataset.assert_called_once_with("old_dataset")


def test_process_dataset_catches_exception(mocker: MockerFixture) -> None: