From d414125c26303110f03d7a6519e7d686fa02ab0e Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 29 Oct 2025 22:28:27 -0400 Subject: [PATCH 1/4] Add comments about holding refreshCond lock when accessing processing time queue. --- .../pkg/beam/runners/prism/internal/engine/elementmanager.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index 5136cd85e3ed..6547f562a720 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -234,7 +234,7 @@ type ElementManager struct { livePending atomic.Int64 // An accessible live pending count. DEBUG USE ONLY pendingElements sync.WaitGroup // pendingElements counts all unprocessed elements in a job. Jobs with no pending elements terminate successfully. - processTimeEvents *stageRefreshQueue // Manages sequence of stage updates when interfacing with processing time. + processTimeEvents *stageRefreshQueue // Manages sequence of stage updates when interfacing with processing time. Callers must hold refreshCond.L lock. testStreamHandler *testStreamHandler // Optional test stream handler when a test stream is in the pipeline. } @@ -2006,6 +2006,7 @@ func (ss *stageState) startProcessingTimeBundle(em *ElementManager, emNow mtime. } // handleProcessingTimeTimer contains the common code for handling processing-time timers for aggregation stages and stateful stages. +// Callers must hold em.refreshCond.L lock. 
func handleProcessingTimeTimer(ss *stageState, em *ElementManager, emNow mtime.Time, processTimerFn func(e element, toProcess []element, holdsInBundle map[mtime.Time]int, panesInBundle []bundlePane) ([]element, []bundlePane, int)) (elementHeap, mtime.Time, set[string], map[mtime.Time]int, []bundlePane, bool, int) { // TODO: Determine if it's possible and a good idea to treat all EventTime processing as a MinTime From 6cef4cb914bce7905841babbbba1e96c4cf50d6b Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Thu, 30 Oct 2025 10:55:37 -0400 Subject: [PATCH 2/4] Acquire lock before add pending. Unexport ProcessTimeNow in em. --- .../prism/internal/engine/elementmanager.go | 33 +++++++++++++------ .../prism/internal/engine/teststream.go | 2 +- 2 files changed, 24 insertions(+), 11 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index 6547f562a720..eca5fad380f1 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -398,7 +398,7 @@ func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context. for { em.refreshCond.L.Lock() // Check if processing time has advanced before the wait loop. - emNow := em.ProcessingTimeNow() + emNow := em.processingTimeNow() changedByProcessingTime := em.processTimeEvents.AdvanceTo(emNow) em.changedStages.merge(changedByProcessingTime) @@ -415,7 +415,7 @@ func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context. em.refreshCond.Wait() // until watermarks may have changed. // Update if the processing time has advanced while we waited, and add refreshes here. 
(TODO waking on real time here for prod mode) - emNow = em.ProcessingTimeNow() + emNow = em.processingTimeNow() changedByProcessingTime = em.processTimeEvents.AdvanceTo(emNow) em.changedStages.merge(changedByProcessingTime) } @@ -521,7 +521,7 @@ func (em *ElementManager) DumpStages() string { stageState = append(stageState, fmt.Sprintf("TestStreamHandler: completed %v, curIndex %v of %v events: %+v, processingTime %v, %v, ptEvents %v \n", em.testStreamHandler.completed, em.testStreamHandler.nextEventIndex, len(em.testStreamHandler.events), em.testStreamHandler.events, em.testStreamHandler.processingTime, mtime.FromTime(em.testStreamHandler.processingTime), em.processTimeEvents)) } else { - stageState = append(stageState, fmt.Sprintf("ElementManager Now: %v processingTimeEvents: %v injectedBundles: %v\n", em.ProcessingTimeNow(), em.processTimeEvents.events, em.injectedBundles)) + stageState = append(stageState, fmt.Sprintf("ElementManager Now: %v processingTimeEvents: %v injectedBundles: %v\n", em.processingTimeNow(), em.processTimeEvents.events, em.injectedBundles)) } sort.Strings(ids) for _, id := range ids { @@ -880,8 +880,23 @@ func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PCol slog.Int("newPending", len(newPending)), "consumers", consumers, "sideConsumers", sideConsumers, "pendingDelta", len(newPending)*len(consumers)) for _, sID := range consumers { + consumer := em.stages[sID] - count := consumer.AddPending(em, newPending) + var count int + _, isAggregateStage := consumer.kind.(*aggregateStageKind) + if isAggregateStage { + // While adding pending elements in aggregate stage, we may need to + // access em.processTimeEvents to determine triggered bundles. + // To avoid deadlocks, we acquire the em.refreshCond.L lock here before + // AddPending is called. 
+ func() { + em.refreshCond.L.Lock() + defer em.refreshCond.L.Unlock() + count = consumer.AddPending(em, newPending) + }() + } else { + count = consumer.AddPending(em, newPending) + } em.addPending(count) } for _, link := range sideConsumers { @@ -993,7 +1008,7 @@ func (em *ElementManager) triageTimers(d TentativeData, inputInfo PColInfo, stag win typex.Window } em.refreshCond.L.Lock() - emNow := em.ProcessingTimeNow() + emNow := em.processingTimeNow() em.refreshCond.L.Unlock() var pendingEventTimers []element @@ -1337,7 +1352,7 @@ func (ss *stageState) injectTriggeredBundlesIfReady(em *ElementManager, window t ready := ss.strat.IsTriggerReady(triggerInput{ newElementCount: 1, endOfWindowReached: endOfWindowReached, - emNow: em.ProcessingTimeNow(), + emNow: em.processingTimeNow(), }, &state) if ready { @@ -1374,9 +1389,7 @@ func (ss *stageState) injectTriggeredBundlesIfReady(em *ElementManager, window t // TODO: how to deal with watermark holds for this implicit processing time timer // ss.watermarkHolds.Add(timer.holdTimestamp, 1) ss.processingTimeTimers.Persist(firingTime, timer, notYetHolds) - em.refreshCond.L.Lock() em.processTimeEvents.Schedule(firingTime, ss.ID) - em.refreshCond.L.Unlock() em.wakeUpAt(firingTime) } } @@ -2441,8 +2454,8 @@ func (ss *stageState) bundleReady(em *ElementManager, emNow mtime.Time) (mtime.T return upstreamW, ready, ptimeEventsReady, injectedReady } -// ProcessingTimeNow gives the current processing time for the runner. -func (em *ElementManager) ProcessingTimeNow() (ret mtime.Time) { +// processingTimeNow gives the current processing time for the runner. 
+func (em *ElementManager) processingTimeNow() (ret mtime.Time) { if em.testStreamHandler != nil && !em.testStreamHandler.completed { return em.testStreamHandler.Now() } diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go b/sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go index 593a708a6347..90f81d3104b7 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go @@ -238,7 +238,7 @@ func (ev tsProcessingTimeEvent) Execute(em *ElementManager) { } // Add the refreshes now so our block prevention logic works. - emNow := em.ProcessingTimeNow() + emNow := em.processingTimeNow() toRefresh := em.processTimeEvents.AdvanceTo(emNow) em.changedStages.merge(toRefresh) } From cf729d6cba2337ea766e9e618f779d4e9ffdc15b Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Thu, 30 Oct 2025 11:48:43 -0400 Subject: [PATCH 3/4] Remove unnecessary lock acquiring in startTriggeredBundle. --- .../beam/runners/prism/internal/engine/elementmanager.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index eca5fad380f1..de7b89e751ec 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -1332,11 +1332,12 @@ func (ss *stageState) AddPending(em *ElementManager, newPending []element) int { func (ss *stageState) injectTriggeredBundlesIfReady(em *ElementManager, window typex.Window, key string) int { // Check on triggers for this key. - // We use an empty linkID as the key into state for aggregations. + // Callers must hold em.refreshCond.L count := 0 if ss.state == nil { ss.state = make(map[LinkID]map[typex.Window]map[string]StateData) } + // We use an empty linkID as the key into state for aggregations. 
lv, ok := ss.state[LinkID{}] if !ok { lv = make(map[typex.Window]map[string]StateData) @@ -1631,7 +1632,7 @@ func (ss *stageState) buildTriggeredBundle(em *ElementManager, key string, win t return toProcess, accumulationDiff } -// startTriggeredBundle must be called with the stage.mu lock held. +// startTriggeredBundle must be called with the stage.mu lock and em.refreshCond.L lock held. // Returns the accumulation diff that the pending work needs to be adjusted by, as completed work is subtracted from the pending count. // When in discarding mode, returns 0, as the pending work already includes these elements. // When in accumulating mode, returns the number of fired elements, since those elements remain pending even after this bundle is fired. @@ -1666,10 +1667,8 @@ func (ss *stageState) startTriggeredBundle(em *ElementManager, key string, win t // TODO: Use ss.bundlesToInject rather than em.injectedBundles // ss.bundlesToInject = append(ss.bundlesToInject, rb) // Bundle is marked in progress here to prevent a race condition. - em.refreshCond.L.Lock() em.injectedBundles = append(em.injectedBundles, rb) em.inprogressBundles.insert(rb.BundleID) - em.refreshCond.L.Unlock() return accumulationDiff } From 49005b2d5f0c3ea6235653a89514de1eecefb1cd Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Thu, 30 Oct 2025 13:13:08 -0400 Subject: [PATCH 4/4] Update the test to skip after #36655 --- runners/prism/java/build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/prism/java/build.gradle b/runners/prism/java/build.gradle index 7ce4e4d90610..0754e714dd8c 100644 --- a/runners/prism/java/build.gradle +++ b/runners/prism/java/build.gradle @@ -146,7 +146,7 @@ def sickbayTests = [ // java.util.NoSuchElementException: Empty PCollection accessed as a singleton view. 
'org.apache.beam.sdk.transforms.ViewTest.testDiscardingNonSingletonSideInput', // ava.lang.IllegalArgumentException: Duplicate values for a - 'org.apache.beam.sdk.transforms.ViewTest.testMapSideInputWithNullValuesCatchesDuplicates', + 'org.apache.beam.sdk.transforms.MapViewTest.testMapSideInputWithNullValuesCatchesDuplicates', // java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view.... 'org.apache.beam.sdk.transforms.ViewTest.testNonSingletonSideInput', // java.util.NoSuchElementException: Empty PCollection accessed as a singleton view.