Skip to content

Commit ba41db1

Browse files
author
jmhofer
committed
Got rid of the last remnants of synchronization in favor of a concurrent map.
1 parent bb39cac commit ba41db1

File tree

1 file changed

+17
-42
lines changed

1 file changed

+17
-42
lines changed

rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java

Lines changed: 17 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,10 @@
1919
import static org.mockito.Mockito.*;
2020

2121
import java.util.Arrays;
22-
import java.util.HashMap;
23-
import java.util.HashSet;
2422
import java.util.LinkedList;
2523
import java.util.List;
2624
import java.util.Map;
27-
import java.util.Set;
25+
import java.util.concurrent.ConcurrentHashMap;
2826
import java.util.concurrent.atomic.AtomicBoolean;
2927
import java.util.concurrent.atomic.AtomicInteger;
3028

@@ -96,7 +94,7 @@ public CombineObserver(Aggregator<R> a, Observable<T> w) {
9694
this.w = w;
9795
}
9896

99-
public synchronized void startWatching() {
97+
private void startWatching() {
10098
if (subscription != null) {
10199
throw new RuntimeException("This should only be called once.");
102100
}
@@ -134,23 +132,11 @@ private static class Aggregator<R> implements Func1<Observer<R>, Subscription> {
134132
// Stores how many observers have already completed
135133
private final AtomicInteger numCompleted = new AtomicInteger(0);
136134

137-
// Used as an internal lock for handling the latest values of each observer
138-
private final Object lockObject = new Object();
139-
140135
/**
141-
* The latest value from each observer
142-
* <p>
143-
* Note that access to this set MUST BE SYNCHRONIZED via 'lockObject' above.
144-
* */
145-
private final Map<CombineObserver<R, ?>, Object> latestValue = new HashMap<CombineObserver<R, ?>, Object>();
136+
* The latest value from each observer.
137+
*/
138+
private final Map<CombineObserver<R, ?>, Object> latestValue = new ConcurrentHashMap<CombineObserver<R, ?>, Object>();
146139

147-
/**
148-
* Whether each observer has a latest value at all.
149-
* <p>
150-
* Note that access to this set MUST BE SYNCHRONIZED via 'lockObject' above.
151-
* */
152-
private final Set<CombineObserver<R, ?>> hasLatestValue = new HashSet<CombineObserver<R, ?>>();
153-
154140
/**
155141
* Ordered list of observers to combine.
156142
* No synchronization is necessary as these can not be added or changed asynchronously.
@@ -215,31 +201,20 @@ <T> void next(CombineObserver<R, T> w, T arg) {
215201
return;
216202
}
217203

218-
// define here so the variable is out of the synchronized scope
204+
// remember this as the latest value for this observer
205+
latestValue.put(w, arg);
206+
207+
if (latestValue.size() < observers.size()) {
208+
// we don't have a value yet for each observer to combine, so we don't have a combined value yet either
209+
return;
210+
}
211+
219212
Object[] argsToCombineLatest = new Object[observers.size()];
220-
221-
// we synchronize everything that touches latest values
222-
synchronized (lockObject) {
223-
// remember this as the latest value for this observer
224-
latestValue.put(w, arg);
225-
226-
// remember that this observer now has a latest value set
227-
hasLatestValue.add(w);
228-
229-
if (hasLatestValue.size() < observers.size()) {
230-
// we don't have a value yet for each observer to combine, so we don't have a combined value yet either
231-
return;
232-
}
233-
234-
// if we get to here this means all the observers have a latest value
235-
int i = 0;
236-
for (CombineObserver<R, ?> _w : observers) {
237-
argsToCombineLatest[i++] = latestValue.get(_w);
238-
}
213+
int i = 0;
214+
for (CombineObserver<R, ?> _w : observers) {
215+
argsToCombineLatest[i++] = latestValue.get(_w);
239216
}
240-
// if we did not return above from the synchronized block we can now invoke the combineLatestFunction with all of the args
241-
// we do this outside the synchronized block as it is now safe to call this concurrently and don't need to block other threads from calling
242-
// this 'next' method while another thread finishes calling this combineLatestFunction
217+
243218
try {
244219
R combinedValue = combineLatestFunction.call(argsToCombineLatest);
245220
observer.onNext(combinedValue);

0 commit comments

Comments
 (0)