Skip to content

Commit 22edd1c

Browse files
authored
[fix](export) The export task thread pool uses a blocking queue strategy (#51609)
### What problem does this PR solve? When export parallelism exceeds `maximum_parallelism_of_export_job`, excess tasks are discarded by `LogDiscardPolicy`.
1 parent b6593aa commit 22edd1c

File tree

1 file changed

+8
-4
lines changed

1 file changed

+8
-4
lines changed

fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,6 @@ public class ExportExportingTask extends MasterTask {
5656

5757
protected final ExportJob job;
5858

59-
ThreadPoolExecutor exportExecPool = ThreadPoolManager.newDaemonCacheThreadPool(
60-
Config.maximum_parallelism_of_export_job, "exporting-pool-", false);
61-
6259
public ExportExportingTask(ExportJob job) {
6360
this.job = job;
6461
this.signature = job.getId();
@@ -115,6 +112,11 @@ protected void exec() {
115112
List<ExportJob.OutfileInfo> outfileInfoList = Lists.newArrayList();
116113

117114
int parallelNum = selectStmtList.size();
115+
// Create thread pool with queue size based on actual parallelism
116+
// Queue size = max(parallelNum, maximum_parallelism_of_export_job) to ensure all tasks can be queued
117+
int queueSize = Math.max(parallelNum, Config.maximum_parallelism_of_export_job);
118+
ThreadPoolExecutor exportExecPool = ThreadPoolManager.newDaemonFixedThreadPool(
119+
Config.maximum_parallelism_of_export_job, queueSize, "exporting-pool-", false);
118120
CompletionService<ExportResult> completionService = new ExecutorCompletionService<>(exportExecPool);
119121

120122
// begin exporting
@@ -213,8 +215,10 @@ protected void exec() {
213215
job.getStmtExecutor(idx).cancel();
214216
}
215217
}
218+
exportExecPool.shutdownNow();
219+
} else {
220+
exportExecPool.shutdown();
216221
}
217-
exportExecPool.shutdownNow();
218222
}
219223

220224
if (isFailed) {

0 commit comments

Comments
 (0)