Skip to content

Commit 034c4e6

Browse files
committed
Multi-threaded test in MetricsContainerImplTest and some clean up
1 parent 269c5d5 commit 034c4e6

File tree

4 files changed

+41
-6
lines changed

4 files changed

+41
-6
lines changed

runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieCell.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
* In that case retrieving the underlying cell and reporting directly to it avoids a step of
3232
* indirection.
3333
*/
34-
// TODO: Write multi-threaded test in MetricContainerImp for this Cell class too.
3534
public class BoundedTrieCell implements BoundedTrie, MetricCell<BoundedTrieData> {
3635

3736
private final DirtyState dirty = new DirtyState();
@@ -62,6 +61,10 @@ public DirtyState getDirty() {
6261
return dirty;
6362
}
6463

64+
/**
65+
* @return Returns a deep copy of the {@link BoundedTrieData} contained in this {@link
66+
* BoundedTrieCell}.
67+
*/
6568
@Override
6669
public synchronized BoundedTrieData getCumulative() {
6770
return value.getCumulative();

runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
* <p>Represents data stored in a bounded trie. This data structure is used to efficiently store and
4141
* aggregate a collection of string sequences, paths/FQN with a limited size.
4242
*
43-
* <p>This class is thread-safe but the underlying BoundedTrieNode contained on it isn't. This is
43+
* <p>This class is thread-safe but the underlying BoundedTrieNode contained in it isn't. This is
4444
* intentional for performance concerns. Hence, this class does not expose the contained node and
4545
* should not be modified to do so in future when used with multiple threads. This class choose to
4646
* achieve thread-safety through locks rather than just creating and returning immutable instances
@@ -147,8 +147,8 @@ public synchronized BoundedTrieData getCumulative() {
147147
}
148148

149149
/**
150-
* Returns an immutable set of lists, where each list represents a path in the bounded trie. The
151-
* last element in each path is a boolean in string representation denoting whether this path was
150+
* Returns {@link BoundedTrieResult}, which represents all path in the bounded trie. The last
151+
* element in each path is a boolean in string representation denoting whether this path was
152152
* truncated. i.e. <["a", "b", "false"], ["c", "true"]>
153153
*
154154
* @return The set of paths.
@@ -281,7 +281,6 @@ public final String toString() {
281281
}
282282

283283
// ------------------------------ BoundedTrieNode Implementation ------------------------------
284-
285284
/**
286285
* BoundedTrieNode implementation. This class is not thread-safe and relies on the {@link
287286
* BoundedTrieData} which uses this class to ensure thread-safety by acquiring a lock on the root

runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@
3434
import java.util.List;
3535
import java.util.Map;
3636
import java.util.Set;
37+
import java.util.concurrent.CountDownLatch;
38+
import java.util.concurrent.ExecutorService;
39+
import java.util.concurrent.Executors;
40+
import java.util.concurrent.TimeUnit;
3741
import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
3842
import org.apache.beam.sdk.metrics.MetricName;
3943
import org.apache.beam.sdk.util.HistogramData;
@@ -561,4 +565,32 @@ public void testMatchMetric() {
561565
Collections.singletonMap("name", "counter"));
562566
assertFalse(MetricsContainerImpl.matchMetric(elementCountName, allowedMetricUrns));
563567
}
568+
569+
@Test
570+
public void testBoundedTrieMultithreaded() throws InterruptedException {
571+
MetricsContainerImpl container = new MetricsContainerImpl("step1");
572+
BoundedTrieCell boundedTrieCell =
573+
container.getBoundedTrie(MetricName.named("test", "boundedTrie"));
574+
int num_threads = 10;
575+
int num_updates_per_thread = 9; // be under the default bound of 100
576+
577+
CountDownLatch latch = new CountDownLatch(num_threads);
578+
ExecutorService executor = Executors.newFixedThreadPool(num_threads);
579+
List<Runnable> tasks = new ArrayList<>();
580+
for (int i = 0; i < num_threads; i++) {
581+
tasks.add(
582+
() -> {
583+
for (int j = 0; j < num_updates_per_thread; j++) {
584+
boundedTrieCell.add("value-" + Thread.currentThread().getId() + "-" + j);
585+
}
586+
latch.countDown();
587+
});
588+
}
589+
590+
tasks.forEach(executor::execute);
591+
latch.await(1, TimeUnit.MINUTES);
592+
executor.shutdown();
593+
594+
assertEquals(num_threads * num_updates_per_thread, boundedTrieCell.getCumulative().size());
595+
}
564596
}

runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@
5959
class DataflowMetrics extends MetricResults {
6060

6161
private static final Logger LOG = LoggerFactory.getLogger(DataflowMetrics.class);
62-
// TODO: Remove this
62+
// TODO (rosinha): Remove this once bounded_trie is available in metrics proto Dataflow
63+
// java client.
6364
public static final String BOUNDED_TRIE = "bounded_trie";
6465
/**
6566
* Client for the Dataflow service. This can be used to query the service for information about

0 commit comments

Comments
 (0)