Skip to content

Commit 9d5e111

Browse files
authored
HADOOP-19613. [ABFS][ReadAheadV2] Refactor ReadBufferManager to isolate new code with the current working code (#7801)
1 parent c21f9bd commit 9d5e111

File tree

13 files changed

+1227
-568
lines changed

13 files changed

+1227
-568
lines changed

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -381,6 +381,41 @@ public class AbfsConfiguration{
381381
DefaultValue = DEFAULT_ENABLE_READAHEAD)
382382
private boolean enabledReadAhead;
383383

384+
@BooleanConfigurationValidatorAnnotation(
385+
ConfigurationKey = FS_AZURE_ENABLE_READAHEAD_V2,
386+
DefaultValue = DEFAULT_ENABLE_READAHEAD_V2)
387+
private boolean isReadAheadV2Enabled;
388+
389+
@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
390+
FS_AZURE_READAHEAD_V2_MIN_THREAD_POOL_SIZE,
391+
DefaultValue = DEFAULT_READAHEAD_V2_MIN_THREAD_POOL_SIZE)
392+
private int minReadAheadV2ThreadPoolSize;
393+
394+
@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
395+
FS_AZURE_READAHEAD_V2_MAX_THREAD_POOL_SIZE,
396+
DefaultValue = DEFAULT_READAHEAD_V2_MAX_THREAD_POOL_SIZE)
397+
private int maxReadAheadV2ThreadPoolSize;
398+
399+
@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
400+
FS_AZURE_READAHEAD_V2_MIN_BUFFER_POOL_SIZE,
401+
DefaultValue = DEFAULT_READAHEAD_V2_MIN_BUFFER_POOL_SIZE)
402+
private int minReadAheadV2BufferPoolSize;
403+
404+
@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
405+
FS_AZURE_READAHEAD_V2_MAX_BUFFER_POOL_SIZE,
406+
DefaultValue = DEFAULT_READAHEAD_V2_MAX_BUFFER_POOL_SIZE)
407+
private int maxReadAheadV2BufferPoolSize;
408+
409+
@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
410+
FS_AZURE_READAHEAD_V2_EXECUTOR_SERVICE_TTL_MILLIS,
411+
DefaultValue = DEFAULT_READAHEAD_V2_EXECUTOR_SERVICE_TTL_MILLIS)
412+
private int readAheadExecutorServiceTTLMillis;
413+
414+
@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
415+
FS_AZURE_READAHEAD_V2_CACHED_BUFFER_TTL_MILLIS,
416+
DefaultValue = DEFAULT_READAHEAD_V2_CACHED_BUFFER_TTL_MILLIS)
417+
private int readAheadV2CachedBufferTTLMillis;
418+
384419
@LongConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS,
385420
MinValue = 0,
386421
DefaultValue = DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS)
@@ -1368,6 +1403,54 @@ public boolean isReadAheadEnabled() {
13681403
return this.enabledReadAhead;
13691404
}
13701405

1406+
public int getMinReadAheadV2ThreadPoolSize() {
1407+
if (minReadAheadV2ThreadPoolSize <= 0) {
1408+
// If minReadAheadV2ThreadPoolSize is unset or non-positive, fall back to 2 * available processors
1409+
return 2 * Runtime.getRuntime().availableProcessors();
1410+
}
1411+
return minReadAheadV2ThreadPoolSize;
1412+
}
1413+
1414+
public int getMaxReadAheadV2ThreadPoolSize() {
1415+
if (maxReadAheadV2ThreadPoolSize <= 0) {
1416+
// If maxReadAheadV2ThreadPoolSize is unset or non-positive, fall back to 4 * available processors
1417+
return 4 * Runtime.getRuntime().availableProcessors();
1418+
}
1419+
return maxReadAheadV2ThreadPoolSize;
1420+
}
1421+
1422+
public int getMinReadAheadV2BufferPoolSize() {
1423+
if (minReadAheadV2BufferPoolSize <= 0) {
1424+
// If minReadAheadV2BufferPoolSize is unset or non-positive, fall back to 2 * available processors
1425+
return 2 * Runtime.getRuntime().availableProcessors();
1426+
}
1427+
return minReadAheadV2BufferPoolSize;
1428+
}
1429+
1430+
public int getMaxReadAheadV2BufferPoolSize() {
1431+
if (maxReadAheadV2BufferPoolSize <= 0) {
1432+
// If maxReadAheadV2BufferPoolSize is unset or non-positive, fall back to 4 * available processors
1433+
return 4 * Runtime.getRuntime().availableProcessors();
1434+
}
1435+
return maxReadAheadV2BufferPoolSize;
1436+
}
1437+
1438+
public int getReadAheadExecutorServiceTTLInMillis() {
1439+
return readAheadExecutorServiceTTLMillis;
1440+
}
1441+
1442+
public int getReadAheadV2CachedBufferTTLMillis() {
1443+
return readAheadV2CachedBufferTTLMillis;
1444+
}
1445+
1446+
/**
1447+
* Checks if the read-ahead V2 feature is enabled by the user.
1448+
* @return true if read-ahead v2 is enabled, false otherwise.
1449+
*/
1450+
public boolean isReadAheadV2Enabled() {
1451+
return this.isReadAheadV2Enabled;
1452+
}
1453+
13711454
@VisibleForTesting
13721455
void setReadAheadEnabled(final boolean enabledReadAhead) {
13731456
this.enabledReadAhead = enabledReadAhead;

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -955,6 +955,7 @@ AZURE_FOOTER_READ_BUFFER_SIZE, getAbfsConfiguration().getFooterReadBufferSize())
955955
.withReadAheadQueueDepth(getAbfsConfiguration().getReadAheadQueueDepth())
956956
.withTolerateOobAppends(getAbfsConfiguration().getTolerateOobAppends())
957957
.isReadAheadEnabled(getAbfsConfiguration().isReadAheadEnabled())
958+
.isReadAheadV2Enabled(getAbfsConfiguration().isReadAheadV2Enabled())
958959
.withReadSmallFilesCompletely(getAbfsConfiguration().readSmallFilesCompletely())
959960
.withOptimizeFooterRead(getAbfsConfiguration().optimizeFooterRead())
960961
.withFooterReadBufferSize(footerReadBufferSize)

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,10 +259,46 @@ public final class ConfigurationKeys {
259259
public static final String AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT = "fs.azure.shellkeyprovider.script";
260260

261261
/**
262-
* Enable or disable readahead buffer in AbfsInputStream.
262+
* Enable or disable readahead V1 in AbfsInputStream.
263263
* Value: {@value}.
264264
*/
265265
public static final String FS_AZURE_ENABLE_READAHEAD = "fs.azure.enable.readahead";
266+
/**
267+
* Enable or disable readahead V2 in AbfsInputStream. This setting works independently of the V1 setting.
268+
* Value: {@value}.
269+
*/
270+
public static final String FS_AZURE_ENABLE_READAHEAD_V2 = "fs.azure.enable.readahead.v2";
271+
272+
/**
273+
* Minimum number of prefetch threads in the thread pool for readahead V2.
274+
* Value: {@value}.
275+
*/
276+
public static final String FS_AZURE_READAHEAD_V2_MIN_THREAD_POOL_SIZE = "fs.azure.readahead.v2.min.thread.pool.size";
277+
/**
278+
* Maximum number of prefetch threads in the thread pool for readahead V2.
279+
* Value: {@value}.
280+
*/
281+
public static final String FS_AZURE_READAHEAD_V2_MAX_THREAD_POOL_SIZE = "fs.azure.readahead.v2.max.thread.pool.size";
282+
/**
283+
* Minimum size of the buffer pool for caching prefetched data for readahead V2.
284+
* Value: {@value}.
285+
*/
286+
public static final String FS_AZURE_READAHEAD_V2_MIN_BUFFER_POOL_SIZE = "fs.azure.readahead.v2.min.buffer.pool.size";
287+
/**
288+
* Maximum size of the buffer pool for caching prefetched data for readahead V2.
289+
* Value: {@value}.
290+
*/
291+
public static final String FS_AZURE_READAHEAD_V2_MAX_BUFFER_POOL_SIZE = "fs.azure.readahead.v2.max.buffer.pool.size";
292+
293+
/**
294+
* TTL in milliseconds for idle threads in the executor service used by readahead V2.
295+
*/
296+
public static final String FS_AZURE_READAHEAD_V2_EXECUTOR_SERVICE_TTL_MILLIS = "fs.azure.readahead.v2.executor.service.ttl.millis";
297+
298+
/**
299+
* TTL in milliseconds for cached buffers in the buffer pool used by readahead V2.
300+
*/
301+
public static final String FS_AZURE_READAHEAD_V2_CACHED_BUFFER_TTL_MILLIS = "fs.azure.readahead.v2.cached.buffer.ttl.millis";
266302

267303
/** Setting this to true will make the driver use its own RemoteIterator implementation. */
268304
public static final String FS_AZURE_ENABLE_ABFS_LIST_ITERATOR = "fs.azure.enable.abfslistiterator";

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,14 @@ public final class FileSystemConfigurations {
127127
public static final long DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS = 120;
128128

129129
public static final boolean DEFAULT_ENABLE_READAHEAD = true;
130+
public static final boolean DEFAULT_ENABLE_READAHEAD_V2 = false;
131+
public static final int DEFAULT_READAHEAD_V2_MIN_THREAD_POOL_SIZE = -1;
132+
public static final int DEFAULT_READAHEAD_V2_MAX_THREAD_POOL_SIZE = -1;
133+
public static final int DEFAULT_READAHEAD_V2_MIN_BUFFER_POOL_SIZE = -1;
134+
public static final int DEFAULT_READAHEAD_V2_MAX_BUFFER_POOL_SIZE = -1;
135+
public static final int DEFAULT_READAHEAD_V2_EXECUTOR_SERVICE_TTL_MILLIS = 3_000;
136+
public static final int DEFAULT_READAHEAD_V2_CACHED_BUFFER_TTL_MILLIS = 6_000;
137+
130138
public static final String DEFAULT_FS_AZURE_USER_AGENT_PREFIX = EMPTY_STRING;
131139
public static final String DEFAULT_VALUE_UNKNOWN = "UNKNOWN";
132140

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
7979
private final String eTag; // eTag of the path when InputStream are created
8080
private final boolean tolerateOobAppends; // whether tolerate Oob Appends
8181
private final boolean readAheadEnabled; // whether enable readAhead;
82+
private final boolean readAheadV2Enabled; // whether enable readAhead V2;
8283
private final String inputStreamId;
8384
private final boolean alwaysReadBufferSize;
8485
/*
@@ -130,6 +131,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
130131

131132
/** ABFS instance to be held by the input stream to avoid GC close. */
132133
private final BackReference fsBackRef;
134+
private ReadBufferManager readBufferManager;
133135

134136
public AbfsInputStream(
135137
final AbfsClient client,
@@ -150,6 +152,7 @@ public AbfsInputStream(
150152
this.eTag = eTag;
151153
this.readAheadRange = abfsInputStreamContext.getReadAheadRange();
152154
this.readAheadEnabled = abfsInputStreamContext.isReadAheadEnabled();
155+
this.readAheadV2Enabled = abfsInputStreamContext.isReadAheadV2Enabled();
153156
this.alwaysReadBufferSize
154157
= abfsInputStreamContext.shouldReadBufferSizeAlways();
155158
this.bufferedPreadDisabled = abfsInputStreamContext
@@ -173,9 +176,19 @@ public AbfsInputStream(
173176
this.fsBackRef = abfsInputStreamContext.getFsBackRef();
174177
contextEncryptionAdapter = abfsInputStreamContext.getEncryptionAdapter();
175178

176-
// Propagate the config values to ReadBufferManager so that the first instance
177-
// to initialize can set the readAheadBlockSize
178-
ReadBufferManager.setReadBufferManagerConfigs(readAheadBlockSize);
179+
/*
180+
* Initialize the ReadBufferManager based on whether readAheadV2 is enabled or not.
181+
* Precedence is given to ReadBufferManagerV2.
182+
* If neither V1 nor V2 is enabled, then no read ahead will be done.
183+
*/
184+
if (readAheadV2Enabled) {
185+
ReadBufferManagerV2.setReadBufferManagerConfigs(
186+
readAheadBlockSize, client.getAbfsConfiguration());
187+
readBufferManager = ReadBufferManagerV2.getBufferManager();
188+
} else {
189+
ReadBufferManagerV1.setReadBufferManagerConfigs(readAheadBlockSize);
190+
readBufferManager = ReadBufferManagerV1.getBufferManager();
191+
}
179192
if (streamStatistics != null) {
180193
ioStatistics = streamStatistics.getIOStatistics();
181194
}
@@ -491,7 +504,7 @@ private int copyToUserBuffer(byte[] b, int off, int len){
491504

492505
private int readInternal(final long position, final byte[] b, final int offset, final int length,
493506
final boolean bypassReadAhead) throws IOException {
494-
if (readAheadEnabled && !bypassReadAhead) {
507+
if (isReadAheadEnabled() && !bypassReadAhead) {
495508
// try reading from read-ahead
496509
if (offset != 0) {
497510
throw new IllegalArgumentException("readahead buffers cannot have non-zero buffer offsets");
@@ -510,7 +523,7 @@ private int readInternal(final long position, final byte[] b, final int offset,
510523
while (numReadAheads > 0 && nextOffset < contentLength) {
511524
LOG.debug("issuing read ahead requestedOffset = {} requested size {}",
512525
nextOffset, nextSize);
513-
ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize,
526+
readBufferManager.queueReadAhead(this, nextOffset, (int) nextSize,
514527
new TracingContext(readAheadTracingContext));
515528
nextOffset = nextOffset + nextSize;
516529
numReadAheads--;
@@ -519,7 +532,7 @@ private int readInternal(final long position, final byte[] b, final int offset,
519532
}
520533

521534
// try reading from buffers first
522-
receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b);
535+
receivedBytes = readBufferManager.getBlock(this, position, length, b);
523536
bytesFromReadAhead += receivedBytes;
524537
if (receivedBytes > 0) {
525538
incrementReadOps();
@@ -720,7 +733,9 @@ public boolean seekToNewSource(long l) throws IOException {
720733
public synchronized void close() throws IOException {
721734
LOG.debug("Closing {}", this);
722735
closed = true;
723-
ReadBufferManager.getBufferManager().purgeBuffersForStream(this);
736+
if (readBufferManager != null) {
737+
readBufferManager.purgeBuffersForStream(this);
738+
}
724739
buffer = null; // de-reference the buffer so it can be GC'ed sooner
725740
if (contextEncryptionAdapter != null) {
726741
contextEncryptionAdapter.destroy();
@@ -773,9 +788,14 @@ byte[] getBuffer() {
773788
return buffer;
774789
}
775790

791+
/**
792+
* Checks if any version of read ahead is enabled.
793+
* If both are disabled, then skip read ahead logic.
794+
* @return true if read ahead is enabled, false otherwise.
795+
*/
776796
@VisibleForTesting
777797
public boolean isReadAheadEnabled() {
778-
return readAheadEnabled;
798+
return (readAheadEnabled || readAheadV2Enabled) && readBufferManager != null;
779799
}
780800

781801
@VisibleForTesting

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
4141

4242
private boolean isReadAheadEnabled = true;
4343

44+
private boolean isReadAheadV2Enabled;
45+
4446
private boolean alwaysReadBufferSize;
4547

4648
private int readAheadBlockSize;
@@ -91,6 +93,12 @@ public AbfsInputStreamContext isReadAheadEnabled(
9193
return this;
9294
}
9395

96+
public AbfsInputStreamContext isReadAheadV2Enabled(
97+
final boolean isReadAheadV2Enabled) {
98+
this.isReadAheadV2Enabled = isReadAheadV2Enabled;
99+
return this;
100+
}
101+
94102
public AbfsInputStreamContext withReadAheadRange(
95103
final int readAheadRange) {
96104
this.readAheadRange = readAheadRange;
@@ -181,6 +189,10 @@ public boolean isReadAheadEnabled() {
181189
return isReadAheadEnabled;
182190
}
183191

192+
public boolean isReadAheadV2Enabled() {
193+
return isReadAheadV2Enabled;
194+
}
195+
184196
public int getReadAheadRange() {
185197
return readAheadRange;
186198
}

0 commit comments

Comments
 (0)