Skip to content

Commit c80fa85

Browse files
committed
Fix error for Regex test
1 parent 1ce3c64 commit c80fa85

File tree

4 files changed

+29
-11
lines changed

4 files changed

+29
-11
lines changed

sdks/python/apache_beam/runners/worker/operations.pxd

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,6 @@ cdef class Operation(object):
8383
cdef readonly object scoped_start_state
8484
cdef readonly object scoped_process_state
8585
cdef readonly object scoped_finish_state
86-
cdef readonly object scoped_timer_processing_state
8786

8887
cdef readonly object data_sampler
8988

@@ -118,6 +117,7 @@ cdef class DoOperation(Operation):
118117
cdef dict timer_specs
119118
cdef public object input_info
120119
cdef object fn
120+
cdef readonly object scoped_timer_processing_state
121121

122122

123123
cdef class SdfProcessSizedElements(DoOperation):

sdks/python/apache_beam/runners/worker/operations.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
from apache_beam.runners.worker import opcounters
5050
from apache_beam.runners.worker import operation_specs
5151
from apache_beam.runners.worker import sideinputs
52+
from apache_beam.runners.worker import statesampler
5253
from apache_beam.runners.worker.data_sampler import DataSampler
5354
from apache_beam.transforms import sideinputs as apache_sideinputs
5455
from apache_beam.transforms import combiners
@@ -451,9 +452,15 @@ def __init__(
451452
self.scoped_start_state = self.state_sampler.scoped_state(
452453
self.name_context, 'start', metrics_container=self.metrics_container)
453454
self.scoped_process_state = self.state_sampler.scoped_state(
454-
self.name_context, 'process', metrics_container=self.metrics_container)
455+
self.name_context,
456+
'process',
457+
metrics_container=self.metrics_container)
455458
self.scoped_finish_state = self.state_sampler.scoped_state(
456459
self.name_context, 'finish', metrics_container=self.metrics_container)
460+
else:
461+
self.scoped_start_state = statesampler.NOOP_SCOPED_STATE
462+
self.scoped_process_state = statesampler.NOOP_SCOPED_STATE
463+
self.scoped_finish_state = statesampler.NOOP_SCOPED_STATE
457464
# TODO(ccy): the '-abort' state can be added when the abort is supported in
458465
# Operations.
459466
self.receivers = [] # type: List[ConsumerSet]
@@ -812,9 +819,9 @@ def __init__(
812819
self.user_state_context = user_state_context
813820
self.tagged_receivers = None # type: Optional[_TaggedReceivers]
814821
# A mapping of timer tags to the input "PCollections" they come in on.
815-
# Force clean rebuild
822+
# Force clean rebuild
816823
self.input_info = None # type: Optional[OpInputInfo]
817-
self.scoped_timer_processing_state = None
824+
self.scoped_timer_processing_state = statesampler.NOOP_SCOPED_STATE
818825
if self.state_sampler:
819826
self.scoped_timer_processing_state = self.state_sampler.scoped_state(
820827
self.name_context,

sdks/python/apache_beam/runners/worker/statesampler.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,7 @@ def scoped_state(
135135
state_name: str,
136136
io_target=None,
137137
metrics_container: Optional['MetricsContainer'] = None,
138-
suffix: str = '-msecs'
139-
) -> statesampler_impl.ScopedState:
138+
suffix: str = '-msecs') -> statesampler_impl.ScopedState:
140139
"""Returns a ScopedState object associated to a Step and a State.
141140
142141
Args:
@@ -171,3 +170,17 @@ def commit_counters(self) -> None:
171170
for state in self._states_by_name.values():
172171
state_msecs = int(1e-6 * state.nsecs)
173172
state.counter.update(state_msecs - state.counter.value())
173+
174+
175+
class NoOpScopedState:
176+
def __enter__(self):
177+
pass
178+
179+
def __exit__(self, exc_type, exc_val, exc_tb):
180+
pass
181+
182+
def sampled_msecs_int(self):
183+
return 0
184+
185+
186+
NOOP_SCOPED_STATE = NoOpScopedState()

sdks/python/apache_beam/runners/worker/statesampler_test.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -139,8 +139,7 @@ def test_timer_sampler(self):
139139
margin_of_error = 0.25
140140

141141
sampler.start()
142-
with sampler.scoped_state(
143-
'step1', 'process-timers', suffix='-millis'):
142+
with sampler.scoped_state('step1', 'process-timers', suffix='-millis'):
144143
time.sleep(state_duration_ms / 1000)
145144
sampler.stop()
146145
sampler.commit_counters()
@@ -151,8 +150,8 @@ def test_timer_sampler(self):
151150

152151
# Test that sampled state timings are close to their expected values.
153152
expected_counter_values = {
154-
CounterName('process-timers-millis', step_name='step1', stage_name='timer'):
155-
state_duration_ms,
153+
CounterName(
154+
'process-timers-millis', step_name='step1', stage_name='timer'): state_duration_ms,
156155
}
157156
for counter in counter_factory.get_counters():
158157
self.assertIn(counter.name, expected_counter_values)
@@ -164,7 +163,6 @@ def test_timer_sampler(self):
164163
self.assertLess(actual_value, expected_value * (1.0 + margin_of_error))
165164

166165

167-
168166
if __name__ == '__main__':
169167
logging.getLogger().setLevel(logging.INFO)
170168
unittest.main()

0 commit comments

Comments
 (0)