1919 * @author gscampbell
2020 */
2121public class DebugHook <C > extends RxJavaObservableExecutionHook {
22- private final Func1 onNextHook ;
23- private final Func1 <DebugNotification , C > start ;
24- private final Action1 <C > complete ;
25- private final Action2 <C , Throwable > error ;
22+ private DebugNotificationListener <C > listener ;
2623
2724 /**
2825 * Creates a new instance of the DebugHook RxJava plug-in that can be passed into
@@ -34,39 +31,59 @@ public class DebugHook<C> extends RxJavaObservableExecutionHook {
3431 * @param events
3532 * This action is invoked as each notification is generated
3633 */
37- public DebugHook (Func1 onNextDataHook , Func1 <DebugNotification , C > start , Action1 <C > complete , Action2 <C , Throwable > error ) {
38- this .complete = complete ;
39- this .error = error ;
40- this .onNextHook = onNextDataHook == null ? Functions .identity () : onNextDataHook ;
41- this .start = (Func1 <DebugNotification , C >) (start == null ? Actions .empty () : start );
34+ public DebugHook (DebugNotificationListener <C > listener ) {
35+ if (listener == null )
36+ throw new IllegalArgumentException ("The debug listener must not be null" );
37+ this .listener = listener ;
4238 }
4339
4440 @ Override
4541 public <T > OnSubscribe <T > onSubscribeStart (final Observable <? extends T > observableInstance , final OnSubscribe <T > f ) {
4642 return new OnSubscribe <T >() {
4743 @ Override
4844 public void call (Subscriber <? super T > o ) {
49- C context = start .call (DebugNotification .createSubscribe (o , observableInstance , f ));
45+ final DebugNotification <T > n = DebugNotification .createSubscribe (o , observableInstance , f );
46+ o = wrapOutbound (null , o );
47+
48+ C context = listener .start (n );
5049 try {
51- f .call (wrapOutbound ( null , o ) );
52- complete . call (context );
50+ f .call (o );
51+ listener . complete (context );
5352 }
5453 catch (Throwable e ) {
55- error . call (context , e );
54+ listener . error (context , e );
5655 }
5756 }
5857 };
5958 }
6059
6160 @ Override
62- public <T > Subscription onSubscribeReturn (Observable <? extends T > observableInstance , Subscription subscription ) {
61+ public <T > Subscription onSubscribeReturn (Subscription subscription ) {
6362 return subscription ;
6463 }
6564
6665 @ Override
6766 public <T > OnSubscribe <T > onCreate (final OnSubscribe <T > f ) {
68- return new OnCreateWrapper <T >(f );
67+ return new DebugOnSubscribe <T >(f );
6968 }
69+
70+ public final class DebugOnSubscribe <T > implements OnSubscribe <T > {
71+ private final OnSubscribe <T > f ;
72+
73+ private DebugOnSubscribe (OnSubscribe <T > f ) {
74+ this .f = f ;
75+ }
76+
77+ @ Override
78+ public void call (Subscriber <? super T > o ) {
79+ f .call (wrapInbound (null , o ));
80+ }
81+
82+ public OnSubscribe <T > getActual () {
83+ return f ;
84+ }
85+ }
86+
7087
7188 @ Override
7289 public <T , R > Operator <? extends R , ? super T > onLift (final Operator <? extends R , ? super T > bind ) {
@@ -78,19 +95,14 @@ public Subscriber<? super T> call(final Subscriber<? super R> o) {
7895 };
7996 }
8097
81- @ Override
82- public <T > Subscription onAdd (Subscriber <T > subscriber , Subscription s ) {
83- return s ;
84- }
85-
8698 @ SuppressWarnings ("unchecked" )
8799 private <R > Subscriber <? super R > wrapOutbound (Operator <? extends R , ?> bind , Subscriber <? super R > o ) {
88100 if (o instanceof DebugSubscriber ) {
89101 if (bind != null )
90102 ((DebugSubscriber <R , C >) o ).setFrom (bind );
91103 return o ;
92104 }
93- return new DebugSubscriber <R , C >(onNextHook , start , complete , error , o , bind , null );
105+ return new DebugSubscriber <R , C >(listener , o , bind , null );
94106 }
95107
96108 @ SuppressWarnings ("unchecked" )
@@ -100,23 +112,6 @@ private <T> Subscriber<? super T> wrapInbound(Operator<?, ? super T> bind, Subsc
100112 ((DebugSubscriber <T , C >) o ).setTo (bind );
101113 return o ;
102114 }
103- return new DebugSubscriber <T , C >(onNextHook , start , complete , error , o , null , bind );
104- }
105-
106- public final class OnCreateWrapper <T > implements OnSubscribe <T > {
107- private final OnSubscribe <T > f ;
108-
109- private OnCreateWrapper (OnSubscribe <T > f ) {
110- this .f = f ;
111- }
112-
113- @ Override
114- public void call (Subscriber <? super T > o ) {
115- f .call (wrapInbound (null , o ));
116- }
117-
118- public OnSubscribe <T > getActual () {
119- return f ;
120- }
115+ return new DebugSubscriber <T , C >(listener , o , null , bind );
121116 }
122117}
0 commit comments