File tree Expand file tree Collapse file tree 2 files changed +43
-1
lines changed
main/java/rx/internal/operators
test/java/rx/internal/operators Expand file tree Collapse file tree 2 files changed +43
-1
lines changed Original file line number Diff line number Diff line change @@ -185,7 +185,9 @@ void pollQueue() {
185185 counter = 1 ;
186186 long produced = 0 ;
187187 long r = requested ;
188- while (!child .isUnsubscribed ()) {
188+ for (;;) {
189+ if (child .isUnsubscribed ())
190+ return ;
189191 Throwable error ;
190192 if (finished ) {
191193 if ((error = this .error ) != null ) {
Original file line number Diff line number Diff line change 2626import static org .mockito .Mockito .times ;
2727import static org .mockito .Mockito .verify ;
2828
29+ import java .util .ArrayList ;
2930import java .util .Arrays ;
31+ import java .util .Collections ;
3032import java .util .Iterator ;
3133import java .util .List ;
3234import java .util .concurrent .CountDownLatch ;
@@ -765,4 +767,42 @@ public void onNext(Integer t) {
765767
766768 }
767769
770+ @ Test
771+ public void testNoMoreRequestsAfterUnsubscribe () throws InterruptedException {
772+ final CountDownLatch latch = new CountDownLatch (1 );
773+ final List <Long > requests = Collections .synchronizedList (new ArrayList <Long >());
774+ Observable .range (1 , 1000000 )
775+ .doOnRequest (new Action1 <Long >() {
776+
777+ @ Override
778+ public void call (Long n ) {
779+ requests .add (n );
780+ }
781+ })
782+ .observeOn (Schedulers .io ())
783+ .subscribe (new Subscriber <Integer >() {
784+
785+ @ Override
786+ public void onStart () {
787+ request (1 );
788+ }
789+
790+ @ Override
791+ public void onCompleted () {
792+ }
793+
794+ @ Override
795+ public void onError (Throwable e ) {
796+ }
797+
798+ @ Override
799+ public void onNext (Integer t ) {
800+ unsubscribe ();
801+ latch .countDown ();
802+ }
803+ });
804+ assertTrue (latch .await (10 , TimeUnit .SECONDS ));
805+ assertEquals (1 , requests .size ());
806+ }
807+
768808}
You can’t perform that action at this time.
0 commit comments