Skip to content

Commit 9b5a91c

Browse files
committed
Add test case to test full functionality
1 parent 188a767 commit 9b5a91c

File tree

2 files changed

+55
-3
lines changed

2 files changed

+55
-3
lines changed

sdks/python/apache_beam/runners/worker/operations.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -823,8 +823,7 @@ def __init__(
823823
self.scoped_timer_processing_state = self.state_sampler.scoped_state(
824824
self.name_context,
825825
'process-timers',
826-
metrics_container=self.metrics_container,
827-
suffix="-millis")
826+
metrics_container=self.metrics_container)
828827
# See fn_data in dataflow_runner.py
829828
# TODO: Store all the items from spec?
830829
self.fn, _, _, _, _ = (pickler.loads(self.spec.serialized_fn))

sdks/python/apache_beam/runners/worker/statesampler_test.py

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ def test_timer_sampler(self):
151151
# Test that sampled state timings are close to their expected values.
152152
expected_counter_values = {
153153
CounterName(
154-
'process-timers-millis', step_name='step1', stage_name='timer'): state_duration_ms,
154+
'process-timers', step_name='step1', stage_name='timer'): state_duration_ms,
155155
}
156156
for counter in counter_factory.get_counters():
157157
self.assertIn(counter.name, expected_counter_values)
@@ -162,6 +162,59 @@ def test_timer_sampler(self):
162162
self.assertGreater(actual_value, expected_value * (1.0 - margin_of_error))
163163
self.assertLess(actual_value, expected_value * (1.0 + margin_of_error))
164164

165+
@retry(reraise=True, stop=stop_after_attempt(3))
166+
def test_process_timers_metric_is_recorded(self):
167+
"""
168+
Tests that the 'process-timers-msecs' metric is correctly recorded
169+
when a state sampler is active.
170+
"""
171+
# Set up a real state sampler and counter factory.
172+
counter_factory = CounterFactory()
173+
sampler = statesampler.StateSampler(
174+
'test_stage', counter_factory, sampling_period_ms=1)
175+
176+
state_duration_ms = 100
177+
margin_of_error = 0.25
178+
179+
# Run a workload inside the 'process-timers' scoped state.
180+
sampler.start()
181+
with sampler.scoped_state('test_step', 'process-timers'):
182+
time.sleep(state_duration_ms / 1000.0)
183+
sampler.stop()
184+
sampler.commit_counters()
185+
186+
if not statesampler.FAST_SAMPLER:
187+
return
188+
189+
# Verify that the counter was created with the correct name and value.
190+
expected_counter_name = CounterName(
191+
'process-timers-msecs',
192+
step_name='test_step',
193+
stage_name='test_stage')
194+
195+
# Find the specific counter we are looking for.
196+
found_counter = None
197+
for counter in counter_factory.get_counters():
198+
if counter.name == expected_counter_name:
199+
found_counter = counter
200+
break
201+
202+
self.assertIsNotNone(
203+
found_counter,
204+
f"The expected counter '{expected_counter_name}' was not created.")
205+
206+
# Check that its value is approximately correct.
207+
actual_value = found_counter.value()
208+
expected_value = state_duration_ms
209+
self.assertGreater(
210+
actual_value,
211+
expected_value * (1.0 - margin_of_error),
212+
"The timer metric was lower than expected.")
213+
self.assertLess(
214+
actual_value,
215+
expected_value * (1.0 + margin_of_error),
216+
"The timer metric was higher than expected.")
217+
165218

166219
if __name__ == '__main__':
167220
logging.getLogger().setLevel(logging.INFO)

0 commit comments

Comments
 (0)