Skip to content

Commit 19c96cd

Browse files
committed
Merge pull request #2566 from akarnokd/CombineLatestBackpressureFix
CombineLatest: fixed concurrent requestUpTo yielding -1 requests
2 parents 9a76608 + cc522a5 commit 19c96cd

File tree

2 files changed

+15
-3
lines changed

2 files changed

+15
-3
lines changed

src/main/java/rx/internal/operators/OnSubscribeCombineLatest.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -230,9 +230,14 @@ public MultiSourceRequestableSubscriber(int index, int initial, Subscriber<? sup
230230
}
231231

232232
public void requestUpTo(long n) {
233-
long r = Math.min(emitted.get(), n);
234-
request(r);
235-
emitted.addAndGet(-r);
233+
do {
234+
long r = emitted.get();
235+
long u = Math.min(r, n);
236+
if (emitted.compareAndSet(r, r - u)) {
237+
request(u);
238+
break;
239+
}
240+
} while (true);
236241
}
237242

238243
@Override

src/test/java/rx/internal/operators/OnSubscribeCombineLatestTest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -789,6 +789,13 @@ public Object call(Object... args) {
789789

790790
}
791791

792+
@Test
793+
public void testBackpressureLoop() {
794+
for (int i = 0; i < 5000; i++) {
795+
testBackpressure();
796+
}
797+
}
798+
792799
@Test
793800
public void testBackpressure() {
794801
Func2<String, Integer, String> combineLatestFunction = getConcatStringIntegerCombineLatestFunction();

0 commit comments

Comments
 (0)