@@ -1153,6 +1153,7 @@ type stageState struct {
11531153 input mtime.Time // input watermark for the parallel input.
11541154 output mtime.Time // Output watermark for the whole stage
11551155 estimatedOutput mtime.Time // Estimated watermark output from DoFns
1156+ previousInput mtime.Time // input watermark before the latest watermark refresh
11561157
11571158 pending elementHeap // pending input elements for this stage that are to be processesd
11581159 inprogress map [string ]elements // inprogress elements by active bundles, keyed by bundle
@@ -2014,6 +2015,8 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] {
20142015 newIn = minPending
20152016 }
20162017
2018+ ss .previousInput = ss .input
2019+
20172020 // If bigger, advance the input watermark.
20182021 if newIn > ss .input {
20192022 ss .input = newIn
@@ -2171,11 +2174,13 @@ func (ss *stageState) bundleReady(em *ElementManager, emNow mtime.Time) (mtime.T
21712174 ptimeEventsReady := ss .processingTimeTimers .Peek () <= emNow || emNow == mtime .MaxTimestamp
21722175 injectedReady := len (ss .bundlesToInject ) > 0
21732176
2174- // If the upstream watermark and the input watermark are the same,
2175- // then we can't yet process this stage.
2177+ // If the upstream watermark does not change, we can't yet process this stage.
2178+ // To check whether upstream water is unchanged, we evaluate if the input watermark, and
2179+ // the input watermark before the latest refresh are the same.
21762180 inputW := ss .input
21772181 _ , upstreamW := ss .UpstreamWatermark ()
2178- if inputW == upstreamW {
2182+ previousInputW := ss .previousInput
2183+ if inputW == upstreamW && previousInputW == inputW {
21792184 slog .Debug ("bundleReady: unchanged upstream watermark" ,
21802185 slog .String ("stage" , ss .ID ),
21812186 slog .Group ("watermark" ,
0 commit comments