Skip to content

Commit 7c9ab65

Browse files
committed
OperatorDistinctUntilChanged
1 parent 4e77f8a commit 7c9ab65

File tree

4 files changed

+78
-219
lines changed

4 files changed

+78
-219
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@
5757
import rx.operators.OperationDelay;
5858
import rx.operators.OperationDematerialize;
5959
import rx.operators.OperationDistinct;
60-
import rx.operators.OperationDistinctUntilChanged;
6160
import rx.operators.OperationFinally;
6261
import rx.operators.OperationFlatMap;
6362
import rx.operators.OperationGroupByUntil;
@@ -95,6 +94,7 @@
9594
import rx.operators.OperatorAsObservable;
9695
import rx.operators.OperatorCache;
9796
import rx.operators.OperatorCast;
97+
import rx.operators.OperatorDistinctUntilChanged;
9898
import rx.operators.OperatorDoOnEach;
9999
import rx.operators.OperatorElementAt;
100100
import rx.operators.OperatorFilter;
@@ -3638,7 +3638,7 @@ public final <U> Observable<T> distinct(Func1<? super T, ? extends U> keySelecto
36383638
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229494.aspx">MSDN: Observable.distinctUntilChanged</a>
36393639
*/
36403640
public final Observable<T> distinctUntilChanged() {
3641-
return create(OperationDistinctUntilChanged.distinctUntilChanged(this));
3641+
return lift(new OperatorDistinctUntilChanged<T, T>(Functions.<T>identity()));
36423642
}
36433643

36443644
/**
@@ -3656,7 +3656,7 @@ public final Observable<T> distinctUntilChanged() {
36563656
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229508.aspx">MSDN: Observable.distinctUntilChanged</a>
36573657
*/
36583658
public final <U> Observable<T> distinctUntilChanged(Func1<? super T, ? extends U> keySelector) {
3659-
return create(OperationDistinctUntilChanged.distinctUntilChanged(this, keySelector));
3659+
return lift(new OperatorDistinctUntilChanged<T, U>(keySelector));
36603660
}
36613661

36623662
/**

rxjava-core/src/main/java/rx/operators/OperationDistinctUntilChanged.java

Lines changed: 0 additions & 148 deletions
This file was deleted.
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.operators;
17+
18+
import rx.Observable.Operator;
19+
import rx.Subscriber;
20+
import rx.functions.Func1;
21+
22+
/**
23+
* Returns an Observable that emits all sequentially distinct items emitted by the source.
24+
* @param <T> the value type
25+
* @param <U> the key type
26+
*/
27+
public final class OperatorDistinctUntilChanged<T, U> implements Operator<T, T> {
28+
final Func1<? super T, ? extends U> keySelector;
29+
30+
public OperatorDistinctUntilChanged(Func1<? super T, ? extends U> keySelector) {
31+
this.keySelector = keySelector;
32+
}
33+
34+
@Override
35+
public Subscriber<? super T> call(final Subscriber<? super T> child) {
36+
return new Subscriber<T>(child) {
37+
U previousKey;
38+
boolean hasPrevious;
39+
@Override
40+
public void onNext(T t) {
41+
U currentKey = previousKey;
42+
U key = keySelector.call(t);
43+
previousKey = key;
44+
45+
if (hasPrevious) {
46+
if (!(currentKey == key || (key != null && key.equals(currentKey)))) {
47+
child.onNext(t);
48+
}
49+
} else {
50+
hasPrevious = true;
51+
child.onNext(t);
52+
}
53+
}
54+
55+
@Override
56+
public void onError(Throwable e) {
57+
child.onError(e);
58+
}
59+
60+
@Override
61+
public void onCompleted() {
62+
child.onCompleted();
63+
}
64+
65+
};
66+
}
67+
68+
}

rxjava-core/src/test/java/rx/operators/OperationDistinctUntilChangedTest.java renamed to rxjava-core/src/test/java/rx/operators/OperatorDistinctUntilChangedTest.java

Lines changed: 7 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,7 @@
2222
import static org.mockito.Mockito.times;
2323
import static org.mockito.Mockito.verify;
2424
import static org.mockito.MockitoAnnotations.initMocks;
25-
import static rx.operators.OperationDistinctUntilChanged.distinctUntilChanged;
2625

27-
import java.util.Comparator;
2826

2927
import org.junit.Before;
3028
import org.junit.Test;
@@ -35,7 +33,7 @@
3533
import rx.Observer;
3634
import rx.functions.Func1;
3735

38-
public class OperationDistinctUntilChangedTest {
36+
public class OperatorDistinctUntilChangedTest {
3937

4038
@Mock
4139
Observer<String> w;
@@ -53,13 +51,6 @@ public String call(String s) {
5351
}
5452
};
5553

56-
final Comparator<String> COMPARE_LENGTH = new Comparator<String>() {
57-
@Override
58-
public int compare(String s1, String s2) {
59-
return s1.length() - s2.length();
60-
}
61-
};
62-
6354
@Before
6455
public void before() {
6556
initMocks(this);
@@ -68,7 +59,7 @@ public void before() {
6859
@Test
6960
public void testDistinctUntilChangedOfNone() {
7061
Observable<String> src = Observable.empty();
71-
Observable.create(distinctUntilChanged(src)).subscribe(w);
62+
src.distinctUntilChanged().subscribe(w);
7263

7364
verify(w, never()).onNext(anyString());
7465
verify(w, never()).onError(any(Throwable.class));
@@ -78,7 +69,7 @@ public void testDistinctUntilChangedOfNone() {
7869
@Test
7970
public void testDistinctUntilChangedOfNoneWithKeySelector() {
8071
Observable<String> src = Observable.empty();
81-
Observable.create(distinctUntilChanged(src, TO_UPPER_WITH_EXCEPTION)).subscribe(w);
72+
src.distinctUntilChanged(TO_UPPER_WITH_EXCEPTION).subscribe(w);
8273

8374
verify(w, never()).onNext(anyString());
8475
verify(w, never()).onError(any(Throwable.class));
@@ -88,7 +79,7 @@ public void testDistinctUntilChangedOfNoneWithKeySelector() {
8879
@Test
8980
public void testDistinctUntilChangedOfNormalSource() {
9081
Observable<String> src = Observable.from("a", "b", "c", "c", "c", "b", "b", "a", "e");
91-
Observable.create(distinctUntilChanged(src)).subscribe(w);
82+
src.distinctUntilChanged().subscribe(w);
9283

9384
InOrder inOrder = inOrder(w);
9485
inOrder.verify(w, times(1)).onNext("a");
@@ -105,7 +96,7 @@ public void testDistinctUntilChangedOfNormalSource() {
10596
@Test
10697
public void testDistinctUntilChangedOfNormalSourceWithKeySelector() {
10798
Observable<String> src = Observable.from("a", "b", "c", "C", "c", "B", "b", "a", "e");
108-
Observable.create(distinctUntilChanged(src, TO_UPPER_WITH_EXCEPTION)).subscribe(w);
99+
src.distinctUntilChanged(TO_UPPER_WITH_EXCEPTION).subscribe(w);
109100

110101
InOrder inOrder = inOrder(w);
111102
inOrder.verify(w, times(1)).onNext("a");
@@ -122,7 +113,7 @@ public void testDistinctUntilChangedOfNormalSourceWithKeySelector() {
122113
@Test
123114
public void testDistinctUntilChangedOfSourceWithNulls() {
124115
Observable<String> src = Observable.from(null, "a", "a", null, null, "b", null, null);
125-
Observable.create(distinctUntilChanged(src)).subscribe(w);
116+
src.distinctUntilChanged().subscribe(w);
126117

127118
InOrder inOrder = inOrder(w);
128119
inOrder.verify(w, times(1)).onNext(null);
@@ -138,7 +129,7 @@ public void testDistinctUntilChangedOfSourceWithNulls() {
138129
@Test
139130
public void testDistinctUntilChangedOfSourceWithExceptionsFromKeySelector() {
140131
Observable<String> src = Observable.from("a", "b", null, "c");
141-
Observable.create(distinctUntilChanged(src, TO_UPPER_WITH_EXCEPTION)).subscribe(w);
132+
src.distinctUntilChanged(TO_UPPER_WITH_EXCEPTION).subscribe(w);
142133

143134
InOrder inOrder = inOrder(w);
144135
inOrder.verify(w, times(1)).onNext("a");
@@ -147,56 +138,4 @@ public void testDistinctUntilChangedOfSourceWithExceptionsFromKeySelector() {
147138
inOrder.verify(w, never()).onNext(anyString());
148139
inOrder.verify(w, never()).onCompleted();
149140
}
150-
151-
@Test
152-
public void testDistinctUntilChangedWithComparator() {
153-
Observable<String> src = Observable.from("a", "b", "c", "aa", "bb", "c", "ddd");
154-
Observable.create(distinctUntilChanged(src, COMPARE_LENGTH)).subscribe(w);
155-
InOrder inOrder = inOrder(w);
156-
inOrder.verify(w, times(1)).onNext("a");
157-
inOrder.verify(w, times(1)).onNext("aa");
158-
inOrder.verify(w, times(1)).onNext("c");
159-
inOrder.verify(w, times(1)).onNext("ddd");
160-
inOrder.verify(w, times(1)).onCompleted();
161-
inOrder.verify(w, never()).onNext(anyString());
162-
verify(w, never()).onError(any(Throwable.class));
163-
}
164-
165-
@Test
166-
public void testDistinctUntilChangedWithComparatorAndKeySelector() {
167-
Observable<String> src = Observable.from("a", "b", "x", "aa", "bb", "c", "ddd");
168-
Observable.create(distinctUntilChanged(src, TO_UPPER_WITH_EXCEPTION, COMPARE_LENGTH)).subscribe(w);
169-
InOrder inOrder = inOrder(w);
170-
inOrder.verify(w, times(1)).onNext("a");
171-
inOrder.verify(w, times(1)).onNext("x");
172-
inOrder.verify(w, times(1)).onNext("c");
173-
inOrder.verify(w, times(1)).onNext("ddd");
174-
inOrder.verify(w, times(1)).onCompleted();
175-
inOrder.verify(w, never()).onNext(anyString());
176-
verify(w, never()).onError(any(Throwable.class));
177-
}
178-
179-
@Test
180-
public void testDistinctUntilChangedWithComparatorAndKeySelectorandTwoSubscriptions() {
181-
Observable<String> src = Observable.from("a", "b", "x", "aa", "bb", "c", "ddd");
182-
Observable.create(distinctUntilChanged(src, TO_UPPER_WITH_EXCEPTION, COMPARE_LENGTH)).subscribe(w);
183-
InOrder inOrder = inOrder(w);
184-
inOrder.verify(w, times(1)).onNext("a");
185-
inOrder.verify(w, times(1)).onNext("x");
186-
Observable.create(distinctUntilChanged(src, TO_UPPER_WITH_EXCEPTION, COMPARE_LENGTH)).subscribe(w2);
187-
inOrder.verify(w, times(1)).onNext("c");
188-
inOrder.verify(w, times(1)).onNext("ddd");
189-
inOrder.verify(w, times(1)).onCompleted();
190-
inOrder.verify(w, never()).onNext(anyString());
191-
verify(w, never()).onError(any(Throwable.class));
192-
193-
InOrder inOrder2 = inOrder(w2);
194-
inOrder2.verify(w2, times(1)).onNext("a");
195-
inOrder2.verify(w2, times(1)).onNext("x");
196-
inOrder2.verify(w2, times(1)).onNext("c");
197-
inOrder2.verify(w2, times(1)).onNext("ddd");
198-
inOrder2.verify(w2, times(1)).onCompleted();
199-
inOrder2.verify(w2, never()).onNext(anyString());
200-
verify(w2, never()).onError(any(Throwable.class));
201-
}
202141
}

0 commit comments

Comments
 (0)