Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/trigger_files/beam_PostCommit_Go.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"modification": 1
}
10 changes: 10 additions & 0 deletions sdks/go/pkg/beam/core/timers/timers.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,16 @@ func WithOutputTimestamp(outputTimestamp time.Time) timerOptions {
}
}

// WithNoOutputTimestamp sets the timer without an output timestamp.
// The output watermark will not be held up, and it is illegal to output
// messages from this timer triggering using the default output timestamp.
func WithNoOutputTimestamp() timerOptions {
return func(tm *timerConfig) {
tm.HoldSet = true
tm.HoldTimestamp = mtime.MaxTimestamp
}
}

// Context is a parameter for OnTimer methods to receive the fired Timer.
type Context struct {
Family string
Expand Down
114 changes: 113 additions & 1 deletion sdks/go/test/integration/primitives/timers.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,29 @@ import (
func init() {
register.DoFn2x0[[]byte, func(string, int)](&inputFn[string, int]{})
register.DoFn6x0[beam.Window, state.Provider, timers.Provider, string, int, func(kv[string, int])](&eventTimeFn{})
register.DoFn5x0[beam.Window, timers.Provider, string, int, func(int)](&eventTimeFnWithOutputTimestamp{})
register.DoFn3x0[beam.EventTime, int, func(int)](&checkTimestampFn{})
register.Emitter2[string, int]()
register.Emitter1[kv[string, int]]()
register.Emitter1[int]()
}

// checkTimestampFn validates that elements arrived at the expected timestamp.
type checkTimestampFn struct {
Timestamp int64 // millisecond epoch
ExpectMaxTimestamp bool
}

func (fn *checkTimestampFn) ProcessElement(ts beam.EventTime, val int, emit func(int)) {
if fn.ExpectMaxTimestamp {
if mtime.Time(ts) != mtime.MaxTimestamp {
panic(fmt.Errorf("timestamp mismatch: got %v, want %v (MaxTimestamp)", ts, mtime.MaxTimestamp))
}
} else {
if got := int64(ts); got != int64(mtime.FromMilliseconds(fn.Timestamp)) {
panic(fmt.Errorf("timestamp mismatch: got %v, want %v (as mtime)", got, fn.Timestamp))
}
}
emit(val)
}

type kv[K, V any] struct {
Expand Down Expand Up @@ -154,6 +175,97 @@ func TimersEventTimeUnbounded(s beam.Scope) {
})(s)
}

type eventTimeFnWithOutputTimestamp struct {
Callback timers.EventTime

Offset int
TimerOutput int
OutputTimestamp int64 // millisecond epoch
NoOutputTimestamp bool
}

func (fn *eventTimeFnWithOutputTimestamp) ProcessElement(w beam.Window, tp timers.Provider, key string, value int, emit func(int)) {
if fn.NoOutputTimestamp {
fn.Callback.Set(tp, w.MaxTimestamp().ToTime(), timers.WithNoOutputTimestamp())
} else {
fn.Callback.Set(tp, w.MaxTimestamp().ToTime(), timers.WithOutputTimestamp(time.UnixMilli(fn.OutputTimestamp)))
}
}

func (fn *eventTimeFnWithOutputTimestamp) OnTimer(ctx context.Context, ts beam.EventTime, tp timers.Provider, key string, timer timers.Context, emit func(int)) {
if fn.Callback.Family != timer.Family || timer.Tag != "" {
panic("unexpected timer, family: " + timer.Family + " tag:" + timer.Tag + " want: " + fn.Callback.Family + ", for key:" + key)
}
emit(fn.TimerOutput)
}

// timersEventTimePipelineBuilderWithOutputTimestamp validates EventTime timers with explicit output timestamp.
func timersEventTimePipelineBuilderWithOutputTimestamp(makeImp func(s beam.Scope) beam.PCollection) func(s beam.Scope) {
return func(s beam.Scope) {
var inputs []kv[string, int]

offset := 5000
timerOutput := 4093
outputTimestamp := int64(1234567890000)

inputs = append(inputs, kvfn("key", 0))
imp := makeImp(s)

keyed := beam.ParDo(s, &inputFn[string, int]{
Inputs: inputs,
}, imp)
times := beam.ParDo(s, &eventTimeFnWithOutputTimestamp{
Offset: offset,
TimerOutput: timerOutput,
OutputTimestamp: outputTimestamp,
Callback: timers.InEventTime("Callback"),
}, keyed)

// Check that the output element has the expected timestamp.
validatedTimestamps := beam.ParDo(s, &checkTimestampFn{Timestamp: outputTimestamp}, times)
wantOutputs := []int{timerOutput}
passert.EqualsList(s, validatedTimestamps, wantOutputs)
}
}

// timersEventTimePipelineBuilderWithNoOutputTimestamp validates EventTime timers with no output timestamp.
func timersEventTimePipelineBuilderWithNoOutputTimestamp(makeImp func(s beam.Scope) beam.PCollection) func(s beam.Scope) {
return func(s beam.Scope) {
var inputs []kv[string, int]

offset := 5000
timerOutput := 4093
inputs = append(inputs, kvfn("key", 0))

imp := makeImp(s)

keyed := beam.ParDo(s, &inputFn[string, int]{
Inputs: inputs,
}, imp)
times := beam.ParDo(s, &eventTimeFnWithOutputTimestamp{
Offset: offset,
TimerOutput: timerOutput,
NoOutputTimestamp: true,
Callback: timers.InEventTime("Callback"),
}, keyed)

// Check that the output element has MaxTimestamp.
validatedTimestamps := beam.ParDo(s, &checkTimestampFn{ExpectMaxTimestamp: true}, times)
wantOutputs := []int{timerOutput}
passert.EqualsList(s, validatedTimestamps, wantOutputs)
}
}

// TimersEventTime_WithOutputTimestamp validates event time timers with explicit output timestamp.
func TimersEventTime_WithOutputTimestamp(s beam.Scope) {
timersEventTimePipelineBuilderWithOutputTimestamp(beam.Impulse)(s)
}

// TimersEventTime_WithNoOutputTimestamp validates event time timers with no output timestamp.
func TimersEventTime_WithNoOutputTimestamp(s beam.Scope) {
timersEventTimePipelineBuilderWithNoOutputTimestamp(beam.Impulse)(s)
}

// Below here are tests for ProcessingTime timers.

func init() {
Expand Down
10 changes: 10 additions & 0 deletions sdks/go/test/integration/primitives/timers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@ func TestTimers_EventTime_Unbounded(t *testing.T) {
ptest.BuildAndRun(t, TimersEventTimeUnbounded)
}

func TestTimers_EventTime_WithOutputTimestamp(t *testing.T) {
integration.CheckFilters(t)
ptest.BuildAndRun(t, TimersEventTime_WithOutputTimestamp)
}

func TestTimers_EventTime_WithNoOutputTimestamp(t *testing.T) {
integration.CheckFilters(t)
ptest.BuildAndRun(t, TimersEventTime_WithNoOutputTimestamp)
}

func TestTimers_ProcessingTime_Infinity(t *testing.T) {
integration.CheckFilters(t)
ptest.BuildAndRun(t, TimersProcessingTimeTestStream_Infinity)
Expand Down
Loading