Skip to content

Commit 5532412

Browse files
committed
A first set of stateless operators that don't need to be instantiated
for lift all the time.
1 parent 1a94d55 commit 5532412

File tree

7 files changed

+94
-56
lines changed

7 files changed

+94
-56
lines changed

src/main/java/rx/Observable.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -758,7 +758,7 @@ public static final <T, R> Observable<R> combineLatest(List<? extends Observable
758758
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Mathematical-and-Aggregate-Operators#concat">RxJava wiki: concat</a>
759759
*/
760760
public final static <T> Observable<T> concat(Observable<? extends Observable<? extends T>> observables) {
761-
return observables.lift(new OperatorConcat<T>());
761+
return observables.lift(OperatorConcat.<T>instance());
762762
}
763763

764764
/**
@@ -1619,7 +1619,7 @@ public final static <T> Observable<T> merge(Iterable<? extends Observable<? exte
16191619
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Combining-Observables#merge">RxJava wiki: merge</a>
16201620
*/
16211621
public final static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) {
1622-
return source.lift(new OperatorMerge<T>());
1622+
return source.lift(OperatorMerge.<T>instance(false));
16231623
}
16241624

16251625
/**
@@ -1944,7 +1944,7 @@ public final static <T> Observable<T> merge(Observable<? extends T>[] sequences)
19441944
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Combining-Observables#mergedelayerror">RxJava wiki: mergeDelayError</a>
19451945
*/
19461946
public final static <T> Observable<T> mergeDelayError(Observable<? extends Observable<? extends T>> source) {
1947-
return source.lift(new OperatorMergeDelayError<T>());
1947+
return source.lift(OperatorMerge.<T>instance(true));
19481948
}
19491949

19501950
/**
@@ -3002,7 +3002,7 @@ public final Observable<T> ambWith(Observable<? extends T> t1) {
30023002
* @return an Observable that hides the identity of this Observable
30033003
*/
30043004
public final Observable<T> asObservable() {
3005-
return lift(new OperatorAsObservable<T>());
3005+
return lift(OperatorAsObservable.<T>instance());
30063006
}
30073007

30083008
/**
@@ -3923,9 +3923,9 @@ public final <U> Observable<T> delaySubscription(Func0<? extends Observable<U>>
39233923
* if the source Observable is not of type {@code Observable<Notification<T>>}
39243924
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Observable-Utility-Operators#dematerialize">RxJava wiki: dematerialize</a>
39253925
*/
3926-
@SuppressWarnings({"unchecked", "rawtypes"})
3926+
@SuppressWarnings({"unchecked"})
39273927
public final <T2> Observable<T2> dematerialize() {
3928-
return lift(new OperatorDematerialize());
3928+
return lift(OperatorDematerialize.instance());
39293929
}
39303930

39313931
/**
@@ -4968,7 +4968,7 @@ private final <R> Observable<R> mapNotification(Func1<? super T, ? extends R> on
49684968
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Observable-Utility-Operators#materialize">RxJava wiki: materialize</a>
49694969
*/
49704970
public final Observable<Notification<T>> materialize() {
4971-
return lift(new OperatorMaterialize<T>());
4971+
return lift(OperatorMaterialize.<T>instance());
49724972
}
49734973

49744974
/**

src/main/java/rx/internal/operators/OperatorAsObservable.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,18 @@
2525
* the return value type of the wrapped observable.
2626
*/
2727
public final class OperatorAsObservable<T> implements Operator<T, T> {
28-
28+
/** Lazy initialization via inner-class holder. */
29+
private static final class Holder {
30+
/** A singleton instance. */
31+
static final OperatorAsObservable<Object> INSTANCE = new OperatorAsObservable<Object>();
32+
}
33+
/**
34+
* @return an singleton instance of this stateless operator.
35+
*/
36+
@SuppressWarnings("unchecked")
37+
public static <T> OperatorAsObservable<T> instance() {
38+
return (OperatorAsObservable<T>)Holder.INSTANCE;
39+
}
2940
@Override
3041
public Subscriber<? super T> call(Subscriber<? super T> s) {
3142
return s;

src/main/java/rx/internal/operators/OperatorConcat.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,19 @@
3737
* the source and result value type
3838
*/
3939
public final class OperatorConcat<T> implements Operator<T, Observable<? extends T>> {
40+
/** Lazy initialization via inner-class holder. */
41+
private static final class Holder {
42+
/** A singleton instance. */
43+
static final OperatorConcat<Object> INSTANCE = new OperatorConcat<Object>();
44+
}
45+
/**
46+
* @return an singleton instance of this stateless operator.
47+
*/
48+
@SuppressWarnings("unchecked")
49+
public static <T> OperatorConcat<T> instance() {
50+
return (OperatorConcat<T>)Holder.INSTANCE;
51+
}
52+
4053
@Override
4154
public Subscriber<? super Observable<? extends T>> call(final Subscriber<? super T> child) {
4255
final SerializedSubscriber<T> s = new SerializedSubscriber<T>(child);

src/main/java/rx/internal/operators/OperatorDematerialize.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,18 @@
3030
* @param <T> the wrapped value type
3131
*/
3232
public final class OperatorDematerialize<T> implements Operator<T, Notification<T>> {
33-
33+
/** Lazy initialization via inner-class holder. */
34+
private static final class Holder {
35+
/** A singleton instance. */
36+
static final OperatorDematerialize<Object> INSTANCE = new OperatorDematerialize<Object>();
37+
}
38+
/**
39+
* @return an singleton instance of this stateless operator.
40+
*/
41+
@SuppressWarnings({ "rawtypes" })
42+
public static OperatorDematerialize instance() {
43+
return Holder.INSTANCE; // using raw types because the type inference is not good enough
44+
}
3445
@Override
3546
public Subscriber<? super Notification<T>> call(final Subscriber<? super T> child) {
3647
return new Subscriber<Notification<T>>(child) {

src/main/java/rx/internal/operators/OperatorMaterialize.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,18 @@
2929
* See <a href="http://msdn.microsoft.com/en-us/library/hh229453.aspx">here</a> for the Microsoft Rx equivalent.
3030
*/
3131
public final class OperatorMaterialize<T> implements Operator<Notification<T>, T> {
32-
32+
/** Lazy initialization via inner-class holder. */
33+
private static final class Holder {
34+
/** A singleton instance. */
35+
static final OperatorMaterialize<Object> INSTANCE = new OperatorMaterialize<Object>();
36+
}
37+
/**
38+
* @return an singleton instance of this stateless operator.
39+
*/
40+
@SuppressWarnings("unchecked")
41+
public static <T> OperatorMaterialize<T> instance() {
42+
return (OperatorMaterialize<T>)Holder.INSTANCE;
43+
}
3344
@Override
3445
public Subscriber<? super T> call(final Subscriber<? super Notification<T>> child) {
3546
return new Subscriber<T>(child) {

src/main/java/rx/internal/operators/OperatorMerge.java

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,46 @@
3838
* <img width="640" height="380" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/merge.png" alt="">
3939
* <p>
4040
* You can combine the items emitted by multiple {@code Observable}s so that they act like a single {@code Observable}, by using the merge operation.
41-
*
41+
* <p>
42+
* The {@code instance(true)} call behaves like {@link OperatorMerge} except that if any of the merged Observables notify of
43+
* an error via {@code onError}, {@code mergeDelayError} will refrain from propagating that error
44+
* notification until all of the merged Observables have finished emitting items.
45+
* <p>
46+
* <img width="640" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/mergeDelayError.png" alt="">
47+
* <p>
48+
* Even if multiple merged Observables send {@code onError} notifications, {@code mergeDelayError} will
49+
* only invoke the {@code onError} method of its Observers once.
50+
* <p>
51+
* This operation allows an Observer to receive all successfully emitted items from all of the
52+
* source Observables without being interrupted by an error notification from one of them.
53+
* <p>
54+
* <em>Note:</em> If this is used on an Observable that never completes, it will never call {@code onError} and will effectively swallow errors.
55+
4256
* @param <T>
4357
* the type of the items emitted by both the source and merged {@code Observable}s
4458
*/
4559
public class OperatorMerge<T> implements Operator<T, Observable<? extends T>> {
46-
60+
/** Lazy initialization via inner-class holder. */
61+
private static final class HolderNoDelay {
62+
/** A singleton instance. */
63+
static final OperatorMerge<Object> INSTANCE = new OperatorMerge<Object>(false);
64+
}
65+
/** Lazy initialization via inner-class holder. */
66+
private static final class HolderDelayErrors {
67+
/** A singleton instance. */
68+
static final OperatorMerge<Object> INSTANCE = new OperatorMerge<Object>(true);
69+
}
70+
/**
71+
* @param delayErrors should the merge delay errors?
72+
* @return an singleton instance of this stateless operator.
73+
*/
74+
@SuppressWarnings("unchecked")
75+
public static <T> OperatorMerge<T> instance(boolean delayErrors) {
76+
if (delayErrors) {
77+
return (OperatorMerge<T>)HolderDelayErrors.INSTANCE;
78+
}
79+
return (OperatorMerge<T>)HolderNoDelay.INSTANCE;
80+
}
4781
/*
4882
* benjchristensen => This class is complex and I'm not a fan of it despite writing it. I want to give some background
4983
* as to why for anyone who wants to try and help improve it.
@@ -74,11 +108,11 @@ public class OperatorMerge<T> implements Operator<T, Observable<? extends T>> {
74108
* to track object allocation.
75109
*/
76110

77-
public OperatorMerge() {
111+
private OperatorMerge() {
78112
this.delayErrors = false;
79113
}
80114

81-
public OperatorMerge(boolean delayErrors) {
115+
private OperatorMerge(boolean delayErrors) {
82116
this.delayErrors = delayErrors;
83117
}
84118

src/main/java/rx/internal/operators/OperatorMergeDelayError.java

Lines changed: 0 additions & 42 deletions
This file was deleted.

0 commit comments

Comments
 (0)