|
8 | 8 | from utils.s3 import determine_latest_zarr |
9 | 9 |
|
10 | 10 | default_args = { |
11 | | - 'owner': 'airflow', |
12 | | - 'depends_on_past': False, |
13 | | - 'start_date': datetime.now(tz=timezone.utc) - timedelta(hours=1.5), |
14 | | - 'retries': 1, |
15 | | - 'retry_delay': timedelta(minutes=1), |
16 | | - 'max_active_runs':10, |
17 | | - 'concurrency':10, |
18 | | - 'max_active_tasks':10, |
| 11 | + "owner": "airflow", |
| 12 | + "depends_on_past": False, |
| 13 | + "start_date": datetime.now(tz=timezone.utc) - timedelta(hours=1.5), |
| 14 | + "retries": 1, |
| 15 | + "retry_delay": timedelta(minutes=1), |
| 16 | + "max_active_runs": 10, |
| 17 | + "concurrency": 10, |
| 18 | + "max_active_tasks": 10, |
19 | 19 | } |
20 | 20 |
|
21 | 21 | env = os.getenv("ENVIRONMENT", "development") |
22 | 22 | subnet = os.getenv("ECS_SUBNET") |
23 | 23 | security_group = os.getenv("ECS_SECURITY_GROUP") |
24 | 24 | cluster = f"india-ecs-cluster-{env}" |
25 | 25 |
|
26 | | -region = 'india' |
| 26 | +region = "india" |
27 | 27 |
|
28 | 28 | with DAG( |
29 | | - f'{region}-nwp-consumer', |
| 29 | + f"{region}-nwp-consumer", |
30 | 30 | schedule_interval="0 * * * *", |
31 | 31 | default_args=default_args, |
32 | 32 | concurrency=10, |
|
37 | 37 | latest_only = LatestOnlyOperator(task_id="latest_only") |
38 | 38 |
|
39 | 39 | nwp_consumer_ecmwf = EcsRunTaskOperator( |
40 | | - task_id=f'{region}-nwp-consumer-ecmwf-india', |
41 | | - task_definition='nwp-consumer-ecmwf-india', |
42 | | - cluster=cluster, |
43 | | - overrides={}, |
44 | | - launch_type="FARGATE", |
45 | | - network_configuration={ |
46 | | - "awsvpcConfiguration": { |
47 | | - "subnets": [subnet], |
48 | | - "securityGroups": [security_group], |
49 | | - "assignPublicIp": "ENABLED", |
50 | | - }, |
51 | | - }, |
52 | | - task_concurrency=10, |
53 | | - awslogs_group='/aws/ecs/consumer/nwp-consumer-ecmwf-india', |
54 | | - awslogs_stream_prefix='streaming/nwp-consumer-ecmwf-india-consumer', |
55 | | - awslogs_region='ap-south-1' |
| 40 | + task_id=f"{region}-nwp-consumer-ecmwf-india", |
| 41 | + task_definition="nwp-consumer-ecmwf-india", |
| 42 | + cluster=cluster, |
| 43 | + overrides={}, |
| 44 | + launch_type="FARGATE", |
| 45 | + network_configuration={ |
| 46 | + "awsvpcConfiguration": { |
| 47 | + "subnets": [subnet], |
| 48 | + "securityGroups": [security_group], |
| 49 | + "assignPublicIp": "ENABLED", |
| 50 | + }, |
| 51 | + }, |
| 52 | + task_concurrency=10, |
| 53 | + awslogs_group="/aws/ecs/consumer/nwp-consumer-ecmwf-india", |
| 54 | + awslogs_stream_prefix="streaming/nwp-consumer-ecmwf-india-consumer", |
| 55 | + awslogs_region="ap-south-1", |
56 | 56 | ) |
57 | 57 |
|
58 | 58 | nwp_consumer_gfs = EcsRunTaskOperator( |
59 | | - task_id=f'{region}-nwp-consumer-gfs-india', |
60 | | - task_definition='nwp-consumer-gfs-india', |
61 | | - cluster=cluster, |
62 | | - overrides={}, |
63 | | - launch_type="FARGATE", |
64 | | - network_configuration={ |
65 | | - "awsvpcConfiguration": { |
66 | | - "subnets": [subnet], |
67 | | - "securityGroups": [security_group], |
68 | | - "assignPublicIp": "ENABLED", |
69 | | - }, |
70 | | - }, |
71 | | - task_concurrency=10, |
72 | | - awslogs_group='/aws/ecs/consumer/nwp-consumer-gfs-india', |
73 | | - awslogs_stream_prefix='streaming/nwp-consumer-gfs-india-consumer', |
74 | | - awslogs_region='ap-south-1' |
| 59 | + task_id=f"{region}-nwp-consumer-gfs-india", |
| 60 | + task_definition="nwp-consumer-gfs-india", |
| 61 | + cluster=cluster, |
| 62 | + overrides={}, |
| 63 | + launch_type="FARGATE", |
| 64 | + network_configuration={ |
| 65 | + "awsvpcConfiguration": { |
| 66 | + "subnets": [subnet], |
| 67 | + "securityGroups": [security_group], |
| 68 | + "assignPublicIp": "ENABLED", |
| 69 | + }, |
| 70 | + }, |
| 71 | + task_concurrency=10, |
| 72 | + awslogs_group="/aws/ecs/consumer/nwp-consumer-gfs-india", |
| 73 | + awslogs_stream_prefix="streaming/nwp-consumer-gfs-india-consumer", |
| 74 | + awslogs_region="ap-south-1", |
75 | 75 | ) |
76 | 76 |
|
77 | 77 | nwp_consumer_metoffice = EcsRunTaskOperator( |
78 | | - task_id=f'{region}-nwp-consumer-metoffice-india', |
79 | | - task_definition='nwp-consumer-metoffice-india', |
| 78 | + task_id=f"{region}-nwp-consumer-metoffice-india", |
| 79 | + task_definition="nwp-consumer-metoffice-india", |
80 | 80 | cluster=cluster, |
81 | 81 | overrides={}, |
82 | 82 | launch_type="FARGATE", |
|
88 | 88 | }, |
89 | 89 | }, |
90 | 90 | task_concurrency=10, |
91 | | - awslogs_group='/aws/ecs/consumer/nwp-consumer-metoffice-india', |
92 | | - awslogs_stream_prefix='streaming/nwp-consumer-metoffice-india-consumer', |
93 | | - awslogs_region='ap-south-1' |
| 91 | + awslogs_group="/aws/ecs/consumer/nwp-consumer-metoffice-india", |
| 92 | + awslogs_stream_prefix="streaming/nwp-consumer-metoffice-india-consumer", |
| 93 | + awslogs_region="ap-south-1", |
94 | 94 | ) |
95 | 95 | rename_zarr_metoffice = determine_latest_zarr.override( |
96 | | - task_id="determine_latest_zarr_metoffice", |
| 96 | + task_id="determine_latest_zarr_metoffice", |
97 | 97 | )(bucket=f"india-nwp-{env}", prefix="metoffice/data") |
98 | 98 |
|
99 | 99 | rename_zarr_ecmwf = determine_latest_zarr.override( |
100 | 100 | task_id="determine_latest_zarr_ecmwf", |
101 | | - )(bucket=f'india-nwp-{env}', prefix='ecmwf/data') |
| 101 | + )(bucket=f"india-nwp-{env}", prefix="ecmwf/data") |
102 | 102 |
|
103 | 103 | rename_zarr_gfs = determine_latest_zarr.override( |
104 | 104 | task_id="determine_latest_zarr_gfs", |
105 | | - )(bucket=f'india-nwp-{env}', prefix='gfs/data') |
| 105 | + )(bucket=f"india-nwp-{env}", prefix="gfs/data") |
106 | 106 |
|
107 | 107 | latest_only >> nwp_consumer_ecmwf >> rename_zarr_ecmwf |
108 | 108 | latest_only >> nwp_consumer_gfs >> rename_zarr_gfs |
109 | 109 | latest_only >> nwp_consumer_metoffice >> rename_zarr_metoffice |
110 | | - |
0 commit comments