Skip to content

Commit 42fdd0c

Browse files
committed
Remove unnecessary lock aquiring in startTriggeredBundle.
1 parent 8b2cac4 commit 42fdd0c

File tree

1 file changed

+3
-4
lines changed

1 file changed

+3
-4
lines changed

sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1333,11 +1333,12 @@ func (ss *stageState) AddPending(em *ElementManager, newPending []element) int {
13331333

13341334
func (ss *stageState) injectTriggeredBundlesIfReady(em *ElementManager, window typex.Window, key string) int {
13351335
// Check on triggers for this key.
1336-
// We use an empty linkID as the key into state for aggregations.
1336+
// Callers must hold em.refreshCond.L
13371337
count := 0
13381338
if ss.state == nil {
13391339
ss.state = make(map[LinkID]map[typex.Window]map[string]StateData)
13401340
}
1341+
// We use an empty linkID as the key into state for aggregations.
13411342
lv, ok := ss.state[LinkID{}]
13421343
if !ok {
13431344
lv = make(map[typex.Window]map[string]StateData)
@@ -1632,7 +1633,7 @@ func (ss *stageState) buildTriggeredBundle(em *ElementManager, key string, win t
16321633
return toProcess, accumulationDiff
16331634
}
16341635

1635-
// startTriggeredBundle must be called with the stage.mu lock held.
1636+
// startTriggeredBundle must be called with the stage.mu lock and em.refreshCond.L lock held.
16361637
// Returns the accumulation diff that the pending work needs to be adjusted by, as completed work is subtracted from the pending count.
16371638
// When in discarding mode, returns 0, as the pending work already includes these elements.
16381639
// When in accumulating mode, returns the number of fired elements, since those elements remain pending even after this bundle is fired.
@@ -1667,10 +1668,8 @@ func (ss *stageState) startTriggeredBundle(em *ElementManager, key string, win t
16671668
// TODO: Use ss.bundlesToInject rather than em.injectedBundles
16681669
// ss.bundlesToInject = append(ss.bundlesToInject, rb)
16691670
// Bundle is marked in progress here to prevent a race condition.
1670-
em.refreshCond.L.Lock()
16711671
em.injectedBundles = append(em.injectedBundles, rb)
16721672
em.inprogressBundles.insert(rb.BundleID)
1673-
em.refreshCond.L.Unlock()
16741673
return accumulationDiff
16751674
}
16761675

0 commit comments

Comments
 (0)