Skip to content

Commit 2ad0e94

Browse files
committed
LinkedListChannel implementation
1 parent ebe18b4 commit 2ad0e94

File tree

7 files changed

+231
-87
lines changed

7 files changed

+231
-87
lines changed

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt

Lines changed: 133 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -23,66 +23,141 @@ import kotlinx.coroutines.experimental.selects.SelectInstance
2323
import kotlin.coroutines.experimental.startCoroutine
2424

2525
/**
26-
* Abstract channel. It is a base class for buffered and unbuffered channels.
27-
*
28-
* @suppress **This is unstable API and it is subject to change.**
26+
* Abstract channel. It is a base class for all channel implementations.
2927
*/
3028
public abstract class AbstractChannel<E> : Channel<E> {
3129
private val queue = LockFreeLinkedListHead()
3230

3331
// ------ extension points for buffered channels ------
3432

3533
/**
36-
* Returns `true` if this channel has buffer.
34+
* Returns `true` if [isBufferEmpty] is always `true`.
35+
* @suppress **This is unstable API and it is subject to change.**
3736
*/
38-
protected abstract val hasBuffer: Boolean
37+
protected abstract val isBufferAlwaysEmpty: Boolean
3938

4039
/**
4140
* Returns `true` if this channel's buffer is empty.
41+
* @suppress **This is unstable API and it is subject to change.**
4242
*/
4343
protected abstract val isBufferEmpty: Boolean
4444

45+
/**
46+
* Returns `true` if [isBufferFull] is always `true`.
47+
* @suppress **This is unstable API and it is subject to change.**
48+
*/
49+
protected abstract val isBufferAlwaysFull: Boolean
50+
4551
/**
4652
* Returns `true` if this channel's buffer is full.
53+
* @suppress **This is unstable API and it is subject to change.**
4754
*/
4855
protected abstract val isBufferFull: Boolean
4956

57+
// ------ internal functions for override by buffered channels ------
58+
5059
/**
5160
* Tries to add element to buffer or to queued receiver.
5261
* Return type is `OFFER_SUCCESS | OFFER_FAILED | Closed`.
62+
* @suppress **This is unstable API and it is subject to change.**
5363
*/
54-
protected abstract fun offerInternal(element: E): Any
64+
protected open fun offerInternal(element: E): Any {
65+
while (true) {
66+
val receive = takeFirstReceiveOrPeekClosed() ?: return OFFER_FAILED
67+
val token = receive.tryResumeReceive(element, idempotent = null)
68+
if (token != null) {
69+
receive.completeResumeReceive(token)
70+
return receive.offerResult
71+
}
72+
}
73+
}
5574

5675
/**
5776
* Tries to add element to buffer or to queued receiver if select statement clause was not selected yet.
5877
* Return type is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | Closed`.
78+
* @suppress **This is unstable API and it is subject to change.**
5979
*/
60-
protected abstract fun offerSelectInternal(element: E, select: SelectInstance<*>): Any
80+
protected open fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
81+
// offer atomically with select
82+
val offerOp = describeTryOffer(element)
83+
val failure = select.performAtomicTrySelect(offerOp)
84+
if (failure != null) return failure
85+
val receive = offerOp.result
86+
receive.completeResumeReceive(offerOp.resumeToken!!)
87+
return receive.offerResult
88+
}
6189

6290
/**
6391
* Tries to remove element from buffer or from queued sender.
6492
* Return type is `E | POLL_FAILED | Closed`
93+
* @suppress **This is unstable API and it is subject to change.**
6594
*/
66-
protected abstract fun pollInternal(): Any?
95+
protected open fun pollInternal(): Any? {
96+
while (true) {
97+
val send = takeFirstSendOrPeekClosed() ?: return POLL_FAILED
98+
val token = send.tryResumeSend(idempotent = null)
99+
if (token != null) {
100+
send.completeResumeSend(token)
101+
return send.pollResult
102+
}
103+
}
104+
}
67105

68106
/**
69107
* Tries to remove element from buffer or from queued sender if select statement clause was not selected yet.
70108
* Return type is `ALREADY_SELECTED | E | POLL_FAILED | Closed`
109+
* @suppress **This is unstable API and it is subject to change.**
71110
*/
72-
protected abstract fun pollSelectInternal(select: SelectInstance<*>): Any?
111+
protected open fun pollSelectInternal(select: SelectInstance<*>): Any? {
112+
// poll atomically with select
113+
val pollOp = describeTryPoll()
114+
val failure = select.performAtomicTrySelect(pollOp)
115+
if (failure != null) return failure
116+
val send = pollOp.result
117+
send.completeResumeSend(pollOp.resumeToken!!)
118+
return pollOp.pollResult
119+
}
73120

74-
// ------ state functions for concrete implementations ------
121+
// ------ state functions & helpers for concrete implementations ------
75122

76123
/**
77124
* Returns non-null closed token if it is first in the queue.
125+
* @suppress **This is unstable API and it is subject to change.**
78126
*/
79127
protected val closedForReceive: Any? get() = queue.next as? Closed<*>
80128

81129
/**
82130
* Returns non-null closed token if it is last in the queue.
131+
* @suppress **This is unstable API and it is subject to change.**
83132
*/
84133
protected val closedForSend: ReceiveOrClosed<*>? get() = queue.prev as? Closed<*>
85134

135+
/**
136+
* @suppress **This is unstable API and it is subject to change.**
137+
*/
138+
protected val hasReceiveOrClosed: Boolean get() = queue.next is ReceiveOrClosed<*>
139+
140+
/**
141+
* @suppress **This is unstable API and it is subject to change.**
142+
*/
143+
protected fun sendBuffered(element: E): Boolean =
144+
queue.addLastIfPrev(SendBuffered(element), { it !is ReceiveOrClosed<*> })
145+
146+
/**
147+
* @suppress **This is unstable API and it is subject to change.**
148+
*/
149+
protected fun describeSendBuffered(element: E): AddLastDesc<*> = SendBufferedDesc(queue, element)
150+
151+
private class SendBufferedDesc<E>(
152+
queue: LockFreeLinkedListHead,
153+
element: E
154+
) : AddLastDesc<SendBuffered<E>>(queue, SendBuffered(element)) {
155+
override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
156+
if (affected is ReceiveOrClosed<*>) return OFFER_FAILED
157+
return null
158+
}
159+
}
160+
86161
// ------ SendChannel ------
87162

88163
public final override val isClosedForSend: Boolean get() = closedForSend != null
@@ -131,10 +206,9 @@ public abstract class AbstractChannel<E> : Channel<E> {
131206
}
132207

133208
private fun enqueueSend(send: SendElement) =
134-
if (hasBuffer)
209+
if (isBufferAlwaysFull)
210+
queue.addLastIfPrev(send, { it !is ReceiveOrClosed<*> }) else
135211
queue.addLastIfPrevAndIf(send, { it !is ReceiveOrClosed<*> }, { isBufferFull })
136-
else
137-
queue.addLastIfPrev(send, { it !is ReceiveOrClosed<*> })
138212

139213
public final override fun close(cause: Throwable?): Boolean {
140214
val closed = Closed<E>(cause)
@@ -161,14 +235,21 @@ public abstract class AbstractChannel<E> : Channel<E> {
161235

162236
/**
163237
* Retrieves first receiving waiter from the queue or returns closed token.
238+
* @suppress **This is unstable API and it is subject to change.**
164239
*/
165240
protected fun takeFirstReceiveOrPeekClosed(): ReceiveOrClosed<E>? =
166241
queue.removeFirstIfIsInstanceOfOrPeekIf<ReceiveOrClosed<E>>({ it is Closed<*> })
167242

168243
// ------ registerSelectSend ------
169244

245+
/**
246+
* @suppress **This is unstable API and it is subject to change.**
247+
*/
170248
protected fun describeTryOffer(element: E): TryOfferDesc<E> = TryOfferDesc(element, queue)
171249

250+
/**
251+
* @suppress **This is unstable API and it is subject to change.**
252+
*/
172253
protected class TryOfferDesc<E>(
173254
@JvmField val element: E,
174255
queue: LockFreeLinkedListHead
@@ -283,9 +364,9 @@ public abstract class AbstractChannel<E> : Channel<E> {
283364
}
284365

285366
private fun enqueueReceive(receive: Receive<E>): Boolean {
286-
val result = if (hasBuffer)
287-
queue.addLastIfPrevAndIf(receive, { it !is Send }, { isBufferEmpty }) else
288-
queue.addLastIfPrev(receive, { it !is Send })
367+
val result = if (isBufferAlwaysEmpty)
368+
queue.addLastIfPrev(receive, { it !is Send }) else
369+
queue.addLastIfPrevAndIf(receive, { it !is Send }, { isBufferEmpty })
289370
if (result) onEnqueuedReceive()
290371
return result
291372
}
@@ -343,14 +424,21 @@ public abstract class AbstractChannel<E> : Channel<E> {
343424

344425
/**
345426
* Retrieves first sending waiter from the queue or returns closed token.
427+
* @suppress **This is unstable API and it is subject to change.**
346428
*/
347429
protected fun takeFirstSendOrPeekClosed(): Send? =
348430
queue.removeFirstIfIsInstanceOfOrPeekIf<Send> { it is Closed<*> }
349431

350432
// ------ registerSelectReceive ------
351433

434+
/**
435+
* @suppress **This is unstable API and it is subject to change.**
436+
*/
352437
protected fun describeTryPoll(): TryPollDesc<E> = TryPollDesc(queue)
353438

439+
/**
440+
* @suppress **This is unstable API and it is subject to change.**
441+
*/
354442
protected class TryPollDesc<E>(queue: LockFreeLinkedListHead) : RemoveFirstDesc<Send>(queue) {
355443
@JvmField var resumeToken: Any? = null
356444
@JvmField var pollResult: E? = null
@@ -387,8 +475,10 @@ public abstract class AbstractChannel<E> : Channel<E> {
387475

388476
override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) {
389477
super.finishOnSuccess(affected, next)
478+
// notify the there is one more receiver
479+
onEnqueuedReceive()
390480
// we can actually remove on select start, but this is also Ok (it'll get removed if discovered there)
391-
(node as ReceiveSelect<*, *>).removeOnSelectCompletion()
481+
node.removeOnSelectCompletion()
392482
}
393483
}
394484

@@ -459,25 +549,35 @@ public abstract class AbstractChannel<E> : Channel<E> {
459549
protected companion object {
460550
private const val DEFAULT_CLOSE_MESSAGE = "Channel was closed"
461551

552+
/** @suppress **This is unstable API and it is subject to change.** */
462553
@JvmStatic
463554
val OFFER_SUCCESS: Any = Symbol("OFFER_SUCCESS")
555+
556+
/** @suppress **This is unstable API and it is subject to change.** */
464557
@JvmStatic
465558
val OFFER_FAILED: Any = Symbol("OFFER_FAILED")
466559

560+
/** @suppress **This is unstable API and it is subject to change.** */
467561
@JvmStatic
468562
val POLL_FAILED: Any = Symbol("POLL_FAILED")
469563

564+
/** @suppress **This is unstable API and it is subject to change.** */
470565
@JvmStatic
471566
val ENQUEUE_FAILED: Any = Symbol("ENQUEUE_FAILED")
472567

473568
@JvmStatic
474569
private val SELECT_STARTED: Any = Symbol("SELECT_STARTED")
570+
475571
@JvmStatic
476572
private val NULL_VALUE: Any = Symbol("NULL_VALUE")
477573

478574
@JvmStatic
479575
private val CLOSE_RESUMED: Any = Symbol("CLOSE_RESUMED")
480576

577+
@JvmStatic
578+
private val SEND_RESUMED = Symbol("SEND_RESUMED")
579+
580+
/** @suppress **This is unstable API and it is subject to change.** */
481581
@JvmStatic
482582
fun isClosed(result: Any?): Boolean = result is Closed<*>
483583
}
@@ -562,6 +662,7 @@ public abstract class AbstractChannel<E> : Channel<E> {
562662

563663
/**
564664
* Represents sending waiter in the queue.
665+
* @suppress **This is unstable API and it is subject to change.**
565666
*/
566667
protected interface Send {
567668
val pollResult: Any? // E | Closed
@@ -571,6 +672,7 @@ public abstract class AbstractChannel<E> : Channel<E> {
571672

572673
/**
573674
* Represents receiver waiter in the queue or closed token.
675+
* @suppress **This is unstable API and it is subject to change.**
574676
*/
575677
protected interface ReceiveOrClosed<in E> {
576678
val offerResult: Any // OFFER_SUCCESS | Closed
@@ -612,8 +714,17 @@ public abstract class AbstractChannel<E> : Channel<E> {
612714
override fun toString(): String = "SendSelect($pollResult)[$select]"
613715
}
614716

717+
private class SendBuffered<out E>(
718+
@JvmField val element: E
719+
) : LockFreeLinkedListNode(), Send {
720+
override val pollResult: Any? get() = element
721+
override fun tryResumeSend(idempotent: Any?): Any? = SEND_RESUMED
722+
override fun completeResumeSend(token: Any) { check(token === SEND_RESUMED) }
723+
}
724+
615725
/**
616726
* Represents closed channel.
727+
* @suppress **This is unstable API and it is subject to change.**
617728
*/
618729
protected class Closed<in E>(
619730
@JvmField val closeCause: Throwable?
@@ -705,7 +816,7 @@ public abstract class AbstractChannel<E> : Channel<E> {
705816
override fun toString(): String = "ReceiveHasNext[$cont]"
706817
}
707818

708-
private class ReceiveSelect<R, in E>(
819+
private inner class ReceiveSelect<R, in E>(
709820
@JvmField val select: SelectInstance<R>,
710821
@JvmField val block: suspend (E?) -> R,
711822
@JvmField val nullOnClose: Boolean
@@ -728,12 +839,11 @@ public abstract class AbstractChannel<E> : Channel<E> {
728839
}
729840
}
730841

731-
fun removeOnSelectCompletion() {
732-
select.invokeOnCompletion(this)
733-
}
842+
fun removeOnSelectCompletion() { select.invokeOnCompletion(this) }
734843

735-
override fun invoke(cause: Throwable?) {
736-
remove()
844+
override fun invoke(cause: Throwable?) { // invoked on select completion
845+
if (remove())
846+
onCancelledReceive() // notify cancellation of receive
737847
}
738848

739849
override fun toString(): String = "ReceiveSelect[$select,nullOnClose=$nullOnClose]"

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import java.util.concurrent.locks.ReentrantLock
2727
* This implementation uses lock to protect the buffer, which is held only during very short buffer-update operations.
2828
* The lists of suspended senders or receivers are lock-free.
2929
*/
30-
public class ArrayChannel<E>(
30+
public open class ArrayChannel<E>(
3131
/**
3232
* Buffer capacity.
3333
*/
@@ -49,12 +49,13 @@ public class ArrayChannel<E>(
4949
finally { lock.unlock() }
5050
}
5151

52-
override val hasBuffer: Boolean get() = true
53-
override val isBufferEmpty: Boolean get() = size == 0
54-
override val isBufferFull: Boolean get() = size == capacity
52+
protected final override val isBufferAlwaysEmpty: Boolean get() = false
53+
protected final override val isBufferEmpty: Boolean get() = size == 0
54+
protected final override val isBufferAlwaysFull: Boolean get() = false
55+
protected final override val isBufferFull: Boolean get() = size == capacity
5556

5657
// result is `OFFER_SUCCESS | OFFER_FAILED | Closed`
57-
override fun offerInternal(element: E): Any {
58+
protected override fun offerInternal(element: E): Any {
5859
var receive: ReceiveOrClosed<E>? = null
5960
var token: Any? = null
6061
locked {
@@ -90,7 +91,7 @@ public class ArrayChannel<E>(
9091
}
9192

9293
// result is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | Closed`.
93-
override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
94+
protected override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
9495
var receive: ReceiveOrClosed<E>? = null
9596
var token: Any? = null
9697
locked {
@@ -138,7 +139,7 @@ public class ArrayChannel<E>(
138139
}
139140

140141
// result is `E | POLL_FAILED | Closed`
141-
override fun pollInternal(): Any? {
142+
protected override fun pollInternal(): Any? {
142143
var send: Send? = null
143144
var token: Any? = null
144145
var result: Any? = null
@@ -174,7 +175,7 @@ public class ArrayChannel<E>(
174175
}
175176

176177
// result is `ALREADY_SELECTED | E | POLL_FAILED | Closed`
177-
override fun pollSelectInternal(select: SelectInstance<*>): Any? {
178+
protected override fun pollSelectInternal(select: SelectInstance<*>): Any? {
178179
var send: Send? = null
179180
var token: Any? = null
180181
var result: Any? = null

0 commit comments

Comments
 (0)