|
| 1 | +from airflow import DAG |
| 2 | +from airflow.providers.mysql.operators.mysql import MySqlOperator |
| 3 | + |
| 4 | +from airflow.operators.python import PythonOperator, BranchPythonOperator |
| 5 | +from airflow.sensors.sql import SqlSensor |
| 6 | +from airflow.utils.trigger_rule import TriggerRule |
| 7 | + |
| 8 | +from datetime import datetime, timedelta |
| 9 | +import random |
| 10 | +import time |
| 11 | + |
| 12 | + |
| 13 | +default_args = { |
| 14 | + 'owner': 'airflow', |
| 15 | + 'retries': 1, |
| 16 | + 'retry_delay': timedelta(seconds=10) |
| 17 | +} |
| 18 | + |
| 19 | +with DAG( |
| 20 | + dag_id='ola_medal_count_pipeline', |
| 21 | + default_args=default_args, |
| 22 | + start_date=datetime(2024, 1, 1), |
| 23 | + schedule_interval=None, |
| 24 | + catchup=False |
| 25 | +) as dag: |
| 26 | + |
| 27 | + # 1. Створення таблиці |
| 28 | + create_table = MySqlOperator( |
| 29 | + task_id='create_table', |
| 30 | + mysql_conn_id='mysql_default', |
| 31 | + sql=""" |
| 32 | + CREATE TABLE IF NOT EXISTS neo_data.ola_medals_results ( |
| 33 | + id INT AUTO_INCREMENT PRIMARY KEY, |
| 34 | + medal_type VARCHAR(10), |
| 35 | + count INT, |
| 36 | + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP |
| 37 | + ); |
| 38 | + """ |
| 39 | + ) |
| 40 | + |
| 41 | + # 2. Випадковий вибір медалі |
| 42 | + def choose_medal(): |
| 43 | + return random.choice(['calc_Bronze', 'calc_Silver', 'calc_Gold']) |
| 44 | + |
| 45 | + pick_medal = PythonOperator( |
| 46 | + task_id='pick_medal', |
| 47 | + python_callable=lambda: print("Picking medal..."), |
| 48 | + ) |
| 49 | + |
| 50 | + pick_medal_task = BranchPythonOperator( |
| 51 | + task_id='pick_medal_task', |
| 52 | + python_callable=choose_medal |
| 53 | + ) |
| 54 | + |
| 55 | + # 3. Три завдання під розгалуження |
| 56 | + calc_Bronze = MySqlOperator( |
| 57 | + task_id='calc_Bronze', |
| 58 | + mysql_conn_id='mysql_default', |
| 59 | + sql=""" |
| 60 | + INSERT INTO neo_data.ola_medals_results (medal_type, count) |
| 61 | + SELECT 'Bronze', COUNT(*) |
| 62 | + FROM olympic_dataset.athlete_event_results |
| 63 | + WHERE medal = 'Bronze'; |
| 64 | + """ |
| 65 | + ) |
| 66 | + |
| 67 | + calc_Silver = MySqlOperator( |
| 68 | + task_id='calc_Silver', |
| 69 | + mysql_conn_id='mysql_default', |
| 70 | + sql=""" |
| 71 | + INSERT INTO neo_data.ola_medals_results (medal_type, count) |
| 72 | + SELECT 'Silver', COUNT(*) |
| 73 | + FROM olympic_dataset.athlete_event_results |
| 74 | + WHERE medal = 'Silver'; |
| 75 | + """ |
| 76 | + ) |
| 77 | + |
| 78 | + calc_Gold = MySqlOperator( |
| 79 | + task_id='calc_Gold', |
| 80 | + mysql_conn_id='mysql_default', |
| 81 | + sql=""" |
| 82 | + INSERT INTO neo_data.ola_medals_results (medal_type, count) |
| 83 | + SELECT 'Gold', COUNT(*) |
| 84 | + FROM olympic_dataset.athlete_event_results |
| 85 | + WHERE medal = 'Gold'; |
| 86 | + """ |
| 87 | + ) |
| 88 | + |
| 89 | + # 5. Створення затримки |
| 90 | + def delay_task(): |
| 91 | + time.sleep(15) |
| 92 | + |
| 93 | + generate_delay = PythonOperator( |
| 94 | + task_id='generate_delay', |
| 95 | + python_callable=delay_task, |
| 96 | + trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS |
| 97 | + ) |
| 98 | + |
| 99 | + # 6. SQL Sensor — перевірка свіжості запису |
| 100 | + check_for_correctness = SqlSensor( |
| 101 | + task_id='check_for_correctness', |
| 102 | + conn_id='mysql_default', |
| 103 | + sql=""" |
| 104 | + SELECT COUNT(*) |
| 105 | + FROM neo_data.ola_medals_results |
| 106 | + WHERE created_at >= NOW() - INTERVAL 30 SECOND; |
| 107 | + """, |
| 108 | + timeout=60, |
| 109 | + poke_interval=10, |
| 110 | + trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS |
| 111 | + ) |
| 112 | + |
| 113 | + # Зв’язки між задачами |
| 114 | + create_table >> pick_medal >> pick_medal_task |
| 115 | + pick_medal_task >> [calc_Bronze, calc_Silver, calc_Gold] |
| 116 | + [calc_Bronze, calc_Silver, calc_Gold] >> generate_delay >> check_for_correctness |
0 commit comments