Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1097,6 +1097,7 @@ func (em *ElementManager) markChangedAndClearBundle(stageID, bundID string, ptRe
em.changedStages.insert(stageID)
for t := range ptRefreshes {
em.processTimeEvents.Schedule(t, stageID)
em.wakeUpAt(t)
}
em.refreshCond.Broadcast()
}
Expand Down Expand Up @@ -2464,8 +2465,8 @@ func rebaseProcessingTime(localNow, scheduled mtime.Time) mtime.Time {
// This is used for processing time timers to ensure the loop re-evaluates
// stages when a processing time timer is expected to fire.
func (em *ElementManager) wakeUpAt(t mtime.Time) {
if em.testStreamHandler == nil && em.config.EnableRTC {
// only create this goroutine if we have real-time clock enabled and the pipeline does not have TestStream.
if em.config.EnableRTC {
// only create this goroutine if we have real-time clock enabled (also implying the pipeline does not have TestStream).
go func(fireAt time.Time) {
time.AfterFunc(time.Until(fireAt), func() {
em.refreshCond.Broadcast()
Expand Down
7 changes: 7 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package engine

import (
"log/slog"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
Expand Down Expand Up @@ -310,4 +311,10 @@ func (tsi *testStreamImpl) AddWatermarkEvent(tag string, newWatermark mtime.Time
func (tsi *testStreamImpl) AddProcessingTimeEvent(d time.Duration) {
tsi.em.testStreamHandler.AddProcessingTimeEvent(d)
tsi.em.addPending(1)

// Disable real-time clock for this em if TestStream has processing time events.
if tsi.em.config.EnableRTC {
slog.Debug("Processing time event found in TestStream: real-time clock will be disabled for this job")
tsi.em.config.EnableRTC = false
}
}
7 changes: 4 additions & 3 deletions sdks/go/pkg/beam/runners/prism/internal/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,13 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic
ts := comps.GetTransforms()
pcols := comps.GetPcollections()

config := engine.Config{}
config := engine.Config{EnableRTC: true}
m := j.PipelineOptions().AsMap()
if experimentsSlice, ok := m["beam:option:experiments:v1"].([]interface{}); ok {
for _, exp := range experimentsSlice {
if expStr, ok := exp.(string); ok {
if expStr == "prism_enable_rtc" {
config.EnableRTC = true
if expStr == "prism_disable_rtc" {
config.EnableRTC = false
break // Found it, no need to check the rest of the slice
}
}
Expand Down Expand Up @@ -342,6 +342,7 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic
} else {
tsb.AddProcessingTimeEvent(time.Duration(ev.ProcessingTimeEvent.GetAdvanceDuration()) * time.Millisecond)
}

default:
return fmt.Errorf("prism error building stage %v - unknown TestStream event type: %T", stage.ID, ev)
}
Expand Down
3 changes: 3 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ func TestTimers(t *testing.T) {
}{
{pipeline: primitives.TimersEventTimeBounded},
{pipeline: primitives.TimersEventTimeUnbounded},
{pipeline: primitives.TimersProcessingTime_Bounded},
{pipeline: primitives.TimersProcessingTime_Unbounded},
{pipeline: primitives.TimersProcessingTimeTestStream_Infinity},
}

for _, test := range tests {
Expand Down
36 changes: 27 additions & 9 deletions sdks/go/test/integration/primitives/timers.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,14 @@ type processingTimeFn struct {
Offset int
TimerOutput int
Cap int

InitialDelaySec int
RecurringDelaySec int
}

func (fn *processingTimeFn) ProcessElement(sp state.Provider, tp timers.Provider, key string, value int, emit func(string, int)) {
// Sets a processing time callback to occur.
fn.Callback.Set(tp, time.Now().Add(9*time.Second))
fn.Callback.Set(tp, time.Now().Add(time.Duration(fn.InitialDelaySec)*time.Second))

// Only write to the state if we haven't done so already.
// Writing blind would reset the state, and cause duplicated outputs.
Expand Down Expand Up @@ -205,7 +208,7 @@ func (fn *processingTimeFn) OnTimer(ctx context.Context, ts beam.EventTime, sp s
if err := fn.MyValue.Write(sp, read+1); err != nil {
panic(err)
}
fn.Callback.Set(tp, time.Now().Add(9*time.Second))
fn.Callback.Set(tp, time.Now().Add(time.Duration(fn.RecurringDelaySec)*time.Second))
}
if num, _, err := fn.Emissions.Read(sp); err != nil {
panic(err)
Expand Down Expand Up @@ -237,15 +240,28 @@ func init() {
register.Function3x0(regroup)
}

// timersProcessingTimePipelineBuilder constructs a pipeline to validate the behavior of processing time timers.
// It generates a set of keyed elements and uses a DoFn (`processingTimeFn`) to set an initial processing time
// timer for each key. When a timer fires, the DoFn emits an element, increments a counter in state, and
// sets a new timer to fire after a recurring delay, continuing until a specified number of emissions for that
// key is reached.
//
// The total approximate runtime of the timer-based logic for each key is calculated as:
// InitialDelay + (numDuplicateTimers - 1) * RecurringDelay.
// Note that the number of keys is irrelevant to the runtime, because keys are processed in parallel.
func timersProcessingTimePipelineBuilder(makeImp func(s beam.Scope) beam.PCollection) func(s beam.Scope) {
return func(s beam.Scope) {
var inputs, wantOutputs []kv[string, int]

offset := 5000
timerOutput := 4093

// Control the total runtime of the test to under 30 secs.
// The runtime for the current setting is 3 + (5 - 1) * 1 = 7 secs
numKeys := 40
numDuplicateTimers := 15
numDuplicateTimers := 5
initialDelaySec := 3
recurringDelaySec := 1

for key := 0; key < numKeys; key++ {
k := strconv.Itoa(key)
Expand All @@ -261,11 +277,13 @@ func timersProcessingTimePipelineBuilder(makeImp func(s beam.Scope) beam.PCollec
Inputs: inputs,
}, imp)
times := beam.ParDo(s, &processingTimeFn{
Offset: offset,
TimerOutput: timerOutput,
Callback: timers.InProcessingTime("Callback"),
MyValue: state.MakeValueState[int]("MyValue"),
Cap: numDuplicateTimers, // Syncs the cycles to the number of duplicate keyed inputs.
Offset: offset,
TimerOutput: timerOutput,
Callback: timers.InProcessingTime("Callback"),
MyValue: state.MakeValueState[int]("MyValue"),
Cap: numDuplicateTimers, // Syncs the cycles to the number of duplicate keyed inputs.
InitialDelaySec: initialDelaySec,
RecurringDelaySec: recurringDelaySec,
}, keyed)
// We GroupByKey here so input to passert is blocked until teststream advances time to Infinity.
gbk := beam.GroupByKey(s, times)
Expand Down Expand Up @@ -298,6 +316,6 @@ func TimersProcessingTime_Bounded(s beam.Scope) {
func TimersProcessingTime_Unbounded(s beam.Scope) {
timersProcessingTimePipelineBuilder(func(s beam.Scope) beam.PCollection {
now := time.Now()
return periodic.Impulse(s, now, now.Add(10*time.Second), 0, false)
return periodic.Impulse(s, now, now.Add(10*time.Second), 5*time.Second, false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the goal here is to have the transform fire twice, not once?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly!

})(s)
}
5 changes: 5 additions & 0 deletions sdks/go/test/integration/primitives/timers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,8 @@ func TestTimers_ProcessingTime_Bounded(t *testing.T) {
integration.CheckFilters(t)
ptest.BuildAndRun(t, TimersProcessingTime_Bounded)
}

func TestTimers_ProcessingTime_Unbounded(t *testing.T) {
integration.CheckFilters(t)
ptest.BuildAndRun(t, TimersProcessingTime_Unbounded)
}
Loading