Skip to content
Merged

lab5 #65

Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions students_folder/zemliakov_alexey/lab5/airflow_folder/.env
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
AIRFLOW_UID=1000
14 changes: 14 additions & 0 deletions students_folder/zemliakov_alexey/lab5/airflow_folder/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
FROM apache/airflow:2.10.5

# Установка системных зависимостей (только если они необходимы для компиляции)
USER root
RUN apt-get update && apt-get install -y --no-install-recommends \
gcc \
&& rm -rf /var/lib/apt/lists/*

# Возврат к пользователю airflow
USER airflow

# Копирование и установка Python-зависимостей
COPY requirements.txt .
RUN pip install --no-cache-dir "apache-airflow==${AIRFLOW_VERSION}" -r requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from datetime import datetime
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator

with DAG(
'collect_data',
start_date=datetime(2023, 1, 1),
schedule_interval='@daily',
catchup=False,
description='Расчет ежемесячной метрики новых сотрудников'
) as dag:

truncate_metrics = PostgresOperator(
task_id='truncate_metrics_table',
postgres_conn_id='airflow_db',
sql='TRUNCATE TABLE hr_monthly_hires;'
)

calculate_metrics = PostgresOperator(
task_id='calculate_monthly_hires',
postgres_conn_id='airflow_db',
sql='''
INSERT INTO hr_monthly_hires (month, count)
SELECT
TO_CHAR(hire_date, 'YYYY-MM') AS month,
COUNT(*) AS count
FROM hr_employees
GROUP BY month
ORDER BY month;
'''
)

truncate_metrics >> calculate_metrics
114 changes: 114 additions & 0 deletions students_folder/zemliakov_alexey/lab5/airflow_folder/dags/init_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
from datetime import datetime, timedelta
import random
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook


def generate_random_employees(**context):
num_employees = 10000 # Количество сотрудников для генерации
batch_size = 1000 # Размер пакета для вставки

# Списки для генерации случайных данных
male_first_names = ['Иван', 'Петр', 'Сидор', 'Алексей', 'Дмитрий', 'Сергей',
'Михаил', 'Андрей', 'Николай', 'Василий', 'Александр', 'Максим',
'Евгений', 'Владимир', 'Борис', 'Георгий', 'Станислав', 'Роман',
'Павел', 'Константин', 'Виктор', 'Игорь', 'Артем', 'Даниил']

female_first_names = ['Мария', 'Анна', 'Екатерина', 'Ольга', 'Наталья', 'Светлана',
'Татьяна', 'Елена', 'Юлия', 'Ирина', 'Анастасия', 'Дарья',
'Полина', 'Виктория', 'Ксения', 'Александра', 'Маргарита', 'Валерия',
'Алина', 'Зоя', 'Людмила', 'Вера', 'Раиса', 'Марина']

last_names_male = ['Иванов', 'Петров', 'Сидоров', 'Козлов', 'Смирнов', 'Попов',
'Лебедев', 'Новиков', 'Морозов', 'Волков', 'Соколов', 'Белов',
'Михайлов', 'Федоров', 'Алексеев', 'Дмитриев', 'Сергеев', 'Егоров',
'Степанов', 'Николаев', 'Григорьев', 'Ильин', 'Орлов', 'Титов']

last_names_female = ['Иванова', 'Петрова', 'Сидорова', 'Козлова', 'Смирнова', 'Попова',
'Лебедева', 'Новикова', 'Морозова', 'Волкова', 'Соколова', 'Белова',
'Михайлова', 'Федорова', 'Алексеева', 'Дмитриева', 'Сергеева', 'Егорова',
'Степанова', 'Николаева', 'Григорьева', 'Ильина', 'Орлова', 'Титова']

departments = [
'Engineering', 'Data Science', 'Machine Learning',
'Marketing', 'Digital Marketing', 'Content Marketing',
'Sales', 'Enterprise Sales', 'Inside Sales',
'HR', 'Talent Acquisition', 'Employee Relations',
'Finance', 'Accounting', 'Financial Planning',
'Legal', 'Compliance', 'Intellectual Property',
'Support', 'Customer Success', 'Technical Support',
'Product', 'Product Management', 'Product Design',
'Operations', 'Logistics', 'Quality Assurance'
]

start_date = datetime(2018, 1, 1)
end_date = datetime(2023, 12, 31)

# Подключение к БД
pg_hook = PostgresHook(postgres_conn_id='airflow_db')
conn = pg_hook.get_conn()
cursor = conn.cursor()

# Очистка таблицы перед вставкой
cursor.execute("TRUNCATE TABLE hr_employees RESTART IDENTITY CASCADE;")

total_inserted = 0

# Генерация и вставка данных пакетами
for batch_start in range(1, num_employees + 1, batch_size):
batch_end = min(batch_start + batch_size - 1, num_employees)
batch_data = []

for i in range(batch_start, batch_end + 1):
# Случайный выбор пола для генерации имени
is_male = random.choice([True, False])

if is_male:
first_name = random.choice(male_first_names)
last_name = random.choice(last_names_male)
else:
first_name = random.choice(female_first_names)
last_name = random.choice(last_names_female)

full_name = f"{first_name} {last_name}"
department = random.choice(departments)

# Генерация случайной даты приема на работу
time_between_dates = end_date - start_date
days_between_dates = time_between_dates.days
random_number_of_days = random.randrange(days_between_dates)
hire_date = start_date + timedelta(days=random_number_of_days)

batch_data.append((i, full_name, department, hire_date))

# Вставка батчами
args_str = b','.join(
cursor.mogrify("(%s,%s,%s,%s)", x) for x in batch_data
)
cursor.execute(b"INSERT INTO hr_employees (id, name, department, hire_date) VALUES " + args_str)
conn.commit()

total_inserted += len(batch_data)
print(f"Вставлен пакет записей с {batch_start} по {batch_end} ({len(batch_data)} записей)")

cursor.close()
conn.close()

print(f"Всего вставлено записей: {total_inserted}")
return f"Успешно сгенерировано и вставлено {total_inserted} записей о сотрудниках"


with DAG(
'init_data',
start_date=datetime(2023, 1, 1),
schedule_interval='@once',
catchup=False,
description='Генерация случайных данных о сотрудниках (10 000 записей)',
tags=['data_generation', 'hr']
) as dag:
generate_data = PythonOperator(
task_id='generate_random_employees',
python_callable=generate_random_employees,
provide_context=True
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from datetime import datetime
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator

with DAG(
'init_schema',
start_date=datetime(2023, 1, 1),
schedule_interval='@once',
catchup=False,
description='Создание схемы и таблиц в метабазе Airflow'
) as dag:

create_employees_table = PostgresOperator(
task_id='create_employees_table',
postgres_conn_id='airflow_db',
sql='''
CREATE TABLE IF NOT EXISTS hr_employees (
id BIGINT PRIMARY KEY,
name TEXT NOT NULL,
department TEXT,
hire_date DATE
);
'''
)

create_metrics_table = PostgresOperator(
task_id='create_metrics_table',
postgres_conn_id='airflow_db',
sql='''
CREATE TABLE IF NOT EXISTS hr_monthly_hires (
month TEXT PRIMARY KEY,
count INTEGER NOT NULL
);
'''
)

create_employees_table >> create_metrics_table
Loading