Skip to content

Commit f4ff067

Browse files
authored
BAEL-5969 - Parallel Flux vs Flux in Project Reactor (#18552)
* BAEL-5969 - Parallel Flux vs Flux in Project Reactor 1) Implementation * BAEL-5969 - Parallel Flux vs Flux in Project Reactor 1) Implementation * BAEL-5969 - Parallel Flux vs Flux in Project Reactor 1) Implementation * BAEL-5969 - Parallel Flux vs Flux in Project Reactor 1) Disable Fibonacci Tests marked them as manual. * Revert "BAEL-5969 - Parallel Flux vs Flux in Project Reactor" This reverts commit 1510fa9. * BAEL-5969 - Parallel Flux vs Flux in Project Reactor 1) Disable Fibonacci Tests marked them as manual. * BAEL-5969 - PR Comment fixes. * BAEL-5969 - article Comment fixes to add JMH for benchmarking.
1 parent 5a588a4 commit f4ff067

File tree

5 files changed

+156
-0
lines changed

5 files changed

+156
-0
lines changed

reactor-core-2/pom.xml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,17 @@
3737
<version>${lombok.version}</version>
3838
<scope>test</scope>
3939
</dependency>
40+
<dependency>
41+
<groupId>org.openjdk.jmh</groupId>
42+
<artifactId>jmh-core</artifactId>
43+
<version>${jmh-core.version}</version>
44+
</dependency>
45+
<dependency>
46+
<groupId>org.openjdk.jmh</groupId>
47+
<artifactId>jmh-generator-annprocess</artifactId>
48+
<version>${jmh-generator.version}</version>
49+
<scope>provided</scope>
50+
</dependency>
4051
</dependencies>
4152

4253
<properties>
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package com.baeldung.reactor.flux.parallelflux;
2+
3+
public class Fibonacci {
4+
public static long fibonacci(int n) {
5+
if (n <= 1) return n;
6+
return fibonacci(n - 1) + fibonacci(n - 2);
7+
}
8+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package com.baeldung.reactor.flux.parallelflux;
2+
3+
import org.openjdk.jmh.annotations.*;
4+
import reactor.core.publisher.Flux;
5+
import reactor.core.publisher.ParallelFlux;
6+
import reactor.core.scheduler.Schedulers;
7+
8+
import java.util.List;
9+
import java.util.concurrent.TimeUnit;
10+
11+
@BenchmarkMode(Mode.AverageTime)
12+
@OutputTimeUnit(TimeUnit.MILLISECONDS)
13+
@State(Scope.Thread)
14+
public class FibonacciFluxParallelFluxBenchmark {
15+
16+
@Benchmark
17+
public List<Long> benchMarkParallelFluxSequential() {
18+
ParallelFlux<Long> parallelFluxFibonacci = Flux.just(43, 44, 45, 47, 48)
19+
.parallel(3)
20+
.runOn(Schedulers.parallel())
21+
.map(Fibonacci::fibonacci);
22+
23+
return parallelFluxFibonacci.sequential().collectList().block();
24+
}
25+
26+
@Benchmark
27+
public List<Long> benchMarkFluxSequential() {
28+
Flux<Long> fluxFibonacci = Flux.just(43, 44, 45, 47, 48)
29+
.map(Fibonacci::fibonacci);
30+
31+
return fluxFibonacci.collectList().block();
32+
}
33+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package com.baeldung.reactor.flux.parallelflux;
2+
3+
import lombok.extern.slf4j.Slf4j;
4+
import org.junit.jupiter.api.Test;
5+
import org.openjdk.jmh.Main;
6+
import reactor.core.publisher.Flux;
7+
import reactor.core.scheduler.Schedulers;
8+
import reactor.test.StepVerifier;
9+
10+
import java.io.IOException;
11+
12+
@Slf4j
13+
public class FluxManualTest {
14+
15+
@Test
16+
public void givenFibonacciIndices_whenComputingWithFlux_thenRunBenchMarks() throws IOException {
17+
Main.main(new String[] {
18+
"com.baeldung.reactor.flux.parallelflux.FibonacciFluxParallelFluxBenchmark.benchMarkFluxSequential",
19+
"-i", "3",
20+
"-wi", "2",
21+
"-f", "1"
22+
});
23+
}
24+
25+
@Test
26+
public void givenFibonacciIndices_whenComputingWithFlux_thenCorrectResults() {
27+
Flux<Long> fluxFibonacci = Flux.just(43, 44, 45, 47, 48)
28+
.publishOn(Schedulers.boundedElastic())
29+
.map(Fibonacci::fibonacci);
30+
31+
StepVerifier.create(fluxFibonacci)
32+
.expectNext(433494437L, 701408733L, 1134903170L, 2971215073L, 4807526976L)
33+
.verifyComplete();
34+
35+
}
36+
37+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package com.baeldung.reactor.flux.parallelflux;
2+
3+
import lombok.extern.slf4j.Slf4j;
4+
import org.junit.jupiter.api.RepeatedTest;
5+
import org.junit.jupiter.api.Test;
6+
import org.openjdk.jmh.Main;
7+
import reactor.core.publisher.Flux;
8+
import reactor.core.publisher.ParallelFlux;
9+
import reactor.core.scheduler.Schedulers;
10+
import reactor.test.StepVerifier;
11+
12+
import java.io.IOException;
13+
import java.util.HashSet;
14+
import java.util.List;
15+
import java.util.Set;
16+
import java.util.concurrent.CopyOnWriteArrayList;
17+
18+
@Slf4j
19+
public class ParallelFluxManualTest {
20+
21+
@Test
22+
public void givenFibonacciIndices_whenComputingWithParallelFlux_thenRunBenchMarks() throws IOException {
23+
Main.main(new String[] {
24+
"com.baeldung.reactor.flux.parallelflux.FibonacciFluxParallelFluxBenchmark.benchMarkParallelFluxSequential",
25+
"-i", "3",
26+
"-wi", "2",
27+
"-f", "1"
28+
});
29+
}
30+
31+
@Test
32+
public void givenFibonacciIndices_whenComputingWithParallelFlux_thenCorrectResults() {
33+
ParallelFlux<Long> parallelFluxFibonacci = Flux.just(43, 44, 45, 47, 48)
34+
.parallel(3)
35+
.runOn(Schedulers.parallel())
36+
.map(Fibonacci::fibonacci);
37+
38+
Flux<Long> sequencialParallelFlux = parallelFluxFibonacci.sequential();
39+
40+
Set<Long> expectedSet = new HashSet<>(Set.of(433494437L, 701408733L, 1134903170L, 2971215073L, 4807526976L));
41+
42+
StepVerifier.create(sequencialParallelFlux)
43+
.expectNextMatches(expectedSet::remove)
44+
.expectNextMatches(expectedSet::remove)
45+
.expectNextMatches(expectedSet::remove)
46+
.expectNextMatches(expectedSet::remove)
47+
.expectNextMatches(expectedSet::remove)
48+
.verifyComplete();
49+
50+
}
51+
52+
@RepeatedTest(5)
53+
public void givenListOfIds_whenComputingWithParallelFlux_thenOrderChanges() {
54+
ParallelFlux<String> parallelFlux = Flux.just("id1", "id2", "id3")
55+
.parallel(2)
56+
.runOn(Schedulers.parallel())
57+
.map(String::toUpperCase);
58+
59+
List<String> emitted = new CopyOnWriteArrayList<>();
60+
61+
StepVerifier.create(parallelFlux.sequential().doOnNext(emitted::add))
62+
.expectNextCount(3)
63+
.verifyComplete();
64+
65+
log.info("ParallelFlux emitted order: {}", emitted);
66+
}
67+
}

0 commit comments

Comments
 (0)