@shunping
Collaborator

@shunping shunping commented Oct 30, 2025

A data race has been observed when running some processing-time timer tests:

go test -v -race -count=200 -timeout 100s -run ^TestTimers_ProcessingTime_Infinity$ github.com/apache/beam/sdks/v2/go/test/integration/primitives

WARNING: DATA RACE
Write at 0x00c000a1e600 by goroutine 776:
  github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine.mtimeHeap.Swap()
      /Users/shunping/Projects/beam-dev-go-prism/sdks/go/pkg/beam/runners/prism/internal/engine/holds.go:33 +0xa8
  github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine.(*mtimeHeap).Swap()
      <autogenerated>:1 +0x20
  container/heap.down()
      /Users/shunping/go/pkg/mod/golang.org/[email protected]/src/container/heap/heap.go:114 +0x5c
  container/heap.Pop()
      /Users/shunping/go/pkg/mod/golang.org/[email protected]/src/container/heap/heap.go:62 +0x68
  github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine.(*stageRefreshQueue).AdvanceTo()
      /Users/shunping/Projects/beam-dev-go-prism/sdks/go/pkg/beam/runners/prism/internal/engine/processingtime.go:92 +0xb8
  github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine.(*ElementManager).Bundles.func2()
      /Users/shunping/Projects/beam-dev-go-prism/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:402 +0x128

Previous read at 0x00c000a1e600 by goroutine 938:
  github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine.(*stageRefreshQueue).Peek()
      /Users/shunping/Projects/beam-dev-go-prism/sdks/go/pkg/beam/runners/prism/internal/engine/processingtime.go:80 +0x21c
  github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine.(*ElementManager).ProcessingTimeNow()
      /Users/shunping/Projects/beam-dev-go-prism/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:2451 +0x1b4
  github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine.(*stageState).injectTriggeredBundlesIfReady()
      /Users/shunping/Projects/beam-dev-go-prism/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:1340 +0x2c8
  github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine.(*aggregateStageKind).addPending()
      /Users/shunping/Projects/beam-dev-go-prism/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:1435 +0x4e8
  github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine.(*stageState).AddPending()
      /Users/shunping/Projects/beam-dev-go-prism/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:1315 +0xb4
  github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine.(*ElementManager).PersistBundle()
      /Users/shunping/Projects/beam-dev-go-prism/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:884 +0x9d8
  github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal.(*stage).Execute()
      /Users/shunping/Projects/beam-dev-go-prism/sdks/go/pkg/beam/runners/prism/internal/stage.go:351 +0x2834
  github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal.executePipeline.func2()
      /Users/shunping/Projects/beam-dev-go-prism/sdks/go/pkg/beam/runners/prism/internal/execute.go:375 +0xf8
  golang.org/x/sync/errgroup.(*Group).Go.func1()
      /Users/shunping/go/pkg/mod/golang.org/x/[email protected]/errgroup/errgroup.go:93 +0x70

Goroutine 776 (running) created at:
  github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine.(*ElementManager).Bundles()
      /Users/shunping/Projects/beam-dev-go-prism/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:388 +0x278
  github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal.executePipeline()
      /Users/shunping/Projects/beam-dev-go-prism/sdks/go/pkg/beam/runners/prism/internal/execute.go:351 +0x1638
  github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal.RunPipeline()
      /Users/shunping/Projects/beam-dev-go-prism/sdks/go/pkg/beam/runners/prism/internal/execute.go:90 +0x530
  github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/jobservices.(*Server).Run.gowrap1()
      /Users/shunping/Projects/beam-dev-go-prism/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go:400 +0x40

Goroutine 938 (running) created at:
  golang.org/x/sync/errgroup.(*Group).Go()
      /Users/shunping/go/pkg/mod/golang.org/x/[email protected]/errgroup/errgroup.go:78 +0x104
  github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal.executePipeline()
      /Users/shunping/Projects/beam-dev-go-prism/sdks/go/pkg/beam/runners/prism/internal/execute.go:372 +0x1974
  github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal.RunPipeline()
      /Users/shunping/Projects/beam-dev-go-prism/sdks/go/pkg/beam/runners/prism/internal/execute.go:90 +0x530
  github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/jobservices.(*Server).Run.gowrap1()
      /Users/shunping/Projects/beam-dev-go-prism/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go:400 +0x40

Most accesses to em.processTimeEvents are guarded by the em.refreshCond.L lock, except for one location in injectTriggeredBundlesIfReady. This PR adds the missing lock there.
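The race above is a heap.Pop inside AdvanceTo running concurrently with an unguarded Peek. A minimal sketch of the invariant the fix restores: every read or write of the event heap happens while holding the condition variable's lock. The manager type, its schedule/advanceTo/peekLocked methods, and the use of plain int64 timestamps are hypothetical simplifications, not Prism's actual ElementManager or mtimeHeap code.

```go
package main

import (
	"container/heap"
	"fmt"
	"sync"
)

// timeHeap is a min-heap of processing times (hypothetical stand-in for mtimeHeap).
type timeHeap []int64

func (h timeHeap) Len() int           { return len(h) }
func (h timeHeap) Less(i, j int) bool { return h[i] < h[j] }
func (h timeHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *timeHeap) Push(x any)        { *h = append(*h, x.(int64)) }
func (h *timeHeap) Pop() any {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

// manager is a hypothetical, simplified element manager.
// Every access to events must happen while holding refreshCond.L.
type manager struct {
	refreshCond *sync.Cond
	events      timeHeap // guarded by refreshCond.L
}

func newManager() *manager {
	return &manager{refreshCond: sync.NewCond(&sync.Mutex{})}
}

// schedule adds a processing-time event under the lock and wakes any waiters.
func (m *manager) schedule(t int64) {
	m.refreshCond.L.Lock()
	defer m.refreshCond.L.Unlock()
	heap.Push(&m.events, t)
	m.refreshCond.Broadcast()
}

// advanceTo pops every event at or before now, in time order.
// The caller must NOT already hold refreshCond.L.
func (m *manager) advanceTo(now int64) []int64 {
	m.refreshCond.L.Lock()
	defer m.refreshCond.L.Unlock()
	var fired []int64
	for len(m.events) > 0 && m.events[0] <= now {
		fired = append(fired, heap.Pop(&m.events).(int64))
	}
	return fired
}

// peekLocked returns the earliest event. The caller MUST hold refreshCond.L.
func (m *manager) peekLocked() (int64, bool) {
	if len(m.events) == 0 {
		return 0, false
	}
	return m.events[0], true
}

func main() {
	m := newManager()
	var wg sync.WaitGroup
	for i := int64(1); i <= 100; i++ {
		wg.Add(1)
		go func(t int64) {
			defer wg.Done()
			m.schedule(t)
		}(i)
	}
	// Reader goroutine: takes the lock around the peek, like the fixed code path.
	wg.Add(1)
	go func() {
		defer wg.Done()
		m.refreshCond.L.Lock()
		_, _ = m.peekLocked()
		m.refreshCond.L.Unlock()
	}()
	wg.Wait()
	fired := m.advanceTo(50)
	fmt.Println(len(fired), fired[0], fired[len(fired)-1]) // 50 1 50
}
```

Running this with `go run -race` should report no race. Removing the Lock/Unlock around peekLocked in the reader goroutine reintroduces the same kind of unsynchronized Peek/Pop pair that the race detector flagged above.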

Related to #36662

Internal Bug ID: 449405414

@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello @shunping, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses a critical data race within the Prism runner's processing-time timer mechanism. The issue arose from concurrent read and write operations on the processTimeEvents queue without proper synchronization. The fix involves adding a missing lock acquisition and release around a specific access point, thereby ensuring the integrity and thread safety of the processing time queue operations.

Highlights

  • Data Race Fix: Resolved a data race identified in the Prism runner's processing-time timer tests, specifically when accessing the processTimeEvents queue.
  • Concurrency Control: Implemented a mutex lock around the em.ProcessingTimeNow() call within the injectTriggeredBundlesIfReady function to ensure thread-safe access to shared resources.
  • Code Clarity: Updated comments for the processTimeEvents field and the handleProcessingTimeTimer function to explicitly document the requirement for callers to hold the refreshCond.L lock.

@shunping
Collaborator Author

cc'ed @stankiewicz

@shunping shunping requested review from lostluck and removed request for lostluck October 30, 2025 03:11
@shunping shunping self-assigned this Oct 30, 2025
@shunping shunping marked this pull request as draft October 30, 2025 03:14
state := wv[key]
endOfWindowReached := window.MaxTimestamp() < ss.input
em.refreshCond.L.Lock()
emNow := em.ProcessingTimeNow()
Contributor

ProcessingTimeNow() is exported... Are we calling it outside of the engine package? If not, please unexport it.

Otherwise, we'll need to move the lock acquisition into the method itself to handle external callers, and have an internal, unexported version that requires manual locking.

Essentially, we can't have an exported method that requires holding an internal lock.
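The pattern suggested in this review can be sketched as follows. The clock type and its Now, nowLocked, and advance methods are hypothetical illustrations, not Prism's ElementManager API. The exported method acquires the lock itself, so callers from any package are safe. The unexported variant documents that its caller must already hold the lock.

```go
package main

import (
	"fmt"
	"sync"
)

// clock is a hypothetical type whose state is guarded by mu.
type clock struct {
	mu  sync.Mutex
	now int64 // guarded by mu
}

// Now is safe for any caller, inside or outside the package:
// it acquires the lock itself.
func (c *clock) Now() int64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.nowLocked()
}

// nowLocked is unexported. The caller must already hold c.mu.
func (c *clock) nowLocked() int64 {
	return c.now
}

// advance is an internal caller that already holds the lock, so it must use
// nowLocked. Calling Now here would self-deadlock because sync.Mutex is not reentrant.
func (c *clock) advance(d int64) int64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.now = c.nowLocked() + d
	return c.now
}

func main() {
	c := &clock{}
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.advance(1)
		}()
	}
	wg.Wait()
	fmt.Println(c.Now()) // 10
}
```

The `Locked` suffix is one common Go convention for marking "caller holds the lock" helpers. Keeping such helpers unexported ensures that no caller outside the package can reach the internal lock requirement.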

Collaborator Author

@shunping shunping Oct 30, 2025

Are we calling it outside of the engine package?

No, it is all called within engine.

Collaborator Author

The problem, as I mentioned below (#36672 (comment)), is that the two locks (refreshCond.L and ss.mu) are acquired in a different order in the watermark evaluation goroutine than in the goroutine running PersistBundle.

@shunping
Collaborator Author

shunping commented Oct 30, 2025

Looks like my fix also caused a deadlock:

  • In the watermark evaluation loop, we always acquire refreshCond.L first and then ss.mu.
  • However, in PersistBundle() (a separate goroutine), when we call addPending for an aggregate stage kind, we first acquire ss.mu. Then, when evaluating triggered bundles, we need to acquire refreshCond.L to check the processing time when the real-time clock is not in use.

To resolve this deadlock, we have to make sure refreshCond.L is acquired before ss.mu whenever injectTriggeredBundlesIfReady needs to access em.processTimeEvents.

@shunping shunping marked this pull request as ready for review October 30, 2025 17:52
@github-actions
Contributor

Assigning reviewers:

R: @lostluck for label go.

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

Contributor

@lostluck lostluck left a comment

Aha! The assumption was that processing-time handling would only happen in stateful stages (with timers), but now triggers use it too. Unfortunate, but at least this was a minor issue.

@lostluck lostluck merged commit 01e1cf6 into apache:master Oct 30, 2025
27 checks passed