Skip to content

Commit bc9d594

Browse files
authored
2.x: Add materialize() and dematerialize() (#6278)
* 2.x: Add materialize() and dematerialize() * Add remaining test cases * Correct dematerialize javadoc * Use dematerialize selector fix some docs
1 parent 6478312 commit bc9d594

File tree

12 files changed

+685
-1
lines changed

12 files changed

+685
-1
lines changed

src/main/java/io/reactivex/Completable.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import io.reactivex.internal.operators.completable.*;
2727
import io.reactivex.internal.operators.maybe.*;
2828
import io.reactivex.internal.operators.mixed.*;
29-
import io.reactivex.internal.operators.single.SingleDelayWithCompletable;
29+
import io.reactivex.internal.operators.single.*;
3030
import io.reactivex.internal.util.ExceptionHelper;
3131
import io.reactivex.observers.TestObserver;
3232
import io.reactivex.plugins.RxJavaPlugins;
@@ -1782,6 +1782,27 @@ public final Completable lift(final CompletableOperator onLift) {
17821782
return RxJavaPlugins.onAssembly(new CompletableLift(this, onLift));
17831783
}
17841784

1785+
/**
1786+
* Maps the signal types of this Completable into a {@link Notification} of the same kind
1787+
* and emits it as a single success value to downstream.
1788+
* <p>
1789+
* <img width="640" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/materialize.png" alt="">
1790+
* <dl>
1791+
* <dt><b>Scheduler:</b></dt>
1792+
* <dd>{@code materialize} does not operate by default on a particular {@link Scheduler}.</dd>
1793+
* </dl>
1794+
* @param <T> the intended target element type of the notification
1795+
* @return the new Single instance
1796+
* @since 2.2.4 - experimental
1797+
* @see Single#dematerialize(Function)
1798+
*/
1799+
@Experimental
1800+
@CheckReturnValue
1801+
@SchedulerSupport(SchedulerSupport.NONE)
1802+
public final <T> Single<Notification<T>> materialize() {
1803+
return RxJavaPlugins.onAssembly(new CompletableMaterialize<T>(this));
1804+
}
1805+
17851806
/**
17861807
* Returns a Completable which subscribes to this and the other Completable and completes
17871808
* when both of them complete or one emits an error.

src/main/java/io/reactivex/Maybe.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3377,6 +3377,26 @@ public final <R> Maybe<R> map(Function<? super T, ? extends R> mapper) {
33773377
return RxJavaPlugins.onAssembly(new MaybeMap<T, R>(this, mapper));
33783378
}
33793379

3380+
/**
3381+
* Maps the signal types of this Maybe into a {@link Notification} of the same kind
3382+
* and emits it as a single success value to downstream.
3383+
* <p>
3384+
* <img width="640" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/materialize.png" alt="">
3385+
* <dl>
3386+
* <dt><b>Scheduler:</b></dt>
3387+
* <dd>{@code materialize} does not operate by default on a particular {@link Scheduler}.</dd>
3388+
* </dl>
3389+
* @return the new Single instance
3390+
* @since 2.2.4 - experimental
3391+
* @see Single#dematerialize(Function)
3392+
*/
3393+
@Experimental
3394+
@CheckReturnValue
3395+
@SchedulerSupport(SchedulerSupport.NONE)
3396+
public final Single<Notification<T>> materialize() {
3397+
return RxJavaPlugins.onAssembly(new MaybeMaterialize<T>(this));
3398+
}
3399+
33803400
/**
33813401
* Flattens this and another Maybe into a single Flowable, without any transformation.
33823402
* <p>

src/main/java/io/reactivex/Single.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2302,6 +2302,43 @@ public final Single<T> delaySubscription(long time, TimeUnit unit, Scheduler sch
23022302
return delaySubscription(Observable.timer(time, unit, scheduler));
23032303
}
23042304

2305+
/**
2306+
* Maps the {@link Notification} success value of this Single back into normal
2307+
* {@code onSuccess}, {@code onError} or {@code onComplete} signals as a
2308+
* {@link Maybe} source.
2309+
* <p>
2310+
* The intended use of the {@code selector} function is to perform a
2311+
* type-safe identity mapping (see example) on a source that is already of type
2312+
* {@code Notification<T>}. The Java language doesn't allow
2313+
* limiting instance methods to a certain generic argument shape, therefore,
2314+
* a function is used to ensure the conversion remains type safe.
2315+
* <dl>
2316+
* <dt><b>Scheduler:</b></dt>
2317+
* <dd>{@code dematerialize} does not operate by default on a particular {@link Scheduler}.</dd>
2318+
* </dl>
2319+
* <p>
2320+
* Example:
2321+
* <pre><code>
2322+
* Single.just(Notification.createOnNext(1))
2323+
* .dematerialize(notification -&gt; notification)
2324+
* .test()
2325+
* .assertResult(1);
2326+
* </code></pre>
2327+
* @param <R> the result type
2328+
* @param selector the function called with the success item and should
2329+
* return a {@link Notification} instance.
2330+
* @return the new Maybe instance
2331+
* @since 2.2.4 - experimental
2332+
* @see #materialize()
2333+
*/
2334+
@CheckReturnValue
2335+
@SchedulerSupport(SchedulerSupport.NONE)
2336+
@Experimental
2337+
public final <R> Maybe<R> dematerialize(Function<? super T, Notification<R>> selector) {
2338+
ObjectHelper.requireNonNull(selector, "selector is null");
2339+
return RxJavaPlugins.onAssembly(new SingleDematerialize<T, R>(this, selector));
2340+
}
2341+
23052342
/**
23062343
* Calls the specified consumer with the success item after this item has been emitted to the downstream.
23072344
* <p>
@@ -2871,6 +2908,26 @@ public final <R> Single<R> map(Function<? super T, ? extends R> mapper) {
28712908
return RxJavaPlugins.onAssembly(new SingleMap<T, R>(this, mapper));
28722909
}
28732910

2911+
/**
2912+
* Maps the signal types of this Single into a {@link Notification} of the same kind
2913+
* and emits it as a single success value to downstream.
2914+
* <p>
2915+
* <img width="640" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/materialize.png" alt="">
2916+
* <dl>
2917+
* <dt><b>Scheduler:</b></dt>
2918+
* <dd>{@code materialize} does not operate by default on a particular {@link Scheduler}.</dd>
2919+
* </dl>
2920+
* @return the new Single instance
2921+
* @since 2.2.4 - experimental
2922+
* @see #dematerialize(Function)
2923+
*/
2924+
@Experimental
2925+
@CheckReturnValue
2926+
@SchedulerSupport(SchedulerSupport.NONE)
2927+
public final Single<Notification<T>> materialize() {
2928+
return RxJavaPlugins.onAssembly(new SingleMaterialize<T>(this));
2929+
}
2930+
28742931
/**
28752932
* Signals true if the current Single signals a success value that is Object-equals with the value
28762933
* provided.
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.completable;
15+
16+
import io.reactivex.*;
17+
import io.reactivex.annotations.Experimental;
18+
import io.reactivex.internal.operators.mixed.MaterializeSingleObserver;
19+
20+
/**
21+
* Turn the signal types of a Completable source into a single Notification of
22+
* equal kind.
23+
*
24+
* @param <T> the element type of the source
25+
* @since 2.2.4 - experimental
26+
*/
27+
@Experimental
28+
public final class CompletableMaterialize<T> extends Single<Notification<T>> {
29+
30+
final Completable source;
31+
32+
public CompletableMaterialize(Completable source) {
33+
this.source = source;
34+
}
35+
36+
@Override
37+
protected void subscribeActual(SingleObserver<? super Notification<T>> observer) {
38+
source.subscribe(new MaterializeSingleObserver<T>(observer));
39+
}
40+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.maybe;
15+
16+
import io.reactivex.*;
17+
import io.reactivex.annotations.Experimental;
18+
import io.reactivex.internal.operators.mixed.MaterializeSingleObserver;
19+
20+
/**
21+
* Turn the signal types of a Maybe source into a single Notification of
22+
* equal kind.
23+
*
24+
* @param <T> the element type of the source
25+
* @since 2.2.4 - experimental
26+
*/
27+
@Experimental
28+
public final class MaybeMaterialize<T> extends Single<Notification<T>> {
29+
30+
final Maybe<T> source;
31+
32+
public MaybeMaterialize(Maybe<T> source) {
33+
this.source = source;
34+
}
35+
36+
@Override
37+
protected void subscribeActual(SingleObserver<? super Notification<T>> observer) {
38+
source.subscribe(new MaterializeSingleObserver<T>(observer));
39+
}
40+
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.mixed;
15+
16+
import io.reactivex.*;
17+
import io.reactivex.annotations.Experimental;
18+
import io.reactivex.disposables.Disposable;
19+
import io.reactivex.internal.disposables.DisposableHelper;
20+
21+
/**
22+
* A consumer that implements the consumer types of Maybe, Single and Completable
23+
* and turns their signals into Notifications for a SingleObserver.
24+
* @param <T> the element type of the source
25+
* @since 2.2.4 - experimental
26+
*/
27+
@Experimental
28+
public final class MaterializeSingleObserver<T>
29+
implements SingleObserver<T>, MaybeObserver<T>, CompletableObserver, Disposable {
30+
31+
final SingleObserver<? super Notification<T>> downstream;
32+
33+
Disposable upstream;
34+
35+
public MaterializeSingleObserver(SingleObserver<? super Notification<T>> downstream) {
36+
this.downstream = downstream;
37+
}
38+
39+
@Override
40+
public void onSubscribe(Disposable d) {
41+
if (DisposableHelper.validate(upstream, d)) {
42+
this.upstream = d;
43+
downstream.onSubscribe(this);
44+
}
45+
}
46+
47+
@Override
48+
public void onComplete() {
49+
downstream.onSuccess(Notification.<T>createOnComplete());
50+
}
51+
52+
@Override
53+
public void onSuccess(T t) {
54+
downstream.onSuccess(Notification.<T>createOnNext(t));
55+
}
56+
57+
@Override
58+
public void onError(Throwable e) {
59+
downstream.onSuccess(Notification.<T>createOnError(e));
60+
}
61+
62+
@Override
63+
public boolean isDisposed() {
64+
return upstream.isDisposed();
65+
}
66+
67+
@Override
68+
public void dispose() {
69+
upstream.dispose();
70+
}
71+
}
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.single;
15+
16+
import io.reactivex.*;
17+
import io.reactivex.annotations.Experimental;
18+
import io.reactivex.disposables.Disposable;
19+
import io.reactivex.exceptions.Exceptions;
20+
import io.reactivex.functions.Function;
21+
import io.reactivex.internal.disposables.DisposableHelper;
22+
import io.reactivex.internal.functions.ObjectHelper;
23+
24+
/**
25+
* Maps the success value of the source to a Notification, then
26+
* maps it back to the corresponding signal type.
27+
* @param <T> the element type of the source
28+
* @param <R> the element type of the Notification and result
29+
* @since 2.2.4 - experimental
30+
*/
31+
@Experimental
32+
public final class SingleDematerialize<T, R> extends Maybe<R> {
33+
34+
final Single<T> source;
35+
36+
final Function<? super T, Notification<R>> selector;
37+
38+
public SingleDematerialize(Single<T> source, Function<? super T, Notification<R>> selector) {
39+
this.source = source;
40+
this.selector = selector;
41+
}
42+
43+
@Override
44+
protected void subscribeActual(MaybeObserver<? super R> observer) {
45+
source.subscribe(new DematerializeObserver<T, R>(observer, selector));
46+
}
47+
48+
static final class DematerializeObserver<T, R> implements SingleObserver<T>, Disposable {
49+
50+
final MaybeObserver<? super R> downstream;
51+
52+
final Function<? super T, Notification<R>> selector;
53+
54+
Disposable upstream;
55+
56+
DematerializeObserver(MaybeObserver<? super R> downstream,
57+
Function<? super T, Notification<R>> selector) {
58+
this.downstream = downstream;
59+
this.selector = selector;
60+
}
61+
62+
@Override
63+
public void dispose() {
64+
upstream.dispose();
65+
}
66+
67+
@Override
68+
public boolean isDisposed() {
69+
return upstream.isDisposed();
70+
}
71+
72+
@Override
73+
public void onSubscribe(Disposable d) {
74+
if (DisposableHelper.validate(upstream, d)) {
75+
upstream = d;
76+
downstream.onSubscribe(this);
77+
}
78+
}
79+
80+
@Override
81+
public void onSuccess(T t) {
82+
Notification<R> notification;
83+
84+
try {
85+
notification = ObjectHelper.requireNonNull(selector.apply(t), "The selector returned a null Notification");
86+
} catch (Throwable ex) {
87+
Exceptions.throwIfFatal(ex);
88+
downstream.onError(ex);
89+
return;
90+
}
91+
if (notification.isOnNext()) {
92+
downstream.onSuccess(notification.getValue());
93+
} else if (notification.isOnComplete()) {
94+
downstream.onComplete();
95+
} else {
96+
downstream.onError(notification.getError());
97+
}
98+
}
99+
100+
@Override
101+
public void onError(Throwable e) {
102+
downstream.onError(e);
103+
}
104+
}
105+
}

0 commit comments

Comments
 (0)