Skip to content

Commit ad7bcf6

Browse files
Use WorkStealingDispatcher in runtime, behind a flag.
# Conflicts: # workflow-core/src/commonMain/kotlin/com/squareup/workflow1/RuntimeConfig.kt # workflow-runtime/src/commonMain/kotlin/com/squareup/workflow1/RenderWorkflow.kt # workflow-runtime/src/commonTest/kotlin/com/squareup/workflow1/RenderWorkflowInTest.kt
1 parent 7deeba3 commit ad7bcf6

File tree

4 files changed

+166
-2
lines changed

4 files changed

+166
-2
lines changed

workflow-core/api/workflow-core.api

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ public final class com/squareup/workflow1/RuntimeConfigOptions : java/lang/Enum
168168
public static final field PARTIAL_TREE_RENDERING Lcom/squareup/workflow1/RuntimeConfigOptions;
169169
public static final field RENDER_ONLY_WHEN_STATE_CHANGES Lcom/squareup/workflow1/RuntimeConfigOptions;
170170
public static final field STABLE_EVENT_HANDLERS Lcom/squareup/workflow1/RuntimeConfigOptions;
171+
public static final field WORK_STEALING_DISPATCHER Lcom/squareup/workflow1/RuntimeConfigOptions;
171172
public static fun getEntries ()Lkotlin/enums/EnumEntries;
172173
public static fun valueOf (Ljava/lang/String;)Lcom/squareup/workflow1/RuntimeConfigOptions;
173174
public static fun values ()[Lcom/squareup/workflow1/RuntimeConfigOptions;
@@ -180,6 +181,7 @@ public final class com/squareup/workflow1/RuntimeConfigOptions$Companion {
180181
}
181182

182183
public final class com/squareup/workflow1/RuntimeConfigOptions$Companion$RuntimeOptions : java/lang/Enum {
184+
public static final field ALL Lcom/squareup/workflow1/RuntimeConfigOptions$Companion$RuntimeOptions;
183185
public static final field CONFLATE Lcom/squareup/workflow1/RuntimeConfigOptions$Companion$RuntimeOptions;
184186
public static final field DEA Lcom/squareup/workflow1/RuntimeConfigOptions$Companion$RuntimeOptions;
185187
public static final field DEFAULT Lcom/squareup/workflow1/RuntimeConfigOptions$Companion$RuntimeOptions;

workflow-core/src/commonMain/kotlin/com/squareup/workflow1/RuntimeConfig.kt

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,13 @@ public enum class RuntimeConfigOptions {
6767
@WorkflowExperimentalRuntime
6868
STABLE_EVENT_HANDLERS,
6969

70+
/**
71+
* Wrap the dispatcher passed to the runtime with a special dispatcher that can be advanced
72+
* explicitly, to allow any tasks scheduled by the workflow runtime to run before certain phases.
73+
*/
74+
@WorkflowExperimentalRuntime
75+
WORK_STEALING_DISPATCHER,
76+
7077
/**
7178
* If we have more actions to process that are queued on nodes not affected by the last
7279
* action application, then we will continue to process those actions before another render
@@ -161,6 +168,13 @@ public enum class RuntimeConfigOptions {
161168
DRAIN_EXCLUSIVE_ACTIONS,
162169
)
163170
),
171+
172+
/**
173+
* Always contains all [RuntimeConfigOptions]. Other values in this enum may happen to contain
174+
* the same set at some point in time, but this one will also always be updated to include new
175+
* ones as they're added.
176+
*/
177+
ALL(RuntimeConfigOptions.entries.toSet())
164178
}
165179
}
166180
}

workflow-runtime/src/commonMain/kotlin/com/squareup/workflow1/RenderWorkflow.kt

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import com.squareup.workflow1.RuntimeConfigOptions.CONFLATE_STALE_RENDERINGS
44
import com.squareup.workflow1.RuntimeConfigOptions.DRAIN_EXCLUSIVE_ACTIONS
55
import com.squareup.workflow1.RuntimeConfigOptions.RENDER_ONLY_WHEN_STATE_CHANGES
66
import com.squareup.workflow1.WorkflowInterceptor.RenderPassSkipped
7+
import com.squareup.workflow1.WorkflowInterceptor.RenderPassesComplete
8+
import com.squareup.workflow1.internal.WorkStealingDispatcher
79
import com.squareup.workflow1.WorkflowInterceptor.RenderingConflated
810
import com.squareup.workflow1.WorkflowInterceptor.RenderingProduced
911
import com.squareup.workflow1.internal.WorkflowRunner
@@ -16,6 +18,7 @@ import kotlinx.coroutines.flow.MutableStateFlow
1618
import kotlinx.coroutines.flow.StateFlow
1719
import kotlinx.coroutines.isActive
1820
import kotlinx.coroutines.launch
21+
import kotlinx.coroutines.plus
1922

2023
/**
2124
* Launches the [workflow] in a new coroutine in [scope] and returns a [StateFlow] of its
@@ -142,6 +145,15 @@ public fun <PropsT, OutputT, RenderingT> renderWorkflowIn(
142145
): StateFlow<RenderingAndSnapshot<RenderingT>> {
143146
val chainedInterceptor = interceptors.chained()
144147

148+
val dispatcher = if (RuntimeConfigOptions.WORK_STEALING_DISPATCHER in runtimeConfig) {
149+
WorkStealingDispatcher.wrapDispatcherFrom(scope.coroutineContext)
150+
} else {
151+
null
152+
}
153+
154+
@Suppress("NAME_SHADOWING")
155+
val scope = dispatcher?.let { scope + dispatcher } ?: scope
156+
145157
val runner = WorkflowRunner(
146158
scope,
147159
workflow,
@@ -249,6 +261,13 @@ public fun <PropsT, OutputT, RenderingT> renderWorkflowIn(
249261
actionDrainingHasChangedState || actionResult.stateChanged
250262
// We may have more actions we can process, this rendering could be stale.
251263
// This will check for any actions that are immediately available and apply them.
264+
// We advance the dispatcher first to allow any coroutines that were launched by the last
265+
// render pass to start up and potentially enqueue actions.
266+
dispatcher?.let {
267+
workflowTracer.trace("AdvancingWorkflowDispatcher") {
268+
dispatcher.advanceUntilIdle()
269+
}
270+
}
252271
actionResult = runner.applyNextAvailableTreeAction()
253272

254273
// If no actions processed, then no new rendering needed. Pass on to UI.

workflow-runtime/src/commonTest/kotlin/com/squareup/workflow1/RenderWorkflowInTest.kt

Lines changed: 131 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,22 +7,29 @@ import com.squareup.workflow1.RuntimeConfigOptions.Companion.RuntimeOptions.DEFA
77
import com.squareup.workflow1.RuntimeConfigOptions.DRAIN_EXCLUSIVE_ACTIONS
88
import com.squareup.workflow1.RuntimeConfigOptions.PARTIAL_TREE_RENDERING
99
import com.squareup.workflow1.RuntimeConfigOptions.RENDER_ONLY_WHEN_STATE_CHANGES
10+
import com.squareup.workflow1.RuntimeConfigOptions.WORK_STEALING_DISPATCHER
1011
import com.squareup.workflow1.WorkflowInterceptor.RenderPassSkipped
1112
import com.squareup.workflow1.WorkflowInterceptor.RenderingProduced
1213
import com.squareup.workflow1.WorkflowInterceptor.RuntimeUpdate
1314
import kotlinx.coroutines.CancellationException
1415
import kotlinx.coroutines.CompletableDeferred
1516
import kotlinx.coroutines.CoroutineExceptionHandler
17+
import kotlinx.coroutines.Dispatchers
1618
import kotlinx.coroutines.ExperimentalCoroutinesApi
19+
import kotlinx.coroutines.awaitCancellation
1720
import kotlinx.coroutines.cancel
21+
import kotlinx.coroutines.cancelAndJoin
1822
import kotlinx.coroutines.channels.Channel
1923
import kotlinx.coroutines.flow.MutableSharedFlow
2024
import kotlinx.coroutines.flow.MutableStateFlow
2125
import kotlinx.coroutines.flow.StateFlow
2226
import kotlinx.coroutines.flow.map
27+
import kotlinx.coroutines.flow.produceIn
2328
import kotlinx.coroutines.flow.receiveAsFlow
2429
import kotlinx.coroutines.isActive
30+
import kotlinx.coroutines.job
2531
import kotlinx.coroutines.launch
32+
import kotlinx.coroutines.plus
2633
import kotlinx.coroutines.suspendCancellableCoroutine
2734
import kotlinx.coroutines.sync.Mutex
2835
import kotlinx.coroutines.test.StandardTestDispatcher
@@ -46,7 +53,7 @@ import kotlin.test.assertTrue
4653
@Burst
4754
class RenderWorkflowInTest(
4855
useTracer: Boolean = false,
49-
useUnconfined: Boolean = true,
56+
private val useUnconfined: Boolean = true,
5057
private val runtime: RuntimeOptions = DEFAULT
5158
) {
5259

@@ -1502,7 +1509,9 @@ class RenderWorkflowInTest(
15021509

15031510
@Test
15041511
fun for_conflate_we_do_not_conflate_stacked_actions_into_one_rendering_if_output() {
1505-
if (runtimeConfig.contains(CONFLATE_STALE_RENDERINGS)) {
1512+
if (CONFLATE_STALE_RENDERINGS in runtimeConfig &&
1513+
WORK_STEALING_DISPATCHER !in runtimeConfig
1514+
) {
15061515
runTest(dispatcherUsed) {
15071516
check(runtimeConfig.contains(CONFLATE_STALE_RENDERINGS))
15081517

@@ -1746,6 +1755,126 @@ class RenderWorkflowInTest(
17461755
}
17471756
}
17481757

1758+
/**
1759+
* When the [CONFLATE_STALE_RENDERINGS] flag is specified, the runtime will repeatedly run all
1760+
* enqueued WorkflowActions after a render pass, before emitting the rendering to the external
1761+
* flow. When the [WORK_STEALING_DISPATCHER] flag is specified at the same time, any coroutines
1762+
* launched (or even resumed) since the render pass will be allowed to run _before_ checking for
1763+
* actions. This means that any new side effects or workers started by the render pass will be
1764+
* allowed to run to their first suspension point before the rendering is emitted. And if they
1765+
* happen to emit more actions as part of that, then those actions will also be processed, etc.
1766+
* until no more actions are available – only then will the rendering actually be emitted.
1767+
*/
1768+
@Test
1769+
fun new_effect_coroutines_dispatched_before_rendering_emitted_when_work_stealing_dispatcher() {
1770+
// This tests is specifically for standard dispatching behavior. It currently only works when
1771+
// CSR is enabled, although an additional test for DEA should be added.
1772+
if (WORK_STEALING_DISPATCHER !in runtimeConfig ||
1773+
CONFLATE_STALE_RENDERINGS !in runtimeConfig ||
1774+
useUnconfined
1775+
) {
1776+
return
1777+
}
1778+
1779+
runTest(dispatcherUsed) {
1780+
val workflow = Workflow.stateful<Int, Nothing, Unit>(initialState = 0) { effectCount ->
1781+
// Because of the WSD, this effect will be allowed to run after the render pass but before
1782+
// emitting the rendering OR checking for new actions, in the CSR loop. Since it emits an
1783+
// action, that action will be processed and trigger a second render pass.
1784+
runningSideEffect("sender") {
1785+
actionSink.send(
1786+
action("0") {
1787+
expect(2)
1788+
this.state++
1789+
}
1790+
)
1791+
}
1792+
1793+
if (effectCount >= 1) {
1794+
// This effect will be started by the first action and cancelled only when the runtime
1795+
// is cancelled.
1796+
// It will also start in the CSR loop, and trigger a third render pass before emitting the
1797+
// rendering.
1798+
runningSideEffect("0") {
1799+
expect(3)
1800+
actionSink.send(
1801+
action("1") {
1802+
expect(4)
1803+
this.state++
1804+
}
1805+
)
1806+
awaitCancellation {
1807+
expect(9)
1808+
}
1809+
}
1810+
}
1811+
1812+
if (effectCount >= 2) {
1813+
// This effect will be started by the second action, and cancelled by its own action in
1814+
// the same run of the CSR loop again.
1815+
runningSideEffect("1") {
1816+
expect(5)
1817+
actionSink.send(
1818+
action("-1") {
1819+
expect(6)
1820+
this.state--
1821+
}
1822+
)
1823+
awaitCancellation {
1824+
expect(7)
1825+
}
1826+
}
1827+
}
1828+
}
1829+
1830+
// We collect the renderings flow to a channel to drive the runtime loop by receiving from the
1831+
// channel. We can't use testScheduler.advanceUntilIdle() et al because we only want the test
1832+
// scheduler to run tasks until a rendering is available, not indefinitely.
1833+
val renderings = renderWorkflowIn(
1834+
workflow = workflow,
1835+
// Run in this scope so it is advanced by advanceUntilIdle.
1836+
scope = backgroundScope,
1837+
props = MutableStateFlow(Unit),
1838+
runtimeConfig = runtimeConfig,
1839+
workflowTracer = testTracer,
1840+
onOutput = {}
1841+
).produceIn(backgroundScope + Dispatchers.Unconfined)
1842+
1843+
expect(0)
1844+
// Receiving the first rendering allows the runtime coroutine to start. The first rendering
1845+
// is returned synchronously.
1846+
renderings.receive()
1847+
expect(1)
1848+
// Receiving the second rendering will allow the runtime to continue until the rendering is
1849+
// emitted. Since the CSR loop will start all our effects before emitting the next rendering,
1850+
// only one rendering will be emitted for all those render passes.
1851+
renderings.receive()
1852+
expect(8)
1853+
1854+
// No more renderings should be produced.
1855+
testScheduler.advanceUntilIdle()
1856+
assertTrue(renderings.isEmpty)
1857+
1858+
// Cancel the whole workflow runtime, including all effects.
1859+
backgroundScope.coroutineContext.job.cancelAndJoin()
1860+
expect(10)
1861+
}
1862+
}
1863+
1864+
private suspend fun awaitCancellation(onFinally: () -> Unit) {
1865+
try {
1866+
awaitCancellation()
1867+
} finally {
1868+
onFinally()
1869+
}
1870+
}
1871+
1872+
private var expectCounter = 0
1873+
private fun expect(expected: Int) {
1874+
assertEquals(expected, expectCounter)
1875+
expectCounter++
1876+
}
1877+
17491878
@Test
17501879
fun for_drain_exclusive_we_handle_multiple_actions_in_one_render_or_not() = runTest(
17511880
dispatcherUsed

0 commit comments

Comments
 (0)