-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgenerate_test_data.py
More file actions
454 lines (384 loc) · 19.7 KB
/
generate_test_data.py
File metadata and controls
454 lines (384 loc) · 19.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
#!/usr/bin/env python
"""
生成HR管理系统测试数据
包含部门、员工、招聘、薪资、考勤等各模块的测试数据
"""
import os
import sys
import django
from datetime import datetime, date, timedelta
from decimal import Decimal
import random
# 设置Django环境
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'hr_system.settings')
django.setup()
from django.contrib.auth.models import User
from departments.models import Department
from employees.models import Employee
from recruitment.models import JobPosition, Candidate, JobApplication, Interview, Offer
from salary.models import SalaryStructure, PayrollPeriod, SalaryRecord, SalaryAdjustment
from attendance.models import AttendanceRecord, LeaveType, LeaveRequest, OvertimeRecord
def create_test_data():
"""创建测试数据"""
print("🚀 开始生成测试数据...")
# 1. 创建更多部门和员工
create_departments_and_employees()
# 2. 创建招聘相关数据
create_recruitment_data()
# 3. 创建薪资相关数据
create_salary_data()
# 4. 创建考勤相关数据
create_attendance_data()
print("✅ 测试数据生成完成!")
def create_departments_and_employees():
"""创建部门和员工数据"""
print("📝 创建部门和员工数据...")
# 创建更多部门
dept_data = [
{"name": "市场营销部", "code": "MKT", "description": "负责公司产品推广和市场开拓"},
{"name": "销售部", "code": "SAL", "description": "负责产品销售和客户维护"},
{"name": "财务部", "code": "FIN", "description": "负责公司财务管理和会计核算"},
{"name": "法务部", "code": "LEG", "description": "负责法律事务和合规管理"},
{"name": "客服部", "code": "CS", "description": "负责客户服务和支持"},
]
for dept_info in dept_data:
if not Department.objects.filter(name=dept_info["name"]).exists():
Department.objects.create(**dept_info, is_active=True)
# 获取所有部门
departments = list(Department.objects.all())
# 创建更多员工
employee_names = [
"张伟", "王芳", "李强", "赵敏", "刘洋", "陈静", "杨磊", "黄丽",
"周杰", "吴娜", "徐斌", "朱红", "胡军", "高峰", "林雪", "何平",
"郭涛", "宋佳", "邓超", "冯娟", "蒋辉", "韩梅", "曹操", "孙权",
"刘备", "关羽", "张飞", "赵云", "马超", "黄忠", "诸葛亮", "司马懿"
]
# 为每个部门创建3-5个员工
for dept in departments:
employee_count = random.randint(3, 5)
for i in range(employee_count):
if employee_names:
name = employee_names.pop(0)
username = f"user_{name}_{random.randint(1000, 9999)}"
# 创建用户
user = User.objects.create_user(
username=username,
first_name=name,
email=f"{username}@company.com",
password="password123"
)
# 创建员工
Employee.objects.create(
user=user,
employee_id=f"EMP{random.randint(10000, 99999)}",
department=dept,
position=random.choice(["经理", "主管", "专员", "助理", "实习生"]),
gender=random.choice(["M", "F"]),
id_card=f"{random.randint(100000000000000000, 999999999999999999)}",
hire_date=date.today() - timedelta(days=random.randint(30, 365)),
base_salary=Decimal(random.randint(5000, 20000)),
phone=f"1{random.randint(30, 99)}{random.randint(10000000, 99999999)}",
address=f"北京市{random.choice(['朝阳区', '海淀区', '西城区', '东城区'])}",
is_active=True
)
print(f" 📋 创建了 {Employee.objects.count()} 个员工")
def create_recruitment_data():
"""创建招聘相关数据"""
print("👥 创建招聘数据...")
departments = list(Department.objects.all())
employees = list(Employee.objects.all())
# 创建职位
job_titles = [
"软件工程师", "产品经理", "UI设计师", "数据分析师", "运营专员",
"市场专员", "销售代表", "财务分析师", "人事专员", "客服代表",
"测试工程师", "架构师", "项目经理", "商务专员", "法务专员"
]
for i, title in enumerate(job_titles):
code = f"JOB{i+1:03d}"
if not JobPosition.objects.filter(code=code).exists():
JobPosition.objects.create(
title=title,
code=code,
department=random.choice(departments),
hiring_manager=random.choice(employees),
description=f"{title}职位描述,负责相关工作内容的执行和管理。",
requirements=f"1. 本科及以上学历\n2. 3年以上相关工作经验\n3. 良好的沟通能力",
responsibilities=f"1. 负责{title}相关工作\n2. 协助团队完成目标\n3. 参与项目管理",
benefits="五险一金、带薪年假、节日福利、培训机会",
experience_required=random.choice(["1-3", "3-5", "5-10"]),
education_required=random.choice(["BACHELOR", "MASTER"]),
salary_min=Decimal(random.randint(8000, 15000)),
salary_max=Decimal(random.randint(15000, 30000)),
headcount=random.randint(1, 3),
status=random.choice(["PUBLISHED", "DRAFT"]),
location="北京",
job_type="全职",
priority=random.choice(["NORMAL", "HIGH"]),
deadline=date.today() + timedelta(days=random.randint(30, 90))
)
# 创建候选人
candidate_names = [
"候选人A", "候选人B", "候选人C", "候选人D", "候选人E",
"候选人F", "候选人G", "候选人H", "候选人I", "候选人J",
"候选人K", "候选人L", "候选人M", "候选人N", "候选人O"
]
for i, name in enumerate(candidate_names):
email = f"candidate{i+1}@example.com"
if not Candidate.objects.filter(email=email).exists():
Candidate.objects.create(
name=name,
email=email,
phone=f"1{random.randint(30, 99)}{random.randint(10000000, 99999999)}",
gender=random.choice(["M", "F"]),
birth_date=date(random.randint(1985, 2000), random.randint(1, 12), random.randint(1, 28)),
education=random.choice(["本科", "硕士", "博士"]),
school=random.choice(["北京大学", "清华大学", "复旦大学", "上海交大"]),
major=random.choice(["计算机科学", "软件工程", "电子信息", "工商管理"]),
experience_years=random.randint(1, 10),
current_company=f"公司{random.randint(1, 20)}",
current_position=random.choice(job_titles),
expected_salary=Decimal(random.randint(10000, 25000)),
self_introduction="我是一个积极向上的人,有丰富的工作经验...",
skills="Python, Django, JavaScript, React, MySQL等",
address=f"北京市{random.choice(['朝阳区', '海淀区', '西城区', '东城区'])}"
)
# 创建求职申请
positions = list(JobPosition.objects.filter(status="PUBLISHED"))
candidates = list(Candidate.objects.all())
created_applications = set() # 记录已创建的申请组合
for i in range(25): # 创建25个申请
# 随机选择职位和候选人的组合,确保不重复
max_attempts = 50
attempt = 0
while attempt < max_attempts:
position = random.choice(positions)
candidate = random.choice(candidates)
combination = (position.id, candidate.id)
if combination not in created_applications:
created_applications.add(combination)
JobApplication.objects.create(
position=position,
candidate=candidate,
status=random.choice(["SUBMITTED", "SCREENING", "INTERVIEW_SCHEDULED", "INTERVIEWED"]),
applied_date=datetime.now() - timedelta(days=random.randint(1, 30)),
resume_score=random.randint(6, 10),
notes="申请备注信息"
)
break
attempt += 1
# 创建面试安排
applications = list(JobApplication.objects.filter(status__in=["INTERVIEW_SCHEDULED", "INTERVIEWED"]))
for app in applications[:10]: # 为前10个申请安排面试
Interview.objects.create(
application=app,
interview_type=random.choice(["PHONE", "VIDEO", "ONSITE"]),
round_number=1,
scheduled_date=date.today() + timedelta(days=random.randint(1, 14)),
scheduled_time=datetime.strptime(f"{random.randint(9, 17)}:00", "%H:%M").time(),
interviewer=random.choice(employees),
location="会议室A" if random.choice([True, False]) else "",
status=random.choice(["SCHEDULED", "COMPLETED"]),
technical_score=random.randint(6, 10) if random.choice([True, False]) else None,
communication_score=random.randint(6, 10) if random.choice([True, False]) else None,
cultural_fit_score=random.randint(6, 10) if random.choice([True, False]) else None,
overall_impression=random.choice(["GOOD", "EXCELLENT"]) if random.choice([True, False]) else "",
recommendation=random.choice(["HIRE", "PENDING"]) if random.choice([True, False]) else ""
)
print(f" 💼 创建了 {JobPosition.objects.count()} 个职位")
print(f" 👤 创建了 {Candidate.objects.count()} 个候选人")
print(f" 📄 创建了 {JobApplication.objects.count()} 个求职申请")
print(f" 🎯 创建了 {Interview.objects.count()} 个面试安排")
def create_salary_data():
"""创建薪资相关数据"""
print("💰 创建薪资数据...")
employees = list(Employee.objects.all())
# 创建工资周期
current_date = date.today()
for i in range(6): # 创建6个月的工资周期
month_date = current_date.replace(day=1) - timedelta(days=i*30)
if not PayrollPeriod.objects.filter(year=month_date.year, month=month_date.month).exists():
start_date = month_date.replace(day=1)
if month_date.month == 12:
end_date = month_date.replace(year=month_date.year+1, month=1, day=1) - timedelta(days=1)
else:
end_date = month_date.replace(month=month_date.month+1, day=1) - timedelta(days=1)
PayrollPeriod.objects.create(
name=f"{month_date.year}年{month_date.month}月工资",
year=month_date.year,
month=month_date.month,
start_date=start_date,
end_date=end_date,
is_closed=i > 2 # 前3个月的工资周期设为已关闭
)
# 为员工创建薪资记录
payroll_periods = list(PayrollPeriod.objects.all())
salary_structures = list(SalaryStructure.objects.all())
for period in payroll_periods:
for employee in employees[:15]: # 为前15个员工创建薪资记录
if not SalaryRecord.objects.filter(employee=employee, payroll_period=period).exists():
base_salary = employee.base_salary
SalaryRecord.objects.create(
employee=employee,
payroll_period=period,
salary_structure=random.choice(salary_structures) if salary_structures else None,
base_salary=base_salary,
performance_bonus=Decimal(random.randint(500, 2000)),
allowances=Decimal(random.randint(200, 800)),
overtime_hours=Decimal(random.randint(0, 20)),
overtime_pay=Decimal(random.randint(0, 1000)),
working_days=Decimal(22),
leave_days=Decimal(random.randint(0, 3)),
late_deduction=Decimal(random.randint(0, 200)),
absent_deduction=Decimal(0),
other_deductions=Decimal(random.randint(0, 100)),
is_calculated=True,
is_paid=period.is_closed,
calculated_at=datetime.now() if period.is_closed else None,
paid_at=datetime.now() if period.is_paid else None
)
# 创建薪资调整记录
for i in range(10):
employee = random.choice(employees)
old_salary = employee.base_salary
new_salary = old_salary + Decimal(random.randint(-2000, 5000))
SalaryAdjustment.objects.create(
employee=employee,
adjustment_type=random.choice(["PROMOTION", "PERFORMANCE", "MARKET"]),
old_salary=old_salary,
new_salary=new_salary,
effective_date=date.today() - timedelta(days=random.randint(30, 180)),
reason=f"根据工作表现和市场情况调整薪资",
approved_by=random.choice(employees)
)
print(f" 📅 创建了 {PayrollPeriod.objects.count()} 个工资周期")
print(f" 💵 创建了 {SalaryRecord.objects.count()} 条薪资记录")
print(f" 📈 创建了 {SalaryAdjustment.objects.count()} 条薪资调整记录")
def create_attendance_data():
"""创建考勤相关数据"""
print("⏰ 创建考勤数据...")
employees = list(Employee.objects.all())
# 创建请假类型
leave_types = ["年假", "病假", "事假", "调休", "婚假", "产假"]
for leave_type_name in leave_types:
if not LeaveType.objects.filter(name=leave_type_name).exists():
LeaveType.objects.create(
name=leave_type_name,
days_per_year=random.randint(5, 15),
requires_approval=True,
description=f"{leave_type_name}相关说明"
)
# 创建考勤记录(最近30天)
for employee in employees:
for day_offset in range(30):
record_date = date.today() - timedelta(days=day_offset)
# 跳过周末
if record_date.weekday() >= 5:
continue
# 随机决定是否有考勤记录(模拟请假、出差等情况)
if random.random() < 0.1: # 10%概率没有考勤记录
continue
# 生成签到签退时间
check_in_hour = random.randint(8, 10)
check_in_minute = random.randint(0, 59)
check_in_time = datetime.combine(record_date, datetime.min.time().replace(
hour=check_in_hour, minute=check_in_minute
))
check_out_hour = random.randint(17, 20)
check_out_minute = random.randint(0, 59)
check_out_time = datetime.combine(record_date, datetime.min.time().replace(
hour=check_out_hour, minute=check_out_minute
))
# 计算工作时长
work_duration = check_out_time - check_in_time
work_hours = work_duration.total_seconds() / 3600 - 1 # 减去1小时午休
# 判断迟到早退
is_late = check_in_time.time() > datetime.strptime("09:00", "%H:%M").time()
is_early_leave = check_out_time.time() < datetime.strptime("18:00", "%H:%M").time()
AttendanceRecord.objects.create(
employee=employee,
date=record_date,
check_in_time=check_in_time,
check_out_time=check_out_time,
work_hours=round(work_hours, 1),
is_late=is_late,
is_early_leave=is_early_leave,
status="PRESENT",
remarks="正常考勤" if not is_late and not is_early_leave else "迟到" if is_late else "早退"
)
# 创建请假申请
leave_types_objs = list(LeaveType.objects.all())
for i in range(20): # 创建20个请假申请
employee = random.choice(employees)
leave_type = random.choice(leave_types_objs)
start_date = date.today() + timedelta(days=random.randint(1, 30))
end_date = start_date + timedelta(days=random.randint(1, 5))
LeaveRequest.objects.create(
employee=employee,
leave_type=leave_type,
start_date=start_date,
end_date=end_date,
days=(end_date - start_date).days + 1,
reason=f"因{leave_type.name}需要请假",
status=random.choice(["PENDING", "APPROVED", "REJECTED"]),
approver=random.choice(employees) if random.choice([True, False]) else None,
approval_comment="同意请假" if random.choice([True, False]) else ""
)
# 创建加班记录
for i in range(15): # 创建15个加班记录
employee = random.choice(employees)
overtime_date = date.today() - timedelta(days=random.randint(1, 30))
start_time = datetime.strptime("18:00", "%H:%M").time()
end_time = datetime.strptime(f"{random.randint(19, 22)}:{random.choice(['00', '30'])}", "%H:%M").time()
# 计算加班时长
start_datetime = datetime.combine(overtime_date, start_time)
end_datetime = datetime.combine(overtime_date, end_time)
hours = (end_datetime - start_datetime).total_seconds() / 3600
OvertimeRecord.objects.create(
employee=employee,
date=overtime_date,
start_time=start_time,
end_time=end_time,
hours=round(hours, 1),
reason="项目需要加班完成任务",
status=random.choice(["PENDING", "APPROVED"]),
approver=random.choice(employees) if random.choice([True, False]) else None
)
print(f" 📋 创建了 {LeaveType.objects.count()} 种请假类型")
print(f" ✅ 创建了 {AttendanceRecord.objects.count()} 条考勤记录")
print(f" 🏖️ 创建了 {LeaveRequest.objects.count()} 个请假申请")
print(f" ⏰ 创建了 {OvertimeRecord.objects.count()} 个加班记录")
if __name__ == "__main__":
create_test_data()
# 最终统计
print("\n" + "="*50)
print("📊 最终数据统计:")
print("="*50)
from django.db import connection
cursor = connection.cursor()
# 统计各表数据量
tables_info = [
("部门", "departments_department"),
("员工", "employees_employee"),
("职位", "recruitment_jobposition"),
("候选人", "recruitment_candidate"),
("求职申请", "recruitment_jobapplication"),
("面试", "recruitment_interview"),
("Offer", "recruitment_offer"),
("薪资结构", "salary_salarystructure"),
("工资周期", "salary_payrollperiod"),
("薪资记录", "salary_salaryrecord"),
("薪资调整", "salary_salaryadjustment"),
("考勤记录", "attendance_attendancerecord"),
("请假类型", "attendance_leavetype"),
("请假申请", "attendance_leaverequest"),
("加班记录", "attendance_overtimerecord"),
]
total_records = 0
for name, table in tables_info:
cursor.execute(f"SELECT COUNT(*) FROM {table}")
count = cursor.fetchone()[0]
total_records += count
print(f"{name}: {count} 条")
print("-" * 50)
print(f"总记录数: {total_records} 条")
print("✅ 测试数据生成完成!")