|
19 | 19 | import static org.mockito.Matchers.any; |
20 | 20 | import static org.mockito.Mockito.*; |
21 | 21 |
|
22 | | -import java.util.Arrays; |
| 22 | +import java.util.*; |
23 | 23 | import java.util.concurrent.*; |
24 | 24 | import java.util.concurrent.atomic.*; |
25 | 25 |
|
26 | 26 | import org.junit.*; |
27 | 27 | import org.mockito.*; |
28 | 28 |
|
29 | 29 | import rx.*; |
| 30 | +import rx.Observable; |
30 | 31 | import rx.Observable.OnSubscribe; |
| 32 | +import rx.Observer; |
31 | 33 | import rx.exceptions.TestException; |
32 | 34 | import rx.schedulers.Schedulers; |
33 | 35 |
|
@@ -255,74 +257,62 @@ public void runConcurrencyTest() { |
255 | 257 | * |
256 | 258 | * @throws InterruptedException |
257 | 259 | */ |
258 | | - @Ignore |
259 | | - // this is non-deterministic ... haven't figured out what's wrong with the test yet (benjchristensen: July 2014) |
260 | 260 | @Test |
261 | 261 | public void testNotificationDelay() throws InterruptedException { |
262 | | - ExecutorService tp1 = Executors.newFixedThreadPool(1); |
263 | | - ExecutorService tp2 = Executors.newFixedThreadPool(1); |
| 262 | + final ExecutorService tp1 = Executors.newFixedThreadPool(1); |
264 | 263 | try { |
265 | | - int n = 10; |
| 264 | + int n = 10000; |
266 | 265 | for (int i = 0; i < n; i++) { |
267 | | - final CountDownLatch firstOnNext = new CountDownLatch(1); |
268 | | - final CountDownLatch onNextCount = new CountDownLatch(2); |
269 | | - final CountDownLatch latch = new CountDownLatch(1); |
270 | | - final CountDownLatch running = new CountDownLatch(2); |
271 | | - |
272 | | - TestSubscriber<String> to = new TestSubscriber<String>(new Observer<String>() { |
273 | | - |
| 266 | + |
| 267 | + @SuppressWarnings("unchecked") |
| 268 | + final Observer<Integer>[] os = new Observer[1]; |
| 269 | + |
| 270 | + final List<Thread> threads = new ArrayList<Thread>(); |
| 271 | + |
| 272 | + final Observer<Integer> o = new SerializedObserver<Integer>(new Observer<Integer>() { |
| 273 | + boolean first; |
274 | 274 | @Override |
275 | | - public void onCompleted() { |
276 | | - |
| 275 | + public void onNext(Integer t) { |
| 276 | + threads.add(Thread.currentThread()); |
| 277 | + if (!first) { |
| 278 | + first = true; |
| 279 | + try { |
| 280 | + tp1.submit(new Runnable() { |
| 281 | + @Override |
| 282 | + public void run() { |
| 283 | + os[0].onNext(2); |
| 284 | + } |
| 285 | + }).get(); |
| 286 | + } catch (InterruptedException e) { |
| 287 | + e.printStackTrace(); |
| 288 | + } catch (ExecutionException e) { |
| 289 | + e.printStackTrace(); |
| 290 | + } |
| 291 | + } |
277 | 292 | } |
278 | | - |
| 293 | + |
279 | 294 | @Override |
280 | | - public void onError(Throwable e) { |
281 | | - |
| 295 | + public void onError(Throwable e) { |
| 296 | + e.printStackTrace(); |
282 | 297 | } |
283 | | - |
| 298 | + |
284 | 299 | @Override |
285 | | - public void onNext(String t) { |
286 | | - firstOnNext.countDown(); |
287 | | - // force it to take time when delivering so the second one is enqueued |
288 | | - try { |
289 | | - latch.await(); |
290 | | - } catch (InterruptedException e) { |
291 | | - } |
| 300 | + public void onCompleted() { |
| 301 | + |
292 | 302 | } |
293 | | - |
294 | 303 | }); |
295 | | - Observer<String> o = serializedObserver(to); |
296 | | - |
297 | | - Future<?> f1 = tp1.submit(new OnNextThread(o, 1, onNextCount, running)); |
298 | | - Future<?> f2 = tp2.submit(new OnNextThread(o, 1, onNextCount, running)); |
299 | | - |
300 | | - running.await(); // let one of the OnNextThread actually run before proceeding |
301 | 304 |
|
302 | | - firstOnNext.await(); |
303 | | - |
304 | | - Thread t1 = to.getLastSeenThread(); |
305 | | - System.out.println("first onNext on thread: " + t1); |
306 | | - |
307 | | - latch.countDown(); |
308 | | - |
309 | | - waitOnThreads(f1, f2); |
310 | | - // not completed yet |
311 | | - |
312 | | - assertEquals(2, to.getOnNextEvents().size()); |
313 | | - |
314 | | - Thread t2 = to.getLastSeenThread(); |
315 | | - System.out.println("second onNext on thread: " + t2); |
316 | | - |
317 | | - assertSame(t1, t2); |
318 | | - |
319 | | - System.out.println(to.getOnNextEvents()); |
320 | | - o.onCompleted(); |
321 | | - System.out.println(to.getOnNextEvents()); |
| 305 | + os[0] = o; |
| 306 | + |
| 307 | + o.onNext(1); |
| 308 | + |
| 309 | + System.out.println(threads); |
| 310 | + assertEquals(2, threads.size()); |
| 311 | + |
| 312 | + assertSame(threads.get(0), threads.get(1)); |
322 | 313 | } |
323 | 314 | } finally { |
324 | 315 | tp1.shutdown(); |
325 | | - tp2.shutdown(); |
326 | 316 | } |
327 | 317 | } |
328 | 318 |
|
|
0 commit comments