Skip to content

Commit baef9fb

Browse files
committed
fix: removing a (maybe unnecessary) upstream request on group emission
each emitted item already requests upstream items
1 parent 5e25bf4 commit baef9fb

File tree

2 files changed

+8
-7
lines changed

2 files changed

+8
-7
lines changed

implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiGroupByOp.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,6 @@ private void drain() {
234234
if (requests != Long.MAX_VALUE) {
235235
requested.addAndGet(-emitted);
236236
}
237-
super.request(emitted);
238237
}
239238

240239
missed = wip.addAndGet(-missed);

implementation/src/test/java/io/smallrye/mutiny/operators/MultiGroupTest.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1043,29 +1043,31 @@ public void rejectGroupByBadPrefetch() {
10431043

10441044
@Test
10451045
void testUpstreamRequestsNotBlownOutOfProportion() {
1046-
ExecutorService executor = Executors.newFixedThreadPool(10);
1046+
ExecutorService executor = Executors.newFixedThreadPool(100);
1047+
int maxConcurrency = 100;
1048+
int prefetch = 25;
10471049
try {
10481050
AtomicLong requestCounter = new AtomicLong(0);
10491051
AtomicLong itemCounter = new AtomicLong(0);
10501052
AtomicReference<MultiEmitter<? super Integer>> e = new AtomicReference<>();
10511053

10521054
Multi.createFrom().<Integer> emitter(e::set)
10531055
.onRequest().invoke(requestCounter::addAndGet)
1054-
.group().by(i -> i / 10)
1056+
.group().by(i -> i / 10, prefetch)
10551057
.onItem().transformToMulti(g -> g.map(i -> g.key() + " : " + i)
10561058
.emitOn(executor)
10571059
.invoke(s -> {
10581060
try {
1059-
Thread.sleep(100);
1061+
Thread.sleep(10);
10601062
itemCounter.incrementAndGet();
10611063
} catch (InterruptedException ex) {
10621064
throw new RuntimeException(ex);
10631065
}
10641066
}))
1065-
.merge()
1067+
.merge(maxConcurrency)
10661068
.subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE));
10671069

1068-
int itemCount = 100;
1070+
int itemCount = 1000;
10691071
MultiEmitter<? super Integer> emitter = e.get();
10701072
new Thread(() -> {
10711073
int i = 0;
@@ -1078,7 +1080,7 @@ void testUpstreamRequestsNotBlownOutOfProportion() {
10781080
}).start();
10791081

10801082
await().untilAsserted(() -> assertThat(itemCounter).hasValueGreaterThanOrEqualTo(itemCount));
1081-
assertThat(requestCounter.get()).isLessThan(10000L); // this should not blow up
1083+
assertThat(requestCounter.get()).isEqualTo(itemCounter.get() + prefetch);
10821084
} finally {
10831085
executor.shutdownNow();
10841086
}

0 commit comments

Comments
 (0)