Skip to content

Commit a2404bf

Browse files
Merge pull request #303 from benjchristensen/pull-267-combineLatest
Pull 267 - Merge combineLatest
2 parents c9f2c13 + c88f36e commit a2404bf

File tree

3 files changed

+125
-96
lines changed

3 files changed

+125
-96
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import rx.operators.AtomicObserver;
4444
import rx.operators.OperationAll;
4545
import rx.operators.OperationCache;
46+
import rx.operators.OperationCombineLatest;
4647
import rx.operators.OperationConcat;
4748
import rx.operators.OperationDefer;
4849
import rx.operators.OperationDematerialize;
@@ -2803,7 +2804,36 @@ public static <R> Observable<R> zip(Collection<Observable<?>> ws, final Object f
28032804
}
28042805

28052806
/**
2806-
* Filters an Observable by discarding any items it emits that do not meet some test.
2807+
* Combines the given observables, emitting an event containing an aggregation of the latest values of each of the source observables
2808+
* each time an event is received from one of the source observables, where the aggregation is defined by the given function.
2809+
* @param w0
2810+
* The first source observable.
2811+
* @param w1
2812+
* The second source observable.
2813+
* @param combineFunction
2814+
* The aggregation function used to combine the source observable values.
2815+
* @return A function from an observer to a subscription. This can be used to create an observable from.
2816+
*/
2817+
public static <R, T0, T1> Observable<R> combineLatest(Observable<? super T0> w0, Observable<? super T1> w1, Func2<? super T0, ? super T1, ? extends R> combineFunction) {
2818+
return create(OperationCombineLatest.combineLatest(w0, w1, combineFunction));
2819+
}
2820+
2821+
/**
2822+
* @see #combineLatest(Observable, Observable, Func2)
2823+
*/
2824+
public static <R, T0, T1, T2> Observable<R> combineLatest(Observable<? super T0> w0, Observable<? super T1> w1, Observable<? super T2> w2, Func3<? super T0, ? super T1, ? super T2, ? extends R> combineFunction) {
2825+
return create(OperationCombineLatest.combineLatest(w0, w1, w2, combineFunction));
2826+
}
2827+
2828+
/**
2829+
* @see #combineLatest(Observable, Observable, Func2)
2830+
*/
2831+
public static <R, T0, T1, T2, T3> Observable<R> combineLatest(Observable<? super T0> w0, Observable<? super T1> w1, Observable<? super T2> w2, Observable<? super T3> w3, Func4<? super T0, ? super T1, ? super T2, ? super T3, ? extends R> combineFunction) {
2832+
return create(OperationCombineLatest.combineLatest(w0, w1, w2, w3, combineFunction));
2833+
}
2834+
2835+
/**
2836+
* Filters an Observable by discarding any of its emissions that do not meet some test.
28072837
* <p>
28082838
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/filter.png">
28092839
*

rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java

Lines changed: 92 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,16 @@
1919
import static org.mockito.Mockito.*;
2020

2121
import java.util.Arrays;
22-
import java.util.HashMap;
23-
import java.util.HashSet;
2422
import java.util.LinkedList;
2523
import java.util.List;
2624
import java.util.Map;
27-
import java.util.Set;
25+
import java.util.concurrent.ConcurrentHashMap;
2826
import java.util.concurrent.atomic.AtomicBoolean;
27+
import java.util.concurrent.atomic.AtomicInteger;
2928

3029
import org.junit.Test;
3130
import org.mockito.InOrder;
31+
import org.mockito.Matchers;
3232

3333
import rx.Observable;
3434
import rx.Observer;
@@ -52,16 +52,17 @@
5252
public class OperationCombineLatest {
5353

5454
/**
55-
* Combines the two given Observables, emitting an item that aggregates the latest values of
56-
* each of the source Observables each time an item is received from either of the source
57-
* Observables, where the aggregation is defined by the given function.
58-
* @param w0 the first source Observable.
59-
* @param w1 the second source Observable.
60-
* @param combineLatestFunction the aggregation function that combines the source Observable
61-
* items.
62-
* @return a function from an Observer to a Subscription. This can be used to create an Observable.
55+
* Combines the two given observables, emitting an event containing an aggregation of the latest values of each of the source observables
56+
* each time an event is received from one of the source observables, where the aggregation is defined by the given function.
57+
* @param w0
58+
* The first source observable.
59+
* @param w1
60+
* The second source observable.
61+
* @param combineLatestFunction
62+
* The aggregation function used to combine the source observable values.
63+
* @return A function from an observer to a subscription. This can be used to create an observable from.
6364
*/
64-
public static <T0, T1, R> Func1<Observer<R>, Subscription> combineLatest(Observable<T0> w0, Observable<T1> w1, Func2<T0, T1, R> combineLatestFunction) {
65+
public static <T0, T1, R> Func1<Observer<? super R>, Subscription> combineLatest(Observable<? super T0> w0, Observable<? super T1> w1, Func2<? super T0, ? super T1, ? extends R> combineLatestFunction) {
6566
Aggregator<R> a = new Aggregator<R>(Functions.fromFunc(combineLatestFunction));
6667
a.addObserver(new CombineObserver<R, T0>(a, w0));
6768
a.addObserver(new CombineObserver<R, T1>(a, w1));
@@ -71,7 +72,7 @@ public static <T0, T1, R> Func1<Observer<R>, Subscription> combineLatest(Observa
7172
/**
7273
* @see #combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction)
7374
*/
74-
public static <T0, T1, T2, R> Func1<Observer<R>, Subscription> combineLatest(Observable<T0> w0, Observable<T1> w1, Observable<T2> w2, Func3<T0, T1, T2, R> combineLatestFunction) {
75+
public static <T0, T1, T2, R> Func1<Observer<? super R>, Subscription> combineLatest(Observable<? super T0> w0, Observable<? super T1> w1, Observable<? super T2> w2, Func3<? super T0, ? super T1, ? super T2, ? extends R> combineLatestFunction) {
7576
Aggregator<R> a = new Aggregator<R>(Functions.fromFunc(combineLatestFunction));
7677
a.addObserver(new CombineObserver<R, T0>(a, w0));
7778
a.addObserver(new CombineObserver<R, T1>(a, w1));
@@ -82,7 +83,7 @@ public static <T0, T1, T2, R> Func1<Observer<R>, Subscription> combineLatest(Obs
8283
/**
8384
* @see #combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction)
8485
*/
85-
public static <T0, T1, T2, T3, R> Func1<Observer<R>, Subscription> combineLatest(Observable<T0> w0, Observable<T1> w1, Observable<T2> w2, Observable<T3> w3, Func4<T0, T1, T2, T3, R> combineLatestFunction) {
86+
public static <T0, T1, T2, T3, R> Func1<Observer<? super R>, Subscription> combineLatest(Observable<? super T0> w0, Observable<? super T1> w1, Observable<? super T2> w2, Observable<? super T3> w3, Func4<? super T0, ? super T1, ? super T2, ? super T3, ? extends R> combineLatestFunction) {
8687
Aggregator<R> a = new Aggregator<R>(Functions.fromFunc(combineLatestFunction));
8788
a.addObserver(new CombineObserver<R, T0>(a, w0));
8889
a.addObserver(new CombineObserver<R, T1>(a, w1));
@@ -92,16 +93,16 @@ public static <T0, T1, T2, T3, R> Func1<Observer<R>, Subscription> combineLatest
9293
}
9394

9495
private static class CombineObserver<R, T> implements Observer<T> {
95-
final Observable<T> w;
96-
final Aggregator<R> a;
96+
final Observable<? super T> w;
97+
final Aggregator<? super R> a;
9798
private Subscription subscription;
9899

99-
public CombineObserver(Aggregator<R> a, Observable<T> w) {
100+
public CombineObserver(Aggregator<? super R> a, Observable<? super T> w) {
100101
this.a = a;
101102
this.w = w;
102103
}
103104

104-
public synchronized void startWatching() {
105+
private void startWatching() {
105106
if (subscription != null) {
106107
throw new RuntimeException("This should only be called once.");
107108
}
@@ -129,44 +130,28 @@ public void onNext(T args) {
129130
* whenever we have received an event from one of the observables, as soon as each Observable has received
130131
* at least one event.
131132
*/
132-
private static class Aggregator<R> implements Func1<Observer<R>, Subscription> {
133+
private static class Aggregator<R> implements Func1<Observer<? super R>, Subscription> {
133134

134-
private Observer<R> observer;
135+
private volatile Observer<? super R> observer;
135136

136-
private final FuncN<R> combineLatestFunction;
137+
private final FuncN<? extends R> combineLatestFunction;
137138
private final AtomicBoolean running = new AtomicBoolean(true);
138-
139-
// used as an internal lock for handling the latest values and the completed state of each observer
140-
private final Object lockObject = new Object();
141139

142-
/**
143-
* Store when an observer completes.
144-
* <p>
145-
* Note that access to this set MUST BE SYNCHRONIZED via 'lockObject' above.
146-
* */
147-
private final Set<CombineObserver<R, ?>> completed = new HashSet<CombineObserver<R, ?>>();
148-
149-
/**
150-
* The latest value from each observer
151-
* <p>
152-
* Note that access to this set MUST BE SYNCHRONIZED via 'lockObject' above.
153-
* */
154-
private final Map<CombineObserver<R, ?>, Object> latestValue = new HashMap<CombineObserver<R, ?>, Object>();
140+
// Stores how many observers have already completed
141+
private final AtomicInteger numCompleted = new AtomicInteger(0);
155142

156143
/**
157-
* Whether each observer has a latest value at all.
158-
* <p>
159-
* Note that access to this set MUST BE SYNCHRONIZED via 'lockObject' above.
160-
* */
161-
private final Set<CombineObserver<R, ?>> hasLatestValue = new HashSet<CombineObserver<R, ?>>();
162-
144+
* The latest value from each observer.
145+
*/
146+
private final Map<CombineObserver<? extends R, ?>, Object> latestValue = new ConcurrentHashMap<CombineObserver<? extends R, ?>, Object>();
147+
163148
/**
164149
* Ordered list of observers to combine.
165150
* No synchronization is necessary as these can not be added or changed asynchronously.
166151
*/
167152
private final List<CombineObserver<R, ?>> observers = new LinkedList<CombineObserver<R, ?>>();
168153

169-
public Aggregator(FuncN<R> combineLatestFunction) {
154+
public Aggregator(FuncN<? extends R> combineLatestFunction) {
170155
this.combineLatestFunction = combineLatestFunction;
171156
}
172157

@@ -184,18 +169,15 @@ <T> void addObserver(CombineObserver<R, T> w) {
184169
*
185170
* @param w The observer that has completed.
186171
*/
187-
<T> void complete(CombineObserver<R, T> w) {
188-
synchronized(lockObject) {
189-
// store that this CombineLatestObserver is completed
190-
completed.add(w);
191-
// if all CombineObservers are completed, we mark the whole thing as completed
192-
if (completed.size() == observers.size()) {
193-
if (running.get()) {
194-
// mark ourselves as done
195-
observer.onCompleted();
196-
// just to ensure we stop processing in case we receive more onNext/complete/error calls after this
197-
running.set(false);
198-
}
172+
<T> void complete(CombineObserver<? extends R, ? super T> w) {
173+
int completed = numCompleted.incrementAndGet();
174+
// if all CombineObservers are completed, we mark the whole thing as completed
175+
if (completed == observers.size()) {
176+
if (running.get()) {
177+
// mark ourselves as done
178+
observer.onCompleted();
179+
// just to ensure we stop processing in case we receive more onNext/complete/error calls after this
180+
running.set(false);
199181
}
200182
}
201183
}
@@ -217,7 +199,7 @@ void error(Exception e) {
217199
* @param w
218200
* @param arg
219201
*/
220-
<T> void next(CombineObserver<R, T> w, T arg) {
202+
<T> void next(CombineObserver<? extends R, ? super T> w, T arg) {
221203
if (observer == null) {
222204
throw new RuntimeException("This shouldn't be running if an Observer isn't registered");
223205
}
@@ -227,54 +209,48 @@ <T> void next(CombineObserver<R, T> w, T arg) {
227209
return;
228210
}
229211

230-
// define here so the variable is out of the synchronized scope
212+
// remember this as the latest value for this observer
213+
latestValue.put(w, arg);
214+
215+
if (latestValue.size() < observers.size()) {
216+
// we don't have a value yet for each observer to combine, so we don't have a combined value yet either
217+
return;
218+
}
219+
231220
Object[] argsToCombineLatest = new Object[observers.size()];
232-
233-
// we synchronize everything that touches latest values
234-
synchronized (lockObject) {
235-
// remember this as the latest value for this observer
236-
latestValue.put(w, arg);
237-
238-
// remember that this observer now has a latest value set
239-
hasLatestValue.add(w);
240-
241-
// if all observers in the 'observers' list have a value, invoke the combineLatestFunction
242-
for (CombineObserver<R, ?> rw : observers) {
243-
if (!hasLatestValue.contains(rw)) {
244-
// we don't have a value yet for each observer to combine, so we don't have a combined value yet either
245-
return;
246-
}
247-
}
248-
// if we get to here this means all the queues have data
249-
int i = 0;
250-
for (CombineObserver<R, ?> _w : observers) {
251-
argsToCombineLatest[i++] = latestValue.get(_w);
252-
}
221+
int i = 0;
222+
for (CombineObserver<R, ?> _w : observers) {
223+
argsToCombineLatest[i++] = latestValue.get(_w);
224+
}
225+
226+
try {
227+
R combinedValue = combineLatestFunction.call(argsToCombineLatest);
228+
observer.onNext(combinedValue);
229+
} catch(Exception ex) {
230+
observer.onError(ex);
253231
}
254-
// if we did not return above from the synchronized block we can now invoke the combineLatestFunction with all of the args
255-
// we do this outside the synchronized block as it is now safe to call this concurrently and don't need to block other threads from calling
256-
// this 'next' method while another thread finishes calling this combineLatestFunction
257-
observer.onNext(combineLatestFunction.call(argsToCombineLatest));
258232
}
259233

260234
@Override
261-
public Subscription call(Observer<R> observer) {
235+
public Subscription call(Observer<? super R> observer) {
262236
if (this.observer != null) {
263237
throw new IllegalStateException("Only one Observer can subscribe to this Observable.");
264238
}
265-
this.observer = observer;
239+
240+
AtomicObservableSubscription subscription = new AtomicObservableSubscription(new Subscription() {
241+
@Override
242+
public void unsubscribe() {
243+
stop();
244+
}
245+
});
246+
this.observer = new SynchronizedObserver<R>(observer, subscription);
266247

267248
/* start the observers */
268249
for (CombineObserver<R, ?> rw : observers) {
269250
rw.startWatching();
270251
}
271252

272-
return new Subscription() {
273-
@Override
274-
public void unsubscribe() {
275-
stop();
276-
}
277-
};
253+
return subscription;
278254
}
279255

280256
private void stop() {
@@ -291,10 +267,33 @@ private void stop() {
291267

292268
public static class UnitTest {
293269

294-
@SuppressWarnings("unchecked")
295-
/* mock calls don't do generics */
270+
@Test
271+
public void testCombineLatestWithFunctionThatThrowsAnException() {
272+
@SuppressWarnings("unchecked") // mock calls don't do generics
273+
Observer<String> w = mock(Observer.class);
274+
275+
TestObservable w1 = new TestObservable();
276+
TestObservable w2 = new TestObservable();
277+
278+
Observable<String> combined = Observable.create(combineLatest(w1, w2, new Func2<String, String, String>() {
279+
@Override
280+
public String call(String v1, String v2) {
281+
throw new RuntimeException("I don't work.");
282+
}
283+
}));
284+
combined.subscribe(w);
285+
286+
w1.Observer.onNext("first value of w1");
287+
w2.Observer.onNext("first value of w2");
288+
289+
verify(w, never()).onNext(anyString());
290+
verify(w, never()).onCompleted();
291+
verify(w, times(1)).onError(Matchers.<RuntimeException>any());
292+
}
293+
296294
@Test
297295
public void testCombineLatestDifferentLengthObservableSequences1() {
296+
@SuppressWarnings("unchecked") // mock calls don't do generics
298297
Observer<String> w = mock(Observer.class);
299298

300299
TestObservable w1 = new TestObservable();

rxjava-core/src/main/java/rx/operators/SynchronizedObserver.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,12 @@ public final class SynchronizedObserver<T> implements Observer<T> {
6464
* // TODO composing of this class should rarely happen now with updated design so this decision should be revisited
6565
*/
6666

67-
private final Observer<T> observer;
67+
private final Observer<? super T> observer;
6868
private final AtomicObservableSubscription subscription;
6969
private volatile boolean finishRequested = false;
7070
private volatile boolean finished = false;
7171

72-
public SynchronizedObserver(Observer<T> Observer, AtomicObservableSubscription subscription) {
72+
public SynchronizedObserver(Observer<? super T> Observer, AtomicObservableSubscription subscription) {
7373
this.observer = Observer;
7474
this.subscription = subscription;
7575
}

0 commit comments

Comments
 (0)