1515 */
1616package rx .internal .operators ;
1717
18- import static org .junit .Assert .assertEquals ;
19- import static org .junit .Assert .fail ;
18+ import static org .junit .Assert .*;
2019import static org .mockito .Matchers .any ;
2120import static org .mockito .Mockito .*;
2221
23- import java .util .concurrent .CountDownLatch ;
24- import java .util .concurrent .ExecutorService ;
25- import java .util .concurrent .Executors ;
26- import java .util .concurrent .TimeUnit ;
27- import java .util .concurrent .atomic .AtomicBoolean ;
28- import java .util .concurrent .atomic .AtomicInteger ;
29- import java .util .concurrent .atomic .AtomicLong ;
22+ import java .util .*;
23+ import java .util .concurrent .*;
24+ import java .util .concurrent .atomic .*;
3025
3126import org .junit .Test ;
32- import org .mockito .InOrder ;
33- import org .mockito .Mockito ;
27+ import org .mockito .*;
3428
35- import rx .Observable ;
29+ import rx .* ;
3630import rx .Observable .OnSubscribe ;
31+ import rx .Observable ;
3732import rx .Observer ;
38- import rx .Producer ;
39- import rx .Subscriber ;
40- import rx .Subscription ;
41- import rx .functions .Action0 ;
42- import rx .functions .Action1 ;
43- import rx .functions .Func1 ;
44- import rx .functions .Func2 ;
33+ import rx .functions .*;
4534import rx .internal .util .RxRingBuffer ;
4635import rx .observables .GroupedObservable ;
4736import rx .observers .TestSubscriber ;
@@ -722,7 +711,10 @@ public void testRetryWithBackpressureParallel() throws InterruptedException {
722711 int ncpu = Runtime .getRuntime ().availableProcessors ();
723712 ExecutorService exec = Executors .newFixedThreadPool (Math .max (ncpu / 2 , 1 ));
724713 final AtomicInteger timeouts = new AtomicInteger ();
725- final AtomicInteger data = new AtomicInteger ();
714+ final Map <Integer , List <String >> data = new ConcurrentHashMap <Integer , List <String >>();
715+ final Map <Integer , List <Throwable >> exceptions = new ConcurrentHashMap <Integer , List <Throwable >>();
716+ final Map <Integer , Integer > completions = new ConcurrentHashMap <Integer , Integer >();
717+
726718 int m = 2000 ;
727719 final CountDownLatch cdl = new CountDownLatch (m );
728720 for (int i = 0 ; i < m ; i ++) {
@@ -737,13 +729,13 @@ public void run() {
737729 origin .retry ().observeOn (Schedulers .computation ()).unsafeSubscribe (ts );
738730 ts .awaitTerminalEvent (2 , TimeUnit .SECONDS );
739731 if (ts .getOnNextEvents ().size () != NUM_RETRIES + 2 ) {
740- data .incrementAndGet ( );
732+ data .put ( j , ts . getOnNextEvents () );
741733 }
742734 if (ts .getOnErrorEvents ().size () != 0 ) {
743- data . incrementAndGet ( );
735+ exceptions . put ( j , ts . getOnErrorEvents () );
744736 }
745737 if (ts .getOnCompletedEvents ().size () != 1 ) {
746- data . incrementAndGet ( );
738+ completions . put ( j , ts . getOnCompletedEvents (). size () );
747739 }
748740 } catch (Throwable t ) {
749741 timeouts .incrementAndGet ();
@@ -756,8 +748,15 @@ public void run() {
756748 exec .shutdown ();
757749 cdl .await ();
758750 assertEquals (0 , timeouts .get ());
759- assertEquals (0 , data .get ());
760-
751+ if (data .size () > 0 ) {
752+ fail ("Data content mismatch: " + data );
753+ }
754+ if (exceptions .size () > 0 ) {
755+ fail ("Exceptions received: " + exceptions );
756+ }
757+ if (completions .size () > 0 ) {
758+ fail ("Multiple completions received: " + completions );
759+ }
761760 }
762761 @ Test (timeout = 3000 )
763762 public void testIssue1900 () throws InterruptedException {
0 commit comments