@@ -338,63 +338,73 @@ public void run() {
338338 public void testReplaySubjectEmissionSubscriptionRace () throws Exception {
339339 Scheduler s = Schedulers .io ();
340340 Scheduler .Worker worker = Schedulers .io ().createWorker ();
341- for (int i = 0 ; i < 50000 ; i ++) {
342- if (i % 1000 == 0 ) {
343- System .out .println (i );
344- }
345- final ReplaySubject <Object > rs = ReplaySubject .create ();
346-
347- final CountDownLatch finish = new CountDownLatch (1 );
348- final CountDownLatch start = new CountDownLatch (1 );
349-
350- worker .schedule (new Action0 () {
351- @ Override
352- public void call () {
353- try {
354- start .await ();
355- } catch (Exception e1 ) {
356- e1 .printStackTrace ();
357- }
358- rs .onNext (1 );
359- }
360- });
361-
362- final AtomicReference <Object > o = new AtomicReference <Object >();
363-
364- rs .subscribeOn (s ).observeOn (Schedulers .io ())
365- .subscribe (new Observer <Object >() {
366-
367- @ Override
368- public void onCompleted () {
369- o .set (-1 );
370- finish .countDown ();
371- }
372-
373- @ Override
374- public void onError (Throwable e ) {
375- o .set (e );
376- finish .countDown ();
377- }
378-
379- @ Override
380- public void onNext (Object t ) {
381- o .set (t );
382- finish .countDown ();
341+ try {
342+ for (int i = 0 ; i < 50000 ; i ++) {
343+ if (i % 1000 == 0 ) {
344+ System .out .println (i );
383345 }
346+ final ReplaySubject <Object > rs = ReplaySubject .create ();
384347
385- });
386- start .countDown ();
387-
388- if (!finish .await (5 , TimeUnit .SECONDS )) {
389- System .out .println (o .get ());
390- System .out .println (rs .hasObservers ());
391- rs .onCompleted ();
392- Assert .fail ("Timeout @ " + i );
393- break ;
394- } else {
395- Assert .assertEquals (1 , o .get ());
396- rs .onCompleted ();
348+ final CountDownLatch finish = new CountDownLatch (1 );
349+ final CountDownLatch start = new CountDownLatch (1 );
350+
351+ worker .schedule (new Action0 () {
352+ @ Override
353+ public void call () {
354+ try {
355+ start .await ();
356+ } catch (Exception e1 ) {
357+ e1 .printStackTrace ();
358+ }
359+ rs .onNext (1 );
360+ }
361+ });
362+
363+ final AtomicReference <Object > o = new AtomicReference <Object >();
364+
365+ rs .subscribeOn (s ).observeOn (Schedulers .io ())
366+ .subscribe (new Observer <Object >() {
367+
368+ @ Override
369+ public void onCompleted () {
370+ o .set (-1 );
371+ finish .countDown ();
372+ }
373+
374+ @ Override
375+ public void onError (Throwable e ) {
376+ o .set (e );
377+ finish .countDown ();
378+ }
379+
380+ @ Override
381+ public void onNext (Object t ) {
382+ o .set (t );
383+ finish .countDown ();
384+ }
385+
386+ });
387+ start .countDown ();
388+
389+ if (!finish .await (5 , TimeUnit .SECONDS )) {
390+ System .out .println (o .get ());
391+ System .out .println (rs .hasObservers ());
392+ rs .onCompleted ();
393+ Assert .fail ("Timeout @ " + i );
394+ break ;
395+ } else {
396+ Assert .assertEquals (1 , o .get ());
397+ worker .schedule (new Action0 () {
398+ @ Override
399+ public void call () {
400+ rs .onCompleted ();
401+ }
402+ });
403+
404+ }
397405 }
406+ } finally {
407+ worker .unsubscribe ();
398408 }
399409 }
400410}
0 commit comments