@@ -399,8 +399,9 @@ public void call(final Subscriber<? super String> o) {
399399 public void request (long n ) {
400400 if (n == Long .MAX_VALUE ) {
401401 o .onNext ("beginningEveryTime" );
402- if (count .getAndIncrement () < numFailures ) {
403- o .onError (new RuntimeException ("forced failure: " + count .get ()));
402+ int i = count .getAndIncrement ();
403+ if (i < numFailures ) {
404+ o .onError (new RuntimeException ("forced failure: " + (i + 1 )));
404405 } else {
405406 o .onNext ("onSuccessOnly" );
406407 o .onCompleted ();
@@ -411,8 +412,7 @@ public void request(long n) {
411412 int i = count .getAndIncrement ();
412413 if (i < numFailures ) {
413414 o .onNext ("beginningEveryTime" );
414- o .onError (new RuntimeException ("forced failure: " + count .get ()));
415- req .decrementAndGet ();
415+ o .onError (new RuntimeException ("forced failure: " + (i + 1 )));
416416 } else {
417417 do {
418418 if (i == numFailures ) {
@@ -705,17 +705,18 @@ public void testRetryWithBackpressure() throws InterruptedException {
705705 inOrder .verifyNoMoreInteractions ();
706706 }
707707 }
708+
708709 @ Test (timeout = 15000 )
709710 public void testRetryWithBackpressureParallel () throws InterruptedException {
710711 final int NUM_RETRIES = RxRingBuffer .SIZE * 2 ;
711712 int ncpu = Runtime .getRuntime ().availableProcessors ();
712- ExecutorService exec = Executors .newFixedThreadPool (Math .max (ncpu / 2 , 1 ));
713+ ExecutorService exec = Executors .newFixedThreadPool (Math .max (ncpu / 2 , 2 ));
713714 final AtomicInteger timeouts = new AtomicInteger ();
714715 final Map <Integer , List <String >> data = new ConcurrentHashMap <Integer , List <String >>();
715716 final Map <Integer , List <Throwable >> exceptions = new ConcurrentHashMap <Integer , List <Throwable >>();
716717 final Map <Integer , Integer > completions = new ConcurrentHashMap <Integer , Integer >();
717718
718- int m = 2000 ;
719+ int m = 5000 ;
719720 final CountDownLatch cdl = new CountDownLatch (m );
720721 for (int i = 0 ; i < m ; i ++) {
721722 final int j = i ;
@@ -726,16 +727,17 @@ public void run() {
726727 try {
727728 Observable <String > origin = Observable .create (new FuncWithErrors (NUM_RETRIES ));
728729 TestSubscriber <String > ts = new TestSubscriber <String >();
729- origin .retry ().observeOn (Schedulers .computation ()).unsafeSubscribe (ts );
730- ts .awaitTerminalEvent (2 , TimeUnit .SECONDS );
731- if (ts .getOnNextEvents ().size () != NUM_RETRIES + 2 ) {
732- data .put (j , ts .getOnNextEvents ());
730+ origin .retry ()
731+ .observeOn (Schedulers .computation ()).unsafeSubscribe (ts );
732+ ts .awaitTerminalEvent (2500 , TimeUnit .MILLISECONDS );
733+ if (ts .getOnCompletedEvents ().size () != 1 ) {
734+ completions .put (j , ts .getOnCompletedEvents ().size ());
733735 }
734736 if (ts .getOnErrorEvents ().size () != 0 ) {
735737 exceptions .put (j , ts .getOnErrorEvents ());
736738 }
737- if (ts .getOnCompletedEvents ().size () != 1 ) {
738- completions .put (j , ts .getOnCompletedEvents (). size ());
739+ if (ts .getOnNextEvents ().size () != NUM_RETRIES + 2 ) {
740+ data .put (j , ts .getOnNextEvents ());
739741 }
740742 } catch (Throwable t ) {
741743 timeouts .incrementAndGet ();
@@ -749,7 +751,16 @@ public void run() {
749751 cdl .await ();
750752 assertEquals (0 , timeouts .get ());
751753 if (data .size () > 0 ) {
752- fail ("Data content mismatch: " + data );
754+ System .out .println (allSequenceFrequency (data ));
755+ }
756+ if (exceptions .size () > 0 ) {
757+ System .out .println (exceptions );
758+ }
759+ if (completions .size () > 0 ) {
760+ System .out .println (completions );
761+ }
762+ if (data .size () > 0 ) {
763+ fail ("Data content mismatch: " + allSequenceFrequency (data ));
753764 }
754765 if (exceptions .size () > 0 ) {
755766 fail ("Exceptions received: " + exceptions );
@@ -758,6 +769,45 @@ public void run() {
758769 fail ("Multiple completions received: " + completions );
759770 }
760771 }
772+ static <T > StringBuilder allSequenceFrequency (Map <Integer , List <T >> its ) {
773+ StringBuilder b = new StringBuilder ();
774+ for (Map .Entry <Integer , List <T >> e : its .entrySet ()) {
775+ if (b .length () > 0 ) {
776+ b .append (", " );
777+ }
778+ b .append (e .getKey ()).append ("={" );
779+ b .append (sequenceFrequency (e .getValue ()));
780+ b .append ("}" );
781+ }
782+ return b ;
783+ }
784+ static <T > StringBuilder sequenceFrequency (Iterable <T > it ) {
785+ StringBuilder sb = new StringBuilder ();
786+
787+ Object prev = null ;
788+ int cnt = 0 ;
789+
790+ for (Object curr : it ) {
791+ if (sb .length () > 0 ) {
792+ if (!curr .equals (prev )) {
793+ if (cnt > 1 ) {
794+ sb .append (" x " ).append (cnt );
795+ cnt = 1 ;
796+ }
797+ sb .append (", " );
798+ sb .append (curr );
799+ } else {
800+ cnt ++;
801+ }
802+ } else {
803+ sb .append (curr );
804+ cnt ++;
805+ }
806+ prev = curr ;
807+ }
808+
809+ return sb ;
810+ }
761811 @ Test (timeout = 3000 )
762812 public void testIssue1900 () throws InterruptedException {
763813 @ SuppressWarnings ("unchecked" )
0 commit comments