
Fix: Prevent race conditions in Python SDK monitoring info collection. #35049

Closed

liferoad wants to merge 6 commits into apache:master from liferoad:fix-monitoring-race-condition-24776

Conversation

@liferoad (Contributor)

Testing Jules

Fixes #24776

This change addresses Apache Beam GitHub Issue #24776, where race conditions could occur during the collection of monitoring information in the Python SDK Harness, leading to errors such as:

  • SystemError: returned NULL without setting an error
  • RuntimeError: dictionary changed size during iteration
  • AttributeError: 'bytes' object has no attribute 'payload'
  • ValueError: non-UTF-8 strings

The primary cause was concurrent access to metric data structures (specifically MetricsContainer and its underlying MetricCells) by the DoFn execution thread (updating metrics) and the thread responsible for reporting bundle progress.
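The dictionary-iteration failure mode can be shown deterministically even without threads; in the SDK harness the same mutation simply arrives from another thread, which is why the error is intermittent. A minimal sketch (hypothetical metric names, not Beam code):

```python
# A "reporting" loop iterates the cell dictionary while a new metric cell is
# created mid-iteration. CPython detects the size change and raises
# RuntimeError -- the same error seen intermittently in the SDK harness when
# the mutation comes from the DoFn thread instead.
cells = {"counter_a": 1}

try:
    for name in cells:              # reporting thread iterates the cells
        cells["counter_b"] = 2      # DoFn thread creates a new cell
except RuntimeError as exc:
    print(exc)  # dictionary changed size during iteration
```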

The fix introduces the following:

  1. A threading.Lock is added to the MetricsContainer class. This lock is acquired before any access or modification of the internal dictionaries that store metric cells (self.counters, self.distributions, self.gauges). This protection is applied during metric cell retrieval/creation (get_metric_cell) and when all monitoring information is collected for reporting (to_runner_api_monitoring_infos).
  2. The MetricsContainer's lock is passed to individual MetricCell instances (CounterCell, DistributionCell, GaugeCell) upon their creation.
  3. Metric update methods within CounterCell, DistributionCell, and GaugeCell (e.g., update(), set(), add_data()) now acquire this container-level lock before modifying their internal state. This ensures that updates are atomic with respect to the collection process in MetricsContainer.to_runner_api_monitoring_infos.

These changes ensure that metric data is read and updated in a thread-safe manner, preventing the previously observed errors caused by concurrent access and modification of shared metric state.
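The three steps above can be sketched as follows. This is a simplified, hypothetical shape, not the actual Beam implementation; the real classes live under `apache_beam.metrics`, and names like `get_counter` are illustrative:

```python
import threading

class CounterCell:
  """Simplified stand-in for a Beam metric cell."""

  def __init__(self, lock):
    self._lock = lock  # step 2: container-level lock handed to the cell
    self.value = 0

  def update(self, n):
    with self._lock:   # step 3: updates are atomic w.r.t. collection
      self.value += n

class MetricsContainer:
  def __init__(self):
    self._lock = threading.Lock()  # step 1: one lock per container
    self.counters = {}

  def get_counter(self, name):
    with self._lock:   # protects dict lookup/insert against reporting
      if name not in self.counters:
        self.counters[name] = CounterCell(self._lock)
      return self.counters[name]

  def to_monitoring_infos(self):
    # Snapshot under the same lock. threading.Lock is not reentrant, so we
    # read cell.value directly rather than calling a method that re-acquires.
    with self._lock:
      return {name: cell.value for name, cell in self.counters.items()}
```

With this scheme a concurrent `update()` either completes before the snapshot or waits for it, so the reporting thread never observes a half-updated cell or a dictionary that changes size mid-iteration.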



Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@github-actions

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@github-actions

Assigning reviewers:

R: @jrmccluskey for label python.

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@liferoad (Author)

stop reviewer notifications

@github-actions

Stopping reviewer notifications for this pull request: requested by reviewer. If you'd like to restart, comment assign set of reviewers

Review thread on a diff excerpt from the cell update code:

    self.value += ivalue
    else:
    with self._lock:
    # If a container lock is provided, use it. Otherwise, use cell's own lock.
A reviewer asked:

Are cells ever not managed by a metrics container?

@liferoad (Author) replied:

https://github.com/apache/beam/pull/35049/files#diff-1293713097c11f7136aa69a2affccdd624a958dad8a10c68a4a31f39fa6c303dR281 makes sure the cells use the lock from the metrics container, from what I can tell. I think https://github.com/apache/beam/pull/35049/files#diff-1293713097c11f7136aa69a2affccdd624a958dad8a10c68a4a31f39fa6c303dR342 is probably the more interesting part to review.

I am not super confident about this PR, both in terms of how to test it and its potential performance impact.

@robertwb (Contributor) left a comment:

Is there any concern about the move to a (significantly more) global lock causing performance regressions? (Are metrics containers at least per worker thread?)

@liferoad (Author)

Is there any concern about the move to a (significantly more) global lock causing performance regressions? (Are metrics containers at least per worker thread?)

I do share the concern about a global lock, and even about the finer-grained lock used by this PR.
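One rough way to put a number on the uncontended overhead being discussed (machine-dependent, and not a substitute for benchmarking a real pipeline):

```python
import threading
import timeit

value = 0
lock = threading.Lock()

def bare_update():
    global value
    value += 1

def locked_update():
    global value
    with lock:  # the extra acquire/release a locking fix adds to every update
        value += 1

bare = timeit.timeit(bare_update, number=100_000)
locked = timeit.timeit(locked_update, number=100_000)
print(f"uncontended lock overhead: {locked / bare:.1f}x per update")
```

Uncontended acquisition is cheap in CPython, but the contended case (the progress-reporting thread racing with hot-loop metric updates) is the part that would need measuring on a real pipeline.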

@liferoad (Author)

liferoad commented Jul 4, 2025

Closing this for now until we have a good way to test it.

@liferoad liferoad closed this Jul 4, 2025


Development

Successfully merging this pull request may close these issues.

[Bug]: Race condition in Python SDK Harness ProcessBundleProgress

2 participants