Skip to content

Commit 4ad5b0e

Browse files
elizarovqwwdfsad
authored andcommitted
Fixed aggregated error handling in Publish & test for it
1 parent 669a84f commit 4ad5b0e

File tree

2 files changed

+43
-1
lines changed

2 files changed

+43
-1
lines changed

reactive/kotlinx-coroutines-reactive/src/Publish.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,9 @@ private class PublisherCoroutine<in T>(
199199
}
200200
}
201201

202-
override fun onCancellation(cause: Throwable?) {
202+
override fun onCompletedExceptionally(exception: Throwable) = onCompleted(Unit)
203+
204+
override fun onCompleted(value: Unit) {
203205
while (true) { // lock-free loop for nRequested
204206
val cur = _nRequested.value
205207
if (cur == SIGNALLED) return // some other thread holding lock already signalled cancellation/completion

reactive/kotlinx-coroutines-reactive/test/PublishTest.kt

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,4 +90,44 @@ class PublishTest : TestBase() {
9090
throw RuntimeException("OK")
9191
}.openSubscription()
9292
}
93+
94+
@Test
95+
fun testHandleFailureAfterCancel() = runTest(
96+
unhandled = listOf({ it -> it is RuntimeException && it.message == "FAILED" })
97+
){
98+
expect(1)
99+
// Exception should be delivered to CoroutineExceptionHandler, because we create publisher
100+
// with the NonCancellable parent
101+
val publisher = publish<Unit>(NonCancellable + Dispatchers.Unconfined) {
102+
try {
103+
expect(3)
104+
delay(10000)
105+
} finally {
106+
expect(5)
107+
throw RuntimeException("FAILED") // crash after cancel
108+
}
109+
}
110+
var sub: Subscription? = null
111+
publisher.subscribe(object : Subscriber<Unit> {
112+
override fun onComplete() {
113+
expectUnreached()
114+
}
115+
116+
override fun onSubscribe(s: Subscription) {
117+
expect(2)
118+
sub = s
119+
}
120+
121+
override fun onNext(t: Unit?) {
122+
expectUnreached()
123+
}
124+
125+
override fun onError(t: Throwable?) {
126+
expectUnreached()
127+
}
128+
})
129+
expect(4)
130+
sub!!.cancel()
131+
finish(6)
132+
}
93133
}

0 commit comments

Comments
 (0)