2222import static rx .Observable .empty ;
2323import static rx .Observable .from ;
2424
25+ import java .util .Comparator ;
26+
2527import org .junit .Before ;
2628import org .junit .Test ;
2729import org .mockito .InOrder ;
4143 */
4244public final class OperationDistinctUntilChanged {
4345
46+ public static <T > OnSubscribeFunc <T > distinctUntilChanged (Observable <? extends T > source , Comparator <T > equalityComparator ) {
47+ return new DistinctUntilChanged <T , T >(source , Functions .<T >identity (), equalityComparator );
48+ }
49+
4450 /**
4551 * Returns an Observable that emits all sequentially distinct items emitted by the source.
4652 * @param source
4753 * The source Observable to emit the sequentially distinct items for.
4854 * @return A subscription function for creating the target Observable.
4955 */
5056 public static <T , U > OnSubscribeFunc <T > distinctUntilChanged (Observable <? extends T > source , Func1 <? super T , ? extends U > keySelector ) {
51- return new DistinctUntilChanged <T , U >(source , keySelector );
57+ return new DistinctUntilChanged <T , U >(source , keySelector , new DefaultEqualityComparator < U >() );
5258 }
5359
5460 /**
@@ -58,16 +64,30 @@ public static <T, U> OnSubscribeFunc<T> distinctUntilChanged(Observable<? extend
5864 * @return A subscription function for creating the target Observable.
5965 */
6066 public static <T > OnSubscribeFunc <T > distinctUntilChanged (Observable <? extends T > source ) {
61- return new DistinctUntilChanged <T , T >(source , Functions .<T >identity ());
67+ return new DistinctUntilChanged <T , T >(source , Functions .<T >identity (), new DefaultEqualityComparator <T >());
68+ }
69+
70+ // does not define a useful ordering; it's only used for equality tests here
71+ private static class DefaultEqualityComparator <T > implements Comparator <T > {
72+ @ Override
73+ public int compare (T t1 , T t2 ) {
74+ if (t1 == null ) {
75+ return t2 == null ? 0 : 1 ;
76+ } else {
77+ return t1 .equals (t2 ) ? 0 : 1 ;
78+ }
79+ }
6280 }
6381
6482 private static class DistinctUntilChanged <T , U > implements OnSubscribeFunc <T > {
6583 private final Observable <? extends T > source ;
6684 private final Func1 <? super T , ? extends U > keySelector ;
85+ private final Comparator <U > equalityComparator ;
6786
68- private DistinctUntilChanged (Observable <? extends T > source , Func1 <? super T , ? extends U > keySelector ) {
87+ private DistinctUntilChanged (Observable <? extends T > source , Func1 <? super T , ? extends U > keySelector , Comparator < U > equalityComparator ) {
6988 this .source = source ;
7089 this .keySelector = keySelector ;
90+ this .equalityComparator = equalityComparator ;
7191 }
7292
7393 @ Override
@@ -95,16 +115,8 @@ public void onNext(T next) {
95115 if (!hasEmitted ) {
96116 hasEmitted = true ;
97117 observer .onNext (next );
98- } else {
99- if (lastKey == null ) {
100- if (nextKey != null ) {
101- observer .onNext (next );
102- }
103- } else {
104- if (!lastKey .equals (nextKey )) {
105- observer .onNext (next );
106- }
107- }
118+ } else if (equalityComparator .compare (lastKey , nextKey ) != 0 ) {
119+ observer .onNext (next );
108120 }
109121 } catch (Throwable t ) {
110122 // keySelector is a user function, may throw something
0 commit comments