Skip to content

Commit 9a36c75

Browse files
committed
mds6 rdd dags dz7_v3
1 parent f2c50b9 commit 9a36c75

File tree

2 files changed

+168
-0
lines changed

2 files changed

+168
-0
lines changed

dags/rdd/dag_lecture7_2.py

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
from airflow import DAG
2+
from datetime import datetime
3+
from airflow.sensors.sql import SqlSensor
4+
from airflow.operators.mysql_operator import MySqlOperator
5+
from airflow.operators.python import PythonOperator
6+
from airflow.utils.trigger_rule import TriggerRule as tr
7+
from airflow.utils.state import State
8+
9+
# Функція для примусового встановлення статусу DAG як успішного
10+
def mark_dag_success(ti, **kwargs):
11+
dag_run = kwargs['dag_run']
12+
dag_run.set_state(State.SUCCESS)
13+
14+
# Аргументи за замовчуванням для DAG
15+
default_args = {
16+
'owner': 'airflow',
17+
'start_date': datetime(2024, 8, 4, 0, 0),
18+
}
19+
20+
# Назва з'єднання з базою даних MySQL
21+
connection_name = "goit_mysql_db"
22+
23+
# Визначення DAG
24+
with DAG(
25+
'working_with_mysql_db_mds6rdd',
26+
default_args=default_args,
27+
schedule_interval=None, # DAG не має запланованого інтервалу виконання
28+
catchup=False, # Вимкнути запуск пропущених задач
29+
tags=["mds6rdd"] # Теги для класифікації DAG
30+
) as dag:
31+
32+
# Завдання для створення схеми бази даних (якщо не існує)
33+
create_schema = MySqlOperator(
34+
task_id='create_schema',
35+
mysql_conn_id=connection_name,
36+
sql="""
37+
CREATE DATABASE IF NOT EXISTS oleksiy;
38+
"""
39+
)
40+
41+
# Завдання для створення таблиці (якщо не існує)
42+
create_table = MySqlOperator(
43+
task_id='create_table',
44+
mysql_conn_id=connection_name,
45+
sql="""
46+
CREATE TABLE IF NOT EXISTS oleksiy.games (
47+
`edition` text,
48+
`edition_id` int DEFAULT NULL,
49+
`edition_url` text,
50+
`year` int DEFAULT NULL,
51+
`city` text,
52+
`country_flag_url` text,
53+
`country_noc` text,
54+
`start_date` text,
55+
`end_date` text,
56+
`competition_date` text,
57+
`isHeld` text
58+
);
59+
"""
60+
)
61+
62+
# Сенсор для порівняння кількості рядків у таблицях `oleksiy.games` і `olympic_dataset.games`
63+
check_for_data = SqlSensor(
64+
task_id='check_if_counts_same',
65+
conn_id=connection_name,
66+
sql="""WITH count_in_copy AS (
67+
select COUNT(*) nrows_copy from oleksiy.games
68+
),
69+
count_in_original AS (
70+
select COUNT(*) nrows_original from olympic_dataset.games
71+
)
72+
SELECT nrows_copy <> nrows_original FROM count_in_copy
73+
CROSS JOIN count_in_original
74+
;""",
75+
mode='poke', # Режим перевірки: періодична перевірка умови
76+
poke_interval=5, # Перевірка кожні 5 секунд
77+
timeout=6, # Тайм-аут після 6 секунд (1 повторна перевірка)
78+
)
79+
80+
# Завдання для оновлення даних у таблиці `oleksiy.games`
81+
refresh_data = MySqlOperator(
82+
task_id='refresh',
83+
mysql_conn_id=connection_name,
84+
sql="""
85+
TRUNCATE oleksiy.games; # Очищення таблиці
86+
INSERT INTO oleksiy.games SELECT * FROM olympic_dataset.games; # Вставка даних з іншої таблиці
87+
""",
88+
)
89+
90+
# Завдання для примусового встановлення статусу DAG як успішного в разі невдачі
91+
mark_success_task = PythonOperator(
92+
task_id='mark_success',
93+
trigger_rule=tr.ONE_FAILED, # Виконати, якщо хоча б одне попереднє завдання завершилося невдачею
94+
python_callable=mark_dag_success,
95+
provide_context=True, # Надати контекст завдання у виклик функції
96+
dag=dag,
97+
)
98+
99+
# Встановлення залежностей між завданнями
100+
create_schema >> create_table >> check_for_data >> refresh_data
101+
check_for_data >> mark_success_task

dags/rdd/dz7.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
from airflow import DAG
2+
from datetime import datetime
3+
from airflow.sensors.sql import SqlSensor
4+
from airflow.operators.mysql_operator import MySqlOperator
5+
from airflow.operators.python import PythonOperator
6+
from airflow.utils.trigger_rule import TriggerRule as tr
7+
from airflow.utils.state import State
8+
9+
# Функція для примусового встановлення статусу DAG як успішного
10+
def mark_dag_success(ti, **kwargs):
11+
dag_run = kwargs['dag_run']
12+
dag_run.set_state(State.SUCCESS)
13+
14+
# Назва з'єднання з базою даних MySQL
15+
connection_name = "goit_mysql_db"
16+
17+
# Аргументи за замовчуванням для DAG
18+
default_args = {
19+
'owner': 'airflow',
20+
'start_date': datetime(2024, 8, 4, 0, 0),
21+
}
22+
23+
24+
25+
26+
# Визначення DAG
27+
with DAG(
28+
'mds6rdd_dz7',
29+
default_args=default_args,
30+
schedule_interval=None, # DAG не має запланованого інтервалу виконання
31+
catchup=False, # Вимкнути запуск пропущених задач
32+
tags=["mds6rdd"] # Теги для класифікації DAG
33+
) as dag:
34+
35+
# Завдання для створення схеми бази даних (якщо не існує)
36+
create_schema = MySqlOperator(
37+
task_id='create_schema',
38+
mysql_conn_id=connection_name,
39+
sql="""
40+
CREATE DATABASE IF NOT EXISTS mds6rdd;
41+
"""
42+
)
43+
44+
# Завдання для створення таблиці (якщо не існує)
45+
create_table = MySqlOperator(
46+
task_id='create_table',
47+
mysql_conn_id=connection_name,
48+
sql="""
49+
CREATE TABLE IF NOT EXISTS mds6rdd.games (
50+
`edition` text,
51+
`edition_id` int DEFAULT NULL,
52+
`edition_url` text,
53+
`year` int DEFAULT NULL,
54+
`city` text,
55+
`country_flag_url` text,
56+
`country_noc` text,
57+
`start_date` text,
58+
`end_date` text,
59+
`competition_date` text,
60+
`isHeld` text
61+
);
62+
"""
63+
)
64+
65+
66+
create_schema >> create_table
67+

0 commit comments

Comments
 (0)