Skip to content

Commit e564b56

Browse files
Merge pull request #399 from benjchristensen/pull-390-element-at
Merge Pull #390 - elementAt
2 parents 6c4b573 + 0a14e45 commit e564b56

File tree

3 files changed

+302
-1
lines changed

3 files changed

+302
-1
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import rx.observables.ConnectableObservable;
3030
import rx.observables.GroupedObservable;
3131
import rx.operators.OperationAll;
32+
import rx.operators.OperationAny;
3233
import rx.operators.OperationAverage;
3334
import rx.operators.OperationBuffer;
3435
import rx.operators.OperationCache;
@@ -39,6 +40,7 @@
3940
import rx.operators.OperationDematerialize;
4041
import rx.operators.OperationDistinct;
4142
import rx.operators.OperationDistinctUntilChanged;
43+
import rx.operators.OperationElementAt;
4244
import rx.operators.OperationFilter;
4345
import rx.operators.OperationFinally;
4446
import rx.operators.OperationFirstOrDefault;
@@ -78,7 +80,6 @@
7880
import rx.operators.OperationZip;
7981
import rx.operators.SafeObservableSubscription;
8082
import rx.operators.SafeObserver;
81-
import rx.operators.OperationAny;
8283
import rx.plugins.RxJavaErrorHandler;
8384
import rx.plugins.RxJavaObservableExecutionHook;
8485
import rx.plugins.RxJavaPlugins;
@@ -3139,6 +3140,44 @@ public <U> Observable<T> distinct(Func1<? super T, ? extends U> keySelector, Com
31393140
return create(OperationDistinct.distinct(this, keySelector, equalityComparator));
31403141
}
31413142

3143+
/**
3144+
* Returns the element at a specified index in a sequence.
3145+
*
3146+
* @param index
3147+
* The zero-based index of the element to retrieve.
3148+
*
3149+
* @return An observable sequence that produces the element at the specified
3150+
* position in the source sequence.
3151+
*
3152+
* @throws IndexOutOfBoundsException
3153+
* Index is greater than or equal to the number of elements in the source sequence.
3154+
* @throws IndexOutOfBoundsException
3155+
* Index is less than 0.
3156+
*/
3157+
public Observable<T> elementAt(int index) {
3158+
return create(OperationElementAt.elementAt(this, index));
3159+
}
3160+
3161+
/**
3162+
* Returns the element at a specified index in a sequence or the default
3163+
* value if the index is out of range.
3164+
*
3165+
* @param index
3166+
* The zero-based index of the element to retrieve.
3167+
* @param defaultValue
3168+
* The default value.
3169+
*
3170+
* @return An observable sequence that produces the element at the specified
3171+
* position in the source sequence, or the default value if the
3172+
* index is outside the bounds of the source sequence.
3173+
*
3174+
* @throws IndexOutOfBoundsException
3175+
* Index is less than 0.
3176+
*/
3177+
public Observable<T> elementAtOrDefault(int index, T defaultValue) {
3178+
return create(OperationElementAt.elementAtOrDefault(this, index, defaultValue));
3179+
}
3180+
31423181
/**
31433182
* Returns an {@link Observable} that emits <code>true</code> if any element of the source {@link Observable} satisfies
31443183
* the given condition, otherwise <code>false</code>. Note: always emit <code>false</code> if the source {@link Observable} is empty.

rxjava-core/src/main/java/rx/operators/OperationAny.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,18 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
116
package rx.operators;
217

318
import static org.mockito.Mockito.*;
Lines changed: 247 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import static org.junit.Assert.assertTrue;
19+
import static org.junit.Assert.fail;
20+
import static org.mockito.Mockito.mock;
21+
import static org.mockito.Mockito.any;
22+
import static org.mockito.Mockito.never;
23+
import static org.mockito.Mockito.times;
24+
import static org.mockito.Mockito.verify;
25+
26+
import java.util.Iterator;
27+
import java.util.concurrent.ExecutionException;
28+
import java.util.concurrent.atomic.AtomicInteger;
29+
30+
import org.junit.Test;
31+
32+
import rx.Observable;
33+
import rx.Observable.OnSubscribeFunc;
34+
import rx.Observer;
35+
import rx.Subscription;
36+
37+
/**
38+
* Returns the element at a specified index in a sequence.
39+
*/
40+
public class OperationElementAt {
41+
42+
/**
43+
* Returns the element at a specified index in a sequence.
44+
*
45+
* @param source
46+
* Observable sequence to return the element from.
47+
* @param index
48+
* The zero-based index of the element to retrieve.
49+
*
50+
* @return An observable sequence that produces the element at the specified
51+
* position in the source sequence.
52+
*
53+
* @throws IndexOutOfBoundsException
54+
* Index is greater than or equal to the number of elements in
55+
* the source sequence.
56+
* @throws IndexOutOfBoundsException
57+
* Index is less than 0.
58+
*/
59+
public static <T> OnSubscribeFunc<T> elementAt(Observable<? extends T> source, int index) {
60+
return new ElementAt<T>(source, index, null, false);
61+
}
62+
63+
/**
64+
* Returns the element at a specified index in a sequence or the default
65+
* value if the index is out of range.
66+
*
67+
* @param source
68+
* Observable sequence to return the element from.
69+
* @param index
70+
* The zero-based index of the element to retrieve.
71+
* @param defaultValue
72+
* The default value.
73+
*
74+
* @return An observable sequence that produces the element at the specified
75+
* position in the source sequence, or the default value if the
76+
* index is outside the bounds of the source sequence.
77+
*
78+
* @throws IndexOutOfBoundsException
79+
* Index is less than 0.
80+
*/
81+
public static <T> OnSubscribeFunc<T> elementAtOrDefault(Observable<? extends T> source, int index, T defaultValue) {
82+
return new ElementAt<T>(source, index, defaultValue, true);
83+
}
84+
85+
private static class ElementAt<T> implements OnSubscribeFunc<T> {
86+
87+
private final Observable<? extends T> source;
88+
private final int index;
89+
private final boolean hasDefault;
90+
private final T defaultValue;
91+
92+
private ElementAt(Observable<? extends T> source, int index,
93+
T defaultValue, boolean hasDefault) {
94+
this.source = source;
95+
this.index = index;
96+
this.defaultValue = defaultValue;
97+
this.hasDefault = hasDefault;
98+
}
99+
100+
@Override
101+
public Subscription onSubscribe(final Observer<? super T> observer) {
102+
final SafeObservableSubscription subscription = new SafeObservableSubscription();
103+
return subscription.wrap(source.subscribe(new Observer<T>() {
104+
105+
private AtomicInteger counter = new AtomicInteger();
106+
107+
@Override
108+
public void onNext(T value) {
109+
try {
110+
int currentIndex = counter.getAndIncrement();
111+
if (currentIndex == index) {
112+
observer.onNext(value);
113+
observer.onCompleted();
114+
} else if (currentIndex > index) {
115+
// this will work if the sequence is asynchronous,
116+
// it will have no effect on a synchronous observable
117+
subscription.unsubscribe();
118+
}
119+
} catch (Throwable ex) {
120+
observer.onError(ex);
121+
// this will work if the sequence is asynchronous, it
122+
// will have no effect on a synchronous observable
123+
subscription.unsubscribe();
124+
}
125+
126+
}
127+
128+
@Override
129+
public void onError(Throwable ex) {
130+
observer.onError(ex);
131+
}
132+
133+
@Override
134+
public void onCompleted() {
135+
if (index < 0) {
136+
observer.onError(new IndexOutOfBoundsException(index + " is out of bounds"));
137+
} else if (counter.get() <= index) {
138+
if (hasDefault) {
139+
observer.onNext(defaultValue);
140+
observer.onCompleted();
141+
} else {
142+
observer.onError(new IndexOutOfBoundsException(index + " is out of bounds"));
143+
}
144+
}
145+
}
146+
}));
147+
}
148+
}
149+
150+
public static class UnitTest {
151+
152+
@Test
153+
public void testElementAt() {
154+
Observable<Integer> w = Observable.from(1, 2);
155+
Observable<Integer> observable = Observable.create(elementAt(w, 1));
156+
157+
@SuppressWarnings("unchecked")
158+
Observer<Integer> aObserver = mock(Observer.class);
159+
observable.subscribe(aObserver);
160+
verify(aObserver, never()).onNext(1);
161+
verify(aObserver, times(1)).onNext(2);
162+
verify(aObserver, never()).onError(
163+
any(Throwable.class));
164+
verify(aObserver, times(1)).onCompleted();
165+
}
166+
167+
@Test
168+
public void testElementAtWithMinusIndex() {
169+
Observable<Integer> w = Observable.from(1, 2);
170+
Observable<Integer> observable = Observable
171+
.create(elementAt(w, -1));
172+
173+
try {
174+
Iterator<Integer> iter = OperationToIterator
175+
.toIterator(observable);
176+
assertTrue(iter.hasNext());
177+
iter.next();
178+
fail("expect an IndexOutOfBoundsException when index is out of bounds");
179+
} catch (IndexOutOfBoundsException e) {
180+
}
181+
}
182+
183+
@Test
184+
public void testElementAtWithIndexOutOfBounds()
185+
throws InterruptedException, ExecutionException {
186+
Observable<Integer> w = Observable.from(1, 2);
187+
Observable<Integer> observable = Observable.create(elementAt(w, 2));
188+
try {
189+
Iterator<Integer> iter = OperationToIterator
190+
.toIterator(observable);
191+
assertTrue(iter.hasNext());
192+
iter.next();
193+
fail("expect an IndexOutOfBoundsException when index is out of bounds");
194+
} catch (IndexOutOfBoundsException e) {
195+
}
196+
}
197+
198+
@Test
199+
public void testElementAtOrDefault() throws InterruptedException,
200+
ExecutionException {
201+
Observable<Integer> w = Observable.from(1, 2);
202+
Observable<Integer> observable = Observable
203+
.create(elementAtOrDefault(w, 1, 0));
204+
205+
@SuppressWarnings("unchecked")
206+
Observer<Integer> aObserver = mock(Observer.class);
207+
observable.subscribe(aObserver);
208+
verify(aObserver, never()).onNext(1);
209+
verify(aObserver, times(1)).onNext(2);
210+
verify(aObserver, never()).onError(any(Throwable.class));
211+
verify(aObserver, times(1)).onCompleted();
212+
}
213+
214+
@Test
215+
public void testElementAtOrDefaultWithIndexOutOfBounds()
216+
throws InterruptedException, ExecutionException {
217+
Observable<Integer> w = Observable.from(1, 2);
218+
Observable<Integer> observable = Observable
219+
.create(elementAtOrDefault(w, 2, 0));
220+
221+
@SuppressWarnings("unchecked")
222+
Observer<Integer> aObserver = mock(Observer.class);
223+
observable.subscribe(aObserver);
224+
verify(aObserver, never()).onNext(1);
225+
verify(aObserver, never()).onNext(2);
226+
verify(aObserver, times(1)).onNext(0);
227+
verify(aObserver, never()).onError(any(Throwable.class));
228+
verify(aObserver, times(1)).onCompleted();
229+
}
230+
231+
@Test
232+
public void testElementAtOrDefaultWithMinusIndex() {
233+
Observable<Integer> w = Observable.from(1, 2);
234+
Observable<Integer> observable = Observable
235+
.create(elementAtOrDefault(w, -1, 0));
236+
237+
try {
238+
Iterator<Integer> iter = OperationToIterator
239+
.toIterator(observable);
240+
assertTrue(iter.hasNext());
241+
iter.next();
242+
fail("expect an IndexOutOfBoundsException when index is out of bounds");
243+
} catch (IndexOutOfBoundsException e) {
244+
}
245+
}
246+
}
247+
}

0 commit comments

Comments
 (0)