Skip to content

Commit 0427205

Browse files
Incorporate new Native memory model into kotlinx-coroutines mainline (#2833)
* Support of new K/N memory model * Dispatchers.Default backed by a pool of workers on Linux and by global_queue on iOS-like * Implementation of Dispatchers.Main that uses the main queue on iOS and default dispatcher on other platforms (#2858) * Introduced newSingleThreadDispatcher and newFixedThreadPoolDispatcher * Use proper reentrant locking and CoW arrays on new memory model, make TestBase _almost_ race-free * More thread-safety in Native counterpart and one more test from native-mt * Source-set sharing for tests shared between JVM and K/N * Wrap Obj-C interop into autorelease pool to avoid memory leaks Fixes #2914 Co-authored-by: dkhalanskyjb <[email protected]>
1 parent 30b057e commit 0427205

File tree

71 files changed

+1299
-616
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

71 files changed

+1299
-616
lines changed

buildSrc/src/main/kotlin/SourceSets.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,4 @@ fun KotlinSourceSet.configureMultiplatform() {
1616
optInAnnotations.forEach { optIn(it) }
1717
progressiveMode = true
1818
}
19-
}
19+
}

kotlinx-coroutines-core/build.gradle

Lines changed: 44 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -74,19 +74,50 @@ kotlin {
7474
SourceSetsKt.configureMultiplatform(it)
7575
}
7676

77-
configure(targets) {
78-
// Configure additional binaries and test runs -- one for each OS
79-
if (["macos", "linux", "mingw"].any { name.startsWith(it) }) {
80-
binaries {
81-
// Test for memory leaks using a special entry point that does not exit but returns from main
82-
binaries.getTest("DEBUG").freeCompilerArgs += ["-e", "kotlinx.coroutines.mainNoExit"]
83-
// Configure a separate test where code runs in background
84-
test("background", [org.jetbrains.kotlin.gradle.plugin.mpp.NativeBuildType.DEBUG]) {
85-
freeCompilerArgs += ["-e", "kotlinx.coroutines.mainBackground"]
86-
}
77+
/*
78+
* Configure four test runs:
79+
* 1) Old memory model, Main thread
80+
* 2) New memory model, Main thread
81+
* 3) Old memory model, BG thread
82+
* 4) New memory model, BG thread (required for Dispatchers.Main tests on Darwin)
83+
*
84+
* All new MM targets are build with optimize = true to have stress tests properly run.
85+
*/
86+
targets.withType(org.jetbrains.kotlin.gradle.plugin.mpp.KotlinNativeTargetWithTests.class).configureEach {
87+
binaries {
88+
// Test for memory leaks using a special entry point that does not exit but returns from main
89+
binaries.getTest("DEBUG").freeCompilerArgs += ["-e", "kotlinx.coroutines.mainNoExit"]
90+
}
91+
92+
binaries.test("newMM", [DEBUG]) {
93+
def thisTest = it
94+
freeCompilerArgs += ["-e", "kotlinx.coroutines.mainNoExit"]
95+
optimized = true
96+
binaryOptions["memoryModel"] = "experimental"
97+
testRuns.create("newMM") {
98+
setExecutionSourceFrom(thisTest)
99+
// A hack to get different suffixes in the aggregated report.
100+
executionTask.configure { targetName = "$targetName new MM" }
101+
}
102+
}
103+
104+
binaries.test("worker", [DEBUG]) {
105+
def thisTest = it
106+
freeCompilerArgs += ["-e", "kotlinx.coroutines.mainBackground"]
107+
testRuns.create("worker") {
108+
setExecutionSourceFrom(thisTest)
109+
executionTask.configure { targetName = "$targetName worker" }
87110
}
88-
testRuns {
89-
background { setExecutionSourceFrom(binaries.backgroundDebugTest) }
111+
}
112+
113+
binaries.test("workerWithNewMM", [DEBUG]) {
114+
def thisTest = it
115+
optimized = true
116+
freeCompilerArgs += ["-e", "kotlinx.coroutines.mainBackground"]
117+
binaryOptions["memoryModel"] = "experimental"
118+
testRuns.create("workerWithNewMM") {
119+
setExecutionSourceFrom(thisTest)
120+
executionTask.configure { targetName = "$targetName worker with new MM" }
90121
}
91122
}
92123
}
@@ -97,6 +128,7 @@ kotlin {
97128
}
98129
}
99130

131+
100132
configurations {
101133
configureKotlinJvmPlatform(kotlinCompilerPluginClasspath)
102134
}

kotlinx-coroutines-core/common/src/EventLoop.common.kt

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
276276
// then process one event from queue
277277
val task = dequeue()
278278
if (task != null) {
279-
task.run()
279+
platformAutoreleasePool { task.run() }
280280
return 0
281281
}
282282
return nextTime
@@ -530,3 +530,14 @@ internal expect fun nanoTime(): Long
530530
internal expect object DefaultExecutor {
531531
public fun enqueue(task: Runnable)
532532
}
533+
534+
/**
535+
* Used by Darwin targets to wrap a [Runnable.run] call in an Objective-C Autorelease Pool. It is a no-op on JVM, JS and
536+
* non-Darwin native targets.
537+
*
538+
* Coroutines on Darwin targets can call into the Objective-C world, where a callee may push a to-be-returned object to
539+
* the Autorelease Pool, so as to avoid a premature ARC release before it reaches the caller. This means the pool must
540+
* be eventually drained to avoid leaks. Since Kotlin Coroutines does not use [NSRunLoop], which provides automatic
541+
* pool management, it must manage the pool creation and pool drainage manually.
542+
*/
543+
internal expect inline fun platformAutoreleasePool(crossinline block: () -> Unit)

kotlinx-coroutines-core/common/src/channels/ArrayBroadcastChannel.kt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,11 @@ internal class ArrayBroadcastChannel<E>(
3333
require(capacity >= 1) { "ArrayBroadcastChannel capacity must be at least 1, but $capacity was specified" }
3434
}
3535

36+
/**
37+
* NB: prior to changing any logic of ArrayBroadcastChannel internals, please ensure that
38+
* you do not break internal invariants of the SubscriberList implementation on K/N and KJS
39+
*/
40+
3641
/*
3742
* Writes to buffer are guarded by bufferLock, but reads from buffer are concurrent with writes
3843
* - Write element to buffer then write "tail" (volatile)
@@ -60,6 +65,7 @@ internal class ArrayBroadcastChannel<E>(
6065
get() = _size.value
6166
set(value) { _size.value = value }
6267

68+
@Suppress("DEPRECATION")
6369
private val subscribers = subscriberList<Subscriber<E>>()
6470

6571
override val isBufferAlwaysFull: Boolean get() = false

kotlinx-coroutines-core/common/src/internal/Concurrent.common.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,12 @@ package kotlinx.coroutines.internal
88
* Special kind of list intended to be used as collection of subscribers in `ArrayBroadcastChannel`
99
* On JVM it's CopyOnWriteList and on JS it's MutableList.
1010
*
11-
* Note that this alias is intentionally not named as CopyOnWriteList to avoid accidental misusage outside of ArrayBroadcastChannel
11+
* Note that this alias is intentionally not named as CopyOnWriteList to avoid accidental misusage outside of the ArrayBroadcastChannel
1212
*/
1313
internal typealias SubscribersList<E> = MutableList<E>
1414

15+
@Deprecated(message = "Implementation of this primitive is tailored to specific ArrayBroadcastChannel usages on K/N " +
16+
"and K/JS platforms and it is unsafe to use it anywhere else")
1517
internal expect fun <E> subscriberList(): SubscribersList<E>
1618

1719
internal expect class ReentrantLock() {

kotlinx-coroutines-core/common/test/EmptyContext.kt

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,6 @@ package kotlinx.coroutines
77
import kotlinx.coroutines.intrinsics.*
88
import kotlin.coroutines.*
99

10-
suspend fun <T> withEmptyContext(block: suspend () -> T): T {
11-
val baseline = Result.failure<T>(IllegalStateException("Block was suspended"))
12-
var result: Result<T> = baseline
13-
block.startCoroutineUnintercepted(Continuation(EmptyCoroutineContext) { result = it })
14-
while (result == baseline) yield()
15-
return result.getOrThrow()
10+
suspend fun <T> withEmptyContext(block: suspend () -> T): T = suspendCoroutine { cont ->
11+
block.startCoroutineUnintercepted(Continuation(EmptyCoroutineContext) { cont.resumeWith(it) })
1612
}

kotlinx-coroutines-core/common/test/TestBase.common.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,13 @@
77
package kotlinx.coroutines
88

99
import kotlinx.coroutines.flow.*
10+
import kotlinx.coroutines.internal.*
1011
import kotlin.coroutines.*
1112
import kotlin.test.*
1213

1314
public expect val isStressTest: Boolean
1415
public expect val stressTestMultiplier: Int
16+
public expect val stressTestMultiplierSqrt: Int
1517

1618
/**
1719
* The result of a multiplatform asynchronous test.
@@ -20,6 +22,8 @@ public expect val stressTestMultiplier: Int
2022
@Suppress("NO_ACTUAL_FOR_EXPECT")
2123
public expect class TestResult
2224

25+
public expect val isNative: Boolean
26+
2327
public expect open class TestBase constructor() {
2428
/*
2529
* In common tests we emulate parameterized tests

kotlinx-coroutines-core/common/test/flow/FlowInvariantsTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ class FlowInvariantsTest : TestBase() {
235235
}
236236
expectUnreached()
237237
} catch (e: IllegalStateException) {
238-
assertTrue(e.message!!.contains("Flow invariant is violated"))
238+
assertTrue(e.message!!.contains("Flow invariant is violated"), "But had: ${e.message}")
239239
finish(2)
240240
}
241241
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
/*
2+
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines
6+
7+
import kotlin.coroutines.*
8+
9+
/**
10+
* Runs a new coroutine and **blocks** the current thread until its completion.
11+
* This function should not be used from a coroutine. It is designed to bridge regular blocking code
12+
* to libraries that are written in suspending style, to be used in `main` functions and in tests.
13+
*/
14+
public expect fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T

0 commit comments

Comments
 (0)