Skip to content

Commit bd862bc

Browse files
authored
1.x: fix broken backpressure through unsubscribeOn() (#5726)
1 parent 048175a commit bd862bc

File tree

2 files changed

+31
-0
lines changed

2 files changed

+31
-0
lines changed

src/main/java/rx/internal/operators/OperatorUnsubscribeOn.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@ public void onNext(T t) {
5252
subscriber.onNext(t);
5353
}
5454

55+
@Override
56+
public void setProducer(Producer p) {
57+
subscriber.setProducer(p);
58+
}
5559
};
5660

5761
subscriber.add(Subscriptions.create(new Action0() {

src/test/java/rx/internal/operators/OperatorUnsubscribeOnTest.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,4 +204,31 @@ public Thread getThread() {
204204
}
205205

206206
}
207+
208+
@Test
209+
public void backpressure() {
210+
AssertableSubscriber<Integer> as = Observable.range(1, 10)
211+
.unsubscribeOn(Schedulers.trampoline())
212+
.test(0);
213+
214+
as.assertNoValues()
215+
.assertNoErrors()
216+
.assertNotCompleted();
217+
218+
as.requestMore(1);
219+
220+
as.assertValue(1)
221+
.assertNoErrors()
222+
.assertNotCompleted();
223+
224+
as.requestMore(3);
225+
226+
as.assertValues(1, 2, 3, 4)
227+
.assertNoErrors()
228+
.assertNotCompleted();
229+
230+
as.requestMore(10);
231+
232+
as.assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
233+
}
207234
}

0 commit comments

Comments
 (0)