diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index 5136cd85e3ed..c14ddd87d6ae 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -2448,7 +2448,10 @@ func (em *ElementManager) ProcessingTimeNow() (ret mtime.Time) { // "Test" mode -> advance to next processing time event if any, to allow execution. if !em.config.EnableRTC { - if t, ok := em.processTimeEvents.Peek(); ok { + em.refreshCond.L.Lock() + t, ok := em.processTimeEvents.Peek() + em.refreshCond.L.Unlock() + if ok { return t } }