Skip to content

Commit 04a9e05

Browse files
committed
Added: BO.Latest, fixed: BO.next, BO.mostRecent, BO.toIterable
1 parent 7ec374c commit 04a9e05

File tree

9 files changed

+495
-23
lines changed

9 files changed

+495
-23
lines changed

rxjava-core/src/main/java/rx/observables/BlockingObservable.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import rx.Observable;
2424
import rx.Observer;
2525
import rx.Subscription;
26+
import rx.operators.OperationLatest;
2627
import rx.operators.OperationMostRecent;
2728
import rx.operators.OperationNext;
2829
import rx.operators.OperationToFuture;
@@ -266,6 +267,24 @@ public Iterable<T> next() {
266267
return OperationNext.next(o);
267268
}
268269

270+
/**
271+
* Returns the latest item emitted by the underlying Observable, waiting if necessary
272+
* for one to become available.
273+
* <p>
274+
* If the underlying observable produces items faster than the Iterator.next() takes them
275+
* onNext events might be skipped, but onError or onCompleted events are not.
276+
* <p>
277+
* The difference between BlockingObservable.next() and BlockingObservable.latest() is that
278+
* the former does not overwrite untaken values whereas the latter does.
279+
* <p>
280+
* Note also that an onNext() directly followed by onCompleted() might hide the given onNext() event.
281+
*
282+
* @return the Iterable sequence
283+
*/
284+
public Iterable<T> latest() {
285+
return OperationLatest.latest(o);
286+
}
287+
269288
/**
270289
* If the {@link Observable} completes after emitting a single item, return that item,
271290
* otherwise throw an exception.
Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import java.util.Iterator;
19+
import java.util.NoSuchElementException;
20+
import java.util.concurrent.Semaphore;
21+
import java.util.concurrent.locks.Lock;
22+
import java.util.concurrent.locks.ReentrantLock;
23+
import rx.Notification;
24+
import rx.Observable;
25+
import rx.Observer;
26+
import rx.util.Exceptions;
27+
28+
/**
29+
* Wait for and iterate over the latest values of the source observable.
30+
* If the source works faster than the iterator, values may be skipped, but
31+
* not the onError or onCompleted events.
32+
*/
33+
public final class OperationLatest {
34+
/** Utility class. */
35+
private OperationLatest() { throw new IllegalStateException("No instances!"); }
36+
37+
public static <T> Iterable<T> latest(final Observable<? extends T> source) {
38+
return new Iterable<T>() {
39+
@Override
40+
public Iterator<T> iterator() {
41+
LatestObserverIterator<T> lio = new LatestObserverIterator<T>();
42+
source.subscribe(lio);
43+
return lio;
44+
}
45+
};
46+
}
47+
48+
/** Observer of source, iterator for output. */
49+
static final class LatestObserverIterator<T> implements Observer<T>, Iterator<T> {
50+
final Lock lock = new ReentrantLock();
51+
final Semaphore notify = new Semaphore(0);
52+
// observer's values
53+
boolean oHasValue;
54+
Notification.Kind oKind;
55+
T oValue;
56+
Throwable oError;
57+
@Override
58+
public void onNext(T args) {
59+
boolean wasntAvailable;
60+
lock.lock();
61+
try {
62+
wasntAvailable = !oHasValue;
63+
oHasValue = true;
64+
oValue = args;
65+
oKind = Notification.Kind.OnNext;
66+
} finally {
67+
lock.unlock();
68+
}
69+
if (wasntAvailable) {
70+
notify.release();
71+
}
72+
}
73+
74+
@Override
75+
public void onError(Throwable e) {
76+
boolean wasntAvailable;
77+
lock.lock();
78+
try {
79+
wasntAvailable = !oHasValue;
80+
oHasValue = true;
81+
oValue = null;
82+
oError = e;
83+
oKind = Notification.Kind.OnError;
84+
} finally {
85+
lock.unlock();
86+
}
87+
if (wasntAvailable) {
88+
notify.release();
89+
}
90+
}
91+
92+
@Override
93+
public void onCompleted() {
94+
boolean wasntAvailable;
95+
lock.lock();
96+
try {
97+
wasntAvailable = !oHasValue;
98+
oHasValue = true;
99+
oValue = null;
100+
oKind = Notification.Kind.OnCompleted;
101+
} finally {
102+
lock.unlock();
103+
}
104+
if (wasntAvailable) {
105+
notify.release();
106+
}
107+
}
108+
109+
// iterator's values
110+
111+
boolean iDone;
112+
boolean iHasValue;
113+
T iValue;
114+
Throwable iError;
115+
Notification.Kind iKind;
116+
117+
@Override
118+
public boolean hasNext() {
119+
if (iError != null) {
120+
Exceptions.propagate(iError);
121+
}
122+
if (!iDone) {
123+
if (!iHasValue) {
124+
try {
125+
notify.acquire();
126+
} catch (InterruptedException ex) {
127+
iError = ex;
128+
iHasValue = true;
129+
iKind = Notification.Kind.OnError;
130+
return true;
131+
}
132+
133+
lock.lock();
134+
try {
135+
iKind = oKind;
136+
switch (oKind) {
137+
case OnNext:
138+
iValue = oValue;
139+
oValue = null; // handover
140+
break;
141+
case OnError:
142+
iError = oError;
143+
oError = null; // handover
144+
if (iError != null) {
145+
Exceptions.propagate(iError);
146+
}
147+
break;
148+
case OnCompleted:
149+
iDone = true;
150+
break;
151+
}
152+
oHasValue = false;
153+
} finally {
154+
lock.unlock();
155+
}
156+
iHasValue = true;
157+
}
158+
}
159+
return !iDone;
160+
}
161+
162+
@Override
163+
public T next() {
164+
if (iKind == Notification.Kind.OnError) {
165+
Exceptions.propagate(iError);
166+
}
167+
if (hasNext()) {
168+
if (iKind == Notification.Kind.OnNext) {
169+
T v = iValue;
170+
iValue = null; // handover
171+
iHasValue = false;
172+
return v;
173+
}
174+
}
175+
throw new NoSuchElementException();
176+
}
177+
178+
@Override
179+
public void remove() {
180+
throw new UnsupportedOperationException("Read-only iterator.");
181+
}
182+
183+
}
184+
}

rxjava-core/src/main/java/rx/operators/OperationMostRecent.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,16 @@
3131
*/
3232
public final class OperationMostRecent {
3333

34-
public static <T> Iterable<T> mostRecent(final Observable<? extends T> source, T initialValue) {
35-
36-
MostRecentObserver<T> mostRecentObserver = new MostRecentObserver<T>(initialValue);
37-
final MostRecentIterator<T> nextIterator = new MostRecentIterator<T>(mostRecentObserver);
38-
39-
source.subscribe(mostRecentObserver);
34+
public static <T> Iterable<T> mostRecent(final Observable<? extends T> source, final T initialValue) {
4035

4136
return new Iterable<T>() {
4237
@Override
4338
public Iterator<T> iterator() {
39+
MostRecentObserver<T> mostRecentObserver = new MostRecentObserver<T>(initialValue);
40+
final MostRecentIterator<T> nextIterator = new MostRecentIterator<T>(mostRecentObserver);
41+
42+
source.subscribe(mostRecentObserver);
43+
4444
return nextIterator;
4545
}
4646
};

rxjava-core/src/main/java/rx/operators/OperationNext.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,19 +34,18 @@
3434
public final class OperationNext {
3535

3636
public static <T> Iterable<T> next(final Observable<? extends T> items) {
37-
38-
NextObserver<T> nextObserver = new NextObserver<T>();
39-
final NextIterator<T> nextIterator = new NextIterator<T>(nextObserver);
40-
41-
items.materialize().subscribe(nextObserver);
42-
4337
return new Iterable<T>() {
4438
@Override
4539
public Iterator<T> iterator() {
40+
NextObserver<T> nextObserver = new NextObserver<T>();
41+
final NextIterator<T> nextIterator = new NextIterator<T>(nextObserver);
42+
43+
items.materialize().subscribe(nextObserver);
44+
4645
return nextIterator;
4746
}
4847
};
49-
48+
5049
}
5150

5251
private static class NextIterator<T> implements Iterator<T> {

rxjava-core/src/main/java/rx/operators/OperationToIterator.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package rx.operators;
1717

1818
import java.util.Iterator;
19+
import java.util.NoSuchElementException;
1920
import java.util.concurrent.BlockingQueue;
2021
import java.util.concurrent.LinkedBlockingQueue;
2122

@@ -63,27 +64,26 @@ public void onNext(Notification<? extends T> args) {
6364

6465
return new Iterator<T>() {
6566
private Notification<? extends T> buf;
66-
67+
6768
@Override
6869
public boolean hasNext() {
6970
if (buf == null) {
7071
buf = take();
7172
}
73+
if (buf.isOnError()) {
74+
throw Exceptions.propagate(buf.getThrowable());
75+
}
7276
return !buf.isOnCompleted();
7377
}
7478

7579
@Override
7680
public T next() {
77-
if (buf == null) {
78-
buf = take();
81+
if (hasNext()) {
82+
T result = buf.getValue();
83+
buf = null;
84+
return result;
7985
}
80-
if (buf.isOnError()) {
81-
throw Exceptions.propagate(buf.getThrowable());
82-
}
83-
84-
T result = buf.getValue();
85-
buf = null;
86-
return result;
86+
throw new NoSuchElementException();
8787
}
8888

8989
private Notification<? extends T> take() {

rxjava-core/src/test/java/rx/observables/BlockingObservableTest.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import static org.junit.Assert.*;
1919

2020
import java.util.Iterator;
21+
import java.util.NoSuchElementException;
22+
import org.junit.Assert;
2123

2224
import org.junit.Before;
2325
import org.junit.Test;
@@ -201,6 +203,58 @@ public void testToIterable() {
201203
assertEquals(false, it.hasNext());
202204

203205
}
206+
@Test(expected = NoSuchElementException.class)
207+
public void testToIterableNextOnly() {
208+
BlockingObservable<Integer> obs = BlockingObservable.from(Observable.from(1, 2, 3));
209+
210+
Iterator<Integer> it = obs.toIterable().iterator();
211+
212+
Assert.assertEquals((Integer)1, it.next());
213+
Assert.assertEquals((Integer)2, it.next());
214+
Assert.assertEquals((Integer)3, it.next());
215+
216+
it.next();
217+
}
218+
219+
@Test(expected = NoSuchElementException.class)
220+
public void testToIterableNextOnlyTwice() {
221+
BlockingObservable<Integer> obs = BlockingObservable.from(Observable.from(1, 2, 3));
222+
223+
Iterator<Integer> it = obs.toIterable().iterator();
224+
225+
Assert.assertEquals((Integer)1, it.next());
226+
Assert.assertEquals((Integer)2, it.next());
227+
Assert.assertEquals((Integer)3, it.next());
228+
229+
boolean exc = false;
230+
try {
231+
it.next();
232+
} catch (NoSuchElementException ex) {
233+
exc = true;
234+
}
235+
Assert.assertEquals(true, exc);
236+
237+
it.next();
238+
}
239+
240+
@Test
241+
public void testToIterableManyTimes() {
242+
BlockingObservable<Integer> obs = BlockingObservable.from(Observable.from(1, 2, 3));
243+
244+
Iterable<Integer> iter = obs.toIterable();
245+
246+
for (int j = 0; j < 3; j++) {
247+
Iterator<Integer> it = iter.iterator();
248+
249+
Assert.assertTrue(it.hasNext());
250+
Assert.assertEquals((Integer)1, it.next());
251+
Assert.assertTrue(it.hasNext());
252+
Assert.assertEquals((Integer)2, it.next());
253+
Assert.assertTrue(it.hasNext());
254+
Assert.assertEquals((Integer)3, it.next());
255+
Assert.assertFalse(it.hasNext());
256+
}
257+
}
204258

205259
@Test(expected = TestException.class)
206260
public void testToIterableWithException() {

0 commit comments

Comments
 (0)