@@ -0,0 +1,33 @@
"""add csv and xlsx columns to dataset table

Revision ID: 084c39bf418e
Revises: 85905d23accc
Create Date: 2025-02-05 11:10:03.114086

"""

from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision: str = "084c39bf418e"
down_revision: Union[str, None] = "85905d23accc"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.add_column("dataset", sa.Column("csv_url", sa.String(), nullable=True))
op.add_column("dataset", sa.Column("xlsx_url", sa.String(), nullable=True))
# ### end Alembic commands ###


def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column("dataset", "xlsx_url")
op.drop_column("dataset", "csv_url")
# ### end Alembic commands ###
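
Since both new columns are nullable, this migration can be applied to an existing database in place: alembic upgrade head adds the columns, and alembic downgrade -1 drops them again without touching other data.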
2 changes: 2 additions & 0 deletions oc4ids_datastore_pipeline/database.py
@@ -29,6 +29,8 @@ class Dataset(Base):
license_url: Mapped[Optional[str]] = mapped_column(String, nullable=True)
license_name: Mapped[Optional[str]] = mapped_column(String, nullable=True)
json_url: Mapped[str] = mapped_column(String)
csv_url: Mapped[Optional[str]] = mapped_column(String, nullable=True)
xlsx_url: Mapped[Optional[str]] = mapped_column(String, nullable=True)
updated_at: Mapped[datetime.datetime] = mapped_column(DateTime(timezone=True))


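Both columns are declared Optional and nullable to mirror the pipeline behaviour below: when the CSV/XLSX transform fails, transform_to_csv_and_xlsx returns (None, None) and the dataset metadata is still saved with csv_url and xlsx_url left empty.
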
42 changes: 38 additions & 4 deletions oc4ids_datastore_pipeline/pipeline.py
@@ -2,8 +2,10 @@
import json
import logging
import os
-from typing import Any
+from pathlib import Path
+from typing import Any, Optional

+import flattentool
import requests
from libcoveoc4ids.api import oc4ids_json_output

@@ -52,8 +54,33 @@ def write_json_to_file(file_name: str, json_data: dict[str, Any]) -> str:
raise Exception("Error while writing to JSON file", e)


def transform_to_csv_and_xlsx(json_path: str) -> tuple[Optional[str], Optional[str]]:
logger.info(f"Transforming {json_path}")
try:
path = Path(json_path)
flattentool.flatten(
json_path,
output_name=str(path.parent / path.stem),
root_list_path="projects",
main_sheet_name="projects",
) # type: ignore[no-untyped-call]
csv_path = str(path.parent / path.stem)
xlsx_path = f"{path.parent / path.stem}.xlsx"
logger.info(f"Transformed to CSV at {csv_path}")
logger.info(f"Transformed to XLSX at {xlsx_path}")
return csv_path, xlsx_path
except Exception as e:
logger.warning(f"Failed to transform JSON to CSV and XLSX with error {e}")
return None, None
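
For context, the flatten call above behaves roughly as in this standalone sketch; the data/example paths and the shape of the input file are hypothetical, chosen to match the paths the function derives:

import flattentool

# Flatten a JSON file whose top-level "projects" array holds the rows.
flattentool.flatten(
    "data/example/example.json",
    output_name="data/example/example",
    root_list_path="projects",
    main_sheet_name="projects",
)
# With flattentool's default output format, this writes a directory of CSV
# files at data/example/example/ and a workbook at data/example/example.xlsx,
# which is exactly what the returned csv_path and xlsx_path point to.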


def save_dataset_metadata(
-dataset_name: str, source_url: str, json_data: dict[str, Any], json_url: str
+dataset_name: str,
+source_url: str,
+json_data: dict[str, Any],
+json_url: str,
+csv_url: Optional[str],
+xlsx_url: Optional[str],
) -> None:
logger.info(f"Saving metadata for dataset {dataset_name}")
publisher_name = json_data.get("publisher", {}).get("name", "")
@@ -66,6 +93,8 @@ def save_dataset_metadata(
license_url=license_url,
license_name=license_name,
json_url=json_url,
csv_url=csv_url,
xlsx_url=xlsx_url,
updated_at=datetime.datetime.now(datetime.UTC),
)
save_dataset(dataset)
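
A hypothetical call showing the extended signature (every value here is illustrative; process_dataset below supplies the real ones):

save_dataset_metadata(
    dataset_name="example",
    source_url="https://example.com/projects.json",
    json_data={"publisher": {"name": "Example Publisher"}},
    json_url="data/example/example.json",
    csv_url="data/example/example",
    xlsx_url="data/example/example.xlsx",
)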
@@ -76,12 +105,17 @@ def process_dataset(dataset_name: str, dataset_url: str) -> None:
try:
json_data = download_json(dataset_url)
validate_json(dataset_name, json_data)
-json_url = write_json_to_file(f"data/{dataset_name}.json", json_data)
+json_path = write_json_to_file(
+f"data/{dataset_name}/{dataset_name}.json", json_data
+)
+csv_path, xlsx_path = transform_to_csv_and_xlsx(json_path)
save_dataset_metadata(
dataset_name=dataset_name,
source_url=dataset_url,
json_data=json_data,
-json_url=json_url,
+json_url=json_path,
+csv_url=csv_path,
+xlsx_url=xlsx_path,
)
logger.info(f"Processed dataset {dataset_name}")
except Exception as e:
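Note that the JSON path changes from data/<dataset_name>.json to data/<dataset_name>/<dataset_name>.json, giving each dataset its own directory so the flattened outputs sit alongside the JSON:

data/example/example.json    (validated JSON)
data/example/example/        (directory of CSV files)
data/example/example.xlsx    (XLSX workbook)
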
3 changes: 2 additions & 1 deletion pyproject.toml
@@ -9,6 +9,7 @@ version = "0.1.0"
readme = "README.md"
dependencies = [
"alembic",
"flattentool",
"libcoveoc4ids",
"psycopg2",
"requests",
@@ -40,7 +41,7 @@ max-line-length = 88
strict = true

[[tool.mypy.overrides]]
module = ["libcoveoc4ids.*"]
module = ["libcoveoc4ids.*", "flattentool.*"]
follow_untyped_imports = true

[tool.pytest.ini_options]
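Extending the mypy override to flattentool.* lets strict mypy follow the untyped package rather than rejecting the import; calls into its untyped functions are still flagged under strict mode, which is why the flatten call in pipeline.py carries a type: ignore[no-untyped-call] comment.
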
4 changes: 3 additions & 1 deletion requirements_dev.txt
@@ -42,7 +42,9 @@ flake8==7.1.1
flake8-pyproject==1.2.3
# via oc4ids-datastore-pipeline (pyproject.toml)
flattentool==0.27.0
-# via libcove
+# via
+#   libcove
+#   oc4ids-datastore-pipeline (pyproject.toml)
idna==3.10
# via requests
ijson==3.3.0
22 changes: 22 additions & 0 deletions tests/test_pipeline.py
@@ -8,6 +8,7 @@
from oc4ids_datastore_pipeline.pipeline import (
download_json,
process_dataset,
transform_to_csv_and_xlsx,
validate_json,
write_json_to_file,
)
@@ -80,6 +81,27 @@ def test_write_json_to_file_raises_failure_exception(mocker: MockerFixture) -> None:
assert "Mocked exception" in str(exc_info.value)


def test_transform_to_csv_and_xlsx_returns_correct_paths(mocker: MockerFixture) -> None:
mocker.patch("oc4ids_datastore_pipeline.pipeline.flattentool.flatten")

csv_path, xlsx_path = transform_to_csv_and_xlsx("dir/dataset/dataset.json")

assert csv_path == "dir/dataset/dataset"
assert xlsx_path == "dir/dataset/dataset.xlsx"


def test_transform_to_csv_and_xlsx_catches_exception(mocker: MockerFixture) -> None:
patch_flatten = mocker.patch(
"oc4ids_datastore_pipeline.pipeline.flattentool.flatten"
)
patch_flatten.side_effect = Exception("Mocked exception")

csv_path, xlsx_path = transform_to_csv_and_xlsx("dir/dataset/dataset.json")

assert csv_path is None
assert xlsx_path is None


def test_process_dataset_catches_exception(mocker: MockerFixture) -> None:
patch_download_json = mocker.patch(
"oc4ids_datastore_pipeline.pipeline.download_json"