Skip to content

Commit 9b3fca1

Browse files
Subscribers for common factory methods
Similar to Observers.
1 parent 84309b6 commit 9b3fca1

File tree

3 files changed

+152
-22
lines changed

3 files changed

+152
-22
lines changed

rxjava-core/src/main/java/rx/Subscriber.java

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -47,27 +47,6 @@ protected Subscriber(Subscriber<?> op) {
4747
this(op.cs);
4848
}
4949

50-
public static <T> Subscriber<T> from(final Observer<? super T> o) {
51-
return new Subscriber<T>() {
52-
53-
@Override
54-
public void onCompleted() {
55-
o.onCompleted();
56-
}
57-
58-
@Override
59-
public void onError(Throwable e) {
60-
o.onError(e);
61-
}
62-
63-
@Override
64-
public void onNext(T t) {
65-
o.onNext(t);
66-
}
67-
68-
};
69-
}
70-
7150
/**
7251
* Used to register an unsubscribe callback.
7352
*/
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
package rx.observers;
2+
3+
import rx.Observer;
4+
import rx.Subscriber;
5+
import rx.util.OnErrorNotImplementedException;
6+
import rx.util.functions.Action0;
7+
import rx.util.functions.Action1;
8+
9+
public class Subscribers {
10+
11+
public static <T> Subscriber<T> from(final Observer<? super T> o) {
12+
return new Subscriber<T>() {
13+
14+
@Override
15+
public void onCompleted() {
16+
o.onCompleted();
17+
}
18+
19+
@Override
20+
public void onError(Throwable e) {
21+
o.onError(e);
22+
}
23+
24+
@Override
25+
public void onNext(T t) {
26+
o.onNext(t);
27+
}
28+
29+
};
30+
}
31+
32+
/**
33+
* Create an empty Subscriber that ignores all events.
34+
*/
35+
public static final <T> Subscriber<T> create() {
36+
return new Subscriber<T>() {
37+
38+
@Override
39+
public final void onCompleted() {
40+
// do nothing
41+
}
42+
43+
@Override
44+
public final void onError(Throwable e) {
45+
throw new OnErrorNotImplementedException(e);
46+
}
47+
48+
@Override
49+
public final void onNext(T args) {
50+
// do nothing
51+
}
52+
53+
};
54+
}
55+
56+
/**
57+
* Create an Subscriber that receives `onNext` and ignores `onError` and `onCompleted`.
58+
*/
59+
public static final <T> Subscriber<T> create(final Action1<? super T> onNext) {
60+
if (onNext == null) {
61+
throw new IllegalArgumentException("onNext can not be null");
62+
}
63+
64+
return new Subscriber<T>() {
65+
66+
@Override
67+
public final void onCompleted() {
68+
// do nothing
69+
}
70+
71+
@Override
72+
public final void onError(Throwable e) {
73+
throw new OnErrorNotImplementedException(e);
74+
}
75+
76+
@Override
77+
public final void onNext(T args) {
78+
onNext.call(args);
79+
}
80+
81+
};
82+
}
83+
84+
/**
85+
* Create an Subscriber that receives `onNext` and `onError` and ignores `onCompleted`.
86+
*
87+
*/
88+
public static final <T> Subscriber<T> create(final Action1<? super T> onNext, final Action1<Throwable> onError) {
89+
if (onNext == null) {
90+
throw new IllegalArgumentException("onNext can not be null");
91+
}
92+
if (onError == null) {
93+
throw new IllegalArgumentException("onError can not be null");
94+
}
95+
96+
return new Subscriber<T>() {
97+
98+
@Override
99+
public final void onCompleted() {
100+
// do nothing
101+
}
102+
103+
@Override
104+
public final void onError(Throwable e) {
105+
onError.call(e);
106+
}
107+
108+
@Override
109+
public final void onNext(T args) {
110+
onNext.call(args);
111+
}
112+
113+
};
114+
}
115+
116+
/**
117+
* Create an Subscriber that receives `onNext`, `onError` and `onCompleted`.
118+
*
119+
*/
120+
public static final <T> Subscriber<T> create(final Action1<? super T> onNext, final Action1<Throwable> onError, final Action0 onComplete) {
121+
if (onNext == null) {
122+
throw new IllegalArgumentException("onNext can not be null");
123+
}
124+
if (onError == null) {
125+
throw new IllegalArgumentException("onError can not be null");
126+
}
127+
if (onComplete == null) {
128+
throw new IllegalArgumentException("onComplete can not be null");
129+
}
130+
131+
return new Subscriber<T>() {
132+
133+
@Override
134+
public final void onCompleted() {
135+
onComplete.call();
136+
}
137+
138+
@Override
139+
public final void onError(Throwable e) {
140+
onError.call(e);
141+
}
142+
143+
@Override
144+
public final void onNext(T args) {
145+
onNext.call(args);
146+
}
147+
148+
};
149+
}
150+
151+
}

rxjava-core/src/main/java/rx/observers/TestSubscriber.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
*/
2727
public class TestSubscriber<T> extends Subscriber<T> {
2828

29-
private final Subscriber<Object> EMPTY = Subscriber.from(new EmptyObserver<Object>());
29+
private final Subscriber<Object> EMPTY = Subscribers.create();
3030

3131
private final TestObserver<T> testObserver;
3232

0 commit comments

Comments
 (0)