Skip to content

Commit f7ff2d4

Browse files
committed
Assume state sampler is always defined.
1 parent a146463 commit f7ff2d4

File tree

1 file changed

+6
-13
lines changed

1 file changed

+6
-13
lines changed

sdks/python/apache_beam/runners/worker/operations.py

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -445,19 +445,12 @@ def __init__(
445445
self.metrics_container = MetricsContainer(self.name_context.metrics_name())
446446

447447
self.state_sampler = state_sampler
448-
if self.state_sampler:
449-
self.scoped_start_state = self.state_sampler.scoped_state(
450-
self.name_context, 'start', metrics_container=self.metrics_container)
451-
self.scoped_process_state = self.state_sampler.scoped_state(
452-
self.name_context,
453-
'process',
454-
metrics_container=self.metrics_container)
455-
self.scoped_finish_state = self.state_sampler.scoped_state(
456-
self.name_context, 'finish', metrics_container=self.metrics_container)
457-
else:
458-
self.scoped_start_state = statesampler.NOOP_SCOPED_STATE
459-
self.scoped_process_state = statesampler.NOOP_SCOPED_STATE
460-
self.scoped_finish_state = statesampler.NOOP_SCOPED_STATE
448+
self.scoped_start_state = self.state_sampler.scoped_state(
449+
self.name_context, 'start', metrics_container=self.metrics_container)
450+
self.scoped_process_state = self.state_sampler.scoped_state(
451+
self.name_context, 'process', metrics_container=self.metrics_container)
452+
self.scoped_finish_state = self.state_sampler.scoped_state(
453+
self.name_context, 'finish', metrics_container=self.metrics_container)
461454
# TODO(ccy): the '-abort' state can be added when the abort is supported in
462455
# Operations.
463456
self.receivers = [] # type: List[ConsumerSet]

0 commit comments

Comments
 (0)