11from airflow import DAG
2- from airflow .providers . common . sql . operators .sql import SQLExecuteQueryOperator
2+ from airflow .operators .python import PythonOperator
33from airflow .operators .empty import EmptyOperator
4+ from airflow .providers .postgres .hooks .postgres import PostgresHook
45from datetime import datetime , timedelta
56
67default_args = {
7- 'owner' : 'Melnikova_Alisa ' ,
8+ 'owner' : 'MelnikovaAlisa ' ,
89 'depends_on_past' : False ,
910 'start_date' : datetime (2024 , 1 , 1 ),
1011 'email_on_failure' : False ,
1314 'retry_delay' : timedelta (minutes = 5 ),
1415}
1516
17+
18+ def init_schema_with_context (** context ):
19+ """
20+ Инициализация схемы hospital с использованием даты из контекста DAG run
21+
22+ Параметры из контекста:
23+ - execution_date: фактическая дата выполнения DAG
24+ - logical_date: логическая дата (для расписания)
25+ - dag_run.run_id: уникальный ID запуска DAG
26+ - ds: дата выполнения в формате YYYY-MM-DD
27+ """
28+
29+ # Получаем параметры из контекста DAG run
30+ execution_date = context ['execution_date' ]
31+ logical_date = context ['logical_date' ]
32+ dag_run_id = context ['dag_run' ].run_id
33+ ds = context ['ds' ] # Дата выполнения в формате YYYY-MM-DD
34+
35+ print ("=" * 60 )
36+ print ("ИНИЦИАЛИЗАЦИЯ СХЕМЫ HOSPITAL С КОНТЕКСТОМ" )
37+ print ("=" * 60 )
38+
39+ print (f" Параметры из контекста DAG run:" )
40+ print (f" • Execution Date: { execution_date } " )
41+ print (f" • Logical Date: { logical_date } " )
42+ print (f" • DAG Run ID: { dag_run_id } " )
43+ print (f" • DS (дата): { ds } " )
44+
45+ # Используем дату из контекста для именования
46+ date_suffix = ds .replace ('-' , '' ) # 2024-01-15 -> 20240115
47+
48+ hook = PostgresHook (postgres_conn_id = 'postgres_default' )
49+ conn = hook .get_conn ()
50+ conn .autocommit = True
51+ cursor = conn .cursor ()
52+
53+ # 1. Создаем основную схему hospital
54+ print (f"\n 1. Создаем схему 'hospital'..." )
55+ cursor .execute ("CREATE SCHEMA IF NOT EXISTS hospital;" )
56+
57+ # 2. Создаем таблицу visits с дополнительными полями для отслеживания DAG run
58+ print (f"2. Создаем таблицу 'visits'..." )
59+ cursor .execute (f"""
60+ CREATE TABLE IF NOT EXISTS hospital.visits (
61+ visit_id BIGINT PRIMARY KEY,
62+ patient_id BIGINT NOT NULL,
63+ doctor_id BIGINT NOT NULL,
64+ visit_date DATE NOT NULL,
65+ diagnosis TEXT,
66+ dag_run_id VARCHAR(100) DEFAULT '{ dag_run_id } ',
67+ execution_date TIMESTAMP DEFAULT '{ execution_date } ',
68+ logical_date DATE DEFAULT '{ logical_date .date ()} ',
69+ created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
70+ );
71+ """ )
72+
73+ # 3. Создаем таблицу metrics с привязкой к DAG run
74+ print (f"3. Создаем таблицу 'metrics'..." )
75+ cursor .execute (f"""
76+ CREATE TABLE IF NOT EXISTS hospital.metrics (
77+ metric_id SERIAL PRIMARY KEY,
78+ metric_name VARCHAR(100) NOT NULL,
79+ metric_value TEXT,
80+ calculation_date DATE NOT NULL,
81+ dag_run_id VARCHAR(100) DEFAULT '{ dag_run_id } ',
82+ execution_date TIMESTAMP DEFAULT '{ execution_date } ',
83+ created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
84+ );
85+ """ )
86+
87+ # 4. Создаем таблицу для отслеживания инициализаций по дате
88+ print (f"4. Создаем таблицу 'schema_init_log'..." )
89+ cursor .execute (f"""
90+ CREATE TABLE IF NOT EXISTS hospital.schema_init_log (
91+ init_id SERIAL PRIMARY KEY,
92+ dag_run_id VARCHAR(100) NOT NULL,
93+ execution_date TIMESTAMP NOT NULL,
94+ logical_date DATE NOT NULL,
95+ schema_version VARCHAR(20) DEFAULT '1.0',
96+ init_date DATE DEFAULT '{ logical_date .date ()} ',
97+ created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
98+ CONSTRAINT unique_dag_run UNIQUE(dag_run_id)
99+ );
100+ """ )
101+
102+ # 5. Записываем лог инициализации
103+ cursor .execute ("""
104+ INSERT INTO hospital.schema_init_log
105+ (dag_run_id, execution_date, logical_date)
106+ VALUES (%s, %s, %s)
107+ ON CONFLICT (dag_run_id)
108+ DO UPDATE SET
109+ execution_date = EXCLUDED.execution_date,
110+ created_at = CURRENT_TIMESTAMP
111+ """ , (dag_run_id , execution_date , logical_date .date ()))
112+
113+ # 6. Создаем индексы
114+ print (f"5. Создаем индексы..." )
115+ cursor .execute ("""
116+ CREATE INDEX IF NOT EXISTS idx_visits_date ON hospital.visits(visit_date);
117+ CREATE INDEX IF NOT EXISTS idx_visits_patient ON hospital.visits(patient_id);
118+ CREATE INDEX IF NOT EXISTS idx_visits_doctor ON hospital.visits(doctor_id);
119+ CREATE INDEX IF NOT EXISTS idx_metrics_date ON hospital.metrics(calculation_date);
120+ CREATE INDEX IF NOT EXISTS idx_visits_dag_run ON hospital.visits(dag_run_id);
121+ CREATE INDEX IF NOT EXISTS idx_metrics_dag_run ON hospital.metrics(dag_run_id);
122+ """ )
123+
124+ # 7. Проверяем созданные таблицы
125+ cursor .execute ("""
126+ SELECT
127+ table_name,
128+ table_type
129+ FROM information_schema.tables
130+ WHERE table_schema = 'hospital'
131+ ORDER BY table_name;
132+ """ )
133+
134+ tables = cursor .fetchall ()
135+
136+ print (f"\n Созданы таблицы в схеме 'hospital':" )
137+ for table_name , table_type in tables :
138+ print (f" • { table_name } ({ table_type } )" )
139+
140+ # 8. Проверяем запись в логе
141+ cursor .execute ("""
142+ SELECT
143+ COUNT(*) as table_count,
144+ MAX(created_at) as last_init
145+ FROM hospital.schema_init_log;
146+ """ )
147+
148+ log_info = cursor .fetchone ()
149+
150+ print (f"\n Статистика инициализации:" )
151+ print (f" Всего инициализаций: { log_info [0 ]} " )
152+ print (f" Последняя инициализация: { log_info [1 ]} " )
153+
154+ cursor .close ()
155+ conn .close ()
156+
157+ print (f"\n " + "=" * 60 )
158+ print (f" ИНИЦИАЛИЗАЦИЯ ЗАВЕРШЕНА С ИСПОЛЬЗОВАНИЕМ КОНТЕКСТА" )
159+ print ("=" * 60 )
160+ print (f"DAG Run ID: { dag_run_id } " )
161+ print (f"Execution Date: { execution_date } " )
162+ print (f"Logical Date: { logical_date } " )
163+ print (f"Init Date: { ds } " )
164+
165+ return {
166+ 'status' : 'success' ,
167+ 'dag_run_id' : dag_run_id ,
168+ 'execution_date' : str (execution_date ),
169+ 'logical_date' : str (logical_date ),
170+ 'tables_created' : len (tables ),
171+ 'init_timestamp' : str (datetime .now ())
172+ }
173+
174+
175+ # Создаем DAG
16176dag = DAG (
17- 'init_hospital_schema ' ,
177+ 'init_hospital_schema_context ' ,
18178 default_args = default_args ,
19- description = 'Инициализация схемы для медицинской клиники ' ,
179+ description = 'Инициализация схемы с использованием даты из контекста DAG run ' ,
20180 schedule = None ,
21181 catchup = False ,
22- tags = ['hospital' , 'init' ]
182+ tags = ['hospital' , 'init' , 'context' ]
23183)
24184
185+ # Определяем задачи
25186start = EmptyOperator (task_id = 'start' , dag = dag )
26187end = EmptyOperator (task_id = 'end' , dag = dag )
27188
28- # Создание схемы hospital
29- create_schema = SQLExecuteQueryOperator (
30- task_id = 'create_hospital_schema' ,
31- conn_id = 'postgres_default' ,
32- sql = """
33- CREATE SCHEMA IF NOT EXISTS hospital;
34- """ ,
35- dag = dag
36- )
37-
38- # Создание таблицы visits
39- create_visits_table = SQLExecuteQueryOperator (
40- task_id = 'create_visits_table' ,
41- conn_id = 'postgres_default' ,
42- sql = """
43- CREATE TABLE IF NOT EXISTS hospital.visits (
44- visit_id BIGINT PRIMARY KEY,
45- patient_id BIGINT NOT NULL,
46- doctor_id BIGINT NOT NULL,
47- visit_date DATE NOT NULL,
48- diagnosis TEXT,
49- created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
50- );
51- """ ,
52- dag = dag
53- )
54-
55- # Создание таблицы для метрик
56- create_metrics_table = SQLExecuteQueryOperator (
57- task_id = 'create_metrics_table' ,
58- conn_id = 'postgres_default' ,
59- sql = """
60- CREATE TABLE IF NOT EXISTS hospital.metrics (
61- metric_id SERIAL PRIMARY KEY,
62- metric_name VARCHAR(100) NOT NULL,
63- metric_value JSONB,
64- calculation_date DATE NOT NULL,
65- created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
66- );
67- """ ,
68- dag = dag
69- )
70-
71- # Создание индексов для производительности
72- create_indexes = SQLExecuteQueryOperator (
73- task_id = 'create_indexes' ,
74- conn_id = 'postgres_default' ,
75- sql = """
76- CREATE INDEX IF NOT EXISTS idx_visits_date ON hospital.visits(visit_date);
77- CREATE INDEX IF NOT EXISTS idx_visits_patient ON hospital.visits(patient_id);
78- CREATE INDEX IF NOT EXISTS idx_visits_doctor ON hospital.visits(doctor_id);
79- CREATE INDEX IF NOT EXISTS idx_metrics_date ON hospital.metrics(calculation_date);
80- """ ,
189+ # Единственная задача - PythonOperator, который принимает контекст
190+ init_schema_task = PythonOperator (
191+ task_id = 'init_schema_with_context' ,
192+ python_callable = init_schema_with_context ,
193+ provide_context = True , # Ключевой параметр! Передает контекст DAG run
81194 dag = dag
82195)
83196
84197# Определение порядка выполнения задач
85- start >> create_schema >> create_visits_table >> create_metrics_table >> create_indexes >> end
198+ start >> init_schema_task >> end
0 commit comments