From 0b78cf72bd93739c39d94518284d2a942f013082 Mon Sep 17 00:00:00 2001 From: Sam Whittle Date: Tue, 9 Dec 2025 13:22:38 +0100 Subject: [PATCH 1/2] Add WithNoOutputTimestamp to go sdk matching Java SDK. --- sdks/go/pkg/beam/core/timers/timers.go | 10 ++ sdks/go/test/integration/primitives/timers.go | 114 +++++++++++++++++- .../integration/primitives/timers_test.go | 10 ++ 3 files changed, 133 insertions(+), 1 deletion(-) diff --git a/sdks/go/pkg/beam/core/timers/timers.go b/sdks/go/pkg/beam/core/timers/timers.go index f55b03c80278..ae267dab0bff 100644 --- a/sdks/go/pkg/beam/core/timers/timers.go +++ b/sdks/go/pkg/beam/core/timers/timers.go @@ -75,6 +75,16 @@ func WithOutputTimestamp(outputTimestamp time.Time) timerOptions { } } +// WithNoOutputTimestamp sets the timer without an output timestamp. +// The output watermark will not be held up, and it is illegal to output +// messages from this timer triggering using the default output timestamp. +func WithNoOutputTimestamp() timerOptions { + return func(tm *timerConfig) { + tm.HoldSet = true + tm.HoldTimestamp = mtime.MaxTimestamp + } +} + // Context is a parameter for OnTimer methods to receive the fired Timer. type Context struct { Family string diff --git a/sdks/go/test/integration/primitives/timers.go b/sdks/go/test/integration/primitives/timers.go index 63e62ef0e865..31c0586b2edb 100644 --- a/sdks/go/test/integration/primitives/timers.go +++ b/sdks/go/test/integration/primitives/timers.go @@ -36,8 +36,29 @@ import ( func init() { register.DoFn2x0[[]byte, func(string, int)](&inputFn[string, int]{}) register.DoFn6x0[beam.Window, state.Provider, timers.Provider, string, int, func(kv[string, int])](&eventTimeFn{}) + register.DoFn5x0[beam.Window, timers.Provider, string, int, func(int)](&eventTimeFnWithOutputTimestamp{}) + register.DoFn3x0[beam.EventTime, int, func(int)](&checkTimestampFn{}) register.Emitter2[string, int]() - register.Emitter1[kv[string, int]]() + register.Emitter1[int]() +} + +// checkTimestampFn validates that elements arrived at the expected timestamp. +type checkTimestampFn struct { + Timestamp int64 // millisecond epoch + ExpectMaxTimestamp bool +} + +func (fn *checkTimestampFn) ProcessElement(ts beam.EventTime, val int, emit func(int)) { + if fn.ExpectMaxTimestamp { + if mtime.Time(ts) != mtime.MaxTimestamp { + panic(fmt.Errorf("timestamp mismatch: got %v, want %v (MaxTimestamp)", ts, mtime.MaxTimestamp)) + } + } else { + if got := int64(ts); got != int64(mtime.FromMilliseconds(fn.Timestamp)) { + panic(fmt.Errorf("timestamp mismatch: got %v, want %v (as mtime)", got, fn.Timestamp)) + } + } + emit(val) } type kv[K, V any] struct { @@ -154,6 +175,97 @@ func TimersEventTimeUnbounded(s beam.Scope) { })(s) } +type eventTimeFnWithOutputTimestamp struct { + Callback timers.EventTime + + Offset int + TimerOutput int + OutputTimestamp int64 // millisecond epoch + NoOutputTimestamp bool +} + +func (fn *eventTimeFnWithOutputTimestamp) ProcessElement(w beam.Window, tp timers.Provider, key string, value int, emit func(int)) { + if fn.NoOutputTimestamp { + fn.Callback.Set(tp, w.MaxTimestamp().ToTime(), timers.WithNoOutputTimestamp()) + } else { + fn.Callback.Set(tp, w.MaxTimestamp().ToTime(), timers.WithOutputTimestamp(time.UnixMilli(fn.OutputTimestamp))) + } +} + +func (fn *eventTimeFnWithOutputTimestamp) OnTimer(ctx context.Context, ts beam.EventTime, tp timers.Provider, key string, timer timers.Context, emit func(int)) { + if fn.Callback.Family != timer.Family || timer.Tag != "" { + panic("unexpected timer, family: " + timer.Family + " tag:" + timer.Tag + " want: " + fn.Callback.Family + ", for key:" + key) + } + emit(fn.TimerOutput) +} + +// timersEventTimePipelineBuilderWithOutputTimestamp validates EventTime timers with explicit output timestamp. +func timersEventTimePipelineBuilderWithOutputTimestamp(makeImp func(s beam.Scope) beam.PCollection) func(s beam.Scope) { + return func(s beam.Scope) { + var inputs []kv[string, int] + + offset := 5000 + timerOutput := 4093 + outputTimestamp := int64(1234567890000) + + inputs = append(inputs, kvfn("key", 0)) + imp := makeImp(s) + + keyed := beam.ParDo(s, &inputFn[string, int]{ + Inputs: inputs, + }, imp) + times := beam.ParDo(s, &eventTimeFnWithOutputTimestamp{ + Offset: offset, + TimerOutput: timerOutput, + OutputTimestamp: outputTimestamp, + Callback: timers.InEventTime("Callback"), + }, keyed) + + // Check that the output element has the expected timestamp. + validatedTimestamps := beam.ParDo(s, &checkTimestampFn{Timestamp: outputTimestamp}, times) + wantOutputs := []int{timerOutput} + passert.EqualsList(s, validatedTimestamps, wantOutputs) + } +} + +// timersEventTimePipelineBuilderWithNoOutputTimestamp validates EventTime timers with no output timestamp. +func timersEventTimePipelineBuilderWithNoOutputTimestamp(makeImp func(s beam.Scope) beam.PCollection) func(s beam.Scope) { + return func(s beam.Scope) { + var inputs []kv[string, int] + + offset := 5000 + timerOutput := 4093 + inputs = append(inputs, kvfn("key", 0)) + + imp := makeImp(s) + + keyed := beam.ParDo(s, &inputFn[string, int]{ + Inputs: inputs, + }, imp) + times := beam.ParDo(s, &eventTimeFnWithOutputTimestamp{ + Offset: offset, + TimerOutput: timerOutput, + NoOutputTimestamp: true, + Callback: timers.InEventTime("Callback"), + }, keyed) + + // Check that the output element has MaxTimestamp. + validatedTimestamps := beam.ParDo(s, &checkTimestampFn{ExpectMaxTimestamp: true}, times) + wantOutputs := []int{timerOutput} + passert.EqualsList(s, validatedTimestamps, wantOutputs) + } +} + +// TimersEventTime_WithOutputTimestamp validates event time timers with explicit output timestamp. +func TimersEventTime_WithOutputTimestamp(s beam.Scope) { + timersEventTimePipelineBuilderWithOutputTimestamp(beam.Impulse)(s) +} + +// TimersEventTime_WithNoOutputTimestamp validates event time timers with no output timestamp. +func TimersEventTime_WithNoOutputTimestamp(s beam.Scope) { + timersEventTimePipelineBuilderWithNoOutputTimestamp(beam.Impulse)(s) +} + // Below here are tests for ProcessingTime timers. func init() { diff --git a/sdks/go/test/integration/primitives/timers_test.go b/sdks/go/test/integration/primitives/timers_test.go index efa84a49fc93..69c451c88e97 100644 --- a/sdks/go/test/integration/primitives/timers_test.go +++ b/sdks/go/test/integration/primitives/timers_test.go @@ -32,6 +32,16 @@ func TestTimers_EventTime_Unbounded(t *testing.T) { ptest.BuildAndRun(t, TimersEventTimeUnbounded) } +func TestTimers_EventTime_WithOutputTimestamp(t *testing.T) { + integration.CheckFilters(t) + ptest.BuildAndRun(t, TimersEventTime_WithOutputTimestamp) +} + +func TestTimers_EventTime_WithNoOutputTimestamp(t *testing.T) { + integration.CheckFilters(t) + ptest.BuildAndRun(t, TimersEventTime_WithNoOutputTimestamp) +} + func TestTimers_ProcessingTime_Infinity(t *testing.T) { integration.CheckFilters(t) ptest.BuildAndRun(t, TimersProcessingTimeTestStream_Infinity) From aeb7e8672859606fe4afa42f3a1dadf1ce4ddafc Mon Sep 17 00:00:00 2001 From: Sam Whittle Date: Wed, 10 Dec 2025 09:43:48 +0100 Subject: [PATCH 2/2] trigger postcommit tests --- .github/trigger_files/beam_PostCommit_Go.json | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 .github/trigger_files/beam_PostCommit_Go.json diff --git a/.github/trigger_files/beam_PostCommit_Go.json b/.github/trigger_files/beam_PostCommit_Go.json new file mode 100644 index 000000000000..b73af5e61a43 --- /dev/null +++ b/.github/trigger_files/beam_PostCommit_Go.json @@ -0,0 +1,4 @@ +{ + "comment": "Modify this file in a trivial way to cause this test suite to run.", + "modification": 1 +}