Skip to content

Commit 7ed8e1a

Browse files
authored
3.x: Fix Observable.window (size, skip, overlap) dispose behavior (#7049)
1 parent a8fcaf0 commit 7ed8e1a

File tree

3 files changed

+208
-25
lines changed

3 files changed

+208
-25
lines changed

src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindow.java

Lines changed: 27 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -51,18 +51,20 @@ static final class WindowExactObserver<T>
5151
final long count;
5252
final int capacityHint;
5353

54+
final AtomicBoolean cancelled;
55+
5456
long size;
5557

5658
Disposable upstream;
5759

5860
UnicastSubject<T> window;
5961

60-
volatile boolean cancelled;
61-
6262
WindowExactObserver(Observer<? super Observable<T>> actual, long count, int capacityHint) {
6363
this.downstream = actual;
6464
this.count = count;
6565
this.capacityHint = capacityHint;
66+
this.cancelled = new AtomicBoolean();
67+
this.lazySet(1);
6668
}
6769

6870
@Override
@@ -78,7 +80,9 @@ public void onSubscribe(Disposable d) {
7880
public void onNext(T t) {
7981
UnicastSubject<T> w = window;
8082
ObservableWindowSubscribeIntercept<T> intercept = null;
81-
if (w == null && !cancelled) {
83+
if (w == null && !cancelled.get()) {
84+
getAndIncrement();
85+
8286
w = UnicastSubject.create(capacityHint, this);
8387
window = w;
8488
intercept = new ObservableWindowSubscribeIntercept<>(w);
@@ -92,15 +96,12 @@ public void onNext(T t) {
9296
size = 0;
9397
window = null;
9498
w.onComplete();
95-
if (cancelled) {
96-
upstream.dispose();
97-
}
9899
}
99100

100101
if (intercept != null && intercept.tryAbandon()) {
102+
window = null;
101103
w.onComplete();
102104
w = null;
103-
window = null;
104105
}
105106
}
106107
}
@@ -127,23 +128,25 @@ public void onComplete() {
127128

128129
@Override
129130
public void dispose() {
130-
cancelled = true;
131+
if (cancelled.compareAndSet(false, true)) {
132+
run();
133+
}
131134
}
132135

133136
@Override
134137
public boolean isDisposed() {
135-
return cancelled;
138+
return cancelled.get();
136139
}
137140

138141
@Override
139142
public void run() {
140-
if (cancelled) {
143+
if (decrementAndGet() == 0) {
141144
upstream.dispose();
142145
}
143146
}
144147
}
145148

146-
static final class WindowSkipObserver<T> extends AtomicBoolean
149+
static final class WindowSkipObserver<T> extends AtomicInteger
147150
implements Observer<T>, Disposable, Runnable {
148151

149152
private static final long serialVersionUID = 3366976432059579510L;
@@ -153,23 +156,23 @@ static final class WindowSkipObserver<T> extends AtomicBoolean
153156
final int capacityHint;
154157
final ArrayDeque<UnicastSubject<T>> windows;
155158

156-
long index;
159+
final AtomicBoolean cancelled;
157160

158-
volatile boolean cancelled;
161+
long index;
159162

160163
/** Counts how many elements were emitted to the very first window in windows. */
161164
long firstEmission;
162165

163166
Disposable upstream;
164167

165-
final AtomicInteger wip = new AtomicInteger();
166-
167168
WindowSkipObserver(Observer<? super Observable<T>> actual, long count, long skip, int capacityHint) {
168169
this.downstream = actual;
169170
this.count = count;
170171
this.skip = skip;
171172
this.capacityHint = capacityHint;
172173
this.windows = new ArrayDeque<>();
174+
this.cancelled = new AtomicBoolean();
175+
this.lazySet(1);
173176
}
174177

175178
@Override
@@ -191,8 +194,8 @@ public void onNext(T t) {
191194

192195
ObservableWindowSubscribeIntercept<T> intercept = null;
193196

194-
if (i % s == 0 && !cancelled) {
195-
wip.getAndIncrement();
197+
if (i % s == 0 && !cancelled.get()) {
198+
getAndIncrement();
196199
UnicastSubject<T> w = UnicastSubject.create(capacityHint, this);
197200
intercept = new ObservableWindowSubscribeIntercept<>(w);
198201
ws.offer(w);
@@ -207,8 +210,7 @@ public void onNext(T t) {
207210

208211
if (c >= count) {
209212
ws.poll().onComplete();
210-
if (ws.isEmpty() && cancelled) {
211-
this.upstream.dispose();
213+
if (ws.isEmpty() && cancelled.get()) {
212214
return;
213215
}
214216
firstEmission = c - s;
@@ -243,20 +245,20 @@ public void onComplete() {
243245

244246
@Override
245247
public void dispose() {
246-
cancelled = true;
248+
if (cancelled.compareAndSet(false, true)) {
249+
run();
250+
}
247251
}
248252

249253
@Override
250254
public boolean isDisposed() {
251-
return cancelled;
255+
return cancelled.get();
252256
}
253257

254258
@Override
255259
public void run() {
256-
if (wip.decrementAndGet() == 0) {
257-
if (cancelled) {
258-
upstream.dispose();
259-
}
260+
if (decrementAndGet() == 0) {
261+
upstream.dispose();
260262
}
261263
}
262264
}

src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowWithSizeTest.java

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.reactivestreams.*;
2626

2727
import io.reactivex.rxjava3.core.*;
28+
import io.reactivex.rxjava3.core.Flowable;
2829
import io.reactivex.rxjava3.exceptions.TestException;
2930
import io.reactivex.rxjava3.functions.*;
3031
import io.reactivex.rxjava3.internal.subscriptions.BooleanSubscription;
@@ -721,4 +722,94 @@ public void moreQueuedClean() {
721722
.test(3)
722723
.cancel();
723724
}
725+
726+
@Test
727+
public void cancelWithoutWindowSize() {
728+
PublishProcessor<Integer> pp = PublishProcessor.create();
729+
730+
TestSubscriber<Flowable<Integer>> ts = pp.window(10)
731+
.test();
732+
733+
assertTrue(pp.hasSubscribers());
734+
735+
ts.cancel();
736+
737+
assertFalse("Subject still has subscribers!", pp.hasSubscribers());
738+
}
739+
740+
@Test
741+
public void cancelAfterAbandonmentSize() {
742+
PublishProcessor<Integer> pp = PublishProcessor.create();
743+
744+
TestSubscriber<Flowable<Integer>> ts = pp.window(10)
745+
.test();
746+
747+
assertTrue(pp.hasSubscribers());
748+
749+
pp.onNext(1);
750+
751+
ts.cancel();
752+
753+
assertFalse("Subject still has subscribers!", pp.hasSubscribers());
754+
}
755+
756+
@Test
757+
public void cancelWithoutWindowSkip() {
758+
PublishProcessor<Integer> pp = PublishProcessor.create();
759+
760+
TestSubscriber<Flowable<Integer>> ts = pp.window(10, 15)
761+
.test();
762+
763+
assertTrue(pp.hasSubscribers());
764+
765+
ts.cancel();
766+
767+
assertFalse("Subject still has subscribers!", pp.hasSubscribers());
768+
}
769+
770+
@Test
771+
public void cancelAfterAbandonmentSkip() {
772+
PublishProcessor<Integer> pp = PublishProcessor.create();
773+
774+
TestSubscriber<Flowable<Integer>> ts = pp.window(10, 15)
775+
.test();
776+
777+
assertTrue(pp.hasSubscribers());
778+
779+
pp.onNext(1);
780+
781+
ts.cancel();
782+
783+
assertFalse("Subject still has subscribers!", pp.hasSubscribers());
784+
}
785+
786+
@Test
787+
public void cancelWithoutWindowOverlap() {
788+
PublishProcessor<Integer> pp = PublishProcessor.create();
789+
790+
TestSubscriber<Flowable<Integer>> ts = pp.window(10, 5)
791+
.test();
792+
793+
assertTrue(pp.hasSubscribers());
794+
795+
ts.cancel();
796+
797+
assertFalse("Subject still has subscribers!", pp.hasSubscribers());
798+
}
799+
800+
@Test
801+
public void cancelAfterAbandonmentOverlap() {
802+
PublishProcessor<Integer> pp = PublishProcessor.create();
803+
804+
TestSubscriber<Flowable<Integer>> ts = pp.window(10, 5)
805+
.test();
806+
807+
assertTrue(pp.hasSubscribers());
808+
809+
pp.onNext(1);
810+
811+
ts.cancel();
812+
813+
assertFalse("Subject still has subscribers!", pp.hasSubscribers());
814+
}
724815
}

src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowWithSizeTest.java

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -535,4 +535,94 @@ public void accept(Observable<Integer> v) throws Throwable {
535535

536536
inner.get().test().assertResult(1);
537537
}
538+
539+
@Test
540+
public void cancelWithoutWindowSize() {
541+
PublishSubject<Integer> ps = PublishSubject.create();
542+
543+
TestObserver<Observable<Integer>> to = ps.window(10)
544+
.test();
545+
546+
assertTrue(ps.hasObservers());
547+
548+
to.dispose();
549+
550+
assertFalse("Subject still has observers!", ps.hasObservers());
551+
}
552+
553+
@Test
554+
public void cancelAfterAbandonmentSize() {
555+
PublishSubject<Integer> ps = PublishSubject.create();
556+
557+
TestObserver<Observable<Integer>> to = ps.window(10)
558+
.test();
559+
560+
assertTrue(ps.hasObservers());
561+
562+
ps.onNext(1);
563+
564+
to.dispose();
565+
566+
assertFalse("Subject still has observers!", ps.hasObservers());
567+
}
568+
569+
@Test
570+
public void cancelWithoutWindowSkip() {
571+
PublishSubject<Integer> ps = PublishSubject.create();
572+
573+
TestObserver<Observable<Integer>> to = ps.window(10, 15)
574+
.test();
575+
576+
assertTrue(ps.hasObservers());
577+
578+
to.dispose();
579+
580+
assertFalse("Subject still has observers!", ps.hasObservers());
581+
}
582+
583+
@Test
584+
public void cancelAfterAbandonmentSkip() {
585+
PublishSubject<Integer> ps = PublishSubject.create();
586+
587+
TestObserver<Observable<Integer>> to = ps.window(10, 15)
588+
.test();
589+
590+
assertTrue(ps.hasObservers());
591+
592+
ps.onNext(1);
593+
594+
to.dispose();
595+
596+
assertFalse("Subject still has observers!", ps.hasObservers());
597+
}
598+
599+
@Test
600+
public void cancelWithoutWindowOverlap() {
601+
PublishSubject<Integer> ps = PublishSubject.create();
602+
603+
TestObserver<Observable<Integer>> to = ps.window(10, 5)
604+
.test();
605+
606+
assertTrue(ps.hasObservers());
607+
608+
to.dispose();
609+
610+
assertFalse("Subject still has observers!", ps.hasObservers());
611+
}
612+
613+
@Test
614+
public void cancelAfterAbandonmentOverlap() {
615+
PublishSubject<Integer> ps = PublishSubject.create();
616+
617+
TestObserver<Observable<Integer>> to = ps.window(10, 5)
618+
.test();
619+
620+
assertTrue(ps.hasObservers());
621+
622+
ps.onNext(1);
623+
624+
to.dispose();
625+
626+
assertFalse("Subject still has observers!", ps.hasObservers());
627+
}
538628
}

0 commit comments

Comments
 (0)