Commit f39de10
runtime: minor fixes and improvements for scheduler
1 parent 8380fdd commit f39de10

1 file changed: std/runtime/proc.jule (164 additions, 60 deletions)
@@ -340,6 +340,22 @@ fn globrunqget(mut pp: &p, max: i32, mut &cp: *c): bool {
     ret true
 }
 
+// runqempty reports whether pp has no Cs on its local run queue.
+// It never returns true spuriously.
+fn runqempty(pp: &p): bool {
+    // We must guard against races between concurrent runqput/runqget
+    // operations. Observing head == tail is only valid if tail is stable.
+    for {
+        head := atomic::Load(&pp.runqhead, atomic::Acquire)
+        tail := atomic::Load(&pp.runqtail, atomic::Acquire)
+
+        // Re-read tail to ensure it has not changed while reading head.
+        if tail == atomic::Load(&pp.runqtail, atomic::Acquire) {
+            ret head == tail
+        }
+    }
+}
+
 // Tries to put cp on the local runnable queue.
 // If next is false, it adds cp to the tail of the runnable queue if runnext is used.
 // If next is true, runqput puts cp in the pp.runnext slot.
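
The double read of runqtail above is what makes the emptiness test race-free: head == tail is only trusted if tail did not move between the two loads. Below is a minimal Go sketch of the same pattern; the ring type is a hypothetical stand-in, but Jule's per-P queue closely mirrors Go's runtime here.

package main

import (
    "fmt"
    "sync/atomic"
)

// ring models a bounded run queue indexed by monotonically increasing
// head/tail counters, like the scheduler's per-P queue.
type ring struct {
    head atomic.Uint32 // advanced by consumers (owner pops, thieves steal)
    tail atomic.Uint32 // advanced by the owner on push
}

// empty reports whether the ring has no items, never spuriously true.
func (r *ring) empty() bool {
    for {
        head := r.head.Load()
        tail := r.tail.Load()
        // Only trust head == tail if tail was stable across the read of head.
        if tail == r.tail.Load() {
            return head == tail
        }
        // tail moved mid-read; retry with a fresh pair.
    }
}

func main() {
    var r ring
    fmt.Println(r.empty()) // true
    r.tail.Add(1)          // simulate a push by the owner
    fmt.Println(r.empty()) // false
}
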
@@ -599,6 +615,7 @@ fn runqsteal(mut pp: &p, mut pp2: &p, mut &cp: *c): bool {
 // Tries to steal a batch of coroutines or a timer for pp.
 // Writes one of the stolen coroutines to cp.
 // Reports false if failed, otherwise true.
+#disable nilptr boundary
 fn stealWork(mut pp: &p, mut &cp: *c): (stealC: bool, stealTimer: bool) {
     const StealTries = 4
     mut n := 0
@@ -629,6 +646,30 @@ fn stealWork(mut pp: &p, mut &cp: *c): (stealC: bool, stealTimer: bool) {
     ret false, false
 }
 
+// Check all Ps for a runnable C to steal.
+//
+// On entry we have no P. If a C is available to steal and a P is
+// available, the P is returned; the caller should acquire it and
+// attempt to steal the work to it.
+#disable nilptr
+fn checkRunqsNoP(): &p {
+    for _, pp2 in sched.allp {
+        if !runqempty(pp2) {
+            sched.mu.lock()
+            mut pp := pidlegetSpinning()
+            if pp == nil {
+                // Can't get a P, don't bother checking remaining Ps.
+                sched.mu.unlock()
+                ret nil
+            }
+            sched.mu.unlock()
+            ret pp
+        }
+    }
+    // No work available.
+    ret nil
+}
+
 // Selects a runnable coroutine for execution.
 //
 // This function is one of the core components of the coroutine scheduler.
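
checkRunqsNoP is the pattern for an M that holds no P: it may claim a P from the idle list only after actually observing stealable work, so idle Ps are not consumed on speculation. A rough Go sketch of that shape, with sched, allp, and pidlegetSpinning as hypothetical stand-ins modeled on the names above (the real emptiness check is the atomic runqempty defined earlier; a plain length read keeps the sketch short):

package main

import (
    "fmt"
    "sync"
)

type p struct{ queue []int }

var sched struct {
    mu   sync.Mutex
    allp []*p // all Ps, running or idle
    idle []*p // idle Ps available for adoption
}

// pidlegetSpinning pops an idle P for a would-be spinning M, or returns nil.
func pidlegetSpinning() *p {
    if len(sched.idle) == 0 {
        return nil
    }
    pp := sched.idle[len(sched.idle)-1]
    sched.idle = sched.idle[:len(sched.idle)-1]
    return pp
}

func checkRunqsNoP() *p {
    for _, pp2 := range sched.allp {
        if len(pp2.queue) > 0 { // i.e. !runqempty(pp2)
            sched.mu.Lock()
            pp := pidlegetSpinning()
            sched.mu.Unlock()
            // nil means no idle P: stealing is pointless, stop scanning.
            return pp
        }
    }
    return nil // no work anywhere
}

func main() {
    sched.allp = []*p{{queue: []int{1}}}
    sched.idle = []*p{{}}
    fmt.Println(checkRunqsNoP() != nil) // true: saw work, claimed an idle P
}
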
@@ -645,19 +686,25 @@ fn stealWork(mut pp: &p, mut &cp: *c): (stealC: bool, stealTimer: bool) {
 // requiring sched.lock and increasing contention under high connection
 // counts (e.g. many concurrent network wakeups).
 //
-// runqputbatch places the goroutines directly onto the local run queue of
+// runqputbatch places the coroutines directly onto the local run queue of
 // the current P, preserving cache locality and avoiding global scheduler
 // contention. This improves throughput for event-driven workloads without
 // violating scheduler invariants.
 //
 // Sysmon and other scheduler systems will intentionally use
 // `injectclist`, as they operate without a P and must ensure global progress.
+//
+// It never returns without a runnable coroutine.
 #disable nilptr boundary
-fn findRunnable(): (coro: c, ok: bool) {
+fn findRunnable(): (coro: c) {
+    let mut ok: bool
     mut m := gett()
-    mut pp := m.pp
 
+    // Place the "top" label here because P may change if M is detached from its current P.
+    // To ensure we observe the most recent P paired with M, we must reload it.
 top:
+    mut pp := m.pp
+
     // Check the global runnable queue once in a while to ensure fairness.
     // Otherwise two coroutines can completely occupy the local runqueue
     // by constantly respawning each other.
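
On the fairness check: Go's scheduler, which this code closely follows, polls the global queue on every 61st schedule tick (61 is prime, chosen so the period does not resonate with power-of-two queue sizes); the exact tick constant Jule uses is not visible in this diff. A toy illustration of the policy:

package main

import "fmt"

// pollOrder decides whether a scheduling round should consult the global
// queue before the local one. The modulus 61 is Go's choice and is only
// assumed here for illustration.
func pollOrder(schedtick uint64, globalLen int) string {
    if schedtick%61 == 0 && globalLen > 0 {
        return "global-first" // breaks local respawn ping-pong monopolies
    }
    return "local-first"
}

func main() {
    fmt.Println(pollOrder(60, 1)) // local-first
    fmt.Println(pollOrder(61, 1)) // global-first: 61%61 == 0
}
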
@@ -666,14 +713,14 @@ top:
         ok = globrunqget(pp, 1, &coro)
         sched.mu.unlock()
         if ok {
-            ret coro, true
+            ret
         }
     }
 
     // Local runnable queue.
     ok = runqget(pp, &coro)
     if ok {
-        ret coro, true
+        ret
     }
 
     // Local queue empty: attempt to fetch a batch from the global run queue.
@@ -685,7 +732,7 @@ top:
         ok = globrunqget(pp, 0, &coro)
         sched.mu.unlock()
         if ok {
-            ret coro, true
+            ret
         }
     }
 
@@ -703,7 +750,7 @@ top:
             coro = toRun[0]
             runqputbatch(pp, &toRun, 1, bn)
             eventpollAdjustWaiters(-i32(bn))
-            ret coro, true
+            ret
         }
     }
 
@@ -717,7 +764,7 @@ top:
     }
     stealC, stealTimer := stealWork(pp, &coro)
     if stealC {
-        ret coro, true
+        ret
     }
     if stealTimer {
         mut timer := pp.timers.cache
@@ -730,18 +777,13 @@ top:
             | fireCTimer:
                 // The timer associated with a coroutine,
                 // let the scheduler handle this.
-                ret coro, true
+                ret
             |:
                 panic("unreachable")
             }
         }
     }
 
-    if !m.spinning && atomic::Load(&sched.needspinning, atomic::Acquire) == 1 {
-        becomeSpinning(m)
-        goto top
-    }
-
     // No runnable coroutine found yet.
     // Determine how long we can block based on timers and deadlines.
     // If pollUntil is -1, eventpoll will block indefinitely.
@@ -755,17 +797,95 @@ top:
         | fireCTimer:
             // The timer associated with a coroutine,
             // let the scheduler handle this.
-            ret coro, true
+            ret
         |:
             panic("unreachable")
         }
     }
 
+    // Return P.
+    sched.mu.lock()
+    // Check the global runq one last time.
+    if sched.runq.len > 0 {
+        ok = globrunqget(pp, 0, &coro)
+        if ok {
+            sched.mu.unlock()
+            ret
+        }
+    }
+    if !m.spinning && atomic::Load(&sched.needspinning, atomic::Acquire) == 1 {
+        // See "Delicate dance" comment below.
+        becomeSpinning(m)
+        sched.mu.unlock()
+        goto top
+    }
+    pidleput(m.pp)
+    m.pp = nil
+    sched.mu.unlock()
+
+    // Delicate dance: thread transitions from spinning to non-spinning
+    // state, potentially concurrently with submission of new work. We must
+    // drop nmspinning first and then check all sources again.
+    // If we do it the other way around, another thread can submit work
+    // after we've checked all sources but before we drop nmspinning;
+    // as a result nobody will unpark a thread to run the work.
+    //
+    // If we discover new work below, we need to restore m.spinning as a
+    // signal for resetspinning to unpark a new worker thread (because
+    // there can be more than one starving coroutine).
+    //
+    // However, if after discovering new work we also observe no idle Ps
+    // (either here or in resetspinning), we have a problem. We may be
+    // racing with a non-spinning M in the block above, having found no
+    // work and preparing to release its P and park. Allowing that P to go
+    // idle will result in loss of work conservation (idle P while there is
+    // runnable work). This could result in complete deadlock in the
+    // unlikely event that we discover new work (from netpoll) right as we
+    // are racing with _all_ other Ps going idle.
+    //
+    // We use sched.needspinning to synchronize with non-spinning Ms going
+    // idle. If needspinning is set when they are about to drop their P,
+    // they abort the drop and instead become a new spinning M on our
+    // behalf. If we are not racing and the system is truly fully loaded
+    // then no spinning threads are required, and the next thread to
+    // naturally become spinning will clear the flag.
+    wasSpinning := m.spinning
     if m.spinning {
         m.spinning = false
         if atomic::Add(&sched.nmspinning, -1, atomic::Relaxed) < 0 {
             panic("findrunnable: negative nmspinning")
         }
+
+        // Note that for correctness, only the last M transitioning from
+        // spinning to non-spinning must perform these rechecks to
+        // ensure no missed work. However, the runtime has some cases
+        // of transient increments of nmspinning that are decremented
+        // without going through this path, so we must be conservative
+        // and perform the check on all spinning Ms.
+        //
+        // See Go's issue: https://go.dev/issue/43997.
+
+        // Check global and P runqueues again.
+
+        sched.mu.lock()
+        if sched.runq.len > 0 {
+            mut pp2 := pidlegetSpinning()
+            if pp2 != nil {
+                globrunqget(pp2, 0, &coro)
+                sched.mu.unlock()
+                m.pp = pp2
+                becomeSpinning(m)
+                ret
+            }
+        }
+        sched.mu.unlock()
+
+        mut pp2 := checkRunqsNoP()
+        if pp2 != nil {
+            m.pp = pp2
+            becomeSpinning(m)
+            goto top
+        }
     }
 
     // Final step: eventpoll.
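
The ordering that the "Delicate dance" comment defends can be shown in isolation: the submitter publishes work before reading nmspinning, and the spinner decrements nmspinning before rechecking the queues, so at least one side is guaranteed to observe the other's update and work cannot be stranded. A simplified Go sketch with hypothetical globals:

package main

import (
    "fmt"
    "sync/atomic"
)

var (
    nmspinning atomic.Int32 // number of Ms actively looking for work
    globalRunq atomic.Int32 // length of the global run queue
)

// submitWork is the producer side: publish the work first, then decide
// whether a spinning M is already around to pick it up.
func submitWork() (needWakeup bool) {
    globalRunq.Add(1)
    return nmspinning.Load() == 0 // no spinner saw it: unpark a thread
}

// stopSpinning is the consumer side: retire from spinning first, then
// recheck. Decrementing after the recheck would open a window in which
// submitWork sees a spinner that will never look at the queue again.
func stopSpinning() (foundWork bool) {
    nmspinning.Add(-1)
    return globalRunq.Load() > 0
}

func main() {
    nmspinning.Add(1)           // this M spins
    fmt.Println(submitWork())   // false: the spinner is still accounted for
    fmt.Println(stopSpinning()) // true: the recheck catches the new work
}
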
@@ -775,55 +895,43 @@ top:
         bn := eventpoll(pollUntil, &toRun)
         now := nanotime()
         atomic::Store(&sched.lastpoll, now, atomic::Release)
-        if bn > 0 {
-            coro = toRun[0]
-            runqputbatch(pp, &toRun, 1, bn)
-            eventpollAdjustWaiters(-i32(bn))
-            ret coro, true
+        sched.mu.lock()
+        mut pp2 := pidleget()
+        sched.mu.unlock()
+        m.pp = pp2
+        if m.pp == nil {
+            if bn > 0 {
+                injectclist(&toRun, 0, bn)
+                eventpollAdjustWaiters(-i32(bn))
+            }
+        } else {
+            if bn > 0 {
+                coro = toRun[0]
+                runqputbatch(m.pp, &toRun, 1, bn)
+                eventpollAdjustWaiters(-i32(bn))
+                ret
+            }
+            if wasSpinning {
+                becomeSpinning(m)
+            }
+            goto top
         }
     }
 
-    ret coro, false
+    stopm(m)
+    goto top
 }
 
 // A coroutine scheduler routine.
 // Works like an event-loop: looks for a coroutine and executes it.
 #disable nilptr
 fn schedule() {
     mut m := gett()
-Sched:
-    m.pp.schedtick = 0
     for {
-        // Poll a coroutine to run.
-        mut c, ok := findRunnable()
-        // findRunnable failed to find a runnable coroutine.
-        if !ok {
-            // There are coroutines polling for I/O and
-            // there is no M looking for eventpoll.
-            if sched.ncpolling() > 0 &&
-                atomic::Load(&sched.lastpoll, atomic::Acquire) != pollCheckIn {
-                continue
-            }
-            // There are timers in this worker, clean them.
-            // Normally, eventpoll returns a timer if exist.
-            // Somehow, a spurious wakeup occurred.
-            m.pp.timers.mu.lock()
-            ntimers := m.pp.timers.len()
-            m.pp.timers.mu.unlock()
-            if ntimers > 0 {
-                continue
-            }
-            // We tried enough.
-            // Nothing to do, break the loop.
-            // Park the M.
-            if m.spinning {
-                m.spinning = false
-                if atomic::Add(&sched.nmspinning, -1, atomic::Relaxed) < 0 {
-                    panic("findrunnable: negative nmspinning")
-                }
-            }
-            break
-        }
+        // Poll a C to run and assign it to M.
+        m.c = findRunnable()
+        m.c.state |= coroRunning
+
         // This thread is going to run a coroutine and is not spinning anymore,
         // so if it was marked as spinning we need to reset it now and potentially
         // start a new spinning M.
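
The post-poll branch above turns on whether the M reacquired a P. Without one it cannot run anything itself, so the woken batch has to be injected globally where other Ms can reach it; with one, the batch stays on the local queue for cache locality and one coroutine runs immediately. A condensed Go sketch of that decision, with stand-in types and a plain slice playing the role of injectclist:

package main

import "fmt"

var global []string // global injection list, like injectclist

type p struct{ local []string }

// dispatch places a batch of coroutines woken by the poller. pp is the P
// reacquired after polling, or nil if none was idle.
func dispatch(pp *p, woken []string) (run string, ok bool) {
    if len(woken) == 0 {
        return "", false
    }
    if pp == nil {
        // No P: make the work globally visible so other Ms keep the
        // system progressing, then go park.
        global = append(global, woken...)
        return "", false
    }
    // Have a P: queue the rest locally and run the first one now.
    pp.local = append(pp.local, woken[1:]...)
    return woken[0], true
}

func main() {
    fmt.Println(dispatch(nil, []string{"c1", "c2"}))  // "" false: injected globally
    fmt.Println(dispatch(&p{}, []string{"c3", "c4"})) // "c3" true: c4 queued locally
}
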
@@ -837,10 +945,6 @@ Sched:
         // Refresh the budget for the new C.
         m.pp.budget = pbudget
 
-        // Assign C to M.
-        m.c = c
-        m.c.state |= coroRunning
-
         // Run C.
         sched.enterrun()
         resume(&m.c)
@@ -857,8 +961,6 @@ Sched:
             close(&m.c)
         }
     }
-    stopm(m)
-    goto Sched
 }
 
 // Start point of a M.
@@ -1139,7 +1241,9 @@ fn stopm(mut m: &thread) {
     sched.mu.lock()
     sched.nm--
     checkdead()
-    pidleput(m.pp)
+    if m.pp != nil {
+        panic("runtime: stopm with a P")
+    }
     mput(m)
     sched.mu.unlock()
     m.parker.park()
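
stopm now asserts, rather than performs, the release of the P: callers such as findRunnable are expected to have returned it via pidleput before parking. A tiny Go sketch of the invariant, with stand-in types:

package main

type p struct{}

type m struct {
    pp     *p
    parked chan struct{}
}

// stopm parks the M; by this point its P must already be on the idle list.
func stopm(mm *m) {
    if mm.pp != nil {
        panic("runtime: stopm with a P")
    }
    // ... sched.mu bookkeeping elided ...
    <-mm.parked // block until another thread wakes this M
}

func main() {
    mm := &m{parked: make(chan struct{}, 1)}
    mm.parked <- struct{}{} // pre-arm the wakeup so the example returns
    stopm(mm)               // ok: mm.pp == nil
    println("parked and woke once")
}
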
