@@ -1215,7 +1215,9 @@ type stageKind interface {
12151215 // buildEventTimeBundle handles building bundles for the stage per it's kind.
12161216 buildEventTimeBundle (ss * stageState , watermark mtime.Time ) (toProcess elementHeap , minTs mtime.Time , newKeys set [string ],
12171217 holdsInBundle map [mtime.Time ]int , panesInBundle []bundlePane , schedulable bool , pendingAdjustment int )
1218-
1218+ // buildProcessingTimeBundle handles building processing-time bundles for the stage per it's kind.
1219+ buildProcessingTimeBundle (ss * stageState , em * ElementManager , emNow mtime.Time ) (toProcess elementHeap , minTs mtime.Time , newKeys set [string ],
1220+ holdsInBundle map [mtime.Time ]int , panesInBundle []bundlePane , schedulable bool )
12191221 // getPaneOrDefault based on the stage state, element metadata, and bundle id.
12201222 getPaneOrDefault (ss * stageState , defaultPane typex.PaneInfo , w typex.Window , keyBytes []byte , bundID string ) typex.PaneInfo
12211223}
@@ -1327,17 +1329,54 @@ func (ss *stageState) injectTriggeredBundlesIfReady(em *ElementManager, window t
13271329 ready := ss .strat .IsTriggerReady (triggerInput {
13281330 newElementCount : 1 ,
13291331 endOfWindowReached : endOfWindowReached ,
1332+ emNow : em .ProcessingTimeNow (),
13301333 }, & state )
13311334
13321335 if ready {
13331336 state .Pane = computeNextTriggeredPane (state .Pane , endOfWindowReached )
1337+ } else {
1338+ if pts := ss .strat .GetAfterProcessingTimeTriggers (); pts != nil {
1339+ for _ , t := range pts {
1340+ ts := (& state ).getTriggerState (t )
1341+ if ts .extra == nil || t .shouldFire ((& state )) {
1342+ // Skipping inserting a processing time timer if the firing time
1343+ // is not set or it already should fire.
1344+ // When the after processing time triggers should fire, there are
1345+ // two scenarios:
1346+ // (1) the entire trigger of this window is ready to fire. In this
1347+ // case, `ready` should be true and we won't reach here.
1348+ // (2) we are still waiting for other triggers (subtriggers) to
1349+ // fire (e.g. AfterAll).
1350+ continue
1351+ }
1352+ firingTime := ts .extra .(afterProcessingTimeState ).firingTime
1353+ notYetHolds := map [mtime.Time ]int {}
1354+ timer := element {
1355+ window : window ,
1356+ timestamp : firingTime ,
1357+ holdTimestamp : window .MaxTimestamp (),
1358+ pane : typex .NoFiringPane (),
1359+ transform : ss .ID , // Use stage id to fake transform id
1360+ family : "AfterProcessingTime" ,
1361+ tag : "" ,
1362+ sequence : 1 ,
1363+ elmBytes : nil ,
1364+ keyBytes : []byte (key ),
1365+ }
1366+ // TODO: how to deal with watermark holds for this implicit processing time timer
1367+ // ss.watermarkHolds.Add(timer.holdTimestamp, 1)
1368+ ss .processingTimeTimers .Persist (firingTime , timer , notYetHolds )
1369+ em .processTimeEvents .Schedule (firingTime , ss .ID )
1370+ em .wakeUpAt (firingTime )
1371+ }
1372+ }
13341373 }
13351374 // Store the state as triggers may have changed it.
13361375 ss .state [LinkID {}][window ][key ] = state
13371376
13381377 // If we're ready, it's time to fire!
13391378 if ready {
1340- count += ss .buildTriggeredBundle (em , key , window )
1379+ count += ss .startTriggeredBundle (em , key , window )
13411380 }
13421381 return count
13431382}
@@ -1524,16 +1563,11 @@ func (ss *stageState) savePanes(bundID string, panesInBundle []bundlePane) {
15241563 }
15251564}
15261565
1527- // buildTriggeredBundle must be called with the stage.mu lock held.
1528- // When in discarding mode, returns 0.
1529- // When in accumulating mode, returns the number of fired elements to maintain a correct pending count.
1530- func (ss * stageState ) buildTriggeredBundle (em * ElementManager , key string , win typex.Window ) int {
1566+ func (ss * stageState ) buildTriggeredBundle (em * ElementManager , key string , win typex.Window ) ([]element , int ) {
15311567 var toProcess []element
15321568 dnt := ss .pendingByKeys [key ]
15331569 var notYet []element
15341570
1535- rb := RunBundle {StageID : ss .ID , BundleID : "agg-" + em .nextBundID (), Watermark : ss .input }
1536-
15371571 // Look at all elements for this key, and only for this window.
15381572 for dnt .elements .Len () > 0 {
15391573 e := heap .Pop (& dnt .elements ).(element )
@@ -1564,6 +1598,19 @@ func (ss *stageState) buildTriggeredBundle(em *ElementManager, key string, win t
15641598 heap .Init (& dnt .elements )
15651599 }
15661600
1601+ return toProcess , accumulationDiff
1602+ }
1603+
1604+ // startTriggeredBundle must be called with the stage.mu lock held.
1605+ // Returns the accumulation diff that the pending work needs to be adjusted by, as completed work is subtracted from the pending count.
1606+ // When in discarding mode, returns 0, as the pending work already includes these elements.
1607+ // When in accumulating mode, returns the number of fired elements, since those elements remain pending even after this bundle is fired.
1608+ func (ss * stageState ) startTriggeredBundle (em * ElementManager , key string , win typex.Window ) int {
1609+ toProcess , accumulationDiff := ss .buildTriggeredBundle (em , key , win )
1610+ if len (toProcess ) == 0 {
1611+ return accumulationDiff
1612+ }
1613+
15671614 if ss .inprogressKeys == nil {
15681615 ss .inprogressKeys = set [string ]{}
15691616 }
@@ -1575,6 +1622,7 @@ func (ss *stageState) buildTriggeredBundle(em *ElementManager, key string, win t
15751622 },
15761623 }
15771624
1625+ rb := RunBundle {StageID : ss .ID , BundleID : "agg-" + em .nextBundID (), Watermark : ss .input }
15781626 ss .makeInProgressBundle (
15791627 func () string { return rb .BundleID },
15801628 toProcess ,
@@ -1585,9 +1633,11 @@ func (ss *stageState) buildTriggeredBundle(em *ElementManager, key string, win t
15851633 )
15861634 slog .Debug ("started a triggered bundle" , "stageID" , ss .ID , "bundleID" , rb .BundleID , "size" , len (toProcess ))
15871635
1588- ss .bundlesToInject = append (ss .bundlesToInject , rb )
1636+ // TODO: Use ss.bundlesToInject rather than em.injectedBundles
1637+ // ss.bundlesToInject = append(ss.bundlesToInject, rb)
15891638 // Bundle is marked in progress here to prevent a race condition.
15901639 em .refreshCond .L .Lock ()
1640+ em .injectedBundles = append (em .injectedBundles , rb )
15911641 em .inprogressBundles .insert (rb .BundleID )
15921642 em .refreshCond .L .Unlock ()
15931643 return accumulationDiff
@@ -1927,6 +1977,20 @@ func (ss *stageState) startProcessingTimeBundle(em *ElementManager, emNow mtime.
19271977 ss .mu .Lock ()
19281978 defer ss .mu .Unlock ()
19291979
1980+ toProcess , minTs , newKeys , holdsInBundle , panesInBundle , stillSchedulable := ss .kind .buildProcessingTimeBundle (ss , em , emNow )
1981+
1982+ if len (toProcess ) == 0 {
1983+ // If we have nothing
1984+ return "" , false , stillSchedulable
1985+ }
1986+ bundID := ss .makeInProgressBundle (genBundID , toProcess , minTs , newKeys , holdsInBundle , panesInBundle )
1987+ slog .Debug ("started a processing time bundle" , "stageID" , ss .ID , "bundleID" , bundID , "size" , len (toProcess ), "emNow" , emNow )
1988+ return bundID , true , stillSchedulable
1989+ }
1990+
1991+ // handleProcessingTimeTimer contains the common code for handling processing-time timers for aggregation stages and stateful stages.
1992+ func handleProcessingTimeTimer (ss * stageState , em * ElementManager , emNow mtime.Time ,
1993+ processTimerFn func (e element , toProcess []element , holdsInBundle map [mtime.Time ]int , panesInBundle []bundlePane ) ([]element , []bundlePane )) (elementHeap , mtime.Time , set [string ], map [mtime.Time ]int , []bundlePane , bool ) {
19301994 // TODO: Determine if it's possible and a good idea to treat all EventTime processing as a MinTime
19311995 // Special Case for ProcessingTime handling.
19321996 // Eg. Always queue EventTime elements at minTime.
@@ -1935,6 +1999,7 @@ func (ss *stageState) startProcessingTimeBundle(em *ElementManager, emNow mtime.
19351999 // Potentially puts too much work on the scheduling thread though.
19362000
19372001 var toProcess []element
2002+ var panesInBundle []bundlePane
19382003 minTs := mtime .MaxTimestamp
19392004 holdsInBundle := map [mtime.Time ]int {}
19402005
@@ -1968,10 +2033,8 @@ func (ss *stageState) startProcessingTimeBundle(em *ElementManager, emNow mtime.
19682033 if e .timestamp < minTs {
19692034 minTs = e .timestamp
19702035 }
1971- holdsInBundle [e .holdTimestamp ]++
19722036
1973- // We're going to process this timer!
1974- toProcess = append (toProcess , e )
2037+ toProcess , panesInBundle = processTimerFn (e , toProcess , holdsInBundle , panesInBundle )
19752038 }
19762039
19772040 nextTime = ss .processingTimeTimers .Peek ()
@@ -1986,19 +2049,58 @@ func (ss *stageState) startProcessingTimeBundle(em *ElementManager, emNow mtime.
19862049 for _ , v := range notYet {
19872050 ss .processingTimeTimers .Persist (v .firing , v .timer , notYetHolds )
19882051 em .processTimeEvents .Schedule (v .firing , ss .ID )
2052+ em .wakeUpAt (v .firing )
19892053 }
19902054
19912055 // Add a refresh if there are still processing time events to process.
19922056 stillSchedulable := (nextTime < emNow && nextTime != mtime .MaxTimestamp || len (notYet ) > 0 )
19932057
1994- if len (toProcess ) == 0 {
1995- // If we have nothing
1996- return "" , false , stillSchedulable
1997- }
1998- bundID := ss .makeInProgressBundle (genBundID , toProcess , minTs , newKeys , holdsInBundle , nil )
2058+ return toProcess , minTs , newKeys , holdsInBundle , panesInBundle , stillSchedulable
2059+ }
19992060
2000- slog .Debug ("started a processing time bundle" , "stageID" , ss .ID , "bundleID" , bundID , "size" , len (toProcess ), "emNow" , emNow )
2001- return bundID , true , stillSchedulable
2061+ // buildProcessingTimeBundle for stateful stages prepares bundles for processing-time timers
2062+ func (* statefulStageKind ) buildProcessingTimeBundle (ss * stageState , em * ElementManager , emNow mtime.Time ) (elementHeap , mtime.Time , set [string ], map [mtime.Time ]int , []bundlePane , bool ) {
2063+ return handleProcessingTimeTimer (ss , em , emNow , func (e element , toProcess []element , holdsInBundle map [mtime.Time ]int , panesInBundle []bundlePane ) ([]element , []bundlePane ) {
2064+ holdsInBundle [e .holdTimestamp ]++
2065+ // We're going to process this timer!
2066+ toProcess = append (toProcess , e )
2067+ return toProcess , nil
2068+ })
2069+ }
2070+
2071+ // buildProcessingTimeBundle for aggregation stages prepares bundles for after-processing-time triggers
2072+ func (* aggregateStageKind ) buildProcessingTimeBundle (ss * stageState , em * ElementManager , emNow mtime.Time ) (elementHeap , mtime.Time , set [string ], map [mtime.Time ]int , []bundlePane , bool ) {
2073+ return handleProcessingTimeTimer (ss , em , emNow , func (e element , toProcess []element , holdsInBundle map [mtime.Time ]int , panesInBundle []bundlePane ) ([]element , []bundlePane ) {
2074+ // Different from `buildProcessingTimeBundle` for stateful stage,
2075+ // triggers don't hold back the watermark, so no holds are in the triggered bundle.
2076+ state := ss .state [LinkID {}][e .window ][string (e .keyBytes )]
2077+ endOfWindowReached := e .window .MaxTimestamp () < ss .input
2078+ ready := ss .strat .IsTriggerReady (triggerInput {
2079+ newElementCount : 0 ,
2080+ endOfWindowReached : endOfWindowReached ,
2081+ emNow : emNow ,
2082+ }, & state )
2083+
2084+ if ready {
2085+ state .Pane = computeNextTriggeredPane (state .Pane , endOfWindowReached )
2086+
2087+ // We're going to process this trigger!
2088+ elems , _ := ss .buildTriggeredBundle (em , string (e .keyBytes ), e .window )
2089+ toProcess = append (toProcess , elems ... )
2090+
2091+ ss .state [LinkID {}][e .window ][string (e .keyBytes )] = state
2092+
2093+ panesInBundle = append (panesInBundle , bundlePane {})
2094+ }
2095+
2096+ return toProcess , panesInBundle
2097+ })
2098+ }
2099+
2100+ // buildProcessingTimeBundle for stateless stages is not supposed to be called currently
2101+ func (* ordinaryStageKind ) buildProcessingTimeBundle (ss * stageState , em * ElementManager , emNow mtime.Time ) (elementHeap , mtime.Time , set [string ], map [mtime.Time ]int , []bundlePane , bool ) {
2102+ slog .Error ("ordinary stages can't have processing time elements" )
2103+ return nil , mtime .MinTimestamp , nil , nil , nil , false
20022104}
20032105
20042106// makeInProgressBundle is common code to store a set of elements as a bundle in progress.
@@ -2281,13 +2383,23 @@ func (ss *stageState) bundleReady(em *ElementManager, emNow mtime.Time) (mtime.T
22812383 inputW := ss .input
22822384 _ , upstreamW := ss .UpstreamWatermark ()
22832385 previousInputW := ss .previousInput
2284- if inputW == upstreamW && previousInputW == inputW {
2386+
2387+ _ , isOrdinaryStage := ss .kind .(* ordinaryStageKind )
2388+ if isOrdinaryStage && len (ss .sides ) == 0 {
2389+ // For ordinary stage with no side inputs, we use whether there are pending elements to determine
2390+ // whether a bundle is ready or not.
2391+ if len (ss .pending ) == 0 {
2392+ return mtime .MinTimestamp , false , ptimeEventsReady , injectedReady
2393+ }
2394+ } else if inputW == upstreamW && previousInputW == inputW {
2395+ // Otherwise, use the progression of watermark to determine the bundle readiness.
22852396 slog .Debug ("bundleReady: unchanged upstream watermark" ,
22862397 slog .String ("stage" , ss .ID ),
22872398 slog .Group ("watermark" ,
22882399 slog .Any ("upstream == input == previousInput" , inputW )))
22892400 return mtime .MinTimestamp , false , ptimeEventsReady , injectedReady
22902401 }
2402+
22912403 ready := true
22922404 for _ , side := range ss .sides {
22932405 pID , ok := em .pcolParents [side .Global ]
@@ -2329,3 +2441,17 @@ func (em *ElementManager) ProcessingTimeNow() (ret mtime.Time) {
23292441func rebaseProcessingTime (localNow , scheduled mtime.Time ) mtime.Time {
23302442 return localNow + (scheduled - mtime .Now ())
23312443}
2444+
2445+ // wakeUpAt schedules a wakeup signal for the bundle processing loop.
2446+ // This is used for processing time timers to ensure the loop re-evaluates
2447+ // stages when a processing time timer is expected to fire.
2448+ func (em * ElementManager ) wakeUpAt (t mtime.Time ) {
2449+ if em .testStreamHandler == nil && em .config .EnableRTC {
2450+ // only create this goroutine if we have real-time clock enabled and the pipeline does not have TestStream.
2451+ go func (fireAt time.Time ) {
2452+ time .AfterFunc (time .Until (fireAt ), func () {
2453+ em .refreshCond .Broadcast ()
2454+ })
2455+ }(t .ToTime ())
2456+ }
2457+ }
0 commit comments