diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java index d5a18f19d56ef6..ecc0ca26af0a22 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java @@ -56,9 +56,6 @@ public class ExportExportingTask extends MasterTask { protected final ExportJob job; - ThreadPoolExecutor exportExecPool = ThreadPoolManager.newDaemonCacheThreadPool( - Config.maximum_parallelism_of_export_job, "exporting-pool-", false); - public ExportExportingTask(ExportJob job) { this.job = job; this.signature = job.getId(); @@ -115,6 +112,11 @@ protected void exec() { List outfileInfoList = Lists.newArrayList(); int parallelNum = selectStmtList.size(); + // Create thread pool with queue size based on actual parallelism + // Queue size = max(parallelNum, maximum_parallelism_of_export_job) to ensure all tasks can be queued + int queueSize = Math.max(parallelNum, Config.maximum_parallelism_of_export_job); + ThreadPoolExecutor exportExecPool = ThreadPoolManager.newDaemonFixedThreadPool( + Config.maximum_parallelism_of_export_job, queueSize, "exporting-pool-", false); CompletionService completionService = new ExecutorCompletionService<>(exportExecPool); // begin exporting @@ -213,8 +215,10 @@ protected void exec() { job.getStmtExecutor(idx).cancel(); } } + exportExecPool.shutdownNow(); + } else { + exportExecPool.shutdown(); } - exportExecPool.shutdownNow(); } if (isFailed) {