Commit 42db8b2

Add MWAA Serverless examples with AWS service integrations (#95)
Co-authored-by: John Jackson <john-jac@users.noreply.github.com>

File tree: 7 files changed, +1290 −0 lines changed

serverless/examples/README.md

# MWAA Serverless Examples

This directory contains example MWAA Serverless DAGs demonstrating integration with various AWS services.

## Overview

These DAGs demonstrate how to use Airflow 3.0.6 with AWS services, including proper IAM permissions, error handling, and resource management. Each DAG is self-contained and includes inline documentation.

## Prerequisites

- Amazon MWAA environment (version 3.0.6 or later)
- AWS CLI configured with appropriate permissions
- IAM roles with required permissions (see [IAM Policy](#iam-policy))
----

*Note:* Throughout this README, we use example values that you'll need to replace with your own:

- Replace `amzn-s3-demo-bucket` with your S3 bucket name
- Replace `111122223333` with your AWS account ID
- Replace `us-east-2` with your AWS Region. MWAA Serverless is available in multiple AWS Regions. Check the [List of AWS Services Available by Region](https://aws.amazon.com/about-aws/global-infrastructure/regional-product-services/) for current availability.

----
## DAG Examples

### S3 Operations (`s3_dag.yaml`)

Demonstrates S3 bucket and object operations, including:

- Creating and deleting S3 buckets
- Creating, listing, and deleting S3 objects
- Using S3KeySensor to wait for objects

### AWS Glue Jobs (`glue_dag.yaml`)

Illustrates Glue ETL job execution:

- Creating and uploading Glue scripts to S3
- Running Glue 5.0 jobs with Python
- Monitoring job completion with sensors

### Amazon Athena (`athena_dag.yaml`)

Demonstrates Athena query execution:

- Running SQL queries against data lakes
- Managing query results and outputs
- Integration with CloudFormation for resource creation
## Setup

1. **Upload the YAML files to your S3 bucket:**

   ```bash
   aws s3 cp . s3://amzn-s3-demo-bucket/yaml_dags/ --recursive --exclude "*.md" --exclude "*.json"
   ```
2. **Create the required IAM execution role:**

   ```bash
   aws iam create-role \
     --role-name mwaa-serverless-execution-role \
     --assume-role-policy-document file://trust-policy.json

   aws iam put-role-policy \
     --role-name mwaa-serverless-execution-role \
     --policy-name mwaa-airflow3-policy \
     --policy-document file://mwaa-comprehensive-policy.json
   ```
3. **Create your MWAA Serverless workflow** from the uploaded definitions, for example:

   ```bash
   aws mwaa-serverless create-workflow \
     --name athena_dag \
     --definition-s3-location '{ "Bucket": "amzn-s3-demo-bucket", "ObjectKey": "yaml_dags/athena_dag.yaml" }' \
     --role-arn arn:aws:iam::111122223333:role/mwaa-serverless-execution-role \
     --region us-east-2
   ```
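If you script workflow creation across many DAG files, the `--definition-s3-location` JSON argument is easy to get wrong by hand-quoting. A minimal sketch of building it with a JSON serializer instead (the bucket and key are the example placeholders from this README):

```python
import json

def definition_s3_location(bucket: str, object_key: str) -> str:
    """Build the JSON string passed to --definition-s3-location."""
    return json.dumps({"Bucket": bucket, "ObjectKey": object_key})

# Example placeholder values from this README; replace with your own.
arg = definition_s3_location("amzn-s3-demo-bucket", "yaml_dags/athena_dag.yaml")
print(arg)
```

The resulting string can be passed directly to the `aws mwaa-serverless create-workflow` call shown above.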
## IAM Policy

The comprehensive IAM policy required for all DAGs includes permissions for:

- S3 operations (bucket and object management)
- Lambda function lifecycle management
- Glue job creation and execution
- Athena query execution
- Batch job submission and monitoring
- EMR Serverless application management
- SageMaker processing jobs
- CloudFormation stack operations
- CloudWatch logging
```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "S3Permissions",
      "Effect": "Allow",
      "Action": [
        "s3:CreateBucket",
        "s3:DeleteBucket",
        "s3:ListBucket",
        "s3:GetObject",
        "s3:PutObject",
        "s3:DeleteObject"
      ],
      "Resource": [
        "arn:aws:s3:::*",
        "arn:aws:s3:::*/*"
      ]
    },
    {
      "Sid": "GluePermissions",
      "Effect": "Allow",
      "Action": [
        "glue:CreateJob",
        "glue:GetJob",
        "glue:StartJobRun",
        "glue:GetJobRun",
        "glue:GetTable",
        "glue:CreateTable",
        "glue:DeleteTable",
        "glue:CreateDatabase",
        "glue:DeleteDatabase"
      ],
      "Resource": "*"
    },
    {
      "Sid": "AthenaPermissions",
      "Effect": "Allow",
      "Action": [
        "athena:StartQueryExecution",
        "athena:GetQueryExecution",
        "athena:GetQueryResults"
      ],
      "Resource": "*"
    },
    {
      "Sid": "CloudWatchLogsPermissions",
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": "*"
    },
    {
      "Sid": "CloudFormationPermissions",
      "Effect": "Allow",
      "Action": [
        "cloudformation:CreateStack",
        "cloudformation:DeleteStack",
        "cloudformation:DescribeStacks"
      ],
      "Resource": "*"
    },
    {
      "Sid": "IAMPassRolePermissions",
      "Effect": "Allow",
      "Action": [
        "iam:PassRole",
        "iam:GetRole"
      ],
      "Resource": "*"
    }
  ]
}
```
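When adapting this policy to your own workflows, a quick local check that the actions a DAG needs are still present can catch typos before deployment. A minimal sketch using only the standard library; `policy` here is a small fragment of the document above as it would look after `json.load()`:

```python
import json

def granted_actions(policy: dict) -> set:
    """Collect every Action string from the policy's Allow statements."""
    actions = set()
    for stmt in policy.get("Statement", []):
        if stmt.get("Effect") == "Allow":
            act = stmt.get("Action", [])
            actions.update([act] if isinstance(act, str) else act)
    return actions

# Fragment of the policy above, as loaded from the JSON file.
policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AthenaPermissions",
            "Effect": "Allow",
            "Action": [
                "athena:StartQueryExecution",
                "athena:GetQueryExecution",
                "athena:GetQueryResults",
            ],
            "Resource": "*",
        }
    ],
}

needed = {"athena:StartQueryExecution", "athena:GetQueryResults"}
missing = needed - granted_actions(policy)
print(missing)  # set() when nothing is missing
```

Note this does exact-string matching only; wildcard actions such as `s3:*` would need extra handling.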
### Trust Relationships

Your execution role needs trust relationships for:

**Service Roles (for PassRole operations):**

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": [
          "airflow-serverless.amazonaws.com",
          "glue.amazonaws.com"
        ]
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
```
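If you add more service integrations later, this trust policy only grows its principal list, so generating it keeps the structure consistent. A minimal sketch (the two service principals are the ones shown above):

```python
import json

def trust_policy(service_principals: list) -> dict:
    """Build an sts:AssumeRole trust policy for the given AWS service principals."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": service_principals},
                "Action": "sts:AssumeRole",
            }
        ],
    }

doc = trust_policy(["airflow-serverless.amazonaws.com", "glue.amazonaws.com"])
print(json.dumps(doc, indent=2))
```

The printed document can be saved as the `trust-policy.json` referenced in the setup steps above.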
## Usage

1. **Enable DAGs** in the Airflow UI
2. **Configure parameters** as needed for your environment
3. **Trigger DAGs** manually or via schedule
4. **Monitor execution** through CloudWatch logs
## Configuration

Most DAGs use parameters that can be customized:

- `role_arn`: IAM role for service operations
- `s3_bucket`: S3 bucket for data and scripts
- `region`: AWS region for resources

Update these parameters in the Airflow UI or via environment variables.
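Inside the YAML definitions, these parameters are referenced with Jinja-style templates such as `{{ params.output_location }}`. Airflow renders these with full Jinja, but conceptually it is a lookup-and-substitute; a toy illustration of the idea:

```python
import re

def render(template: str, params: dict) -> str:
    """Toy stand-in for Jinja rendering of {{ params.x }} references."""
    return re.sub(
        r"\{\{\s*params\.(\w+)\s*\}\}",
        lambda m: str(params[m.group(1)]),
        template,
    )

out = render(
    "{{ params.output_location }}",
    {"output_location": "s3://amzn-s3-demo-bucket/athena-results/"},
)
print(out)  # s3://amzn-s3-demo-bucket/athena-results/
```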
## Troubleshooting

### Common Issues

- **IAM Permission Errors**: Ensure your execution role has all required permissions
- **Resource Not Found**: Verify S3 buckets and objects exist before running DAGs
- **Timeout Issues**: Adjust timeout values for long-running jobs

### Logs

Check CloudWatch logs for detailed error information:

- Airflow task logs: `/aws/amazonmwaa/[environment-name]/task`
- Service-specific logs: `/aws/glue/`, `/aws/athena/`, etc.
## Clean Up

Each DAG includes cleanup tasks where appropriate. For manual cleanup:

```bash
# Remove uploaded files
aws s3 rm s3://amzn-s3-demo-bucket/yaml_dags/ --recursive

# Delete IAM role and policies
aws iam delete-role-policy --role-name mwaa-serverless-execution-role --policy-name mwaa-airflow3-policy
aws iam delete-role --role-name mwaa-serverless-execution-role
```
## Security

- Always use least-privilege IAM permissions
- Store sensitive data in AWS Secrets Manager
- Control network access through VPC configuration

## Contributing

See [CONTRIBUTING](../../../CONTRIBUTING.md) for guidelines on contributing to this repository.

## License

This library is licensed under the MIT-0 License. See the [LICENSE](../../../LICENSE) file.
serverless/examples/athena_dag.yaml
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
# the Software, and to permit persons to whom the Software is furnished to do so.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

athena_dag:
  dag_id: athena_dag
  params:
    output_location: s3://amzn-s3-demo-bucket/athena-results/
  schedule: None
  tasks:
    create_stack:
      operator: airflow.providers.amazon.aws.operators.cloud_formation.CloudFormationCreateStackOperator
      cloudformation_parameters:
        StackName: covid-lake-stack
        TemplateURL: https://covid19-lake.s3.us-east-2.amazonaws.com/cfn/CovidLakeStack.template.json
        TimeoutInMinutes: 5
        OnFailure: DELETE
      stack_name: covid-lake-stack
      task_id: create_stack
      dependencies: []
    wait_for_stack_create:
      operator: airflow.providers.amazon.aws.sensors.cloud_formation.CloudFormationCreateStackSensor
      stack_name: covid-lake-stack
      task_id: wait_for_stack_create
      dependencies:
        - create_stack
    query_1:
      operator: airflow.providers.amazon.aws.operators.athena.AthenaOperator
      database: default
      output_location: '{{ params.output_location }}'
      query: |
        SELECT
          cases.fips,
          admin2 as county,
          province_state,
          confirmed,
          growth_count,
          sum(num_licensed_beds) as num_licensed_beds,
          sum(num_staffed_beds) as num_staffed_beds,
          sum(num_icu_beds) as num_icu_beds
        FROM
          "covid-19"."hospital_beds" beds,
          ( SELECT
              fips,
              admin2,
              province_state,
              confirmed,
              last_value(confirmed) over (partition by fips order by last_update) - first_value(confirmed) over (partition by fips order by last_update) as growth_count,
              first_value(last_update) over (partition by fips order by last_update desc) as most_recent,
              last_update
            FROM
              "covid-19"."enigma_jhu"
            WHERE
              from_iso8601_timestamp(last_update) > now() - interval '200' day AND country_region = 'US') cases
        WHERE
          beds.fips = cases.fips AND last_update = most_recent
        GROUP BY cases.fips, confirmed, growth_count, admin2, province_state
        ORDER BY growth_count desc
      task_id: query_1
      dependencies:
        - wait_for_stack_create
    query_2:
      operator: airflow.providers.amazon.aws.operators.athena.AthenaOperator
      database: default
      output_location: '{{ params.output_location }}'
      query: |
        SELECT * FROM "covid-19"."world_cases_deaths_testing" order by "date" desc limit 10;
      task_id: query_2
      dependencies:
        - wait_for_stack_create
    query_3:
      operator: airflow.providers.amazon.aws.operators.athena.AthenaOperator
      database: default
      output_location: '{{ params.output_location }}'
      query: |
        SELECT
          date,
          positive,
          negative,
          pending,
          hospitalized,
          death,
          total,
          deathincrease,
          hospitalizedincrease,
          negativeincrease,
          positiveincrease,
          sta.state AS state_abbreviation,
          abb.state
        FROM "covid-19"."covid_testing_states_daily" sta
        JOIN "covid-19"."us_state_abbreviations" abb ON sta.state = abb.abbreviation
        limit 500;
      task_id: query_3
      dependencies:
        - wait_for_stack_create
    delete_stack:
      operator: airflow.providers.amazon.aws.operators.cloud_formation.CloudFormationDeleteStackOperator
      stack_name: covid-lake-stack
      task_id: delete_stack
      trigger_rule: all_done
      dependencies:
        - query_1
        - query_2
        - query_3
    wait_for_stack_delete:
      operator: airflow.providers.amazon.aws.sensors.cloud_formation.CloudFormationDeleteStackSensor
      stack_name: covid-lake-stack
      task_id: wait_for_stack_delete
      trigger_rule: all_success
      dependencies:
        - delete_stack
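The `dependencies` lists above define a DAG: the sensor gates the three queries, which in turn gate stack deletion. When editing such a file it can help to topologically sort the task graph locally to confirm the dependencies are still acyclic and ordered as intended. A minimal sketch, with the dependency map transcribed by hand from the definition above:

```python
from graphlib import TopologicalSorter

# task -> upstream dependencies, transcribed from athena_dag above
deps = {
    "create_stack": [],
    "wait_for_stack_create": ["create_stack"],
    "query_1": ["wait_for_stack_create"],
    "query_2": ["wait_for_stack_create"],
    "query_3": ["wait_for_stack_create"],
    "delete_stack": ["query_1", "query_2", "query_3"],
    "wait_for_stack_delete": ["delete_stack"],
}

# static_order() raises CycleError if the dependencies are not a valid DAG.
order = list(TopologicalSorter(deps).static_order())
print(order)
```

In any valid order, `create_stack` comes first, `wait_for_stack_delete` comes last, and `delete_stack` follows all three queries.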
