@@ -582,64 +582,28 @@ public void onNext(Integer t) {
582582
583583 @ Test
584584 public void testQueueFullEmitsErrorWithVaryingBufferSize () {
585- final CountDownLatch latch = new CountDownLatch (1 );
586- // randomize buffer size, note that underlying implementations may be tuning the real size to a power of 2
587- // which can lead to unexpected results when adding excess capacity (e.g.: see ConcurrentCircularArrayQueue)
588585 for (int i = 1 ; i <= 1024 ; i = i * 2 ) {
589586 final int capacity = i ;
590- Observable <Integer > observable = Observable .create (new OnSubscribe <Integer >() {
591-
592- @ Override
593- public void call (Subscriber <? super Integer > o ) {
594- for (int i = 0 ; i < capacity + 10 ; i ++) {
595- o .onNext (i );
596- }
597- latch .countDown ();
598- o .onCompleted ();
599- }
600-
601- });
602-
603- TestSubscriber <Integer > testSubscriber = new TestSubscriber <Integer >(new Observer <Integer >() {
604-
605- @ Override
606- public void onCompleted () {
607-
608- }
609-
610- @ Override
611- public void onError (Throwable e ) {
612-
613- }
614-
615- @ Override
616- public void onNext (Integer t ) {
617- try {
618- // force it to be slow wait until we have queued everything
619- latch .await (500 , TimeUnit .MILLISECONDS );
620- } catch (InterruptedException e ) {
621- e .printStackTrace ();
622- }
623- }
624-
625- });
626- System .out .println ("Using capacity " + capacity ); // for post-failure debugging
627- observable .observeOn (Schedulers .newThread (), capacity ).subscribe (testSubscriber );
628-
629- testSubscriber .awaitTerminalEvent ();
630- List <Throwable > errors = testSubscriber .getOnErrorEvents ();
631- assertEquals (1 , errors .size ());
632- System .out .println ("Errors: " + errors );
633- Throwable t = errors .get (0 );
634- if (t instanceof MissingBackpressureException ) {
635- // success, we expect this
636- } else {
637- if (t .getCause () instanceof MissingBackpressureException ) {
638- // this is also okay
639- } else {
640- fail ("Expecting MissingBackpressureException" );
641- }
587+ System .out .println (">> testQueueFullEmitsErrorWithVaryingBufferSize @ " + i );
588+
589+ PublishSubject <Integer > ps = PublishSubject .create ();
590+
591+ TestSubscriber <Integer > ts = new TestSubscriber <Integer >(0 );
592+
593+ TestScheduler test = Schedulers .test ();
594+
595+ ps .observeOn (test , capacity ).subscribe (ts );
596+
597+ for (int j = 0 ; j < capacity + 10 ; j ++) {
598+ ps .onNext (j );
642599 }
600+ ps .onCompleted ();
601+
602+ test .advanceTimeBy (1 , TimeUnit .SECONDS );
603+
604+ ts .assertNoValues ();
605+ ts .assertError (MissingBackpressureException .class );
606+ ts .assertNotCompleted ();
643607 }
644608 }
645609
0 commit comments