Skip to content

Commit 1ce3c64

Browse files
committed
Fix error with no state found
1 parent c5cb784 commit 1ce3c64

File tree

2 files changed

+11
-6
lines changed

2 files changed

+11
-6
lines changed

sdks/python/apache_beam/runners/worker/operations.pxd

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ cdef class Operation(object):
8383
cdef readonly object scoped_start_state
8484
cdef readonly object scoped_process_state
8585
cdef readonly object scoped_finish_state
86+
cdef readonly object scoped_timer_processing_state
8687

8788
cdef readonly object data_sampler
8889

sdks/python/apache_beam/runners/worker/operations.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -444,12 +444,16 @@ def __init__(
444444
self.metrics_container = MetricsContainer(self.name_context.metrics_name())
445445

446446
self.state_sampler = state_sampler
447-
self.scoped_start_state = self.state_sampler.scoped_state(
448-
self.name_context, 'start', metrics_container=self.metrics_container)
449-
self.scoped_process_state = self.state_sampler.scoped_state(
450-
self.name_context, 'process', metrics_container=self.metrics_container)
451-
self.scoped_finish_state = self.state_sampler.scoped_state(
452-
self.name_context, 'finish', metrics_container=self.metrics_container)
447+
self.scoped_start_state = None
448+
self.scoped_process_state = None
449+
self.scoped_finish_state = None
450+
if self.state_sampler:
451+
self.scoped_start_state = self.state_sampler.scoped_state(
452+
self.name_context, 'start', metrics_container=self.metrics_container)
453+
self.scoped_process_state = self.state_sampler.scoped_state(
454+
self.name_context, 'process', metrics_container=self.metrics_container)
455+
self.scoped_finish_state = self.state_sampler.scoped_state(
456+
self.name_context, 'finish', metrics_container=self.metrics_container)
453457
# TODO(ccy): the '-abort' state can be added when the abort is supported in
454458
# Operations.
455459
self.receivers = [] # type: List[ConsumerSet]

0 commit comments

Comments
 (0)