Skip to content
Merged

lab5 #61

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions students_folder/Nikulina/lab5/.env
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
AIRFLOW_UID=50000
6 changes: 6 additions & 0 deletions students_folder/Nikulina/lab5/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
logs/
*.log
__pycache__/
*.pyc
dag_processor_manager/
scheduler/
18 changes: 18 additions & 0 deletions students_folder/Nikulina/lab5/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
FROM apache/airflow:2.10.5

# Become root only to place the requirements file into the image.
USER root

# COPY is preferred over ADD for plain local files (ADD's tar/URL handling
# is not needed here — Docker best-practice guidance).
COPY requirements.txt requirements.txt

# Switch back to the unprivileged airflow user before installing packages.
USER airflow

RUN pip install --user --no-cache-dir -r requirements.txt

# Set project name argument
# Example: PROJECT=mymod
#!!!!! Use only lowercase letters
ARG PROJECT=bmstutest

COPY --chown=airflow:root dags /opt/airflow/dags/${PROJECT}
Binary file not shown.
44 changes: 44 additions & 0 deletions students_folder/Nikulina/lab5/dags/calculate_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from airflow import DAG
from datetime import datetime
from airflow.operators.python import PythonOperator
from sqlalchemy import create_engine, text

# Default task arguments applied to every task in this DAG.
ARGS = {
    "owner": "bmstu",                          # owner shown in the Airflow UI
    "email": ["wzomzot@hop.ru", "1@mail.ru"],  # alert recipients
    "email_on_failure": True,                  # notify when a task fails
    "email_on_retry": False,                   # stay quiet on retries
    "start_date": datetime(2025, 3, 20),
    "pool": "default_pool",
    "queue": "default",
}

def calculate_metric(**kwargs):
    """Compute the daily unique-users metric and publish it via XCom.

    Takes the run's logical date from the Airflow context, calls the
    ``ed_tech.get_unique_users_by_date`` SQL function in Postgres for that
    date, prints the result and pushes it to XCom under ``metric_result``.
    """
    metric_date = kwargs['logical_date'].date()
    print(f"Расчет метрики для даты: {metric_date}")

    engine = create_engine("postgresql+psycopg2://airflow:airflow@postgres/airflow")
    calc_query = """
    SELECT ed_tech.get_unique_users_by_date(:input_date) AS unique_users;
    """

    # Read-only query: a plain connection is sufficient, no transaction needed.
    with engine.connect() as conn:
        unique_users = conn.execute(
            text(calc_query), {"input_date": metric_date}
        ).scalar()

    print(f"Количество уникальных пользователей за {metric_date}: {unique_users}")
    kwargs['ti'].xcom_push(key='metric_result', value=unique_users)

# Daily DAG: at 08:00 compute the unique-users metric for each logical date.
# catchup=True backfills one run per day since start_date on first activation;
# max_active_runs=1 keeps those backfill runs sequential.
with DAG(
    dag_id='calculate_metrics_variant5',
    default_args=ARGS,
    schedule_interval='0 8 * * *',
    max_active_runs=1,
    start_date=datetime(2025, 3, 20),
    catchup=True,
    tags=['lab5', 'variant5'],
) as dag:

    t_calculate_metric = PythonOperator(
        task_id='calculate_metric',
        python_callable=calculate_metric,
        # NOTE: provide_context=True removed — it is deprecated and a no-op in
        # Airflow 2.x; the task context is passed into **kwargs automatically.
    )
34 changes: 34 additions & 0 deletions students_folder/Nikulina/lab5/dags/generate_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import random
from datetime import datetime, timedelta
import csv
import os

# Course catalogue used for sampling random completions.
courses = [
    'Python for Beginners',
    'Data Science Fundamentals',
    'Web Development with Flask',
    'Machine Learning Basics',
    'SQL Masterclass',
]


def main():
    """Generate 2000 random course completions and write them to CSV.

    Each row is (user_id 1..500, course name, completion date in
    [2025-04-01, 2025-06-30]); output goes to input_data/input_data.csv
    with a header row.
    """
    start_date = datetime(2025, 4, 1)
    end_date = datetime(2025, 6, 30)
    days_diff = (end_date - start_date).days

    data = [
        (
            random.randint(1, 500),
            random.choice(courses),
            (start_date + timedelta(days=random.randint(0, days_diff))).date(),
        )
        for _ in range(2000)
    ]

    output_file = "input_data/input_data.csv"
    os.makedirs("input_data", exist_ok=True)

    with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(['user_id', 'course_name', 'completion_date'])
        writer.writerows(data)

    print(f"Успешно сгенерировано {len(data)} строк и сохранено в {output_file}")


# Guard: this file sits in dags/, so the Airflow scheduler imports it on every
# parse cycle; without the guard the CSV would be silently regenerated each time.
if __name__ == "__main__":
    main()
68 changes: 68 additions & 0 deletions students_folder/Nikulina/lab5/dags/init_schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from airflow import DAG
from datetime import datetime
from airflow.operators.python import PythonOperator
from sqlalchemy import create_engine, text

# Default task arguments applied to every task in this DAG.
ARGS = {
    "owner": "bmstu",                          # owner shown in the Airflow UI
    "email": ["wzomzot@hop.ru", "1@mail.ru"],  # alert recipients
    "email_on_failure": True,                  # notify when a task fails
    "email_on_retry": False,                   # stay quiet on retries
    "start_date": datetime(2025, 3, 20),
    "pool": "default_pool",
    "queue": "default",
}

def create_schema():
    """Create the ``ed_tech`` schema if it does not already exist.

    Uses ``engine.begin()`` so the DDL is committed in an explicit
    transaction instead of relying on SQLAlchemy 1.4's deprecated
    connection-level autocommit for DDL statements.
    """
    engine = create_engine("postgresql+psycopg2://airflow:airflow@postgres/airflow")
    create_schema_query = """
    CREATE SCHEMA IF NOT EXISTS ed_tech;
    """
    with engine.begin() as conn:
        conn.execute(text(create_schema_query))
    print("Схема 'ed_tech' успешно создана!")

def create_table():
    """Create the ``ed_tech.course_completions`` table if missing.

    Fixes a real defect: the original statement was truncated
    (``CREATE TABLE ... course_completions`` with no column list) and would
    fail with a Postgres syntax error. Columns mirror the generated CSV
    (user_id, course_name, completion_date) — confirm against generate_data.py.
    Committed via ``engine.begin()`` rather than deprecated DDL autocommit.
    """
    engine = create_engine("postgresql+psycopg2://airflow:airflow@postgres/airflow")
    create_table_query = """
    CREATE TABLE IF NOT EXISTS ed_tech.course_completions (
        id SERIAL PRIMARY KEY,
        user_id INTEGER NOT NULL,
        course_name TEXT NOT NULL,
        completion_date DATE NOT NULL
    );
    """
    with engine.begin() as conn:
        conn.execute(text(create_table_query))
    print("Таблица 'course_completions' успешно создана!")

def create_metric_function():
    """Create or replace ``ed_tech.get_unique_users_by_date(input_date DATE)``.

    Fixes a real defect: the original statement was truncated (no RETURNS
    clause or body) and would fail with a Postgres syntax error. The function
    returns the number of distinct users with a completion on the given date —
    this is the metric queried by calculate_metrics.py.
    Committed via ``engine.begin()`` rather than deprecated DDL autocommit.
    """
    engine = create_engine("postgresql+psycopg2://airflow:airflow@postgres/airflow")
    create_func_query = """
    CREATE OR REPLACE FUNCTION ed_tech.get_unique_users_by_date(input_date DATE)
    RETURNS BIGINT
    LANGUAGE sql
    AS $$
        SELECT COUNT(DISTINCT user_id)
        FROM ed_tech.course_completions
        WHERE completion_date = input_date;
    $$;
    """
    with engine.begin() as conn:
        conn.execute(text(create_func_query))
    print("Функция 'get_unique_users_by_date' успешно создана!")

# One-shot DAG (@once) that provisions the database objects used by the
# metrics DAG: schema -> table -> SQL function, strictly in that order.
with DAG(
    dag_id='init_schema_variant5',
    default_args=ARGS,
    schedule_interval='@once',
    max_active_runs=1,
    start_date=datetime(2025, 3, 20),
    catchup=False,
    tags=['lab5', 'variant5'],
) as dag:

    t_create_schema = PythonOperator(task_id='create_schema',
                                     python_callable=create_schema)

    t_create_table = PythonOperator(task_id='create_table',
                                    python_callable=create_table)

    t_create_func = PythonOperator(task_id='create_metric_function',
                                   python_callable=create_metric_function)

    # The schema must exist before the table, and the table before the function.
    t_create_schema >> t_create_table >> t_create_func
Loading