From decbb4bf72add944c80a47374c09b754a050d3e2 Mon Sep 17 00:00:00 2001
From: Kousuke Saruta
Date: Mon, 6 Oct 2025 17:12:07 +0900
Subject: [PATCH 1/2] Fix race condition issues

---
 .../spark/storage/BlockInfoManager.scala      | 28 +++++++++++++------
 .../spark/storage/BlockInfoManagerSuite.scala | 20 +++++++++++++
 2 files changed, 39 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
index 80e6ab7c0a663..41eaf6f1274fb 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
@@ -481,23 +481,33 @@ private[storage] class BlockInfoManager(trackingCacheVisibility: Boolean = false
     val writeLocks = Option(writeLocksByTask.remove(taskAttemptId)).getOrElse(Collections.emptySet)
     writeLocks.forEach { blockId =>
       blockInfo(blockId) { (info, condition) =>
-        assert(info.writerTask == taskAttemptId)
-        info.writerTask = BlockInfo.NO_WRITER
-        condition.signalAll()
+        // Check the existence of `blockId` because `unlock` may have already removed it
+        // concurrently.
+        if (writeLocks.contains(blockId)) {
+          blocksWithReleasedLocks += blockId
+          assert(info.writerTask == taskAttemptId)
+          info.writerTask = BlockInfo.NO_WRITER
+          condition.signalAll()
+        }
       }
-      blocksWithReleasedLocks += blockId
     }
 
     val readLocks = Option(readLocksByTask.remove(taskAttemptId))
       .getOrElse(ImmutableMultiset.of[BlockId])
     readLocks.entrySet().forEach { entry =>
       val blockId = entry.getElement
-      val lockCount = entry.getCount
-      blocksWithReleasedLocks += blockId
       blockInfo(blockId) { (info, condition) =>
-        info.readerCount -= lockCount
-        assert(info.readerCount >= 0)
-        condition.signalAll()
+        // Calculating lockCount by readLocks.count instead of entry.getCount is intentional. See
+        // discussion in SPARK-50771 and the corresponding PR.
+        val lockCount = readLocks.count(blockId)
+
+        // lockCount can be 0 if read locks for `blockId` are released in `unlock` concurrently.
+        if (lockCount > 0) {
+          blocksWithReleasedLocks += blockId
+          info.readerCount -= lockCount
+          assert(info.readerCount >= 0)
+          condition.signalAll()
+        }
       }
     }
 
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
index f7c7ca2bd9365..e5a71680a720d 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
@@ -403,4 +403,24 @@ class BlockInfoManagerSuite extends SparkFunSuite {
       }
     }
   }
+
+  test("SPARK-53807") {
+    val blockId = TestBlockId("block")
+    assert(blockInfoManager.lockNewBlockForWriting(blockId, newBlockInfo()))
+    blockInfoManager.unlock(blockId)
+
+    // Without the fix, the block below almost always fails.
+    (0 to 10).foreach { task =>
+      withTaskId(task) {
+        blockInfoManager.registerTask(task)
+
+        assert(blockInfoManager.lockForWriting(blockId).isDefined)
+
+        val future = Future(blockInfoManager.unlock(blockId, Option(task)))
+        blockInfoManager.releaseAllLocksForTask(task)
+
+        ThreadUtils.awaitReady(future, 100.millis)
+      }
+    }
+  }
 }

From 543041036e6c4c340bf35cb402d41c9095c6b9f0 Mon Sep 17 00:00:00 2001
From: Kousuke Saruta
Date: Tue, 7 Oct 2025 03:05:29 +0900
Subject: [PATCH 2/2] Modify test name

---
 .../scala/org/apache/spark/storage/BlockInfoManagerSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
index e5a71680a720d..4b34c13706ad8 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
@@ -404,7 +404,7 @@ class BlockInfoManagerSuite extends SparkFunSuite {
     }
   }
 
-  test("SPARK-53807") {
+  test("SPARK-53807 - concurrent unlock and releaseAllLocksForTask for write should not fail") {
     val blockId = TestBlockId("block")
     assert(blockInfoManager.lockNewBlockForWriting(blockId, newBlockInfo()))
     blockInfoManager.unlock(blockId)
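Note: the sketch below is a minimal, self-contained illustration of the interleaving the
`writeLocks.contains(blockId)` re-check above guards against; it is not Spark code.
`TinyLockManager`, `unlock`, and `releaseAllForTask` are hypothetical stand-ins for
`BlockInfoManager.unlock` and `BlockInfoManager.releaseAllLocksForTask`, and the per-block
monitor stands in for the critical section entered by `blockInfo(blockId) { ... }`.

  import java.util.concurrent.ConcurrentHashMap

  object WriteLockRaceSketch {
    final val NoWriter = -1L

    class TinyLockManager {
      // Per-block monitor, standing in for the lock taken by blockInfo(blockId) { ... }.
      private val monitorOf = new ConcurrentHashMap[String, Object]()
      // block -> task currently holding the write lock (NoWriter if none).
      private val writerOf = new ConcurrentHashMap[String, Long]()
      // task -> blocks whose write lock the task still holds.
      private val heldByTask = new ConcurrentHashMap[Long, java.util.Set[String]]()

      private def monitor(block: String): Object =
        monitorOf.computeIfAbsent(block, _ => new Object)

      def lockForWriting(block: String, task: Long): Unit = monitor(block).synchronized {
        writerOf.put(block, task)
        heldByTask.computeIfAbsent(task, _ => ConcurrentHashMap.newKeySet[String]()).add(block)
      }

      // Stand-in for unlock(blockId, Some(task)): releases a single write lock.
      def unlock(block: String, task: Long): Unit = monitor(block).synchronized {
        writerOf.put(block, NoWriter)
        Option(heldByTask.get(task)).foreach(_.remove(block))
      }

      // Stand-in for releaseAllLocksForTask: releases whatever the task still holds.
      def releaseAllForTask(task: Long): Unit = {
        val held = heldByTask.getOrDefault(task, java.util.Collections.emptySet[String]())
        held.forEach { block =>
          monitor(block).synchronized {
            // Re-check membership under the per-block lock: a concurrent unlock may have
            // released `block` after this iteration picked it up. Without this check the
            // assertion below can fail, because writerOf(block) is already NoWriter.
            if (held.contains(block)) {
              assert(writerOf.get(block) == task)
              writerOf.put(block, NoWriter)
              held.remove(block)
            }
          }
        }
        heldByTask.remove(task)
      }
    }

    def main(args: Array[String]): Unit = {
      val mgr = new TinyLockManager
      (0 to 10).foreach { task =>
        mgr.lockForWriting("block", task.toLong)
        // Release the same lock from another thread while this thread releases all locks
        // for the task, mirroring the Future + releaseAllLocksForTask pattern in the test.
        val racer = new Thread(() => mgr.unlock("block", task.toLong))
        racer.start()
        mgr.releaseAllForTask(task.toLong)
        racer.join()
      }
      println("no assertion failed")
    }
  }

The read-lock path in the patch follows the same idea: recomputing `lockCount` from the live
`readLocks` multiset and skipping blocks whose count has already dropped to 0 keeps
`releaseAllLocksForTask` from decrementing `readerCount` for locks that a concurrent `unlock`
has already released.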