11from airflow import DAG
22from airflow .operators .python import PythonOperator
33from airflow .operators .empty import EmptyOperator
4- from airflow .providers .postgres .hooks .postgres import PostgresHook
54from datetime import datetime , timedelta
5+ import psycopg2
66
77default_args = {
88 'owner' : 'MelnikovaAlisa' ,
1515}
1616
1717
18- def init_schema_with_context (** context ):
18+ def init_schema_incremental (** kwargs ):
1919 """
20- Инициализация схемы hospital с использованием даты из контекста DAG run
21-
22- Параметры из контекста:
23- - execution_date: фактическая дата выполнения DAG
24- - logical_date: логическая дата (для расписания)
25- - dag_run.run_id: уникальный ID запуска DAG
26- - ds: дата выполнения в формате YYYY-MM-DD
20+ Инициализация схемы hospital с использованием даты из контекста
2721 """
28-
29- # Получаем параметры из контекста DAG run
30- execution_date = context ['execution_date' ]
31- logical_date = context ['logical_date' ]
32- dag_run_id = context ['dag_run' ].run_id
33- ds = context ['ds' ] # Дата выполнения в формате YYYY-MM-DD
22+ # Получаем дату из контекста
23+ execution_date = kwargs ['logical_date' ].date ()
24+ dag_run_id = kwargs ['dag_run' ].run_id
3425
3526 print ("=" * 60 )
36- print ("ИНИЦИАЛИЗАЦИЯ СХЕМЫ HOSPITAL С КОНТЕКСТОМ " )
27+ print (f "ИНИЦИАЛИЗАЦИЯ СХЕМЫ HOSPITAL" )
3728 print ("=" * 60 )
38-
39- print (f" Параметры из контекста DAG run:" )
40- print (f" • Execution Date: { execution_date } " )
41- print (f" • Logical Date: { logical_date } " )
42- print (f" • DAG Run ID: { dag_run_id } " )
43- print (f" • DS (дата): { ds } " )
44-
45- # Используем дату из контекста для именования
46- date_suffix = ds .replace ('-' , '' ) # 2024-01-15 -> 20240115
47-
48- hook = PostgresHook (postgres_conn_id = 'postgres_default' )
49- conn = hook .get_conn ()
29+ print (f" Дата из контекста DAG run: { execution_date } " )
30+ print (f" DAG Run ID: { dag_run_id } " )
31+
32+ # Подключаемся к PostgreSQL
33+ conn = psycopg2 .connect (
34+ host = 'localhost' ,
35+ port = 5432 ,
36+ dbname = 'airflow' ,
37+ user = 'airflow' ,
38+ password = 'airflow'
39+ )
5040 conn .autocommit = True
5141 cursor = conn .cursor ()
5242
53- # 1. Создаем основную схему hospital
54- print (f"\n 1. Создаем схему 'hospital'..." )
55- cursor .execute ("CREATE SCHEMA IF NOT EXISTS hospital;" )
43+ # Проверяем, существует ли уже схема для этой даты
44+ cursor .execute ("""
45+ SELECT EXISTS(
46+ SELECT 1 FROM information_schema.schemata
47+ WHERE schema_name = 'hospital'
48+ );
49+ """ )
50+ schema_exists = cursor .fetchone ()[0 ]
51+
52+ if not schema_exists :
53+ print ("1. Создаем схему 'hospital'..." )
54+ cursor .execute ("CREATE SCHEMA hospital;" )
55+ else :
56+ print ("1. Схема 'hospital' уже существует" )
5657
57- # 2. Создаем таблицу visits с дополнительными полями для отслеживания DAG run
58- print (f "2. Создаем таблицу 'visits'..." )
58+ # Создаем таблицу visits с датой из контекста
59+ print ("2. Создаем/проверяем таблицу 'visits'..." )
5960 cursor .execute (f"""
6061 CREATE TABLE IF NOT EXISTS hospital.visits (
6162 visit_id BIGINT PRIMARY KEY,
6263 patient_id BIGINT NOT NULL,
6364 doctor_id BIGINT NOT NULL,
6465 visit_date DATE NOT NULL,
6566 diagnosis TEXT,
67+ -- Добавляем поля для отслеживания как у коллеги
68+ dag_run_date DATE DEFAULT '{ execution_date } ',
6669 dag_run_id VARCHAR(100) DEFAULT '{ dag_run_id } ',
67- execution_date TIMESTAMP DEFAULT '{ execution_date } ',
68- logical_date DATE DEFAULT '{ logical_date .date ()} ',
69- created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
70+ loaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
7071 );
7172 """ )
7273
73- # 3. Создаем таблицу metrics с привязкой к DAG run
74- print (f "3. Создаем таблицу 'metrics'..." )
74+ # Создаем таблицу metrics с датой из контекста
75+ print ("3. Создаем/проверяем таблицу 'metrics'..." )
7576 cursor .execute (f"""
7677 CREATE TABLE IF NOT EXISTS hospital.metrics (
7778 metric_id SERIAL PRIMARY KEY,
7879 metric_name VARCHAR(100) NOT NULL,
7980 metric_value TEXT,
80- calculation_date DATE NOT NULL,
81+ calculation_date DATE NOT NULL DEFAULT '{ execution_date } ',
82+ dag_run_date DATE DEFAULT '{ execution_date } ',
8183 dag_run_id VARCHAR(100) DEFAULT '{ dag_run_id } ',
82- execution_date TIMESTAMP DEFAULT '{ execution_date } ',
8384 created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
8485 );
8586 """ )
8687
87- # 4. Создаем таблицу для отслеживания инициализаций по дате
88- print (f"4. Создаем таблицу 'schema_init_log'..." )
89- cursor .execute (f"""
90- CREATE TABLE IF NOT EXISTS hospital.schema_init_log (
91- init_id SERIAL PRIMARY KEY,
92- dag_run_id VARCHAR(100) NOT NULL,
93- execution_date TIMESTAMP NOT NULL,
94- logical_date DATE NOT NULL,
95- schema_version VARCHAR(20) DEFAULT '1.0',
96- init_date DATE DEFAULT '{ logical_date .date ()} ',
97- created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
98- CONSTRAINT unique_dag_run UNIQUE(dag_run_id)
99- );
100- """ )
101-
102- # 5. Записываем лог инициализации
103- cursor .execute ("""
104- INSERT INTO hospital.schema_init_log
105- (dag_run_id, execution_date, logical_date)
106- VALUES (%s, %s, %s)
107- ON CONFLICT (dag_run_id)
108- DO UPDATE SET
109- execution_date = EXCLUDED.execution_date,
110- created_at = CURRENT_TIMESTAMP
111- """ , (dag_run_id , execution_date , logical_date .date ()))
112-
113- # 6. Создаем индексы
114- print (f"5. Создаем индексы..." )
88+ # Создаем индексы
89+ print ("4. Создаем индексы..." )
11590 cursor .execute ("""
11691 CREATE INDEX IF NOT EXISTS idx_visits_date ON hospital.visits(visit_date);
117- CREATE INDEX IF NOT EXISTS idx_visits_patient ON hospital.visits(patient_id);
118- CREATE INDEX IF NOT EXISTS idx_visits_doctor ON hospital.visits(doctor_id);
119- CREATE INDEX IF NOT EXISTS idx_metrics_date ON hospital.metrics(calculation_date);
120- CREATE INDEX IF NOT EXISTS idx_visits_dag_run ON hospital.visits(dag_run_id);
121- CREATE INDEX IF NOT EXISTS idx_metrics_dag_run ON hospital.metrics(dag_run_id);
92+ CREATE INDEX IF NOT EXISTS idx_visits_dag_date ON hospital.visits(dag_run_date);
12293 """ )
12394
124- # 7. Проверяем созданные таблицы
95+ # Логируем инициализацию для этой даты
96+ print ("5. Логируем инициализацию..." )
12597 cursor .execute ("""
126- SELECT
127- table_name,
128- table_type
129- FROM information_schema.tables
130- WHERE table_schema = 'hospital'
131- ORDER BY table_name;
98+ CREATE TABLE IF NOT EXISTS hospital.init_log (
99+ log_id SERIAL PRIMARY KEY,
100+ init_date DATE NOT NULL,
101+ dag_run_id VARCHAR(100) NOT NULL,
102+ schema_created BOOLEAN DEFAULT TRUE,
103+ tables_created INTEGER,
104+ init_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
105+ );
132106 """ )
133107
134- tables = cursor .fetchall ()
135-
136- print ( f" \n Созданы таблицы в схеме 'hospital':" )
137- for table_name , table_type in tables :
138- print ( f" • { table_name } ( { table_type } )" )
108+ cursor .execute ( """
109+ INSERT INTO hospital.init_log
110+ (init_date, dag_run_id, schema_created, tables_created )
111+ VALUES (%s, %s, %s, %s)
112+ """ , ( execution_date , dag_run_id , True , 2 ) )
139113
140- # 8. Проверяем запись в логе
114+ # Проверяем результат
141115 cursor .execute ("""
142- SELECT
143- COUNT(*) as table_count,
144- MAX(created_at) as last_init
145- FROM hospital.schema_init_log;
146- """ )
116+ SELECT COUNT(*) FROM hospital.init_log
117+ WHERE init_date = %s
118+ """ , (execution_date ,))
147119
148- log_info = cursor .fetchone ()
120+ log_count = cursor .fetchone ()[ 0 ]
149121
150- print (f"\n Статистика инициализации:" )
151- print (f" Всего инициализаций: { log_info [0 ]} " )
152- print (f" Последняя инициализация: { log_info [1 ]} " )
122+ print (f"\n Инициализация завершена для даты: { execution_date } " )
123+ print (f" Статистика:" )
124+ print (f" • Дата DAG run: { execution_date } " )
125+ print (f" • DAG Run ID: { dag_run_id } " )
126+ print (f" • Записей в логе: { log_count } " )
153127
154128 cursor .close ()
155129 conn .close ()
156130
157- print (f"\n " + "=" * 60 )
158- print (f" ИНИЦИАЛИЗАЦИЯ ЗАВЕРШЕНА С ИСПОЛЬЗОВАНИЕМ КОНТЕКСТА" )
159- print ("=" * 60 )
160- print (f"DAG Run ID: { dag_run_id } " )
161- print (f"Execution Date: { execution_date } " )
162- print (f"Logical Date: { logical_date } " )
163- print (f"Init Date: { ds } " )
164-
165- return {
166- 'status' : 'success' ,
167- 'dag_run_id' : dag_run_id ,
168- 'execution_date' : str (execution_date ),
169- 'logical_date' : str (logical_date ),
170- 'tables_created' : len (tables ),
171- 'init_timestamp' : str (datetime .now ())
172- }
131+ return f"init_completed_for_{ execution_date } "
173132
174133
175134# Создаем DAG
176135dag = DAG (
177- 'init_hospital_schema_context ' ,
136+ 'init_hospital_schema_incremental ' ,
178137 default_args = default_args ,
179- description = 'Инициализация схемы с использованием даты из контекста DAG run' ,
180- schedule = None ,
181- catchup = False ,
182- tags = ['hospital' , 'init' , 'context ' ]
138+ description = 'Инициализация схемы hospital с датой из контекста DAG run' ,
139+ schedule_interval = '0 0 * * *' , # Ежедневно в полночь
140+ catchup = True , # Запускать пропущенные DAG runs
141+ tags = ['hospital' , 'init' , 'incremental ' ]
183142)
184143
185144# Определяем задачи
186145start = EmptyOperator (task_id = 'start' , dag = dag )
187146end = EmptyOperator (task_id = 'end' , dag = dag )
188147
189- # Единственная задача - PythonOperator, который принимает контекст
190148init_schema_task = PythonOperator (
191- task_id = 'init_schema_with_context ' ,
192- python_callable = init_schema_with_context ,
193- provide_context = True , # Ключевой параметр! Передает контекст DAG run
149+ task_id = 'init_schema_incremental ' ,
150+ python_callable = init_schema_incremental ,
151+ provide_context = True ,
194152 dag = dag
195153)
196154
197- # Определение порядка выполнения задач
155+ # Порядок выполнения
198156start >> init_schema_task >> end
0 commit comments