Skip to content

Commit 187eace

Browse files
committed
Basic Channel interfaces and RendezvousChannel implementation
1 parent fa7723e commit 187eace

File tree

7 files changed

+805
-5
lines changed

7 files changed

+805
-5
lines changed

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,37 @@ public interface CancellableContinuation<in T> : Continuation<T>, Job {
2020
* Returns `true` if this continuation was cancelled. It implies that [isActive] is `false`.
2121
*/
2222
val isCancelled: Boolean
23+
24+
/**
25+
* Tries to resume this continuation with a given value and returns `true` if it was successful,
26+
* or `false` otherwise (it was already resumed or cancelled).
27+
*
28+
* An optional [onSuccess] callback is invoked with [value] as its parameter after the state of this continuation
29+
* is updated (so that is cannot be cancelled anymore), but before it is actually resumed.
30+
*/
31+
fun tryResume(value: T, onSuccess: ((Any?) -> Unit)? = null): Boolean
32+
33+
/**
34+
* Makes this continuation cancellable. Use it with `holdCancellability` optional parameter to
35+
* [suspendCancellableCoroutine] function. It throws [IllegalStateException] if invoked more than once.
36+
*/
37+
fun initCancellability()
2338
}
2439

2540
/**
2641
* Suspend coroutine similar to [suspendCoroutine], but provide an implementation of [CancellableContinuation] to
2742
* the [block]. This function throws [CancellationException] if the coroutine is cancelled while suspended.
43+
*
44+
* If [holdCancellability] optional parameter is `true`, then the coroutine is suspended, but it is not
45+
* cancellable until [CancellableContinuation.initCancellability] is invoked.
2846
*/
29-
public inline suspend fun <T> suspendCancellableCoroutine(crossinline block: (CancellableContinuation<T>) -> Unit): T =
47+
public inline suspend fun <T> suspendCancellableCoroutine(
48+
holdCancellability: Boolean = false,
49+
crossinline block: (CancellableContinuation<T>) -> Unit
50+
): T =
3051
suspendCoroutineOrReturn { cont ->
3152
val safe = SafeCancellableContinuation(cont, getParentJobOrAbort(cont))
53+
if (!holdCancellability) safe.initCancellability()
3254
block(safe)
3355
safe.getResult()
3456
}
@@ -63,7 +85,9 @@ internal class SafeCancellableContinuation<in T>(
6385
const val YIELD = 3 // used by cancellable "yield"
6486
}
6587

66-
init { initParentJob(parentJob) }
88+
override fun initCancellability() {
89+
initParentJob(parentJob)
90+
}
6791

6892
fun getResult(): Any? {
6993
val decision = this.decision // volatile read
@@ -80,6 +104,16 @@ internal class SafeCancellableContinuation<in T>(
80104
override val isCancelled: Boolean
81105
get() = getState() is Cancelled
82106

107+
override fun tryResume(value: T, onSuccess: ((Any?) -> Unit)?): Boolean {
108+
while (true) { // lock-free loop on state
109+
val state = getState() // atomic read
110+
when (state) {
111+
is Active -> if (updateState(state, value, onSuccess)) return true
112+
else -> return false // cannot resume -- not active anymore
113+
}
114+
}
115+
}
116+
83117
@Suppress("UNCHECKED_CAST")
84118
override fun afterCompletion(state: Any?) {
85119
val decision = this.decision // volatile read

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -213,14 +213,16 @@ internal open class JobSupport : AbstractCoroutineContextElement(Job), Job {
213213
/**
214214
* Tries to update current [state][getState] of this job.
215215
*/
216-
fun updateState(expect: Any, update: Any?): Boolean {
216+
fun updateState(expect: Any, update: Any?, onSuccess: ((Any?) -> Unit)? = null): Boolean {
217217
require(expect is Active && update !is Active) // only active -> inactive transition is allowed
218218
if (!STATE.compareAndSet(this, expect, update)) return false
219219
// #1. Update linked state before invoking completion handlers
220220
onStateUpdate(update)
221221
// #2. Unregister from parent job
222222
registration?.unregister() // volatile read registration _after_ state was updated
223-
// #3. Invoke completion handlers
223+
// #3. Additional (optional) callback
224+
onSuccess?.invoke(update)
225+
// #4. Invoke completion handlers
224226
val reason = (update as? CompletedExceptionally)?.cancelReason
225227
var completionException: Throwable? = null
226228
when (expect) {
@@ -242,7 +244,7 @@ internal open class JobSupport : AbstractCoroutineContextElement(Job), Job {
242244
// otherwise -- do nothing (Empty)
243245
else -> check(expect == Empty)
244246
}
245-
// #4. Do other (overridable) processing after completion handlers
247+
// #5. Do other (overridable) processing after completion handlers
246248
completionException?.let { handleCompletionException(it) }
247249
afterCompletion(update)
248250
return true
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
package kotlinx.coroutines.experimental.channels
2+
3+
import kotlinx.coroutines.experimental.CancellationException
4+
import kotlinx.coroutines.experimental.CoroutineScope
5+
import kotlinx.coroutines.experimental.Job
6+
import kotlinx.coroutines.experimental.yield
7+
import java.util.*
8+
9+
/**
10+
* Sender's interface to [Channel].
11+
*/
12+
public interface SendChannel<in E> {
13+
/**
14+
* Returns `true` if this channel was closed by invocation of [close] and thus
15+
* the [send] attempt will throw [ClosedSendChannelException].
16+
*/
17+
public val isClosedForSend: Boolean
18+
19+
/**
20+
* Returns `true` if the channel is full (out of capacity) and the [send] attempt will suspend.
21+
* This function returns `false` for [isClosedForSend] channel.
22+
*/
23+
public val isFull: Boolean
24+
25+
/**
26+
* Adds [element] into to this queue, suspending the caller while this queue [isFull],
27+
* or throws [ClosedSendChannelException] if the channel [isClosedForSend].
28+
*
29+
* This suspending function is cancellable. If the [Job] of the current coroutine is completed while this
30+
* function is suspended, this function immediately resumes with [CancellationException].
31+
* Cancellation of suspended send is *atomic* -- when this function
32+
* throws [CancellationException] it means that the [element] was not sent to this channel.
33+
*
34+
* Note, that this function does not check for cancellation when it is not suspended.
35+
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
36+
*/
37+
public suspend fun send(element: E)
38+
39+
/**
40+
* Adds [element] into this queue if it is possible to do so immediately without violating capacity restrictions
41+
* and returns `true`. Otherwise, it returns `false` immediately
42+
* or throws [ClosedSendChannelException] if the channel [isClosedForSend].
43+
*/
44+
public fun offer(element: E): Boolean
45+
46+
/**
47+
* Closes this channel. This is an idempotent operation -- repeated invocations of this function have no effect.
48+
* Conceptually, its sends a special close token of this channel. Immediately after invocation of this function
49+
* [isClosedForSend] starts returning `true`. However, [isClosedForReceive][ReceiveChannel.isClosedForReceive]
50+
* on the side of [ReceiveChannel] starts returning `true` only after all previously sent elements
51+
* are received.
52+
*/
53+
public fun close()
54+
}
55+
56+
/**
57+
* Receiver's interface to [Channel].
58+
*/
59+
public interface ReceiveChannel<out E> {
60+
/**
61+
* Returns `true` if this channel was closed by invocation of [close][SendChannel.close] on the [SendChannel]
62+
* side and all previously sent items were already received, so that the [receive] attempt will
63+
* throw [ClosedReceiveChannelException].
64+
*/
65+
public val isClosedForReceive: Boolean
66+
67+
/**
68+
* Returns `true` if the channel is empty (contains no elements) and the [receive] attempt will suspend.
69+
* This function returns `false` for [isClosedForReceive] channel.
70+
*/
71+
public val isEmpty: Boolean
72+
73+
/**
74+
* Retrieves and removes the element from this channel suspending the caller while this channel [isEmpty]
75+
* or throws [ClosedReceiveChannelException] if the channel [isClosedForReceive].
76+
*
77+
* This suspending function is cancellable. If the [Job] of the current coroutine is completed while this
78+
* function is suspended, this function immediately resumes with [CancellationException].
79+
* Cancellation of suspended receive is *atomic* -- when this function
80+
* throws [CancellationException] it means that the element was not retrieved from this channel.
81+
*
82+
* Note, that this function does not check for cancellation when it is not suspended.
83+
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
84+
*/
85+
public suspend fun receive(): E
86+
87+
/**
88+
* Retrieves and removes the element from this channel suspending the caller while this channel [isEmpty]
89+
* or returns `null` if the channel [isClosedForReceive].
90+
*
91+
* This suspending function is cancellable. If the [Job] of the current coroutine is completed while this
92+
* function is suspended, this function immediately resumes with [CancellationException].
93+
* Cancellation of suspended receive is *atomic* -- when this function
94+
* throws [CancellationException] it means that the element was not retrieved from this channel.
95+
*
96+
* Note, that this function does not check for cancellation when it is not suspended.
97+
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
98+
*/
99+
public suspend fun receiveOrNull(): E?
100+
101+
/**
102+
* Retrieves and removes the head of this queue, or returns `null` if this queue [isEmpty]
103+
* or [isClosedForReceive].
104+
*/
105+
public fun pool(): E?
106+
107+
/**
108+
* Returns new iterator to receive elements from this channels using `for` loop.
109+
*/
110+
public operator fun iterator(): ChannelIterator<E>
111+
}
112+
113+
/**
114+
* Iterator for [ReceiveChannel]. Instances of this interface are *not thread-safe* and shall not be used
115+
* from concurrent coroutines.
116+
*/
117+
public interface ChannelIterator<out E> {
118+
/**
119+
* Returns `true` if the channel has more elements suspending the caller while this channel
120+
* [isEmpty][ReceiveChannel.isEmpty] or `false` [ClosedReceiveChannelException] if the channel
121+
* [isClosedForReceive][ReceiveChannel.isClosedForReceive].
122+
* This function retrieves and removes the element from this channel for the subsequent invocation
123+
* of [next].
124+
*
125+
* This suspending function is cancellable. If the [Job] of the current coroutine is completed while this
126+
* function is suspended, this function immediately resumes with [CancellationException].
127+
* Cancellation of suspended receive is *atomic* -- when this function
128+
* throws [CancellationException] it means that the element was not retrieved from this channel.
129+
*
130+
* Note, that this function does not check for cancellation when it is not suspended.
131+
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
132+
*/
133+
public suspend operator fun hasNext(): Boolean
134+
135+
/**
136+
* Retrieves and removes the element from this channel suspending the caller while this channel
137+
* [isEmpty][ReceiveChannel.isEmpty] or throws [ClosedReceiveChannelException] if the channel
138+
* [isClosedForReceive][ReceiveChannel.isClosedForReceive].
139+
*
140+
* This suspending function is cancellable. If the [Job] of the current coroutine is completed while this
141+
* function is suspended, this function immediately resumes with [CancellationException].
142+
* Cancellation of suspended receive is *atomic* -- when this function
143+
* throws [CancellationException] it means that the element was not retrieved from this channel.
144+
*
145+
* Note, that this function does not check for cancellation when it is not suspended.
146+
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
147+
*/
148+
public suspend operator fun next(): E
149+
}
150+
151+
/**
152+
* Channel is a non-blocking primitive for communication between sender using [SendChannel] and receiver using [ReceiveChannel].
153+
* Conceptually, a channel is similar to [BlockingQueue][java.util.concurrent.BlockingQueue],
154+
* but it has suspending operations instead of blocking ones and it can be closed.
155+
*/
156+
public interface Channel<E> : SendChannel<E>, ReceiveChannel<E>
157+
158+
/**
159+
* Indicates attempt to [send][SendChannel.send] on [isClosedForSend][SendChannel.isClosedForSend] channel.
160+
*/
161+
public class ClosedSendChannelException : IllegalStateException()
162+
163+
/**
164+
* Indicates attempt to [receive][ReceiveChannel.receive] on [isClosedForReceive][ReceiveChannel.isClosedForReceive]
165+
* channel.
166+
*/
167+
public class ClosedReceiveChannelException : NoSuchElementException()

0 commit comments

Comments
 (0)