Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -348,14 +348,15 @@ private int readOneBlock(final byte[] b, final int off, final int len) throws IO
if (alwaysReadBufferSize) {
bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false);
} else {
// Enable readAhead when reading sequentially
// Switch between enabling and disabling read ahead based on the workload read pattern.
if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor || b.length >= bufferSize) {
LOG.debug("Sequential read with read ahead size of {}", bufferSize);
// Sequential read pattern detected. Enable read ahead.
LOG.debug("Sequential read with read size of {} and read ahead enabled", bufferSize);
Copy link
Preview

Copilot AI Aug 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The log message mentions 'read size of {}' but logs bufferSize instead of the actual read size (b.length). This could be misleading when debugging read operations.

Suggested change
LOG.debug("Sequential read with read size of {} and read ahead enabled", bufferSize);
LOG.debug("Sequential read with read size of {} and read ahead enabled", b.length);

Copilot uses AI. Check for mistakes.

bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false);
} else {
// Enabling read ahead for random reads as well to reduce number of remote calls.
// Random read pattern detected. Disable read ahead.
int lengthWithReadAhead = Math.min(b.length + readAheadRange, bufferSize);
LOG.debug("Random read with read ahead size of {}", lengthWithReadAhead);
LOG.debug("Random read with read size of {} and read ahead disabled", lengthWithReadAhead);
bytesRead = readInternal(fCursor, buffer, 0, lengthWithReadAhead, true);
}
}
Expand Down Expand Up @@ -511,8 +512,12 @@ private int readInternal(final long position, final byte[] b, final int offset,
}
int receivedBytes;

// queue read-aheads
int numReadAheads = this.readAheadQueueDepth;
/*
* Number of prefetches to queue for each request is configurable.
* For the first read of this input stream, we don't read ahead but keep
* the current read data in cache as pattern might not be sequential.
*/
int numReadAheads = firstRead? 1 : this.readAheadQueueDepth;
Copy link
Preview

Copilot AI Aug 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The ternary operator lacks spaces around the ? operator, which deviates from Java coding conventions. It should be firstRead ? 1 : this.readAheadQueueDepth.

Suggested change
int numReadAheads = firstRead? 1 : this.readAheadQueueDepth;
int numReadAheads = firstRead ? 1 : this.readAheadQueueDepth;

Copilot uses AI. Check for mistakes.

long nextOffset = position;
// First read to queue needs to be of readBufferSize and later
// of readAhead Block size
Expand Down Expand Up @@ -910,6 +915,10 @@ long getLimit() {
return this.limit;
}

boolean isFirstRead() {
return this.firstRead;
}

@VisibleForTesting
BackReference getFsBackRef() {
return fsBackRef;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,11 @@ private ReadBuffer getBufferFromCompletedQueue(final AbfsInputStream stream, fin
*/
private void clearFromReadAheadQueue(final AbfsInputStream stream, final long requestedOffset) {
ReadBuffer buffer = getFromList(getReadAheadQueue(), stream, requestedOffset);
if (buffer != null) {
/*
* If this prefetch was triggered by first read of this input stream,
* we should not remove it from queue and cache it for future purpose.
*/
if (buffer != null && !stream.isFirstRead()) {
getReadAheadQueue().remove(buffer);
notifyAll(); // lock is held in calling method
getFreeList().push(buffer.getBufferindex());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,14 @@ private AbfsInputStream getAbfsInputStream(AbfsClient mockAbfsClient,
String fileName) throws IOException {
AbfsInputStreamContext inputStreamContext = new AbfsInputStreamContext(-1);
// Create AbfsInputStream with the client instance
AbfsInputStream inputStream = new AbfsInputStream(
AbfsInputStream inputStream = Mockito.spy(new AbfsInputStream(
mockAbfsClient,
null,
FORWARD_SLASH + fileName,
THREE_KB,
THREE_KB + ONE_KB, // First read will always bypass readahead
Copy link
Preview

Copilot AI Jul 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment suggests this change is related to first read bypassing readahead, but increasing the file size by ONE_KB doesn't clearly relate to the first read behavior. This change appears to be adjusting test data size rather than directly testing the first read bypass feature.

Suggested change
THREE_KB + ONE_KB, // First read will always bypass readahead
ONE_KB, // Set file size to match the read buffer size to ensure the first read bypasses readahead

Copilot uses AI. Check for mistakes.

inputStreamContext.withReadBufferSize(ONE_KB).withReadAheadQueueDepth(10).withReadAheadBlockSize(ONE_KB),
"eTag",
getTestTracingContext(null, false));
getTestTracingContext(null, false)));

inputStream.setCachedSasToken(
TestCachedSASToken.getTestCachedSASTokenInstance());
Expand Down Expand Up @@ -329,9 +329,11 @@ public void testFailedReadAhead() throws Exception {
AbfsRestOperation successOp = getMockRestOp();

// Stub :
// Read request leads to 3 readahead calls: Fail all 3 readahead-client.read()
// First read will be synchronous and should succeed
// Second Read request leads to 3 readahead calls: Fail all 3 readahead-client.read()
// Actual read request fails with the failure in readahead thread
doThrow(new TimeoutException("Internal Server error for RAH-Thread-X"))
doReturn(successOp)
.doThrow(new TimeoutException("Internal Server error for RAH-Thread-X"))
.doThrow(new TimeoutException("Internal Server error for RAH-Thread-Y"))
.doThrow(new TimeoutException("Internal Server error RAH-Thread-Z"))
.doReturn(successOp) // Any extra calls to read, pass it.
Expand All @@ -342,6 +344,9 @@ public void testFailedReadAhead() throws Exception {

AbfsInputStream inputStream = getAbfsInputStream(client, "testFailedReadAhead.txt");

// First read will bypass readahead and succeed.
inputStream.read(new byte[ONE_KB]);

// Scenario: ReadAhead triggered from current active read call failed
// Before the change to return exception from readahead buffer,
// AbfsInputStream would have triggered an extra readremote on noticing
Expand All @@ -353,9 +358,9 @@ public void testFailedReadAhead() throws Exception {
() -> inputStream.read(new byte[ONE_KB]));

// Only the 3 readAhead threads should have triggered client.read
verifyReadCallCount(client, 3);
verifyReadCallCount(client, 4);

// Stub returns success for the 4th read request, if ReadBuffers still
// Stub returns success for the 5th read request, if ReadBuffers still
// persisted, ReadAheadManager getBlock would have returned exception.
checkEvictedStatus(inputStream, 0, false);
}
Expand All @@ -366,16 +371,21 @@ public void testFailedReadAheadEviction() throws Exception {
AbfsRestOperation successOp = getMockRestOp();
getBufferManager().setThresholdAgeMilliseconds(INCREASED_READ_BUFFER_AGE_THRESHOLD);
// Stub :
// Read request leads to 3 readahead calls: Fail all 3 readahead-client.read()
// First read will be synchronous and should succeed
// Second Read request leads to 3 readahead calls: Fail all 3 readahead-client.read()
// Actual read request fails with the failure in readahead thread
doThrow(new TimeoutException("Internal Server error"))
doReturn(successOp)
.doThrow(new TimeoutException("Internal Server error"))
.when(client)
.read(any(String.class), any(Long.class), any(byte[].class),
any(Integer.class), any(Integer.class), any(String.class),
any(String.class), any(), any(TracingContext.class));

AbfsInputStream inputStream = getAbfsInputStream(client, "testFailedReadAheadEviction.txt");

// First read will bypass readahead and succeed.
inputStream.read(new byte[ONE_KB]);

// Add a failed buffer to completed queue and set to no free buffers to read ahead.
ReadBuffer buff = new ReadBuffer();
buff.setStatus(ReadBufferStatus.READ_FAILED);
Expand Down Expand Up @@ -406,11 +416,13 @@ public void testOlderReadAheadFailure() throws Exception {
AbfsRestOperation successOp = getMockRestOp();

// Stub :
// First Read request leads to 3 readahead calls: Fail all 3 readahead-client.read()
// First read will be synchronous and should succeed
// Second Read request leads to 3 readahead calls: Fail all 3 readahead-client.read()
// A second read request will see that readahead had failed for data in
// the requested offset range and also that its is an older readahead request.
// So attempt a new read only for the requested range.
doThrow(new TimeoutException("Internal Server error for RAH-X"))
doReturn(successOp)
.doThrow(new TimeoutException("Internal Server error for RAH-X"))
.doThrow(new TimeoutException("Internal Server error for RAH-Y"))
.doThrow(new TimeoutException("Internal Server error for RAH-Z"))
.doReturn(successOp) // pass the read for second read request
Expand All @@ -422,12 +434,15 @@ public void testOlderReadAheadFailure() throws Exception {

AbfsInputStream inputStream = getAbfsInputStream(client, "testOlderReadAheadFailure.txt");

// First read will bypass readahead and succeed.
inputStream.read(new byte[ONE_KB]);

// First read request that fails as the readahead triggered from this request failed.
intercept(IOException.class,
() -> inputStream.read(new byte[ONE_KB]));

// Only the 3 readAhead threads should have triggered client.read
verifyReadCallCount(client, 3);
// 1 Main thread and 3 readAhead threads should have triggered client.read
verifyReadCallCount(client, 4);

// Sleep for thresholdAgeMs so that the read ahead buffer qualifies for being old.
Thread.sleep(getBufferManager().getThresholdAgeMilliseconds());
Expand All @@ -439,9 +454,9 @@ public void testOlderReadAheadFailure() throws Exception {
// calls will be one more from earlier (there is a reset mock which will reset the
// count, but the mock stub is erased as well which needs AbsInputStream to be recreated,
// which beats the purpose)
verifyReadCallCount(client, 4);
verifyReadCallCount(client, 5);

// Stub returns success for the 5th read request, if ReadBuffers still
// Stub returns success for the 6th read request, if ReadBuffers still
// persisted request would have failed for position 0.
checkEvictedStatus(inputStream, 0, false);
}
Expand All @@ -460,15 +475,17 @@ public void testSuccessfulReadAhead() throws Exception {
AbfsRestOperation op = getMockRestOp();

// Stub :
// Pass all readAheads and fail the post eviction request to
// Pass all readAhead (4 blocks) and fail the post eviction request to
// prove ReadAhead buffer is used
// for post eviction check, fail all read aheads
doReturn(op)
.doReturn(op)
.doReturn(op)
.doThrow(new TimeoutException("Internal Server error for RAH-X"))
.doThrow(new TimeoutException("Internal Server error for RAH-Y"))
.doThrow(new TimeoutException("Internal Server error for RAH-Z"))
.doReturn(op)
.doThrow(new TimeoutException("Internal Server error for RAH-A"))
.doThrow(new TimeoutException("Internal Server error for RAH-B"))
.doThrow(new TimeoutException("Internal Server error for RAH-C"))
.doThrow(new TimeoutException("Internal Server error for RAH-D"))
.when(client)
.read(any(String.class), any(Long.class), any(byte[].class),
any(Integer.class), any(Integer.class), any(String.class),
Expand All @@ -477,11 +494,14 @@ public void testSuccessfulReadAhead() throws Exception {
AbfsInputStream inputStream = getAbfsInputStream(client, "testSuccessfulReadAhead.txt");
int beforeReadCompletedListSize = getBufferManager().getCompletedReadListSize();

// First read request that triggers readAheads.
// First read request always bypasses readahead. This will succeed
inputStream.read(new byte[ONE_KB]);

// Only the 3 readAhead threads should have triggered client.read
verifyReadCallCount(client, 3);
// This will start triggering readahead requests.
inputStream.read((new byte[1024]));
Copy link
Preview

Copilot AI Jul 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inconsistent array size usage. The code uses new byte[1024] here while using new byte[ONE_KB] elsewhere in the same file. Using the constant ONE_KB would be more consistent and maintainable.

Suggested change
inputStream.read((new byte[1024]));
inputStream.read((new byte[ONE_KB]));

Copilot uses AI. Check for mistakes.


// 1 Main thread and 3 readAhead threads should have triggered client.read
verifyReadCallCount(client, 4);
int newAdditionsToCompletedRead =
getBufferManager().getCompletedReadListSize()
- beforeReadCompletedListSize;
Expand All @@ -501,9 +521,9 @@ public void testSuccessfulReadAhead() throws Exception {
// Once created, mock will remember all interactions.
// As the above read should not have triggered any server calls, total
// number of read calls made at this point will be same as last.
verifyReadCallCount(client, 3);
verifyReadCallCount(client, 4);

// Stub will throw exception for client.read() for 4th and later calls
// Stub will throw exception for client.read() for 5th and later calls
// if not using the read-ahead buffer exception will be thrown on read
checkEvictedStatus(inputStream, 0, true);
}
Expand Down