File tree Expand file tree Collapse file tree 2 files changed +35
-0
lines changed
main/java/io/reactivex/rxjava3/internal/operators/flowable
test/java/io/reactivex/rxjava3/internal/operators/flowable Expand file tree Collapse file tree 2 files changed +35
-0
lines changed Original file line number Diff line number Diff line change @@ -401,6 +401,7 @@ public void request(long n) {
401401 public void cancel () {
402402 if (cancelled .compareAndSet (false , true )) {
403403 cancelParent ();
404+ drain ();
404405 }
405406 }
406407
Original file line number Diff line number Diff line change @@ -2444,4 +2444,38 @@ public void accept(Integer v) throws Throwable {
24442444 .assertNoErrors ()
24452445 .assertComplete ();
24462446 }
2447+
2448+ @ Test
2449+ public void cancelledGroupResumesRequesting () {
2450+ final List <TestSubscriber <Integer >> tss = new ArrayList <>();
2451+ final AtomicInteger counter = new AtomicInteger ();
2452+ final AtomicBoolean done = new AtomicBoolean ();
2453+ Flowable .range (1 , 1000 )
2454+ .doOnNext (new Consumer <Integer >() {
2455+ @ Override
2456+ public void accept (Integer v ) throws Exception {
2457+ counter .getAndIncrement ();
2458+ }
2459+ })
2460+ .groupBy (Functions .justFunction (1 ))
2461+ .subscribe (new Consumer <GroupedFlowable <Integer , Integer >>() {
2462+ @ Override
2463+ public void accept (GroupedFlowable <Integer , Integer > v ) throws Exception {
2464+ TestSubscriber <Integer > ts = TestSubscriber .create (0L );
2465+ tss .add (ts );
2466+ v .subscribe (ts );
2467+ }
2468+ }, Functions .emptyConsumer (), new Action () {
2469+ @ Override
2470+ public void run () throws Exception {
2471+ done .set (true );
2472+ }
2473+ });
2474+
2475+ while (!done .get ()) {
2476+ tss .remove (0 ).cancel ();
2477+ }
2478+
2479+ assertEquals (1000 , counter .get ());
2480+ }
24472481}
You can’t perform that action at this time.
0 commit comments