Skip to content

Commit 4fab545

Browse files
committed
[only-for-debug-test] time sharing task executor timeout test.
1 parent ada3074 commit 4fab545

File tree

2 files changed

+98
-5
lines changed

2 files changed

+98
-5
lines changed

be/src/common/config.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ DEFINE_Validator(task_executor_initial_max_concurrency_per_task, [](const int co
317317
return true;
318318
});
319319
// Enable task executor in internal table scan.
320-
DEFINE_Bool(enable_task_executor_in_internal_table, "false");
320+
DEFINE_Bool(enable_task_executor_in_internal_table, "true");
321321
// Enable task executor in external table scan.
322322
DEFINE_Bool(enable_task_executor_in_external_table, "true");
323323

be/src/vec/exec/executor/time_sharing/time_sharing_task_executor.cpp

Lines changed: 97 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -380,8 +380,13 @@ Status TimeSharingTaskExecutor::_try_create_thread(int thread_num, std::lock_gua
380380
}
381381

382382
Status TimeSharingTaskExecutor::_do_submit(std::shared_ptr<PrioritizedSplitRunner> split) {
383+
LOG(INFO) << "[TimeSharingTaskExecutor] _do_submit called, split_id: " << split->split_id()
384+
<< ", task_id: " << split->task_handle()->task_id().to_string()
385+
<< ", thread_name: " << _thread_name;
383386
std::unique_lock<std::mutex> l(_lock);
384387
if (!_pool_status.ok()) [[unlikely]] {
388+
LOG(INFO) << "[TimeSharingTaskExecutor] pool_status not ok, split_id: " << split->split_id()
389+
<< ", status: " << _pool_status.to_string();
385390
return _pool_status;
386391
}
387392

@@ -390,10 +395,17 @@ Status TimeSharingTaskExecutor::_do_submit(std::shared_ptr<PrioritizedSplitRunne
390395
_thread_name);
391396
}
392397

398+
std::cout << "submitting split, split_id: " << split->split_id()
399+
<< ", task_id: " << split->task_handle()->task_id().to_string() << std::endl;
400+
393401
// Size limit check.
394402
int64_t capacity_remaining = static_cast<int64_t>(_max_threads) - _active_threads +
395403
static_cast<int64_t>(_max_queue_size) - _total_queued_tasks;
396404
if (capacity_remaining < 1) {
405+
LOG(INFO) << "[TimeSharingTaskExecutor] capacity full, split_id: " << split->split_id()
406+
<< ", active_threads: " << _active_threads << ", num_threads: " << _num_threads
407+
<< ", max_threads: " << _max_threads << ", total_queued: " << _total_queued_tasks
408+
<< ", max_queue: " << _max_queue_size;
397409
thread_pool_submit_failed->increment(1);
398410
return Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
399411
"Thread pool {} is at capacity ({}/{} tasks running, {}/{} tasks queued)",
@@ -432,6 +444,12 @@ Status TimeSharingTaskExecutor::_do_submit(std::shared_ptr<PrioritizedSplitRunne
432444
DCHECK(state == SplitThreadPoolToken::State::IDLE ||
433445
state == SplitThreadPoolToken::State::RUNNING);
434446
split->submit_time_watch().start();
447+
LOG(INFO) << "[TimeSharingTaskExecutor] offering split to queue, split_id: "
448+
<< split->split_id() << ", task_id: " << split->task_handle()->task_id().to_string()
449+
<< ", queue_size_before: " << _tokenless->_entries->size()
450+
<< ", total_queued_tasks: " << _total_queued_tasks
451+
<< ", active_threads: " << _active_threads
452+
<< ", idle_threads: " << _idle_threads.size();
435453
_tokenless->_entries->offer(std::move(split));
436454
if (state == SplitThreadPoolToken::State::IDLE) {
437455
_tokenless->transition(SplitThreadPoolToken::State::RUNNING);
@@ -453,8 +471,13 @@ Status TimeSharingTaskExecutor::_do_submit(std::shared_ptr<PrioritizedSplitRunne
453471
// processed by an active thread (or a thread we're about to create) at some
454472
// point in the future.
455473
if (!_idle_threads.empty()) {
474+
LOG(INFO) << "[TimeSharingTaskExecutor] waking up idle thread, idle_threads_count: "
475+
<< _idle_threads.size();
456476
_idle_threads.front().not_empty.notify_one();
457477
_idle_threads.pop_front();
478+
} else {
479+
LOG(INFO) << "[TimeSharingTaskExecutor] no idle threads, split queued, queue_size: "
480+
<< _tokenless->_entries->size();
458481
}
459482
l.unlock();
460483

@@ -511,6 +534,9 @@ void TimeSharingTaskExecutor::_dispatch_thread() {
511534
}
512535

513536
if (_tokenless->_entries->size() == 0) {
537+
LOG(INFO) << "[TimeSharingTaskExecutor] no work, going idle, active_threads: "
538+
<< _active_threads << ", num_threads: " << _num_threads
539+
<< ", total_queued: " << _total_queued_tasks;
514540
// There's no work to do, let's go idle.
515541
//
516542
// Note: if FIFO behavior is desired, it's as simple as changing this to push_back().
@@ -548,6 +574,9 @@ void TimeSharingTaskExecutor::_dispatch_thread() {
548574
// Get the next token and task to execute.
549575
DCHECK_EQ(SplitThreadPoolToken::State::RUNNING, _tokenless->state());
550576
DCHECK(_tokenless->_entries->size() > 0);
577+
LOG(INFO) << "[TimeSharingTaskExecutor] taking split from queue, queue_size_before: "
578+
<< _tokenless->_entries->size() << ", active_threads: " << _active_threads
579+
<< ", total_queued: " << _total_queued_tasks;
551580
std::shared_ptr<PrioritizedSplitRunner> split = _tokenless->_entries->take();
552581
thread_pool_task_wait_worker_time_ns_total->increment(
553582
split->submit_time_watch().elapsed_time());
@@ -567,8 +596,14 @@ void TimeSharingTaskExecutor::_dispatch_thread() {
567596
_running_splits.erase(split);
568597
}};
569598

599+
LOG(INFO) << "[TimeSharingTaskExecutor] processing split, split_id: " << split->split_id()
600+
<< ", task_id: " << split->task_handle()->task_id().to_string()
601+
<< ", active_threads: " << _active_threads;
570602
Result<SharedListenableFuture<Void>> blocked_future_result = split->process();
571603

604+
LOG(INFO) << "[TimeSharingTaskExecutor] split processed, split_id: " << split->split_id()
605+
<< ", task_id: " << split->task_handle()->task_id().to_string()
606+
<< ", has_error: " << !blocked_future_result.has_value();
572607
if (!blocked_future_result.has_value()) {
573608
LOG(WARNING) << "split process failed, split_id: " << split->split_id()
574609
<< ", status: " << blocked_future_result.error();
@@ -577,35 +612,68 @@ void TimeSharingTaskExecutor::_dispatch_thread() {
577612
auto blocked_future = blocked_future_result.value();
578613

579614
if (split->is_finished()) {
580-
{
581-
std::ostringstream _oss;
582-
_oss << std::this_thread::get_id();
583-
}
615+
LOG(INFO) << "[TimeSharingTaskExecutor] split finished, split_id: "
616+
<< split->split_id()
617+
<< ", task_id: " << split->task_handle()->task_id().to_string()
618+
<< ", status: " << split->finished_status().to_string();
584619
_split_finished(split, split->finished_status());
585620
} else {
621+
LOG(INFO) << "[TimeSharingTaskExecutor] split not finished (blocked), split_id: "
622+
<< split->split_id()
623+
<< ", task_id: " << split->task_handle()->task_id().to_string()
624+
<< ", is_auto_reschedule: " << split->is_auto_reschedule()
625+
<< ", future_done: " << blocked_future.is_done();
586626
if (split->is_auto_reschedule()) {
587627
std::unique_lock<std::mutex> lock(_mutex);
588628
if (blocked_future.is_done()) {
629+
LOG(INFO) << "[TimeSharingTaskExecutor] blocked future already done, "
630+
"re-offering split, split_id: "
631+
<< split->split_id();
589632
lock.unlock();
590633
l.lock();
591634
if (_tokenless->state() == SplitThreadPoolToken::State::RUNNING) {
635+
LOG(INFO) << "[TimeSharingTaskExecutor] re-offering split to queue, "
636+
"split_id: "
637+
<< split->split_id()
638+
<< ", queue_size: " << _tokenless->_entries->size();
592639
_tokenless->_entries->offer(split);
640+
} else {
641+
LOG(INFO) << "[TimeSharingTaskExecutor] token not running, cannot "
642+
"re-offer, split_id: "
643+
<< split->split_id()
644+
<< ", token_state: " << _tokenless->state();
593645
}
594646
l.unlock();
595647
} else {
648+
LOG(INFO) << "[TimeSharingTaskExecutor] registering callback for blocked "
649+
"split, split_id: "
650+
<< split->split_id()
651+
<< ", blocked_splits_count: " << _blocked_splits.size();
596652
_blocked_splits[split] = blocked_future;
597653

598654
_blocked_splits[split].add_callback([this, split](const Void& value,
599655
const Status& status) {
600656
if (status.ok()) {
657+
LOG(INFO) << "[TimeSharingTaskExecutor] blocked split callback ok, "
658+
"split_id: "
659+
<< split->split_id();
601660
{
602661
std::unique_lock<std::mutex> lock(_mutex);
603662
_blocked_splits.erase(split);
604663
}
605664
split->reset_level_priority();
606665
std::unique_lock<std::mutex> l(_lock);
607666
if (_tokenless->state() == SplitThreadPoolToken::State::RUNNING) {
667+
LOG(INFO) << "[TimeSharingTaskExecutor] re-offering unblocked "
668+
"split, split_id: "
669+
<< split->split_id()
670+
<< ", queue_size: " << _tokenless->_entries->size();
608671
_tokenless->_entries->offer(split);
672+
} else {
673+
LOG(INFO) << "[TimeSharingTaskExecutor] token not running, "
674+
"cannot re-offer unblocked split, split_id: "
675+
<< split->split_id()
676+
<< ", token_state: " << _tokenless->state();
609677
}
610678
} else {
611679
LOG(WARNING) << "blocked split is failed, split_id: "
@@ -642,11 +710,22 @@ void TimeSharingTaskExecutor::_dispatch_thread() {
642710
// handle shutdown && idle
643711
if (_tokenless->_active_threads == 0) {
644712
if (state == SplitThreadPoolToken::State::QUIESCING) {
713+
LOG(INFO) << "[TimeSharingTaskExecutor] transitioning to QUIESCED, active_threads: "
714+
"0, queue_size: "
715+
<< _tokenless->_entries->size();
645716
DCHECK(_tokenless->_entries->size() == 0);
646717
_tokenless->transition(SplitThreadPoolToken::State::QUIESCED);
647718
} else if (_tokenless->_entries->size() == 0) {
719+
LOG(INFO) << "[TimeSharingTaskExecutor] transitioning to IDLE, active_threads: 0, "
720+
"queue_size: 0";
648721
_tokenless->transition(SplitThreadPoolToken::State::IDLE);
649722
}
723+
} else {
724+
LOG(INFO)
725+
<< "[TimeSharingTaskExecutor] token still has active threads, active_threads: "
726+
<< _tokenless->_active_threads
727+
<< ", queue_size: " << _tokenless->_entries->size()
728+
<< ", state: " << _tokenless->state();
650729
}
651730

652731
// We decrease _num_submitted_tasks holding lock, so the following DCHECK works.
@@ -731,6 +810,8 @@ Result<std::shared_ptr<TaskHandle>> TimeSharingTaskExecutor::create_task(
731810
const TaskId& task_id, std::function<double()> utilization_supplier,
732811
int initial_split_concurrency, std::chrono::nanoseconds split_concurrency_adjust_frequency,
733812
std::optional<int> max_concurrency_per_task) {
813+
LOG(INFO) << "[TimeSharingTaskExecutor] create_task called, task_id: " << task_id.to_string()
814+
<< ", initial_split_concurrency: " << initial_split_concurrency;
734815
auto task_handle = std::make_shared<TimeSharingTaskHandle>(
735816
task_id, _tokenless->_entries, utilization_supplier, initial_split_concurrency,
736817
split_concurrency_adjust_frequency, max_concurrency_per_task);
@@ -753,6 +834,8 @@ Status TimeSharingTaskExecutor::add_task(const TaskId& task_id,
753834

754835
Status TimeSharingTaskExecutor::remove_task(std::shared_ptr<TaskHandle> task_handle) {
755836
auto handle = std::dynamic_pointer_cast<TimeSharingTaskHandle>(task_handle);
837+
LOG(INFO) << "[TimeSharingTaskExecutor] remove_task called, task_id: "
838+
<< handle->task_id().to_string();
756839
std::vector<std::shared_ptr<PrioritizedSplitRunner>> splits_to_destroy;
757840

758841
{
@@ -808,6 +891,9 @@ Status TimeSharingTaskExecutor::remove_task(std::shared_ptr<TaskHandle> task_han
808891
Result<std::vector<SharedListenableFuture<Void>>> TimeSharingTaskExecutor::enqueue_splits(
809892
std::shared_ptr<TaskHandle> task_handle, bool intermediate,
810893
const std::vector<std::shared_ptr<SplitRunner>>& splits) {
894+
LOG(INFO) << "[TimeSharingTaskExecutor] enqueue_splits called, task_id: "
895+
<< task_handle->task_id().to_string() << ", splits_count: " << splits.size()
896+
<< ", intermediate: " << intermediate;
811897
std::vector<std::shared_ptr<PrioritizedSplitRunner>> splits_to_destroy;
812898
Defer defer {[&]() {
813899
for (auto& split : splits_to_destroy) {
@@ -832,6 +918,8 @@ Result<std::vector<SharedListenableFuture<Void>>> TimeSharingTaskExecutor::enque
832918
splits_to_destroy.push_back(prioritized_split);
833919
}
834920
} else {
921+
std::cout << "enqueueing split, split_id: " << prioritized_split->split_id()
922+
<< ", task_id: " << handle->task_id().to_string() << std::endl;
835923
if (handle->enqueue_split(prioritized_split)) {
836924
_schedule_task_if_necessary(handle, lock);
837925
_add_new_entrants(lock);
@@ -852,11 +940,16 @@ Status TimeSharingTaskExecutor::re_enqueue_split(std::shared_ptr<TaskHandle> tas
852940
std::shared_ptr<PrioritizedSplitRunner> prioritized_split =
853941
handle->get_split(split, intermediate);
854942
prioritized_split->reset_level_priority();
943+
std::cout << "re-enqueueing split, split_id: " << prioritized_split->split_id()
944+
<< ", task_id: " << task_handle->task_id().to_string() << std::endl;
855945
return _do_submit(prioritized_split);
856946
}
857947

858948
void TimeSharingTaskExecutor::_split_finished(std::shared_ptr<PrioritizedSplitRunner> split,
859949
const Status& status) {
950+
LOG(INFO) << "[TimeSharingTaskExecutor] _split_finished called, split_id: " << split->split_id()
951+
<< ", task_id: " << split->task_handle()->task_id().to_string()
952+
<< ", status: " << status.to_string() << ", level: " << split->priority().level();
860953
_completed_splits_per_level[split->priority().level()]++;
861954
{
862955
std::unique_lock<std::mutex> lock(_mutex);

0 commit comments

Comments
 (0)