Skip to content

Commit 3fb9d42

Browse files
author
Joachim Hofer
committed
initial version of a distinctUntilChanged operator
1 parent 4bda940 commit 3fb9d42

File tree

1 file changed

+156
-0
lines changed

1 file changed

+156
-0
lines changed
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import static org.mockito.Matchers.*;
19+
import static org.mockito.Mockito.*;
20+
import static org.mockito.MockitoAnnotations.initMocks;
21+
import static rx.Observable.create;
22+
import static rx.Observable.empty;
23+
import static rx.Observable.from;
24+
25+
import java.util.concurrent.atomic.AtomicBoolean;
26+
import java.util.concurrent.atomic.AtomicReference;
27+
28+
import org.junit.Before;
29+
import org.junit.Test;
30+
import org.mockito.InOrder;
31+
import org.mockito.Mock;
32+
33+
import rx.Observable;
34+
import rx.Observable.OnSubscribeFunc;
35+
import rx.Observer;
36+
import rx.Subscription;
37+
import rx.subscriptions.Subscriptions;
38+
import rx.util.functions.Action0;
39+
40+
/**
41+
* Returns an Observable that emits the first item emitted by the source
42+
* Observable, or a default value if the source emits nothing.
43+
*/
44+
public final class OperationDistinctUntilChanged {
45+
46+
/**
47+
* Returns an Observable that emits all sequentially distinct items emitted by the source
48+
* @param source
49+
* The source Observable to emit the sequentially distinct items for.
50+
* @return A subscription function for creating the target Observable.
51+
*/
52+
public static <T> OnSubscribeFunc<T> distinctUntilChanged(Observable<? extends T> source) {
53+
return new DistinctUntilChanged<T>(source);
54+
}
55+
56+
private static class DistinctUntilChanged<T> implements OnSubscribeFunc<T> {
57+
private final Observable<? extends T> source;
58+
59+
private DistinctUntilChanged(Observable<? extends T> source) {
60+
this.source = source;
61+
}
62+
63+
@Override
64+
public Subscription onSubscribe(final Observer<? super T> observer) {
65+
final Subscription sourceSub = source.subscribe(new Observer<T>() {
66+
private final AtomicReference<T> lastEmittedValue = new AtomicReference<T>(null);
67+
private final AtomicBoolean hasEmitted = new AtomicBoolean();
68+
69+
@Override
70+
public void onCompleted() {
71+
observer.onCompleted();
72+
}
73+
74+
@Override
75+
public void onError(Throwable e) {
76+
observer.onError(e);
77+
}
78+
79+
@Override
80+
public void onNext(T next) {
81+
boolean hasAlreadyEmitted = hasEmitted.getAndSet(true);
82+
T lastEmitted = lastEmittedValue.getAndSet(next);
83+
if (!hasAlreadyEmitted) {
84+
observer.onNext(next);
85+
} else if (lastEmitted == null) {
86+
if (next != null) {
87+
observer.onNext(next);
88+
}
89+
} else if (!lastEmitted.equals(next)) {
90+
observer.onNext(next);
91+
}
92+
}
93+
});
94+
95+
return Subscriptions.create(new Action0() {
96+
@Override
97+
public void call() {
98+
sourceSub.unsubscribe();
99+
}
100+
});
101+
}
102+
}
103+
104+
public static class UnitTest {
105+
@Mock
106+
Observer<? super String> w;
107+
108+
@Before
109+
public void before() {
110+
initMocks(this);
111+
}
112+
113+
@Test
114+
public void testDistinctUntilChangedOfNone() {
115+
Observable<String> src = empty();
116+
create(distinctUntilChanged(src)).subscribe(w);
117+
118+
verify(w, never()).onNext(anyString());
119+
verify(w, never()).onError(any(Throwable.class));
120+
verify(w, times(1)).onCompleted();
121+
}
122+
123+
@Test
124+
public void testDistinctUntilChangedOfNormalSource() {
125+
Observable<String> src = from("a", "b", "c", "c", "c", "b", "b", "a", "e");
126+
create(distinctUntilChanged(src)).subscribe(w);
127+
128+
InOrder inOrder = inOrder(w);
129+
inOrder.verify(w, times(1)).onNext("a");
130+
inOrder.verify(w, times(1)).onNext("b");
131+
inOrder.verify(w, times(1)).onNext("c");
132+
inOrder.verify(w, times(1)).onNext("b");
133+
inOrder.verify(w, times(1)).onNext("a");
134+
inOrder.verify(w, times(1)).onNext("e");
135+
inOrder.verify(w, times(1)).onCompleted();
136+
inOrder.verify(w, never()).onNext(anyString());
137+
verify(w, never()).onError(any(Throwable.class));
138+
}
139+
140+
@Test
141+
public void testDistinctUntilChangedOfSourceWithNulls() {
142+
Observable<String> src = from(null, "a", "a", null, null, "b", null, null);
143+
create(distinctUntilChanged(src)).subscribe(w);
144+
145+
InOrder inOrder = inOrder(w);
146+
inOrder.verify(w, times(1)).onNext(null);
147+
inOrder.verify(w, times(1)).onNext("a");
148+
inOrder.verify(w, times(1)).onNext(null);
149+
inOrder.verify(w, times(1)).onNext("b");
150+
inOrder.verify(w, times(1)).onNext(null);
151+
inOrder.verify(w, times(1)).onCompleted();
152+
inOrder.verify(w, never()).onNext(anyString());
153+
verify(w, never()).onError(any(Throwable.class));
154+
}
155+
}
156+
}

0 commit comments

Comments
 (0)