5 files changed, +15 -7 lines
File 1 of 5: project Makefile

@@ -1,11 +1,13 @@
+AIRFLOW_VERSION := 2.4.3
+
 .PHONY: all
 all: clean docker-compose.yaml airflow init up

 docker-compose.yaml:
-	curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.2.2/docker-compose.yaml'
+	curl -LfO 'https://airflow.apache.org/docs/apache-airflow/${AIRFLOW_VERSION}/docker-compose.yaml'

 airflow:
-	curl -Lf 'https://airflow.apache.org/docs/apache-airflow/2.2.2/airflow.sh' > airflow
+	curl -Lf 'https://airflow.apache.org/docs/apache-airflow/${AIRFLOW_VERSION}/airflow.sh' > airflow
 	chmod +x airflow

 .PHONY: init
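Pinning the version in a single Make variable also means a different Airflow release can be fetched without editing the recipes, e.g. `make docker-compose.yaml AIRFLOW_VERSION=2.5.0` (standard command-line override of a Make variable; the version in that command is only an example, not something taken from this change).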
File 2 of 5: S3 bucket example DAG

@@ -4,7 +4,7 @@
 from airflow.decorators import task
 from airflow.models.dag import DAG
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
-from airflow.providers.amazon.aws.operators.s3_bucket import S3CreateBucketOperator, S3DeleteBucketOperator
+from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator

 # By default, it will use the 'aws_default' connection. You can create it here by running `make minio_credentials`.
 # If you want to change it, use a variable and pass it as `aws_conn_id` to all AWS operators.
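For reference, a minimal sketch of the consolidated import in use; the DAG id, dates, and bucket name below are illustrative and not part of the diff:

import datetime

from airflow.models.dag import DAG
from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator

with DAG(dag_id='s3_bucket_sketch', start_date=datetime.datetime(2022, 1, 1), schedule=None, catchup=False):
    # Both operators fall back to the 'aws_default' connection unless aws_conn_id is passed explicitly.
    create_bucket = S3CreateBucketOperator(task_id='create_bucket', bucket_name='example-bucket')
    delete_bucket = S3DeleteBucketOperator(task_id='delete_bucket', bucket_name='example-bucket', force_delete=True)
    create_bucket >> delete_bucket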
File 3 of 5: S3 key sensor example DAG

@@ -4,7 +4,7 @@
 from airflow.decorators import task
 from airflow.models.dag import DAG
 from airflow.models.variable import Variable
-from airflow.providers.amazon.aws.sensors.s3_key import S3KeySensor, S3KeySizeSensor
+from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

 BUCKET_NAME = os.environ.get('BUCKET_NAME', 'patatas')
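Note that S3KeySizeSensor is dropped from the import: the consolidated sensors.s3 module in recent amazon provider releases exposes S3KeySensor (the old size check is covered by its optional check_fn callback; worth confirming against the installed provider version). A minimal, illustrative sketch, reusing the 'patatas' default bucket from this DAG; the key name and DAG id are made up for the example:

import datetime

from airflow.models.dag import DAG
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

with DAG(dag_id='s3_sensor_sketch', start_date=datetime.datetime(2022, 1, 1), schedule=None, catchup=False):
    # Poke every 30 seconds until the key shows up in the bucket.
    S3KeySensor(task_id='wait_for_key', bucket_name='patatas', bucket_key='incoming/data.csv', poke_interval=30)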
File 4 of 5: Dataproc DAG (ephemeral cluster)

@@ -1,3 +1,8 @@
+# ./airflow variables set gcp_project bigdataupv2022
+# ./airflow variables set gcp_region europe-west1
+# ./airflow variables set gcp_zone europe-west1-b
+# ./airflow variables set gcp_bucket bigdataupv_data
+
 import datetime
 import os

@@ -27,14 +32,15 @@
         task_id='create_dataproc_cluster',
         cluster_name='spark-cluster-{{ ds_nodash }}',
         num_workers=2,
-        zone=models.Variable.get('gce_zone'),
+        zone=models.Variable.get('gcp_zone'),
+        region=models.Variable.get('gcp_region'),
         master_machine_type='n1-standard-1',
         worker_machine_type='n1-standard-1')

     run_dataproc_pyspark = dataproc_operator.DataProcPySparkOperator(
         task_id='run_spark',
         cluster_name='spark-cluster-{{ ds_nodash }}',
-        region='europe-west1',
+        region=models.Variable.get('gcp_region'),
         main='gs://bigdataupv_code/compras_top_ten_countries.py',
         files=['gs://bigdataupv_code/helpers.py'])

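Both the zone and the region are now read from Airflow Variables, which the comment block at the top of the file sets through the bundled ./airflow wrapper. A minimal sketch of that lookup pattern; the default_var fallbacks are an assumption added for illustration (they simply mirror the values from those comments) and are not part of the diff:

from airflow import models

# Variable.get runs at DAG parse time; passing default_var keeps the DAG importable
# even on an environment where the variables have not been created yet.
gcp_region = models.Variable.get('gcp_region', default_var='europe-west1')
gcp_zone = models.Variable.get('gcp_zone', default_var='europe-west1-b')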
File 5 of 5: Dataproc DAG (existing cluster)

@@ -26,7 +26,7 @@
     run_step = dataproc_operator.DataProcPySparkOperator(
         task_id='run_spark',
         cluster_name='cluster-9c11',
-        region='europe-west1',
+        region=models.Variable.get('gcp_region'),
         main='gs://bigdataupv_code/compras_top_ten_countries.py',
         files=['gs://bigdataupv_code/helpers.py'])

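With both Dataproc DAGs reading the region from the same gcp_region Variable, the 'europe-west1' value lives in one place (set once with ./airflow variables set gcp_region europe-west1), so retargeting the jobs to another region only means updating that Variable rather than editing each DAG.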