Skip to content

Commit 163cc4a

Browse files
Decade-qiuAntigravity
andcommitted
♻️ refactor(sandbox): implement true async execution and graceful worker shutdown
- Replace @async with explicit CompletableFuture.supplyAsync - Refactor JudgingWorker to use Spring-managed thread pool - Implement DisposableBean in worker for graceful shutdown - Improve exception handling in SandboxController Co-Authored-By: Antigravity <noreply@google.com>
1 parent eb896d1 commit 163cc4a

File tree

3 files changed

+136
-51
lines changed

3 files changed

+136
-51
lines changed

DOJ-BE/sandbox-service/src/main/java/com/decade/doj/sandbox/controller/SandboxController.java

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ public class SandboxController {
4747
private final StringRedisTemplate redisTemplate;
4848

4949
@PostMapping("/code")
50-
@Operation(summary = "运行代码文件(同步执行)")
51-
public CompletableFuture<R<ExecuteMessage>> runCode(@RequestParam("file") MultipartFile file, @RequestParam("language") @NotBlank String lang) throws IOException {
50+
@Operation(summary = "运行代码文件(异步执行)")
51+
public CompletableFuture<R<ExecuteMessage>> runCode(@RequestParam("file") MultipartFile file, @RequestParam("language") @NotBlank String lang) {
5252
if (file.isEmpty()) {
5353
return CompletableFuture.completedFuture(R.error("上传的文件不能为空!"));
5454
}
@@ -57,16 +57,20 @@ public CompletableFuture<R<ExecuteMessage>> runCode(@RequestParam("file") Multip
5757
return CompletableFuture.completedFuture(R.error("不支持的编程语言: " + lang));
5858
}
5959

60-
String path = saveFile(file, resourceProperties.getCodePath(), null)[0];
61-
62-
return sandboxService
63-
.runCodeInSandbox(path, file.getOriginalFilename(), lang)
64-
.thenApply(R::ok);
60+
try {
61+
String path = saveFile(file, resourceProperties.getCodePath(), null)[0];
62+
return sandboxService
63+
.runCodeInSandbox(path, file.getOriginalFilename(), lang)
64+
.thenApply(R::ok);
65+
} catch (IOException e) {
66+
log.error("保存代码文件失败", e);
67+
return CompletableFuture.completedFuture(R.error("文件保存失败: " + e.getMessage()));
68+
}
6569
}
6670

6771
@PostMapping("/problem")
68-
@Operation(summary = "运行题目代码(同步执行)")
69-
public CompletableFuture<R<ExecuteMessage>> runProblem(@RequestParam("file") MultipartFile file, @RequestParam("input") MultipartFile input, @RequestParam("language") @NotBlank String lang, @RequestParam("pid") Long pid) throws IOException {
72+
@Operation(summary = "运行题目代码(异步执行)")
73+
public CompletableFuture<R<ExecuteMessage>> runProblem(@RequestParam("file") MultipartFile file, @RequestParam("input") MultipartFile input, @RequestParam("language") @NotBlank String lang, @RequestParam("pid") Long pid) {
7074
if (file.isEmpty()) {
7175
return CompletableFuture.completedFuture(R.error("上传的文件不能为空!"));
7276
}
@@ -75,13 +79,18 @@ public CompletableFuture<R<ExecuteMessage>> runProblem(@RequestParam("file") Mul
7579
return CompletableFuture.completedFuture(R.error("不支持的编程语言: " + lang));
7680
}
7781

78-
String[] Paths = saveFile(file, resourceProperties.getCodePath(), null);
79-
String codePath = Paths[0];
80-
saveFile(input, resourceProperties.getCodePath(), Paths[1]);
81-
82-
return sandboxService
83-
.runCodeInSandboxWI(codePath, input.getOriginalFilename(), file.getOriginalFilename(), lang)
84-
.thenApply(R::ok);
82+
try {
83+
String[] Paths = saveFile(file, resourceProperties.getCodePath(), null);
84+
String codePath = Paths[0];
85+
saveFile(input, resourceProperties.getCodePath(), Paths[1]);
86+
87+
return sandboxService
88+
.runCodeInSandboxWI(codePath, input.getOriginalFilename(), file.getOriginalFilename(), lang)
89+
.thenApply(R::ok);
90+
} catch (IOException e) {
91+
log.error("保存代码/输入文件失败", e);
92+
return CompletableFuture.completedFuture(R.error("文件保存失败: " + e.getMessage()));
93+
}
8594
}
8695

8796
@PostMapping("/validate")

DOJ-BE/sandbox-service/src/main/java/com/decade/doj/sandbox/service/impl/SandboxService.java

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,11 @@
1212
import com.decade.doj.sandbox.domain.vo.JudgingTask;
1313
import com.decade.doj.sandbox.enums.LanguageEnum;
1414
import com.decade.doj.sandbox.service.ISandboxService;
15-
import lombok.RequiredArgsConstructor;
1615
import lombok.extern.slf4j.Slf4j;
1716
import org.springframework.amqp.rabbit.core.RabbitTemplate;
17+
import org.springframework.beans.factory.annotation.Qualifier;
1818
import org.springframework.beans.factory.annotation.Value;
19-
import org.springframework.scheduling.annotation.Async;
19+
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
2020
import org.springframework.stereotype.Service;
2121

2222
import java.io.BufferedReader;
@@ -30,10 +30,20 @@
3030
import java.util.regex.Pattern;
3131

3232
@Service
33-
@RequiredArgsConstructor
3433
@Slf4j
3534
public class SandboxService implements ISandboxService {
3635

36+
private final RabbitTemplate rabbitTemplate;
37+
private final ThreadPoolTaskExecutor runCodeExecutor;
38+
39+
public SandboxService(
40+
RabbitTemplate rabbitTemplate,
41+
@Qualifier("RunCodeThreadPool") ThreadPoolTaskExecutor runCodeExecutor
42+
) {
43+
this.rabbitTemplate = rabbitTemplate;
44+
this.runCodeExecutor = runCodeExecutor;
45+
}
46+
3747
/**
3848
* 通过占位符数量构造最终执行命令,例如:
3949
* rawCmd = "python3 %s.py",baseName = "Main" -> "python3 Main.py"
@@ -66,7 +76,6 @@ public static String buildRunCommand(String rawCmd, String baseName) {
6676
// 挂载目录
6777
private static final String MOUNT_PATH = "/app";
6878

69-
private final RabbitTemplate rabbitTemplate;
7079

7180
@Value("${DOJ_CODE_PATH:/app/static/codes/}")
7281
private String containerCodePath;
@@ -93,10 +102,11 @@ private String convertToHostPath(String containerPath) {
93102
}
94103

95104
@Override
96-
@Async("RunCodeThreadPool")
97105
public CompletableFuture<ExecuteMessage> runCodeInSandbox(String filePath, String filename, String lang) {
98-
ExecuteMessage result = _runCodeInSandbox(filePath, filename, lang);
99-
return CompletableFuture.completedFuture(result);
106+
return CompletableFuture.supplyAsync(
107+
() -> _runCodeInSandbox(filePath, filename, lang),
108+
runCodeExecutor
109+
);
100110
}
101111

102112
private ExecuteMessage _runCodeInSandbox(String filePath, String filename, String lang) {
@@ -189,10 +199,11 @@ private ExecuteMessage _runCodeInSandbox(String filePath, String filename, Strin
189199
}
190200

191201
@Override
192-
@Async("RunCodeThreadPool")
193202
public CompletableFuture<ExecuteMessage> runCodeInSandboxWI(String localPath, String inputname, String filename, String lang) {
194-
ExecuteMessage result = _runCodeInSandboxWI(localPath, inputname, filename, lang, null);
195-
return CompletableFuture.completedFuture(result);
203+
return CompletableFuture.supplyAsync(
204+
() -> _runCodeInSandboxWI(localPath, inputname, filename, lang, null),
205+
runCodeExecutor
206+
);
196207
}
197208

198209
@Override

DOJ-BE/sandbox-service/src/main/java/com/decade/doj/sandbox/worker/JudgingWorker.java

Lines changed: 90 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -3,53 +3,118 @@
33
import com.alibaba.fastjson.JSON;
44
import com.decade.doj.sandbox.domain.vo.JudgingTask;
55
import com.decade.doj.sandbox.service.ISandboxService;
6-
import lombok.RequiredArgsConstructor;
76
import lombok.extern.slf4j.Slf4j;
8-
import org.springframework.beans.factory.annotation.Value;
7+
import org.springframework.beans.factory.DisposableBean;
8+
import org.springframework.beans.factory.annotation.Qualifier;
99
import org.springframework.boot.ApplicationArguments;
1010
import org.springframework.boot.ApplicationRunner;
1111
import org.springframework.data.redis.core.StringRedisTemplate;
12+
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
1213
import org.springframework.stereotype.Component;
1314

14-
import java.util.concurrent.ExecutorService;
15-
import java.util.concurrent.Executors;
1615
import java.util.concurrent.TimeUnit;
16+
import java.util.concurrent.atomic.AtomicBoolean;
1717

18+
/**
19+
* 判题任务消费者
20+
* <p>
21+
* 使用单一调度线程从 Redis 队列轮询任务,并将任务分发到 JudgingThreadPool 执行。
22+
* 实现 DisposableBean 接口支持优雅关闭。
23+
*/
1824
@Slf4j
1925
@Component
20-
@RequiredArgsConstructor
21-
public class JudgingWorker implements ApplicationRunner {
26+
public class JudgingWorker implements ApplicationRunner, DisposableBean {
2227

2328
private final StringRedisTemplate redisTemplate;
2429
private final ISandboxService sandboxService;
25-
26-
@Value("${doj.sandbox.worker-threads:4}")
27-
private int workerThreads;
30+
private final ThreadPoolTaskExecutor judgingExecutor;
2831

2932
private static final String JUDGING_QUEUE_KEY = "judging:queue";
33+
private static final int POLL_TIMEOUT_SECONDS = 5;
34+
35+
private final AtomicBoolean running = new AtomicBoolean(true);
36+
private Thread schedulerThread;
37+
38+
public JudgingWorker(
39+
StringRedisTemplate redisTemplate,
40+
ISandboxService sandboxService,
41+
@Qualifier("JudgingThreadPool") ThreadPoolTaskExecutor judgingExecutor
42+
) {
43+
this.redisTemplate = redisTemplate;
44+
this.sandboxService = sandboxService;
45+
this.judgingExecutor = judgingExecutor;
46+
}
3047

3148
@Override
3249
public void run(ApplicationArguments args) {
33-
ExecutorService executor = Executors.newFixedThreadPool(workerThreads);
50+
schedulerThread = new Thread(this::pollAndDispatch, "JudgingScheduler");
51+
schedulerThread.start();
52+
log.info("判题调度器已启动,任务将分发到 JudgingThreadPool 执行");
53+
}
3454

35-
for (int i = 0; i < workerThreads; i++) {
36-
executor.submit(() -> {
37-
log.info("判题 Worker 线程 {} 已启动。", Thread.currentThread().getName());
38-
while (!Thread.currentThread().isInterrupted()) {
55+
/**
56+
* 轮询 Redis 队列并分发任务到线程池
57+
*/
58+
private void pollAndDispatch() {
59+
while (running.get()) {
60+
try {
61+
String taskJson = redisTemplate.opsForList()
62+
.rightPop(JUDGING_QUEUE_KEY, POLL_TIMEOUT_SECONDS, TimeUnit.SECONDS);
63+
64+
if (taskJson != null) {
65+
JudgingTask task = JSON.parseObject(taskJson, JudgingTask.class);
66+
log.info("收到判题任务: submissionId={}, problemId={}",
67+
task.getSubmissionId(), task.getProblemId());
68+
69+
// 分发到线程池异步执行
70+
judgingExecutor.execute(() -> executeTask(task));
71+
}
72+
} catch (Exception e) {
73+
if (running.get()) {
74+
log.error("轮询判题队列出现异常", e);
75+
// 短暂休眠避免错误风暴
3976
try {
40-
String taskJson = redisTemplate.opsForList()
41-
.rightPop(JUDGING_QUEUE_KEY, 30, TimeUnit.SECONDS);
42-
if (taskJson != null) {
43-
JudgingTask task = JSON.parseObject(taskJson, JudgingTask.class);
44-
log.info("Worker 开始处理任务: {}", task);
45-
sandboxService.execute(task);
46-
}
47-
} catch (Exception e) {
48-
log.error("判题 Worker 线程出现异常", e);
77+
Thread.sleep(1000);
78+
} catch (InterruptedException ie) {
79+
Thread.currentThread().interrupt();
80+
break;
4981
}
5082
}
51-
});
83+
}
84+
}
85+
log.info("判题调度器已停止轮询");
86+
}
87+
88+
/**
89+
* 执行单个判题任务
90+
*/
91+
private void executeTask(JudgingTask task) {
92+
try {
93+
log.debug("开始执行判题任务: submissionId={}", task.getSubmissionId());
94+
sandboxService.execute(task);
95+
log.debug("判题任务执行完成: submissionId={}", task.getSubmissionId());
96+
} catch (Exception e) {
97+
log.error("执行判题任务异常: submissionId={}, problemId={}",
98+
task.getSubmissionId(), task.getProblemId(), e);
5299
}
53-
log.info("启动了 {} 个判题 Worker 线程。", workerThreads);
100+
}
101+
102+
@Override
103+
public void destroy() {
104+
log.info("正在关闭判题调度器...");
105+
running.set(false);
106+
107+
if (schedulerThread != null) {
108+
schedulerThread.interrupt();
109+
try {
110+
// 等待调度线程终止
111+
schedulerThread.join(5000);
112+
} catch (InterruptedException e) {
113+
Thread.currentThread().interrupt();
114+
}
115+
}
116+
117+
log.info("判题调度器已关闭,线程池将由 Spring 容器管理关闭");
54118
}
55119
}
120+

0 commit comments

Comments
 (0)