diff --git a/runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableMetrics.java b/runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableMetrics.java index 3c7be7907ec8..9d57413a2742 100644 --- a/runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableMetrics.java +++ b/runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableMetrics.java @@ -30,6 +30,7 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -101,20 +102,30 @@ public MetricQueryResults queryMetrics(MetricsFilter filter) { private static PortableMetrics convertMonitoringInfosToMetricResults( JobApi.MetricResults jobMetrics) { - List monitoringInfoList = new ArrayList<>(); - // TODO(https://github.com/apache/beam/issues/32001) dedup Attempted and Committed metrics - monitoringInfoList.addAll(jobMetrics.getAttemptedList()); - monitoringInfoList.addAll(jobMetrics.getCommittedList()); - Iterable> countersFromJobMetrics = - extractCountersFromJobMetrics(monitoringInfoList); + // Deduplicate attempted + committed. Committed wins. + LinkedHashMap infoMap = new LinkedHashMap<>(); + + for (MetricsApi.MonitoringInfo attempted : jobMetrics.getAttemptedList()) { + String key = monitoringInfoKey(attempted); + infoMap.putIfAbsent(key, new MiAndCommitted(attempted, false)); + } + + for (MetricsApi.MonitoringInfo committed : jobMetrics.getCommittedList()) { + String key = monitoringInfoKey(committed); + infoMap.put(key, new MiAndCommitted(committed, true)); + } + + List merged = new ArrayList<>(infoMap.values()); + + Iterable> countersFromJobMetrics = extractCountersFromJobMetrics(merged); Iterable> distributionsFromMetrics = - extractDistributionMetricsFromJobMetrics(monitoringInfoList); + extractDistributionMetricsFromJobMetrics(merged); Iterable> gaugesFromMetrics = - extractGaugeMetricsFromJobMetrics(monitoringInfoList); + extractGaugeMetricsFromJobMetrics(merged); Iterable> stringSetFromMetrics = - extractStringSetMetricsFromJobMetrics(monitoringInfoList); + extractStringSetMetricsFromJobMetrics(merged); Iterable> boundedTrieFromMetrics = - extractBoundedTrieMetricsFromJobMetrics(monitoringInfoList); + extractBoundedTrieMetricsFromJobMetrics(merged); return new PortableMetrics( countersFromJobMetrics, distributionsFromMetrics, @@ -123,26 +134,52 @@ private static PortableMetrics convertMonitoringInfosToMetricResults( boundedTrieFromMetrics); } + /** + * Build a stable deduplication key for a MonitoringInfo based on type and the metric identity + * labels. + */ + private static String monitoringInfoKey(MetricsApi.MonitoringInfo mi) { + StringBuilder sb = new StringBuilder(); + sb.append(mi.getType()).append('|'); + Map labels = mi.getLabelsMap(); + // Use canonical labels that form the metric identity + sb.append(labels.getOrDefault(STEP_NAME_LABEL, "")).append('|'); + sb.append(labels.getOrDefault(NAMESPACE_LABEL, "")).append('|'); + sb.append(labels.getOrDefault(METRIC_NAME_LABEL, "")); + return sb.toString(); + } + + private static class MiAndCommitted { + final MetricsApi.MonitoringInfo mi; + final boolean committed; + + MiAndCommitted(MetricsApi.MonitoringInfo mi, boolean committed) { + this.mi = mi; + this.committed = committed; + } + } + private static Iterable> - extractDistributionMetricsFromJobMetrics(List monitoringInfoList) { + extractDistributionMetricsFromJobMetrics(List monitoringInfoList) { return monitoringInfoList.stream() - .filter(item -> DISTRIBUTION_INT64_TYPE.equals(item.getType())) - .filter(item -> item.getLabelsMap().get(NAMESPACE_LABEL) != null) - .map(PortableMetrics::convertDistributionMonitoringInfoToDistribution) + .filter(m -> DISTRIBUTION_INT64_TYPE.equals(m.mi.getType())) + .filter(m -> m.mi.getLabelsMap().get(NAMESPACE_LABEL) != null) + .map(m -> convertDistributionMonitoringInfoToDistribution(m)) .collect(Collectors.toList()); } private static Iterable> extractGaugeMetricsFromJobMetrics( - List monitoringInfoList) { + List monitoringInfoList) { return monitoringInfoList.stream() - .filter(item -> LATEST_INT64_TYPE.equals(item.getType())) - .filter(item -> item.getLabelsMap().get(NAMESPACE_LABEL) != null) - .map(PortableMetrics::convertGaugeMonitoringInfoToGauge) + .filter(m -> LATEST_INT64_TYPE.equals(m.mi.getType())) + .filter(m -> m.mi.getLabelsMap().get(NAMESPACE_LABEL) != null) + .map(m -> convertGaugeMonitoringInfoToGauge(m)) .collect(Collectors.toList()); } - private static MetricResult convertGaugeMonitoringInfoToGauge( - MetricsApi.MonitoringInfo monitoringInfo) { + private static MetricResult convertGaugeMonitoringInfoToGauge(MiAndCommitted m) { + MetricsApi.MonitoringInfo monitoringInfo = m.mi; + boolean isCommitted = m.committed; Map labelsMap = monitoringInfo.getLabelsMap(); MetricKey key = MetricKey.create( @@ -151,29 +188,31 @@ private static MetricResult convertGaugeMonitoringInfoToGauge( GaugeData data = decodeInt64Gauge(monitoringInfo.getPayload()); GaugeResult result = GaugeResult.create(data.value(), data.timestamp()); - return MetricResult.create(key, false, result); + return MetricResult.create(key, isCommitted, result); } private static Iterable> extractStringSetMetricsFromJobMetrics( - List monitoringInfoList) { + List monitoringInfoList) { return monitoringInfoList.stream() - .filter(item -> SET_STRING_TYPE.equals(item.getType())) - .filter(item -> item.getLabelsMap().get(NAMESPACE_LABEL) != null) - .map(PortableMetrics::convertStringSetMonitoringInfoToStringSet) + .filter(m -> SET_STRING_TYPE.equals(m.mi.getType())) + .filter(m -> m.mi.getLabelsMap().get(NAMESPACE_LABEL) != null) + .map(m -> convertStringSetMonitoringInfoToStringSet(m)) .collect(Collectors.toList()); } private static Iterable> extractBoundedTrieMetricsFromJobMetrics( - List monitoringInfoList) { + List monitoringInfoList) { return monitoringInfoList.stream() - .filter(item -> BOUNDED_TRIE_TYPE.equals(item.getType())) - .filter(item -> item.getLabelsMap().get(NAMESPACE_LABEL) != null) - .map(PortableMetrics::convertBoundedTrieMonitoringInfoToBoundedTrie) + .filter(m -> BOUNDED_TRIE_TYPE.equals(m.mi.getType())) + .filter(m -> m.mi.getLabelsMap().get(NAMESPACE_LABEL) != null) + .map(m -> convertBoundedTrieMonitoringInfoToBoundedTrie(m)) .collect(Collectors.toList()); } private static MetricResult convertStringSetMonitoringInfoToStringSet( - MetricsApi.MonitoringInfo monitoringInfo) { + MiAndCommitted m) { + MetricsApi.MonitoringInfo monitoringInfo = m.mi; + boolean isCommitted = m.committed; Map labelsMap = monitoringInfo.getLabelsMap(); MetricKey key = MetricKey.create( @@ -182,11 +221,13 @@ private static MetricResult convertStringSetMonitoringInfoToStr StringSetData data = decodeStringSet(monitoringInfo.getPayload()); StringSetResult result = StringSetResult.create(data.stringSet()); - return MetricResult.create(key, false, result); + return MetricResult.create(key, isCommitted, result); } private static MetricResult convertBoundedTrieMonitoringInfoToBoundedTrie( - MetricsApi.MonitoringInfo monitoringInfo) { + MiAndCommitted m) { + MetricsApi.MonitoringInfo monitoringInfo = m.mi; + boolean isCommitted = m.committed; Map labelsMap = monitoringInfo.getLabelsMap(); MetricKey key = MetricKey.create( @@ -195,11 +236,13 @@ private static MetricResult convertBoundedTrieMonitoringInfoT BoundedTrieData data = decodeBoundedTrie(monitoringInfo.getPayload()); BoundedTrieResult result = BoundedTrieResult.create(data.extractResult().getResult()); - return MetricResult.create(key, false, result); + return MetricResult.create(key, isCommitted, result); } private static MetricResult convertDistributionMonitoringInfoToDistribution( - MetricsApi.MonitoringInfo monitoringInfo) { + MiAndCommitted m) { + MetricsApi.MonitoringInfo monitoringInfo = m.mi; + boolean isCommitted = m.committed; Map labelsMap = monitoringInfo.getLabelsMap(); MetricKey key = MetricKey.create( @@ -208,27 +251,26 @@ private static MetricResult convertDistributionMonitoringInf DistributionData data = decodeInt64Distribution(monitoringInfo.getPayload()); DistributionResult result = DistributionResult.create(data.sum(), data.count(), data.min(), data.max()); - return MetricResult.create(key, false, result); + return MetricResult.create(key, isCommitted, result); } private static Iterable> extractCountersFromJobMetrics( - List monitoringInfoList) { + List monitoringInfoList) { return monitoringInfoList.stream() - .filter(item -> SUM_INT64_TYPE.equals(item.getType())) - .filter( - item -> - item.getLabelsMap().get(NAMESPACE_LABEL) != null) // filter out pcollection metrics - .map(PortableMetrics::convertCounterMonitoringInfoToCounter) + .filter(m -> SUM_INT64_TYPE.equals(m.mi.getType())) + .filter(m -> m.mi.getLabelsMap().get(NAMESPACE_LABEL) != null) + .map(m -> convertCounterMonitoringInfoToCounter(m)) .collect(Collectors.toList()); } - private static MetricResult convertCounterMonitoringInfoToCounter( - MetricsApi.MonitoringInfo counterMonInfo) { + private static MetricResult convertCounterMonitoringInfoToCounter(MiAndCommitted m) { + MetricsApi.MonitoringInfo counterMonInfo = m.mi; + boolean isCommitted = m.committed; Map labelsMap = counterMonInfo.getLabelsMap(); MetricKey key = MetricKey.create( labelsMap.get(STEP_NAME_LABEL), MetricName.named(labelsMap.get(NAMESPACE_LABEL), labelsMap.get(METRIC_NAME_LABEL))); - return MetricResult.create(key, false, decodeInt64Counter(counterMonInfo.getPayload())); + return MetricResult.create(key, isCommitted, decodeInt64Counter(counterMonInfo.getPayload())); } } diff --git a/runners/portability/java/src/test/java/org/apache/beam/runners/portability/PortableRunnerTest.java b/runners/portability/java/src/test/java/org/apache/beam/runners/portability/PortableRunnerTest.java index 8c87a46ff17c..68f3f6eae396 100644 --- a/runners/portability/java/src/test/java/org/apache/beam/runners/portability/PortableRunnerTest.java +++ b/runners/portability/java/src/test/java/org/apache/beam/runners/portability/PortableRunnerTest.java @@ -222,6 +222,52 @@ public void removeStagedArtifacts(String stagingToken) {} server.start(); } + @Test + public void deduplicatesAttemptedAndCommittedMetrics() throws Exception { + Map labelMap = new HashMap<>(); + labelMap.put(NAMESPACE_LABEL, NAMESPACE); + labelMap.put(METRIC_NAME_LABEL, METRIC_NAME); + labelMap.put(STEP_NAME_LABEL, STEP_NAME); + + // attempted counter (value 7) and committed counter (value 10) with same identity + MetricsApi.MonitoringInfo attemptedCounter = + MetricsApi.MonitoringInfo.newBuilder() + .setType(COUNTER_TYPE) + .putAllLabels(labelMap) + .setPayload(encodeInt64Counter(7L)) + .build(); + + MetricsApi.MonitoringInfo committedCounter = + MetricsApi.MonitoringInfo.newBuilder() + .setType(COUNTER_TYPE) + .putAllLabels(labelMap) + .setPayload(encodeInt64Counter(10L)) + .build(); + + JobApi.MetricResults metricResults = + JobApi.MetricResults.newBuilder() + .addAttempted(attemptedCounter) + .addCommitted(committedCounter) + .build(); + + createJobServer(JobState.Enum.DONE, metricResults); + PortableRunner runner = PortableRunner.create(options, ManagedChannelFactory.createInProcess()); + PipelineResult result = runner.run(p); + result.waitUntilFinish(); + + Iterable> counters = + result.metrics().allMetrics().getCounters(); + ImmutableList> list = + ImmutableList.copyOf(counters); + + // Only one MetricResult should be present for the same identity. + assertThat(list.size(), is(1)); + org.apache.beam.sdk.metrics.MetricResult r = list.get(0); + + // Committed value should be present and equal to the committed payload (10). + assertThat(r.getCommitted(), is(10L)); + } + private static PipelineOptions createPipelineOptions() { PortablePipelineOptions options = PipelineOptionsFactory.create().as(PortablePipelineOptions.class);