2222import static rx .Observable .empty ;
2323import static rx .Observable .from ;
2424
25+ import java .util .ArrayList ;
26+ import java .util .Comparator ;
2527import java .util .HashSet ;
28+ import java .util .List ;
2629import java .util .Set ;
2730
2831import org .junit .Before ;
@@ -57,6 +60,30 @@ public static <T, U> OnSubscribeFunc<T> distinct(Observable<? extends T> source,
5760 return new Distinct <T , U >(source , keySelector );
5861 }
5962
63+ /**
64+ * Returns an Observable that emits all distinct items emitted by the source
65+ * @param source
66+ * The source Observable to emit the distinct items for.
67+ * @param equalityComparator
68+ * The comparator to use for deciding whether to consider two items as equal or not.
69+ * @return A subscription function for creating the target Observable.
70+ */
71+ public static <T > OnSubscribeFunc <T > distinct (Observable <? extends T > source , Comparator <T > equalityComparator ) {
72+ return new DistinctWithComparator <T , T >(source , Functions .<T >identity (), equalityComparator );
73+ }
74+
75+ /**
76+ * Returns an Observable that emits all distinct items emitted by the source
77+ * @param source
78+ * The source Observable to emit the distinct items for.
79+ * @param equalityComparator
80+ * The comparator to use for deciding whether to consider the two item keys as equal or not.
81+ * @return A subscription function for creating the target Observable.
82+ */
83+ public static <T , U > OnSubscribeFunc <T > distinct (Observable <? extends T > source , Func1 <? super T , ? extends U > keySelector , Comparator <U > equalityComparator ) {
84+ return new DistinctWithComparator <T , U >(source , keySelector , equalityComparator );
85+ }
86+
6087 /**
6188 * Returns an Observable that emits all distinct items emitted by the source
6289 * @param source
@@ -115,6 +142,67 @@ public void call() {
115142 }
116143 }
117144
145+ private static class DistinctWithComparator <T , U > implements OnSubscribeFunc <T > {
146+ private final Observable <? extends T > source ;
147+ private final Func1 <? super T , ? extends U > keySelector ;
148+ private final Comparator <U > equalityComparator ;
149+
150+ private DistinctWithComparator (Observable <? extends T > source , Func1 <? super T , ? extends U > keySelector , Comparator <U > equalityComparator ) {
151+ this .source = source ;
152+ this .keySelector = keySelector ;
153+ this .equalityComparator = equalityComparator ;
154+ }
155+
156+ @ Override
157+ public Subscription onSubscribe (final Observer <? super T > observer ) {
158+ final Subscription sourceSub = source .subscribe (new Observer <T >() {
159+
160+ // due to the totally arbitrary equality comparator, we can't use anything more efficient than lists here
161+ private final List <U > emittedKeys = new ArrayList <U >();
162+
163+ @ Override
164+ public void onCompleted () {
165+ observer .onCompleted ();
166+ }
167+
168+ @ Override
169+ public void onError (Throwable e ) {
170+ observer .onError (e );
171+ }
172+
173+ @ Override
174+ public void onNext (T next ) {
175+ try {
176+ U nextKey = keySelector .call (next );
177+ if (!alreadyEmitted (nextKey )) {
178+ emittedKeys .add (nextKey );
179+ observer .onNext (next );
180+ }
181+ } catch (Throwable t ) {
182+ // keySelector and comparator are user functions, may throw something
183+ observer .onError (t );
184+ }
185+ }
186+
187+ private boolean alreadyEmitted (U newKey ) {
188+ for (U key : emittedKeys ) {
189+ if (equalityComparator .compare (key , newKey ) == 0 ) {
190+ return true ;
191+ }
192+ }
193+ return false ;
194+ }
195+ });
196+
197+ return Subscriptions .create (new Action0 () {
198+ @ Override
199+ public void call () {
200+ sourceSub .unsubscribe ();
201+ }
202+ });
203+ }
204+ }
205+
118206 public static class UnitTest {
119207 @ Mock
120208 Observer <? super String > w ;
@@ -123,10 +211,20 @@ public static class UnitTest {
123211 final Func1 <String , String > TO_UPPER_WITH_EXCEPTION = new Func1 <String , String >() {
124212 @ Override
125213 public String call (String s ) {
214+ if (s .equals ("x" )) {
215+ return "XX" ;
216+ }
126217 return s .toUpperCase ();
127218 }
128219 };
129220
221+ final Comparator <String > COMPARE_LENGTH = new Comparator <String >() {
222+ @ Override
223+ public int compare (String s1 , String s2 ) {
224+ return s1 .length () - s2 .length ();
225+ }
226+ };
227+
130228 @ Before
131229 public void before () {
132230 initMocks (this );
@@ -182,6 +280,36 @@ public void testDistinctOfNormalSourceWithKeySelector() {
182280 verify (w , never ()).onError (any (Throwable .class ));
183281 }
184282
283+ @ Test
284+ public void testDistinctOfNormalSourceWithComparator () {
285+ Observable <String > src = from ("1" , "12" , "123" , "aaa" , "321" , "12" , "21" , "1" , "12345" );
286+ create (distinct (src , COMPARE_LENGTH )).subscribe (w );
287+
288+ InOrder inOrder = inOrder (w );
289+ inOrder .verify (w , times (1 )).onNext ("1" );
290+ inOrder .verify (w , times (1 )).onNext ("12" );
291+ inOrder .verify (w , times (1 )).onNext ("123" );
292+ inOrder .verify (w , times (1 )).onNext ("12345" );
293+ inOrder .verify (w , times (1 )).onCompleted ();
294+ inOrder .verify (w , never ()).onNext (anyString ());
295+ verify (w , never ()).onError (any (Throwable .class ));
296+ }
297+
298+ @ Test
299+ public void testDistinctOfNormalSourceWithKeySelectorAndComparator () {
300+ Observable <String > src = from ("a" , "x" , "ab" , "abc" , "cba" , "de" , "x" , "a" , "abcd" );
301+ create (distinct (src , TO_UPPER_WITH_EXCEPTION , COMPARE_LENGTH )).subscribe (w );
302+
303+ InOrder inOrder = inOrder (w );
304+ inOrder .verify (w , times (1 )).onNext ("a" );
305+ inOrder .verify (w , times (1 )).onNext ("x" );
306+ inOrder .verify (w , times (1 )).onNext ("abc" );
307+ inOrder .verify (w , times (1 )).onNext ("abcd" );
308+ inOrder .verify (w , times (1 )).onCompleted ();
309+ inOrder .verify (w , never ()).onNext (anyString ());
310+ verify (w , never ()).onError (any (Throwable .class ));
311+ }
312+
185313 @ Test
186314 public void testDistinctOfSourceWithNulls () {
187315 Observable <String > src = from (null , "a" , "a" , null , null , "b" , null );
0 commit comments