Skip to content

Commit f0fea5f

Browse files
committed
Implement ElementAt as Operator
1 parent 11a18aa commit f0fea5f

File tree

2 files changed

+13
-15
lines changed

2 files changed

+13
-15
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4466,7 +4466,7 @@ public final void onNext(T args) {
44664466
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#wiki-elementat">RxJava Wiki: elementAt()</a>
44674467
*/
44684468
public final Observable<T> elementAt(int index) {
4469-
return create(new OperatorElementAt<T>(this, index));
4469+
return lift(new OperatorElementAt<T>(index));
44704470
}
44714471

44724472
/**
@@ -4486,7 +4486,7 @@ public final Observable<T> elementAt(int index) {
44864486
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#wiki-elementatordefault">RxJava Wiki: elementAtOrDefault()</a>
44874487
*/
44884488
public final Observable<T> elementAtOrDefault(int index, T defaultValue) {
4489-
return create(new OperatorElementAt<T>(this, index, defaultValue));
4489+
return lift(new OperatorElementAt<T>(index, defaultValue));
44904490
}
44914491

44924492
/**

rxjava-core/src/main/java/rx/operators/OperatorElementAt.java

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,41 +15,38 @@
1515
*/
1616
package rx.operators;
1717

18-
import rx.Observable;
19-
import rx.Observable.OnSubscribe;
18+
import rx.Observable.Operator;
2019
import rx.Subscriber;
2120

2221
/**
2322
* Returns the element at a specified index in a sequence.
2423
*/
25-
public final class OperatorElementAt<T> implements OnSubscribe<T> {
24+
public final class OperatorElementAt<T> implements Operator<T, T> {
2625

27-
private final Observable<? extends T> source;
2826
private final int index;
2927
private final boolean hasDefault;
3028
private final T defaultValue;
3129

32-
public OperatorElementAt(Observable<? extends T> source, int index) {
33-
this(source, index, null, false);
30+
public OperatorElementAt(int index) {
31+
this(index, null, false);
3432
}
3533

36-
public OperatorElementAt(Observable<? extends T> source, int index, T defaultValue) {
37-
this(source, index, defaultValue, true);
34+
public OperatorElementAt(int index, T defaultValue) {
35+
this(index, defaultValue, true);
3836
}
3937

40-
private OperatorElementAt(Observable<? extends T> source, int index, T defaultValue, boolean hasDefault) {
38+
private OperatorElementAt(int index, T defaultValue, boolean hasDefault) {
4139
if (index < 0) {
4240
throw new IndexOutOfBoundsException(index + " is out of bounds");
4341
}
44-
this.source = source;
4542
this.index = index;
4643
this.defaultValue = defaultValue;
4744
this.hasDefault = hasDefault;
4845
}
4946

5047
@Override
51-
public void call(final Subscriber<? super T> subscriber) {
52-
source.subscribe(new Subscriber<T>(subscriber) {
48+
public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
49+
return new Subscriber<T>(subscriber) {
5350

5451
private int currentIndex = 0;
5552

@@ -79,6 +76,7 @@ public void onCompleted() {
7976
}
8077
}
8178
}
82-
});
79+
};
8380
}
81+
8482
}

0 commit comments

Comments
 (0)