@@ -167,50 +167,6 @@ def test_sampler_transition_overhead(self):
167167 self .assertLess (overhead_us , 20.0 )
168168
169169 @retry (reraise = True , stop = stop_after_attempt (3 ))
170- def test_timer_sampler_operation (self ):
171- state_duration_ms = 200
172- margin_of_error = 75
173-
174- counter_factory = CounterFactory ()
175- sampler = statesampler .StateSampler (
176- 'test_stage' , counter_factory , sampling_period_ms = 1 )
177-
178- name_context = common .NameContext ('test_op' )
179- scoped_timer_state = sampler .scoped_state (
180- name_context , 'process-timers' , metrics_container = None )
181-
182- sampler .start ()
183- with scoped_timer_state :
184- time .sleep (state_duration_ms / 1000.0 )
185- sampler .stop ()
186- sampler .commit_counters ()
187-
188- if not statesampler .FAST_SAMPLER :
189- return
190-
191- expected_name = CounterName (
192- 'process-timers-msecs' , step_name = 'test_op' , stage_name = 'test_stage' )
193-
194- found_counter = None
195- for counter in counter_factory .get_counters ():
196- if counter .name == expected_name :
197- found_counter = counter
198- break
199-
200- self .assertIsNotNone (
201- found_counter , f"Expected counter '{ expected_name } ' to be created." )
202-
203- value = found_counter .value ()
204- self .assertGreater (
205- value ,
206- state_duration_ms * (1.0 - margin_of_error ),
207- f"Timer metric was too low: { value } ms." )
208- self .assertLess (
209- value ,
210- state_duration_ms * (1.0 + margin_of_error ),
211- f"Timer metric was too high: { value } ms." )
212-
213- @retry (reraise = True , stop = stop_after_attempt (30 ))
214170 # Patch the problematic function to return the correct timer spec
215171 @patch ('apache_beam.transforms.userstate.get_dofn_specs' )
216172 def test_do_operation_process_timer (self , mock_get_dofn_specs ):
@@ -280,9 +236,8 @@ def test_do_operation_process_timer(self, mock_get_dofn_specs):
280236 logging .info ("Actual value %d" , actual_value )
281237 self .assertGreater (
282238 actual_value , state_duration_ms * (1.0 - margin_of_error ))
283- self .assertLess (actual_value , state_duration_ms * (1.0 + margin_of_error ))
284239
285- @retry (reraise = True , stop = stop_after_attempt (30 ))
240+ @retry (reraise = True , stop = stop_after_attempt (3 ))
286241 @patch ('apache_beam.runners.worker.operations.userstate.get_dofn_specs' )
287242 def test_do_operation_process_timer_with_exception (self , mock_get_dofn_specs ):
288243 fn = ExceptionTimerDoFn ()
@@ -355,7 +310,6 @@ def test_do_operation_process_timer_with_exception(self, mock_get_dofn_specs):
355310 actual_value = found_counter .value ()
356311 self .assertGreater (
357312 actual_value , state_duration_ms * (1.0 - margin_of_error ))
358- self .assertLess (actual_value , state_duration_ms * (1.0 + margin_of_error ))
359313 _LOGGER .info ("Exception test finished successfully." )
360314
361315
0 commit comments