-
Notifications
You must be signed in to change notification settings - Fork 18
Expand file tree
/
Copy pathairtable_issues_email_operator.py
More file actions
116 lines (94 loc) · 3.37 KB
/
airtable_issues_email_operator.py
File metadata and controls
116 lines (94 loc) · 3.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
from __future__ import annotations

import html
from typing import Any, Sequence

from airflow.models import BaseOperator
from airflow.models.taskinstance import Context
from airflow.utils.email import send_email
class AirtableIssuesEmailOperator(BaseOperator):
    """Send an HTML summary email describing an Airtable issue update.

    The operator reads ``update_result`` (a mapping that may contain the keys
    ``email_rows``, ``failed_batches`` and ``updated_record_ids``), renders the
    rows and failures as HTML, and sends the message with Airflow's
    ``send_email``. No email is sent when ``update_result`` is empty or when
    it contains no ``email_rows``.
    """

    # Attributes Airflow renders as templates before ``execute`` runs.
    template_fields: Sequence[str] = (
        "to_emails",
        "subject",
        "update_result",
    )

    def __init__(
        self,
        to_emails: list[str] | str,
        update_result: dict[str, Any],
        subject: str = "[Airflow] Airtable Issue Management",
        **kwargs,
    ) -> None:
        """Store the email settings and the update result to report.

        Args:
            to_emails: Recipient address, or list of recipient addresses.
            update_result: Mapping describing the outcome of the update.
            subject: Subject line of the email.
            **kwargs: Forwarded unchanged to ``BaseOperator``.
        """
        super().__init__(**kwargs)
        self.to_emails = to_emails
        self.subject = subject
        self.update_result = update_result

    def build_table_rows(self, email_rows: list[dict[str, Any]]) -> str:
        """Return one ``<tr>`` per entry of *email_rows*, concatenated.

        Each row shows the ``issue_number``, ``gtfs_dataset_name``, ``status``
        and ``new_end_date`` values, in that order; a missing key renders as
        an empty cell. Values are HTML-escaped so characters such as ``<`` or
        ``&`` in the data cannot break the markup.
        """
        columns = ("issue_number", "gtfs_dataset_name", "status", "new_end_date")
        return "".join(
            "<tr>"
            + "".join(
                f"<td>{html.escape(str(row.get(column, '')))}</td>"
                for column in columns
            )
            + "</tr>"
            for row in email_rows
        )

    def build_failed_html(self, failed_batches: list[dict[str, Any]]) -> str:
        """Return the failed batches as ``<br>``-separated lines.

        Returns the literal text ``"None"`` when there are no failed batches.
        Every batch must provide ``batch_num`` and ``error`` keys (a missing
        key raises ``KeyError``). Both values are HTML-escaped, because error
        text frequently contains angle brackets or ampersands.
        """
        if not failed_batches:
            return "None"
        return "<br>".join(
            f"Batch {html.escape(str(batch['batch_num']))}: "
            f"{html.escape(str(batch['error']))}"
            for batch in failed_batches
        )

    def build_email_body(
        self,
        email_rows: list[dict[str, Any]],
        failed_batches: list[dict[str, Any]],
    ) -> str:
        """Return the complete HTML body of the summary email.

        Args:
            email_rows: Records that were updated; one table row is rendered
                per entry.
            failed_batches: Batches that failed; listed below the table.
        """
        table_rows = self.build_table_rows(email_rows)
        failed_html = self.build_failed_html(failed_batches)
        return f"""
        <b>✅ Successfully updated {len(email_rows)} Airtable records.</b><br><br>
        <b>Closed About to Expire Issues:</b><br>
        <table border="1" cellspacing="0" cellpadding="5">
        <tr>
        <th>Issue Number</th>
        <th>GTFS Dataset Name</th>
        <th>Status</th>
        <th>New End Date</th>
        </tr>
        {table_rows}
        </table><br><br>
        <b>❌ Failed batches:</b><br>
        {failed_html}
        """

    def execute(self, context: Context) -> dict[str, Any]:
        """Send the summary email and return a description of what happened.

        Returns:
            ``{"email_sent": False, "reason": ...}`` when nothing was sent
            (``reason`` is ``"no_update_result"`` or ``"no_updated_rows"``);
            otherwise a dict with ``email_sent`` set to ``True`` plus
            ``updated_count``, ``updated_record_ids`` and ``failed_batches``.
        """
        del context  # The task context is not needed by this operator.

        update_result = self.update_result
        if not update_result:
            self.log.info("No update result found. Email not sent.")
            return {
                "email_sent": False,
                "reason": "no_update_result",
            }

        email_rows = update_result.get("email_rows", [])
        failed_batches = update_result.get("failed_batches", [])
        updated_record_ids = update_result.get("updated_record_ids", [])

        if not email_rows:
            if failed_batches:
                # Failures are not emailed on this path, so at least make
                # them visible in the task log.
                self.log.warning(
                    "%d failed batch(es) reported but no rows were updated.",
                    len(failed_batches),
                )
            self.log.info("No updated rows. Email not sent.")
            return {
                "email_sent": False,
                "reason": "no_updated_rows",
            }

        body = self.build_email_body(email_rows, failed_batches)
        send_email(
            to=self.to_emails,
            subject=self.subject,
            html_content=body,
        )
        self.log.info("Email sent via Airflow send_email.")

        return {
            "email_sent": True,
            "updated_count": len(email_rows),
            "updated_record_ids": updated_record_ids,
            "failed_batches": failed_batches,
        }