Skip to content

Commit a9f1f4f

Browse files
authored
2.x: fix publish(Function) not replenishing its queue (#4943)
1 parent 4851637 commit a9f1f4f

File tree

2 files changed

+105
-6
lines changed

2 files changed

+105
-6
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowablePublishMulticast.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,8 @@ static final class MulticastProcessor<T> extends Flowable<T> implements Subscrib
137137
final AtomicReference<MulticastSubscription<T>[]> subscribers;
138138

139139
final int prefetch;
140+
141+
final int limit;
140142

141143
final boolean delayError;
142144

@@ -148,10 +150,13 @@ static final class MulticastProcessor<T> extends Flowable<T> implements Subscrib
148150

149151
volatile boolean done;
150152
Throwable error;
153+
154+
int consumed;
151155

152156
@SuppressWarnings("unchecked")
153157
MulticastProcessor(int prefetch, boolean delayError) {
154158
this.prefetch = prefetch;
159+
this.limit = prefetch - (prefetch >> 2); // request after 75% consumption
155160
this.delayError = delayError;
156161
this.wip = new AtomicInteger();
157162
this.s = new AtomicReference<Subscription>();
@@ -314,7 +319,11 @@ void drain() {
314319
int missed = 1;
315320

316321
SimpleQueue<T> q = queue;
317-
322+
323+
int upstreamConsumed = consumed;
324+
int localLimit = limit;
325+
boolean canRequest = sourceMode != QueueSubscription.SYNC;
326+
318327
for (;;) {
319328
MulticastSubscription<T>[] array = subscribers.get();
320329

@@ -383,6 +392,11 @@ void drain() {
383392
}
384393

385394
e++;
395+
396+
if (canRequest && ++upstreamConsumed == localLimit) {
397+
upstreamConsumed = 0;
398+
s.get().request(localLimit);
399+
}
386400
}
387401

388402
if (e == r) {
@@ -417,6 +431,7 @@ void drain() {
417431
}
418432
}
419433

434+
consumed = upstreamConsumed;
420435
missed = wip.addAndGet(-missed);
421436
if (missed == 0) {
422437
break;
@@ -472,8 +487,10 @@ public void request(long n) {
472487

473488
@Override
474489
public void cancel() {
475-
getAndSet(Long.MIN_VALUE);
476-
parent.remove(this);
490+
if (getAndSet(Long.MIN_VALUE) != Long.MIN_VALUE) {
491+
parent.remove(this);
492+
parent.drain(); // unblock the others
493+
}
477494
}
478495

479496
public boolean isCancelled() {

src/test/java/io/reactivex/internal/operators/flowable/FlowablePublishFunctionTest.java

Lines changed: 85 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626

2727
import io.reactivex.*;
2828
import io.reactivex.exceptions.*;
29-
import io.reactivex.functions.Function;
29+
import io.reactivex.functions.*;
3030
import io.reactivex.internal.functions.Functions;
3131
import io.reactivex.internal.subscriptions.BooleanSubscription;
3232
import io.reactivex.processors.PublishProcessor;
@@ -408,7 +408,7 @@ public Publisher<Integer> apply(Flowable<Integer> f) throws Exception {
408408

409409
for (int i = 0; i < 500; i++) {
410410
source.test()
411-
.awaitDone(5, TimeUnit.MILLISECONDS)
411+
.awaitDone(5, TimeUnit.SECONDS)
412412
.assertResult(1);
413413
}
414414
}
@@ -420,7 +420,7 @@ public void inputOutputSubscribeRace2() {
420420

421421
for (int i = 0; i < 500; i++) {
422422
source.test()
423-
.awaitDone(5, TimeUnit.MILLISECONDS)
423+
.awaitDone(5, TimeUnit.SECONDS)
424424
.assertResult(1);
425425
}
426426
}
@@ -459,4 +459,86 @@ public void run() {
459459
ts1.assertResult(1);
460460
}
461461
}
462+
463+
@Test
464+
public void longFlow() {
465+
Flowable.range(1, 1000000)
466+
.publish(new Function<Flowable<Integer>, Publisher<Integer>>() {
467+
@SuppressWarnings("unchecked")
468+
@Override
469+
public Publisher<Integer> apply(Flowable<Integer> v) throws Exception {
470+
return Flowable.mergeArray(
471+
v.filter(new Predicate<Integer>() {
472+
@Override
473+
public boolean test(Integer w) throws Exception {
474+
return w % 2 == 0;
475+
}
476+
}),
477+
v.filter(new Predicate<Integer>() {
478+
@Override
479+
public boolean test(Integer w) throws Exception {
480+
return w % 2 != 0;
481+
}
482+
}));
483+
}
484+
})
485+
.takeLast(1)
486+
.test()
487+
.assertResult(1000000);
488+
}
489+
490+
@Test
491+
public void longFlow2() {
492+
Flowable.range(1, 100000)
493+
.publish(new Function<Flowable<Integer>, Publisher<Integer>>() {
494+
@SuppressWarnings("unchecked")
495+
@Override
496+
public Publisher<Integer> apply(Flowable<Integer> v) throws Exception {
497+
return Flowable.mergeArray(
498+
v.filter(new Predicate<Integer>() {
499+
@Override
500+
public boolean test(Integer w) throws Exception {
501+
return w % 2 == 0;
502+
}
503+
}),
504+
v.filter(new Predicate<Integer>() {
505+
@Override
506+
public boolean test(Integer w) throws Exception {
507+
return w % 2 != 0;
508+
}
509+
}));
510+
}
511+
})
512+
.test()
513+
.assertValueCount(100000)
514+
.assertNoErrors()
515+
.assertComplete();
516+
}
517+
518+
@Test
519+
public void longFlowHidden() {
520+
Flowable.range(1, 1000000).hide()
521+
.publish(new Function<Flowable<Integer>, Publisher<Integer>>() {
522+
@SuppressWarnings("unchecked")
523+
@Override
524+
public Publisher<Integer> apply(Flowable<Integer> v) throws Exception {
525+
return Flowable.mergeArray(
526+
v.filter(new Predicate<Integer>() {
527+
@Override
528+
public boolean test(Integer w) throws Exception {
529+
return w % 2 == 0;
530+
}
531+
}),
532+
v.filter(new Predicate<Integer>() {
533+
@Override
534+
public boolean test(Integer w) throws Exception {
535+
return w % 2 != 0;
536+
}
537+
}));
538+
}
539+
})
540+
.takeLast(1)
541+
.test()
542+
.assertResult(1000000);
543+
}
462544
}

0 commit comments

Comments
 (0)