Skip to content

Commit 7ce4c84

Browse files
committed
Reimplement the 'single' operator
1 parent d3ead3a commit 7ce4c84

File tree

4 files changed

+94
-100
lines changed

4 files changed

+94
-100
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@
8686
import rx.operators.OperationReplay;
8787
import rx.operators.OperationSample;
8888
import rx.operators.OperationSequenceEqual;
89-
import rx.operators.OperationSingle;
89+
import rx.operators.OperatorSingle;
9090
import rx.operators.OperationSkip;
9191
import rx.operators.OperationSkipLast;
9292
import rx.operators.OperationSkipUntil;
@@ -6216,7 +6216,7 @@ public final Observable<T> serialize() {
62166216
* @see "MSDN: Observable.singleAsync()"
62176217
*/
62186218
public final Observable<T> single() {
6219-
return create(OperationSingle.<T> single(this));
6219+
return lift(new OperatorSingle<T>());
62206220
}
62216221

62226222
/**
@@ -6257,7 +6257,7 @@ public final Observable<T> single(Func1<? super T, Boolean> predicate) {
62576257
* @see "MSDN: Observable.singleOrDefaultAsync()"
62586258
*/
62596259
public final Observable<T> singleOrDefault(T defaultValue) {
6260-
return create(OperationSingle.<T> singleOrDefault(this, defaultValue));
6260+
return lift(new OperatorSingle<T>(defaultValue));
62616261
}
62626262

62636263
/**

rxjava-core/src/main/java/rx/operators/OperationSingle.java

Lines changed: 0 additions & 96 deletions
This file was deleted.
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import rx.Observable.Operator;
19+
import rx.Subscriber;
20+
21+
/**
22+
* If the Observable completes after emitting a single item that matches a
23+
* predicate, return an Observable containing that item. If it emits more than
24+
* one such item or no item, throw an IllegalArgumentException.
25+
*/
26+
public final class OperatorSingle<T> implements Operator<T, T> {
27+
28+
private final boolean hasDefaultValue;
29+
private final T defaultValue;
30+
31+
public OperatorSingle() {
32+
this(false, null);
33+
}
34+
35+
public OperatorSingle(T defaultValue) {
36+
this(true, defaultValue);
37+
}
38+
39+
private OperatorSingle(boolean hasDefaultValue, final T defaultValue) {
40+
this.hasDefaultValue = hasDefaultValue;
41+
this.defaultValue = defaultValue;
42+
}
43+
44+
@Override
45+
public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
46+
return new Subscriber<T>(subscriber) {
47+
48+
private T value;
49+
private boolean isNonEmpty = false;
50+
private boolean hasTooManyElemenets = false;
51+
52+
@Override
53+
public void onNext(T value) {
54+
if (isNonEmpty) {
55+
hasTooManyElemenets = true;
56+
subscriber.onError(new IllegalArgumentException("Sequence contains too many elements"));
57+
} else {
58+
this.value = value;
59+
isNonEmpty = true;
60+
}
61+
}
62+
63+
@Override
64+
public void onCompleted() {
65+
if (hasTooManyElemenets) {
66+
// We has already sent an onError message
67+
} else {
68+
if (isNonEmpty) {
69+
subscriber.onNext(value);
70+
subscriber.onCompleted();
71+
} else {
72+
if (hasDefaultValue) {
73+
subscriber.onNext(defaultValue);
74+
subscriber.onCompleted();
75+
} else {
76+
subscriber.onError(new IllegalArgumentException("Sequence contains no elements"));
77+
}
78+
}
79+
}
80+
}
81+
82+
@Override
83+
public void onError(Throwable e) {
84+
subscriber.onError(e);
85+
}
86+
87+
};
88+
}
89+
90+
}

rxjava-core/src/test/java/rx/operators/OperationSingleTest.java renamed to rxjava-core/src/test/java/rx/operators/OperatorSingleTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import rx.Observer;
2626
import rx.functions.Func1;
2727

28-
public class OperationSingleTest {
28+
public class OperatorSingleTest {
2929

3030
@Test
3131
public void testSingle() {

0 commit comments

Comments
 (0)