diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index 3db1565c7057e..81d64af71f68b 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -391,6 +391,11 @@ public class AbfsConfiguration{ DefaultValue = DEFAULT_ENABLE_READAHEAD_V2) private boolean isReadAheadV2Enabled; + @BooleanConfigurationValidatorAnnotation( + ConfigurationKey = FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING, + DefaultValue = DEFAULT_ENABLE_READAHEAD_V2_DYNAMIC_SCALING) + private boolean isReadAheadV2DynamicScalingEnabled; + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_READAHEAD_V2_MIN_THREAD_POOL_SIZE, DefaultValue = DEFAULT_READAHEAD_V2_MIN_THREAD_POOL_SIZE) @@ -411,6 +416,26 @@ public class AbfsConfiguration{ DefaultValue = DEFAULT_READAHEAD_V2_MAX_BUFFER_POOL_SIZE) private int maxReadAheadV2BufferPoolSize; + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = + FS_AZURE_READAHEAD_V2_CPU_MONITORING_INTERVAL_MILLIS, + DefaultValue = DEFAULT_READAHEAD_V2_CPU_MONITORING_INTERVAL_MILLIS) + private int readAheadV2CpuMonitoringIntervalMillis; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = + FS_AZURE_READAHEAD_V2_THREAD_POOL_UPSCALE_PERCENTAGE, + DefaultValue = DEFAULT_READAHEAD_V2_THREAD_POOL_UPSCALE_PERCENTAGE) + private int readAheadV2ThreadPoolUpscalePercentage; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = + FS_AZURE_READAHEAD_V2_THREAD_POOL_DOWNSCALE_PERCENTAGE, + DefaultValue = DEFAULT_READAHEAD_V2_THREAD_POOL_DOWNSCALE_PERCENTAGE) + private int readAheadV2ThreadPoolDownscalePercentage; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = + FS_AZURE_READAHEAD_V2_MEMORY_MONITORING_INTERVAL_MILLIS, + DefaultValue = DEFAULT_READAHEAD_V2_MEMORY_MONITORING_INTERVAL_MILLIS) + private int readAheadV2MemoryMonitoringIntervalMillis; + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_READAHEAD_V2_EXECUTOR_SERVICE_TTL_MILLIS, DefaultValue = DEFAULT_READAHEAD_V2_EXECUTOR_SERVICE_TTL_MILLIS) @@ -421,6 +446,16 @@ public class AbfsConfiguration{ DefaultValue = DEFAULT_READAHEAD_V2_CACHED_BUFFER_TTL_MILLIS) private int readAheadV2CachedBufferTTLMillis; + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = + FS_AZURE_READAHEAD_V2_CPU_USAGE_THRESHOLD_PERCENT, + DefaultValue = DEFAULT_READAHEAD_V2_CPU_USAGE_THRESHOLD_PERCENT) + private int readAheadV2CpuUsageThresholdPercent; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = + FS_AZURE_READAHEAD_V2_MEMORY_USAGE_THRESHOLD_PERCENT, + DefaultValue = DEFAULT_READAHEAD_V2_MEMORY_USAGE_THRESHOLD_PERCENT) + private int readAheadV2MemoryUsageThresholdPercent; + @LongConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS, MinValue = 0, DefaultValue = DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS) @@ -1416,6 +1451,18 @@ public boolean isReadAheadEnabled() { return this.enabledReadAhead; } + /** + * Checks if the read-ahead v2 feature is enabled by user. + * @return true if read-ahead v2 is enabled, false otherwise. 
+ */ + public boolean isReadAheadV2Enabled() { + return this.isReadAheadV2Enabled; + } + + public boolean isReadAheadV2DynamicScalingEnabled() { + return isReadAheadV2DynamicScalingEnabled; + } + public int getMinReadAheadV2ThreadPoolSize() { if (minReadAheadV2ThreadPoolSize <= 0) { // If the minReadAheadV2ThreadPoolSize is not set, use the default value @@ -1448,6 +1495,22 @@ public int getMaxReadAheadV2BufferPoolSize() { return maxReadAheadV2BufferPoolSize; } + public int getReadAheadV2CpuMonitoringIntervalMillis() { + return readAheadV2CpuMonitoringIntervalMillis; + } + + public int getReadAheadV2ThreadPoolUpscalePercentage() { + return readAheadV2ThreadPoolUpscalePercentage; + } + + public int getReadAheadV2ThreadPoolDownscalePercentage() { + return readAheadV2ThreadPoolDownscalePercentage; + } + + public int getReadAheadV2MemoryMonitoringIntervalMillis() { + return readAheadV2MemoryMonitoringIntervalMillis; + } + public int getReadAheadExecutorServiceTTLInMillis() { return readAheadExecutorServiceTTLMillis; } @@ -1456,12 +1519,12 @@ public int getReadAheadV2CachedBufferTTLMillis() { return readAheadV2CachedBufferTTLMillis; } - /** - * Checks if the read-ahead v2 feature is enabled by user. - * @return true if read-ahead v2 is enabled, false otherwise. - */ - public boolean isReadAheadV2Enabled() { - return this.isReadAheadV2Enabled; + public int getReadAheadV2CpuUsageThresholdPercent() { + return readAheadV2CpuUsageThresholdPercent; + } + + public int getReadAheadV2MemoryUsageThresholdPercent() { + return readAheadV2MemoryUsageThresholdPercent; } @VisibleForTesting diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 2732c0ed8fb31..51f9bbb6dbdd9 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -110,6 +110,9 @@ import org.apache.hadoop.fs.azurebfs.services.AuthType; import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy; import org.apache.hadoop.fs.azurebfs.services.ListingSupport; +import org.apache.hadoop.fs.azurebfs.services.ReadBufferManager; +import org.apache.hadoop.fs.azurebfs.services.ReadBufferManagerV1; +import org.apache.hadoop.fs.azurebfs.services.ReadBufferManagerV2; import org.apache.hadoop.fs.azurebfs.services.SharedKeyCredentials; import org.apache.hadoop.fs.azurebfs.services.StaticRetryPolicy; import org.apache.hadoop.fs.azurebfs.services.VersionedFileStatus; @@ -951,24 +954,24 @@ private AbfsInputStreamContext populateAbfsInputStreamContext( int footerReadBufferSize = options.map(c -> c.getInt( AZURE_FOOTER_READ_BUFFER_SIZE, getAbfsConfiguration().getFooterReadBufferSize())) .orElse(getAbfsConfiguration().getFooterReadBufferSize()); + return new AbfsInputStreamContext(getAbfsConfiguration().getSasTokenRenewPeriodForStreamsInSeconds()) - .withReadBufferSize(getAbfsConfiguration().getReadBufferSize()) - .withReadAheadQueueDepth(getAbfsConfiguration().getReadAheadQueueDepth()) - .withTolerateOobAppends(getAbfsConfiguration().getTolerateOobAppends()) - .isReadAheadEnabled(getAbfsConfiguration().isReadAheadEnabled()) - .isReadAheadV2Enabled(getAbfsConfiguration().isReadAheadV2Enabled()) - .withReadSmallFilesCompletely(getAbfsConfiguration().readSmallFilesCompletely()) - 
.withOptimizeFooterRead(getAbfsConfiguration().optimizeFooterRead()) - .withFooterReadBufferSize(footerReadBufferSize) - .withReadAheadRange(getAbfsConfiguration().getReadAheadRange()) - .withStreamStatistics(new AbfsInputStreamStatisticsImpl()) - .withShouldReadBufferSizeAlways( - getAbfsConfiguration().shouldReadBufferSizeAlways()) - .withReadAheadBlockSize(getAbfsConfiguration().getReadAheadBlockSize()) - .withBufferedPreadDisabled(bufferedPreadDisabled) - .withEncryptionAdapter(contextEncryptionAdapter) - .withAbfsBackRef(fsBackRef) - .build(); + .withReadBufferSize(getAbfsConfiguration().getReadBufferSize()) + .withReadAheadQueueDepth(getAbfsConfiguration().getReadAheadQueueDepth()) + .withTolerateOobAppends(getAbfsConfiguration().getTolerateOobAppends()) + .isReadAheadEnabled(getAbfsConfiguration().isReadAheadEnabled()) + .isReadAheadV2Enabled(getAbfsConfiguration().isReadAheadV2Enabled()) + .withReadSmallFilesCompletely(getAbfsConfiguration().readSmallFilesCompletely()) + .withOptimizeFooterRead(getAbfsConfiguration().optimizeFooterRead()) + .withFooterReadBufferSize(footerReadBufferSize) + .withReadAheadRange(getAbfsConfiguration().getReadAheadRange()) + .withStreamStatistics(new AbfsInputStreamStatisticsImpl()) + .withShouldReadBufferSizeAlways(getAbfsConfiguration().shouldReadBufferSizeAlways()) + .withReadAheadBlockSize(getAbfsConfiguration().getReadAheadBlockSize()) + .withBufferedPreadDisabled(bufferedPreadDisabled) + .withEncryptionAdapter(contextEncryptionAdapter) + .withAbfsBackRef(fsBackRef) + .build(); } public OutputStream openFileForWrite(final Path path, diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index 50a88ab4e4587..520e0d8831f29 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -269,6 +269,12 @@ public final class ConfigurationKeys { */ public static final String FS_AZURE_ENABLE_READAHEAD_V2 = "fs.azure.enable.readahead.v2"; + /** + * Enable or disable dynamic scaling of thread pool and buffer pool of readahead V2. + * Value: {@value}. + */ + public static final String FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING = "fs.azure.enable.readahead.v2.dynamic.scaling"; + /** * Minimum number of prefetch threads in the thread pool for readahead V2. * {@value } @@ -290,6 +296,28 @@ public final class ConfigurationKeys { */ public static final String FS_AZURE_READAHEAD_V2_MAX_BUFFER_POOL_SIZE = "fs.azure.readahead.v2.max.buffer.pool.size"; + /** + * Interval in milliseconds for periodic monitoring of CPU usage and up/down scaling thread pool size accordingly. + * {@value } + */ + public static final String FS_AZURE_READAHEAD_V2_CPU_MONITORING_INTERVAL_MILLIS = "fs.azure.readahead.v2.cpu.monitoring.interval.millis"; + + /** + * Percentage by which the thread pool size should be upscaled when CPU usage is low. + */ + public static final String FS_AZURE_READAHEAD_V2_THREAD_POOL_UPSCALE_PERCENTAGE = "fs.azure.readahead.v2.thread.pool.upscale.percentage"; + + /** + * Percentage by which the thread pool size should be downscaled when CPU usage is high. 
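+ * Value: {@value}.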
+ */ + public static final String FS_AZURE_READAHEAD_V2_THREAD_POOL_DOWNSCALE_PERCENTAGE = "fs.azure.readahead.v2.thread.pool.downscale.percentage"; + + /** + * Interval in milliseconds for periodic monitoring of memory usage and up/down scaling buffer pool size accordingly. + * {@value } + */ + public static final String FS_AZURE_READAHEAD_V2_MEMORY_MONITORING_INTERVAL_MILLIS = "fs.azure.readahead.v2.memory.monitoring.interval.millis"; + /** * TTL in milliseconds for the idle threads in executor service used by read ahead v2. */ @@ -300,6 +328,16 @@ public final class ConfigurationKeys { */ public static final String FS_AZURE_READAHEAD_V2_CACHED_BUFFER_TTL_MILLIS = "fs.azure.readahead.v2.cached.buffer.ttl.millis"; + /** + * Threshold percentage for CPU usage to scale up/down the thread pool size in read ahead v2. + */ + public static final String FS_AZURE_READAHEAD_V2_CPU_USAGE_THRESHOLD_PERCENT = "fs.azure.readahead.v2.cpu.usage.threshold.percent"; + + /** + * Threshold percentage for memory usage to scale up/down the buffer pool size in read ahead v2. + */ + public static final String FS_AZURE_READAHEAD_V2_MEMORY_USAGE_THRESHOLD_PERCENT = "fs.azure.readahead.v2.memory.usage.threshold.percent"; + /** Setting this true will make the driver use it's own RemoteIterator implementation */ public static final String FS_AZURE_ENABLE_ABFS_LIST_ITERATOR = "fs.azure.enable.abfslistiterator"; /** Server side encryption key encoded in Base6format {@value}.*/ diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index 8bcd55aee8e35..5360d643aa935 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -128,13 +128,20 @@ public final class FileSystemConfigurations { public static final long DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS = 120; public static final boolean DEFAULT_ENABLE_READAHEAD = true; - public static final boolean DEFAULT_ENABLE_READAHEAD_V2 = false; + public static final boolean DEFAULT_ENABLE_READAHEAD_V2 = true; + public static final boolean DEFAULT_ENABLE_READAHEAD_V2_DYNAMIC_SCALING = true; public static final int DEFAULT_READAHEAD_V2_MIN_THREAD_POOL_SIZE = -1; public static final int DEFAULT_READAHEAD_V2_MAX_THREAD_POOL_SIZE = -1; public static final int DEFAULT_READAHEAD_V2_MIN_BUFFER_POOL_SIZE = -1; public static final int DEFAULT_READAHEAD_V2_MAX_BUFFER_POOL_SIZE = -1; - public static final int DEFAULT_READAHEAD_V2_EXECUTOR_SERVICE_TTL_MILLIS = 3_000; + public static final int DEFAULT_READAHEAD_V2_CPU_MONITORING_INTERVAL_MILLIS = 6_000; + public static final int DEFAULT_READAHEAD_V2_THREAD_POOL_UPSCALE_PERCENTAGE = 20; + public static final int DEFAULT_READAHEAD_V2_THREAD_POOL_DOWNSCALE_PERCENTAGE = 30; + public static final int DEFAULT_READAHEAD_V2_MEMORY_MONITORING_INTERVAL_MILLIS = 6_000; + public static final int DEFAULT_READAHEAD_V2_EXECUTOR_SERVICE_TTL_MILLIS = 6_000; public static final int DEFAULT_READAHEAD_V2_CACHED_BUFFER_TTL_MILLIS = 6_000; + public static final int DEFAULT_READAHEAD_V2_CPU_USAGE_THRESHOLD_PERCENT = 50; + public static final int DEFAULT_READAHEAD_V2_MEMORY_USAGE_THRESHOLD_PERCENT = 50; public static final String DEFAULT_FS_AZURE_USER_AGENT_PREFIX = EMPTY_STRING; public static 
final String DEFAULT_VALUE_UNKNOWN = "UNKNOWN"; @@ -210,6 +217,7 @@ public final class FileSystemConfigurations { public static final int ZERO = 0; public static final int HUNDRED = 100; + public static final double ONE_HUNDRED = 100.0; public static final long THOUSAND = 1000L; public static final HttpOperationType DEFAULT_NETWORKING_LIBRARY diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index 38b49603fbb00..05bc30f4b373f 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -69,7 +69,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, public static final int FOOTER_SIZE = 16 * ONE_KB; public static final int MAX_OPTIMIZED_READ_ATTEMPTS = 2; - private int readAheadBlockSize; + private final int readAheadBlockSize; private final AbfsClient client; private final Statistics statistics; private final String path; @@ -132,7 +132,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, /** ABFS instance to be held by the input stream to avoid GC close. */ private final BackReference fsBackRef; - private ReadBufferManager readBufferManager; + private final ReadBufferManager readBufferManager; public AbfsInputStream( final AbfsClient client, @@ -532,7 +532,7 @@ private int readInternal(final long position, final byte[] b, final int offset, while (numReadAheads > 0 && nextOffset < contentLength) { LOG.debug("issuing read ahead requestedOffset = {} requested size {}", nextOffset, nextSize); - readBufferManager.queueReadAhead(this, nextOffset, (int) nextSize, + getReadBufferManager().queueReadAhead(this, nextOffset, (int) nextSize, new TracingContext(readAheadTracingContext)); nextOffset = nextOffset + nextSize; numReadAheads--; @@ -541,7 +541,7 @@ private int readInternal(final long position, final byte[] b, final int offset, } // try reading from buffers first - receivedBytes = readBufferManager.getBlock(this, position, length, b); + receivedBytes = getReadBufferManager().getBlock(this, position, length, b); bytesFromReadAhead += receivedBytes; if (receivedBytes > 0) { incrementReadOps(); @@ -745,8 +745,8 @@ public boolean seekToNewSource(long l) throws IOException { public synchronized void close() throws IOException { LOG.debug("Closing {}", this); closed = true; - if (readBufferManager != null) { - readBufferManager.purgeBuffersForStream(this); + if (getReadBufferManager() != null) { + getReadBufferManager().purgeBuffersForStream(this); } buffer = null; // de-reference the buffer so it can be GC'ed sooner if (contextEncryptionAdapter != null) { @@ -807,7 +807,7 @@ byte[] getBuffer() { */ @VisibleForTesting public boolean isReadAheadEnabled() { - return (readAheadEnabled || readAheadV2Enabled) && readBufferManager != null; + return (readAheadEnabled || readAheadV2Enabled) && getReadBufferManager() != null; } @VisibleForTesting @@ -825,6 +825,10 @@ public String getStreamID() { return inputStreamId; } + public String getETag() { + return eTag; + } + /** * Getter for AbfsInputStreamStatistics. 
* @@ -922,11 +926,20 @@ long getLimit() { return this.limit; } + boolean isFirstRead() { + return this.firstRead; + } + @VisibleForTesting BackReference getFsBackRef() { return fsBackRef; } + @VisibleForTesting + ReadBufferManager getReadBufferManager() { + return readBufferManager; + } + @Override public int minSeekForVectorReads() { return S_128K; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java index f6272492d6081..86edee10155c7 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java @@ -152,11 +152,11 @@ public AbfsInputStreamContext withAbfsBackRef( return this; } - public AbfsInputStreamContext withEncryptionAdapter( - ContextEncryptionAdapter contextEncryptionAdapter){ - this.contextEncryptionAdapter = contextEncryptionAdapter; - return this; - } + public AbfsInputStreamContext withEncryptionAdapter( + ContextEncryptionAdapter contextEncryptionAdapter){ + this.contextEncryptionAdapter = contextEncryptionAdapter; + return this; + } public AbfsInputStreamContext build() { if (readBufferSize > readAheadBlockSize) { @@ -229,7 +229,7 @@ public BackReference getFsBackRef() { return fsBackRef; } - public ContextEncryptionAdapter getEncryptionAdapter() { - return contextEncryptionAdapter; - } + public ContextEncryptionAdapter getEncryptionAdapter() { + return contextEncryptionAdapter; + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java index 9ce926d841c84..930482b2825d8 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java @@ -20,15 +20,18 @@ import java.io.IOException; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import static org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus.READ_FAILED; -class ReadBuffer { +public class ReadBuffer { private AbfsInputStream stream; + private String eTag; + private String path; // path of the file this buffer is for private long offset; // offset within the file for the buffer private int length; // actual length, set after the buffer is filles private int requestedLength; // requested length of the read @@ -44,6 +47,7 @@ class ReadBuffer { private boolean isFirstByteConsumed = false; private boolean isLastByteConsumed = false; private boolean isAnyByteConsumed = false; + private AtomicInteger refCount = new AtomicInteger(0); private IOException errException = null; @@ -51,10 +55,26 @@ public AbfsInputStream getStream() { return stream; } + public String getETag() { + return eTag; + } + + public String getPath() { + return path; + } + public void setStream(AbfsInputStream stream) { this.stream = stream; } + public void setETag(String eTag) { + this.eTag = eTag; + } + + public void setPath(String path) { + this.path = path; + } + public void setTracingContext(TracingContext tracingContext) { 
this.tracingContext = tracingContext; } @@ -122,6 +142,20 @@ public void setStatus(ReadBufferStatus status) { } } + public void startReading() { + refCount.getAndIncrement(); + } + + public void endReading() { + if (refCount.decrementAndGet() < 0) { + throw new IllegalStateException("ReadBuffer refCount cannot be negative"); + } + } + + public int getRefCount() { + return refCount.get(); + } + public CountDownLatch getLatch() { return latch; } @@ -162,4 +196,7 @@ public void setAnyByteConsumed(boolean isAnyByteConsumed) { this.isAnyByteConsumed = isAnyByteConsumed; } + public boolean isFullyConsumed() { + return isFirstByteConsumed() && isLastByteConsumed(); + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java index 9ee128fbc3275..d62f53cf83072 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java @@ -119,7 +119,6 @@ abstract void doneReading(ReadBuffer buffer, */ abstract void purgeBuffersForStream(AbfsInputStream stream); - // Following Methods are for testing purposes only and should not be used in production code. /** @@ -264,7 +263,7 @@ protected synchronized List getReadAheadQueueCopy() { * @return a list of in-progress {@link ReadBuffer} objects */ @VisibleForTesting - protected synchronized List getInProgressCopiedList() { + protected synchronized List getInProgressListCopy() { return new ArrayList<>(inProgressList); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java index fe1ac3fa1f235..e690ec8adb0ec 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java @@ -33,7 +33,7 @@ * The Read Buffer Manager for Rest AbfsClient. * V1 implementation of ReadBufferManager. */ -final class ReadBufferManagerV1 extends ReadBufferManager { +public final class ReadBufferManagerV1 extends ReadBufferManager { private static final int NUM_BUFFERS = 16; private static final int NUM_THREADS = 8; @@ -52,7 +52,7 @@ private ReadBufferManagerV1() { * Sets the read buffer manager configurations. * @param readAheadBlockSize the size of the read-ahead block in bytes */ - static void setReadBufferManagerConfigs(int readAheadBlockSize) { + public static void setReadBufferManagerConfigs(int readAheadBlockSize) { if (bufferManager == null) { LOGGER.debug( "ReadBufferManagerV1 not initialized yet. Overriding readAheadBlockSize as {}", @@ -66,7 +66,7 @@ static void setReadBufferManagerConfigs(int readAheadBlockSize) { * Returns the singleton instance of ReadBufferManagerV1. * @return the singleton instance of ReadBufferManagerV1 */ - static ReadBufferManagerV1 getBufferManager() { + public static ReadBufferManagerV1 getBufferManager() { if (bufferManager == null) { LOCK.lock(); try { @@ -88,7 +88,7 @@ static ReadBufferManagerV1 getBufferManager() { void init() { buffers = new byte[NUM_BUFFERS][]; for (int i = 0; i < NUM_BUFFERS; i++) { - buffers[i] = new byte[getReadAheadBlockSize()]; // same buffers are reused. 
These byte arrays are never garbage collected + buffers[i] = new byte[getReadAheadBlockSize()]; // same buffers are reused. The byte array never goes back to GC getFreeList().add(i); } for (int i = 0; i < NUM_THREADS; i++) { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java index 9cce860127dae..fa99ed45dfb2d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java @@ -17,67 +17,91 @@ */ package org.apache.hadoop.fs.azurebfs.services; +import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; +import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus; + +import com.sun.management.OperatingSystemMXBean; + import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; +import java.lang.management.MemoryUsage; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Stack; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; -import org.apache.hadoop.classification.VisibleForTesting; -import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; -import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; +import org.apache.hadoop.classification.VisibleForTesting; -final class ReadBufferManagerV2 extends ReadBufferManager { +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_HUNDRED; + +/** + * The Improved Read Buffer Manager for Rest AbfsClient. 
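+ * Compared to V1, buffers are tracked by file eTag and path, and the worker
+ * thread pool and buffer pool can scale dynamically with observed CPU and
+ * memory load when fs.azure.enable.readahead.v2.dynamic.scaling is enabled.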
+ */
+public final class ReadBufferManagerV2 extends ReadBufferManager {
+  // Internal constants
+  private static final ReentrantLock LOCK = new ReentrantLock();

   // Thread Pool Configurations
   private static int minThreadPoolSize;
   private static int maxThreadPoolSize;
+  private static int cpuMonitoringIntervalInMilliSec;
+  private static double cpuThreshold;
+  private static int threadPoolUpscalePercentage;
+  private static int threadPoolDownscalePercentage;
   private static int executorServiceKeepAliveTimeInMilliSec;
+  private static final double threadPoolRequirementBuffer = 1.2; // 20% more threads than the queue size
+  private static boolean isDynamicScalingEnabled;
+
+  private ScheduledExecutorService cpuMonitorThread;
   private ThreadPoolExecutor workerPool;
+  private final List<ReadBufferWorker> workerRefs = new ArrayList<>();

   // Buffer Pool Configurations
   private static int minBufferPoolSize;
   private static int maxBufferPoolSize;
+  private static int memoryMonitoringIntervalInMilliSec;
+  private static double memoryThreshold;
+  private int numberOfActiveBuffers = 0;
   private byte[][] bufferPool;
+  private Stack<Integer> removedBufferList = new Stack<>();
+  private ScheduledExecutorService memoryMonitorThread;
+
   // Buffer Manager Structures
   private static ReadBufferManagerV2 bufferManager;
-
-  // hide instance constructor
-  private ReadBufferManagerV2() {
-    LOGGER.trace("Creating readbuffer manager with HADOOP-18546 patch");
-  }
+  private static boolean isConfigured = false;

   /**
-   * Sets the read buffer manager configurations.
-   * @param readAheadBlockSize the size of the read-ahead block in bytes
-   * @param abfsConfiguration the AbfsConfiguration instance for other configurations
+   * Private constructor to prevent instantiation, as this needs to be a singleton.
    */
-  static void setReadBufferManagerConfigs(int readAheadBlockSize, AbfsConfiguration abfsConfiguration) {
-    if (bufferManager == null) {
-      minThreadPoolSize = abfsConfiguration.getMinReadAheadV2ThreadPoolSize();
-      maxThreadPoolSize = abfsConfiguration.getMaxReadAheadV2ThreadPoolSize();
-      executorServiceKeepAliveTimeInMilliSec = abfsConfiguration.getReadAheadExecutorServiceTTLInMillis();
-
-      minBufferPoolSize = abfsConfiguration.getMinReadAheadV2BufferPoolSize();
-      maxBufferPoolSize = abfsConfiguration.getMaxReadAheadV2BufferPoolSize();
-      setThresholdAgeMilliseconds(abfsConfiguration.getReadAheadV2CachedBufferTTLMillis());
-      setReadAheadBlockSize(readAheadBlockSize);
-    }
+  private ReadBufferManagerV2() {
+    printTraceLog("Creating Read Buffer Manager V2 with HADOOP-18546 patch");
   }

-  /**
-   * Returns the singleton instance of ReadBufferManagerV2.
-   * @return the singleton instance of ReadBufferManagerV2
-   */
-  static ReadBufferManagerV2 getBufferManager() {
+  public static ReadBufferManagerV2 getBufferManager() {
+    if (!isConfigured) {
+      throw new IllegalStateException("ReadBufferManagerV2 is not configured. "
+          + "Please call setReadBufferManagerConfigs() before calling getBufferManager().");
+    }
     if (bufferManager == null) {
       LOCK.lock();
       try {
         if (bufferManager == null) {
           bufferManager = new ReadBufferManagerV2();
           bufferManager.init();
+          LOGGER.trace("ReadBufferManagerV2 singleton initialized");
         }
       } finally {
         LOCK.unlock();
@@ -87,17 +111,52 @@ static ReadBufferManagerV2 getBufferManager() {
   }

   /**
-   * {@inheritDoc}
+   * Set the ReadBufferManagerV2 configurations from the provided AbfsConfiguration,
+   * before the singleton is initialized.
+   * @param readAheadBlockSize the read-ahead block size to set for the ReadBufferManagerV2.
+   * @param abfsConfiguration the configuration to set for the ReadBufferManagerV2.
+   */
+  public static void setReadBufferManagerConfigs(final int readAheadBlockSize,
+      final AbfsConfiguration abfsConfiguration) {
+    // Set configs only before initialization.
+    if (bufferManager == null) {
+      minThreadPoolSize = abfsConfiguration.getMinReadAheadV2ThreadPoolSize();
+      maxThreadPoolSize = abfsConfiguration.getMaxReadAheadV2ThreadPoolSize();
+      cpuMonitoringIntervalInMilliSec = abfsConfiguration.getReadAheadV2CpuMonitoringIntervalMillis();
+      cpuThreshold = abfsConfiguration.getReadAheadV2CpuUsageThresholdPercent() / ONE_HUNDRED;
+      threadPoolUpscalePercentage = abfsConfiguration.getReadAheadV2ThreadPoolUpscalePercentage();
+      threadPoolDownscalePercentage = abfsConfiguration.getReadAheadV2ThreadPoolDownscalePercentage();
+      executorServiceKeepAliveTimeInMilliSec = abfsConfiguration.getReadAheadExecutorServiceTTLInMillis();
+
+      minBufferPoolSize = abfsConfiguration.getMinReadAheadV2BufferPoolSize();
+      maxBufferPoolSize = abfsConfiguration.getMaxReadAheadV2BufferPoolSize();
+      memoryMonitoringIntervalInMilliSec = abfsConfiguration.getReadAheadV2MemoryMonitoringIntervalMillis();
+      memoryThreshold = abfsConfiguration.getReadAheadV2MemoryUsageThresholdPercent() / ONE_HUNDRED;
+      setThresholdAgeMilliseconds(abfsConfiguration.getReadAheadV2CachedBufferTTLMillis());
+      isDynamicScalingEnabled = abfsConfiguration.isReadAheadV2DynamicScalingEnabled();
+      setReadAheadBlockSize(readAheadBlockSize);
+      setIsConfigured(true);
+    }
+  }
+
+  /**
+   * Initialize the singleton ReadBufferManagerV2.
    */
   @Override
   void init() {
     // Initialize Buffer Pool
     bufferPool = new byte[maxBufferPoolSize][];
     for (int i = 0; i < minBufferPoolSize; i++) {
-      bufferPool[i] = new byte[getReadAheadBlockSize()]; // same buffers are reused. These byte arrays are never garbage collected
+      bufferPool[i] = new byte[getReadAheadBlockSize()]; // same buffers are reused; these byte arrays are never garbage collected
       getFreeList().add(i);
       numberOfActiveBuffers++;
     }
+    memoryMonitorThread = Executors.newSingleThreadScheduledExecutor(runnable -> {
+      Thread t = new Thread(runnable, "ReadAheadV2-Memory-Monitor");
+      t.setDaemon(true);
+      return t;
+    });
+    memoryMonitorThread.scheduleAtFixedRate(this::scheduledEviction,
+        memoryMonitoringIntervalInMilliSec, memoryMonitoringIntervalInMilliSec, TimeUnit.MILLISECONDS);

     // Initialize a Fixed Size Thread Pool with minThreadPoolSize threads
     workerPool = new ThreadPoolExecutor(
@@ -106,123 +165,688 @@ void init() {
         executorServiceKeepAliveTimeInMilliSec,
         TimeUnit.MILLISECONDS,
         new SynchronousQueue<>(),
-        namedThreadFactory);
+        workerThreadFactory);
     workerPool.allowCoreThreadTimeOut(true);
     for (int i = 0; i < minThreadPoolSize; i++) {
-      ReadBufferWorker worker = new ReadBufferWorker(i, this);
+      ReadBufferWorker worker = new ReadBufferWorker(i, getBufferManager());
+      workerRefs.add(worker);
       workerPool.submit(worker);
     }
     ReadBufferWorker.UNLEASH_WORKERS.countDown();
+
+    if (isDynamicScalingEnabled) {
+      cpuMonitorThread = Executors.newSingleThreadScheduledExecutor(runnable -> {
+        Thread t = new Thread(runnable, "ReadAheadV2-CPU-Monitor");
+        t.setDaemon(true);
+        return t;
+      });
+      cpuMonitorThread.scheduleAtFixedRate(this::adjustThreadPool,
+          cpuMonitoringIntervalInMilliSec, cpuMonitoringIntervalInMilliSec,
+          TimeUnit.MILLISECONDS);
+    }
+
+    printTraceLog("ReadBufferManagerV2 initialized with {} buffers and {} worker threads",
+        numberOfActiveBuffers, workerRefs.size());
   }

   /**
-   * {@inheritDoc}
+   * {@link AbfsInputStream} calls this method to queue a read-ahead.
+   * @param stream the stream for which the read-ahead is requested.
+   * @param requestedOffset the offset in the file which should be read.
+   * @param requestedLength the length to read.
+   * @param tracingContext the tracing context for the read-ahead request.
    */
   @Override
-  public void queueReadAhead(final AbfsInputStream stream,
-      final long requestedOffset,
-      final int requestedLength,
-      final TracingContext tracingContext) {
-    // TODO: To be implemented
+  public void queueReadAhead(final AbfsInputStream stream, final long requestedOffset,
+      final int requestedLength, final TracingContext tracingContext) {
+    printTraceLog("Start Queueing readAhead for file: {}, with eTag: {}, offset: {}, length: {}, triggered by stream: {}",
+        stream.getPath(), stream.getETag(), requestedOffset, requestedLength, stream.hashCode());
+    ReadBuffer buffer;
+    synchronized (this) {
+      if (isAlreadyQueued(stream.getETag(), requestedOffset)) {
+        // Already queued for this offset, so skip queuing.
+        printTraceLog("Skipping queuing readAhead for file: {}, with eTag: {}, offset: {}, triggered by stream: {} as it is already queued",
+            stream.getPath(), stream.getETag(), requestedOffset, stream.hashCode());
+        return;
+      }
+      if (getFreeList().isEmpty() && !tryMemoryUpscale() && !tryEvict()) {
+        // No buffers are available and more buffers cannot be created. Skip queuing.
+        printTraceLog("Skipping queuing readAhead for file: {}, with eTag: {}, offset: {}, triggered by stream: {} as no buffers are available",
+            stream.getPath(), stream.getETag(), requestedOffset, stream.hashCode());
+        return;
+      }
+
+      // Create a new ReadBuffer to hold the prefetched data, then queue it.
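+      // The pooled byte[] is attached only after the free list is re-checked below;
+      // if no index is free by then, this ReadBuffer is simply dropped.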
+      buffer = new ReadBuffer();
+      buffer.setStream(stream); // To map buffer with the stream that requested it
+      buffer.setETag(stream.getETag()); // To map buffer with the file it belongs to
+      buffer.setPath(stream.getPath());
+      buffer.setOffset(requestedOffset);
+      buffer.setLength(0);
+      buffer.setRequestedLength(requestedLength);
+      buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE);
+      buffer.setLatch(new CountDownLatch(1));
+      buffer.setTracingContext(tracingContext);
+
+      if (getFreeList().isEmpty()) {
+        /*
+         * By now there should be at least one buffer available.
+         * This is to make doubly sure that, after upscaling or eviction,
+         * we still have a free buffer available. If not, we skip queueing.
+         */
+        return;
+      }
+      Integer bufferIndex = getFreeList().pop();
+      buffer.setBuffer(bufferPool[bufferIndex]);
+      buffer.setBufferindex(bufferIndex);
+      getReadAheadQueue().add(buffer);
+      notifyAll();
+      printTraceLog("Done q-ing readAhead for file: {}, with eTag: {}, offset: {}, buffer idx: {}, triggered by stream: {}",
+          stream.getPath(), stream.getETag(), requestedOffset, buffer.getBufferindex(), stream.hashCode());
+    }
   }

   /**
-   * {@inheritDoc}
+   * {@link AbfsInputStream} calls this method to read any bytes already available in a buffer (thereby saving a
+   * remote read). This returns the bytes if the data already exists in the buffer. If there is a buffer that is reading
+   * the requested offset, then this method blocks until that read completes. If the data is queued in a read-ahead
+   * but not picked up by a worker thread yet, then it cancels that read-ahead and reports a cache miss. This is because,
+   * depending on worker thread availability, the read-ahead may take a while - the calling thread can do its own
+   * read to get the data faster (compared to the read waiting in queue for an indeterminate amount of time).
+   *
+   * @param stream the stream of the file to read bytes for
+   * @param position the offset in the file to do a read for
+   * @param length the length to read
+   * @param buffer the buffer to read data into. Note that the buffer will be written into from offset 0.
+   * @return the number of bytes read
    */
   @Override
-  public int getBlock(final AbfsInputStream stream,
-      final long position,
-      final int length,
-      final byte[] buffer) throws IOException {
-    // TODO: To be implemented
+  public int getBlock(final AbfsInputStream stream, final long position, final int length, final byte[] buffer)
+      throws IOException {
+    // not synchronized, so have to be careful with locking
+    printTraceLog("getBlock request for file: {}, with eTag: {}, for position: {} for length: {} received from stream: {}",
+        stream.getPath(), stream.getETag(), position, length, stream.hashCode());
+
+    String requestedETag = stream.getETag();
+    boolean isFirstRead = stream.isFirstRead();
+
+    // Wait for any in-progress read to complete.
+    waitForProcess(requestedETag, position, isFirstRead);
+
+    int bytesRead = 0;
+    synchronized (this) {
+      bytesRead = getBlockFromCompletedQueue(requestedETag, position, length, buffer);
+    }
+    if (bytesRead > 0) {
+      printTraceLog("Done read from cache for the file with eTag: {}, position: {}, length: {}, requested by stream: {}",
+          requestedETag, position, bytesRead, stream.hashCode());
+      return bytesRead;
+    }
+
+    // otherwise, just say we got nothing - calling thread can do its own read
     return 0;
   }

   /**
-   * {@inheritDoc}
+   * {@link ReadBufferWorker} thread calls this to get the next buffer that it should work on.
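+   * Blocks until a read-ahead is queued; queueReadAhead() wakes waiting workers via notifyAll().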
+ * @return {@link ReadBuffer} + * @throws InterruptedException if thread is interrupted */ @Override public ReadBuffer getNextBlockToRead() throws InterruptedException { - // TODO: To be implemented - return null; + ReadBuffer buffer = null; + synchronized (this) { + // Blocking Call to wait for prefetch to be queued. + while (getReadAheadQueue().size() == 0) { + wait(); + } + + buffer = getReadAheadQueue().remove(); + notifyAll(); + if (buffer == null) { + return null; + } + buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS); + getInProgressList().add(buffer); + } + printTraceLog("ReadBufferWorker picked file: {}, with eTag: {}, for offset: {}, queued by stream: {}", + buffer.getPath(), buffer.getETag(), buffer.getOffset(), buffer.getStream().hashCode()); + return buffer; } /** - * {@inheritDoc} + * {@link ReadBufferWorker} thread calls this method to post completion. * + * @param buffer the buffer whose read was completed + * @param result the {@link ReadBufferStatus} after the read operation in the worker thread + * @param bytesActuallyRead the number of bytes that the worker thread was actually able to read */ @Override - public void doneReading(final ReadBuffer buffer, - final ReadBufferStatus result, + public void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead) { - // TODO: To be implemented + printTraceLog("ReadBufferWorker completed prefetch for file: {} with eTag: {}, for offset: {}, queued by stream: {}, with status: {} and bytes read: {}", + buffer.getPath(), buffer.getETag(), buffer.getOffset(), buffer.getStream().hashCode(), result, bytesActuallyRead); + synchronized (this) { + // If this buffer has already been purged during + // close of InputStream then we don't update the lists. + if (getInProgressList().contains(buffer)) { + getInProgressList().remove(buffer); + if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) { + // Successful read, so update the buffer status and length + buffer.setStatus(ReadBufferStatus.AVAILABLE); + buffer.setLength(bytesActuallyRead); + } else { + // Failed read, reuse buffer for next read, this buffer will be + // evicted later based on eviction policy. + getFreeList().push(buffer.getBufferindex()); + } + // completed list also contains FAILED read buffers + // for sending exception message to clients. + buffer.setStatus(result); + buffer.setTimeStamp(currentTimeMillis()); + getCompletedReadList().add(buffer); + } + } + + //outside the synchronized, since anyone receiving a wake-up from the latch must see safe-published results + buffer.getLatch().countDown(); // wake up waiting threads (if any) } /** - * {@inheritDoc} + * Purging the buffers associated with an {@link AbfsInputStream} + * from {@link ReadBufferManagerV2} when stream is closed. + * @param stream input stream. 
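+   * Note: buffers already picked up by a worker are not purged here; they
+   * complete in doneReading() and are evicted later.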
    */
-  @Override
-  public void purgeBuffersForStream(final AbfsInputStream stream) {
-    // TODO: To be implemented
+  @Override
+  public synchronized void purgeBuffersForStream(AbfsInputStream stream) {
+    printDebugLog("Purging stale buffers for AbfsInputStream {}", stream);
+    getReadAheadQueue().removeIf(readBuffer -> readBuffer.getStream() == stream);
+    purgeList(stream, getCompletedReadList());
+  }
+
+  private boolean isAlreadyQueued(final String eTag, final long requestedOffset) {
+    // returns true if any part of the buffer is already queued
+    return (isInList(getReadAheadQueue(), eTag, requestedOffset)
+        || isInList(getInProgressList(), eTag, requestedOffset)
+        || isInList(getCompletedReadList(), eTag, requestedOffset));
+  }
+
+  private boolean isInList(final Collection<ReadBuffer> list, final String eTag,
+      final long requestedOffset) {
+    return (getFromList(list, eTag, requestedOffset) != null);
+  }
+
+  private ReadBuffer getFromList(final Collection<ReadBuffer> list, final String eTag,
+      final long requestedOffset) {
+    for (ReadBuffer buffer : list) {
+      if (eTag.equals(buffer.getETag())) {
+        if (buffer.getStatus() == ReadBufferStatus.AVAILABLE
+            && requestedOffset >= buffer.getOffset()
+            && requestedOffset < buffer.getOffset() + buffer.getLength()) {
+          return buffer;
+        } else if (requestedOffset >= buffer.getOffset()
+            && requestedOffset
+            < buffer.getOffset() + buffer.getRequestedLength()) {
+          return buffer;
+        }
+      }
+    }
+    return null;
   }

   /**
-   * {@inheritDoc}
+   * If any buffer in the completed list can be reclaimed then reclaim it and return the buffer to the free list.
+   * The objective is to find just one buffer - there is no advantage to evicting more than one.
+   * @return whether the eviction succeeded - i.e., were we able to free up one buffer
    */
-  @VisibleForTesting
-  @Override
-  public int getNumBuffers() {
-    return numberOfActiveBuffers;
+  private synchronized boolean tryEvict() {
+    ReadBuffer nodeToEvict = null;
+    if (getCompletedReadList().isEmpty()) {
+      return false; // there are no evict-able buffers
+    }
+
+    long currentTimeInMs = currentTimeMillis();
+
+    // first, try buffers where all bytes have been consumed (approximated as first and last bytes consumed)
+    for (ReadBuffer buf : getCompletedReadList()) {
+      if (buf.isFullyConsumed()) {
+        nodeToEvict = buf;
+        break;
+      }
+    }
+    if (nodeToEvict != null) {
+      return manualEviction(nodeToEvict);
+    }
+
+    // next, try buffers where any bytes have been consumed (maybe a bad idea? have to experiment and see)
+    for (ReadBuffer buf : getCompletedReadList()) {
+      if (buf.isAnyByteConsumed()) {
+        nodeToEvict = buf;
+        break;
+      }
+    }
+
+    if (nodeToEvict != null) {
+      return manualEviction(nodeToEvict);
+    }
+
+    // next, try any old nodes that have not been consumed
+    // Failed read buffers (with buffer index=-1) that are older than
+    // thresholdAge should be cleaned up, but at the same time should not
+    // report successful eviction.
+    // Queue logic expects that a buffer is freed up for read ahead when
+    // eviction is successful, whereas a failed ReadBuffer would have released
+    // its buffer when its status was set to READ_FAILED.
+    long earliestBirthday = Long.MAX_VALUE;
+    ArrayList<ReadBuffer> oldFailedBuffers = new ArrayList<>();
+    for (ReadBuffer buf : getCompletedReadList()) {
+      if ((buf.getBufferindex() != -1)
+          && (buf.getTimeStamp() < earliestBirthday)) {
+        nodeToEvict = buf;
+        earliestBirthday = buf.getTimeStamp();
+      } else if ((buf.getBufferindex() == -1)
+          && (currentTimeInMs - buf.getTimeStamp()) > getThresholdAgeMilliseconds()) {
+        oldFailedBuffers.add(buf);
+      }
+    }
+
+    for (ReadBuffer buf : oldFailedBuffers) {
+      manualEviction(buf);
+    }
+
+    if ((currentTimeInMs - earliestBirthday > getThresholdAgeMilliseconds()) && (nodeToEvict != null)) {
+      return manualEviction(nodeToEvict);
+    }
+
+    printTraceLog("No buffer eligible for eviction");
+    // nothing can be evicted
+    return false;
+  }
+
+  private boolean evict(final ReadBuffer buf) {
+    if (buf.getRefCount() > 0) {
+      // If the buffer is still being read, then we cannot evict it.
+      printTraceLog("Cannot evict buffer with index: {}, file: {}, with eTag: {}, offset: {} as it is still being read by some input stream",
+          buf.getBufferindex(), buf.getPath(), buf.getETag(), buf.getOffset());
+      return false;
+    }
+    // As failed ReadBuffers (bufferIndex = -1) are saved in getCompletedReadList(),
+    // avoid adding them to the free list.
+    if (buf.getBufferindex() != -1) {
+      synchronized (this) {
+        getFreeList().push(buf.getBufferindex());
+      }
+    }
+    getCompletedReadList().remove(buf);
+    buf.setTracingContext(null);
+    printTraceLog("Eviction of Buffer Completed for BufferIndex: {}, file: {}, with eTag: {}, offset: {}, is fully consumed: {}, is partially consumed: {}",
+        buf.getBufferindex(), buf.getPath(), buf.getETag(), buf.getOffset(), buf.isFullyConsumed(), buf.isAnyByteConsumed());
+    return true;
+  }
+
+  private void waitForProcess(final String eTag, final long position, boolean isFirstRead) {
+    ReadBuffer readBuf;
+    synchronized (this) {
+      readBuf = clearFromReadAheadQueue(eTag, position, isFirstRead);
+      if (readBuf == null) {
+        readBuf = getFromList(getInProgressList(), eTag, position);
+      }
+    }
+    if (readBuf != null) { // if in in-progress queue, then block for it
+      try {
+        printTraceLog("A relevant read buffer for file: {}, with eTag: {}, offset: {}, queued by stream: {}, having buffer idx: {} is being prefetched, waiting for latch",
+            readBuf.getPath(), readBuf.getETag(), readBuf.getOffset(), readBuf.getStream().hashCode(), readBuf.getBufferindex());
+        readBuf.getLatch().await(); // blocking wait on the caller stream's thread
+        // Note on correctness: readBuf gets out of getInProgressList() only in one place: after the worker thread
+        // is done processing it (in doneReading). There, the latch is set after removing the buffer from
+        // getInProgressList(). So this latch is safe to be outside the synchronized block.
+        // Putting it in synchronized would result in a deadlock, since this thread would be holding the lock
+        // while waiting, so no one will be able to change any state. If this becomes more complex in the future,
+        // then the latch can be removed and replaced with wait/notify whenever getInProgressList() is touched.
+      } catch (InterruptedException ex) {
+        Thread.currentThread().interrupt();
+      }
+      printTraceLog("Latch done for file: {}, with eTag: {}, for offset: {}, "
+          + "buffer index: {} queued by stream: {}", readBuf.getPath(), readBuf.getETag(),
+          readBuf.getOffset(), readBuf.getBufferindex(), readBuf.getStream().hashCode());
+    }
+  }
+
+  private ReadBuffer clearFromReadAheadQueue(final String eTag, final long requestedOffset, boolean isFirstRead) {
+    ReadBuffer buffer = getFromList(getReadAheadQueue(), eTag, requestedOffset);
+    /*
+     * If this prefetch was triggered by the first read of this input stream,
+     * we should not remove it from the queue; let the backend threads complete it.
+     */
+    if (buffer != null && isFirstRead) {
+      return buffer;
+    }
+    if (buffer != null) {
+      synchronized (this) {
+        getReadAheadQueue().remove(buffer);
+        notifyAll(); // lock is held in calling method
+        getFreeList().push(buffer.getBufferindex());
+      }
+    }
+    return null;
+  }
+
+  private int getBlockFromCompletedQueue(final String eTag, final long position,
+      final int length, final byte[] buffer) throws IOException {
+    ReadBuffer buf = getBufferFromCompletedQueue(eTag, position);
+
+    if (buf == null) {
+      return 0;
+    }
+
+    buf.startReading(); // atomic increment of refCount
+    try {
+      if (buf.getStatus() == ReadBufferStatus.READ_FAILED) {
+        // To prevent new read requests from failing due to old read-ahead attempts,
+        // return exception only from buffers that failed within last getThresholdAgeMilliseconds()
+        if (currentTimeMillis() - buf.getTimeStamp() < getThresholdAgeMilliseconds()) {
+          throw buf.getErrException();
+        } else {
+          return 0;
+        }
+      }
+
+      if ((buf.getStatus() != ReadBufferStatus.AVAILABLE)
+          || (position >= buf.getOffset() + buf.getLength())) {
+        return 0;
+      }
+
+      int cursor = (int) (position - buf.getOffset());
+      int availableLengthInBuffer = buf.getLength() - cursor;
+      int lengthToCopy = Math.min(length, availableLengthInBuffer);
+      System.arraycopy(buf.getBuffer(), cursor, buffer, 0, lengthToCopy);
+      if (cursor == 0) {
+        buf.setFirstByteConsumed(true);
+      }
+      if (cursor + lengthToCopy == buf.getLength()) {
+        buf.setLastByteConsumed(true);
+      }
+      buf.setAnyByteConsumed(true);
+      return lengthToCopy;
+    } finally {
+      // Balance startReading() on every exit path (including failures) so the
+      // buffer is not left pinned with a non-zero refCount and become unevictable.
+      buf.endReading(); // atomic decrement of refCount
+    }
+  }
+
+  private ReadBuffer getBufferFromCompletedQueue(final String eTag, final long requestedOffset) {
+    for (ReadBuffer buffer : getCompletedReadList()) {
+      // Buffer is returned if the requestedOffset is at or above the buffer's
+      // offset but less than the buffer's length or the actual requestedLength
+      if (eTag.equals(buffer.getETag())
+          && (requestedOffset >= buffer.getOffset())
+          && ((requestedOffset < buffer.getOffset() + buffer.getLength())
+          || (requestedOffset < buffer.getOffset() + buffer.getRequestedLength()))) {
+        return buffer;
+      }
+    }
+    return null;
+  }
+
+  private synchronized boolean tryMemoryUpscale() {
+    if (!isDynamicScalingEnabled) {
+      printTraceLog("Dynamic scaling is disabled, skipping memory upscale");
+      return false; // Dynamic scaling is disabled, so no upscaling.
+    }
+    double memoryLoad = getMemoryLoad();
+    if (memoryLoad < memoryThreshold && numberOfActiveBuffers < maxBufferPoolSize) {
+      // Create and add more buffers to the free list.
+      if (removedBufferList.isEmpty()) {
+        bufferPool[numberOfActiveBuffers] = new byte[getReadAheadBlockSize()];
+        getFreeList().add(numberOfActiveBuffers);
+      } else {
+        // Reuse a removed buffer index.
+        int freeIndex = removedBufferList.pop();
+        if (freeIndex >= bufferPool.length) {
+          printTraceLog("Invalid free index: {}. Current buffer pool size: {}",
+              freeIndex, bufferPool.length);
+          return false;
+        }
+        bufferPool[freeIndex] = new byte[getReadAheadBlockSize()];
+        getFreeList().add(freeIndex);
+      }
+      numberOfActiveBuffers++;
+      printTraceLog("Current Memory Load: {}. Incrementing buffer pool size to {}", memoryLoad, numberOfActiveBuffers);
+      return true;
+    }
+    printTraceLog("Could not upscale memory. Total buffers: {} Memory Load: {}",
+        numberOfActiveBuffers, memoryLoad);
+    return false;
+  }
+
+  private void scheduledEviction() {
+    // Iterate over a snapshot: evict() removes entries from the completed list,
+    // which would otherwise fail the iteration with ConcurrentModificationException.
+    for (ReadBuffer buf : new ArrayList<>(getCompletedReadList())) {
+      if (currentTimeMillis() - buf.getTimeStamp() > getThresholdAgeMilliseconds()) {
+        // If the buffer is older than thresholdAge, evict it.
+        printTraceLog("Scheduled Eviction of Buffer Triggered for BufferIndex: {}, file: {}, with eTag: {}, offset: {}, length: {}, queued by stream: {}",
+            buf.getBufferindex(), buf.getPath(), buf.getETag(), buf.getOffset(), buf.getLength(), buf.getStream().hashCode());
+        evict(buf);
+      }
+    }
+
+    double memoryLoad = getMemoryLoad();
+    if (isDynamicScalingEnabled && memoryLoad > memoryThreshold) {
+      synchronized (this) {
+        if (getFreeList().isEmpty()) {
+          return; // no idle buffer to release; cannot downscale right now
+        }
+        int freeIndex = getFreeList().pop();
+        bufferPool[freeIndex] = null;
+        removedBufferList.add(freeIndex);
+        numberOfActiveBuffers--;
+        printTraceLog("Current Memory Load: {}. Decrementing buffer pool size to {}", memoryLoad, numberOfActiveBuffers);
+      }
+    }
+  }
+
+  private boolean manualEviction(final ReadBuffer buf) {
+    printTraceLog("Manual Eviction of Buffer Triggered for BufferIndex: {}, file: {}, with eTag: {}, offset: {}, queued by stream: {}",
+        buf.getBufferindex(), buf.getPath(), buf.getETag(), buf.getOffset(), buf.getStream().hashCode());
+    return evict(buf);
+  }
+
+  private void adjustThreadPool() {
+    int currentPoolSize = workerRefs.size();
+    double cpuLoad = getCpuLoad();
+    int requiredPoolSize = getRequiredThreadPoolSize();
+    int newThreadPoolSize;
+    printTraceLog("Current CPU load: {}, Current worker pool size: {}, Required pool size: {}", cpuLoad, currentPoolSize, requiredPoolSize);
+    if (currentPoolSize < requiredPoolSize && cpuLoad < cpuThreshold) {
+      // Submit more background tasks.
+      newThreadPoolSize = Math.min(maxThreadPoolSize,
+          (int) Math.ceil((currentPoolSize * (ONE_HUNDRED + threadPoolUpscalePercentage)) / ONE_HUNDRED));
+      // Create new Worker Threads
+      for (int i = currentPoolSize; i < newThreadPoolSize; i++) {
+        ReadBufferWorker worker = new ReadBufferWorker(i, getBufferManager());
+        workerRefs.add(worker);
+        workerPool.submit(worker);
+      }
+      printTraceLog("Increased worker pool size from {} to {}", currentPoolSize, newThreadPoolSize);
+    } else if (cpuLoad > cpuThreshold || currentPoolSize > requiredPoolSize) {
+      newThreadPoolSize = Math.max(minThreadPoolSize,
+          (int) Math.ceil((currentPoolSize * (ONE_HUNDRED - threadPoolDownscalePercentage)) / ONE_HUNDRED));
+      // Signal the extra workers to stop
+      while (workerRefs.size() > newThreadPoolSize) {
+        ReadBufferWorker worker = workerRefs.remove(workerRefs.size() - 1);
+        worker.stop();
+      }
+      printTraceLog("Decreased worker pool size from {} to {}", currentPoolSize, newThreadPoolSize);
+    } else {
+      printTraceLog("No change in worker pool size. CPU load: {} Pool size: {}", cpuLoad, currentPoolSize);
+    }
+  }
+
   /**
-   * {@inheritDoc}
+   * Similar to System.currentTimeMillis, except implemented with System.nanoTime().
+   * System.currentTimeMillis can go backwards when the system clock is changed (e.g., with NTP time synchronization),
+   * making it unsuitable for measuring time intervals.
+   * System.nanoTime() is strictly monotonically increasing per CPU core.
+   * Note: it is not monotonic across sockets, and even within a CPU, it's only the
+   * more recent parts which share a clock across all cores.
+   *
+   * @return current time in milliseconds
    */
-  @VisibleForTesting
-  @Override
-  public void callTryEvict() {
-    // TODO: To be implemented
+  private long currentTimeMillis() {
+    return System.nanoTime() / 1000 / 1000;
+  }
+
+  private void purgeList(AbfsInputStream stream, LinkedList<ReadBuffer> list) {
+    for (Iterator<ReadBuffer> it = list.iterator(); it.hasNext();) {
+      ReadBuffer readBuffer = it.next();
+      if (readBuffer.getStream() == stream) {
+        it.remove();
+        // As failed ReadBuffers (bufferIndex = -1) are already pushed to the free
+        // list in the doneReading method, we will skip adding those here again.
+        if (readBuffer.getBufferindex() != -1) {
+          synchronized (this) {
+            getFreeList().push(readBuffer.getBufferindex());
+          }
+        }
+      }
+    }
   }

   /**
-   * {@inheritDoc}
+   * Test method that can clean up the current state of readAhead buffers and
+   * the lists. Will also trigger a fresh init on next use.
    */
   @VisibleForTesting
   @Override
   public void testResetReadBufferManager() {
-    // TODO: To be implemented
+    synchronized (this) {
+      ArrayList<ReadBuffer> completedBuffers = new ArrayList<>();
+      for (ReadBuffer buf : getCompletedReadList()) {
+        if (buf != null) {
+          completedBuffers.add(buf);
+        }
+      }
+
+      for (ReadBuffer buf : completedBuffers) {
+        manualEviction(buf);
+      }
+
+      getReadAheadQueue().clear();
+      getInProgressList().clear();
+      getCompletedReadList().clear();
+      getFreeList().clear();
+      for (int i = 0; i < maxBufferPoolSize; i++) {
+        bufferPool[i] = null;
+      }
+      bufferPool = null;
+      if (cpuMonitorThread != null) {
+        // The CPU monitor is only created when dynamic scaling is enabled.
+        cpuMonitorThread.shutdownNow();
+      }
+      memoryMonitorThread.shutdownNow();
+      workerPool.shutdownNow();
+      resetBufferManager();
+    }
   }

-  /**
-   * {@inheritDoc}
-   */
   @VisibleForTesting
   @Override
-  public void testResetReadBufferManager(final int readAheadBlockSize,
-      final int thresholdAgeMilliseconds) {
-    // TODO: To be implemented
+  public void testResetReadBufferManager(int readAheadBlockSize, int thresholdAgeMilliseconds) {
+    setReadAheadBlockSize(readAheadBlockSize);
+    setThresholdAgeMilliseconds(thresholdAgeMilliseconds);
+    testResetReadBufferManager();
   }

-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void testMimicFullUseAndAddFailedBuffer(final ReadBuffer buf) {
-    // TODO: To be implemented
+  @VisibleForTesting
+  public void callTryEvict() {
+    tryEvict();
   }

-  private final ThreadFactory namedThreadFactory = new ThreadFactory() {
-    private int count = 0;
-    @Override
-    public Thread newThread(Runnable r) {
-      return new Thread(r, "ReadAheadV2-Thread-" + count++);
+
+  @VisibleForTesting
+  public int getNumBuffers() {
+    synchronized (this) {
+      return numberOfActiveBuffers;
     }
-  };
+  }

   @Override
   void resetBufferManager() {
     setBufferManager(null); // reset the singleton instance
+    setIsConfigured(false);
   }

   private static void setBufferManager(ReadBufferManagerV2 manager) {
     bufferManager = manager;
   }
+
+  private static void setIsConfigured(boolean configured) {
+    isConfigured = configured;
+  }
+
+  private final ThreadFactory workerThreadFactory = new ThreadFactory() {
+    private int count = 0;
+    @Override
+    public Thread newThread(Runnable r) {
+      Thread t = new Thread(r, "ReadAheadV2-WorkerThread-" + count++);
+      t.setDaemon(true);
+      return t;
+    }
+  };
+
+  private void printTraceLog(String message, Object... args) {
+    if (LOGGER.isTraceEnabled()) {
+      LOGGER.trace(message, args);
+    }
+  }
+
+  private void printDebugLog(String message, Object...
+
+  private void printDebugLog(String message, Object... args) {
+    LOGGER.debug(message, args);
+  }
+
+  @VisibleForTesting
+  double getMemoryLoad() {
+    MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
+    MemoryUsage memoryUsage = memoryMXBean.getHeapMemoryUsage();
+    return (double) memoryUsage.getUsed() / memoryUsage.getMax();
+  }
+
+  @VisibleForTesting
+  public static ReadBufferManagerV2 getInstance() {
+    return bufferManager;
+  }
+
+  @VisibleForTesting
+  public int getMinThreadPoolSize() {
+    return minThreadPoolSize;
+  }
+
+  @VisibleForTesting
+  public int getMaxThreadPoolSize() {
+    return maxThreadPoolSize;
+  }
+
+  @VisibleForTesting
+  public int getMinBufferPoolSize() {
+    return minBufferPoolSize;
+  }
+
+  @VisibleForTesting
+  public int getMaxBufferPoolSize() {
+    return maxBufferPoolSize;
+  }
+
+  @VisibleForTesting
+  public int getCurrentThreadPoolSize() {
+    return workerRefs.size();
+  }
+
+  @VisibleForTesting
+  public int getCpuMonitoringIntervalInMilliSec() {
+    return cpuMonitoringIntervalInMilliSec;
+  }
+
+  @VisibleForTesting
+  public ScheduledExecutorService getCpuMonitoringThread() {
+    return cpuMonitorThread;
+  }
+
+  @VisibleForTesting
+  public double getCpuLoad() {
+    OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(
+        OperatingSystemMXBean.class);
+    return osBean.getSystemCpuLoad();
+  }
+
+  public int getRequiredThreadPoolSize() {
+    return (int) Math.ceil(threadPoolRequirementBuffer
+        * (getReadAheadQueue().size() + getInProgressList().size())); // buffer factor adds ~20% headroom over queued work
+  }
 }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java
index 79d5eef955a4a..2c6efdc735aeb 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java
@@ -20,7 +20,9 @@
 import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.fs.PathIOException;
 import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
 
@@ -29,6 +31,7 @@ class ReadBufferWorker implements Runnable {
   protected static final CountDownLatch UNLEASH_WORKERS = new CountDownLatch(1);
   private int id;
   private ReadBufferManager bufferManager;
+  private final AtomicBoolean isRunning = new AtomicBoolean(true);
 
   ReadBufferWorker(final int id, final ReadBufferManager bufferManager) {
     this.id = id;
@@ -54,7 +57,7 @@ public void run() {
       Thread.currentThread().interrupt();
     }
     ReadBuffer buffer;
-    while (true) {
+    while (isRunning()) {
       try {
         buffer = bufferManager.getNextBlockToRead(); // blocks, until a buffer is available for this thread
       } catch (InterruptedException ex) {
@@ -72,7 +75,7 @@ public void run() {
             // read-ahead buffer size, make sure a valid length is passed
             // for remote read
             Math.min(buffer.getRequestedLength(), buffer.getBuffer().length),
-            buffer.getTracingContext());
+            buffer.getTracingContext());
 
         bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead); // post result back to ReadBufferManager
       } catch (IOException ex) {
@@ -85,4 +88,13 @@ public void run() {
       }
     }
   }
+
+  public void stop() {
+    isRunning.set(false);
+  }
+
+  @VisibleForTesting
+  public boolean isRunning() {
+    return isRunning.get();
+  }
 }
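Illustration (not part of the patch): the cooperative-shutdown pattern that stop()/isRunning() give ReadBufferWorker, reduced to a hypothetical standalone worker:

    import java.util.concurrent.atomic.AtomicBoolean;

    class PollingWorker implements Runnable {
      private final AtomicBoolean running = new AtomicBoolean(true);

      @Override
      public void run() {
        while (running.get()) {  // flag is re-checked once per iteration
          // ... fetch and process the next unit of work ...
        }
      }

      void stop() {
        running.set(false);      // no interrupt; the loop exits at its next check
      }
    }

Because a worker blocked in getNextBlockToRead() only re-checks the flag after it unblocks, downscaling takes effect lazily rather than immediately.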
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
index c35b76e1a7368..798b1943e0760 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
@@ -38,6 +38,7 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
@@ -711,12 +712,38 @@ protected void assertPathDns(Path path) {
         .contains(expectedDns);
   }
 
+  /**
+   * Return an array of random bytes of the given length.
+   *
+   * @param length length of the byte array
+   * @return byte array
+   */
   protected byte[] getRandomBytesArray(int length) {
     final byte[] b = new byte[length];
     new Random().nextBytes(b);
     return b;
   }
 
+  /**
+   * Create a file on the file system with the given file name and content.
+   *
+   * @param fs fileSystem that stores the file
+   * @param fileName name of the file
+   * @param fileContent content of the file
+   *
+   * @return path of the file created
+   * @throws IOException if writing the file to the fileSystem fails
+   */
+  protected Path createFileWithContent(FileSystem fs, String fileName,
+      byte[] fileContent) throws IOException {
+    Path testFilePath = path(fileName);
+    try (FSDataOutputStream oStream = fs.create(testFilePath)) {
+      oStream.write(fileContent);
+      oStream.flush();
+    }
+    return testFilePath;
+  }
+
   /**
    * Checks a list of futures for exceptions.
    *
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java
index 1e5ba3689f5da..33b7acf465872 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java
@@ -233,7 +233,7 @@ private void testFlush(boolean disableOutputStreamFlush) throws Exception {
         .setDisableOutputStreamFlush(disableOutputStreamFlush);
     final Path testFilePath = path(methodName.getMethodName());
-    byte[] buffer = getRandomBytesArray();
+    byte[] buffer = getRandomBytesArray(TEST_FILE_LENGTH);
 
     // The test case must write "fs.azure.write.request.size" bytes
     // to the stream in order for the data to be uploaded to storage.
assertTrue(fs.getAbfsStore().getAbfsConfiguration().getWriteBufferSize() @@ -265,7 +265,7 @@ private void testFlush(boolean disableOutputStreamFlush) throws Exception { @Test public void testHflushWithFlushEnabled() throws Exception { final AzureBlobFileSystem fs = this.getFileSystem(); - byte[] buffer = getRandomBytesArray(); + byte[] buffer = getRandomBytesArray(TEST_FILE_LENGTH); String fileName = UUID.randomUUID().toString(); final Path testFilePath = path(fileName); @@ -278,7 +278,7 @@ public void testHflushWithFlushEnabled() throws Exception { @Test public void testHflushWithFlushDisabled() throws Exception { final AzureBlobFileSystem fs = this.getFileSystem(); - byte[] buffer = getRandomBytesArray(); + byte[] buffer = getRandomBytesArray(TEST_FILE_LENGTH); final Path testFilePath = path(methodName.getMethodName()); boolean isAppendBlob = false; if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(testFilePath).toString())) { @@ -295,7 +295,7 @@ public void testHflushWithFlushDisabled() throws Exception { @Test public void testHsyncWithFlushEnabled() throws Exception { final AzureBlobFileSystem fs = this.getFileSystem(); - byte[] buffer = getRandomBytesArray(); + byte[] buffer = getRandomBytesArray(TEST_FILE_LENGTH); final Path testFilePath = path(methodName.getMethodName()); @@ -332,7 +332,7 @@ public void testTracingHeaderForAppendBlob() throws Exception { @Test public void testStreamCapabilitiesWithFlushDisabled() throws Exception { final AzureBlobFileSystem fs = this.getFileSystem(); - byte[] buffer = getRandomBytesArray(); + byte[] buffer = getRandomBytesArray(TEST_FILE_LENGTH); final Path testFilePath = path(methodName.getMethodName()); @@ -349,7 +349,7 @@ public void testStreamCapabilitiesWithFlushDisabled() throws Exception { @Test public void testStreamCapabilitiesWithFlushEnabled() throws Exception { final AzureBlobFileSystem fs = this.getFileSystem(); - byte[] buffer = getRandomBytesArray(); + byte[] buffer = getRandomBytesArray(TEST_FILE_LENGTH); final Path testFilePath = path(methodName.getMethodName()); try (FSDataOutputStream stream = getStreamAfterWrite(fs, testFilePath, buffer, true)) { assertHasStreamCapabilities(stream, @@ -365,7 +365,7 @@ public void testStreamCapabilitiesWithFlushEnabled() throws Exception { @Test public void testHsyncWithFlushDisabled() throws Exception { final AzureBlobFileSystem fs = this.getFileSystem(); - byte[] buffer = getRandomBytesArray(); + byte[] buffer = getRandomBytesArray(TEST_FILE_LENGTH); final Path testFilePath = path(methodName.getMethodName()); boolean isAppendBlob = false; if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(testFilePath).toString())) { @@ -378,12 +378,6 @@ public void testHsyncWithFlushDisabled() throws Exception { } } - private byte[] getRandomBytesArray() { - final byte[] b = new byte[TEST_FILE_LENGTH]; - new Random().nextBytes(b); - return b; - } - private FSDataOutputStream getStreamAfterWrite(AzureBlobFileSystem fs, Path path, byte[] buffer, boolean enableFlush) throws IOException { fs.getAbfsStore().getAbfsConfiguration().setEnableFlush(enableFlush); FSDataOutputStream stream = fs.create(path); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamTestUtils.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamTestUtils.java index 388e662115ed2..2d5d0a2d0b5a8 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamTestUtils.java +++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamTestUtils.java @@ -49,29 +49,6 @@ public AbfsInputStreamTestUtils(AbstractAbfsIntegrationTest abstractAbfsIntegrat this.abstractAbfsIntegrationTest = abstractAbfsIntegrationTest; } - private Path path(String filepath) throws IOException { - return abstractAbfsIntegrationTest.getFileSystem().makeQualified( - new Path(getTestPath(), getUniquePath(filepath))); - } - - private Path getTestPath() { - Path path = new Path(UriUtils.generateUniqueTestPath()); - return path; - } - - /** - * Generate a unique path using the given filepath. - * @param filepath path string - * @return unique path created from filepath and a GUID - */ - private Path getUniquePath(String filepath) { - if (filepath.equals("/")) { - return new Path(filepath); - } - return new Path(filepath + StringUtils - .right(UUID.randomUUID().toString(), SHORTENED_GUID_LEN)); - } - /** * Returns AzureBlobFileSystem instance with the required * readFullFileOptimization configuration. @@ -90,38 +67,6 @@ public AzureBlobFileSystem getFileSystem(boolean readSmallFilesCompletely) return (AzureBlobFileSystem) FileSystem.newInstance(configuration); } - /** - * Return array of random bytes of the given length. - * - * @param length length of the byte array - * @return byte array - */ - public byte[] getRandomBytesArray(int length) { - final byte[] b = new byte[length]; - new Random().nextBytes(b); - return b; - } - - /** - * Create a file on the file system with the given file name and content. - * - * @param fs fileSystem that stores the file - * @param fileName name of the file - * @param fileContent content of the file - * - * @return path of the file created - * @throws IOException exception in writing file on fileSystem - */ - public Path createFileWithContent(FileSystem fs, String fileName, - byte[] fileContent) throws IOException { - Path testFilePath = path(fileName); - try (FSDataOutputStream oStream = fs.create(testFilePath)) { - oStream.write(fileContent); - oStream.flush(); - } - return testFilePath; - } - /** * Assert that the content read from the subsection of a file is correct. 
* diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java index 6bcf31f9e69dd..938f5f4300ce9 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java @@ -52,8 +52,8 @@ public void testWithNoOptimization() throws Exception { int fileSize = i * ONE_MB; final AzureBlobFileSystem fs = getFileSystem(false, false, fileSize); String fileName = methodName.getMethodName() + i; - byte[] fileContent = abfsInputStreamTestUtils.getRandomBytesArray(fileSize); - Path testFilePath = abfsInputStreamTestUtils.createFileWithContent(fs, fileName, fileContent); + byte[] fileContent = getRandomBytesArray(fileSize); + Path testFilePath = createFileWithContent(fs, fileName, fileContent); testWithNoOptimization(fs, testFilePath, HUNDRED, fileContent); } } @@ -97,8 +97,8 @@ public void testExceptionInOptimization() throws Exception { int fileSize = i * ONE_MB; final AzureBlobFileSystem fs = getFileSystem(true, true, fileSize); String fileName = methodName.getMethodName() + i; - byte[] fileContent = abfsInputStreamTestUtils.getRandomBytesArray(fileSize); - Path testFilePath = abfsInputStreamTestUtils.createFileWithContent(fs, fileName, fileContent); + byte[] fileContent = getRandomBytesArray(fileSize); + Path testFilePath = createFileWithContent(fs, fileName, fileContent); testExceptionInOptimization(fs, testFilePath, fileSize - HUNDRED, fileSize / 4, fileContent); } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamReadFooter.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamReadFooter.java index fbafc12490c78..9cd3433a88e04 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamReadFooter.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamReadFooter.java @@ -258,8 +258,8 @@ private void validateSeekAndReadWithConf(boolean optimizeFooterRead, try (AzureBlobFileSystem spiedFs = createSpiedFs( getRawConfiguration())) { String fileName = methodName.getMethodName() + fileId; - byte[] fileContent = abfsInputStreamTestUtils.getRandomBytesArray(fileSize); - Path testFilePath = abfsInputStreamTestUtils.createFileWithContent(spiedFs, fileName, + byte[] fileContent = getRandomBytesArray(fileSize); + Path testFilePath = createFileWithContent(spiedFs, fileName, fileContent); for (int readBufferSize : READ_BUFFER_SIZE) { validateSeekAndReadWithConf(spiedFs, optimizeFooterRead, seekTo, @@ -389,8 +389,8 @@ public void testPartialReadWithNoData() throws Exception { futureList.add(executorService.submit(() -> { try (AzureBlobFileSystem spiedFs = createSpiedFs( getRawConfiguration())) { - byte[] fileContent = abfsInputStreamTestUtils.getRandomBytesArray(fileSize); - Path testFilePath = abfsInputStreamTestUtils.createFileWithContent(spiedFs, fileName, + byte[] fileContent = getRandomBytesArray(fileSize); + Path testFilePath = createFileWithContent(spiedFs, fileName, fileContent); validatePartialReadWithNoData(spiedFs, fileSize, fileContent, testFilePath); @@ -461,8 +461,8 @@ public void testPartialReadWithSomeData() throws Exception { try (AzureBlobFileSystem spiedFs = 
createSpiedFs( getRawConfiguration())) { String fileName = methodName.getMethodName() + fileId; - byte[] fileContent = abfsInputStreamTestUtils.getRandomBytesArray(fileSize); - Path testFilePath = abfsInputStreamTestUtils.createFileWithContent(spiedFs, fileName, + byte[] fileContent = getRandomBytesArray(fileSize); + Path testFilePath = createFileWithContent(spiedFs, fileName, fileContent); validatePartialReadWithSomeData(spiedFs, fileSize, testFilePath, fileContent); @@ -583,8 +583,8 @@ private void verifyConfigValueInStream(final FSDataInputStream inputStream, private Path createPathAndFileWithContent(final AzureBlobFileSystem fs, final int fileIdx, final int fileSize) throws Exception { String fileName = methodName.getMethodName() + fileIdx; - byte[] fileContent = abfsInputStreamTestUtils.getRandomBytesArray(fileSize); - return abfsInputStreamTestUtils.createFileWithContent(fs, fileName, fileContent); + byte[] fileContent = getRandomBytesArray(fileSize); + return createFileWithContent(fs, fileName, fileContent); } private FutureDataInputStreamBuilder getParameterizedBuilder(final Path path, diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamSmallFileReads.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamSmallFileReads.java index 5e3879a525cfe..f92d64359a222 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamSmallFileReads.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamSmallFileReads.java @@ -74,8 +74,8 @@ private void validateNumBackendCalls(final boolean readSmallFilesCompletely, for (int i = 1; i <= 4; i++) { String fileName = methodName.getMethodName() + i; int fileSize = i * ONE_MB; - byte[] fileContent = abfsInputStreamTestUtils.getRandomBytesArray(fileSize); - Path testFilePath = abfsInputStreamTestUtils.createFileWithContent(fs, fileName, fileContent); + byte[] fileContent = getRandomBytesArray(fileSize); + Path testFilePath = createFileWithContent(fs, fileName, fileContent); int length = ONE_KB; try (FSDataInputStream iStream = fs.open(testFilePath)) { byte[] buffer = new byte[length]; @@ -185,8 +185,8 @@ private void validateSeekAndReadWithConf(final SeekTo seekTo, for (int i = startFileSizeInMB; i <= endFileSizeInMB; i++) { String fileName = methodName.getMethodName() + i; int fileSize = i * ONE_MB; - byte[] fileContent = abfsInputStreamTestUtils.getRandomBytesArray(fileSize); - Path testFilePath = abfsInputStreamTestUtils.createFileWithContent(fs, fileName, fileContent); + byte[] fileContent = getRandomBytesArray(fileSize); + Path testFilePath = createFileWithContent(fs, fileName, fileContent); int length = ONE_KB; int seekPos = seekPos(seekTo, fileSize, length); seekReadAndTest(fs, testFilePath, seekPos, length, fileContent); @@ -255,9 +255,9 @@ public void testPartialReadWithNoData() throws Exception { try (AzureBlobFileSystem fs = abfsInputStreamTestUtils.getFileSystem( true)) { String fileName = methodName.getMethodName() + i; - byte[] fileContent = abfsInputStreamTestUtils.getRandomBytesArray( + byte[] fileContent = getRandomBytesArray( fileSize); - Path testFilePath = abfsInputStreamTestUtils.createFileWithContent(fs, + Path testFilePath = createFileWithContent(fs, fileName, fileContent); partialReadWithNoData(fs, testFilePath, fileSize / 2, fileSize / 4, fileContent); @@ -304,9 +304,9 @@ public void testPartialReadWithSomeData() 
throws Exception {
     try (AzureBlobFileSystem fs = abfsInputStreamTestUtils.getFileSystem(
         true)) {
       String fileName = methodName.getMethodName() + i;
-      byte[] fileContent = abfsInputStreamTestUtils.getRandomBytesArray(
+      byte[] fileContent = getRandomBytesArray(
           fileSize);
-      Path testFilePath = abfsInputStreamTestUtils.createFileWithContent(fs,
+      Path testFilePath = createFileWithContent(fs,
           fileName, fileContent);
       partialReadWithSomeData(fs, testFilePath, fileSize / 2,
           fileSize / 4, fileContent);
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java
index 1ead30e9fa2a9..84b0fbd5196f7 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java
@@ -18,10 +18,8 @@
 package org.apache.hadoop.fs.azurebfs.services;
 
-import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -29,7 +27,6 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
@@ -62,127 +59,122 @@ public class ITestReadBufferManager extends AbstractAbfsIntegrationTest {
    */
   public static final int PROBE_INTERVAL_MILLIS = 1_000;
 
-    public ITestReadBufferManager() throws Exception {
-    }
-
-    @Test
-    public void testPurgeBufferManagerForParallelStreams() throws Exception {
-        describe("Testing purging of buffers from ReadBufferManagerV1 for "
-                + "parallel input streams");
-        final int numBuffers = 16;
-        final LinkedList<Integer> freeList = new LinkedList<>();
-        for (int i=0; i < numBuffers; i++) {
-            freeList.add(i);
-        }
-        ExecutorService executorService = Executors.newFixedThreadPool(4);
-        AzureBlobFileSystem fs = getABFSWithReadAheadConfig();
-        // verify that the fs has the capability to validate the fix
-        Assertions.assertThat(fs.hasPathCapability(new Path("/"), CAPABILITY_SAFE_READAHEAD))
-                .describedAs("path capability %s in %s", CAPABILITY_SAFE_READAHEAD, fs)
-                .isTrue();
-
-        try {
-            for (int i = 0; i < 4; i++) {
-                final String fileName = methodName.getMethodName() + i;
-                executorService.submit((Callable<Void>) () -> {
-                    byte[] fileContent = getRandomBytesArray(ONE_MB);
-                    Path testFilePath = createFileWithContent(fs, fileName, fileContent);
-                    try (FSDataInputStream iStream = fs.open(testFilePath)) {
-                        iStream.read();
-                    }
-                    return null;
-                });
-            }
-        } finally {
-            executorService.shutdown();
-            // wait for all tasks to finish
-            executorService.awaitTermination(1, TimeUnit.MINUTES);
-        }
-
-        ReadBufferManagerV1 bufferManager = ReadBufferManagerV1.getBufferManager();
-        // readahead queue is empty
-        assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy());
-        // verify the in progress list eventually empties out.
-        eventually(getTestTimeoutMillis() - TIMEOUT_OFFSET, PROBE_INTERVAL_MILLIS, () ->
-                assertListEmpty("InProgressList", bufferManager.getInProgressCopiedList()));
-    }
-
-    private void assertListEmpty(String listName, List<ReadBuffer> list) {
-        Assertions.assertThat(list)
-                .describedAs("After closing all streams %s should be empty", listName)
-                .hasSize(0);
-    }
-
-    @Test
-    public void testPurgeBufferManagerForSequentialStream() throws Exception {
-        describe("Testing purging of buffers in ReadBufferManagerV1 for "
-                + "sequential input streams");
-        AzureBlobFileSystem fs = getABFSWithReadAheadConfig();
-        final String fileName = methodName.getMethodName();
-        byte[] fileContent = getRandomBytesArray(ONE_MB);
-        Path testFilePath = createFileWithContent(fs, fileName, fileContent);
-
-        AbfsInputStream iStream1 = null;
-        // stream1 will be closed right away.
-        try {
-            iStream1 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream();
-            // Just reading one byte will trigger all read ahead calls.
-            iStream1.read();
-        } finally {
-            IOUtils.closeStream(iStream1);
-        }
-        ReadBufferManagerV1 bufferManager = ReadBufferManagerV1.getBufferManager();
-        AbfsInputStream iStream2 = null;
-        try {
-            iStream2 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream();
-            iStream2.read();
-            // After closing stream1, no queued buffers of stream1 should be present
-            // assertions can't be made about the state of the other lists as it is
-            // too prone to race conditions.
-            assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream1);
-        } finally {
-            // closing the stream later.
-            IOUtils.closeStream(iStream2);
-        }
-        // After closing stream2, no queued buffers of stream2 should be present.
-        assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream2);
-
-        // After closing both the streams, read queue should be empty.
-        assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy());
-
-    }
-
-    private void assertListDoesnotContainBuffersForIstream(List<ReadBuffer> list,
-        AbfsInputStream inputStream) {
-        for (ReadBuffer buffer : list) {
-            Assertions.assertThat(buffer.getStream())
-                    .describedAs("Buffers associated with closed input streams shouldn't be present")
-                    .isNotEqualTo(inputStream);
-        }
-    }
-
-    private AzureBlobFileSystem getABFSWithReadAheadConfig() throws Exception {
-        Configuration conf = getRawConfiguration();
-        conf.setLong(FS_AZURE_READ_AHEAD_QUEUE_DEPTH, 8);
-        conf.setInt(AZURE_READ_BUFFER_SIZE, MIN_BUFFER_SIZE);
-        conf.setInt(FS_AZURE_READ_AHEAD_BLOCK_SIZE, MIN_BUFFER_SIZE);
-        return (AzureBlobFileSystem) FileSystem.newInstance(conf);
-    }
-
-    protected byte[] getRandomBytesArray(int length) {
-        final byte[] b = new byte[length];
-        new Random().nextBytes(b);
-        return b;
-    }
-
-    protected Path createFileWithContent(FileSystem fs, String fileName,
-        byte[] fileContent) throws IOException {
-        Path testFilePath = path(fileName);
-        try (FSDataOutputStream oStream = fs.create(testFilePath)) {
-            oStream.write(fileContent);
-            oStream.flush();
-        }
-        return testFilePath;
-    }
+  public ITestReadBufferManager() throws Exception {
+  }
+
+  @Test
+  public void testPurgeBufferManagerForParallelStreams() throws Exception {
+    describe("Testing purging of buffers from ReadBufferManagerV1 for "
+        + "parallel input streams");
+    final int numBuffers = 16;
+    final LinkedList<Integer> freeList = new LinkedList<>();
+    for (int i = 0; i < numBuffers; i++) {
+      freeList.add(i);
+    }
+    ExecutorService executorService = Executors.newFixedThreadPool(4);
+    AzureBlobFileSystem fs = getABFSWithReadAheadConfig();
+    // verify that the fs has the capability to validate the fix
+    Assertions.assertThat(fs.hasPathCapability(new Path("/"), CAPABILITY_SAFE_READAHEAD))
+        .describedAs("path capability %s in %s", CAPABILITY_SAFE_READAHEAD, fs)
+        .isTrue();
+
+    try {
+      for (int i = 0; i < 4; i++) {
+        final String fileName = methodName.getMethodName() + i;
+        executorService.submit((Callable<Void>) () -> {
+          byte[] fileContent = getRandomBytesArray(ONE_MB);
+          Path testFilePath = createFileWithContent(fs, fileName, fileContent);
+          try (FSDataInputStream iStream = fs.open(testFilePath)) {
+            iStream.read();
+          }
+          return null;
+        });
+      }
+    } finally {
+      executorService.shutdown();
+      // wait for all tasks to finish
+      executorService.awaitTermination(1, TimeUnit.MINUTES);
+    }
+
+    ReadBufferManager bufferManager = getBufferManager(fs);
+    // readahead queue is empty
+    assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy());
+    // verify the in progress list eventually empties out.
+    eventually(getTestTimeoutMillis() - TIMEOUT_OFFSET, PROBE_INTERVAL_MILLIS, () ->
+        assertListEmpty("InProgressList", bufferManager.getInProgressListCopy()));
+  }
+
+  private void assertListEmpty(String listName, List<ReadBuffer> list) {
+    Assertions.assertThat(list)
+        .describedAs("After closing all streams %s should be empty", listName)
+        .hasSize(0);
+  }
+
+  @Test
+  public void testPurgeBufferManagerForSequentialStream() throws Exception {
+    describe("Testing purging of buffers in ReadBufferManagerV1 for "
+        + "sequential input streams");
+    AzureBlobFileSystem fs = getABFSWithReadAheadConfig();
+    final String fileName = methodName.getMethodName();
+    byte[] fileContent = getRandomBytesArray(ONE_MB);
+    Path testFilePath = createFileWithContent(fs, fileName, fileContent);
+
+    AbfsInputStream iStream1 = null;
+    // stream1 will be closed right away.
+    try {
+      iStream1 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream();
+      // Just reading one byte will trigger all read ahead calls.
+      iStream1.read();
+    } finally {
+      IOUtils.closeStream(iStream1);
+    }
+    ReadBufferManager bufferManager = getBufferManager(fs);
+    AbfsInputStream iStream2 = null;
+    try {
+      iStream2 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream();
+      iStream2.read();
+      // After closing stream1, no queued buffers of stream1 should be present
+      // assertions can't be made about the state of the other lists as it is
+      // too prone to race conditions.
+      assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream1);
+    } finally {
+      // closing the stream later.
+      IOUtils.closeStream(iStream2);
+    }
+    // After closing stream2, no queued buffers of stream2 should be present.
+    assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream2);
+    // After closing both the streams, read queue should be empty.
+    assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy());
+  }
+
+  private void assertListDoesnotContainBuffersForIstream(List<ReadBuffer> list,
+      AbfsInputStream inputStream) {
+    for (ReadBuffer buffer : list) {
+      Assertions.assertThat(buffer.getStream())
+          .describedAs("Buffers associated with closed input streams shouldn't be present")
+          .isNotEqualTo(inputStream);
+    }
+  }
+
+  private AzureBlobFileSystem getABFSWithReadAheadConfig() throws Exception {
+    Configuration conf = getRawConfiguration();
+    conf.setLong(FS_AZURE_READ_AHEAD_QUEUE_DEPTH, 8);
+    conf.setInt(AZURE_READ_BUFFER_SIZE, MIN_BUFFER_SIZE);
+    conf.setInt(FS_AZURE_READ_AHEAD_BLOCK_SIZE, MIN_BUFFER_SIZE);
+    return (AzureBlobFileSystem) FileSystem.newInstance(conf);
+  }
+
+  private ReadBufferManager getBufferManager(AzureBlobFileSystem fs) {
+    int blockSize = fs.getAbfsStore().getAbfsConfiguration().getReadAheadBlockSize();
+    if (getConfiguration().isReadAheadV2Enabled()) {
+      ReadBufferManagerV2.setReadBufferManagerConfigs(blockSize,
+          getConfiguration());
+      return ReadBufferManagerV2.getBufferManager();
+    }
+    ReadBufferManagerV1.setReadBufferManagerConfigs(blockSize);
+    return ReadBufferManagerV1.getBufferManager();
+  }
 }
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManagerV2.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManagerV2.java
new file mode 100644
index 0000000000000..4980684ff2aad
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManagerV2.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
+
+public class ITestReadBufferManagerV2 extends AbstractAbfsIntegrationTest {
+
+  private static final int LESS_NUM_FILES = 2;
+  private static final int MORE_NUM_FILES = 5;
+  private static final int SMALL_FILE_SIZE = 6 * ONE_MB;
+  private static final int LARGE_FILE_SIZE = 50 * ONE_MB;
+  private static final int BLOCK_SIZE = 4 * ONE_MB;
+
+  public ITestReadBufferManagerV2() throws Exception {
+  }
+
+  @Test
+  public void testReadDifferentFilesInParallel() throws Exception {
+    AzureBlobFileSystem fs = getFileSystem();
+    int fileSize = LARGE_FILE_SIZE;
+    int numFiles = MORE_NUM_FILES;
+
+    Path[] testPaths = new Path[numFiles];
+    for (int i = 0; i < numFiles; i++) {
+      final String fileName = methodName.getMethodName() + i;
+      byte[] fileContent = getRandomBytesArray(fileSize);
+      testPaths[i] = createFileWithContent(fs, fileName, fileContent);
+    }
+    ExecutorService executorService = Executors.newFixedThreadPool(numFiles);
+    Map<String, Long> metricMap = getInstrumentationMap(fs);
+    long requestsMadeBeforeTest = metricMap
+        .get(CONNECTIONS_MADE.getStatName());
+    try {
+      for (int i = 0; i < numFiles; i++) {
+        // Capture the index per task; a shared mutable counter would race across threads.
+        final int fileIdx = i;
+        executorService.submit((Callable<Void>) () -> {
+          try (FSDataInputStream iStream = fs.open(testPaths[fileIdx])) {
+            byte[] buffer = new byte[fileSize];
+            iStream.read(buffer, 0, fileSize);
+          }
+          return null;
+        });
+      }
+    } finally {
+      executorService.shutdown();
+      // wait for all tasks to finish
+      executorService.awaitTermination(1, TimeUnit.MINUTES);
+    }
+    metricMap = getInstrumentationMap(fs);
+    long requestsMadeAfterTest = metricMap
+        .get(CONNECTIONS_MADE.getStatName());
+    int expectedRequests = numFiles // One GetPathStatus per file
+        + ((int) Math.ceil((double) fileSize / BLOCK_SIZE)) * numFiles; // Read requests for each file
+    assertEquals(expectedRequests, requestsMadeAfterTest - requestsMadeBeforeTest);
+  }
+
+  @Test
+  public void testReadSameFileInParallel() throws Exception {
+    AzureBlobFileSystem fs = getFileSystem();
+    int fileSize = SMALL_FILE_SIZE;
+    int numReads = LESS_NUM_FILES;
+
+    final String fileName = methodName.getMethodName();
+    byte[] fileContent = getRandomBytesArray(fileSize);
+    Path testPath = createFileWithContent(fs, fileName, fileContent);
+    ExecutorService executorService = Executors.newFixedThreadPool(numReads);
+    Map<String, Long> metricMap = getInstrumentationMap(fs);
+    long requestsMadeBeforeTest = metricMap
+        .get(CONNECTIONS_MADE.getStatName());
+    try {
+      for (int i = 0; i < numReads; i++) {
+        executorService.submit((Callable<Void>) () -> {
+          try (FSDataInputStream iStream = fs.open(testPath)) {
+            byte[] buffer = new byte[fileSize];
+            iStream.read(buffer, 0, fileSize);
+          }
+          return null;
+        });
+      }
+    } finally {
+      executorService.shutdown();
+      // wait for all tasks to finish
+      executorService.awaitTermination(1, TimeUnit.MINUTES);
+    }
+    metricMap = getInstrumentationMap(fs);
+    long requestsMadeAfterTest = metricMap
+        .get(CONNECTIONS_MADE.getStatName());
+    int expectedRequests = numReads // One GetPathStatus per stream opened
+        + ((int) Math.ceil((double) fileSize / BLOCK_SIZE)); // Read requests are shared across streams of the same file
+    assertEquals(expectedRequests, requestsMadeAfterTest - requestsMadeBeforeTest);
+  }
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
index 9b388b57c3e55..37d8046203ecc 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
@@ -112,7 +112,7 @@ public void teardown() throws Exception {
     getBufferManager().testResetReadBufferManager();
   }
 
-  private AbfsRestOperation getMockRestOp() {
+  AbfsRestOperation getMockRestOp() {
     AbfsRestOperation op = mock(AbfsRestOperation.class);
     AbfsHttpOperation httpOp = mock(AbfsHttpOperation.class);
     when(httpOp.getBytesReceived()).thenReturn(1024L);
@@ -121,7 +121,7 @@
     return op;
   }
 
-  private AbfsClient getMockAbfsClient() throws URISyntaxException {
+  AbfsClient getMockAbfsClient() throws URISyntaxException {
     // Mock failure for client.read()
     AbfsClient client = mock(AbfsClient.class);
     AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd")));
@@ -135,7 +135,7 @@
     return client;
   }
 
-  private AbfsInputStream getAbfsInputStream(AbfsClient mockAbfsClient,
+  AbfsInputStream getAbfsInputStream(AbfsClient mockAbfsClient,
       String fileName) throws IOException {
     AbfsInputStreamContext 
inputStreamContext = new AbfsInputStreamContext(-1); // Create AbfsInputStream with the client instance @@ -144,7 +144,10 @@ private AbfsInputStream getAbfsInputStream(AbfsClient mockAbfsClient, null, FORWARD_SLASH + fileName, THREE_KB, - inputStreamContext.withReadBufferSize(ONE_KB).withReadAheadQueueDepth(10).withReadAheadBlockSize(ONE_KB), + inputStreamContext.withReadBufferSize(ONE_KB) + .withReadAheadQueueDepth(10) + .withReadAheadBlockSize(ONE_KB) + .isReadAheadV2Enabled(getConfiguration().isReadAheadV2Enabled()), "eTag", getTestTracingContext(null, false)); @@ -182,7 +185,7 @@ public AbfsInputStream getAbfsInputStream(AbfsClient abfsClient, return inputStream; } - private void queueReadAheads(AbfsInputStream inputStream) { + void queueReadAheads(AbfsInputStream inputStream) { // Mimic AbfsInputStream readAhead queue requests getBufferManager() .queueReadAhead(inputStream, 0, ONE_KB, inputStream.getTracingContext()); @@ -564,7 +567,7 @@ public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception { //Sleeping to give ReadBufferWorker to pick the readBuffers for processing. Thread.sleep(readBufferTransferToInProgressProbableTime); - assertThat(readBufferManager.getInProgressCopiedList()) + assertThat(readBufferManager.getInProgressListCopy()) .describedAs(String.format("InProgressList should have %d elements", readBufferQueuedCount)) .hasSize(readBufferQueuedCount); @@ -577,7 +580,7 @@ public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception { .hasSize(0); } - assertThat(readBufferManager.getInProgressCopiedList()) + assertThat(readBufferManager.getInProgressListCopy()) .describedAs(String.format("InProgressList should have %d elements", readBufferQueuedCount)) .hasSize(readBufferQueuedCount); @@ -1125,6 +1128,11 @@ private void resetReadBufferManager(int bufferSize, int threshold) { } private ReadBufferManager getBufferManager() { + if (getConfiguration().isReadAheadV2Enabled()) { + ReadBufferManagerV2.setReadBufferManagerConfigs( + getConfiguration().getReadAheadBlockSize(), getConfiguration()); + return ReadBufferManagerV2.getBufferManager(); + } return ReadBufferManagerV1.getBufferManager(); } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManagerV2.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManagerV2.java new file mode 100644 index 0000000000000..500d5c16f3a6d --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManagerV2.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_READAHEAD_V2;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_CPU_MONITORING_INTERVAL_MILLIS;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_CPU_USAGE_THRESHOLD_PERCENT;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_MAX_THREAD_POOL_SIZE;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_MIN_THREAD_POOL_SIZE;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Unit tests for the different components of ReadBufferManagerV2.
+ */
+public class TestReadBufferManagerV2 extends AbstractAbfsIntegrationTest {
+  private volatile boolean running = true;
+
+  public TestReadBufferManagerV2() throws Exception {
+    super();
+  }
+
+  /**
+   * Test to verify init of ReadBufferManagerV2.
+   * @throws Exception if test fails
+   */
+  @Test
+  public void testReadBufferManagerV2Init() throws Exception {
+    assertThat(ReadBufferManagerV2.getInstance())
+        .as("ReadBufferManager should be uninitialized").isNull();
+    intercept(IllegalStateException.class, "ReadBufferManagerV2 is not configured.", () -> {
+      ReadBufferManagerV2.getBufferManager();
+    });
+    // verify that multiple invocations of getBufferManager return the same instance.
+    ReadBufferManagerV2.setReadBufferManagerConfigs(getConfiguration().getReadAheadBlockSize(), getConfiguration());
+    ReadBufferManagerV2 bufferManager = ReadBufferManagerV2.getBufferManager();
+    ReadBufferManagerV2 bufferManager2 = ReadBufferManagerV2.getBufferManager();
+    ReadBufferManagerV2 bufferManager3 = ReadBufferManagerV2.getInstance();
+    assertThat(bufferManager).isNotNull();
+    assertThat(bufferManager2).isNotNull();
+    assertThat(bufferManager).isSameAs(bufferManager2);
+    assertThat(bufferManager3).isNotNull();
+    assertThat(bufferManager3).isSameAs(bufferManager);
+
+    // Verify default values are not invalid.
+    assertThat(bufferManager.getMinBufferPoolSize()).isGreaterThan(0);
+    assertThat(bufferManager.getMaxBufferPoolSize()).isGreaterThan(0);
+  }
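Illustration (not part of the patch): a minimal opt-in sketch for read-ahead V2 with dynamic scaling, reusing the configuration keys imported above; the pool sizes are illustrative values:

    Configuration conf = new Configuration();
    conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2, true);
    conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING, true);
    conf.setInt(FS_AZURE_READAHEAD_V2_MIN_THREAD_POOL_SIZE, 2);
    conf.setInt(FS_AZURE_READAHEAD_V2_MAX_THREAD_POOL_SIZE, 8);
    FileSystem fs = FileSystem.newInstance(conf);  // streams opened from fs pick up the V2 settings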
+
+  /**
+   * Test to verify that the CPU monitor thread is not active if dynamic scaling is disabled.
+   * @throws Exception if test fails
+   */
+  @Test
+  public void testDynamicScalingSwitchingOnAndOff() throws Exception {
+    Configuration conf = new Configuration(getRawConfiguration());
+    conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2, true);
+    conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING, true);
+    try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(getFileSystem().getUri(), conf)) {
+      AbfsConfiguration abfsConfiguration = fs.getAbfsStore().getAbfsConfiguration();
+      ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfiguration.getReadAheadBlockSize(), abfsConfiguration);
+      ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager();
+      assertThat(bufferManagerV2.getCpuMonitoringThread())
+          .as("CPU Monitor thread should be initialized").isNotNull();
+      bufferManagerV2.resetBufferManager();
+    }
+
+    conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING, false);
+    try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(getFileSystem().getUri(), conf)) {
+      AbfsConfiguration abfsConfiguration = fs.getAbfsStore().getAbfsConfiguration();
+      ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfiguration.getReadAheadBlockSize(), abfsConfiguration);
+      ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager();
+      assertThat(bufferManagerV2.getCpuMonitoringThread())
+          .as("CPU Monitor thread should not be initialized").isNull();
+      bufferManagerV2.resetBufferManager();
+    }
+  }
+
+  @Test
+  public void testThreadPoolUpscale() throws Exception {
+    TestAbfsInputStream testAbfsInputStream = new TestAbfsInputStream();
+    AbfsClient client = testAbfsInputStream.getMockAbfsClient();
+    AbfsInputStream inputStream = testAbfsInputStream.getAbfsInputStream(client, "testFailedReadAhead.txt");
+    Configuration configuration = getReadAheadV2Configuration();
+    AbfsConfiguration abfsConfig = new AbfsConfiguration(configuration,
+        getAccountName());
+    ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+    ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(), abfsConfig);
+    ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager();
+    assertThat(bufferManagerV2.getCurrentThreadPoolSize()).isEqualTo(2);
+    int[] reqOffset = {0};
+    int reqLength = 1;
+    Thread t = new Thread(() -> {
+      while (running) {
+        bufferManagerV2.queueReadAhead(inputStream, reqOffset[0], reqLength,
+            inputStream.getTracingContext());
+        reqOffset[0] += reqLength;
+      }
+    });
+    t.start();
+    Thread.sleep(2L * bufferManagerV2.getCpuMonitoringIntervalInMilliSec());
+    assertThat(bufferManagerV2.getCurrentThreadPoolSize()).isEqualTo(4);
+    running = false;
+    t.join();
+    Thread.sleep(4L * bufferManagerV2.getCpuMonitoringIntervalInMilliSec());
+    assertThat(bufferManagerV2.getCurrentThreadPoolSize()).isLessThan(4);
+  }
+
+  private Configuration getReadAheadV2Configuration() {
+    Configuration conf = new Configuration(getRawConfiguration());
+    conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2, true);
+    conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING, true);
+    conf.setInt(FS_AZURE_READAHEAD_V2_MIN_THREAD_POOL_SIZE, 2);
+    conf.setInt(FS_AZURE_READAHEAD_V2_MAX_THREAD_POOL_SIZE, 4);
+    conf.setInt(FS_AZURE_READAHEAD_V2_CPU_USAGE_THRESHOLD_PERCENT, 90);
+    conf.setInt(FS_AZURE_READAHEAD_V2_CPU_MONITORING_INTERVAL_MILLIS, 1000);
+    return conf;
+  }
+}
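Illustration (not part of the patch): the request accounting asserted in ITestReadBufferManagerV2 above, worked through with that test's constants:

    // LARGE_FILE_SIZE = 50 MB, BLOCK_SIZE = 4 MB, MORE_NUM_FILES = 5 distinct files:
    int perFile = 1 + (int) Math.ceil(50.0 / 4.0);       // 1 GetPathStatus + 13 block reads = 14
    int parallelFilesTotal = 5 * perFile;                // 70 connections expected
    // SMALL_FILE_SIZE = 6 MB, the same file opened by 2 streams:
    int sameFileTotal = 2 + (int) Math.ceil(6.0 / 4.0);  // 2 opens + 2 shared block reads = 4

The second case is not multiplied by the number of streams because completed buffers are served to all streams of the same file from the shared pool.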