
Commit 3a453bf

Merge pull request #25 from OpenDataServices/9-notifications
Send email failure notifications
2 parents: fc1f386 + 0d2e8c3

7 files changed: +229 −63 lines

.env.test

Lines changed: 7 additions & 0 deletions
@@ -3,3 +3,10 @@ BUCKET_REGION="test-region"
 BUCKET_NAME="test-bucket"
 BUCKET_ACCESS_KEY_ID="test-id"
 BUCKET_ACCESS_KEY_SECRET="test-secret"
+
+NOTIFICATIONS_ENABLED=1
+NOTIFICATIONS_SMTP_HOST="localhost"
+NOTIFICATIONS_SMTP_PORT=8025
+NOTIFICATIONS_SMTP_SSL_ENABLED=0
+NOTIFICATIONS_SENDER_EMAIL="[email protected]"
+NOTIFICATIONS_RECEIVER_EMAIL="[email protected]"

README.md

Lines changed: 11 additions & 0 deletions
@@ -44,6 +44,17 @@ For local development, create a file called `.env.local`, which will be used by
 You can change which file is loaded setting the environment variable `APP_ENV`.
 For example the tests set `APP_ENV=test`, which loads variables from `.env.test`.

+### Email notification environment variables
+
+To send failure notifications by email, the following environment variables must be set:
+
+- `NOTIFICATIONS_ENABLED`: 1 to enable, 0 to disable
+- `NOTIFICATIONS_SMTP_HOST`
+- `NOTIFICATIONS_SMTP_PORT`
+- `NOTIFICATIONS_SMTP_SSL_ENABLED`: 1 to enable, 0 to disable
+- `NOTIFICATIONS_SENDER_EMAIL`
+- `NOTIFICATIONS_RECEIVER_EMAIL`
+
 ### Run app

 ```
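
For reference, a sketch of what a local `.env.local` might look like when pointing at an SMTP debugging server; all values here are illustrative and not taken from this diff (the committed `.env.test` above uses redacted addresses):

```
NOTIFICATIONS_ENABLED=1
NOTIFICATIONS_SMTP_HOST="localhost"
NOTIFICATIONS_SMTP_PORT=8025
NOTIFICATIONS_SMTP_SSL_ENABLED=0
NOTIFICATIONS_SENDER_EMAIL="[email protected]"
NOTIFICATIONS_RECEIVER_EMAIL="[email protected]"
```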
oc4ids_datastore_pipeline/notifications.py

Lines changed: 52 additions & 0 deletions
@@ -0,0 +1,52 @@
+import datetime
+import logging
+import os
+import smtplib
+import ssl
+from email.mime.multipart import MIMEMultipart
+from email.mime.text import MIMEText
+
+logger = logging.getLogger(__name__)
+
+
+def _send_email(errors: list[dict[str, str]]) -> None:
+    SMTP_HOST = os.environ["NOTIFICATIONS_SMTP_HOST"]
+    SMTP_PORT = int(os.environ["NOTIFICATIONS_SMTP_PORT"])
+    SMTP_SSL_ENABLED = int(os.environ["NOTIFICATIONS_SMTP_SSL_ENABLED"])
+    SENDER_EMAIL = os.environ["NOTIFICATIONS_SENDER_EMAIL"]
+    RECEIVER_EMAIL = os.environ["NOTIFICATIONS_RECEIVER_EMAIL"]
+    logger.info(f"Sending email to {RECEIVER_EMAIL}")
+    with smtplib.SMTP(SMTP_HOST, SMTP_PORT) as server:
+        if SMTP_SSL_ENABLED:
+            context = ssl.create_default_context()
+            server.starttls(context=context)
+        message = MIMEMultipart()
+        current_time = datetime.datetime.now(datetime.UTC)
+        message["Subject"] = f"Errors in OC4IDS Datastore Pipeline run: {current_time}"
+        message["From"] = SENDER_EMAIL
+        message["To"] = RECEIVER_EMAIL
+
+        html = f"""\
+        <h1>Errors in OC4IDS Datastore Pipeline run</h1>
+        <p>The pipeline completed at {current_time}.</p>
+        <p>Please see errors for each dataset below:</p>
+        {"".join([
+            f"""
+            <h2>{error["dataset_id"]}</h2>
+            <p>Source URL: <code>{error["source_url"]}</code></p>
+            <pre><code>{error["message"]}</code></pre>
+            """
+            for error in errors
+        ])}
+        """
+        message.attach(MIMEText(html, "html"))
+
+        server.sendmail(SENDER_EMAIL, RECEIVER_EMAIL, message.as_string())
+
+
+def send_notification(errors: list[dict[str, str]]) -> None:
+    NOTIFICATIONS_ENABLED = bool(int(os.environ.get("NOTIFICATIONS_ENABLED", "0")))
+    if NOTIFICATIONS_ENABLED:
+        _send_email(errors)
+    else:
+        logger.info("Notifications are disabled, skipping")
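
For a quick manual check of this module outside the pipeline, something like the sketch below should work. It assumes an SMTP debugging server is already listening on localhost:8025 (for example `aiosmtpd -n -l localhost:8025`; aiosmtpd is not a dependency of this project), and all addresses and error data are illustrative:

```python
# Manual smoke test for send_notification (illustrative, not part of the diff).
# Assumes a local SMTP debugging server on localhost:8025.
import os

from oc4ids_datastore_pipeline.notifications import send_notification

os.environ.update(
    {
        "NOTIFICATIONS_ENABLED": "1",
        "NOTIFICATIONS_SMTP_HOST": "localhost",
        "NOTIFICATIONS_SMTP_PORT": "8025",
        "NOTIFICATIONS_SMTP_SSL_ENABLED": "0",
        "NOTIFICATIONS_SENDER_EMAIL": "[email protected]",
        "NOTIFICATIONS_RECEIVER_EMAIL": "[email protected]",
    }
)

send_notification(
    [
        {
            "dataset_id": "example_dataset",
            "source_url": "https://example.com/example_dataset.json",
            "message": "Download failed: connection timed out",
        }
    ]
)
```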

oc4ids_datastore_pipeline/pipeline.py

Lines changed: 78 additions & 49 deletions
@@ -15,6 +15,7 @@
     get_dataset_ids,
     save_dataset,
 )
+from oc4ids_datastore_pipeline.notifications import send_notification
 from oc4ids_datastore_pipeline.registry import (
     fetch_registered_datasets,
     get_license_name_from_url,
@@ -24,6 +25,17 @@
 logger = logging.getLogger(__name__)


+class ProcessDatasetError(Exception):
+    def __init__(self, message: str):
+        super().__init__(message)
+
+
+class ValidationError(ProcessDatasetError):
+    def __init__(self, errors_count: int, errors: list[str]):
+        message = f"Dataset has {errors_count} validation errors: {str(errors)}"
+        super().__init__(message)
+
+
 def download_json(url: str) -> Any:
     logger.info(f"Downloading json from {url}")
     try:
@@ -33,19 +45,23 @@ def download_json(url: str) -> Any:
         logger.info(f"Downloaded {url} ({response_size} bytes)")
         return r.json()
     except Exception as e:
-        raise Exception("Download failed", e)
+        raise ProcessDatasetError(f"Download failed: {str(e)}")


-def validate_json(dataset_name: str, json_data: dict[str, Any]) -> None:
-    logger.info(f"Validating dataset {dataset_name}")
+def validate_json(dataset_id: str, json_data: dict[str, Any]) -> None:
+    logger.info(f"Validating dataset {dataset_id}")
     try:
         validation_result = oc4ids_json_output(json_data=json_data)
         validation_errors_count = validation_result["validation_errors_count"]
+        validation_errors = validation_result["validation_errors"]
         if validation_errors_count > 0:
-            raise Exception(f"Dataset has {validation_errors_count} validation errors")
-        logger.info(f"Dataset {dataset_name} is valid")
+            raise ValidationError(
+                errors_count=validation_errors_count,
+                errors=validation_errors,
+            )
+        logger.info(f"Dataset {dataset_id} is valid")
     except Exception as e:
-        raise Exception("Validation failed", e)
+        raise ProcessDatasetError(f"Validation failed: {str(e)}")


 def write_json_to_file(file_name: str, json_data: dict[str, Any]) -> str:
@@ -57,7 +73,7 @@ def write_json_to_file(file_name: str, json_data: dict[str, Any]) -> str:
         logger.info(f"Finished writing to {file_name}")
         return file_name
     except Exception as e:
-        raise Exception("Error while writing to JSON file", e)
+        raise ProcessDatasetError(f"Error writing dataset to file: {e}")


 def transform_to_csv_and_xlsx(json_path: str) -> tuple[Optional[str], Optional[str]]:
@@ -76,59 +92,60 @@ def transform_to_csv_and_xlsx(json_path: str) -> tuple[Optional[str], Optional[str]]:
         logger.info(f"Transformed to XLSX at {xlsx_path}")
         return csv_path, xlsx_path
     except Exception as e:
-        logger.warning(f"Failed to transform JSON to CSV and XLSX with error {e}")
+        logger.warning(f"Failed to transform JSON to CSV and XLSX: {e}")
         return None, None


 def save_dataset_metadata(
-    dataset_name: str,
+    dataset_id: str,
     source_url: str,
     json_data: dict[str, Any],
     json_url: Optional[str],
     csv_url: Optional[str],
     xlsx_url: Optional[str],
 ) -> None:
-    logger.info(f"Saving metadata for dataset {dataset_name}")
-    publisher_name = json_data.get("publisher", {}).get("name", "")
-    license_url = json_data.get("license", None)
-    license_name = get_license_name_from_url(license_url) if license_url else None
-    dataset = Dataset(
-        dataset_id=dataset_name,
-        source_url=source_url,
-        publisher_name=publisher_name,
-        license_url=license_url,
-        license_name=license_name,
-        json_url=json_url,
-        csv_url=csv_url,
-        xlsx_url=xlsx_url,
-        updated_at=datetime.datetime.now(datetime.UTC),
-    )
-    save_dataset(dataset)
-
-
-def process_dataset(dataset_name: str, dataset_url: str) -> None:
-    logger.info(f"Processing dataset {dataset_name}")
+    logger.info(f"Saving metadata for dataset {dataset_id}")
     try:
-        json_data = download_json(dataset_url)
-        validate_json(dataset_name, json_data)
-        json_path = write_json_to_file(
-            f"data/{dataset_name}/{dataset_name}.json", json_data
-        )
-        csv_path, xlsx_path = transform_to_csv_and_xlsx(json_path)
-        json_public_url, csv_public_url, xlsx_public_url = upload_files(
-            dataset_name, json_path=json_path, csv_path=csv_path, xlsx_path=xlsx_path
+        publisher_name = json_data.get("publisher", {}).get("name", "")
+        license_url = json_data.get("license", None)
+        license_name = get_license_name_from_url(license_url) if license_url else None
+        dataset = Dataset(
+            dataset_id=dataset_id,
+            source_url=source_url,
+            publisher_name=publisher_name,
+            license_url=license_url,
+            license_name=license_name,
+            json_url=json_url,
+            csv_url=csv_url,
+            xlsx_url=xlsx_url,
+            updated_at=datetime.datetime.now(datetime.UTC),
         )
-        save_dataset_metadata(
-            dataset_name=dataset_name,
-            source_url=dataset_url,
-            json_data=json_data,
-            json_url=json_public_url,
-            csv_url=csv_public_url,
-            xlsx_url=xlsx_public_url,
-        )
-        logger.info(f"Processed dataset {dataset_name}")
+        save_dataset(dataset)
     except Exception as e:
-        logger.warning(f"Failed to process dataset {dataset_name} with error {e}")
+        raise ProcessDatasetError(f"Failed to update metadata for dataset: {e}")
+
+
+def process_dataset(dataset_id: str, source_url: str) -> None:
+    logger.info(f"Processing dataset {dataset_id}")
+    json_data = download_json(source_url)
+    validate_json(dataset_id, json_data)
+    json_path = write_json_to_file(
+        file_name=f"data/{dataset_id}/{dataset_id}.json",
+        json_data=json_data,
+    )
+    csv_path, xlsx_path = transform_to_csv_and_xlsx(json_path)
+    json_public_url, csv_public_url, xlsx_public_url = upload_files(
+        dataset_id, json_path=json_path, csv_path=csv_path, xlsx_path=xlsx_path
+    )
+    save_dataset_metadata(
+        dataset_id=dataset_id,
+        source_url=source_url,
+        json_data=json_data,
+        json_url=json_public_url,
+        csv_url=csv_public_url,
+        xlsx_url=xlsx_public_url,
+    )
+    logger.info(f"Processed dataset {dataset_id}")


 def process_deleted_datasets(registered_datasets: dict[str, str]) -> None:
@@ -143,8 +160,20 @@ def process_deleted_datasets(registered_datasets: dict[str, str]) -> None:
 def process_registry() -> None:
     registered_datasets = fetch_registered_datasets()
     process_deleted_datasets(registered_datasets)
-    for name, url in registered_datasets.items():
-        process_dataset(name, url)
+    errors: list[dict[str, Any]] = []
+    for dataset_id, url in registered_datasets.items():
+        try:
+            process_dataset(dataset_id, url)
+        except Exception as e:
+            logger.warning(f"Failed to process dataset {dataset_id} with error {e}")
+            errors.append(
+                {"dataset_id": dataset_id, "source_url": url, "message": str(e)}
+            )
+    if errors:
+        logger.error(
+            f"Errors while processing registry: {json.dumps(errors, indent=4)}"
+        )
+        send_notification(errors)
     logger.info("Finished processing all datasets")

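A note on the reworked error handling: previously `process_dataset` swallowed failures with a single warning; now each step raises a typed `ProcessDatasetError` (or its subclass `ValidationError`), and `process_registry` catches per dataset, keeps going, and sends one notification at the end. A minimal standalone illustration of the hierarchy and collect-and-continue pattern (names mirror the diff, but the data and loop body are illustrative):

```python
# Standalone sketch of the exception hierarchy and collect-and-continue
# pattern introduced above (illustrative data, not the pipeline's modules).
class ProcessDatasetError(Exception):
    def __init__(self, message: str):
        super().__init__(message)


class ValidationError(ProcessDatasetError):
    def __init__(self, errors_count: int, errors: list[str]):
        super().__init__(f"Dataset has {errors_count} validation errors: {errors}")


errors: list[dict[str, str]] = []
datasets = {"good": "https://example.com/good.json", "bad": "https://example.com/bad.json"}
for dataset_id, url in datasets.items():
    try:
        if dataset_id == "bad":
            # Raised as the subclass, caught as the base class below,
            # so one failing dataset never stops the loop.
            raise ValidationError(errors_count=1, errors=["missing id"])
    except ProcessDatasetError as e:
        errors.append({"dataset_id": dataset_id, "source_url": url, "message": str(e)})

print(errors[0]["message"])
# Dataset has 1 validation errors: ['missing id']
```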

pyproject.toml

Lines changed: 1 addition & 1 deletion
@@ -5,7 +5,7 @@ build-backend = "flit_core.buildapi"
 [project]
 name = "oc4ids-datastore-pipeline"
 description = "OC4IDS Datastore Pipeline"
-version = "0.1.0"
+version = "0.2.0"
 readme = "README.md"
 dependencies = [
     "alembic",

tests/test_notifications.py

Lines changed: 30 additions & 0 deletions
@@ -0,0 +1,30 @@
+from unittest.mock import MagicMock
+
+from pytest_mock import MockerFixture
+
+from oc4ids_datastore_pipeline.notifications import send_notification
+
+
+def test_send_notification(mocker: MockerFixture) -> None:
+    mock_smtp_server = MagicMock()
+    patch_smtp = mocker.patch("oc4ids_datastore_pipeline.notifications.smtplib.SMTP")
+    patch_smtp.return_value = mock_smtp_server
+
+    errors = [
+        {
+            "dataset_id": "test_dataset",
+            "source_url": "https://test_dataset.json",
+            "message": "Mocked exception",
+        }
+    ]
+    send_notification(errors)
+
+    patch_smtp.assert_called_once_with("localhost", 8025)
+    with mock_smtp_server as server:
+        server.sendmail.assert_called_once()
+        sender, receiver, message = server.sendmail.call_args[0]
+        assert sender == "[email protected]"
+        assert receiver == "[email protected]"
+        assert "test_dataset" in message
+        assert "https://test_dataset.json" in message
+        assert "Mocked exception" in message
