Skip to content

Commit ec60fa4

Browse files
committed
Merge pull request #2335 from akarnokd/StatelessOperators
A set of stateless operators that don't need to be instantiated
2 parents adf1ab4 + 824c65a commit ec60fa4

12 files changed

+157
-68
lines changed

src/main/java/rx/Observable.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -759,7 +759,7 @@ public static final <T, R> Observable<R> combineLatest(List<? extends Observable
759759
* @see <a href="http://reactivex.io/documentation/operators/concat.html">ReactiveX operators documentation: Concat</a>
760760
*/
761761
public final static <T> Observable<T> concat(Observable<? extends Observable<? extends T>> observables) {
762-
return observables.lift(new OperatorConcat<T>());
762+
return observables.lift(OperatorConcat.<T>instance());
763763
}
764764

765765
/**
@@ -1620,7 +1620,7 @@ public final static <T> Observable<T> merge(Iterable<? extends Observable<? exte
16201620
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
16211621
*/
16221622
public final static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) {
1623-
return source.lift(new OperatorMerge<T>());
1623+
return source.lift(OperatorMerge.<T>instance(false));
16241624
}
16251625

16261626
/**
@@ -1945,7 +1945,7 @@ public final static <T> Observable<T> merge(Observable<? extends T>[] sequences)
19451945
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
19461946
*/
19471947
public final static <T> Observable<T> mergeDelayError(Observable<? extends Observable<? extends T>> source) {
1948-
return source.lift(new OperatorMergeDelayError<T>());
1948+
return source.lift(OperatorMerge.<T>instance(true));
19491949
}
19501950

19511951
/**
@@ -2421,7 +2421,7 @@ public final static <T> Observable<Boolean> sequenceEqual(Observable<? extends T
24212421
* @see <a href="http://reactivex.io/documentation/operators/switch.html">ReactiveX operators documentation: Switch</a>
24222422
*/
24232423
public final static <T> Observable<T> switchOnNext(Observable<? extends Observable<? extends T>> sequenceOfSequences) {
2424-
return sequenceOfSequences.lift(new OperatorSwitch<T>());
2424+
return sequenceOfSequences.lift(OperatorSwitch.<T>instance());
24252425
}
24262426

24272427
/**
@@ -3004,7 +3004,7 @@ public final Observable<T> ambWith(Observable<? extends T> t1) {
30043004
* @return an Observable that hides the identity of this Observable
30053005
*/
30063006
public final Observable<T> asObservable() {
3007-
return lift(new OperatorAsObservable<T>());
3007+
return lift(OperatorAsObservable.<T>instance());
30083008
}
30093009

30103010
/**
@@ -3925,9 +3925,9 @@ public final <U> Observable<T> delaySubscription(Func0<? extends Observable<U>>
39253925
* if the source Observable is not of type {@code Observable<Notification<T>>}
39263926
* @see <a href="http://reactivex.io/documentation/operators/materialize-dematerialize.html">ReactiveX operators documentation: Dematerialize</a>
39273927
*/
3928-
@SuppressWarnings({"unchecked", "rawtypes"})
3928+
@SuppressWarnings({"unchecked"})
39293929
public final <T2> Observable<T2> dematerialize() {
3930-
return lift(new OperatorDematerialize());
3930+
return lift(OperatorDematerialize.instance());
39313931
}
39323932

39333933
/**
@@ -4963,7 +4963,7 @@ private final <R> Observable<R> mapNotification(Func1<? super T, ? extends R> on
49634963
* @see <a href="http://reactivex.io/documentation/operators/materialize-dematerialize.html">ReactiveX operators documentation: Materialize</a>
49644964
*/
49654965
public final Observable<Notification<T>> materialize() {
4966-
return lift(new OperatorMaterialize<T>());
4966+
return lift(OperatorMaterialize.<T>instance());
49674967
}
49684968

49694969
/**
@@ -5108,7 +5108,7 @@ public final Observable<T> onBackpressureBuffer(long capacity, Action0 onOverflo
51085108
* @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
51095109
*/
51105110
public final Observable<T> onBackpressureDrop() {
5111-
return lift(new OperatorOnBackpressureDrop<T>());
5111+
return lift(OperatorOnBackpressureDrop.<T>instance());
51125112
}
51135113

51145114
/**
@@ -6458,7 +6458,7 @@ public final <R> Observable<R> scan(R initialValue, Func2<R, ? super T, R> accum
64586458
* @see <a href="http://reactivex.io/documentation/operators/serialize.html">ReactiveX operators documentation: Serialize</a>
64596459
*/
64606460
public final Observable<T> serialize() {
6461-
return lift(new OperatorSerialize<T>());
6461+
return lift(OperatorSerialize.<T>instance());
64626462
}
64636463

64646464
/**
@@ -8362,7 +8362,7 @@ public final BlockingObservable<T> toBlocking() {
83628362
* @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX operators documentation: To</a>
83638363
*/
83648364
public final Observable<List<T>> toList() {
8365-
return lift(new OperatorToObservableList<T>());
8365+
return lift(OperatorToObservableList.<T>instance());
83668366
}
83678367

83688368
/**

src/main/java/rx/internal/operators/OperatorAsObservable.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,19 @@
2525
* the return value type of the wrapped observable.
2626
*/
2727
public final class OperatorAsObservable<T> implements Operator<T, T> {
28-
28+
/** Lazy initialization via inner-class holder. */
29+
private static final class Holder {
30+
/** A singleton instance. */
31+
static final OperatorAsObservable<Object> INSTANCE = new OperatorAsObservable<Object>();
32+
}
33+
/**
34+
* @return a singleton instance of this stateless operator.
35+
*/
36+
@SuppressWarnings("unchecked")
37+
public static <T> OperatorAsObservable<T> instance() {
38+
return (OperatorAsObservable<T>)Holder.INSTANCE;
39+
}
40+
private OperatorAsObservable() { }
2941
@Override
3042
public Subscriber<? super T> call(Subscriber<? super T> s) {
3143
return s;

src/main/java/rx/internal/operators/OperatorConcat.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,19 @@
3737
* the source and result value type
3838
*/
3939
public final class OperatorConcat<T> implements Operator<T, Observable<? extends T>> {
40+
/** Lazy initialization via inner-class holder. */
41+
private static final class Holder {
42+
/** A singleton instance. */
43+
static final OperatorConcat<Object> INSTANCE = new OperatorConcat<Object>();
44+
}
45+
/**
46+
* @return a singleton instance of this stateless operator.
47+
*/
48+
@SuppressWarnings("unchecked")
49+
public static <T> OperatorConcat<T> instance() {
50+
return (OperatorConcat<T>)Holder.INSTANCE;
51+
}
52+
private OperatorConcat() { }
4053
@Override
4154
public Subscriber<? super Observable<? extends T>> call(final Subscriber<? super T> child) {
4255
final SerializedSubscriber<T> s = new SerializedSubscriber<T>(child);

src/main/java/rx/internal/operators/OperatorDematerialize.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,19 @@
3030
* @param <T> the wrapped value type
3131
*/
3232
public final class OperatorDematerialize<T> implements Operator<T, Notification<T>> {
33-
33+
/** Lazy initialization via inner-class holder. */
34+
private static final class Holder {
35+
/** A singleton instance. */
36+
static final OperatorDematerialize<Object> INSTANCE = new OperatorDematerialize<Object>();
37+
}
38+
/**
39+
* @return a singleton instance of this stateless operator.
40+
*/
41+
@SuppressWarnings({ "rawtypes" })
42+
public static OperatorDematerialize instance() {
43+
return Holder.INSTANCE; // using raw types because the type inference is not good enough
44+
}
45+
private OperatorDematerialize() { }
3446
@Override
3547
public Subscriber<? super Notification<T>> call(final Subscriber<? super T> child) {
3648
return new Subscriber<Notification<T>>(child) {

src/main/java/rx/internal/operators/OperatorMaterialize.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,19 @@
2929
* See <a href="http://msdn.microsoft.com/en-us/library/hh229453.aspx">here</a> for the Microsoft Rx equivalent.
3030
*/
3131
public final class OperatorMaterialize<T> implements Operator<Notification<T>, T> {
32-
32+
/** Lazy initialization via inner-class holder. */
33+
private static final class Holder {
34+
/** A singleton instance. */
35+
static final OperatorMaterialize<Object> INSTANCE = new OperatorMaterialize<Object>();
36+
}
37+
/**
38+
* @return a singleton instance of this stateless operator.
39+
*/
40+
@SuppressWarnings("unchecked")
41+
public static <T> OperatorMaterialize<T> instance() {
42+
return (OperatorMaterialize<T>)Holder.INSTANCE;
43+
}
44+
private OperatorMaterialize() { }
3345
@Override
3446
public Subscriber<? super T> call(final Subscriber<? super Notification<T>> child) {
3547
return new Subscriber<T>(child) {

src/main/java/rx/internal/operators/OperatorMerge.java

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,46 @@
3838
* <img width="640" height="380" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/merge.png" alt="">
3939
* <p>
4040
* You can combine the items emitted by multiple {@code Observable}s so that they act like a single {@code Observable}, by using the merge operation.
41-
*
41+
* <p>
42+
* The {@code instance(true)} call behaves like {@link OperatorMerge} except that if any of the merged Observables notify of
43+
* an error via {@code onError}, {@code mergeDelayError} will refrain from propagating that error
44+
* notification until all of the merged Observables have finished emitting items.
45+
* <p>
46+
* <img width="640" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/mergeDelayError.png" alt="">
47+
* <p>
48+
* Even if multiple merged Observables send {@code onError} notifications, {@code mergeDelayError} will
49+
* only invoke the {@code onError} method of its Observers once.
50+
* <p>
51+
* This operation allows an Observer to receive all successfully emitted items from all of the
52+
* source Observables without being interrupted by an error notification from one of them.
53+
* <p>
54+
* <em>Note:</em> If this is used on an Observable that never completes, it will never call {@code onError} and will effectively swallow errors.
55+
4256
* @param <T>
4357
* the type of the items emitted by both the source and merged {@code Observable}s
4458
*/
4559
public class OperatorMerge<T> implements Operator<T, Observable<? extends T>> {
46-
60+
/** Lazy initialization via inner-class holder. */
61+
private static final class HolderNoDelay {
62+
/** A singleton instance. */
63+
static final OperatorMerge<Object> INSTANCE = new OperatorMerge<Object>(false);
64+
}
65+
/** Lazy initialization via inner-class holder. */
66+
private static final class HolderDelayErrors {
67+
/** A singleton instance. */
68+
static final OperatorMerge<Object> INSTANCE = new OperatorMerge<Object>(true);
69+
}
70+
/**
71+
* @param delayErrors should the merge delay errors?
72+
* @return a singleton instance of this stateless operator.
73+
*/
74+
@SuppressWarnings("unchecked")
75+
public static <T> OperatorMerge<T> instance(boolean delayErrors) {
76+
if (delayErrors) {
77+
return (OperatorMerge<T>)HolderDelayErrors.INSTANCE;
78+
}
79+
return (OperatorMerge<T>)HolderNoDelay.INSTANCE;
80+
}
4781
/*
4882
* benjchristensen => This class is complex and I'm not a fan of it despite writing it. I want to give some background
4983
* as to why for anyone who wants to try and help improve it.
@@ -74,11 +108,11 @@ public class OperatorMerge<T> implements Operator<T, Observable<? extends T>> {
74108
* to track object allocation.
75109
*/
76110

77-
public OperatorMerge() {
111+
private OperatorMerge() {
78112
this.delayErrors = false;
79113
}
80114

81-
public OperatorMerge(boolean delayErrors) {
115+
private OperatorMerge(boolean delayErrors) {
82116
this.delayErrors = delayErrors;
83117
}
84118

src/main/java/rx/internal/operators/OperatorMergeDelayError.java

Lines changed: 0 additions & 42 deletions
This file was deleted.

src/main/java/rx/internal/operators/OperatorOnBackpressureDrop.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,19 @@
2222
import rx.Subscriber;
2323

2424
public class OperatorOnBackpressureDrop<T> implements Operator<T, T> {
25-
25+
/** Lazy initialization via inner-class holder. */
26+
private static final class Holder {
27+
/** A singleton instance. */
28+
static final OperatorOnBackpressureDrop<Object> INSTANCE = new OperatorOnBackpressureDrop<Object>();
29+
}
30+
/**
31+
* @return a singleton instance of this stateless operator.
32+
*/
33+
@SuppressWarnings({ "unchecked" })
34+
public static <T> OperatorOnBackpressureDrop<T> instance() {
35+
return (OperatorOnBackpressureDrop<T>)Holder.INSTANCE;
36+
}
37+
private OperatorOnBackpressureDrop() { }
2638
@Override
2739
public Subscriber<? super T> call(final Subscriber<? super T> child) {
2840
final AtomicLong requested = new AtomicLong();

src/main/java/rx/internal/operators/OperatorSerialize.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,19 @@
2020
import rx.observers.SerializedSubscriber;
2121

2222
public final class OperatorSerialize<T> implements Operator<T, T> {
23-
23+
/** Lazy initialization via inner-class holder. */
24+
private static final class Holder {
25+
/** A singleton instance. */
26+
static final OperatorSerialize<Object> INSTANCE = new OperatorSerialize<Object>();
27+
}
28+
/**
29+
* @return a singleton instance of this stateless operator.
30+
*/
31+
@SuppressWarnings({ "unchecked" })
32+
public static <T> OperatorSerialize<T> instance() {
33+
return (OperatorSerialize<T>)Holder.INSTANCE;
34+
}
35+
private OperatorSerialize() { }
2436
@Override
2537
public Subscriber<? super T> call(final Subscriber<? super T> s) {
2638
return new SerializedSubscriber<T>(new Subscriber<T>(s) {

src/main/java/rx/internal/operators/OperatorSwitch.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import java.util.ArrayList;
1919
import java.util.List;
20+
2021
import rx.Observable;
2122
import rx.Observable.Operator;
2223
import rx.Producer;
@@ -33,7 +34,19 @@
3334
* @param <T> the value type
3435
*/
3536
public final class OperatorSwitch<T> implements Operator<T, Observable<? extends T>> {
36-
37+
/** Lazy initialization via inner-class holder. */
38+
private static final class Holder {
39+
/** A singleton instance. */
40+
static final OperatorSwitch<Object> INSTANCE = new OperatorSwitch<Object>();
41+
}
42+
/**
43+
* @return a singleton instance of this stateless operator.
44+
*/
45+
@SuppressWarnings({ "unchecked" })
46+
public static <T> OperatorSwitch<T> instance() {
47+
return (OperatorSwitch<T>)Holder.INSTANCE;
48+
}
49+
private OperatorSwitch() { }
3750
@Override
3851
public Subscriber<? super Observable<? extends T>> call(final Subscriber<? super T> child) {
3952
return new SwitchSubscriber<T>(child);

0 commit comments

Comments
 (0)