Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ DEFINE_Int32(task_executor_max_concurrency_per_task, "-1");
DEFINE_Int32(task_executor_initial_max_concurrency_per_task, "-1");

// Enable task executor in internal table scan.
DEFINE_Bool(enable_task_executor_in_internal_table, "false");
DEFINE_Bool(enable_task_executor_in_internal_table, "true");
// Enable task executor in external table scan.
DEFINE_Bool(enable_task_executor_in_external_table, "true");

Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exec/executor/task_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class TaskExecutor {
virtual Status init() = 0;
virtual Status start() = 0;
virtual void stop() = 0;
virtual void wait() = 0;

virtual Result<std::shared_ptr<TaskHandle>> create_task(
const TaskId& task_id, std::function<double()> utilization_supplier,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -562,10 +562,6 @@ void TimeSharingTaskExecutor::_dispatch_thread() {
std::lock_guard<std::mutex> guard(_mutex);
_running_splits.insert(split);
}
Defer defer {[&]() {
std::lock_guard<std::mutex> guard(_mutex);
_running_splits.erase(split);
}};

Result<SharedListenableFuture<Void>> blocked_future_result = split->process();

Expand All @@ -577,10 +573,6 @@ void TimeSharingTaskExecutor::_dispatch_thread() {
auto blocked_future = blocked_future_result.value();

if (split->is_finished()) {
{
std::ostringstream _oss;
_oss << std::this_thread::get_id();
}
_split_finished(split, split->finished_status());
} else {
if (split->is_auto_reschedule()) {
Expand Down Expand Up @@ -625,6 +617,17 @@ void TimeSharingTaskExecutor::_dispatch_thread() {
// In the worst case, the destructor might even try to do something
// with this SplitThreadPool, and produce a deadlock.
// task.runnable.reset();

// IMPORTANT: We must explicitly release 'split' BEFORE acquiring _lock to avoid
// self-deadlock. The destructor chain (PrioritizedSplitRunner -> ScannerSplitRunner
// -> _scan_func lambda -> captured ScannerContext) may call remove_task() which
// tries to acquire _lock. Since _lock is not a recursive mutex, this would deadlock.
{
std::lock_guard<std::mutex> guard(_mutex);
_running_splits.erase(split);
}
split.reset();

l.lock();
thread_pool_task_execution_time_ns_total->increment(
task_execution_time_watch.elapsed_time());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ class TimeSharingTaskExecutor : public TaskExecutor {

Status start() override;
void stop() override;
void wait() override;

Result<std::shared_ptr<TaskHandle>> create_task(
const TaskId& task_id, std::function<double()> utilization_supplier,
Expand Down Expand Up @@ -245,8 +246,6 @@ class TimeSharingTaskExecutor : public TaskExecutor {
std::unique_lock<std::mutex>& lock);
void _record_leaf_splits_size(std::unique_lock<std::mutex>& lock);
void _split_finished(std::shared_ptr<PrioritizedSplitRunner> split, const Status& status);
// Waits until all the tasks are completed.
void wait();

int64_t _get_running_tasks_for_level(int level) const;

Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exec/scan/scanner_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ class TaskExecutorSimplifiedScanScheduler final : public ScannerScheduler {
void stop() override {
_is_stop.store(true);
_task_executor->stop();
_task_executor->wait();
}

Status start(int max_thread_num, int min_thread_num, int queue_size,
Expand Down
Loading