Skip to content

Commit 0e19566

Browse files
authored
2.x: fix window() with time+size emission problems (#5213)
1 parent 476a69f commit 0e19566

File tree

4 files changed

+233
-15
lines changed

4 files changed

+233
-15
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,7 @@ static final class WindowExactBoundedSubscriber<T>
281281
final int bufferSize;
282282
final boolean restartTimerOnMaxSize;
283283
final long maxSize;
284+
final Scheduler.Worker worker;
284285

285286
long count;
286287

@@ -290,8 +291,6 @@ static final class WindowExactBoundedSubscriber<T>
290291

291292
UnicastProcessor<T> window;
292293

293-
Scheduler.Worker worker;
294-
295294
volatile boolean terminated;
296295

297296
final SequentialDisposable timer = new SequentialDisposable();
@@ -307,6 +306,11 @@ static final class WindowExactBoundedSubscriber<T>
307306
this.bufferSize = bufferSize;
308307
this.maxSize = maxSize;
309308
this.restartTimerOnMaxSize = restartTimerOnMaxSize;
309+
if (restartTimerOnMaxSize) {
310+
worker = scheduler.createWorker();
311+
} else {
312+
worker = null;
313+
}
310314
}
311315

312316
@Override
@@ -342,10 +346,7 @@ public void onSubscribe(Subscription s) {
342346
Disposable d;
343347
ConsumerIndexHolder consumerIndexHolder = new ConsumerIndexHolder(producerIndex, this);
344348
if (restartTimerOnMaxSize) {
345-
Scheduler.Worker sw = scheduler.createWorker();
346-
worker = sw;
347-
sw.schedulePeriodically(consumerIndexHolder, timespan, timespan, unit);
348-
d = sw;
349+
d = worker.schedulePeriodically(consumerIndexHolder, timespan, timespan, unit);
349350
} else {
350351
d = scheduler.schedulePeriodicallyDirect(consumerIndexHolder, timespan, timespan, unit);
351352
}
@@ -451,6 +452,10 @@ public void cancel() {
451452

452453
public void dispose() {
453454
DisposableHelper.dispose(timer);
455+
Worker w = worker;
456+
if (w != null) {
457+
w.dispose();
458+
}
454459
}
455460

456461
void drainLoop() {
@@ -495,9 +500,9 @@ void drainLoop() {
495500

496501
if (isHolder) {
497502
ConsumerIndexHolder consumerIndexHolder = (ConsumerIndexHolder) o;
498-
if (producerIndex == consumerIndexHolder.index) {
503+
if (restartTimerOnMaxSize || producerIndex == consumerIndexHolder.index) {
499504
w.onComplete();
500-
505+
count = 0;
501506
w = UnicastProcessor.<T>create(bufferSize);
502507
window = w;
503508

src/main/java/io/reactivex/internal/operators/observable/ObservableWindowTimed.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,8 @@ static final class WindowExactBoundedObserver<T>
254254
final boolean restartTimerOnMaxSize;
255255
final long maxSize;
256256

257+
final Scheduler.Worker worker;
258+
257259
long count;
258260

259261
long producerIndex;
@@ -262,7 +264,6 @@ static final class WindowExactBoundedObserver<T>
262264

263265
UnicastSubject<T> window;
264266

265-
Scheduler.Worker worker;
266267

267268
volatile boolean terminated;
268269

@@ -279,6 +280,11 @@ static final class WindowExactBoundedObserver<T>
279280
this.bufferSize = bufferSize;
280281
this.maxSize = maxSize;
281282
this.restartTimerOnMaxSize = restartTimerOnMaxSize;
283+
if (restartTimerOnMaxSize) {
284+
worker = scheduler.createWorker();
285+
} else {
286+
worker = null;
287+
}
282288
}
283289

284290
@Override
@@ -302,10 +308,7 @@ public void onSubscribe(Disposable s) {
302308
Disposable d;
303309
ConsumerIndexHolder consumerIndexHolder = new ConsumerIndexHolder(producerIndex, this);
304310
if (restartTimerOnMaxSize) {
305-
Scheduler.Worker sw = scheduler.createWorker();
306-
worker = sw;
307-
sw.schedulePeriodically(consumerIndexHolder, timespan, timespan, unit);
308-
d = sw;
311+
d = worker.schedulePeriodically(consumerIndexHolder, timespan, timespan, unit);
309312
} else {
310313
d = scheduler.schedulePeriodicallyDirect(consumerIndexHolder, timespan, timespan, unit);
311314
}
@@ -394,6 +397,10 @@ public boolean isDisposed() {
394397

395398
void disposeTimer() {
396399
DisposableHelper.dispose(timer);
400+
Worker w = worker;
401+
if (w != null) {
402+
w.dispose();
403+
}
397404
}
398405

399406
void drainLoop() {
@@ -438,9 +445,9 @@ void drainLoop() {
438445

439446
if (isHolder) {
440447
ConsumerIndexHolder consumerIndexHolder = (ConsumerIndexHolder) o;
441-
if (producerIndex == consumerIndexHolder.index) {
448+
if (restartTimerOnMaxSize || producerIndex == consumerIndexHolder.index) {
442449
w.onComplete();
443-
450+
count = 0;
444451
w = UnicastSubject.create(bufferSize);
445452
window = w;
446453

src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithTimeTest.java

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -704,4 +704,107 @@ public void sizeTimeTimeout() {
704704

705705
ts.values().get(0).test().assertResult();
706706
}
707+
708+
@Test
709+
public void periodicWindowCompletion() {
710+
TestScheduler scheduler = new TestScheduler();
711+
FlowableProcessor<Integer> ps = PublishProcessor.<Integer>create();
712+
713+
TestSubscriber<Flowable<Integer>> ts = ps.window(5, TimeUnit.MILLISECONDS, scheduler, Long.MAX_VALUE, false)
714+
.test();
715+
716+
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
717+
718+
ts.assertValueCount(21)
719+
.assertNoErrors()
720+
.assertNotComplete();
721+
}
722+
723+
@Test
724+
public void periodicWindowCompletionRestartTimer() {
725+
TestScheduler scheduler = new TestScheduler();
726+
FlowableProcessor<Integer> ps = PublishProcessor.<Integer>create();
727+
728+
TestSubscriber<Flowable<Integer>> ts = ps.window(5, TimeUnit.MILLISECONDS, scheduler, Long.MAX_VALUE, true)
729+
.test();
730+
731+
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
732+
733+
ts.assertValueCount(21)
734+
.assertNoErrors()
735+
.assertNotComplete();
736+
}
737+
738+
@Test
739+
public void periodicWindowCompletionBounded() {
740+
TestScheduler scheduler = new TestScheduler();
741+
FlowableProcessor<Integer> ps = PublishProcessor.<Integer>create();
742+
743+
TestSubscriber<Flowable<Integer>> ts = ps.window(5, TimeUnit.MILLISECONDS, scheduler, 5, false)
744+
.test();
745+
746+
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
747+
748+
ts.assertValueCount(21)
749+
.assertNoErrors()
750+
.assertNotComplete();
751+
}
752+
753+
@Test
754+
public void periodicWindowCompletionRestartTimerBounded() {
755+
TestScheduler scheduler = new TestScheduler();
756+
FlowableProcessor<Integer> ps = PublishProcessor.<Integer>create();
757+
758+
TestSubscriber<Flowable<Integer>> ts = ps.window(5, TimeUnit.MILLISECONDS, scheduler, 5, true)
759+
.test();
760+
761+
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
762+
763+
ts.assertValueCount(21)
764+
.assertNoErrors()
765+
.assertNotComplete();
766+
}
767+
768+
@Test
769+
public void periodicWindowCompletionRestartTimerBoundedSomeData() {
770+
TestScheduler scheduler = new TestScheduler();
771+
FlowableProcessor<Integer> ps = PublishProcessor.<Integer>create();
772+
773+
TestSubscriber<Flowable<Integer>> ts = ps.window(5, TimeUnit.MILLISECONDS, scheduler, 2, true)
774+
.test();
775+
776+
ps.onNext(1);
777+
ps.onNext(2);
778+
779+
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
780+
781+
ts.assertValueCount(22)
782+
.assertNoErrors()
783+
.assertNotComplete();
784+
}
785+
@Test
786+
public void countRestartsOnTimeTick() {
787+
TestScheduler scheduler = new TestScheduler();
788+
FlowableProcessor<Integer> ps = PublishProcessor.<Integer>create();
789+
790+
TestSubscriber<Flowable<Integer>> ts = ps.window(5, TimeUnit.MILLISECONDS, scheduler, 5, true)
791+
.test();
792+
793+
// window #1
794+
ps.onNext(1);
795+
ps.onNext(2);
796+
797+
scheduler.advanceTimeBy(5, TimeUnit.MILLISECONDS);
798+
799+
// window #2
800+
ps.onNext(3);
801+
ps.onNext(4);
802+
ps.onNext(5);
803+
ps.onNext(6);
804+
805+
ts.assertValueCount(2)
806+
.assertNoErrors()
807+
.assertNotComplete();
808+
}
707809
}
810+

src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithTimeTest.java

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -603,4 +603,107 @@ public void sizeTimeTimeout() {
603603

604604
ts.values().get(0).test().assertResult();
605605
}
606+
607+
@Test
608+
public void periodicWindowCompletion() {
609+
TestScheduler scheduler = new TestScheduler();
610+
Subject<Integer> ps = PublishSubject.<Integer>create();
611+
612+
TestObserver<Observable<Integer>> ts = ps.window(5, TimeUnit.MILLISECONDS, scheduler, Long.MAX_VALUE, false)
613+
.test();
614+
615+
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
616+
617+
ts.assertValueCount(21)
618+
.assertNoErrors()
619+
.assertNotComplete();
620+
}
621+
622+
@Test
623+
public void periodicWindowCompletionRestartTimer() {
624+
TestScheduler scheduler = new TestScheduler();
625+
Subject<Integer> ps = PublishSubject.<Integer>create();
626+
627+
TestObserver<Observable<Integer>> ts = ps.window(5, TimeUnit.MILLISECONDS, scheduler, Long.MAX_VALUE, true)
628+
.test();
629+
630+
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
631+
632+
ts.assertValueCount(21)
633+
.assertNoErrors()
634+
.assertNotComplete();
635+
}
636+
637+
@Test
638+
public void periodicWindowCompletionBounded() {
639+
TestScheduler scheduler = new TestScheduler();
640+
Subject<Integer> ps = PublishSubject.<Integer>create();
641+
642+
TestObserver<Observable<Integer>> ts = ps.window(5, TimeUnit.MILLISECONDS, scheduler, 5, false)
643+
.test();
644+
645+
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
646+
647+
ts.assertValueCount(21)
648+
.assertNoErrors()
649+
.assertNotComplete();
650+
}
651+
652+
@Test
653+
public void periodicWindowCompletionRestartTimerBounded() {
654+
TestScheduler scheduler = new TestScheduler();
655+
Subject<Integer> ps = PublishSubject.<Integer>create();
656+
657+
TestObserver<Observable<Integer>> ts = ps.window(5, TimeUnit.MILLISECONDS, scheduler, 5, true)
658+
.test();
659+
660+
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
661+
662+
ts.assertValueCount(21)
663+
.assertNoErrors()
664+
.assertNotComplete();
665+
}
666+
667+
@Test
668+
public void periodicWindowCompletionRestartTimerBoundedSomeData() {
669+
TestScheduler scheduler = new TestScheduler();
670+
Subject<Integer> ps = PublishSubject.<Integer>create();
671+
672+
TestObserver<Observable<Integer>> ts = ps.window(5, TimeUnit.MILLISECONDS, scheduler, 2, true)
673+
.test();
674+
675+
ps.onNext(1);
676+
ps.onNext(2);
677+
678+
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
679+
680+
ts.assertValueCount(22)
681+
.assertNoErrors()
682+
.assertNotComplete();
683+
}
684+
685+
@Test
686+
public void countRestartsOnTimeTick() {
687+
TestScheduler scheduler = new TestScheduler();
688+
Subject<Integer> ps = PublishSubject.<Integer>create();
689+
690+
TestObserver<Observable<Integer>> ts = ps.window(5, TimeUnit.MILLISECONDS, scheduler, 5, true)
691+
.test();
692+
693+
// window #1
694+
ps.onNext(1);
695+
ps.onNext(2);
696+
697+
scheduler.advanceTimeBy(5, TimeUnit.MILLISECONDS);
698+
699+
// window #2
700+
ps.onNext(3);
701+
ps.onNext(4);
702+
ps.onNext(5);
703+
ps.onNext(6);
704+
705+
ts.assertValueCount(2)
706+
.assertNoErrors()
707+
.assertNotComplete();
708+
}
606709
}

0 commit comments

Comments
 (0)