Skip to content

Commit 3281b02

Browse files
authored
2.x: Fix Observable.flatMap to sustain concurrency level (#6283)
1 parent caefffa commit 3281b02

File tree

3 files changed

+93
-11
lines changed

3 files changed

+93
-11
lines changed

src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMap.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,7 @@ void drainLoop() {
376376
return;
377377
}
378378

379-
boolean innerCompleted = false;
379+
int innerCompleted = 0;
380380
if (n != 0) {
381381
long startId = lastId;
382382
int index = lastIndex;
@@ -423,7 +423,7 @@ void drainLoop() {
423423
return;
424424
}
425425
removeInner(is);
426-
innerCompleted = true;
426+
innerCompleted++;
427427
j++;
428428
if (j == n) {
429429
j = 0;
@@ -449,7 +449,7 @@ void drainLoop() {
449449
if (checkTerminate()) {
450450
return;
451451
}
452-
innerCompleted = true;
452+
innerCompleted++;
453453
}
454454

455455
j++;
@@ -461,17 +461,19 @@ void drainLoop() {
461461
lastId = inner[j].id;
462462
}
463463

464-
if (innerCompleted) {
464+
if (innerCompleted != 0) {
465465
if (maxConcurrency != Integer.MAX_VALUE) {
466-
ObservableSource<? extends U> p;
467-
synchronized (this) {
468-
p = sources.poll();
469-
if (p == null) {
470-
wip--;
471-
continue;
466+
while (innerCompleted-- != 0) {
467+
ObservableSource<? extends U> p;
468+
synchronized (this) {
469+
p = sources.poll();
470+
if (p == null) {
471+
wip--;
472+
continue;
473+
}
472474
}
475+
subscribeInner(p);
473476
}
474-
subscribeInner(p);
475477
}
476478
continue;
477479
}

src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapTest.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1084,4 +1084,44 @@ public void remove() {
10841084

10851085
assertEquals(1, counter.get());
10861086
}
1087+
1088+
@Test
1089+
public void maxConcurrencySustained() {
1090+
final PublishProcessor<Integer> pp1 = PublishProcessor.create();
1091+
final PublishProcessor<Integer> pp2 = PublishProcessor.create();
1092+
PublishProcessor<Integer> pp3 = PublishProcessor.create();
1093+
PublishProcessor<Integer> pp4 = PublishProcessor.create();
1094+
1095+
TestSubscriber<Integer> ts = Flowable.just(pp1, pp2, pp3, pp4)
1096+
.flatMap(new Function<PublishProcessor<Integer>, Flowable<Integer>>() {
1097+
@Override
1098+
public Flowable<Integer> apply(PublishProcessor<Integer> v) throws Exception {
1099+
return v;
1100+
}
1101+
}, 2)
1102+
.doOnNext(new Consumer<Integer>() {
1103+
@Override
1104+
public void accept(Integer v) throws Exception {
1105+
if (v == 1) {
1106+
// this will make sure the drain loop detects two completed
1107+
// inner sources and replaces them with fresh ones
1108+
pp1.onComplete();
1109+
pp2.onComplete();
1110+
}
1111+
}
1112+
})
1113+
.test();
1114+
1115+
pp1.onNext(1);
1116+
1117+
assertFalse(pp1.hasSubscribers());
1118+
assertFalse(pp2.hasSubscribers());
1119+
assertTrue(pp3.hasSubscribers());
1120+
assertTrue(pp4.hasSubscribers());
1121+
1122+
ts.dispose();
1123+
1124+
assertFalse(pp3.hasSubscribers());
1125+
assertFalse(pp4.hasSubscribers());
1126+
}
10871127
}

src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapTest.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1045,4 +1045,44 @@ public Integer apply(Integer v)
10451045

10461046
to.assertValuesOnly(10, 11, 12, 13, 14, 20, 21, 22, 23, 24);
10471047
}
1048+
1049+
@Test
1050+
public void maxConcurrencySustained() {
1051+
final PublishSubject<Integer> ps1 = PublishSubject.create();
1052+
final PublishSubject<Integer> ps2 = PublishSubject.create();
1053+
PublishSubject<Integer> ps3 = PublishSubject.create();
1054+
PublishSubject<Integer> ps4 = PublishSubject.create();
1055+
1056+
TestObserver<Integer> to = Observable.just(ps1, ps2, ps3, ps4)
1057+
.flatMap(new Function<PublishSubject<Integer>, ObservableSource<Integer>>() {
1058+
@Override
1059+
public ObservableSource<Integer> apply(PublishSubject<Integer> v) throws Exception {
1060+
return v;
1061+
}
1062+
}, 2)
1063+
.doOnNext(new Consumer<Integer>() {
1064+
@Override
1065+
public void accept(Integer v) throws Exception {
1066+
if (v == 1) {
1067+
// this will make sure the drain loop detects two completed
1068+
// inner sources and replaces them with fresh ones
1069+
ps1.onComplete();
1070+
ps2.onComplete();
1071+
}
1072+
}
1073+
})
1074+
.test();
1075+
1076+
ps1.onNext(1);
1077+
1078+
assertFalse(ps1.hasObservers());
1079+
assertFalse(ps2.hasObservers());
1080+
assertTrue(ps3.hasObservers());
1081+
assertTrue(ps4.hasObservers());
1082+
1083+
to.dispose();
1084+
1085+
assertFalse(ps3.hasObservers());
1086+
assertFalse(ps4.hasObservers());
1087+
}
10481088
}

0 commit comments

Comments
 (0)