Skip to content

Commit 04ec3e9

Browse files
author
virusdefender
committed
尝试使用更细粒度的数据库锁
1 parent ca44e8b commit 04ec3e9

File tree

2 files changed

+71
-75
lines changed

2 files changed

+71
-75
lines changed

judge_dispatcher/models.py

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,6 @@ class JudgeServer(models.Model):
1717
status = models.BooleanField(default=True)
1818
create_time = models.DateTimeField(auto_now_add=True, blank=True, null=True)
1919

20-
def use_judge_instance(self):
21-
# 因为use 和 release 中间是判题时间,可能这个 model 的数据已经被修改了,所以不能直接使用self.xxx,否则取到的是旧数据
22-
server = JudgeServer.objects.select_for_update().get(id=self.id)
23-
server.left_instance_number -= 1
24-
server.workload = 100 - int(float(server.left_instance_number) / server.max_instance_number * 100)
25-
server.save()
26-
27-
def release_judge_instance(self):
28-
# 使用原子操作
29-
server = JudgeServer.objects.select_for_update().get(id=self.id)
30-
server.left_instance_number += 1
31-
server.workload = 100 - int(float(server.left_instance_number) / server.max_instance_number * 100)
32-
server.save()
33-
3420
class Meta:
3521
db_table = "judge_server"
3622

judge_dispatcher/tasks.py

Lines changed: 71 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -38,25 +38,35 @@ def __init__(self, submission_id, time_limit, memory_limit, test_case_id, spj, s
3838
self.spj_version = spj_version
3939

4040
def choose_judge_server(self):
41-
servers = JudgeServer.objects.filter(workload__lt=100, lock=False, status=True).order_by("-workload")
42-
if servers.exists():
43-
return servers.first()
41+
with transaction.atomic():
42+
servers = JudgeServer.objects.select_for_update().filter(workload__lt=100, lock=False, status=True).order_by("-workload")
43+
if servers.exists():
44+
server = servers.first()
45+
server.left_instance_number -= 1
46+
server.workload = 100 - int(float(server.left_instance_number) / server.max_instance_number * 100)
47+
server.save()
48+
return server
49+
50+
def release_judge_instance(self, judge_server_id):
51+
with transaction.atomic():
52+
# 使用原子操作, 同时因为use和release中间间隔了判题过程,需要重新查询一下
53+
server = JudgeServer.objects.select_for_update().get(id=judge_server_id)
54+
server.left_instance_number += 1
55+
server.workload = 100 - int(float(server.left_instance_number) / server.max_instance_number * 100)
56+
server.save()
4457

4558
def judge(self):
4659
self.submission.judge_start_time = int(time.time() * 1000)
4760

48-
with transaction.atomic():
49-
judge_server = self.choose_judge_server()
50-
51-
# 如果没有合适的判题服务器,就放入等待队列中等待判题
52-
if not judge_server:
53-
JudgeWaitingQueue.objects.create(submission_id=self.submission.id, time_limit=self.time_limit,
54-
memory_limit=self.memory_limit, test_case_id=self.test_case_id,
55-
spj=self.spj, spj_language=self.spj_language, spj_code=self.spj_code,
56-
spj_version=self.spj_version)
57-
return
61+
judge_server = self.choose_judge_server()
5862

59-
judge_server.use_judge_instance()
63+
# 如果没有合适的判题服务器,就放入等待队列中等待判题
64+
if not judge_server:
65+
JudgeWaitingQueue.objects.create(submission_id=self.submission.id, time_limit=self.time_limit,
66+
memory_limit=self.memory_limit, test_case_id=self.test_case_id,
67+
spj=self.spj, spj_language=self.spj_language, spj_code=self.spj_code,
68+
spj_version=self.spj_version)
69+
return
6070

6171
try:
6272
s = TimeoutServerProxy("http://" + judge_server.ip + ":" + str(judge_server.port),
@@ -82,8 +92,7 @@ def judge(self):
8292
self.submission.result = result["system_error"]
8393
self.submission.info = str(e)
8494
finally:
85-
with transaction.atomic():
86-
judge_server.release_judge_instance()
95+
self.release_judge_instance(judge_server.id)
8796

8897
self.submission.judge_end_time = int(time.time() * 1000)
8998
self.submission.save(update_fields=["judge_start_time", "result", "info", "accepted_answer_time", "judge_end_time"])
@@ -111,32 +120,32 @@ def judge(self):
111120
spj_version=waiting_submission.spj_version)
112121

113122
def update_problem_status(self):
114-
problem = Problem.objects.get(id=self.submission.problem_id)
115-
116-
# 更新普通题目的计数器
117-
problem.add_submission_number()
118-
119-
# 更新用户做题状态
120-
user = User.objects.get(id=self.submission.user_id)
121-
122-
problems_status = user.problems_status
123-
if "problems" not in problems_status:
124-
problems_status["problems"] = {}
125-
126-
# 增加用户提交计数器
127-
user.userprofile.add_submission_number()
128-
129-
# 之前状态不是ac, 现在是ac了 需要更新用户ac题目数量计数器,这里需要判重
130-
if problems_status["problems"].get(str(problem.id), -1) != 1 and self.submission.result == result["accepted"]:
131-
user.userprofile.add_accepted_problem_number()
132-
133-
if self.submission.result == result["accepted"]:
134-
problem.add_ac_number()
135-
problems_status["problems"][str(problem.id)] = 1
136-
else:
137-
problems_status["problems"][str(problem.id)] = 2
138-
user.problems_status = problems_status
139-
user.save(update_fields=["problem_status"])
123+
with transaction.atomic():
124+
problem = Problem.objects.select_for_update().get(id=self.submission.problem_id)
125+
# 更新普通题目的计数器
126+
problem.add_submission_number()
127+
128+
# 更新用户做题状态
129+
user = User.objects.select_for_update().get(id=self.submission.user_id)
130+
131+
problems_status = user.problems_status
132+
if "problems" not in problems_status:
133+
problems_status["problems"] = {}
134+
135+
# 增加用户提交计数器
136+
user.userprofile.add_submission_number()
137+
138+
# 之前状态不是ac, 现在是ac了 需要更新用户ac题目数量计数器,这里需要判重
139+
if problems_status["problems"].get(str(problem.id), -1) != 1 and self.submission.result == result["accepted"]:
140+
user.userprofile.add_accepted_problem_number()
141+
142+
if self.submission.result == result["accepted"]:
143+
problem.add_ac_number()
144+
problems_status["problems"][str(problem.id)] = 1
145+
else:
146+
problems_status["problems"][str(problem.id)] = 2
147+
user.problems_status = problems_status
148+
user.save(update_fields=["problems_status"])
140149
# 普通题目的话,到这里就结束了
141150

142151
def update_contest_problem_status(self):
@@ -146,31 +155,32 @@ def update_contest_problem_status(self):
146155
logger.info("Contest debug mode, id: " + str(contest.id) + ", submission id: " + self.submission.id)
147156
return
148157

149-
contest_problem = ContestProblem.objects.get(contest=contest, id=self.submission.problem_id)
150-
contest_problem.add_submission_number()
158+
with transaction.atomic():
159+
contest_problem = ContestProblem.objects.select_for_update().get(contest=contest, id=self.submission.problem_id)
160+
contest_problem.add_submission_number()
151161

152-
user = User.objects.get(id=self.submission.user_id)
153-
problems_status = user.problems_status
162+
user = User.objects.select_for_update().get(id=self.submission.user_id)
163+
problems_status = user.problems_status
154164

155-
if "contest_problems" not in problems_status:
156-
problems_status["contest_problems"] = {}
165+
if "contest_problems" not in problems_status:
166+
problems_status["contest_problems"] = {}
157167

158-
# 增加用户提交计数器
159-
user.userprofile.add_submission_number()
168+
# 增加用户提交计数器
169+
user.userprofile.add_submission_number()
160170

161-
# 之前状态不是ac, 现在是ac了 需要更新用户ac题目数量计数器,这里需要判重
162-
if problems_status["contest_problems"].get(str(contest_problem.id), -1) != 1 and \
163-
self.submission.result == result["accepted"]:
164-
user.userprofile.add_accepted_problem_number()
171+
# 之前状态不是ac, 现在是ac了 需要更新用户ac题目数量计数器,这里需要判重
172+
if problems_status["contest_problems"].get(str(contest_problem.id), -1) != 1 and \
173+
self.submission.result == result["accepted"]:
174+
user.userprofile.add_accepted_problem_number()
165175

166-
if self.submission.result == result["accepted"]:
167-
contest_problem.add_ac_number()
168-
problems_status["contest_problems"][str(contest_problem.id)] = 1
169-
else:
170-
problems_status["contest_problems"][str(contest_problem.id)] = 2
176+
if self.submission.result == result["accepted"]:
177+
contest_problem.add_ac_number()
178+
problems_status["contest_problems"][str(contest_problem.id)] = 1
179+
else:
180+
problems_status["contest_problems"][str(contest_problem.id)] = 2
171181

172-
user.problems_status = problems_status
173-
user.save(update_fields=["problem_status"])
182+
user.problems_status = problems_status
183+
user.save(update_fields=["problems_status"])
174184

175185
self.update_contest_rank(contest)
176186

0 commit comments

Comments
 (0)