1+ from airflow import DAG
2+ from airflow .operators .bash_operator import BashOperator
3+ from airflow .operators .python_operator import PythonOperator , BranchPythonOperator
4+ from datetime import datetime , timedelta
5+ import zipfile
6+ import random
7+ import pandas as pd
8+
9+ default_args = {
10+ 'owner' : 'A3Data' ,
11+ "depends_on_past" : False ,
12+ "start_date" : datetime (2020 , 12 , 30 , 18 , 10 ),
13+ "email" : ["airflow@airflow.com" ],
14+ "email_on_failure" : False ,
15+ "email_on_retry" : False
16+ #"retries": 1,
17+ #"retry_delay": timedelta(minutes=1),
18+ }
19+
20+ dag = DAG (
21+ "conditional-example" ,
22+ description = "A DAG with a condition" ,
23+ default_args = default_args ,
24+ schedule_interval = timedelta (minutes = 2 )
25+ )
26+
27+ get_data = BashOperator (
28+ task_id = "get-data" ,
29+ bash_command = 'curl https://download.inep.gov.br/microdados/Enade_Microdados/microdados_enade_2019.zip -o microdados_enade_2019.zip' ,
30+ trigger_rule = "all_done" ,
31+ dag = dag
32+ )
33+
34+
35+ def unzip_file ():
36+ with zipfile .ZipFile ("microdados_enade_2019.zip" , 'r' ) as zipped :
37+ zipped .extractall ()
38+
39+ unzip_data = PythonOperator (
40+ task_id = 'unzip-data' ,
41+ python_callable = unzip_file ,
42+ dag = dag
43+ )
44+
45+
46+ def select_student ():
47+ df = pd .read_csv ('microdados_enade_2019/2019/3.DADOS/microdados_enade_2019.txt' , sep = ';' , decimal = ',' )
48+ choice = random .randint (0 , df .shape [0 ]- 1 )
49+ student = df .iloc [choice ]
50+ return student .TP_SEXO
51+
52+ pick_student = PythonOperator (
53+ task_id = "pick-student" ,
54+ python_callable = select_student ,
55+ dag = dag
56+ )
57+
58+ def MorF (** context ):
59+ value = context ['task_instance' ].xcom_pull (task_ids = 'pick-student' )
60+ if value == 'M' :
61+ return 'male_branch'
62+ elif value == 'F' :
63+ return 'female_branch'
64+
65+ male_or_female = BranchPythonOperator (
66+ task_id = 'condition-male_or_female' ,
67+ python_callable = MorF ,
68+ provide_context = True ,
69+ dag = dag
70+ )
71+
72+
73+ male_branch = BashOperator (
74+ task_id = "male_branch" ,
75+ bash_command = 'echo "A male student was randomly picked."' ,
76+ dag = dag
77+ )
78+
79+ female_branch = BashOperator (
80+ task_id = "female_branch" ,
81+ bash_command = 'echo "A female student was randomly picked."' ,
82+ dag = dag
83+ )
84+
85+ get_data >> unzip_data >> pick_student >> male_or_female >> [male_branch , female_branch ]
0 commit comments