@sarutak sarutak commented Oct 6, 2025

What changes were proposed in this pull request?

This PR fixes race condition issues between unlock and releaseAllLocksForTask in BlockInfoManager.

When read locks for a block acquired by a task are released by unlock and releaseAllLocksForTask concurrently, an assertion error can occur.
The reason is that entry.getCount in releaseAllLocksForTask can return a stale value even after the count in the entry's multiset has been decremented by countsForTask.remove on another thread. As a result, info.readerCount -= lockCount can drive the reader count negative, which trips the assertion.

This issue can be reproduced by inserting sleeps into unlock and releaseAllLocksForTask as follows.

  • unlock
   // reader counts. We need to check if the readLocksByTask per tasks are present, if they
   // are not then we know releaseAllLocksForTask has already cleaned up the read lock.
   val countsForTask = readLocksByTask.get(taskAttemptId)
+  Thread.sleep(5)
   if (countsForTask != null) {
     assert(info.readerCount > 0, s"Block $blockId is not locked for reading")
     info.readerCount -= 1
  • releaseAllLocksForTask
+  Thread.sleep(5)
   val readLocks = Option(readLocksByTask.remove(taskAttemptId))
     .getOrElse(ImmutableMultiset.of[BlockId])
   readLocks.entrySet().forEach { entry =>
     val blockId = entry.getElement
     val lockCount = entry.getCount
+    Thread.sleep(5)
     blocksWithReleasedLocks += blockId

The Javadoc for ConcurrentHashMultiset#entrySet states:

However, multiset changes may or may not be reflected in any Entry instances already retrieved from the entry set (this is implementation-dependent)

So this PR calculates lockCount by calling readLocks.count to get the latest count, and places that calculation inside the blockInfo block so it runs under exclusive access.
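
As a quick illustration of the distinction the fix relies on, here is a minimal standalone sketch using plain Guava (hypothetical block-ID strings, not the actual BlockInfoManager code): an Entry obtained from entrySet() may keep reporting an old count after a concurrent removal, while count() always reads the current value.

  import com.google.common.collect.ConcurrentHashMultiset

  object StaleEntryCountSketch {
    def main(args: Array[String]): Unit = {
      // Stands in for a task's per-task read-lock multiset.
      val readLocks = ConcurrentHashMultiset.create[String]()
      readLocks.add("rdd_0_0", 2)

      // Snapshot an Entry, as releaseAllLocksForTask does via entrySet().forEach.
      val entry = readLocks.entrySet().iterator().next()

      // Simulate unlock on another thread releasing one read lock.
      readLocks.remove("rdd_0_0", 1)

      // Per the Javadoc, the Entry may or may not reflect the removal
      // (implementation-dependent), so getCount can still report 2 here...
      println(s"entry.getCount      = ${entry.getCount}")
      // ...while count() is guaranteed to return the current value, 1.
      println(s"readLocks.count(..) = ${readLocks.count("rdd_0_0")}")
    }
  }

On top of reading the fresh count, the fix performs that read inside the blockInfo block, so the count lookup and the readerCount update cannot interleave with unlock.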

A similar race condition can occur for write locks.
While releaseAllLocksForTask iterates over writeLocks with forEach, a blockId can be removed from writeLocks by writeLocksByTask.get(taskAttemptId).remove(blockId) in unlock on another thread.
This issue can be reproduced with the new test added in this PR.
This PR fixes it by checking whether the blockId is still present, via writeLocks.contains(info), within the blockInfo block.
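
As a rough model of the pattern (simplified, hypothetical types and names; not the real BlockInfoManager code), the bulk-release path re-checks membership inside the per-block critical section before touching the lock state, so a concurrent unlock that already released the write lock turns the bulk release into a no-op for that block:

  import java.util.concurrent.ConcurrentHashMap

  object WriteLockReleaseSketch {
    // Hypothetical simplified state: per-task set of blocks write-locked by that task.
    private val writeLocksByTask = new ConcurrentHashMap[Long, java.util.Set[String]]()
    // Per-block monitor standing in for the blockInfo(blockId) { ... } critical section.
    private val blockMonitors = new ConcurrentHashMap[String, Object]()

    private def monitorFor(blockId: String): Object =
      blockMonitors.computeIfAbsent(blockId, _ => new Object)

    def registerTask(taskId: Long): Unit =
      writeLocksByTask.put(taskId, ConcurrentHashMap.newKeySet[String]())

    def unlock(taskId: Long, blockId: String): Unit =
      monitorFor(blockId).synchronized {
        val locks = writeLocksByTask.get(taskId)
        // If the bulk release already detached the set, there is nothing left to do.
        if (locks != null && locks.remove(blockId)) {
          // ... reset the block's writer and notify waiters here.
        }
      }

    def releaseAllLocksForTask(taskId: Long): Unit = {
      val writeLocks = writeLocksByTask.remove(taskId)
      if (writeLocks != null) {
        writeLocks.forEach { blockId =>
          monitorFor(blockId).synchronized {
            // Re-check under the per-block lock: a concurrent unlock may already
            // have released this write lock and removed blockId from the set.
            if (writeLocks.contains(blockId)) {
              writeLocks.remove(blockId)
              // ... reset the block's writer and notify waiters here.
            }
          }
        }
      }
    }
  }

Doing the membership check and the state update inside the same per-block critical section removes the window in which unlock and the bulk release can both see the write lock as still held.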

Why are the changes needed?

Bug fix.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Confirmed that the test "SPARK-38675 - concurrent unlock and releaseAllLocksForTask calls should not fail" passes even if sleeps are inserted into unlock and releaseAllLocksForTask as follows.

  • unlock
 val countsForTask = readLocksByTask.get(taskAttemptId)
 if (countsForTask != null) {
+  Thread.sleep(5)
   assert(info.readerCount > 0, s"Block $blockId is not locked for reading")
   info.readerCount -= 1
   val newPinCountForTask: Int = countsForTask.remove(blockId, 1) - 1
  • releaseAllLocksForTask
+  Thread.sleep(5)
   val readLocks = Option(readLocksByTask.remove(taskAttemptId))
     .getOrElse(ImmutableMultiset.of[BlockId])
   readLocks.entrySet().forEach { entry =>
     val blockId = entry.getElement
  // Using readLocks.count instead of entry.getCount is intentional. See discussion in
  // SPARK-50771.
  val lockCount = readLocks.count(blockId)
+ Thread.sleep(5)

  // lockCount can be 0 if read locks for `blockId` are released in `unlock` concurrently.
  if (lockCount > 0) {
    blocksWithReleasedLocks += blockId
    info.readerCount -= lockCount

Also, a new test for write locks is added.
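
For reference, a generic shape that such a concurrency regression test often takes (a hedged sketch with placeholder names, not the test actually added by this PR) is: run the two racing operations on separate threads released by a common latch, repeat for many rounds, and fail if either thread throws.

  import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
  import java.util.concurrent.atomic.AtomicReference

  object RaceTestHarnessSketch {
    // Runs `left` and `right` concurrently for `iterations` rounds, recreating the
    // shared state with `setup` each round, and returns the first failure seen.
    def race(iterations: Int)(setup: () => Unit)(left: () => Unit)(right: () => Unit): Option[Throwable] = {
      val pool = Executors.newFixedThreadPool(2)
      val failure = new AtomicReference[Throwable]()
      try {
        for (_ <- 1 to iterations if failure.get() == null) {
          setup()
          val start = new CountDownLatch(1)
          val done = new CountDownLatch(2)
          Seq(left, right).foreach { op =>
            pool.execute { () =>
              try {
                start.await() // line both threads up on the same starting gun
                op()
              } catch {
                case t: Throwable => failure.compareAndSet(null, t)
              } finally {
                done.countDown()
              }
            }
          }
          start.countDown()
          done.await(10, TimeUnit.SECONDS)
        }
      } finally {
        pool.shutdownNow()
      }
      Option(failure.get())
    }
  }

Applied to this PR, left and right would be the per-block unlock calls and releaseAllLocksForTask for the same task attempt, and the test asserts that race(...) returns None.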

Was this patch authored or co-authored using generative AI tooling?

No.

@sarutak sarutak changed the title [SPARK-50771][SPARK-53807][CORE] Fix race condition issues between unlock and releaseAllLocksForTask in BlockManager [SPARK-50771][SPARK-53807][CORE] Fix race condition issues between unlock and releaseAllLocksForTask in BlockInfoManager Oct 6, 2025
@github-actions github-actions bot added the CORE label Oct 6, 2025