Skip to content

Commit 7bfe0e1

Browse files
authored
Do not panic if an element has no windows during PersistBundle. (#36610)
* Do not panic if an element has no windows during PersistBundle. * Add a test for this case.
1 parent 518b118 commit 7bfe0e1

File tree

2 files changed

+15
-4
lines changed

2 files changed

+15
-4
lines changed

sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -791,8 +791,7 @@ func reElementResiduals(residuals []Residual, inputInfo PColInfo, rb RunBundle)
791791
panic("error decoding residual header:" + err.Error())
792792
}
793793
if len(ws) == 0 {
794-
slog.Error("reElementResiduals: sdk provided a windowed value header 0 windows", "bundle", rb)
795-
panic("error decoding residual header: sdk provided a windowed value header 0 windows")
794+
slog.Warn("reElementResiduals: sdk provided a windowed value header 0 windows", "bundle", rb)
796795
}
797796
// POSSIBLY BAD PATTERN: The buffer is invalidated on the next call, which doesn't always happen.
798797
// But the decoder won't be mutating the buffer bytes, just reading the data. So the elmBytes
@@ -852,8 +851,7 @@ func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PCol
852851
panic("error decoding watermarks")
853852
}
854853
if len(ws) == 0 {
855-
slog.Error("PersistBundle: sdk provided a windowed value header 0 windows", "bundle", rb)
856-
panic("error decoding residual header: sdk provided a windowed value header 0 windows")
854+
slog.Warn("PersistBundle: sdk provided a windowed value header 0 windows", "bundle", rb)
857855
}
858856
// TODO: Optimize unnecessary copies. This is doubleteeing.
859857
elmBytes := info.EDec(tee)

sdks/python/apache_beam/transforms/window_test.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,19 @@ def test_sliding_windows(self):
192192
('key @ [2.0, 6.0)', [2, 3])]
193193
assert_that(result, equal_to(expected))
194194

195+
def test_sliding_windows_period_longer_than_size(self):
196+
with TestPipeline() as p:
197+
pcoll = self.timestamped_key_values(p, 'key', 1, 2, 3, 4, 5, 6, 7, 8)
198+
result = (
199+
pcoll
200+
| 'w' >> WindowInto(SlidingWindows(period=4, size=2))
201+
| GroupByKey()
202+
| beam.MapTuple(lambda k, vs: (k, sorted(vs)))
203+
| beam.ParDo(ReifyWindowsFn()))
204+
expected = [('key @ [0.0, 2.0)', [1]), ('key @ [4.0, 6.0)', [4, 5]),
205+
('key @ [8.0, 10.0)', [8])]
206+
assert_that(result, equal_to(expected))
207+
195208
def test_sessions(self):
196209
with TestPipeline() as p:
197210
pcoll = self.timestamped_key_values(p, 'key', 1, 2, 3, 20, 35, 27)

0 commit comments

Comments
 (0)