Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,10 @@
import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
import org.apache.beam.runners.core.metrics.DistributionData;
import org.apache.beam.runners.core.metrics.MetricUpdates.MetricUpdate;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants.Urns;
import org.apache.beam.runners.core.metrics.ShortIdMap;
import org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder;
import org.apache.beam.sdk.Pipeline;
Expand Down Expand Up @@ -144,7 +142,6 @@
import org.joda.time.format.PeriodFormat;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
Expand Down Expand Up @@ -725,10 +722,11 @@ public MetricUpdate create(String stepName, MetricName name, long value) {
}

@Test
@Ignore("https://github.com/apache/beam/issues/20872")
public void testUsingMetrics() throws Exception {
MetricsEnvironment.setMetricsSupported(true);
MetricsContainerStepMap metricsContainerRegistry = new MetricsContainerStepMap();
MetricsContainerImpl metricsContainer = metricsContainerRegistry.getUnboundContainer();
MetricsContainerImpl metricsContainer =
metricsContainerRegistry.getContainer(TEST_TRANSFORM_ID);
Closeable closeable = MetricsEnvironment.scopedMetricsContainer(metricsContainer);
FixedWindows windowFn = FixedWindows.of(Duration.millis(1L));
IntervalWindow windowA = windowFn.assignWindow(new Instant(1L));
Expand Down Expand Up @@ -812,23 +810,8 @@ public void testUsingMetrics() throws Exception {
Iterables.getOnlyElement(context.getTearDownFunctions()).run();
assertThat(mainOutputValues, empty());

List<MonitoringInfo> expected = new ArrayList<MonitoringInfo>();
List<MonitoringInfo> expected = new ArrayList<>();
SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder();
builder.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
builder.setLabel(
MonitoringInfoConstants.Labels.PCOLLECTION, "Window.Into()/Window.Assign.out");
builder.setInt64SumValue(2);
expected.add(builder.build());

builder = new SimpleMonitoringInfoBuilder();
builder.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
builder.setLabel(
MonitoringInfoConstants.Labels.PCOLLECTION,
"pTransformId/ParMultiDo(TestSideInputIsAccessibleForDownstreamCallers).output");
builder.setInt64SumValue(2);
expected.add(builder.build());

builder = new SimpleMonitoringInfoBuilder();
builder
.setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64)
.setLabel(
Expand All @@ -841,23 +824,8 @@ public void testUsingMetrics() throws Exception {
builder.setInt64SumValue(2);
expected.add(builder.build());

builder = new SimpleMonitoringInfoBuilder();
builder.setUrn(MonitoringInfoConstants.Urns.SAMPLED_BYTE_SIZE);
builder.setLabel(
MonitoringInfoConstants.Labels.PCOLLECTION, "Window.Into()/Window.Assign.out");
builder.setInt64DistributionValue(DistributionData.create(4, 2, 2, 2));
expected.add(builder.build());

builder = new SimpleMonitoringInfoBuilder();
builder.setUrn(Urns.SAMPLED_BYTE_SIZE);
builder.setLabel(
MonitoringInfoConstants.Labels.PCOLLECTION,
"pTransformId/ParMultiDo(TestSideInputIsAccessibleForDownstreamCallers).output");
builder.setInt64DistributionValue(DistributionData.create(10, 2, 5, 5));
expected.add(builder.build());

closeable.close();
List<MonitoringInfo> result = new ArrayList<MonitoringInfo>();
List<MonitoringInfo> result = new ArrayList<>();
for (MonitoringInfo mi : metricsContainerRegistry.getMonitoringInfos()) {
result.add(mi);
}
Expand Down Expand Up @@ -910,6 +878,11 @@ public <T> FnDataReceiver<T> registerOutputTimersLocation(

@Test
public void testTimers() throws Exception {
MetricsEnvironment.setMetricsSupported(true);
MetricsContainerStepMap metricsContainerRegistry = new MetricsContainerStepMap();
MetricsContainerImpl metricsContainer =
metricsContainerRegistry.getContainer(TEST_TRANSFORM_ID);
Closeable closeable = MetricsEnvironment.scopedMetricsContainer(metricsContainer);
dateTimeProvider.setDateTimeFixed(10000L);

Pipeline p = Pipeline.create();
Expand Down Expand Up @@ -1136,6 +1109,32 @@ public void testTimers() throws Exception {
.build())
.getData(),
fakeStateClient.getData());

List<MonitoringInfo> expected = new ArrayList<MonitoringInfo>();
SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder();
builder
.setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64)
.setLabel(MonitoringInfoConstants.Labels.NAMESPACE, TestTimerfulDoFn.class.getName())
.setLabel(MonitoringInfoConstants.Labels.NAME, TestTimerfulDoFn.USER_COUNTER_ELEMS_NAME);
builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, TEST_TRANSFORM_ID);
builder.setInt64SumValue(4);
expected.add(builder.build());

builder = new SimpleMonitoringInfoBuilder();
builder
.setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64)
.setLabel(MonitoringInfoConstants.Labels.NAMESPACE, TestTimerfulDoFn.class.getName())
.setLabel(MonitoringInfoConstants.Labels.NAME, TestTimerfulDoFn.USER_COUNTER_TIMERS_NAME);
builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, TEST_TRANSFORM_ID);
builder.setInt64SumValue(15);
expected.add(builder.build());

closeable.close();
List<MonitoringInfo> result = new ArrayList<MonitoringInfo>();
for (MonitoringInfo mi : metricsContainerRegistry.getMonitoringInfos()) {
result.add(mi);
}
assertThat(result, containsInAnyOrder(expected.toArray()));
}

private <K> org.apache.beam.sdk.util.construction.Timer<K> timerInGlobalWindow(
Expand Down Expand Up @@ -1175,6 +1174,13 @@ private <T> WindowedValue<T> valueInWindows(
}

private static class TestTimerfulDoFn extends DoFn<KV<String, String>, String> {
public static final String USER_COUNTER_TIMERS_NAME = "userCountedTimers";
public static final String USER_COUNTER_ELEMS_NAME = "userCountedElements";

private final Counter counterTimers =
Metrics.counter(TestTimerfulDoFn.class, USER_COUNTER_TIMERS_NAME);
private final Counter counterElems =
Metrics.counter(TestTimerfulDoFn.class, USER_COUNTER_ELEMS_NAME);

@StateId("bag")
private final StateSpec<BagState<String>> bagStateSpec = StateSpecs.bag(StringUtf8Coder.of());
Expand All @@ -1200,6 +1206,7 @@ public void processElement(
@TimerId("processing") Timer processingTimeTimer,
@TimerFamily("event-family") TimerMap eventTimerFamily,
@TimerFamily("processing-family") TimerMap processingTimerFamily) {
counterElems.inc();
context.output(
"key:"
+ context.element().getKey()
Expand Down Expand Up @@ -1235,6 +1242,8 @@ public void eventTimer(
@TimerId("processing") Timer processingTimeTimer,
@TimerFamily("event-family") TimerMap eventTimerFamily,
@TimerFamily("processing-family") TimerMap processingTimerFamily) {
counterTimers.inc();

context.output("key:" + key + " event" + Iterables.toString(bagState.read()));
bagState.add("event");
eventTimeTimer
Expand All @@ -1259,6 +1268,8 @@ public void processingTimer(
@TimerId("processing") Timer processingTimeTimer,
@TimerFamily("event-family") TimerMap eventTimerFamily,
@TimerFamily("processing-family") TimerMap processingTimerFamily) {
counterTimers.inc();

context.output("key:" + key + " processing" + Iterables.toString(bagState.read()));
bagState.add("processing");

Expand All @@ -1285,6 +1296,8 @@ public void eventFamilyOnTimer(
@TimerId("processing") Timer processingTimeTimer,
@TimerFamily("event-family") TimerMap eventTimerFamily,
@TimerFamily("processing-family") TimerMap processingTimerFamily) {
counterTimers.inc();

context.output("key:" + key + " event-family" + Iterables.toString(bagState.read()));
bagState.add("event-family");

Expand All @@ -1310,6 +1323,8 @@ public void processingFamilyOnTimer(
@TimerId("processing") Timer processingTimeTimer,
@TimerFamily("event-family") TimerMap eventTimerFamily,
@TimerFamily("processing-family") TimerMap processingTimerFamily) {
counterTimers.inc();

context.output("key:" + key + " processing-family" + Iterables.toString(bagState.read()));
bagState.add("processing-family");

Expand Down
Loading