diff --git a/migrations/versions/084c39bf418e_add_csv_and_xlsx_columns_to_dataset_.py b/migrations/versions/084c39bf418e_add_csv_and_xlsx_columns_to_dataset_.py
new file mode 100644
index 0000000..d4ef005
--- /dev/null
+++ b/migrations/versions/084c39bf418e_add_csv_and_xlsx_columns_to_dataset_.py
@@ -0,0 +1,33 @@
+"""add csv and xlsx columns to dataset table
+
+Revision ID: 084c39bf418e
+Revises: 85905d23accc
+Create Date: 2025-02-05 11:10:03.114086
+
+"""
+
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision: str = "084c39bf418e"
+down_revision: Union[str, None] = "85905d23accc"
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.add_column("dataset", sa.Column("csv_url", sa.String(), nullable=True))
+    op.add_column("dataset", sa.Column("xlsx_url", sa.String(), nullable=True))
+    # ### end Alembic commands ###
+
+
+def downgrade() -> None:
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.drop_column("dataset", "xlsx_url")
+    op.drop_column("dataset", "csv_url")
+    # ### end Alembic commands ###
diff --git a/oc4ids_datastore_pipeline/database.py b/oc4ids_datastore_pipeline/database.py
index 476bd00..f160c1b 100644
--- a/oc4ids_datastore_pipeline/database.py
+++ b/oc4ids_datastore_pipeline/database.py
@@ -29,6 +29,8 @@ class Dataset(Base):
     license_url: Mapped[Optional[str]] = mapped_column(String, nullable=True)
     license_name: Mapped[Optional[str]] = mapped_column(String, nullable=True)
     json_url: Mapped[str] = mapped_column(String)
+    csv_url: Mapped[Optional[str]] = mapped_column(String, nullable=True)
+    xlsx_url: Mapped[Optional[str]] = mapped_column(String, nullable=True)
     updated_at: Mapped[datetime.datetime] = mapped_column(DateTime(timezone=True))
 
 
diff --git a/oc4ids_datastore_pipeline/pipeline.py b/oc4ids_datastore_pipeline/pipeline.py
index d58c420..cca5fbd 100644
--- a/oc4ids_datastore_pipeline/pipeline.py
+++ b/oc4ids_datastore_pipeline/pipeline.py
@@ -2,8 +2,10 @@
 import json
 import logging
 import os
-from typing import Any
+from pathlib import Path
+from typing import Any, Optional
 
+import flattentool
 import requests
 from libcoveoc4ids.api import oc4ids_json_output
 
@@ -52,8 +54,33 @@ def write_json_to_file(file_name: str, json_data: dict[str, Any]) -> str:
         raise Exception("Error while writing to JSON file", e)
 
 
+def transform_to_csv_and_xlsx(json_path: str) -> tuple[Optional[str], Optional[str]]:
+    logger.info(f"Transforming {json_path}")
+    try:
+        path = Path(json_path)
+        flattentool.flatten(
+            json_path,
+            output_name=str(path.parent / path.stem),
+            root_list_path="projects",
+            main_sheet_name="projects",
+        )  # type: ignore[no-untyped-call]
+        csv_path = str(path.parent / path.stem)
+        xlsx_path = f"{path.parent / path.stem}.xlsx"
+        logger.info(f"Transformed to CSV at {csv_path}")
+        logger.info(f"Transformed to XLSX at {xlsx_path}")
+        return csv_path, xlsx_path
+    except Exception as e:
+        logger.warning(f"Failed to transform JSON to CSV and XLSX with error {e}")
+        return None, None
+
+
 def save_dataset_metadata(
-    dataset_name: str, source_url: str, json_data: dict[str, Any], json_url: str
+    dataset_name: str,
+    source_url: str,
+    json_data: dict[str, Any],
+    json_url: str,
+    csv_url: Optional[str],
+    xlsx_url: Optional[str],
 ) -> None:
     logger.info(f"Saving metadata for dataset {dataset_name}")
     publisher_name = json_data.get("publisher", {}).get("name", "")
@@ -66,6 +93,8 @@ def save_dataset_metadata(
         license_url=license_url,
         license_name=license_name,
         json_url=json_url,
+        csv_url=csv_url,
+        xlsx_url=xlsx_url,
         updated_at=datetime.datetime.now(datetime.UTC),
     )
     save_dataset(dataset)
@@ -76,12 +105,17 @@ def process_dataset(dataset_name: str, dataset_url: str) -> None:
     try:
         json_data = download_json(dataset_url)
         validate_json(dataset_name, json_data)
-        json_url = write_json_to_file(f"data/{dataset_name}.json", json_data)
+        json_path = write_json_to_file(
+            f"data/{dataset_name}/{dataset_name}.json", json_data
+        )
+        csv_path, xlsx_path = transform_to_csv_and_xlsx(json_path)
         save_dataset_metadata(
             dataset_name=dataset_name,
             source_url=dataset_url,
             json_data=json_data,
-            json_url=json_url,
+            json_url=json_path,
+            csv_url=csv_path,
+            xlsx_url=xlsx_path,
         )
         logger.info(f"Processed dataset {dataset_name}")
     except Exception as e:
diff --git a/pyproject.toml b/pyproject.toml
index b1e19c3..c453dfb 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -9,6 +9,7 @@ version = "0.1.0"
 readme = "README.md"
 dependencies = [
     "alembic",
+    "flattentool",
     "libcoveoc4ids",
     "psycopg2",
     "requests",
@@ -40,7 +41,7 @@ max-line-length = 88
 strict = true
 
 [[tool.mypy.overrides]]
-module = ["libcoveoc4ids.*"]
+module = ["libcoveoc4ids.*", "flattentool.*"]
 follow_untyped_imports = true
 
 [tool.pytest.ini_options]
diff --git a/requirements_dev.txt b/requirements_dev.txt
index 7904b9d..965d8b0 100644
--- a/requirements_dev.txt
+++ b/requirements_dev.txt
@@ -42,7 +42,9 @@ flake8==7.1.1
 flake8-pyproject==1.2.3
     # via oc4ids-datastore-pipeline (pyproject.toml)
 flattentool==0.27.0
-    # via libcove
+    # via
+    #   libcove
+    #   oc4ids-datastore-pipeline (pyproject.toml)
 idna==3.10
     # via requests
 ijson==3.3.0
diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py
index 281c5f4..f914fe6 100644
--- a/tests/test_pipeline.py
+++ b/tests/test_pipeline.py
@@ -8,6 +8,7 @@
 from oc4ids_datastore_pipeline.pipeline import (
     download_json,
     process_dataset,
+    transform_to_csv_and_xlsx,
     validate_json,
     write_json_to_file,
 )
@@ -80,6 +81,27 @@ def test_write_json_to_file_raises_failure_exception(mocker: MockerFixture) -> N
     assert "Mocked exception" in str(exc_info.value)
 
 
+def test_transform_to_csv_and_xlsx_returns_correct_paths(mocker: MockerFixture) -> None:
+    mocker.patch("oc4ids_datastore_pipeline.pipeline.flattentool.flatten")
+
+    csv_path, xlsx_path = transform_to_csv_and_xlsx("dir/dataset/dataset.json")
+
+    assert csv_path == "dir/dataset/dataset"
+    assert xlsx_path == "dir/dataset/dataset.xlsx"
+
+
+def test_transform_to_csv_and_xlsx_catches_exception(mocker: MockerFixture) -> None:
+    patch_flatten = mocker.patch(
+        "oc4ids_datastore_pipeline.pipeline.flattentool.flatten"
+    )
+    patch_flatten.side_effect = Exception("Mocked exception")
+
+    csv_path, xlsx_path = transform_to_csv_and_xlsx("dir/dataset/dataset.json")
+
+    assert csv_path is None
+    assert xlsx_path is None
+
+
 def test_process_dataset_catches_exception(mocker: MockerFixture) -> None:
     patch_download_json = mocker.patch(
         "oc4ids_datastore_pipeline.pipeline.download_json"
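For reviewers who want to exercise the new step end to end, here is a minimal sketch (not part of the diff). The "example" dataset name and the tiny projects payload are made up for illustration, and it assumes data/example/ exists or that write_json_to_file creates it, as the new data/{dataset_name}/{dataset_name}.json layout implies. It relies on flattentool's documented behaviour that, for CSV output, output_name is used as a directory while the XLSX workbook is written alongside it, which is why transform_to_csv_and_xlsx returns the extension-less path for CSV and the same path with .xlsx appended.

# Illustrative local run of the new transform step; names and payload below
# are hypothetical, not taken from this PR.
from oc4ids_datastore_pipeline.pipeline import (
    transform_to_csv_and_xlsx,
    write_json_to_file,
)

json_data = {"projects": [{"id": "example-project-1", "title": "Example project"}]}
# Assumes data/example/ exists or is created by write_json_to_file.
json_path = write_json_to_file("data/example/example.json", json_data)

csv_path, xlsx_path = transform_to_csv_and_xlsx(json_path)

# On success, flattentool writes a directory of CSVs at output_name and a
# workbook next to it, so the expected return values are:
#   csv_path  == "data/example/example"       (directory holding the CSVs)
#   xlsx_path == "data/example/example.xlsx"
# On any flattentool failure both values are None, mirroring the except branch.
print(csv_path, xlsx_path)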