Skip to content

Commit 354a387

Browse files
authored
Fix panic in teststream. (#36227)
1 parent 3659832 commit 354a387

File tree

2 files changed

+4
-1
lines changed

2 files changed

+4
-1
lines changed

sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2056,7 +2056,7 @@ func (ss *stageState) String() string {
20562056
return fmt.Sprintf("[%v] IN: %v OUT: %v UP: %q %v, kind: %v", ss.ID, ss.input, ss.output, pcol, up, ss.kind)
20572057
}
20582058

2059-
// updateWatermarks performs the following operations:
2059+
// updateWatermarks performs the following operations and returns a possible set of stages to refresh next or nil.
20602060
//
20612061
// Watermark_In' = MAX(Watermark_In, MIN(U(TS_Pending), U(Watermark_InputPCollection)))
20622062
// Watermark_Out' = MAX(Watermark_Out, MIN(Watermark_In', U(minWatermarkHold)))

sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,9 @@ func (ev tsFinalEvent) Execute(em *ElementManager) {
253253
em.testStreamHandler.UpdateHold(em, mtime.MaxTimestamp)
254254
ss := em.stages[ev.stageID]
255255
kickSet := ss.updateWatermarks(em)
256+
if kickSet == nil {
257+
kickSet = make(set[string])
258+
}
256259
kickSet.insert(ev.stageID)
257260
em.changedStages.merge(kickSet)
258261
}

0 commit comments

Comments
 (0)