Skip to content

Commit a6d2794

Browse files
authored
Merge pull request #61 from zezOtik/Nikulina_lab5
lab5
2 parents a4ffe52 + b14805b commit a6d2794

File tree

13 files changed

+2537
-0
lines changed

13 files changed

+2537
-0
lines changed

students_folder/Nikulina/lab5/.env

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
AIRFLOW_UID=50000
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
logs/
2+
*.log
3+
__pycache__/
4+
*.pyc
5+
dag_processor_manager/
6+
scheduler/
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
FROM apache/airflow:2.10.5

# Become root so the requirements file can be placed into the image
USER root

# COPY is preferred over ADD for plain local files
# (ADD additionally supports URLs and auto-extraction, which is not wanted here)
COPY requirements.txt requirements.txt

# Switch back to the unprivileged airflow user before installing packages
USER airflow

RUN pip install --user --no-cache-dir -r requirements.txt

# Set project name argument; used as the DAGs subfolder name.
# Example: PROJECT=mymod
# !!!!! Use only lowercase letters
ARG PROJECT=bmstutest

COPY --chown=airflow:root dags /opt/airflow/dags/${PROJECT}
1.5 MB
Binary file not shown.
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
from airflow import DAG
2+
from datetime import datetime
3+
from airflow.operators.python import PythonOperator
4+
from sqlalchemy import create_engine, text
5+
6+
# Default task arguments shared by every task in this DAG.
ARGS = {
    "owner": "bmstu",
    "email": ['wzomzot@hop.ru', '1@mail.ru'],
    "email_on_failure": True,   # notify on task failure
    "email_on_retry": False,    # stay quiet on retries
    "start_date": datetime(2025, 3, 20),
    "pool": "default_pool",
    "queue": "default",
}
15+
16+
def calculate_metric(**kwargs):
    """Compute the unique-users metric for the run's logical date.

    Calls the ed_tech.get_unique_users_by_date() SQL function, prints the
    result, and pushes it to XCom under the key 'metric_result'.
    """
    metric_date = kwargs['logical_date'].date()
    print(f"Расчет метрики для даты: {metric_date}")

    # NOTE(review): connection credentials are hard-coded; presumably this
    # should use an Airflow connection — confirm.
    engine = create_engine("postgresql+psycopg2://airflow:airflow@postgres/airflow")

    calc_query = """
    SELECT ed_tech.get_unique_users_by_date(:input_date) AS unique_users;
    """
    with engine.connect() as conn:
        unique_users = conn.execute(
            text(calc_query), {"input_date": metric_date}
        ).scalar()

    print(f"Количество уникальных пользователей за {metric_date}: {unique_users}")
    kwargs['ti'].xcom_push(key='metric_result', value=unique_users)
29+
30+
# Daily DAG (08:00) computing the unique-users metric for each logical date.
with DAG(
    dag_id='calculate_metrics_variant5',
    default_args=ARGS,
    # 'schedule' replaces the deprecated 'schedule_interval' (Airflow >= 2.4;
    # the project image pins apache/airflow:2.10.5).
    schedule='0 8 * * *',
    max_active_runs=1,
    start_date=datetime(2025, 3, 20),
    catchup=True,  # backfill runs since start_date
    tags=['lab5', 'variant5']
) as dag:

    t_calculate_metric = PythonOperator(
        task_id='calculate_metric',
        # NOTE: 'provide_context=True' removed — it is deprecated and ignored
        # in Airflow 2.x; the context is always passed to the callable.
        python_callable=calculate_metric,
    )
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
"""Generate synthetic course-completion records and save them to CSV."""
import random
from datetime import datetime, timedelta
import csv
import os

# Catalogue of course titles to sample from.
courses = [
    'Python for Beginners',
    'Data Science Fundamentals',
    'Web Development with Flask',
    'Machine Learning Basics',
    'SQL Masterclass'
]

# Completion dates are drawn uniformly from this inclusive window.
start_date = datetime(2025, 4, 1)
end_date = datetime(2025, 6, 30)
span_days = (end_date - start_date).days

# 2000 synthetic rows of (user_id, course_name, completion_date).
data = [
    (
        random.randint(1, 500),
        random.choice(courses),
        (start_date + timedelta(days=random.randint(0, span_days))).date(),
    )
    for _ in range(2000)
]

output_file = "input_data/input_data.csv"
os.makedirs("input_data", exist_ok=True)

with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
    writer = csv.writer(csvfile)
    writer.writerow(['user_id', 'course_name', 'completion_date'])
    writer.writerows(data)

print(f"Успешно сгенерировано {len(data)} строк и сохранено в {output_file}")
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
from airflow import DAG
2+
from datetime import datetime
3+
from airflow.operators.python import PythonOperator
4+
from sqlalchemy import create_engine, text
5+
6+
# Common default_args applied to every task in this DAG.
ARGS = dict(
    owner="bmstu",
    email=['wzomzot@hop.ru', '1@mail.ru'],
    email_on_failure=True,   # alert the owners when a task fails
    email_on_retry=False,    # no mail on automatic retries
    start_date=datetime(2025, 3, 20),
    pool="default_pool",
    queue="default",
)
15+
16+
def create_schema():
    """Create the 'ed_tech' schema in Postgres if it does not already exist."""
    # NOTE(review): hard-coded connection URL; presumably should be an
    # Airflow connection — confirm.
    engine = create_engine("postgresql+psycopg2://airflow:airflow@postgres/airflow")
    ddl = """
    CREATE SCHEMA IF NOT EXISTS ed_tech;
    """
    with engine.connect() as conn:
        conn.execute(text(ddl))
    print("Схема 'ed_tech' успешно создана!")
24+
25+
def create_table():
    """Create the ed_tech.course_completions table in Postgres.

    NOTE(review): the CREATE TABLE statement below has no column list —
    it appears truncated in this capture and would fail in Postgres as
    written. TODO: confirm the full DDL against the repository.
    """
    engine = create_engine("postgresql+psycopg2://airflow:airflow@postgres/airflow")
    create_table_query = """
    CREATE TABLE IF NOT EXISTS ed_tech.course_completions
    """
    with engine.connect() as conn:
        conn.execute(text(create_table_query))
    print("Таблица 'course_completions' успешно создана!")
33+
34+
def create_metric_function():
    """Create (or replace) the ed_tech.get_unique_users_by_date SQL function.

    NOTE(review): only the function header is present below — the RETURNS
    clause and body appear truncated in this capture and the statement
    would fail in Postgres as written. TODO: confirm the full definition.
    """
    engine = create_engine("postgresql+psycopg2://airflow:airflow@postgres/airflow")
    create_func_query = """
    CREATE OR REPLACE FUNCTION ed_tech.get_unique_users_by_date(input_date DATE)
    """
    with engine.connect() as conn:
        conn.execute(text(create_func_query))
    print("Функция 'get_unique_users_by_date' успешно создана!")
42+
43+
# One-shot DAG that provisions the schema, table and metric function.
with DAG(
    dag_id='init_schema_variant5',
    default_args=ARGS,
    # 'schedule' replaces the deprecated 'schedule_interval' (Airflow >= 2.4;
    # the project image pins apache/airflow:2.10.5).
    schedule='@once',
    max_active_runs=1,
    start_date=datetime(2025, 3, 20),
    catchup=False,
    tags=['lab5', 'variant5']
) as dag:

    t_create_schema = PythonOperator(
        task_id='create_schema',
        python_callable=create_schema
    )

    t_create_table = PythonOperator(
        task_id='create_table',
        python_callable=create_table
    )

    t_create_func = PythonOperator(
        task_id='create_metric_function',
        python_callable=create_metric_function
    )

    # Provision in dependency order: schema -> table -> metric function.
    t_create_schema >> t_create_table >> t_create_func

0 commit comments

Comments
 (0)