Skip to content

Commit 88e9a70

Browse files
authored
branch-4.0: [fix](executor) Fix rare self-deadlock that can cause the time-sharing task executor to hang. (#60089)
bp #58273
1 parent 7bd0c5a commit 88e9a70

File tree

4 files changed

+14
-10
lines changed

4 files changed

+14
-10
lines changed

be/src/vec/exec/executor/task_executor.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ class TaskExecutor {
3737
virtual Status init() = 0;
3838
virtual Status start() = 0;
3939
virtual void stop() = 0;
40+
virtual void wait() = 0;
4041

4142
virtual Result<std::shared_ptr<TaskHandle>> create_task(
4243
const TaskId& task_id, std::function<double()> utilization_supplier,

be/src/vec/exec/executor/time_sharing/time_sharing_task_executor.cpp

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -562,10 +562,6 @@ void TimeSharingTaskExecutor::_dispatch_thread() {
562562
std::lock_guard<std::mutex> guard(_mutex);
563563
_running_splits.insert(split);
564564
}
565-
Defer defer {[&]() {
566-
std::lock_guard<std::mutex> guard(_mutex);
567-
_running_splits.erase(split);
568-
}};
569565

570566
Result<SharedListenableFuture<Void>> blocked_future_result = split->process();
571567

@@ -577,10 +573,6 @@ void TimeSharingTaskExecutor::_dispatch_thread() {
577573
auto blocked_future = blocked_future_result.value();
578574

579575
if (split->is_finished()) {
580-
{
581-
std::ostringstream _oss;
582-
_oss << std::this_thread::get_id();
583-
}
584576
_split_finished(split, split->finished_status());
585577
} else {
586578
if (split->is_auto_reschedule()) {
@@ -625,6 +617,17 @@ void TimeSharingTaskExecutor::_dispatch_thread() {
625617
// In the worst case, the destructor might even try to do something
626618
// with this SplitThreadPool, and produce a deadlock.
627619
// task.runnable.reset();
620+
621+
// IMPORTANT: We must explicitly release 'split' BEFORE acquiring _lock to avoid
622+
// self-deadlock. The destructor chain (PrioritizedSplitRunner -> ScannerSplitRunner
623+
// -> _scan_func lambda -> captured ScannerContext) may call remove_task() which
624+
// tries to acquire _lock. Since _lock is not a recursive mutex, this would deadlock.
625+
{
626+
std::lock_guard<std::mutex> guard(_mutex);
627+
_running_splits.erase(split);
628+
}
629+
split.reset();
630+
628631
l.lock();
629632
thread_pool_task_execution_time_ns_total->increment(
630633
task_execution_time_watch.elapsed_time());

be/src/vec/exec/executor/time_sharing/time_sharing_task_executor.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ class TimeSharingTaskExecutor : public TaskExecutor {
9797

9898
Status start() override;
9999
void stop() override;
100+
void wait() override;
100101

101102
Result<std::shared_ptr<TaskHandle>> create_task(
102103
const TaskId& task_id, std::function<double()> utilization_supplier,
@@ -245,8 +246,6 @@ class TimeSharingTaskExecutor : public TaskExecutor {
245246
std::unique_lock<std::mutex>& lock);
246247
void _record_leaf_splits_size(std::unique_lock<std::mutex>& lock);
247248
void _split_finished(std::shared_ptr<PrioritizedSplitRunner> split, const Status& status);
248-
// Waits until all the tasks are completed.
249-
void wait();
250249

251250
int64_t _get_running_tasks_for_level(int level) const;
252251

be/src/vec/exec/scan/scanner_scheduler.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,7 @@ class TaskExecutorSimplifiedScanScheduler final : public ScannerScheduler {
268268
void stop() override {
269269
_is_stop.store(true);
270270
_task_executor->stop();
271+
_task_executor->wait();
271272
}
272273

273274
Status start(int max_thread_num, int min_thread_num, int queue_size,

0 commit comments

Comments
 (0)