Skip to content

Commit 23538fa

Browse files
Ryland Degnanrobertroeser
authored andcommitted
Added SwitchTransform (#435)
1 parent c8a5ed7 commit 23538fa

File tree

1 file changed

+81
-0
lines changed

1 file changed

+81
-0
lines changed
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package io.rsocket.internal;
2+
3+
import org.reactivestreams.Publisher;
4+
import org.reactivestreams.Subscription;
5+
import reactor.core.CoreSubscriber;
6+
import reactor.core.publisher.DirectProcessor;
7+
import reactor.core.publisher.Flux;
8+
import reactor.core.publisher.Operators;
9+
10+
import java.util.Objects;
11+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
12+
import java.util.function.BiFunction;
13+
14+
public final class SwitchTransform<T, R> extends Flux<R> {
15+
16+
final Publisher<? extends T> source;
17+
final BiFunction<T, Flux<? extends T>, Publisher<? extends R>> transformer;
18+
19+
public SwitchTransform(Publisher<? extends T> source, BiFunction<T, Flux<? extends T>, Publisher<? extends R>> transformer) {
20+
this.source = Objects.requireNonNull(source, "source");
21+
this.transformer = Objects.requireNonNull(transformer, "transformer");
22+
}
23+
24+
@Override
25+
public void subscribe(CoreSubscriber<? super R> actual) {
26+
source.subscribe(new SwitchTransformSubscriber<>(actual, transformer));
27+
}
28+
29+
static final class SwitchTransformSubscriber<T, R> implements CoreSubscriber<T> {
30+
final CoreSubscriber<? super R> actual;
31+
final BiFunction<T, Flux<? extends T>, Publisher<? extends R>> transformer;
32+
final DirectProcessor<T> processor = DirectProcessor.create();
33+
34+
Subscription s;
35+
36+
volatile int once;
37+
@SuppressWarnings("rawtypes")
38+
static final AtomicIntegerFieldUpdater<SwitchTransformSubscriber> ONCE =
39+
AtomicIntegerFieldUpdater.newUpdater(SwitchTransformSubscriber.class, "once");
40+
41+
SwitchTransformSubscriber(CoreSubscriber<? super R> actual, BiFunction<T, Flux<? extends T>, Publisher<? extends R>> transformer) {
42+
this.actual = actual;
43+
this.transformer = transformer;
44+
}
45+
46+
@Override
47+
public void onSubscribe(Subscription s) {
48+
if (Operators.validate(this.s, s)) {
49+
this.s = s;
50+
51+
processor.onSubscribe(s);
52+
}
53+
}
54+
55+
@Override
56+
public void onNext(T t) {
57+
if (once == 0 && ONCE.compareAndSet(this, 0, 1)) {
58+
try {
59+
Publisher<? extends R> result = Objects.requireNonNull(transformer.apply(t, processor),
60+
"The transformer returned a null value");
61+
result.subscribe(actual);
62+
}
63+
catch (Throwable e) {
64+
onError(Operators.onOperatorError(s, e, t, actual.currentContext()));
65+
return;
66+
}
67+
}
68+
processor.onNext(t);
69+
}
70+
71+
@Override
72+
public void onError(Throwable t) {
73+
processor.onError(t);
74+
}
75+
76+
@Override
77+
public void onComplete() {
78+
processor.onComplete();
79+
}
80+
}
81+
}

0 commit comments

Comments
 (0)