22
22
import static rx .Observable .empty ;
23
23
import static rx .Observable .from ;
24
24
25
- import java .util .concurrent .atomic .AtomicBoolean ;
26
- import java .util .concurrent .atomic .AtomicReference ;
27
-
28
25
import org .junit .Before ;
29
26
import org .junit .Test ;
30
27
import org .mockito .InOrder ;
36
33
import rx .Subscription ;
37
34
import rx .subscriptions .Subscriptions ;
38
35
import rx .util .functions .Action0 ;
36
+ import rx .util .functions .Func1 ;
37
+ import rx .util .functions .Functions ;
39
38
40
39
/**
41
40
* Returns an Observable that emits the first item emitted by the source
42
41
* Observable, or a default value if the source emits nothing.
43
42
*/
44
43
public final class OperationDistinctUntilChanged {
45
44
45
+ /**
46
+ * Returns an Observable that emits all sequentially distinct items emitted by the source
47
+ * @param source
48
+ * The source Observable to emit the sequentially distinct items for.
49
+ * @return A subscription function for creating the target Observable.
50
+ */
51
+ public static <T , U > OnSubscribeFunc <T > distinctUntilChanged (Observable <? extends T > source , Func1 <? super T , ? extends U > keySelector ) {
52
+ return new DistinctUntilChanged <T , U >(source , keySelector );
53
+ }
54
+
46
55
/**
47
56
* Returns an Observable that emits all sequentially distinct items emitted by the source
48
57
* @param source
49
58
* The source Observable to emit the sequentially distinct items for.
50
59
* @return A subscription function for creating the target Observable.
51
60
*/
52
61
public static <T > OnSubscribeFunc <T > distinctUntilChanged (Observable <? extends T > source ) {
53
- return new DistinctUntilChanged <T >(source );
62
+ return new DistinctUntilChanged <T , T >(source , Functions .< T > identity () );
54
63
}
55
64
56
- private static class DistinctUntilChanged <T > implements OnSubscribeFunc <T > {
65
+ private static class DistinctUntilChanged <T , U > implements OnSubscribeFunc <T > {
57
66
private final Observable <? extends T > source ;
58
-
59
- private DistinctUntilChanged (Observable <? extends T > source ) {
67
+ private final Func1 <? super T , ? extends U > keySelector ;
68
+
69
+ private DistinctUntilChanged (Observable <? extends T > source , Func1 <? super T , ? extends U > keySelector ) {
60
70
this .source = source ;
71
+ this .keySelector = keySelector ;
61
72
}
62
73
63
74
@ Override
64
75
public Subscription onSubscribe (final Observer <? super T > observer ) {
65
76
final Subscription sourceSub = source .subscribe (new Observer <T >() {
66
- private final AtomicReference < T > lastEmittedValue = new AtomicReference < T >( null ) ;
67
- private final AtomicBoolean hasEmitted = new AtomicBoolean () ;
77
+ private U lastEmittedKey ;
78
+ private boolean hasEmitted ;
68
79
69
80
@ Override
70
81
public void onCompleted () {
@@ -78,16 +89,27 @@ public void onError(Throwable e) {
78
89
79
90
@ Override
80
91
public void onNext (T next ) {
81
- boolean hasAlreadyEmitted = hasEmitted . getAndSet ( true ) ;
82
- T lastEmitted = lastEmittedValue . getAndSet ( next );
83
- if (! hasAlreadyEmitted ) {
84
- observer . onNext ( next ) ;
85
- } else if ( lastEmitted == null ) {
86
- if ( next != null ) {
92
+ U lastKey = lastEmittedKey ;
93
+ try {
94
+ U nextKey = keySelector . call ( next );
95
+ lastEmittedKey = nextKey ;
96
+ if (! hasEmitted ) {
97
+ hasEmitted = true ;
87
98
observer .onNext (next );
99
+ } else {
100
+ if (lastKey == null ) {
101
+ if (nextKey != null ) {
102
+ observer .onNext (next );
103
+ }
104
+ } else {
105
+ if (!lastKey .equals (nextKey )) {
106
+ observer .onNext (next );
107
+ }
108
+ }
88
109
}
89
- } else if (!lastEmitted .equals (next )) {
90
- observer .onNext (next );
110
+ } catch (Throwable t ) {
111
+ // keySelector is a user function, may throw something
112
+ observer .onError (t );
91
113
}
92
114
}
93
115
});
@@ -105,6 +127,14 @@ public static class UnitTest {
105
127
@ Mock
106
128
Observer <? super String > w ;
107
129
130
+ // nulls lead to exceptions
131
+ final Func1 <String , String > TO_UPPER_WITH_EXCEPTION = new Func1 <String , String >() {
132
+ @ Override
133
+ public String call (String s ) {
134
+ return s .toUpperCase ();
135
+ }
136
+ };
137
+
108
138
@ Before
109
139
public void before () {
110
140
initMocks (this );
@@ -120,6 +150,16 @@ public void testDistinctUntilChangedOfNone() {
120
150
verify (w , times (1 )).onCompleted ();
121
151
}
122
152
153
+ @ Test
154
+ public void testDistinctUntilChangedOfNoneWithKeySelector () {
155
+ Observable <String > src = empty ();
156
+ create (distinctUntilChanged (src , TO_UPPER_WITH_EXCEPTION )).subscribe (w );
157
+
158
+ verify (w , never ()).onNext (anyString ());
159
+ verify (w , never ()).onError (any (Throwable .class ));
160
+ verify (w , times (1 )).onCompleted ();
161
+ }
162
+
123
163
@ Test
124
164
public void testDistinctUntilChangedOfNormalSource () {
125
165
Observable <String > src = from ("a" , "b" , "c" , "c" , "c" , "b" , "b" , "a" , "e" );
@@ -137,6 +177,23 @@ public void testDistinctUntilChangedOfNormalSource() {
137
177
verify (w , never ()).onError (any (Throwable .class ));
138
178
}
139
179
180
+ @ Test
181
+ public void testDistinctUntilChangedOfNormalSourceWithKeySelector () {
182
+ Observable <String > src = from ("a" , "b" , "c" , "C" , "c" , "B" , "b" , "a" , "e" );
183
+ create (distinctUntilChanged (src , TO_UPPER_WITH_EXCEPTION )).subscribe (w );
184
+
185
+ InOrder inOrder = inOrder (w );
186
+ inOrder .verify (w , times (1 )).onNext ("a" );
187
+ inOrder .verify (w , times (1 )).onNext ("b" );
188
+ inOrder .verify (w , times (1 )).onNext ("c" );
189
+ inOrder .verify (w , times (1 )).onNext ("B" );
190
+ inOrder .verify (w , times (1 )).onNext ("a" );
191
+ inOrder .verify (w , times (1 )).onNext ("e" );
192
+ inOrder .verify (w , times (1 )).onCompleted ();
193
+ inOrder .verify (w , never ()).onNext (anyString ());
194
+ verify (w , never ()).onError (any (Throwable .class ));
195
+ }
196
+
140
197
@ Test
141
198
public void testDistinctUntilChangedOfSourceWithNulls () {
142
199
Observable <String > src = from (null , "a" , "a" , null , null , "b" , null , null );
@@ -152,5 +209,18 @@ public void testDistinctUntilChangedOfSourceWithNulls() {
152
209
inOrder .verify (w , never ()).onNext (anyString ());
153
210
verify (w , never ()).onError (any (Throwable .class ));
154
211
}
212
+
213
+ @ Test
214
+ public void testDistinctUntilChangedOfSourceWithExceptionsFromKeySelector () {
215
+ Observable <String > src = from ("a" , "b" , null , "c" );
216
+ create (distinctUntilChanged (src , TO_UPPER_WITH_EXCEPTION )).subscribe (w );
217
+
218
+ InOrder inOrder = inOrder (w );
219
+ inOrder .verify (w , times (1 )).onNext ("a" );
220
+ inOrder .verify (w , times (1 )).onNext ("b" );
221
+ verify (w , times (1 )).onError (any (NullPointerException .class ));
222
+ inOrder .verify (w , never ()).onNext (anyString ());
223
+ inOrder .verify (w , never ()).onCompleted ();
224
+ }
155
225
}
156
226
}
0 commit comments