Skip to content

Commit 9faa61e

Browse files
committed
Fixed produce builder to close the channel on completion instead of
cancelling it, which lead to lost elements with buffered channels. Fixes #256
1 parent 0e647a1 commit 9faa61e

File tree

4 files changed

+95
-10
lines changed

4 files changed

+95
-10
lines changed

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Actor.kt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,12 @@ private open class ActorCoroutine<E>(
148148
parentContext: CoroutineContext,
149149
channel: Channel<E>,
150150
active: Boolean
151-
) : ChannelCoroutine<E>(parentContext, channel, active), ActorScope<E>, ActorJob<E>
151+
) : ChannelCoroutine<E>(parentContext, channel, active), ActorScope<E>, ActorJob<E> {
152+
override fun onCancellation(cause: Throwable?) {
153+
if (!_channel.cancel(cause) && cause != null)
154+
handleCoroutineException(context, cause)
155+
}
156+
}
152157

153158
private class LazyActorCoroutine<E>(
154159
parentContext: CoroutineContext,

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ChannelCoroutine.kt

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,15 @@
1717
package kotlinx.coroutines.experimental.channels
1818

1919
import kotlinx.coroutines.experimental.*
20-
import kotlin.coroutines.experimental.CoroutineContext
20+
import kotlin.coroutines.experimental.*
2121

2222
internal open class ChannelCoroutine<E>(
2323
parentContext: CoroutineContext,
24-
private val _channel: Channel<E>,
24+
protected val _channel: Channel<E>,
2525
active: Boolean
2626
) : AbstractCoroutine<Unit>(parentContext, active), Channel<E> by _channel {
2727
val channel: Channel<E>
2828
get() = this
2929

30-
override fun onCancellation(cause: Throwable?) {
31-
if (!_channel.cancel(cause) && cause != null)
32-
handleCoroutineException(context, cause)
33-
}
34-
3530
override fun cancel(cause: Throwable?): Boolean = super.cancel(cause)
3631
}

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,5 +105,16 @@ public fun <E> buildChannel(
105105
): ProducerJob<E> =
106106
produce(context, capacity, block = block) as ProducerJob<E>
107107

108-
private class ProducerCoroutine<E>(parentContext: CoroutineContext, channel: Channel<E>) :
109-
ChannelCoroutine<E>(parentContext, channel, active = true), ProducerScope<E>, ProducerJob<E>
108+
private class ProducerCoroutine<E>(
109+
parentContext: CoroutineContext, channel: Channel<E>
110+
) : ChannelCoroutine<E>(parentContext, channel, active = true), ProducerScope<E>, ProducerJob<E> {
111+
override fun onCancellationInternal(exceptionally: CompletedExceptionally?) {
112+
val cause = exceptionally?.cause
113+
val processed = when (exceptionally) {
114+
is Cancelled -> _channel.cancel(cause) // producer coroutine was cancelled -- cancel channel
115+
else -> _channel.close(cause) // producer coroutine has completed -- close channel
116+
}
117+
if (!processed && cause != null)
118+
handleCoroutineException(context, cause)
119+
}
120+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental.channels
18+
19+
import kotlinx.coroutines.experimental.*
20+
import org.junit.*
21+
import org.junit.Assert.*
22+
import org.junit.runner.*
23+
import org.junit.runners.*
24+
25+
@RunWith(Parameterized::class)
26+
class ProduceConsumeTest(
27+
private val capacity: Int,
28+
private val number: Int
29+
) : TestBase() {
30+
companion object {
31+
@Parameterized.Parameters(name = "capacity={0}, number={1}")
32+
@JvmStatic
33+
fun params(): Collection<Array<Any>> =
34+
listOf(0, 1, 10, 1000, Channel.UNLIMITED).flatMap { capacity ->
35+
listOf(1, 10, 1000).map { number ->
36+
arrayOf<Any>(capacity, number)
37+
}
38+
}
39+
}
40+
41+
@Test
42+
fun testProducer() = runTest {
43+
var sentAll = false
44+
val producer = produce(coroutineContext, capacity = capacity) {
45+
for(i in 1..number) {
46+
send(i)
47+
}
48+
sentAll = true
49+
}
50+
var consumed = 0
51+
for (x in producer) {
52+
consumed++
53+
}
54+
assertTrue(sentAll)
55+
assertEquals(number, consumed)
56+
}
57+
58+
@Test
59+
fun testActor() = runTest {
60+
val received = CompletableDeferred<Int>()
61+
val actor = actor<Int>(coroutineContext, capacity = capacity) {
62+
var n = 0
63+
for(i in channel) {
64+
n++
65+
}
66+
received.complete(n)
67+
}
68+
for(i in 1..number) {
69+
actor.send(i)
70+
}
71+
actor.close()
72+
assertEquals(number, received.await())
73+
}
74+
}

0 commit comments

Comments
 (0)