Skip to content

Commit a158414

Browse files
Introduce WorkStealingDispatcher.
1 parent 7285d9a commit a158414

File tree

9 files changed

+1529
-0
lines changed

9 files changed

+1529
-0
lines changed

build.gradle.kts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import com.squareup.workflow1.buildsrc.shardConnectedCheckTasks
2+
import org.gradle.api.tasks.testing.logging.TestExceptionFormat.FULL
23
import org.jetbrains.dokka.gradle.AbstractDokkaLeafTask
34
import java.net.URL
45

@@ -99,6 +100,15 @@ subprojects {
99100
subprojects {
100101
tasks.withType(AbstractPublishToMaven::class.java)
101102
.configureEach { mustRunAfter(tasks.matching { it is Sign }) }
103+
104+
tasks.withType(Test::class.java)
105+
.configureEach {
106+
testLogging {
107+
// This prints exception messages and stack traces to the log when tests fail. Makes it a
108+
// lot easier to see what failed in CI. If this gets too noisy, just remove it.
109+
exceptionFormat = FULL
110+
}
111+
}
102112
}
103113

104114
// This task is invoked by the documentation site generator script in the main workflow project (not

workflow-runtime/build.gradle.kts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ kotlin {
1717
if (targets == "kmp" || targets == "js") {
1818
js(IR) { browser() }
1919
}
20+
21+
// Needed for expect class Lock, which is not public API, so this doesn't add any binary compat
22+
// risk.
23+
compilerOptions.freeCompilerArgs.add("-Xexpect-actual-classes")
2024
}
2125

2226
dependencies {
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package com.squareup.workflow1.internal
2+
3+
import platform.Foundation.NSLock
4+
5+
internal actual typealias Lock = NSLock
6+
7+
internal actual inline fun <R> Lock.withLock(block: () -> R): R {
8+
lock()
9+
try {
10+
return block()
11+
} finally {
12+
unlock()
13+
}
14+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package com.squareup.workflow1.internal
2+
3+
internal expect class Lock()
4+
5+
internal expect inline fun <R> Lock.withLock(block: () -> R): R
Lines changed: 273 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,273 @@
1+
package com.squareup.workflow1.internal
2+
3+
import com.squareup.workflow1.internal.WorkStealingDispatcher.Companion.wrapDispatcherFrom
4+
import kotlinx.coroutines.CoroutineDispatcher
5+
import kotlinx.coroutines.Delay
6+
import kotlinx.coroutines.Dispatchers
7+
import kotlinx.coroutines.ExperimentalCoroutinesApi
8+
import kotlinx.coroutines.InternalCoroutinesApi
9+
import kotlinx.coroutines.Runnable
10+
import kotlin.concurrent.Volatile
11+
import kotlin.coroutines.Continuation
12+
import kotlin.coroutines.ContinuationInterceptor
13+
import kotlin.coroutines.CoroutineContext
14+
import kotlin.coroutines.resume
15+
16+
/**
17+
* A [CoroutineDispatcher] that delegates to another dispatcher but allows stealing any work
18+
* scheduled on this dispatcher and performing it synchronously by calling [advanceUntilIdle].
19+
*
20+
* The easiest way to create one is by calling [wrapDispatcherFrom].
21+
*
22+
* E.g.
23+
* ```
24+
* val dispatcher = WorkStealingDispatcher.wrapDispatcherFrom(scope.coroutineContext)
25+
* scope.launch(dispatcher) {
26+
* while (true) {
27+
* lots()
28+
* of()
29+
* suspending()
30+
* calls()
31+
* }
32+
* }
33+
* …
34+
* dispatcher.advanceUntilIdle()
35+
* ```
36+
*
37+
* @param delegateInterceptor The [CoroutineDispatcher] or other [ContinuationInterceptor] to
38+
* delegate scheduling behavior to. This can either be a confined or unconfined dispatcher, and its
39+
* behavior will be preserved transparently.
40+
*/
41+
internal open class WorkStealingDispatcher protected constructor(
42+
private val delegateInterceptor: ContinuationInterceptor,
43+
lock: Lock?,
44+
queue: LinkedHashSet<DelegateDispatchedContinuation>?
45+
) : CoroutineDispatcher() {
46+
companion object {
47+
/**
48+
* Creates a [WorkStealingDispatcher] that supports [Delay] if [delegateInterceptor] does.
49+
*/
50+
operator fun invoke(delegateInterceptor: ContinuationInterceptor): WorkStealingDispatcher =
51+
createMatchingDelayability(
52+
delegateInterceptor = delegateInterceptor,
53+
lock = null,
54+
queue = null
55+
)
56+
57+
/**
58+
* Returns a [WorkStealingDispatcher] that delegates to the [CoroutineDispatcher] from
59+
* [context]. If the context does not specify a dispatcher, [Dispatchers.Default] is used.
60+
*/
61+
fun wrapDispatcherFrom(context: CoroutineContext): WorkStealingDispatcher {
62+
// If there's no dispatcher in the context then the coroutines runtime will fall back to
63+
// Dispatchers.Default anyway.
64+
val baseDispatcher = context[ContinuationInterceptor] ?: Dispatchers.Default
65+
return invoke(delegateInterceptor = baseDispatcher)
66+
}
67+
68+
/**
69+
* Returns a [WorkStealingDispatcher] that either does or doesn't implement [Delay] depending
70+
* on whether [delegateInterceptor] implements it, by delegating to its implementation.
71+
*/
72+
@OptIn(InternalCoroutinesApi::class)
73+
private fun createMatchingDelayability(
74+
delegateInterceptor: ContinuationInterceptor,
75+
lock: Lock?,
76+
queue: LinkedHashSet<DelegateDispatchedContinuation>?
77+
): WorkStealingDispatcher {
78+
return if (delegateInterceptor is Delay) {
79+
DelayableWorkStealingDispatcher(
80+
delegate = delegateInterceptor,
81+
delay = delegateInterceptor,
82+
lock = lock,
83+
queue = queue
84+
)
85+
} else {
86+
WorkStealingDispatcher(
87+
delegateInterceptor = delegateInterceptor,
88+
lock = lock,
89+
queue = queue
90+
)
91+
}
92+
}
93+
}
94+
95+
/** Used to synchronize access to the mutable properties of this class. */
96+
private val lock = lock ?: Lock()
97+
98+
// region Access to these properties must always be synchronized with lock.
99+
private val queue = queue ?: LinkedHashSet()
100+
// endregion
101+
102+
/**
103+
* Always returns true since we always need to track what work is waiting so we can advance it.
104+
*/
105+
final override fun isDispatchNeeded(context: CoroutineContext): Boolean = true
106+
107+
final override fun dispatch(
108+
context: CoroutineContext,
109+
block: Runnable
110+
) {
111+
val continuation = DelegateDispatchedContinuation(context, block)
112+
lock.withLock {
113+
queue += continuation
114+
}
115+
116+
// Trampoline the dispatch outside the critical section to avoid deadlocks.
117+
// This will either synchronously run block or dispatch it, depending on what resuming a
118+
// continuation on the delegate dispatcher would do.
119+
continuation.resumeOnDelegateDispatcher()
120+
}
121+
122+
/**
123+
* Calls [limitedParallelism] on [delegateInterceptor] and wraps the returned dispatcher with
124+
* a [WorkStealingDispatcher] that this instance will steal from.
125+
*
126+
* This satisfies the limited parallelism requirements because [advanceUntilIdle] always runs
127+
* tasks with a parallelism of 1 (i.e. serially).
128+
*/
129+
@ExperimentalCoroutinesApi
130+
final override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
131+
if (delegateInterceptor !is CoroutineDispatcher) {
132+
throw UnsupportedOperationException(
133+
"limitedParallelism is not supported for WorkStealingDispatcher with " +
134+
"non-dispatcher delegate"
135+
)
136+
}
137+
138+
val limitedDelegate = delegateInterceptor.limitedParallelism(parallelism)
139+
return createMatchingDelayability(
140+
delegateInterceptor = limitedDelegate,
141+
lock = lock,
142+
queue = queue
143+
)
144+
}
145+
146+
/**
147+
* "Steals" work that was scheduled on this dispatcher but hasn't had a chance to run yet and runs
148+
* it, until there is no work left to do. If the work schedules more work, that will also be ran
149+
* before the method returns.
150+
*
151+
* This method is safe to call reentrantly (a continuation resumed by it can call it again).
152+
*
153+
* It is also safe to call from multiple threads, even in parallel, although the behavior is
154+
* undefined. E.g. One thread might return from this method before the other has finished running
155+
* all tasks.
156+
*/
157+
// If we need a strong guarantee for calling from multiple threads we could just run this method
158+
// with a separate lock so all threads would just wait on the first one to finish running, but
159+
// that could deadlock if any of the dispatched coroutines call this method reentrantly.
160+
fun advanceUntilIdle() {
161+
do {
162+
val task = nextTask()
163+
task?.releaseAndRun()
164+
} while (task != null)
165+
}
166+
167+
/**
168+
* Removes and returns the next task to run from the queue.
169+
*/
170+
private fun nextTask(): DelegateDispatchedContinuation? {
171+
lock.withLock {
172+
val iterator = queue.iterator()
173+
if (iterator.hasNext()) {
174+
val task = iterator.next()
175+
iterator.remove()
176+
return task
177+
} else {
178+
return null
179+
}
180+
}
181+
}
182+
183+
protected inner class DelegateDispatchedContinuation(
184+
override val context: CoroutineContext,
185+
private val runnable: Runnable
186+
) : Continuation<Unit> {
187+
188+
/**
189+
* Flag used to avoid checking the queue for the task when this continuation is executed by the
190+
* delegate dispatcher after it's already been ran by advancing. This is best-effort – if
191+
* there's a race, the losing thread will still lock and check the queue before nooping.
192+
*
193+
* Access to this property does not need to be synchronized with [lock] or by any other method,
194+
* since it's just a write-once hint.
195+
*/
196+
@Volatile
197+
private var consumed = false
198+
199+
/**
200+
* Cache for intercepted coroutine so we can release it from [resumeWith].
201+
* [WorkStealingDispatcher] guarantees only one resume call will happen until the continuation
202+
* is done, so we don't need to guard this property with a lock.
203+
*/
204+
private var intercepted: Continuation<Unit>? = null
205+
206+
/**
207+
* Resumes this continuation on [delegateInterceptor] by intercepting it and resuming the
208+
* intercepted continuation.
209+
*
210+
* When a dispatcher returns false from [isDispatchNeeded], then when continuations intercepted
211+
* by it are resumed, they may either be ran in-place or scheduled to the coroutine runtime's
212+
* internal, thread-local event loop (see the kdoc for [Dispatchers.Unconfined] for more
213+
* information on the event loop). The only way to access this internal scheduling behavior is
214+
* to have the dispatcher intercept a continuation and resume the intercepted continuation.
215+
*/
216+
fun resumeOnDelegateDispatcher() {
217+
val intercepted = delegateInterceptor.interceptContinuation(this).also {
218+
this.intercepted = it
219+
}
220+
221+
// If delegate is a CoroutineDispatcher, intercepted will be a special Continuation that will
222+
// check the delegate's isDispatchNeeded to decide whether to call dispatch() or to enqueue it
223+
// to the thread-local unconfined queue.
224+
intercepted.resume(Unit)
225+
}
226+
227+
/**
228+
* DO NOT CALL DIRECTLY! Call [resumeOnDelegateDispatcher] instead.
229+
*/
230+
override fun resumeWith(result: Result<Unit>) {
231+
// Fastest path: If this continuation has already been ran by advancing, don't even bother
232+
// locking and checking the queue. Note that even if consumed is false, the task may have been
233+
// ran already, so we still need to check whether it's in the queue under lock.
234+
if (consumed) return
235+
236+
// Fast path: If we're racing with another thread and consumed hasn't been set yet, then check
237+
// the queue under lock. The queue is the real source of truth.
238+
val unconsumedForSure = lock.withLock {
239+
queue.remove(this)
240+
}
241+
if (unconsumedForSure) {
242+
releaseAndRun()
243+
}
244+
}
245+
246+
/**
247+
* Runs the continuation, notifying the interceptor to release it if necessary.
248+
*
249+
* This method *MUST* only be called if and after the continuation has been successfully removed
250+
* from [queue], otherwise another thread may end up running it as well.
251+
*/
252+
fun releaseAndRun() {
253+
// This flag must be set here, since this is the method that is called by advanceUntilIdle.
254+
consumed = true
255+
256+
intercepted?.let {
257+
if (it !== this) {
258+
delegateInterceptor.releaseInterceptedContinuation(it)
259+
}
260+
intercepted = null
261+
}
262+
runnable.run()
263+
}
264+
}
265+
}
266+
267+
@OptIn(InternalCoroutinesApi::class)
268+
private class DelayableWorkStealingDispatcher(
269+
delegate: ContinuationInterceptor,
270+
delay: Delay,
271+
lock: Lock?,
272+
queue: LinkedHashSet<DelegateDispatchedContinuation>?
273+
) : WorkStealingDispatcher(delegate, lock, queue), Delay by delay

0 commit comments

Comments
 (0)