2222import static rx .Observable .empty ;
2323import static rx .Observable .from ;
2424
25+ import java .util .ArrayList ;
26+ import java .util .Comparator ;
2527import java .util .HashSet ;
28+ import java .util .List ;
2629import java .util .Set ;
2730
2831import org .junit .Before ;
@@ -57,6 +60,30 @@ public static <T, U> OnSubscribeFunc<T> distinct(Observable<? extends T> source,
5760 return new Distinct <T , U >(source , keySelector );
5861 }
5962
63+ /**
64+ * Returns an Observable that emits all distinct items emitted by the source
65+ * @param source
66+ * The source Observable to emit the distinct items for.
67+ * @param equalityComparator
68+ * The comparator to use for deciding whether to consider two items as equal or not.
69+ * @return A subscription function for creating the target Observable.
70+ */
71+ public static <T > OnSubscribeFunc <T > distinct (Observable <? extends T > source , Comparator <T > equalityComparator ) {
72+ return new DistinctWithComparator <T , T >(source , Functions .<T >identity (), equalityComparator );
73+ }
74+
75+ /**
76+ * Returns an Observable that emits all distinct items emitted by the source
77+ * @param source
78+ * The source Observable to emit the distinct items for.
79+ * @param equalityComparator
80+ * The comparator to use for deciding whether to consider the two item keys as equal or not.
81+ * @return A subscription function for creating the target Observable.
82+ */
83+ public static <T , U > OnSubscribeFunc <T > distinct (Observable <? extends T > source , Func1 <? super T , ? extends U > keySelector , Comparator <U > equalityComparator ) {
84+ return new DistinctWithComparator <T , U >(source , keySelector , equalityComparator );
85+ }
86+
6087 /**
6188 * Returns an Observable that emits all distinct items emitted by the source
6289 * @param source
@@ -93,16 +120,67 @@ public void onError(Throwable e) {
93120
94121 @ Override
95122 public void onNext (T next ) {
96- try {
97- U nextKey = keySelector .call (next );
98- if (!emittedKeys .contains (nextKey )) {
99- emittedKeys .add (nextKey );
100- observer .onNext (next );
123+ U nextKey = keySelector .call (next );
124+ if (!emittedKeys .contains (nextKey )) {
125+ emittedKeys .add (nextKey );
126+ observer .onNext (next );
127+ }
128+ }
129+ });
130+
131+ return Subscriptions .create (new Action0 () {
132+ @ Override
133+ public void call () {
134+ sourceSub .unsubscribe ();
135+ }
136+ });
137+ }
138+ }
139+
140+ private static class DistinctWithComparator <T , U > implements OnSubscribeFunc <T > {
141+ private final Observable <? extends T > source ;
142+ private final Func1 <? super T , ? extends U > keySelector ;
143+ private final Comparator <U > equalityComparator ;
144+
145+ private DistinctWithComparator (Observable <? extends T > source , Func1 <? super T , ? extends U > keySelector , Comparator <U > equalityComparator ) {
146+ this .source = source ;
147+ this .keySelector = keySelector ;
148+ this .equalityComparator = equalityComparator ;
149+ }
150+
151+ @ Override
152+ public Subscription onSubscribe (final Observer <? super T > observer ) {
153+ final Subscription sourceSub = source .subscribe (new Observer <T >() {
154+
155+ // due to the totally arbitrary equality comparator, we can't use anything more efficient than lists here
156+ private final List <U > emittedKeys = new ArrayList <U >();
157+
158+ @ Override
159+ public void onCompleted () {
160+ observer .onCompleted ();
161+ }
162+
163+ @ Override
164+ public void onError (Throwable e ) {
165+ observer .onError (e );
166+ }
167+
168+ @ Override
169+ public void onNext (T next ) {
170+ U nextKey = keySelector .call (next );
171+ if (!alreadyEmitted (nextKey )) {
172+ emittedKeys .add (nextKey );
173+ observer .onNext (next );
174+ }
175+ }
176+
177+ private boolean alreadyEmitted (U newKey ) {
178+ for (U key : emittedKeys ) {
179+ if (equalityComparator .compare (key , newKey ) == 0 ) {
180+ return true ;
101181 }
102- } catch (Throwable t ) {
103- // keySelector is a user function, may throw something
104- observer .onError (t );
105182 }
183+ return false ;
106184 }
107185 });
108186
@@ -118,15 +196,27 @@ public void call() {
118196 public static class UnitTest {
119197 @ Mock
120198 Observer <? super String > w ;
199+ @ Mock
200+ Observer <? super String > w2 ;
121201
122202 // nulls lead to exceptions
123203 final Func1 <String , String > TO_UPPER_WITH_EXCEPTION = new Func1 <String , String >() {
124204 @ Override
125205 public String call (String s ) {
206+ if (s .equals ("x" )) {
207+ return "XX" ;
208+ }
126209 return s .toUpperCase ();
127210 }
128211 };
129212
213+ final Comparator <String > COMPARE_LENGTH = new Comparator <String >() {
214+ @ Override
215+ public int compare (String s1 , String s2 ) {
216+ return s1 .length () - s2 .length ();
217+ }
218+ };
219+
130220 @ Before
131221 public void before () {
132222 initMocks (this );
@@ -182,6 +272,61 @@ public void testDistinctOfNormalSourceWithKeySelector() {
182272 verify (w , never ()).onError (any (Throwable .class ));
183273 }
184274
275+ @ Test
276+ public void testDistinctOfNormalSourceWithComparator () {
277+ Observable <String > src = from ("1" , "12" , "123" , "aaa" , "321" , "12" , "21" , "1" , "12345" );
278+ create (distinct (src , COMPARE_LENGTH )).subscribe (w );
279+
280+ InOrder inOrder = inOrder (w );
281+ inOrder .verify (w , times (1 )).onNext ("1" );
282+ inOrder .verify (w , times (1 )).onNext ("12" );
283+ inOrder .verify (w , times (1 )).onNext ("123" );
284+ inOrder .verify (w , times (1 )).onNext ("12345" );
285+ inOrder .verify (w , times (1 )).onCompleted ();
286+ inOrder .verify (w , never ()).onNext (anyString ());
287+ verify (w , never ()).onError (any (Throwable .class ));
288+ }
289+
290+ @ Test
291+ public void testDistinctOfNormalSourceWithKeySelectorAndComparator () {
292+ Observable <String > src = from ("a" , "x" , "ab" , "abc" , "cba" , "de" , "x" , "a" , "abcd" );
293+ create (distinct (src , TO_UPPER_WITH_EXCEPTION , COMPARE_LENGTH )).subscribe (w );
294+
295+ InOrder inOrder = inOrder (w );
296+ inOrder .verify (w , times (1 )).onNext ("a" );
297+ inOrder .verify (w , times (1 )).onNext ("x" );
298+ inOrder .verify (w , times (1 )).onNext ("abc" );
299+ inOrder .verify (w , times (1 )).onNext ("abcd" );
300+ inOrder .verify (w , times (1 )).onCompleted ();
301+ inOrder .verify (w , never ()).onNext (anyString ());
302+ verify (w , never ()).onError (any (Throwable .class ));
303+ }
304+
305+ @ Test
306+ public void testDistinctOfNormalSourceWithKeySelectorAndComparatorAndTwoSubscriptions () {
307+ Observable <String > src = from ("a" , "x" , "ab" , "abc" , "cba" , "de" , "x" , "a" , "abcd" );
308+ create (distinct (src , TO_UPPER_WITH_EXCEPTION , COMPARE_LENGTH )).subscribe (w );
309+
310+ InOrder inOrder = inOrder (w );
311+ inOrder .verify (w , times (1 )).onNext ("a" );
312+ inOrder .verify (w , times (1 )).onNext ("x" );
313+ create (distinct (src , TO_UPPER_WITH_EXCEPTION , COMPARE_LENGTH )).subscribe (w2 );
314+ inOrder .verify (w , times (1 )).onNext ("abc" );
315+ inOrder .verify (w , times (1 )).onNext ("abcd" );
316+ inOrder .verify (w , times (1 )).onCompleted ();
317+ inOrder .verify (w , never ()).onNext (anyString ());
318+ verify (w , never ()).onError (any (Throwable .class ));
319+
320+ InOrder inOrder2 = inOrder (w2 );
321+ inOrder2 .verify (w2 , times (1 )).onNext ("a" );
322+ inOrder2 .verify (w2 , times (1 )).onNext ("x" );
323+ inOrder2 .verify (w2 , times (1 )).onNext ("abc" );
324+ inOrder2 .verify (w2 , times (1 )).onNext ("abcd" );
325+ inOrder2 .verify (w2 , times (1 )).onCompleted ();
326+ inOrder2 .verify (w2 , never ()).onNext (anyString ());
327+ verify (w2 , never ()).onError (any (Throwable .class ));
328+ }
329+
185330 @ Test
186331 public void testDistinctOfSourceWithNulls () {
187332 Observable <String > src = from (null , "a" , "a" , null , null , "b" , null );
0 commit comments