Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
"""add license columns to dataset table
Revision ID: 85905d23accc
Revises: aaabf849b37f
Create Date: 2025-02-05 09:45:04.056529
"""

from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision: str = "85905d23accc"
down_revision: Union[str, None] = "aaabf849b37f"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Add nullable license_url and license_name columns to the dataset table."""
    # ### commands auto generated by Alembic - please adjust! ###
    op.add_column("dataset", sa.Column("license_url", sa.String(), nullable=True))
    op.add_column("dataset", sa.Column("license_name", sa.String(), nullable=True))
    # ### end Alembic commands ###


def downgrade() -> None:
    """Drop the license columns added by this revision (reverse of upgrade)."""
    # ### commands auto generated by Alembic - please adjust! ###
    # Dropped in reverse order of creation.
    op.drop_column("dataset", "license_name")
    op.drop_column("dataset", "license_url")
    # ### end Alembic commands ###
3 changes: 3 additions & 0 deletions oc4ids_datastore_pipeline/database.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import datetime
import logging
import os
from typing import Optional

from sqlalchemy import (
DateTime,
Expand All @@ -25,6 +26,8 @@ class Dataset(Base):
dataset_id: Mapped[str] = mapped_column(String, primary_key=True)
source_url: Mapped[str] = mapped_column(String)
publisher_name: Mapped[str] = mapped_column(String)
license_url: Mapped[Optional[str]] = mapped_column(String, nullable=True)
license_name: Mapped[Optional[str]] = mapped_column(String, nullable=True)
json_url: Mapped[str] = mapped_column(String)
updated_at: Mapped[datetime.datetime] = mapped_column(DateTime(timezone=True))

Expand Down
44 changes: 17 additions & 27 deletions oc4ids_datastore_pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,14 @@
from libcoveoc4ids.api import oc4ids_json_output

from oc4ids_datastore_pipeline.database import Dataset, save_dataset
from oc4ids_datastore_pipeline.registry import (
fetch_registered_datasets,
get_license_name_from_url,
)

logger = logging.getLogger(__name__)


def fetch_registered_datasets() -> dict[str, str]:
    """Fetch registered datasets from the OC4IDS registry.

    NOTE(review): this copy is removed in this change in favour of the
    identical function in `registry.py`.

    Returns a mapping of dataset id to its registered source URL.
    Raises Exception wrapping any download or parsing error.
    """
    logger.info("Fetching registered datasets list from registry")
    try:
        url = "https://opendataservices.github.io/oc4ids-registry/datatig/type/dataset/records_api.json"  # noqa: E501
        r = requests.get(url)
        r.raise_for_status()
        json_data = r.json()
        # Flatten each registry record to its nested URL value.
        registered_datasets = {
            key: value["fields"]["url"]["value"]
            for (key, value) in json_data["records"].items()
        }
        registered_datasets_count = len(registered_datasets)
        logger.info(f"Fetched URLs for {registered_datasets_count} datasets")
        return registered_datasets
    except Exception as e:
        raise Exception("Failed to fetch datasets list from registry", e)


def download_json(url: str) -> Any:
logger.info(f"Downloading json from {url}")
try:
Expand All @@ -42,7 +28,7 @@ def download_json(url: str) -> Any:
raise Exception("Download failed", e)


def validate_json(dataset_name: str, json_data: Any) -> None:
def validate_json(dataset_name: str, json_data: dict[str, Any]) -> None:
logger.info(f"Validating dataset {dataset_name}")
try:
validation_result = oc4ids_json_output(json_data=json_data)
Expand All @@ -54,26 +40,32 @@ def validate_json(dataset_name: str, json_data: Any) -> None:
raise Exception("Validation failed", e)


def write_json_to_file(file_name: str, json_data: Any) -> None:
def write_json_to_file(file_name: str, json_data: dict[str, Any]) -> str:
    """Serialize json_data to file_name, creating parent directories.

    Returns the path written, so callers can record where the file landed.
    Raises Exception wrapping any filesystem or serialization error.
    """
    logger.info(f"Writing dataset to file {file_name}")
    try:
        target_dir = os.path.dirname(file_name)
        os.makedirs(target_dir, exist_ok=True)
        with open(file_name, "w") as out_file:
            json.dump(json_data, out_file, indent=4)
        logger.info(f"Finished writing to {file_name}")
        return file_name
    except Exception as e:
        raise Exception("Error while writing to JSON file", e)


def save_dataset_metadata(
    dataset_name: str, source_url: str, json_data: dict[str, Any], json_url: str
) -> None:
    """Persist metadata for a processed dataset, including license details.

    Args:
        dataset_name: Registry id of the dataset (used as the primary key).
        source_url: URL the dataset JSON was downloaded from.
        json_data: The downloaded dataset document; publisher name and the
            top-level "license" URL are read from it.
        json_url: Path/URL of the stored JSON file.
    """
    logger.info(f"Saving metadata for dataset {dataset_name}")
    publisher_name = json_data.get("publisher", {}).get("name", "")
    license_url = json_data.get("license", None)
    # Only resolve a human-readable name when the dataset declares a license.
    license_name = get_license_name_from_url(license_url) if license_url else None
    dataset = Dataset(
        dataset_id=dataset_name,
        source_url=source_url,
        publisher_name=publisher_name,
        license_url=license_url,
        license_name=license_name,
        json_url=json_url,
        updated_at=datetime.datetime.now(datetime.UTC),
    )
    save_dataset(dataset)
Expand All @@ -84,14 +76,12 @@ def process_dataset(dataset_name: str, dataset_url: str) -> None:
try:
json_data = download_json(dataset_url)
validate_json(dataset_name, json_data)
file_name = f"data/{dataset_name}.json"
write_json_to_file(file_name, json_data)
publisher_name = json_data.get("publisher", {}).get("name", "")
json_url = write_json_to_file(f"data/{dataset_name}.json", json_data)
save_dataset_metadata(
dataset_name=dataset_name,
source_url=dataset_url,
publisher_name=publisher_name,
file_name=file_name,
json_data=json_data,
json_url=json_url,
)
logger.info(f"Processed dataset {dataset_name}")
except Exception as e:
Expand Down
55 changes: 55 additions & 0 deletions oc4ids_datastore_pipeline/registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import logging
from typing import Optional

import requests

logger = logging.getLogger(__name__)


_license_mappings = None


def fetch_registered_datasets() -> dict[str, str]:
    """Fetch the registered datasets list from the OC4IDS registry.

    Returns:
        Mapping of dataset id to the dataset's registered source URL.

    Raises:
        Exception: wrapping any download or parsing error.
    """
    logger.info("Fetching registered datasets list from registry")
    try:
        url = "https://opendataservices.github.io/oc4ids-registry/datatig/type/dataset/records_api.json"  # noqa: E501
        # Bounded timeout so a stalled registry cannot hang the pipeline.
        r = requests.get(url, timeout=30)
        r.raise_for_status()
        json_data = r.json()
        # Flatten each registry record to its nested URL value.
        registered_datasets = {
            key: value["fields"]["url"]["value"]
            for (key, value) in json_data["records"].items()
        }
        registered_datasets_count = len(registered_datasets)
        logger.info(f"Fetched URLs for {registered_datasets_count} datasets")
        return registered_datasets
    except Exception as e:
        raise Exception("Failed to fetch datasets list from registry", e)


def fetch_license_mappings() -> dict[str, str]:
    """Fetch license URL -> license title mappings from the OC4IDS registry.

    Best-effort: on any failure this logs a warning and returns an empty
    mapping, so callers can proceed without license names.
    """
    logger.info("Fetching license mappings from registry")
    try:
        url = "https://opendataservices.github.io/oc4ids-registry/datatig/type/license/records_api.json"  # noqa: E501
        # Bounded timeout so a stalled registry cannot hang the pipeline.
        r = requests.get(url, timeout=30)
        r.raise_for_status()
        json_data = r.json()
        # A license record may list several URLs; map every one to its title.
        # ("record"/"url_entry" avoid shadowing the `license` builtin.)
        return {
            url_entry["fields"]["url"]["value"]: record["fields"]["title"]["value"]
            for record in json_data["records"].values()
            for url_entry in record["fields"]["urls"]["values"]
        }
    except Exception as e:
        logger.warning(
            "Failed to fetch license mappings from registry, with error: " + str(e),
        )
        return {}


def get_license_name_from_url(
    url: str, force_refresh: bool = False
) -> Optional[str]:
    """Return the registry title for a license URL, or None if unknown.

    The URL-to-title mapping is fetched lazily on first use and cached at
    module level; pass force_refresh=True to re-fetch from the registry.
    (Annotation fixed: the flag is always a bool, never None.)
    """
    global _license_mappings
    if force_refresh or (_license_mappings is None):
        _license_mappings = fetch_license_mappings()
    return _license_mappings.get(url, None)
30 changes: 0 additions & 30 deletions tests/test_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,48 +1,18 @@
import os
import tempfile
from textwrap import dedent
from unittest.mock import MagicMock

import pytest
from pytest_mock import MockerFixture

from oc4ids_datastore_pipeline.pipeline import (
download_json,
fetch_registered_datasets,
process_dataset,
validate_json,
write_json_to_file,
)


def test_fetch_registered_datasets(mocker: MockerFixture) -> None:
    """Registry records are flattened to a dataset_id -> URL mapping.

    NOTE(review): removed in this change; the patch target here matched the
    old location of the function in pipeline.py.
    """
    mock_response = MagicMock()
    mock_response.json.return_value = {
        "records": {
            "test_dataset": {"fields": {"url": {"value": "https://test_dataset.json"}}}
        }
    }
    patch_get = mocker.patch("oc4ids_datastore_pipeline.pipeline.requests.get")
    patch_get.return_value = mock_response

    result = fetch_registered_datasets()

    assert result == {"test_dataset": "https://test_dataset.json"}


def test_fetch_registered_datasets_raises_failure_exception(
    mocker: MockerFixture,
) -> None:
    """Download errors are wrapped with a descriptive message and the cause.

    NOTE(review): removed in this change; the patch target here matched the
    old location of the function in pipeline.py.
    """
    patch_get = mocker.patch("oc4ids_datastore_pipeline.pipeline.requests.get")
    patch_get.side_effect = Exception("Mocked exception")

    with pytest.raises(Exception) as exc_info:
        fetch_registered_datasets()

    assert "Failed to fetch datasets list from registry" in str(exc_info.value)
    assert "Mocked exception" in str(exc_info.value)


def test_download_json_raises_failure_exception(mocker: MockerFixture) -> None:
patch_get = mocker.patch("oc4ids_datastore_pipeline.pipeline.requests.get")
patch_get.side_effect = Exception("Mocked exception")
Expand Down
133 changes: 133 additions & 0 deletions tests/test_registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
from unittest.mock import MagicMock

import pytest
from pytest_mock import MockerFixture

from oc4ids_datastore_pipeline.registry import (
fetch_license_mappings,
fetch_registered_datasets,
get_license_name_from_url,
)


def test_fetch_registered_datasets(mocker: MockerFixture) -> None:
    """Registry records are flattened to a dataset_id -> URL mapping."""
    mock_response = MagicMock()
    mock_response.json.return_value = {
        "records": {
            "test_dataset": {"fields": {"url": {"value": "https://test_dataset.json"}}}
        }
    }
    # Patch requests where fetch_registered_datasets now lives (registry.py);
    # patching the pipeline module would leave the real HTTP call in place.
    patch_get = mocker.patch("oc4ids_datastore_pipeline.registry.requests.get")
    patch_get.return_value = mock_response

    result = fetch_registered_datasets()

    assert result == {"test_dataset": "https://test_dataset.json"}


def test_fetch_registered_datasets_raises_failure_exception(
    mocker: MockerFixture,
) -> None:
    """Download errors are wrapped with a descriptive message and the cause."""
    # Patch requests in registry.py, where the function under test lives.
    patch_get = mocker.patch("oc4ids_datastore_pipeline.registry.requests.get")
    patch_get.side_effect = Exception("Mocked exception")

    with pytest.raises(Exception) as exc_info:
        fetch_registered_datasets()

    assert "Failed to fetch datasets list from registry" in str(exc_info.value)
    assert "Mocked exception" in str(exc_info.value)


def test_fetch_license_mappings(mocker: MockerFixture) -> None:
    """Every URL of every license record maps to that license's title."""
    mock_response = MagicMock()
    mock_response.json.return_value = {
        "records": {
            "license_1": {
                "fields": {
                    "title": {"value": "License 1"},
                    "urls": {
                        "values": [
                            {
                                "fields": {
                                    "url": {"value": "https://license_1.com/license"}
                                }
                            },
                            {
                                "fields": {
                                    "url": {
                                        "value": "https://license_1.com/different_url"
                                    }
                                }
                            },
                        ]
                    },
                }
            },
            "license_2": {
                "fields": {
                    "title": {"value": "License 2"},
                    "urls": {
                        "values": [
                            {
                                "fields": {
                                    "url": {"value": "https://license_2.com/license"}
                                }
                            },
                        ]
                    },
                }
            },
        }
    }
    # Patch requests in registry.py, where fetch_license_mappings lives;
    # patching the pipeline module would leave the real HTTP call in place.
    patch_get = mocker.patch("oc4ids_datastore_pipeline.registry.requests.get")
    patch_get.return_value = mock_response

    result = fetch_license_mappings()

    assert result == {
        "https://license_1.com/license": "License 1",
        "https://license_1.com/different_url": "License 1",
        "https://license_2.com/license": "License 2",
    }


def test_fetch_license_mappings_catches_exception(
    mocker: MockerFixture,
) -> None:
    """Fetch failures are swallowed and an empty mapping is returned."""
    # Patch requests in registry.py, where the function under test lives.
    patch_get = mocker.patch("oc4ids_datastore_pipeline.registry.requests.get")
    patch_get.side_effect = Exception("Mocked exception")

    result = fetch_license_mappings()

    assert result == {}


def test_get_license_name_from_url(mocker: MockerFixture) -> None:
    """A URL present in the fetched mapping resolves to its license title."""
    mocker.patch(
        "oc4ids_datastore_pipeline.registry.fetch_license_mappings",
        return_value={
            "https://license_1.com/license": "License 1",
            "https://license_2.com/license": "License 2",
        },
    )

    result = get_license_name_from_url(
        "https://license_2.com/license", force_refresh=True
    )

    assert result == "License 2"


def test_get_license_name_from_url_not_in_mapping(mocker: MockerFixture) -> None:
    """A URL absent from the fetched mapping yields None."""
    mocker.patch(
        "oc4ids_datastore_pipeline.registry.fetch_license_mappings",
        return_value={"https://license_1.com/license": "License 1"},
    )

    result = get_license_name_from_url(
        "https://license_2.com/license", force_refresh=True
    )

    assert result is None