Skip to content

Commit c698a61

Browse files
committed
Backpressure for window(size)
1 parent c4f611b commit c698a61

File tree

2 files changed

+75
-23
lines changed

2 files changed

+75
-23
lines changed

src/main/java/rx/internal/operators/OperatorWindowWithSize.java

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,14 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import java.util.ArrayList;
19-
import java.util.Iterator;
20-
import java.util.LinkedList;
21-
import java.util.List;
18+
import java.util.*;
2219

23-
import rx.Observable;
20+
import rx.*;
2421
import rx.Observable.Operator;
25-
import rx.Subscription;
22+
import rx.Observable;
23+
import rx.Observer;
2624
import rx.functions.Action0;
2725
import rx.subscriptions.Subscriptions;
28-
import rx.Observer;
29-
import rx.Subscriber;
3026

3127
/**
3228
* Creates windows of values into the source sequence with skip frequency and size bounds.
@@ -78,26 +74,36 @@ public ExactSubscriber(Subscriber<? super Observable<T>> child) {
7874
@Override
7975
public void call() {
8076
// if no window we unsubscribe up otherwise wait until window ends
81-
if(noWindow) {
77+
if (noWindow) {
8278
parentSubscription.unsubscribe();
8379
}
8480
}
8581

8682
}));
87-
}
88-
89-
@Override
90-
public void onStart() {
91-
// no backpressure as we are controlling data flow by window size
92-
request(Long.MAX_VALUE);
83+
child.setProducer(new Producer() {
84+
@Override
85+
public void request(long n) {
86+
if (n > 0) {
87+
long u = n * size;
88+
if (((u >>> 31) != 0) && (u / n != size)) {
89+
u = Long.MAX_VALUE;
90+
}
91+
requestMore(u);
92+
}
93+
}
94+
});
9395
}
9496

97+
void requestMore(long n) {
98+
request(n);
99+
}
100+
95101
@Override
96102
public void onNext(T t) {
97103
if (window == null) {
98104
noWindow = false;
99105
window = BufferUntilSubscriber.create();
100-
child.onNext(window);
106+
child.onNext(window);
101107
}
102108
window.onNext(t);
103109
if (++count % size == 0) {

src/test/java/rx/internal/operators/OperatorWindowWithSizeTest.java

Lines changed: 53 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,19 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import static org.junit.Assert.assertEquals;
19-
import static org.junit.Assert.assertTrue;
18+
import static org.junit.Assert.*;
2019

21-
import java.util.ArrayList;
22-
import java.util.Arrays;
23-
import java.util.List;
20+
import java.util.*;
2421
import java.util.concurrent.TimeUnit;
2522
import java.util.concurrent.atomic.AtomicInteger;
2623

2724
import org.junit.Test;
2825

26+
import static org.mockito.Mockito.*;
27+
import rx.*;
2928
import rx.Observable;
30-
import rx.functions.Action1;
31-
import rx.functions.Func1;
29+
import rx.Observer;
30+
import rx.functions.*;
3231
import rx.observers.TestSubscriber;
3332
import rx.schedulers.Schedulers;
3433

@@ -198,5 +197,52 @@ private List<String> list(String... args) {
198197
}
199198
return list;
200199
}
200+
201+
@Test
202+
public void testBackpressureOuter() {
203+
Observable<Observable<Integer>> source = Observable.range(1, 10).window(3);
204+
205+
final List<Integer> list = new ArrayList<Integer>();
206+
207+
@SuppressWarnings("unchecked")
208+
final Observer<Integer> o = mock(Observer.class);
209+
210+
source.subscribe(new Subscriber<Observable<Integer>>() {
211+
@Override
212+
public void onStart() {
213+
request(1);
214+
}
215+
@Override
216+
public void onNext(Observable<Integer> t) {
217+
t.subscribe(new Observer<Integer>() {
218+
@Override
219+
public void onNext(Integer t) {
220+
list.add(t);
221+
}
222+
@Override
223+
public void onError(Throwable e) {
224+
o.onError(e);
225+
}
226+
@Override
227+
public void onCompleted() {
228+
o.onCompleted();
229+
}
230+
});
231+
}
232+
@Override
233+
public void onError(Throwable e) {
234+
o.onError(e);
235+
}
236+
@Override
237+
public void onCompleted() {
238+
o.onCompleted();
239+
}
240+
});
241+
242+
assertEquals(Arrays.asList(1, 2, 3), list);
243+
244+
verify(o, never()).onError(any(Throwable.class));
245+
verify(o, times(1)).onCompleted(); // 1 inner
246+
}
201247

202248
}

0 commit comments

Comments
 (0)