diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
index 80e6ab7c0a663..41eaf6f1274fb 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
@@ -481,23 +481,33 @@ private[storage] class BlockInfoManager(trackingCacheVisibility: Boolean = false
     val writeLocks = Option(writeLocksByTask.remove(taskAttemptId)).getOrElse(Collections.emptySet)
     writeLocks.forEach { blockId =>
       blockInfo(blockId) { (info, condition) =>
-        assert(info.writerTask == taskAttemptId)
-        info.writerTask = BlockInfo.NO_WRITER
-        condition.signalAll()
+        // Check the existence of `blockId` because `unlock` may have already removed it
+        // concurrently.
+        if (writeLocks.contains(blockId)) {
+          blocksWithReleasedLocks += blockId
+          assert(info.writerTask == taskAttemptId)
+          info.writerTask = BlockInfo.NO_WRITER
+          condition.signalAll()
+        }
       }
-      blocksWithReleasedLocks += blockId
     }
 
     val readLocks = Option(readLocksByTask.remove(taskAttemptId))
       .getOrElse(ImmutableMultiset.of[BlockId])
     readLocks.entrySet().forEach { entry =>
       val blockId = entry.getElement
-      val lockCount = entry.getCount
-      blocksWithReleasedLocks += blockId
       blockInfo(blockId) { (info, condition) =>
-        info.readerCount -= lockCount
-        assert(info.readerCount >= 0)
-        condition.signalAll()
+        // Calculating lockCount by readLocks.count instead of entry.getCount is intentional. See
+        // discussion in SPARK-50771 and the corresponding PR.
+        val lockCount = readLocks.count(blockId)
+
+        // lockCount can be 0 if read locks for `blockId` are released in `unlock` concurrently.
+        if (lockCount > 0) {
+          blocksWithReleasedLocks += blockId
+          info.readerCount -= lockCount
+          assert(info.readerCount >= 0)
+          condition.signalAll()
+        }
       }
     }
 
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
index f7c7ca2bd9365..4b34c13706ad8 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
@@ -403,4 +403,24 @@ class BlockInfoManagerSuite extends SparkFunSuite {
       }
     }
   }
+
+  test("SPARK-53807 - concurrent unlock and releaseAllLocksForTask for write should not fail") {
+    val blockId = TestBlockId("block")
+    assert(blockInfoManager.lockNewBlockForWriting(blockId, newBlockInfo()))
+    blockInfoManager.unlock(blockId)
+
+    // Without the fix the block below almost always fails.
+    (0 to 10).foreach { task =>
+      withTaskId(task) {
+        blockInfoManager.registerTask(task)
+
+        assert(blockInfoManager.lockForWriting(blockId).isDefined)
+
+        val future = Future(blockInfoManager.unlock(blockId, Option(task)))
+        blockInfoManager.releaseAllLocksForTask(task)
+
+        ThreadUtils.awaitReady(future, 100.millis)
+      }
+    }
+  }
 }
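
The common idea in both hunks is to re-validate shared state inside the per-block critical section instead of trusting a value captured before entering it, because a concurrent `unlock` may already have released some of the locks. The sketch below is a minimal standalone model of that pattern, not Spark's code: `RecheckUnderLockSketch`, `heldReadLocks`, and `release` are hypothetical names, and a plain `ReentrantLock` stands in for the per-block lock and condition used by `BlockInfoManager`.

import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

// Hypothetical, self-contained model of the "re-check under the lock" pattern
// used by the fix above. None of these names exist in Spark.
object RecheckUnderLockSketch {
  private val lock = new ReentrantLock()
  // Two read locks are currently held on "block".
  private val heldReadLocks = mutable.Map("block" -> 2).withDefaultValue(0)
  private var readerCount = 2

  // Release up to `requested` read locks, recomputing how many are actually
  // still held *after* acquiring the lock instead of trusting a count that
  // was captured before entering the critical section.
  def release(blockId: String, requested: Int): Unit = {
    lock.lock()
    try {
      val stillHeld = math.min(requested, heldReadLocks(blockId))
      // stillHeld can be 0 if another thread already released these locks.
      if (stillHeld > 0) {
        heldReadLocks(blockId) -= stillHeld
        readerCount -= stillHeld
        assert(readerCount >= 0)
      }
    } finally {
      lock.unlock()
    }
  }

  def main(args: Array[String]): Unit = {
    // Two releasers race, analogous to unlock and releaseAllLocksForTask.
    val t1 = new Thread(() => release("block", 2))
    val t2 = new Thread(() => release("block", 2))
    t1.start(); t2.start()
    t1.join(); t2.join()
    // Without the re-check, both threads would subtract 2 and the assertion
    // on readerCount would fail; with it, readerCount ends at 0.
    println(s"readerCount = $readerCount")
  }
}

The same reasoning is behind computing lockCount via readLocks.count(blockId) inside blockInfo(blockId) { ... } rather than reusing entry.getCount captured during iteration: the recomputed value reflects any read locks a concurrent unlock has already released.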