Skip to content

Commit 0eab4f5

Browse files
rmacnak-googleCommit Queue
authored andcommitted
[vm] Let active mutator stealing mark thread pool workers as blocked instead of setting the thread pool to unlimited workers.
TEST=ci (flaky resource exhaustion) Bug: #54687 Change-Id: I4ba7b6ae4d5ceb460c7db883c0733434ae76de19 Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/410641 Reviewed-by: Martin Kustermann <[email protected]> Commit-Queue: Ryan Macnak <[email protected]> Reviewed-by: Alexander Aprelev <[email protected]>
1 parent d309ba4 commit 0eab4f5

File tree

7 files changed

+48
-57
lines changed

7 files changed

+48
-57
lines changed

runtime/vm/isolate.cc

Lines changed: 31 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ DEFINE_FLAG_HANDLER(DeterministicModeHandler,
9494

9595
DEFINE_FLAG(bool,
9696
disable_thread_pool_limit,
97-
true,
97+
false,
9898
"Disables the limit of the thread pool (simulates custom embedder "
9999
"with custom message handler on unlimited number of threads).");
100100

@@ -380,10 +380,17 @@ IsolateGroup::IsolateGroup(std::shared_ptr<IsolateGroupSource> source,
380380
{
381381
FlagsCopyFrom(api_flags);
382382
if (!is_vm_isolate) {
383-
thread_pool_.reset(
384-
new MutatorThreadPool(this, FLAG_disable_thread_pool_limit
385-
? 0
386-
: Scavenger::MaxMutatorThreadCount()));
383+
intptr_t max_worker_threads;
384+
if (FLAG_disable_thread_pool_limit) {
385+
max_worker_threads = 0;
386+
} else {
387+
// There needs to be at least one more thread than active mutators slots
388+
// so that there is a thread waiting in IncreaseMutatorCount (instead of
389+
// unscheduled task sitting in the thread pool's queue) to eventually
390+
// timeout and trigger StealActiveMutators.
391+
max_worker_threads = Scavenger::MaxMutatorThreadCount() + 2;
392+
}
393+
thread_pool_.reset(new MutatorThreadPool(this, max_worker_threads));
387394
}
388395
{
389396
WriteRwLocker wl(ThreadState::Current(), isolate_groups_rwlock_);
@@ -587,14 +594,12 @@ void IsolateGroup::set_saved_unlinked_calls(const Array& saved_unlinked_calls) {
587594

588595
static constexpr intptr_t kActiveMutatorPreemptionTimeout = 120;
589596

590-
void IsolateGroup::IncreaseMutatorCount(Isolate* mutator,
591-
bool is_nested_reenter) {
592-
ASSERT(mutator->group() == this);
593-
597+
void IsolateGroup::IncreaseMutatorCount(Thread* thread,
598+
bool is_nested_reenter,
599+
bool was_stolen) {
594600
// If the mutator was temporarily blocked on a worker thread, we have to
595601
// unblock the worker thread again.
596-
if (is_nested_reenter) {
597-
ASSERT(mutator->mutator_thread() != nullptr);
602+
if (is_nested_reenter || was_stolen) {
598603
thread_pool()->MarkCurrentWorkerAsUnBlocked();
599604
}
600605

@@ -610,17 +615,28 @@ void IsolateGroup::IncreaseMutatorCount(Isolate* mutator,
610615
waiting_mutators_++;
611616
bool timed_out = false;
612617
if (has_timeout_waiter_) {
613-
ml.Wait();
618+
if (was_stolen) {
619+
ml.WaitWithSafepointCheck(thread);
620+
} else {
621+
ml.Wait();
622+
}
614623
} else {
615624
has_timeout_waiter_ = true;
616-
timed_out =
617-
ml.Wait(kActiveMutatorPreemptionTimeout) == Monitor::kTimedOut;
625+
if (was_stolen) {
626+
timed_out = ml.WaitWithSafepointCheck(
627+
thread, kActiveMutatorPreemptionTimeout) ==
628+
Monitor::kTimedOut;
629+
} else {
630+
timed_out =
631+
ml.Wait(kActiveMutatorPreemptionTimeout) == Monitor::kTimedOut;
632+
}
618633
has_timeout_waiter_ = false;
619634
}
620635
waiting_mutators_--;
621636

622637
if (timed_out) {
623-
active_mutators_ -= thread_registry()->StealActiveMutators();
638+
active_mutators_ -=
639+
thread_registry()->StealActiveMutators(thread_pool());
624640
ASSERT(active_mutators_ >= 0);
625641
}
626642
}
@@ -636,39 +652,6 @@ void IsolateGroup::IncreaseMutatorCount(Isolate* mutator,
636652
}
637653
}
638654

639-
void IsolateGroup::ReincreaseMutatorCount(Thread* thread) {
640-
MonitorLocker ml(active_mutators_monitor_.get());
641-
ASSERT(active_mutators_ <= max_active_mutators_);
642-
while (active_mutators_ == max_active_mutators_) {
643-
waiting_mutators_++;
644-
bool timed_out = false;
645-
if (has_timeout_waiter_) {
646-
ml.WaitWithSafepointCheck(thread);
647-
} else {
648-
has_timeout_waiter_ = true;
649-
timed_out =
650-
ml.WaitWithSafepointCheck(thread, kActiveMutatorPreemptionTimeout) ==
651-
Monitor::kTimedOut;
652-
has_timeout_waiter_ = false;
653-
}
654-
waiting_mutators_--;
655-
656-
if (timed_out) {
657-
active_mutators_ -= thread_registry()->StealActiveMutators();
658-
ASSERT(active_mutators_ >= 0);
659-
}
660-
}
661-
active_mutators_++;
662-
663-
// StealActiveMutators may cause multiple slots to become available, but
664-
// does not do a NotifyAll to prevent the case of thousands of threads
665-
// waking up to claim a ~dozen slots, so we keep notifying while there are
666-
// both available slots and waiters.
667-
if ((active_mutators_ != max_active_mutators_) && (waiting_mutators_ > 0)) {
668-
ml.Notify();
669-
}
670-
}
671-
672655
void IsolateGroup::DecreaseMutatorCount(Isolate* mutator, bool is_nested_exit) {
673656
ASSERT(mutator->group() == this);
674657

runtime/vm/isolate.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -540,9 +540,10 @@ class IsolateGroup : public IntrusiveDListEntry<IsolateGroup> {
540540
return thread == nullptr ? nullptr : thread->isolate_group();
541541
}
542542

543-
void IncreaseMutatorCount(Isolate* mutator, bool is_nested_reenter);
543+
void IncreaseMutatorCount(Thread* thread,
544+
bool is_nested_reenter,
545+
bool was_stolen);
544546
void DecreaseMutatorCount(Isolate* mutator, bool is_nested_exit);
545-
void ReincreaseMutatorCount(Thread* thread);
546547
NO_SANITIZE_THREAD
547548
intptr_t MutatorCount() const { return active_mutators_; }
548549

runtime/vm/thread.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -369,7 +369,7 @@ void Thread::EnterIsolate(Isolate* isolate) {
369369

370370
auto group = isolate->group();
371371
if (!(is_nested_reenter && isolate->mutator_thread()->OwnsSafepoint())) {
372-
group->IncreaseMutatorCount(isolate, is_nested_reenter);
372+
group->IncreaseMutatorCount(nullptr, is_nested_reenter, false);
373373
}
374374

375375
// Two threads cannot enter isolate at same time.
@@ -1366,7 +1366,8 @@ void Thread::UnwindScopes(uword stack_marker) {
13661366
}
13671367

13681368
void Thread::HandleStolen() {
1369-
isolate_group()->ReincreaseMutatorCount(this);
1369+
isolate_group()->IncreaseMutatorCount(this, /*is_nested_reenter=*/false,
1370+
/*was_stolen=*/true);
13701371
}
13711372

13721373
void Thread::EnterSafepointUsingLock() {

runtime/vm/thread_pool.cc

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -139,8 +139,11 @@ bool ThreadPool::CurrentThreadIsWorker() {
139139
}
140140

141141
void ThreadPool::MarkCurrentWorkerAsBlocked() {
142-
auto worker =
143-
static_cast<Worker*>(OSThread::Current()->owning_thread_pool_worker_);
142+
MarkWorkerAsBlocked(OSThread::Current());
143+
}
144+
145+
void ThreadPool::MarkWorkerAsBlocked(OSThread* thread) {
146+
auto worker = static_cast<Worker*>(thread->owning_thread_pool_worker_);
144147
Worker* new_worker = nullptr;
145148
if (worker != nullptr) {
146149
MutexLocker ml(&pool_mutex_);
@@ -152,7 +155,7 @@ void ThreadPool::MarkCurrentWorkerAsBlocked() {
152155
// If we have pending tasks and there are no idle workers, we will spawn a
153156
// new thread (temporarily allow exceeding the maximum pool size) to
154157
// handle the pending tasks.
155-
if (idle_workers_.IsEmpty() && pending_tasks_ > 0) {
158+
if (pending_tasks_ > count_idle_) {
156159
new_worker = new Worker(this);
157160
idle_workers_.Append(new_worker);
158161
count_idle_++;
@@ -169,6 +172,7 @@ void ThreadPool::MarkCurrentWorkerAsUnBlocked() {
169172
static_cast<Worker*>(OSThread::Current()->owning_thread_pool_worker_);
170173
if (worker != nullptr) {
171174
MutexLocker ml(&pool_mutex_);
175+
ASSERT(worker->is_blocked_);
172176
if (worker->is_blocked_) {
173177
worker->is_blocked_ = false;
174178
if (max_pool_size_ > 0) {

runtime/vm/thread_pool.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ class ThreadPool {
5454
// Mark the current thread as being blocked (e.g. in native code). This might
5555
// temporarily increase the max thread pool size.
5656
void MarkCurrentWorkerAsBlocked();
57+
void MarkWorkerAsBlocked(OSThread* thread);
5758

5859
// Mark the current thread as being unblocked. Must be called iff
5960
// [MarkCurrentWorkerAsBlocked] was called before and the thread is now ready

runtime/vm/thread_registry.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,13 +115,14 @@ void ThreadRegistry::FlushMarkingStacks() {
115115
}
116116
}
117117

118-
intptr_t ThreadRegistry::StealActiveMutators() {
118+
intptr_t ThreadRegistry::StealActiveMutators(ThreadPool* pool) {
119119
MonitorLocker ml(threads_lock());
120120
intptr_t count = 0;
121121
Thread* thread = active_list_;
122122
while (thread != nullptr) {
123123
if (thread->TryStealActiveMutator()) {
124124
ASSERT(thread->IsDartMutatorThread());
125+
pool->MarkWorkerAsBlocked(thread->os_thread());
125126
count++;
126127
}
127128
thread = thread->next_;

runtime/vm/thread_registry.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ class ThreadRegistry {
3838
void AcquireMarkingStacks();
3939
void ReleaseMarkingStacks();
4040
void FlushMarkingStacks();
41-
intptr_t StealActiveMutators();
41+
intptr_t StealActiveMutators(ThreadPool* pool);
4242

4343
// Concurrent-approximate number of active isolates in the active_list
4444
intptr_t active_isolates_count() { return active_isolates_count_.load(); }

0 commit comments

Comments
 (0)