@@ -260,7 +260,7 @@ func NewElementManager(config Config) *ElementManager {
260260// AddStage adds a stage to this element manager, connecting its PCollections and
261261// nodes to the watermark propagation graph.
262262func (em * ElementManager ) AddStage (ID string , inputIDs , outputIDs []string , sides []LinkID ) {
263- slog .Debug ("AddStage" , slog .String ("ID" , ID ), slog .Any ("inputs" , inputIDs ), slog .Any ("sides" , sides ), slog .Any ("outputs" , outputIDs ))
263+ slog .Debug ("em. AddStage" , slog .String ("ID" , ID ), slog .Any ("inputs" , inputIDs ), slog .Any ("sides" , sides ), slog .Any ("outputs" , outputIDs ))
264264 ss := makeStageState (ID , inputIDs , outputIDs , sides )
265265
266266 em .stages [ss .ID ] = ss
@@ -504,6 +504,40 @@ func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context.
504504 return runStageCh
505505}
506506
507+ // DumpStages puts all the stage information into a string and returns it.
508+ func (em * ElementManager ) DumpStages () string {
509+ var stageState []string
510+ ids := maps .Keys (em .stages )
511+ if em .testStreamHandler != nil {
512+ stageState = append (stageState , fmt .Sprintf ("TestStreamHandler: completed %v, curIndex %v of %v events: %+v, processingTime %v, %v, ptEvents %v \n " ,
513+ em .testStreamHandler .completed , em .testStreamHandler .nextEventIndex , len (em .testStreamHandler .events ), em .testStreamHandler .events , em .testStreamHandler .processingTime , mtime .FromTime (em .testStreamHandler .processingTime ), em .processTimeEvents ))
514+ } else {
515+ stageState = append (stageState , fmt .Sprintf ("ElementManager Now: %v processingTimeEvents: %v injectedBundles: %v\n " , em .ProcessingTimeNow (), em .processTimeEvents .events , em .injectedBundles ))
516+ }
517+ sort .Strings (ids )
518+ for _ , id := range ids {
519+ ss := em .stages [id ]
520+ inW := ss .InputWatermark ()
521+ outW := ss .OutputWatermark ()
522+ upPCol , upW := ss .UpstreamWatermark ()
523+ upS := em .pcolParents [upPCol ]
524+ if upS == "" {
525+ upS = "IMPULSE " // (extra spaces to allow print to align better.)
526+ }
527+ stageState = append (stageState , fmt .Sprintln (id , "watermark in" , inW , "out" , outW , "upstream" , upW , "from" , upS , "pending" , ss .pending , "byKey" , ss .pendingByKeys , "inprogressKeys" , ss .inprogressKeys , "byBundle" , ss .inprogressKeysByBundle , "holds" , ss .watermarkHolds .heap , "holdCounts" , ss .watermarkHolds .counts , "holdsInBundle" , ss .inprogressHoldsByBundle , "pttEvents" , ss .processingTimeTimers .toFire , "bundlesToInject" , ss .bundlesToInject ))
528+
529+ var outputConsumers , sideConsumers []string
530+ for _ , col := range ss .outputIDs {
531+ outputConsumers = append (outputConsumers , em .consumers [col ]... )
532+ for _ , l := range em .sideConsumers [col ] {
533+ sideConsumers = append (sideConsumers , l .Global )
534+ }
535+ }
536+ stageState = append (stageState , fmt .Sprintf ("\t sideInputs: %v outputCols: %v outputConsumers: %v sideConsumers: %v\n " , ss .sides , ss .outputIDs , outputConsumers , sideConsumers ))
537+ }
538+ return strings .Join (stageState , "" )
539+ }
540+
507541// checkForQuiescence sees if this element manager is no longer able to do any pending work or make progress.
508542//
509543// Quiescence can happen if there are no inprogress bundles, and there are no further watermark refreshes, which
@@ -524,9 +558,9 @@ func (em *ElementManager) checkForQuiescence(advanced set[string]) error {
524558 // If there are changed stages that need a watermarks refresh,
525559 // we aren't yet stuck.
526560 v := em .livePending .Load ()
527- slog .Debug ("Bundles: nothing in progress after advance" ,
528- slog .Any ("advanced " , advanced ),
529- slog .Int ( "changeCount " , len ( em .changedStages ) ),
561+ slog .Debug ("Bundles: nothing in progress after advance, but some stages need a watermark refresh " ,
562+ slog .Any ("mayProgress " , advanced ),
563+ slog .Any ( "needRefresh " , em .changedStages ),
530564 slog .Int64 ("pendingElementCount" , v ),
531565 )
532566 return nil
@@ -569,36 +603,7 @@ func (em *ElementManager) checkForQuiescence(advanced set[string]) error {
569603 // Jobs must never get stuck so this indicates a bug in prism to be investigated.
570604
571605 slog .Debug ("Bundles: nothing in progress and no refreshes" , slog .Int64 ("pendingElementCount" , v ))
572- var stageState []string
573- ids := maps .Keys (em .stages )
574- if em .testStreamHandler != nil {
575- stageState = append (stageState , fmt .Sprintf ("TestStreamHandler: completed %v, curIndex %v of %v events: %+v, processingTime %v, %v, ptEvents %v \n " ,
576- em .testStreamHandler .completed , em .testStreamHandler .nextEventIndex , len (em .testStreamHandler .events ), em .testStreamHandler .events , em .testStreamHandler .processingTime , mtime .FromTime (em .testStreamHandler .processingTime ), em .processTimeEvents ))
577- } else {
578- stageState = append (stageState , fmt .Sprintf ("ElementManager Now: %v processingTimeEvents: %v injectedBundles: %v\n " , em .ProcessingTimeNow (), em .processTimeEvents .events , em .injectedBundles ))
579- }
580- sort .Strings (ids )
581- for _ , id := range ids {
582- ss := em .stages [id ]
583- inW := ss .InputWatermark ()
584- outW := ss .OutputWatermark ()
585- upPCol , upW := ss .UpstreamWatermark ()
586- upS := em .pcolParents [upPCol ]
587- if upS == "" {
588- upS = "IMPULSE " // (extra spaces to allow print to align better.)
589- }
590- stageState = append (stageState , fmt .Sprintln (id , "watermark in" , inW , "out" , outW , "upstream" , upW , "from" , upS , "pending" , ss .pending , "byKey" , ss .pendingByKeys , "inprogressKeys" , ss .inprogressKeys , "byBundle" , ss .inprogressKeysByBundle , "holds" , ss .watermarkHolds .heap , "holdCounts" , ss .watermarkHolds .counts , "holdsInBundle" , ss .inprogressHoldsByBundle , "pttEvents" , ss .processingTimeTimers .toFire , "bundlesToInject" , ss .bundlesToInject ))
591-
592- var outputConsumers , sideConsumers []string
593- for _ , col := range ss .outputIDs {
594- outputConsumers = append (outputConsumers , em .consumers [col ]... )
595- for _ , l := range em .sideConsumers [col ] {
596- sideConsumers = append (sideConsumers , l .Global )
597- }
598- }
599- stageState = append (stageState , fmt .Sprintf ("\t sideInputs: %v outputCols: %v outputConsumers: %v sideConsumers: %v\n " , ss .sides , ss .outputIDs , outputConsumers , sideConsumers ))
600- }
601- return errors .Errorf ("nothing in progress and no refreshes with non zero pending elements: %v\n %v" , v , strings .Join (stageState , "" ))
606+ return errors .Errorf ("nothing in progress and no refreshes with non zero pending elements: %v\n %v" , v , em .DumpStages ())
602607}
603608
604609// InputForBundle returns pre-allocated data for the given bundle, encoding the elements using
@@ -864,7 +869,9 @@ func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PCol
864869 }
865870 consumers := em .consumers [output ]
866871 sideConsumers := em .sideConsumers [output ]
867- slog .Debug ("PersistBundle: bundle has downstream consumers." , "bundle" , rb , slog .Int ("newPending" , len (newPending )), "consumers" , consumers , "sideConsumers" , sideConsumers )
872+ slog .Debug ("PersistBundle: bundle has downstream consumers." , "bundle" , rb ,
873+ slog .Int ("newPending" , len (newPending )), "consumers" , consumers , "sideConsumers" , sideConsumers ,
874+ "pendingDelta" , len (newPending )* len (consumers ))
868875 for _ , sID := range consumers {
869876 consumer := em .stages [sID ]
870877 count := consumer .AddPending (em , newPending )
@@ -1576,6 +1583,7 @@ func (ss *stageState) buildTriggeredBundle(em *ElementManager, key string, win t
15761583 nil ,
15771584 panesInBundle ,
15781585 )
1586+ slog .Debug ("started a triggered bundle" , "stageID" , ss .ID , "bundleID" , rb .BundleID , "size" , len (toProcess ))
15791587
15801588 ss .bundlesToInject = append (ss .bundlesToInject , rb )
15811589 // Bundle is marked in progress here to prevent a race condition.
@@ -1688,6 +1696,7 @@ func (ss *stageState) startEventTimeBundle(watermark mtime.Time, genBundID func(
16881696 }
16891697
16901698 bundID := ss .makeInProgressBundle (genBundID , toProcess , minTs , newKeys , holdsInBundle , panesInBundle )
1699+ slog .Debug ("started an event time bundle" , "stageID" , ss .ID , "bundleID" , bundID , "bundleSize" , len (toProcess ), "upstreamWatermark" , watermark )
16911700
16921701 return bundID , true , stillSchedulable , accumulatingPendingAdjustment
16931702}
@@ -1987,6 +1996,8 @@ func (ss *stageState) startProcessingTimeBundle(em *ElementManager, emNow mtime.
19871996 return "" , false , stillSchedulable
19881997 }
19891998 bundID := ss .makeInProgressBundle (genBundID , toProcess , minTs , newKeys , holdsInBundle , nil )
1999+
2000+ slog .Debug ("started a processing time bundle" , "stageID" , ss .ID , "bundleID" , bundID , "size" , len (toProcess ), "emNow" , emNow )
19902001 return bundID , true , stillSchedulable
19912002}
19922003
@@ -2274,8 +2285,7 @@ func (ss *stageState) bundleReady(em *ElementManager, emNow mtime.Time) (mtime.T
22742285 slog .Debug ("bundleReady: unchanged upstream watermark" ,
22752286 slog .String ("stage" , ss .ID ),
22762287 slog .Group ("watermark" ,
2277- slog .Any ("upstream" , upstreamW ),
2278- slog .Any ("input" , inputW )))
2288+ slog .Any ("upstream == input == previousInput" , inputW )))
22792289 return mtime .MinTimestamp , false , ptimeEventsReady , injectedReady
22802290 }
22812291 ready := true
0 commit comments