Skip to content

Commit 3d6cfed

Browse files
authored
Merge pull request #1845 from jponge/feat/multi-gather
feat: introduction of a Multi gathering operator
2 parents bf5a302 + fb056b0 commit 3d6cfed

File tree

9 files changed

+1229
-3
lines changed

9 files changed

+1229
-3
lines changed
Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
package io.smallrye.mutiny.groups;
2+
3+
import static io.smallrye.mutiny.helpers.ParameterValidation.nonNull;
4+
5+
import java.util.Optional;
6+
import java.util.function.BiFunction;
7+
import java.util.function.Function;
8+
import java.util.function.Supplier;
9+
10+
import io.smallrye.common.annotation.CheckReturnValue;
11+
import io.smallrye.common.annotation.Experimental;
12+
import io.smallrye.mutiny.Multi;
13+
14+
/**
15+
* A Gatherer operator transforms a stream of items by accumulating them into an accumulator and extracting
16+
* items from that accumulator when certain conditions are met.
17+
*
18+
* @param <I> the type of the items emitted by the upstream
19+
* @param <ACC> the type of the accumulator
20+
* @param <O> the type of the items emitted to the downstream
21+
*/
22+
@Experimental("This API is still being designed and may change in the future")
23+
public interface Gatherer<I, ACC, O> {
24+
25+
/**
26+
* Creates a new accumulator.
27+
*
28+
* @return a new accumulator
29+
*/
30+
ACC accumulator();
31+
32+
/**
33+
* Accumulates an item into the accumulator.
34+
*
35+
* @param accumulator the current accumulator
36+
* @param item the item to accumulate
37+
* @return the updated accumulator
38+
*/
39+
ACC accumulate(ACC accumulator, I item);
40+
41+
/**
42+
* Extracts an item from the accumulator.
43+
*
44+
* @param accumulator the current accumulator
45+
* @param upstreamCompleted whether the upstream has completed
46+
* @return an Optional containing a Extraction with the updated accumulator and the extracted item, or an empty Optional if
47+
* no
48+
* item can be extracted
49+
*/
50+
Optional<Extraction<ACC, O>> extract(ACC accumulator, boolean upstreamCompleted);
51+
52+
/**
53+
* Finalizes the accumulator and extracts the final item, if any.
54+
* This method is called when the upstream has completed and no more items can be extracted using the extract method.
55+
*
56+
* @param accumulator the current accumulator
57+
* @return an Optional containing the final item, or an empty Optional if no final item can be extracted
58+
*/
59+
Optional<O> finalize(ACC accumulator);
60+
61+
/**
62+
* An extraction result containing the next accumulator and the next item to emit.
63+
*
64+
* @param nextAccumulator the next accumulator
65+
* @param nextItem the next item to emit
66+
* @param <ACC> the type of the accumulator
67+
* @param <O> the type of the item to emit
68+
*/
69+
record Extraction<ACC, O>(ACC nextAccumulator, O nextItem) {
70+
71+
/**
72+
* Creates a new {@link Extraction} instance.
73+
*
74+
* @param nextAccumulator the next accumulator
75+
* @param nextItem the next item to emit
76+
* @return a new {@link Extraction} instance
77+
* @param <ACC> the type of the accumulator
78+
* @param <O> the type of the item to emit
79+
*/
80+
public static <ACC, O> Extraction<ACC, O> of(ACC nextAccumulator, O nextItem) {
81+
return new Extraction<>(nextAccumulator, nextItem);
82+
}
83+
}
84+
85+
/**
86+
* Builder for creating a {@link Gatherer}.
87+
*
88+
* @param <I> the type of the items emitted by the upstream
89+
*/
90+
class Builder<I> {
91+
92+
/**
93+
* Specifies the initial accumulator supplier.
94+
* <p>
95+
* The initial accumulator supplier is used to create a new accumulator.
96+
*
97+
* @param initialAccumulatorSupplier the supplier for the initial accumulator
98+
* @param <ACC> the type of the accumulator
99+
* @return the next step in the builder
100+
*/
101+
@CheckReturnValue
102+
public <ACC> InitialAccumulatorStep<I, ACC> into(Supplier<ACC> initialAccumulatorSupplier) {
103+
nonNull(initialAccumulatorSupplier, "initialAccumulatorSupplier");
104+
return new InitialAccumulatorStep<>(initialAccumulatorSupplier);
105+
}
106+
}
107+
108+
/**
109+
* The first step in the builder to gather items emitted by a {@link Multi} into an accumulator.
110+
*
111+
* @param <I> the type of the items emitted by the upstream {@link Multi}
112+
* @param <ACC> the type of the accumulator
113+
*/
114+
class InitialAccumulatorStep<I, ACC> {
115+
private final Supplier<ACC> initialAccumulatorSupplier;
116+
117+
private InitialAccumulatorStep(Supplier<ACC> initialAccumulatorSupplier) {
118+
this.initialAccumulatorSupplier = initialAccumulatorSupplier;
119+
}
120+
121+
/**
122+
* Specifies the accumulator function.
123+
* <p>
124+
* The accumulator function is used to accumulate the items emitted by the upstream.
125+
*
126+
* @param accumulator the accumulator function, which takes the current accumulator and the item emitted by the
127+
* upstream, and returns the new accumulator
128+
* @return the next step in the builder
129+
*/
130+
@CheckReturnValue
131+
public ExtractStep<I, ACC> accumulate(BiFunction<ACC, I, ACC> accumulator) {
132+
nonNull(accumulator, "accumulator");
133+
return new ExtractStep<>(initialAccumulatorSupplier, accumulator);
134+
}
135+
}
136+
137+
/**
138+
* The second step in the builder to gather items emitted by a {@link Multi} into an accumulator.
139+
*
140+
* @param <I> the type of the items emitted by the upstream {@link Multi}
141+
* @param <ACC> the type of the accumulator
142+
*/
143+
class ExtractStep<I, ACC> {
144+
private final Supplier<ACC> initialAccumulatorSupplier;
145+
private final BiFunction<ACC, I, ACC> accumulator;
146+
147+
private ExtractStep(Supplier<ACC> initialAccumulatorSupplier, BiFunction<ACC, I, ACC> accumulator) {
148+
this.initialAccumulatorSupplier = initialAccumulatorSupplier;
149+
this.accumulator = accumulator;
150+
}
151+
152+
/**
153+
* Specifies the extractor function.
154+
* <p>
155+
* The extractor function is used to extract the items from the accumulator.
156+
* When the extractor function returns an empty {@link Optional}, no value is emitted.
157+
* When the extractor function returns a non-empty {@link Optional}, the value is emitted, and the accumulator is
158+
* updated.
159+
* This is done by returning a {@link Extraction} containing the new accumulator and the value to emit.
160+
*
161+
* @param extractor the extractor function, which takes the current accumulator and returns an {@link Optional}
162+
* containing a {@link Extraction} with the new accumulator and the value to emit
163+
* @param <O> the type of the value to emit
164+
* @return the next step in the builder
165+
*/
166+
@CheckReturnValue
167+
public <O> FinalizerStep<I, ACC, O> extract(BiFunction<ACC, Boolean, Optional<Extraction<ACC, O>>> extractor) {
168+
nonNull(extractor, "extractor");
169+
return new FinalizerStep<>(initialAccumulatorSupplier, accumulator, extractor);
170+
}
171+
}
172+
173+
/**
174+
* The third step in the builder to gather items emitted by a {@link Multi} into an accumulator.
175+
*
176+
* @param <I> the type of the items emitted by the upstream
177+
* @param <ACC> the type of the accumulator
178+
* @param <O> the type of the items emitted to the downstream
179+
*/
180+
class FinalizerStep<I, ACC, O> {
181+
private final Supplier<ACC> initialAccumulatorSupplier;
182+
private final BiFunction<ACC, I, ACC> accumulator;
183+
private final BiFunction<ACC, Boolean, Optional<Extraction<ACC, O>>> extractor;
184+
185+
private FinalizerStep(Supplier<ACC> initialAccumulatorSupplier,
186+
BiFunction<ACC, I, ACC> accumulator,
187+
BiFunction<ACC, Boolean, Optional<Extraction<ACC, O>>> extractor) {
188+
this.initialAccumulatorSupplier = initialAccumulatorSupplier;
189+
this.accumulator = accumulator;
190+
this.extractor = extractor;
191+
}
192+
193+
/**
194+
* Specifies the finalizer function.
195+
* <p>
196+
* The finalizer function is used to emit the final value upon completion of the upstream and when there are no more
197+
* items that can be extracted from the accumulator.
198+
* When the finalizer function returns an empty {@link Optional}, no value is emitted before the completion signal.
199+
* When the finalizer function returns a non-empty {@link Optional}, the value is emitted before the completion signal.
200+
*
201+
* @param finalizer the finalizer function, which takes the current accumulator and returns an {@link Optional}
202+
* containing the value to emit before the completion signal, if any
203+
* @return the gathering {@link Multi}
204+
*/
205+
@CheckReturnValue
206+
public Gatherer<I, ACC, O> finalize(Function<ACC, Optional<O>> finalizer) {
207+
nonNull(finalizer, "finalizer");
208+
return Gatherers.of(initialAccumulatorSupplier, accumulator, extractor, finalizer);
209+
}
210+
}
211+
212+
}
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
package io.smallrye.mutiny.groups;
2+
3+
import java.util.ArrayList;
4+
import java.util.List;
5+
import java.util.Optional;
6+
import java.util.function.BiFunction;
7+
import java.util.function.Function;
8+
import java.util.function.Supplier;
9+
import java.util.stream.Collectors;
10+
11+
import io.smallrye.common.annotation.Experimental;
12+
import io.smallrye.mutiny.groups.Gatherer.Extraction;
13+
14+
/**
15+
* Factory interface for creating {@link Gatherer} instances.
16+
* <p>
17+
* This interface provides various static methods to create different types of gatherers.
18+
*/
19+
@Experimental("This API is still being designed and may change in the future")
20+
public interface Gatherers {
21+
22+
/**
23+
* Creates a new {@link Gatherer} with the specified components.
24+
*
25+
* @param initialAccumulatorSupplier the supplier for the initial accumulator
26+
* @param accumulatorFunction the function to accumulate items into the accumulator
27+
* @param extractor the function to extract items from the accumulator
28+
* @param finalizer the function to extract the final item from the accumulator
29+
* @param <I> the type of the items emitted by the upstream
30+
* @param <ACC> the type of the accumulator
31+
* @param <O> the type of the items emitted to the downstream
32+
* @return a new {@link Gatherer}
33+
*/
34+
static <I, ACC, O> Gatherer<I, ACC, O> of(Supplier<ACC> initialAccumulatorSupplier,
35+
BiFunction<ACC, I, ACC> accumulatorFunction,
36+
BiFunction<ACC, Boolean, Optional<Extraction<ACC, O>>> extractor,
37+
Function<ACC, Optional<O>> finalizer) {
38+
return new DefaultGatherer<>(initialAccumulatorSupplier, accumulatorFunction, extractor, finalizer);
39+
}
40+
41+
/**
42+
* Creates a new {@link Gatherer} that performs a scan operation.
43+
* The scan operation applies a function to each item emitted by the upstream, using the result of the previous
44+
* application as the first argument to the function. The initial value is provided by the initialAccumulatorSupplier.
45+
* <p>
46+
* Each intermediate result is emitted downstream.
47+
*
48+
* @param initialAccumulatorSupplier the supplier for the initial accumulator
49+
* @param accumulatorFunction the function to accumulate items
50+
* @param <I> the type of the items emitted by the upstream and downstream
51+
* @return a new {@link Gatherer} that performs a scan operation
52+
*/
53+
static <I> Gatherer<I, I, I> scan(Supplier<I> initialAccumulatorSupplier, BiFunction<I, I, I> accumulatorFunction) {
54+
return of(initialAccumulatorSupplier, accumulatorFunction,
55+
(acc, done) -> done ? Optional.empty() : Optional.of(Extraction.of(acc, acc)), Optional::of);
56+
}
57+
58+
/**
59+
* Creates a new {@link Gatherer} that performs a fold operation.
60+
* The fold operation applies a function to each item emitted by the upstream, using the result of the previous
61+
* application as the first argument to the function. The initial value is provided by the initialAccumulatorSupplier.
62+
* <p>
63+
* Only emits the final result when the upstream completes.
64+
*
65+
* @param initialAccumulatorSupplier the supplier for the initial accumulator
66+
* @param accumulatorFunction the function to accumulate items
67+
* @param <I> the type of the items emitted by the upstream and downstream
68+
* @return a new {@link Gatherer} that performs a fold operation
69+
*/
70+
static <I> Gatherer<I, I, I> fold(Supplier<I> initialAccumulatorSupplier, BiFunction<I, I, I> accumulatorFunction) {
71+
return of(initialAccumulatorSupplier, accumulatorFunction, (acc, done) -> Optional.empty(), Optional::of);
72+
}
73+
74+
/**
75+
* Creates a new {@link Gatherer} that performs a windowing operation.
76+
* The windowing operation collects items emitted by the upstream into non-overlapping windows of the specified size.
77+
* When a window is full, it is emitted downstream and a new window is started.
78+
* If the upstream completes before a window is full, the current window is emitted if it is not empty.
79+
*
80+
* @param size the size of the window
81+
* @param <I> the type of the items emitted by the upstream
82+
* @return a new {@link Gatherer} that performs a windowing operation
83+
*/
84+
static <I> Gatherer<I, List<I>, List<I>> window(int size) {
85+
return of(ArrayList::new, (acc, next) -> {
86+
acc.add(next);
87+
return acc;
88+
}, (acc, completed) -> {
89+
if (acc.size() == size) {
90+
return Optional.of(Extraction.of(new ArrayList<>(), new ArrayList<>(acc)));
91+
}
92+
return Optional.empty();
93+
}, acc -> acc.isEmpty()
94+
? Optional.empty()
95+
: Optional.of(acc));
96+
}
97+
98+
/**
99+
* Creates a new {@link Gatherer} that performs a sliding window operation.
100+
* The sliding window operation collects items emitted by the upstream into overlapping windows of the specified size.
101+
* When a window is full, it is emitted downstream and a new window is started with all but the first item from the previous
102+
* window.
103+
* If the upstream completes before a window is full, the current window is emitted if it is not empty.
104+
*
105+
* @param size the size of the window
106+
* @param <I> the type of the items emitted by the upstream
107+
* @return a new {@link Gatherer} that performs a sliding window operation
108+
*/
109+
static <I> Gatherer<I, List<I>, List<I>> slidingWindow(int size) {
110+
return of(ArrayList::new, (acc, item) -> {
111+
acc.add(item);
112+
return acc;
113+
}, (acc, completed) -> {
114+
if (acc.size() == size) {
115+
return Optional.of(Extraction.of(acc.stream().skip(1).collect(Collectors.toList()), new ArrayList<>(acc)));
116+
}
117+
return Optional.empty();
118+
}, acc -> acc.isEmpty()
119+
? Optional.empty()
120+
: Optional.of(acc));
121+
}
122+
123+
/**
124+
* Default implementation of the {@link Gatherer} interface.
125+
*
126+
* @param <I> the type of the items emitted by the upstream
127+
* @param <ACC> the type of the accumulator
128+
* @param <O> the type of the items emitted to the downstream
129+
*/
130+
class DefaultGatherer<I, ACC, O> implements Gatherer<I, ACC, O> {
131+
132+
private final Supplier<ACC> initialAccumulatorSupplier;
133+
private final BiFunction<ACC, I, ACC> accumulatorFunction;
134+
private final BiFunction<ACC, Boolean, Optional<Extraction<ACC, O>>> extractor;
135+
private final Function<ACC, Optional<O>> finalizer;
136+
137+
public DefaultGatherer(Supplier<ACC> initialAccumulatorSupplier,
138+
BiFunction<ACC, I, ACC> accumulatorFunction,
139+
BiFunction<ACC, Boolean, Optional<Extraction<ACC, O>>> extractor,
140+
Function<ACC, Optional<O>> finalizer) {
141+
this.initialAccumulatorSupplier = initialAccumulatorSupplier;
142+
this.accumulatorFunction = accumulatorFunction;
143+
this.extractor = extractor;
144+
this.finalizer = finalizer;
145+
}
146+
147+
@Override
148+
public ACC accumulator() {
149+
return initialAccumulatorSupplier.get();
150+
}
151+
152+
@Override
153+
public ACC accumulate(ACC accumulator, I item) {
154+
return accumulatorFunction.apply(accumulator, item);
155+
}
156+
157+
@Override
158+
public Optional<Extraction<ACC, O>> extract(ACC accumulator, boolean upstreamCompleted) {
159+
return extractor.apply(accumulator, upstreamCompleted);
160+
}
161+
162+
@Override
163+
public Optional<O> finalize(ACC accumulator) {
164+
return finalizer.apply(accumulator);
165+
}
166+
}
167+
168+
/**
169+
* Creates a new {@link Gatherer} builder.
170+
*
171+
* @param <I> the type of the items emitted by the upstream
172+
* @return the builder
173+
*/
174+
static <I> Gatherer.Builder<I> builder() {
175+
return new Gatherer.Builder<>();
176+
}
177+
}

0 commit comments

Comments
 (0)