Skip to content

Commit 0def1af

Browse files
authored
Merge pull request #2003 from ozangunalp/group_by_over_request
Reproducer and fix for the groupBy operator upstream requests
2 parents c1b7b0e + 556bc46 commit 0def1af

File tree

2 files changed

+46
-2
lines changed

2 files changed

+46
-2
lines changed

implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiGroupByOp.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -417,8 +417,6 @@ void drain() {
417417
requested.addAndGet(-e);
418418
}
419419
parent.getUpstreamSubscription().request(e);
420-
} else {
421-
parent.getUpstreamSubscription().request(parent.prefetch);
422420
}
423421
}
424422

implementation/src/test/java/io/smallrye/mutiny/operators/MultiGroupTest.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,13 @@
1414
import java.util.Collections;
1515
import java.util.List;
1616
import java.util.NoSuchElementException;
17+
import java.util.concurrent.ExecutorService;
18+
import java.util.concurrent.Executors;
1719
import java.util.concurrent.Flow.Subscriber;
1820
import java.util.concurrent.Flow.Subscription;
1921
import java.util.concurrent.atomic.AtomicBoolean;
2022
import java.util.concurrent.atomic.AtomicInteger;
23+
import java.util.concurrent.atomic.AtomicLong;
2124
import java.util.concurrent.atomic.AtomicReference;
2225
import java.util.function.Consumer;
2326

@@ -1037,4 +1040,47 @@ public void rejectGroupByBadPrefetch() {
10371040
assertThrows(IllegalArgumentException.class,
10381041
() -> Multi.createFrom().range(1, 10).group().by(i -> i % 2, 0L));
10391042
}
1043+
1044+
@Test
1045+
void testUpstreamRequestsNotBlownOutOfProportion() {
1046+
ExecutorService executor = Executors.newFixedThreadPool(10);
1047+
try {
1048+
AtomicLong requestCounter = new AtomicLong(0);
1049+
AtomicLong itemCounter = new AtomicLong(0);
1050+
AtomicReference<MultiEmitter<? super Integer>> e = new AtomicReference<>();
1051+
1052+
Multi.createFrom().<Integer>emitter(e::set)
1053+
.onRequest().invoke(requestCounter::addAndGet)
1054+
.group().by(i -> i / 10)
1055+
.onItem().transformToMulti(g -> g.map(i -> g.key() + " : " + i)
1056+
.emitOn(executor)
1057+
.invoke(s -> {
1058+
try {
1059+
Thread.sleep(100);
1060+
itemCounter.incrementAndGet();
1061+
} catch (InterruptedException ex) {
1062+
throw new RuntimeException(ex);
1063+
}
1064+
}))
1065+
.merge()
1066+
.subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE));
1067+
1068+
int itemCount = 100;
1069+
MultiEmitter<? super Integer> emitter = e.get();
1070+
new Thread(() -> {
1071+
int i = 0;
1072+
while (true) {
1073+
if (emitter.requested() > 0) {
1074+
emitter.emit(i);
1075+
i++;
1076+
}
1077+
}
1078+
}).start();
1079+
1080+
await().untilAsserted(() -> assertThat(itemCounter).hasValueGreaterThanOrEqualTo(itemCount));
1081+
assertThat(requestCounter.get()).isLessThan(10000L); // this should not blow up
1082+
} finally {
1083+
executor.shutdownNow();
1084+
}
1085+
}
10401086
}

0 commit comments

Comments
 (0)