Skip to content

Commit 41c5c8b

Browse files
committed
lazyDefer introduced
1 parent 44ba4b1 commit 41c5c8b

File tree

9 files changed

+423
-106
lines changed

9 files changed

+423
-106
lines changed

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@ import kotlin.coroutines.*
2121
* See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
2222
*/
2323
fun launch(context: CoroutineContext, block: suspend CoroutineScope.() -> Unit): Job =
24-
StandaloneCoroutine(newCoroutineContext(context)).also { block.startCoroutine(it, it) }
24+
StandaloneCoroutine(newCoroutineContext(context)).apply {
25+
initParentJob(context[Job])
26+
block.startCoroutine(this, this)
27+
}
2528

2629
/**
2730
* Calls the specified suspending block with a given coroutine context, suspends until it completes, and returns
@@ -54,10 +57,10 @@ public suspend fun <T> run(context: CoroutineContext, block: suspend CoroutineSc
5457
@Throws(InterruptedException::class)
5558
public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
5659
val currentThread = Thread.currentThread()
57-
val privateEventLoop = if (context[ContinuationInterceptor] == null)
58-
EventLoopImpl(currentThread) else null
60+
val privateEventLoop = if (context[ContinuationInterceptor] == null) EventLoopImpl(currentThread) else null
5961
val newContext = newCoroutineContext(context + (privateEventLoop ?: EmptyCoroutineContext))
6062
val coroutine = BlockingCoroutine<T>(newContext, currentThread, privateEventLoop != null)
63+
coroutine.initParentJob(context[Job])
6164
privateEventLoop?.initParentJob(coroutine)
6265
block.startCoroutine(coroutine, coroutine)
6366
return coroutine.joinBlocking()
@@ -66,13 +69,11 @@ public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, bl
6669
// --------------- implementation ---------------
6770

6871
private class StandaloneCoroutine(
69-
val newContext: CoroutineContext
70-
) : AbstractCoroutine<Unit>(newContext) {
71-
init { initParentJob(newContext[Job]) }
72-
72+
val parentContext: CoroutineContext
73+
) : AbstractCoroutine<Unit>(parentContext) {
7374
override fun afterCompletion(state: Any?) {
7475
// note the use of the parent's job context below!
75-
if (state is CompletedExceptionally) handleCoroutineException(newContext, state.cancelReason)
76+
if (state is CompletedExceptionally) handleCoroutineException(parentContext, state.cancelReason)
7677
}
7778
}
7879

@@ -84,13 +85,11 @@ private class InnerCoroutine<T>(
8485
}
8586

8687
private class BlockingCoroutine<T>(
87-
newContext: CoroutineContext,
88+
context: CoroutineContext,
8889
val blockedThread: Thread,
8990
val hasPrivateEventLoop: Boolean
90-
) : AbstractCoroutine<T>(newContext) {
91-
val eventLoop: EventLoop? = newContext[ContinuationInterceptor] as? EventLoop
92-
93-
init { initParentJob(newContext[Job]) }
91+
) : AbstractCoroutine<T>(context) {
92+
val eventLoop: EventLoop? = context[ContinuationInterceptor] as? EventLoop
9493

9594
override fun afterCompletion(state: Any?) {
9695
if (Thread.currentThread() != blockedThread)

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineScope.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ public interface CoroutineScope {
2525
* It stores the result of continuation in the state of the job.
2626
*/
2727
@Suppress("LeakingThis")
28-
public abstract class AbstractCoroutine<in T>(newContext: CoroutineContext) : JobSupport(), Continuation<T>, CoroutineScope {
29-
override val context: CoroutineContext = newContext + this // merges this job into this context
28+
public abstract class AbstractCoroutine<in T>(context: CoroutineContext) : JobSupport(), Continuation<T>, CoroutineScope {
29+
override val context: CoroutineContext = context + this // merges this job into this context
3030

3131
final override fun resume(value: T) {
3232
while (true) { // lock-free loop on state

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import kotlin.coroutines.startCoroutine
66
/**
77
* Deferred value is conceptually a non-blocking cancellable future.
88
* It is created with [defer] coroutine builder.
9+
* It is in [active][isActive] state while the value is being computed.
910
*/
1011
public interface Deferred<out T> : Job {
1112
/**
@@ -35,20 +36,33 @@ public interface Deferred<out T> : Job {
3536
* in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine.
3637
*/
3738
public fun <T> defer(context: CoroutineContext, block: suspend CoroutineScope.() -> T) : Deferred<T> =
38-
DeferredCoroutine<T>(newCoroutineContext(context)).also { block.startCoroutine(it, it) }
39+
DeferredCoroutine<T>(newCoroutineContext(context)).apply {
40+
initParentJob(context[Job])
41+
block.startCoroutine(this, this)
42+
}
3943

40-
private class DeferredCoroutine<T>(
41-
newContext: CoroutineContext
42-
) : AbstractCoroutine<T>(newContext), Deferred<T> {
43-
init { initParentJob(newContext[Job]) }
44+
internal open class DeferredCoroutine<T>(
45+
context: CoroutineContext
46+
) : AbstractCoroutine<T>(context), Deferred<T> {
47+
protected open fun start(): Boolean = false // LazyDeferredCoroutine overrides
4448

4549
@Suppress("UNCHECKED_CAST")
4650
suspend override fun await(): T {
4751
// quick check if already complete (avoid extra object creation)
48-
val state = getState()
49-
if (state !is Active) {
50-
if (state is CompletedExceptionally) throw state.exception
51-
return state as T
52+
getState().let { state ->
53+
if (state !is Active) {
54+
if (state is CompletedExceptionally) throw state.exception
55+
return state as T
56+
}
57+
}
58+
if (start()) { // LazyDeferredCoroutine overrides
59+
// recheck state (may have started & already completed
60+
getState().let { state ->
61+
if (state !is Active) {
62+
if (state is CompletedExceptionally) throw state.exception
63+
return state as T
64+
}
65+
}
5266
}
5367
// Note: await is cancellable itself!
5468
return awaitGetValue()

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ public open class JobSupport : AbstractCoroutineContextElement(Job), Job {
139139
}
140140

141141
// invoke at most once after construction after all other initialization
142-
protected fun initParentJob(parent: Job?) {
142+
public fun initParentJob(parent: Job?) {
143143
if (parent == null) return
144144
check(registration == null)
145145
// directly pass HandlerNode to parent scope to optimize one closure object (see makeNode)
@@ -155,9 +155,11 @@ public open class JobSupport : AbstractCoroutineContextElement(Job), Job {
155155
expect as ActiveList // assert type
156156
require(update !is Active) // only active -> inactive transition is allowed
157157
if (!STATE.compareAndSet(this, expect, update)) return false
158-
// #1. Unregister from parent job
158+
// #1. Update linked state before invoking completion handlers
159+
onStateUpdate(update)
160+
// #2. Unregister from parent job
159161
registration?.unregister() // volatile read registration _after_ state was updated
160-
// #2 Invoke completion handlers
162+
// #3. Invoke completion handlers
161163
val reason = (update as? CompletedExceptionally)?.cancelReason
162164
var completionException: Throwable? = null
163165
expect.forEach<JobNode> { node ->
@@ -167,7 +169,7 @@ public open class JobSupport : AbstractCoroutineContextElement(Job), Job {
167169
completionException?.apply { addSuppressed(ex) } ?: run { completionException = ex }
168170
}
169171
}
170-
// #3 Do other (overridable) processing
172+
// #4. Do other (overridable) processing after completion handlers
171173
completionException?.let { handleCompletionException(it) }
172174
afterCompletion(update)
173175
return true
@@ -196,6 +198,11 @@ public open class JobSupport : AbstractCoroutineContextElement(Job), Job {
196198
}
197199
}
198200

201+
/**
202+
* Override to make linked state changes before completion handlers are invoked.
203+
*/
204+
protected open fun onStateUpdate(update: Any?) {}
205+
199206
/**
200207
* Override to process any exceptions that were encountered while invoking [onCompletion] handlers.
201208
*/
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
package kotlinx.coroutines.experimental
2+
3+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater
4+
import kotlin.coroutines.CoroutineContext
5+
import kotlin.coroutines.startCoroutine
6+
7+
/**
8+
* Lazy deferred value is conceptually a non-blocking cancellable future that is started on
9+
* the first [await] or [start] invocation.
10+
* It is created with [lazyDefer] coroutine builder.
11+
*
12+
* Unlike a simple [Deferred] value, a lazy deferred value has three states:
13+
* * _Pending_ -- before the starts of the coroutine ([isActive] is `true`, but [isComputing] is `false`).
14+
* * _Computing_ -- while computing the value ([isActive] is `true` and [isComputing] is `true`).
15+
* * _Complete_ -- when done computing the value ([isActive] is `false` and [isComputing] is `false`).
16+
*
17+
* If this lazy deferred value is [cancelled][cancel], then it becomes immediately complete and
18+
* cancels ongoing computation coroutine if it was started.
19+
*/
20+
public interface LazyDeferred<T> : Deferred<T> {
21+
/**
22+
* Returns `true` if the coroutine is computing its value.
23+
*/
24+
public val isComputing: Boolean
25+
26+
/**
27+
* Starts coroutine to compute this lazily deferred value. The result `true` if this invocation actually
28+
* started coroutine or `false` if it was already started or cancelled.
29+
*/
30+
public fun start(): Boolean
31+
}
32+
33+
/**
34+
* Lazily starts new coroutine on the first [await][Deferred.await] or [start][LazyDeferred.start] invocation
35+
* on the resulting [LazyDeferred].
36+
* The running coroutine is cancelled when the resulting value is [cancelled][Job.cancel].
37+
*
38+
* The [context] for the new coroutine must be explicitly specified.
39+
* See [CoroutineDispatcher] for the standard [context] implementations that are provided by `kotlinx.coroutines`.
40+
* The [context][CoroutineScope.context] of the parent coroutine from its [scope][CoroutineScope] may be used,
41+
* in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine.
42+
*/
43+
public fun <T> lazyDefer(context: CoroutineContext, block: suspend CoroutineScope.() -> T) : LazyDeferred<T> =
44+
LazyDeferredCoroutine<T>(newCoroutineContext(context), block).apply {
45+
initParentJob(context[Job])
46+
}
47+
48+
private class LazyDeferredCoroutine<T>(
49+
context: CoroutineContext,
50+
val block: suspend CoroutineScope.() -> T
51+
) : DeferredCoroutine<T>(context), LazyDeferred<T> {
52+
53+
@Volatile
54+
var lazyState: Int = STATE_PENDING
55+
56+
companion object {
57+
private val STATE_PENDING = 0
58+
private val STATE_COMPUTING = 1
59+
private val STATE_COMPLETE = 2
60+
61+
private val LAZY_STATE: AtomicIntegerFieldUpdater<LazyDeferredCoroutine<*>> =
62+
AtomicIntegerFieldUpdater.newUpdater(LazyDeferredCoroutine::class.java, "lazyState")
63+
}
64+
65+
/*
66+
=== State linking & linearization of the overall state ===
67+
68+
There are two state variables in this object and they have to update atomically from the standpoint of
69+
external observer:
70+
1. Job.state that is used by isActive function.
71+
2. lazyState that is used to make sure coroutine starts at most once.
72+
External observer must see only three states, not four, i.e. it should not be able
73+
to see `isActive == false`, but `isComputing == true`.
74+
75+
On completion/cancellation state variables are updated in this order:
76+
a) state <- complete (isComplete starts returning true)
77+
b) lazyState <- STATE_COMPLETE (see onStateUpdate)
78+
This is why, `isComputing` checks state variables in reverse order:
79+
a) lazyState is checked _first_
80+
b) isActive is checked after it
81+
This way cancellation/completion is atomic w.r.t to all state functions.
82+
83+
`start` function also has to check lazyState _before_ isActive.
84+
*/
85+
86+
override val isComputing: Boolean get() = lazyState == STATE_COMPUTING && isActive
87+
88+
override fun start(): Boolean {
89+
while (true) { // lock-free loop on lazyState
90+
when (lazyState) { // volatile read
91+
STATE_PENDING -> {
92+
if (isActive) { // then volatile read Job.state (inside isActive)
93+
// can try to start
94+
if (LAZY_STATE.compareAndSet(this, STATE_PENDING, STATE_COMPUTING)) {
95+
block.startCoroutine(this, this)
96+
return true
97+
}
98+
} else {
99+
// cannot start -- already complete -- help update lazyState
100+
lazyState = STATE_COMPLETE
101+
return false
102+
}
103+
}
104+
else -> return false
105+
}
106+
}
107+
}
108+
109+
override fun onStateUpdate(update: Any?) {
110+
lazyState = STATE_COMPLETE
111+
}
112+
}

kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/CoroutinesTest.kt

Lines changed: 1 addition & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,9 @@
11
package kotlinx.coroutines.experimental
22

3-
import org.junit.After
43
import org.junit.Test
54
import java.io.IOException
6-
import java.util.concurrent.atomic.AtomicBoolean
7-
import java.util.concurrent.atomic.AtomicInteger
8-
import java.util.concurrent.atomic.AtomicReference
9-
10-
class CoroutinesTest {
11-
var actionIndex = AtomicInteger()
12-
var finished = AtomicBoolean()
13-
var error = AtomicReference<Throwable>()
14-
15-
fun expect(index: Int) {
16-
val wasIndex = actionIndex.incrementAndGet()
17-
check(index == wasIndex) { "Expecting action index $index but it is actually $wasIndex" }
18-
}
19-
20-
fun expectUnreached() {
21-
throw IllegalStateException("Should not be reached").also { error.compareAndSet(null, it) }
22-
}
23-
24-
fun finish(index: Int) {
25-
expect(index)
26-
finished.set(true)
27-
}
28-
29-
@After
30-
fun onCompletion() {
31-
error.get()?.let { throw it }
32-
check(finished.get()) { "Expecting that 'finish(...)' was invoked, but it was not" }
33-
}
345

6+
class CoroutinesTest : TestBase() {
357
@Test
368
fun testSimple() = runBlocking {
379
expect(1)
@@ -187,53 +159,4 @@ class CoroutinesTest {
187159
}
188160
finish(5)
189161
}
190-
191-
@Test
192-
fun testDeferSimple(): Unit = runBlocking {
193-
expect(1)
194-
val d = defer(context) {
195-
expect(2)
196-
42
197-
}
198-
expect(3)
199-
check(d.await() == 42)
200-
finish(4)
201-
}
202-
203-
@Test
204-
fun testDeferAndYield(): Unit = runBlocking {
205-
expect(1)
206-
val d = defer(context) {
207-
expect(2)
208-
yield()
209-
expect(4)
210-
42
211-
}
212-
expect(3)
213-
check(d.await() == 42)
214-
finish(5)
215-
}
216-
217-
@Test(expected = IOException::class)
218-
fun testDeferSimpleException(): Unit = runBlocking {
219-
expect(1)
220-
val d = defer(context) {
221-
expect(2)
222-
throw IOException()
223-
}
224-
finish(3)
225-
d.await() // will throw IOException
226-
}
227-
228-
@Test(expected = IOException::class)
229-
fun testDeferAndYieldException(): Unit = runBlocking {
230-
expect(1)
231-
val d = defer(context) {
232-
expect(2)
233-
yield()
234-
throw IOException()
235-
}
236-
finish(3)
237-
d.await() // will throw IOException
238-
}
239162
}

0 commit comments

Comments
 (0)