Skip to content

Commit 554a73b

Browse files
authored
Enable real-time clock in prism by default. (#36473)
1 parent b5b9181 commit 554a73b

File tree

6 files changed

+49
-14
lines changed

6 files changed

+49
-14
lines changed

sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1097,6 +1097,7 @@ func (em *ElementManager) markChangedAndClearBundle(stageID, bundID string, ptRe
10971097
em.changedStages.insert(stageID)
10981098
for t := range ptRefreshes {
10991099
em.processTimeEvents.Schedule(t, stageID)
1100+
em.wakeUpAt(t)
11001101
}
11011102
em.refreshCond.Broadcast()
11021103
}
@@ -2464,8 +2465,8 @@ func rebaseProcessingTime(localNow, scheduled mtime.Time) mtime.Time {
24642465
// This is used for processing time timers to ensure the loop re-evaluates
24652466
// stages when a processing time timer is expected to fire.
24662467
func (em *ElementManager) wakeUpAt(t mtime.Time) {
2467-
if em.testStreamHandler == nil && em.config.EnableRTC {
2468-
// only create this goroutine if we have real-time clock enabled and the pipeline does not have TestStream.
2468+
if em.config.EnableRTC {
2469+
// only create this goroutine if we have real-time clock enabled (also implying the pipeline does not have TestStream).
24692470
go func(fireAt time.Time) {
24702471
time.AfterFunc(time.Until(fireAt), func() {
24712472
em.refreshCond.Broadcast()

sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package engine
1717

1818
import (
19+
"log/slog"
1920
"time"
2021

2122
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
@@ -310,4 +311,10 @@ func (tsi *testStreamImpl) AddWatermarkEvent(tag string, newWatermark mtime.Time
310311
func (tsi *testStreamImpl) AddProcessingTimeEvent(d time.Duration) {
311312
tsi.em.testStreamHandler.AddProcessingTimeEvent(d)
312313
tsi.em.addPending(1)
314+
315+
// Disable real-time clock for this em if TestStream has processing time events.
316+
if tsi.em.config.EnableRTC {
317+
slog.Debug("Processing time event found in TestStream: real-time clock will be disabled for this job")
318+
tsi.em.config.EnableRTC = false
319+
}
313320
}

sdks/go/pkg/beam/runners/prism/internal/execute.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,13 +152,13 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic
152152
ts := comps.GetTransforms()
153153
pcols := comps.GetPcollections()
154154

155-
config := engine.Config{}
155+
config := engine.Config{EnableRTC: true}
156156
m := j.PipelineOptions().AsMap()
157157
if experimentsSlice, ok := m["beam:option:experiments:v1"].([]interface{}); ok {
158158
for _, exp := range experimentsSlice {
159159
if expStr, ok := exp.(string); ok {
160-
if expStr == "prism_enable_rtc" {
161-
config.EnableRTC = true
160+
if expStr == "prism_disable_rtc" {
161+
config.EnableRTC = false
162162
break // Found it, no need to check the rest of the slice
163163
}
164164
}
@@ -294,6 +294,7 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic
294294
} else {
295295
tsb.AddProcessingTimeEvent(time.Duration(ev.ProcessingTimeEvent.GetAdvanceDuration()) * time.Millisecond)
296296
}
297+
297298
default:
298299
return fmt.Errorf("prism error building stage %v - unknown TestStream event type: %T", stage.ID, ev)
299300
}

sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,9 @@ func TestTimers(t *testing.T) {
143143
}{
144144
{pipeline: primitives.TimersEventTimeBounded},
145145
{pipeline: primitives.TimersEventTimeUnbounded},
146+
{pipeline: primitives.TimersProcessingTime_Bounded},
147+
{pipeline: primitives.TimersProcessingTime_Unbounded},
148+
{pipeline: primitives.TimersProcessingTimeTestStream_Infinity},
146149
}
147150

148151
for _, test := range tests {

sdks/go/test/integration/primitives/timers.go

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -169,11 +169,14 @@ type processingTimeFn struct {
169169
Offset int
170170
TimerOutput int
171171
Cap int
172+
173+
InitialDelaySec int
174+
RecurringDelaySec int
172175
}
173176

174177
func (fn *processingTimeFn) ProcessElement(sp state.Provider, tp timers.Provider, key string, value int, emit func(string, int)) {
175178
// Sets a processing time callback to occur.
176-
fn.Callback.Set(tp, time.Now().Add(9*time.Second))
179+
fn.Callback.Set(tp, time.Now().Add(time.Duration(fn.InitialDelaySec)*time.Second))
177180

178181
// Only write to the state if we haven't done so already.
179182
// Writing blind would reset the state, and cause duplicated outputs.
@@ -205,7 +208,7 @@ func (fn *processingTimeFn) OnTimer(ctx context.Context, ts beam.EventTime, sp s
205208
if err := fn.MyValue.Write(sp, read+1); err != nil {
206209
panic(err)
207210
}
208-
fn.Callback.Set(tp, time.Now().Add(9*time.Second))
211+
fn.Callback.Set(tp, time.Now().Add(time.Duration(fn.RecurringDelaySec)*time.Second))
209212
}
210213
if num, _, err := fn.Emissions.Read(sp); err != nil {
211214
panic(err)
@@ -237,15 +240,28 @@ func init() {
237240
register.Function3x0(regroup)
238241
}
239242

243+
// timersProcessingTimePipelineBuilder constructs a pipeline to validate the behavior of processing time timers.
244+
// It generates a set of keyed elements and uses a DoFn (`processingTimeFn`) to set an initial processing time
245+
// timer for each key. When a timer fires, the DoFn emits an element, increments a counter in state, and
246+
// sets a new timer to fire after a recurring delay, continuing until a specified number of emissions for that
247+
// key is reached.
248+
//
249+
// The total approximate runtime of the timer-based logic for each key is calculated as:
250+
// InitialDelay + (numDuplicateTimers - 1) * RecurringDelay.
251+
// Note that the number of keys is irrelevant to the runtime, because keys are processed in parallel.
240252
func timersProcessingTimePipelineBuilder(makeImp func(s beam.Scope) beam.PCollection) func(s beam.Scope) {
241253
return func(s beam.Scope) {
242254
var inputs, wantOutputs []kv[string, int]
243255

244256
offset := 5000
245257
timerOutput := 4093
246258

259+
// Control the total runtime of the test to under 30 secs.
260+
// The runtime for the current setting is 3 + (5 - 1) * 1 = 7 secs
247261
numKeys := 40
248-
numDuplicateTimers := 15
262+
numDuplicateTimers := 5
263+
initialDelaySec := 3
264+
recurringDelaySec := 1
249265

250266
for key := 0; key < numKeys; key++ {
251267
k := strconv.Itoa(key)
@@ -261,11 +277,13 @@ func timersProcessingTimePipelineBuilder(makeImp func(s beam.Scope) beam.PCollec
261277
Inputs: inputs,
262278
}, imp)
263279
times := beam.ParDo(s, &processingTimeFn{
264-
Offset: offset,
265-
TimerOutput: timerOutput,
266-
Callback: timers.InProcessingTime("Callback"),
267-
MyValue: state.MakeValueState[int]("MyValue"),
268-
Cap: numDuplicateTimers, // Syncs the cycles to the number of duplicate keyed inputs.
280+
Offset: offset,
281+
TimerOutput: timerOutput,
282+
Callback: timers.InProcessingTime("Callback"),
283+
MyValue: state.MakeValueState[int]("MyValue"),
284+
Cap: numDuplicateTimers, // Syncs the cycles to the number of duplicate keyed inputs.
285+
InitialDelaySec: initialDelaySec,
286+
RecurringDelaySec: recurringDelaySec,
269287
}, keyed)
270288
// We GroupByKey here so input to passert is blocked until teststream advances time to Infinity.
271289
gbk := beam.GroupByKey(s, times)
@@ -298,6 +316,6 @@ func TimersProcessingTime_Bounded(s beam.Scope) {
298316
func TimersProcessingTime_Unbounded(s beam.Scope) {
299317
timersProcessingTimePipelineBuilder(func(s beam.Scope) beam.PCollection {
300318
now := time.Now()
301-
return periodic.Impulse(s, now, now.Add(10*time.Second), 0, false)
319+
return periodic.Impulse(s, now, now.Add(10*time.Second), 5*time.Second, false)
302320
})(s)
303321
}

sdks/go/test/integration/primitives/timers_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,8 @@ func TestTimers_ProcessingTime_Bounded(t *testing.T) {
4141
integration.CheckFilters(t)
4242
ptest.BuildAndRun(t, TimersProcessingTime_Bounded)
4343
}
44+
45+
func TestTimers_ProcessingTime_Unbounded(t *testing.T) {
46+
integration.CheckFilters(t)
47+
ptest.BuildAndRun(t, TimersProcessingTime_Unbounded)
48+
}

0 commit comments

Comments
 (0)