Skip to content

Commit bddfb89

Browse files
Introduce internal API to run the current work queue of the system dispatcher (#3641)
* Added internal API to run both Dispatchers.Default and Dispatchers.IO * Added internal API to check whether the current thread is IO * Reworked WorkQueue API to have the concept of polling and stealing in "exclusive" modes for the sake of these APIs Fixes #3439 Co-authored-by: Dmitry Khalanskiy <[email protected]>
1 parent 1b414a9 commit bddfb89

File tree

7 files changed

+271
-27
lines changed

7 files changed

+271
-27
lines changed

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,9 @@ public abstract interface class kotlinx/coroutines/DisposableHandle {
311311
}
312312

313313
public final class kotlinx/coroutines/EventLoopKt {
314+
public static final fun isIoDispatcherThread (Ljava/lang/Thread;)Z
314315
public static final fun processNextEventInCurrentThread ()J
316+
public static final fun runSingleTaskFromCurrentSystemDispatcher ()J
315317
}
316318

317319
public final class kotlinx/coroutines/ExceptionsKt {

kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,16 @@ import kotlin.jvm.*
1313
* The result of .limitedParallelism(x) call, a dispatcher
1414
* that wraps the given dispatcher, but limits the parallelism level, while
1515
* trying to emulate fairness.
16+
*
17+
* ### Implementation details
18+
*
19+
* By design, 'LimitedDispatcher' never [dispatches][CoroutineDispatcher.dispatch] originally sent tasks
20+
* to the underlying dispatcher. Instead, it maintains its own queue of tasks sent to this dispatcher and
21+
* dispatches at most [parallelism] "worker-loop" tasks that poll the underlying queue and cooperatively preempt
22+
* in order to avoid starvation of the underlying dispatcher.
23+
*
24+
* Such behavior is crucial to be compatible with any underlying dispatcher implementation without
25+
* direct cooperation.
1626
*/
1727
internal class LimitedDispatcher(
1828
private val dispatcher: CoroutineDispatcher,

kotlinx-coroutines-core/jvm/src/EventLoop.kt

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@
44

55
package kotlinx.coroutines
66

7+
import kotlinx.coroutines.Runnable
8+
import kotlinx.coroutines.scheduling.*
9+
import kotlinx.coroutines.scheduling.CoroutineScheduler
10+
711
internal actual abstract class EventLoopImplPlatform: EventLoop() {
812
protected abstract val thread: Thread
913

@@ -45,6 +49,80 @@ internal actual fun createEventLoop(): EventLoop = BlockingEventLoop(Thread.curr
4549
*/
4650
@InternalCoroutinesApi
4751
public fun processNextEventInCurrentThread(): Long =
52+
// This API is used in Ktor for serverless integration where a single thread awaits a blocking call
53+
// (and, to avoid actual blocking, does something via this call), see #850
4854
ThreadLocalEventLoop.currentOrNull()?.processNextEvent() ?: Long.MAX_VALUE
4955

5056
internal actual inline fun platformAutoreleasePool(crossinline block: () -> Unit) = block()
57+
58+
/**
59+
* Retrieves and executes a single task from the current system dispatcher ([Dispatchers.Default] or [Dispatchers.IO]).
60+
* Returns `0` if any task was executed, `> 0` for the number of nanoseconds to wait until invoking this method again
61+
* (implying that there will be a task to steal in N nanoseconds), `-1` if there are no tasks in the corresponding dispatcher at all.
62+
*
63+
* ### Invariants
64+
*
65+
* - When invoked from a [Dispatchers.Default] **thread** (even if the actual context is a different dispatcher,
66+
* [CoroutineDispatcher.limitedParallelism] or any in-place wrapper), it runs an arbitrary task that ended
67+
* up being scheduled to [Dispatchers.Default] or its counterpart. Tasks scheduled to [Dispatchers.IO]
68+
* **are not** executed[1].
69+
* - When invoked from [Dispatchers.IO] thread, the same rules apply, but for blocking tasks only.
70+
*
71+
* [1] -- this is a purely technical limitation: the scheduler does not have a "notify me when CPU token is available" API,
72+
* and we cannot leave this method without leaving thread in its original state.
73+
*
74+
* ### Rationale
75+
*
76+
* This is an internal API that is intended to replace IDEA's core FJP decomposition.
77+
* The following API is provided by IDEA core:
78+
* ```
79+
* runDecomposedTaskAndJoinIt { // <- non-suspending call
80+
* // spawn as many tasks as needed
81+
* // these tasks can also invoke 'runDecomposedTaskAndJoinIt'
82+
* }
83+
* ```
84+
* The key observation here is that 'runDecomposedTaskAndJoinIt' can be invoked from `Dispatchers.Default` itself,
85+
* thus blocking at least one thread. To avoid deadlocks and starvation during large hierarchical decompositions,
86+
* 'runDecomposedTaskAndJoinIt' should not just block but also **help** execute the task or other tasks
87+
* until an arbitrary condition is satisfied.
88+
*
89+
* See #3439 for additional details.
90+
*
91+
* ### Limitations and caveats
92+
*
93+
* - Executes tasks in-place, thus potentially leaking irrelevant thread-locals from the current thread
94+
* - Is not 100% effective, because the caller should somehow "wait" (or do other work) for [Long] returned nanoseconds
95+
* even when work arrives immediately after returning from this method.
96+
* - When there is no more work, it's up to the caller to decide what to do. It's important to remember that
97+
* work to current dispatcher may arrive **later** from external sources [1]
98+
*
99+
* [1] -- this is also a technicality that can be solved in kotlinx.coroutines itself, but unfortunately requires
100+
* a tremendous effort.
101+
*
102+
* @throws IllegalStateException if the current thread is not a system dispatcher thread
103+
*/
104+
@InternalCoroutinesApi
105+
@DelicateCoroutinesApi
106+
@PublishedApi
107+
internal fun runSingleTaskFromCurrentSystemDispatcher(): Long {
108+
val thread = Thread.currentThread()
109+
if (thread !is CoroutineScheduler.Worker) throw IllegalStateException("Expected CoroutineScheduler.Worker, but got $thread")
110+
return thread.runSingleTask()
111+
}
112+
113+
/**
114+
* Checks whether the given thread belongs to Dispatchers.IO.
115+
* Note that feature "is part of the Dispatchers.IO" is *dynamic*, meaning that the thread
116+
* may change this status when switching between tasks.
117+
*
118+
* This function is intended to be used on the result of `Thread.currentThread()` for diagnostic
119+
* purposes, and is declared as an extension only to avoid top-level scope pollution.
120+
*/
121+
@InternalCoroutinesApi
122+
@DelicateCoroutinesApi
123+
@PublishedApi
124+
internal fun Thread.isIoDispatcherThread(): Boolean {
125+
if (this !is CoroutineScheduler.Worker) return false
126+
return isIo()
127+
}
128+

kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -720,6 +720,30 @@ internal class CoroutineScheduler(
720720
tryReleaseCpu(WorkerState.TERMINATED)
721721
}
722722

723+
/**
724+
* See [runSingleTaskFromCurrentSystemDispatcher] for rationale and details.
725+
* This is a fine-tailored method for a specific use-case not expected to be used widely.
726+
*/
727+
fun runSingleTask(): Long {
728+
val stateSnapshot = state
729+
val isCpuThread = state == WorkerState.CPU_ACQUIRED
730+
val task = if (isCpuThread) {
731+
findCpuTask()
732+
} else {
733+
findBlockingTask()
734+
}
735+
if (task == null) {
736+
if (minDelayUntilStealableTaskNs == 0L) return -1L
737+
return minDelayUntilStealableTaskNs
738+
}
739+
runSafely(task)
740+
if (!isCpuThread) decrementBlockingTasks()
741+
assert { state == stateSnapshot}
742+
return 0L
743+
}
744+
745+
/** Returns `true` while this worker's current state is [WorkerState.BLOCKING]. */
fun isIo(): Boolean = WorkerState.BLOCKING == state
746+
723747
// Counterpart to "tryUnpark"
724748
private fun tryPark() {
725749
if (!inStack()) {
@@ -879,9 +903,21 @@ internal class CoroutineScheduler(
879903
* * Check if our queue has one (maybe mixed in with CPU tasks)
880904
* * Poll global and try steal
881905
*/
906+
return findBlockingTask()
907+
}
908+
909+
// NB: shared by runSingleTask and by the regular blocking-task lookup that delegates here
910+
private fun findBlockingTask(): Task? {
882911
return localQueue.pollBlocking()
883912
?: globalBlockingQueue.removeFirstOrNull()
884-
?: trySteal(blockingOnly = true)
913+
?: trySteal(STEAL_BLOCKING_ONLY)
914+
}
915+
916+
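// NB: ONLY for runSingleTask method. NOTE(review): this polls globalBlockingQueue although only CPU tasks are wanted here -- confirm the intended global queue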
917+
private fun findCpuTask(): Task? {
918+
return localQueue.pollCpu()
919+
?: globalBlockingQueue.removeFirstOrNull()
920+
?: trySteal(STEAL_CPU_ONLY)
885921
}
886922

887923
private fun findAnyTask(scanLocalQueue: Boolean): Task? {
@@ -897,7 +933,7 @@ internal class CoroutineScheduler(
897933
} else {
898934
pollGlobalQueues()?.let { return it }
899935
}
900-
return trySteal(blockingOnly = false)
936+
return trySteal(STEAL_ANY)
901937
}
902938

903939
private fun pollGlobalQueues(): Task? {
@@ -910,7 +946,7 @@ internal class CoroutineScheduler(
910946
}
911947
}
912948

913-
private fun trySteal(blockingOnly: Boolean): Task? {
949+
private fun trySteal(stealingMode: StealingMode): Task? {
914950
val created = createdWorkers
915951
// 0 to await an initialization and 1 to avoid excess stealing on single-core machines
916952
if (created < 2) {
@@ -924,11 +960,7 @@ internal class CoroutineScheduler(
924960
if (currentIndex > created) currentIndex = 1
925961
val worker = workers[currentIndex]
926962
if (worker !== null && worker !== this) {
927-
val stealResult = if (blockingOnly) {
928-
worker.localQueue.tryStealBlocking(stolenTask)
929-
} else {
930-
worker.localQueue.trySteal(stolenTask)
931-
}
963+
val stealResult = worker.localQueue.trySteal(stealingMode, stolenTask)
932964
if (stealResult == TASK_STOLEN) {
933965
val result = stolenTask.element
934966
stolenTask.element = null

kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt

Lines changed: 50 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,14 @@ internal const val MASK = BUFFER_CAPACITY - 1 // 128 by default
1616
internal const val TASK_STOLEN = -1L
1717
internal const val NOTHING_TO_STEAL = -2L
1818

19+
internal typealias StealingMode = Int
20+
internal const val STEAL_ANY: StealingMode = 3
21+
internal const val STEAL_CPU_ONLY: StealingMode = 2
22+
internal const val STEAL_BLOCKING_ONLY: StealingMode = 1
23+
24+
internal inline val Task.maskForStealingMode: Int
25+
get() = if (isBlocking) STEAL_BLOCKING_ONLY else STEAL_CPU_ONLY
26+
1927
/**
2028
* Tightly coupled with [CoroutineScheduler] queue of pending tasks, but extracted to separate file for simplicity.
2129
* At any moment queue is used only by [CoroutineScheduler.Worker] threads, has only one producer (worker owning this queue)
@@ -107,54 +115,75 @@ internal class WorkQueue {
107115
*
108116
* Returns [NOTHING_TO_STEAL] if the queue has nothing to steal, [TASK_STOLEN] if at least one task was stolen
109117
* or positive value of how many nanoseconds should pass until the head of this queue will be available to steal.
118+
*
119+
* [StealingMode] controls what tasks to steal:
120+
* * [STEAL_ANY] is the default mode for the scheduler: the task at the head (in FIFO order) is stolen
121+
* * [STEAL_BLOCKING_ONLY] is mode for stealing *an arbitrary* blocking task, which is used by the scheduler when helping in Dispatchers.IO mode
122+
* * [STEAL_CPU_ONLY] is a kludge for `runSingleTaskFromCurrentSystemDispatcher`
110123
*/
111-
fun trySteal(stolenTaskRef: ObjectRef<Task?>): Long {
112-
val task = pollBuffer()
124+
fun trySteal(stealingMode: StealingMode, stolenTaskRef: ObjectRef<Task?>): Long {
125+
val task = when (stealingMode) {
126+
STEAL_ANY -> pollBuffer()
127+
else -> stealWithExclusiveMode(stealingMode)
128+
}
129+
113130
if (task != null) {
114131
stolenTaskRef.element = task
115132
return TASK_STOLEN
116133
}
117-
return tryStealLastScheduled(stolenTaskRef, blockingOnly = false)
134+
return tryStealLastScheduled(stealingMode, stolenTaskRef)
118135
}
119136

120-
fun tryStealBlocking(stolenTaskRef: ObjectRef<Task?>): Long {
137+
// Steal only tasks of a particular kind, potentially invoking full queue scan
138+
private fun stealWithExclusiveMode(stealingMode: StealingMode): Task? {
121139
var start = consumerIndex.value
122140
val end = producerIndex.value
123-
124-
while (start != end && blockingTasksInBuffer.value > 0) {
125-
stolenTaskRef.element = tryExtractBlockingTask(start++) ?: continue
126-
return TASK_STOLEN
141+
val onlyBlocking = stealingMode == STEAL_BLOCKING_ONLY
142+
// Bail out if there is no blocking work for us
143+
while (start != end) {
144+
if (onlyBlocking && blockingTasksInBuffer.value == 0) return null
145+
return tryExtractFromTheMiddle(start++, onlyBlocking) ?: continue
127146
}
128-
return tryStealLastScheduled(stolenTaskRef, blockingOnly = true)
147+
148+
return null
129149
}
130150

131151
// Polls for blocking task, invoked only by the owner
132-
fun pollBlocking(): Task? {
152+
// NB: ONLY for runSingleTask method
153+
fun pollBlocking(): Task? {
    // Exclusive poll restricted to blocking tasks
    return pollWithExclusiveMode(onlyBlocking = true)
}
154+
155+
// Polls for CPU task, invoked only by the owner
156+
// NB: ONLY for runSingleTask method
157+
fun pollCpu(): Task? {
    // Exclusive poll restricted to CPU (non-blocking) tasks
    return pollWithExclusiveMode(onlyBlocking = false)
}
158+
159+
private fun pollWithExclusiveMode(/* Only blocking OR only CPU */ onlyBlocking: Boolean): Task? {
133160
while (true) { // Poll the slot
134161
val lastScheduled = lastScheduledTask.value ?: break
135-
if (!lastScheduled.isBlocking) break
162+
if (lastScheduled.isBlocking != onlyBlocking) break
136163
if (lastScheduledTask.compareAndSet(lastScheduled, null)) {
137164
return lastScheduled
138165
} // Failed -> someone else stole it
139166
}
140167

168+
// Failed to poll the slot, scan the queue
141169
val start = consumerIndex.value
142170
var end = producerIndex.value
143-
144-
while (start != end && blockingTasksInBuffer.value > 0) {
145-
val task = tryExtractBlockingTask(--end)
171+
// Bail out if there is no blocking work for us
172+
while (start != end) {
173+
if (onlyBlocking && blockingTasksInBuffer.value == 0) return null
174+
val task = tryExtractFromTheMiddle(--end, onlyBlocking)
146175
if (task != null) {
147176
return task
148177
}
149178
}
150179
return null
151180
}
152181

153-
private fun tryExtractBlockingTask(index: Int): Task? {
182+
private fun tryExtractFromTheMiddle(index: Int, onlyBlocking: Boolean): Task? {
154183
val arrayIndex = index and MASK
155184
val value = buffer[arrayIndex]
156-
if (value != null && value.isBlocking && buffer.compareAndSet(arrayIndex, value, null)) {
157-
blockingTasksInBuffer.decrementAndGet()
185+
if (value != null && value.isBlocking == onlyBlocking && buffer.compareAndSet(arrayIndex, value, null)) {
186+
if (onlyBlocking) blockingTasksInBuffer.decrementAndGet()
158187
return value
159188
}
160189
return null
@@ -170,10 +199,12 @@ internal class WorkQueue {
170199
/**
171200
* Contract on return value is the same as for [trySteal]
172201
*/
173-
private fun tryStealLastScheduled(stolenTaskRef: ObjectRef<Task?>, blockingOnly: Boolean): Long {
202+
private fun tryStealLastScheduled(stealingMode: StealingMode, stolenTaskRef: ObjectRef<Task?>): Long {
174203
while (true) {
175204
val lastScheduled = lastScheduledTask.value ?: return NOTHING_TO_STEAL
176-
if (blockingOnly && !lastScheduled.isBlocking) return NOTHING_TO_STEAL
205+
if ((lastScheduled.maskForStealingMode and stealingMode) == 0) {
206+
return NOTHING_TO_STEAL
207+
}
177208

178209
// TODO time wraparound ?
179210
val time = schedulerTimeSource.nanoTime()

0 commit comments

Comments
 (0)