Skip to content

Commit d14a390

Browse files
committed
Fixed bug with sometimes missing onComplete in publish, rxObservable, and rxFlowable builders
1 parent 50e3221 commit d14a390

File tree

5 files changed

+167
-2
lines changed

5 files changed

+167
-2
lines changed

reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Publish.kt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,12 @@ private class PublisherCoroutine<T>(
187187
if (cur == upd) return // nothing to do
188188
if (N_REQUESTED.compareAndSet(this, cur, upd)) {
189189
// unlock the mutex when we don't have back-pressure anymore
190-
if (cur == 0L) mutex.unlock()
190+
if (cur == 0L) {
191+
mutex.unlock()
192+
// recheck isCompleted
193+
if (isCompleted && mutex.tryLock())
194+
doLockedSignalCompleted()
195+
}
191196
return
192197
}
193198
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental.reactive
18+
19+
import kotlinx.coroutines.experimental.CommonPool
20+
import kotlinx.coroutines.experimental.TestBase
21+
import kotlinx.coroutines.experimental.runBlocking
22+
import kotlinx.coroutines.experimental.withTimeout
23+
import org.junit.Test
24+
import java.util.*
25+
import kotlin.coroutines.experimental.CoroutineContext
26+
27+
class PublisherCompletionStressTest : TestBase() {
28+
val N_REPEATS = 10_000 * stressTestMultiplier
29+
30+
fun range(context: CoroutineContext, start: Int, count: Int) = publish<Int>(context) {
31+
for (x in start until start + count) send(x)
32+
}
33+
34+
@Test
35+
fun testCompletion() {
36+
val rnd = Random()
37+
repeat(N_REPEATS) {
38+
val count = rnd.nextInt(5)
39+
runBlocking {
40+
withTimeout(5000) {
41+
var received = 0
42+
for (x in range(CommonPool, 1, count)) {
43+
received++
44+
if (x != received) error("$x != $received")
45+
}
46+
if (received != count) error("$received != $count")
47+
}
48+
}
49+
}
50+
}
51+
}

reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxObservable.kt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,12 @@ private class RxObservableCoroutine<T>(
186186
if (cur == upd) return // nothing to do
187187
if (N_REQUESTED.compareAndSet(this, cur, upd)) {
188188
// unlock the mutex when we don't have back-pressure anymore
189-
if (cur == 0L) mutex.unlock()
189+
if (cur == 0L) {
190+
mutex.unlock()
191+
// recheck isCompleted
192+
if (isCompleted && mutex.tryLock())
193+
doLockedSignalCompleted()
194+
}
190195
return
191196
}
192197
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental.rx1
18+
19+
import kotlinx.coroutines.experimental.CommonPool
20+
import kotlinx.coroutines.experimental.TestBase
21+
import kotlinx.coroutines.experimental.runBlocking
22+
import kotlinx.coroutines.experimental.withTimeout
23+
import org.junit.Test
24+
import java.util.*
25+
import kotlin.coroutines.experimental.CoroutineContext
26+
27+
class ObservableCompletionStressTest : TestBase() {
28+
val N_REPEATS = 10_000 * stressTestMultiplier
29+
30+
fun range(context: CoroutineContext, start: Int, count: Int) = rxObservable<Int>(context) {
31+
for (x in start until start + count) send(x)
32+
}
33+
34+
@Test
35+
fun testCompletion() {
36+
val rnd = Random()
37+
repeat(N_REPEATS) {
38+
val count = rnd.nextInt(5)
39+
runBlocking {
40+
withTimeout(5000) {
41+
var received = 0
42+
for (x in range(CommonPool, 1, count)) {
43+
received++
44+
if (x != received) error("$x != $received")
45+
}
46+
if (received != count) error("$received != $count")
47+
}
48+
}
49+
}
50+
}
51+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental.rx1
18+
19+
import kotlinx.coroutines.experimental.CommonPool
20+
import kotlinx.coroutines.experimental.TestBase
21+
import kotlinx.coroutines.experimental.runBlocking
22+
import kotlinx.coroutines.experimental.rx2.iterator
23+
import kotlinx.coroutines.experimental.rx2.rxObservable
24+
import kotlinx.coroutines.experimental.withTimeout
25+
import org.junit.Test
26+
import java.util.*
27+
import kotlin.coroutines.experimental.CoroutineContext
28+
29+
class ObservableCompletionStressTest : TestBase() {
30+
val N_REPEATS = 10_000 * stressTestMultiplier
31+
32+
fun range(context: CoroutineContext, start: Int, count: Int) = rxObservable<Int>(context) {
33+
for (x in start until start + count) send(x)
34+
}
35+
36+
@Test
37+
fun testCompletion() {
38+
val rnd = Random()
39+
repeat(N_REPEATS) {
40+
val count = rnd.nextInt(5)
41+
runBlocking {
42+
withTimeout(5000) {
43+
var received = 0
44+
for (x in range(CommonPool, 1, count)) {
45+
received++
46+
if (x != received) error("$x != $received")
47+
}
48+
if (received != count) error("$received != $count")
49+
}
50+
}
51+
}
52+
}
53+
}

0 commit comments

Comments
 (0)