diff --git a/sdks/python/apache_beam/examples/cookbook/ordered_window_elements/streaming.py b/sdks/python/apache_beam/examples/cookbook/ordered_window_elements/streaming.py index 450c90685acc..aed1400bc4d8 100644 --- a/sdks/python/apache_beam/examples/cookbook/ordered_window_elements/streaming.py +++ b/sdks/python/apache_beam/examples/cookbook/ordered_window_elements/streaming.py @@ -35,6 +35,7 @@ from apache_beam.typehints.typehints import TupleConstraint from apache_beam.utils.timestamp import MAX_TIMESTAMP from apache_beam.utils.timestamp import MIN_TIMESTAMP +from apache_beam.utils.timestamp import Duration from apache_beam.utils.timestamp import DurationTypes # pylint: disable=unused-import from apache_beam.utils.timestamp import Timestamp from apache_beam.utils.timestamp import TimestampTypes # pylint: disable=unused-import @@ -89,7 +90,7 @@ def __init__( self, duration: DurationTypes, slide_interval: DurationTypes, - offset: TimestampTypes, + offset: DurationTypes, allowed_lateness: DurationTypes, default_start_value, fill_start_if_missing: bool, @@ -200,11 +201,23 @@ def process( timer_started = timer_state.read() if not timer_started: + offset_duration = Duration.of(self.offset) + slide_duration = Duration.of(self.slide_interval) + duration_duration = Duration.of(self.duration) + + # Align the timestamp with the windowing scheme. + aligned_micros = (timestamp - offset_duration).micros + + # Calculate the start of the last window that could contain this timestamp + last_window_start_aligned_micros = ( + (aligned_micros // slide_duration.micros) * slide_duration.micros) + + last_window_start = Timestamp( + micros=last_window_start_aligned_micros) + offset_duration + n = (duration_duration.micros - 1) // slide_duration.micros # Calculate the start of the first sliding window. - first_slide_start = int( - (timestamp.micros / 1e6 - self.offset) // - self.slide_interval) * self.slide_interval + self.offset - first_slide_start_ts = Timestamp.of(first_slide_start) + first_slide_start_ts = last_window_start - Duration( + micros=n * slide_duration.micros) # Set the initial timer to fire at the end of the first window plus # allowed lateness. @@ -256,14 +269,16 @@ def _get_windowed_values_from_state( if not windowed_values: # If the window is empty, use the last value. last_value = last_value_state.read() - windowed_values.append(last_value) + value_to_insert = (window_start_ts, last_value[1]) + windowed_values.append(value_to_insert) else: first_timestamp = windowed_values[0][0] last_value = last_value_state.read() if first_timestamp > window_start_ts and last_value: # Prepend the last value if there's a gap between the first element # in the window and the start of the window. - windowed_values = [last_value] + windowed_values + value_to_insert = (window_start_ts, last_value[1]) + windowed_values = [value_to_insert] + windowed_values # Find the last element before the beginning of the next window to update # last_value_state. @@ -334,8 +349,7 @@ def on_timer( windowed_values = self._get_windowed_values_from_state( buffer_state, late_start_ts, late_end_ts, last_value_state) yield TimestampedValue( - ((key, late_start_ts, late_end_ts), [v[1] - for v in windowed_values]), + (key, ((late_start_ts, late_end_ts), windowed_values)), late_end_ts - 1) late_start_ts += self.slide_interval @@ -347,8 +361,7 @@ def on_timer( windowed_values = self._get_windowed_values_from_state( buffer_state, window_start_ts, window_end_ts, last_value_state) yield TimestampedValue( - ((key, window_start_ts, window_end_ts), [v[1] - for v in windowed_values]), + (key, ((window_start_ts, window_end_ts), windowed_values)), window_end_ts - 1) # Post-emit actions for the current window: @@ -532,7 +545,7 @@ def __init__( self, duration: DurationTypes, slide_interval: Optional[DurationTypes] = None, - offset: TimestampTypes = 0, + offset: DurationTypes = 0, allowed_lateness: DurationTypes = 0, default_start_value=None, fill_start_if_missing: bool = False, @@ -617,7 +630,7 @@ def expand(self, input): self.stop_timestamp))) if isinstance(input.element_type, TupleConstraint): - ret = keyed_output | beam.MapTuple(lambda x, y: (x[0], y)) + ret = keyed_output else: # Remove the default key if the input PCollection was originally unkeyed. ret = keyed_output | beam.Values() diff --git a/sdks/python/apache_beam/examples/cookbook/ordered_window_elements/streaming_test.py b/sdks/python/apache_beam/examples/cookbook/ordered_window_elements/streaming_test.py index 83bdc289b95c..ca19d9776fae 100644 --- a/sdks/python/apache_beam/examples/cookbook/ordered_window_elements/streaming_test.py +++ b/sdks/python/apache_beam/examples/cookbook/ordered_window_elements/streaming_test.py @@ -93,6 +93,20 @@ def _create_test_stream(elements: list[int]): return test_stream +def _convert_timestamp_to_int(has_key=False): + if has_key: + return beam.MapTuple( + lambda key, value: ( + key, + ((int(value[0][0].micros // 1e6), int(value[0][1].micros // 1e6)), + [(int(t.micros // 1e6), v) for t, v in value[1]]))) + + return beam.MapTuple( + lambda window, elements: + ((int(window[0].micros // 1e6), int(window[1].micros // 1e6)), + [(int(t.micros // 1e6), v) for t, v in elements])) + + _go_installed = shutil.which('go') is not None _in_windows = sys.platform == "win32" @@ -140,13 +154,29 @@ def test_default(self): WINDOW_SIZE, stop_timestamp=13, buffer_state_type=self.buffer_state_type)) - result = _maybe_log_elements(result) - assert_that(result, equal_to([ - [0, 1, 2], - [3, 4, 5], - [6, 7, 8], - [9], - ])) + result = _maybe_log_elements(result) | _convert_timestamp_to_int() + assert_that( + result, + equal_to([ + ((0, 3), [(0, 0), (1, 1), (2, 2)]), + ((3, 6), [(3, 3), (4, 4), (5, 5)]), + ((6, 9), [(6, 6), (7, 7), (8, 8)]), + ((9, 12), [(9, 9)]), + ])) + + def test_offset(self): + with TestPipeline(options=self.options) as p: + result = ( + p | _create_test_stream([2, 3, 4, 5, 6, 7, 8, 9]) + | OrderedWindowElements(WINDOW_SIZE, stop_timestamp=13, offset=2)) + result = _maybe_log_elements(result) | _convert_timestamp_to_int() + assert_that( + result, + equal_to([ + ((2, 5), [(2, 2), (3, 3), (4, 4)]), # window start at 2 + ((5, 8), [(5, 5), (6, 6), (7, 7)]), + ((8, 11), [(8, 8), (9, 9)]) + ])) def test_slide_interval(self): with TestPipeline(options=self.options) as p: @@ -157,16 +187,18 @@ def test_slide_interval(self): assert_that( result, equal_to([ - [0, 1, 2], - [1, 2, 3], - [2, 3, 4], - [3, 4, 5], - [4, 5, 6], - [5, 6, 7], - [6, 7, 8], - [7, 8, 9], - [8, 9], - [9], + ((-2, 1), [(0, 0)]), + ((-1, 2), [(0, 0), (1, 1)]), + ((0, 3), [(0, 0), (1, 1), (2, 2)]), + ((1, 4), [(1, 1), (2, 2), (3, 3)]), + ((2, 5), [(2, 2), (3, 3), (4, 4)]), + ((3, 6), [(3, 3), (4, 4), (5, 5)]), + ((4, 7), [(4, 4), (5, 5), (6, 6)]), + ((5, 8), [(5, 5), (6, 6), (7, 7)]), + ((6, 9), [(6, 6), (7, 7), (8, 8)]), + ((7, 10), [(7, 7), (8, 8), (9, 9)]), + ((8, 11), [(8, 8), (9, 9)]), + ((9, 12), [(9, 9)]), ])) def test_keyed_input(self): @@ -175,14 +207,15 @@ def test_keyed_input(self): p | _create_test_stream([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) | beam.WithKeys("my_key") # key is present in the output | OrderedWindowElements(WINDOW_SIZE, stop_timestamp=13)) - result = _maybe_log_elements(result) + result = _maybe_log_elements(result) | _convert_timestamp_to_int( + has_key=True) assert_that( result, equal_to([ - ("my_key", [1, 2]), - ("my_key", [3, 4, 5]), - ("my_key", [6, 7, 8]), - ("my_key", [9, 10]), + ("my_key", ((0, 3), [(1, 1), (2, 2)])), + ("my_key", ((3, 6), [(3, 3), (4, 4), (5, 5)])), + ("my_key", ((6, 9), [(6, 6), (7, 7), (8, 8)])), + ("my_key", ((9, 12), [(9, 9), (10, 10)])), ])) @parameterized.expand([ @@ -192,18 +225,18 @@ def test_keyed_input(self): def test_non_zero_offset_and_default_value(self, fill_window_start): if fill_window_start: expected = [ - [-100, - 0], # window [-2, 1), and the start is filled with default value - [1, 2, 3], # window [1, 4) - [4, 5, 6], - [7, 8, 9], + # window [-2, 1), and the start is filled with default value + ((-2, 1), [(-2, -100), (0, 0)]), + ((1, 4), [(1, 1), (2, 2), (3, 3)]), # window [1, 4) + ((4, 7), [(4, 4), (5, 5), (6, 6)]), + ((7, 10), [(7, 7), (8, 8), (9, 9)]), ] else: expected = [ - [0], # window [-2, 1) - [1, 2, 3], # window [1, 4) - [4, 5, 6], - [7, 8, 9], + ((-2, 1), [(0, 0)]), # window [-2, 1) + ((1, 4), [(1, 1), (2, 2), (3, 3)]), # window [1, 4) + ((4, 7), [(4, 4), (5, 5), (6, 6)]), + ((7, 10), [(7, 7), (8, 8), (9, 9)]), ] with TestPipeline(options=self.options) as p: @@ -215,7 +248,7 @@ def test_non_zero_offset_and_default_value(self, fill_window_start): default_start_value=-100, fill_start_if_missing=fill_window_start, stop_timestamp=13)) - result = _maybe_log_elements(result) + result = _maybe_log_elements(result) | _convert_timestamp_to_int() assert_that(result, equal_to(expected)) @parameterized.expand([ @@ -225,23 +258,26 @@ def test_non_zero_offset_and_default_value(self, fill_window_start): def test_ordered_data_with_gap(self, fill_window_start): if fill_window_start: expected = [ - [0, 1, 2], - [3, 4], - [4], # window [6, 9) is empty, so the start is filled. Same as below. - [4], # window [9, 12) is empty - [4], # window [12, 15) is empty - [4, 16, 17], # window [15, 18) misses the start as well. - [18, 19, 20], + ((0, 3), [(0, 0), (1, 1), (2, 2)]), + ((3, 6), [(3, 3), (4, 4)]), + # window [6, 9) is empty, so the start is filled with last value. + ((6, 9), [(6, 4)]), + # window [9, 12) is empty, so the start is filled with last value. + ((9, 12), [(9, 4)]), + # window [12, 15) is empty, so the start is filled with last value. + ((12, 15), [(12, 4)]), + ((15, 18), [(15, 4), (16, 16), (17, 17)]), + ((18, 21), [(18, 18), (19, 19), (20, 20)]) ] else: expected = [ - [0, 1, 2], - [3, 4], - [], # window [6, 9) is empty - [], # window [9, 12) is empty - [], # window [12, 15) is empty - [16, 17], - [18, 19, 20], + ((0, 3), [(0, 0), (1, 1), (2, 2)]), + ((3, 6), [(3, 3), (4, 4)]), + ((6, 9), []), # window [6, 9) is empty + ((9, 12), []), # window [9, 12) is empty + ((12, 15), []), # window [12, 15) is empty + ((15, 18), [(16, 16), (17, 17)]), + ((18, 21), [(18, 18), (19, 19), (20, 20)]) ] with TestPipeline(options=self.options) as p: result = ( @@ -250,7 +286,7 @@ def test_ordered_data_with_gap(self, fill_window_start): WINDOW_SIZE, fill_start_if_missing=fill_window_start, stop_timestamp=23)) - result = _maybe_log_elements(result) + result = _maybe_log_elements(result) | _convert_timestamp_to_int() assert_that(result, equal_to(expected)) def test_single_late_data_with_no_allowed_lateness(self): @@ -258,14 +294,14 @@ def test_single_late_data_with_no_allowed_lateness(self): result = ( p | _create_test_stream([0, 1, 2, 3, 4, 6, 7, 8, 9, 5]) | OrderedWindowElements(WINDOW_SIZE, stop_timestamp=13)) - result = _maybe_log_elements(result) + result = _maybe_log_elements(result) | _convert_timestamp_to_int() assert_that( result, equal_to([ - [0, 1, 2], - [3, 4], # 5 is late and discarded - [6, 7, 8], - [9], + ((0, 3), [(0, 0), (1, 1), (2, 2)]), + ((3, 6), [(3, 3), (4, 4)]), # 5 is late and discarded + ((6, 9), [(6, 6), (7, 7), (8, 8)]), + ((9, 12), [(9, 9)]), ])) def test_single_late_data_with_allowed_lateness(self): @@ -274,16 +310,16 @@ def test_single_late_data_with_allowed_lateness(self): p | _create_test_stream([0, 1, 2, 3, 4, 6, 7, 8, 9, 5]) | OrderedWindowElements( WINDOW_SIZE, allowed_lateness=4, stop_timestamp=17)) - result = _maybe_log_elements(result) + result = _maybe_log_elements(result) | _convert_timestamp_to_int() assert_that( result, equal_to([ - [0, 1, 2], + ((0, 3), [(0, 0), (1, 1), (2, 2)]), # allow late data up to: # 9 (watermark before late data) - 4 (allowed lateness) = 5 - [3, 4, 5], - [6, 7, 8], - [9], + ((3, 6), [(3, 3), (4, 4), (5, 5)]), + ((6, 9), [(6, 6), (7, 7), (8, 8)]), + ((9, 12), [(9, 9)]), ])) @parameterized.expand([ @@ -295,19 +331,19 @@ def test_reversed_ordered_data_with_allowed_lateness(self, fill_start): expected = [ # allow late data up to: # 9 (watermark before late data) - 5 (allowed lateness) = 4 - [None, 4, 5], - [6, 7, 8], - [9], - [9], - [9], + ((3, 6), [(3, None), (4, 4), (5, 5)]), + ((6, 9), [(6, 6), (7, 7), (8, 8)]), + ((9, 12), [(9, 9)]), + ((12, 15), [(12, 9)]), + ((15, 18), [(15, 9)]), ] else: expected = [ - [4, 5], - [6, 7, 8], - [9], - [], - [], + ((3, 6), [(4, 4), (5, 5)]), + ((6, 9), [(6, 6), (7, 7), (8, 8)]), + ((9, 12), [(9, 9)]), + ((12, 15), []), + ((15, 18), []), ] with TestPipeline(options=self.options) as p: result = ( @@ -317,7 +353,7 @@ def test_reversed_ordered_data_with_allowed_lateness(self, fill_start): fill_start_if_missing=fill_start, allowed_lateness=5, stop_timestamp=25)) - result = _maybe_log_elements(result) + result = _maybe_log_elements(result) | _convert_timestamp_to_int() assert_that(result, equal_to(expected)) def test_multiple_late_data_with_allowed_lateness(self): @@ -330,29 +366,31 @@ def test_multiple_late_data_with_allowed_lateness(self): allowed_lateness=6, fill_start_if_missing=True, stop_timestamp=28)) - result = _maybe_log_elements(result) + result = _maybe_log_elements(result) | _convert_timestamp_to_int() + # yapf: disable assert_that( result, equal_to([ - [1, 2, 3], - [2, 3], - [3], - [3], - [3], - [3], - [3, 9], - [3, 9], - [9], - [9, 12], - [9, 12], - [12, 14], - [12, 14], - [14, 16], - [14, 16, 17], - [16, 17], - [17], - [17], + ((-1, 2), [(-1, None), (1, 1)]), + ((0, 3), [(0, None), (1, 1), (2, 2)]), + ((1, 4), [(1, 1), (2, 2), (3, 3)]), + ((2, 5), [(2, 2), (3, 3)]), ((3, 6), [(3, 3)]), + ((4, 7), [(4, 3)]), + ((5, 8), [(5, 3)]), + ((6, 9), [(6, 3)]), + ((7, 10), [(7, 3), (9, 9)]), + ((8, 11), [(8, 3), (9, 9)]), + ((9, 12), [(9, 9)]), + ((10, 13), [(10, 9), (12, 12)]), + ((11, 14), [(11, 9), (12, 12)]), + ((12, 15), [(12, 12), (14, 14)]), + ((13, 16), [(13, 12), (14, 14)]), + ((14, 17), [(14, 14), (16, 16)]), + ((15, 18), [(15, 14), (16, 16),(17, 17)]), + ((16, 19), [(16, 16), (17, 17)]), + ((17, 20), [(17, 17)]), ((18, 21), [(18, 17)]) ])) + # yapf: enable if __name__ == '__main__':