Skip to content

Commit cf729d6

Browse files
committed
Remove unnecessary lock aquiring in startTriggeredBundle.
1 parent 6cef4cb commit cf729d6

File tree

1 file changed

+3
-4
lines changed

1 file changed

+3
-4
lines changed

sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1332,11 +1332,12 @@ func (ss *stageState) AddPending(em *ElementManager, newPending []element) int {
13321332

13331333
func (ss *stageState) injectTriggeredBundlesIfReady(em *ElementManager, window typex.Window, key string) int {
13341334
// Check on triggers for this key.
1335-
// We use an empty linkID as the key into state for aggregations.
1335+
// Callers must hold em.refreshCond.L
13361336
count := 0
13371337
if ss.state == nil {
13381338
ss.state = make(map[LinkID]map[typex.Window]map[string]StateData)
13391339
}
1340+
// We use an empty linkID as the key into state for aggregations.
13401341
lv, ok := ss.state[LinkID{}]
13411342
if !ok {
13421343
lv = make(map[typex.Window]map[string]StateData)
@@ -1631,7 +1632,7 @@ func (ss *stageState) buildTriggeredBundle(em *ElementManager, key string, win t
16311632
return toProcess, accumulationDiff
16321633
}
16331634

1634-
// startTriggeredBundle must be called with the stage.mu lock held.
1635+
// startTriggeredBundle must be called with the stage.mu lock and em.refreshCond.L lock held.
16351636
// Returns the accumulation diff that the pending work needs to be adjusted by, as completed work is subtracted from the pending count.
16361637
// When in discarding mode, returns 0, as the pending work already includes these elements.
16371638
// When in accumulating mode, returns the number of fired elements, since those elements remain pending even after this bundle is fired.
@@ -1666,10 +1667,8 @@ func (ss *stageState) startTriggeredBundle(em *ElementManager, key string, win t
16661667
// TODO: Use ss.bundlesToInject rather than em.injectedBundles
16671668
// ss.bundlesToInject = append(ss.bundlesToInject, rb)
16681669
// Bundle is marked in progress here to prevent a race condition.
1669-
em.refreshCond.L.Lock()
16701670
em.injectedBundles = append(em.injectedBundles, rb)
16711671
em.inprogressBundles.insert(rb.BundleID)
1672-
em.refreshCond.L.Unlock()
16731672
return accumulationDiff
16741673
}
16751674

0 commit comments

Comments
 (0)