Skip to content

Commit 3910192

Browse files
committed
Drop support for DataflowMetrics and MetricsToCounterUpdateConverter until Dataflow Java client support is ready
1 parent 66801f7 commit 3910192

File tree

8 files changed

+7
-241
lines changed

8 files changed

+7
-241
lines changed

runners/google-cloud-dataflow-java/build.gradle

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,6 @@ dependencies {
9494
// io-kafka is only used in PTransform override so it is optional
9595
provided project(":sdks:java:io:kafka")
9696
implementation project(":sdks:java:io:google-cloud-platform")
97-
implementation project(":runners:core-java")
9897
implementation library.java.avro
9998
implementation library.java.bigdataoss_util
10099
implementation library.java.commons_codec

runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java

Lines changed: 5 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,7 @@
2929
import java.util.HashMap;
3030
import java.util.HashSet;
3131
import java.util.List;
32-
import org.apache.beam.model.pipeline.v1.MetricsApi.BoundedTrie;
3332
import org.apache.beam.model.pipeline.v1.RunnerApi;
34-
import org.apache.beam.runners.core.metrics.BoundedTrieData;
3533
import org.apache.beam.sdk.metrics.BoundedTrieResult;
3634
import org.apache.beam.sdk.metrics.DistributionResult;
3735
import org.apache.beam.sdk.metrics.GaugeResult;
@@ -57,11 +55,7 @@
5755
"rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
5856
})
5957
class DataflowMetrics extends MetricResults {
60-
6158
private static final Logger LOG = LoggerFactory.getLogger(DataflowMetrics.class);
62-
// TODO (rosinha): Remove this once bounded_trie is available in metrics proto Dataflow
63-
// java client.
64-
public static final String BOUNDED_TRIE = "bounded_trie";
6559
/**
6660
* Client for the Dataflow service. This can be used to query the service for information about
6761
* the job.
@@ -110,13 +104,13 @@ public MetricQueryResults queryMetrics(MetricsFilter filter) {
110104
ImmutableList<MetricResult<DistributionResult>> distributions = ImmutableList.of();
111105
ImmutableList<MetricResult<GaugeResult>> gauges = ImmutableList.of();
112106
ImmutableList<MetricResult<StringSetResult>> stringSets = ImmutableList.of();
113-
ImmutableList<MetricResult<BoundedTrieResult>> boundedTries = ImmutableList.of();
107+
ImmutableList<MetricResult<BoundedTrieResult>> boundedTries = ImmutableList.of();
114108
JobMetrics jobMetrics;
115109
try {
116110
jobMetrics = getJobMetrics();
117111
} catch (IOException e) {
118112
LOG.warn("Unable to query job metrics.\n");
119-
return MetricQueryResults.create(counters, distributions, gauges, stringSets, boundedTries);
113+
return MetricQueryResults.create(counters, distributions, gauges, stringSets, boundedTries);
120114
}
121115
metricUpdates = firstNonNull(jobMetrics.getMetrics(), Collections.emptyList());
122116
return populateMetricQueryResults(metricUpdates, filter);
@@ -140,15 +134,13 @@ private static class DataflowMetricResultExtractor {
140134
private final ImmutableList.Builder<MetricResult<DistributionResult>> distributionResults;
141135
private final ImmutableList.Builder<MetricResult<GaugeResult>> gaugeResults;
142136
private final ImmutableList.Builder<MetricResult<StringSetResult>> stringSetResults;
143-
private final ImmutableList.Builder<MetricResult<BoundedTrieResult>> boundedTrieResults;
144137
private final boolean isStreamingJob;
145138

146139
DataflowMetricResultExtractor(boolean isStreamingJob) {
147140
counterResults = ImmutableList.builder();
148141
distributionResults = ImmutableList.builder();
149142
gaugeResults = ImmutableList.builder();
150143
stringSetResults = ImmutableList.builder();
151-
boundedTrieResults = ImmutableList.builder();
152144
/* In Dataflow streaming jobs, only ATTEMPTED metrics are available.
153145
* In Dataflow batch jobs, only COMMITTED metrics are available, but
154146
* we must provide ATTEMPTED, so we use COMMITTED as a good approximation.
@@ -177,11 +169,6 @@ public void addMetricResult(
177169
// stringset metric
178170
StringSetResult value = getStringSetValue(committed);
179171
stringSetResults.add(MetricResult.create(metricKey, !isStreamingJob, value));
180-
} else if (committed.get(BOUNDED_TRIE) != null && attempted.get(BOUNDED_TRIE) != null) {
181-
// TODO (rosinha): This is dummy code. Once Dataflow MetricUpdate
182-
// google client api is updated. Update this.
183-
BoundedTrieResult value = getBoundedTrieValue(committed);
184-
boundedTrieResults.add(MetricResult.create(metricKey, !isStreamingJob, value));
185172
} else {
186173
// This is exceptionally unexpected. We expect matching user metrics to only have the
187174
// value types provided by the Metrics API.
@@ -209,15 +196,6 @@ private StringSetResult getStringSetValue(MetricUpdate metricUpdate) {
209196
return StringSetResult.create(ImmutableSet.copyOf(((Collection) metricUpdate.getSet())));
210197
}
211198

212-
private BoundedTrieResult getBoundedTrieValue(MetricUpdate metricUpdate) {
213-
if (metricUpdate.get(BOUNDED_TRIE) == null) {
214-
return BoundedTrieResult.empty();
215-
}
216-
BoundedTrie bTrie = (BoundedTrie) metricUpdate.get(BOUNDED_TRIE);
217-
BoundedTrieData trieData = BoundedTrieData.fromProto(bTrie);
218-
return BoundedTrieResult.create(trieData.extractResult().getResult());
219-
}
220-
221199
private DistributionResult getDistributionValue(MetricUpdate metricUpdate) {
222200
if (metricUpdate.getDistribution() == null) {
223201
return DistributionResult.IDENTITY_ELEMENT;
@@ -242,13 +220,9 @@ public Iterable<MetricResult<GaugeResult>> getGaugeResults() {
242220
return gaugeResults.build();
243221
}
244222

245-
public Iterable<MetricResult<StringSetResult>> getStringSetResults() {
223+
public Iterable<MetricResult<StringSetResult>> getStringSetResults() {
246224
return stringSetResults.build();
247225
}
248-
249-
public Iterable<MetricResult<BoundedTrieResult>> getBoundedTrieResults() {
250-
return boundedTrieResults.build();
251-
}
252226
}
253227

254228
private static class DataflowMetricQueryResultsFactory {
@@ -414,8 +388,8 @@ public MetricQueryResults build() {
414388
extractor.getCounterResults(),
415389
extractor.getDistributionResults(),
416390
extractor.getGaugeResults(),
417-
extractor.getStringSetResults(),
418-
extractor.getBoundedTrieResults());
391+
extractor.getStringSetResults(),
392+
ImmutableList.of());
419393
}
420394
}
421395
}

runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java

Lines changed: 0 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,11 @@
4141
import java.io.IOException;
4242
import java.math.BigDecimal;
4343
import java.util.Set;
44-
import org.apache.beam.runners.core.metrics.BoundedTrieData;
4544
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
4645
import org.apache.beam.runners.dataflow.util.DataflowTemplateJob;
4746
import org.apache.beam.sdk.PipelineResult.State;
4847
import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
4948
import org.apache.beam.sdk.extensions.gcp.storage.NoopPathValidator;
50-
import org.apache.beam.sdk.metrics.BoundedTrieResult;
5149
import org.apache.beam.sdk.metrics.DistributionResult;
5250
import org.apache.beam.sdk.metrics.MetricQueryResults;
5351
import org.apache.beam.sdk.metrics.MetricsFilter;
@@ -198,13 +196,6 @@ private MetricUpdate makeStringSetMetricUpdate(
198196
return setStructuredName(update, name, namespace, step, tentative);
199197
}
200198

201-
private MetricUpdate makeBoundedTrieMetricUpdate(
202-
String name, String namespace, String step, BoundedTrieData data, boolean tentative) {
203-
MetricUpdate update = new MetricUpdate();
204-
update.set(DataflowMetrics.BOUNDED_TRIE, data.toProto());
205-
return setStructuredName(update, name, namespace, step, tentative);
206-
}
207-
208199
@Test
209200
public void testSingleCounterUpdates() throws IOException {
210201
AppliedPTransform<?, ?, ?> myStep = mock(AppliedPTransform.class);
@@ -295,64 +286,6 @@ public void testSingleStringSetUpdates() throws IOException {
295286
StringSetResult.create(ImmutableSet.of("ab", "cd")))));
296287
}
297288

298-
@Test
299-
public void testSingleBoundedTrieUpdates() throws IOException {
300-
AppliedPTransform<?, ?, ?> myStep = mock(AppliedPTransform.class);
301-
when(myStep.getFullName()).thenReturn("myStepName");
302-
BiMap<AppliedPTransform<?, ?, ?>, String> transformStepNames = HashBiMap.create();
303-
transformStepNames.put(myStep, "s2");
304-
305-
JobMetrics jobMetrics = new JobMetrics();
306-
DataflowPipelineJob job = mock(DataflowPipelineJob.class);
307-
DataflowPipelineOptions options = mock(DataflowPipelineOptions.class);
308-
when(options.isStreaming()).thenReturn(false);
309-
when(job.getDataflowOptions()).thenReturn(options);
310-
when(job.getState()).thenReturn(State.RUNNING);
311-
when(job.getJobId()).thenReturn(JOB_ID);
312-
when(job.getTransformStepNames()).thenReturn(transformStepNames);
313-
314-
// The parser relies on the fact that one tentative and one committed metric update exist in
315-
// the job metrics results.
316-
MetricUpdate mu1 =
317-
makeBoundedTrieMetricUpdate(
318-
"counterName",
319-
"counterNamespace",
320-
"s2",
321-
new BoundedTrieData(ImmutableList.of("ab", "cd")),
322-
false);
323-
MetricUpdate mu1Tentative =
324-
makeBoundedTrieMetricUpdate(
325-
"counterName",
326-
"counterNamespace",
327-
"s2",
328-
new BoundedTrieData(ImmutableList.of("ab", "cd")),
329-
true);
330-
jobMetrics.setMetrics(ImmutableList.of(mu1, mu1Tentative));
331-
DataflowClient dataflowClient = mock(DataflowClient.class);
332-
when(dataflowClient.getJobMetrics(JOB_ID)).thenReturn(jobMetrics);
333-
334-
DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient);
335-
MetricQueryResults result = dataflowMetrics.allMetrics();
336-
assertThat(
337-
result.getBoundedTries(),
338-
containsInAnyOrder(
339-
attemptedMetricsResult(
340-
"counterNamespace",
341-
"counterName",
342-
"myStepName",
343-
BoundedTrieResult.create(
344-
ImmutableSet.of(ImmutableList.of("ab", "cd", String.valueOf(false)))))));
345-
assertThat(
346-
result.getBoundedTries(),
347-
containsInAnyOrder(
348-
committedMetricsResult(
349-
"counterNamespace",
350-
"counterName",
351-
"myStepName",
352-
BoundedTrieResult.create(
353-
ImmutableSet.of(ImmutableList.of("ab", "cd", String.valueOf(false)))))));
354-
}
355-
356289
@Test
357290
public void testIgnoreDistributionButGetCounterUpdates() throws IOException {
358291
AppliedPTransform<?, ?, ?> myStep = mock(AppliedPTransform.class);

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -519,12 +519,7 @@ public Iterable<CounterUpdate> extractMetricUpdates(boolean isFinalUpdate) {
519519
.transform(
520520
update ->
521521
MetricsToCounterUpdateConverter.fromStringSet(
522-
update.getKey(), true, update.getUpdate())),
523-
FluentIterable.from(updates.boundedTrieUpdates())
524-
.transform(
525-
update ->
526-
MetricsToCounterUpdateConverter.fromBoundedTrie(
527-
update.getKey(), update.getUpdate())));
522+
update.getKey(), true, update.getUpdate())));
528523
});
529524
}
530525

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import com.google.api.services.dataflow.model.IntegerGauge;
2828
import com.google.api.services.dataflow.model.StringList;
2929
import java.util.ArrayList;
30-
import org.apache.beam.runners.core.metrics.BoundedTrieData;
3130
import org.apache.beam.runners.core.metrics.DistributionData;
3231
import org.apache.beam.runners.core.metrics.StringSetData;
3332
import org.apache.beam.sdk.metrics.MetricKey;
@@ -112,16 +111,6 @@ public static CounterUpdate fromStringSet(
112111
.setStringList(stringList);
113112
}
114113

115-
public static CounterUpdate fromBoundedTrie(MetricKey key, BoundedTrieData boundedTrieData) {
116-
// BoundedTrie uses SET kind metric aggregation which tracks unique strings.
117-
CounterStructuredNameAndMetadata name = structuredNameAndMetadata(key, Kind.SET);
118-
// TODO (rosinha): Once the CounterUpdate API is updated in dataflow client update this.
119-
return new CounterUpdate()
120-
.setStructuredNameAndMetadata(name)
121-
.setCumulative(false)
122-
.set("bounded_trie", boundedTrieData.toProto());
123-
}
124-
125114
public static CounterUpdate fromDistribution(
126115
MetricKey key, boolean isCumulative, DistributionData update) {
127116
CounterStructuredNameAndMetadata name = structuredNameAndMetadata(key, Kind.DISTRIBUTION);

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -217,8 +217,7 @@ public Iterable<CounterUpdate> extractUpdates() {
217217
return counterUpdates()
218218
.append(distributionUpdates())
219219
.append(gaugeUpdates())
220-
.append(stringSetUpdates())
221-
.append(boundedTrieUpdates());
220+
.append(stringSetUpdates());
222221
}
223222

224223
private FluentIterable<CounterUpdate> counterUpdates() {
@@ -278,20 +277,6 @@ private FluentIterable<CounterUpdate> stringSetUpdates() {
278277
.filter(Predicates.notNull());
279278
}
280279

281-
private FluentIterable<CounterUpdate> boundedTrieUpdates() {
282-
return FluentIterable.from(boundedTries.entries())
283-
.transform(
284-
new Function<Entry<MetricName, BoundedTrieCell>, CounterUpdate>() {
285-
@Override
286-
public @Nullable CounterUpdate apply(
287-
@Nonnull Map.Entry<MetricName, BoundedTrieCell> entry) {
288-
return MetricsToCounterUpdateConverter.fromBoundedTrie(
289-
MetricKey.create(stepName, entry.getKey()), entry.getValue().getCumulative());
290-
}
291-
})
292-
.filter(Predicates.notNull());
293-
}
294-
295280
private FluentIterable<CounterUpdate> distributionUpdates() {
296281
return FluentIterable.from(distributions.entries())
297282
.transform(

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java

Lines changed: 0 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,6 @@
3232
import com.google.api.services.dataflow.model.DistributionUpdate;
3333
import com.google.api.services.dataflow.model.StringList;
3434
import java.util.Arrays;
35-
import org.apache.beam.model.pipeline.v1.MetricsApi;
36-
import org.apache.beam.runners.core.metrics.BoundedTrieData;
3735
import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
3836
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
3937
import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
@@ -42,15 +40,13 @@
4240
import org.apache.beam.runners.dataflow.worker.counters.NameContext;
4341
import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.NoopProfileScope;
4442
import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope;
45-
import org.apache.beam.sdk.metrics.BoundedTrie;
4643
import org.apache.beam.sdk.metrics.Counter;
4744
import org.apache.beam.sdk.metrics.Distribution;
4845
import org.apache.beam.sdk.metrics.MetricName;
4946
import org.apache.beam.sdk.metrics.Metrics;
5047
import org.apache.beam.sdk.metrics.MetricsContainer;
5148
import org.apache.beam.sdk.metrics.StringSet;
5249
import org.apache.beam.sdk.options.PipelineOptionsFactory;
53-
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
5450
import org.hamcrest.Matchers;
5551
import org.junit.Test;
5652
import org.junit.runner.RunWith;
@@ -197,42 +193,6 @@ public void extractMetricUpdatesStringSet() {
197193
assertThat(executionContext.extractMetricUpdates(false), containsInAnyOrder(expected));
198194
}
199195

200-
@Test
201-
public void extractMetricUpdatesBoundedTrie() {
202-
BatchModeExecutionContext executionContext =
203-
BatchModeExecutionContext.forTesting(PipelineOptionsFactory.create(), "testStage");
204-
DataflowOperationContext operationContext =
205-
executionContext.createOperationContext(NameContextsForTests.nameContextForTest());
206-
207-
BoundedTrie boundedTrie =
208-
operationContext
209-
.metricsContainer()
210-
.getBoundedTrie(MetricName.named("namespace", "some-bounded-trie"));
211-
boundedTrie.add("ab");
212-
boundedTrie.add("cd");
213-
214-
BoundedTrieData trieData = new BoundedTrieData();
215-
trieData.add(ImmutableList.of("ab"));
216-
trieData.add(ImmutableList.of("cd"));
217-
MetricsApi.BoundedTrie expectedTrie = trieData.toProto();
218-
219-
final CounterUpdate expected =
220-
new CounterUpdate()
221-
.setStructuredNameAndMetadata(
222-
new CounterStructuredNameAndMetadata()
223-
.setName(
224-
new CounterStructuredName()
225-
.setOrigin("USER")
226-
.setOriginNamespace("namespace")
227-
.setName("some-bounded-trie")
228-
.setOriginalStepName("originalName"))
229-
.setMetadata(new CounterMetadata().setKind(Kind.SET.toString())))
230-
.setCumulative(false)
231-
.set("bounded_trie", expectedTrie);
232-
233-
assertThat(executionContext.extractMetricUpdates(false), containsInAnyOrder(expected));
234-
}
235-
236196
@Test
237197
public void extractMsecCounters() {
238198
BatchModeExecutionContext executionContext =

0 commit comments

Comments (0)