Skip to content

Commit 8cced24

Browse files
elizarovqwwdfsad
authored andcommitted
Reactive streams integration conformance to TCK verified & fixed
* Throw NPE on null subscriber * Throw IAE when requested n <= 0 elements * Do not sent any notification after cancel()
1 parent b2cdc5e commit 8cced24

File tree

4 files changed

+238
-13
lines changed

4 files changed

+238
-13
lines changed

reactive/kotlinx-coroutines-reactive/build.gradle

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@
22
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

5-
ext.reactive_streams_version = '1.0.0'
5+
ext.reactive_streams_version = '1.0.2'
6+
67
dependencies {
78
compile "org.reactivestreams:reactive-streams:$reactive_streams_version"
9+
testCompile "org.reactivestreams:reactive-streams-tck:$reactive_streams_version"
810
}
911

1012
tasks.withType(dokka.getClass()) {

reactive/kotlinx-coroutines-reactive/src/Publish.kt

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ public fun <T> CoroutineScope.publish(
4242
context: CoroutineContext = EmptyCoroutineContext,
4343
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
4444
): Publisher<T> = Publisher { subscriber ->
45+
// specification requires NPE on null subscriber
46+
if (subscriber == null) throw NullPointerException("subscriber")
4547
val newContext = newCoroutineContext(context)
4648
val coroutine = PublisherCoroutine(newContext, subscriber)
4749
subscriber.onSubscribe(coroutine) // do it first (before starting coroutine), to avoid unnecessary suspensions
@@ -64,6 +66,9 @@ private class PublisherCoroutine<in T>(
6466

6567
private val _nRequested = atomic(0L) // < 0 when closed (CLOSED or SIGNALLED)
6668

69+
@Volatile
70+
private var cancelled = false // true when Subscription.cancel() is invoked
71+
6772
override val isClosedForSend: Boolean get() = isCompleted
6873
override val isFull: Boolean = mutex.isLocked
6974
override fun close(cause: Throwable?): Boolean = cancel(cause)
@@ -148,6 +153,12 @@ private class PublisherCoroutine<in T>(
148153
if (_nRequested.value >= CLOSED) {
149154
_nRequested.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed)
150155
val cause = getCompletionCause()
156+
// Specification requires that after cancellation requested we don't call onXXX
157+
if (cancelled) {
158+
// but we cannot just ignore exception so we handle it
159+
if (cause != null) handleCoroutineException(context, cause, this)
160+
return
161+
}
151162
try {
152163
if (cause != null && cause !is CancellationException)
153164
subscriber.onError(cause)
@@ -163,8 +174,9 @@ private class PublisherCoroutine<in T>(
163174
}
164175

165176
override fun request(n: Long) {
166-
if (n < 0) {
167-
cancel(IllegalArgumentException("Must request non-negative number, but $n requested"))
177+
if (n <= 0) {
178+
// Specification requires IAE for n <= 0
179+
cancel(IllegalArgumentException("non-positive subscription request $n"))
168180
return
169181
}
170182
while (true) { // lock-free loop for nRequested
@@ -206,8 +218,10 @@ private class PublisherCoroutine<in T>(
206218
}
207219
}
208220

209-
// Subscription impl
210-
@JvmName("cancel")
211-
@Suppress("NOTHING_TO_OVERRIDE", "ACCIDENTAL_OVERRIDE")
212-
override fun cancelSubscription() = super.cancel(null)
221+
override fun cancel() {
222+
// Specification requires that after cancellation publisher stops signalling
223+
// This flag distinguishes subscription cancellation request from the job crash
224+
cancelled = true
225+
super.cancel()
226+
}
213227
}

reactive/kotlinx-coroutines-reactive/test/PublisherBackpressureTest.kt

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ class PublisherBackpressureTest : TestBase() {
2121
try {
2222
send("C") // will suspend (no more requested)
2323
} finally {
24-
expect(13)
24+
expect(12)
2525
}
2626
expectUnreached()
2727
}
@@ -43,7 +43,7 @@ class PublisherBackpressureTest : TestBase() {
4343
}
4444

4545
override fun onComplete() {
46-
expect(11)
46+
expectUnreached()
4747
}
4848

4949
override fun onError(e: Throwable) {
@@ -53,9 +53,9 @@ class PublisherBackpressureTest : TestBase() {
5353
expect(4)
5454
yield() // yield to observable coroutine
5555
expect(10)
56-
sub!!.cancel() // now unsubscribe -- shall cancel coroutine & immediately signal onComplete
57-
expect(12)
58-
yield() // shall perform finally in coroutine & report onComplete
59-
finish(14)
56+
sub!!.cancel() // now unsubscribe -- shall cancel coroutine (& do not signal)
57+
expect(11)
58+
yield() // shall perform finally in coroutine
59+
finish(13)
6060
}
6161
}
Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
/*
2+
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.reactive
6+
7+
import kotlinx.coroutines.*
8+
import org.junit.*
9+
import org.junit.runner.*
10+
import org.junit.runners.*
11+
import org.reactivestreams.*
12+
import org.reactivestreams.tck.*
13+
14+
@RunWith(Parameterized::class)
15+
class ReactiveStreamTckTest(
16+
private val dispatcher: Dispatcher
17+
) : PublisherVerification<Long>(TestEnvironment()) {
18+
19+
enum class Dispatcher(val dispatcher: CoroutineDispatcher) {
20+
DEFAULT(Dispatchers.Default),
21+
UNCONFINED(Dispatchers.Unconfined)
22+
}
23+
24+
private val scope = CoroutineScope(dispatcher.dispatcher)
25+
26+
companion object {
27+
@Parameterized.Parameters(name = "{0}")
28+
@JvmStatic
29+
fun params(): Collection<Array<Any>> = Dispatcher.values().map { arrayOf<Any>(it) }
30+
}
31+
32+
override fun createPublisher(elements: Long): Publisher<Long> =
33+
scope.publish {
34+
for (i in 1..elements) send(i)
35+
}
36+
37+
override fun createFailedPublisher(): Publisher<Long> =
38+
scope.publish {
39+
throw TestException()
40+
}
41+
42+
@Before
43+
override fun setUp() {
44+
super.setUp()
45+
}
46+
47+
@Test
48+
override fun required_spec306_afterSubscriptionIsCancelledRequestMustBeNops() {
49+
super.required_spec306_afterSubscriptionIsCancelledRequestMustBeNops()
50+
}
51+
52+
@Test
53+
override fun required_spec303_mustNotAllowUnboundedRecursion() {
54+
super.required_spec303_mustNotAllowUnboundedRecursion()
55+
}
56+
57+
@Test
58+
override fun required_spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled() {
59+
super.required_spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled()
60+
}
61+
62+
@Test
63+
override fun required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe() {
64+
super.required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe()
65+
}
66+
67+
@Test
68+
override fun required_spec302_mustAllowSynchronousRequestCallsFromOnNextAndOnSubscribe() {
69+
super.required_spec302_mustAllowSynchronousRequestCallsFromOnNextAndOnSubscribe()
70+
}
71+
72+
@Test
73+
override fun required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber() {
74+
// This test fails on default dispatcher because it retains a reference to the last task
75+
// in the structure of its GlobalQueue
76+
// So we skip it with the default dispatcher.
77+
// todo: remove it when CoroutinesScheduler is improved
78+
if (dispatcher == Dispatcher.DEFAULT) return
79+
super.required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber()
80+
}
81+
82+
@Test
83+
override fun required_validate_boundedDepthOfOnNextAndRequestRecursion() {
84+
super.required_validate_boundedDepthOfOnNextAndRequestRecursion()
85+
}
86+
87+
@Test
88+
override fun required_spec317_mustSupportAPendingElementCountUpToLongMaxValue() {
89+
super.required_spec317_mustSupportAPendingElementCountUpToLongMaxValue()
90+
}
91+
92+
@Test
93+
override fun required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue() {
94+
super.required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue()
95+
}
96+
97+
@Test
98+
override fun required_validate_maxElementsFromPublisher() {
99+
super.required_validate_maxElementsFromPublisher()
100+
}
101+
102+
@Test
103+
@Ignore // This OPTIONAL requirement is not implemented, which is fine
104+
override fun optional_spec105_emptyStreamMustTerminateBySignallingOnComplete() {
105+
super.optional_spec105_emptyStreamMustTerminateBySignallingOnComplete()
106+
}
107+
108+
@Test
109+
override fun required_spec105_mustSignalOnCompleteWhenFiniteStreamTerminates() {
110+
super.required_spec105_mustSignalOnCompleteWhenFiniteStreamTerminates()
111+
}
112+
113+
@Test
114+
override fun optional_spec111_registeredSubscribersMustReceiveOnNextOrOnCompleteSignals() {
115+
super.optional_spec111_registeredSubscribersMustReceiveOnNextOrOnCompleteSignals()
116+
}
117+
118+
@Test
119+
override fun required_spec102_maySignalLessThanRequestedAndTerminateSubscription() {
120+
super.required_spec102_maySignalLessThanRequestedAndTerminateSubscription()
121+
}
122+
123+
@Test
124+
override fun required_createPublisher3MustProduceAStreamOfExactly3Elements() {
125+
super.required_createPublisher3MustProduceAStreamOfExactly3Elements()
126+
}
127+
128+
@Test
129+
override fun optional_spec111_maySupportMultiSubscribe() {
130+
super.optional_spec111_maySupportMultiSubscribe()
131+
}
132+
133+
@Test
134+
override fun stochastic_spec103_mustSignalOnMethodsSequentially() {
135+
super.stochastic_spec103_mustSignalOnMethodsSequentially()
136+
}
137+
138+
@Test
139+
override fun required_spec307_afterSubscriptionIsCancelledAdditionalCancelationsMustBeNops() {
140+
super.required_spec307_afterSubscriptionIsCancelledAdditionalCancelationsMustBeNops()
141+
}
142+
143+
@Test
144+
override fun required_createPublisher1MustProduceAStreamOfExactly1Element() {
145+
super.required_createPublisher1MustProduceAStreamOfExactly1Element()
146+
}
147+
148+
@Test
149+
override fun optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne() {
150+
super.optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne()
151+
}
152+
153+
@Test
154+
override fun optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront() {
155+
super.optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront()
156+
}
157+
158+
@Test
159+
override fun required_spec309_requestNegativeNumberMustSignalIllegalArgumentException() {
160+
super.required_spec309_requestNegativeNumberMustSignalIllegalArgumentException()
161+
}
162+
163+
@Test
164+
override fun required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling() {
165+
super.required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling()
166+
}
167+
168+
@Test
169+
override fun required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue() {
170+
super.required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue()
171+
}
172+
173+
@Test
174+
override fun optional_spec104_mustSignalOnErrorWhenFails() {
175+
super.optional_spec104_mustSignalOnErrorWhenFails()
176+
}
177+
178+
@Test
179+
override fun required_spec309_requestZeroMustSignalIllegalArgumentException() {
180+
super.required_spec309_requestZeroMustSignalIllegalArgumentException()
181+
}
182+
183+
@Test
184+
override fun optional_spec309_requestNegativeNumberMaySignalIllegalArgumentExceptionWithSpecificMessage() {
185+
super.optional_spec309_requestNegativeNumberMaySignalIllegalArgumentExceptionWithSpecificMessage()
186+
}
187+
188+
@Test
189+
override fun required_spec109_subscribeThrowNPEOnNullSubscriber() {
190+
super.required_spec109_subscribeThrowNPEOnNullSubscriber()
191+
}
192+
193+
@Test
194+
override fun optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected() {
195+
super.optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected()
196+
}
197+
198+
@Test
199+
override fun required_spec101_subscriptionRequestMustResultInTheCorrectNumberOfProducedElements() {
200+
super.required_spec101_subscriptionRequestMustResultInTheCorrectNumberOfProducedElements()
201+
}
202+
203+
@Test
204+
override fun required_spec109_mustIssueOnSubscribeForNonNullSubscriber() {
205+
super.required_spec109_mustIssueOnSubscribeForNonNullSubscriber()
206+
}
207+
208+
class TestException : Exception()
209+
}

0 commit comments

Comments
 (0)