Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from apache_beam.typehints.typehints import TupleConstraint
from apache_beam.utils.timestamp import MAX_TIMESTAMP
from apache_beam.utils.timestamp import MIN_TIMESTAMP
from apache_beam.utils.timestamp import Duration
from apache_beam.utils.timestamp import DurationTypes # pylint: disable=unused-import
from apache_beam.utils.timestamp import Timestamp
from apache_beam.utils.timestamp import TimestampTypes # pylint: disable=unused-import
Expand Down Expand Up @@ -89,7 +90,7 @@ def __init__(
self,
duration: DurationTypes,
slide_interval: DurationTypes,
offset: TimestampTypes,
offset: DurationTypes,
Copy link
Contributor

@liferoad liferoad Oct 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like a break change to me. Could we keep both like offset: Union[TimestampTypes, DurationTypes]?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The transform is part of an example, so it won't matter if it is a breaking change.

This is to fix the offset type, as it should be a Duration.

allowed_lateness: DurationTypes,
default_start_value,
fill_start_if_missing: bool,
Expand Down Expand Up @@ -200,11 +201,23 @@ def process(

timer_started = timer_state.read()
if not timer_started:
offset_duration = Duration.of(self.offset)
slide_duration = Duration.of(self.slide_interval)
duration_duration = Duration.of(self.duration)

# Align the timestamp with the windowing scheme.
aligned_micros = (timestamp - offset_duration).micros

# Calculate the start of the last window that could contain this timestamp
last_window_start_aligned_micros = (
(aligned_micros // slide_duration.micros) * slide_duration.micros)

last_window_start = Timestamp(
micros=last_window_start_aligned_micros) + offset_duration
n = (duration_duration.micros - 1) // slide_duration.micros
# Calculate the start of the first sliding window.
first_slide_start = int(
(timestamp.micros / 1e6 - self.offset) //
self.slide_interval) * self.slide_interval + self.offset
first_slide_start_ts = Timestamp.of(first_slide_start)
first_slide_start_ts = last_window_start - Duration(
micros=n * slide_duration.micros)

# Set the initial timer to fire at the end of the first window plus
# allowed lateness.
Expand Down Expand Up @@ -256,14 +269,16 @@ def _get_windowed_values_from_state(
if not windowed_values:
# If the window is empty, use the last value.
last_value = last_value_state.read()
windowed_values.append(last_value)
value_to_insert = (window_start_ts, last_value[1])
windowed_values.append(value_to_insert)
else:
first_timestamp = windowed_values[0][0]
last_value = last_value_state.read()
if first_timestamp > window_start_ts and last_value:
# Prepend the last value if there's a gap between the first element
# in the window and the start of the window.
windowed_values = [last_value] + windowed_values
value_to_insert = (window_start_ts, last_value[1])
windowed_values = [value_to_insert] + windowed_values

# Find the last element before the beginning of the next window to update
# last_value_state.
Expand Down Expand Up @@ -334,8 +349,7 @@ def on_timer(
windowed_values = self._get_windowed_values_from_state(
buffer_state, late_start_ts, late_end_ts, last_value_state)
yield TimestampedValue(
((key, late_start_ts, late_end_ts), [v[1]
for v in windowed_values]),
(key, ((late_start_ts, late_end_ts), windowed_values)),
late_end_ts - 1)
late_start_ts += self.slide_interval

Expand All @@ -347,8 +361,7 @@ def on_timer(
windowed_values = self._get_windowed_values_from_state(
buffer_state, window_start_ts, window_end_ts, last_value_state)
yield TimestampedValue(
((key, window_start_ts, window_end_ts), [v[1]
for v in windowed_values]),
(key, ((window_start_ts, window_end_ts), windowed_values)),
window_end_ts - 1)

# Post-emit actions for the current window:
Expand Down Expand Up @@ -532,7 +545,7 @@ def __init__(
self,
duration: DurationTypes,
slide_interval: Optional[DurationTypes] = None,
offset: TimestampTypes = 0,
offset: DurationTypes = 0,
allowed_lateness: DurationTypes = 0,
default_start_value=None,
fill_start_if_missing: bool = False,
Expand Down Expand Up @@ -617,7 +630,7 @@ def expand(self, input):
self.stop_timestamp)))

if isinstance(input.element_type, TupleConstraint):
ret = keyed_output | beam.MapTuple(lambda x, y: (x[0], y))
ret = keyed_output
else:
# Remove the default key if the input PCollection was originally unkeyed.
ret = keyed_output | beam.Values()
Expand Down
Loading
Loading