@@ -380,13 +380,8 @@ Status TimeSharingTaskExecutor::_try_create_thread(int thread_num, std::lock_gua
380380}
381381
382382Status TimeSharingTaskExecutor::_do_submit (std::shared_ptr<PrioritizedSplitRunner> split) {
383- LOG (INFO) << " [TimeSharingTaskExecutor] _do_submit called, split_id: " << split->split_id ()
384- << " , task_id: " << split->task_handle ()->task_id ().to_string ()
385- << " , thread_name: " << _thread_name;
386383 std::unique_lock<std::mutex> l (_lock);
387384 if (!_pool_status.ok ()) [[unlikely]] {
388- LOG (INFO) << " [TimeSharingTaskExecutor] pool_status not ok, split_id: " << split->split_id ()
389- << " , status: " << _pool_status.to_string ();
390385 return _pool_status;
391386 }
392387
@@ -395,17 +390,10 @@ Status TimeSharingTaskExecutor::_do_submit(std::shared_ptr<PrioritizedSplitRunne
395390 _thread_name);
396391 }
397392
398- LOG (INFO) << " submitting split, split_id: " << split->split_id ()
399- << " , task_id: " << split->task_handle ()->task_id ().to_string ();
400-
401393 // Size limit check.
402394 int64_t capacity_remaining = static_cast <int64_t >(_max_threads) - _active_threads +
403395 static_cast <int64_t >(_max_queue_size) - _total_queued_tasks;
404396 if (capacity_remaining < 1 ) {
405- LOG (INFO) << " [TimeSharingTaskExecutor] capacity full, split_id: " << split->split_id ()
406- << " , active_threads: " << _active_threads << " , num_threads: " << _num_threads
407- << " , max_threads: " << _max_threads << " , total_queued: " << _total_queued_tasks
408- << " , max_queue: " << _max_queue_size;
409397 thread_pool_submit_failed->increment (1 );
410398 return Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
411399 " Thread pool {} is at capacity ({}/{} tasks running, {}/{} tasks queued)" ,
@@ -444,12 +432,6 @@ Status TimeSharingTaskExecutor::_do_submit(std::shared_ptr<PrioritizedSplitRunne
444432 DCHECK (state == SplitThreadPoolToken::State::IDLE ||
445433 state == SplitThreadPoolToken::State::RUNNING);
446434 split->submit_time_watch ().start ();
447- LOG (INFO) << " [TimeSharingTaskExecutor] offering split to queue, split_id: "
448- << split->split_id () << " , task_id: " << split->task_handle ()->task_id ().to_string ()
449- << " , queue_size_before: " << _tokenless->_entries ->size ()
450- << " , total_queued_tasks: " << _total_queued_tasks
451- << " , active_threads: " << _active_threads
452- << " , idle_threads: " << _idle_threads.size ();
453435 _tokenless->_entries ->offer (std::move (split));
454436 if (state == SplitThreadPoolToken::State::IDLE) {
455437 _tokenless->transition (SplitThreadPoolToken::State::RUNNING);
@@ -471,13 +453,8 @@ Status TimeSharingTaskExecutor::_do_submit(std::shared_ptr<PrioritizedSplitRunne
471453 // processed by an active thread (or a thread we're about to create) at some
472454 // point in the future.
473455 if (!_idle_threads.empty ()) {
474- LOG (INFO) << " [TimeSharingTaskExecutor] waking up idle thread, idle_threads_count: "
475- << _idle_threads.size ();
476456 _idle_threads.front ().not_empty .notify_one ();
477457 _idle_threads.pop_front ();
478- } else {
479- LOG (INFO) << " [TimeSharingTaskExecutor] no idle threads, split queued, queue_size: "
480- << _tokenless->_entries ->size ();
481458 }
482459 l.unlock ();
483460
@@ -534,9 +511,6 @@ void TimeSharingTaskExecutor::_dispatch_thread() {
534511 }
535512
536513 if (_tokenless->_entries ->size () == 0 ) {
537- LOG (INFO) << " [TimeSharingTaskExecutor] no work, going idle, active_threads: "
538- << _active_threads << " , num_threads: " << _num_threads
539- << " , total_queued: " << _total_queued_tasks;
540514 // There's no work to do, let's go idle.
541515 //
542516 // Note: if FIFO behavior is desired, it's as simple as changing this to push_back().
@@ -574,9 +548,6 @@ void TimeSharingTaskExecutor::_dispatch_thread() {
574548 // // Get the next token and task to execute.
575549 DCHECK_EQ (SplitThreadPoolToken::State::RUNNING, _tokenless->state ());
576550 DCHECK (_tokenless->_entries ->size () > 0 );
577- LOG (INFO) << " [TimeSharingTaskExecutor] taking split from queue, queue_size_before: "
578- << _tokenless->_entries ->size () << " , active_threads: " << _active_threads
579- << " , total_queued: " << _total_queued_tasks;
580551 std::shared_ptr<PrioritizedSplitRunner> split = _tokenless->_entries ->take ();
581552 thread_pool_task_wait_worker_time_ns_total->increment (
582553 split->submit_time_watch ().elapsed_time ());
@@ -592,14 +563,8 @@ void TimeSharingTaskExecutor::_dispatch_thread() {
592563 _running_splits.insert (split);
593564 }
594565
595- LOG (INFO) << " [TimeSharingTaskExecutor] processing split, split_id: " << split->split_id ()
596- << " , task_id: " << split->task_handle ()->task_id ().to_string ()
597- << " , active_threads: " << _active_threads;
598566 Result<SharedListenableFuture<Void>> blocked_future_result = split->process ();
599567
600- LOG (INFO) << " [TimeSharingTaskExecutor] split processed, split_id: " << split->split_id ()
601- << " , task_id: " << split->task_handle ()->task_id ().to_string ()
602- << " , has_error: " << !blocked_future_result.has_value ();
603568 if (!blocked_future_result.has_value ()) {
604569 LOG (WARNING) << " split process failed, split_id: " << split->split_id ()
605570 << " , status: " << blocked_future_result.error ();
@@ -608,68 +573,31 @@ void TimeSharingTaskExecutor::_dispatch_thread() {
608573 auto blocked_future = blocked_future_result.value ();
609574
610575 if (split->is_finished ()) {
611- LOG (INFO) << " [TimeSharingTaskExecutor] split finished, split_id: "
612- << split->split_id ()
613- << " , task_id: " << split->task_handle ()->task_id ().to_string ()
614- << " , status: " << split->finished_status ().to_string ();
615576 _split_finished (split, split->finished_status ());
616577 } else {
617- LOG (INFO) << " [TimeSharingTaskExecutor] split not finished (blocked), split_id: "
618- << split->split_id ()
619- << " , task_id: " << split->task_handle ()->task_id ().to_string ()
620- << " , is_auto_reschedule: " << split->is_auto_reschedule ()
621- << " , future_done: " << blocked_future.is_done ();
622578 if (split->is_auto_reschedule ()) {
623579 std::unique_lock<std::mutex> lock (_mutex);
624580 if (blocked_future.is_done ()) {
625- LOG (INFO) << " [TimeSharingTaskExecutor] blocked future already done, "
626- " re-offering split, split_id: "
627- << split->split_id ();
628581 lock.unlock ();
629582 l.lock ();
630583 if (_tokenless->state () == SplitThreadPoolToken::State::RUNNING) {
631- LOG (INFO) << " [TimeSharingTaskExecutor] re-offering split to queue, "
632- " split_id: "
633- << split->split_id ()
634- << " , queue_size: " << _tokenless->_entries ->size ();
635584 _tokenless->_entries ->offer (split);
636- } else {
637- LOG (INFO) << " [TimeSharingTaskExecutor] token not running, cannot "
638- " re-offer, split_id: "
639- << split->split_id ()
640- << " , token_state: " << _tokenless->state ();
641585 }
642586 l.unlock ();
643587 } else {
644- LOG (INFO) << " [TimeSharingTaskExecutor] registering callback for blocked "
645- " split, split_id: "
646- << split->split_id ()
647- << " , blocked_splits_count: " << _blocked_splits.size ();
648588 _blocked_splits[split] = blocked_future;
649589
650590 _blocked_splits[split].add_callback ([this , split](const Void& value,
651591 const Status& status) {
652592 if (status.ok ()) {
653- LOG (INFO) << " [TimeSharingTaskExecutor] blocked split callback ok, "
654- " split_id: "
655- << split->split_id ();
656593 {
657594 std::unique_lock<std::mutex> lock (_mutex);
658595 _blocked_splits.erase (split);
659596 }
660597 split->reset_level_priority ();
661598 std::unique_lock<std::mutex> l (_lock);
662599 if (_tokenless->state () == SplitThreadPoolToken::State::RUNNING) {
663- LOG (INFO) << " [TimeSharingTaskExecutor] re-offering unblocked "
664- " split, split_id: "
665- << split->split_id ()
666- << " , queue_size: " << _tokenless->_entries ->size ();
667600 _tokenless->_entries ->offer (split);
668- } else {
669- LOG (INFO) << " [TimeSharingTaskExecutor] token not running, "
670- " cannot re-offer unblocked split, split_id: "
671- << split->split_id ()
672- << " , token_state: " << _tokenless->state ();
673601 }
674602 } else {
675603 LOG (WARNING) << " blocked split is failed, split_id: "
@@ -688,7 +616,8 @@ void TimeSharingTaskExecutor::_dispatch_thread() {
688616 // objects, and we don't want to block submission of the SplitThreadPool.
689617 // In the worst case, the destructor might even try to do something
690618 // with this SplitThreadPool, and produce a deadlock.
691- //
619+ // task.runnable.reset();
620+
692621 // IMPORTANT: We must explicitly release 'split' BEFORE acquiring _lock to avoid
693622 // self-deadlock. The destructor chain (PrioritizedSplitRunner -> ScannerSplitRunner
694623 // -> _scan_func lambda -> captured ScannerContext) may call remove_task() which
@@ -716,22 +645,11 @@ void TimeSharingTaskExecutor::_dispatch_thread() {
716645 // handle shutdown && idle
717646 if (_tokenless->_active_threads == 0 ) {
718647 if (state == SplitThreadPoolToken::State::QUIESCING) {
719- LOG (INFO) << " [TimeSharingTaskExecutor] transitioning to QUIESCED, active_threads: "
720- " 0, queue_size: "
721- << _tokenless->_entries ->size ();
722648 DCHECK (_tokenless->_entries ->size () == 0 );
723649 _tokenless->transition (SplitThreadPoolToken::State::QUIESCED);
724650 } else if (_tokenless->_entries ->size () == 0 ) {
725- LOG (INFO) << " [TimeSharingTaskExecutor] transitioning to IDLE, active_threads: 0, "
726- " queue_size: 0" ;
727651 _tokenless->transition (SplitThreadPoolToken::State::IDLE);
728652 }
729- } else {
730- LOG (INFO)
731- << " [TimeSharingTaskExecutor] token still has active threads, active_threads: "
732- << _tokenless->_active_threads
733- << " , queue_size: " << _tokenless->_entries ->size ()
734- << " , state: " << _tokenless->state ();
735653 }
736654
737655 // We decrease _num_submitted_tasks holding lock, so the following DCHECK works.
@@ -816,8 +734,6 @@ Result<std::shared_ptr<TaskHandle>> TimeSharingTaskExecutor::create_task(
816734 const TaskId& task_id, std::function<double ()> utilization_supplier,
817735 int initial_split_concurrency, std::chrono::nanoseconds split_concurrency_adjust_frequency,
818736 std::optional<int> max_concurrency_per_task) {
819- LOG (INFO) << " [TimeSharingTaskExecutor] create_task called, task_id: " << task_id.to_string ()
820- << " , initial_split_concurrency: " << initial_split_concurrency;
821737 auto task_handle = std::make_shared<TimeSharingTaskHandle>(
822738 task_id, _tokenless->_entries , utilization_supplier, initial_split_concurrency,
823739 split_concurrency_adjust_frequency, max_concurrency_per_task);
@@ -840,8 +756,6 @@ Status TimeSharingTaskExecutor::add_task(const TaskId& task_id,
840756
841757Status TimeSharingTaskExecutor::remove_task (std::shared_ptr<TaskHandle> task_handle) {
842758 auto handle = std::dynamic_pointer_cast<TimeSharingTaskHandle>(task_handle);
843- LOG (INFO) << " [TimeSharingTaskExecutor] remove_task called, task_id: "
844- << handle->task_id ().to_string ();
845759 std::vector<std::shared_ptr<PrioritizedSplitRunner>> splits_to_destroy;
846760
847761 {
@@ -897,9 +811,6 @@ Status TimeSharingTaskExecutor::remove_task(std::shared_ptr<TaskHandle> task_han
897811Result<std::vector<SharedListenableFuture<Void>>> TimeSharingTaskExecutor::enqueue_splits (
898812 std::shared_ptr<TaskHandle> task_handle, bool intermediate,
899813 const std::vector<std::shared_ptr<SplitRunner>>& splits) {
900- LOG (INFO) << " [TimeSharingTaskExecutor] enqueue_splits called, task_id: "
901- << task_handle->task_id ().to_string () << " , splits_count: " << splits.size ()
902- << " , intermediate: " << intermediate;
903814 std::vector<std::shared_ptr<PrioritizedSplitRunner>> splits_to_destroy;
904815 Defer defer {[&]() {
905816 for (auto & split : splits_to_destroy) {
@@ -924,8 +835,6 @@ Result<std::vector<SharedListenableFuture<Void>>> TimeSharingTaskExecutor::enque
924835 splits_to_destroy.push_back (prioritized_split);
925836 }
926837 } else {
927- LOG (INFO) << " enqueueing split, split_id: " << prioritized_split->split_id ()
928- << " , task_id: " << handle->task_id ().to_string ();
929838 if (handle->enqueue_split (prioritized_split)) {
930839 _schedule_task_if_necessary (handle, lock);
931840 _add_new_entrants (lock);
@@ -946,16 +855,11 @@ Status TimeSharingTaskExecutor::re_enqueue_split(std::shared_ptr<TaskHandle> tas
946855 std::shared_ptr<PrioritizedSplitRunner> prioritized_split =
947856 handle->get_split (split, intermediate);
948857 prioritized_split->reset_level_priority ();
949- LOG (INFO) << " re-enqueueing split, split_id: " << prioritized_split->split_id ()
950- << " , task_id: " << task_handle->task_id ().to_string ();
951858 return _do_submit (prioritized_split);
952859}
953860
954861void TimeSharingTaskExecutor::_split_finished (std::shared_ptr<PrioritizedSplitRunner> split,
955862 const Status& status) {
956- LOG (INFO) << " [TimeSharingTaskExecutor] _split_finished called, split_id: " << split->split_id ()
957- << " , task_id: " << split->task_handle ()->task_id ().to_string ()
958- << " , status: " << status.to_string () << " , level: " << split->priority ().level ();
959863 _completed_splits_per_level[split->priority ().level ()]++;
960864 {
961865 std::unique_lock<std::mutex> lock (_mutex);
0 commit comments