Skip to content

Commit d799eca

Browse files
committed
1.x: deanonymize Observable inner classes (#3848)
* 1.x: deanonymize Observable inner classes * Further simplification of types * Fix indentation, rename Lambda to Action
1 parent 3c5efaa commit d799eca

10 files changed

+825
-444
lines changed

src/main/java/rx/Observable.java

Lines changed: 76 additions & 444 deletions
Large diffs are not rendered by default.

src/main/java/rx/functions/Actions.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -432,4 +432,27 @@ public R call(Object... args) {
432432
}
433433
};
434434
}
435+
436+
/**
437+
* Wraps an Action0 instance into an Action1 instance where the latter calls
438+
* the former.
439+
* @param action the action to call
440+
* @return the new Action1 instance
441+
*/
442+
public static <T> Action1<T> toAction1(Action0 action) {
443+
return new Action1CallsAction0<T>(action);
444+
}
445+
446+
static final class Action1CallsAction0<T> implements Action1<T> {
447+
final Action0 action;
448+
449+
public Action1CallsAction0(Action0 action) {
450+
this.action = action;
451+
}
452+
453+
@Override
454+
public void call(T t) {
455+
action.call();
456+
}
457+
}
435458
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.internal.operators;
18+
19+
import rx.*;
20+
import rx.Observable.OnSubscribe;
21+
22+
/**
23+
* Holds a singleton instance of an empty Observable which is stateless and completes
24+
* the child subscriber immediately.
25+
*/
26+
public enum EmptyObservableHolder implements OnSubscribe<Object> {
27+
INSTANCE
28+
;
29+
30+
/**
31+
* Returns a type-corrected singleton instance of the empty Observable.
32+
* @return a type-corrected singleton instance of the empty Observable.
33+
*/
34+
@SuppressWarnings("unchecked")
35+
public static <T> Observable<T> instance() {
36+
return (Observable<T>)EMPTY;
37+
}
38+
39+
/** The singleton instance. */
40+
static final Observable<Object> EMPTY = Observable.create(INSTANCE);
41+
42+
@Override
43+
public void call(Subscriber<? super Object> child) {
44+
child.onCompleted();
45+
}
46+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.internal.operators;
18+
19+
import rx.*;
20+
import rx.Observable.OnSubscribe;
21+
22+
/**
23+
* Holds a singleton instance of a never Observable which is stateless doesn't
24+
* call any of the Subscriber's methods.
25+
*/
26+
public enum NeverObservableHolder implements OnSubscribe<Object> {
27+
INSTANCE
28+
;
29+
30+
/**
31+
* Returns a type-corrected singleton instance of the never Observable.
32+
* @return a type-corrected singleton instance of the never Observable.
33+
*/
34+
@SuppressWarnings("unchecked")
35+
public static <T> Observable<T> instance() {
36+
return (Observable<T>)NEVER;
37+
}
38+
39+
/** The singleton instance. */
40+
static final Observable<Object> NEVER = Observable.create(INSTANCE);
41+
42+
@Override
43+
public void call(Subscriber<? super Object> child) {
44+
}
45+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.internal.operators;
18+
19+
import rx.Observable.*;
20+
import rx.Subscriber;
21+
import rx.exceptions.Exceptions;
22+
import rx.plugins.*;
23+
24+
/**
25+
* Transforms the downstream Subscriber into a Subscriber via an operator
26+
* callback and calls the parent OnSubscribe.call() method with it.
27+
* @param <T> the source value type
28+
* @param <R> the result value type
29+
*/
30+
public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {
31+
32+
static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();
33+
34+
final OnSubscribe<T> parent;
35+
36+
final Operator<? extends R, ? super T> operator;
37+
38+
public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
39+
this.parent = parent;
40+
this.operator = operator;
41+
}
42+
43+
@Override
44+
public void call(Subscriber<? super R> o) {
45+
try {
46+
Subscriber<? super T> st = hook.onLift(operator).call(o);
47+
try {
48+
// new Subscriber created and being subscribed with so 'onStart' it
49+
st.onStart();
50+
parent.call(st);
51+
} catch (Throwable e) {
52+
// localized capture of errors rather than it skipping all operators
53+
// and ending up in the try/catch of the subscribe method which then
54+
// prevents onErrorResumeNext and other similar approaches to error handling
55+
Exceptions.throwIfFatal(e);
56+
st.onError(e);
57+
}
58+
} catch (Throwable e) {
59+
Exceptions.throwIfFatal(e);
60+
// if the lift function failed all we can do is pass the error to the final Subscriber
61+
// as we don't have the operator available to us
62+
o.onError(e);
63+
}
64+
}
65+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.internal.operators;
18+
19+
import rx.*;
20+
import rx.Observable.OnSubscribe;
21+
22+
/**
23+
* An Observable that invokes {@link Observer#onError onError} when the {@link Observer} subscribes to it.
24+
*
25+
* @param <T>
26+
* the type of item (ostensibly) emitted by the Observable
27+
*/
28+
public final class OnSubscribeThrow<T> implements OnSubscribe<T> {
29+
30+
private final Throwable exception;
31+
32+
public OnSubscribeThrow(Throwable exception) {
33+
this.exception = exception;
34+
}
35+
36+
/**
37+
* Accepts an {@link Observer} and calls its {@link Observer#onError onError} method.
38+
*
39+
* @param observer
40+
* an {@link Observer} of this Observable
41+
*/
42+
@Override
43+
public void call(Subscriber<? super T> observer) {
44+
observer.onError(exception);
45+
}
46+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.util;
17+
18+
import rx.*;
19+
import rx.functions.*;
20+
21+
/**
22+
* An Observer that forwards the onXXX method calls to a notification callback
23+
* by transforming each signal type into Notifications.
24+
* @param <T> the value type
25+
*/
26+
public final class ActionNotificationObserver<T> implements Observer<T> {
27+
28+
final Action1<Notification<? super T>> onNotification;
29+
30+
public ActionNotificationObserver(Action1<Notification<? super T>> onNotification) {
31+
this.onNotification = onNotification;
32+
}
33+
34+
@Override
35+
public void onNext(T t) {
36+
onNotification.call(Notification.createOnNext(t));
37+
}
38+
39+
@Override
40+
public void onError(Throwable e) {
41+
onNotification.call(Notification.createOnError(e));
42+
}
43+
44+
@Override
45+
public void onCompleted() {
46+
onNotification.call(Notification.createOnCompleted());
47+
}
48+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.util;
17+
18+
import rx.Subscriber;
19+
import rx.functions.*;
20+
21+
/**
22+
* A Subscriber that forwards the onXXX method calls to callbacks.
23+
* @param <T> the value type
24+
*/
25+
public final class ActionSubscriber<T> extends Subscriber<T> {
26+
27+
final Action1<? super T> onNext;
28+
final Action1<Throwable> onError;
29+
final Action0 onCompleted;
30+
31+
public ActionSubscriber(Action1<? super T> onNext, Action1<Throwable> onError, Action0 onCompleted) {
32+
this.onNext = onNext;
33+
this.onError = onError;
34+
this.onCompleted = onCompleted;
35+
}
36+
37+
@Override
38+
public void onNext(T t) {
39+
onNext.call(t);
40+
}
41+
42+
@Override
43+
public void onError(Throwable e) {
44+
onError.call(e);
45+
}
46+
47+
@Override
48+
public void onCompleted() {
49+
onCompleted.call();
50+
}
51+
}

0 commit comments

Comments
 (0)