Skip to content

Commit eb5df87

Browse files
author
jmhofer
committed
avoiding some synchronization on combineLatest
1 parent 04068c1 commit eb5df87

File tree

1 file changed

+19
-27
lines changed

1 file changed

+19
-27
lines changed

rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java

Lines changed: 19 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.Map;
2727
import java.util.Set;
2828
import java.util.concurrent.atomic.AtomicBoolean;
29+
import java.util.concurrent.atomic.AtomicInteger;
2930

3031
import org.junit.Test;
3132
import org.mockito.InOrder;
@@ -125,17 +126,13 @@ private static class Aggregator<R> implements Func1<Observer<R>, Subscription> {
125126

126127
private final FuncN<R> combineLatestFunction;
127128
private final AtomicBoolean running = new AtomicBoolean(true);
128-
129-
// used as an internal lock for handling the latest values and the completed state of each observer
129+
130+
// Stores how many observers have already completed
131+
private final AtomicInteger numCompleted = new AtomicInteger(0);
132+
133+
// Used as an internal lock for handling the latest values of each observer
130134
private final Object lockObject = new Object();
131135

132-
/**
133-
* Store when an observer completes.
134-
* <p>
135-
* Note that access to this set MUST BE SYNCHRONIZED via 'lockObject' above.
136-
* */
137-
private final Set<CombineObserver<R, ?>> completed = new HashSet<CombineObserver<R, ?>>();
138-
139136
/**
140137
* The latest value from each observer
141138
* <p>
@@ -175,17 +172,14 @@ <T> void addObserver(CombineObserver<R, T> w) {
175172
* @param w The observer that has completed.
176173
*/
177174
<T> void complete(CombineObserver<R, T> w) {
178-
synchronized(lockObject) {
179-
// store that this CombineLatestObserver is completed
180-
completed.add(w);
181-
// if all CombineObservers are completed, we mark the whole thing as completed
182-
if (completed.size() == observers.size()) {
183-
if (running.get()) {
184-
// mark ourselves as done
185-
observer.onCompleted();
186-
// just to ensure we stop processing in case we receive more onNext/complete/error calls after this
187-
running.set(false);
188-
}
175+
int completed = numCompleted.incrementAndGet();
176+
// if all CombineObservers are completed, we mark the whole thing as completed
177+
if (completed == observers.size()) {
178+
if (running.get()) {
179+
// mark ourselves as done
180+
observer.onCompleted();
181+
// just to ensure we stop processing in case we receive more onNext/complete/error calls after this
182+
running.set(false);
189183
}
190184
}
191185
}
@@ -228,14 +222,12 @@ <T> void next(CombineObserver<R, T> w, T arg) {
228222
// remember that this observer now has a latest value set
229223
hasLatestValue.add(w);
230224

231-
// if all observers in the 'observers' list have a value, invoke the combineLatestFunction
232-
for (CombineObserver<R, ?> rw : observers) {
233-
if (!hasLatestValue.contains(rw)) {
234-
// we don't have a value yet for each observer to combine, so we don't have a combined value yet either
235-
return;
236-
}
225+
if (hasLatestValue.size() < observers.size()) {
226+
// we don't have a value yet for each observer to combine, so we don't have a combined value yet either
227+
return;
237228
}
238-
// if we get to here this means all the queues have data
229+
230+
// if we get to here this means all the observers have a latest value
239231
int i = 0;
240232
for (CombineObserver<R, ?> _w : observers) {
241233
argsToCombineLatest[i++] = latestValue.get(_w);

0 commit comments

Comments
 (0)