Skip to content

Commit 25555b4

Browse files
committed
Reimplement the ElementAt operator
1 parent bcded86 commit 25555b4

File tree

5 files changed

+147
-267
lines changed

5 files changed

+147
-267
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@
6666
import rx.operators.OperationDematerialize;
6767
import rx.operators.OperationDistinct;
6868
import rx.operators.OperationDistinctUntilChanged;
69-
import rx.operators.OperationElementAt;
69+
import rx.operators.OperatorElementAt;
7070
import rx.operators.OperationFinally;
7171
import rx.operators.OperationFlatMap;
7272
import rx.operators.OperationGroupByUntil;
@@ -4466,7 +4466,7 @@ public final void onNext(T args) {
44664466
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#wiki-elementat">RxJava Wiki: elementAt()</a>
44674467
*/
44684468
public final Observable<T> elementAt(int index) {
4469-
return create(OperationElementAt.elementAt(this, index));
4469+
return create(new OperatorElementAt<T>(this, index));
44704470
}
44714471

44724472
/**
@@ -4486,7 +4486,7 @@ public final Observable<T> elementAt(int index) {
44864486
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#wiki-elementatordefault">RxJava Wiki: elementAtOrDefault()</a>
44874487
*/
44884488
public final Observable<T> elementAtOrDefault(int index, T defaultValue) {
4489-
return create(OperationElementAt.elementAtOrDefault(this, index, defaultValue));
4489+
return create(new OperatorElementAt<T>(this, index, defaultValue));
44904490
}
44914491

44924492
/**

rxjava-core/src/main/java/rx/operators/OperationElementAt.java

Lines changed: 0 additions & 137 deletions
This file was deleted.
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import rx.Observable;
19+
import rx.Observable.OnSubscribe;
20+
import rx.Subscriber;
21+
22+
/**
23+
* Returns the element at a specified index in a sequence.
24+
*/
25+
public final class OperatorElementAt<T> implements OnSubscribe<T> {
26+
27+
private final Observable<? extends T> source;
28+
private final int index;
29+
private final boolean hasDefault;
30+
private final T defaultValue;
31+
32+
public OperatorElementAt(Observable<? extends T> source, int index) {
33+
this(source, index, null, false);
34+
}
35+
36+
public OperatorElementAt(Observable<? extends T> source, int index, T defaultValue) {
37+
this(source, index, defaultValue, true);
38+
}
39+
40+
private OperatorElementAt(Observable<? extends T> source, int index, T defaultValue, boolean hasDefault) {
41+
if (index < 0) {
42+
throw new IndexOutOfBoundsException(index + " is out of bounds");
43+
}
44+
this.source = source;
45+
this.index = index;
46+
this.defaultValue = defaultValue;
47+
this.hasDefault = hasDefault;
48+
}
49+
50+
@Override
51+
public void call(final Subscriber<? super T> subscriber) {
52+
source.subscribe(new Subscriber<T>(subscriber) {
53+
54+
private int currentIndex = 0;
55+
56+
@Override
57+
public void onNext(T value) {
58+
if (currentIndex == index) {
59+
subscriber.onNext(value);
60+
subscriber.onCompleted();
61+
}
62+
currentIndex++;
63+
}
64+
65+
@Override
66+
public void onError(Throwable e) {
67+
subscriber.onError(e);
68+
}
69+
70+
@Override
71+
public void onCompleted() {
72+
if (currentIndex <= index) {
73+
// If "subscriber.onNext(value)" is called, currentIndex must be greater than index
74+
if (hasDefault) {
75+
subscriber.onNext(defaultValue);
76+
subscriber.onCompleted();
77+
} else {
78+
subscriber.onError(new IndexOutOfBoundsException(index + " is out of bounds"));
79+
}
80+
}
81+
}
82+
});
83+
}
84+
}

rxjava-core/src/test/java/rx/operators/OperationElementAtTest.java

Lines changed: 0 additions & 127 deletions
This file was deleted.

0 commit comments

Comments
 (0)