Skip to content

Commit 28d2c80

Browse files
authored
feat: migrate handler code from aind-airflow-dags (#20)
* build: require python >=3.9 * feat: add latest alert_handler * test: update unit tests for alert_handler * feat: add latest hpc_handler * ci: add required paramiko version for appache-airflow-providers-ssh * test: update unit tests for hpc_handler * test: update unit tests for log_handler * feat: add latest slurm_v2_handler with small modifications * test: add unit tests for slurm_v2_handler (WIP) * test: update unit tests for slurm_v2_handler methods and SlurmHook * test: update unit tests for slurm_v2_handler.SubmitSlurmJobArray class * fix: return null start/end times if no jobs found * test: add unit tests for slurm_v2_handler.SlurmJobSensor class * feat: remove aind-slurm-rest v1 handler and dependency * feat: add latest task_handler * test: update unit tests for task_handler * refactor: linting * ci: update Dockerfile to use alpine python base image * ci: install optional loki and slurm dependencies in Dockerfile
1 parent df8e7e2 commit 28d2c80

14 files changed

+2740
-1097
lines changed

.github/workflows/test_and_lint.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ jobs:
1010
runs-on: ubuntu-22.04
1111
strategy:
1212
matrix:
13-
python-version: [ '3.8', '3.9', '3.10', '3.11' ]
13+
python-version: [ '3.9', '3.10', '3.11' ]
1414
steps:
1515
- uses: actions/checkout@v4
1616
- name: Set up Python ${{ matrix.python-version }}

Dockerfile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
from python:3.10-slim
1+
FROM python:3.10-alpine
22

33
WORKDIR /app
44
ADD src ./src
55
ADD pyproject.toml .
66
ADD setup.py .
77

8-
RUN pip install . --no-cache-dir
8+
RUN pip install .[loki,slurm] --no-cache-dir

pyproject.toml

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta"
66
name = "aind-airflow-jobs"
77
description = "Global classes for AIND Airflow service"
88
license = {text = "MIT"}
9-
requires-python = ">=3.8"
9+
requires-python = ">=3.9"
1010
authors = [
1111
{name = "Allen Institute for Neural Dynamics"}
1212
]
@@ -42,15 +42,16 @@ all = [
4242
airflow = [
4343
"google-re2==1.0",
4444
"apache-airflow==2.8.1",
45-
"apache-airflow-providers-ssh>=3.13.1,<=3.14.0"
45+
"apache-airflow-providers-ssh>=3.13.1,<=3.14.0",
46+
"paramiko>=2.9.0,<4.0.0"
4647
]
4748

4849
loki = [
4950
"python-logging-loki==0.3.1"
5051
]
5152

5253
slurm = [
53-
"aind-slurm-rest==1.0.3"
54+
"aind-slurm-rest-v2==0.0.3"
5455
]
5556

5657
[tool.setuptools.packages.find]
@@ -61,7 +62,7 @@ version = {attr = "aind_airflow_jobs.__version__"}
6162

6263
[tool.black]
6364
line-length = 79
64-
target_version = ['py38']
65+
target_version = ['py39']
6566
exclude = '''
6667
6768
(

src/aind_airflow_jobs/alert_handler.py

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,18 @@ def action(self) -> str:
2828
}[self.value]
2929

3030

31-
def send_job_email(alert_type: AlertType, job: Dict[str, Any]) -> None:
31+
def send_job_email(
32+
alert_type: AlertType,
33+
job: Dict[str, Any],
34+
task_id: Optional[str] = None,
35+
) -> None:
3236
"""
33-
Send an email given the alert type and job.
37+
Send an email given the alert type, job, and optional task_id.
3438
Parameters
3539
----------
3640
alert_type : AlertType
3741
job : Dict[str, Any]
42+
task_id : Optional[str]
3843
3944
Returns
4045
-------
@@ -49,10 +54,25 @@ def send_job_email(alert_type: AlertType, job: Dict[str, Any]) -> None:
4954
or alert_type.ALL.value in job.get("email_notification_types", [])
5055
):
5156
to_email = job.get("user_email")
52-
body = (
53-
f"An airflow pipeline {reason} with the following conf\n\n"
54-
f"Conf: {job}\n\n"
57+
body = ""
58+
if alert_type in [AlertType.FAIL, AlertType.RETRY]:
59+
body += (
60+
"Please check the AIND Data Transfer Service "
61+
'<a href="http://aind-data-transfer-service/jobs">Job '
62+
"Status Page</a> for more details or reach out to a "
63+
"Scientific Computing engineer for assistance.<br/><br/>"
64+
)
65+
66+
body += (
67+
f"An airflow pipeline {reason} with the following "
68+
f"configuration:<br/><br/>"
5569
)
70+
71+
if task_id:
72+
body += f"Task: {task_id}<br/><br/>"
73+
74+
body += f"Configuration:<br/>{job}<br/><br/>"
75+
5676
send_email(to=to_email, subject=subject, html_content=body)
5777

5878

@@ -135,7 +155,13 @@ def on_failure_or_retry_alert(
135155
136156
"""
137157
job = context["params"]
138-
send_job_email(alert_type=alert_type, job=job)
158+
task_id = context.get("task").task_id if context.get("task") else None
159+
160+
send_job_email(
161+
alert_type=alert_type,
162+
job=job,
163+
task_id=task_id,
164+
)
139165

140166

141167
def on_failure_or_retry_log_alert(
@@ -155,8 +181,14 @@ def on_failure_or_retry_log_alert(
155181
"""
156182
job = context["params"]
157183
job_info = get_job_info_from_context(context)
184+
task_id = context.get("task").task_id if context.get("task") else None
185+
158186
send_log_message(job_info, log_level="ERROR")
159-
send_job_email(alert_type=alert_type, job=job)
187+
send_job_email(
188+
alert_type=alert_type,
189+
job=job,
190+
task_id=task_id,
191+
)
160192

161193

162194
def on_begin_or_end_alert(alert_type: AlertType, job: Dict[str, Any]) -> None:

src/aind_airflow_jobs/hpc_handler.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,6 @@
33
from airflow.providers.ssh.hooks.ssh import SSHHook
44

55

6-
def get_hpc_hook() -> SSHHook:
6+
def get_hpc_hook(ssh_conn_id: str = "hpc2/uri") -> SSHHook:
77
"""Returns a hook to send ssh commands to the hpc"""
8-
return SSHHook(ssh_conn_id="hpc/uri")
8+
return SSHHook(ssh_conn_id=ssh_conn_id)

0 commit comments

Comments
 (0)