Skip to content

Commit 967a5cf

Browse files
committed
add support for airflow emails
1 parent b75d53e commit 967a5cf

6 files changed

Lines changed: 18 additions & 6 deletions

File tree

airflow/dags/committees.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
from knesset_data_pipelines.committees import background_material_titles
66
from knesset_data_pipelines.committees import parsed_document_committee_sessions
7+
from knesset_data_pipelines.config import AIRFLOW_DEFAULT_EMAILS
78

89

910
dag_kwargs = dict(

airflow/dags/dags_generator_etl.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from sqlalchemy.orm import sessionmaker
1515

1616
from knesset_data_pipelines.run_pipeline import list_pipelines, main as run_pipeline
17+
from knesset_data_pipelines.config import AIRFLOW_DEFAULT_EMAILS
1718

1819

1920
dag_kwargs = dict(
@@ -89,14 +90,16 @@ def get_execution_dates(execution_date, external_dag_id, **kwargs):
8990
),
9091
random_name_suffix=True,
9192
task_id=pipeline_dag_id,
92-
dag=dag
93+
dag=dag,
94+
email=AIRFLOW_DEFAULT_EMAILS,
9395
)
9496
else:
9597
main_task = PythonOperator(
9698
python_callable=run_pipeline,
9799
task_id=pipeline_id.replace('/', '.'),
98100
op_kwargs={'pipeline_id': pipeline_id},
99-
dag=dag
101+
dag=dag,
102+
email=AIRFLOW_DEFAULT_EMAILS,
100103
)
101104
for dependency in pipeline_dependencies:
102105
dependency_dag_id = dependency.replace('/', '.')
@@ -105,5 +108,6 @@ def get_execution_dates(execution_date, external_dag_id, **kwargs):
105108
external_dag_id=dependency_dag_id,
106109
execution_date_fn=partial(get_execution_dates, external_dag_id=dependency_dag_id),
107110
mode='reschedule',
108-
dag=dag
111+
dag=dag,
112+
email=AIRFLOW_DEFAULT_EMAILS,
109113
) >> main_task

airflow/dags/google_drive_uploads.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from airflow.operators.python import PythonOperator
44

55
from knesset_data_pipelines.google_drive_upload import committee_meeting_protocols
6+
from knesset_data_pipelines.config import AIRFLOW_DEFAULT_EMAILS
67

78

89
dag_kwargs = dict(
@@ -18,5 +19,6 @@
1819
with DAG('google_drive_uploads.committee_meeting_protocols', **dag_kwargs) as dag:
1920
PythonOperator(
2021
python_callable=committee_meeting_protocols.main,
21-
task_id='committee_meeting_protocols'
22+
task_id='committee_meeting_protocols',
23+
email=AIRFLOW_DEFAULT_EMAILS,
2224
)

airflow/dags/kns_odata.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from airflow.operators.python import PythonOperator
44

55
from knesset_data_pipelines import kns_odata
6+
from knesset_data_pipelines.config import AIRFLOW_DEFAULT_EMAILS
67

78

89
dag_kwargs = dict(
@@ -18,5 +19,6 @@
1819
with DAG('kns_odata', **dag_kwargs) as dag:
1920
PythonOperator(
2021
python_callable=kns_odata.compare_parliamentinfo_tables_pipelines,
21-
task_id='compare_parliamentinfo_tables_pipelines'
22+
task_id='compare_parliamentinfo_tables_pipelines',
23+
email=AIRFLOW_DEFAULT_EMAILS,
2224
)

airflow/dags/members.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from airflow.operators.python import PythonOperator
44

55
from knesset_data_pipelines.members_eng import members_eng
6+
from knesset_data_pipelines.config import AIRFLOW_DEFAULT_EMAILS
67

78

89
dag_kwargs = dict(
@@ -18,5 +19,6 @@
1819
with DAG('members.members_eng', **dag_kwargs) as dag:
1920
PythonOperator(
2021
python_callable=members_eng.main,
21-
task_id='members_eng'
22+
task_id='members_eng',
23+
email=AIRFLOW_DEFAULT_EMAILS,
2224
)

airflow/knesset_data_pipelines/config.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454

5555
DATASERVICE_HTTP_PROXY = os.environ.get('DATASERVICE_HTTP_PROXY')
5656

57+
AIRFLOW_DEFAULT_EMAILS = [e.strip() for e in (os.environ.get('AIRFLOW_DEFAULT_EMAILS') or '').split(',') if e.strip()]
5758

5859
@contextmanager
5960
def get_google_service_account_json_file_name():

0 commit comments

Comments
 (0)