Skip to content

Commit a12ee15

Browse files
committed
MPP: Delay, EventLoop & runBlocking moved to common code
1 parent 1f0df4b commit a12ee15

File tree

15 files changed

+535
-9
lines changed

15 files changed

+535
-9
lines changed

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CommonBuilders.kt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package kotlinx.coroutines.experimental
22

33
import kotlin.coroutines.experimental.CoroutineContext
4+
import kotlin.coroutines.experimental.EmptyCoroutineContext
45

56
@Suppress("EXPECTED_DECLARATION_WITH_DEFAULT_PARAMETER")
67
public expect fun launch(
@@ -9,3 +10,8 @@ public expect fun launch(
910
parent: Job? = null,
1011
block: suspend CoroutineScope.() -> Unit
1112
): Job
13+
14+
@Suppress("EXPECTED_DECLARATION_WITH_DEFAULT_PARAMETER")
15+
public expect fun <T> runBlocking(
16+
context: CoroutineContext = EmptyCoroutineContext,
17+
block: suspend CoroutineScope.() -> T): T
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package kotlinx.coroutines.experimental
2+
3+
public expect interface Delay
4+
5+
public expect suspend fun delay(time: Long)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
package kotlinx.coroutines.experimental
2+
3+
public expect interface EventLoop

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/CommonJob.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public expect class JobCancellationException(
4343

4444
public expect fun Job.disposeOnCompletion(handle: DisposableHandle): DisposableHandle
4545
public expect suspend fun Job.cancelAndJoin()
46-
@Suppress("EXPECTED_DECLARATION_WITH_DEFAULT_PARAMETER")
46+
@Suppress("EXPECTED_DECLARATION_WITH_DEFAULT_PARAMETER", "EXTENSION_SHADOWED_BY_MEMBER") // See KT-21598
4747
public expect fun Job.cancelChildren(cause: Throwable? = null)
4848
public expect suspend fun Job.joinChildren()
4949

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ public suspend fun <T> run(context: CoroutineContext, block: suspend () -> T): T
164164
* @param block the coroutine code.
165165
*/
166166
@Throws(InterruptedException::class)
167-
public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
167+
public actual fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
168168
val currentThread = Thread.currentThread()
169169
val eventLoop = if (context[ContinuationInterceptor] == null) BlockingEventLoop(currentThread) else null
170170
val newContext = newCoroutineContext(context + (eventLoop ?: EmptyCoroutineContext))

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import kotlin.coroutines.experimental.CoroutineContext
3030
* Implementation of this interface affects operation of
3131
* [delay][kotlinx.coroutines.experimental.delay] and [withTimeout] functions.
3232
*/
33-
public interface Delay {
33+
public actual interface Delay {
3434
/**
3535
* Delays coroutine for a given time without blocking a thread and resumes it after a specified time.
3636
* This suspending function is cancellable.
@@ -81,8 +81,32 @@ public interface Delay {
8181
*
8282
* This function delegates to [Delay.scheduleResumeAfterDelay] if the context [CoroutineDispatcher]
8383
* implements [Delay] interface, otherwise it resumes using a built-in single-threaded scheduled executor service.
84+
*
85+
* @param time time in milliseconds.
86+
*/
87+
public actual suspend fun delay(time: Long) {
88+
kotlin.require(time >= 0) { "Delay time $time cannot be negative" }
89+
if (time <= 0) return // don't delay
90+
return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
91+
cont.context.delay.scheduleResumeAfterDelay(time, TimeUnit.MILLISECONDS, cont)
92+
}
93+
}
94+
95+
/**
96+
* Delays coroutine for a given time without blocking a thread and resumes it after a specified time.
97+
* This suspending function is cancellable.
98+
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
99+
* immediately resumes with [CancellationException].
100+
*
101+
* Note, that delay can be used in [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
102+
*
103+
* This function delegates to [Delay.scheduleResumeAfterDelay] if the context [CoroutineDispatcher]
104+
* implements [Delay] interface, otherwise it resumes using a built-in single-threaded scheduled executor service.
105+
*
106+
* @param time time in the specified [unit].
107+
* @param unit time unit.
84108
*/
85-
suspend fun delay(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS) {
109+
public suspend fun delay(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS) {
86110
require(time >= 0) { "Delay time $time cannot be negative" }
87111
if (time <= 0) return // don't delay
88112
return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import kotlin.coroutines.experimental.CoroutineContext
3131
* It may optionally implement [Delay] interface and support time-scheduled tasks. It is used by [runBlocking] to
3232
* continue processing events when invoked from the event dispatch thread.
3333
*/
34-
public interface EventLoop {
34+
public actual interface EventLoop {
3535
/**
3636
* Processes next event in this event loop.
3737
*

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,9 @@ public actual interface Job : CoroutineContext.Element {
267267
@Deprecated(message = "For binary compatibility", level = DeprecationLevel.HIDDEN)
268268
public fun invokeOnCompletion(handler: CompletionHandler, onCancelling: Boolean): DisposableHandle
269269

270+
/**
271+
* @suppress **Deprecated**: Use with named `onCancelling` and `handler` parameters.
272+
*/
270273
@Deprecated(message = "Use with named `onCancelling` and `handler` parameters", level = DeprecationLevel.WARNING,
271274
replaceWith = ReplaceWith("this.invokeOnCompletion(onCancelling = onCancelling_, handler = handler)"))
272275
public fun invokeOnCompletion(onCancelling_: Boolean = false, handler: CompletionHandler): DisposableHandle

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/NonCancellable.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public actual object NonCancellable : AbstractCoroutineContextElement(Job), Job
6767

6868
/** Always returns [NonDisposableHandle]. */
6969
@Suppress("OverridingDeprecatedMember")
70-
override fun invokeOnCompletion(onCancelling: Boolean, handler: CompletionHandler): DisposableHandle =
70+
override fun invokeOnCompletion(onCancelling_: Boolean, handler: CompletionHandler): DisposableHandle =
7171
NonDisposableHandle
7272

7373
/** Always returns [NonDisposableHandle]. */

js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package kotlinx.coroutines.experimental
22

33
import kotlin.coroutines.experimental.ContinuationInterceptor
44
import kotlin.coroutines.experimental.CoroutineContext
5+
import kotlin.coroutines.experimental.EmptyCoroutineContext
6+
import kotlin.coroutines.experimental.startCoroutine
57

68
/**
79
* Launches new coroutine without blocking current thread and returns a reference to the coroutine as a [Job].
@@ -47,6 +49,27 @@ public actual fun launch(
4749
return coroutine
4850
}
4951

52+
/**
53+
* Runs new coroutine with the private event loop until its completion.
54+
* This function should not be used from coroutine. It is designed to bridge regular code
55+
* to libraries that are written in suspending style, to be used in `main` functions and in tests.
56+
*
57+
* The default [CoroutineDispatcher] for this builder in an implementation of [EventLoop] that processes continuations
58+
* in this blocked thread until the completion of this coroutine.
59+
* See [CoroutineDispatcher] for the other implementations that are provided by `kotlinx.coroutines`.
60+
*
61+
* @param context context of the coroutine. The default value is an implementation of [EventLoop].
62+
* @param block the coroutine code.
63+
*/
64+
public actual fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
65+
val eventLoop = if (context[ContinuationInterceptor] == null) BlockingEventLoop() else null
66+
val newContext = newCoroutineContext(context + (eventLoop ?: EmptyCoroutineContext))
67+
val coroutine = BlockingCoroutine<T>(newContext, privateEventLoop = eventLoop != null)
68+
coroutine.initParentJob(newContext[Job])
69+
block.startCoroutine(coroutine, coroutine)
70+
return coroutine.joinBlocking()
71+
}
72+
5073
// --------------- implementation ---------------
5174

5275
private open class StandaloneCoroutine(
@@ -68,3 +91,36 @@ private class LazyStandaloneCoroutine(
6891
}
6992
}
7093

94+
private class BlockingCoroutine<T>(
95+
parentContext: CoroutineContext,
96+
private val privateEventLoop: Boolean
97+
) : AbstractCoroutine<T>(parentContext, true) {
98+
private val eventLoop: EventLoop? = parentContext[ContinuationInterceptor] as? EventLoop
99+
100+
init {
101+
if (privateEventLoop) require(eventLoop is BlockingEventLoop)
102+
}
103+
104+
fun joinBlocking(): T {
105+
while (true) {
106+
val delay = eventLoop?.processNextEvent() ?: Double.MAX_VALUE
107+
if (isCompleted) break
108+
if (delay > 0) throw IllegalStateException("JS thread cannot be blocked, " +
109+
"runBlocking { ... } cannot be waiting for its completion with timeout")
110+
}
111+
// process queued events (that could have been added after last processNextEvent and before cancel
112+
if (privateEventLoop) (eventLoop as BlockingEventLoop).apply {
113+
// We exit the "while" loop above when this coroutine's state "isCompleted",
114+
// Here we should signal that BlockingEventLoop should not accept any more tasks
115+
isCompleted = true
116+
shutdown()
117+
}
118+
// now return result
119+
val state = this.state
120+
(state as? CompletedExceptionally)?.let { throw it.exception }
121+
@Suppress("UNCHECKED_CAST")
122+
return state as T
123+
}
124+
}
125+
126+

0 commit comments

Comments
 (0)