Skip to content

Commit d414125

Browse files
committed
Add comments about holding refreshCond lock when accessing processing time queue.
1 parent fe07fe7 commit d414125

File tree

1 file changed

+2
-1
lines changed

1 file changed

+2
-1
lines changed

sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ type ElementManager struct {
234234
livePending atomic.Int64 // An accessible live pending count. DEBUG USE ONLY
235235
pendingElements sync.WaitGroup // pendingElements counts all unprocessed elements in a job. Jobs with no pending elements terminate successfully.
236236

237-
processTimeEvents *stageRefreshQueue // Manages sequence of stage updates when interfacing with processing time.
237+
processTimeEvents *stageRefreshQueue // Manages sequence of stage updates when interfacing with processing time. Callers must hold refreshCond.L lock.
238238
testStreamHandler *testStreamHandler // Optional test stream handler when a test stream is in the pipeline.
239239
}
240240

@@ -2006,6 +2006,7 @@ func (ss *stageState) startProcessingTimeBundle(em *ElementManager, emNow mtime.
20062006
}
20072007

20082008
// handleProcessingTimeTimer contains the common code for handling processing-time timers for aggregation stages and stateful stages.
2009+
// Callers must hold em.refreshCond.L lock.
20092010
func handleProcessingTimeTimer(ss *stageState, em *ElementManager, emNow mtime.Time,
20102011
processTimerFn func(e element, toProcess []element, holdsInBundle map[mtime.Time]int, panesInBundle []bundlePane) ([]element, []bundlePane, int)) (elementHeap, mtime.Time, set[string], map[mtime.Time]int, []bundlePane, bool, int) {
20112012
// TODO: Determine if it's possible and a good idea to treat all EventTime processing as a MinTime

0 commit comments

Comments
 (0)