From 697278b5a41d1bbaa6c0b1996b127892e34965df Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Mon, 26 May 2025 01:38:57 +0000 Subject: [PATCH 1/6] Fix: Prevent race conditions in Python SDK monitoring info collection. This change addresses Apache Beam GitHub Issue #24776, where race conditions could occur during the collection of monitoring information in the Python SDK Harness, leading to errors such as: - SystemError: returned NULL without setting an error - RuntimeError: dictionary changed size during iteration - AttributeError: 'bytes' object has no attribute 'payload' - ValueError: non-UTF-8 strings The primary cause was concurrent access to metric data structures (specifically `MetricsContainer` and its underlying `MetricCell`s) by the DoFn execution thread (updating metrics) and the thread responsible for reporting bundle progress. The fix introduces the following: 1. The existing `threading.Lock` on the `MetricsContainer` class (renamed from `lock` to `_lock` in this patch) guards the internal dictionary that stores metric cells (`self.metrics`). It is held while a missing cell is created in `get_metric_cell` (the first lookup is done without the lock; if the cell is missing, the lookup is repeated under the lock so that a cell is never created twice) and while all monitoring information is collected for reporting (`to_runner_api_monitoring_infos`). 2. The `MetricsContainer`'s lock is passed to individual `MetricCell` instances (`CounterCell`, `DistributionCell`, `GaugeCell`) upon their creation. 3. Metric update methods within `CounterCell`, `DistributionCell`, and `GaugeCell` (e.g., `update()`, `set()`) now acquire this container-level lock before modifying their internal state. This ensures that updates are atomic with respect to the collection process in `MetricsContainer.to_runner_api_monitoring_infos`. 
These changes ensure that metric data is read and updated in a thread-safe manner, preventing the previously observed errors caused by concurrent access and modification of shared metric state. --- sdks/python/apache_beam/metrics/cells.py | 45 ++++++++++++-------- sdks/python/apache_beam/metrics/execution.py | 30 +++++++------ 2 files changed, 45 insertions(+), 30 deletions(-) diff --git a/sdks/python/apache_beam/metrics/cells.py b/sdks/python/apache_beam/metrics/cells.py index b4703c5b5b96..6f477e5bead8 100644 --- a/sdks/python/apache_beam/metrics/cells.py +++ b/sdks/python/apache_beam/metrics/cells.py @@ -61,8 +61,9 @@ class MetricCell(object): and may be subject to parallel/concurrent updates. Cells should only be used directly within a runner. """ - def __init__(self): - self._lock = threading.Lock() + def __init__(self, container_lock=None): + self._lock = threading.Lock() # Lock for this specific cell's internal data + self._container_lock = container_lock # Lock from the MetricsContainer self._start_time = None def update(self, value): @@ -106,8 +107,8 @@ class CounterCell(MetricCell): This class is thread safe. """ - def __init__(self, *args): - super().__init__(*args) + def __init__(self, container_lock=None): + super().__init__(container_lock=container_lock) self.value = 0 def reset(self): @@ -137,7 +138,11 @@ def update(self, value): # directly by circumventing the GIL. self.value += ivalue else: - with self._lock: + # If a container lock is provided, use it. Otherwise, use cell's own lock. + # This ensures that if the cell is managed by a MetricsContainer, + # the container's lock is used for thread safety across cells. + lock_to_use = self._container_lock if self._container_lock else self._lock + with lock_to_use: self.value += value def get_cumulative(self): @@ -171,8 +176,8 @@ class DistributionCell(MetricCell): This class is thread safe. 
""" - def __init__(self, *args): - super().__init__(*args) + def __init__(self, container_lock=None): + super().__init__(container_lock=container_lock) self.data = DistributionData.identity_element() def reset(self): @@ -190,7 +195,9 @@ def update(self, value): # We will hold the GIL throughout the entire _update. self._update(value) else: - with self._lock: + # If a container lock is provided, use it. Otherwise, use cell's own lock. + lock_to_use = self._container_lock if self._container_lock else self._lock + with lock_to_use: self._update(value) def _update(self, value): @@ -226,8 +233,8 @@ class AbstractMetricCell(MetricCell): This class is thread safe. """ - def __init__(self, data_class): - super().__init__() + def __init__(self, data_class, container_lock=None): + super().__init__(container_lock=container_lock) self.data_class = data_class self.data = self.data_class.identity_element() @@ -240,11 +247,13 @@ def combine(self, other: 'AbstractMetricCell') -> 'AbstractMetricCell': return result def set(self, value): - with self._lock: + lock_to_use = self._container_lock if self._container_lock else self._lock + with lock_to_use: self._update_locked(value) def update(self, value): - with self._lock: + lock_to_use = self._container_lock if self._container_lock else self._lock + with lock_to_use: self._update_locked(value) def _update_locked(self, value): @@ -269,8 +278,8 @@ class GaugeCell(AbstractMetricCell): This class is thread safe. """ - def __init__(self): - super().__init__(GaugeData) + def __init__(self, container_lock=None): + super().__init__(GaugeData, container_lock=container_lock) def _update_locked(self, value): # Set the value directly without checking timestamp, because @@ -298,8 +307,8 @@ class StringSetCell(AbstractMetricCell): This class is thread safe. 
""" - def __init__(self): - super().__init__(StringSetData) + def __init__(self, container_lock=None): + super().__init__(StringSetData, container_lock=container_lock) def add(self, value): self.update(value) @@ -327,8 +336,8 @@ class BoundedTrieCell(AbstractMetricCell): This class is thread safe. """ - def __init__(self): - super().__init__(BoundedTrieData) + def __init__(self, container_lock=None): + super().__init__(BoundedTrieData, container_lock=container_lock) def add(self, value): self.update(value) diff --git a/sdks/python/apache_beam/metrics/execution.py b/sdks/python/apache_beam/metrics/execution.py index a3414447c48f..0d0bb4c9ade7 100644 --- a/sdks/python/apache_beam/metrics/execution.py +++ b/sdks/python/apache_beam/metrics/execution.py @@ -240,7 +240,7 @@ class MetricsContainer(object): """ def __init__(self, step_name): self.step_name = step_name - self.lock = threading.Lock() + self._lock = threading.Lock() self.metrics = {} # type: Dict[_TypedMetricName, MetricCell] def get_counter(self, metric_name): @@ -272,10 +272,16 @@ def get_bounded_trie(self, metric_name): def get_metric_cell(self, typed_metric_name): # type: (_TypedMetricName) -> MetricCell + # First check without a lock. cell = self.metrics.get(typed_metric_name, None) if cell is None: - with self.lock: - cell = self.metrics[typed_metric_name] = typed_metric_name.cell_type() + # If not found, acquire lock and check again. + # This is to prevent duplicate cell creation in concurrent scenarios. 
+ with self._lock: + cell = self.metrics.get(typed_metric_name, None) + if cell is None: + cell = self.metrics[typed_metric_name] = typed_metric_name.cell_type( + container_lock=self._lock) return cell def get_cumulative(self): @@ -323,16 +329,16 @@ def to_runner_api_monitoring_infos(self, transform_id): # type: (str) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo] """Returns a list of MonitoringInfos for the metrics in this container.""" - with self.lock: + with self._lock: items = list(self.metrics.items()) - all_metrics = [ - cell.to_runner_api_monitoring_info(key.metric_name, transform_id) - for key, cell in items - ] - return { - monitoring_infos.to_key(mi): mi - for mi in all_metrics if mi is not None - } + all_metrics = [ + cell.to_runner_api_monitoring_info(key.metric_name, transform_id) + for key, cell in items + ] + return { + monitoring_infos.to_key(mi): mi + for mi in all_metrics if mi is not None + } def reset(self): # type: () -> None From 42e2b3570d59097fc60657f5047e3ea7e51d1b5e Mon Sep 17 00:00:00 2001 From: liferoad Date: Mon, 26 May 2025 08:59:54 -0400 Subject: [PATCH 2/6] fix lock --- sdks/python/apache_beam/metrics/execution.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/metrics/execution.py b/sdks/python/apache_beam/metrics/execution.py index 0d0bb4c9ade7..3bdcb6e62941 100644 --- a/sdks/python/apache_beam/metrics/execution.py +++ b/sdks/python/apache_beam/metrics/execution.py @@ -240,7 +240,7 @@ class MetricsContainer(object): """ def __init__(self, step_name): self.step_name = step_name - self._lock = threading.Lock() + self.lock = threading.Lock() self.metrics = {} # type: Dict[_TypedMetricName, MetricCell] def get_counter(self, metric_name): @@ -277,11 +277,11 @@ def get_metric_cell(self, typed_metric_name): if cell is None: # If not found, acquire lock and check again. # This is to prevent duplicate cell creation in concurrent scenarios. 
- with self._lock: + with self.lock: cell = self.metrics.get(typed_metric_name, None) if cell is None: cell = self.metrics[typed_metric_name] = typed_metric_name.cell_type( - container_lock=self._lock) + container_lock=self.lock) return cell def get_cumulative(self): @@ -329,7 +329,7 @@ def to_runner_api_monitoring_infos(self, transform_id): # type: (str) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo] """Returns a list of MonitoringInfos for the metrics in this container.""" - with self._lock: + with self.lock: items = list(self.metrics.items()) all_metrics = [ cell.to_runner_api_monitoring_info(key.metric_name, transform_id) From 4fdc8d08104e854425f8237f574332cee0d24d6b Mon Sep 17 00:00:00 2001 From: liferoad Date: Mon, 26 May 2025 09:05:32 -0400 Subject: [PATCH 3/6] fix container_lock --- sdks/python/apache_beam/metrics/cells.pxd | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/python/apache_beam/metrics/cells.pxd b/sdks/python/apache_beam/metrics/cells.pxd index ebadeec97984..2f173d7394b1 100644 --- a/sdks/python/apache_beam/metrics/cells.pxd +++ b/sdks/python/apache_beam/metrics/cells.pxd @@ -22,6 +22,7 @@ from cpython.datetime cimport datetime cdef class MetricCell(object): cdef object _lock + cdef object _container_lock cpdef bint update(self, value) except -1 cdef datetime _start_time From e283c9dcba52c0616e2aa7fbcc9d39816df18632 Mon Sep 17 00:00:00 2001 From: liferoad Date: Mon, 26 May 2025 10:48:26 -0400 Subject: [PATCH 4/6] fixed tests --- sdks/python/apache_beam/metrics/execution.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/metrics/execution.py b/sdks/python/apache_beam/metrics/execution.py index 3bdcb6e62941..97b36fa525dd 100644 --- a/sdks/python/apache_beam/metrics/execution.py +++ b/sdks/python/apache_beam/metrics/execution.py @@ -47,6 +47,7 @@ from apache_beam.metrics.cells import CounterCell from apache_beam.metrics.cells import DistributionCell from apache_beam.metrics.cells 
import GaugeCell +from apache_beam.metrics.cells import MetricCellFactory from apache_beam.metrics.cells import StringSetCell from apache_beam.metrics.cells import StringSetData from apache_beam.runners.worker import statesampler @@ -280,8 +281,16 @@ def get_metric_cell(self, typed_metric_name): with self.lock: cell = self.metrics.get(typed_metric_name, None) if cell is None: - cell = self.metrics[typed_metric_name] = typed_metric_name.cell_type( - container_lock=self.lock) + if isinstance(typed_metric_name.cell_type, MetricCellFactory): + # If it's a factory, call it without container_lock, + # as the factory's __call__ should handle cell creation. + cell = self.metrics[ + typed_metric_name] = typed_metric_name.cell_type() + else: + # Otherwise, assume it's a MetricCell class and pass container_lock. + cell = self.metrics[ + typed_metric_name] = typed_metric_name.cell_type( + container_lock=self.lock) return cell def get_cumulative(self): From f3224720bf1f487d5b3779dba733cbad09f48aec Mon Sep 17 00:00:00 2001 From: liferoad Date: Mon, 26 May 2025 11:16:51 -0400 Subject: [PATCH 5/6] fixed MetricCellFactory --- sdks/python/apache_beam/metrics/execution.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/python/apache_beam/metrics/execution.py b/sdks/python/apache_beam/metrics/execution.py index 97b36fa525dd..ab69bc84d261 100644 --- a/sdks/python/apache_beam/metrics/execution.py +++ b/sdks/python/apache_beam/metrics/execution.py @@ -47,7 +47,6 @@ from apache_beam.metrics.cells import CounterCell from apache_beam.metrics.cells import DistributionCell from apache_beam.metrics.cells import GaugeCell -from apache_beam.metrics.cells import MetricCellFactory from apache_beam.metrics.cells import StringSetCell from apache_beam.metrics.cells import StringSetData from apache_beam.runners.worker import statesampler From 4c54c1e1819b2a87de7c37287ae37fd0939e3a8b Mon Sep 17 00:00:00 2001 From: liferoad Date: Mon, 26 May 2025 11:26:39 -0400 Subject: [PATCH 6/6] always import 
MetricCellFactory --- sdks/python/apache_beam/metrics/execution.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/metrics/execution.py b/sdks/python/apache_beam/metrics/execution.py index ab69bc84d261..7590a50c493c 100644 --- a/sdks/python/apache_beam/metrics/execution.py +++ b/sdks/python/apache_beam/metrics/execution.py @@ -47,6 +47,7 @@ from apache_beam.metrics.cells import CounterCell from apache_beam.metrics.cells import DistributionCell from apache_beam.metrics.cells import GaugeCell +from apache_beam.metrics.cells import MetricCellFactory from apache_beam.metrics.cells import StringSetCell from apache_beam.metrics.cells import StringSetData from apache_beam.runners.worker import statesampler @@ -57,7 +58,6 @@ from apache_beam.metrics.cells import GaugeData from apache_beam.metrics.cells import DistributionData from apache_beam.metrics.cells import MetricCell - from apache_beam.metrics.cells import MetricCellFactory from apache_beam.metrics.metricbase import MetricName from apache_beam.portability.api import metrics_pb2