Replies: 2 comments
Your observations are correct: concatenation requires subscribing in order, while merging allows some level of concurrency at the expense of ordering. The main problem with concurrent subscriptions that would preserve ordering is correctness. You can imagine that you get a failure on a [...] I anticipate more cons than pros in doing that.
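The ordering trade-off can be illustrated without Mutiny. The following is a minimal sketch using plain JDK `CompletableFuture`s as stand-ins for in-flight Unis; the class and method names are hypothetical and this is an analogy, not Mutiny's API. It records results in the order they complete, which is the order a merge would observe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class CompletionOrderSketch {
    /** Returns results in the order the futures completed, which is what merging observes. */
    static List<String> completionOrder() {
        // Three pending results standing in for three in-flight Unis (an analogy, not Mutiny API).
        List<CompletableFuture<String>> pending = List.of(
                new CompletableFuture<String>(),
                new CompletableFuture<String>(),
                new CompletableFuture<String>());
        List<String> seen = new ArrayList<>();
        for (CompletableFuture<String> future : pending) {
            // The callback runs on the thread that completes the future.
            future.thenAccept(seen::add);
        }
        // Complete them out of source order: third, first, second.
        pending.get(2).complete("c");
        pending.get(0).complete("a");
        pending.get(1).complete("b");
        return seen;
    }
}
```

Here `completionOrder()` returns `[c, a, b]` even though the source order is a, b, c. Concatenation avoids this by having only one subscription active at a time.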
I ended up writing a custom Multi operator for this purpose, which does the job for us. It has the side-effect of calling a few additional [...]

```java
import static com.google.common.base.Verify.verify;
import static io.smallrye.mutiny.helpers.Subscriptions.CANCELLED;

import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.mutiny.operators.multi.AbstractMultiOperator;
import io.smallrye.mutiny.operators.multi.MultiOperatorProcessor;
import io.smallrye.mutiny.subscription.MultiSubscriber;
import jakarta.annotation.Nullable;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

class MultiFlatMapConcatOp<I, O> extends AbstractMultiOperator<I, O> {

    private final Function<? super I, Uni<? extends O>> toUniFunction;
    private final int prefetch;

    public MultiFlatMapConcatOp(Multi<? extends I> upstream, Function<? super I, Uni<? extends O>> toUniFunction, int prefetch) {
        super(upstream);
        this.toUniFunction = toUniFunction;
        this.prefetch = prefetch;
    }

    @Override
    public void subscribe(MultiSubscriber<? super O> downstream) {
        upstream().subscribe().withSubscriber(new FlatMapProcessor<>(downstream, toUniFunction, prefetch));
    }

    static class FlatMapProcessor<I, O> extends MultiOperatorProcessor<I, O> {

        private final Function<? super I, Uni<? extends O>> toUniFunction;
        private final int prefetch;

        @GuardedBy("itself")
        private final PriorityQueue<IndexedItem<? extends O>> queue;

        /**
         * Index of the next item to be emitted downstream, starting from 0.
         */
        private final AtomicInteger downstreamItemIndex;

        /**
         * Index of the next item to be received from upstream, starting from 0.
         */
        private final AtomicInteger upstreamItemIndex;

        /**
         * How many items downstream requested from us.
         */
        private final AtomicLong downstreamRequests;

        private final AtomicInteger wip;

        @GuardedBy("this")
        private State state;

        @GuardedBy("this")
        private @Nullable Throwable upstreamFailure;

        public FlatMapProcessor(MultiSubscriber<? super O> downstream, Function<? super I, Uni<? extends O>> toUniFunction, int prefetch) {
            super(downstream);
            this.toUniFunction = toUniFunction;
            this.prefetch = prefetch;
            this.queue = new PriorityQueue<>();
            this.downstreamItemIndex = new AtomicInteger();
            this.upstreamItemIndex = new AtomicInteger();
            this.downstreamRequests = new AtomicLong();
            this.wip = new AtomicInteger();
            this.state = State.PRODUCING;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            super.onSubscribe(subscription);
            super.request(prefetch);
        }

        @Override
        public void request(long numberOfItems) {
            Subscriptions.add(downstreamRequests, numberOfItems);
            super.request(numberOfItems);
            processEvents();
        }

        @Override
        public void onItem(I item) {
            Objects.requireNonNull(item);
            int index = upstreamItemIndex.getAndIncrement();
            try {
                toUniFunction.apply(item).subscribe().with(
                        uniResult -> onUniResultItem(index, uniResult),
                        this::onUniResultFailure);
            } catch (Throwable t) {
                onUniResultFailure(t);
            }
        }

        @Override
        public void onCompletion() {
            synchronized (this) {
                if (state == State.PRODUCING) {
                    state = State.UPSTREAM_COMPLETED;
                }
            }
            processEvents();
        }

        @Override
        public void onFailure(Throwable throwable) {
            synchronized (this) {
                if (state == State.PRODUCING || state == State.UPSTREAM_COMPLETED) {
                    state = State.UPSTREAM_FAILED;
                    upstreamFailure = throwable;
                }
            }
            processEvents();
        }

        private void onUniResultItem(int index, O item) {
            synchronized (queue) {
                queue.add(new IndexedItem<>(index, item));
            }
            processEvents();
        }

        private void onUniResultFailure(Throwable throwable) {
            cancel();
            onFailure(throwable);
        }

        private void processEvents() {
            // According to the Reactive Streams spec, Subscriber's methods (onItem, onComplete, onFailure) must be called
            // in strict happens-before order. The use of `wip` below is a way to ensure that order without holding any locks.
            if (wip.getAndIncrement() == 0) {
                int todo = 1;
                while (todo > 0) {
                    tryEmitDownstreamItemsInOrder();
                    tryEmitDownstreamTermination();
                    todo = wip.addAndGet(-todo);
                }
            }
        }

        private void tryEmitDownstreamItemsInOrder() {
            O item;
            while (downstreamRequests.get() > 0 && (item = getNextItemInOrder()) != null) {
                Flow.Subscription subscription = getUpstreamSubscription();
                if (subscription != CANCELLED) {
                    downstream.onItem(item);
                }
                downstreamItemIndex.incrementAndGet();
                Subscriptions.produced(downstreamRequests, 1);
            }
        }

        private @Nullable O getNextItemInOrder() {
            synchronized (queue) {
                var item = queue.peek();
                if (item == null || item.index() != downstreamItemIndex.get()) {
                    return null;
                }
                var polledItem = queue.poll();
                verify(Objects.equals(item, polledItem));
                return item.item();
            }
        }

        private void tryEmitDownstreamTermination() {
            boolean shouldTerminate;
            Throwable throwable;
            synchronized (this) {
                switch (state) {
                    case UPSTREAM_COMPLETED -> {
                        shouldTerminate = upstreamItemIndex.get() == downstreamItemIndex.get();
                        throwable = null;
                    }
                    case UPSTREAM_FAILED -> {
                        shouldTerminate = true;
                        throwable = upstreamFailure;
                    }
                    default -> {
                        shouldTerminate = false;
                        throwable = null;
                    }
                }
                if (shouldTerminate) {
                    state = State.TERMINATED;
                }
            }
            if (shouldTerminate) {
                if (throwable == null) {
                    super.onCompletion();
                } else {
                    super.onFailure(throwable);
                }
            }
        }

        enum State {
            PRODUCING,
            UPSTREAM_COMPLETED,
            UPSTREAM_FAILED,
            TERMINATED
        }
    }

    record IndexedItem<T>(
            int index,
            T item
    ) implements Comparable<IndexedItem<T>> {
        @Override
        public int compareTo(IndexedItem<T> that) {
            return Integer.compare(this.index, that.index);
        }
    }
}
```
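The core idea of the operator above is a reorder buffer: results are tagged with their upstream index, held in a priority queue, and released only when the head of the queue carries the next expected index. The sketch below isolates that idea using only the JDK. `ReorderBufferSketch` and its methods are hypothetical names, and it leaves out backpressure, termination, and failure handling, all of which the operator deals with.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

class ReorderBufferSketch {
    /** A result tagged with its position in the source sequence. */
    private record Indexed(int index, String value) implements Comparable<Indexed> {
        @Override
        public int compareTo(Indexed that) {
            return Integer.compare(this.index, that.index);
        }
    }

    private final PriorityQueue<Indexed> queue = new PriorityQueue<>();
    private final List<String> emitted = new ArrayList<>();
    private int nextIndex = 0;

    /** Accepts a result in any order and releases every result that is now in sequence. */
    synchronized void accept(int index, String value) {
        queue.add(new Indexed(index, value));
        // Drain while the smallest buffered index is exactly the one expected next.
        while (!queue.isEmpty() && queue.peek().index() == nextIndex) {
            emitted.add(queue.poll().value());
            nextIndex++;
        }
    }

    /** Snapshot of everything released so far, in source order. */
    synchronized List<String> emitted() {
        return List.copyOf(emitted);
    }
}
```

Accepting index 2 first releases nothing; once indices 0 and 1 arrive, all three values are released in source order. The buffer grows with the number of results that arrive ahead of a slow earlier item, which is one reason to bound how many Unis are in flight.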
Use case: build a Multi from a sequence of elements, where each element must be retrieved separately by calling a function that returns a Uni. The order of the elements is important and must be preserved.
The most straightforward way to do this is:
where the function to retrieve an individual element is:
The downside of the straightforward approach is that every Uni is invoked sequentially, only after the previous Uni has returned its element. This means that the latencies of all Unis add up, and retrieving the whole sequence takes seconds.
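For illustration, the desired behaviour (all calls in flight at once, results still delivered in source order) can be sketched with plain JDK `CompletableFuture`s. This is an analogy under assumptions, not Mutiny code: `EagerOrderedFetchSketch`, `fetchAllInOrder`, and the `fetch` parameter are hypothetical names, and the sketch blocks on `join()` rather than streaming results reactively.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class EagerOrderedFetchSketch {
    /**
     * Starts one asynchronous call per id up front, then collects the results in
     * source order. Assuming the calls really run concurrently, total time is
     * bounded by the slowest call rather than the sum of all latencies.
     */
    static <I, O> List<O> fetchAllInOrder(List<I> ids, Function<I, CompletableFuture<O>> fetch) {
        List<CompletableFuture<O>> inFlight = new ArrayList<>();
        for (I id : ids) {
            // Every call is started before any result is awaited.
            inFlight.add(fetch.apply(id));
        }
        List<O> results = new ArrayList<>();
        for (CompletableFuture<O> future : inFlight) {
            // Joining in list order preserves the source order regardless of completion order.
            results.add(future.join());
        }
        return results;
    }
}
```

A call such as `fetchAllInOrder(List.of(3, 1, 2), id -> CompletableFuture.supplyAsync(() -> "item-" + id))` yields the items in the order 3, 1, 2. Unlike this sketch, a reactive solution would also need to respect downstream demand and limit how many calls are in flight.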
The other ways I tried:
Is there any other way in Mutiny to achieve concurrent invocation of Unis without losing the element order?
If not, is it something that can be added in future versions?