1616
1717
1818def calculate_and_save_metrics (run_date : str ):
19+ """Рассчитывает метрики за конкретную дату из контекста Airflow"""
1920 print (f"Обработка метрик за дату: { run_date } " )
2021
2122 engine = create_engine ("postgresql+psycopg2://airflow:airflow@postgres/airflow" )
@@ -44,35 +45,17 @@ def calculate_and_save_metrics(run_date: str):
4445 print (f"Метрики за { run_date } успешно обновлены. Общая сумма продаж: { total_amount } " )
4546
4647
47- def collect_sales_per_day ():
48- """Обработка всех дат из таблицы sales"""
49- engine = create_engine ("postgresql+psycopg2://airflow:airflow@postgres/airflow" )
50-
51- # Получаем все уникальные даты из sales
52- with engine .connect () as conn :
53- result = conn .execute (text ("""
54- SELECT DISTINCT sale_date
55- FROM market.sales
56- ORDER BY sale_date
57- """ ))
58- dates = [row [0 ] for row in result ]
59-
60- print (f"Найдены даты для обработки: { dates } " )
61-
62- # Обрабатываем каждую дату
63- for date in dates :
64- calculate_and_save_metrics (str (date ))
65-
6648with DAG (dag_id = 'collect_data' ,
6749 default_args = ARGS ,
68- schedule_interval = '@once' , # или daily - если ежедневно
50+ schedule_interval = '@daily' ,
6951 max_active_runs = 1 ,
7052 start_date = datetime (2025 , 3 , 20 ),
7153 catchup = True ,
7254 tags = ['lab5' ]) as dag :
7355
74- t_process_all = PythonOperator (
75- task_id = 'collect_sales_per_day ' ,
56+ t_process_daily = PythonOperator (
57+ task_id = 'calculate_daily_metrics ' ,
7658 dag = dag ,
77- python_callable = collect_sales_per_day
59+ python_callable = calculate_and_save_metrics ,
60+ op_kwargs = {"run_date" : "{{ ds }}" } # Передаем дату из контекста Airflow
7861 )
0 commit comments