Skip to content

Commit 8a059f5

Browse files
elizarovqwwdfsad
authored andcommitted
Publish: Proper handling of onNext exception and test for it
1 parent f18b9f5 commit 8a059f5

File tree

2 files changed

+49
-1
lines changed

2 files changed

+49
-1
lines changed

reactive/kotlinx-coroutines-reactive/src/Publish.kt

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,15 @@ private class PublisherCoroutine<in T>(
132132
throw getCancellationException()
133133
}
134134
// notify subscriber
135-
subscriber.onNext(elem)
135+
try {
136+
subscriber.onNext(elem)
137+
} catch (e: Throwable) {
138+
// If onNext fails with exception, then we cancel coroutine (with this exception) and then rethrow it
139+
// to abort the corresponding send/offer invocation
140+
cancel(e)
141+
unlockAndCheckCompleted()
142+
throw e
143+
}
136144
// now update nRequested
137145
while (true) { // lock-free loop on nRequested
138146
val cur = _nRequested.value

reactive/kotlinx-coroutines-reactive/test/PublishTest.kt

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,5 +168,45 @@ class PublishTest : TestBase() {
168168
expectUnreached()
169169
}
170170

171+
@Test
172+
fun testOnNextError() = runTest {
173+
expect(1)
174+
val publisher = publish<String>(NonCancellable) {
175+
expect(4)
176+
try {
177+
send("OK")
178+
} catch(e: Throwable) {
179+
expect(6)
180+
assert(e is TestException)
181+
}
182+
}
183+
expect(2)
184+
val latch = CompletableDeferred<Unit>()
185+
publisher.subscribe(object : Subscriber<String> {
186+
override fun onComplete() {
187+
expectUnreached()
188+
}
189+
190+
override fun onSubscribe(s: Subscription) {
191+
expect(3)
192+
s.request(1)
193+
}
194+
195+
override fun onNext(t: String) {
196+
expect(5)
197+
assertEquals("OK", t)
198+
throw TestException()
199+
}
200+
201+
override fun onError(t: Throwable) {
202+
expect(7)
203+
assert(t is TestException)
204+
latch.complete(Unit)
205+
}
206+
})
207+
latch.await()
208+
finish(8)
209+
}
210+
171211
private class TestException : Exception()
172212
}

0 commit comments

Comments
 (0)