Skip to content

Commit d5059c3

Browse files
authored
[Prism] Support AfterProcessingTime triggers - part 1 (#36126)
* Construct after-processing-time trigger from proto and define trigger callbacks. * Add some comments to tests. * Handle the case when after-processing-time trigger is called repeatedly. * Fix a bug when computing next trigger time and add a composite trigger test.
1 parent 5fe4b73 commit d5059c3

File tree

3 files changed

+255
-4
lines changed

3 files changed

+255
-4
lines changed

sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go

Lines changed: 105 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,9 @@ func (ws WinStrat) String() string {
7979

8080
// triggerInput represents a Key + window + stage's trigger conditions.
8181
type triggerInput struct {
82-
newElementCount int // The number of new elements since the last check.
83-
endOfWindowReached bool // Whether or not the end of the window has been reached.
82+
newElementCount int // The number of new elements since the last check.
83+
endOfWindowReached bool // Whether or not the end of the window has been reached.
84+
emNow mtime.Time // The current processing time in the runner.
8485
}
8586

8687
// Trigger represents a trigger for a windowing strategy. A trigger determines when
@@ -581,4 +582,105 @@ func (t *TriggerDefault) String() string {
581582
return "Default"
582583
}
583584

584-
// TODO https://github.com/apache/beam/issues/31438 Handle TriggerAfterProcessingTime
585+
// TimestampTransform is the engine's representation of a processing time transform.
//
// A transform first shifts a timestamp by Delay and then, if a period is
// configured, aligns the shifted timestamp to a period boundary. All fields
// are converted to whole milliseconds (truncating) when they are applied.
type TimestampTransform struct {
	// Delay is the amount of time added to the timestamp.
	Delay time.Duration
	// AlignToPeriod is the length of the period the timestamp is aligned to.
	// A zero value disables alignment.
	AlignToPeriod time.Duration
	// AlignToOffset shifts the period boundaries used for alignment; it is
	// only consulted when alignment is performed.
	AlignToOffset time.Duration
}
591+
592+
// TriggerAfterProcessingTime fires once after a specified amount of processing time
593+
// has passed since an element was first seen.
594+
// Uses the extra state field to track the processing time of the first element.
595+
type TriggerAfterProcessingTime struct {
596+
Transforms []TimestampTransform
597+
}
598+
599+
type afterProcessingTimeState struct {
600+
emNow mtime.Time
601+
firingTime mtime.Time
602+
endOfWindowReached bool
603+
}
604+
605+
func (t *TriggerAfterProcessingTime) onElement(input triggerInput, state *StateData) {
606+
ts := state.getTriggerState(t)
607+
if ts.finished {
608+
return
609+
}
610+
611+
if ts.extra == nil {
612+
ts.extra = afterProcessingTimeState{
613+
emNow: input.emNow,
614+
firingTime: t.applyTimestampTransforms(input.emNow),
615+
endOfWindowReached: input.endOfWindowReached,
616+
}
617+
} else {
618+
s, _ := ts.extra.(afterProcessingTimeState)
619+
s.emNow = input.emNow
620+
s.endOfWindowReached = input.endOfWindowReached
621+
ts.extra = s
622+
}
623+
624+
state.setTriggerState(t, ts)
625+
}
626+
627+
func (t *TriggerAfterProcessingTime) applyTimestampTransforms(start mtime.Time) mtime.Time {
628+
ret := start
629+
for _, transform := range t.Transforms {
630+
ret = ret + mtime.Time(transform.Delay/time.Millisecond)
631+
if transform.AlignToPeriod > 0 {
632+
// timestamp - (timestamp % period) + period
633+
// And with an offset, we adjust before and after.
634+
tsMs := ret
635+
periodMs := mtime.Time(transform.AlignToPeriod / time.Millisecond)
636+
offsetMs := mtime.Time(transform.AlignToOffset / time.Millisecond)
637+
638+
adjustedMs := tsMs - offsetMs
639+
alignedMs := adjustedMs - (adjustedMs % periodMs) + periodMs + offsetMs
640+
ret = alignedMs
641+
}
642+
}
643+
return ret
644+
}
645+
646+
func (t *TriggerAfterProcessingTime) shouldFire(state *StateData) bool {
647+
ts := state.getTriggerState(t)
648+
if ts.extra == nil || ts.finished {
649+
return false
650+
}
651+
s := ts.extra.(afterProcessingTimeState)
652+
return s.emNow >= s.firingTime
653+
}
654+
655+
func (t *TriggerAfterProcessingTime) onFire(state *StateData) {
656+
ts := state.getTriggerState(t)
657+
if ts.finished {
658+
return
659+
}
660+
661+
// We don't reset the state here, only mark it as finished
662+
ts.finished = true
663+
state.setTriggerState(t, ts)
664+
}
665+
666+
func (t *TriggerAfterProcessingTime) reset(state *StateData) {
667+
ts := state.getTriggerState(t)
668+
if ts.extra != nil {
669+
if ts.extra.(afterProcessingTimeState).endOfWindowReached {
670+
delete(state.Trigger, t)
671+
return
672+
}
673+
}
674+
675+
// Not reaching the end of window yet.
676+
// We keep the state (especially the next possible firing time) in case the trigger is called again
677+
ts.finished = false
678+
s := ts.extra.(afterProcessingTimeState)
679+
s.firingTime = t.applyTimestampTransforms(s.emNow) // compute next possible firing time
680+
ts.extra = s
681+
state.setTriggerState(t, ts)
682+
}
683+
684+
func (t *TriggerAfterProcessingTime) String() string {
685+
return fmt.Sprintf("AfterProcessingTime[%v]", t.Transforms)
686+
}

sdks/go/pkg/beam/runners/prism/internal/engine/strategy_test.go

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -420,6 +420,135 @@ func TestTriggers_isReady(t *testing.T) {
420420
{triggerInput{newElementCount: 1, endOfWindowReached: true}, false},
421421
{triggerInput{newElementCount: 1, endOfWindowReached: true}, true}, // Late
422422
},
423+
}, {
424+
name: "afterProcessingTime_Delay_Exact",
425+
trig: &TriggerAfterProcessingTime{
426+
Transforms: []TimestampTransform{
427+
{Delay: 3 * time.Second},
428+
},
429+
},
430+
inputs: []io{
431+
{triggerInput{emNow: 0}, false}, // the trigger is set to fire at 3s after 0
432+
{triggerInput{emNow: 1000}, false},
433+
{triggerInput{emNow: 2000}, false},
434+
{triggerInput{emNow: 3000}, true}, // fire
435+
{triggerInput{emNow: 4000}, false},
436+
{triggerInput{emNow: 5000}, false},
437+
{triggerInput{emNow: 6000}, false},
438+
{triggerInput{emNow: 7000}, false},
439+
},
440+
}, {
441+
name: "afterProcessingTime_Delay_Late",
442+
trig: &TriggerAfterProcessingTime{
443+
Transforms: []TimestampTransform{
444+
{Delay: 3 * time.Second},
445+
},
446+
},
447+
inputs: []io{
448+
{triggerInput{emNow: 0}, false}, // the trigger is set to fire at 3s after 0
449+
{triggerInput{emNow: 1000}, false},
450+
{triggerInput{emNow: 2000}, false},
451+
{triggerInput{emNow: 3001}, true}, // fire a little after the preset time
452+
{triggerInput{emNow: 4000}, false},
453+
},
454+
}, {
455+
name: "afterProcessingTime_AlignToPeriodOnly",
456+
trig: &TriggerAfterProcessingTime{
457+
Transforms: []TimestampTransform{
458+
{AlignToPeriod: 5 * time.Second},
459+
},
460+
},
461+
inputs: []io{
462+
{triggerInput{emNow: 1500}, false}, // align 1.5s to 5s
463+
{triggerInput{emNow: 2000}, false},
464+
{triggerInput{emNow: 4999}, false},
465+
{triggerInput{emNow: 5000}, true}, // fire at 5
466+
{triggerInput{emNow: 5001}, false},
467+
},
468+
}, {
469+
name: "afterProcessingTime_AlignToPeriodAndOffset",
470+
trig: &TriggerAfterProcessingTime{
471+
Transforms: []TimestampTransform{
472+
{AlignToPeriod: 5 * time.Second, AlignToOffset: 200 * time.Millisecond},
473+
},
474+
},
475+
inputs: []io{
476+
{triggerInput{emNow: 1500}, false}, // align 1.5s to 5s plus an 0.2 offset
477+
{triggerInput{emNow: 2000}, false},
478+
{triggerInput{emNow: 5119}, false},
479+
{triggerInput{emNow: 5200}, true}, // fire at 5.2s
480+
{triggerInput{emNow: 5201}, false},
481+
},
482+
}, {
483+
name: "afterProcessingTime_TwoTransforms",
484+
trig: &TriggerAfterProcessingTime{
485+
Transforms: []TimestampTransform{
486+
{AlignToPeriod: 5 * time.Second, AlignToOffset: 200 * time.Millisecond},
487+
{Delay: 1 * time.Second},
488+
},
489+
},
490+
inputs: []io{
491+
{triggerInput{emNow: 1500}, false}, // align 1.5s to 5s plus an 0.2 offset and a 1s delay
492+
{triggerInput{emNow: 2000}, false},
493+
{triggerInput{emNow: 5119}, false},
494+
{triggerInput{emNow: 5200}, false},
495+
{triggerInput{emNow: 5201}, false},
496+
{triggerInput{emNow: 6119}, false},
497+
{triggerInput{emNow: 6200}, true}, // fire
498+
{triggerInput{emNow: 6201}, false},
499+
},
500+
}, {
501+
name: "afterProcessingTime_Repeated", trig: &TriggerRepeatedly{
502+
&TriggerAfterProcessingTime{
503+
Transforms: []TimestampTransform{
504+
{Delay: 3 * time.Second},
505+
}}},
506+
inputs: []io{
507+
{triggerInput{emNow: 0}, false},
508+
{triggerInput{emNow: 1000}, false},
509+
{triggerInput{emNow: 2000}, false},
510+
{triggerInput{emNow: 3000}, true}, // firing the first time, trigger set again
511+
{triggerInput{emNow: 4000}, false},
512+
{triggerInput{emNow: 5000}, false},
513+
{triggerInput{emNow: 6000}, true}, // firing the second time
514+
},
515+
}, {
516+
name: "afterProcessingTime_Repeated_AcrossWindows", trig: &TriggerRepeatedly{
517+
&TriggerAfterProcessingTime{
518+
Transforms: []TimestampTransform{
519+
{Delay: 3 * time.Second},
520+
}}},
521+
inputs: []io{
522+
{triggerInput{emNow: 0}, false},
523+
{triggerInput{emNow: 1000}, false},
524+
{triggerInput{emNow: 2000}, false},
525+
{triggerInput{emNow: 3000}, true}, // fire the first time, trigger is set again
526+
{triggerInput{emNow: 4000}, false},
527+
{triggerInput{emNow: 5000}, false},
528+
{triggerInput{emNow: 6000,
529+
endOfWindowReached: true}, true}, // fire the second time, reach end of window and start over
530+
{triggerInput{emNow: 7000}, false}, // trigger firing time is set to 7s + 3s = 10s
531+
{triggerInput{emNow: 8000}, false},
532+
{triggerInput{emNow: 9000}, false},
533+
{triggerInput{emNow: 10000}, true}, // fire in the new window
534+
},
535+
}, {
536+
name: "afterProcessingTime_Repeated_Composite", trig: &TriggerRepeatedly{
537+
&TriggerAfterAny{SubTriggers: []Trigger{
538+
&TriggerAfterProcessingTime{
539+
Transforms: []TimestampTransform{
540+
{Delay: 3 * time.Second},
541+
},
542+
},
543+
&TriggerElementCount{ElementCount: 2},
544+
}}},
545+
inputs: []io{
546+
{triggerInput{emNow: 0, newElementCount: 1}, false}, // ElmCount = 1, set AfterProcessingTime trigger firing time to 3s
547+
{triggerInput{emNow: 1000, newElementCount: 1}, true}, // ElmCount = 2, fire ElmCount trigger and reset ElmCount and AfterProcessingTime firing time (4s)
548+
{triggerInput{emNow: 4000, newElementCount: 1}, true}, // ElmCount = 1, fire AfterProcessingTime trigger and reset ElmCount and AfterProcessingTime firing time (7s)
549+
{triggerInput{emNow: 5000, newElementCount: 1}, false}, // ElmCount = 1
550+
{triggerInput{emNow: 5500, newElementCount: 1}, true}, // ElmCount = 2, fire ElmCount trigger
551+
},
423552
}, {
424553
name: "default",
425554
trig: &TriggerDefault{},

sdks/go/pkg/beam/runners/prism/internal/execute.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -477,7 +477,27 @@ func buildTrigger(tpb *pipepb.Trigger) engine.Trigger {
477477
}
478478
case *pipepb.Trigger_Repeat_:
479479
return &engine.TriggerRepeatedly{Repeated: buildTrigger(at.Repeat.GetSubtrigger())}
480-
case *pipepb.Trigger_AfterProcessingTime_, *pipepb.Trigger_AfterSynchronizedProcessingTime_:
480+
case *pipepb.Trigger_AfterProcessingTime_:
481+
var transforms []engine.TimestampTransform
482+
for _, ts := range at.AfterProcessingTime.GetTimestampTransforms() {
483+
var delay, period, offset time.Duration
484+
if d := ts.GetDelay(); d != nil {
485+
delay = time.Duration(d.GetDelayMillis()) * time.Millisecond
486+
}
487+
if align := ts.GetAlignTo(); align != nil {
488+
period = time.Duration(align.GetPeriod()) * time.Millisecond
489+
offset = time.Duration(align.GetOffset()) * time.Millisecond
490+
}
491+
transforms = append(transforms, engine.TimestampTransform{
492+
Delay: delay,
493+
AlignToPeriod: period,
494+
AlignToOffset: offset,
495+
})
496+
}
497+
return &engine.TriggerAfterProcessingTime{
498+
Transforms: transforms,
499+
}
500+
case *pipepb.Trigger_AfterSynchronizedProcessingTime_:
481501
panic(fmt.Sprintf("unsupported trigger: %v", prototext.Format(tpb)))
482502
default:
483503
return &engine.TriggerDefault{}

0 commit comments

Comments
 (0)