Skip to content

Commit e753858

Browse files
authored
2.x: Fig groupBy not requesting more if a group is cancelled with buffered items (#6894)
1 parent 030528b commit e753858

File tree

2 files changed

+57
-12
lines changed

2 files changed

+57
-12
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableGroupBy.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -518,6 +518,7 @@ public void request(long n) {
518518
public void cancel() {
519519
if (cancelled.compareAndSet(false, true)) {
520520
parent.cancel(key);
521+
drain();
521522
}
522523
}
523524

@@ -568,7 +569,6 @@ void drainFused() {
568569
for (;;) {
569570
if (a != null) {
570571
if (cancelled.get()) {
571-
q.clear();
572572
return;
573573
}
574574

@@ -623,7 +623,7 @@ void drainNormal() {
623623
T v = q.poll();
624624
boolean empty = v == null;
625625

626-
if (checkTerminated(d, empty, a, delayError)) {
626+
if (checkTerminated(d, empty, a, delayError, e)) {
627627
return;
628628
}
629629

@@ -636,7 +636,7 @@ void drainNormal() {
636636
e++;
637637
}
638638

639-
if (e == r && checkTerminated(done, q.isEmpty(), a, delayError)) {
639+
if (e == r && checkTerminated(done, q.isEmpty(), a, delayError, e)) {
640640
return;
641641
}
642642

@@ -658,9 +658,15 @@ void drainNormal() {
658658
}
659659
}
660660

661-
boolean checkTerminated(boolean d, boolean empty, Subscriber<? super T> a, boolean delayError) {
661+
boolean checkTerminated(boolean d, boolean empty, Subscriber<? super T> a, boolean delayError, long emitted) {
662662
if (cancelled.get()) {
663-
queue.clear();
663+
// make sure buffered items can get replenished
664+
while (queue.poll() != null) {
665+
emitted++;
666+
}
667+
if (emitted != 0) {
668+
parent.upstream.request(emitted);
669+
}
664670
return true;
665671
}
666672

@@ -732,7 +738,12 @@ void tryReplenish() {
732738

733739
@Override
734740
public void clear() {
735-
queue.clear();
741+
// make sure buffered items can get replenished
742+
SpscLinkedArrayQueue<T> q = queue;
743+
while (q.poll() != null) {
744+
produced++;
745+
}
746+
tryReplenish();
736747
}
737748
}
738749
}

src/test/java/io/reactivex/internal/operators/flowable/FlowableGroupByTest.java

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2215,20 +2215,20 @@ public void fusedNoConcurrentCleanDueToCancel() {
22152215
try {
22162216
final PublishProcessor<Integer> pp = PublishProcessor.create();
22172217

2218-
final AtomicReference<QueueSubscription<GroupedFlowable<Object, Integer>>> qs = new AtomicReference<QueueSubscription<GroupedFlowable<Object, Integer>>>();
2218+
final AtomicReference<QueueSubscription<GroupedFlowable<Integer, Integer>>> qs = new AtomicReference<QueueSubscription<GroupedFlowable<Integer, Integer>>>();
22192219

22202220
final TestSubscriber<Integer> ts2 = new TestSubscriber<Integer>();
22212221

2222-
pp.groupBy(Functions.identity(), Functions.<Integer>identity(), false, 4)
2223-
.subscribe(new FlowableSubscriber<GroupedFlowable<Object, Integer>>() {
2222+
pp.groupBy(Functions.<Integer>identity(), Functions.<Integer>identity(), false, 4)
2223+
.subscribe(new FlowableSubscriber<GroupedFlowable<Integer, Integer>>() {
22242224

22252225
boolean once;
22262226

22272227
@Override
2228-
public void onNext(GroupedFlowable<Object, Integer> g) {
2228+
public void onNext(GroupedFlowable<Integer, Integer> g) {
22292229
if (!once) {
22302230
try {
2231-
GroupedFlowable<Object, Integer> t = qs.get().poll();
2231+
GroupedFlowable<Integer, Integer> t = qs.get().poll();
22322232
if (t != null) {
22332233
once = true;
22342234
t.subscribe(ts2);
@@ -2250,7 +2250,7 @@ public void onComplete() {
22502250
@Override
22512251
public void onSubscribe(Subscription s) {
22522252
@SuppressWarnings("unchecked")
2253-
QueueSubscription<GroupedFlowable<Object, Integer>> q = (QueueSubscription<GroupedFlowable<Object, Integer>>)s;
2253+
QueueSubscription<GroupedFlowable<Integer, Integer>> q = (QueueSubscription<GroupedFlowable<Integer, Integer>>)s;
22542254
qs.set(q);
22552255
q.requestFusion(QueueFuseable.ANY);
22562256
q.request(1);
@@ -2316,4 +2316,38 @@ public Publisher<Integer> apply(GroupedFlowable<Integer, Integer> g) {
23162316
.assertComplete()
23172317
.assertNoErrors();
23182318
}
2319+
2320+
@Test
2321+
public void cancelledGroupResumesRequesting() {
2322+
final List<TestSubscriber<Integer>> tss = new ArrayList<TestSubscriber<Integer>>();
2323+
final AtomicInteger counter = new AtomicInteger();
2324+
final AtomicBoolean done = new AtomicBoolean();
2325+
Flowable.range(1, 1000)
2326+
.doOnNext(new Consumer<Integer>() {
2327+
@Override
2328+
public void accept(Integer v) throws Exception {
2329+
counter.getAndIncrement();
2330+
}
2331+
})
2332+
.groupBy(Functions.justFunction(1))
2333+
.subscribe(new Consumer<GroupedFlowable<Integer, Integer>>() {
2334+
@Override
2335+
public void accept(GroupedFlowable<Integer, Integer> v) throws Exception {
2336+
TestSubscriber<Integer> ts = TestSubscriber.create(0L);
2337+
tss.add(ts);
2338+
v.subscribe(ts);
2339+
}
2340+
}, Functions.emptyConsumer(), new Action() {
2341+
@Override
2342+
public void run() throws Exception {
2343+
done.set(true);
2344+
}
2345+
});
2346+
2347+
while (!done.get()) {
2348+
tss.remove(0).cancel();
2349+
}
2350+
2351+
assertEquals(1000, counter.get());
2352+
}
23192353
}

0 commit comments

Comments
 (0)