Skip to content

Commit ff97efa

Browse files
author
Joachim Hofer
committed
added both new variants to observable
1 parent b88eea3 commit ff97efa

File tree

2 files changed

+71
-0
lines changed

2 files changed

+71
-0
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.util.ArrayList;
2121
import java.util.Arrays;
22+
import java.util.Comparator;
2223
import java.util.List;
2324
import java.util.concurrent.Future;
2425
import java.util.concurrent.TimeUnit;
@@ -2948,6 +2949,35 @@ public <U> Observable<T> distinctUntilChanged(Func1<? super T, ? extends U> keyS
29482949
return create(OperationDistinctUntilChanged.distinctUntilChanged(this, keySelector));
29492950
}
29502951

2952+
/**
2953+
* Returns an Observable that forwards all items emitted from the source Observable that are sequentially distinct according to
2954+
* a comparator.
2955+
*
2956+
* @param equalityComparator
2957+
* a comparator for deciding whether two emitted items are equal or not
2958+
* @return an Observable of sequentially distinct items
2959+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229508%28v=vs.103%29.aspx">MSDN: Observable.distinctUntilChanged</a>
2960+
*/
2961+
public <U> Observable<T> distinctUntilChanged(Comparator<T> equalityComparator) {
2962+
return create(OperationDistinctUntilChanged.distinctUntilChanged(this, equalityComparator));
2963+
}
2964+
2965+
/**
2966+
* Returns an Observable that forwards all items emitted from the source Observable that are sequentially distinct according to
2967+
* a key selector function and a comparator.
2968+
*
2969+
* @param keySelector
2970+
* a function that projects an emitted item to a key value which is used for deciding whether an item is sequentially
2971+
* distinct from another one or not
2972+
* @param equalityComparator
2973+
* a comparator for deciding whether two emitted item keys are equal or not
2974+
* @return an Observable of sequentially distinct items
2975+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229508%28v=vs.103%29.aspx">MSDN: Observable.distinctUntilChanged</a>
2976+
*/
2977+
public <U> Observable<T> distinctUntilChanged(Func1<? super T, ? extends U> keySelector, Comparator<U> equalityComparator) {
2978+
return create(OperationDistinctUntilChanged.distinctUntilChanged(this, keySelector, equalityComparator));
2979+
}
2980+
29512981
/**
29522982
* Returns an Observable that forwards all distinct items emitted from the source Observable.
29532983
*

rxjava-core/src/main/java/rx/operators/OperationDistinctUntilChanged.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,14 @@
4343
*/
4444
public final class OperationDistinctUntilChanged {
4545

46+
/**
47+
* Returns an Observable that emits all sequentially distinct items emitted by the source.
48+
* @param source
49+
* The source Observable to emit the sequentially distinct items for.
50+
* @param equalityComparator
51+
* The comparator to use for deciding whether to consider two items as equal or not.
52+
* @return A subscription function for creating the target Observable.
53+
*/
4654
public static <T> OnSubscribeFunc<T> distinctUntilChanged(Observable<? extends T> source, Comparator<T> equalityComparator) {
4755
return new DistinctUntilChanged<T, T>(source, Functions.<T>identity(), equalityComparator);
4856
}
@@ -51,6 +59,22 @@ public static <T> OnSubscribeFunc<T> distinctUntilChanged(Observable<? extends T
5159
* Returns an Observable that emits all sequentially distinct items emitted by the source.
5260
* @param source
5361
* The source Observable to emit the sequentially distinct items for.
62+
* @param keySelector
63+
* The function to select the key to use for the equality checks.
64+
* @param equalityComparator
65+
* The comparator to use for deciding whether to consider the two item keys as equal or not.
66+
* @return A subscription function for creating the target Observable.
67+
*/
68+
public static <T, U> OnSubscribeFunc<T> distinctUntilChanged(Observable<? extends T> source, Func1<? super T, ? extends U> keySelector, Comparator<U> equalityComparator) {
69+
return new DistinctUntilChanged<T, U>(source, keySelector, equalityComparator);
70+
}
71+
72+
/**
73+
* Returns an Observable that emits all sequentially distinct items emitted by the source.
74+
* @param source
75+
* The source Observable to emit the sequentially distinct items for.
76+
* @param keySelector
77+
* The function to select the key to use for the equality checks.
5478
* @return A subscription function for creating the target Observable.
5579
*/
5680
public static <T, U> OnSubscribeFunc<T> distinctUntilChanged(Observable<? extends T> source, Func1<? super T, ? extends U> keySelector) {
@@ -142,6 +166,9 @@ public static class UnitTest {
142166
final Func1<String, String> TO_UPPER_WITH_EXCEPTION = new Func1<String, String>() {
143167
@Override
144168
public String call(String s) {
169+
if (s.equals("x")) {
170+
return "xx";
171+
}
145172
return s.toUpperCase();
146173
}
147174
};
@@ -254,5 +281,19 @@ public void testDistinctUntilChangedWithComparator() {
254281
inOrder.verify(w, never()).onNext(anyString());
255282
verify(w, never()).onError(any(Throwable.class));
256283
}
284+
285+
@Test
286+
public void testDistinctUntilChangedWithComparatorAndKeySelector() {
287+
Observable<String> src = from("a", "b", "x", "aa", "bb", "c", "ddd");
288+
create(distinctUntilChanged(src, TO_UPPER_WITH_EXCEPTION, COMPARE_LENGTH)).subscribe(w);
289+
InOrder inOrder = inOrder(w);
290+
inOrder.verify(w, times(1)).onNext("a");
291+
inOrder.verify(w, times(1)).onNext("x");
292+
inOrder.verify(w, times(1)).onNext("c");
293+
inOrder.verify(w, times(1)).onNext("ddd");
294+
inOrder.verify(w, times(1)).onCompleted();
295+
inOrder.verify(w, never()).onNext(anyString());
296+
verify(w, never()).onError(any(Throwable.class));
297+
}
257298
}
258299
}

0 commit comments

Comments
 (0)