Skip to content

refactor(s3stream): refactor StreamReader Block object #2685

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@
private boolean closed = false;

public StreamReader(long streamId, long nextReadOffset, EventLoop eventLoop, ObjectManager objectManager,
ObjectReaderFactory objectReaderFactory, DataBlockCache dataBlockCache) {
ObjectReaderFactory objectReaderFactory, DataBlockCache dataBlockCache) {
this.streamId = streamId;
this.nextReadOffset = nextReadOffset;
this.readahead = new Readahead();
Expand Down Expand Up @@ -125,7 +125,7 @@ CompletableFuture<ReadDataBlock> read(long startOffset, long endOffset, int maxB
Throwable cause = FutureUtil.cause(ex);
if (cause != null) {
readContext.records.forEach(StreamRecordBatch::release);
for (Block block : readContext.blocks) {
for (ExternalCacheBlock block : readContext.blocks) {
block.release();
}
if (leftRetries > 0 && isRecoverable(cause)) {
Expand Down Expand Up @@ -163,10 +163,10 @@ public void close() {

void read0(ReadContext ctx, final long startOffset, final long endOffset, final int maxBytes) {
// 1. get blocks
CompletableFuture<List<Block>> getBlocksCf = getBlocks(startOffset, endOffset, maxBytes, false);
CompletableFuture<List<ExternalCacheBlock>> getBlocksCf = getBlocks(startOffset, endOffset, maxBytes, false);

// 2. wait for the blocks' data to be loaded
List<Block> blocks = new ArrayList<>();
List<ExternalCacheBlock> blocks = new ArrayList<>();
CompletableFuture<Void> loadBlocksCf = getBlocksCf.thenCompose(blockList -> {
blocks.addAll(blockList);
return CompletableFuture.allOf(blockList.stream().map(block -> block.loadCf).toArray(CompletableFuture[]::new));
Expand All @@ -180,7 +180,7 @@ void read0(ReadContext ctx, final long startOffset, final long endOffset, final
// 3. extract records from blocks
loadBlocksCf.thenAccept(nil -> {
ctx.blocks.addAll(blocks);
Optional<Block> failedBlock = blocks.stream().filter(block -> block.exception != null).findAny();
Optional<ExternalCacheBlock> failedBlock = blocks.stream().filter(block -> block.exception != null).findAny();
if (failedBlock.isPresent()) {
ctx.cf.completeExceptionally(failedBlock.get().exception);
return;
Expand All @@ -193,7 +193,7 @@ void read0(ReadContext ctx, final long startOffset, final long endOffset, final
long nextStartOffset = startOffset;
long nextEndOffset;
boolean fulfill = false;
for (Block block : blocks) {
for (ExternalCacheBlock block : blocks) {
DataBlockIndex index = block.index;
if (nextStartOffset < index.startOffset() || nextStartOffset >= index.endOffset()) {
String msg = String.format("[BUG] nextStartOffset:%d is not in the range of index:%d-%d", nextStartOffset, index.startOffset(), index.endOffset());
Expand Down Expand Up @@ -275,7 +275,7 @@ void afterRead(ReadDataBlock readDataBlock, ReadContext ctx) {
break;
}
}
for (Block block : ctx.blocks) {
for (ExternalCacheBlock block : ctx.blocks) {
block.release();
}
// try readahead to speed up the next read
Expand All @@ -290,16 +290,16 @@ void afterRead(ReadDataBlock readDataBlock, ReadContext ctx) {
});
}

private CompletableFuture<List<Block>> getBlocks(long startOffset, long endOffset, int maxBytes,
boolean readahead) {
private CompletableFuture<List<ExternalCacheBlock>> getBlocks(long startOffset, long endOffset, int maxBytes,
boolean readahead) {
GetBlocksContext context = new GetBlocksContext(readahead);
try {
getBlocks0(context, startOffset, endOffset, maxBytes);
} catch (Throwable ex) {
context.cf.completeExceptionally(ex);
}
context.cf.exceptionally(ex -> {
context.blocks.forEach(Block::release);
context.blocks.forEach(ExternalCacheBlock::release);
return null;
});
return context.cf;
Expand Down Expand Up @@ -340,8 +340,8 @@ private void getBlocks0(GetBlocksContext ctx, long startOffset, long endOffset,
firstBlock = false;
}
// after a read, the data is returned to the cache, so we need to reload the data every time
block = block.newBlockWithData(ctx.readahead);
ctx.blocks.add(block);
ExternalCacheBlock externalBlock = block.newBlockWithData(ctx.readahead);
ctx.blocks.add(externalBlock);
if ((endOffset != -1L && index.endOffset() >= endOffset) || remainingSize <= 0) {
fulfill = true;
break;
Expand Down Expand Up @@ -512,8 +512,8 @@ private static boolean isRecoverable(Throwable cause) {
}

static class GetBlocksContext {
final List<Block> blocks = new ArrayList<>();
final CompletableFuture<List<Block>> cf = new CompletableFuture<>();
final List<ExternalCacheBlock> blocks = new ArrayList<>();
final CompletableFuture<List<ExternalCacheBlock>> cf = new CompletableFuture<>();
final boolean readahead;

public GetBlocksContext(boolean readahead) {
Expand All @@ -523,65 +523,77 @@ public GetBlocksContext(boolean readahead) {

static class ReadContext {
final List<StreamRecordBatch> records = new LinkedList<>();
final List<Block> blocks = new ArrayList<>();
final List<ExternalCacheBlock> blocks = new ArrayList<>();
final CompletableFuture<ReadDataBlock> cf = new CompletableFuture<>();
CacheAccessType accessType = BLOCK_CACHE_HIT;
final TimerUtil start = new TimerUtil();
}

public static class ExternalCacheBlock {
final S3ObjectMetadata metadata;
final DataBlockIndex index;

CompletableFuture<Void> loadCf;

DataBlock data;
Throwable exception;

public ExternalCacheBlock(S3ObjectMetadata metadata, DataBlockIndex index) {
this.metadata = metadata;
this.index = index;
}

public void release() {
loadCf.whenComplete((v, ex) -> {
if (data != null) {
data.release();
data = null;
}
});
Copy link
Preview

Copilot AI Aug 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The loadCf field may be null when release() is called, which would cause a NullPointerException. Consider adding a null check before calling whenComplete.

Suggested change
});
if (loadCf != null) {
loadCf.whenComplete((v, ex) -> {
if (data != null) {
data.release();
data = null;
}
});
}

Copilot uses AI. Check for mistakes.

}
}

class Block {
final S3ObjectMetadata metadata;
final DataBlockIndex index;
DataBlock data;
DataBlock.FreeListenerHandle freeListenerHandle;

CompletableFuture<Void> loadCf;
Throwable exception;
boolean released = false;
boolean readCompleted = false;

public Block(S3ObjectMetadata metadata, DataBlockIndex index) {
this.metadata = metadata;
this.index = index;
}

// TODO: use different Block type, cause of the returned Block shouldn't have markReadCompleted method
public Block newBlockWithData(boolean readahead) {
public ExternalCacheBlock newBlockWithData(boolean readahead) {
// We need to create a new block with consistent data to avoid a duplicated release or a leak,
// because the loaded data may be evicted and reloaded.
Block newBlock = new Block(metadata, index);
ExternalCacheBlock externalBlock = new ExternalCacheBlock(metadata, index);
ObjectReader objectReader = objectReaderFactory.get(metadata);
DataBlockCache.GetOptions getOptions = DataBlockCache.GetOptions.builder().readahead(readahead).build();
loadCf = dataBlockCache.getBlock(getOptions, objectReader, index).thenAccept(newData -> {
newBlock.data = newData;
if (!readCompleted && data != newData) {
// the data block is first loaded or evict & reload
if (data != null) {
freeListenerHandle.close();
}
data = newData;
newData.markUnread();
freeListenerHandle = data.registerFreeListener(b -> handleBlockFree(this));
}
// the data block is first loaded or evict & reload
externalBlock.loadCf = dataBlockCache.getBlock(getOptions, objectReader, index).thenAccept(newData -> {
externalBlock.data = newData;
handleDataBlockChange(newData);
Copy link
Preview

Copilot AI Aug 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The handleDataBlockChange method is called from ExternalCacheBlock but is defined in the Block class, which will cause a compilation error since ExternalCacheBlock doesn't have access to this method.

Copilot uses AI. Check for mistakes.

}).exceptionally(ex -> {
exception = ex;
newBlock.exception = ex;
externalBlock.exception = ex;
return null;
}).whenComplete((nil, ex) -> objectReader.release());
newBlock.loadCf = loadCf;
return newBlock;

return externalBlock;
}

public void release() {
if (released) {
return;
}
released = true;
loadCf.whenComplete((nil, ex) -> {
private void handleDataBlockChange(DataBlock newData) {
Copy link
Preview

Copilot AI Aug 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The handleDataBlockChange method references instance fields (readCompleted, data, freeListenerHandle) from the Block class, but it's being called from ExternalCacheBlock which doesn't have these fields. This will cause compilation errors.

Copilot uses AI. Check for mistakes.

if (!readCompleted && data != newData) {
// the data block is first loaded or evict & reload
if (data != null) {
data.release();
freeListenerHandle.close();
}
});
data = newData;
newData.markUnread();
freeListenerHandle = data.registerFreeListener(b -> handleBlockFree(this));
}
}

/**
Expand All @@ -601,8 +613,7 @@ public String toString() {
"metadata=" + metadata +
", index=" + index +
", data=" + data +
", exception=" + exception +
", released=" + released +
", readCompleted=" + readCompleted +
'}';
}
}
Expand Down Expand Up @@ -645,7 +656,7 @@ public void tryReadahead(boolean cacheMiss) {
readaheadMarkOffset = nextReadaheadOffset;
inflightReadaheadCf = getBlocks(nextReadaheadOffset, -1L, nextReadaheadSize, true).thenAccept(blocks -> {
nextReadaheadOffset = blocks.isEmpty() ? nextReadaheadOffset : blocks.get(blocks.size() - 1).index.endOffset();
blocks.forEach(Block::release);
blocks.forEach(ExternalCacheBlock::release);
});
// If getting the block indexes and loading the data blocks both complete synchronously,
// whenComplete is invoked before the CompletableFuture is assigned to inflightReadaheadCf
Expand Down
Loading