Skip to content

Commit d528e3e

Browse files
committed
EventLoop is integrated as runBlocking default and is used for tests, coroutine builders provide CoroutineScope with context
1 parent 54c3175 commit d528e3e

File tree

15 files changed

+243
-120
lines changed

15 files changed

+243
-120
lines changed

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt

Lines changed: 49 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
11
package kotlinx.coroutines.experimental
22

33
import java.util.concurrent.locks.LockSupport
4-
import kotlin.coroutines.Continuation
5-
import kotlin.coroutines.CoroutineContext
6-
import kotlin.coroutines.startCoroutine
7-
import kotlin.coroutines.suspendCoroutine
4+
import kotlin.coroutines.*
85

96
// --------------- basic coroutine builders ---------------
107

@@ -21,29 +18,30 @@ import kotlin.coroutines.suspendCoroutine
2118
*
2219
* See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
2320
*/
24-
fun launch(context: CoroutineContext, block: suspend () -> Unit): Job =
25-
StandaloneCoroutine(newCoroutineContext(context)).also { block.startCoroutine(it) }
21+
fun launch(context: CoroutineContext, block: suspend CoroutineScope.() -> Unit): Job =
22+
StandaloneCoroutine(newCoroutineContext(context)).also { block.startCoroutine(it, it) }
2623

2724
/**
2825
* Calls the specified suspending block with a given coroutine context, suspends until it completes, and returns
2926
* the result. It immediately applies dispatcher from the new context, shifting execution of the block into the
3027
* different thread inside the block, and back when it completes.
3128
* The specified [context] is merged onto the current coroutine context.
3229
*/
33-
public suspend fun <T> run(context: CoroutineContext, block: suspend () -> T): T =
30+
public suspend fun <T> run(context: CoroutineContext, block: suspend CoroutineScope.() -> T): T =
3431
suspendCoroutine { cont ->
35-
block.startCoroutine(object : Continuation<T> by cont {
36-
override val context: CoroutineContext = cont.context + context
37-
})
32+
// new don't invoke `newCoroutineContext`, but consider this being the same coroutine in the new context
33+
InnerCoroutine(cont.context + context, cont).also { block.startCoroutine(it, it) }
3834
}
3935

4036
/**
4137
* Runs new coroutine and *blocks* current thread *interruptibly* until its completion.
42-
* This function should not be used from coroutine. It is designed to bridge regular code blocking code
43-
* to libraries that are written in suspending style.
44-
* The [context] for the new coroutine must be explicitly specified and must include [CoroutineDispatcher] element.
45-
* See [CoroutineDispatcher] for the standard [context] implementations that are provided by `kotlinx.coroutines`.
46-
* The specified context is added to the context of the parent running coroutine (if any) inside which this function
38+
* This function should not be used from coroutine. It is designed to bridge regular blocking code
39+
* to libraries that are written in suspending style, to be used in `main` functions and in tests.
40+
*
41+
* The default [CoroutineDispatcher] for this builder in an implementation of [EventLoop] that processes continuations
42+
* in this blocked thread until the completion of this coroutine.
43+
* See [CoroutineDispatcher] for the other implementations that are provided by `kotlinx.coroutines`.
44+
* The specified [context] is added to the context of the parent running coroutine (if any) inside which this function
4745
* is invoked. The [Job] of the resulting coroutine is a child of the job of the parent coroutine (if any).
4846
*
4947
* If this blocked thread is interrupted (see [Thread.interrupt]), then the coroutine job is cancelled and
@@ -52,26 +50,45 @@ public suspend fun <T> run(context: CoroutineContext, block: suspend () -> T): T
5250
* See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
5351
*/
5452
@Throws(InterruptedException::class)
55-
public fun <T> runBlocking(context: CoroutineContext, block: suspend () -> T): T =
56-
BlockingCoroutine<T>(newCoroutineContext(context)).also { block.startCoroutine(it) }.joinBlocking()
53+
public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
54+
val currentThread = Thread.currentThread()
55+
val privateEventLoop = if (context[ContinuationInterceptor] as? CoroutineDispatcher == null)
56+
EventLoopImpl(currentThread) else null
57+
val newContext = newCoroutineContext(context + (privateEventLoop ?: EmptyCoroutineContext))
58+
val coroutine = BlockingCoroutine<T>(newContext, currentThread, privateEventLoop != null)
59+
privateEventLoop?.initParentJob(coroutine)
60+
block.startCoroutine(coroutine, coroutine)
61+
return coroutine.joinBlocking()
62+
}
5763

5864
// --------------- implementation ---------------
5965

6066
private class StandaloneCoroutine(
61-
val parentContext: CoroutineContext
62-
) : AbstractCoroutine<Unit>(parentContext) {
63-
init { initParentJob(parentContext[Job]) }
67+
val newContext: CoroutineContext
68+
) : AbstractCoroutine<Unit>(newContext) {
69+
init { initParentJob(newContext[Job]) }
6470

6571
override fun afterCompletion(state: Any?) {
66-
// note the use of the parent context below!
67-
if (state is CompletedExceptionally) handleCoroutineException(parentContext, state.cancelReason)
72+
// note the use of the parent's job context below!
73+
if (state is CompletedExceptionally) handleCoroutineException(newContext, state.cancelReason)
6874
}
6975
}
7076

71-
private class BlockingCoroutine<T>(parentContext: CoroutineContext) : AbstractCoroutine<T>(parentContext) {
72-
val blockedThread: Thread = Thread.currentThread()
77+
private class InnerCoroutine<T>(
78+
override val context: CoroutineContext,
79+
continuation: Continuation<T>
80+
) : Continuation<T> by continuation, CoroutineScope {
81+
override val isActive: Boolean = context[Job]?.isActive ?: true
82+
}
83+
84+
private class BlockingCoroutine<T>(
85+
newContext: CoroutineContext,
86+
val blockedThread: Thread,
87+
val hasPrivateEventLoop: Boolean
88+
) : AbstractCoroutine<T>(newContext) {
89+
val eventLoop: EventLoop? = newContext[ContinuationInterceptor] as? EventLoop
7390

74-
init { initParentJob(parentContext[Job]) }
91+
init { initParentJob(newContext[Job]) }
7592

7693
override fun afterCompletion(state: Any?) {
7794
LockSupport.unpark(blockedThread)
@@ -81,8 +98,14 @@ private class BlockingCoroutine<T>(parentContext: CoroutineContext) : AbstractCo
8198
fun joinBlocking(): T {
8299
while (isActive) {
83100
if (Thread.interrupted()) throw InterruptedException().also { cancel(it) }
84-
LockSupport.park(this)
101+
if (eventLoop == null || !eventLoop.processNextEvent())
102+
LockSupport.park(this)
103+
}
104+
// process remaining events (that could have been added after last processNextEvent and before cancel
105+
if (hasPrivateEventLoop) {
106+
while (eventLoop!!.processNextEvent()) { /* just spin */ }
85107
}
108+
// now return result
86109
val state = getState()
87110
(state as? CompletedExceptionally)?.let { throw it.exception }
88111
return state as T

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CommonPool.kt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import java.util.concurrent.atomic.AtomicInteger
1111
* coroutine resumption is dispatched as a separate task even when it already executes inside the pool.
1212
* When available, it wraps [ForkJoinPool.commonPool] and provides a similar shared pool where not.
1313
*/
14-
object CommonPool : CoroutineDispatcher() {
14+
object CommonPool : CoroutineDispatcher(), Yield {
1515
private val pool: Executor = findPool()
1616

1717
private inline fun <T> Try(block: () -> T) = try { block() } catch (e: Throwable) { null }
@@ -37,4 +37,8 @@ object CommonPool : CoroutineDispatcher() {
3737

3838
override fun isDispatchNeeded(): Boolean = true
3939
override fun dispatch(block: Runnable) = pool.execute(block)
40+
41+
override fun scheduleResume(continuation: CancellableContinuation<Unit>) {
42+
pool.execute { continuation.resume(Unit) }
43+
}
4044
}

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/AbstractCoroutine.kt renamed to kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineScope.kt

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,30 @@ package kotlinx.coroutines.experimental
33
import kotlin.coroutines.Continuation
44
import kotlin.coroutines.CoroutineContext
55

6+
/**
7+
* Receiver interface for generic coroutine builders, so that the code inside coroutine has a convenient access
8+
* to its [context] and cancellation status via [isActive].
9+
*/
10+
public interface CoroutineScope {
11+
/**
12+
* Returns `true` when this coroutine is still active (was not cancelled).
13+
*/
14+
public val isActive: Boolean
15+
16+
/**
17+
* Returns the context of this coroutine.
18+
*/
19+
public val context: CoroutineContext
20+
}
21+
622
/**
723
* Abstract class to simplify writing of coroutine completion objects that
824
* implements [Continuation] and [Job] interfaces.
925
* It stores the result of continuation in the state of the job.
1026
*/
1127
@Suppress("LeakingThis")
12-
public abstract class AbstractCoroutine<in T>(parentContext: CoroutineContext) : JobSupport(), Continuation<T> {
13-
override val context: CoroutineContext = parentContext + this // merges this job into this context
28+
public abstract class AbstractCoroutine<in T>(newContext: CoroutineContext) : JobSupport(), Continuation<T>, CoroutineScope {
29+
override val context: CoroutineContext = newContext + this // merges this job into this context
1430

1531
final override fun resume(value: T) {
1632
while (true) { // lock-free loop on state

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CurrentCoroutineContext.kt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package kotlinx.coroutines.experimental
33
import java.util.concurrent.atomic.AtomicLong
44
import kotlin.coroutines.AbstractCoroutineContextElement
55
import kotlin.coroutines.ContinuationInterceptor
6-
import kotlin.coroutines.ContinuationInterceptor.Key
76
import kotlin.coroutines.CoroutineContext
87
import kotlin.coroutines.EmptyCoroutineContext
98

@@ -35,7 +34,6 @@ public object Here : CoroutineDispatcher() {
3534
public val currentCoroutineContext: CoroutineContext
3635
get() = CURRENT_CONTEXT.get() ?: throw IllegalStateException("Not inside a coroutine")
3736

38-
3937
/**
4038
* Returns the context of the coroutine that this function is invoked in or a specified [default]
4139
* if not invoked inside a coroutine. A [default] must be a singleton [CoroutineDispatcher] element.

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,13 @@ public interface Deferred<out T> : Job {
3333
* The specified context is added to the context of the parent running coroutine (if any) inside which this function
3434
* is invoked. The [Job] of the resulting coroutine is a child of the job of the parent coroutine (if any).
3535
*/
36-
public fun <T> defer(context: CoroutineContext, block: suspend () -> T) : Deferred<T> =
37-
DeferredCoroutine<T>(newCoroutineContext(context)).also { block.startCoroutine(it) }
36+
public fun <T> defer(context: CoroutineContext, block: suspend CoroutineScope.() -> T) : Deferred<T> =
37+
DeferredCoroutine<T>(newCoroutineContext(context)).also { block.startCoroutine(it, it) }
3838

3939
private class DeferredCoroutine<T>(
40-
parentContext: CoroutineContext
41-
) : AbstractCoroutine<T>(parentContext), Deferred<T> {
42-
init { initParentJob(parentContext[Job]) }
40+
newContext: CoroutineContext
41+
) : AbstractCoroutine<T>(newContext), Deferred<T> {
42+
init { initParentJob(newContext[Job]) }
4343

4444
@Suppress("UNCHECKED_CAST")
4545
suspend override fun await(): T {

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ import java.util.concurrent.TimeUnit
44
import kotlin.coroutines.ContinuationInterceptor
55

66
/**
7-
* Implemented by [CoroutineDispatcher] implementations that natively support non-blocking [delay] function.
7+
* This dispatcher _feature_ is implemented by [CoroutineDispatcher] implementations that natively support
8+
* non-blocking [delay] function.
89
*/
910
public interface Delay {
1011
/**
@@ -16,13 +17,13 @@ public interface Delay {
1617
suspend fun delay(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS) {
1718
require(time >= 0) { "Delay time $time cannot be negative" }
1819
if (time <= 0) return // don't delay
19-
return suspendCancellableCoroutine { resumeAfterDelay(time, unit, it) }
20+
return suspendCancellableCoroutine { scheduleResumeAfterDelay(time, unit, it) }
2021
}
2122

2223
/**
23-
* Resumes a specified continuation after a specified delay.
24+
* Schedules resume of a specified [continuation] after a specified delay [time].
2425
*/
25-
fun resumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>)
26+
fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>)
2627
}
2728

2829
/**
@@ -39,7 +40,7 @@ suspend fun delay(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS) {
3940
if (time <= 0) return // don't delay
4041
return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
4142
(cont.context[ContinuationInterceptor] as? Delay)?.apply {
42-
resumeAfterDelay(time, unit, cont)
43+
scheduleResumeAfterDelay(time, unit, cont)
4344
return@sc
4445
}
4546
val timeout = scheduledExecutor.schedule({ cont.resume(Unit) }, time, unit)

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt

Lines changed: 62 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -4,65 +4,91 @@ import kotlinx.coroutines.experimental.internal.LockFreeLinkedListHead
44
import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
55
import java.util.concurrent.locks.LockSupport
66
import kotlin.coroutines.Continuation
7-
import kotlin.coroutines.startCoroutine
87

8+
/**
9+
* Implemented by [CoroutineDispatcher] implementations that have event loop inside and can
10+
* be asked to process next event from their event queue. It is used by [runBlocking] to
11+
* continue processing events when invoked from the event dispatch thread.
12+
*/
913
public interface EventLoop {
10-
public val thisEventLoop: CoroutineDispatcher
11-
public suspend fun yield()
12-
}
14+
/**
15+
* Processes next event in this event loop and returns `true` or returns `false` if there are
16+
* no events to process or when invoked from the wrong thread.
17+
*/
18+
public fun processNextEvent(): Boolean
1319

14-
@Throws(InterruptedException::class)
15-
public fun <T> runEventLoop(block: suspend EventLoop.() -> T): T =
16-
EventLoopImpl<T>().also { block.startCoroutine(it, it.coroutine) }.coroutine.joinBlocking()
20+
public companion object Factory {
21+
/**
22+
* Creates a new event loop that is bound the specified [thread] (current thread by default) and
23+
* stops accepting new events when [parentJob] completes. Every continuation that is scheduled
24+
* onto this event loop unparks the specified thread via [LockSupport.unpark].
25+
*
26+
* The main event-processing loop using the resulting `eventLoop` object should look like this:
27+
* ```
28+
* while (needsToBeRunning) {
29+
* if (Thread.interrupted()) break // or handle somehow
30+
* if (!eventLoop.processNextEvent()) LockSupport.park() // event loop will unpark
31+
* }
32+
* ```
33+
*/
34+
public operator fun invoke(thread: Thread = Thread.currentThread(), parentJob: Job? = null): CoroutineDispatcher =
35+
EventLoopImpl(thread).apply {
36+
if (parentJob != null) initParentJob(parentJob)
37+
}
38+
}
39+
}
1740

18-
private class EventLoopImpl<T> : CoroutineDispatcher(), EventLoop {
19-
val thread: Thread = Thread.currentThread()
41+
internal class EventLoopImpl(
42+
val thread: Thread
43+
) : CoroutineDispatcher(), EventLoop, Yield {
2044
val queue = LockFreeLinkedListHead()
21-
val coroutine = Coroutine()
22-
23-
public override val thisEventLoop: CoroutineDispatcher = this
45+
var parentJob: Job? = null
2446

25-
public override suspend fun yield(): Unit = suspendCancellableCoroutine { cont ->
26-
val node = Resume(cont)
27-
schedule(node)
28-
cont.removeOnCompletion(node)
47+
fun initParentJob(coroutine: Job) {
48+
require(this.parentJob == null)
49+
this.parentJob = coroutine
2950
}
3051

3152
override fun isDispatchNeeded(): Boolean = Thread.currentThread() != thread
3253

3354
override fun dispatch(block: Runnable) {
3455
schedule(Dispatch(block))
35-
queue.addLast(Dispatch(block))
3656
}
3757

38-
fun schedule(node: LockFreeLinkedListNode) {
39-
check(queue.addLastIf(node) { coroutine.isActive }) {
40-
"EventLoop is already complete... cannot schedule any tasks"
41-
}
42-
LockSupport.unpark(thread)
58+
override fun scheduleResume(continuation: CancellableContinuation<Unit>) {
59+
val node = Resume(continuation)
60+
if (schedule(node))
61+
continuation.removeOnCompletion(node)
4362
}
4463

45-
inner class Coroutine : AbstractCoroutine<T>(this@EventLoopImpl) {
46-
override fun afterCompletion(state: Any?) {
64+
fun schedule(node: Node): Boolean {
65+
val added = if (parentJob == null) {
66+
queue.addLast(node)
67+
true
68+
} else
69+
queue.addLastIf(node) { parentJob!!.isActive }
70+
if (added) {
4771
LockSupport.unpark(thread)
72+
} else {
73+
node.run()
4874
}
75+
return added
76+
}
4977

50-
@Suppress("UNCHECKED_CAST")
51-
fun joinBlocking(): T {
52-
while (isActive) {
53-
if (Thread.interrupted()) throw InterruptedException().also { cancel(it) }
54-
(queue.removeFirstOrNull() as? Runnable)?.run() ?: LockSupport.park(this)
55-
}
56-
check(queue.isEmpty) { "There are still tasks in event loop queue... Stray coroutines?"}
57-
val state = getState()
58-
(state as? CompletedExceptionally)?.let { throw it.exception }
59-
return state as T
78+
override fun processNextEvent(): Boolean {
79+
if (Thread.currentThread() != thread) return false
80+
(queue.removeFirstOrNull() as? Runnable)?.apply {
81+
run()
82+
return true
6083
}
84+
return false
6185
}
6286

63-
class Dispatch(block: Runnable) : LockFreeLinkedListNode(), Runnable by block
87+
abstract class Node : LockFreeLinkedListNode(), Runnable
88+
89+
class Dispatch(block: Runnable) : Node(), Runnable by block
6490

65-
class Resume(val cont: Continuation<Unit>) : LockFreeLinkedListNode(), Runnable {
91+
class Resume(val cont: Continuation<Unit>) : Node() {
6692
override fun run() = cont.resume(Unit)
6793
}
6894
}

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ import kotlin.coroutines.CoroutineContext
2323
* All functions on this interface are thread-safe.
2424
*/
2525
public interface Job : CoroutineContext.Element {
26+
/**
27+
* Key for [Job] instance in the coroutine context.
28+
*/
2629
public companion object Key : CoroutineContext.Key<Job> {
2730
/**
2831
* Creates new job object. It is optionally a child of a [parent] job.

0 commit comments

Comments
 (0)