Skip to content

Commit 45d88fc

Browse files
committed
mds6_rdd_dags7 lecture
1 parent 6c57eb5 commit 45d88fc

File tree

1 file changed

+90
-0
lines changed

1 file changed

+90
-0
lines changed

dags/rdd/dag7.py

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
from airflow import DAG
2+
from airflow.operators.python import PythonOperator, BranchPythonOperator
3+
from airflow.utils.trigger_rule import TriggerRule as tr
4+
from datetime import datetime
5+
import random
6+
7+
# Функція для генерації випадкового числа
8+
def generate_number(ti):
9+
number = random.randint(1, 100)
10+
print(f"Generated number: {number}")
11+
12+
return number
13+
14+
# Функція для перевірки парності числа
15+
def check_even_odd(ti):
16+
number = ti.xcom_pull(task_ids='generate_number')
17+
18+
if number % 2 == 0:
19+
return 'square_task'
20+
else:
21+
return 'cube_task'
22+
23+
# Функція для піднесення числа до квадрата
24+
def square_number(ti):
25+
number = ti.xcom_pull(task_ids='generate_number')
26+
result = number ** 2
27+
28+
ti.xcom_push(key='math_result', value=result)
29+
print(f"{number} squared is {result}")
30+
31+
# Функція для піднесення числа до куба
32+
def cube_number(ti):
33+
number = ti.xcom_pull(task_ids='generate_number')
34+
result = number ** 3
35+
36+
ti.xcom_push(key='math_result', value=result)
37+
print(f"{number} cubed is {result}")
38+
39+
# Функція для витягування даних з xcom та prints
40+
def final_function(ti):
41+
original_number = ti.xcom_pull(task_ids='generate_number')
42+
math_result = ti.xcom_pull(key='math_result')
43+
44+
print(f"Original value {original_number}, math_result {math_result}")
45+
46+
# Визначення DAG
47+
default_args = {
48+
'owner': 'airflow',
49+
'start_date': datetime(2024, 8, 4, 0, 0),
50+
}
51+
52+
with DAG(
53+
'even_or_odd_square_or_cube_rdd',
54+
default_args=default_args,
55+
schedule_interval='*/10 * * * *',
56+
catchup=False,
57+
tags=["rdd_mds6"]
58+
) as dag:
59+
generate_number_task = PythonOperator(
60+
task_id='generate_number',
61+
python_callable=generate_number,
62+
)
63+
64+
check_even_odd_task = BranchPythonOperator(
65+
task_id='check_even_odd',
66+
python_callable=check_even_odd,
67+
)
68+
69+
square_task = PythonOperator(
70+
task_id='square_task',
71+
python_callable=square_number,
72+
)
73+
74+
cube_task = PythonOperator(
75+
task_id='cube_task',
76+
python_callable=cube_number,
77+
)
78+
79+
end_task = PythonOperator(
80+
task_id='end_task',
81+
python_callable=final_function,
82+
trigger_rule=tr.ONE_SUCCESS
83+
)
84+
85+
# Встановлення залежностей
86+
generate_number_task >> check_even_odd_task
87+
check_even_odd_task >> [square_task, cube_task]
88+
square_task >> end_task
89+
cube_task >> end_task
90+

0 commit comments

Comments
 (0)