Skip to content

Commit b69b703

Browse files
authored
[#31438] Trigger Precursor work for Prism. (#33763)
* [prism] Never mix Data and timers in the same bundle. No execution order guarantees. * Trigger State machine - no processing time - no merging. * comments * fix aftereach Handling to proceed only on finished * Add default + fix OrFinally * Split the isReady into onElement, shouldFire, and onFire methods. * Add string prints, adjust OnWindowExp behavior. * Add tests for unset AfterEndOfWindow subtriggers & fixes. --------- Co-authored-by: lostluck <13907733+lostluck@users.noreply.github.com>
1 parent 2712794 commit b69b703

File tree

5 files changed

+968
-5
lines changed

5 files changed

+968
-5
lines changed

sdks/go/pkg/beam/runners/prism/internal/engine/data.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,27 @@ import (
3030
"google.golang.org/protobuf/encoding/protowire"
3131
)
3232

33-
// StateData is a "union" between Bag state and MultiMap state to increase common code.
33+
// StateData is a "union" between Bag, MultiMap, and Trigger state to increase
34+
// common code.
35+
//
36+
// Trigger state is never explicitly set by users, but occurs on demand when
37+
// a trigger requires state.
3438
type StateData struct {
3539
Bag [][]byte
3640
Multimap map[string][][]byte
41+
42+
Trigger map[Trigger]triggerState
43+
}
44+
45+
func (s *StateData) getTriggerState(key Trigger) triggerState {
46+
if s.Trigger == nil {
47+
s.Trigger = map[Trigger]triggerState{}
48+
}
49+
return s.Trigger[key]
50+
}
51+
52+
func (s *StateData) setTriggerState(key Trigger, val triggerState) {
53+
s.Trigger[key] = val
3754
}
3855

3956
// TimerKey is for use as a key for timers.

sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1195,6 +1195,7 @@ func (ss *stageState) AddPending(newPending []element) int {
11951195
}
11961196
newPending = origPending
11971197
}
1198+
//slog.Warn("AddPending", "stage", ss.ID, "pending", newPending)
11981199
if ss.stateful {
11991200
if ss.pendingByKeys == nil {
12001201
ss.pendingByKeys = map[string]*dataAndTimers{}
@@ -1377,12 +1378,26 @@ keysPerBundle:
13771378
minTs = dnt.elements[0].timestamp
13781379
}
13791380

1381+
dataInBundle := false
1382+
13801383
// Can we pre-compute this bit when adding to pendingByKeys?
13811384
// startBundle is run in a single scheduling goroutine, so moving per-element code
13821385
// to be computed by the bundle parallel goroutines will speed things up a touch.
13831386
for dnt.elements.Len() > 0 {
1387+
// We can't mix data and timers in the same bundle, as there's no
1388+
// guarantee which is processed first SDK side.
1389+
// If the bundle already contains data, then it's before the timer
1390+
// by the heap invariant, and must be processed before we can fire a timer.
1391+
// AKA, keep them separate.
1392+
if len(toProcess) > 0 && // If we have already picked some elements AND
1393+
((dataInBundle && dnt.elements[0].IsTimer()) || // we're about to add a timer to a Bundle that already has data OR
1394+
(!dataInBundle && !dnt.elements[0].IsTimer())) { // we're about to add data to a bundle that already has a timer
1395+
break
1396+
}
13841397
e := heap.Pop(&dnt.elements).(element)
1385-
if e.IsTimer() {
1398+
if e.IsData() {
1399+
dataInBundle = true
1400+
} else {
13861401
lastSet, ok := dnt.timers[timerKey{family: e.family, tag: e.tag, window: e.window}]
13871402
if !ok {
13881403
timerCleared = true
@@ -1392,7 +1407,7 @@ keysPerBundle:
13921407
timerCleared = true
13931408
continue
13941409
}
1395-
holdsInBundle[e.holdTimestamp] += 1
1410+
holdsInBundle[e.holdTimestamp]++
13961411
// Clear the "fired" timer so subsequent matches can be ignored.
13971412
delete(dnt.timers, timerKey{family: e.family, tag: e.tag, window: e.window})
13981413
}
@@ -1526,6 +1541,7 @@ func (ss *stageState) makeInProgressBundle(genBundID func() string, toProcess []
15261541
ss.inprogressKeysByBundle[bundID] = newKeys
15271542
ss.inprogressKeys.merge(newKeys)
15281543
ss.inprogressHoldsByBundle[bundID] = holdsInBundle
1544+
//slog.Warn("makeInProgressBundle", "stage", ss.ID, "toProcess", toProcess)
15291545
return bundID
15301546
}
15311547

0 commit comments

Comments
 (0)