Skip to content

Commit f21488b

Browse files
BAEL-9210: Stream Gatherers (#18513)
1 parent 67f015d commit f21488b

File tree

10 files changed

+240
-3
lines changed

10 files changed

+240
-3
lines changed

core-java-modules/core-java-streams-7/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@
3535
</build>
3636

3737
<properties>
38-
<maven.compiler.source>12</maven.compiler.source>
39-
<maven.compiler.target>12</maven.compiler.target>
38+
<maven.compiler.source>24</maven.compiler.source>
39+
<maven.compiler.target>24</maven.compiler.target>
4040
<vavr.version>0.10.2</vavr.version>
4141
<assertj-core.version>3.23.1</assertj-core.version>
4242
<commons-lang3.version>3.12.0</commons-lang3.version>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package com.baeldung.streams.gatherer;
2+
3+
import java.util.ArrayList;
4+
import java.util.function.BiConsumer;
5+
import java.util.function.Supplier;
6+
import java.util.stream.Gatherer;
7+
8+
public class NumericSumGatherer implements Gatherer<Integer, ArrayList<Integer>, Integer> {
9+
10+
@Override
11+
public Supplier<ArrayList<Integer>> initializer() {
12+
return ArrayList::new;
13+
}
14+
15+
@Override
16+
public Integrator<ArrayList<Integer>, Integer, Integer> integrator() {
17+
return new Integrator<>() {
18+
@Override
19+
public boolean integrate(ArrayList<Integer> state, Integer element, Downstream<? super Integer> downstream) {
20+
if (state.isEmpty()) {
21+
state.add(element);
22+
} else {
23+
state.addFirst(state.getFirst() + element);
24+
}
25+
return true;
26+
}
27+
};
28+
}
29+
30+
@Override
31+
public BiConsumer<ArrayList<Integer>, Downstream<? super Integer>> finisher() {
32+
return (state, downstream) -> {
33+
if (!downstream.isRejecting() && !state.isEmpty()) {
34+
downstream.push(state.getFirst());
35+
state.clear();
36+
}
37+
};
38+
}
39+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package com.baeldung.streams.gatherer;
2+
3+
import java.util.ArrayList;
4+
import java.util.Arrays;
5+
import java.util.List;
6+
import java.util.function.BinaryOperator;
7+
import java.util.function.Supplier;
8+
import java.util.stream.Gatherer;
9+
10+
public class SentenceSplitterGatherer implements Gatherer<String, List<String>,String> {
11+
12+
@Override
13+
public Supplier<List<String>> initializer() {
14+
return ArrayList::new;
15+
}
16+
17+
@Override
18+
public BinaryOperator<List<String>> combiner() {
19+
return (left, right) -> {
20+
left.addAll(right);
21+
return left;
22+
};
23+
}
24+
25+
@Override
26+
public Integrator<List<String>, String, String> integrator() {
27+
return (state, element, downstream) -> {
28+
var words = element.split("\\s+");
29+
for (var word : words) {
30+
state.add(word);
31+
downstream.push(word);
32+
}
33+
return true;
34+
};
35+
}
36+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package com.baeldung.streams.gatherer;
2+
3+
import java.util.ArrayList;
4+
import java.util.List;
5+
import java.util.function.BiConsumer;
6+
import java.util.function.Supplier;
7+
import java.util.stream.Gatherer;
8+
9+
public class SlidingWindowGatherer implements Gatherer<Integer, ArrayList<Integer>, List<Integer>> {
10+
11+
@Override
12+
public Supplier<ArrayList<Integer>> initializer() {
13+
return ArrayList::new;
14+
}
15+
16+
@Override
17+
public Integrator<ArrayList<Integer>, Integer, List<Integer>> integrator() {
18+
return new Integrator<>() {
19+
@Override
20+
public boolean integrate(ArrayList<Integer> state, Integer element, Downstream<? super List<Integer>> downstream) {
21+
state.add(element);
22+
if (state.size() == 3) {
23+
downstream.push(new ArrayList<>(state));
24+
state.removeFirst();
25+
}
26+
return true;
27+
}
28+
};
29+
}
30+
31+
@Override
32+
public BiConsumer<ArrayList<Integer>, Downstream<? super List<Integer>>> finisher() {
33+
return (state, downstream) -> {
34+
if (state.size()==3) {
35+
downstream.push(new ArrayList<>(state));
36+
}
37+
};
38+
39+
}
40+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package com.baeldung.streams.gatherer;
2+
3+
import java.util.List;
4+
import java.util.stream.Gatherer;
5+
import java.util.stream.Gatherers;
6+
import java.util.stream.Stream;
7+
8+
import org.junit.jupiter.api.Assertions;
9+
import org.junit.jupiter.api.Test;
10+
11+
class GathererUnitTest {
12+
13+
@Test
14+
void givenNumbers_whenFolded_thenSumIsEmitted() {
15+
Stream<Integer> numbers = Stream.of(1, 2, 3, 4, 5);
16+
Stream<Integer> folded = numbers.gather(Gatherers.fold(() -> 0, Integer::sum));
17+
List<Integer> resultList = folded.toList();
18+
Assertions.assertEquals(1, resultList.size());
19+
Assertions.assertEquals(Integer.valueOf(15), resultList.getFirst());
20+
}
21+
22+
@Test
23+
void givenWords_whenMappedConcurrently_thenUppercasedWordsAreEmitted() {
24+
Stream<String> words = Stream.of("a", "b", "c", "d");
25+
List<String> resultList = words.gather(Gatherers.mapConcurrent(2, String::toUpperCase))
26+
.toList();
27+
Assertions.assertEquals(4, resultList.size());
28+
Assertions.assertEquals(List.of("A", "B", "C", "D"), resultList);
29+
}
30+
31+
@Test
32+
void givenNumbers_whenScanned_thenRunningTotalsAreEmitted() {
33+
Stream<Integer> numbers = Stream.of(1, 2, 3, 4);
34+
List<Integer> resultList = numbers.gather(Gatherers.scan(() -> 0, Integer::sum))
35+
.toList();
36+
Assertions.assertEquals(4, resultList.size());
37+
Assertions.assertEquals(List.of(1, 3, 6, 10), resultList);
38+
}
39+
40+
@Test
41+
void givenNumbers_whenWindowedSliding_thenOverlappingWindowsAreEmitted() {
42+
List<List<Integer>> expectedOutput = List.of(List.of(1, 2, 3), List.of(2, 3, 4), List.of(3, 4, 5));
43+
Stream<Integer> numbers = Stream.of(1, 2, 3, 4, 5);
44+
List<List<Integer>> resultList = numbers.gather(Gatherers.windowSliding(3))
45+
.toList();
46+
Assertions.assertEquals(3, resultList.size());
47+
Assertions.assertEquals(expectedOutput, resultList);
48+
}
49+
50+
@Test
51+
void givenStrings_whenUsingCustomGatherer_thenLengthsAreCalculated() {
52+
List<Integer> expectedOutput = List.of(5, 6, 3);
53+
Stream<String> inputStrings = Stream.of("apple", "banana", "cat");
54+
List<Object> resultList = inputStrings.gather(Gatherer.of((state, element, downstream) -> {
55+
downstream.push(element.length());
56+
return true;
57+
}))
58+
.toList();
59+
Assertions.assertEquals(3, resultList.size());
60+
Assertions.assertEquals(expectedOutput, resultList);
61+
}
62+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package com.baeldung.streams.gatherer;
2+
3+
import java.util.List;
4+
import java.util.stream.Stream;
5+
6+
import org.junit.jupiter.api.Assertions;
7+
import org.junit.jupiter.api.Test;
8+
9+
class NumericSumGathererUnitTest {
10+
11+
@Test
12+
void givenNumbers_whenUsingCustomManyToOneGatherer_thenSumIsCalculated() {
13+
Stream<Integer> inputValues = Stream.of(1, 2, 3, 4, 5, 6);
14+
List<Integer> result = inputValues.gather(new NumericSumGatherer())
15+
.toList();
16+
Assertions.assertEquals(Integer.valueOf(21), result.getFirst());
17+
}
18+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package com.baeldung.streams.gatherer;
2+
3+
import java.util.List;
4+
import java.util.stream.Stream;
5+
6+
import org.junit.jupiter.api.Assertions;
7+
import org.junit.jupiter.api.Test;
8+
9+
class SentenceSplitterGathererUnitTest {
10+
11+
@Test
12+
void givenSentences_whenUsingCustomOneToManyGatherer_thenWordsAreExtracted() {
13+
List<String> expectedOutput = List.of("hello", "world", "java", "streams");
14+
Stream<String> sentences = Stream.of("hello world", "java streams");
15+
List<String> words = sentences.gather(new SentenceSplitterGatherer())
16+
.toList();
17+
Assertions.assertEquals(expectedOutput, words);
18+
}
19+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package com.baeldung.streams.gatherer;
2+
3+
import java.util.List;
4+
import java.util.stream.Stream;
5+
6+
import org.junit.jupiter.api.Assertions;
7+
import org.junit.jupiter.api.Test;
8+
9+
class SlidingWindowGathererUnitTest {
10+
11+
@Test
12+
void givenNumbers_whenWindowedSliding_thenOverlappingWindowsAreEmitted() {
13+
List<List<Integer>> expectedOutput = List.of(List.of(1, 2, 3), List.of(2, 3, 4), List.of(3, 4, 5));
14+
Stream<Integer> numbers = Stream.of(1, 2, 3, 4, 5);
15+
List<List<Integer>> resultList = numbers.gather(new SlidingWindowGatherer())
16+
.toList();
17+
Assertions.assertEquals(3, resultList.size());
18+
Assertions.assertEquals(expectedOutput, resultList);
19+
}
20+
21+
}

core-java-modules/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@
6060
<module>core-java-streams-4</module>
6161
<module>core-java-streams-5</module>
6262
<module>core-java-streams-6</module>
63-
<module>core-java-streams-7</module>
63+
<!-- <module>core-java-streams-7</module> requires JDK 24-->
6464
<module>core-java-streams-collect</module>
6565
<module>core-java-streams-maps</module>
6666
<module>core-java-string-operations-3</module>

pom.xml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -951,6 +951,7 @@
951951
</build>
952952

953953
<modules>
954+
<module>core-java-modules/core-java-streams-7</module>
954955
<module>core-java-modules/core-java-24</module>
955956
</modules>
956957

@@ -1367,6 +1368,7 @@
13671368
</build>
13681369

13691370
<modules>
1371+
<module>core-java-modules/core-java-streams-7</module>
13701372
<module>core-java-modules/core-java-24</module>
13711373
</modules>
13721374

0 commit comments

Comments
 (0)