Skip to content

Commit 7b2d8b0

Browse files
committed
ArrayChannel implementation and tests
1 parent 6c63aea commit 7b2d8b0

File tree

13 files changed

+902
-371
lines changed

13 files changed

+902
-371
lines changed

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,16 @@ public interface CancellableContinuation<in T> : Continuation<T>, Job {
2222
val isCancelled: Boolean
2323

2424
/**
25-
* Tries to resume this continuation with a given value and returns `true` if it was successful,
26-
* or `false` otherwise (it was already resumed or cancelled).
27-
*
28-
* An optional [onSuccess] callback is invoked with [value] as its parameter after the state of this continuation
29-
* is updated (so that is cannot be cancelled anymore), but before it is actually resumed.
25+
* Tries to resume this continuation with a given value and returns non-null object token if it was successful,
26+
* or `null` otherwise (it was already resumed or cancelled). When non-null object was returned,
27+
* [completeResume] must be invoked with it.
3028
*/
31-
fun tryResume(value: T, onSuccess: ((Any?) -> Unit)? = null): Boolean
29+
fun tryResume(value: T): Any?
30+
31+
/**
32+
* Completes the execution of [tryResume] on its non-null result.
33+
*/
34+
fun completeResume(token: Any)
3235

3336
/**
3437
* Makes this continuation cancellable. Use it with `holdCancellability` optional parameter to
@@ -104,16 +107,20 @@ internal class SafeCancellableContinuation<in T>(
104107
override val isCancelled: Boolean
105108
get() = getState() is Cancelled
106109

107-
override fun tryResume(value: T, onSuccess: ((Any?) -> Unit)?): Boolean {
110+
override fun tryResume(value: T): Any? {
108111
while (true) { // lock-free loop on state
109112
val state = getState() // atomic read
110113
when (state) {
111-
is Active -> if (updateState(state, value, onSuccess)) return true
112-
else -> return false // cannot resume -- not active anymore
114+
is Active -> if (tryUpdateState(state, value)) return state
115+
else -> return null // cannot resume -- not active anymore
113116
}
114117
}
115118
}
116119

120+
override fun completeResume(token: Any) {
121+
completeUpdateState(token, getState())
122+
}
123+
117124
@Suppress("UNCHECKED_CAST")
118125
override fun afterCompletion(state: Any?) {
119126
val decision = this.decision // volatile read

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -213,16 +213,24 @@ internal open class JobSupport : AbstractCoroutineContextElement(Job), Job {
213213
/**
214214
* Tries to update current [state][getState] of this job.
215215
*/
216-
fun updateState(expect: Any, update: Any?, onSuccess: ((Any?) -> Unit)? = null): Boolean {
216+
fun updateState(expect: Any, update: Any?): Boolean {
217+
if (!tryUpdateState(expect, update)) return false
218+
completeUpdateState(expect, update)
219+
return true
220+
}
221+
222+
fun tryUpdateState(expect: Any, update: Any?): Boolean {
217223
require(expect is Active && update !is Active) // only active -> inactive transition is allowed
218224
if (!STATE.compareAndSet(this, expect, update)) return false
219225
// #1. Update linked state before invoking completion handlers
220226
onStateUpdate(update)
221227
// #2. Unregister from parent job
222228
registration?.unregister() // volatile read registration _after_ state was updated
223-
// #3. Additional (optional) callback
224-
onSuccess?.invoke(update)
225-
// #4. Invoke completion handlers
229+
return true // continues in completeUpdateState
230+
}
231+
232+
fun completeUpdateState(expect: Any, update: Any?) {
233+
// #3. Invoke completion handlers
226234
val reason = (update as? CompletedExceptionally)?.cancelReason
227235
var completionException: Throwable? = null
228236
when (expect) {
@@ -244,10 +252,9 @@ internal open class JobSupport : AbstractCoroutineContextElement(Job), Job {
244252
// otherwise -- do nothing (Empty)
245253
else -> check(expect == Empty)
246254
}
247-
// #5. Do other (overridable) processing after completion handlers
255+
// #4. Do other (overridable) processing after completion handlers
248256
completionException?.let { handleCompletionException(it) }
249257
afterCompletion(update)
250-
return true
251258
}
252259

253260
final override val isActive: Boolean get() = state is Active

0 commit comments

Comments
 (0)