Skip to content

Commit 0229e21

Browse files
Concat Outer Backpressure
Fixed #1481
1 parent bc806a8 commit 0229e21

File tree

3 files changed

+35
-3
lines changed

3 files changed

+35
-3
lines changed

rxjava-core/src/main/java/rx/internal/operators/OperatorConcat.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,14 @@ public void call() {
6363
}
6464
}));
6565
}
66-
66+
67+
@Override
68+
public void onStart() {
69+
// no need for more than 1 at a time since we concat 1 at a time, so we'll request 2 to start ...
70+
// 1 to be subscribed to, 1 in the queue, then we'll keep requesting 1 at a time after that
71+
request(2);
72+
}
73+
6774
@Override
6875
public void onNext(Observable<? extends T> t) {
6976
queue.add(nl.next(t));
@@ -85,22 +92,25 @@ public void onCompleted() {
8592
subscribeNext();
8693
}
8794
}
95+
8896
void completeInner() {
97+
request(1);
8998
if (WIP_UPDATER.decrementAndGet(this) > 0) {
9099
subscribeNext();
91100
}
92101
}
102+
93103
void subscribeNext() {
94104
Object o = queue.poll();
95105
if (nl.isCompleted(o)) {
96106
s.onCompleted();
97-
} else
98-
if (o != null) {
107+
} else if (o != null) {
99108
Observable<? extends T> obs = nl.getValue(o);
100109
Subscriber<T> sourceSub = new Subscriber<T>() {
101110

102111
@Override
103112
public void onNext(T t) {
113+
// TODO need to support backpressure here https://github.com/Netflix/RxJava/issues/1480
104114
s.onNext(t);
105115
}
106116

rxjava-core/src/main/java/rx/internal/operators/OperatorWindowWithSize.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,12 @@ public ExactSubscriber(Subscriber<? super Observable<T>> child) {
6363
this.child = child;
6464
}
6565

66+
@Override
67+
public void onStart() {
68+
// no backpressure as we are controlling data flow by window size
69+
request(Long.MAX_VALUE);
70+
}
71+
6672
@Override
6773
public void onNext(T t) {
6874
if (count++ % size == 0) {
@@ -106,6 +112,12 @@ public InexactSubscriber(Subscriber<? super Observable<T>> child) {
106112
this.chunks = new LinkedList<CountedSubject<T>>();
107113
}
108114

115+
@Override
116+
public void onStart() {
117+
// no backpressure as we are controlling data flow by window size
118+
request(Long.MAX_VALUE);
119+
}
120+
109121
@Override
110122
public void onNext(T t) {
111123
if (count++ % skip == 0) {

rxjava-core/src/test/java/rx/internal/operators/OperatorConcatTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package rx.internal.operators;
1717

18+
import static org.junit.Assert.assertEquals;
1819
import static org.junit.Assert.fail;
1920
import static org.mockito.Matchers.any;
2021
import static org.mockito.Matchers.anyString;
@@ -650,4 +651,13 @@ public void call(Subscriber<? super Observable<Integer>> s) {
650651
inOrder.verify(o).onCompleted();
651652
verify(o, never()).onError(any(Throwable.class));
652653
}
654+
655+
@Test
656+
public void testConcatOuterBackpressure() {
657+
assertEquals(1,
658+
(int) Observable.<Integer> empty()
659+
.concatWith(Observable.just(1))
660+
.take(1)
661+
.toBlocking().single());
662+
}
653663
}

0 commit comments

Comments
 (0)