7 changes: 7 additions & 0 deletions .env.test
@@ -3,3 +3,10 @@ BUCKET_REGION="test-region"
BUCKET_NAME="test-bucket"
BUCKET_ACCESS_KEY_ID="test-id"
BUCKET_ACCESS_KEY_SECRET="test-secret"

NOTIFICATIONS_ENABLED=1
NOTIFICATIONS_SMTP_HOST="localhost"
NOTIFICATIONS_SMTP_PORT=8025
NOTIFICATIONS_SMTP_SSL_ENABLED=0
NOTIFICATIONS_SENDER_EMAIL="[email protected]"
NOTIFICATIONS_RECEIVER_EMAIL="[email protected]"
11 changes: 11 additions & 0 deletions README.md
@@ -44,6 +44,17 @@ For local development, create a file called `.env.local`, which will be used by
You can change which file is loaded by setting the environment variable `APP_ENV`.
For example, the tests set `APP_ENV=test`, which loads variables from `.env.test`.

### Email notification environment variables

To send failure notifications by email, the following environment variables must be set (see the example below):

- `NOTIFICATIONS_ENABLED`: `1` to enable, `0` to disable
- `NOTIFICATIONS_SMTP_HOST`: hostname of the SMTP server
- `NOTIFICATIONS_SMTP_PORT`: port of the SMTP server
- `NOTIFICATIONS_SMTP_SSL_ENABLED`: `1` to enable, `0` to disable (the connection is upgraded with STARTTLS when enabled)
- `NOTIFICATIONS_SENDER_EMAIL`: address notifications are sent from
- `NOTIFICATIONS_RECEIVER_EMAIL`: address notifications are sent to
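
For example, a local `.env.local` might contain the following (illustrative values, mirroring `.env.test`):

```
NOTIFICATIONS_ENABLED=1
NOTIFICATIONS_SMTP_HOST="localhost"
NOTIFICATIONS_SMTP_PORT=8025
NOTIFICATIONS_SMTP_SSL_ENABLED=0
NOTIFICATIONS_SENDER_EMAIL="[email protected]"
NOTIFICATIONS_RECEIVER_EMAIL="[email protected]"
```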

### Run app

```
52 changes: 52 additions & 0 deletions oc4ids_datastore_pipeline/notifications.py
@@ -0,0 +1,52 @@
import datetime
import logging
import os
import smtplib
import ssl
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

logger = logging.getLogger(__name__)


def _send_email(errors: list[dict[str, str]]) -> None:
    SMTP_HOST = os.environ["NOTIFICATIONS_SMTP_HOST"]
    SMTP_PORT = int(os.environ["NOTIFICATIONS_SMTP_PORT"])
    SMTP_SSL_ENABLED = int(os.environ["NOTIFICATIONS_SMTP_SSL_ENABLED"])
    SENDER_EMAIL = os.environ["NOTIFICATIONS_SENDER_EMAIL"]
    RECEIVER_EMAIL = os.environ["NOTIFICATIONS_RECEIVER_EMAIL"]
    logger.info(f"Sending email to {RECEIVER_EMAIL}")
    with smtplib.SMTP(SMTP_HOST, SMTP_PORT) as server:
        if SMTP_SSL_ENABLED:
            # Upgrade the plain connection with STARTTLS before sending anything
            context = ssl.create_default_context()
            server.starttls(context=context)
        message = MIMEMultipart()
        current_time = datetime.datetime.now(datetime.UTC)
        message["Subject"] = f"Errors in OC4IDS Datastore Pipeline run: {current_time}"
        message["From"] = SENDER_EMAIL
        message["To"] = RECEIVER_EMAIL

        # Nested f-strings with reused quote characters require Python 3.12+
        html = f"""\
        <h1>Errors in OC4IDS Datastore Pipeline run</h1>
        <p>The pipeline completed at {current_time}.</p>
        <p>Please see errors for each dataset below:</p>
        {"".join([
            f"""
            <h2>{error["dataset_id"]}</h2>
            <p>Source URL: <code>{error["source_url"]}</code></p>
            <pre><code>{error["message"]}</code></pre>
            """
            for error in errors
        ])}
        """
        message.attach(MIMEText(html, "html"))

        server.sendmail(SENDER_EMAIL, RECEIVER_EMAIL, message.as_string())


def send_notification(errors: list[dict[str, str]]) -> None:
    NOTIFICATIONS_ENABLED = bool(int(os.environ.get("NOTIFICATIONS_ENABLED", "0")))
    if NOTIFICATIONS_ENABLED:
        _send_email(errors)
    else:
        logger.info("Notifications are disabled, skipping")
127 changes: 78 additions & 49 deletions oc4ids_datastore_pipeline/pipeline.py
@@ -15,6 +15,7 @@
    get_dataset_ids,
    save_dataset,
)
+from oc4ids_datastore_pipeline.notifications import send_notification
from oc4ids_datastore_pipeline.registry import (
    fetch_registered_datasets,
    get_license_name_from_url,
@@ -24,6 +25,17 @@
logger = logging.getLogger(__name__)


class ProcessDatasetError(Exception):
    def __init__(self, message: str):
        super().__init__(message)


class ValidationError(ProcessDatasetError):
    def __init__(self, errors_count: int, errors: list[str]):
        message = f"Dataset has {errors_count} validation errors: {str(errors)}"
        super().__init__(message)


def download_json(url: str) -> Any:
    logger.info(f"Downloading json from {url}")
    try:
@@ -33,19 +45,23 @@ def download_json(url: str) -> Any:
logger.info(f"Downloaded {url} ({response_size} bytes)")
return r.json()
except Exception as e:
raise Exception("Download failed", e)
raise ProcessDatasetError(f"Download failed: {str(e)}")


-def validate_json(dataset_name: str, json_data: dict[str, Any]) -> None:
-    logger.info(f"Validating dataset {dataset_name}")
+def validate_json(dataset_id: str, json_data: dict[str, Any]) -> None:
+    logger.info(f"Validating dataset {dataset_id}")
    try:
        validation_result = oc4ids_json_output(json_data=json_data)
        validation_errors_count = validation_result["validation_errors_count"]
+        validation_errors = validation_result["validation_errors"]
        if validation_errors_count > 0:
-            raise Exception(f"Dataset has {validation_errors_count} validation errors")
-        logger.info(f"Dataset {dataset_name} is valid")
+            raise ValidationError(
+                errors_count=validation_errors_count,
+                errors=validation_errors,
+            )
+        logger.info(f"Dataset {dataset_id} is valid")
    except Exception as e:
-        raise Exception("Validation failed", e)
+        raise ProcessDatasetError(f"Validation failed: {str(e)}")


def write_json_to_file(file_name: str, json_data: dict[str, Any]) -> str:
@@ -57,7 +73,7 @@ def write_json_to_file(file_name: str, json_data: dict[str, Any]) -> str:
        logger.info(f"Finished writing to {file_name}")
        return file_name
    except Exception as e:
-        raise Exception("Error while writing to JSON file", e)
+        raise ProcessDatasetError(f"Error writing dataset to file: {e}")


def transform_to_csv_and_xlsx(json_path: str) -> tuple[Optional[str], Optional[str]]:
@@ -76,59 +92,60 @@ def transform_to_csv_and_xlsx(json_path: str) -> tuple[Optional[str], Optional[str]]:
        logger.info(f"Transformed to XLSX at {xlsx_path}")
        return csv_path, xlsx_path
    except Exception as e:
-        logger.warning(f"Failed to transform JSON to CSV and XLSX with error {e}")
+        logger.warning(f"Failed to transform JSON to CSV and XLSX: {e}")
        return None, None


def save_dataset_metadata(
-    dataset_name: str,
+    dataset_id: str,
    source_url: str,
    json_data: dict[str, Any],
    json_url: Optional[str],
    csv_url: Optional[str],
    xlsx_url: Optional[str],
) -> None:
-    logger.info(f"Saving metadata for dataset {dataset_name}")
-    publisher_name = json_data.get("publisher", {}).get("name", "")
-    license_url = json_data.get("license", None)
-    license_name = get_license_name_from_url(license_url) if license_url else None
-    dataset = Dataset(
-        dataset_id=dataset_name,
-        source_url=source_url,
-        publisher_name=publisher_name,
-        license_url=license_url,
-        license_name=license_name,
-        json_url=json_url,
-        csv_url=csv_url,
-        xlsx_url=xlsx_url,
-        updated_at=datetime.datetime.now(datetime.UTC),
-    )
-    save_dataset(dataset)
-
-
-def process_dataset(dataset_name: str, dataset_url: str) -> None:
-    logger.info(f"Processing dataset {dataset_name}")
+    logger.info(f"Saving metadata for dataset {dataset_id}")
    try:
-        json_data = download_json(dataset_url)
-        validate_json(dataset_name, json_data)
-        json_path = write_json_to_file(
-            f"data/{dataset_name}/{dataset_name}.json", json_data
-        )
-        csv_path, xlsx_path = transform_to_csv_and_xlsx(json_path)
-        json_public_url, csv_public_url, xlsx_public_url = upload_files(
-            dataset_name, json_path=json_path, csv_path=csv_path, xlsx_path=xlsx_path
-        )
-        save_dataset_metadata(
-            dataset_name=dataset_name,
-            source_url=dataset_url,
-            json_data=json_data,
-            json_url=json_public_url,
-            csv_url=csv_public_url,
-            xlsx_url=xlsx_public_url,
-        )
-        logger.info(f"Processed dataset {dataset_name}")
+        publisher_name = json_data.get("publisher", {}).get("name", "")
+        license_url = json_data.get("license", None)
+        license_name = get_license_name_from_url(license_url) if license_url else None
+        dataset = Dataset(
+            dataset_id=dataset_id,
+            source_url=source_url,
+            publisher_name=publisher_name,
+            license_url=license_url,
+            license_name=license_name,
+            json_url=json_url,
+            csv_url=csv_url,
+            xlsx_url=xlsx_url,
+            updated_at=datetime.datetime.now(datetime.UTC),
+        )
+        save_dataset(dataset)
    except Exception as e:
-        logger.warning(f"Failed to process dataset {dataset_name} with error {e}")
+        raise ProcessDatasetError(f"Failed to update metadata for dataset: {e}")
+
+
+def process_dataset(dataset_id: str, source_url: str) -> None:
+    logger.info(f"Processing dataset {dataset_id}")
+    json_data = download_json(source_url)
+    validate_json(dataset_id, json_data)
+    json_path = write_json_to_file(
+        file_name=f"data/{dataset_id}/{dataset_id}.json",
+        json_data=json_data,
+    )
+    csv_path, xlsx_path = transform_to_csv_and_xlsx(json_path)
+    json_public_url, csv_public_url, xlsx_public_url = upload_files(
+        dataset_id, json_path=json_path, csv_path=csv_path, xlsx_path=xlsx_path
+    )
+    save_dataset_metadata(
+        dataset_id=dataset_id,
+        source_url=source_url,
+        json_data=json_data,
+        json_url=json_public_url,
+        csv_url=csv_public_url,
+        xlsx_url=xlsx_public_url,
+    )
+    logger.info(f"Processed dataset {dataset_id}")


def process_deleted_datasets(registered_datasets: dict[str, str]) -> None:
@@ -143,8 +160,20 @@ def process_deleted_datasets(registered_datasets: dict[str, str]) -> None:
def process_registry() -> None:
    registered_datasets = fetch_registered_datasets()
    process_deleted_datasets(registered_datasets)
-    for name, url in registered_datasets.items():
-        process_dataset(name, url)
+    errors: list[dict[str, Any]] = []
+    for dataset_id, url in registered_datasets.items():
+        try:
+            process_dataset(dataset_id, url)
+        except Exception as e:
+            logger.warning(f"Failed to process dataset {dataset_id} with error {e}")
+            errors.append(
+                {"dataset_id": dataset_id, "source_url": url, "message": str(e)}
+            )
+    if errors:
+        logger.error(
+            f"Errors while processing registry: {json.dumps(errors, indent=4)}"
+        )
+        send_notification(errors)
    logger.info("Finished processing all datasets")


2 changes: 1 addition & 1 deletion pyproject.toml
@@ -5,7 +5,7 @@ build-backend = "flit_core.buildapi"
[project]
name = "oc4ids-datastore-pipeline"
description = "OC4IDS Datastore Pipeline"
version = "0.1.0"
version = "0.2.0"
readme = "README.md"
dependencies = [
"alembic",
30 changes: 30 additions & 0 deletions tests/test_notifications.py
@@ -0,0 +1,30 @@
from unittest.mock import MagicMock

from pytest_mock import MockerFixture

from oc4ids_datastore_pipeline.notifications import send_notification


def test_send_notification(mocker: MockerFixture) -> None:
    mock_smtp_server = MagicMock()
    patch_smtp = mocker.patch("oc4ids_datastore_pipeline.notifications.smtplib.SMTP")
    patch_smtp.return_value = mock_smtp_server

    errors = [
        {
            "dataset_id": "test_dataset",
            "source_url": "https://test_dataset.json",
            "message": "Mocked exception",
        }
    ]
    send_notification(errors)

    patch_smtp.assert_called_once_with("localhost", 8025)
    # Entering the mock as a context manager yields the same object the code
    # under test used inside its `with smtplib.SMTP(...) as server:` block
    with mock_smtp_server as server:
        server.sendmail.assert_called_once()
        sender, receiver, message = server.sendmail.call_args[0]
        assert sender == "[email protected]"
        assert receiver == "[email protected]"
        assert "test_dataset" in message
        assert "https://test_dataset.json" in message
        assert "Mocked exception" in message