Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1533,6 +1533,8 @@ private <K> void processTimerDirect(
currentTimer = timer;
currentTimeDomain = timeDomain;
doFnInvoker.invokeOnTimer(timerId, timerFamilyId, onTimerContext);
// Finalize state to ensure metrics and other state changes are committed.
this.stateAccessor.finalizeState();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is going to result in finalizeState being called multiple times if there are multiple timers.

But it also seems that this shouldn't be needed because finalizeState is already called in finishBundle. Is that not being called for some reason? The reported bug is old, have you verified that it is still present? If so adding a test that fails without a fix would be good.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't verified this and just want to create the PR to get some feedback from you. I noticed this.stateAccessor.finalizeState has been called in other places (e.g., sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableSplitAndSizeRestrictionsDoFnRunner.java) outside finishBundle. Not fully understand the code but want to put this PR out for some feedback first.

}

private <K> void processOnWindowExpiration(Timer<K> timer) {
Expand All @@ -1545,6 +1547,8 @@ private <K> void processOnWindowExpiration(Timer<K> timer) {
currentWindow = windowIterator.next();
doFnInvoker.invokeOnWindowExpiration(onWindowExpirationContext);
}
// Finalize state to ensure metrics and other state changes are committed.
this.stateAccessor.finalizeState();
} finally {
currentKey = null;
currentTimer = null;
Expand Down
Loading