Skip to content

Commit 1fabd42

Browse files
author
Joachim Hofer
committed
An attempt at implementing the correct combineLatest semantics. The
tests do pass again now (they had to be adapted too).
1 parent bc0a089 commit 1fabd42

File tree

1 file changed

+44
-59
lines changed

1 file changed

+44
-59
lines changed

rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java

Lines changed: 44 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,10 @@
2121
import java.util.Arrays;
2222
import java.util.HashMap;
2323
import java.util.HashSet;
24-
import java.util.LinkedHashMap;
2524
import java.util.LinkedList;
25+
import java.util.List;
2626
import java.util.Map;
27+
import java.util.Set;
2728
import java.util.concurrent.atomic.AtomicBoolean;
2829

2930
import org.junit.Test;
@@ -110,26 +111,22 @@ private static class Aggregator<R> implements Func1<Observer<R>, Subscription> {
110111
private Observer<R> Observer;
111112
private AtomicBoolean running = new AtomicBoolean(true);
112113

113-
/**
114-
* Use LinkedHashMap to retain the order we receive the CombineLatestObserver objects in.
115-
* <p>
116-
* Note that access to this LinkedList inside MUST BE SYNCHRONIZED
117-
*/
118-
private Map<CombineObserver<R, ?>, LinkedList<Object>> receivedValuesPerObserver = new LinkedHashMap<CombineObserver<R, ?>, LinkedList<Object>>();
119-
120114
/**
121115
* store when a Observer completes
122116
* <p>
123117
* Note that access to this set MUST BE SYNCHRONIZED
124118
* */
125-
private HashSet<CombineObserver<R, ?>> completed = new HashSet<CombineObserver<R, ?>>();
119+
private Set<CombineObserver<R, ?>> completed = new HashSet<CombineObserver<R, ?>>();
126120

127121
/**
128122
* The last value from a Observer
129123
* <p>
130124
* Note that access to this set MUST BE SYNCHRONIZED
131125
* */
132-
private HashMap<CombineObserver<R, ?>, Object> lastValue = new HashMap<CombineObserver<R, ?>, Object>();
126+
private Map<CombineObserver<R, ?>, Object> lastValue = new HashMap<CombineObserver<R, ?>, Object>();
127+
128+
private Set<CombineObserver<R, ?>> hasLastValue = new HashSet<CombineObserver<R, ?>>();
129+
private List<CombineObserver<R, ?>> observers = new LinkedList<CombineObserver<R, ?>>();
133130

134131
public Aggregator(FuncN<R> combineLatestFunction) {
135132
this.combineLatestFunction = combineLatestFunction;
@@ -140,21 +137,20 @@ public Aggregator(FuncN<R> combineLatestFunction) {
140137
*
141138
* @param w
142139
*/
143-
synchronized void addObserver(CombineObserver<R, ?> w) {
144-
// initialize this CombineLatestObserver
145-
receivedValuesPerObserver.put(w, new LinkedList<Object>());
140+
synchronized <T> void addObserver(CombineObserver<R, T> w) {
141+
observers.add(w);
146142
}
147143

148144
/**
149145
* Receive notification of a Observer completing its iterations.
150146
*
151147
* @param w
152148
*/
153-
synchronized void complete(CombineObserver<R, ?> w) {
154-
// store that this ZipObserver is completed
149+
synchronized <T> void complete(CombineObserver<R, T> w) {
150+
// store that this CombineLatestObserver is completed
155151
completed.add(w);
156152
// if all CombineObservers are completed, we mark the whole thing as completed
157-
if (completed.size() == receivedValuesPerObserver.size()) {
153+
if (completed.size() == observers.size()) {
158154
if (running.get()) {
159155
// mark ourselves as done
160156
Observer.onCompleted();
@@ -169,7 +165,7 @@ synchronized void complete(CombineObserver<R, ?> w) {
169165
*
170166
* @param w
171167
*/
172-
synchronized void error(CombineObserver<R, ?> w, Exception e) {
168+
synchronized <T> void error(CombineObserver<R, T> w, Exception e) {
173169
Observer.onError(e);
174170
/* tell ourselves to stop processing onNext events, event if the Observers don't obey the unsubscribe we're about to send */
175171
running.set(false);
@@ -196,39 +192,25 @@ <T> void next(CombineObserver<R, T> w, T arg) {
196192
}
197193

198194
// define here so the variable is out of the synchronized scope
199-
Object[] argsToCombineLatest = new Object[receivedValuesPerObserver.size()];
195+
Object[] argsToCombineLatest = new Object[observers.size()];
200196

201197
// we synchronize everything that touches receivedValues and the internal LinkedList objects
202198
synchronized (this) {
203-
// add this value to the queue of the CombineLatestObserver for values received
204-
receivedValuesPerObserver.get(w).add(arg);
205199
// remember this as the last value for this Observer
206200
lastValue.put(w, arg);
201+
hasLastValue.add(w);
207202

208203
// if all CombineLatestObservers in 'receivedValues' map have a value, invoke the combineLatestFunction
209-
for (CombineObserver<R, ?> rw : receivedValuesPerObserver.keySet()) {
210-
if (receivedValuesPerObserver.get(rw).peek() == null && !completed.contains(rw)) {
211-
// we have a null (and the Observer isn't completed) meaning the queues aren't all populated so won't do anything
204+
for (CombineObserver<R, ?> rw : observers) {
205+
if (!hasLastValue.contains(rw)) {
206+
// we don't have a value yet for each observer to combine, so we don't have a combined value yet either
212207
return;
213208
}
214209
}
215-
// if we get to here this means all the queues have data (or some are completed)
210+
// if we get to here this means all the queues have data
216211
int i = 0;
217-
boolean foundData = false;
218-
for (CombineObserver<R, ?> _w : receivedValuesPerObserver.keySet()) {
219-
LinkedList<Object> q = receivedValuesPerObserver.get(_w);
220-
if (q.peek() == null) {
221-
// this is a completed Observer
222-
// we rely on the check above looking at completed.contains to mean that NULL here represents a completed Observer
223-
argsToCombineLatest[i++] = lastValue.get(_w);
224-
} else {
225-
foundData = true;
226-
argsToCombineLatest[i++] = q.remove();
227-
}
228-
}
229-
if (completed.size() == receivedValuesPerObserver.size() && !foundData) {
230-
// all are completed and queues have run out of data, so return and don't send empty data
231-
return;
212+
for (CombineObserver<R, ?> _w : observers) {
213+
argsToCombineLatest[i++] = lastValue.get(_w);
232214
}
233215
}
234216
// if we did not return above from the synchronized block we can now invoke the combineLatestFunction with all of the args
@@ -245,7 +227,7 @@ public Subscription call(Observer<R> Observer) {
245227
this.Observer = Observer;
246228

247229
/* start the Observers */
248-
for (CombineObserver<R, ?> rw : receivedValuesPerObserver.keySet()) {
230+
for (CombineObserver<R, ?> rw : observers) {
249231
rw.startWatching();
250232
}
251233

@@ -263,7 +245,7 @@ private void stop() {
263245
/* tell ourselves to stop processing onNext events */
264246
running.set(false);
265247
/* propogate to all Observers to unsubscribe */
266-
for (CombineObserver<R, ?> rw : receivedValuesPerObserver.keySet()) {
248+
for (CombineObserver<R, ?> rw : observers) {
267249
if (rw.subscription != null) {
268250
rw.subscription.unsubscribe();
269251
}
@@ -290,25 +272,26 @@ public void testCombineLatestDifferentLengthObservableSequences1() {
290272
/* simulate sending data */
291273
// once for w1
292274
w1.Observer.onNext("1a");
275+
w2.Observer.onNext("2a");
276+
w3.Observer.onNext("3a");
293277
w1.Observer.onCompleted();
294278
// twice for w2
295-
w2.Observer.onNext("2a");
296279
w2.Observer.onNext("2b");
297280
w2.Observer.onCompleted();
298281
// 4 times for w3
299-
w3.Observer.onNext("3a");
300282
w3.Observer.onNext("3b");
301283
w3.Observer.onNext("3c");
302284
w3.Observer.onNext("3d");
303285
w3.Observer.onCompleted();
304286

305287
/* we should have been called 4 times on the Observer */
306288
InOrder inOrder = inOrder(w);
289+
inOrder.verify(w).onNext("1a2a3a");
307290
inOrder.verify(w).onNext("1a2b3a");
308291
inOrder.verify(w).onNext("1a2b3b");
309292
inOrder.verify(w).onNext("1a2b3c");
310293
inOrder.verify(w).onNext("1a2b3d");
311-
294+
inOrder.verify(w, never()).onNext(anyString());
312295
inOrder.verify(w, times(1)).onCompleted();
313296
}
314297

@@ -341,17 +324,16 @@ public void testCombineLatestDifferentLengthObservableSequences2() {
341324

342325
/* we should have been called 1 time only on the Observer since we only combine the "latest" we don't go back and loop through others once completed */
343326
InOrder inOrder = inOrder(w);
344-
inOrder.verify(w, times(1)).onNext("1a2a3a");
327+
inOrder.verify(w, times(1)).onNext("1d2b3a");
345328
inOrder.verify(w, never()).onNext(anyString());
346329

347330
inOrder.verify(w, times(1)).onCompleted();
348331

349332
}
350333

351-
@SuppressWarnings("unchecked")
352-
/* mock calls don't do generics */
353334
@Test
354335
public void testCombineLatestWithInterleavingSequences() {
336+
@SuppressWarnings("unchecked")
355337
Observer<String> w = mock(Observer.class);
356338

357339
TestObservable w1 = new TestObservable();
@@ -383,7 +365,8 @@ public void testCombineLatestWithInterleavingSequences() {
383365
inOrder.verify(w).onNext("1b2c3a");
384366
inOrder.verify(w).onNext("1b2d3a");
385367
inOrder.verify(w).onNext("1b2d3b");
386-
368+
369+
inOrder.verify(w, never()).onNext(anyString());
387370
inOrder.verify(w, times(1)).onCompleted();
388371
}
389372

@@ -568,14 +551,14 @@ public void testAggregatorsWithDifferentSizesAndTiming() {
568551

569552
verify(aObserver, never()).onError(any(Exception.class));
570553
verify(aObserver, never()).onCompleted();
571-
verify(aObserver, times(1)).onNext("oneA");
554+
verify(aObserver, times(1)).onNext("threeA");
572555

573556
a.next(r1, "four");
574557
a.complete(r1);
575558
a.next(r2, "B");
576-
verify(aObserver, times(1)).onNext("twoB");
559+
verify(aObserver, times(1)).onNext("fourB");
577560
a.next(r2, "C");
578-
verify(aObserver, times(1)).onNext("threeC");
561+
verify(aObserver, times(1)).onNext("fourC");
579562
a.next(r2, "D");
580563
verify(aObserver, times(1)).onNext("fourD");
581564
a.next(r2, "E");
@@ -688,16 +671,18 @@ public void testAggregatorEarlyCompletion() {
688671
a.complete(r1);
689672
a.next(r2, "A");
690673

691-
verify(aObserver, never()).onError(any(Exception.class));
692-
verify(aObserver, never()).onCompleted();
693-
verify(aObserver, times(1)).onNext("oneA");
674+
InOrder inOrder = inOrder(aObserver);
675+
676+
inOrder.verify(aObserver, never()).onError(any(Exception.class));
677+
inOrder.verify(aObserver, never()).onCompleted();
678+
inOrder.verify(aObserver, times(1)).onNext("twoA");
694679

695680
a.complete(r2);
696681

697-
verify(aObserver, never()).onError(any(Exception.class));
698-
verify(aObserver, times(1)).onCompleted();
682+
inOrder.verify(aObserver, never()).onError(any(Exception.class));
683+
inOrder.verify(aObserver, times(1)).onCompleted();
699684
// we shouldn't get this since completed is called before any other onNext calls could trigger this
700-
verify(aObserver, never()).onNext("twoA");
685+
inOrder.verify(aObserver, never()).onNext(anyString());
701686
}
702687

703688
@SuppressWarnings("unchecked")
@@ -714,7 +699,7 @@ public void testCombineLatest2Types() {
714699

715700
verify(aObserver, never()).onError(any(Exception.class));
716701
verify(aObserver, times(1)).onCompleted();
717-
verify(aObserver, times(1)).onNext("one2");
702+
verify(aObserver, times(1)).onNext("two2");
718703
verify(aObserver, times(1)).onNext("two3");
719704
verify(aObserver, times(1)).onNext("two4");
720705
}
@@ -733,7 +718,7 @@ public void testCombineLatest3TypesA() {
733718

734719
verify(aObserver, never()).onError(any(Exception.class));
735720
verify(aObserver, times(1)).onCompleted();
736-
verify(aObserver, times(1)).onNext("one2[4, 5, 6]");
721+
verify(aObserver, times(1)).onNext("two2[4, 5, 6]");
737722
}
738723

739724
@SuppressWarnings("unchecked")

0 commit comments

Comments
 (0)