Skip to content

Commit d6732ad

Browse files
Merge pull request #966 from zsxwing/elementAt
Reimplement the ElementAt operator and add it to rxjava-scala
2 parents d1ac686 + f0fea5f commit d6732ad

File tree

7 files changed

+213
-267
lines changed

7 files changed

+213
-267
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -564,4 +564,13 @@ class RxScalaDemo extends JUnitSuite {
564564
println(result)
565565
}
566566

567+
@Test def elementAtExample(): Unit = {
568+
val o = List("red", "green", "blue").toObservable
569+
println(o(2).toBlockingObservable.single)
570+
}
571+
572+
@Test def elementAtOrDefaultExample(): Unit = {
573+
val o : Observable[Seq[Char]] = List("red".toList, "green".toList, "blue".toList).toObservable.elementAtOrDefault(3, "black".toSeq)
574+
println(o.toBlockingObservable.single)
575+
}
567576
}

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2305,6 +2305,65 @@ trait Observable[+T]
23052305
def delaySubscription(delay: Duration, scheduler: Scheduler): Observable[T] = {
23062306
toScalaObservable[T](asJavaObservable.delaySubscription(delay.length, delay.unit, scheduler))
23072307
}
2308+
2309+
/**
2310+
* Returns an Observable that emits the single item at a specified index in a sequence of emissions from a
2311+
* source Observbable.
2312+
*
2313+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/elementAt.png">
2314+
*
2315+
* @param index
2316+
* the zero-based index of the item to retrieve
2317+
* @return an Observable that emits a single item: the item at the specified position in the sequence of
2318+
* those emitted by the source Observable
2319+
* @throws IndexOutOfBoundsException
2320+
* if index is greater than or equal to the number of items emitted by the source
2321+
* Observable
2322+
* @throws IndexOutOfBoundsException
2323+
* if index is less than 0
2324+
* @see `Observable.elementAt`
2325+
*/
2326+
def apply(index: Int): Observable[T] = elementAt(index)
2327+
2328+
/**
2329+
* Returns an Observable that emits the single item at a specified index in a sequence of emissions from a
2330+
* source Observbable.
2331+
*
2332+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/elementAt.png">
2333+
*
2334+
* @param index
2335+
* the zero-based index of the item to retrieve
2336+
* @return an Observable that emits a single item: the item at the specified position in the sequence of
2337+
* those emitted by the source Observable
2338+
* @throws IndexOutOfBoundsException
2339+
* if index is greater than or equal to the number of items emitted by the source
2340+
* Observable
2341+
* @throws IndexOutOfBoundsException
2342+
* if index is less than 0
2343+
*/
2344+
def elementAt(index: Int): Observable[T] = {
2345+
toScalaObservable[T](asJavaObservable.elementAt(index))
2346+
}
2347+
2348+
/**
2349+
* Returns an Observable that emits the item found at a specified index in a sequence of emissions from a
2350+
* source Observable, or a default item if that index is out of range.
2351+
*
2352+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/elementAtOrDefault.png">
2353+
*
2354+
* @param index
2355+
* the zero-based index of the item to retrieve
2356+
* @param defaultValue
2357+
* the default item
2358+
* @return an Observable that emits the item at the specified position in the sequence emitted by the source
2359+
* Observable, or the default item if that index is outside the bounds of the source sequence
2360+
* @throws IndexOutOfBoundsException
2361+
* if {@code index} is less than 0
2362+
*/
2363+
def elementAtOrDefault[U >: T](index: Int, default: U): Observable[U] = {
2364+
val thisJava = asJavaObservable.asInstanceOf[rx.Observable[U]]
2365+
toScalaObservable[U](thisJava.elementAtOrDefault(index, default))
2366+
}
23082367
}
23092368

23102369
/**

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@
6666
import rx.operators.OperationDematerialize;
6767
import rx.operators.OperationDistinct;
6868
import rx.operators.OperationDistinctUntilChanged;
69-
import rx.operators.OperationElementAt;
69+
import rx.operators.OperatorElementAt;
7070
import rx.operators.OperationFinally;
7171
import rx.operators.OperationFlatMap;
7272
import rx.operators.OperationGroupByUntil;
@@ -4510,7 +4510,7 @@ public final void onNext(T args) {
45104510
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#wiki-elementat">RxJava Wiki: elementAt()</a>
45114511
*/
45124512
public final Observable<T> elementAt(int index) {
4513-
return create(OperationElementAt.elementAt(this, index));
4513+
return lift(new OperatorElementAt<T>(index));
45144514
}
45154515

45164516
/**
@@ -4530,7 +4530,7 @@ public final Observable<T> elementAt(int index) {
45304530
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#wiki-elementatordefault">RxJava Wiki: elementAtOrDefault()</a>
45314531
*/
45324532
public final Observable<T> elementAtOrDefault(int index, T defaultValue) {
4533-
return create(OperationElementAt.elementAtOrDefault(this, index, defaultValue));
4533+
return lift(new OperatorElementAt<T>(index, defaultValue));
45344534
}
45354535

45364536
/**

rxjava-core/src/main/java/rx/operators/OperationElementAt.java

Lines changed: 0 additions & 137 deletions
This file was deleted.
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import rx.Observable.Operator;
19+
import rx.Subscriber;
20+
21+
/**
22+
* Returns the element at a specified index in a sequence.
23+
*/
24+
public final class OperatorElementAt<T> implements Operator<T, T> {
25+
26+
private final int index;
27+
private final boolean hasDefault;
28+
private final T defaultValue;
29+
30+
public OperatorElementAt(int index) {
31+
this(index, null, false);
32+
}
33+
34+
public OperatorElementAt(int index, T defaultValue) {
35+
this(index, defaultValue, true);
36+
}
37+
38+
private OperatorElementAt(int index, T defaultValue, boolean hasDefault) {
39+
if (index < 0) {
40+
throw new IndexOutOfBoundsException(index + " is out of bounds");
41+
}
42+
this.index = index;
43+
this.defaultValue = defaultValue;
44+
this.hasDefault = hasDefault;
45+
}
46+
47+
@Override
48+
public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
49+
return new Subscriber<T>(subscriber) {
50+
51+
private int currentIndex = 0;
52+
53+
@Override
54+
public void onNext(T value) {
55+
if (currentIndex == index) {
56+
subscriber.onNext(value);
57+
subscriber.onCompleted();
58+
}
59+
currentIndex++;
60+
}
61+
62+
@Override
63+
public void onError(Throwable e) {
64+
subscriber.onError(e);
65+
}
66+
67+
@Override
68+
public void onCompleted() {
69+
if (currentIndex <= index) {
70+
// If "subscriber.onNext(value)" is called, currentIndex must be greater than index
71+
if (hasDefault) {
72+
subscriber.onNext(defaultValue);
73+
subscriber.onCompleted();
74+
} else {
75+
subscriber.onError(new IndexOutOfBoundsException(index + " is out of bounds"));
76+
}
77+
}
78+
}
79+
};
80+
}
81+
82+
}

0 commit comments

Comments
 (0)