@@ -61,7 +61,8 @@ final class ExactSubscriber extends Subscriber<T> {
6161 final Subscriber <? super Observable <T >> child ;
6262 int count ;
6363 BufferUntilSubscriber <T > window ;
64- Subscription parentSubscription = this ;
64+ volatile boolean noWindow = true ;
65+ final Subscription parentSubscription = this ;
6566 public ExactSubscriber (Subscriber <? super Observable <T >> child ) {
6667 /**
6768 * See https://github.com/ReactiveX/RxJava/issues/1546
@@ -77,7 +78,7 @@ public ExactSubscriber(Subscriber<? super Observable<T>> child) {
7778 @ Override
7879 public void call () {
7980 // if no window we unsubscribe up otherwise wait until window ends
80- if (window == null ) {
81+ if (noWindow ) {
8182 parentSubscription .unsubscribe ();
8283 }
8384 }
@@ -94,13 +95,15 @@ public void onStart() {
9495 @ Override
9596 public void onNext (T t ) {
9697 if (window == null ) {
98+ noWindow = false ;
9799 window = BufferUntilSubscriber .create ();
98100 child .onNext (window );
99101 }
100102 window .onNext (t );
101103 if (++count % size == 0 ) {
102104 window .onCompleted ();
103105 window = null ;
106+ noWindow = true ;
104107 if (child .isUnsubscribed ()) {
105108 parentSubscription .unsubscribe ();
106109 return ;
@@ -130,7 +133,7 @@ final class InexactSubscriber extends Subscriber<T> {
130133 final Subscriber <? super Observable <T >> child ;
131134 int count ;
132135 final List <CountedSubject <T >> chunks = new LinkedList <CountedSubject <T >>();
133- Subscription parentSubscription = this ;
136+ final Subscription parentSubscription = this ;
134137
135138 public InexactSubscriber (Subscriber <? super Observable <T >> child ) {
136139 /**
0 commit comments