Skip to content

Commit edf9f57

Browse files
Add broken stress test for multithreaded runtime behavior.
1 parent 96997a2 commit edf9f57

File tree

3 files changed

+122
-18
lines changed

3 files changed

+122
-18
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package com.squareup.workflow1
2+
3+
import java.util.concurrent.CountDownLatch
4+
5+
/**
6+
* Returns the maximum number of threads that can be ran in parallel on the host system, rounded
7+
* down to the nearest even number, and at least 2.
8+
*/
9+
internal fun calculateSaturatingTestThreadCount(minThreads: Int) =
10+
Runtime.getRuntime().availableProcessors().let {
11+
if (it.mod(2) != 0) it - 1 else it
12+
}.coerceAtLeast(minThreads)
13+
14+
/**
15+
* Calls [CountDownLatch.await] in a loop until count is zero, even if the thread gets
16+
* interrupted.
17+
*/
18+
@Suppress("CheckResult")
19+
internal fun CountDownLatch.awaitUntilDone() {
20+
while (count > 0) {
21+
try {
22+
await()
23+
} catch (e: InterruptedException) {
24+
// Continue
25+
}
26+
}
27+
}
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package com.squareup.workflow1
2+
3+
import kotlinx.coroutines.CoroutineStart.UNDISPATCHED
4+
import kotlinx.coroutines.DelicateCoroutinesApi
5+
import kotlinx.coroutines.Job
6+
import kotlinx.coroutines.flow.MutableStateFlow
7+
import kotlinx.coroutines.flow.first
8+
import kotlinx.coroutines.launch
9+
import kotlinx.coroutines.newFixedThreadPoolContext
10+
import kotlinx.coroutines.plus
11+
import kotlinx.coroutines.test.runTest
12+
import kotlinx.coroutines.yield
13+
import java.util.concurrent.CountDownLatch
14+
import kotlin.test.Test
15+
16+
class WorkflowRuntimeMultithreadingStressTest {
17+
18+
@OptIn(DelicateCoroutinesApi::class)
19+
@Test
20+
fun actionContention() = runTest {
21+
// At least 2 threads so that workflow runtime can always run in parallel with at least one
22+
// emitter.
23+
val testThreadCount = calculateSaturatingTestThreadCount(minThreads = 5)
24+
25+
// Determines how many separate channels are in the system.
26+
val childCount = (testThreadCount / 4).coerceAtLeast(2)
27+
// Determines how many channel sends can be queued up simultaneously.
28+
val emittersPerChild = (testThreadCount / 4).coerceAtLeast(2)
29+
// Determines how many times each emitter will loop sending actions.
30+
val emissionsPerEmitter = (testThreadCount * 10).coerceAtLeast(10)
31+
val totalEmissions = childCount * emittersPerChild * emissionsPerEmitter
32+
33+
val emittersReadyLatch = CountDownLatch(childCount)
34+
val startEmittingLatch = Job()
35+
36+
// Child launches a bunch of coroutines that loop sending outputs to the parent. We use multiple
37+
// emitters for each child to create contention on each channel, and loop within each coroutine
38+
// to prolong that contention over time as the runtime grinds through all the actions.
39+
// The parent renders a bunch of these children and increments a counter every time any of them
40+
// emit an output. We use multiple children to create contention on the select with multiple
41+
// channels.
42+
val child = Workflow.stateless { childIndex: Int ->
43+
runningSideEffect("emitter") {
44+
repeat(emittersPerChild) { emitterIndex ->
45+
launch(start = UNDISPATCHED) {
46+
val action = action<Int, Nothing, Unit>("emit-$emitterIndex") { setOutput(Unit) }
47+
startEmittingLatch.join()
48+
repeat(emissionsPerEmitter) { emissionIndex ->
49+
actionSink.send(action)
50+
yield()
51+
}
52+
}
53+
}
54+
emittersReadyLatch.countDown()
55+
}
56+
}
57+
val root = Workflow.stateful(
58+
initialState = { _, _ -> 0 },
59+
snapshot = { null },
60+
render = { _, count ->
61+
val action = action<Unit, Int, Nothing>("countChild") { this.state++ }
62+
repeat(childCount) { childIndex ->
63+
renderChild(child, props = childIndex, key = "child-$childIndex", handler = { action })
64+
}
65+
return@stateful count
66+
})
67+
68+
val testDispatcher = newFixedThreadPoolContext(nThreads = testThreadCount, name = "test")
69+
testDispatcher.use {
70+
val renderings = renderWorkflowIn(
71+
workflow = root,
72+
scope = backgroundScope + testDispatcher,
73+
props = MutableStateFlow(Unit),
74+
onOutput = {}
75+
)
76+
77+
// Wait for all workers to spin up.
78+
emittersReadyLatch.awaitUntilDone()
79+
println("Thread count: $testThreadCount")
80+
println("Child count: $childCount")
81+
println("Emitters per child: $emittersPerChild")
82+
println("Emissions per emitter: $emissionsPerEmitter")
83+
println("Waiting for $totalEmissions emissions…")
84+
85+
// Trigger an avalanche of emissions.
86+
startEmittingLatch.complete()
87+
88+
// Wait for all workers to finish.
89+
renderings.first { it.rendering == totalEmissions }
90+
}
91+
}
92+
}

workflow-runtime/src/jvmTest/kotlin/com/squareup/workflow1/internal/WorkStealingDispatcherStressTest.kt

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.squareup.workflow1.internal
22

3+
import com.squareup.workflow1.awaitUntilDone
4+
import com.squareup.workflow1.calculateSaturatingTestThreadCount
35
import kotlinx.coroutines.CoroutineScope
46
import kotlinx.coroutines.Dispatchers
57
import kotlinx.coroutines.test.StandardTestDispatcher
@@ -14,9 +16,7 @@ import kotlin.test.assertTrue
1416
* Returns the maximum number of threads that can be ran in parallel on the host system, rounded
1517
* down to the nearest even number, and at least 2.
1618
*/
17-
private val saturatingTestThreadCount = Runtime.getRuntime().availableProcessors().let {
18-
if (it.mod(2) != 0) it - 1 else it
19-
}.coerceAtLeast(2)
19+
private val saturatingTestThreadCount = calculateSaturatingTestThreadCount(minThreads = 2)
2020

2121
/**
2222
* Tests that use multiple threads to hammer on [WorkStealingDispatcher] and verify its thread
@@ -253,19 +253,4 @@ class WorkStealingDispatcherStressTest {
253253
// Ensure that all tasks were ran exactly once.
254254
assertTrue(statuses.all { it.get() == 1 })
255255
}
256-
257-
/**
258-
* Calls [CountDownLatch.await] in a loop until count is zero, even if the thread gets
259-
* interrupted.
260-
*/
261-
@Suppress("CheckResult")
262-
private fun CountDownLatch.awaitUntilDone() {
263-
while (count > 0) {
264-
try {
265-
await()
266-
} catch (e: InterruptedException) {
267-
// Continue
268-
}
269-
}
270-
}
271256
}

0 commit comments

Comments
 (0)