5
5
package kotlinx.coroutines.reactive
6
6
7
7
import kotlinx.coroutines.*
8
- import org.junit.*
9
- import org.junit.runner.*
10
- import org.junit.runners.*
11
8
import org.reactivestreams.*
12
9
import org.reactivestreams.tck.*
10
+ import org.testng.*
11
+ import org.testng.annotations.*
13
12
14
- @RunWith(Parameterized ::class )
15
- class ReactiveStreamTckTest (
16
- private val dispatcher : Dispatcher
17
- ) : PublisherVerification<Long>(TestEnvironment ()) {
18
13
19
- enum class Dispatcher (val dispatcher : CoroutineDispatcher ) {
20
- DEFAULT (Dispatchers .Default ),
21
- UNCONFINED (Dispatchers .Unconfined )
22
- }
23
-
24
- private val scope = CoroutineScope (dispatcher.dispatcher)
25
-
26
- companion object {
27
- @Parameterized.Parameters (name = " {0}" )
28
- @JvmStatic
29
- fun params (): Collection <Array <Any >> = Dispatcher .values().map { arrayOf<Any >(it) }
30
- }
31
-
32
- override fun createPublisher (elements : Long ): Publisher <Long > =
33
- scope.publish {
34
- for (i in 1 .. elements) send(i)
35
- }
36
-
37
- override fun createFailedPublisher (): Publisher <Long > =
38
- scope.publish {
39
- throw TestException ()
40
- }
41
-
42
- @Before
43
- override fun setUp () {
44
- super .setUp()
45
- }
46
-
47
- @Test
48
- override fun required_spec306_afterSubscriptionIsCancelledRequestMustBeNops () {
49
- super .required_spec306_afterSubscriptionIsCancelledRequestMustBeNops()
50
- }
51
-
52
- @Test
53
- override fun required_spec303_mustNotAllowUnboundedRecursion () {
54
- super .required_spec303_mustNotAllowUnboundedRecursion()
55
- }
56
-
57
- @Test
58
- override fun required_spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled () {
59
- super .required_spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled()
60
- }
61
-
62
- @Test
63
- override fun required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe () {
64
- super .required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe()
65
- }
66
-
67
- @Test
68
- override fun required_spec302_mustAllowSynchronousRequestCallsFromOnNextAndOnSubscribe () {
69
- super .required_spec302_mustAllowSynchronousRequestCallsFromOnNextAndOnSubscribe()
70
- }
71
-
72
- @Test
73
- override fun required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber () {
74
- // This test fails on default dispatcher because it retains a reference to the last task
75
- // in the structure of its GlobalQueue
76
- // So we skip it with the default dispatcher.
77
- // todo: remove it when CoroutinesScheduler is improved
78
- if (dispatcher == Dispatcher .DEFAULT ) return
79
- super .required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber()
80
- }
81
-
82
- @Test
83
- override fun required_validate_boundedDepthOfOnNextAndRequestRecursion () {
84
- super .required_validate_boundedDepthOfOnNextAndRequestRecursion()
85
- }
86
-
87
- @Test
88
- override fun required_spec317_mustSupportAPendingElementCountUpToLongMaxValue () {
89
- super .required_spec317_mustSupportAPendingElementCountUpToLongMaxValue()
90
- }
91
-
92
- @Test
93
- override fun required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue () {
94
- super .required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue()
95
- }
96
-
97
- @Test
98
- override fun required_validate_maxElementsFromPublisher () {
99
- super .required_validate_maxElementsFromPublisher()
100
- }
101
-
102
- @Test
103
- @Ignore // This OPTIONAL requirement is not implemented, which is fine
104
- override fun optional_spec105_emptyStreamMustTerminateBySignallingOnComplete () {
105
- super .optional_spec105_emptyStreamMustTerminateBySignallingOnComplete()
106
- }
107
-
108
- @Test
109
- override fun required_spec105_mustSignalOnCompleteWhenFiniteStreamTerminates () {
110
- super .required_spec105_mustSignalOnCompleteWhenFiniteStreamTerminates()
111
- }
112
-
113
- @Test
114
- override fun optional_spec111_registeredSubscribersMustReceiveOnNextOrOnCompleteSignals () {
115
- super .optional_spec111_registeredSubscribersMustReceiveOnNextOrOnCompleteSignals()
116
- }
117
-
118
- @Test
119
- override fun required_spec102_maySignalLessThanRequestedAndTerminateSubscription () {
120
- super .required_spec102_maySignalLessThanRequestedAndTerminateSubscription()
121
- }
122
-
123
- @Test
124
- override fun required_createPublisher3MustProduceAStreamOfExactly3Elements () {
125
- super .required_createPublisher3MustProduceAStreamOfExactly3Elements()
126
- }
14
+ class ReactiveStreamTckTest {
127
15
128
- @Test
129
- override fun optional_spec111_maySupportMultiSubscribe () {
130
- super .optional_spec111_maySupportMultiSubscribe( )
16
+ @Factory(dataProvider = " dispatchers " )
17
+ fun createTests ( dispatcher : Dispatcher ): Array < Any > {
18
+ return arrayOf( ReactiveStreamTckTestSuite (dispatcher) )
131
19
}
132
20
133
- @Test
134
- override fun stochastic_spec103_mustSignalOnMethodsSequentially () {
135
- super .stochastic_spec103_mustSignalOnMethodsSequentially()
136
- }
21
+ @DataProvider(name = " dispatchers" )
22
+ public fun dispatchers (): Array <Array <Any >> = Dispatcher .values().map { arrayOf<Any >(it) }.toTypedArray()
137
23
138
- @Test
139
- override fun required_spec307_afterSubscriptionIsCancelledAdditionalCancelationsMustBeNops () {
140
- super .required_spec307_afterSubscriptionIsCancelledAdditionalCancelationsMustBeNops()
141
- }
142
24
143
- @Test
144
- override fun required_createPublisher1MustProduceAStreamOfExactly1Element () {
145
- super .required_createPublisher1MustProduceAStreamOfExactly1Element()
146
- }
25
+ public class ReactiveStreamTckTestSuite (
26
+ private val dispatcher : Dispatcher
27
+ ) : PublisherVerification<Long>(TestEnvironment ()) {
147
28
148
- @Test
149
- override fun optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne () {
150
- super .optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingOneByOne()
151
- }
29
+ private val scope = CoroutineScope (dispatcher.dispatcher + NonCancellable )
152
30
153
- @Test
154
- override fun optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront () {
155
- super .optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfront( )
156
- }
31
+ override fun createPublisher ( elements : Long ): Publisher < Long > =
32
+ scope.publish {
33
+ for (i in 1 .. elements) send(i )
34
+ }
157
35
158
- @Test
159
- override fun required_spec309_requestNegativeNumberMustSignalIllegalArgumentException () {
160
- super .required_spec309_requestNegativeNumberMustSignalIllegalArgumentException ()
161
- }
36
+ override fun createFailedPublisher (): Publisher < Long > =
37
+ scope.publish {
38
+ throw TestException ()
39
+ }
162
40
163
- @Test
164
- override fun required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling () {
165
- super .required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling()
166
- }
167
-
168
- @Test
169
- override fun required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue () {
170
- super .required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue()
171
- }
172
-
173
- @Test
174
- override fun optional_spec104_mustSignalOnErrorWhenFails () {
175
- super .optional_spec104_mustSignalOnErrorWhenFails()
176
- }
177
-
178
- @Test
179
- override fun required_spec309_requestZeroMustSignalIllegalArgumentException () {
180
- super .required_spec309_requestZeroMustSignalIllegalArgumentException()
181
- }
182
-
183
- @Test
184
- override fun optional_spec309_requestNegativeNumberMaySignalIllegalArgumentExceptionWithSpecificMessage () {
185
- super .optional_spec309_requestNegativeNumberMaySignalIllegalArgumentExceptionWithSpecificMessage()
186
- }
187
-
188
- @Test
189
- override fun required_spec109_subscribeThrowNPEOnNullSubscriber () {
190
- super .required_spec109_subscribeThrowNPEOnNullSubscriber()
191
- }
192
-
193
- @Test
194
- override fun optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected () {
195
- super .optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected()
196
- }
41
+ @Test
42
+ public override fun required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber () {
43
+ // This test fails on default dispatcher because it retains a reference to the last task
44
+ // in the structure of its GlobalQueue
45
+ // So we skip it with the default dispatcher.
46
+ // todo: remove it when CoroutinesScheduler is improved
47
+ if (dispatcher == Dispatcher .DEFAULT ) return
48
+ super .required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber()
49
+ }
197
50
198
- @Test
199
- override fun required_spec101_subscriptionRequestMustResultInTheCorrectNumberOfProducedElements () {
200
- super .required_spec101_subscriptionRequestMustResultInTheCorrectNumberOfProducedElements( )
201
- }
51
+ @Test
52
+ public override fun optional_spec105_emptyStreamMustTerminateBySignallingOnComplete () {
53
+ throw SkipException ( " Skipped " )
54
+ }
202
55
203
- @Test
204
- override fun required_spec109_mustIssueOnSubscribeForNonNullSubscriber () {
205
- super .required_spec109_mustIssueOnSubscribeForNonNullSubscriber()
56
+ class TestException : Exception ()
206
57
}
58
+ }
207
59
208
- class TestException : Exception ()
209
- }
60
+ enum class Dispatcher (val dispatcher : CoroutineDispatcher ) {
61
+ DEFAULT (Dispatchers .Default ),
62
+ UNCONFINED (Dispatchers .Unconfined )
63
+ }
0 commit comments