diff --git a/.env.test b/.env.test
index 3b00692..7e9a799 100644
--- a/.env.test
+++ b/.env.test
@@ -3,3 +3,10 @@ BUCKET_REGION="test-region"
 BUCKET_NAME="test-bucket"
 BUCKET_ACCESS_KEY_ID="test-id"
 BUCKET_ACCESS_KEY_SECRET="test-secret"
+
+NOTIFICATIONS_ENABLED=1
+NOTIFICATIONS_SMTP_HOST="localhost"
+NOTIFICATIONS_SMTP_PORT=8025
+NOTIFICATIONS_SMTP_SSL_ENABLED=0
+NOTIFICATIONS_SENDER_EMAIL="sender@example.com"
+NOTIFICATIONS_RECEIVER_EMAIL="receiver@example.com"
diff --git a/README.md b/README.md
index 6e6c35e..08d4573 100644
--- a/README.md
+++ b/README.md
@@ -44,6 +44,17 @@ For local development, create a file called `.env.local`, which will be used by
 
 You can change which file is loaded setting the environment variable `APP_ENV`. For example the tests set `APP_ENV=test`, which loads variables from `.env.test`.
 
+### Email notification environment variables
+
+To send failure notifications by email, the following environment variables must be set:
+
+- `NOTIFICATIONS_ENABLED`: 1 to enable, 0 to disable
+- `NOTIFICATIONS_SMTP_HOST`
+- `NOTIFICATIONS_SMTP_PORT`
+- `NOTIFICATIONS_SMTP_SSL_ENABLED`: 1 to enable, 0 to disable
+- `NOTIFICATIONS_SENDER_EMAIL`
+- `NOTIFICATIONS_RECEIVER_EMAIL`
+
 ### Run app
 
 ```
diff --git a/oc4ids_datastore_pipeline/notifications.py b/oc4ids_datastore_pipeline/notifications.py
new file mode 100644
index 0000000..211b8c4
--- /dev/null
+++ b/oc4ids_datastore_pipeline/notifications.py
@@ -0,0 +1,52 @@
+import datetime
+import logging
+import os
+import smtplib
+import ssl
+from email.mime.multipart import MIMEMultipart
+from email.mime.text import MIMEText
+
+logger = logging.getLogger(__name__)
+
+
+def _send_email(errors: list[dict[str, str]]) -> None:
+    SMTP_HOST = os.environ["NOTIFICATIONS_SMTP_HOST"]
+    SMTP_PORT = int(os.environ["NOTIFICATIONS_SMTP_PORT"])
+    SMTP_SSL_ENABLED = int(os.environ["NOTIFICATIONS_SMTP_SSL_ENABLED"])
+    SENDER_EMAIL = os.environ["NOTIFICATIONS_SENDER_EMAIL"]
+    RECEIVER_EMAIL = os.environ["NOTIFICATIONS_RECEIVER_EMAIL"]
+    logger.info(f"Sending email to {RECEIVER_EMAIL}")
+    with smtplib.SMTP(SMTP_HOST, SMTP_PORT) as server:
+        if SMTP_SSL_ENABLED:
+            context = ssl.create_default_context()
+            server.starttls(context=context)
+        message = MIMEMultipart()
+        current_time = datetime.datetime.now(datetime.UTC)
+        message["Subject"] = f"Errors in OC4IDS Datastore Pipeline run: {current_time}"
+        message["From"] = SENDER_EMAIL
+        message["To"] = RECEIVER_EMAIL
+
+        html = f"""\
+        <html>
+        <body>
+        <p>The pipeline completed at {current_time}.<br>
+        Please see errors for each dataset below:</p>
+        {"".join([
+            f"""
+            <p>Source URL: {error["source_url"]}<br>
+            {error["message"]}</p>
+            """
+            for error in errors
+        ])}
+        </body>
+        </html>
+        """
+        message.attach(MIMEText(html, "html"))
+
+        server.sendmail(SENDER_EMAIL, RECEIVER_EMAIL, message.as_string())
+
+
+def send_notification(errors: list[dict[str, str]]) -> None:
+    NOTIFICATIONS_ENABLED = bool(int(os.environ.get("NOTIFICATIONS_ENABLED", "0")))
+    if NOTIFICATIONS_ENABLED:
+        _send_email(errors)
+    else:
+        logger.info("Notifications are disabled, skipping")
diff --git a/oc4ids_datastore_pipeline/pipeline.py b/oc4ids_datastore_pipeline/pipeline.py
index efbe5af..58e3dea 100644
--- a/oc4ids_datastore_pipeline/pipeline.py
+++ b/oc4ids_datastore_pipeline/pipeline.py
@@ -15,6 +15,7 @@
     get_dataset_ids,
     save_dataset,
 )
+from oc4ids_datastore_pipeline.notifications import send_notification
 from oc4ids_datastore_pipeline.registry import (
     fetch_registered_datasets,
     get_license_name_from_url,
@@ -24,6 +25,17 @@
 logger = logging.getLogger(__name__)
 
 
+class ProcessDatasetError(Exception):
+    def __init__(self, message: str):
+        super().__init__(message)
+
+
+class ValidationError(ProcessDatasetError):
+    def __init__(self, errors_count: int, errors: list[str]):
+        message = f"Dataset has {errors_count} validation errors: {str(errors)}"
+        super().__init__(message)
+
+
 def download_json(url: str) -> Any:
     logger.info(f"Downloading json from {url}")
     try:
@@ -33,19 +45,23 @@ def download_json(url: str) -> Any:
         logger.info(f"Downloaded {url} ({response_size} bytes)")
         return r.json()
     except Exception as e:
-        raise Exception("Download failed", e)
+        raise ProcessDatasetError(f"Download failed: {str(e)}")
 
 
-def validate_json(dataset_name: str, json_data: dict[str, Any]) -> None:
-    logger.info(f"Validating dataset {dataset_name}")
+def validate_json(dataset_id: str, json_data: dict[str, Any]) -> None:
+    logger.info(f"Validating dataset {dataset_id}")
     try:
         validation_result = oc4ids_json_output(json_data=json_data)
         validation_errors_count = validation_result["validation_errors_count"]
+        validation_errors = validation_result["validation_errors"]
         if validation_errors_count > 0:
-            raise Exception(f"Dataset has {validation_errors_count} validation errors")
-        logger.info(f"Dataset {dataset_name} is valid")
+            raise ValidationError(
+                errors_count=validation_errors_count,
+                errors=validation_errors,
+            )
+        logger.info(f"Dataset {dataset_id} is valid")
     except Exception as e:
-        raise Exception("Validation failed", e)
+        raise ProcessDatasetError(f"Validation failed: {str(e)}")
 
 
 def write_json_to_file(file_name: str, json_data: dict[str, Any]) -> str:
@@ -57,7 +73,7 @@ def write_json_to_file(file_name: str, json_data: dict[str, Any]) -> str:
         logger.info(f"Finished writing to {file_name}")
         return file_name
     except Exception as e:
-        raise Exception("Error while writing to JSON file", e)
+        raise ProcessDatasetError(f"Error writing dataset to file: {e}")
 
 
 def transform_to_csv_and_xlsx(json_path: str) -> tuple[Optional[str], Optional[str]]:
@@ -76,59 +92,60 @@ def transform_to_csv_and_xlsx(json_path: str) -> tuple[Optional[str], Optional[str]]:
         logger.info(f"Transformed to XLSX at {xlsx_path}")
         return csv_path, xlsx_path
     except Exception as e:
-        logger.warning(f"Failed to transform JSON to CSV and XLSX with error {e}")
+        logger.warning(f"Failed to transform JSON to CSV and XLSX: {e}")
         return None, None
 
 
 def save_dataset_metadata(
-    dataset_name: str,
+    dataset_id: str,
     source_url: str,
     json_data: dict[str, Any],
     json_url: Optional[str],
     csv_url: Optional[str],
     xlsx_url: Optional[str],
 ) -> None:
-    logger.info(f"Saving metadata for dataset {dataset_name}")
-    publisher_name = json_data.get("publisher", {}).get("name", "")
-    license_url = json_data.get("license", None)
-    license_name = get_license_name_from_url(license_url) if license_url else None
-    dataset = Dataset(
-        dataset_id=dataset_name,
-        source_url=source_url,
-        publisher_name=publisher_name,
-        license_url=license_url,
-        license_name=license_name,
-        json_url=json_url,
-        csv_url=csv_url,
-        xlsx_url=xlsx_url,
-        updated_at=datetime.datetime.now(datetime.UTC),
-    )
-    save_dataset(dataset)
-
-
-def process_dataset(dataset_name: str, dataset_url: str) -> None:
-    logger.info(f"Processing dataset {dataset_name}")
+    logger.info(f"Saving metadata for dataset {dataset_id}")
     try:
-        json_data = download_json(dataset_url)
-        validate_json(dataset_name, json_data)
-        json_path = write_json_to_file(
-            f"data/{dataset_name}/{dataset_name}.json", json_data
-        )
-        csv_path, xlsx_path = transform_to_csv_and_xlsx(json_path)
-        json_public_url, csv_public_url, xlsx_public_url = upload_files(
-            dataset_name, json_path=json_path, csv_path=csv_path, xlsx_path=xlsx_path
+        publisher_name = json_data.get("publisher", {}).get("name", "")
+        license_url = json_data.get("license", None)
+        license_name = get_license_name_from_url(license_url) if license_url else None
+        dataset = Dataset(
+            dataset_id=dataset_id,
+            source_url=source_url,
+            publisher_name=publisher_name,
+            license_url=license_url,
+            license_name=license_name,
+            json_url=json_url,
+            csv_url=csv_url,
+            xlsx_url=xlsx_url,
+            updated_at=datetime.datetime.now(datetime.UTC),
         )
-        save_dataset_metadata(
-            dataset_name=dataset_name,
-            source_url=dataset_url,
-            json_data=json_data,
-            json_url=json_public_url,
-            csv_url=csv_public_url,
-            xlsx_url=xlsx_public_url,
-        )
-        logger.info(f"Processed dataset {dataset_name}")
+        save_dataset(dataset)
     except Exception as e:
-        logger.warning(f"Failed to process dataset {dataset_name} with error {e}")
+        raise ProcessDatasetError(f"Failed to update metadata for dataset: {e}")
+
+
+def process_dataset(dataset_id: str, source_url: str) -> None:
+    logger.info(f"Processing dataset {dataset_id}")
+    json_data = download_json(source_url)
+    validate_json(dataset_id, json_data)
+    json_path = write_json_to_file(
+        file_name=f"data/{dataset_id}/{dataset_id}.json",
+        json_data=json_data,
+    )
+    csv_path, xlsx_path = transform_to_csv_and_xlsx(json_path)
+    json_public_url, csv_public_url, xlsx_public_url = upload_files(
+        dataset_id, json_path=json_path, csv_path=csv_path, xlsx_path=xlsx_path
+    )
+    save_dataset_metadata(
+        dataset_id=dataset_id,
+        source_url=source_url,
+        json_data=json_data,
+        json_url=json_public_url,
+        csv_url=csv_public_url,
+        xlsx_url=xlsx_public_url,
+    )
+    logger.info(f"Processed dataset {dataset_id}")
 
 
 def process_deleted_datasets(registered_datasets: dict[str, str]) -> None:
@@ -143,8 +160,20 @@ def process_deleted_datasets(registered_datasets: dict[str, str]) -> None:
 
 
 def process_registry() -> None:
     registered_datasets = fetch_registered_datasets()
     process_deleted_datasets(registered_datasets)
-    for name, url in registered_datasets.items():
-        process_dataset(name, url)
+    errors: list[dict[str, Any]] = []
+    for dataset_id, url in registered_datasets.items():
+        try:
+            process_dataset(dataset_id, url)
+        except Exception as e:
+            logger.warning(f"Failed to process dataset {dataset_id} with error {e}")
+            errors.append(
+                {"dataset_id": dataset_id, "source_url": url, "message": str(e)}
+            )
+    if errors:
+        logger.error(
+            f"Errors while processing registry: {json.dumps(errors, indent=4)}"
+        )
+        send_notification(errors)
     logger.info("Finished processing all datasets")
diff --git a/pyproject.toml b/pyproject.toml
index 1413f3e..0cc3f85 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -5,7 +5,7 @@ build-backend = "flit_core.buildapi"
 [project]
 name = "oc4ids-datastore-pipeline"
 description = "OC4IDS Datastore Pipeline"
-version = "0.1.0"
+version = "0.2.0"
 readme = "README.md"
 dependencies = [
     "alembic",
diff --git a/tests/test_notifications.py b/tests/test_notifications.py
new file mode 100644
index 0000000..aed4631
--- /dev/null
+++ b/tests/test_notifications.py
@@ -0,0 +1,30 @@
+from unittest.mock import MagicMock
+
+from pytest_mock import MockerFixture
+
+from oc4ids_datastore_pipeline.notifications import send_notification
+
+
+def test_send_notification(mocker: MockerFixture) -> None:
+    mock_smtp_server = MagicMock()
+    patch_smtp = mocker.patch("oc4ids_datastore_pipeline.notifications.smtplib.SMTP")
+    patch_smtp.return_value = mock_smtp_server
+
+    errors = [
+        {
+            "dataset_id": "test_dataset",
+            "source_url": "https://test_dataset.json",
+            "message": "Mocked exception",
+        }
+    ]
+    send_notification(errors)
+
+    patch_smtp.assert_called_once_with("localhost", 8025)
+    with mock_smtp_server as server:
+        server.sendmail.assert_called_once()
+        sender, receiver, message = server.sendmail.call_args[0]
+        assert sender == "sender@example.com"
+        assert receiver == "receiver@example.com"
+        assert "test_dataset" in message
+        assert "https://test_dataset.json" in message
+        assert "Mocked exception" in message
diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py
index 8419600..f09425e 100644
--- a/tests/test_pipeline.py
+++ b/tests/test_pipeline.py
@@ -6,9 +6,11 @@
 from pytest_mock import MockerFixture
 
 from oc4ids_datastore_pipeline.pipeline import (
+    ProcessDatasetError,
     download_json,
     process_dataset,
     process_deleted_datasets,
+    process_registry,
     transform_to_csv_and_xlsx,
     validate_json,
     write_json_to_file,
@@ -19,7 +21,7 @@ def test_download_json_raises_failure_exception(mocker: MockerFixture) -> None:
     patch_get = mocker.patch("oc4ids_datastore_pipeline.pipeline.requests.get")
     patch_get.side_effect = Exception("Mocked exception")
 
-    with pytest.raises(Exception) as exc_info:
+    with pytest.raises(ProcessDatasetError) as exc_info:
         download_json(url="https://test_dataset.json")
 
     assert "Download failed" in str(exc_info.value)
@@ -32,8 +34,8 @@ def test_validate_json_raises_failure_exception(mocker: MockerFixture) -> None:
     )
     patch_oc4ids_json_output.side_effect = Exception("Mocked exception")
 
-    with pytest.raises(Exception) as exc_info:
-        validate_json(dataset_name="test_dataset", json_data={})
+    with pytest.raises(ProcessDatasetError) as exc_info:
+        validate_json(dataset_id="test_dataset", json_data={})
 
     assert "Validation failed" in str(exc_info.value)
     assert "Mocked exception" in str(exc_info.value)
@@ -45,13 +47,28 @@ def test_validate_json_raises_validation_errors_exception(
     patch_oc4ids_json_output = mocker.patch(
         "oc4ids_datastore_pipeline.pipeline.oc4ids_json_output"
     )
-    patch_oc4ids_json_output.return_value = {"validation_errors_count": 2}
-
-    with pytest.raises(Exception) as exc_info:
-        validate_json(dataset_name="test_dataset", json_data={})
+    patch_oc4ids_json_output.return_value = {
+        "validation_errors_count": 2,
+        "validation_errors": [
+            [
+                '{"message": "Non-unique id values"}',
+                [
+                    {
+                        "path": "projects/22/parties",
+                        "value": "test_value",
+                    },
+                    {"path": "projects/30/parties", "value": "test_value"},
+                ],
+            ]
+        ],
+    }
+
+    with pytest.raises(ProcessDatasetError) as exc_info:
+        validate_json(dataset_id="test_dataset", json_data={})
 
     assert "Validation failed" in str(exc_info.value)
     assert "Dataset has 2 validation errors" in str(exc_info.value)
+    assert "Non-unique id values" in str(exc_info.value)
 
 
 def test_write_json_to_file_writes_in_correct_format() -> None:
@@ -73,13 +90,13 @@ def test_write_json_to_file_raises_failure_exception(mocker: MockerFixture) -> None:
     patch_json_dump = mocker.patch("oc4ids_datastore_pipeline.pipeline.json.dump")
     patch_json_dump.side_effect = Exception("Mocked exception")
 
-    with pytest.raises(Exception) as exc_info:
+    with pytest.raises(ProcessDatasetError) as exc_info:
         with tempfile.TemporaryDirectory() as dir:
             file_name = os.path.join(dir, "test_dataset.json")
             write_json_to_file(file_name=file_name, json_data={"key": "value"})
 
-    assert "Error while writing to JSON file" in str(exc_info.value)
-    assert "Mocked exception" in str(exc_info.value)
+    assert "Error writing dataset to file" in str(exc_info.value)
+    assert "Mocked exception" in str(exc_info.value)
 
 
 def test_transform_to_csv_and_xlsx_returns_correct_paths(mocker: MockerFixture) -> None:
@@ -122,10 +139,30 @@ def test_process_deleted_datasets(mocker: MockerFixture) -> None:
     patch_delete_files_for_dataset.assert_called_once_with("old_dataset")
 
 
-def test_process_dataset_catches_exception(mocker: MockerFixture) -> None:
+def test_process_dataset_raises_failure_exception(mocker: MockerFixture) -> None:
     patch_download_json = mocker.patch(
         "oc4ids_datastore_pipeline.pipeline.download_json"
     )
-    patch_download_json.side_effect = Exception("Download failed")
+    patch_download_json.side_effect = ProcessDatasetError("Download failed: Exception")
+
+    with pytest.raises(ProcessDatasetError) as exc_info:
+        process_dataset("test_dataset", "https://test_dataset.json")
+
+    assert "Download failed: Exception" in str(exc_info.value)
+
+
+def test_process_registry_catches_exception(mocker: MockerFixture) -> None:
+    patch_fetch_registered_datasets = mocker.patch(
+        "oc4ids_datastore_pipeline.pipeline.fetch_registered_datasets"
+    )
+    patch_fetch_registered_datasets.return_value = {
+        "test_dataset": "https://test_dataset.json"
+    }
+    mocker.patch("oc4ids_datastore_pipeline.pipeline.process_deleted_datasets")
+    patch_process_dataset = mocker.patch(
+        "oc4ids_datastore_pipeline.pipeline.process_dataset"
+    )
+    patch_process_dataset.side_effect = Exception("Mocked exception")
+    mocker.patch("oc4ids_datastore_pipeline.pipeline.send_notification")
 
-    process_dataset("test_dataset", "https://test_dataset.json")
+    process_registry()
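For reference, the messages asserted above come from plain string wrapping of the new exception classes in `pipeline.py`. A minimal illustration (the values are made up):

```
from oc4ids_datastore_pipeline.pipeline import ProcessDatasetError, ValidationError

try:
    raise ValidationError(errors_count=2, errors=["Non-unique id values"])
except Exception as e:
    wrapped = ProcessDatasetError(f"Validation failed: {str(e)}")
    print(wrapped)
    # Validation failed: Dataset has 2 validation errors: ['Non-unique id values']
```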