Skip to content

Commit 7045011

Browse files
add aws cloudwatch logs to airflow (#757)
* add aws cloudwatch logs to airflow * PR comment * lint
1 parent e9d44f5 commit 7045011

File tree

13 files changed

+352
-226
lines changed

13 files changed

+352
-226
lines changed

terraform/modules/services/airflow/dags/india/forecast-site-dag.py

Lines changed: 38 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -7,37 +7,43 @@
77
from airflow.operators.latest_only import LatestOnlyOperator
88

99
default_args = {
10-
'owner': 'airflow',
11-
'depends_on_past': False,
12-
'start_date': datetime.now(tz=timezone.utc) - timedelta(hours=3),
13-
'retries': 1,
14-
'retry_delay': timedelta(minutes=1),
15-
'max_active_runs':10,
16-
'concurrency':10,
17-
'max_active_tasks':10,
10+
"owner": "airflow",
11+
"depends_on_past": False,
12+
"start_date": datetime.now(tz=timezone.utc) - timedelta(hours=3),
13+
"retries": 1,
14+
"retry_delay": timedelta(minutes=1),
15+
"max_active_runs": 10,
16+
"concurrency": 10,
17+
"max_active_tasks": 10,
1818
}
1919

2020
env = os.getenv("ENVIRONMENT", "development")
2121
subnet = os.getenv("ECS_SUBNET")
2222
security_group = os.getenv("ECS_SECURITY_GROUP")
2323
cluster = f"india-ecs-cluster-{env}"
2424

25-
region = 'india'
25+
region = "india"
2626

2727
# hours the forecast can run, not including 7,8,19,20
28-
hours = '0,1,2,3,4,5,6,9,10,11,12,13,14,15,16,17,18,21,22,23'
28+
hours = "0,1,2,3,4,5,6,9,10,11,12,13,14,15,16,17,18,21,22,23"
2929

30-
with DAG(f'{region}-runvl-forecast', schedule_interval=f"0 {hours} * * *", default_args=default_args, concurrency=10, max_active_tasks=10) as dag:
30+
with DAG(
31+
f"{region}-runvl-forecast",
32+
schedule_interval=f"0 {hours} * * *",
33+
default_args=default_args,
34+
concurrency=10,
35+
max_active_tasks=10,
36+
) as dag:
3137
dag.doc_md = "Run the forecast"
3238

3339
latest_only = LatestOnlyOperator(task_id="latest_only")
3440

3541
forecast = EcsRunTaskOperator(
36-
task_id=f'{region}-forecast-ruvnl',
37-
task_definition='forecast',
42+
task_id=f"{region}-forecast-ruvnl",
43+
task_definition="forecast",
3844
cluster=cluster,
3945
overrides={},
40-
launch_type = "FARGATE",
46+
launch_type="FARGATE",
4147
network_configuration={
4248
"awsvpcConfiguration": {
4349
"subnets": [subnet],
@@ -46,22 +52,31 @@
4652
},
4753
},
4854
on_failure_callback=on_failure_callback,
49-
task_concurrency = 10,
55+
task_concurrency=10,
56+
awslogs_group="/aws/ecs/forecast/forecast",
57+
awslogs_stream_prefix="streaming/forecast-forecast",
58+
awslogs_region="ap-south-1",
5059
)
5160

5261
latest_only >> [forecast]
5362

54-
with DAG(f'{region}-ad-forecast', schedule_interval=f"0,15,30,45 * * * *", default_args=default_args, concurrency=10, max_active_tasks=10) as dag:
63+
with DAG(
64+
f"{region}-ad-forecast",
65+
schedule_interval=f"0,15,30,45 * * * *",
66+
default_args=default_args,
67+
concurrency=10,
68+
max_active_tasks=10,
69+
) as dag:
5570
dag.doc_md = "Run the forecast for client AD"
5671

5772
latest_only = LatestOnlyOperator(task_id="latest_only")
5873

5974
forecast = EcsRunTaskOperator(
60-
task_id=f'{region}-forecast-ad',
61-
task_definition='forecast-ad',
75+
task_id=f"{region}-forecast-ad",
76+
task_definition="forecast-ad",
6277
cluster=cluster,
6378
overrides={},
64-
launch_type = "FARGATE",
79+
launch_type="FARGATE",
6580
network_configuration={
6681
"awsvpcConfiguration": {
6782
"subnets": [subnet],
@@ -70,7 +85,10 @@
7085
},
7186
},
7287
on_failure_callback=on_failure_callback,
73-
task_concurrency = 10,
88+
task_concurrency=10,
89+
awslogs_group="/aws/ecs/forecast/forecast-ad",
90+
awslogs_stream_prefix="streaming/forecast-ad-forecast",
91+
awslogs_region="ap-south-1",
7492
)
7593

7694
latest_only >> [forecast]

terraform/modules/services/airflow/dags/india/nwp-dag.py

Lines changed: 50 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -8,25 +8,25 @@
88
from utils.s3 import determine_latest_zarr
99

1010
default_args = {
11-
'owner': 'airflow',
12-
'depends_on_past': False,
13-
'start_date': datetime.now(tz=timezone.utc) - timedelta(hours=1.5),
14-
'retries': 1,
15-
'retry_delay': timedelta(minutes=1),
16-
'max_active_runs':10,
17-
'concurrency':10,
18-
'max_active_tasks':10,
11+
"owner": "airflow",
12+
"depends_on_past": False,
13+
"start_date": datetime.now(tz=timezone.utc) - timedelta(hours=1.5),
14+
"retries": 1,
15+
"retry_delay": timedelta(minutes=1),
16+
"max_active_runs": 10,
17+
"concurrency": 10,
18+
"max_active_tasks": 10,
1919
}
2020

2121
env = os.getenv("ENVIRONMENT", "development")
2222
subnet = os.getenv("ECS_SUBNET")
2323
security_group = os.getenv("ECS_SECURITY_GROUP")
2424
cluster = f"india-ecs-cluster-{env}"
2525

26-
region = 'india'
26+
region = "india"
2727

2828
with DAG(
29-
f'{region}-nwp-consumer',
29+
f"{region}-nwp-consumer",
3030
schedule_interval="0 * * * *",
3131
default_args=default_args,
3232
concurrency=10,
@@ -37,40 +37,46 @@
3737
latest_only = LatestOnlyOperator(task_id="latest_only")
3838

3939
nwp_consumer_ecmwf = EcsRunTaskOperator(
40-
task_id=f'{region}-nwp-consumer-ecmwf-india',
41-
task_definition='nwp-consumer-ecmwf-india',
42-
cluster=cluster,
43-
overrides={},
44-
launch_type="FARGATE",
45-
network_configuration={
46-
"awsvpcConfiguration": {
47-
"subnets": [subnet],
48-
"securityGroups": [security_group],
49-
"assignPublicIp": "ENABLED",
50-
},
51-
},
52-
task_concurrency=10,
40+
task_id=f"{region}-nwp-consumer-ecmwf-india",
41+
task_definition="nwp-consumer-ecmwf-india",
42+
cluster=cluster,
43+
overrides={},
44+
launch_type="FARGATE",
45+
network_configuration={
46+
"awsvpcConfiguration": {
47+
"subnets": [subnet],
48+
"securityGroups": [security_group],
49+
"assignPublicIp": "ENABLED",
50+
},
51+
},
52+
task_concurrency=10,
53+
awslogs_group="/aws/ecs/consumer/nwp-consumer-ecmwf-india",
54+
awslogs_stream_prefix="streaming/nwp-consumer-ecmwf-india-consumer",
55+
awslogs_region="ap-south-1",
5356
)
5457

5558
nwp_consumer_gfs = EcsRunTaskOperator(
56-
task_id=f'{region}-nwp-consumer-gfs-india',
57-
task_definition='nwp-consumer-gfs-india',
58-
cluster=cluster,
59-
overrides={},
60-
launch_type="FARGATE",
61-
network_configuration={
62-
"awsvpcConfiguration": {
63-
"subnets": [subnet],
64-
"securityGroups": [security_group],
65-
"assignPublicIp": "ENABLED",
66-
},
67-
},
68-
task_concurrency=10,
59+
task_id=f"{region}-nwp-consumer-gfs-india",
60+
task_definition="nwp-consumer-gfs-india",
61+
cluster=cluster,
62+
overrides={},
63+
launch_type="FARGATE",
64+
network_configuration={
65+
"awsvpcConfiguration": {
66+
"subnets": [subnet],
67+
"securityGroups": [security_group],
68+
"assignPublicIp": "ENABLED",
69+
},
70+
},
71+
task_concurrency=10,
72+
awslogs_group="/aws/ecs/consumer/nwp-consumer-gfs-india",
73+
awslogs_stream_prefix="streaming/nwp-consumer-gfs-india-consumer",
74+
awslogs_region="ap-south-1",
6975
)
7076

7177
nwp_consumer_metoffice = EcsRunTaskOperator(
72-
task_id=f'{region}-nwp-consumer-metoffice-india',
73-
task_definition='nwp-consumer-metoffice-india',
78+
task_id=f"{region}-nwp-consumer-metoffice-india",
79+
task_definition="nwp-consumer-metoffice-india",
7480
cluster=cluster,
7581
overrides={},
7682
launch_type="FARGATE",
@@ -82,20 +88,22 @@
8288
},
8389
},
8490
task_concurrency=10,
91+
awslogs_group="/aws/ecs/consumer/nwp-consumer-metoffice-india",
92+
awslogs_stream_prefix="streaming/nwp-consumer-metoffice-india-consumer",
93+
awslogs_region="ap-south-1",
8594
)
8695
rename_zarr_metoffice = determine_latest_zarr.override(
87-
task_id="determine_latest_zarr_metoffice",
96+
task_id="determine_latest_zarr_metoffice",
8897
)(bucket=f"india-nwp-{env}", prefix="metoffice/data")
8998

9099
rename_zarr_ecmwf = determine_latest_zarr.override(
91100
task_id="determine_latest_zarr_ecmwf",
92-
)(bucket=f'india-nwp-{env}', prefix='ecmwf/data')
101+
)(bucket=f"india-nwp-{env}", prefix="ecmwf/data")
93102

94103
rename_zarr_gfs = determine_latest_zarr.override(
95104
task_id="determine_latest_zarr_gfs",
96-
)(bucket=f'india-nwp-{env}', prefix='gfs/data')
105+
)(bucket=f"india-nwp-{env}", prefix="gfs/data")
97106

98107
latest_only >> nwp_consumer_ecmwf >> rename_zarr_ecmwf
99108
latest_only >> nwp_consumer_gfs >> rename_zarr_gfs
100109
latest_only >> nwp_consumer_metoffice >> rename_zarr_metoffice
101-

terraform/modules/services/airflow/dags/india/runvnl-data-dag.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@
2222
security_group = os.getenv("ECS_SECURITY_GROUP")
2323
cluster = f"india-ecs-cluster-{env}"
2424

25-
region = 'india'
25+
region = "india"
2626

2727
with DAG(
28-
f'{region}-runvl-data-consumer',
28+
f"{region}-runvl-data-consumer",
2929
schedule_interval="*/3 * * * *",
3030
default_args=default_args,
3131
concurrency=10,
@@ -36,8 +36,8 @@
3636
latest_only = LatestOnlyOperator(task_id="latest_only")
3737

3838
runvl_data = EcsRunTaskOperator(
39-
task_id=f'{region}-runvl-consumer',
40-
task_definition='runvl-consumer',
39+
task_id=f"{region}-runvl-consumer",
40+
task_definition="runvl-consumer",
4141
cluster=cluster,
4242
overrides={},
4343
launch_type="FARGATE",
@@ -50,8 +50,9 @@
5050
},
5151
on_failure_callback=on_failure_callback,
5252
task_concurrency=10,
53+
awslogs_group="/aws/ecs/consumer/runvl-consumer",
54+
awslogs_stream_prefix="streaming/runvl-consumer-consumer",
55+
awslogs_region="ap-south-1",
5356
)
5457

55-
5658
latest_only >> [runvl_data]
57-

terraform/modules/services/airflow/dags/india/satellite-dag.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
"max_active_runs": 10,
1515
"concurrency": 10,
1616
"max_active_tasks": 10,
17-
"execution_timeout":timedelta(minutes=30),
17+
"execution_timeout": timedelta(minutes=30),
1818
}
1919

2020
env = os.getenv("ENVIRONMENT", "development")
@@ -24,10 +24,10 @@
2424

2525
# Tasks can still be defined in terraform, or defined here
2626

27-
region = 'india'
27+
region = "india"
2828

2929
with DAG(
30-
f'{region}-satellite-consumer',
30+
f"{region}-satellite-consumer",
3131
schedule_interval="*/5 * * * *",
3232
default_args=default_args,
3333
concurrency=10,
@@ -39,8 +39,8 @@
3939
latest_only = LatestOnlyOperator(task_id="latest_only")
4040

4141
sat_consumer = EcsRunTaskOperator(
42-
task_id=f'{region}-satellite-consumer',
43-
task_definition='sat-consumer',
42+
task_id=f"{region}-satellite-consumer",
43+
task_definition="sat-consumer",
4444
cluster=cluster,
4545
overrides={},
4646
launch_type="FARGATE",
@@ -52,7 +52,10 @@
5252
},
5353
},
5454
task_concurrency=10,
55-
on_failure_callback=on_failure_callback
55+
on_failure_callback=on_failure_callback,
56+
awslogs_group="/aws/ecs/consumer/sat-consumer",
57+
awslogs_stream_prefix="streaming/sat-consumer-consumer",
58+
awslogs_region="ap-south-1",
5659
)
5760

5861
latest_only >> sat_consumer

0 commit comments

Comments
 (0)