@@ -398,7 +398,7 @@ func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context.
398398 for {
399399 em .refreshCond .L .Lock ()
400400 // Check if processing time has advanced before the wait loop.
401- emNow := em .ProcessingTimeNow ()
401+ emNow := em .processingTimeNow ()
402402 changedByProcessingTime := em .processTimeEvents .AdvanceTo (emNow )
403403 em .changedStages .merge (changedByProcessingTime )
404404
@@ -415,7 +415,7 @@ func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context.
415415 em .refreshCond .Wait () // until watermarks may have changed.
416416
417417 // Update if the processing time has advanced while we waited, and add refreshes here. (TODO waking on real time here for prod mode)
418- emNow = em .ProcessingTimeNow ()
418+ emNow = em .processingTimeNow ()
419419 changedByProcessingTime = em .processTimeEvents .AdvanceTo (emNow )
420420 em .changedStages .merge (changedByProcessingTime )
421421 }
@@ -521,7 +521,7 @@ func (em *ElementManager) DumpStages() string {
521521 stageState = append (stageState , fmt .Sprintf ("TestStreamHandler: completed %v, curIndex %v of %v events: %+v, processingTime %v, %v, ptEvents %v \n " ,
522522 em .testStreamHandler .completed , em .testStreamHandler .nextEventIndex , len (em .testStreamHandler .events ), em .testStreamHandler .events , em .testStreamHandler .processingTime , mtime .FromTime (em .testStreamHandler .processingTime ), em .processTimeEvents ))
523523 } else {
524- stageState = append (stageState , fmt .Sprintf ("ElementManager Now: %v processingTimeEvents: %v injectedBundles: %v\n " , em .ProcessingTimeNow (), em .processTimeEvents .events , em .injectedBundles ))
524+ stageState = append (stageState , fmt .Sprintf ("ElementManager Now: %v processingTimeEvents: %v injectedBundles: %v\n " , em .processingTimeNow (), em .processTimeEvents .events , em .injectedBundles ))
525525 }
526526 sort .Strings (ids )
527527 for _ , id := range ids {
@@ -880,8 +880,23 @@ func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PCol
880880 slog .Int ("newPending" , len (newPending )), "consumers" , consumers , "sideConsumers" , sideConsumers ,
881881 "pendingDelta" , len (newPending )* len (consumers ))
882882 for _ , sID := range consumers {
883+
883884 consumer := em .stages [sID ]
884- count := consumer .AddPending (em , newPending )
885+ var count int
886+ _ , isAggregateStage := consumer .kind .(* aggregateStageKind )
887+ if isAggregateStage {
888+ // While adding pending elements in aggregate stage, we may need to
889+ // access em.processTimeEvents to determine triggered bundles.
890+ // To avoid deadlocks, we acquire the em.refreshCond.L lock here before
891+ // AddPending is called.
892+ func () {
893+ em .refreshCond .L .Lock ()
894+ defer em .refreshCond .L .Unlock ()
895+ count = consumer .AddPending (em , newPending )
896+ }()
897+ } else {
898+ count = consumer .AddPending (em , newPending )
899+ }
885900 em .addPending (count )
886901 }
887902 for _ , link := range sideConsumers {
@@ -993,7 +1008,7 @@ func (em *ElementManager) triageTimers(d TentativeData, inputInfo PColInfo, stag
9931008 win typex.Window
9941009 }
9951010 em .refreshCond .L .Lock ()
996- emNow := em .ProcessingTimeNow ()
1011+ emNow := em .processingTimeNow ()
9971012 em .refreshCond .L .Unlock ()
9981013
9991014 var pendingEventTimers []element
@@ -1337,7 +1352,7 @@ func (ss *stageState) injectTriggeredBundlesIfReady(em *ElementManager, window t
13371352 ready := ss .strat .IsTriggerReady (triggerInput {
13381353 newElementCount : 1 ,
13391354 endOfWindowReached : endOfWindowReached ,
1340- emNow : em .ProcessingTimeNow (),
1355+ emNow : em .processingTimeNow (),
13411356 }, & state )
13421357
13431358 if ready {
@@ -1374,9 +1389,7 @@ func (ss *stageState) injectTriggeredBundlesIfReady(em *ElementManager, window t
13741389 // TODO: how to deal with watermark holds for this implicit processing time timer
13751390 // ss.watermarkHolds.Add(timer.holdTimestamp, 1)
13761391 ss .processingTimeTimers .Persist (firingTime , timer , notYetHolds )
1377- em .refreshCond .L .Lock ()
13781392 em .processTimeEvents .Schedule (firingTime , ss .ID )
1379- em .refreshCond .L .Unlock ()
13801393 em .wakeUpAt (firingTime )
13811394 }
13821395 }
@@ -2441,8 +2454,8 @@ func (ss *stageState) bundleReady(em *ElementManager, emNow mtime.Time) (mtime.T
24412454 return upstreamW , ready , ptimeEventsReady , injectedReady
24422455}
24432456
2444- // ProcessingTimeNow gives the current processing time for the runner.
2445- func (em * ElementManager ) ProcessingTimeNow () (ret mtime.Time ) {
2457+ // processingTimeNow gives the current processing time for the runner.
2458+ func (em * ElementManager ) processingTimeNow () (ret mtime.Time ) {
24462459 if em .testStreamHandler != nil && ! em .testStreamHandler .completed {
24472460 return em .testStreamHandler .Now ()
24482461 }
0 commit comments