Skip to content

Commit ec19864

Browse files
Merge branch 'OperatorDistinct' of github.com:akarnokd/RxJava into merge-prs
Conflicts: rxjava-core/src/main/java/rx/Observable.java
2 parents d4fd741 + 34177b0 commit ec19864

File tree

4 files changed

+74
-262
lines changed

4 files changed

+74
-262
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@
5050
import rx.operators.OnSubscribeRange;
5151
import rx.operators.OperationDebounce;
5252
import rx.operators.OperationDelay;
53-
import rx.operators.OperationDistinct;
5453
import rx.operators.OperationDistinctUntilChanged;
5554
import rx.operators.OperationFinally;
5655
import rx.operators.OperationFlatMap;
@@ -98,6 +97,7 @@
9897
import rx.operators.OperatorDefaultIfEmpty;
9998
import rx.operators.OperatorDefer;
10099
import rx.operators.OperatorDematerialize;
100+
import rx.operators.OperatorDistinct;
101101
import rx.operators.OperatorDoOnEach;
102102
import rx.operators.OperatorElementAt;
103103
import rx.operators.OperatorFilter;
@@ -3625,7 +3625,7 @@ public final <T2> Observable<T2> dematerialize() {
36253625
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229764.aspx">MSDN: Observable.distinct</a>
36263626
*/
36273627
public final Observable<T> distinct() {
3628-
return create(OperationDistinct.distinct(this));
3628+
return lift(new OperatorDistinct<T, T>(Functions.<T>identity()));
36293629
}
36303630

36313631
/**
@@ -3642,7 +3642,7 @@ public final Observable<T> distinct() {
36423642
* @see <a href="http://msdn.microsoft.com/en-us/library/hh244310.aspx">MSDN: Observable.distinct</a>
36433643
*/
36443644
public final <U> Observable<T> distinct(Func1<? super T, ? extends U> keySelector) {
3645-
return create(OperationDistinct.distinct(this, keySelector));
3645+
return lift(new OperatorDistinct<T, U>(keySelector));
36463646
}
36473647

36483648
/**

rxjava-core/src/main/java/rx/operators/OperationDistinct.java

Lines changed: 0 additions & 188 deletions
This file was deleted.
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.operators;
17+
18+
import java.util.HashSet;
19+
import java.util.Set;
20+
import rx.Observable.Operator;
21+
import rx.Subscriber;
22+
import rx.functions.Func1;
23+
24+
/**
25+
* Returns an Observable that emits all distinct items emitted by the source.
26+
*
27+
* @param <T> the value type
28+
* @param <U> the key type
29+
*/
30+
public final class OperatorDistinct<T, U> implements Operator<T, T> {
31+
final Func1<? super T, ? extends U> keySelector;
32+
33+
public OperatorDistinct(Func1<? super T, ? extends U> keySelector) {
34+
this.keySelector = keySelector;
35+
}
36+
37+
@Override
38+
public Subscriber<? super T> call(final Subscriber<? super T> child) {
39+
return new Subscriber<T>(child) {
40+
Set<U> keyMemory = new HashSet<U>();
41+
42+
@Override
43+
public void onNext(T t) {
44+
U key = keySelector.call(t);
45+
if (keyMemory.add(key)) {
46+
child.onNext(t);
47+
}
48+
}
49+
50+
@Override
51+
public void onError(Throwable e) {
52+
keyMemory = null;
53+
child.onError(e);
54+
}
55+
56+
@Override
57+
public void onCompleted() {
58+
keyMemory = null;
59+
child.onCompleted();
60+
}
61+
62+
};
63+
}
64+
}

0 commit comments

Comments
 (0)