Skip to content

Commit b4d5cff

Browse files
Merge pull request #96 from ksharlandjiev/dag-factory-example
MWAA provisioned/serverless Dag factory example
2 parents cc90bb7 + 8cfd301 commit b4d5cff

File tree

11 files changed

+1928
-0
lines changed

11 files changed

+1928
-0
lines changed
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
# MWAA Data Pipeline Workshop
2+
3+
One-command deployment of a provisioned MWAA instance with DAG Factory installed. As part of this stack, a data pipeline with VPC, Redshift, Glue, and EMR Serverless components is also deployed.
4+
5+
## Quick Start
6+
7+
```bash
8+
# 1. Install prerequisites
9+
pip install aws-sam-cli
10+
11+
# 2. Deploy everything
12+
python3 deploy.py
13+
14+
# 3. Wait ~25 minutes, then access Airflow UI
15+
16+
# 4. Cleanup when done
17+
python3 cleanup.py
18+
```
19+
20+
## What Gets Deployed
21+
22+
### Provisioned MWAA Instance
23+
- **MWAA 3.0.6** environment running Apache Airflow
24+
- **DAG Factory** pre-installed for YAML-based DAG definitions
25+
- Small environment size (suitable for development/testing)
26+
27+
### Data Pipeline Examples
28+
1. **Python DAG** (`dags/data_pipeline.py`) - Traditional Airflow DAG written in Python
29+
2. **YAML DAG** (`yaml/example_data_pipeline.yaml`) - DAG Factory representation of the same pipeline
30+
31+
### Infrastructure Components
32+
- VPC with public/private subnets, NAT gateways
33+
- Redshift Serverless (8 RPU)
34+
- S3 bucket with DAGs, scripts, sample data
35+
- IAM roles for Glue, EMR, Redshift
36+
37+
## Deploying to MWAA Serverless
38+
39+
The YAML-based DAG can be deployed to MWAA Serverless using the following commands:
40+
41+
### 1. Convert Python DAG to YAML
42+
```bash
43+
dag-converter convert data_pipeline.py --output yaml/
44+
```
45+
46+
### 2. Upload YAML to S3
47+
```bash
48+
aws s3 sync yaml/ s3://YOUR-BUCKET-NAME/yaml/
49+
```
50+
51+
### 3. Create Serverless Workflow
52+
```bash
53+
aws mwaa-serverless create-workflow \
54+
--name example_data_pipeline \
55+
--definition-s3-location '{ "Bucket": "YOUR-BUCKET-NAME", "ObjectKey": "yaml/example_data_pipeline.yaml" }' \
56+
--role-arn arn:aws:iam::YOUR-ACCOUNT-ID:role/service-role/YOUR-MWAA-EXECUTION-ROLE \
57+
--region us-east-2
58+
```
59+
60+
### 4. List Serverless Workflows
61+
```bash
62+
aws mwaa-serverless list-workflows --region us-east-2
63+
```
64+
65+
> **Note:** Replace `YOUR-BUCKET-NAME`, `YOUR-ACCOUNT-ID`, and `YOUR-MWAA-EXECUTION-ROLE` with your actual AWS resource identifiers.
66+
67+
## Pipeline Flow
68+
69+
```
70+
S3 Data → Glue Crawler → Glue Transform → EMR Aggregate → Redshift
71+
```
72+
73+
## Files
74+
75+
- `template.yaml` - SAM/CloudFormation template
76+
- `deploy.py` - Deployment script
77+
- `cleanup.py` - Cleanup script
78+
- `dags/` - Airflow DAGs
79+
- `scripts/` - Glue and EMR scripts
80+
- `requirements/` - Python dependencies
Lines changed: 297 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,297 @@
1+
#!/usr/bin/env python3
2+
"""
3+
MWAA Data Pipeline Workshop - Cleanup Script
4+
Deletes all resources created by the deployment
5+
"""
6+
7+
import subprocess
8+
import sys
9+
import json
10+
11+
STACK_NAME = "mwaa-data-pipeline-workshop"
12+
13+
def run_command(command):
    """Run a shell command, echoing its output; return True on success.

    stdout is always printed when present; stderr is only shown when
    the command failed, so noisy-but-successful commands stay quiet.
    """
    print(f"Running: {command}")
    completed = subprocess.run(command, shell=True, capture_output=True, text=True)
    if completed.stdout:
        print(completed.stdout)
    failed = completed.returncode != 0
    if failed and completed.stderr:
        print(f"Error: {completed.stderr}")
    return not failed
22+
23+
def get_stack_outputs(region):
    """Return the workshop stack's CloudFormation outputs as a dict.

    Maps OutputKey -> OutputValue. Returns None when the stack does not
    exist, has no outputs, or the CLI response cannot be parsed.
    """
    # Force JSON output so the json.loads below does not depend on the
    # user's configured default CLI output format (the other AWS CLI
    # calls in this file pass --output json too).
    result = subprocess.run(
        f"aws cloudformation describe-stacks --stack-name {STACK_NAME} "
        f"--query 'Stacks[0].Outputs' --output json --region {region}",
        shell=True,
        capture_output=True,
        text=True
    )

    if result.returncode != 0:
        return None

    try:
        outputs = json.loads(result.stdout)
    except (json.JSONDecodeError, TypeError):
        return None

    # The query yields null/[] when the stack has no outputs.
    if not outputs:
        return None

    # Flatten [{'OutputKey': k, 'OutputValue': v}, ...] into {k: v}.
    return {output['OutputKey']: output['OutputValue'] for output in outputs}
47+
48+
def empty_s3_bucket(bucket_name):
    """Empty an S3 bucket: current objects, old versions, delete markers.

    CloudFormation cannot delete a non-empty bucket, and `aws s3 rm`
    does not remove versions, so versioned objects are cleaned up
    explicitly via the s3api.
    """
    print(f"\nEmptying S3 bucket: {bucket_name}")

    # Delete all current objects.
    print(" Deleting objects...")
    run_command(f"aws s3 rm s3://{bucket_name} --recursive")

    # Delete all versions (if versioning is enabled).
    print(" Deleting object versions...")
    result = subprocess.run(
        f"aws s3api list-object-versions --bucket {bucket_name} --output json",
        shell=True,
        capture_output=True,
        text=True
    )

    if result.returncode == 0 and result.stdout:
        # json is imported at module level; the redundant function-local
        # `import json` from the original was removed.
        try:
            versions = json.loads(result.stdout)

            # Remove versions first, then delete markers — both kinds of
            # entries must go before the bucket itself can be deleted.
            for entry in versions.get('Versions', []) + versions.get('DeleteMarkers', []):
                key = entry['Key']
                version_id = entry['VersionId']
                run_command(
                    f"aws s3api delete-object --bucket {bucket_name} "
                    f"--key '{key}' --version-id {version_id}"
                )
        except json.JSONDecodeError:
            # Best effort: an unparseable listing just means we skip
            # version cleanup rather than abort the whole teardown.
            pass

    print("✓ S3 bucket emptied")
87+
88+
def delete_glue_crawlers(region):
    """Delete the Glue crawlers created by the pipeline."""
    print("\nDeleting Glue crawlers...")

    # Fetch the full crawler list for the region.
    listing = subprocess.run(
        f"aws glue get-crawlers --region {region} --output json",
        shell=True,
        capture_output=True,
        text=True
    )

    if listing.returncode != 0 or not listing.stdout:
        print(" Could not list Glue crawlers")
        return

    try:
        payload = json.loads(listing.stdout)
    except json.JSONDecodeError:
        print(" Could not parse Glue crawlers list")
        return

    # Only remove crawlers whose names match the workshop's patterns.
    matching = [
        crawler['Name']
        for crawler in payload.get('Crawlers', [])
        if 'airflow-workshop' in crawler['Name'] or 'immersion_day' in crawler['Name'].lower()
    ]

    for name in matching:
        print(f" Deleting Glue crawler: {name}")
        run_command(f"aws glue delete-crawler --name {name} --region {region}")

    if matching:
        print(f"✓ Deleted {len(matching)} Glue crawler(s)")
    else:
        print(" No matching Glue crawlers found")
122+
123+
def delete_glue_tables(region, database_name='default'):
    """Delete the Glue tables created by the pipeline's crawler."""
    print(f"\nDeleting Glue tables from database '{database_name}'...")

    # List every table in the target database.
    listing = subprocess.run(
        f"aws glue get-tables --database-name {database_name} --region {region} --output json",
        shell=True,
        capture_output=True,
        text=True
    )

    if listing.returncode != 0 or not listing.stdout:
        print(f" Could not list Glue tables (database '{database_name}' may not exist)")
        return

    try:
        payload = json.loads(listing.stdout)
    except json.JSONDecodeError:
        print(" Could not parse Glue tables list")
        return

    # The crawler produces the green tripdata tables; match on either word.
    targets = [
        table['Name']
        for table in payload.get('TableList', [])
        if 'green' in table['Name'].lower() or 'tripdata' in table['Name'].lower()
    ]

    for name in targets:
        print(f" Deleting Glue table: {name}")
        run_command(f"aws glue delete-table --database-name {database_name} --name {name} --region {region}")

    if targets:
        print(f"✓ Deleted {len(targets)} Glue table(s)")
    else:
        print(" No matching Glue tables found")
158+
159+
def delete_glue_jobs(region):
    """Delete the Glue jobs created by the pipeline."""
    print("\nDeleting Glue jobs...")

    # Fetch the full job list for the region.
    listing = subprocess.run(
        f"aws glue get-jobs --region {region} --output json",
        shell=True,
        capture_output=True,
        text=True
    )

    if listing.returncode != 0 or not listing.stdout:
        print(" Could not list Glue jobs")
        return

    try:
        payload = json.loads(listing.stdout)
    except json.JSONDecodeError:
        print(" Could not parse Glue jobs list")
        return

    # Only remove jobs matching the pipeline's naming patterns
    # (adjust the patterns if the pipeline's job names change).
    targets = [
        job['Name']
        for job in payload.get('Jobs', [])
        if 'nyc_raw_to_transform' in job['Name'] or 'immersion_day' in job['Name'].lower()
    ]

    for name in targets:
        print(f" Deleting Glue job: {name}")
        run_command(f"aws glue delete-job --job-name {name} --region {region}")

    if targets:
        print(f"✓ Deleted {len(targets)} Glue job(s)")
    else:
        print(" No matching Glue jobs found")
194+
195+
def delete_stack(region):
    """Delete the CloudFormation stack and block until it is gone."""
    print(f"\nDeleting CloudFormation stack: {STACK_NAME}")
    delete_cmd = f"aws cloudformation delete-stack --stack-name {STACK_NAME} --region {region}"
    wait_cmd = f"aws cloudformation wait stack-delete-complete --stack-name {STACK_NAME} --region {region}"
    run_command(delete_cmd)
    print("✓ Stack deletion initiated")
    print("\nWaiting for stack deletion to complete...")
    # The waiter polls until the stack no longer exists (or times out).
    run_command(wait_cmd)
    print("✓ Stack deleted successfully")
203+
204+
def get_aws_region():
    """Determine the AWS region to operate in, with user confirmation.

    Resolution order: samconfig.toml -> the region embedded in the
    deployed stack's ARN -> the AWS CLI configured default (falling
    back to us-east-1). The detected value is shown to the user, who
    may type a different region to override it.
    """
    detected_region = None

    # 1) Try the SAM deploy config. Match only an actual `region = ...`
    #    key: the previous substring check (`'region' in line`) could
    #    match unrelated lines whose key or value merely contains the
    #    word "region", and split('=')[1] broke on values containing '='.
    try:
        with open('samconfig.toml', 'r') as f:
            for line in f:
                key, sep, value = line.partition('=')
                if sep and key.strip() == 'region':
                    detected_region = value.strip().strip('"').strip("'")
                    if detected_region:
                        break
    except FileNotFoundError:
        pass

    # 2) Fall back to the region encoded in the deployed stack's ARN
    #    (arn:aws:cloudformation:REGION:account:stack/...).
    if not detected_region:
        result = subprocess.run(
            f"aws cloudformation describe-stacks --stack-name {STACK_NAME} --query 'Stacks[0].StackId' --output text 2>/dev/null",
            shell=True,
            capture_output=True,
            text=True
        )
        if result.returncode == 0 and result.stdout.strip():
            stack_arn = result.stdout.strip()
            detected_region = stack_arn.split(':')[3]

    # 3) Finally, fall back to the AWS CLI default configuration.
    if not detected_region:
        result = subprocess.run(
            ['aws', 'configure', 'get', 'region'],
            capture_output=True,
            text=True
        )
        detected_region = result.stdout.strip() or 'us-east-1'

    print(f"\nDetected region: {detected_region}")
    user_region = input(f"Press Enter to use {detected_region}, or type a different region: ").strip()

    region = user_region if user_region else detected_region
    print(f"✓ Using region: {region}")
    return region
245+
246+
def main():
    """Interactive entry point: confirm with the user, then tear down
    Glue resources, the S3 bucket contents, and the stack itself."""
    banner = "=" * 60
    print("\n" + banner)
    print("MWAA Data Pipeline Workshop - Cleanup")
    print(banner + "\n")

    # Resolve the region before touching anything.
    region = get_aws_region()

    # Require an explicit "yes": this deletes real resources.
    print("\n⚠️ WARNING: This will delete all resources!")
    response = input("Are you sure you want to continue? (yes/no): ")
    if response.lower() != 'yes':
        print("Cleanup cancelled.")
        sys.exit(0)

    def remove_glue_resources():
        # Glue crawlers/tables/jobs are created by DAG runs, not the
        # stack, so they must be removed before (or instead of) it.
        delete_glue_crawlers(region)
        delete_glue_tables(region, 'default')
        delete_glue_jobs(region)

    outputs = get_stack_outputs(region)

    if not outputs:
        # Stack gone or never deployed — offer a best-effort Glue cleanup.
        print("\n⚠️ Could not find stack outputs.")
        print("The stack may not exist or may already be deleted.")
        response = input("Do you want to try deleting Glue resources anyway? (yes/no): ")
        if response.lower() == 'yes':
            remove_glue_resources()
        print("\nNothing else to clean up.")
        sys.exit(0)

    # Normal path: Glue resources first, so stack deletion can't race them.
    remove_glue_resources()

    # The bucket must be emptied or the stack delete will fail on it.
    if 'MWAABucketName' in outputs:
        empty_s3_bucket(outputs['MWAABucketName'])
    else:
        print("⚠️ Could not find bucket name in stack outputs.")

    delete_stack(region)

    print("\n" + banner)
    print("Cleanup Complete!")
    print(banner)
    print("\nAll resources have been deleted.")
    print("You will no longer incur charges for this workshop.\n")
296+
# Run the interactive cleanup only when executed as a script.
if __name__ == "__main__":
    main()

0 commit comments

Comments
 (0)