|
35 | 35 | from apache_beam.typehints.typehints import TupleConstraint |
36 | 36 | from apache_beam.utils.timestamp import MAX_TIMESTAMP |
37 | 37 | from apache_beam.utils.timestamp import MIN_TIMESTAMP |
| 38 | +from apache_beam.utils.timestamp import Duration |
38 | 39 | from apache_beam.utils.timestamp import DurationTypes # pylint: disable=unused-import |
39 | 40 | from apache_beam.utils.timestamp import Timestamp |
40 | 41 | from apache_beam.utils.timestamp import TimestampTypes # pylint: disable=unused-import |
@@ -89,7 +90,7 @@ def __init__( |
89 | 90 | self, |
90 | 91 | duration: DurationTypes, |
91 | 92 | slide_interval: DurationTypes, |
92 | | - offset: TimestampTypes, |
| 93 | + offset: DurationTypes, |
93 | 94 | allowed_lateness: DurationTypes, |
94 | 95 | default_start_value, |
95 | 96 | fill_start_if_missing: bool, |
@@ -200,20 +201,23 @@ def process( |
200 | 201 |
|
201 | 202 | timer_started = timer_state.read() |
202 | 203 | if not timer_started: |
203 | | - timestamp_secs = timestamp.micros / 1e6 |
| 204 | + offset_duration = Duration.of(self.offset) |
| 205 | + slide_duration = Duration.of(self.slide_interval) |
| 206 | + duration_duration = Duration.of(self.duration) |
204 | 207 |
|
205 | 208 | # Align the timestamp with the windowing scheme. |
206 | | - aligned_timestamp = timestamp_secs - self.offset |
| 209 | + aligned_micros = (timestamp - offset_duration).micros |
207 | 210 |
|
208 | | - # Calculate the start of the last window that could contain this timestamp. |
209 | | - last_window_start_aligned = ((aligned_timestamp // self.slide_interval) * |
210 | | - self.slide_interval) |
211 | | - last_window_start = last_window_start_aligned + self.offset |
| 211 | + # Calculate the start of the last window that could contain this timestamp |
| 212 | + last_window_start_aligned_micros = ( |
| 213 | + (aligned_micros // slide_duration.micros) * slide_duration.micros) |
212 | 214 |
|
213 | | - n = (self.duration - 1) // self.slide_interval |
| 215 | + last_window_start = Timestamp( |
| 216 | + micros=last_window_start_aligned_micros) + offset_duration |
| 217 | + n = (duration_duration.micros - 1) // slide_duration.micros |
214 | 218 | # Calculate the start of the first sliding window. |
215 | | - first_slide_start = last_window_start - n * self.slide_interval |
216 | | - first_slide_start_ts = Timestamp.of(first_slide_start) |
| 219 | + first_slide_start_ts = last_window_start - Duration( |
| 220 | + micros=n * slide_duration.micros) |
217 | 221 |
|
218 | 222 | # Set the initial timer to fire at the end of the first window plus |
219 | 223 | # allowed lateness. |
@@ -541,7 +545,7 @@ def __init__( |
541 | 545 | self, |
542 | 546 | duration: DurationTypes, |
543 | 547 | slide_interval: Optional[DurationTypes] = None, |
544 | | - offset: TimestampTypes = 0, |
| 548 | + offset: DurationTypes = 0, |
545 | 549 | allowed_lateness: DurationTypes = 0, |
546 | 550 | default_start_value=None, |
547 | 551 | fill_start_if_missing: bool = False, |
|
0 commit comments