Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,11 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_ENABLE_READAHEAD_V2)
private boolean isReadAheadV2Enabled;

@BooleanConfigurationValidatorAnnotation(
ConfigurationKey = FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING,
DefaultValue = DEFAULT_ENABLE_READAHEAD_V2_DYNAMIC_SCALING)
private boolean isReadAheadV2DynamicScalingEnabled;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_READAHEAD_V2_MIN_THREAD_POOL_SIZE,
DefaultValue = DEFAULT_READAHEAD_V2_MIN_THREAD_POOL_SIZE)
Expand All @@ -411,6 +416,26 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_READAHEAD_V2_MAX_BUFFER_POOL_SIZE)
private int maxReadAheadV2BufferPoolSize;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_READAHEAD_V2_CPU_MONITORING_INTERVAL_MILLIS,
DefaultValue = DEFAULT_READAHEAD_V2_CPU_MONITORING_INTERVAL_MILLIS)
private int readAheadV2CpuMonitoringIntervalMillis;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_READAHEAD_V2_THREAD_POOL_UPSCALE_PERCENTAGE,
DefaultValue = DEFAULT_READAHEAD_V2_THREAD_POOL_UPSCALE_PERCENTAGE)
private int readAheadV2ThreadPoolUpscalePercentage;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_READAHEAD_V2_THREAD_POOL_DOWNSCALE_PERCENTAGE,
DefaultValue = DEFAULT_READAHEAD_V2_THREAD_POOL_DOWNSCALE_PERCENTAGE)
private int readAheadV2ThreadPoolDownscalePercentage;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_READAHEAD_V2_MEMORY_MONITORING_INTERVAL_MILLIS,
DefaultValue = DEFAULT_READAHEAD_V2_MEMORY_MONITORING_INTERVAL_MILLIS)
private int readAheadV2MemoryMonitoringIntervalMillis;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_READAHEAD_V2_EXECUTOR_SERVICE_TTL_MILLIS,
DefaultValue = DEFAULT_READAHEAD_V2_EXECUTOR_SERVICE_TTL_MILLIS)
Expand All @@ -421,6 +446,16 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_READAHEAD_V2_CACHED_BUFFER_TTL_MILLIS)
private int readAheadV2CachedBufferTTLMillis;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_READAHEAD_V2_CPU_USAGE_THRESHOLD_PERCENT,
DefaultValue = DEFAULT_READAHEAD_V2_CPU_USAGE_THRESHOLD_PERCENT)
private int readAheadV2CpuUsageThresholdPercent;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_READAHEAD_V2_MEMORY_USAGE_THRESHOLD_PERCENT,
DefaultValue = DEFAULT_READAHEAD_V2_MEMORY_USAGE_THRESHOLD_PERCENT)
private int readAheadV2MemoryUsageThresholdPercent;

@LongConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS,
MinValue = 0,
DefaultValue = DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS)
Expand Down Expand Up @@ -1416,6 +1451,18 @@ public boolean isReadAheadEnabled() {
return this.enabledReadAhead;
}

/**
* Checks if the read-ahead v2 feature is enabled by user.
* @return true if read-ahead v2 is enabled, false otherwise.
*/
public boolean isReadAheadV2Enabled() {
return this.isReadAheadV2Enabled;
}

public boolean isReadAheadV2DynamicScalingEnabled() {
return isReadAheadV2DynamicScalingEnabled;
}

public int getMinReadAheadV2ThreadPoolSize() {
if (minReadAheadV2ThreadPoolSize <= 0) {
// If the minReadAheadV2ThreadPoolSize is not set, use the default value
Expand Down Expand Up @@ -1448,6 +1495,22 @@ public int getMaxReadAheadV2BufferPoolSize() {
return maxReadAheadV2BufferPoolSize;
}

public int getReadAheadV2CpuMonitoringIntervalMillis() {
return readAheadV2CpuMonitoringIntervalMillis;
}

public int getReadAheadV2ThreadPoolUpscalePercentage() {
return readAheadV2ThreadPoolUpscalePercentage;
}

public int getReadAheadV2ThreadPoolDownscalePercentage() {
return readAheadV2ThreadPoolDownscalePercentage;
}

public int getReadAheadV2MemoryMonitoringIntervalMillis() {
return readAheadV2MemoryMonitoringIntervalMillis;
}

public int getReadAheadExecutorServiceTTLInMillis() {
return readAheadExecutorServiceTTLMillis;
}
Expand All @@ -1456,12 +1519,12 @@ public int getReadAheadV2CachedBufferTTLMillis() {
return readAheadV2CachedBufferTTLMillis;
}

/**
* Checks if the read-ahead v2 feature is enabled by user.
* @return true if read-ahead v2 is enabled, false otherwise.
*/
public boolean isReadAheadV2Enabled() {
return this.isReadAheadV2Enabled;
public int getReadAheadV2CpuUsageThresholdPercent() {
return readAheadV2CpuUsageThresholdPercent;
}

public int getReadAheadV2MemoryUsageThresholdPercent() {
return readAheadV2MemoryUsageThresholdPercent;
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@
import org.apache.hadoop.fs.azurebfs.services.AuthType;
import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy;
import org.apache.hadoop.fs.azurebfs.services.ListingSupport;
import org.apache.hadoop.fs.azurebfs.services.ReadBufferManager;
import org.apache.hadoop.fs.azurebfs.services.ReadBufferManagerV1;
import org.apache.hadoop.fs.azurebfs.services.ReadBufferManagerV2;
import org.apache.hadoop.fs.azurebfs.services.SharedKeyCredentials;
import org.apache.hadoop.fs.azurebfs.services.StaticRetryPolicy;
import org.apache.hadoop.fs.azurebfs.services.VersionedFileStatus;
Expand Down Expand Up @@ -951,24 +954,24 @@ private AbfsInputStreamContext populateAbfsInputStreamContext(
int footerReadBufferSize = options.map(c -> c.getInt(
AZURE_FOOTER_READ_BUFFER_SIZE, getAbfsConfiguration().getFooterReadBufferSize()))
.orElse(getAbfsConfiguration().getFooterReadBufferSize());

return new AbfsInputStreamContext(getAbfsConfiguration().getSasTokenRenewPeriodForStreamsInSeconds())
.withReadBufferSize(getAbfsConfiguration().getReadBufferSize())
.withReadAheadQueueDepth(getAbfsConfiguration().getReadAheadQueueDepth())
.withTolerateOobAppends(getAbfsConfiguration().getTolerateOobAppends())
.isReadAheadEnabled(getAbfsConfiguration().isReadAheadEnabled())
.isReadAheadV2Enabled(getAbfsConfiguration().isReadAheadV2Enabled())
.withReadSmallFilesCompletely(getAbfsConfiguration().readSmallFilesCompletely())
.withOptimizeFooterRead(getAbfsConfiguration().optimizeFooterRead())
.withFooterReadBufferSize(footerReadBufferSize)
.withReadAheadRange(getAbfsConfiguration().getReadAheadRange())
.withStreamStatistics(new AbfsInputStreamStatisticsImpl())
.withShouldReadBufferSizeAlways(
getAbfsConfiguration().shouldReadBufferSizeAlways())
.withReadAheadBlockSize(getAbfsConfiguration().getReadAheadBlockSize())
.withBufferedPreadDisabled(bufferedPreadDisabled)
.withEncryptionAdapter(contextEncryptionAdapter)
.withAbfsBackRef(fsBackRef)
.build();
.withReadBufferSize(getAbfsConfiguration().getReadBufferSize())
.withReadAheadQueueDepth(getAbfsConfiguration().getReadAheadQueueDepth())
.withTolerateOobAppends(getAbfsConfiguration().getTolerateOobAppends())
.isReadAheadEnabled(getAbfsConfiguration().isReadAheadEnabled())
.isReadAheadV2Enabled(getAbfsConfiguration().isReadAheadV2Enabled())
.withReadSmallFilesCompletely(getAbfsConfiguration().readSmallFilesCompletely())
.withOptimizeFooterRead(getAbfsConfiguration().optimizeFooterRead())
.withFooterReadBufferSize(footerReadBufferSize)
.withReadAheadRange(getAbfsConfiguration().getReadAheadRange())
.withStreamStatistics(new AbfsInputStreamStatisticsImpl())
.withShouldReadBufferSizeAlways(getAbfsConfiguration().shouldReadBufferSizeAlways())
.withReadAheadBlockSize(getAbfsConfiguration().getReadAheadBlockSize())
.withBufferedPreadDisabled(bufferedPreadDisabled)
.withEncryptionAdapter(contextEncryptionAdapter)
.withAbfsBackRef(fsBackRef)
.build();
}

public OutputStream openFileForWrite(final Path path,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,12 @@ public final class ConfigurationKeys {
*/
public static final String FS_AZURE_ENABLE_READAHEAD_V2 = "fs.azure.enable.readahead.v2";

/**
* Enable or disable dynamic scaling of thread pool and buffer pool of readahead V2.
* Value: {@value}.
*/
public static final String FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING = "fs.azure.enable.readahead.v2.dynamic.scaling";

/**
* Minimum number of prefetch threads in the thread pool for readahead V2.
* {@value }
Expand All @@ -290,6 +296,28 @@ public final class ConfigurationKeys {
*/
public static final String FS_AZURE_READAHEAD_V2_MAX_BUFFER_POOL_SIZE = "fs.azure.readahead.v2.max.buffer.pool.size";

/**
* Interval in milliseconds for periodic monitoring of CPU usage and up/down scaling thread pool size accordingly.
* {@value }
*/
public static final String FS_AZURE_READAHEAD_V2_CPU_MONITORING_INTERVAL_MILLIS = "fs.azure.readahead.v2.cpu.monitoring.interval.millis";

/**
* Percentage by which the thread pool size should be upscaled when CPU usage is low.
*/
public static final String FS_AZURE_READAHEAD_V2_THREAD_POOL_UPSCALE_PERCENTAGE = "fs.azure.readahead.v2.thread.pool.upscale.percentage";

/**
* Percentage by which the thread pool size should be downscaled when CPU usage is high.
*/
public static final String FS_AZURE_READAHEAD_V2_THREAD_POOL_DOWNSCALE_PERCENTAGE = "fs.azure.readahead.v2.thread.pool.downscale.percentage";

/**
* Interval in milliseconds for periodic monitoring of memory usage and up/down scaling buffer pool size accordingly.
* {@value }
*/
public static final String FS_AZURE_READAHEAD_V2_MEMORY_MONITORING_INTERVAL_MILLIS = "fs.azure.readahead.v2.memory.monitoring.interval.millis";

/**
* TTL in milliseconds for the idle threads in executor service used by read ahead v2.
*/
Expand All @@ -300,6 +328,16 @@ public final class ConfigurationKeys {
*/
public static final String FS_AZURE_READAHEAD_V2_CACHED_BUFFER_TTL_MILLIS = "fs.azure.readahead.v2.cached.buffer.ttl.millis";

/**
* Threshold percentage for CPU usage to scale up/down the thread pool size in read ahead v2.
*/
public static final String FS_AZURE_READAHEAD_V2_CPU_USAGE_THRESHOLD_PERCENT = "fs.azure.readahead.v2.cpu.usage.threshold.percent";

/**
* Threshold percentage for memory usage to scale up/down the buffer pool size in read ahead v2.
*/
public static final String FS_AZURE_READAHEAD_V2_MEMORY_USAGE_THRESHOLD_PERCENT = "fs.azure.readahead.v2.memory.usage.threshold.percent";

/** Setting this true will make the driver use it's own RemoteIterator implementation */
public static final String FS_AZURE_ENABLE_ABFS_LIST_ITERATOR = "fs.azure.enable.abfslistiterator";
/** Server side encryption key encoded in Base6format {@value}.*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,20 @@ public final class FileSystemConfigurations {
public static final long DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS = 120;

public static final boolean DEFAULT_ENABLE_READAHEAD = true;
public static final boolean DEFAULT_ENABLE_READAHEAD_V2 = false;
public static final boolean DEFAULT_ENABLE_READAHEAD_V2 = true;
public static final boolean DEFAULT_ENABLE_READAHEAD_V2_DYNAMIC_SCALING = true;
public static final int DEFAULT_READAHEAD_V2_MIN_THREAD_POOL_SIZE = -1;
public static final int DEFAULT_READAHEAD_V2_MAX_THREAD_POOL_SIZE = -1;
public static final int DEFAULT_READAHEAD_V2_MIN_BUFFER_POOL_SIZE = -1;
public static final int DEFAULT_READAHEAD_V2_MAX_BUFFER_POOL_SIZE = -1;
public static final int DEFAULT_READAHEAD_V2_EXECUTOR_SERVICE_TTL_MILLIS = 3_000;
public static final int DEFAULT_READAHEAD_V2_CPU_MONITORING_INTERVAL_MILLIS = 6_000;
public static final int DEFAULT_READAHEAD_V2_THREAD_POOL_UPSCALE_PERCENTAGE = 20;
public static final int DEFAULT_READAHEAD_V2_THREAD_POOL_DOWNSCALE_PERCENTAGE = 30;
public static final int DEFAULT_READAHEAD_V2_MEMORY_MONITORING_INTERVAL_MILLIS = 6_000;
public static final int DEFAULT_READAHEAD_V2_EXECUTOR_SERVICE_TTL_MILLIS = 6_000;
public static final int DEFAULT_READAHEAD_V2_CACHED_BUFFER_TTL_MILLIS = 6_000;
public static final int DEFAULT_READAHEAD_V2_CPU_USAGE_THRESHOLD_PERCENT = 50;
public static final int DEFAULT_READAHEAD_V2_MEMORY_USAGE_THRESHOLD_PERCENT = 50;

public static final String DEFAULT_FS_AZURE_USER_AGENT_PREFIX = EMPTY_STRING;
public static final String DEFAULT_VALUE_UNKNOWN = "UNKNOWN";
Expand Down Expand Up @@ -210,6 +217,7 @@ public final class FileSystemConfigurations {

public static final int ZERO = 0;
public static final int HUNDRED = 100;
public static final double ONE_HUNDRED = 100.0;
public static final long THOUSAND = 1000L;

public static final HttpOperationType DEFAULT_NETWORKING_LIBRARY
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
public static final int FOOTER_SIZE = 16 * ONE_KB;
public static final int MAX_OPTIMIZED_READ_ATTEMPTS = 2;

private int readAheadBlockSize;
private final int readAheadBlockSize;
private final AbfsClient client;
private final Statistics statistics;
private final String path;
Expand Down Expand Up @@ -132,7 +132,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,

/** ABFS instance to be held by the input stream to avoid GC close. */
private final BackReference fsBackRef;
private ReadBufferManager readBufferManager;
private final ReadBufferManager readBufferManager;

public AbfsInputStream(
final AbfsClient client,
Expand Down Expand Up @@ -532,7 +532,7 @@ private int readInternal(final long position, final byte[] b, final int offset,
while (numReadAheads > 0 && nextOffset < contentLength) {
LOG.debug("issuing read ahead requestedOffset = {} requested size {}",
nextOffset, nextSize);
readBufferManager.queueReadAhead(this, nextOffset, (int) nextSize,
getReadBufferManager().queueReadAhead(this, nextOffset, (int) nextSize,
new TracingContext(readAheadTracingContext));
nextOffset = nextOffset + nextSize;
numReadAheads--;
Expand All @@ -541,7 +541,7 @@ private int readInternal(final long position, final byte[] b, final int offset,
}

// try reading from buffers first
receivedBytes = readBufferManager.getBlock(this, position, length, b);
receivedBytes = getReadBufferManager().getBlock(this, position, length, b);
bytesFromReadAhead += receivedBytes;
if (receivedBytes > 0) {
incrementReadOps();
Expand Down Expand Up @@ -745,8 +745,8 @@ public boolean seekToNewSource(long l) throws IOException {
public synchronized void close() throws IOException {
LOG.debug("Closing {}", this);
closed = true;
if (readBufferManager != null) {
readBufferManager.purgeBuffersForStream(this);
if (getReadBufferManager() != null) {
getReadBufferManager().purgeBuffersForStream(this);
}
buffer = null; // de-reference the buffer so it can be GC'ed sooner
if (contextEncryptionAdapter != null) {
Expand Down Expand Up @@ -807,7 +807,7 @@ byte[] getBuffer() {
*/
@VisibleForTesting
public boolean isReadAheadEnabled() {
return (readAheadEnabled || readAheadV2Enabled) && readBufferManager != null;
return (readAheadEnabled || readAheadV2Enabled) && getReadBufferManager() != null;
}

@VisibleForTesting
Expand All @@ -825,6 +825,10 @@ public String getStreamID() {
return inputStreamId;
}

public String getETag() {
return eTag;
}

/**
* Getter for AbfsInputStreamStatistics.
*
Expand Down Expand Up @@ -922,11 +926,20 @@ long getLimit() {
return this.limit;
}

boolean isFirstRead() {
return this.firstRead;
}

@VisibleForTesting
BackReference getFsBackRef() {
return fsBackRef;
}

@VisibleForTesting
ReadBufferManager getReadBufferManager() {
return readBufferManager;
}

@Override
public int minSeekForVectorReads() {
return S_128K;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,11 @@ public AbfsInputStreamContext withAbfsBackRef(
return this;
}

public AbfsInputStreamContext withEncryptionAdapter(
ContextEncryptionAdapter contextEncryptionAdapter){
this.contextEncryptionAdapter = contextEncryptionAdapter;
return this;
}
public AbfsInputStreamContext withEncryptionAdapter(
ContextEncryptionAdapter contextEncryptionAdapter){
this.contextEncryptionAdapter = contextEncryptionAdapter;
return this;
}

public AbfsInputStreamContext build() {
if (readBufferSize > readAheadBlockSize) {
Expand Down Expand Up @@ -229,7 +229,7 @@ public BackReference getFsBackRef() {
return fsBackRef;
}

public ContextEncryptionAdapter getEncryptionAdapter() {
return contextEncryptionAdapter;
}
public ContextEncryptionAdapter getEncryptionAdapter() {
return contextEncryptionAdapter;
}
}
Loading