diff --git a/migrations/versions/b21b5de6ee2d_add_publisher_country_column.py b/migrations/versions/b21b5de6ee2d_add_publisher_country_column.py
new file mode 100644
index 0000000..636614e
--- /dev/null
+++ b/migrations/versions/b21b5de6ee2d_add_publisher_country_column.py
@@ -0,0 +1,31 @@
+"""add publisher_country column
+
+Revision ID: b21b5de6ee2d
+Revises: ebb26242c904
+Create Date: 2025-03-26 10:08:14.401880
+
+"""
+
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision: str = "b21b5de6ee2d"
+down_revision: Union[str, None] = "ebb26242c904"
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.add_column("dataset", sa.Column("publisher_country", sa.String(), nullable=True))
+    # ### end Alembic commands ###
+
+
+def downgrade() -> None:
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.drop_column("dataset", "publisher_country")
+    # ### end Alembic commands ###
diff --git a/oc4ids_datastore_pipeline/database.py b/oc4ids_datastore_pipeline/database.py
index cbd4180..3a1543f 100644
--- a/oc4ids_datastore_pipeline/database.py
+++ b/oc4ids_datastore_pipeline/database.py
@@ -28,6 +28,7 @@ class Dataset(Base):
     dataset_id: Mapped[str] = mapped_column(String, primary_key=True)
     source_url: Mapped[str] = mapped_column(String)
     publisher_name: Mapped[str] = mapped_column(String)
+    publisher_country: Mapped[Optional[str]] = mapped_column(String, nullable=True)
     license_url: Mapped[Optional[str]] = mapped_column(String, nullable=True)
     license_title: Mapped[Optional[str]] = mapped_column(String, nullable=True)
     license_title_short: Mapped[Optional[str]] = mapped_column(String, nullable=True)
diff --git a/oc4ids_datastore_pipeline/pipeline.py b/oc4ids_datastore_pipeline/pipeline.py
index e77f1b3..abcc6ff 100644
--- a/oc4ids_datastore_pipeline/pipeline.py
+++ b/oc4ids_datastore_pipeline/pipeline.py
@@ -106,6 +106,7 @@ def transform_to_csv_and_xlsx(json_path: str) -> tuple[Optional[str], Optional[s
 def save_dataset_metadata(
     dataset_id: str,
     source_url: str,
+    publisher_country: str,
     json_data: dict[str, Any],
     json_url: Optional[str],
     csv_url: Optional[str],
@@ -122,6 +123,7 @@ def save_dataset_metadata(
             dataset_id=dataset_id,
             source_url=source_url,
             publisher_name=publisher_name,
+            publisher_country=publisher_country,
             license_url=license_url,
             license_title=license_title,
             license_title_short=license_title_short,
@@ -135,9 +137,9 @@ def save_dataset_metadata(
         raise ProcessDatasetError(f"Failed to update metadata for dataset: {e}")
 
 
-def process_dataset(dataset_id: str, source_url: str) -> None:
+def process_dataset(dataset_id: str, registry_metadata: dict[str, str]) -> None:
     logger.info(f"Processing dataset {dataset_id}")
-    json_data = download_json(dataset_id, source_url)
+    json_data = download_json(dataset_id, registry_metadata["source_url"])
     validate_json(dataset_id, json_data)
     json_path = write_json_to_file(
         file_name=f"data/{dataset_id}/{dataset_id}.json",
@@ -149,7 +151,8 @@ def process_dataset(dataset_id: str, source_url: str) -> None:
     )
     save_dataset_metadata(
         dataset_id=dataset_id,
-        source_url=source_url,
+        source_url=registry_metadata["source_url"],
+        publisher_country=registry_metadata["country"],
         json_data=json_data,
         json_url=json_public_url,
         csv_url=csv_public_url,
@@ -158,7 +161,7 @@ def process_dataset(dataset_id: str, source_url: str) -> None:
     logger.info(f"Processed dataset {dataset_id}")
 
 
-def process_deleted_datasets(registered_datasets: dict[str, str]) -> None:
+def process_deleted_datasets(registered_datasets: dict[str, dict[str, str]]) -> None:
     stored_datasets = get_dataset_ids()
     deleted_datasets = stored_datasets - registered_datasets.keys()
     for dataset_id in deleted_datasets:
@@ -171,13 +174,17 @@ def process_registry() -> None:
     registered_datasets = fetch_registered_datasets()
     process_deleted_datasets(registered_datasets)
     errors: list[dict[str, Any]] = []
-    for dataset_id, url in registered_datasets.items():
+    for dataset_id, registry_metadata in registered_datasets.items():
         try:
-            process_dataset(dataset_id, url)
+            process_dataset(dataset_id, registry_metadata)
         except Exception as e:
             logger.warning(f"Failed to process dataset {dataset_id} with error {e}")
             errors.append(
-                {"dataset_id": dataset_id, "source_url": url, "message": str(e)}
+                {
+                    "dataset_id": dataset_id,
+                    "source_url": registry_metadata["source_url"],
+                    "message": str(e),
+                }
             )
     if errors:
         logger.error(
diff --git a/oc4ids_datastore_pipeline/registry.py b/oc4ids_datastore_pipeline/registry.py
index 862c612..0890c5d 100644
--- a/oc4ids_datastore_pipeline/registry.py
+++ b/oc4ids_datastore_pipeline/registry.py
@@ -9,7 +9,7 @@
 _license_mappings = None
 
 
-def fetch_registered_datasets() -> dict[str, str]:
+def fetch_registered_datasets() -> dict[str, dict[str, str]]:
     logger.info("Fetching registered datasets list from registry")
     try:
         url = "https://opendataservices.github.io/oc4ids-registry/datatig/type/dataset/records_api.json"  # noqa: E501
@@ -17,7 +17,10 @@ def fetch_registered_datasets() -> dict[str, str]:
         r.raise_for_status()
         json_data = r.json()
         registered_datasets = {
-            key: value["fields"]["url"]["value"]
+            key: {
+                "source_url": value["fields"]["url"]["value"],
+                "country": value["fields"]["country"]["value"],
+            }
             for (key, value) in json_data["records"].items()
         }
         registered_datasets_count = len(registered_datasets)
diff --git a/pyproject.toml b/pyproject.toml
index fe5231f..864812e 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -5,7 +5,7 @@ build-backend = "flit_core.buildapi"
 [project]
 name = "oc4ids-datastore-pipeline"
 description = "OC4IDS Datastore Pipeline"
-version = "0.4.0"
+version = "0.5.0"
 readme = "README.md"
 dependencies = [
     "alembic",
@@ -50,3 +50,4 @@ follow_untyped_imports = true
 [tool.pytest.ini_options]
 log_cli = true
 log_cli_level = "INFO"
+pythonpath = ["."]
diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py
index b78aee4..0096a06 100644
--- a/tests/test_pipeline.py
+++ b/tests/test_pipeline.py
@@ -132,7 +132,9 @@ def test_process_deleted_datasets(mocker: MockerFixture) -> None:
         "oc4ids_datastore_pipeline.pipeline.delete_files_for_dataset"
     )
 
-    registered_datasets = {"test_dataset": "https://test_dataset.json"}
+    registered_datasets = {
+        "test_dataset": {"source_url": "https://test_dataset.json", "country": "ab"}
+    }
     process_deleted_datasets(registered_datasets)
 
     patch_delete_dataset.assert_called_once_with("old_dataset")
@@ -146,7 +148,9 @@ def test_process_dataset_raises_failure_exception(mocker: MockerFixture) -> None
     patch_download_json.side_effect = ProcessDatasetError("Download failed: Exception")
 
     with pytest.raises(ProcessDatasetError) as exc_info:
-        process_dataset("test_dataset", "https://test_dataset.json")
+        process_dataset(
+            "test_dataset", {"source_url": "https://test_dataset.json", "country": "ab"}
+        )
 
     assert "Download failed: Exception" in str(exc_info.value)
 
@@ -156,7 +160,7 @@ def test_process_registry_catches_exception(mocker: MockerFixture) -> None:
         "oc4ids_datastore_pipeline.pipeline.fetch_registered_datasets"
     )
     patch_fetch_registered_datasets.return_value = {
-        "test_dataset": "https://test_dataset.json"
+        "test_dataset": {"source_url": "https://test_dataset.json", "country": "ab"}
     }
     mocker.patch("oc4ids_datastore_pipeline.pipeline.process_deleted_datasets")
     patch_process_dataset = mocker.patch(
diff --git a/tests/test_registry.py b/tests/test_registry.py
index 8e90267..d0079ce 100644
--- a/tests/test_registry.py
+++ b/tests/test_registry.py
@@ -14,7 +14,12 @@ def test_fetch_registered_datasets(mocker: MockerFixture) -> None:
     mock_response = MagicMock()
     mock_response.json.return_value = {
         "records": {
-            "test_dataset": {"fields": {"url": {"value": "https://test_dataset.json"}}}
+            "test_dataset": {
+                "fields": {
+                    "url": {"value": "https://test_dataset.json"},
+                    "country": {"value": "ab"},
+                }
+            }
         }
     }
     patch_get = mocker.patch("oc4ids_datastore_pipeline.pipeline.requests.get")
@@ -22,7 +27,9 @@ def test_fetch_registered_datasets(mocker: MockerFixture) -> None:
 
     result = fetch_registered_datasets()
 
-    assert result == {"test_dataset": "https://test_dataset.json"}
+    assert result == {
+        "test_dataset": {"source_url": "https://test_dataset.json", "country": "ab"}
+    }
 
 
 def test_fetch_registered_datasets_raises_failure_exception(
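
A minimal sketch of the registry metadata shape this diff introduces: fetch_registered_datasets() now returns a nested mapping keyed by dataset ID rather than a flat dataset-ID-to-URL mapping, and process_dataset() reads the source URL and country from that mapping. The dataset ID, URL, and country code below are illustrative values, not real registry entries:

    # Illustrative only: shape of fetch_registered_datasets() output after this change.
    registered_datasets: dict[str, dict[str, str]] = {
        "example_dataset": {
            "source_url": "https://example.com/example_dataset.json",
            "country": "gb",
        }
    }

    for dataset_id, registry_metadata in registered_datasets.items():
        source_url = registry_metadata["source_url"]          # passed to download_json() and stored as Dataset.source_url
        publisher_country = registry_metadata["country"]      # stored as Dataset.publisher_country (nullable column added by the migration)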