@@ -200,10 +200,19 @@ def process(

timer_started = timer_state.read()
if not timer_started:
timestamp_secs = timestamp.micros / 1e6

# Align the timestamp with the windowing scheme.
aligned_timestamp = timestamp_secs - self.offset

# Calculate the start of the last window that could contain this timestamp.
last_window_start_aligned = ((aligned_timestamp // self.slide_interval) *
self.slide_interval)
last_window_start = last_window_start_aligned + self.offset

n = (self.duration - 1) // self.slide_interval
# Calculate the start of the first sliding window.
first_slide_start = int(
(timestamp.micros / 1e6 - self.offset) //
self.slide_interval) * self.slide_interval + self.offset
first_slide_start = last_window_start - n * self.slide_interval
first_slide_start_ts = Timestamp.of(first_slide_start)

# Set the initial timer to fire at the end of the first window plus
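A quick worked check of the new first-window arithmetic (a sketch only; the concrete numbers assume duration=3, slide_interval=1 and offset=0, which matches the expectations in test_slide_interval below):

# Sketch: the first element arrives at t=0 with duration=3, slide_interval=1, offset=0.
timestamp_secs = 0.0
aligned_timestamp = timestamp_secs - 0                 # offset is 0
last_window_start = (aligned_timestamp // 1) * 1 + 0   # start of the last window containing t=0 -> 0.0
n = (3 - 1) // 1                                       # two earlier sliding windows also contain t=0
first_slide_start = last_window_start - n * 1          # -2.0
assert first_slide_start == -2                         # matches the expected first window (-2, 1)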
@@ -256,14 +265,16 @@ def _get_windowed_values_from_state(
if not windowed_values:
# If the window is empty, use the last value.
last_value = last_value_state.read()
windowed_values.append(last_value)
value_to_insert = (window_start_ts, last_value[1])
windowed_values.append(value_to_insert)
else:
first_timestamp = windowed_values[0][0]
last_value = last_value_state.read()
if first_timestamp > window_start_ts and last_value:
# Prepend the last value if there's a gap between the first element
# in the window and the start of the window.
windowed_values = [last_value] + windowed_values
value_to_insert = (window_start_ts, last_value[1])
windowed_values = [value_to_insert] + windowed_values

# Find the last element before the beginning of the next window to update
# last_value_state.
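To illustrate the new carry-forward behaviour (an illustrative sketch, with timestamps borrowed from test_ordered_data_with_gap below): when a window such as [6, 9) is empty, the remembered last value is re-stamped at the window start instead of keeping its original timestamp.

from apache_beam.utils.timestamp import Timestamp

# Assume last_value_state.read() returned (Timestamp(4), 4) and the current window is [6, 9).
window_start_ts = Timestamp.of(6)
last_value = (Timestamp.of(4), 4)
value_to_insert = (window_start_ts, last_value[1])  # (Timestamp(6), 4)
# Before this change, last_value was inserted unchanged as (Timestamp(4), 4),
# whose timestamp falls outside the [6, 9) window being emitted.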
@@ -334,8 +345,7 @@ def on_timer(
windowed_values = self._get_windowed_values_from_state(
buffer_state, late_start_ts, late_end_ts, last_value_state)
yield TimestampedValue(
((key, late_start_ts, late_end_ts), [v[1]
for v in windowed_values]),
(key, ((late_start_ts, late_end_ts), windowed_values)),
late_end_ts - 1)
late_start_ts += self.slide_interval
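For reference, the emitted element changes shape here (a sketch; the sample values mirror test_keyed_input below):

from apache_beam.utils.timestamp import Timestamp

# old: window bounds packed into the key, element timestamps dropped from the values
old_element = (("my_key", Timestamp.of(0), Timestamp.of(3)), [1, 2])
# new: the key stands alone; the value carries the window bounds plus (timestamp, value) pairs
new_element = ("my_key", ((Timestamp.of(0), Timestamp.of(3)),
                          [(Timestamp.of(1), 1), (Timestamp.of(2), 2)]))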

@@ -347,8 +357,7 @@ def on_timer(
windowed_values = self._get_windowed_values_from_state(
buffer_state, window_start_ts, window_end_ts, last_value_state)
yield TimestampedValue(
((key, window_start_ts, window_end_ts), [v[1]
for v in windowed_values]),
(key, ((window_start_ts, window_end_ts), windowed_values)),
window_end_ts - 1)

# Post-emit actions for the current window:
@@ -617,7 +626,7 @@ def expand(self, input):
self.stop_timestamp)))

if isinstance(input.element_type, TupleConstraint):
ret = keyed_output | beam.MapTuple(lambda x, y: (x[0], y))
ret = keyed_output
else:
# Remove the default key if the input PCollection was originally unkeyed.
ret = keyed_output | beam.Values()
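A minimal sketch of the unkeyed branch (illustrative element values, not taken from the patch): beam.Values() drops the default key that expand() attached earlier, leaving the ((start, end), [(timestamp, value), ...]) shape the unkeyed tests assert on.

import apache_beam as beam

with beam.Pipeline() as p:
  _ = (
      p
      | beam.Create([(None, ((0, 3), [(1, 1), (2, 2)]))])  # illustrative keyed element
      | beam.Values()  # strips the default key for originally unkeyed input
      | beam.Map(print))  # prints ((0, 3), [(1, 1), (2, 2)])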
@@ -93,6 +93,20 @@ def _create_test_stream(elements: list[int]):
return test_stream


def _convert_timestamp_to_int(has_key=False):
if has_key:
return beam.MapTuple(
lambda key, value: (
key,
((int(value[0][0].micros // 1e6), int(value[0][1].micros // 1e6)),
[(int(t.micros // 1e6), v) for t, v in value[1]])))

return beam.MapTuple(
lambda window, elements:
((int(window[0].micros // 1e6), int(window[1].micros // 1e6)),
[(int(t.micros // 1e6), v) for t, v in elements]))
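Roughly what this new helper does to a single element (illustrative input and output; actual elements come from OrderedWindowElements):

# Unkeyed element in (Timestamps carry microseconds):
#   ((Timestamp(0), Timestamp(3)), [(Timestamp(1), 1), (Timestamp(2), 2)])
# Element out (plain int seconds, which the test expectations compare against):
#   ((0, 3), [(1, 1), (2, 2)])
# With has_key=True the same conversion is applied inside the value and the key passes through.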


_go_installed = shutil.which('go') is not None
_in_windows = sys.platform == "win32"

@@ -140,13 +154,29 @@ def test_default(self):
WINDOW_SIZE,
stop_timestamp=13,
buffer_state_type=self.buffer_state_type))
result = _maybe_log_elements(result)
assert_that(result, equal_to([
[0, 1, 2],
[3, 4, 5],
[6, 7, 8],
[9],
]))
result = _maybe_log_elements(result) | _convert_timestamp_to_int()
assert_that(
result,
equal_to([
((0, 3), [(0, 0), (1, 1), (2, 2)]),
((3, 6), [(3, 3), (4, 4), (5, 5)]),
((6, 9), [(6, 6), (7, 7), (8, 8)]),
((9, 12), [(9, 9)]),
]))

def test_offset(self):
with TestPipeline(options=self.options) as p:
result = (
p | _create_test_stream([2, 3, 4, 5, 6, 7, 8, 9])
| OrderedWindowElements(WINDOW_SIZE, stop_timestamp=13, offset=2))
result = _maybe_log_elements(result) | _convert_timestamp_to_int()
assert_that(
result,
equal_to([
((2, 5), [(2, 2), (3, 3), (4, 4)]),  # window starts at 2
((5, 8), [(5, 5), (6, 6), (7, 7)]),
((8, 11), [(8, 8), (9, 9)])
]))

def test_slide_interval(self):
with TestPipeline(options=self.options) as p:
@@ -157,16 +187,18 @@ def test_slide_interval(self):
assert_that(
result,
equal_to([
[0, 1, 2],
[1, 2, 3],
[2, 3, 4],
[3, 4, 5],
[4, 5, 6],
[5, 6, 7],
[6, 7, 8],
[7, 8, 9],
[8, 9],
[9],
((-2, 1), [(0, 0)]),
((-1, 2), [(0, 0), (1, 1)]),
((0, 3), [(0, 0), (1, 1), (2, 2)]),
((1, 4), [(1, 1), (2, 2), (3, 3)]),
((2, 5), [(2, 2), (3, 3), (4, 4)]),
((3, 6), [(3, 3), (4, 4), (5, 5)]),
((4, 7), [(4, 4), (5, 5), (6, 6)]),
((5, 8), [(5, 5), (6, 6), (7, 7)]),
((6, 9), [(6, 6), (7, 7), (8, 8)]),
((7, 10), [(7, 7), (8, 8), (9, 9)]),
((8, 11), [(8, 8), (9, 9)]),
((9, 12), [(9, 9)]),
]))

def test_keyed_input(self):
@@ -175,14 +207,15 @@
p | _create_test_stream([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
| beam.WithKeys("my_key") # key is present in the output
| OrderedWindowElements(WINDOW_SIZE, stop_timestamp=13))
result = _maybe_log_elements(result)
result = _maybe_log_elements(result) | _convert_timestamp_to_int(
has_key=True)
assert_that(
result,
equal_to([
("my_key", [1, 2]),
("my_key", [3, 4, 5]),
("my_key", [6, 7, 8]),
("my_key", [9, 10]),
("my_key", ((0, 3), [(1, 1), (2, 2)])),
("my_key", ((3, 6), [(3, 3), (4, 4), (5, 5)])),
("my_key", ((6, 9), [(6, 6), (7, 7), (8, 8)])),
("my_key", ((9, 12), [(9, 9), (10, 10)])),
]))

@parameterized.expand([
@@ -192,18 +225,18 @@ def test_keyed_input(self):
def test_non_zero_offset_and_default_value(self, fill_window_start):
if fill_window_start:
expected = [
[-100,
0], # window [-2, 1), and the start is filled with default value
[1, 2, 3], # window [1, 4)
[4, 5, 6],
[7, 8, 9],
# window [-2, 1); the start is filled with the default value
((-2, 1), [(-2, -100), (0, 0)]),
((1, 4), [(1, 1), (2, 2), (3, 3)]), # window [1, 4)
((4, 7), [(4, 4), (5, 5), (6, 6)]),
((7, 10), [(7, 7), (8, 8), (9, 9)]),
]
else:
expected = [
[0], # window [-2, 1)
[1, 2, 3], # window [1, 4)
[4, 5, 6],
[7, 8, 9],
((-2, 1), [(0, 0)]), # window [-2, 1)
((1, 4), [(1, 1), (2, 2), (3, 3)]), # window [1, 4)
((4, 7), [(4, 4), (5, 5), (6, 6)]),
((7, 10), [(7, 7), (8, 8), (9, 9)]),
]

with TestPipeline(options=self.options) as p:
Expand All @@ -215,7 +248,7 @@ def test_non_zero_offset_and_default_value(self, fill_window_start):
default_start_value=-100,
fill_start_if_missing=fill_window_start,
stop_timestamp=13))
result = _maybe_log_elements(result)
result = _maybe_log_elements(result) | _convert_timestamp_to_int()
assert_that(result, equal_to(expected))

@parameterized.expand([
@@ -225,23 +258,26 @@
def test_ordered_data_with_gap(self, fill_window_start):
if fill_window_start:
expected = [
[0, 1, 2],
[3, 4],
[4], # window [6, 9) is empty, so the start is filled. Same as below.
[4], # window [9, 12) is empty
[4], # window [12, 15) is empty
[4, 16, 17], # window [15, 18) misses the start as well.
[18, 19, 20],
((0, 3), [(0, 0), (1, 1), (2, 2)]),
((3, 6), [(3, 3), (4, 4)]),
# window [6, 9) is empty, so its start is filled with the last value.
((6, 9), [(6, 4)]),
# window [9, 12) is empty, so its start is filled with the last value.
((9, 12), [(9, 4)]),
# window [12, 15) is empty, so its start is filled with the last value.
((12, 15), [(12, 4)]),
((15, 18), [(15, 4), (16, 16), (17, 17)]),
((18, 21), [(18, 18), (19, 19), (20, 20)])
]
else:
expected = [
[0, 1, 2],
[3, 4],
[], # window [6, 9) is empty
[], # window [9, 12) is empty
[], # window [12, 15) is empty
[16, 17],
[18, 19, 20],
((0, 3), [(0, 0), (1, 1), (2, 2)]),
((3, 6), [(3, 3), (4, 4)]),
((6, 9), []), # window [6, 9) is empty
((9, 12), []), # window [9, 12) is empty
((12, 15), []), # window [12, 15) is empty
((15, 18), [(16, 16), (17, 17)]),
((18, 21), [(18, 18), (19, 19), (20, 20)])
]
with TestPipeline(options=self.options) as p:
result = (
@@ -250,22 +286,22 @@ def test_ordered_data_with_gap(self, fill_window_start):
WINDOW_SIZE,
fill_start_if_missing=fill_window_start,
stop_timestamp=23))
result = _maybe_log_elements(result)
result = _maybe_log_elements(result) | _convert_timestamp_to_int()
assert_that(result, equal_to(expected))

def test_single_late_data_with_no_allowed_lateness(self):
with TestPipeline(options=self.options) as p:
result = (
p | _create_test_stream([0, 1, 2, 3, 4, 6, 7, 8, 9, 5])
| OrderedWindowElements(WINDOW_SIZE, stop_timestamp=13))
result = _maybe_log_elements(result)
result = _maybe_log_elements(result) | _convert_timestamp_to_int()
assert_that(
result,
equal_to([
[0, 1, 2],
[3, 4], # 5 is late and discarded
[6, 7, 8],
[9],
((0, 3), [(0, 0), (1, 1), (2, 2)]),
((3, 6), [(3, 3), (4, 4)]), # 5 is late and discarded
((6, 9), [(6, 6), (7, 7), (8, 8)]),
((9, 12), [(9, 9)]),
]))

def test_single_late_data_with_allowed_lateness(self):
Expand All @@ -274,16 +310,16 @@ def test_single_late_data_with_allowed_lateness(self):
p | _create_test_stream([0, 1, 2, 3, 4, 6, 7, 8, 9, 5])
| OrderedWindowElements(
WINDOW_SIZE, allowed_lateness=4, stop_timestamp=17))
result = _maybe_log_elements(result)
result = _maybe_log_elements(result) | _convert_timestamp_to_int()
assert_that(
result,
equal_to([
[0, 1, 2],
((0, 3), [(0, 0), (1, 1), (2, 2)]),
# allow late data up to:
# 9 (watermark before late data) - 4 (allowed lateness) = 5
[3, 4, 5],
[6, 7, 8],
[9],
((3, 6), [(3, 3), (4, 4), (5, 5)]),
((6, 9), [(6, 6), (7, 7), (8, 8)]),
((9, 12), [(9, 9)]),
]))
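Spelling out the lateness cutoff used in this test (a sketch of the arithmetic in the comment above, not of the transform's internals):

watermark_before_late_data = 9   # elements up to 9 arrived before the late 5
allowed_lateness = 4
cutoff = watermark_before_late_data - allowed_lateness  # = 5
assert 5 >= cutoff  # so the late element 5 is still placed into window [3, 6)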

@parameterized.expand([
@@ -295,19 +331,19 @@ def test_reversed_ordered_data_with_allowed_lateness(self, fill_start):
expected = [
# allow late data up to:
# 9 (watermark before late data) - 5 (allowed lateness) = 4
[None, 4, 5],
[6, 7, 8],
[9],
[9],
[9],
((3, 6), [(3, None), (4, 4), (5, 5)]),
((6, 9), [(6, 6), (7, 7), (8, 8)]),
((9, 12), [(9, 9)]),
((12, 15), [(12, 9)]),
((15, 18), [(15, 9)]),
]
else:
expected = [
[4, 5],
[6, 7, 8],
[9],
[],
[],
((3, 6), [(4, 4), (5, 5)]),
((6, 9), [(6, 6), (7, 7), (8, 8)]),
((9, 12), [(9, 9)]),
((12, 15), []),
((15, 18), []),
]
with TestPipeline(options=self.options) as p:
result = (
@@ -317,7 +353,7 @@ def test_reversed_ordered_data_with_allowed_lateness(self, fill_start):
fill_start_if_missing=fill_start,
allowed_lateness=5,
stop_timestamp=25))
result = _maybe_log_elements(result)
result = _maybe_log_elements(result) | _convert_timestamp_to_int()
assert_that(result, equal_to(expected))

def test_multiple_late_data_with_allowed_lateness(self):
@@ -330,29 +366,31 @@ def test_multiple_late_data_with_allowed_lateness(self):
allowed_lateness=6,
fill_start_if_missing=True,
stop_timestamp=28))
result = _maybe_log_elements(result)
result = _maybe_log_elements(result) | _convert_timestamp_to_int()
# yapf: disable
assert_that(
result,
equal_to([
[1, 2, 3],
[2, 3],
[3],
[3],
[3],
[3],
[3, 9],
[3, 9],
[9],
[9, 12],
[9, 12],
[12, 14],
[12, 14],
[14, 16],
[14, 16, 17],
[16, 17],
[17],
[17],
((-1, 2), [(-1, None), (1, 1)]),
((0, 3), [(0, None), (1, 1), (2, 2)]),
((1, 4), [(1, 1), (2, 2), (3, 3)]),
((2, 5), [(2, 2), (3, 3)]),
((3, 6), [(3, 3)]),
((4, 7), [(4, 3)]),
((5, 8), [(5, 3)]),
((6, 9), [(6, 3)]),
((7, 10), [(7, 3), (9, 9)]),
((8, 11), [(8, 3), (9, 9)]),
((9, 12), [(9, 9)]),
((10, 13), [(10, 9), (12, 12)]),
((11, 14), [(11, 9), (12, 12)]),
((12, 15), [(12, 12), (14, 14)]),
((13, 16), [(13, 12), (14, 14)]),
((14, 17), [(14, 14), (16, 16)]),
((15, 18), [(15, 14), (16, 16), (17, 17)]),
((16, 19), [(16, 16), (17, 17)]),
((17, 20), [(17, 17)]),
((18, 21), [(18, 17)])
]))
# yapf: enable


if __name__ == '__main__':