Skip to content

Commit a6ffe29

Browse files
committed
Merge branch 'master' into vertex-ai-custom-route
2 parents 39c9bd5 + 0676463 commit a6ffe29

File tree

65 files changed

+2418
-507
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

65 files changed

+2418
-507
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run.",
3-
"modification": 4
3+
"modification": 1
44
}

infra/iam/users.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1235,3 +1235,8 @@
12351235
member_type: user
12361236
permissions:
12371237
- role: roles/editor
1238+
- username: yalah5084
1239+
1240+
member_type: user
1241+
permissions:
1242+
- role: beam_writer

runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.beam.sdk.metrics.MetricResults;
3838
import org.apache.beam.sdk.metrics.MetricsEnvironment;
3939
import org.apache.beam.sdk.options.PipelineOptions;
40+
import org.apache.beam.sdk.options.SdkHarnessOptions;
4041
import org.apache.beam.sdk.runners.PTransformOverride;
4142
import org.apache.beam.sdk.transforms.PTransform;
4243
import org.apache.beam.sdk.util.UserCodeException;
@@ -184,7 +185,7 @@ public DirectPipelineResult run(Pipeline pipeline) {
184185

185186
DisplayDataValidator.validatePipeline(pipeline);
186187
DisplayDataValidator.validateOptions(options);
187-
188+
SdkHarnessOptions.getConfiguredLoggerFromOptions(options.as(SdkHarnessOptions.class));
188189
ExecutorService metricsPool =
189190
Executors.newCachedThreadPool(
190191
new ThreadFactoryBuilder()

runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@
4949
import java.util.concurrent.TimeoutException;
5050
import java.util.concurrent.atomic.AtomicInteger;
5151
import java.util.concurrent.atomic.AtomicLong;
52+
import java.util.logging.Level;
53+
import java.util.logging.LogManager;
5254
import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
5355
import org.apache.beam.sdk.Pipeline;
5456
import org.apache.beam.sdk.PipelineResult;
@@ -75,6 +77,7 @@
7577
import org.apache.beam.sdk.transforms.Create;
7678
import org.apache.beam.sdk.transforms.DoFn;
7779
import org.apache.beam.sdk.transforms.Flatten;
80+
import org.apache.beam.sdk.transforms.Impulse;
7881
import org.apache.beam.sdk.transforms.MapElements;
7982
import org.apache.beam.sdk.transforms.PTransform;
8083
import org.apache.beam.sdk.transforms.ParDo;
@@ -744,6 +747,50 @@ public interface TestSerializationOfOptions extends PipelineOptions {
744747
void setIgnoredField(String value);
745748
}
746749

750+
@Test
751+
public void testLogLevel() {
752+
PipelineOptions options =
753+
PipelineOptionsFactory.fromArgs(
754+
new String[] {
755+
"--runner=DirectRunner",
756+
"--defaultSdkHarnessLogLevel=ERROR",
757+
"--sdkHarnessLogLevelOverrides={\"org.apache.beam.runners.direct.DirectRunnerTest\":\"INFO\"}"
758+
})
759+
.create();
760+
Pipeline pipeline = Pipeline.create(options);
761+
762+
LogManager logManager = LogManager.getLogManager();
763+
// use full name to avoid conflicts with org.slf4j.Logger
764+
java.util.logging.Logger rootLogger = logManager.getLogger("");
765+
Level originalLevel = rootLogger.getLevel();
766+
767+
try {
768+
pipeline
769+
.apply(Impulse.create())
770+
.apply(
771+
ParDo.of(
772+
new DoFn<byte[], byte[]>() {
773+
@ProcessElement
774+
public void process(@Element byte[] element, OutputReceiver<byte[]> o) {
775+
LogManager logManager = LogManager.getLogManager();
776+
java.util.logging.Logger rootLogger = logManager.getLogger("");
777+
// check loglevel here. Whether actual logs are rendered depends on slf4j impl
778+
// and upstream configs.
779+
assertEquals(Level.SEVERE, rootLogger.getLevel());
780+
assertEquals(
781+
Level.INFO,
782+
java.util.logging.Logger.getLogger(
783+
"org.apache.beam.runners.direct.DirectRunnerTest")
784+
.getLevel());
785+
}
786+
}));
787+
pipeline.run();
788+
} finally {
789+
// resume original log level
790+
rootLogger.setLevel(originalLevel);
791+
}
792+
}
793+
747794
private static class LongNoDecodeCoder extends AtomicCoder<Long> {
748795
@Override
749796
public void encode(Long value, OutputStream outStream) throws IOException {}

runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableMetrics.java

Lines changed: 86 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
import java.util.ArrayList;
3232
import java.util.Collections;
33+
import java.util.LinkedHashMap;
3334
import java.util.List;
3435
import java.util.Map;
3536
import java.util.stream.Collectors;
@@ -101,20 +102,30 @@ public MetricQueryResults queryMetrics(MetricsFilter filter) {
101102

102103
private static PortableMetrics convertMonitoringInfosToMetricResults(
103104
JobApi.MetricResults jobMetrics) {
104-
List<MetricsApi.MonitoringInfo> monitoringInfoList = new ArrayList<>();
105-
// TODO(https://github.com/apache/beam/issues/32001) dedup Attempted and Committed metrics
106-
monitoringInfoList.addAll(jobMetrics.getAttemptedList());
107-
monitoringInfoList.addAll(jobMetrics.getCommittedList());
108-
Iterable<MetricResult<Long>> countersFromJobMetrics =
109-
extractCountersFromJobMetrics(monitoringInfoList);
105+
// Deduplicate attempted + committed. Committed wins.
106+
LinkedHashMap<String, MiAndCommitted> infoMap = new LinkedHashMap<>();
107+
108+
for (MetricsApi.MonitoringInfo attempted : jobMetrics.getAttemptedList()) {
109+
String key = monitoringInfoKey(attempted);
110+
infoMap.putIfAbsent(key, new MiAndCommitted(attempted, false));
111+
}
112+
113+
for (MetricsApi.MonitoringInfo committed : jobMetrics.getCommittedList()) {
114+
String key = monitoringInfoKey(committed);
115+
infoMap.put(key, new MiAndCommitted(committed, true));
116+
}
117+
118+
List<MiAndCommitted> merged = new ArrayList<>(infoMap.values());
119+
120+
Iterable<MetricResult<Long>> countersFromJobMetrics = extractCountersFromJobMetrics(merged);
110121
Iterable<MetricResult<DistributionResult>> distributionsFromMetrics =
111-
extractDistributionMetricsFromJobMetrics(monitoringInfoList);
122+
extractDistributionMetricsFromJobMetrics(merged);
112123
Iterable<MetricResult<GaugeResult>> gaugesFromMetrics =
113-
extractGaugeMetricsFromJobMetrics(monitoringInfoList);
124+
extractGaugeMetricsFromJobMetrics(merged);
114125
Iterable<MetricResult<StringSetResult>> stringSetFromMetrics =
115-
extractStringSetMetricsFromJobMetrics(monitoringInfoList);
126+
extractStringSetMetricsFromJobMetrics(merged);
116127
Iterable<MetricResult<BoundedTrieResult>> boundedTrieFromMetrics =
117-
extractBoundedTrieMetricsFromJobMetrics(monitoringInfoList);
128+
extractBoundedTrieMetricsFromJobMetrics(merged);
118129
return new PortableMetrics(
119130
countersFromJobMetrics,
120131
distributionsFromMetrics,
@@ -123,26 +134,52 @@ private static PortableMetrics convertMonitoringInfosToMetricResults(
123134
boundedTrieFromMetrics);
124135
}
125136

137+
/**
138+
* Build a stable deduplication key for a MonitoringInfo based on type and the metric identity
139+
* labels.
140+
*/
141+
private static String monitoringInfoKey(MetricsApi.MonitoringInfo mi) {
142+
StringBuilder sb = new StringBuilder();
143+
sb.append(mi.getType()).append('|');
144+
Map<String, String> labels = mi.getLabelsMap();
145+
// Use canonical labels that form the metric identity
146+
sb.append(labels.getOrDefault(STEP_NAME_LABEL, "")).append('|');
147+
sb.append(labels.getOrDefault(NAMESPACE_LABEL, "")).append('|');
148+
sb.append(labels.getOrDefault(METRIC_NAME_LABEL, ""));
149+
return sb.toString();
150+
}
151+
152+
private static class MiAndCommitted {
153+
final MetricsApi.MonitoringInfo mi;
154+
final boolean committed;
155+
156+
MiAndCommitted(MetricsApi.MonitoringInfo mi, boolean committed) {
157+
this.mi = mi;
158+
this.committed = committed;
159+
}
160+
}
161+
126162
private static Iterable<MetricResult<DistributionResult>>
127-
extractDistributionMetricsFromJobMetrics(List<MetricsApi.MonitoringInfo> monitoringInfoList) {
163+
extractDistributionMetricsFromJobMetrics(List<MiAndCommitted> monitoringInfoList) {
128164
return monitoringInfoList.stream()
129-
.filter(item -> DISTRIBUTION_INT64_TYPE.equals(item.getType()))
130-
.filter(item -> item.getLabelsMap().get(NAMESPACE_LABEL) != null)
131-
.map(PortableMetrics::convertDistributionMonitoringInfoToDistribution)
165+
.filter(m -> DISTRIBUTION_INT64_TYPE.equals(m.mi.getType()))
166+
.filter(m -> m.mi.getLabelsMap().get(NAMESPACE_LABEL) != null)
167+
.map(m -> convertDistributionMonitoringInfoToDistribution(m))
132168
.collect(Collectors.toList());
133169
}
134170

135171
private static Iterable<MetricResult<GaugeResult>> extractGaugeMetricsFromJobMetrics(
136-
List<MetricsApi.MonitoringInfo> monitoringInfoList) {
172+
List<MiAndCommitted> monitoringInfoList) {
137173
return monitoringInfoList.stream()
138-
.filter(item -> LATEST_INT64_TYPE.equals(item.getType()))
139-
.filter(item -> item.getLabelsMap().get(NAMESPACE_LABEL) != null)
140-
.map(PortableMetrics::convertGaugeMonitoringInfoToGauge)
174+
.filter(m -> LATEST_INT64_TYPE.equals(m.mi.getType()))
175+
.filter(m -> m.mi.getLabelsMap().get(NAMESPACE_LABEL) != null)
176+
.map(m -> convertGaugeMonitoringInfoToGauge(m))
141177
.collect(Collectors.toList());
142178
}
143179

144-
private static MetricResult<GaugeResult> convertGaugeMonitoringInfoToGauge(
145-
MetricsApi.MonitoringInfo monitoringInfo) {
180+
private static MetricResult<GaugeResult> convertGaugeMonitoringInfoToGauge(MiAndCommitted m) {
181+
MetricsApi.MonitoringInfo monitoringInfo = m.mi;
182+
boolean isCommitted = m.committed;
146183
Map<String, String> labelsMap = monitoringInfo.getLabelsMap();
147184
MetricKey key =
148185
MetricKey.create(
@@ -151,29 +188,31 @@ private static MetricResult<GaugeResult> convertGaugeMonitoringInfoToGauge(
151188

152189
GaugeData data = decodeInt64Gauge(monitoringInfo.getPayload());
153190
GaugeResult result = GaugeResult.create(data.value(), data.timestamp());
154-
return MetricResult.create(key, false, result);
191+
return MetricResult.create(key, isCommitted, result);
155192
}
156193

157194
private static Iterable<MetricResult<StringSetResult>> extractStringSetMetricsFromJobMetrics(
158-
List<MetricsApi.MonitoringInfo> monitoringInfoList) {
195+
List<MiAndCommitted> monitoringInfoList) {
159196
return monitoringInfoList.stream()
160-
.filter(item -> SET_STRING_TYPE.equals(item.getType()))
161-
.filter(item -> item.getLabelsMap().get(NAMESPACE_LABEL) != null)
162-
.map(PortableMetrics::convertStringSetMonitoringInfoToStringSet)
197+
.filter(m -> SET_STRING_TYPE.equals(m.mi.getType()))
198+
.filter(m -> m.mi.getLabelsMap().get(NAMESPACE_LABEL) != null)
199+
.map(m -> convertStringSetMonitoringInfoToStringSet(m))
163200
.collect(Collectors.toList());
164201
}
165202

166203
private static Iterable<MetricResult<BoundedTrieResult>> extractBoundedTrieMetricsFromJobMetrics(
167-
List<MetricsApi.MonitoringInfo> monitoringInfoList) {
204+
List<MiAndCommitted> monitoringInfoList) {
168205
return monitoringInfoList.stream()
169-
.filter(item -> BOUNDED_TRIE_TYPE.equals(item.getType()))
170-
.filter(item -> item.getLabelsMap().get(NAMESPACE_LABEL) != null)
171-
.map(PortableMetrics::convertBoundedTrieMonitoringInfoToBoundedTrie)
206+
.filter(m -> BOUNDED_TRIE_TYPE.equals(m.mi.getType()))
207+
.filter(m -> m.mi.getLabelsMap().get(NAMESPACE_LABEL) != null)
208+
.map(m -> convertBoundedTrieMonitoringInfoToBoundedTrie(m))
172209
.collect(Collectors.toList());
173210
}
174211

175212
private static MetricResult<StringSetResult> convertStringSetMonitoringInfoToStringSet(
176-
MetricsApi.MonitoringInfo monitoringInfo) {
213+
MiAndCommitted m) {
214+
MetricsApi.MonitoringInfo monitoringInfo = m.mi;
215+
boolean isCommitted = m.committed;
177216
Map<String, String> labelsMap = monitoringInfo.getLabelsMap();
178217
MetricKey key =
179218
MetricKey.create(
@@ -182,11 +221,13 @@ private static MetricResult<StringSetResult> convertStringSetMonitoringInfoToStr
182221

183222
StringSetData data = decodeStringSet(monitoringInfo.getPayload());
184223
StringSetResult result = StringSetResult.create(data.stringSet());
185-
return MetricResult.create(key, false, result);
224+
return MetricResult.create(key, isCommitted, result);
186225
}
187226

188227
private static MetricResult<BoundedTrieResult> convertBoundedTrieMonitoringInfoToBoundedTrie(
189-
MetricsApi.MonitoringInfo monitoringInfo) {
228+
MiAndCommitted m) {
229+
MetricsApi.MonitoringInfo monitoringInfo = m.mi;
230+
boolean isCommitted = m.committed;
190231
Map<String, String> labelsMap = monitoringInfo.getLabelsMap();
191232
MetricKey key =
192233
MetricKey.create(
@@ -195,11 +236,13 @@ private static MetricResult<BoundedTrieResult> convertBoundedTrieMonitoringInfoT
195236

196237
BoundedTrieData data = decodeBoundedTrie(monitoringInfo.getPayload());
197238
BoundedTrieResult result = BoundedTrieResult.create(data.extractResult().getResult());
198-
return MetricResult.create(key, false, result);
239+
return MetricResult.create(key, isCommitted, result);
199240
}
200241

201242
private static MetricResult<DistributionResult> convertDistributionMonitoringInfoToDistribution(
202-
MetricsApi.MonitoringInfo monitoringInfo) {
243+
MiAndCommitted m) {
244+
MetricsApi.MonitoringInfo monitoringInfo = m.mi;
245+
boolean isCommitted = m.committed;
203246
Map<String, String> labelsMap = monitoringInfo.getLabelsMap();
204247
MetricKey key =
205248
MetricKey.create(
@@ -208,27 +251,26 @@ private static MetricResult<DistributionResult> convertDistributionMonitoringInf
208251
DistributionData data = decodeInt64Distribution(monitoringInfo.getPayload());
209252
DistributionResult result =
210253
DistributionResult.create(data.sum(), data.count(), data.min(), data.max());
211-
return MetricResult.create(key, false, result);
254+
return MetricResult.create(key, isCommitted, result);
212255
}
213256

214257
private static Iterable<MetricResult<Long>> extractCountersFromJobMetrics(
215-
List<MetricsApi.MonitoringInfo> monitoringInfoList) {
258+
List<MiAndCommitted> monitoringInfoList) {
216259
return monitoringInfoList.stream()
217-
.filter(item -> SUM_INT64_TYPE.equals(item.getType()))
218-
.filter(
219-
item ->
220-
item.getLabelsMap().get(NAMESPACE_LABEL) != null) // filter out pcollection metrics
221-
.map(PortableMetrics::convertCounterMonitoringInfoToCounter)
260+
.filter(m -> SUM_INT64_TYPE.equals(m.mi.getType()))
261+
.filter(m -> m.mi.getLabelsMap().get(NAMESPACE_LABEL) != null)
262+
.map(m -> convertCounterMonitoringInfoToCounter(m))
222263
.collect(Collectors.toList());
223264
}
224265

225-
private static MetricResult<Long> convertCounterMonitoringInfoToCounter(
226-
MetricsApi.MonitoringInfo counterMonInfo) {
266+
private static MetricResult<Long> convertCounterMonitoringInfoToCounter(MiAndCommitted m) {
267+
MetricsApi.MonitoringInfo counterMonInfo = m.mi;
268+
boolean isCommitted = m.committed;
227269
Map<String, String> labelsMap = counterMonInfo.getLabelsMap();
228270
MetricKey key =
229271
MetricKey.create(
230272
labelsMap.get(STEP_NAME_LABEL),
231273
MetricName.named(labelsMap.get(NAMESPACE_LABEL), labelsMap.get(METRIC_NAME_LABEL)));
232-
return MetricResult.create(key, false, decodeInt64Counter(counterMonInfo.getPayload()));
274+
return MetricResult.create(key, isCommitted, decodeInt64Counter(counterMonInfo.getPayload()));
233275
}
234276
}

runners/portability/java/src/test/java/org/apache/beam/runners/portability/PortableRunnerTest.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,52 @@ public void removeStagedArtifacts(String stagingToken) {}
222222
server.start();
223223
}
224224

225+
@Test
226+
public void deduplicatesAttemptedAndCommittedMetrics() throws Exception {
227+
Map<String, String> labelMap = new HashMap<>();
228+
labelMap.put(NAMESPACE_LABEL, NAMESPACE);
229+
labelMap.put(METRIC_NAME_LABEL, METRIC_NAME);
230+
labelMap.put(STEP_NAME_LABEL, STEP_NAME);
231+
232+
// attempted counter (value 7) and committed counter (value 10) with same identity
233+
MetricsApi.MonitoringInfo attemptedCounter =
234+
MetricsApi.MonitoringInfo.newBuilder()
235+
.setType(COUNTER_TYPE)
236+
.putAllLabels(labelMap)
237+
.setPayload(encodeInt64Counter(7L))
238+
.build();
239+
240+
MetricsApi.MonitoringInfo committedCounter =
241+
MetricsApi.MonitoringInfo.newBuilder()
242+
.setType(COUNTER_TYPE)
243+
.putAllLabels(labelMap)
244+
.setPayload(encodeInt64Counter(10L))
245+
.build();
246+
247+
JobApi.MetricResults metricResults =
248+
JobApi.MetricResults.newBuilder()
249+
.addAttempted(attemptedCounter)
250+
.addCommitted(committedCounter)
251+
.build();
252+
253+
createJobServer(JobState.Enum.DONE, metricResults);
254+
PortableRunner runner = PortableRunner.create(options, ManagedChannelFactory.createInProcess());
255+
PipelineResult result = runner.run(p);
256+
result.waitUntilFinish();
257+
258+
Iterable<org.apache.beam.sdk.metrics.MetricResult<Long>> counters =
259+
result.metrics().allMetrics().getCounters();
260+
ImmutableList<org.apache.beam.sdk.metrics.MetricResult<Long>> list =
261+
ImmutableList.copyOf(counters);
262+
263+
// Only one MetricResult should be present for the same identity.
264+
assertThat(list.size(), is(1));
265+
org.apache.beam.sdk.metrics.MetricResult<Long> r = list.get(0);
266+
267+
// Committed value should be present and equal to the committed payload (10).
268+
assertThat(r.getCommitted(), is(10L));
269+
}
270+
225271
private static PipelineOptions createPipelineOptions() {
226272
PortablePipelineOptions options =
227273
PipelineOptionsFactory.create().as(PortablePipelineOptions.class);

sdks/go.mod

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ require (
3535
github.com/aws/aws-sdk-go-v2 v1.41.0
3636
github.com/aws/aws-sdk-go-v2/config v1.32.6
3737
github.com/aws/aws-sdk-go-v2/credentials v1.19.6
38-
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.20.17
39-
github.com/aws/aws-sdk-go-v2/service/s3 v1.94.0
38+
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.20.18
39+
github.com/aws/aws-sdk-go-v2/service/s3 v1.95.0
4040
github.com/aws/smithy-go v1.24.0
4141
github.com/docker/go-connections v0.6.0
4242
github.com/dustin/go-humanize v1.0.1
@@ -62,7 +62,7 @@ require (
6262
golang.org/x/text v0.32.0
6363
google.golang.org/api v0.257.0
6464
google.golang.org/genproto v0.0.0-20250922171735-9219d122eba9
65-
google.golang.org/grpc v1.77.0
65+
google.golang.org/grpc v1.78.0
6666
google.golang.org/protobuf v1.36.11
6767
gopkg.in/yaml.v2 v2.4.0
6868
gopkg.in/yaml.v3 v3.0.1

0 commit comments

Comments
 (0)