Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from airflow.operators.latest_only import LatestOnlyOperator
from airflow.operators.python import PythonOperator
from utils.elastic_beanstalk import scale_elastic_beanstalk_instance
from utils.slack import on_failure_callback
from utils.slack import slack_message_callback_no_action_required

default_args = {
"owner": "airflow",
Expand Down Expand Up @@ -45,15 +45,15 @@
python_callable=scale_elastic_beanstalk_instance,
op_kwargs={"name": name, "number_of_instances": 2, "sleep_seconds": 60 * 5},
task_concurrency=2,
on_failure_callback=on_failure_callback,
on_failure_callback=slack_message_callback_no_action_required,
)

elb_1 = PythonOperator(
task_id=f"scale_elb_1_{name}",
python_callable=scale_elastic_beanstalk_instance,
op_kwargs={"name": name, "number_of_instances": 1},
task_concurrency=2,
on_failure_callback=on_failure_callback,
on_failure_callback=slack_message_callback_no_action_required,
)

latest_only >> elb_2 >> elb_1
17 changes: 14 additions & 3 deletions terraform/modules/services/airflow/dags/india/forecast-site-dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from datetime import datetime, timedelta, timezone
from airflow import DAG
from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator
from utils.slack import on_failure_callback
from utils.slack import slack_message_callback

from airflow.operators.latest_only import LatestOnlyOperator

Expand All @@ -24,6 +24,17 @@

region = "india"

forecast_ruvnl_error_message = (
"❌ The task {{ ti.task_id }} failed. "
"This would ideally be fixed before for DA actions at 09.00 IST"
"Please see run book for appropriate actions."
)

forecast_ad_error_message = (
"❌ The task {{ ti.task_id }} failed. "
"Please see run book for appropriate actions. "
)

# hour the forecast can run, not include 7,8,19,20
hours = "0,1,2,3,4,5,6,9,10,11,12,13,14,15,16,17,18,21,22,23"

Expand Down Expand Up @@ -51,7 +62,7 @@
"assignPublicIp": "ENABLED",
},
},
on_failure_callback=on_failure_callback,
on_failure_callback=slack_message_callback(forecast_ruvnl_error_message),
task_concurrency=10,
awslogs_group="/aws/ecs/forecast/forecast",
awslogs_stream_prefix="streaming/forecast-forecast",
Expand Down Expand Up @@ -84,7 +95,7 @@
"assignPublicIp": "ENABLED",
},
},
on_failure_callback=on_failure_callback,
on_failure_callback=slack_message_callback(forecast_ad_error_message),
task_concurrency=10,
awslogs_group="/aws/ecs/forecast/forecast-ad",
awslogs_stream_prefix="streaming/forecast-ad-forecast",
Expand Down
27 changes: 26 additions & 1 deletion terraform/modules/services/airflow/dags/india/nwp-dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator

from airflow.operators.latest_only import LatestOnlyOperator
from utils.slack import on_failure_callback
from utils.slack import slack_message_callback
from utils.s3 import determine_latest_zarr

default_args = {
Expand All @@ -25,6 +25,28 @@

region = "india"

nwp_metoffice_error_message = (
"❌ The task {{ ti.task_id }} failed."
"The forecast will continue running until it runs out of data."
"Metoffice status link is <https://datahub.metoffice.gov.uk/support/service-status|here> "
"Please see run book for appropriate actions. "
)

nwp_ecmwf_error_message = (
"❌ The task {{ ti.task_id }} failed."
"The forecast will continue running until it runs out of data. "
"ECMWF status link is <https://www.nco.ncep.noaa.gov/pmb/nwprod/prodstat/|here> "
"Please see run book for appropriate actions. "
)

nwp_gfs_error_message = (
"❌ The task {{ ti.task_id }} failed."
"The forecast will continue running until it runs out of data. "
"ECMWF status link is <https://status.ecmwf.int/|here> "
"Please see run book for appropriate actions. "
)


with DAG(
f"{region}-nwp-consumer",
schedule_interval="0 * * * *",
Expand All @@ -50,6 +72,7 @@
},
},
task_concurrency=10,
on_failure_callback=slack_message_callback(nwp_ecmwf_error_message),
awslogs_group="/aws/ecs/consumer/nwp-consumer-ecmwf-india",
awslogs_stream_prefix="streaming/nwp-consumer-ecmwf-india-consumer",
awslogs_region="ap-south-1",
Expand All @@ -69,6 +92,7 @@
},
},
task_concurrency=10,
on_failure_callback=slack_message_callback(nwp_gfs_error_message),
awslogs_group="/aws/ecs/consumer/nwp-consumer-gfs-india",
awslogs_stream_prefix="streaming/nwp-consumer-gfs-india-consumer",
awslogs_region="ap-south-1",
Expand All @@ -88,6 +112,7 @@
},
},
task_concurrency=10,
on_failure_callback=slack_message_callback(nwp_metoffice_error_message),
awslogs_group="/aws/ecs/consumer/nwp-consumer-metoffice-india",
awslogs_stream_prefix="streaming/nwp-consumer-metoffice-india-consumer",
awslogs_region="ap-south-1",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from datetime import datetime, timedelta, timezone
from airflow import DAG
from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator
from utils.slack import on_failure_callback
from utils.slack import slack_message_callback_no_action_required

from airflow.operators.latest_only import LatestOnlyOperator

Expand Down Expand Up @@ -48,7 +48,7 @@
"assignPublicIp": "ENABLED",
},
},
on_failure_callback=on_failure_callback,
on_failure_callback=slack_message_callback_no_action_required,
task_concurrency=10,
awslogs_group="/aws/ecs/consumer/runvl-consumer",
awslogs_stream_prefix="streaming/runvl-consumer-consumer",
Expand Down
11 changes: 9 additions & 2 deletions terraform/modules/services/airflow/dags/india/satellite-dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator

from airflow.operators.latest_only import LatestOnlyOperator
from utils.slack import on_failure_callback
from utils.slack import slack_message_callback

default_args = {
"owner": "airflow",
Expand All @@ -24,6 +24,13 @@

# Tasks can still be defined in terraform, or defined here

satellite_error_message = (
"❌ The task {{ ti.task_id }} failed."
"EUMETSAT status links are <https://uns.eumetsat.int/uns/|here> "
"and <https://masif.eumetsat.int/ossi/webpages/level2.html?ossi_level2_filename=seviri_iodc.html|here>. "
"Please see run book for appropriate actions. "
)

region = "india"

with DAG(
Expand Down Expand Up @@ -52,7 +59,7 @@
},
},
task_concurrency=10,
on_failure_callback=on_failure_callback,
on_failure_callback=slack_message_callback(satellite_error_message),
awslogs_group="/aws/ecs/consumer/sat-consumer",
awslogs_stream_prefix="streaming/sat-consumer-consumer",
awslogs_region="ap-south-1",
Expand Down
10 changes: 8 additions & 2 deletions terraform/modules/services/airflow/dags/uk/cloudcasting-dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator

from airflow.operators.latest_only import LatestOnlyOperator
from utils.slack import on_failure_callback
from utils.slack import slack_message_callback

default_args = {
"owner": "airflow",
Expand All @@ -23,6 +23,12 @@
security_group = os.getenv("ECS_SECURITY_GROUP")
cluster = f"Nowcasting-{env}"

cloudcasting_error_message = (
"⚠️ The task {{ ti.task_id }} failed,"
" but its ok. The cloudcasting is currently no critical. "
"No out of hours support is required."
)

# Tasks can still be defined in terraform, or defined here

region = "uk"
Expand Down Expand Up @@ -52,7 +58,7 @@
},
},
task_concurrency=10,
on_failure_callback=on_failure_callback,
on_failure_callback=slack_message_callback(cloudcasting_error_message),
awslogs_group="/aws/ecs/forecast/cloudcasting",
awslogs_stream_prefix="streaming/cloudcasting-forecast",
awslogs_region="eu-west-1",
Expand Down
14 changes: 10 additions & 4 deletions terraform/modules/services/airflow/dags/uk/dayafter-dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from airflow import DAG
from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator
import os
from utils.slack import on_failure_callback
from utils.slack import slack_message_callback

from airflow.operators.latest_only import LatestOnlyOperator

Expand All @@ -25,6 +25,12 @@

# Tasks can still be defined in terraform, or defined here

day_after_error_message = (
"⚠️ The task {{ ti.task_id }} failed,"
" but its ok. This task is not critical for live services. "
"No out of hours support is required."
)

region = "uk"

with DAG(
Expand All @@ -49,7 +55,7 @@
"assignPublicIp": "ENABLED",
},
},
on_failure_callback=on_failure_callback,
on_failure_callback=slack_message_callback(day_after_error_message),
task_concurrency=10,
awslogs_group="/aws/ecs/consumer/pvlive-national-day-after",
awslogs_stream_prefix="streaming/pvlive-national-day-after-consumer",
Expand Down Expand Up @@ -79,7 +85,7 @@
"assignPublicIp": "ENABLED",
},
},
on_failure_callback=on_failure_callback,
on_failure_callback=slack_message_callback(day_after_error_message),
task_concurrency=10,
awslogs_group="/aws/ecs/consumer/pvlive-gsp-day-after",
awslogs_stream_prefix="streaming/pvlive-gsp-day-after-consumer",
Expand Down Expand Up @@ -110,7 +116,7 @@
"assignPublicIp": "ENABLED",
},
},
on_failure_callback=on_failure_callback,
on_failure_callback=slack_message_callback(day_after_error_message),
task_concurrency=10,
awslogs_group="/aws/ecs/analysis/metrics",
awslogs_stream_prefix="streaming/metrics-analysis",
Expand Down
12 changes: 9 additions & 3 deletions terraform/modules/services/airflow/dags/uk/elasticbeanstalk.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from airflow.operators.latest_only import LatestOnlyOperator
from airflow.operators.python import PythonOperator
from utils.elastic_beanstalk import scale_elastic_beanstalk_instance
from utils.slack import on_failure_callback
from utils.slack import slack_message_callback

default_args = {
"owner": "airflow",
Expand All @@ -19,6 +19,12 @@
"max_active_tasks": 10,
}

elb_error_message = (
"⚠️ The task {{ ti.task_id }} failed,"
" but its ok. This task tried to reset the Elastic Beanstalk instances. "
"No out of hours support is required."
)

region = "uk"
env = os.getenv("ENVIRONMENT", "development")
names = [
Expand Down Expand Up @@ -46,15 +52,15 @@
python_callable=scale_elastic_beanstalk_instance,
op_kwargs={"name": name, "number_of_instances": 2, "sleep_seconds": 60 * 5},
task_concurrency=2,
on_failure_callback=on_failure_callback,
on_failure_callback=slack_message_callback(elb_error_message),
)

elb_1 = PythonOperator(
task_id=f"scale_elb_1_{name}",
python_callable=scale_elastic_beanstalk_instance,
op_kwargs={"name": name, "number_of_instances": 1},
task_concurrency=2,
on_failure_callback=on_failure_callback,
on_failure_callback=slack_message_callback(elb_error_message),
)

latest_only >> elb_2 >> elb_1
43 changes: 30 additions & 13 deletions terraform/modules/services/airflow/dags/uk/forecast-gsp-dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
from datetime import datetime, timedelta, timezone
from airflow import DAG
from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator
from airflow.providers.slack.notifications.slack import send_slack_notification
from utils.slack import on_failure_callback
from utils.slack import slack_message_callback

from airflow.operators.latest_only import LatestOnlyOperator

Expand All @@ -26,6 +25,30 @@

# Tasks can still be defined in terraform, or defined here

forecast_pvnet_error_message = (
"⚠️ The task {{ ti.task_id }} failed,"
" but its ok. PVNET-ECMWF only will run next. "
"No out of hours support is required."
)

forecast_pvnet_da_error_message = (
"❌ The task {{ ti.task_id }} failed. "
"This would ideally be fixed before for DA actions at 09.00"
"Please see run book for appropriate actions."
)

forecast_ecmwf_error_message = (
"❌ The task {{ ti.task_id }} failed. This is only run after the main PVnet has failed. "
"We have about 6 hours before the blend services need this. "
"Please see run book for appropriate actions. "
)

forecast_blend_error_message = (
"❌ The task {{ ti.task_id }} failed."
"The blending of forecast has failed. "
"Please see run book for appropriate actions. "
)

region = "uk"

with DAG(
Expand Down Expand Up @@ -53,13 +76,7 @@
},
},
task_concurrency=10,
on_failure_callback=[send_slack_notification(
text="⚠️ The task {{ ti.task_id }} failed,"
" but its ok. PVNET-ECMWF only will run next. "
"No out of hours support is required. ⚠️",
channel=f"tech-ops-airflow-{env}",
username="Airflow",
)],
on_failure_callback=slack_message_callback(forecast_pvnet_error_message),
awslogs_group="/aws/ecs/forecast/forecast_pvnet",
awslogs_stream_prefix="streaming/forecast_pvnet-forecast",
awslogs_region="eu-west-1",
Expand All @@ -79,7 +96,7 @@
},
},
task_concurrency=10,
on_failure_callback=on_failure_callback,
on_failure_callback=slack_message_callback(forecast_ecmwf_error_message),
trigger_rule="all_failed",
awslogs_group="/aws/ecs/forecast/forecast_pvnet_ecmwf",
awslogs_stream_prefix="streaming/forecast_pvnet_ecmwf-forecast",
Expand All @@ -100,7 +117,7 @@
},
},
task_concurrency=10,
on_failure_callback=on_failure_callback,
on_failure_callback=slack_message_callback(forecast_blend_error_message),
trigger_rule="one_success",
awslogs_group="/aws/ecs/blend/forecast_blend",
awslogs_stream_prefix="streaming/forecast_blend-blend",
Expand Down Expand Up @@ -136,7 +153,7 @@
},
},
task_concurrency=10,
on_failure_callback=on_failure_callback,
on_failure_callback=slack_message_callback(forecast_pvnet_da_error_message),
awslogs_group="/aws/ecs/forecast/forecast_pvnet_day_ahead",
awslogs_stream_prefix="streaming/forecast_pvnet_day_ahead-forecast",
awslogs_region="eu-west-1",
Expand All @@ -156,7 +173,7 @@
},
},
task_concurrency=10,
on_failure_callback=on_failure_callback,
on_failure_callback=slack_message_callback(forecast_blend_error_message),
awslogs_group="/aws/ecs/blend/forecast_blend",
awslogs_stream_prefix="streaming/forecast_blend-blend",
awslogs_region="eu-west-1",
Expand Down
Loading