@@ -55,18 +55,6 @@ private OperatorTester() {
55
55
56
56
public static class UnitTest {
57
57
58
- public static <T > Func1 <Observer <T >, Subscription > assertTrustedObservable (final Func1 <Observer <T >, Subscription > source )
59
- {
60
- return new Func1 <Observer <T >, Subscription >()
61
- {
62
- @ Override
63
- public Subscription call (Observer <T > observer )
64
- {
65
- return source .call (new TestingObserver <T >(observer ));
66
- }
67
- };
68
- }
69
-
70
58
/**
71
59
* Used for mocking of Schedulers since many Scheduler implementations are static/final.
72
60
*
@@ -77,221 +65,6 @@ public static Scheduler forwardingScheduler(Scheduler underlying) {
77
65
return new ForwardingScheduler (underlying );
78
66
}
79
67
80
- public static class TestingObserver <T > implements Observer <T > {
81
-
82
- private final Observer <T > actual ;
83
- private final AtomicBoolean isFinished = new AtomicBoolean (false );
84
- private final AtomicBoolean isInCallback = new AtomicBoolean (false );
85
-
86
- public TestingObserver (Observer <T > actual ) {
87
- this .actual = actual ;
88
- }
89
-
90
- @ Override
91
- public void onCompleted () {
92
- assertFalse ("previous call to onCompleted() or onError()" , !isFinished .compareAndSet (false , true ));
93
- assertFalse ("concurrent callback pending" , !isInCallback .compareAndSet (false , true ));
94
- actual .onCompleted ();
95
- isInCallback .set (false );
96
- }
97
-
98
- @ Override
99
- public void onError (Exception e ) {
100
- assertFalse ("previous call to onCompleted() or onError()" , !isFinished .compareAndSet (false , true ));
101
- assertFalse ("concurrent callback pending" , !isInCallback .compareAndSet (false , true ));
102
- actual .onError (e );
103
- isInCallback .set (false );
104
- }
105
-
106
- @ Override
107
- public void onNext (T args ) {
108
- assertFalse ("previous call to onCompleted() or onError()" , isFinished .get ());
109
- assertFalse ("concurrent callback pending" , !isInCallback .compareAndSet (false , true ));
110
- actual .onNext (args );
111
- isInCallback .set (false );
112
- }
113
-
114
- }
115
-
116
- @ Test (expected = AssertionError .class )
117
- public void testDoubleCompleted () {
118
- Observable .create (assertTrustedObservable (new Func1 <Observer <String >, Subscription >()
119
- {
120
- @ Override
121
- public Subscription call (Observer <String > observer )
122
- {
123
- observer .onCompleted ();
124
- observer .onCompleted ();
125
- return Subscriptions .empty ();
126
- }
127
- })).toBlockingObservable ().lastOrDefault ("end" );
128
-
129
- }
130
-
131
- @ Test (expected = AssertionError .class )
132
- public void testCompletedError () {
133
- Observable .create (assertTrustedObservable (new Func1 <Observer <String >, Subscription >()
134
- {
135
- @ Override
136
- public Subscription call (Observer <String > observer )
137
- {
138
- observer .onCompleted ();
139
- observer .onError (new Exception ());
140
- return Subscriptions .empty ();
141
- }
142
- })).toBlockingObservable ().lastOrDefault ("end" );
143
- }
144
-
145
- @ Test (expected = AssertionError .class )
146
- public void testCompletedNext () {
147
- Observable .create (assertTrustedObservable (new Func1 <Observer <String >, Subscription >()
148
- {
149
- @ Override
150
- public Subscription call (Observer <String > observer )
151
- {
152
- observer .onCompleted ();
153
- observer .onNext ("one" );
154
- return Subscriptions .empty ();
155
- }
156
- })).toBlockingObservable ().lastOrDefault ("end" );
157
- }
158
-
159
- @ Test (expected = AssertionError .class )
160
- public void testErrorCompleted () {
161
- Observable .create (assertTrustedObservable (new Func1 <Observer <String >, Subscription >()
162
- {
163
- @ Override
164
- public Subscription call (Observer <String > observer )
165
- {
166
- observer .onError (new Exception ());
167
- observer .onCompleted ();
168
- return Subscriptions .empty ();
169
- }
170
- })).toBlockingObservable ().lastOrDefault ("end" );
171
- }
172
-
173
- @ Test (expected = AssertionError .class )
174
- public void testDoubleError () {
175
- Observable .create (assertTrustedObservable (new Func1 <Observer <String >, Subscription >()
176
- {
177
- @ Override
178
- public Subscription call (Observer <String > observer )
179
- {
180
- observer .onError (new Exception ());
181
- observer .onError (new Exception ());
182
- return Subscriptions .empty ();
183
- }
184
- })).toBlockingObservable ().lastOrDefault ("end" );
185
- }
186
-
187
- @ Test (expected = AssertionError .class )
188
- public void testErrorNext () {
189
- Observable .create (assertTrustedObservable (new Func1 <Observer <String >, Subscription >()
190
- {
191
- @ Override
192
- public Subscription call (Observer <String > observer )
193
- {
194
- observer .onError (new Exception ());
195
- observer .onNext ("one" );
196
- return Subscriptions .empty ();
197
- }
198
- })).toBlockingObservable ().lastOrDefault ("end" );
199
- }
200
-
201
- @ Test
202
- public void testNextCompleted () {
203
- Observable .create (assertTrustedObservable (new Func1 <Observer <String >, Subscription >()
204
- {
205
- @ Override
206
- public Subscription call (Observer <String > observer )
207
- {
208
- observer .onNext ("one" );
209
- observer .onCompleted ();
210
- return Subscriptions .empty ();
211
- }
212
- })).toBlockingObservable ().lastOrDefault ("end" );
213
- }
214
-
215
- @ Test
216
- public void testConcurrentNextNext () {
217
- final List <Thread > threads = new ArrayList <Thread >();
218
- final AtomicReference <Throwable > threadFailure = new AtomicReference <Throwable >();
219
- Observable .create (assertTrustedObservable (new Func1 <Observer <String >, Subscription >()
220
- {
221
- @ Override
222
- public Subscription call (final Observer <String > observer )
223
- {
224
- threads .add (new Thread (new Runnable ()
225
- {
226
- @ Override
227
- public void run ()
228
- {
229
- observer .onNext ("one" );
230
- }
231
- }));
232
- threads .add (new Thread (new Runnable ()
233
- {
234
- @ Override
235
- public void run ()
236
- {
237
- observer .onNext ("two" );
238
- }
239
- }));
240
- return Subscriptions .empty ();
241
- }
242
- })).subscribe (new SlowObserver ());
243
- for (Thread thread : threads ) {
244
- thread .setUncaughtExceptionHandler (new UncaughtExceptionHandler ()
245
- {
246
- @ Override
247
- public void uncaughtException (Thread thread , Throwable throwable )
248
- {
249
- threadFailure .set (throwable );
250
- }
251
- });
252
- thread .start ();
253
- }
254
- for (Thread thread : threads ) {
255
- try {
256
- thread .join ();
257
- } catch (InterruptedException ignored ) {
258
- }
259
- }
260
- // Junit seems pretty bad about exposing test failures inside of created threads.
261
- assertNotNull ("exception thrown by thread" , threadFailure .get ());
262
- assertEquals ("class of exception thrown by thread" , AssertionError .class , threadFailure .get ().getClass ());
263
- }
264
-
265
- private static class SlowObserver implements Observer <String >
266
- {
267
- @ Override
268
- public void onCompleted ()
269
- {
270
- try {
271
- Thread .sleep (10 );
272
- } catch (InterruptedException ignored ) {
273
- }
274
- }
275
-
276
- @ Override
277
- public void onError (Exception e )
278
- {
279
- try {
280
- Thread .sleep (10 );
281
- } catch (InterruptedException ignored ) {
282
- }
283
- }
284
-
285
- @ Override
286
- public void onNext (String args )
287
- {
288
- try {
289
- Thread .sleep (10 );
290
- } catch (InterruptedException ignored ) {
291
- }
292
- }
293
- }
294
-
295
68
public static class ForwardingScheduler extends Scheduler {
296
69
private final Scheduler underlying ;
297
70
0 commit comments