-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcreate_additional_test_data.py
More file actions
334 lines (281 loc) · 13.4 KB
/
create_additional_test_data.py
File metadata and controls
334 lines (281 loc) · 13.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
#!/usr/bin/env python
"""
补充生成HR管理系统测试数据
只添加缺失的数据,不重复创建
"""
import os
import sys
import django
from datetime import datetime, date, timedelta
from decimal import Decimal
import random
# 设置Django环境
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'hr_system.settings')
django.setup()
from django.contrib.auth.models import User
from departments.models import Department
from employees.models import Employee
from recruitment.models import JobPosition, Candidate, JobApplication, Interview, Offer
from salary.models import SalaryStructure, PayrollPeriod, SalaryRecord, SalaryAdjustment
from attendance.models import AttendanceRecord, LeaveType, LeaveRequest, OvertimeRecord
def create_additional_data():
"""创建额外的测试数据"""
print("🚀 开始补充测试数据...")
# 1. 补充薪资数据
create_additional_salary_data()
# 2. 补充考勤数据
create_additional_attendance_data()
# 3. 补充面试数据
create_additional_interview_data()
print("✅ 补充测试数据完成!")
def create_additional_salary_data():
"""补充薪资数据"""
print("💰 补充薪资数据...")
employees = list(Employee.objects.all())
# 创建工资周期(如果不存在)
current_date = date.today()
for i in range(6): # 创建6个月的工资周期
month_date = current_date.replace(day=1) - timedelta(days=i*30)
if not PayrollPeriod.objects.filter(year=month_date.year, month=month_date.month).exists():
start_date = month_date.replace(day=1)
if month_date.month == 12:
end_date = month_date.replace(year=month_date.year+1, month=1, day=1) - timedelta(days=1)
else:
end_date = month_date.replace(month=month_date.month+1, day=1) - timedelta(days=1)
PayrollPeriod.objects.create(
name=f"{month_date.year}年{month_date.month}月工资",
year=month_date.year,
month=month_date.month,
start_date=start_date,
end_date=end_date,
is_closed=i > 2
)
# 为员工创建薪资记录
payroll_periods = list(PayrollPeriod.objects.all())
salary_structures = list(SalaryStructure.objects.all())
created_count = 0
for period in payroll_periods:
for employee in employees:
if not SalaryRecord.objects.filter(employee=employee, payroll_period=period).exists():
base_salary = employee.base_salary
SalaryRecord.objects.create(
employee=employee,
payroll_period=period,
salary_structure=random.choice(salary_structures) if salary_structures else None,
base_salary=base_salary,
performance_bonus=Decimal(random.randint(500, 2000)),
allowances=Decimal(random.randint(200, 800)),
overtime_hours=Decimal(random.randint(0, 20)),
overtime_pay=Decimal(random.randint(0, 1000)),
working_days=Decimal(22),
leave_days=Decimal(random.randint(0, 3)),
late_deduction=Decimal(random.randint(0, 200)),
absent_deduction=Decimal(0),
other_deductions=Decimal(random.randint(0, 100)),
is_calculated=True,
is_paid=period.is_closed,
paid_date=datetime.now() if period.is_closed else None
)
created_count += 1
# 创建薪资调整记录
for i in range(20):
employee = random.choice(employees)
old_salary = employee.base_salary
new_salary = old_salary + Decimal(random.randint(-2000, 5000))
SalaryAdjustment.objects.create(
employee=employee,
adjustment_type=random.choice(["PROMOTION", "PERFORMANCE", "MARKET"]),
old_salary=old_salary,
new_salary=new_salary,
effective_date=date.today() - timedelta(days=random.randint(30, 180)),
reason=f"根据工作表现和市场情况调整薪资",
approved_by=random.choice(employees)
)
print(f" 💵 创建了 {created_count} 条薪资记录")
print(f" 📈 创建了 20 条薪资调整记录")
def create_additional_attendance_data():
"""补充考勤数据"""
print("⏰ 补充考勤数据...")
employees = list(Employee.objects.all())
# 创建请假类型(如果不存在)
leave_types = ["年假", "病假", "事假", "调休", "婚假", "产假"]
for leave_type_name in leave_types:
if not LeaveType.objects.filter(name=leave_type_name).exists():
LeaveType.objects.create(
name=leave_type_name,
days_per_year=random.randint(5, 15),
requires_approval=True,
description=f"{leave_type_name}相关说明"
)
# 补充考勤记录(最近60天)
attendance_count = 0
for employee in employees:
for day_offset in range(60):
record_date = date.today() - timedelta(days=day_offset)
# 跳过周末
if record_date.weekday() >= 5:
continue
# 如果已存在记录则跳过
if AttendanceRecord.objects.filter(employee=employee, date=record_date).exists():
continue
# 随机决定是否有考勤记录(模拟请假、出差等情况)
if random.random() < 0.1:
continue
# 生成签到签退时间
check_in_hour = random.randint(8, 10)
check_in_minute = random.randint(0, 59)
check_in_time = datetime.min.time().replace(
hour=check_in_hour, minute=check_in_minute
)
check_out_hour = random.randint(17, 20)
check_out_minute = random.randint(0, 59)
check_out_time = datetime.min.time().replace(
hour=check_out_hour, minute=check_out_minute
)
# 计算工作时长
check_in_datetime = datetime.combine(record_date, check_in_time)
check_out_datetime = datetime.combine(record_date, check_out_time)
work_duration = check_out_datetime - check_in_datetime
work_hours = work_duration.total_seconds() / 3600 - 1 # 减去1小时午休
# 判断迟到早退
is_late = check_in_time > datetime.strptime("09:00", "%H:%M").time()
is_early_leave = check_out_time < datetime.strptime("18:00", "%H:%M").time()
AttendanceRecord.objects.create(
employee=employee,
date=record_date,
check_in_time=check_in_time,
check_out_time=check_out_time,
work_hours=round(work_hours, 1),
is_late=is_late,
is_early_leave=is_early_leave,
status="PRESENT",
remarks="正常考勤" if not is_late and not is_early_leave else "迟到" if is_late else "早退"
)
attendance_count += 1
# 创建请假申请
leave_types_objs = list(LeaveType.objects.all())
for i in range(30):
employee = random.choice(employees)
leave_type = random.choice(leave_types_objs)
start_date = date.today() + timedelta(days=random.randint(1, 60))
end_date = start_date + timedelta(days=random.randint(1, 5))
LeaveRequest.objects.create(
employee=employee,
leave_type=leave_type,
start_date=start_date,
end_date=end_date,
days=(end_date - start_date).days + 1,
reason=f"因{leave_type.name}需要请假",
status=random.choice(["PENDING", "APPROVED", "REJECTED"]),
approver=random.choice(employees) if random.choice([True, False]) else None,
approval_comment="同意请假" if random.choice([True, False]) else ""
)
# 创建加班记录
for i in range(40):
employee = random.choice(employees)
overtime_date = date.today() - timedelta(days=random.randint(1, 60))
start_time = datetime.strptime("18:00", "%H:%M").time()
end_time = datetime.strptime(f"{random.randint(19, 22)}:{random.choice(['00', '30'])}", "%H:%M").time()
# 计算加班时长
start_datetime = datetime.combine(overtime_date, start_time)
end_datetime = datetime.combine(overtime_date, end_time)
hours = (end_datetime - start_datetime).total_seconds() / 3600
OvertimeRecord.objects.create(
employee=employee,
date=overtime_date,
start_time=start_time,
end_time=end_time,
hours=round(hours, 1),
reason="项目需要加班完成任务",
status=random.choice(["PENDING", "APPROVED"]),
approver=random.choice(employees) if random.choice([True, False]) else None
)
print(f" ✅ 创建了 {attendance_count} 条考勤记录")
print(f" 🏖️ 创建了 30 个请假申请")
print(f" ⏰ 创建了 40 个加班记录")
def create_additional_interview_data():
"""补充面试数据"""
print("👥 补充面试数据...")
# 为现有的申请创建面试
applications = list(JobApplication.objects.filter(
status__in=["INTERVIEW_SCHEDULED", "INTERVIEWED"]
))
employees = list(Employee.objects.all())
interview_count = 0
for app in applications:
# 检查是否已有面试
if not Interview.objects.filter(application=app).exists():
Interview.objects.create(
application=app,
interview_type=random.choice(["PHONE", "VIDEO", "ONSITE"]),
round_number=1,
scheduled_date=date.today() + timedelta(days=random.randint(1, 21)),
scheduled_time=datetime.strptime(f"{random.randint(9, 17)}:00", "%H:%M").time(),
interviewer=random.choice(employees),
location="会议室A" if random.choice([True, False]) else "",
status=random.choice(["SCHEDULED", "COMPLETED"]),
technical_score=random.randint(6, 10) if random.choice([True, False]) else None,
communication_score=random.randint(6, 10) if random.choice([True, False]) else None,
cultural_fit_score=random.randint(6, 10) if random.choice([True, False]) else None,
overall_impression=random.choice(["GOOD", "EXCELLENT"]) if random.choice([True, False]) else "",
recommendation=random.choice(["HIRE", "PENDING"]) if random.choice([True, False]) else ""
)
interview_count += 1
# 为一些已完成面试的申请创建Offer
completed_interviews = Interview.objects.filter(
status="COMPLETED",
recommendation="HIRE"
)
offer_count = 0
for interview in completed_interviews[:5]: # 只为前5个创建Offer
if not Offer.objects.filter(application=interview.application).exists():
Offer.objects.create(
application=interview.application,
offered_salary=Decimal(random.randint(15000, 30000)),
bonus_details="年终奖金根据公司业绩和个人表现确定",
benefits="五险一金、补充医疗保险、年度体检、带薪年假",
start_date=date.today() + timedelta(days=random.randint(7, 30)),
probation_period=3,
expiry_date=date.today() + timedelta(days=7),
status=random.choice(["SENT", "ACCEPTED", "DECLINED"]),
prepared_by=random.choice(employees),
notes="期待您的加入!"
)
offer_count += 1
print(f" 🎯 创建了 {interview_count} 个面试安排")
print(f" 📋 创建了 {offer_count} 个Offer")
if __name__ == "__main__":
create_additional_data()
# 最终统计
print("\n" + "="*50)
print("📊 当前数据统计:")
print("="*50)
from django.db import connection
cursor = connection.cursor()
# 统计各表数据量
tables_info = [
("部门", "departments_department"),
("员工", "employees_employee"),
("职位", "recruitment_jobposition"),
("候选人", "recruitment_candidate"),
("求职申请", "recruitment_jobapplication"),
("面试", "recruitment_interview"),
("Offer", "recruitment_offer"),
("薪资结构", "salary_salarystructure"),
("工资周期", "salary_payrollperiod"),
("薪资记录", "salary_salaryrecord"),
("薪资调整", "salary_salaryadjustment"),
("考勤记录", "attendance_attendancerecord"),
("请假类型", "attendance_leavetype"),
("请假申请", "attendance_leaverequest"),
("加班记录", "attendance_overtimerecord"),
]
total_records = 0
for name, table in tables_info:
cursor.execute(f"SELECT COUNT(*) FROM {table}")
count = cursor.fetchone()[0]
total_records += count
print(f"{name}: {count} 条")
print("-" * 50)
print(f"总记录数: {total_records} 条")
print("✅ 数据库数据充足,可以进行功能测试!")