Skip to content

Commit 8c1e037

Browse files
Merge branch 'clean-up-hook-args' of github.com:abersnaze/RxJava into merge-940
Conflicts: rxjava-core/src/main/java/rx/Observable.java
2 parents ec7eb97 + eb92efd commit 8c1e037

File tree

11 files changed

+641
-227
lines changed

11 files changed

+641
-227
lines changed

rxjava-contrib/rxjava-debug/src/main/java/rx/operators/DebugSubscriber.java

Lines changed: 20 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -3,72 +3,63 @@
33
import rx.Observable.Operator;
44
import rx.Observer;
55
import rx.Subscriber;
6-
import rx.functions.Action1;
7-
import rx.functions.Action2;
8-
import rx.functions.Func1;
96
import rx.plugins.DebugNotification;
7+
import rx.plugins.DebugNotificationListener;
108

119
public final class DebugSubscriber<T, C> extends Subscriber<T> {
12-
private final Func1<T, T> onNextHook;
13-
private final Func1<DebugNotification, C> start;
14-
private final Action1<C> complete;
15-
private final Action2<C, Throwable> error;
10+
private DebugNotificationListener<C> listener;
1611
private final Observer<? super T> o;
1712
private Operator<? extends T, ?> from = null;
1813
private Operator<?, ? super T> to = null;
1914

2015
public DebugSubscriber(
21-
Func1<T, T> onNextHook,
22-
Func1<DebugNotification, C> start,
23-
Action1<C> complete,
24-
Action2<C, Throwable> error,
16+
DebugNotificationListener<C> listener,
2517
Subscriber<? super T> _o,
2618
Operator<? extends T, ?> _out,
2719
Operator<?, ? super T> _in) {
2820
super(_o);
29-
this.start = start;
30-
this.complete = complete;
31-
this.error = error;
21+
this.listener = listener;
3222
this.o = _o;
33-
this.onNextHook = onNextHook;
3423
this.from = _out;
3524
this.to = _in;
36-
this.add(new DebugSubscription<T, C>(this, start, complete, error));
25+
this.add(new DebugSubscription<T, C>(this, listener));
3726
}
3827

3928
@Override
4029
public void onCompleted() {
41-
final DebugNotification<T, C> n = DebugNotification.createOnCompleted(o, from, to);
42-
C context = start.call(n);
30+
final DebugNotification<T> n = DebugNotification.createOnCompleted(o, from, to);
31+
C context = listener.start(n);
4332
try {
4433
o.onCompleted();
45-
complete.call(context);
34+
listener.complete(context);
4635
} catch (Throwable e) {
47-
error.call(context, e);
36+
listener.error(context, e);
4837
}
4938
}
5039

5140
@Override
5241
public void onError(Throwable e) {
53-
final DebugNotification<T, C> n = DebugNotification.createOnError(o, from, e, to);
54-
C context = start.call(n);
42+
final DebugNotification<T> n = DebugNotification.createOnError(o, from, e, to);
43+
C context = listener.start(n);
5544
try {
5645
o.onError(e);
57-
complete.call(context);
46+
listener.complete(context);
5847
} catch (Throwable e2) {
59-
error.call(context, e2);
48+
listener.error(context, e2);
6049
}
6150
}
6251

6352
@Override
6453
public void onNext(T t) {
65-
final DebugNotification<T, C> n = DebugNotification.createOnNext(o, from, t, to);
66-
C context = start.call(n);
54+
final DebugNotification<T> n = DebugNotification.createOnNext(o, from, t, to);
55+
t = (T) listener.onNext(n);
56+
57+
C context = listener.start(n);
6758
try {
68-
o.onNext(onNextHook.call(t));
69-
complete.call(context);
59+
o.onNext(t);
60+
listener.complete(context);
7061
} catch (Throwable e) {
71-
error.call(context, e);
62+
listener.error(context, e);
7263
}
7364
}
7465

rxjava-contrib/rxjava-debug/src/main/java/rx/operators/DebugSubscription.java

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,27 @@
11
package rx.operators;
22

33
import rx.Subscription;
4-
import rx.functions.Action1;
5-
import rx.functions.Action2;
6-
import rx.functions.Func1;
74
import rx.plugins.DebugNotification;
5+
import rx.plugins.DebugNotificationListener;
86

97
final class DebugSubscription<T, C> implements Subscription {
108
private final DebugSubscriber<T, C> debugObserver;
11-
private final Func1<DebugNotification, C> start;
12-
private final Action1<C> complete;
13-
private final Action2<C, Throwable> error;
9+
private DebugNotificationListener<C> listener;
1410

15-
DebugSubscription(DebugSubscriber<T, C> debugObserver, Func1<DebugNotification, C> start, Action1<C> complete, Action2<C, Throwable> error) {
11+
DebugSubscription(DebugSubscriber<T, C> debugObserver, DebugNotificationListener<C> listener) {
1612
this.debugObserver = debugObserver;
17-
this.start = start;
18-
this.complete = complete;
19-
this.error = error;
13+
this.listener = listener;
2014
}
2115

2216
@Override
2317
public void unsubscribe() {
24-
final DebugNotification<T, C> n = DebugNotification.<T, C> createUnsubscribe(debugObserver.getActual(), debugObserver.getFrom(), debugObserver.getTo());
25-
C context = start.call(n);
18+
final DebugNotification<T> n = DebugNotification.<T> createUnsubscribe(debugObserver.getActual(), debugObserver.getFrom(), debugObserver.getTo());
19+
C context = listener.start(n);
2620
try {
2721
debugObserver.unsubscribe();
28-
complete.call(context);
22+
listener.complete(context);
2923
} catch (Throwable e) {
30-
error.call(context, e);
24+
listener.error(context, e);
3125
}
3226
}
3327

rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/DebugHook.java

Lines changed: 34 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,7 @@
1919
* @author gscampbell
2020
*/
2121
public class DebugHook<C> extends RxJavaObservableExecutionHook {
22-
private final Func1 onNextHook;
23-
private final Func1<DebugNotification, C> start;
24-
private final Action1<C> complete;
25-
private final Action2<C, Throwable> error;
22+
private DebugNotificationListener<C> listener;
2623

2724
/**
2825
* Creates a new instance of the DebugHook RxJava plug-in that can be passed into
@@ -34,39 +31,59 @@ public class DebugHook<C> extends RxJavaObservableExecutionHook {
3431
* @param events
3532
* This action is invoked as each notification is generated
3633
*/
37-
public DebugHook(Func1 onNextDataHook, Func1<DebugNotification, C> start, Action1<C> complete, Action2<C, Throwable> error) {
38-
this.complete = complete;
39-
this.error = error;
40-
this.onNextHook = onNextDataHook == null ? Functions.identity() : onNextDataHook;
41-
this.start = (Func1<DebugNotification, C>) (start == null ? Actions.empty() : start);
34+
public DebugHook(DebugNotificationListener<C> listener) {
35+
if (listener == null)
36+
throw new IllegalArgumentException("The debug listener must not be null");
37+
this.listener = listener;
4238
}
4339

4440
@Override
4541
public <T> OnSubscribe<T> onSubscribeStart(final Observable<? extends T> observableInstance, final OnSubscribe<T> f) {
4642
return new OnSubscribe<T>() {
4743
@Override
4844
public void call(Subscriber<? super T> o) {
49-
C context = start.call(DebugNotification.createSubscribe(o, observableInstance, f));
45+
final DebugNotification<T> n = DebugNotification.createSubscribe(o, observableInstance, f);
46+
o = wrapOutbound(null, o);
47+
48+
C context = listener.start(n);
5049
try {
51-
f.call(wrapOutbound(null, o));
52-
complete.call(context);
50+
f.call(o);
51+
listener.complete(context);
5352
}
5453
catch(Throwable e) {
55-
error.call(context, e);
54+
listener.error(context, e);
5655
}
5756
}
5857
};
5958
}
6059

6160
@Override
62-
public <T> Subscription onSubscribeReturn(Observable<? extends T> observableInstance, Subscription subscription) {
61+
public <T> Subscription onSubscribeReturn(Subscription subscription) {
6362
return subscription;
6463
}
6564

6665
@Override
6766
public <T> OnSubscribe<T> onCreate(final OnSubscribe<T> f) {
68-
return new OnCreateWrapper<T>(f);
67+
return new DebugOnSubscribe<T>(f);
6968
}
69+
70+
public final class DebugOnSubscribe<T> implements OnSubscribe<T> {
71+
private final OnSubscribe<T> f;
72+
73+
private DebugOnSubscribe(OnSubscribe<T> f) {
74+
this.f = f;
75+
}
76+
77+
@Override
78+
public void call(Subscriber<? super T> o) {
79+
f.call(wrapInbound(null, o));
80+
}
81+
82+
public OnSubscribe<T> getActual() {
83+
return f;
84+
}
85+
}
86+
7087

7188
@Override
7289
public <T, R> Operator<? extends R, ? super T> onLift(final Operator<? extends R, ? super T> bind) {
@@ -78,19 +95,14 @@ public Subscriber<? super T> call(final Subscriber<? super R> o) {
7895
};
7996
}
8097

81-
@Override
82-
public <T> Subscription onAdd(Subscriber<T> subscriber, Subscription s) {
83-
return s;
84-
}
85-
8698
@SuppressWarnings("unchecked")
8799
private <R> Subscriber<? super R> wrapOutbound(Operator<? extends R, ?> bind, Subscriber<? super R> o) {
88100
if (o instanceof DebugSubscriber) {
89101
if (bind != null)
90102
((DebugSubscriber<R, C>) o).setFrom(bind);
91103
return o;
92104
}
93-
return new DebugSubscriber<R, C>(onNextHook, start, complete, error, o, bind, null);
105+
return new DebugSubscriber<R, C>(listener, o, bind, null);
94106
}
95107

96108
@SuppressWarnings("unchecked")
@@ -100,23 +112,6 @@ private <T> Subscriber<? super T> wrapInbound(Operator<?, ? super T> bind, Subsc
100112
((DebugSubscriber<T, C>) o).setTo(bind);
101113
return o;
102114
}
103-
return new DebugSubscriber<T, C>(onNextHook, start, complete, error, o, null, bind);
104-
}
105-
106-
public final class OnCreateWrapper<T> implements OnSubscribe<T> {
107-
private final OnSubscribe<T> f;
108-
109-
private OnCreateWrapper(OnSubscribe<T> f) {
110-
this.f = f;
111-
}
112-
113-
@Override
114-
public void call(Subscriber<? super T> o) {
115-
f.call(wrapInbound(null, o));
116-
}
117-
118-
public OnSubscribe<T> getActual() {
119-
return f;
120-
}
115+
return new DebugSubscriber<T, C>(listener, o, null, bind);
121116
}
122117
}

0 commit comments

Comments
 (0)