Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion runners/prism/java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ def sickbayTests = [
// java.util.NoSuchElementException: Empty PCollection accessed as a singleton view.
'org.apache.beam.sdk.transforms.ViewTest.testDiscardingNonSingletonSideInput',
// ava.lang.IllegalArgumentException: Duplicate values for a
'org.apache.beam.sdk.transforms.ViewTest.testMapSideInputWithNullValuesCatchesDuplicates',
'org.apache.beam.sdk.transforms.MapViewTest.testMapSideInputWithNullValuesCatchesDuplicates',
// java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view....
'org.apache.beam.sdk.transforms.ViewTest.testNonSingletonSideInput',
// java.util.NoSuchElementException: Empty PCollection accessed as a singleton view.
Expand Down
43 changes: 28 additions & 15 deletions sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ type ElementManager struct {
livePending atomic.Int64 // An accessible live pending count. DEBUG USE ONLY
pendingElements sync.WaitGroup // pendingElements counts all unprocessed elements in a job. Jobs with no pending elements terminate successfully.

processTimeEvents *stageRefreshQueue // Manages sequence of stage updates when interfacing with processing time.
processTimeEvents *stageRefreshQueue // Manages sequence of stage updates when interfacing with processing time. Callers must hold refreshCond.L lock.
testStreamHandler *testStreamHandler // Optional test stream handler when a test stream is in the pipeline.
}

Expand Down Expand Up @@ -398,7 +398,7 @@ func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context.
for {
em.refreshCond.L.Lock()
// Check if processing time has advanced before the wait loop.
emNow := em.ProcessingTimeNow()
emNow := em.processingTimeNow()
changedByProcessingTime := em.processTimeEvents.AdvanceTo(emNow)
em.changedStages.merge(changedByProcessingTime)

Expand All @@ -415,7 +415,7 @@ func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context.
em.refreshCond.Wait() // until watermarks may have changed.

// Update if the processing time has advanced while we waited, and add refreshes here. (TODO waking on real time here for prod mode)
emNow = em.ProcessingTimeNow()
emNow = em.processingTimeNow()
changedByProcessingTime = em.processTimeEvents.AdvanceTo(emNow)
em.changedStages.merge(changedByProcessingTime)
}
Expand Down Expand Up @@ -521,7 +521,7 @@ func (em *ElementManager) DumpStages() string {
stageState = append(stageState, fmt.Sprintf("TestStreamHandler: completed %v, curIndex %v of %v events: %+v, processingTime %v, %v, ptEvents %v \n",
em.testStreamHandler.completed, em.testStreamHandler.nextEventIndex, len(em.testStreamHandler.events), em.testStreamHandler.events, em.testStreamHandler.processingTime, mtime.FromTime(em.testStreamHandler.processingTime), em.processTimeEvents))
} else {
stageState = append(stageState, fmt.Sprintf("ElementManager Now: %v processingTimeEvents: %v injectedBundles: %v\n", em.ProcessingTimeNow(), em.processTimeEvents.events, em.injectedBundles))
stageState = append(stageState, fmt.Sprintf("ElementManager Now: %v processingTimeEvents: %v injectedBundles: %v\n", em.processingTimeNow(), em.processTimeEvents.events, em.injectedBundles))
}
sort.Strings(ids)
for _, id := range ids {
Expand Down Expand Up @@ -880,8 +880,23 @@ func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PCol
slog.Int("newPending", len(newPending)), "consumers", consumers, "sideConsumers", sideConsumers,
"pendingDelta", len(newPending)*len(consumers))
for _, sID := range consumers {

consumer := em.stages[sID]
count := consumer.AddPending(em, newPending)
var count int
_, isAggregateStage := consumer.kind.(*aggregateStageKind)
if isAggregateStage {
// While adding pending elements in aggregate stage, we may need to
// access em.processTimeEvents to determine triggered bundles.
// To avoid deadlocks, we acquire the em.refreshCond.L lock here before
// AddPending is called.
func() {
em.refreshCond.L.Lock()
defer em.refreshCond.L.Unlock()
count = consumer.AddPending(em, newPending)
}()
} else {
count = consumer.AddPending(em, newPending)
}
em.addPending(count)
}
for _, link := range sideConsumers {
Expand Down Expand Up @@ -993,7 +1008,7 @@ func (em *ElementManager) triageTimers(d TentativeData, inputInfo PColInfo, stag
win typex.Window
}
em.refreshCond.L.Lock()
emNow := em.ProcessingTimeNow()
emNow := em.processingTimeNow()
em.refreshCond.L.Unlock()

var pendingEventTimers []element
Expand Down Expand Up @@ -1317,11 +1332,12 @@ func (ss *stageState) AddPending(em *ElementManager, newPending []element) int {

func (ss *stageState) injectTriggeredBundlesIfReady(em *ElementManager, window typex.Window, key string) int {
// Check on triggers for this key.
// We use an empty linkID as the key into state for aggregations.
// Callers must hold em.refreshCond.L
count := 0
if ss.state == nil {
ss.state = make(map[LinkID]map[typex.Window]map[string]StateData)
}
// We use an empty linkID as the key into state for aggregations.
lv, ok := ss.state[LinkID{}]
if !ok {
lv = make(map[typex.Window]map[string]StateData)
Expand All @@ -1337,7 +1353,7 @@ func (ss *stageState) injectTriggeredBundlesIfReady(em *ElementManager, window t
ready := ss.strat.IsTriggerReady(triggerInput{
newElementCount: 1,
endOfWindowReached: endOfWindowReached,
emNow: em.ProcessingTimeNow(),
emNow: em.processingTimeNow(),
}, &state)

if ready {
Expand Down Expand Up @@ -1374,9 +1390,7 @@ func (ss *stageState) injectTriggeredBundlesIfReady(em *ElementManager, window t
// TODO: how to deal with watermark holds for this implicit processing time timer
// ss.watermarkHolds.Add(timer.holdTimestamp, 1)
ss.processingTimeTimers.Persist(firingTime, timer, notYetHolds)
em.refreshCond.L.Lock()
em.processTimeEvents.Schedule(firingTime, ss.ID)
em.refreshCond.L.Unlock()
em.wakeUpAt(firingTime)
}
}
Expand Down Expand Up @@ -1618,7 +1632,7 @@ func (ss *stageState) buildTriggeredBundle(em *ElementManager, key string, win t
return toProcess, accumulationDiff
}

// startTriggeredBundle must be called with the stage.mu lock held.
// startTriggeredBundle must be called with the stage.mu lock and em.refreshCond.L lock held.
// Returns the accumulation diff that the pending work needs to be adjusted by, as completed work is subtracted from the pending count.
// When in discarding mode, returns 0, as the pending work already includes these elements.
// When in accumulating mode, returns the number of fired elements, since those elements remain pending even after this bundle is fired.
Expand Down Expand Up @@ -1653,10 +1667,8 @@ func (ss *stageState) startTriggeredBundle(em *ElementManager, key string, win t
// TODO: Use ss.bundlesToInject rather than em.injectedBundles
// ss.bundlesToInject = append(ss.bundlesToInject, rb)
// Bundle is marked in progress here to prevent a race condition.
em.refreshCond.L.Lock()
em.injectedBundles = append(em.injectedBundles, rb)
em.inprogressBundles.insert(rb.BundleID)
em.refreshCond.L.Unlock()
return accumulationDiff
}

Expand Down Expand Up @@ -2006,6 +2018,7 @@ func (ss *stageState) startProcessingTimeBundle(em *ElementManager, emNow mtime.
}

// handleProcessingTimeTimer contains the common code for handling processing-time timers for aggregation stages and stateful stages.
// Callers must hold em.refreshCond.L lock.
func handleProcessingTimeTimer(ss *stageState, em *ElementManager, emNow mtime.Time,
processTimerFn func(e element, toProcess []element, holdsInBundle map[mtime.Time]int, panesInBundle []bundlePane) ([]element, []bundlePane, int)) (elementHeap, mtime.Time, set[string], map[mtime.Time]int, []bundlePane, bool, int) {
// TODO: Determine if it's possible and a good idea to treat all EventTime processing as a MinTime
Expand Down Expand Up @@ -2440,8 +2453,8 @@ func (ss *stageState) bundleReady(em *ElementManager, emNow mtime.Time) (mtime.T
return upstreamW, ready, ptimeEventsReady, injectedReady
}

// ProcessingTimeNow gives the current processing time for the runner.
func (em *ElementManager) ProcessingTimeNow() (ret mtime.Time) {
// processingTimeNow gives the current processing time for the runner.
func (em *ElementManager) processingTimeNow() (ret mtime.Time) {
if em.testStreamHandler != nil && !em.testStreamHandler.completed {
return em.testStreamHandler.Now()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func (ev tsProcessingTimeEvent) Execute(em *ElementManager) {
}

// Add the refreshes now so our block prevention logic works.
emNow := em.ProcessingTimeNow()
emNow := em.processingTimeNow()
toRefresh := em.processTimeEvents.AdvanceTo(emNow)
em.changedStages.merge(toRefresh)
}
Expand Down
Loading