Skip to content

Commit a4ffe52

Browse files
authored
Merge pull request #57 from zezOtik/Zhukova_lab5
Zhukova lab5
2 parents b8c11c5 + 2145023 commit a4ffe52

File tree

10 files changed

+539
-0
lines changed

10 files changed

+539
-0
lines changed
2.36 MB
Binary file not shown.
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
AIRFLOW_UID=50000
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
FROM apache/airflow:2.10.5
2+
3+
# Become root to install requirements
4+
USER root
5+
6+
ADD requirements.txt requirements.txt
7+
8+
# Switch back to airflow user
9+
USER airflow
10+
11+
RUN pip install --user --no-cache-dir -r requirements.txt
12+
13+
# Set project name argument
14+
# Example: PROJECT=mymod
15+
#!!!!! Use only lowercase letters
16+
ARG PROJECT=bmstutest
17+
18+
COPY --chown=airflow:root dags /opt/airflow/dags/${PROJECT}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
from airflow import DAG
2+
from datetime import datetime
3+
from airflow.operators.python import PythonOperator
4+
from sqlalchemy import create_engine, text
5+
6+
7+
ARGS = {
8+
"owner": "Zhukova_Mariya",
9+
"email": ['test_user_var_2@email.ru','2@email.ru'],
10+
"email_on_failure": True,
11+
"email_on_retry": False,
12+
"start_date": datetime(2025, 3, 20), # дата первой записи в базе
13+
"pool": "default_pool",
14+
"queue": "default"
15+
}
16+
17+
18+
def calculate_and_save_metrics(run_date: str):
19+
"""Рассчитывает метрики за конкретную дату из контекста Airflow"""
20+
print(f"Обработка метрик за дату: {run_date}")
21+
22+
engine = create_engine("postgresql+psycopg2://airflow:airflow@postgres/airflow")
23+
24+
upsert_data_query = """
25+
INSERT INTO market.daily_sales_metrics(report_date, total_sales_amount)
26+
SELECT :run_date, COALESCE(SUM(amount), 0)
27+
FROM market.sales
28+
WHERE sale_date = :run_date
29+
ON CONFLICT (report_date)
30+
DO UPDATE SET total_sales_amount = EXCLUDED.total_sales_amount
31+
"""
32+
33+
with engine.connect() as conn:
34+
conn.execute(text(upsert_data_query), {"run_date": run_date})
35+
36+
check_amount_query = """
37+
SELECT total_sales_amount FROM market.daily_sales_metrics
38+
WHERE report_date = :run_date
39+
"""
40+
41+
with engine.connect() as conn:
42+
amount_result = conn.execute(text(check_amount_query), {"run_date": run_date}).fetchone()
43+
total_amount = amount_result[0] if amount_result else 0
44+
45+
print(f"Метрики за {run_date} успешно обновлены. Общая сумма продаж: {total_amount}")
46+
47+
48+
with DAG(dag_id='collect_data',
49+
default_args=ARGS,
50+
schedule_interval='@daily',
51+
max_active_runs=1,
52+
start_date=datetime(2025, 3, 20),
53+
catchup=True,
54+
tags=['lab5']) as dag:
55+
56+
t_process_daily = PythonOperator(
57+
task_id='calculate_daily_metrics',
58+
dag=dag,
59+
python_callable=calculate_and_save_metrics,
60+
op_kwargs={"run_date": "{{ ds }}"} # Передаем дату из контекста Airflow
61+
)
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
import csv
2+
from airflow import DAG
3+
from datetime import datetime
4+
from airflow.operators.python import PythonOperator
5+
from sqlalchemy import create_engine, text
6+
7+
8+
ARGS = {
9+
"owner": "Zhukova_Mariya",
10+
"email": ['test_user_var_2@email.ru','2@email.ru'],
11+
"email_on_failure": True,
12+
"email_on_retry": False,
13+
"start_date": datetime(2025, 11, 20),
14+
"pool": "default_pool",
15+
"queue": "default"
16+
}
17+
18+
CSV_FILE_PATH = '/opt/airflow/dags/sales_data.csv'
19+
20+
def truncate_table():
21+
engine = create_engine("postgresql+psycopg2://airflow:airflow@postgres/airflow")
22+
23+
truncate_table_query = "TRUNCATE TABLE market.sales;"
24+
25+
with engine.connect() as conn:
26+
conn.execute(text(truncate_table_query))
27+
28+
print("Таблица 'sales' очищена!")
29+
30+
31+
def init_sales():
32+
engine = create_engine("postgresql+psycopg2://airflow:airflow@postgres/airflow")
33+
34+
rows_to_insert = []
35+
with open(CSV_FILE_PATH, 'r') as file:
36+
reader = csv.reader(file)
37+
next(reader)
38+
39+
for row in reader:
40+
id = int(row[0])
41+
product_name = row[1]
42+
sale_date = row[2]
43+
amount = float(row[3])
44+
45+
rows_to_insert.append({
46+
'id': id,
47+
'product_name': product_name,
48+
'sale_date': sale_date,
49+
'amount': amount
50+
})
51+
52+
insert_data_query = """
53+
INSERT INTO market.sales (id, product_name, sale_date, amount)
54+
VALUES (:id, :product_name, :sale_date, :amount)
55+
"""
56+
57+
with engine.connect() as conn:
58+
conn.execute(text(insert_data_query), rows_to_insert)
59+
60+
print(f"Таблица 'sales' наполнена {len(rows_to_insert)} записями!")
61+
62+
63+
with DAG(dag_id='init_data',
64+
default_args=ARGS,
65+
schedule_interval='@once',
66+
max_active_runs=1,
67+
start_date=datetime(2025, 3, 20),
68+
catchup=True,
69+
tags=['lab5']) as dag:
70+
71+
t_truncate_table = PythonOperator(
72+
task_id='truncate_table',
73+
dag=dag,
74+
python_callable=truncate_table
75+
)
76+
77+
t_insert_data = PythonOperator(
78+
task_id='insert_data',
79+
dag=dag,
80+
python_callable=init_sales
81+
)
82+
83+
run = t_truncate_table >> t_insert_data
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
from airflow import DAG
2+
from datetime import datetime
3+
from airflow.operators.python import PythonOperator
4+
from sqlalchemy import create_engine, text
5+
6+
7+
ARGS = {
8+
"owner": "Zhukova_Mariya",
9+
"email": ['test_user_var_2@email.ru','2@email.ru'],
10+
"email_on_failure": True,
11+
"email_on_retry": False,
12+
"start_date": datetime(2025, 11, 20),
13+
"pool": "default_pool",
14+
"queue": "default"
15+
}
16+
17+
18+
def create_schema():
19+
engine = create_engine("postgresql+psycopg2://airflow:airflow@postgres/airflow")
20+
21+
create_schema_query = """
22+
CREATE SCHEMA IF NOT EXISTS market;
23+
"""
24+
25+
with engine.connect() as conn:
26+
conn.execute(text(create_schema_query))
27+
28+
print("Схема 'market' успешно создана!")
29+
30+
31+
def create_table():
32+
engine = create_engine("postgresql+psycopg2://airflow:airflow@postgres/airflow")
33+
34+
create_table_query = """
35+
CREATE TABLE IF NOT EXISTS market.sales (
36+
id BIGINT PRIMARY KEY,
37+
product_name TEXT NOT NULL,
38+
sale_date DATE NOT NULL,
39+
amount DECIMAL(10,2) NOT NULL
40+
);
41+
42+
CREATE TABLE IF NOT EXISTS market.daily_sales_metrics (
43+
report_date DATE PRIMARY KEY,
44+
total_sales_amount DECIMAL(15,2) NOT NULL
45+
);
46+
"""
47+
48+
with engine.connect() as conn:
49+
conn.execute(text(create_table_query))
50+
51+
print("Таблицы 'sales' и 'daily_sales_metrics' успешно созданы!")
52+
53+
54+
with DAG(dag_id='init_schema',
55+
default_args=ARGS,
56+
schedule_interval='@once',
57+
max_active_runs=1,
58+
start_date=datetime(2025, 3, 20),
59+
catchup=True,
60+
tags=['lab5']) as dag:
61+
62+
t_create_schema = PythonOperator(
63+
task_id='create_schema',
64+
dag=dag,
65+
python_callable=create_schema
66+
)
67+
68+
t_create_table = PythonOperator(
69+
task_id='create_table',
70+
dag=dag,
71+
python_callable=create_table
72+
)
73+
74+
run = t_create_schema >> t_create_table
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
id,product_name,sale_date,amount
2+
1,Laptop,2025-03-20,999.99
3+
2,Smartphone,2025-03-20,599.50
4+
3,Tablet,2025-03-20,399.99
5+
4,Headphones,2025-03-20,149.99
6+
5,Monitor,2025-03-20,299.99
7+
6,Keyboard,2025-03-21,79.99
8+
7,Mouse,2025-03-21,29.99
9+
8,Printer,2025-03-21,199.99
10+
9,Camera,2025-03-21,449.50
11+
10,Smartwatch,2025-03-21,199.99
12+
11,Laptop,2025-03-22,1099.99
13+
12,Smartphone,2025-03-22,699.50
14+
13,Tablet,2025-03-22,349.99

0 commit comments

Comments
 (0)