diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index 12b0cada7506..ccc4cfcc69d2 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -1097,6 +1097,7 @@ func (em *ElementManager) markChangedAndClearBundle(stageID, bundID string, ptRe em.changedStages.insert(stageID) for t := range ptRefreshes { em.processTimeEvents.Schedule(t, stageID) + em.wakeUpAt(t) } em.refreshCond.Broadcast() } @@ -2464,8 +2465,8 @@ func rebaseProcessingTime(localNow, scheduled mtime.Time) mtime.Time { // This is used for processing time timers to ensure the loop re-evaluates // stages when a processing time timer is expected to fire. func (em *ElementManager) wakeUpAt(t mtime.Time) { - if em.testStreamHandler == nil && em.config.EnableRTC { - // only create this goroutine if we have real-time clock enabled and the pipeline does not have TestStream. + if em.config.EnableRTC { + // only create this goroutine if we have real-time clock enabled (also implying the pipeline does not have TestStream). go func(fireAt time.Time) { time.AfterFunc(time.Until(fireAt), func() { em.refreshCond.Broadcast() diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go b/sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go index bab9ff048889..593a708a6347 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go @@ -16,6 +16,7 @@ package engine import ( + "log/slog" "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" @@ -310,4 +311,10 @@ func (tsi *testStreamImpl) AddWatermarkEvent(tag string, newWatermark mtime.Time func (tsi *testStreamImpl) AddProcessingTimeEvent(d time.Duration) { tsi.em.testStreamHandler.AddProcessingTimeEvent(d) tsi.em.addPending(1) + + // Disable real-time clock for this em if TestStream has processing time events. + if tsi.em.config.EnableRTC { + slog.Debug("Processing time event found in TestStream: real-time clock will be disabled for this job") + tsi.em.config.EnableRTC = false + } } diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go index 7c7526b3d4db..85e8ae4350a2 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/execute.go +++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go @@ -154,13 +154,13 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic ts := comps.GetTransforms() pcols := comps.GetPcollections() - config := engine.Config{} + config := engine.Config{EnableRTC: true} m := j.PipelineOptions().AsMap() if experimentsSlice, ok := m["beam:option:experiments:v1"].([]interface{}); ok { for _, exp := range experimentsSlice { if expStr, ok := exp.(string); ok { - if expStr == "prism_enable_rtc" { - config.EnableRTC = true + if expStr == "prism_disable_rtc" { + config.EnableRTC = false break // Found it, no need to check the rest of the slice } } @@ -342,6 +342,7 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic } else { tsb.AddProcessingTimeEvent(time.Duration(ev.ProcessingTimeEvent.GetAdvanceDuration()) * time.Millisecond) } + default: return fmt.Errorf("prism error building stage %v - unknown TestStream event type: %T", stage.ID, ev) } diff --git a/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go b/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go index 89cbd2b17f6c..b03d96b04bc1 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go @@ -143,6 +143,9 @@ func TestTimers(t *testing.T) { }{ {pipeline: primitives.TimersEventTimeBounded}, {pipeline: primitives.TimersEventTimeUnbounded}, + {pipeline: primitives.TimersProcessingTime_Bounded}, + {pipeline: primitives.TimersProcessingTime_Unbounded}, + {pipeline: primitives.TimersProcessingTimeTestStream_Infinity}, } for _, test := range tests { diff --git a/sdks/go/test/integration/primitives/timers.go b/sdks/go/test/integration/primitives/timers.go index 40afe98234a7..63e62ef0e865 100644 --- a/sdks/go/test/integration/primitives/timers.go +++ b/sdks/go/test/integration/primitives/timers.go @@ -169,11 +169,14 @@ type processingTimeFn struct { Offset int TimerOutput int Cap int + + InitialDelaySec int + RecurringDelaySec int } func (fn *processingTimeFn) ProcessElement(sp state.Provider, tp timers.Provider, key string, value int, emit func(string, int)) { // Sets a processing time callback to occur. - fn.Callback.Set(tp, time.Now().Add(9*time.Second)) + fn.Callback.Set(tp, time.Now().Add(time.Duration(fn.InitialDelaySec)*time.Second)) // Only write to the state if we haven't done so already. // Writing blind would reset the state, and cause duplicated outputs. @@ -205,7 +208,7 @@ func (fn *processingTimeFn) OnTimer(ctx context.Context, ts beam.EventTime, sp s if err := fn.MyValue.Write(sp, read+1); err != nil { panic(err) } - fn.Callback.Set(tp, time.Now().Add(9*time.Second)) + fn.Callback.Set(tp, time.Now().Add(time.Duration(fn.RecurringDelaySec)*time.Second)) } if num, _, err := fn.Emissions.Read(sp); err != nil { panic(err) @@ -237,6 +240,15 @@ func init() { register.Function3x0(regroup) } +// timersProcessingTimePipelineBuilder constructs a pipeline to validate the behavior of processing time timers. +// It generates a set of keyed elements and uses a DoFn (`processingTimeFn`) to set an initial processing time +// timer for each key. When a timer fires, the DoFn emits an element, increments a counter in state, and +// sets a new timer to fire after a recurring delay, continuing until a specified number of emissions for that +// key is reached. +// +// The total approximate runtime of the timer-based logic for each key is calculated as: +// InitialDelay + (numDuplicateTimers - 1) * RecurringDelay. +// Note that the number of keys is irrelevant to the runtime, because keys are processed in parallel. func timersProcessingTimePipelineBuilder(makeImp func(s beam.Scope) beam.PCollection) func(s beam.Scope) { return func(s beam.Scope) { var inputs, wantOutputs []kv[string, int] @@ -244,8 +256,12 @@ func timersProcessingTimePipelineBuilder(makeImp func(s beam.Scope) beam.PCollec offset := 5000 timerOutput := 4093 + // Control the total runtime of the test to under 30 secs. + // The runtime for the current setting is 3 + (5 - 1) * 1 = 7 secs numKeys := 40 - numDuplicateTimers := 15 + numDuplicateTimers := 5 + initialDelaySec := 3 + recurringDelaySec := 1 for key := 0; key < numKeys; key++ { k := strconv.Itoa(key) @@ -261,11 +277,13 @@ func timersProcessingTimePipelineBuilder(makeImp func(s beam.Scope) beam.PCollec Inputs: inputs, }, imp) times := beam.ParDo(s, &processingTimeFn{ - Offset: offset, - TimerOutput: timerOutput, - Callback: timers.InProcessingTime("Callback"), - MyValue: state.MakeValueState[int]("MyValue"), - Cap: numDuplicateTimers, // Syncs the cycles to the number of duplicate keyed inputs. + Offset: offset, + TimerOutput: timerOutput, + Callback: timers.InProcessingTime("Callback"), + MyValue: state.MakeValueState[int]("MyValue"), + Cap: numDuplicateTimers, // Syncs the cycles to the number of duplicate keyed inputs. + InitialDelaySec: initialDelaySec, + RecurringDelaySec: recurringDelaySec, }, keyed) // We GroupByKey here so input to passert is blocked until teststream advances time to Infinity. gbk := beam.GroupByKey(s, times) @@ -298,6 +316,6 @@ func TimersProcessingTime_Bounded(s beam.Scope) { func TimersProcessingTime_Unbounded(s beam.Scope) { timersProcessingTimePipelineBuilder(func(s beam.Scope) beam.PCollection { now := time.Now() - return periodic.Impulse(s, now, now.Add(10*time.Second), 0, false) + return periodic.Impulse(s, now, now.Add(10*time.Second), 5*time.Second, false) })(s) } diff --git a/sdks/go/test/integration/primitives/timers_test.go b/sdks/go/test/integration/primitives/timers_test.go index 7e62e9da6920..efa84a49fc93 100644 --- a/sdks/go/test/integration/primitives/timers_test.go +++ b/sdks/go/test/integration/primitives/timers_test.go @@ -41,3 +41,8 @@ func TestTimers_ProcessingTime_Bounded(t *testing.T) { integration.CheckFilters(t) ptest.BuildAndRun(t, TimersProcessingTime_Bounded) } + +func TestTimers_ProcessingTime_Unbounded(t *testing.T) { + integration.CheckFilters(t) + ptest.BuildAndRun(t, TimersProcessingTime_Unbounded) +}