@@ -184,7 +184,8 @@ type Config struct {
 //
 // Watermarks are advanced based on consumed input, except if the stage produces residuals.
 type ElementManager struct {
-	config Config
+	config     Config
+	nextBundID func() string // Generates unique bundleIDs. Set in the Bundles method.
 
 	impulses set[string]             // List of impulse stages.
 	stages   map[string]*stageState  // The state for each stage.
@@ -197,6 +198,7 @@ type ElementManager struct {
 	refreshCond       sync.Cond   // refreshCond protects the following fields with its lock, and unblocks bundle scheduling.
 	inprogressBundles set[string] // Active bundleIDs
 	changedStages     set[string] // Stages that have changed and need their watermark refreshed.
+	injectedBundles   []RunBundle // Represents ready-to-execute bundles prepared outside of the main loop, such as for onWindowExpiration, or for Triggers.
 
 	livePending     atomic.Int64   // An accessible live pending count. DEBUG USE ONLY
 	pendingElements sync.WaitGroup // pendingElements counts all unprocessed elements in a job. Jobs with no pending elements terminate successfully.
@@ -271,6 +273,16 @@ func (em *ElementManager) StageStateful(ID string) {
 	em.stages[ID].stateful = true
 }
 
+// StageOnWindowExpiration marks the given stage as having an OnWindowExpiration
+// callback for the given timer, and initializes the tracking needed to fire it
+// for each key seen in a window once that window expires.
+func (em *ElementManager) StageOnWindowExpiration(stageID string, timer StaticTimerID) {
+	ss := em.stages[stageID]
+	ss.onWindowExpiration = timer
+	ss.keysToExpireByWindow = map[typex.Window]set[string]{}
+	ss.inProgressExpiredWindows = map[typex.Window]int{}
+	ss.expiryWindowsByBundles = map[string]typex.Window{}
+}
+
 // StageProcessingTimeTimers indicates which timers are processingTime domain timers.
 func (em *ElementManager) StageProcessingTimeTimers(ID string, ptTimers map[string]bool) {
 	em.stages[ID].processingTimeTimersFamilies = ptTimers
@@ -338,6 +350,8 @@ func (rb RunBundle) LogValue() slog.Value {
 // The returned channel is closed when the context is canceled, or there are no pending elements
 // remaining.
 func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context.CancelCauseFunc, nextBundID func() string) <-chan RunBundle {
+	// Make it easier for injected bundles to get unique IDs.
+	em.nextBundID = nextBundID
 	runStageCh := make(chan RunBundle)
 	ctx, cancelFn := context.WithCancelCause(ctx)
 	go func() {
@@ -370,8 +384,9 @@ func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context.
 			changedByProcessingTime := em.processTimeEvents.AdvanceTo(emNow)
 			em.changedStages.merge(changedByProcessingTime)
 
-			// If there are no changed stages or ready processing time events available, we wait until there are.
-			for len(em.changedStages)+len(changedByProcessingTime) == 0 {
+			// If there are no changed stages, ready processing time events,
+			// or injected bundles available, we wait until there are.
+			for len(em.changedStages)+len(changedByProcessingTime)+len(em.injectedBundles) == 0 {
 				// Check to see if we must exit
 				select {
 				case <-ctx.Done():
@@ -386,6 +401,19 @@ func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context.
 				changedByProcessingTime = em.processTimeEvents.AdvanceTo(emNow)
 				em.changedStages.merge(changedByProcessingTime)
 			}
+			// Run any injected bundles first.
+			for len(em.injectedBundles) > 0 {
+				rb := em.injectedBundles[0]
+				em.injectedBundles = em.injectedBundles[1:]
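+				// Release the refreshCond lock while sending, so a blocked
+				// send on runStageCh doesn't stall other watermark updates.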
+				em.refreshCond.L.Unlock()
+
+				select {
+				case <-ctx.Done():
+					return
+				case runStageCh <- rb:
+				}
+				em.refreshCond.L.Lock()
+			}
 
 			// We know there is some work we can do that may advance the watermarks,
 			// refresh them, and see which stages have advanced.
@@ -628,6 +656,12 @@ type Block struct {
 	Transform, Family string
 }
 
+// StaticTimerID represents the static user identifiers for a timer,
+// in particular, the ID of the Transform, and the family for the timer.
+type StaticTimerID struct {
+	TransformID, TimerFamily string
+}
+
 // StateForBundle retrieves relevant state for the given bundle, WRT the data in the bundle.
 //
 // TODO(lostluck): Consider unifying with InputForBundle, to reduce lock contention.
@@ -847,6 +881,19 @@ func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PCol
 	}
 	delete(stage.inprogressHoldsByBundle, rb.BundleID)
 
+	// Clean up OnWindowExpiration bundle accounting, so window state
+	// may be garbage collected.
+	if stage.expiryWindowsByBundles != nil {
+		win, ok := stage.expiryWindowsByBundles[rb.BundleID]
+		if ok {
+			stage.inProgressExpiredWindows[win] -= 1
+			if stage.inProgressExpiredWindows[win] == 0 {
+				delete(stage.inProgressExpiredWindows, win)
+			}
+			delete(stage.expiryWindowsByBundles, rb.BundleID)
+		}
+	}
+
 	// If there are estimated output watermarks, set the estimated
 	// output watermark for the stage.
 	if len(residuals.MinOutputWatermarks) > 0 {
@@ -1068,6 +1115,12 @@ type stageState struct {
 	strat                        winStrat        // Windowing Strategy for aggregation firings.
 	processingTimeTimersFamilies map[string]bool // Indicates which timer families use the processing time domain.
 
+	// onWindowExpiration management
+	onWindowExpiration       StaticTimerID                // The static ID of the OnWindowExpiration callback.
+	keysToExpireByWindow     map[typex.Window]set[string] // Tracks all keys ever used with a window, so they may be expired.
+	inProgressExpiredWindows map[typex.Window]int         // Tracks the number of bundles currently expiring these windows, so we don't prematurely garbage collect them.
+	expiryWindowsByBundles   map[string]typex.Window      // Tracks which bundle is handling which window, so the above map can be cleared.
+
 	mu                 sync.Mutex
 	upstreamWatermarks sync.Map   // watermark set from inputPCollection's parent.
 	input              mtime.Time // input watermark for the parallel input.
@@ -1158,6 +1211,14 @@ func (ss *stageState) AddPending(newPending []element) int {
 					timers: map[timerKey]timerTimes{},
 				}
 				ss.pendingByKeys[string(e.keyBytes)] = dnt
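+				// Track this key against its window, so an OnWindowExpiration
+				// bundle can later be fired for it when the window expires.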
+				if ss.keysToExpireByWindow != nil {
+					w, ok := ss.keysToExpireByWindow[e.window]
+					if !ok {
+						w = make(set[string])
+						ss.keysToExpireByWindow[e.window] = w
+					}
+					w.insert(string(e.keyBytes))
+				}
 			}
 			heap.Push(&dnt.elements, e)
 
@@ -1555,48 +1616,143 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] {
 	if minWatermarkHold < newOut {
 		newOut = minWatermarkHold
 	}
-	refreshes := set[string]{}
+	// If the newOut is smaller, then don't change downstream watermarks.
+	if newOut <= ss.output {
+		return nil
+	}
+
 	// If bigger, advance the output watermark
-	if newOut > ss.output {
-		ss.output = newOut
-		for _, outputCol := range ss.outputIDs {
-			consumers := em.consumers[outputCol]
-
-			for _, sID := range consumers {
-				em.stages[sID].updateUpstreamWatermark(outputCol, ss.output)
-				refreshes.insert(sID)
-			}
-			// Inform side input consumers, but don't update the upstream watermark.
-			for _, sID := range em.sideConsumers[outputCol] {
-				refreshes.insert(sID.Global)
-			}
-		}
-		// Garbage collect state, timers and side inputs, for all windows
-		// that are before the new output watermark.
-		// They'll never be read in again.
-		for _, wins := range ss.sideInputs {
-			for win := range wins {
-				// TODO(#https://github.com/apache/beam/issues/31438):
-				// Adjust with AllowedLateness
-				// Clear out anything we've already used.
-				if win.MaxTimestamp() < newOut {
-					delete(wins, win)
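+	// Before advancing, create OnWindowExpiration bundles for any windows that
+	// have newly expired; if any are created, the output watermark is held back
+	// until those bundles complete.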
+	preventDownstreamUpdate := ss.createOnWindowExpirationBundles(newOut, em)
+
+	// Garbage collect state, timers and side inputs, for all windows
+	// that are before the new output watermark, if they aren't in progress
+	// of being expired.
+	// They'll never be read in again.
+	for _, wins := range ss.sideInputs {
+		for win := range wins {
+			// TODO(#https://github.com/apache/beam/issues/31438):
+			// Adjust with AllowedLateness
+			// Clear out anything we've already used.
+			if win.MaxTimestamp() < newOut {
+				// If the expiry is in progress, skip this window.
+				if ss.inProgressExpiredWindows[win] > 0 {
+					continue
 				}
+				delete(wins, win)
 			}
 		}
-		for _, wins := range ss.state {
-			for win := range wins {
-				// TODO(#https://github.com/apache/beam/issues/31438):
-				// Adjust with AllowedLateness
-				if win.MaxTimestamp() < newOut {
-					delete(wins, win)
+	}
+	for _, wins := range ss.state {
+		for win := range wins {
+			// TODO(#https://github.com/apache/beam/issues/31438):
+			// Adjust with AllowedLateness
+			if win.MaxTimestamp() < newOut {
+				// If the expiry is in progress, skip collecting this window.
+				if ss.inProgressExpiredWindows[win] > 0 {
+					continue
 				}
+				delete(wins, win)
 			}
 		}
 	}
+	// If there are windows to expire, we don't update the output watermark yet.
+	if preventDownstreamUpdate {
+		return nil
+	}
+
+	// Update this stage's output watermark, and then propagate that to downstream stages
+	refreshes := set[string]{}
+	ss.output = newOut
+	for _, outputCol := range ss.outputIDs {
+		consumers := em.consumers[outputCol]
+
+		for _, sID := range consumers {
+			em.stages[sID].updateUpstreamWatermark(outputCol, ss.output)
+			refreshes.insert(sID)
+		}
+		// Inform side input consumers, but don't update the upstream watermark.
+		for _, sID := range em.sideConsumers[outputCol] {
+			refreshes.insert(sID.Global)
+		}
+	}
 	return refreshes
 }
 
+// createOnWindowExpirationBundles injects bundles for all keys that were used
+// in a window, once that window expires. It returns true if any bundles are
+// created, which means the window must not yet be garbage collected.
+//
+// Must be called within both the stageState.mu and the ElementManager.refreshCond
+// critical sections.
+func (ss *stageState) createOnWindowExpirationBundles(newOut mtime.Time, em *ElementManager) bool {
+	var preventDownstreamUpdate bool
+	for win, keys := range ss.keysToExpireByWindow {
+		// Check if the window has expired.
+		// TODO(#https://github.com/apache/beam/issues/31438):
+		// Adjust with AllowedLateness
+		if win.MaxTimestamp() >= newOut {
+			continue
+		}
+		// We can't advance the output watermark if there's garbage to collect.
+		preventDownstreamUpdate = true
+		// Hold off on garbage collecting data for these windows while these
+		// are in progress.
+		ss.inProgressExpiredWindows[win] += 1
+
+		// Produce bundle(s) for these keys and window, and inject them.
+		wm := win.MaxTimestamp()
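+		// The "owe-" prefix makes OnWindowExpiration bundles easy to identify
+		// among the IDs produced by nextBundID.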
+		rb := RunBundle{StageID: ss.ID, BundleID: "owe-" + em.nextBundID(), Watermark: wm}
+
+		// Now we need to actually build the bundle.
+		var toProcess []element
+		busyKeys := set[string]{}
+		usedKeys := set[string]{}
+		for k := range keys {
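+			// A key already claimed by an in-progress bundle can't be expired yet;
+			// it stays in keysToExpireByWindow and is retried on a later refresh.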
+			if ss.inprogressKeys.present(k) {
+				busyKeys.insert(k)
+				continue
+			}
+			usedKeys.insert(k)
+			toProcess = append(toProcess, element{
+				window:        win,
+				timestamp:     wm,
+				pane:          typex.NoFiringPane(),
+				holdTimestamp: wm,
+				transform:     ss.onWindowExpiration.TransformID,
+				family:        ss.onWindowExpiration.TimerFamily,
+				sequence:      1,
+				keyBytes:      []byte(k),
+				elmBytes:      nil,
+			})
+		}
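+		// Register the new pending elements and hold the output watermark at the
+		// window's max timestamp until this bundle is persisted.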
+		em.addPending(len(toProcess))
+		ss.watermarkHolds.Add(wm, 1)
+		ss.makeInProgressBundle(
+			func() string { return rb.BundleID },
+			toProcess,
+			wm,
+			usedKeys,
+			map[mtime.Time]int{wm: 1},
+		)
+		ss.expiryWindowsByBundles[rb.BundleID] = win
+
+		slog.Debug("OnWindowExpiration-Bundle Created", slog.Any("bundle", rb), slog.Any("usedKeys", usedKeys), slog.Any("window", win), slog.Any("toProcess", toProcess), slog.Any("busyKeys", busyKeys))
+		// We're already in the refreshCond critical section.
+		// Insert that this is in progress here to avoid a race condition.
+		em.inprogressBundles.insert(rb.BundleID)
+		em.injectedBundles = append(em.injectedBundles, rb)
+
+		// Remove the key accounting, or continue tracking which keys still need clearing.
+		if len(busyKeys) == 0 {
+			delete(ss.keysToExpireByWindow, win)
+		} else {
+			ss.keysToExpireByWindow[win] = busyKeys
+		}
+	}
+	return preventDownstreamUpdate
+}
+
 // bundleReady returns the maximum allowed watermark for this stage, and whether
 // it's permitted to execute by side inputs.
 func (ss *stageState) bundleReady(em *ElementManager, emNow mtime.Time) (mtime.Time, bool, bool) {