diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index 3dc7f88e52911..0f8d54e2aecee 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -348,14 +348,15 @@ private int readOneBlock(final byte[] b, final int off, final int len) throws IO if (alwaysReadBufferSize) { bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false); } else { - // Enable readAhead when reading sequentially + // Switch between enabling and disabling read ahead based on the workload read pattern. if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor || b.length >= bufferSize) { - LOG.debug("Sequential read with read ahead size of {}", bufferSize); + // Sequential read pattern detected. Enable read ahead. + LOG.debug("Sequential read with read size of {} and read ahead enabled", bufferSize); bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false); } else { - // Enabling read ahead for random reads as well to reduce number of remote calls. + // Random read pattern detected. Disable read ahead. int lengthWithReadAhead = Math.min(b.length + readAheadRange, bufferSize); - LOG.debug("Random read with read ahead size of {}", lengthWithReadAhead); + LOG.debug("Random read with read size of {} and read ahead disabled", lengthWithReadAhead); bytesRead = readInternal(fCursor, buffer, 0, lengthWithReadAhead, true); } } @@ -511,8 +512,12 @@ private int readInternal(final long position, final byte[] b, final int offset, } int receivedBytes; - // queue read-aheads - int numReadAheads = this.readAheadQueueDepth; + /* + * Number of prefetches to queue for each request is configurable. 
+ * For the first read of this input stream, we don't read ahead but keep + * the current read data in cache as the pattern might not be sequential. + */ + int numReadAheads = firstRead? 1 : this.readAheadQueueDepth; long nextOffset = position; // First read to queue needs to be of readBufferSize and later // of readAhead Block size @@ -910,6 +915,10 @@ long getLimit() { return this.limit; } + boolean isFirstRead() { + return this.firstRead; + } + @VisibleForTesting BackReference getFsBackRef() { return fsBackRef; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java index fe1ac3fa1f235..d3ec7a098c280 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java @@ -451,7 +451,11 @@ private ReadBuffer getBufferFromCompletedQueue(final AbfsInputStream stream, fin */ private void clearFromReadAheadQueue(final AbfsInputStream stream, final long requestedOffset) { ReadBuffer buffer = getFromList(getReadAheadQueue(), stream, requestedOffset); - if (buffer != null) { + /* + * If this prefetch was triggered by the first read of this input stream, + * we should not remove it from the queue; it stays cached for future use. 
+ */ + if (buffer != null && !stream.isFirstRead()) { getReadAheadQueue().remove(buffer); notifyAll(); // lock is held in calling method getFreeList().push(buffer.getBufferindex()); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java index de49da5dc51d2..814a940c10478 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java @@ -119,14 +119,14 @@ private AbfsInputStream getAbfsInputStream(AbfsClient mockAbfsClient, String fileName) throws IOException { AbfsInputStreamContext inputStreamContext = new AbfsInputStreamContext(-1); // Create AbfsInputStream with the client instance - AbfsInputStream inputStream = new AbfsInputStream( + AbfsInputStream inputStream = Mockito.spy(new AbfsInputStream( mockAbfsClient, null, FORWARD_SLASH + fileName, - THREE_KB, + THREE_KB + ONE_KB, // First read will always bypass readahead inputStreamContext.withReadBufferSize(ONE_KB).withReadAheadQueueDepth(10).withReadAheadBlockSize(ONE_KB), "eTag", - getTestTracingContext(null, false)); + getTestTracingContext(null, false))); inputStream.setCachedSasToken( TestCachedSASToken.getTestCachedSASTokenInstance()); @@ -329,9 +329,11 @@ public void testFailedReadAhead() throws Exception { AbfsRestOperation successOp = getMockRestOp(); // Stub : - // Read request leads to 3 readahead calls: Fail all 3 readahead-client.read() + // First read will be synchronous and should succeed + // Second Read request leads to 3 readahead calls: Fail all 3 readahead-client.read() // Actual read request fails with the failure in readahead thread - doThrow(new TimeoutException("Internal Server error for RAH-Thread-X")) + doReturn(successOp) + .doThrow(new 
TimeoutException("Internal Server error for RAH-Thread-X")) .doThrow(new TimeoutException("Internal Server error for RAH-Thread-Y")) .doThrow(new TimeoutException("Internal Server error RAH-Thread-Z")) .doReturn(successOp) // Any extra calls to read, pass it. .when(client) .read(any(String.class), any(Long.class), any(byte[].class), any(Integer.class), any(Integer.class), any(String.class), @@ -342,6 +344,9 @@ AbfsInputStream inputStream = getAbfsInputStream(client, "testFailedReadAhead.txt"); + // First read will bypass readahead and succeed. + inputStream.read(new byte[ONE_KB]); + // Scenario: ReadAhead triggered from current active read call failed // Before the change to return exception from readahead buffer, // AbfsInputStream would have triggered an extra readremote on noticing @@ -353,9 +358,9 @@ () -> inputStream.read(new byte[ONE_KB])); - // Only the 3 readAhead threads should have triggered client.read - verifyReadCallCount(client, 3); + // 1 Main thread and 3 readAhead threads should have triggered client.read + verifyReadCallCount(client, 4); - // Stub returns success for the 4th read request, if ReadBuffers still + // Stub returns success for the 5th read request, if ReadBuffers still // persisted, ReadAheadManager getBlock would have returned exception. 
checkEvictedStatus(inputStream, 0, false); } @@ -366,9 +371,11 @@ public void testFailedReadAheadEviction() throws Exception { AbfsRestOperation successOp = getMockRestOp(); getBufferManager().setThresholdAgeMilliseconds(INCREASED_READ_BUFFER_AGE_THRESHOLD); // Stub : - // Read request leads to 3 readahead calls: Fail all 3 readahead-client.read() + // First read will be synchronous and should succeed + // Second Read request leads to 3 readahead calls: Fail all 3 readahead-client.read() // Actual read request fails with the failure in readahead thread - doThrow(new TimeoutException("Internal Server error")) + doReturn(successOp) + .doThrow(new TimeoutException("Internal Server error")) .when(client) .read(any(String.class), any(Long.class), any(byte[].class), any(Integer.class), any(Integer.class), any(String.class), @@ -376,6 +383,9 @@ public void testFailedReadAheadEviction() throws Exception { AbfsInputStream inputStream = getAbfsInputStream(client, "testFailedReadAheadEviction.txt"); + // First read will bypass readahead and succeed. + inputStream.read(new byte[ONE_KB]); + // Add a failed buffer to completed queue and set to no free buffers to read ahead. ReadBuffer buff = new ReadBuffer(); buff.setStatus(ReadBufferStatus.READ_FAILED); @@ -406,11 +416,13 @@ public void testOlderReadAheadFailure() throws Exception { AbfsRestOperation successOp = getMockRestOp(); // Stub : - // First Read request leads to 3 readahead calls: Fail all 3 readahead-client.read() + // First read will be synchronous and should succeed + // Second Read request leads to 3 readahead calls: Fail all 3 readahead-client.read() // A second read request will see that readahead had failed for data in // the requested offset range and also that its is an older readahead request. // So attempt a new read only for the requested range. 
- doThrow(new TimeoutException("Internal Server error for RAH-X")) + doReturn(successOp) + .doThrow(new TimeoutException("Internal Server error for RAH-X")) .doThrow(new TimeoutException("Internal Server error for RAH-Y")) .doThrow(new TimeoutException("Internal Server error for RAH-Z")) .doReturn(successOp) // pass the read for second read request @@ -422,12 +434,15 @@ public void testOlderReadAheadFailure() throws Exception { AbfsInputStream inputStream = getAbfsInputStream(client, "testOlderReadAheadFailure.txt"); + // First read will bypass readahead and succeed. + inputStream.read(new byte[ONE_KB]); + // First read request that fails as the readahead triggered from this request failed. intercept(IOException.class, () -> inputStream.read(new byte[ONE_KB])); - // Only the 3 readAhead threads should have triggered client.read - verifyReadCallCount(client, 3); + // 1 Main thread and 3 readAhead threads should have triggered client.read + verifyReadCallCount(client, 4); // Sleep for thresholdAgeMs so that the read ahead buffer qualifies for being old. Thread.sleep(getBufferManager().getThresholdAgeMilliseconds()); @@ -439,9 +454,9 @@ public void testOlderReadAheadFailure() throws Exception { // calls will be one more from earlier (there is a reset mock which will reset the // count, but the mock stub is erased as well which needs AbsInputStream to be recreated, // which beats the purpose) - verifyReadCallCount(client, 4); + verifyReadCallCount(client, 5); - // Stub returns success for the 5th read request, if ReadBuffers still + // Stub returns success for the 6th read request, if ReadBuffers still // persisted request would have failed for position 0. 
checkEvictedStatus(inputStream, 0, false); } @@ -460,15 +475,17 @@ public void testSuccessfulReadAhead() throws Exception { AbfsRestOperation op = getMockRestOp(); // Stub : - // Pass all readAheads and fail the post eviction request to + // Pass all readAhead (4 blocks) and fail the post eviction request to // prove ReadAhead buffer is used // for post eviction check, fail all read aheads doReturn(op) .doReturn(op) .doReturn(op) - .doThrow(new TimeoutException("Internal Server error for RAH-X")) - .doThrow(new TimeoutException("Internal Server error for RAH-Y")) - .doThrow(new TimeoutException("Internal Server error for RAH-Z")) + .doReturn(op) + .doThrow(new TimeoutException("Internal Server error for RAH-A")) + .doThrow(new TimeoutException("Internal Server error for RAH-B")) + .doThrow(new TimeoutException("Internal Server error for RAH-C")) + .doThrow(new TimeoutException("Internal Server error for RAH-D")) .when(client) .read(any(String.class), any(Long.class), any(byte[].class), any(Integer.class), any(Integer.class), any(String.class), @@ -477,11 +494,14 @@ public void testSuccessfulReadAhead() throws Exception { AbfsInputStream inputStream = getAbfsInputStream(client, "testSuccessfulReadAhead.txt"); int beforeReadCompletedListSize = getBufferManager().getCompletedReadListSize(); - // First read request that triggers readAheads. + // First read request always bypasses readahead. This will succeed inputStream.read(new byte[ONE_KB]); - // Only the 3 readAhead threads should have triggered client.read - verifyReadCallCount(client, 3); + // This will start triggering readahead requests. 
+ inputStream.read((new byte[1024])); + + // 1 Main thread and 3 readAhead threads should have triggered client.read + verifyReadCallCount(client, 4); int newAdditionsToCompletedRead = getBufferManager().getCompletedReadListSize() - beforeReadCompletedListSize; @@ -501,9 +521,9 @@ public void testSuccessfulReadAhead() throws Exception { // Once created, mock will remember all interactions. // As the above read should not have triggered any server calls, total // number of read calls made at this point will be same as last. - verifyReadCallCount(client, 3); + verifyReadCallCount(client, 4); - // Stub will throw exception for client.read() for 4th and later calls + // Stub will throw exception for client.read() for 5th and later calls // if not using the read-ahead buffer exception will be thrown on read checkEvictedStatus(inputStream, 0, true); }