Skip to content

Commit 9bacb83

Browse files
committed
introduce runBlockingWithParallelismCompensation and remove parallelism compensation from runBlocking
1 parent 94339ef commit 9bacb83

File tree

3 files changed

+29
-9
lines changed

3 files changed

+29
-9
lines changed

kotlinx-coroutines-core/jvm/src/Builders.kt

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,21 @@ import kotlin.coroutines.*
4646
*/
4747
@Throws(InterruptedException::class)
4848
public actual fun <T> runBlocking(context: CoroutineContext, block: suspend CoroutineScope.() -> T): T {
49+
contract {
50+
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
51+
}
52+
return runBlocking(context, compensateParallelism = false, block)
53+
}
54+
55+
@Throws(InterruptedException::class)
56+
internal fun <T> runBlockingWithParallelismCompensation(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
57+
contract {
58+
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
59+
}
60+
return runBlocking(context, compensateParallelism = true, block)
61+
}
62+
63+
private fun <T> runBlocking(context: CoroutineContext, compensateParallelism: Boolean, block: suspend CoroutineScope.() -> T): T {
4964
contract {
5065
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
5166
}
@@ -64,15 +79,16 @@ public actual fun <T> runBlocking(context: CoroutineContext, block: suspend Coro
6479
?: ThreadLocalEventLoop.currentOrNull()
6580
newContext = GlobalScope.newCoroutineContext(context)
6681
}
67-
val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop)
82+
val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop, compensateParallelism)
6883
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
6984
return coroutine.joinBlocking()
7085
}
7186

7287
private class BlockingCoroutine<T>(
7388
parentContext: CoroutineContext,
7489
private val blockedThread: Thread,
75-
private val eventLoop: EventLoop?
90+
private val eventLoop: EventLoop?,
91+
private val compensateParallelism: Boolean,
7692
) : AbstractCoroutine<T>(parentContext, true, true) {
7793

7894
override val isScopedCoroutine: Boolean get() = true
@@ -96,7 +112,11 @@ private class BlockingCoroutine<T>(
96112
// note: process next even may loose unpark flag, so check if completed before parking
97113
if (isCompleted) break
98114
if (parkNanos > 0) {
99-
withCompensatedParallelism {
115+
if (compensateParallelism) {
116+
withCompensatedParallelism {
117+
parkNanos(this, parkNanos)
118+
}
119+
} else {
100120
parkNanos(this, parkNanos)
101121
}
102122
}

kotlinx-coroutines-core/jvm/test/scheduling/RunBlockingCoroutineDispatcherTest.kt renamed to kotlinx-coroutines-core/jvm/test/scheduling/RunBlockingWithParallelismCompensationCoroutineDispatcherTest.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,13 @@ import kotlinx.coroutines.*
44
import org.junit.*
55
import java.util.concurrent.atomic.*
66

7-
class RunBlockingCoroutineDispatcherTest : SchedulerTestBase() {
7+
class RunBlockingWithParallelismCompensationCoroutineDispatcherTest : SchedulerTestBase() {
88
@Test
99
fun testRecursiveRunBlockingCanExceedDefaultDispatcherLimit() {
1010
val maxDepth = CORES_COUNT * 3 + 3
1111
fun body(depth: Int) {
1212
if (depth == maxDepth) return
13-
runBlocking(Dispatchers.Default) {
13+
runBlockingWithParallelismCompensation(Dispatchers.Default) {
1414
launch(Dispatchers.Default) {
1515
body(depth + 1)
1616
}
@@ -31,13 +31,13 @@ class RunBlockingCoroutineDispatcherTest : SchedulerTestBase() {
3131
val barrier = CompletableDeferred<Unit>()
3232
val count = AtomicInteger(0)
3333
fun blockingCode() {
34-
runBlocking {
34+
runBlockingWithParallelismCompensation {
3535
count.incrementAndGet()
3636
barrier.await()
3737
count.decrementAndGet()
3838
}
3939
}
40-
runBlocking {
40+
runBlockingWithParallelismCompensation {
4141
repeat(threadsToReach) {
4242
launch(targetDispatcher) {
4343
blockingCode()

kotlinx-coroutines-core/jvm/test/scheduling/RunBlockingCoroutineSchedulerLivenessStressTest.kt renamed to kotlinx-coroutines-core/jvm/test/scheduling/RunBlockingWithParallelismCompensationCoroutineSchedulerLivenessStressTest.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import java.util.*
1010
import java.util.concurrent.*
1111

1212
@RunWith(Parameterized::class)
13-
class RunBlockingCoroutineSchedulerLivenessStressTest(private val yieldMask: Int) : SchedulerTestBase() {
13+
class RunBlockingWithParallelismCompensationCoroutineSchedulerLivenessStressTest(private val yieldMask: Int) : SchedulerTestBase() {
1414
init {
1515
corePoolSize = 1
1616
}
@@ -53,7 +53,7 @@ class RunBlockingCoroutineSchedulerLivenessStressTest(private val yieldMask: Int
5353
val barrier2 = CompletableDeferred<Unit>()
5454
val blocking = launch(targetDispatcher) {
5555
barrier.await()
56-
runBlocking {
56+
runBlockingWithParallelismCompensation {
5757
if ((yieldMask and 1) != 0) yield()
5858
barrier2.await()
5959
if ((yieldMask and 2) != 0) yield()

0 commit comments

Comments
 (0)