@@ -43,84 +43,71 @@ public final class OperatorTakeUntil {
43
43
* @return An observable sequence containing the elements of the source sequence up to the point the other sequence interrupted further propagation.
44
44
*/
45
45
public static <T , E > Observable <T > takeUntil (final Observable <? extends T > source , final Observable <? extends E > other ) {
46
- Observable <Notification < T > > s = source .lift (new SourceObservable <T >());
47
- Observable <Notification < T >> o = other .lift (new OtherObservable <T , E >());
46
+ Observable <Object > s = source .lift (new SourceObservable <T >());
47
+ Observable <Object > o = other .lift (new OtherObservable <E >());
48
48
49
- Observable <Notification < T > > result = Observable .merge (s , o );
49
+ Observable <Object > result = Observable .merge (s , o );
50
50
51
- return result .takeWhile (new Func1 <Notification <T >, Boolean >() {
51
+ final NotificationLite <T > notification = NotificationLite .instance ();
52
+
53
+ return result .takeWhile (new Func1 <Object , Boolean >() {
52
54
@ Override
53
- public Boolean call (Notification < T > notification ) {
54
- return !notification .halt ;
55
+ public Boolean call (Object args ) {
56
+ return !notification .isCompleted ( args ) ;
55
57
}
56
- }).map (new Func1 <Notification < T > , T >() {
58
+ }).map (new Func1 <Object , T >() {
57
59
@ Override
58
- public T call (Notification < T > notification ) {
59
- return notification .value ;
60
+ public T call (Object args ) {
61
+ return notification .getValue ( args ) ;
60
62
}
61
63
});
62
64
}
63
65
64
- private static class Notification <T > {
65
- private final boolean halt ;
66
- private final T value ;
67
-
68
- public static <T > Notification <T > value (T value ) {
69
- return new Notification <T >(false , value );
70
- }
71
-
72
- public static <T > Notification <T > halt () {
73
- return new Notification <T >(true , null );
74
- }
75
-
76
- private Notification (boolean halt , T value ) {
77
- this .halt = halt ;
78
- this .value = value ;
79
- }
66
+ private final static class SourceObservable <T > implements Operator <Object , T > {
80
67
81
- }
82
-
83
- private static class SourceObservable <T > implements Operator <Notification <T >, T > {
68
+ private final NotificationLite <T > notification = NotificationLite .instance ();
84
69
85
70
@ Override
86
- public Subscriber <? super T > call (final Subscriber <? super Notification < T >> notificationObserver ) {
87
- return new Subscriber <T >(notificationObserver ) {
71
+ public Subscriber <? super T > call (final Subscriber <? super Object > subscriber ) {
72
+ return new Subscriber <T >(subscriber ) {
88
73
@ Override
89
74
public void onCompleted () {
90
- notificationObserver .onNext (Notification .< T > halt ());
75
+ subscriber .onNext (notification . completed ());
91
76
}
92
77
93
78
@ Override
94
79
public void onError (Throwable e ) {
95
- notificationObserver .onError (e );
80
+ subscriber .onError (e );
96
81
}
97
82
98
83
@ Override
99
84
public void onNext (T args ) {
100
- notificationObserver .onNext (Notification . value (args ));
85
+ subscriber .onNext (notification . next (args ));
101
86
}
102
87
};
103
88
}
104
89
}
105
90
106
- private static class OtherObservable <T , E > implements Operator <Notification <T >, E > {
91
+ private final static class OtherObservable <E > implements Operator <Object , E > {
92
+
93
+ private final NotificationLite <E > notification = NotificationLite .instance ();
107
94
108
95
@ Override
109
- public Subscriber <? super E > call (final Subscriber <? super Notification < T >> notificationObserver ) {
110
- return new Subscriber <E >(notificationObserver ) {
96
+ public Subscriber <? super E > call (final Subscriber <? super Object > subscriber ) {
97
+ return new Subscriber <E >(subscriber ) {
111
98
@ Override
112
99
public void onCompleted () {
113
- notificationObserver .onNext (Notification .< T > halt ());
100
+ subscriber .onNext (notification . completed ());
114
101
}
115
102
116
103
@ Override
117
104
public void onError (Throwable e ) {
118
- notificationObserver .onError (e );
105
+ subscriber .onError (e );
119
106
}
120
107
121
108
@ Override
122
109
public void onNext (E args ) {
123
- notificationObserver .onNext (Notification .< T > halt ());
110
+ subscriber .onNext (notification . completed ());
124
111
}
125
112
};
126
113
}
0 commit comments