Skip to content

Commit 222ad95

Browse files
authored
Add support for bounded trie metric in legacy worker (#33474)
1 parent 1ae6fc4 commit 222ad95

File tree

13 files changed

+476
-23
lines changed

13 files changed

+476
-23
lines changed

runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieCell.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,17 @@ public synchronized BoundedTrieData getCumulative() {
7474
return value.getCumulative();
7575
}
7676

77+
// Used by Streaming metric container to extract deltas since streaming metrics are
78+
// reported as deltas rather than cumulative as in batch.
79+
// For delta we take the current value then reset the cell to empty so the next call only see
80+
// delta/updates from last call.
81+
public synchronized BoundedTrieData getAndReset() {
82+
// since we are resetting no need to do a deep copy, just change the reference
83+
BoundedTrieData shallowCopy = this.value;
84+
this.value = new BoundedTrieData(); // create new object, should not call reset on existing
85+
return shallowCopy;
86+
}
87+
7788
@Override
7889
public MetricName getName() {
7990
return name;

runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -180,14 +180,19 @@ public synchronized BoundedTrieResult extractResult() {
180180
*/
181181
public synchronized void add(Iterable<String> segments) {
182182
List<String> segmentsParts = ImmutableList.copyOf(segments);
183-
if (this.root == null) {
184-
if (this.singleton == null || !this.singleton.equals(segmentsParts)) {
183+
if (segmentsParts.isEmpty()) {
184+
return;
185+
}
186+
if (this.singleton == null && this.root == null) {
187+
// empty case
188+
this.singleton = segmentsParts;
189+
} else if (this.singleton != null && this.singleton.equals(segmentsParts)) {
190+
// skip
191+
} else {
192+
if (this.root == null) {
185193
this.root = this.asTrie();
186194
this.singleton = null;
187195
}
188-
}
189-
190-
if (this.root != null) {
191196
this.root.add(segmentsParts);
192197
if (this.root.getSize() > this.bound) {
193198
this.root.trim();
@@ -271,6 +276,11 @@ public synchronized boolean contains(@Nonnull List<String> value) {
271276
}
272277
}
273278

279+
/** @return true if this {@link BoundedTrieData} is empty else false. */
280+
public boolean isEmpty() {
281+
return (root == null || root.children.isEmpty()) && (singleton == null || singleton.isEmpty());
282+
}
283+
274284
@Override
275285
public final boolean equals(@Nullable Object other) {
276286
if (this == other) {
@@ -335,7 +345,7 @@ static class BoundedTrieNode implements Serializable {
335345
* @param truncated Whether this node is truncated.
336346
* @param size The size of the subtree rooted at this node.
337347
*/
338-
BoundedTrieNode(Map<String, BoundedTrieNode> children, boolean truncated, int size) {
348+
BoundedTrieNode(@Nonnull Map<String, BoundedTrieNode> children, boolean truncated, int size) {
339349
this.children = children;
340350
this.size = size;
341351
this.truncated = truncated;

runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/BoundedTrieNodeTest.java

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -882,12 +882,31 @@ public void testClear() {
882882
assertTrue(trie.extractResult().getResult().isEmpty());
883883
}
884884

885+
@Test
886+
public void testIsEmpty() {
887+
BoundedTrieData trie = new BoundedTrieData();
888+
assertTrue(trie.isEmpty());
889+
890+
trie.add(Collections.emptyList());
891+
assertTrue(trie.isEmpty());
892+
893+
trie.add(ImmutableList.of("a", "b"));
894+
assertFalse(trie.isEmpty());
895+
896+
trie.add(ImmutableList.of("c", "d"));
897+
assertFalse(trie.isEmpty());
898+
899+
trie.clear();
900+
assertTrue(trie.isEmpty());
901+
}
902+
885903
@Test
886904
public void testBoundedTrieDataContains() {
887905
BoundedTrieData trie = new BoundedTrieData();
888906
trie.add(ImmutableList.of("a", "b"));
889907
assertTrue(trie.contains(ImmutableList.of("a", "b")));
890-
assertTrue(trie.contains(ImmutableList.of("a")));
908+
// the path [a, b] is not the same as the path [a]
909+
assertFalse(trie.contains(ImmutableList.of("a")));
891910
assertFalse(trie.contains(ImmutableList.of("a", "c")));
892911
}
893912

@@ -1000,18 +1019,11 @@ public void testGetCumulativeWithRoot() {
10001019
assertFalse(cumulativeTrie.contains(ImmutableList.of("g", "h")));
10011020
}
10021021

1003-
@Test
1004-
public void testAddEmptyPath() {
1005-
BoundedTrieData trie = new BoundedTrieData();
1006-
trie.add(Collections.emptyList());
1007-
assertEquals(1, trie.size());
1008-
assertTrue(trie.extractResult().getResult().contains(ImmutableList.of("false")));
1009-
}
1010-
10111022
@Test
10121023
public void testContainsEmptyPath() {
10131024
BoundedTrieData trie = new BoundedTrieData();
10141025
trie.add(Collections.emptyList());
1015-
assertTrue(trie.contains(Collections.emptyList()));
1026+
assertFalse(trie.contains(Collections.emptyList()));
1027+
assertTrue(trie.isEmpty());
10161028
}
10171029
}

runners/google-cloud-dataflow-java/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ dependencies {
9494
// io-kafka is only used in PTransform override so it is optional
9595
provided project(":sdks:java:io:kafka")
9696
implementation project(":sdks:java:io:google-cloud-platform")
97+
implementation project(":runners:core-java")
9798
implementation library.java.avro
9899
implementation library.java.bigdataoss_util
99100
implementation library.java.commons_codec

runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@
2929
import java.util.HashMap;
3030
import java.util.HashSet;
3131
import java.util.List;
32+
import org.apache.beam.model.pipeline.v1.MetricsApi.BoundedTrie;
3233
import org.apache.beam.model.pipeline.v1.RunnerApi;
34+
import org.apache.beam.runners.core.metrics.BoundedTrieData;
3335
import org.apache.beam.sdk.metrics.BoundedTrieResult;
3436
import org.apache.beam.sdk.metrics.DistributionResult;
3537
import org.apache.beam.sdk.metrics.GaugeResult;
@@ -55,6 +57,7 @@
5557
"rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
5658
})
5759
class DataflowMetrics extends MetricResults {
60+
5861
private static final Logger LOG = LoggerFactory.getLogger(DataflowMetrics.class);
5962
/**
6063
* Client for the Dataflow service. This can be used to query the service for information about
@@ -104,13 +107,13 @@ public MetricQueryResults queryMetrics(MetricsFilter filter) {
104107
ImmutableList<MetricResult<DistributionResult>> distributions = ImmutableList.of();
105108
ImmutableList<MetricResult<GaugeResult>> gauges = ImmutableList.of();
106109
ImmutableList<MetricResult<StringSetResult>> stringSets = ImmutableList.of();
107-
ImmutableList<MetricResult<BoundedTrieResult>> boudedTries = ImmutableList.of();
110+
ImmutableList<MetricResult<BoundedTrieResult>> boundedTries = ImmutableList.of();
108111
JobMetrics jobMetrics;
109112
try {
110113
jobMetrics = getJobMetrics();
111114
} catch (IOException e) {
112115
LOG.warn("Unable to query job metrics.\n");
113-
return MetricQueryResults.create(counters, distributions, gauges, stringSets, boudedTries);
116+
return MetricQueryResults.create(counters, distributions, gauges, stringSets, boundedTries);
114117
}
115118
metricUpdates = firstNonNull(jobMetrics.getMetrics(), Collections.emptyList());
116119
return populateMetricQueryResults(metricUpdates, filter);
@@ -134,13 +137,15 @@ private static class DataflowMetricResultExtractor {
134137
private final ImmutableList.Builder<MetricResult<DistributionResult>> distributionResults;
135138
private final ImmutableList.Builder<MetricResult<GaugeResult>> gaugeResults;
136139
private final ImmutableList.Builder<MetricResult<StringSetResult>> stringSetResults;
140+
private final ImmutableList.Builder<MetricResult<BoundedTrieResult>> boundedTrieResults;
137141
private final boolean isStreamingJob;
138142

139143
DataflowMetricResultExtractor(boolean isStreamingJob) {
140144
counterResults = ImmutableList.builder();
141145
distributionResults = ImmutableList.builder();
142146
gaugeResults = ImmutableList.builder();
143147
stringSetResults = ImmutableList.builder();
148+
boundedTrieResults = ImmutableList.builder();
144149
/* In Dataflow streaming jobs, only ATTEMPTED metrics are available.
145150
* In Dataflow batch jobs, only COMMITTED metrics are available, but
146151
* we must provide ATTEMPTED, so we use COMMITTED as a good approximation.
@@ -169,6 +174,9 @@ public void addMetricResult(
169174
// stringset metric
170175
StringSetResult value = getStringSetValue(committed);
171176
stringSetResults.add(MetricResult.create(metricKey, !isStreamingJob, value));
177+
} else if (committed.getTrie() != null && attempted.getTrie() != null) {
178+
BoundedTrieResult value = getBoundedTrieValue(committed);
179+
boundedTrieResults.add(MetricResult.create(metricKey, !isStreamingJob, value));
172180
} else {
173181
// This is exceptionally unexpected. We expect matching user metrics to only have the
174182
// value types provided by the Metrics API.
@@ -196,6 +204,15 @@ private StringSetResult getStringSetValue(MetricUpdate metricUpdate) {
196204
return StringSetResult.create(ImmutableSet.copyOf(((Collection) metricUpdate.getSet())));
197205
}
198206

207+
private BoundedTrieResult getBoundedTrieValue(MetricUpdate metricUpdate) {
208+
if (metricUpdate.getTrie() == null) {
209+
return BoundedTrieResult.empty();
210+
}
211+
BoundedTrie bTrie = (BoundedTrie) metricUpdate.getTrie();
212+
BoundedTrieData trieData = BoundedTrieData.fromProto(bTrie);
213+
return BoundedTrieResult.create(trieData.extractResult().getResult());
214+
}
215+
199216
private DistributionResult getDistributionValue(MetricUpdate metricUpdate) {
200217
if (metricUpdate.getDistribution() == null) {
201218
return DistributionResult.IDENTITY_ELEMENT;
@@ -220,9 +237,13 @@ public Iterable<MetricResult<GaugeResult>> getGaugeResults() {
220237
return gaugeResults.build();
221238
}
222239

223-
public Iterable<MetricResult<StringSetResult>> geStringSetResults() {
240+
public Iterable<MetricResult<StringSetResult>> getStringSetResults() {
224241
return stringSetResults.build();
225242
}
243+
244+
public Iterable<MetricResult<BoundedTrieResult>> getBoundedTrieResults() {
245+
return boundedTrieResults.build();
246+
}
226247
}
227248

228249
private static class DataflowMetricQueryResultsFactory {
@@ -388,8 +409,8 @@ public MetricQueryResults build() {
388409
extractor.getCounterResults(),
389410
extractor.getDistributionResults(),
390411
extractor.getGaugeResults(),
391-
extractor.geStringSetResults(),
392-
ImmutableList.of());
412+
extractor.getStringSetResults(),
413+
extractor.getBoundedTrieResults());
393414
}
394415
}
395416
}

runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,14 @@
4141
import java.io.IOException;
4242
import java.math.BigDecimal;
4343
import java.util.Set;
44+
import org.apache.beam.model.pipeline.v1.MetricsApi.BoundedTrie;
45+
import org.apache.beam.runners.core.metrics.BoundedTrieData;
4446
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
4547
import org.apache.beam.runners.dataflow.util.DataflowTemplateJob;
4648
import org.apache.beam.sdk.PipelineResult.State;
4749
import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
4850
import org.apache.beam.sdk.extensions.gcp.storage.NoopPathValidator;
51+
import org.apache.beam.sdk.metrics.BoundedTrieResult;
4952
import org.apache.beam.sdk.metrics.DistributionResult;
5053
import org.apache.beam.sdk.metrics.MetricQueryResults;
5154
import org.apache.beam.sdk.metrics.MetricsFilter;
@@ -196,6 +199,13 @@ private MetricUpdate makeStringSetMetricUpdate(
196199
return setStructuredName(update, name, namespace, step, tentative);
197200
}
198201

202+
private MetricUpdate makeBoundedTrieMetricUpdate(
203+
String name, String namespace, String step, BoundedTrie data, boolean tentative) {
204+
MetricUpdate update = new MetricUpdate();
205+
update.setTrie(data);
206+
return setStructuredName(update, name, namespace, step, tentative);
207+
}
208+
199209
@Test
200210
public void testSingleCounterUpdates() throws IOException {
201211
AppliedPTransform<?, ?, ?> myStep = mock(AppliedPTransform.class);
@@ -286,6 +296,64 @@ public void testSingleStringSetUpdates() throws IOException {
286296
StringSetResult.create(ImmutableSet.of("ab", "cd")))));
287297
}
288298

299+
@Test
300+
public void testSingleBoundedTrieUpdates() throws IOException {
301+
AppliedPTransform<?, ?, ?> myStep = mock(AppliedPTransform.class);
302+
when(myStep.getFullName()).thenReturn("myStepName");
303+
BiMap<AppliedPTransform<?, ?, ?>, String> transformStepNames = HashBiMap.create();
304+
transformStepNames.put(myStep, "s2");
305+
306+
JobMetrics jobMetrics = new JobMetrics();
307+
DataflowPipelineJob job = mock(DataflowPipelineJob.class);
308+
DataflowPipelineOptions options = mock(DataflowPipelineOptions.class);
309+
when(options.isStreaming()).thenReturn(false);
310+
when(job.getDataflowOptions()).thenReturn(options);
311+
when(job.getState()).thenReturn(State.RUNNING);
312+
when(job.getJobId()).thenReturn(JOB_ID);
313+
when(job.getTransformStepNames()).thenReturn(transformStepNames);
314+
315+
// The parser relies on the fact that one tentative and one committed metric update exist in
316+
// the job metrics results.
317+
MetricUpdate mu1 =
318+
makeBoundedTrieMetricUpdate(
319+
"counterName",
320+
"counterNamespace",
321+
"s2",
322+
new BoundedTrieData(ImmutableList.of("ab", "cd")).toProto(),
323+
false);
324+
MetricUpdate mu1Tentative =
325+
makeBoundedTrieMetricUpdate(
326+
"counterName",
327+
"counterNamespace",
328+
"s2",
329+
new BoundedTrieData(ImmutableList.of("ab", "cd")).toProto(),
330+
true);
331+
jobMetrics.setMetrics(ImmutableList.of(mu1, mu1Tentative));
332+
DataflowClient dataflowClient = mock(DataflowClient.class);
333+
when(dataflowClient.getJobMetrics(JOB_ID)).thenReturn(jobMetrics);
334+
335+
DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient);
336+
MetricQueryResults result = dataflowMetrics.allMetrics();
337+
assertThat(
338+
result.getBoundedTries(),
339+
containsInAnyOrder(
340+
attemptedMetricsResult(
341+
"counterNamespace",
342+
"counterName",
343+
"myStepName",
344+
BoundedTrieResult.create(
345+
ImmutableSet.of(ImmutableList.of("ab", "cd", String.valueOf(false)))))));
346+
assertThat(
347+
result.getBoundedTries(),
348+
containsInAnyOrder(
349+
committedMetricsResult(
350+
"counterNamespace",
351+
"counterName",
352+
"myStepName",
353+
BoundedTrieResult.create(
354+
ImmutableSet.of(ImmutableList.of("ab", "cd", String.valueOf(false)))))));
355+
}
356+
289357
@Test
290358
public void testIgnoreDistributionButGetCounterUpdates() throws IOException {
291359
AppliedPTransform<?, ?, ?> myStep = mock(AppliedPTransform.class);

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -519,6 +519,11 @@ public Iterable<CounterUpdate> extractMetricUpdates(boolean isFinalUpdate) {
519519
.transform(
520520
update ->
521521
MetricsToCounterUpdateConverter.fromStringSet(
522+
update.getKey(), true, update.getUpdate())),
523+
FluentIterable.from(updates.boundedTrieUpdates())
524+
.transform(
525+
update ->
526+
MetricsToCounterUpdateConverter.fromBoundedTrie(
522527
update.getKey(), true, update.getUpdate())));
523528
});
524529
}

0 commit comments

Comments
 (0)