Skip to content

Commit db4e944

Browse files
committed
Address comments part 2
1 parent 034c4e6 commit db4e944

File tree

2 files changed

+34
-23
lines changed

2 files changed

+34
-23
lines changed

runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java

Lines changed: 32 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.beam.runners.core.metrics;
1919

20+
import com.google.common.annotations.VisibleForTesting;
2021
import java.io.Serializable;
2122
import java.util.ArrayList;
2223
import java.util.Collections;
@@ -44,8 +45,8 @@
4445
* intentional for performance concerns. Hence, this class does not expose the contained node and
4546
* should not be modified to do so in future when used with multiple threads. This class choose to
4647
* achieve thread-safety through locks rather than just creating and returning immutable instances
47-
* to its caller because the combining of a large and wide trie require per-node copy which has
48-
* exponential cost and more expensive than synchronization.
48+
* to its caller because the combining of a large and wide trie require per-node copy which is
49+
* more expensive than synchronization.
4950
*
5051
* <p>Note: {@link #equals(Object)}, {@link #hashCode()} of this class are not synchronized and if
5152
* their usage needs synchronization then the client should do it.
@@ -197,28 +198,36 @@ public synchronized void add(Iterable<String> segments) {
197198
* @param other The other {@link BoundedTrieData} to combine with.
198199
* @return The combined {@link BoundedTrieData}.
199200
*/
200-
public synchronized BoundedTrieData combine(@Nonnull BoundedTrieData other) {
201-
if (other.root == null && other.singleton == null) {
202-
return this;
203-
}
201+
public BoundedTrieData combine(@Nonnull BoundedTrieData other) {
202+
BoundedTrieData otherDeepCopy;
204203
// other can be modified in some different thread, and we need to atomically access
205-
// its fields to combine correctly. Furthermore, simply doing this under synchronized(other)
206-
// is not safe as it might lead to deadlock. Assume the current thread got lock on
207-
// 'this' and is executing combine with `other` and waiting to get a lock on `other`
208-
// while some other thread is performing `other.combiner(this)` and waiting to get a
209-
// lock on `this` object.
210-
BoundedTrieData otherDeepCopy = other.getCumulative();
211-
if (this.root == null && this.singleton == null) {
212-
return otherDeepCopy;
204+
// its fields to combine correctly. Furthermore, doing this whole method under
205+
// synchronized(other) is not safe as it might lead to deadlock. Assume the current
206+
// thread got lock on 'this' and is executing combine with `other` and waiting to get a
207+
// lock on `other` while some other thread is performing `other.combiner(this)` and
208+
// waiting to get a lock on `this` object.
209+
// Here it is safe to get a lock on other as we don't yet hold a lock on this to end up with
210+
// race condition.
211+
synchronized (other) {
212+
if (other.root == null && other.singleton == null) {
213+
return this;
214+
}
215+
otherDeepCopy = other.getCumulative();
213216
}
214-
otherDeepCopy.root = otherDeepCopy.asTrie();
215-
otherDeepCopy.singleton = null;
216-
otherDeepCopy.root.merge(this.asTrie());
217-
otherDeepCopy.bound = Math.min(this.bound, otherDeepCopy.bound);
218-
while (otherDeepCopy.root.getSize() > otherDeepCopy.bound) {
219-
otherDeepCopy.root.trim();
217+
218+
synchronized (this) {
219+
if (this.root == null && this.singleton == null) {
220+
return otherDeepCopy;
221+
}
222+
otherDeepCopy.root = otherDeepCopy.asTrie();
223+
otherDeepCopy.singleton = null;
224+
otherDeepCopy.root.merge(this.asTrie());
225+
otherDeepCopy.bound = Math.min(this.bound, otherDeepCopy.bound);
226+
while (otherDeepCopy.root.getSize() > otherDeepCopy.bound) {
227+
otherDeepCopy.root.trim();
228+
}
229+
return otherDeepCopy;
220230
}
221-
return otherDeepCopy;
222231
}
223232

224233
/**
@@ -287,6 +296,7 @@ public final String toString() {
287296
* of the tree itself. This avoids acquiring and release N nodes in a path. This class is not
288297
* intended to be used directly outside of {@link BoundedTrieData} with multiple threads.
289298
*/
299+
@VisibleForTesting
290300
static class BoundedTrieNode implements Serializable {
291301

292302
public static final String TRUNCATED_TRUE = String.valueOf(true);
@@ -375,6 +385,7 @@ int add(List<String> segments) {
375385
* @param segmentsIter An iterator over the paths to add.
376386
* @return The total change in the size of the subtree rooted at this node.
377387
*/
388+
@VisibleForTesting
378389
int addAll(List<List<String>> segmentsIter) {
379390
return segmentsIter.stream().mapToInt(this::add).sum();
380391
}

runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableMetrics.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ private static Iterable<MetricResult<BoundedTrieResult>> extractBoundedTrieMetri
166166
return monitoringInfoList.stream()
167167
.filter(item -> BOUNDED_TRIE_TYPE.equals(item.getType()))
168168
.filter(item -> item.getLabelsMap().get(NAMESPACE_LABEL) != null)
169-
.map(PortableMetrics::convertBoundedTrieMonitoringInfoToStringSet)
169+
.map(PortableMetrics::convertBoundedTrieMonitoringInfoToBoundedTrie)
170170
.collect(Collectors.toList());
171171
}
172172

@@ -183,7 +183,7 @@ private static MetricResult<StringSetResult> convertStringSetMonitoringInfoToStr
183183
return MetricResult.create(key, false, result);
184184
}
185185

186-
private static MetricResult<BoundedTrieResult> convertBoundedTrieMonitoringInfoToStringSet(
186+
private static MetricResult<BoundedTrieResult> convertBoundedTrieMonitoringInfoToBoundedTrie(
187187
MetricsApi.MonitoringInfo monitoringInfo) {
188188
Map<String, String> labelsMap = monitoringInfo.getLabelsMap();
189189
MetricKey key =

0 commit comments

Comments
 (0)