Skip to content

Commit 5df8e3c

Browse files
committed
Implemented the 'elementAt' and 'elementAtOrDefault' operators. see #41
1 parent 00d7c3b commit 5df8e3c

File tree

2 files changed

+279
-0
lines changed

2 files changed

+279
-0
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import rx.operators.OperationDematerialize;
3939
import rx.operators.OperationDistinctUntilChanged;
4040
import rx.operators.OperationDistinct;
41+
import rx.operators.OperationElementAt;
4142
import rx.operators.OperationFilter;
4243
import rx.operators.OperationFinally;
4344
import rx.operators.OperationFirstOrDefault;
@@ -4186,5 +4187,45 @@ private boolean isInternalImplementation(Object o) {
41864187
Package p = o.getClass().getPackage(); // it can be null
41874188
return p != null && p.getName().startsWith("rx.operators");
41884189
}
4190+
4191+
/**
4192+
* Returns the element at a specified index in a sequence.
4193+
*
4194+
* @param index
4195+
* The zero-based index of the element to retrieve.
4196+
*
4197+
* @return An observable sequence that produces the element at the specified
4198+
* position in the source sequence.
4199+
*
4200+
* @throws IndexOutOfBoundsException
4201+
* Index is greater than or equal to the number of elements in
4202+
* the source sequence.
4203+
* @throws IndexOutOfBoundsException
4204+
* Index is less than 0.
4205+
*/
4206+
public Observable<T> elementAt(int index) {
4207+
return create(OperationElementAt.elementAt(this, index));
4208+
}
41894209

4210+
/**
4211+
* Returns the element at a specified index in a sequence or the default
4212+
* value if the index is out of range.
4213+
*
4214+
* @param index
4215+
* The zero-based index of the element to retrieve.
4216+
* @param defaultValue
4217+
* The default value.
4218+
*
4219+
* @return An observable sequence that produces the element at the specified
4220+
* position in the source sequence, or the default value if the
4221+
* index is outside the bounds of the source sequence.
4222+
*
4223+
* @throws IndexOutOfBoundsException
4224+
* Index is less than 0.
4225+
*/
4226+
public Observable<T> elementAtOrDefault(int index, T defaultValue) {
4227+
return create(OperationElementAt.elementAtOrDefault(this, index,
4228+
defaultValue));
4229+
}
4230+
41904231
}
Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
package rx.operators;
2+
3+
import static org.junit.Assert.assertTrue;
4+
import static org.junit.Assert.fail;
5+
import static org.mockito.Mockito.mock;
6+
import static org.mockito.Mockito.never;
7+
import static org.mockito.Mockito.times;
8+
import static org.mockito.Mockito.verify;
9+
10+
import java.util.Iterator;
11+
import java.util.concurrent.ExecutionException;
12+
import java.util.concurrent.atomic.AtomicInteger;
13+
14+
import org.junit.Test;
15+
16+
import rx.Observable;
17+
import rx.Observable.OnSubscribeFunc;
18+
import rx.Observer;
19+
import rx.Subscription;
20+
21+
/**
22+
* Returns the element at a specified index in a sequence.
23+
*/
24+
public class OperationElementAt {
25+
26+
/**
27+
* Returns the element at a specified index in a sequence.
28+
*
29+
* @param source
30+
* Observable sequence to return the element from.
31+
* @param index
32+
* The zero-based index of the element to retrieve.
33+
*
34+
* @return An observable sequence that produces the element at the specified
35+
* position in the source sequence.
36+
*
37+
* @throws IndexOutOfBoundsException
38+
* Index is greater than or equal to the number of elements in
39+
* the source sequence.
40+
* @throws IndexOutOfBoundsException
41+
* Index is less than 0.
42+
*/
43+
public static <T> OnSubscribeFunc<T> elementAt(
44+
Observable<? extends T> source, int index) {
45+
return new ElementAt<T>(source, index, null, false);
46+
}
47+
48+
/**
49+
* Returns the element at a specified index in a sequence or the default
50+
* value if the index is out of range.
51+
*
52+
* @param source
53+
* Observable sequence to return the element from.
54+
* @param index
55+
* The zero-based index of the element to retrieve.
56+
* @param defaultValue
57+
* The default value.
58+
*
59+
* @return An observable sequence that produces the element at the specified
60+
* position in the source sequence, or the default value if the
61+
* index is outside the bounds of the source sequence.
62+
*
63+
* @throws IndexOutOfBoundsException
64+
* Index is less than 0.
65+
*/
66+
public static <T> OnSubscribeFunc<T> elementAtOrDefault(
67+
Observable<? extends T> source, int index, T defaultValue) {
68+
return new ElementAt<T>(source, index, defaultValue, true);
69+
}
70+
71+
private static class ElementAt<T> implements OnSubscribeFunc<T> {
72+
73+
private final Observable<? extends T> source;
74+
private final int index;
75+
private final boolean hasDefault;
76+
private final T defaultValue;
77+
78+
private ElementAt(Observable<? extends T> source, int index,
79+
T defaultValue, boolean hasDefault) {
80+
this.source = source;
81+
this.index = index;
82+
this.defaultValue = defaultValue;
83+
this.hasDefault = hasDefault;
84+
}
85+
86+
@Override
87+
public Subscription onSubscribe(final Observer<? super T> observer) {
88+
final SafeObservableSubscription subscription = new SafeObservableSubscription();
89+
return subscription.wrap(source.subscribe(new Observer<T>() {
90+
91+
private AtomicInteger counter = new AtomicInteger();
92+
93+
@Override
94+
public void onNext(T value) {
95+
try {
96+
int currentIndex = counter.getAndIncrement();
97+
if (currentIndex == index) {
98+
observer.onNext(value);
99+
observer.onCompleted();
100+
} else if (currentIndex > index) {
101+
// this will work if the sequence is asynchronous,
102+
// it will have no effect on a synchronous
103+
// observable
104+
subscription.unsubscribe();
105+
}
106+
} catch (Throwable ex) {
107+
observer.onError(ex);
108+
// this will work if the sequence is asynchronous, it
109+
// will have no effect on a synchronous observable
110+
subscription.unsubscribe();
111+
}
112+
113+
}
114+
115+
@Override
116+
public void onError(Throwable ex) {
117+
observer.onError(ex);
118+
}
119+
120+
@Override
121+
public void onCompleted() {
122+
if (index < 0) {
123+
observer.onError(new IndexOutOfBoundsException(index
124+
+ " is out of bounds"));
125+
} else if (counter.get() <= index) {
126+
if (hasDefault) {
127+
observer.onNext(defaultValue);
128+
observer.onCompleted();
129+
} else {
130+
observer.onError(new IndexOutOfBoundsException(
131+
index + " is out of bounds"));
132+
}
133+
}
134+
}
135+
}));
136+
}
137+
}
138+
139+
public static class UnitTest {
140+
141+
@Test
142+
public void testElementAt() {
143+
Observable<Integer> w = Observable.from(1, 2);
144+
Observable<Integer> observable = Observable.create(elementAt(w, 1));
145+
146+
@SuppressWarnings("unchecked")
147+
Observer<Integer> aObserver = mock(Observer.class);
148+
observable.subscribe(aObserver);
149+
verify(aObserver, never()).onNext(1);
150+
verify(aObserver, times(1)).onNext(2);
151+
verify(aObserver, never()).onError(
152+
org.mockito.Matchers.any(Throwable.class));
153+
verify(aObserver, times(1)).onCompleted();
154+
}
155+
156+
@Test
157+
public void testElementAtWithMinusIndex() {
158+
Observable<Integer> w = Observable.from(1, 2);
159+
Observable<Integer> observable = Observable
160+
.create(elementAt(w, -1));
161+
162+
try {
163+
Iterator<Integer> iter = OperationToIterator
164+
.toIterator(observable);
165+
assertTrue(iter.hasNext());
166+
iter.next();
167+
fail("expect an IndexOutOfBoundsException when index is out of bounds");
168+
} catch (IndexOutOfBoundsException e) {
169+
}
170+
}
171+
172+
@Test
173+
public void testElementAtWithIndexOutOfBounds()
174+
throws InterruptedException, ExecutionException {
175+
Observable<Integer> w = Observable.from(1, 2);
176+
Observable<Integer> observable = Observable.create(elementAt(w, 2));
177+
try {
178+
Iterator<Integer> iter = OperationToIterator
179+
.toIterator(observable);
180+
assertTrue(iter.hasNext());
181+
iter.next();
182+
fail("expect an IndexOutOfBoundsException when index is out of bounds");
183+
} catch (IndexOutOfBoundsException e) {
184+
}
185+
}
186+
187+
@Test
188+
public void testElementAtOrDefault() throws InterruptedException,
189+
ExecutionException {
190+
Observable<Integer> w = Observable.from(1, 2);
191+
Observable<Integer> observable = Observable
192+
.create(elementAtOrDefault(w, 1, 0));
193+
194+
@SuppressWarnings("unchecked")
195+
Observer<Integer> aObserver = mock(Observer.class);
196+
observable.subscribe(aObserver);
197+
verify(aObserver, never()).onNext(1);
198+
verify(aObserver, times(1)).onNext(2);
199+
verify(aObserver, never()).onError(
200+
org.mockito.Matchers.any(Throwable.class));
201+
verify(aObserver, times(1)).onCompleted();
202+
}
203+
204+
@Test
205+
public void testElementAtOrDefaultWithIndexOutOfBounds()
206+
throws InterruptedException, ExecutionException {
207+
Observable<Integer> w = Observable.from(1, 2);
208+
Observable<Integer> observable = Observable
209+
.create(elementAtOrDefault(w, 2, 0));
210+
211+
@SuppressWarnings("unchecked")
212+
Observer<Integer> aObserver = mock(Observer.class);
213+
observable.subscribe(aObserver);
214+
verify(aObserver, never()).onNext(1);
215+
verify(aObserver, never()).onNext(2);
216+
verify(aObserver, times(1)).onNext(0);
217+
verify(aObserver, never()).onError(
218+
org.mockito.Matchers.any(Throwable.class));
219+
verify(aObserver, times(1)).onCompleted();
220+
}
221+
222+
@Test
223+
public void testElementAtOrDefaultWithMinusIndex() {
224+
Observable<Integer> w = Observable.from(1, 2);
225+
Observable<Integer> observable = Observable
226+
.create(elementAtOrDefault(w, -1, 0));
227+
228+
try {
229+
Iterator<Integer> iter = OperationToIterator
230+
.toIterator(observable);
231+
assertTrue(iter.hasNext());
232+
iter.next();
233+
fail("expect an IndexOutOfBoundsException when index is out of bounds");
234+
} catch (IndexOutOfBoundsException e) {
235+
}
236+
}
237+
}
238+
}

0 commit comments

Comments
 (0)