Skip to content

Commit 8d860da

Browse files
committed
Add comments about holding refreshCond lock when accessing processing time queue.
1 parent 1256c54 commit 8d860da

File tree

1 file changed

+2
-1
lines changed

1 file changed

+2
-1
lines changed

sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ type ElementManager struct {
234234
livePending atomic.Int64 // An accessible live pending count. DEBUG USE ONLY
235235
pendingElements sync.WaitGroup // pendingElements counts all unprocessed elements in a job. Jobs with no pending elements terminate successfully.
236236

237-
processTimeEvents *stageRefreshQueue // Manages sequence of stage updates when interfacing with processing time.
237+
processTimeEvents *stageRefreshQueue // Manages sequence of stage updates when interfacing with processing time. Callers must hold refreshCond.L lock.
238238
testStreamHandler *testStreamHandler // Optional test stream handler when a test stream is in the pipeline.
239239
}
240240

@@ -2009,6 +2009,7 @@ func (ss *stageState) startProcessingTimeBundle(em *ElementManager, emNow mtime.
20092009
}
20102010

20112011
// handleProcessingTimeTimer contains the common code for handling processing-time timers for aggregation stages and stateful stages.
2012+
// Callers must hold em.refreshCond.L lock.
20122013
func handleProcessingTimeTimer(ss *stageState, em *ElementManager, emNow mtime.Time,
20132014
processTimerFn func(e element, toProcess []element, holdsInBundle map[mtime.Time]int, panesInBundle []bundlePane) ([]element, []bundlePane, int)) (elementHeap, mtime.Time, set[string], map[mtime.Time]int, []bundlePane, bool, int) {
20142015
// TODO: Determine if it's possible and a good idea to treat all EventTime processing as a MinTime

0 commit comments

Comments
 (0)