Skip to content

Commit facdc4e

Browse files
authored
Add PeriodicStream in the new time series folder. (#35300)
* Add PeriodicStream in the new time series folder. * Add some more docstrings and minor fix on test name. * Fix lints and docs.
1 parent 19ff1df commit facdc4e

File tree

3 files changed

+279
-0
lines changed

3 files changed

+279
-0
lines changed
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
import math
19+
import time
20+
from typing import Any
21+
from typing import Optional
22+
from typing import Sequence
23+
24+
import apache_beam as beam
25+
from apache_beam.io.watermark_estimators import ManualWatermarkEstimator
26+
from apache_beam.runners import sdf_utils
27+
from apache_beam.transforms.periodicsequence import ImpulseSeqGenRestrictionProvider # pylint:disable=line-too-long
28+
from apache_beam.transforms.window import TimestampedValue
29+
from apache_beam.utils import timestamp
30+
31+
32+
class ImpulseStreamGenDoFn(beam.DoFn):
33+
"""
34+
Generates a periodic, unbounded stream of elements from a provided sequence.
35+
36+
(Similar to ImpulseSeqGenDoFn in apache_beam.transforms.periodicsequence)
37+
38+
This Splittable DoFn (SDF) is designed to simulate a continuous stream of
39+
data for testing or demonstration purposes. It takes a Python sequence (e.g.,
40+
a list) and emits its elements one by one in a loop, assigning a timestamp
41+
to each.
42+
43+
The DoFn operates in two modes based on the structure of the input `data`:
44+
45+
- **Non-Timestamped Data**: If `data` is a sequence of arbitrary values
46+
(e.g., `[v1, v2, ...]`), the DoFn will assign a new timestamp to each
47+
emitted element. The timestamps are calculated by starting at a given
48+
`start_time` and incrementing by a fixed `interval`.
49+
- **Pre-Timestamped Data**: If `data` is a sequence of tuples, where each
50+
tuple is `(apache_beam.utils.timestamp.Timestamp, value)`, the DoFn
51+
will use the provided timestamp for the emitted element.
52+
53+
The rate of emission is controlled by wall-clock time. The DoFn will only
54+
emit elements whose timestamp (either calculated or provided) is in the past
55+
compared to the current system time. When it "catches up" to the present,
56+
it will pause and defer the remainder of the work.
57+
58+
Args:
59+
data: The sequence of elements to emit into the PCollection. The elements
60+
can be raw values or pre-timestamped tuples in the format
61+
`(apache_beam.utils.timestamp.Timestamp, value)`.
62+
"""
63+
def __init__(self, data: Sequence[Any]):
64+
self._data = data
65+
self._len = len(data)
66+
self._is_timestamped_value = len(data) > 0 and isinstance(
67+
data[0], tuple) and isinstance(data[0][0], timestamp.Timestamp)
68+
69+
def _get_timestamped_value(self, index, current_output_timestamp):
70+
if self._is_timestamped_value:
71+
event_time, value = self._data[index % self._len]
72+
return TimestampedValue(value, event_time)
73+
else:
74+
value = self._data[index % self._len]
75+
return TimestampedValue(value, current_output_timestamp)
76+
77+
@beam.DoFn.unbounded_per_element()
78+
def process(
79+
self,
80+
element,
81+
restriction_tracker=beam.DoFn.RestrictionParam(
82+
ImpulseSeqGenRestrictionProvider()),
83+
watermark_estimator=beam.DoFn.WatermarkEstimatorParam(
84+
ManualWatermarkEstimator.default_provider())):
85+
start, _, interval = element
86+
87+
if isinstance(start, timestamp.Timestamp):
88+
start = start.micros / 1000000
89+
90+
assert isinstance(restriction_tracker, sdf_utils.RestrictionTrackerView)
91+
92+
current_output_index = restriction_tracker.current_restriction().start
93+
94+
while True:
95+
current_output_timestamp = start + interval * current_output_index
96+
97+
if current_output_timestamp > time.time():
98+
# we are too ahead of time, let's wait.
99+
restriction_tracker.defer_remainder(
100+
timestamp.Timestamp(current_output_timestamp))
101+
return
102+
103+
if not restriction_tracker.try_claim(current_output_index):
104+
# nothing to claim, just stop
105+
return
106+
107+
output = self._get_timestamped_value(
108+
current_output_index, current_output_timestamp)
109+
110+
current_watermark = watermark_estimator.current_watermark()
111+
if current_watermark is None or output.timestamp > current_watermark:
112+
# ensure watermark is monotonic
113+
watermark_estimator.set_watermark(output.timestamp)
114+
115+
yield output
116+
117+
current_output_index += 1
118+
119+
120+
class PeriodicStream(beam.PTransform):
121+
"""A PTransform that generates a periodic stream of elements from a sequence.
122+
123+
This transform creates a `PCollection` by emitting elements from a provided
124+
Python sequence at a specified time interval. It is designed for use in
125+
streaming pipelines to simulate a live, continuous source of data.
126+
127+
The transform can be configured to:
128+
- Emit the sequence only once.
129+
- Repeat the sequence indefinitely or for a maximum duration.
130+
- Control the time interval between elements.
131+
132+
To ensure that the stream does not emit a burst of elements immediately at
133+
pipeline startup, a fixed warmup period is added before the first element
134+
is generated.
135+
136+
Args:
137+
data: The sequence of elements to emit into the PCollection. The elements
138+
can be raw values or pre-timestamped tuples in the format
139+
`(apache_beam.utils.timestamp.Timestamp, value)`.
140+
max_duration: The maximum total duration in seconds for the stream
141+
generation. If `None` (the default) and `repeat` is `True`, the
142+
stream is effectively infinite. If `repeat` is `False`, the stream's
143+
duration is the shorter of this value and the time required to emit
144+
the sequence once.
145+
interval: The delay in seconds between consecutive elements.
146+
Defaults to 0.1.
147+
repeat: If `True`, the input `data` sequence is emitted repeatedly.
148+
If `False` (the default), the sequence is emitted only once.
149+
"""
150+
151+
WARMUP_TIME = 2
152+
153+
def __init__(
154+
self,
155+
data: Sequence[Any],
156+
max_duration: Optional[float] = None,
157+
interval: float = 0.1,
158+
repeat: bool = False):
159+
self._data = data
160+
self._interval = interval
161+
self._repeat = repeat
162+
self._duration = len(self._data) * interval
163+
self._max_duration = max_duration if max_duration is not None else float(
164+
"inf")
165+
166+
def expand(self, pbegin):
167+
# Give the runner some time to start up so the events will not cluster
168+
# at the beginning.
169+
start = timestamp.Timestamp.now() + PeriodicStream.WARMUP_TIME
170+
171+
if not self._repeat:
172+
stop = start + min(self._duration, self._max_duration)
173+
else:
174+
stop = timestamp.MAX_TIMESTAMP if math.isinf(
175+
self._max_duration) else start + self._max_duration
176+
177+
result = (
178+
pbegin
179+
| 'ImpulseElement' >> beam.Create([(start, stop, self._interval)])
180+
| 'GenStream' >> beam.ParDo(ImpulseStreamGenDoFn(self._data)))
181+
return result
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
import logging
19+
import unittest
20+
21+
import apache_beam as beam
22+
from apache_beam.ml.ts.util import PeriodicStream
23+
from apache_beam.options.pipeline_options import PipelineOptions
24+
from apache_beam.testing.util import assert_that
25+
from apache_beam.testing.util import equal_to
26+
from apache_beam.transforms.window import FixedWindows
27+
from apache_beam.utils.timestamp import Timestamp
28+
29+
30+
class PeriodicStreamTest(unittest.TestCase):
31+
def test_interval(self):
32+
options = PipelineOptions()
33+
start = Timestamp.now()
34+
with beam.Pipeline(options=options) as p:
35+
ret = (
36+
p | PeriodicStream([1, 2, 3, 4], interval=0.5)
37+
| beam.WindowInto(FixedWindows(0.5))
38+
| beam.WithKeys(0)
39+
| beam.GroupByKey())
40+
expected = [(0, [1]), (0, [2]), (0, [3]), (0, [4])]
41+
assert_that(ret, equal_to(expected))
42+
end = Timestamp.now()
43+
self.assertGreaterEqual(end - start, 3)
44+
self.assertLessEqual(end - start, 7)
45+
46+
def test_repeat(self):
47+
options = PipelineOptions()
48+
start = Timestamp.now()
49+
with beam.Pipeline(options=options) as p:
50+
ret = (
51+
p | PeriodicStream(
52+
[1, 2, 3, 4], interval=0.5, max_duration=3, repeat=True)
53+
| beam.WindowInto(FixedWindows(0.5))
54+
| beam.WithKeys(0)
55+
| beam.GroupByKey())
56+
expected = [(0, [1]), (0, [2]), (0, [3]), (0, [4]), (0, [1]), (0, [2])]
57+
assert_that(ret, equal_to(expected))
58+
end = Timestamp.now()
59+
self.assertGreaterEqual(end - start, 3)
60+
self.assertLessEqual(end - start, 7)
61+
62+
def test_timestamped_value(self):
63+
options = PipelineOptions()
64+
start = Timestamp.now()
65+
with beam.Pipeline(options=options) as p:
66+
ret = (
67+
p | PeriodicStream([(Timestamp(1), 1), (Timestamp(3), 2),
68+
(Timestamp(2), 3), (Timestamp(1), 4)],
69+
interval=0.5)
70+
| beam.WindowInto(FixedWindows(0.5))
71+
| beam.WithKeys(0)
72+
| beam.GroupByKey())
73+
expected = [(0, [1, 4]), (0, [2]), (0, [3])]
74+
assert_that(ret, equal_to(expected))
75+
end = Timestamp.now()
76+
self.assertGreaterEqual(end - start, 3)
77+
self.assertLessEqual(end - start, 7)
78+
79+
80+
if __name__ == '__main__':
81+
logging.getLogger().setLevel(logging.WARNING)
82+
unittest.main()

0 commit comments

Comments
 (0)