Skip to content

Commit 3def519

Browse files
authored
Add test for user counters in timer processing to FnApiDoFnRunnerTest. fix metrics test (#35494)
1 parent 27085de commit 3def519

File tree

1 file changed

+52
-37
lines changed

1 file changed

+52
-37
lines changed

sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java

Lines changed: 52 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,10 @@
6161
import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
6262
import org.apache.beam.model.pipeline.v1.RunnerApi;
6363
import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
64-
import org.apache.beam.runners.core.metrics.DistributionData;
6564
import org.apache.beam.runners.core.metrics.MetricUpdates.MetricUpdate;
6665
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
6766
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
6867
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
69-
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants.Urns;
7068
import org.apache.beam.runners.core.metrics.ShortIdMap;
7169
import org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder;
7270
import org.apache.beam.sdk.Pipeline;
@@ -144,7 +142,6 @@
144142
import org.joda.time.format.PeriodFormat;
145143
import org.junit.Assert;
146144
import org.junit.Before;
147-
import org.junit.Ignore;
148145
import org.junit.Rule;
149146
import org.junit.Test;
150147
import org.junit.experimental.runners.Enclosed;
@@ -725,10 +722,11 @@ public MetricUpdate create(String stepName, MetricName name, long value) {
725722
}
726723

727724
@Test
728-
@Ignore("https://github.com/apache/beam/issues/20872")
729725
public void testUsingMetrics() throws Exception {
726+
MetricsEnvironment.setMetricsSupported(true);
730727
MetricsContainerStepMap metricsContainerRegistry = new MetricsContainerStepMap();
731-
MetricsContainerImpl metricsContainer = metricsContainerRegistry.getUnboundContainer();
728+
MetricsContainerImpl metricsContainer =
729+
metricsContainerRegistry.getContainer(TEST_TRANSFORM_ID);
732730
Closeable closeable = MetricsEnvironment.scopedMetricsContainer(metricsContainer);
733731
FixedWindows windowFn = FixedWindows.of(Duration.millis(1L));
734732
IntervalWindow windowA = windowFn.assignWindow(new Instant(1L));
@@ -812,23 +810,8 @@ public void testUsingMetrics() throws Exception {
812810
Iterables.getOnlyElement(context.getTearDownFunctions()).run();
813811
assertThat(mainOutputValues, empty());
814812

815-
List<MonitoringInfo> expected = new ArrayList<MonitoringInfo>();
813+
List<MonitoringInfo> expected = new ArrayList<>();
816814
SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder();
817-
builder.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
818-
builder.setLabel(
819-
MonitoringInfoConstants.Labels.PCOLLECTION, "Window.Into()/Window.Assign.out");
820-
builder.setInt64SumValue(2);
821-
expected.add(builder.build());
822-
823-
builder = new SimpleMonitoringInfoBuilder();
824-
builder.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
825-
builder.setLabel(
826-
MonitoringInfoConstants.Labels.PCOLLECTION,
827-
"pTransformId/ParMultiDo(TestSideInputIsAccessibleForDownstreamCallers).output");
828-
builder.setInt64SumValue(2);
829-
expected.add(builder.build());
830-
831-
builder = new SimpleMonitoringInfoBuilder();
832815
builder
833816
.setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64)
834817
.setLabel(
@@ -841,23 +824,8 @@ public void testUsingMetrics() throws Exception {
841824
builder.setInt64SumValue(2);
842825
expected.add(builder.build());
843826

844-
builder = new SimpleMonitoringInfoBuilder();
845-
builder.setUrn(MonitoringInfoConstants.Urns.SAMPLED_BYTE_SIZE);
846-
builder.setLabel(
847-
MonitoringInfoConstants.Labels.PCOLLECTION, "Window.Into()/Window.Assign.out");
848-
builder.setInt64DistributionValue(DistributionData.create(4, 2, 2, 2));
849-
expected.add(builder.build());
850-
851-
builder = new SimpleMonitoringInfoBuilder();
852-
builder.setUrn(Urns.SAMPLED_BYTE_SIZE);
853-
builder.setLabel(
854-
MonitoringInfoConstants.Labels.PCOLLECTION,
855-
"pTransformId/ParMultiDo(TestSideInputIsAccessibleForDownstreamCallers).output");
856-
builder.setInt64DistributionValue(DistributionData.create(10, 2, 5, 5));
857-
expected.add(builder.build());
858-
859827
closeable.close();
860-
List<MonitoringInfo> result = new ArrayList<MonitoringInfo>();
828+
List<MonitoringInfo> result = new ArrayList<>();
861829
for (MonitoringInfo mi : metricsContainerRegistry.getMonitoringInfos()) {
862830
result.add(mi);
863831
}
@@ -910,6 +878,11 @@ public <T> FnDataReceiver<T> registerOutputTimersLocation(
910878

911879
@Test
912880
public void testTimers() throws Exception {
881+
MetricsEnvironment.setMetricsSupported(true);
882+
MetricsContainerStepMap metricsContainerRegistry = new MetricsContainerStepMap();
883+
MetricsContainerImpl metricsContainer =
884+
metricsContainerRegistry.getContainer(TEST_TRANSFORM_ID);
885+
Closeable closeable = MetricsEnvironment.scopedMetricsContainer(metricsContainer);
913886
dateTimeProvider.setDateTimeFixed(10000L);
914887

915888
Pipeline p = Pipeline.create();
@@ -1136,6 +1109,32 @@ public void testTimers() throws Exception {
11361109
.build())
11371110
.getData(),
11381111
fakeStateClient.getData());
1112+
1113+
List<MonitoringInfo> expected = new ArrayList<MonitoringInfo>();
1114+
SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder();
1115+
builder
1116+
.setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64)
1117+
.setLabel(MonitoringInfoConstants.Labels.NAMESPACE, TestTimerfulDoFn.class.getName())
1118+
.setLabel(MonitoringInfoConstants.Labels.NAME, TestTimerfulDoFn.USER_COUNTER_ELEMS_NAME);
1119+
builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, TEST_TRANSFORM_ID);
1120+
builder.setInt64SumValue(4);
1121+
expected.add(builder.build());
1122+
1123+
builder = new SimpleMonitoringInfoBuilder();
1124+
builder
1125+
.setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64)
1126+
.setLabel(MonitoringInfoConstants.Labels.NAMESPACE, TestTimerfulDoFn.class.getName())
1127+
.setLabel(MonitoringInfoConstants.Labels.NAME, TestTimerfulDoFn.USER_COUNTER_TIMERS_NAME);
1128+
builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, TEST_TRANSFORM_ID);
1129+
builder.setInt64SumValue(15);
1130+
expected.add(builder.build());
1131+
1132+
closeable.close();
1133+
List<MonitoringInfo> result = new ArrayList<MonitoringInfo>();
1134+
for (MonitoringInfo mi : metricsContainerRegistry.getMonitoringInfos()) {
1135+
result.add(mi);
1136+
}
1137+
assertThat(result, containsInAnyOrder(expected.toArray()));
11391138
}
11401139

11411140
private <K> org.apache.beam.sdk.util.construction.Timer<K> timerInGlobalWindow(
@@ -1175,6 +1174,13 @@ private <T> WindowedValue<T> valueInWindows(
11751174
}
11761175

11771176
private static class TestTimerfulDoFn extends DoFn<KV<String, String>, String> {
1177+
public static final String USER_COUNTER_TIMERS_NAME = "userCountedTimers";
1178+
public static final String USER_COUNTER_ELEMS_NAME = "userCountedElements";
1179+
1180+
private final Counter counterTimers =
1181+
Metrics.counter(TestTimerfulDoFn.class, USER_COUNTER_TIMERS_NAME);
1182+
private final Counter counterElems =
1183+
Metrics.counter(TestTimerfulDoFn.class, USER_COUNTER_ELEMS_NAME);
11781184

11791185
@StateId("bag")
11801186
private final StateSpec<BagState<String>> bagStateSpec = StateSpecs.bag(StringUtf8Coder.of());
@@ -1200,6 +1206,7 @@ public void processElement(
12001206
@TimerId("processing") Timer processingTimeTimer,
12011207
@TimerFamily("event-family") TimerMap eventTimerFamily,
12021208
@TimerFamily("processing-family") TimerMap processingTimerFamily) {
1209+
counterElems.inc();
12031210
context.output(
12041211
"key:"
12051212
+ context.element().getKey()
@@ -1235,6 +1242,8 @@ public void eventTimer(
12351242
@TimerId("processing") Timer processingTimeTimer,
12361243
@TimerFamily("event-family") TimerMap eventTimerFamily,
12371244
@TimerFamily("processing-family") TimerMap processingTimerFamily) {
1245+
counterTimers.inc();
1246+
12381247
context.output("key:" + key + " event" + Iterables.toString(bagState.read()));
12391248
bagState.add("event");
12401249
eventTimeTimer
@@ -1259,6 +1268,8 @@ public void processingTimer(
12591268
@TimerId("processing") Timer processingTimeTimer,
12601269
@TimerFamily("event-family") TimerMap eventTimerFamily,
12611270
@TimerFamily("processing-family") TimerMap processingTimerFamily) {
1271+
counterTimers.inc();
1272+
12621273
context.output("key:" + key + " processing" + Iterables.toString(bagState.read()));
12631274
bagState.add("processing");
12641275

@@ -1285,6 +1296,8 @@ public void eventFamilyOnTimer(
12851296
@TimerId("processing") Timer processingTimeTimer,
12861297
@TimerFamily("event-family") TimerMap eventTimerFamily,
12871298
@TimerFamily("processing-family") TimerMap processingTimerFamily) {
1299+
counterTimers.inc();
1300+
12881301
context.output("key:" + key + " event-family" + Iterables.toString(bagState.read()));
12891302
bagState.add("event-family");
12901303

@@ -1310,6 +1323,8 @@ public void processingFamilyOnTimer(
13101323
@TimerId("processing") Timer processingTimeTimer,
13111324
@TimerFamily("event-family") TimerMap eventTimerFamily,
13121325
@TimerFamily("processing-family") TimerMap processingTimerFamily) {
1326+
counterTimers.inc();
1327+
13131328
context.output("key:" + key + " processing-family" + Iterables.toString(bagState.read()));
13141329
bagState.add("processing-family");
13151330

0 commit comments

Comments
 (0)