diff --git a/core/server/worker/src/main/java/alluxio/worker/block/BlockMetadataManager.java b/core/server/worker/src/main/java/alluxio/worker/block/BlockMetadataManager.java index 5c170199003c..8405c9ecbec9 100644 --- a/core/server/worker/src/main/java/alluxio/worker/block/BlockMetadataManager.java +++ b/core/server/worker/src/main/java/alluxio/worker/block/BlockMetadataManager.java @@ -299,6 +299,26 @@ public Optional getTempBlockMeta(long blockId) { return Optional.empty(); } + + /** + * Gets the metadata of a temp block. + * + * @param sessionId the sessionId of the temp block + * @param blockId the id of the temp block + * @return metadata of the block + */ + public Optional getTempBlockMeta(long sessionId, long blockId) { + for (StorageTier tier : mTiers) { + for (StorageDir dir : tier.getStorageDirs()) { + Optional tempBlockMeta = dir.getTempBlockMeta(blockId); + if (tempBlockMeta.isPresent() && tempBlockMeta.get().getSessionId() == sessionId) { + return tempBlockMeta; + } + } + } + return Optional.empty(); + } + /** * Gets the {@link StorageTier} given its tierAlias. Throws an {@link IllegalArgumentException} if * the tierAlias is not found. diff --git a/core/server/worker/src/main/java/alluxio/worker/block/TieredBlockStore.java b/core/server/worker/src/main/java/alluxio/worker/block/TieredBlockStore.java index ce447d94abce..2a2254b22a5f 100644 --- a/core/server/worker/src/main/java/alluxio/worker/block/TieredBlockStore.java +++ b/core/server/worker/src/main/java/alluxio/worker/block/TieredBlockStore.java @@ -538,13 +538,10 @@ public void registerBlockStoreEventListener(BlockStoreEventListener listener) { private TempBlockMeta checkAndGetTempBlockMeta(long sessionId, long blockId) { Optional tempBlockMeta; try (LockResource r = new LockResource(mMetadataReadLock)) { - tempBlockMeta = mMetaManager.getTempBlockMeta(blockId); + tempBlockMeta = mMetaManager.getTempBlockMeta(sessionId, blockId); } checkState(tempBlockMeta.isPresent(), ExceptionMessage.TEMP_BLOCK_META_NOT_FOUND.getMessage(blockId)); - checkState(tempBlockMeta.get().getSessionId() == sessionId, - ExceptionMessage.BLOCK_ID_FOR_DIFFERENT_SESSION.getMessage(blockId, - tempBlockMeta.get().getSessionId(), sessionId)); return tempBlockMeta.get(); } diff --git a/core/server/worker/src/main/java/alluxio/worker/block/UnderFileSystemBlockReader.java b/core/server/worker/src/main/java/alluxio/worker/block/UnderFileSystemBlockReader.java index cc3f66356618..b05adf4a8909 100644 --- a/core/server/worker/src/main/java/alluxio/worker/block/UnderFileSystemBlockReader.java +++ b/core/server/worker/src/main/java/alluxio/worker/block/UnderFileSystemBlockReader.java @@ -24,6 +24,7 @@ import alluxio.util.IdUtils; import alluxio.worker.block.io.BlockReader; import alluxio.worker.block.io.BlockWriter; +import alluxio.worker.block.meta.TempBlockMeta; import alluxio.worker.block.meta.UnderFileSystemBlockMeta; import com.codahale.metrics.Counter; @@ -346,11 +347,12 @@ private void updateBlockWriter(long offset) throws IOException { if (mBlockWriter != null && offset > mBlockWriter.getPosition()) { cancelBlockWriter(); } + TempBlockMeta tempBlockMeta = null; try { if (mBlockWriter == null && offset == 0 && !mBlockMeta.isNoCache()) { BlockStoreLocation loc = BlockStoreLocation.anyDirInTier( WORKER_STORAGE_TIER_ASSOC.getAlias(0)); - mLocalBlockStore.createBlock(mBlockMeta.getSessionId(), mBlockMeta.getBlockId(), + tempBlockMeta = mLocalBlockStore.createBlock(mBlockMeta.getSessionId(), mBlockMeta.getBlockId(), AllocateOptions.forCreate(mInitialBlockSize, loc)); mBlockWriter = mLocalBlockStore.createBlockWriter( mBlockMeta.getSessionId(), mBlockMeta.getBlockId()); @@ -360,6 +362,9 @@ private void updateBlockWriter(long offset) throws IOException { "Failed to update block writer for UFS block [blockId: {}, ufsPath: {}, offset: {}]: {}", mBlockMeta.getBlockId(), mBlockMeta.getUnderFileSystemPath(), offset, e.toString()); mBlockWriter = null; + if (tempBlockMeta != null) { + mLocalBlockStore.abortBlock(tempBlockMeta.getSessionId(), tempBlockMeta.getBlockId()); + } } catch (IllegalStateException e) { // This can happen when there are concurrent UFS readers who are all trying to cache to block. LOG.debug( @@ -367,6 +372,9 @@ private void updateBlockWriter(long offset) throws IOException { + "Concurrent UFS readers may be caching the same block.", mBlockMeta.getBlockId(), mBlockMeta.getUnderFileSystemPath(), offset, e); mBlockWriter = null; + if (tempBlockMeta != null) { + mLocalBlockStore.abortBlock(tempBlockMeta.getSessionId(), tempBlockMeta.getBlockId()); + } } } } diff --git a/libexec/version.sh b/libexec/version.sh new file mode 100644 index 000000000000..6461f066fb5f --- /dev/null +++ b/libexec/version.sh @@ -0,0 +1 @@ +VERSION=308-SNAPSHOT