diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java index e264fa14788a..21e79a75b45f 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java @@ -1533,6 +1533,8 @@ private void processTimerDirect( currentTimer = timer; currentTimeDomain = timeDomain; doFnInvoker.invokeOnTimer(timerId, timerFamilyId, onTimerContext); + // Finalize state to ensure metrics and other state changes are committed. + this.stateAccessor.finalizeState(); } private void processOnWindowExpiration(Timer timer) { @@ -1545,6 +1547,8 @@ private void processOnWindowExpiration(Timer timer) { currentWindow = windowIterator.next(); doFnInvoker.invokeOnWindowExpiration(onWindowExpirationContext); } + // Finalize state to ensure metrics and other state changes are committed. + this.stateAccessor.finalizeState(); } finally { currentKey = null; currentTimer = null;