Skip to content

Commit 51ae038

Browse files
committed
Make the behavior of streaming order window elements similar to the batch version.
1 parent b2960c9 commit 51ae038

File tree

2 files changed

+134
-97
lines changed

2 files changed

+134
-97
lines changed

sdks/python/apache_beam/examples/cookbook/ordered_window_elements/streaming.py

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -200,10 +200,19 @@ def process(
200200

201201
timer_started = timer_state.read()
202202
if not timer_started:
203+
timestamp_secs = timestamp.micros / 1e6
204+
205+
# Align the timestamp with the windowing scheme.
206+
aligned_timestamp = timestamp_secs - self.offset
207+
208+
# Calculate the start of the last window that could contain this timestamp.
209+
last_window_start_aligned = ((aligned_timestamp // self.slide_interval) *
210+
self.slide_interval)
211+
last_window_start = last_window_start_aligned + self.offset
212+
213+
n = (self.duration - 1) // self.slide_interval
203214
# Calculate the start of the first sliding window.
204-
first_slide_start = int(
205-
(timestamp.micros / 1e6 - self.offset) //
206-
self.slide_interval) * self.slide_interval + self.offset
215+
first_slide_start = last_window_start - n * self.slide_interval
207216
first_slide_start_ts = Timestamp.of(first_slide_start)
208217

209218
# Set the initial timer to fire at the end of the first window plus
@@ -256,14 +265,16 @@ def _get_windowed_values_from_state(
256265
if not windowed_values:
257266
# If the window is empty, use the last value.
258267
last_value = last_value_state.read()
259-
windowed_values.append(last_value)
268+
value_to_insert = (window_start_ts, last_value[1])
269+
windowed_values.append(value_to_insert)
260270
else:
261271
first_timestamp = windowed_values[0][0]
262272
last_value = last_value_state.read()
263273
if first_timestamp > window_start_ts and last_value:
264274
# Prepend the last value if there's a gap between the first element
265275
# in the window and the start of the window.
266-
windowed_values = [last_value] + windowed_values
276+
value_to_insert = (window_start_ts, last_value[1])
277+
windowed_values = [value_to_insert] + windowed_values
267278

268279
# Find the last element before the beginning of the next window to update
269280
# last_value_state.
@@ -334,8 +345,7 @@ def on_timer(
334345
windowed_values = self._get_windowed_values_from_state(
335346
buffer_state, late_start_ts, late_end_ts, last_value_state)
336347
yield TimestampedValue(
337-
((key, late_start_ts, late_end_ts), [v[1]
338-
for v in windowed_values]),
348+
(key, ((late_start_ts, late_end_ts), windowed_values)),
339349
late_end_ts - 1)
340350
late_start_ts += self.slide_interval
341351

@@ -347,8 +357,7 @@ def on_timer(
347357
windowed_values = self._get_windowed_values_from_state(
348358
buffer_state, window_start_ts, window_end_ts, last_value_state)
349359
yield TimestampedValue(
350-
((key, window_start_ts, window_end_ts), [v[1]
351-
for v in windowed_values]),
360+
(key, ((window_start_ts, window_end_ts), windowed_values)),
352361
window_end_ts - 1)
353362

354363
# Post-emit actions for the current window:
@@ -617,7 +626,7 @@ def expand(self, input):
617626
self.stop_timestamp)))
618627

619628
if isinstance(input.element_type, TupleConstraint):
620-
ret = keyed_output | beam.MapTuple(lambda x, y: (x[0], y))
629+
ret = keyed_output
621630
else:
622631
# Remove the default key if the input PCollection was originally unkeyed.
623632
ret = keyed_output | beam.Values()

sdks/python/apache_beam/examples/cookbook/ordered_window_elements/streaming_test.py

Lines changed: 115 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,20 @@ def _create_test_stream(elements: list[int]):
9393
return test_stream
9494

9595

96+
def _convert_timestamp_to_int(has_key=False):
97+
if has_key:
98+
return beam.MapTuple(
99+
lambda key, value: (
100+
key,
101+
((int(value[0][0].micros // 1e6), int(value[0][1].micros // 1e6)),
102+
[(int(t.micros // 1e6), v) for t, v in value[1]])))
103+
104+
return beam.MapTuple(
105+
lambda window, elements:
106+
((int(window[0].micros // 1e6), int(window[1].micros // 1e6)),
107+
[(int(t.micros // 1e6), v) for t, v in elements]))
108+
109+
96110
_go_installed = shutil.which('go') is not None
97111
_in_windows = sys.platform == "win32"
98112

@@ -140,14 +154,28 @@ def test_default(self):
140154
WINDOW_SIZE,
141155
stop_timestamp=13,
142156
buffer_state_type=self.buffer_state_type))
143-
result = _maybe_log_elements(result)
157+
result = _maybe_log_elements(result) | _convert_timestamp_to_int()
144158
assert_that(result, equal_to([
145-
[0, 1, 2],
146-
[3, 4, 5],
147-
[6, 7, 8],
148-
[9],
159+
((0, 3), [(0, 0), (1, 1), (2, 2)]),
160+
((3, 6), [(3, 3), (4, 4), (5, 5)]),
161+
((6, 9), [(6, 6), (7, 7), (8, 8)]),
162+
((9, 12), [(9, 9)]),
149163
]))
150164

165+
def test_offset(self):
166+
with TestPipeline(options=self.options) as p:
167+
result = (
168+
p | _create_test_stream([2, 3, 4, 5, 6, 7, 8, 9])
169+
| OrderedWindowElements(WINDOW_SIZE, stop_timestamp=13, offset=2))
170+
result = _maybe_log_elements(result) | _convert_timestamp_to_int()
171+
assert_that(
172+
result,
173+
equal_to([
174+
((2, 5), [(2, 2), (3, 3), (4, 4)]), # window start at 2
175+
((5, 8), [(5, 5), (6, 6), (7, 7)]),
176+
((8, 11), [(8, 8), (9, 9)])
177+
]))
178+
151179
def test_slide_interval(self):
152180
with TestPipeline(options=self.options) as p:
153181
result = (
@@ -157,16 +185,18 @@ def test_slide_interval(self):
157185
assert_that(
158186
result,
159187
equal_to([
160-
[0, 1, 2],
161-
[1, 2, 3],
162-
[2, 3, 4],
163-
[3, 4, 5],
164-
[4, 5, 6],
165-
[5, 6, 7],
166-
[6, 7, 8],
167-
[7, 8, 9],
168-
[8, 9],
169-
[9],
188+
((-2, 1), [(0, 0)]),
189+
((-1, 2), [(0, 0), (1, 1)]),
190+
((0, 3), [(0, 0), (1, 1), (2, 2)]),
191+
((1, 4), [(1, 1), (2, 2), (3, 3)]),
192+
((2, 5), [(2, 2), (3, 3), (4, 4)]),
193+
((3, 6), [(3, 3), (4, 4), (5, 5)]),
194+
((4, 7), [(4, 4), (5, 5), (6, 6)]),
195+
((5, 8), [(5, 5), (6, 6), (7, 7)]),
196+
((6, 9), [(6, 6), (7, 7), (8, 8)]),
197+
((7, 10), [(7, 7), (8, 8), (9, 9)]),
198+
((8, 11), [(8, 8), (9, 9)]),
199+
((9, 12), [(9, 9)]),
170200
]))
171201

172202
def test_keyed_input(self):
@@ -175,14 +205,15 @@ def test_keyed_input(self):
175205
p | _create_test_stream([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
176206
| beam.WithKeys("my_key") # key is present in the output
177207
| OrderedWindowElements(WINDOW_SIZE, stop_timestamp=13))
178-
result = _maybe_log_elements(result)
208+
result = _maybe_log_elements(result) | _convert_timestamp_to_int(
209+
has_key=True)
179210
assert_that(
180211
result,
181212
equal_to([
182-
("my_key", [1, 2]),
183-
("my_key", [3, 4, 5]),
184-
("my_key", [6, 7, 8]),
185-
("my_key", [9, 10]),
213+
("my_key", ((0,3), [(1, 1), (2, 2)])),
214+
("my_key", ((3, 6), [(3, 3), (4, 4), (5, 5)])),
215+
("my_key", ((6, 9), [(6, 6), (7, 7), (8, 8)])),
216+
("my_key", ((9, 12), [(9, 9), (10, 10)])),
186217
]))
187218

188219
@parameterized.expand([
@@ -192,18 +223,18 @@ def test_keyed_input(self):
192223
def test_non_zero_offset_and_default_value(self, fill_window_start):
193224
if fill_window_start:
194225
expected = [
195-
[-100,
196-
0], # window [-2, 1), and the start is filled with default value
197-
[1, 2, 3], # window [1, 4)
198-
[4, 5, 6],
199-
[7, 8, 9],
226+
# window [-2, 1), and the start is filled with default value
227+
((-2, 1), [(-2, -100), (0, 0)]),
228+
((1, 4), [(1, 1), (2, 2), (3, 3)]), # window [1, 4)
229+
((4, 7), [(4, 4), (5, 5), (6, 6)]),
230+
((7, 10), [(7, 7), (8, 8), (9, 9)]),
200231
]
201232
else:
202233
expected = [
203-
[0], # window [-2, 1)
204-
[1, 2, 3], # window [1, 4)
205-
[4, 5, 6],
206-
[7, 8, 9],
234+
((-2, 1), [(0, 0)]), # window [-2, 1)
235+
((1, 4), [(1, 1), (2, 2), (3, 3)]), # window [1, 4)
236+
((4, 7), [(4, 4), (5, 5), (6, 6)]),
237+
((7, 10), [(7, 7), (8, 8), (9, 9)]),
207238
]
208239

209240
with TestPipeline(options=self.options) as p:
@@ -215,7 +246,7 @@ def test_non_zero_offset_and_default_value(self, fill_window_start):
215246
default_start_value=-100,
216247
fill_start_if_missing=fill_window_start,
217248
stop_timestamp=13))
218-
result = _maybe_log_elements(result)
249+
result = _maybe_log_elements(result) | _convert_timestamp_to_int()
219250
assert_that(result, equal_to(expected))
220251

221252
@parameterized.expand([
@@ -225,23 +256,23 @@ def test_non_zero_offset_and_default_value(self, fill_window_start):
225256
def test_ordered_data_with_gap(self, fill_window_start):
226257
if fill_window_start:
227258
expected = [
228-
[0, 1, 2],
229-
[3, 4],
230-
[4], # window [6, 9) is empty, so the start is filled. Same as below.
231-
[4], # window [9, 12) is empty
232-
[4], # window [12, 15) is empty
233-
[4, 16, 17], # window [15, 18) misses the start as well.
234-
[18, 19, 20],
259+
((0, 3), [(0, 0), (1, 1), (2, 2)]),
260+
((3, 6), [(3, 3), (4, 4)]),
261+
((6, 9), [(6, 4)]), # window [6, 9) is empty, so the start is filled.
262+
((9, 12), [(9, 4)]), # window [9, 12) is empty, so the start is filled.
263+
((12, 15), [(12, 4)]), # window [12, 15) is empty, so the start is filled.
264+
((15, 18), [(15, 4), (16, 16), (17, 17)]),
265+
((18, 21), [(18, 18), (19, 19), (20, 20)])
235266
]
236267
else:
237268
expected = [
238-
[0, 1, 2],
239-
[3, 4],
240-
[], # window [6, 9) is empty
241-
[], # window [9, 12) is empty
242-
[], # window [12, 15) is empty
243-
[16, 17],
244-
[18, 19, 20],
269+
((0, 3), [(0, 0), (1, 1), (2, 2)]),
270+
((3, 6), [(3, 3), (4, 4)]),
271+
((6, 9), []), # window [6, 9) is empty
272+
((9, 12), []), # window [9, 12) is empty
273+
((12, 15), []), # window [12, 15) is empty
274+
((15, 18), [(16, 16), (17, 17)]),
275+
((18, 21), [(18, 18), (19, 19), (20, 20)])
245276
]
246277
with TestPipeline(options=self.options) as p:
247278
result = (
@@ -250,22 +281,22 @@ def test_ordered_data_with_gap(self, fill_window_start):
250281
WINDOW_SIZE,
251282
fill_start_if_missing=fill_window_start,
252283
stop_timestamp=23))
253-
result = _maybe_log_elements(result)
284+
result = _maybe_log_elements(result) | _convert_timestamp_to_int()
254285
assert_that(result, equal_to(expected))
255286

256287
def test_single_late_data_with_no_allowed_lateness(self):
257288
with TestPipeline(options=self.options) as p:
258289
result = (
259290
p | _create_test_stream([0, 1, 2, 3, 4, 6, 7, 8, 9, 5])
260291
| OrderedWindowElements(WINDOW_SIZE, stop_timestamp=13))
261-
result = _maybe_log_elements(result)
292+
result = _maybe_log_elements(result) | _convert_timestamp_to_int()
262293
assert_that(
263294
result,
264295
equal_to([
265-
[0, 1, 2],
266-
[3, 4], # 5 is late and discarded
267-
[6, 7, 8],
268-
[9],
296+
((0, 3), [(0, 0), (1, 1), (2, 2)]),
297+
((3, 6), [(3, 3), (4, 4)]), # 5 is late and discarded
298+
((6, 9), [(6, 6), (7, 7), (8, 8)]),
299+
((9, 12), [(9, 9)]),
269300
]))
270301

271302
def test_single_late_data_with_allowed_lateness(self):
@@ -274,16 +305,16 @@ def test_single_late_data_with_allowed_lateness(self):
274305
p | _create_test_stream([0, 1, 2, 3, 4, 6, 7, 8, 9, 5])
275306
| OrderedWindowElements(
276307
WINDOW_SIZE, allowed_lateness=4, stop_timestamp=17))
277-
result = _maybe_log_elements(result)
308+
result = _maybe_log_elements(result) | _convert_timestamp_to_int()
278309
assert_that(
279310
result,
280311
equal_to([
281-
[0, 1, 2],
312+
((0, 3), [(0, 0), (1, 1), (2, 2)]),
282313
# allow late data up to:
283314
# 9 (watermark before late data) - 4 (allowed lateness) = 5
284-
[3, 4, 5],
285-
[6, 7, 8],
286-
[9],
315+
((3, 6), [(3, 3), (4, 4), (5, 5)]),
316+
((6, 9), [(6, 6), (7, 7), (8, 8)]),
317+
((9, 12), [(9, 9)]),
287318
]))
288319

289320
@parameterized.expand([
@@ -295,19 +326,19 @@ def test_reversed_ordered_data_with_allowed_lateness(self, fill_start):
295326
expected = [
296327
# allow late data up to:
297328
# 9 (watermark before late data) - 5 (allowed lateness) = 4
298-
[None, 4, 5],
299-
[6, 7, 8],
300-
[9],
301-
[9],
302-
[9],
329+
((3, 6), [(3, None), (4, 4), (5, 5)]),
330+
((6, 9), [(6, 6), (7, 7), (8, 8)]),
331+
((9, 12), [(9, 9)]),
332+
((12, 15), [(12, 9)]),
333+
((15, 18), [(15, 9)]),
303334
]
304335
else:
305336
expected = [
306-
[4, 5],
307-
[6, 7, 8],
308-
[9],
309-
[],
310-
[],
337+
((3, 6), [(4, 4), (5, 5)]),
338+
((6, 9), [(6, 6), (7, 7), (8, 8)]),
339+
((9, 12), [(9, 9)]),
340+
((12, 15), []),
341+
((15, 18), []),
311342
]
312343
with TestPipeline(options=self.options) as p:
313344
result = (
@@ -317,7 +348,7 @@ def test_reversed_ordered_data_with_allowed_lateness(self, fill_start):
317348
fill_start_if_missing=fill_start,
318349
allowed_lateness=5,
319350
stop_timestamp=25))
320-
result = _maybe_log_elements(result)
351+
result = _maybe_log_elements(result) | _convert_timestamp_to_int()
321352
assert_that(result, equal_to(expected))
322353

323354
def test_multiple_late_data_with_allowed_lateness(self):
@@ -330,29 +361,26 @@ def test_multiple_late_data_with_allowed_lateness(self):
330361
allowed_lateness=6,
331362
fill_start_if_missing=True,
332363
stop_timestamp=28))
333-
result = _maybe_log_elements(result)
364+
result = _maybe_log_elements(result) | _convert_timestamp_to_int()
334365
assert_that(
335366
result,
336-
equal_to([
337-
[1, 2, 3],
338-
[2, 3],
339-
[3],
340-
[3],
341-
[3],
342-
[3],
343-
[3, 9],
344-
[3, 9],
345-
[9],
346-
[9, 12],
347-
[9, 12],
348-
[12, 14],
349-
[12, 14],
350-
[14, 16],
351-
[14, 16, 17],
352-
[16, 17],
353-
[17],
354-
[17],
355-
]))
367+
equal_to([((-1, 2), [(-1, None), (1, 1)]), ((0, 3), [(0, None),
368+
(1, 1), (2, 2)]),
369+
((1, 4), [(1, 1), (2, 2),
370+
(3, 3)]), ((2, 5), [(2, 2),
371+
(3, 3)]), ((3, 6), [(3, 3)]),
372+
((4, 7), [(4, 3)]), ((5, 8), [(5, 3)]), ((6, 9), [(6, 3)]),
373+
((7, 10), [(7, 3), (9, 9)]), ((8, 11), [
374+
(8, 3), (9, 9)
375+
]), ((9, 12), [(9, 9)]), ((10, 13), [(10, 9), (12, 12)]),
376+
((11, 14), [(11, 9), (12, 12)]), ((12, 15), [
377+
(12, 12), (14, 14)
378+
]), ((13, 16), [(13, 12), (14, 14)]), ((14, 17), [
379+
(14, 14), (16, 16)
380+
]), ((15, 18), [(15, 14), (16, 16),
381+
(17, 17)]), ((16, 19), [(16, 16),
382+
(17, 17)]),
383+
((17, 20), [(17, 17)]), ((18, 21), [(18, 17)])]))
356384

357385

358386
if __name__ == '__main__':

0 commit comments

Comments
 (0)