From 5da37c81a545dd5c0628fd8a1519903b55450cd5 Mon Sep 17 00:00:00 2001 From: liferoad Date: Mon, 26 May 2025 13:31:11 -0400 Subject: [PATCH 1/2] fix user metrics in @OnTimer --- .../beam/fn/harness/FnApiDoFnRunner.java | 4 + .../beam/fn/harness/FnApiDoFnRunnerTest.java | 175 ++++++++++++++++++ 2 files changed, 179 insertions(+) diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java index e264fa14788a..21e79a75b45f 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java @@ -1533,6 +1533,8 @@ private void processTimerDirect( currentTimer = timer; currentTimeDomain = timeDomain; doFnInvoker.invokeOnTimer(timerId, timerFamilyId, onTimerContext); + // Finalize state to ensure metrics and other state changes are committed. + this.stateAccessor.finalizeState(); } private void processOnWindowExpiration(Timer timer) { @@ -1545,6 +1547,8 @@ private void processOnWindowExpiration(Timer timer) { currentWindow = windowIterator.next(); doFnInvoker.invokeOnWindowExpiration(onWindowExpirationContext); } + // Finalize state to ensure metrics and other state changes are committed. + this.stateAccessor.finalizeState(); } finally { currentKey = null; currentTimer = null; diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java index 6ca085495a3d..8498e2560a1c 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java @@ -32,6 +32,7 @@ import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; @@ -76,6 +77,7 @@ import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.fn.data.BeamFnDataOutboundAggregator; import org.apache.beam.sdk.fn.data.FnDataReceiver; @@ -166,6 +168,179 @@ public static class ExecutionTest implements Serializable { @Rule public transient ResetDateTimeProvider dateTimeProvider = new ResetDateTimeProvider(); public static final String TEST_TRANSFORM_ID = "pTransformId"; + // Must be static final for use in annotations and as constants + static final String TIMER_ID_FOR_COUNTER_TEST = "testTimerForCounter"; + static final String PROCESS_COUNTER_NAME_FOR_TEST = "process_counter_for_timer_test"; + static final String TIMER_COUNTER_NAME_FOR_TEST = "timer_counter_for_timer_test"; + + // Helper to find MonitoringInfo for a counter + private MonitoringInfo findCounter( + List infos, String ptransformId, String namespace, String name) { + String expectedUrn = MonitoringInfoConstants.Urns.USER_SUM_INT64; + for (MonitoringInfo info : infos) { + if (expectedUrn.equals(info.getUrn()) + && ptransformId.equals( + info.getLabelsMap().get(MonitoringInfoConstants.Labels.PTRANSFORM)) + && namespace.equals(info.getLabelsMap().get(MonitoringInfoConstants.Labels.NAMESPACE)) + && name.equals(info.getLabelsMap().get(MonitoringInfoConstants.Labels.NAME))) { + return info; + } + } + return null; + } + + // Helper to extract counter value + private long extractCounterValue(MonitoringInfo info) { + if (info == null) { + throw new IllegalArgumentException("MonitoringInfo cannot be null"); + } + + boolean isUserSumInt64 = + MonitoringInfoConstants.Urns.USER_SUM_INT64.equals(info.getUrn()) + && MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE.equals(info.getType()); + + if (isUserSumInt64) { + try { + return VarIntCoder.of().decode(info.getPayload().newInput()); + } catch (IOException e) { + throw new RuntimeException("Failed to decode MonitoringInfo payload for counter", e); + } + } + throw new IllegalArgumentException( + "MonitoringInfo is not a user sum int64 counter or has unexpected URN/Type. URN: " + + info.getUrn() + + ", Type: " + + info.getType() + + ", Info: " + + info); + } + + @Test + public void testCountersInOnTimer() throws Exception { + // 1. Define a DoFn with counters in processElement and onTimer + + class TestTimerCounterDoFn extends DoFn, String> { + final Counter processCounter = + Metrics.counter(TestTimerCounterDoFn.class, PROCESS_COUNTER_NAME_FOR_TEST); + final Counter timerCounter = + Metrics.counter(TestTimerCounterDoFn.class, TIMER_COUNTER_NAME_FOR_TEST); + + @TimerId(TIMER_ID_FOR_COUNTER_TEST) + private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @ProcessElement + public void processElement( + ProcessContext c, @TimerId(TIMER_ID_FOR_COUNTER_TEST) Timer timer) { + processCounter.inc(); + // Set a timer to fire + timer.set(c.timestamp().plus(Duration.standardSeconds(1))); + c.output("processed:" + c.element().getValue()); + } + + @OnTimer(TIMER_ID_FOR_COUNTER_TEST) + public void onTimer(OnTimerContext c, @Key String key) { + timerCounter.inc(); + c.output("timer_fired_for_key:" + key); + } + } + + // 2. Setup Pipeline and PTransformProto + Pipeline p = Pipeline.create(); + PCollection> input = p.apply(Create.of(KV.of("key1", "val1"))); + PCollection output = + input.apply(TEST_TRANSFORM_ID, ParDo.of(new TestTimerCounterDoFn())); + + SdkComponents sdkComponents = SdkComponents.create(p.getOptions()); + RunnerApi.Pipeline pProto = PipelineTranslation.toProto(p, sdkComponents); + String inputPCollectionId = sdkComponents.registerPCollection(input); + String outputPCollectionId = sdkComponents.registerPCollection(output); + + RunnerApi.PTransform pTransformProto = + pProto + .getComponents() + .getTransformsMap() + .get( + pProto + .getComponents() + .getTransformsOrThrow(TEST_TRANSFORM_ID) + .getSubtransforms(0)); + + // 3. Setup PTransformRunnerFactoryTestContext + MetricsContainerStepMap metricsContainerRegistry = new MetricsContainerStepMap(); + try (Closeable closeable = + MetricsEnvironment.scopedMetricsContainer( + metricsContainerRegistry.getUnboundContainer())) { + + PTransformRunnerFactoryTestContext context = + PTransformRunnerFactoryTestContext.builder(TEST_TRANSFORM_ID, pTransformProto) + .processBundleInstructionId("bundle-id-1") + .beamFnStateClient( + new FakeBeamFnStateClient(StringUtf8Coder.of(), ImmutableMap.of())) + .components(pProto.getComponents()) + .build(); + + List> mainOutputValues = new ArrayList<>(); + context.addPCollectionConsumer( + outputPCollectionId, + (FnDataReceiver) (FnDataReceiver>) mainOutputValues::add); + + new FnApiDoFnRunner.Factory<>().addRunnerForPTransform(context); + + // 4. Execute: startBundle, processElement, fire timer, finishBundle + Iterables.getOnlyElement(context.getStartBundleFunctions()).run(); + + FnDataReceiver> mainInputReceiver = + context.getPCollectionConsumer(inputPCollectionId); + Instant elementTimestamp = new Instant(1000L); + mainInputReceiver.accept( + timestampedValueInGlobalWindow(KV.of("key1", "val1"), elementTimestamp)); + + Instant timerFireTimestamp = elementTimestamp.plus(Duration.standardSeconds(1)); + org.apache.beam.sdk.util.construction.Timer timerData = + org.apache.beam.sdk.util.construction.Timer.of( + "key1", // User key + "", // Dynamic timer tag + Collections.singletonList(GlobalWindow.INSTANCE), + timerFireTimestamp, // Scheduled time + timerFireTimestamp, // Hold time + PaneInfo.NO_FIRING); + + context.getIncomingTimerEndpoint(TIMER_ID_FOR_COUNTER_TEST).getReceiver().accept(timerData); + + Iterables.getOnlyElement(context.getFinishBundleFunctions()).run(); + + // 5. Assert MonitoringInfos + List monitoringInfos = + ImmutableList.copyOf(metricsContainerRegistry.getMonitoringInfos()); + + MonitoringInfo processCounterInfo = + findCounter( + monitoringInfos, + TEST_TRANSFORM_ID, + TestTimerCounterDoFn.class.getName(), + PROCESS_COUNTER_NAME_FOR_TEST); + assertNotNull("Process counter MonitoringInfo not found", processCounterInfo); + assertEquals( + "Process counter value incorrect", 1L, extractCounterValue(processCounterInfo)); + + MonitoringInfo timerCounterInfo = + findCounter( + monitoringInfos, + TEST_TRANSFORM_ID, + TestTimerCounterDoFn.class.getName(), + TIMER_COUNTER_NAME_FOR_TEST); + // This is the main assertion for the bug. + // Before the fix, this will likely be null or the counter value will be 0. + assertNotNull("Timer counter MonitoringInfo not found", timerCounterInfo); + assertEquals("Timer counter value incorrect", 1L, extractCounterValue(timerCounterInfo)); + + assertThat( + mainOutputValues, + containsInAnyOrder( + timestampedValueInGlobalWindow("processed:val1", elementTimestamp), + timestampedValueInGlobalWindow("timer_fired_for_key:key1", timerFireTimestamp))); + } + } private static class ConcatCombineFn extends CombineFn { @Override From 329f73dc2b22131380ac10a55a298bbc576429ed Mon Sep 17 00:00:00 2001 From: liferoad Date: Mon, 26 May 2025 15:15:30 -0400 Subject: [PATCH 2/2] removed the timer tests for now --- .../beam/fn/harness/FnApiDoFnRunnerTest.java | 175 ------------------ 1 file changed, 175 deletions(-) diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java index 8498e2560a1c..6ca085495a3d 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java @@ -32,7 +32,6 @@ import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; @@ -77,7 +76,6 @@ import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.fn.data.BeamFnDataOutboundAggregator; import org.apache.beam.sdk.fn.data.FnDataReceiver; @@ -168,179 +166,6 @@ public static class ExecutionTest implements Serializable { @Rule public transient ResetDateTimeProvider dateTimeProvider = new ResetDateTimeProvider(); public static final String TEST_TRANSFORM_ID = "pTransformId"; - // Must be static final for use in annotations and as constants - static final String TIMER_ID_FOR_COUNTER_TEST = "testTimerForCounter"; - static final String PROCESS_COUNTER_NAME_FOR_TEST = "process_counter_for_timer_test"; - static final String TIMER_COUNTER_NAME_FOR_TEST = "timer_counter_for_timer_test"; - - // Helper to find MonitoringInfo for a counter - private MonitoringInfo findCounter( - List infos, String ptransformId, String namespace, String name) { - String expectedUrn = MonitoringInfoConstants.Urns.USER_SUM_INT64; - for (MonitoringInfo info : infos) { - if (expectedUrn.equals(info.getUrn()) - && ptransformId.equals( - info.getLabelsMap().get(MonitoringInfoConstants.Labels.PTRANSFORM)) - && namespace.equals(info.getLabelsMap().get(MonitoringInfoConstants.Labels.NAMESPACE)) - && name.equals(info.getLabelsMap().get(MonitoringInfoConstants.Labels.NAME))) { - return info; - } - } - return null; - } - - // Helper to extract counter value - private long extractCounterValue(MonitoringInfo info) { - if (info == null) { - throw new IllegalArgumentException("MonitoringInfo cannot be null"); - } - - boolean isUserSumInt64 = - MonitoringInfoConstants.Urns.USER_SUM_INT64.equals(info.getUrn()) - && MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE.equals(info.getType()); - - if (isUserSumInt64) { - try { - return VarIntCoder.of().decode(info.getPayload().newInput()); - } catch (IOException e) { - throw new RuntimeException("Failed to decode MonitoringInfo payload for counter", e); - } - } - throw new IllegalArgumentException( - "MonitoringInfo is not a user sum int64 counter or has unexpected URN/Type. URN: " - + info.getUrn() - + ", Type: " - + info.getType() - + ", Info: " - + info); - } - - @Test - public void testCountersInOnTimer() throws Exception { - // 1. Define a DoFn with counters in processElement and onTimer - - class TestTimerCounterDoFn extends DoFn, String> { - final Counter processCounter = - Metrics.counter(TestTimerCounterDoFn.class, PROCESS_COUNTER_NAME_FOR_TEST); - final Counter timerCounter = - Metrics.counter(TestTimerCounterDoFn.class, TIMER_COUNTER_NAME_FOR_TEST); - - @TimerId(TIMER_ID_FOR_COUNTER_TEST) - private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME); - - @ProcessElement - public void processElement( - ProcessContext c, @TimerId(TIMER_ID_FOR_COUNTER_TEST) Timer timer) { - processCounter.inc(); - // Set a timer to fire - timer.set(c.timestamp().plus(Duration.standardSeconds(1))); - c.output("processed:" + c.element().getValue()); - } - - @OnTimer(TIMER_ID_FOR_COUNTER_TEST) - public void onTimer(OnTimerContext c, @Key String key) { - timerCounter.inc(); - c.output("timer_fired_for_key:" + key); - } - } - - // 2. Setup Pipeline and PTransformProto - Pipeline p = Pipeline.create(); - PCollection> input = p.apply(Create.of(KV.of("key1", "val1"))); - PCollection output = - input.apply(TEST_TRANSFORM_ID, ParDo.of(new TestTimerCounterDoFn())); - - SdkComponents sdkComponents = SdkComponents.create(p.getOptions()); - RunnerApi.Pipeline pProto = PipelineTranslation.toProto(p, sdkComponents); - String inputPCollectionId = sdkComponents.registerPCollection(input); - String outputPCollectionId = sdkComponents.registerPCollection(output); - - RunnerApi.PTransform pTransformProto = - pProto - .getComponents() - .getTransformsMap() - .get( - pProto - .getComponents() - .getTransformsOrThrow(TEST_TRANSFORM_ID) - .getSubtransforms(0)); - - // 3. Setup PTransformRunnerFactoryTestContext - MetricsContainerStepMap metricsContainerRegistry = new MetricsContainerStepMap(); - try (Closeable closeable = - MetricsEnvironment.scopedMetricsContainer( - metricsContainerRegistry.getUnboundContainer())) { - - PTransformRunnerFactoryTestContext context = - PTransformRunnerFactoryTestContext.builder(TEST_TRANSFORM_ID, pTransformProto) - .processBundleInstructionId("bundle-id-1") - .beamFnStateClient( - new FakeBeamFnStateClient(StringUtf8Coder.of(), ImmutableMap.of())) - .components(pProto.getComponents()) - .build(); - - List> mainOutputValues = new ArrayList<>(); - context.addPCollectionConsumer( - outputPCollectionId, - (FnDataReceiver) (FnDataReceiver>) mainOutputValues::add); - - new FnApiDoFnRunner.Factory<>().addRunnerForPTransform(context); - - // 4. Execute: startBundle, processElement, fire timer, finishBundle - Iterables.getOnlyElement(context.getStartBundleFunctions()).run(); - - FnDataReceiver> mainInputReceiver = - context.getPCollectionConsumer(inputPCollectionId); - Instant elementTimestamp = new Instant(1000L); - mainInputReceiver.accept( - timestampedValueInGlobalWindow(KV.of("key1", "val1"), elementTimestamp)); - - Instant timerFireTimestamp = elementTimestamp.plus(Duration.standardSeconds(1)); - org.apache.beam.sdk.util.construction.Timer timerData = - org.apache.beam.sdk.util.construction.Timer.of( - "key1", // User key - "", // Dynamic timer tag - Collections.singletonList(GlobalWindow.INSTANCE), - timerFireTimestamp, // Scheduled time - timerFireTimestamp, // Hold time - PaneInfo.NO_FIRING); - - context.getIncomingTimerEndpoint(TIMER_ID_FOR_COUNTER_TEST).getReceiver().accept(timerData); - - Iterables.getOnlyElement(context.getFinishBundleFunctions()).run(); - - // 5. Assert MonitoringInfos - List monitoringInfos = - ImmutableList.copyOf(metricsContainerRegistry.getMonitoringInfos()); - - MonitoringInfo processCounterInfo = - findCounter( - monitoringInfos, - TEST_TRANSFORM_ID, - TestTimerCounterDoFn.class.getName(), - PROCESS_COUNTER_NAME_FOR_TEST); - assertNotNull("Process counter MonitoringInfo not found", processCounterInfo); - assertEquals( - "Process counter value incorrect", 1L, extractCounterValue(processCounterInfo)); - - MonitoringInfo timerCounterInfo = - findCounter( - monitoringInfos, - TEST_TRANSFORM_ID, - TestTimerCounterDoFn.class.getName(), - TIMER_COUNTER_NAME_FOR_TEST); - // This is the main assertion for the bug. - // Before the fix, this will likely be null or the counter value will be 0. - assertNotNull("Timer counter MonitoringInfo not found", timerCounterInfo); - assertEquals("Timer counter value incorrect", 1L, extractCounterValue(timerCounterInfo)); - - assertThat( - mainOutputValues, - containsInAnyOrder( - timestampedValueInGlobalWindow("processed:val1", elementTimestamp), - timestampedValueInGlobalWindow("timer_fired_for_key:key1", timerFireTimestamp))); - } - } private static class ConcatCombineFn extends CombineFn { @Override