Skip to content

Commit 7a963bd

Browse files
Merge pull request #20 from OpenDataServices/18-remove-deleted-datasets
Remove datasets that have been deleted from the registry
2 parents 3150655 + 88f471f commit 7a963bd

File tree

6 files changed

+121
-4
lines changed

6 files changed

+121
-4
lines changed

oc4ids_datastore_pipeline/database.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
Engine,
99
String,
1010
create_engine,
11+
delete,
12+
select,
1113
)
1214
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column
1315

@@ -45,3 +47,16 @@ def save_dataset(dataset: Dataset) -> None:
4547
with Session(get_engine()) as session:
4648
session.merge(dataset)
4749
session.commit()
50+
51+
52+
def delete_dataset(dataset_id: str) -> None:
    """Remove the dataset row with the given id from the database.

    No-op if no row with that id exists.
    """
    engine = get_engine()
    with Session(engine) as session:
        statement = delete(Dataset).where(Dataset.dataset_id == dataset_id)
        session.execute(statement)
        session.commit()
56+
57+
58+
def get_dataset_ids() -> list[str]:
    """Return the ids of all datasets currently stored in the database."""
    with Session(get_engine()) as session:
        # list(...) instead of a pass-through comprehension: the comprehension
        # was a plain copy of the iterable (ruff PERF402).
        return list(session.scalars(select(Dataset.dataset_id)))

oc4ids_datastore_pipeline/pipeline.py

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,12 @@
99
import requests
1010
from libcoveoc4ids.api import oc4ids_json_output
1111

12-
from oc4ids_datastore_pipeline.database import Dataset, save_dataset
12+
from oc4ids_datastore_pipeline.database import (
13+
Dataset,
14+
delete_dataset,
15+
get_dataset_ids,
16+
save_dataset,
17+
)
1318
from oc4ids_datastore_pipeline.registry import (
1419
fetch_registered_datasets,
1520
get_license_name_from_url,
@@ -122,11 +127,20 @@ def process_dataset(dataset_name: str, dataset_url: str) -> None:
122127
logger.warning(f"Failed to process dataset {dataset_name} with error {e}")
123128

124129

125-
def process_deleted_datasets(registered_datasets: dict[str, str]) -> None:
    """Delete any stored dataset that is no longer present in the registry.

    Args:
        registered_datasets: mapping of registered dataset id -> source URL.
    """
    # Build an explicit set: ``list - dict_keys`` happens to work in CPython
    # only because the dict-view subtract slot doesn't type-check its left
    # operand — that is an implementation accident, not documented behavior.
    stored_dataset_ids = set(get_dataset_ids())
    deleted_dataset_ids = stored_dataset_ids - registered_datasets.keys()
    for dataset_id in deleted_dataset_ids:
        logger.info(f"Dataset {dataset_id} is no longer in the registry, deleting")
        delete_dataset(dataset_id)
136+
137+
138+
def process_registry() -> None:
    """Sync with the registry: drop removed datasets, then process each registered one."""
    registered_datasets = fetch_registered_datasets()
    process_deleted_datasets(registered_datasets)
    for dataset_name, dataset_url in registered_datasets.items():
        process_dataset(dataset_name, dataset_url)
129143

130144

131145
def run() -> None:
    """Entry point for the pipeline."""
    process_registry()

oc4ids_datastore_pipeline/registry.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,13 @@ def fetch_registered_datasets() -> dict[str, str]:
2222
}
2323
registered_datasets_count = len(registered_datasets)
2424
logger.info(f"Fetched URLs for {registered_datasets_count} datasets")
25-
return registered_datasets
2625
except Exception as e:
2726
raise Exception("Failed to fetch datasets list from registry", e)
27+
if registered_datasets_count < 1:
28+
raise Exception(
29+
"Zero datasets returned from registry, likely an upstream error"
30+
)
31+
return registered_datasets
2832

2933

3034
def fetch_license_mappings() -> dict[str, str]:

tests/test_database.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import datetime
2+
from typing import Any, Generator
3+
4+
import pytest
5+
from pytest_mock import MockerFixture
6+
from sqlalchemy import create_engine
7+
8+
from oc4ids_datastore_pipeline.database import (
9+
Base,
10+
Dataset,
11+
delete_dataset,
12+
get_dataset_ids,
13+
save_dataset,
14+
)
15+
16+
17+
@pytest.fixture(autouse=True)
def before_and_after_each(mocker: MockerFixture) -> Generator[Any, Any, Any]:
    """Give every test a fresh in-memory SQLite database, disposed afterwards."""
    engine = create_engine("sqlite:///:memory:")
    mocker.patch(
        "oc4ids_datastore_pipeline.database.get_engine", return_value=engine
    )
    Base.metadata.create_all(engine)
    yield
    engine.dispose()
25+
26+
27+
def test_save_dataset() -> None:
    """A saved dataset should be visible via get_dataset_ids()."""
    saved = Dataset(
        dataset_id="test_dataset",
        source_url="https://test_dataset.json",
        publisher_name="test_publisher",
        json_url="data/test_dataset.json",
        updated_at=datetime.datetime.now(datetime.UTC),
    )
    save_dataset(saved)

    stored_ids = get_dataset_ids()
    assert stored_ids == ["test_dataset"]
38+
39+
40+
def test_delete_dataset() -> None:
    """Deleting a stored dataset should remove it from get_dataset_ids()."""
    stored = Dataset(
        dataset_id="test_dataset",
        source_url="https://test_dataset.json",
        publisher_name="test_publisher",
        json_url="data/test_dataset.json",
        updated_at=datetime.datetime.now(datetime.UTC),
    )
    save_dataset(stored)
    assert get_dataset_ids() == ["test_dataset"]

    delete_dataset("test_dataset")

    assert get_dataset_ids() == []

tests/test_pipeline.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from oc4ids_datastore_pipeline.pipeline import (
99
download_json,
1010
process_dataset,
11+
process_deleted_datasets,
1112
transform_to_csv_and_xlsx,
1213
validate_json,
1314
write_json_to_file,
@@ -102,6 +103,21 @@ def test_transform_to_csv_and_xlsx_catches_exception(mocker: MockerFixture) -> N
102103
assert xlsx_path is None
103104

104105

106+
def test_process_deleted_datasets(mocker: MockerFixture) -> None:
    """Only datasets absent from the registry should be deleted."""
    mocker.patch(
        "oc4ids_datastore_pipeline.pipeline.get_dataset_ids",
        return_value=["old_dataset", "test_dataset"],
    )
    mock_delete = mocker.patch(
        "oc4ids_datastore_pipeline.pipeline.delete_dataset"
    )

    process_deleted_datasets({"test_dataset": "https://test_dataset.json"})

    mock_delete.assert_called_once_with("old_dataset")
119+
120+
105121
def test_process_dataset_catches_exception(mocker: MockerFixture) -> None:
106122
patch_download_json = mocker.patch(
107123
"oc4ids_datastore_pipeline.pipeline.download_json"

tests/test_registry.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,20 @@ def test_fetch_registered_datasets_raises_failure_exception(
3838
assert "Mocked exception" in str(exc_info.value)
3939

4040

41+
def test_fetch_registered_datasets_raises_exception_when_no_datasets(
    mocker: MockerFixture,
) -> None:
    """An empty registry response should be reported as an upstream error."""
    mock_response = MagicMock()
    mock_response.json.return_value = {"records": {}}
    # Patch requests.get in the module under test (registry), not via the
    # pipeline module — the old target only worked because both modules
    # reference the same ``requests`` module object.
    patch_get = mocker.patch("oc4ids_datastore_pipeline.registry.requests.get")
    patch_get.return_value = mock_response

    with pytest.raises(Exception) as exc_info:
        fetch_registered_datasets()

    assert "Zero datasets returned from registry" in str(exc_info.value)
53+
54+
4155
def test_fetch_license_mappings(mocker: MockerFixture) -> None:
4256
mock_response = MagicMock()
4357
mock_response.json.return_value = {

0 commit comments

Comments
 (0)