Skip to content

Commit 53a0a40

Browse files
committed
EventLoop work in progress
1 parent d7c8626 commit 53a0a40

File tree

9 files changed

+189
-38
lines changed

9 files changed

+189
-38
lines changed

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/JobContinuation.kt renamed to kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/AbstractCoroutine.kt

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,18 @@ package kotlinx.coroutines.experimental
33
import kotlin.coroutines.Continuation
44
import kotlin.coroutines.CoroutineContext
55

6-
// internal helper class for various primitives that combines Job and Continuation implementations
6+
/**
7+
* Abstract class to simplify writing of coroutine completion objects that
8+
* implements [Continuation] and [Job] interfaces.
9+
* It stores the result of continuation in the state of the job.
10+
*/
711
@Suppress("LeakingThis")
8-
internal open class JobContinuation<in T>(
12+
public abstract class AbstractCoroutine<in T>(
913
parentContext: CoroutineContext
1014
) : JobSupport(parentContext[Job]), Continuation<T> {
1115
override val context: CoroutineContext = parentContext + this // mixes this job into this context
1216

13-
override fun resume(value: T) {
17+
final override fun resume(value: T) {
1418
while (true) { // lock-free loop on state
1519
val state = getState() // atomic read
1620
when (state) {
@@ -21,7 +25,7 @@ internal open class JobContinuation<in T>(
2125
}
2226
}
2327

24-
override fun resumeWithException(exception: Throwable) {
28+
final override fun resumeWithException(exception: Throwable) {
2529
while (true) { // lock-free loop on state
2630
val state = getState() // atomic read
2731
when (state) {
@@ -36,7 +40,7 @@ internal open class JobContinuation<in T>(
3640
}
3741
}
3842

39-
override fun afterCompletion(state: Any?, closeException: Throwable?) {
40-
if (closeException != null) handleCoroutineException(context, closeException)
43+
final override fun handleCompletionException(closeException: Throwable) {
44+
handleCoroutineException(context, closeException)
4145
}
4246
}

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,19 +59,17 @@ public fun <T> runBlocking(context: CoroutineContext, block: suspend () -> T): T
5959

6060
private class StandaloneCoroutine(
6161
val parentContext: CoroutineContext
62-
) : JobContinuation<Unit>(parentContext) {
63-
override fun afterCompletion(state: Any?, closeException: Throwable?) {
64-
super.afterCompletion(state, closeException) // handle closeException
62+
) : AbstractCoroutine<Unit>(parentContext) {
63+
override fun afterCompletion(state: Any?) {
6564
// note the use of the parent context below!
6665
if (state is CompletedExceptionally) handleCoroutineException(parentContext, state.exception)
6766
}
6867
}
6968

70-
private class BlockingCoroutine<T>(parentContext: CoroutineContext) : JobContinuation<T>(parentContext) {
69+
private class BlockingCoroutine<T>(parentContext: CoroutineContext) : AbstractCoroutine<T>(parentContext) {
7170
val blockedThread: Thread = Thread.currentThread()
7271

73-
override fun afterCompletion(state: Any?, closeException: Throwable?) {
74-
super.afterCompletion(state, closeException) // handle closeException
72+
override fun afterCompletion(state: Any?) {
7573
LockSupport.unpark(blockedThread)
7674
}
7775

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public inline suspend fun <T> suspendCancellableCoroutine(crossinline block: (Ca
3131
@PublishedApi
3232
internal class SafeCancellableContinuation<in T>(
3333
private val delegate: Continuation<T>
34-
) : JobContinuation<T>(delegate.context), CancellableContinuation<T> {
34+
) : AbstractCoroutine<T>(delegate.context), CancellableContinuation<T> {
3535
// only updated from the thread that invoked suspendCancellableCoroutine
3636
private var suspendedThread: Thread? = Thread.currentThread()
3737

@@ -46,8 +46,7 @@ internal class SafeCancellableContinuation<in T>(
4646
}
4747

4848
@Suppress("UNCHECKED_CAST")
49-
override fun afterCompletion(state: Any?, closeException: Throwable?) {
50-
super.afterCompletion(state, closeException) // handle closeException
49+
override fun afterCompletion(state: Any?) {
5150
if (suspendedThread === Thread.currentThread()) {
5251
// cancelled during suspendCancellableCoroutine in its thread
5352
suspendedThread = null

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public fun <T> defer(context: CoroutineContext, block: suspend () -> T) : Deferr
3838

3939
private class DeferredCoroutine<T>(
4040
parentContext: CoroutineContext
41-
) : JobContinuation<T>(parentContext), Deferred<T> {
41+
) : AbstractCoroutine<T>(parentContext), Deferred<T> {
4242
@Suppress("UNCHECKED_CAST")
4343
suspend override fun await(): T {
4444
// quick check if already complete (avoid extra object creation)
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package kotlinx.coroutines.experimental
2+
3+
import kotlinx.coroutines.experimental.internal.LockFreeLinkedListHead
4+
import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
5+
import java.util.concurrent.locks.LockSupport
6+
import kotlin.coroutines.Continuation
7+
import kotlin.coroutines.startCoroutine
8+
9+
public interface EventLoop {
10+
public val thisEventLoop: CoroutineDispatcher
11+
public suspend fun yield()
12+
}
13+
14+
@Throws(InterruptedException::class)
15+
public fun <T> runEventLoop(block: suspend EventLoop.() -> T): T =
16+
EventLoopImpl<T>().also { block.startCoroutine(it, it.coroutine) }.coroutine.joinBlocking()
17+
18+
private class EventLoopImpl<T> : CoroutineDispatcher(), EventLoop {
19+
val thread: Thread = Thread.currentThread()
20+
val queue = LockFreeLinkedListHead()
21+
val coroutine = Coroutine()
22+
23+
public override val thisEventLoop: CoroutineDispatcher = this
24+
25+
public override suspend fun yield(): Unit = suspendCancellableCoroutine { cont ->
26+
val node = Resume(cont)
27+
schedule(node)
28+
cont.removeOnCompletion(node)
29+
}
30+
31+
override fun isDispatchNeeded(): Boolean = Thread.currentThread() != thread
32+
33+
override fun dispatch(block: Runnable) {
34+
schedule(Dispatch(block))
35+
queue.addLast(Dispatch(block))
36+
}
37+
38+
fun schedule(node: LockFreeLinkedListNode) {
39+
check(queue.addLastIf(node) { coroutine.isActive }) {
40+
"EventLoop is already complete... cannot schedule any tasks"
41+
}
42+
LockSupport.unpark(thread)
43+
}
44+
45+
inner class Coroutine : AbstractCoroutine<T>(this@EventLoopImpl) {
46+
override fun afterCompletion(state: Any?) {
47+
LockSupport.unpark(thread)
48+
}
49+
50+
@Suppress("UNCHECKED_CAST")
51+
fun joinBlocking(): T {
52+
while (isActive) {
53+
if (Thread.interrupted()) throw InterruptedException().also { cancel(it) }
54+
(queue.removeFirstOrNull() as? Runnable)?.run() ?: LockSupport.park(this)
55+
}
56+
check(queue.isEmpty) { "There are still tasks in event loop queue... Stray coroutines?"}
57+
val state = getState()
58+
(state as? CompletedExceptionally)?.let { throw it.exception }
59+
return state as T
60+
}
61+
}
62+
63+
class Dispatch(block: Runnable) : LockFreeLinkedListNode(), Runnable by block
64+
65+
class Resume(val cont: Continuation<Unit>) : LockFreeLinkedListNode(), Runnable {
66+
override fun run() = cont.resume(Unit)
67+
}
68+
}
69+

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,9 @@ public fun Job.unregisterOnCompletion(registration: Job.Registration): Job.Regis
9191
public fun Job.cancelFutureOnCompletion(future: Future<*>): Job.Registration =
9292
onCompletion(CancelFutureOnCompletion(this, future))
9393

94+
internal fun Job.removeOnCompletion(node: LockFreeLinkedListNode): Job.Registration =
95+
onCompletion(RemoveOnCompletion(this, node))
96+
9497
/**
9598
* Suspends coroutine until this job is complete. This invocation resumes normally (without exception)
9699
* when the job is complete for any reason.
@@ -140,21 +143,18 @@ public open class JobSupport(
140143
// #1. Unregister from parent job
141144
registration?.unregister()
142145
// #2 Invoke completion handlers
143-
var closeException: Throwable? = null
144-
val reason = when (update) {
145-
is Cancelled -> update.cancelReason
146-
is CompletedExceptionally -> update.exception
147-
else -> null
148-
}
146+
val reason = (update as? CompletedExceptionally)?.cancelReason
147+
var completionException: Throwable? = null
149148
expect.forEach<JobNode> { node ->
150149
try {
151150
node.invoke(reason)
152151
} catch (ex: Throwable) {
153-
if (closeException == null) closeException = ex else closeException!!.addSuppressed(ex)
152+
completionException?.apply { addSuppressed(ex) } ?: run { completionException = ex }
154153
}
155154
}
156155
// #3 Do other (overridable) processing
157-
afterCompletion(update, closeException)
156+
completionException?.let { handleCompletionException(it) }
157+
afterCompletion(update)
158158
return true
159159
}
160160

@@ -181,10 +181,18 @@ public open class JobSupport(
181181
}
182182
}
183183

184-
protected open fun afterCompletion(state: Any?, closeException: Throwable?) {
185-
if (closeException != null) throw closeException
184+
/**
185+
* Override to process any exceptions that were encountered while invoking [onCompletion] handlers.
186+
*/
187+
protected open fun handleCompletionException(closeException: Throwable) {
188+
throw closeException
186189
}
187190

191+
/**
192+
* Override for post-completion actions that need to do something with the state.
193+
*/
194+
protected open fun afterCompletion(state: Any?) {}
195+
188196
private fun makeNode(handler: CompletionHandler): JobNode =
189197
(handler as? JobNode)?.also { require(it.job === this) }
190198
?: InvokeOnCompletion(this, handler)
@@ -272,3 +280,11 @@ private class CancelFutureOnCompletion(
272280
override fun invoke(reason: Throwable?) { future.cancel(true) }
273281
override fun toString() = "CancelFutureOnCompletion[$future]"
274282
}
283+
284+
private class RemoveOnCompletion(
285+
job: Job,
286+
val node: LockFreeLinkedListNode
287+
) : JobNode(job) {
288+
override fun invoke(reason: Throwable?) { node.remove() }
289+
override fun toString() = "RemoveOnCompletion[$node]"
290+
}

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.kt

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -139,21 +139,29 @@ internal open class LockFreeLinkedListNode {
139139
}
140140

141141
/**
142-
* Removes this node from the list.
142+
* Removes this node from the list. Returns `true` when removed successfully.
143143
*/
144-
public open fun remove() {
144+
public open fun remove(): Boolean {
145145
while (true) { // lock-free loop on next
146146
val next = this.next
147-
if (next is Removed) return // was already removed -- don't try to help (original thread will take care)
147+
if (next is Removed) return false // was already removed -- don't try to help (original thread will take care)
148148
if (NEXT.compareAndSet(this, next, Removed(next as Node))) {
149149
// was removed successfully (linearized remove) -- fixup the list
150150
helpDelete()
151151
next.helpInsert(prev.unwrap())
152-
return
152+
return true
153153
}
154154
}
155155
}
156156

157+
internal fun removeFirstOrNull(): Node? {
158+
while (true) { // try to linearize
159+
val first = next()
160+
if (first == this) return null
161+
if (first.remove()) return first
162+
}
163+
}
164+
157165
private fun markPrev(): Node {
158166
while (true) { // lock-free loop on prev
159167
val prev = this.prev
@@ -242,6 +250,8 @@ internal open class LockFreeLinkedListNode {
242250
}
243251

244252
internal open class LockFreeLinkedListHead : LockFreeLinkedListNode() {
253+
public val isEmpty: Boolean get() = next() == this
254+
245255
/**
246256
* Iterates over all elements in this list of a specified type.
247257
*/
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package kotlinx.coroutines.experimental
2+
3+
import org.junit.After
4+
import org.junit.Test
5+
import java.util.concurrent.atomic.AtomicBoolean
6+
import java.util.concurrent.atomic.AtomicInteger
7+
8+
class CoroutinesTest {
9+
var actionIndex = AtomicInteger()
10+
var finished = AtomicBoolean()
11+
12+
fun expect(index: Int) {
13+
val wasIndex = actionIndex.incrementAndGet()
14+
check(index == wasIndex) { "Expecting action index $index but it is actually $wasIndex"}
15+
}
16+
17+
fun finish(index: Int) {
18+
expect(index)
19+
finished.set(true)
20+
}
21+
22+
@After
23+
fun onCompletion() {
24+
check(finished.get()) { "Expecting that 'finish(...)' was invoked, but it was not" }
25+
}
26+
27+
@Test
28+
fun testSimple() = runEventLoop {
29+
expect(1)
30+
finish(2)
31+
}
32+
33+
@Test
34+
fun testYield() = runEventLoop {
35+
expect(1)
36+
yield() // effectively does nothing, as we don't have other coroutines
37+
finish(2)
38+
}
39+
40+
@Test
41+
fun testLaunchHereAndYield() = runEventLoop {
42+
expect(1)
43+
val job = launch(Here) {
44+
expect(2)
45+
yield()
46+
expect(4)
47+
}
48+
expect(3)
49+
job.join()
50+
finish(5)
51+
}
52+
}

kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedListTest.kt

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,14 @@ class LockFreeLinkedListTest {
1818
assertContents(list, 3, 2, 1)
1919
val n4 = IntNode(4).apply { list.addFirst(this) }
2020
assertContents(list, 4, 3, 2, 1)
21-
n1.remove()
21+
assertTrue(n1.remove())
2222
assertContents(list, 4, 3, 2)
23-
n3.remove()
23+
assertTrue(n3.remove())
24+
assertFalse(n3.remove())
2425
assertContents(list, 4, 2)
25-
n4.remove()
26+
assertTrue(n4.remove())
2627
assertContents(list, 2)
27-
n2.remove()
28+
assertTrue(n2.remove())
2829
assertContents(list)
2930
}
3031

@@ -40,13 +41,14 @@ class LockFreeLinkedListTest {
4041
assertContents(list, 1, 2, 3)
4142
val n4 = IntNode(4).apply { list.addLast(this) }
4243
assertContents(list, 1, 2, 3, 4)
43-
n1.remove()
44+
assertTrue(n1.remove())
4445
assertContents(list, 2, 3, 4)
45-
n3.remove()
46+
assertTrue(n3.remove())
4647
assertContents(list, 2, 4)
47-
n4.remove()
48+
assertTrue(n4.remove())
4849
assertContents(list, 2)
49-
n2.remove()
50+
assertTrue(n2.remove())
51+
assertFalse(n2.remove())
5052
assertContents(list)
5153
}
5254

@@ -72,5 +74,6 @@ class LockFreeLinkedListTest {
7274
list.forEach<IntNode> { actual[index++] = it.i }
7375
assertEquals(n, index)
7476
for (i in 0 until n) assertEquals("item i", expected[i], actual[i])
77+
assertEquals(expected.isEmpty(), list.isEmpty)
7578
}
7679
}

0 commit comments

Comments
 (0)