Skip to content

Commit 3aed4ee

Browse files
committed
ConflatedChannel
1 parent 43d831f commit 3aed4ee

File tree

9 files changed

+225
-23
lines changed

9 files changed

+225
-23
lines changed

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,12 +143,25 @@ public abstract class AbstractChannel<E> : Channel<E> {
143143
protected fun sendBuffered(element: E): Boolean =
144144
queue.addLastIfPrev(SendBuffered(element), { it !is ReceiveOrClosed<*> })
145145

146+
/**
147+
* @suppress **This is unstable API and it is subject to change.**
148+
*/
149+
protected fun sendConflated(element: E): Boolean {
150+
val node = SendBuffered(element)
151+
if (!queue.addLastIfPrev(node, { it !is ReceiveOrClosed<*> })) return false
152+
// remove previous SendBuffered
153+
val prev = node.prev
154+
if (prev is SendBuffered<*>)
155+
prev.remove()
156+
return true
157+
}
158+
146159
/**
147160
* @suppress **This is unstable API and it is subject to change.**
148161
*/
149162
protected fun describeSendBuffered(element: E): AddLastDesc<*> = SendBufferedDesc(queue, element)
150163

151-
private class SendBufferedDesc<out E>(
164+
private open class SendBufferedDesc<out E>(
152165
queue: LockFreeLinkedListHead,
153166
element: E
154167
) : AddLastDesc<SendBuffered<E>>(queue, SendBuffered(element)) {
@@ -158,6 +171,23 @@ public abstract class AbstractChannel<E> : Channel<E> {
158171
}
159172
}
160173

174+
/**
175+
* @suppress **This is unstable API and it is subject to change.**
176+
*/
177+
protected fun describeSendConflated(element: E): AddLastDesc<*> = SendConflatedDesc(queue, element)
178+
179+
private class SendConflatedDesc<out E>(
180+
queue: LockFreeLinkedListHead,
181+
element: E
182+
) : SendBufferedDesc<E>(queue, element) {
183+
override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) {
184+
super.finishOnSuccess(affected, next)
185+
// remove previous SendBuffered
186+
if (affected is SendBuffered<*>)
187+
affected.remove()
188+
}
189+
}
190+
161191
// ------ SendChannel ------
162192

163193
public final override val isClosedForSend: Boolean get() = closedForSend != null

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -224,23 +224,31 @@ public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> {
224224
*/
225225
public companion object Factory {
226226
/**
227-
* Requests channel with unlimited capacity buffer in [Channel()][invoke] factory function.
227+
* Requests channel with unlimited capacity buffer in [Channel()][invoke] factory function --
228+
* the [LinkedListChannel] gets created.
228229
*/
229230
public const val UNLIMITED = Int.MAX_VALUE
230231

232+
/**
233+
* Requests conflated channel in [Channel()][invoke] factory function --
234+
* the [ConflatedChannel] gets created.
235+
*/
236+
public const val CONFLATED = -1
237+
231238
/**
232239
* Creates a channel with specified buffer capacity (or without a buffer by default).
233240
*
234241
* The resulting channel type depends on the specified [capacity] parameter:
235242
* * when `capacity` is 0 -- creates [RendezvousChannel];
236243
* * when `capacity` is [UNLIMITED] -- creates [LinkedListChannel];
244+
* * when `capacity` is [CONFLATED] -- creates [ConflatedChannel];
237245
* * otherwise -- creates [ArrayChannel].
238246
*/
239247
public operator fun <E> invoke(capacity: Int = 0): Channel<E> {
240-
check(capacity >= 0) { "Channel capacity cannot be negative, but $capacity was specified" }
241248
return when (capacity) {
242249
0 -> RendezvousChannel()
243250
UNLIMITED -> LinkedListChannel()
251+
CONFLATED -> ConflatedChannel()
244252
else -> ArrayChannel(capacity)
245253
}
246254
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental.channels
18+
19+
import kotlinx.coroutines.experimental.ALREADY_SELECTED
20+
import kotlinx.coroutines.experimental.selects.SelectInstance
21+
22+
/**
23+
* Channel that buffers at most one element and conflates all subsequent `send` and `offer` invocations,
24+
* so that the receiver always gets the most recently sent element.
25+
* Back-to-send sent elements are _conflated_ -- only the the most recently sent element is received,
26+
* while previously sent elements **are lost**.
27+
* Sender to this channel never suspends and [offer] always returns `true`.
28+
*
29+
* This implementation is fully lock-free.
30+
*/
31+
public open class ConflatedChannel<E> : AbstractChannel<E>() {
32+
protected final override val isBufferAlwaysEmpty: Boolean get() = true
33+
protected final override val isBufferEmpty: Boolean get() = true
34+
protected final override val isBufferAlwaysFull: Boolean get() = false
35+
protected final override val isBufferFull: Boolean get() = false
36+
37+
// result is always `OFFER_SUCCESS | Closed`
38+
protected override fun offerInternal(element: E): Any {
39+
while (true) {
40+
val result = super.offerInternal(element)
41+
when {
42+
result === OFFER_SUCCESS -> return OFFER_SUCCESS
43+
result === OFFER_FAILED -> { // try to buffer
44+
if (sendConflated(element))
45+
return OFFER_SUCCESS
46+
}
47+
result is Closed<*> -> return result
48+
else -> error("Invalid offerInternal result $result")
49+
}
50+
}
51+
}
52+
53+
// result is always `ALREADY_SELECTED | OFFER_SUCCESS | Closed`.
54+
protected override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
55+
while (true) {
56+
val result = if (hasReceiveOrClosed)
57+
super.offerSelectInternal(element, select) else
58+
(select.performAtomicTrySelect(describeSendConflated(element)) ?: OFFER_SUCCESS)
59+
when {
60+
result === ALREADY_SELECTED -> return ALREADY_SELECTED
61+
result === OFFER_SUCCESS -> return OFFER_SUCCESS
62+
result === OFFER_FAILED -> {} // retry
63+
result is Closed<*> -> return result
64+
else -> error("Invalid result $result")
65+
}
66+
}
67+
}
68+
}
69+

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.kt

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,9 @@ public typealias AddLastDesc<T> = LockFreeLinkedListNode.AddLastDesc<T>
6868
@Suppress("LeakingThis")
6969
public open class LockFreeLinkedListNode {
7070
@Volatile
71-
private var _next: Any = this // DoubleLinkedNode | Removed | OpDescriptor
71+
private var _next: Any = this // Node | Removed | OpDescriptor
7272
@Volatile
73-
private var _prev: Any = this // DoubleLinkedNode | Removed
73+
private var _prev: Any = this // Node | Removed
7474
@Volatile
7575
private var _removedRef: Removed? = null // lazily cached removed ref to this
7676

@@ -114,7 +114,7 @@ public open class LockFreeLinkedListNode {
114114

115115
public val isRemoved: Boolean get() = next is Removed
116116

117-
// LINEARIZABLE.
117+
// LINEARIZABLE. Returns Node | Removed
118118
public val next: Any get() {
119119
while (true) { // operation helper loop on _next
120120
val next = this._next
@@ -123,10 +123,12 @@ public open class LockFreeLinkedListNode {
123123
}
124124
}
125125

126-
// LINEARIZABLE. Note: use it on sentinel (never removed) node only
127-
public val prev: Node get() {
128-
while (true) {
129-
val prev = this._prev as Node // this sentinel node is never removed
126+
// LINEARIZABLE. Returns Node | Removed
127+
public val prev: Any get() {
128+
while (true) { // insert helper loop on _prev
129+
val prev = this._prev
130+
if (prev is Removed) return prev
131+
prev as Node // otherwise, it can be only node otherwise
130132
if (prev.next === this) return prev
131133
helpInsert(prev, null)
132134
}
@@ -155,7 +157,7 @@ public open class LockFreeLinkedListNode {
155157
*/
156158
public fun addLast(node: Node) {
157159
while (true) { // lock-free loop on prev.next
158-
val prev = prev
160+
val prev = prev as Node // sentinel node is never removed, so prev is always defined
159161
if (prev.addNext(node, this)) return
160162
}
161163
}
@@ -168,7 +170,7 @@ public open class LockFreeLinkedListNode {
168170
public inline fun addLastIf(node: Node, crossinline condition: () -> Boolean): Boolean {
169171
val condAdd = makeCondAddOp(node, condition)
170172
while (true) { // lock-free loop on prev.next
171-
val prev = prev
173+
val prev = prev as Node // sentinel node is never removed, so prev is always defined
172174
when (prev.tryCondAddNext(node, this, condAdd)) {
173175
SUCCESS -> return true
174176
FAILURE -> return false
@@ -178,7 +180,7 @@ public open class LockFreeLinkedListNode {
178180

179181
public inline fun addLastIfPrev(node: Node, predicate: (Node) -> Boolean): Boolean {
180182
while (true) { // lock-free loop on prev.next
181-
val prev = prev
183+
val prev = prev as Node // sentinel node is never removed, so prev is always defined
182184
if (!predicate(prev)) return false
183185
if (prev.addNext(node, this)) return true
184186
}
@@ -191,7 +193,7 @@ public open class LockFreeLinkedListNode {
191193
): Boolean {
192194
val condAdd = makeCondAddOp(node, condition)
193195
while (true) { // lock-free loop on prev.next
194-
val prev = prev
196+
val prev = prev as Node // sentinel node is never removed, so prev is always defined
195197
if (!predicate(prev)) return false
196198
when (prev.tryCondAddNext(node, this, condAdd)) {
197199
SUCCESS -> return true

kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelAtomicCancelStressTest.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,11 @@ class ChannelAtomicCancelStressTest(val kind: TestChannelKind) : TestBase() {
9898
println(" Missed $missedCnt ints")
9999
println(" Duplicated $dupCnt ints")
100100
failed.get()?.let { throw it }
101-
assertEquals(0, missedCnt)
102101
assertEquals(0, dupCnt)
103-
assertEquals(lastSent, lastReceived)
102+
if (kind != TestChannelKind.CONFLATED) {
103+
assertEquals(0, missedCnt)
104+
assertEquals(lastSent, lastReceived)
105+
}
104106
}
105107

106108
fun launchSender() {

kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelSendReceiveStressTest.kt

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,14 @@ class ChannelSendReceiveStressTest(
8181
sendersCompleted.incrementAndGet()
8282
}
8383
}
84+
// print progress
85+
val progressJob = launch(context) {
86+
var seconds = 0
87+
while (true) {
88+
delay(1000)
89+
println("${++seconds}: Sent ${sentTotal.get()}, received ${receivedTotal.get()}")
90+
}
91+
}
8492
try {
8593
withTimeout(timeLimit) {
8694
senders.forEach { it.join() }
@@ -90,6 +98,7 @@ class ChannelSendReceiveStressTest(
9098
} catch (e: CancellationException) {
9199
println("!!! Test timed out $e")
92100
}
101+
progressJob.cancel()
93102
println("Tested $kind with nSenders=$nSenders, nReceivers=$nReceivers")
94103
println("Completed successfully ${sendersCompleted.get()} sender coroutines")
95104
println("Completed successfully ${receiversCompleted.get()} receiver coroutines")
@@ -103,16 +112,18 @@ class ChannelSendReceiveStressTest(
103112
assertEquals(nReceivers, receiversCompleted.get())
104113
assertEquals(0, dupes.get())
105114
assertEquals(nEvents, sentTotal.get())
106-
assertEquals(nEvents, receivedTotal.get())
115+
if (kind != TestChannelKind.CONFLATED) assertEquals(nEvents, receivedTotal.get())
107116
repeat(nReceivers) { receiveIndex ->
108117
assertTrue("Each receiver should have received something", receivedBy[receiveIndex] > 0)
109118
}
110119
}
111120

112121
private suspend fun doSent() {
113122
sentTotal.incrementAndGet()
114-
while (sentTotal.get() > receivedTotal.get() + maxBuffer)
115-
yield() // throttle fast senders to prevent OOM with LinkedListChannel
123+
if (kind != TestChannelKind.CONFLATED) {
124+
while (sentTotal.get() > receivedTotal.get() + maxBuffer)
125+
yield() // throttle fast senders to prevent OOM with LinkedListChannel
126+
}
116127
}
117128

118129
private suspend fun doSend(senderIndex: Int) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental.channels
18+
19+
import kotlinx.coroutines.experimental.TestBase
20+
import kotlinx.coroutines.experimental.launch
21+
import kotlinx.coroutines.experimental.runBlocking
22+
import kotlinx.coroutines.experimental.yield
23+
import org.hamcrest.core.IsEqual
24+
import org.hamcrest.core.IsNull
25+
import org.junit.Assert.assertThat
26+
import org.junit.Test
27+
28+
class ConflatedChannelTest : TestBase() {
29+
@Test
30+
fun testBasicConflationOfferPoll() {
31+
val q = ConflatedChannel<Int>()
32+
assertThat(q.poll(), IsNull())
33+
assertThat(q.offer(1), IsEqual(true))
34+
assertThat(q.offer(2), IsEqual(true))
35+
assertThat(q.offer(3), IsEqual(true))
36+
assertThat(q.poll(), IsEqual(3))
37+
assertThat(q.poll(), IsNull())
38+
}
39+
40+
@Test
41+
fun testConflationSendReceive() = runBlocking<Unit> {
42+
val q = ConflatedChannel<Int>()
43+
expect(1)
44+
launch(context) { // receiver coroutine
45+
expect(4)
46+
assertThat(q.receive(), IsEqual(2))
47+
expect(5)
48+
assertThat(q.receive(), IsEqual(3)) // this receive suspends
49+
expect(8)
50+
assertThat(q.receive(), IsEqual(6)) // last conflated value
51+
expect(9)
52+
}
53+
expect(2)
54+
q.send(1)
55+
q.send(2) // shall conflate
56+
expect(3)
57+
yield() // to receiver
58+
expect(6)
59+
q.send(3) // send to the waiting receiver
60+
q.send(4) // buffer
61+
q.send(5) // conflate
62+
q.send(6) // conflate again
63+
expect(7)
64+
yield() // to receiver
65+
finish(10)
66+
}
67+
}

kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveTest.kt

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@ package kotlinx.coroutines.experimental.channels
1919
import kotlinx.coroutines.experimental.CommonPool
2020
import kotlinx.coroutines.experimental.launch
2121
import kotlinx.coroutines.experimental.runBlocking
22+
import org.hamcrest.core.IsEqual
23+
import org.junit.Assert.assertThat
24+
import org.junit.Assert.assertTrue
2225
import org.junit.Test
2326
import org.junit.runner.RunWith
2427
import org.junit.runners.Parameterized
25-
import org.junit.Assert.*
2628

2729
@RunWith(Parameterized::class)
2830
class SimpleSendReceiveTest(
@@ -47,10 +49,17 @@ class SimpleSendReceiveTest(
4749
repeat(n) { channel.send(it) }
4850
channel.close()
4951
}
50-
var received = 0
52+
var expected = 0
5153
for (x in channel) {
52-
assertEquals(received++, x)
54+
if (kind != TestChannelKind.CONFLATED) {
55+
assertThat(x, IsEqual(expected++))
56+
} else {
57+
assertTrue(x >= expected)
58+
expected = x + 1
59+
}
60+
}
61+
if (kind != TestChannelKind.CONFLATED) {
62+
assertThat(expected, IsEqual(n))
5363
}
54-
assertEquals(n, received)
5564
}
5665
}

0 commit comments

Comments
 (0)