diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index 58b1512ac3e9c..59727cac53c91 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -214,7 +214,7 @@ public final class FileSystemConfigurations { public static final long THOUSAND = 1000L; public static final HttpOperationType DEFAULT_NETWORKING_LIBRARY - = HttpOperationType.JDK_HTTP_URL_CONNECTION; + = HttpOperationType.APACHE_HTTP_CLIENT; public static final int DEFAULT_APACHE_HTTP_CLIENT_MAX_IO_EXCEPTION_RETRIES = 3; @@ -228,7 +228,7 @@ public final class FileSystemConfigurations { public static final int MAX_APACHE_HTTP_CLIENT_CACHE_WARMUP_COUNT = 5; - public static final int DEFAULT_APACHE_HTTP_CLIENT_CACHE_REFRESH_COUNT = 5; + public static final int DEFAULT_APACHE_HTTP_CLIENT_CACHE_REFRESH_COUNT = 3; public static final int MAX_APACHE_HTTP_CLIENT_CACHE_REFRESH_COUNT = 5; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsApacheHttpClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsApacheHttpClient.java index 1d22ae52cde6f..0dffead84245c 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsApacheHttpClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsApacheHttpClient.java @@ -65,6 +65,13 @@ static void registerFallback() { usable = false; } + /** + * Sets the usable flag to true, e.g. after the Apache client connects successfully. + */ + static void setUsable() { + usable = true; + } + /** * @return if ApacheHttpClient is usable. 
*/ @@ -73,18 +80,21 @@ static boolean usable() { } AbfsApacheHttpClient(DelegatingSSLSocketFactory delegatingSSLSocketFactory, - final AbfsConfiguration abfsConfiguration, final KeepAliveCache keepAliveCache, - URL baseUrl) { + final AbfsConfiguration abfsConfiguration, + final KeepAliveCache keepAliveCache, + URL baseUrl, + final boolean isCacheWarmupNeeded) { final AbfsConnectionManager connMgr = new AbfsConnectionManager( createSocketFactoryRegistry( new SSLConnectionSocketFactory(delegatingSSLSocketFactory, getDefaultHostnameVerifier())), new AbfsHttpClientConnectionFactory(), keepAliveCache, - abfsConfiguration, baseUrl); + abfsConfiguration, baseUrl, isCacheWarmupNeeded); final HttpClientBuilder builder = HttpClients.custom(); builder.setConnectionManager(connMgr) .setRequestExecutor( - new AbfsManagedHttpRequestExecutor(abfsConfiguration.getHttpReadTimeout())) + new AbfsManagedHttpRequestExecutor( + abfsConfiguration.getHttpReadTimeout())) .disableContentCompression() .disableRedirectHandling() .disableAutomaticRetries() diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java index 77aca70990ff3..d6ae0427b23b9 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java @@ -59,6 +59,7 @@ import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore; import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ApiVersion; +import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType; import org.apache.hadoop.fs.azurebfs.constants.FSOperationType; import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams; 
@@ -188,7 +189,7 @@ public AbfsBlobClient(final URL baseUrl, final EncryptionContextProvider encryptionContextProvider, final AbfsClientContext abfsClientContext) throws IOException { super(baseUrl, sharedKeyCredentials, abfsConfiguration, tokenProvider, - encryptionContextProvider, abfsClientContext); + encryptionContextProvider, abfsClientContext, AbfsServiceType.BLOB); this.azureAtomicRenameDirSet = new HashSet<>(Arrays.asList( abfsConfiguration.getAzureAtomicRenameDirs() .split(AbfsHttpConstants.COMMA))); @@ -201,7 +202,7 @@ public AbfsBlobClient(final URL baseUrl, final EncryptionContextProvider encryptionContextProvider, final AbfsClientContext abfsClientContext) throws IOException { super(baseUrl, sharedKeyCredentials, abfsConfiguration, sasTokenProvider, - encryptionContextProvider, abfsClientContext); + encryptionContextProvider, abfsClientContext, AbfsServiceType.BLOB); this.azureAtomicRenameDirSet = new HashSet<>(Arrays.asList( abfsConfiguration.getAzureAtomicRenameDirs() .split(AbfsHttpConstants.COMMA))); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index ad17c1bfc20a5..6bc174e291a84 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -58,6 +58,7 @@ import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.Permissions; import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ApiVersion; +import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType; import org.apache.hadoop.fs.azurebfs.constants.FSOperationType; import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; import org.apache.hadoop.fs.azurebfs.constants.HttpOperationType; @@ -199,6 
+200,8 @@ public abstract class AbfsClient implements Closeable { private AbfsApacheHttpClient abfsApacheHttpClient; + private AbfsServiceType abfsServiceType; + /** * logging the rename failure if metadata is in an incomplete state. */ @@ -208,7 +211,8 @@ private AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials, final AbfsConfiguration abfsConfiguration, final EncryptionContextProvider encryptionContextProvider, - final AbfsClientContext abfsClientContext) throws IOException { + final AbfsClientContext abfsClientContext, + final AbfsServiceType abfsServiceType) throws IOException { this.baseUrl = baseUrl; this.sharedKeyCredentials = sharedKeyCredentials; String baseUrlString = baseUrl.toString(); @@ -220,6 +224,7 @@ private AbfsClient(final URL baseUrl, this.authType = abfsConfiguration.getAuthType(accountName); this.intercept = AbfsThrottlingInterceptFactory.getInstance(accountName, abfsConfiguration); this.renameResilience = abfsConfiguration.getRenameResilience(); + this.abfsServiceType = abfsServiceType; if (encryptionContextProvider != null) { this.encryptionContextProvider = encryptionContextProvider; @@ -254,7 +259,8 @@ private AbfsClient(final URL baseUrl, abfsApacheHttpClient = new AbfsApacheHttpClient( DelegatingSSLSocketFactory.getDefaultFactory(), - abfsConfiguration, keepAliveCache, baseUrl); + abfsConfiguration, keepAliveCache, baseUrl, + abfsConfiguration.getFsConfiguredServiceType() == abfsServiceType); } this.userAgent = initializeUserAgent(abfsConfiguration, sslProviderName); @@ -328,25 +334,29 @@ private AbfsClient(final URL baseUrl, LOG.trace("primaryUserGroup is {}", this.primaryUserGroup); } - public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials, - final AbfsConfiguration abfsConfiguration, - final AccessTokenProvider tokenProvider, - final EncryptionContextProvider encryptionContextProvider, - final AbfsClientContext abfsClientContext) + public AbfsClient(final URL baseUrl, + final 
SharedKeyCredentials sharedKeyCredentials, + final AbfsConfiguration abfsConfiguration, + final AccessTokenProvider tokenProvider, + final EncryptionContextProvider encryptionContextProvider, + final AbfsClientContext abfsClientContext, + final AbfsServiceType abfsServiceType) throws IOException { this(baseUrl, sharedKeyCredentials, abfsConfiguration, - encryptionContextProvider, abfsClientContext); + encryptionContextProvider, abfsClientContext, abfsServiceType); this.tokenProvider = tokenProvider; } - public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials, - final AbfsConfiguration abfsConfiguration, - final SASTokenProvider sasTokenProvider, - final EncryptionContextProvider encryptionContextProvider, - final AbfsClientContext abfsClientContext) + public AbfsClient(final URL baseUrl, + final SharedKeyCredentials sharedKeyCredentials, + final AbfsConfiguration abfsConfiguration, + final SASTokenProvider sasTokenProvider, + final EncryptionContextProvider encryptionContextProvider, + final AbfsClientContext abfsClientContext, + final AbfsServiceType abfsServiceType) throws IOException { this(baseUrl, sharedKeyCredentials, abfsConfiguration, - encryptionContextProvider, abfsClientContext); + encryptionContextProvider, abfsClientContext, abfsServiceType); this.sasTokenProvider = sasTokenProvider; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java index f15b0b5326c90..0211b8bd52307 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java @@ -68,13 +68,13 @@ public AbfsClientHandler(final URL baseUrl, final SASTokenProvider sasTokenProvider, final EncryptionContextProvider encryptionContextProvider, final 
AbfsClientContext abfsClientContext) throws IOException { + initServiceType(abfsConfiguration); this.dfsAbfsClient = createDfsClient(baseUrl, sharedKeyCredentials, abfsConfiguration, null, sasTokenProvider, encryptionContextProvider, abfsClientContext); this.blobAbfsClient = createBlobClient(baseUrl, sharedKeyCredentials, abfsConfiguration, null, sasTokenProvider, encryptionContextProvider, abfsClientContext); - initServiceType(abfsConfiguration); } /** diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java index 16697fa838a45..4a15fa57af245 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java @@ -23,6 +23,7 @@ import java.util.UUID; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -91,26 +92,34 @@ class AbfsConnectionManager implements HttpClientConnectionManager { /** * The base host for which connections are managed. 
*/ - private HttpHost baseHost; + private final HttpHost baseHost; AbfsConnectionManager(Registry socketFactoryRegistry, - AbfsHttpClientConnectionFactory connectionFactory, KeepAliveCache kac, - final AbfsConfiguration abfsConfiguration, final URL baseUrl) { + AbfsHttpClientConnectionFactory connectionFactory, + KeepAliveCache kac, + final AbfsConfiguration abfsConfiguration, + final URL baseUrl, + final boolean isCacheWarmupNeeded) { this.httpConnectionFactory = connectionFactory; this.kac = kac; this.connectionOperator = new DefaultHttpClientConnectionOperator( socketFactoryRegistry, null, null); this.abfsConfiguration = abfsConfiguration; - if (abfsConfiguration.getApacheCacheWarmupCount() > 0 + this.baseHost = new HttpHost(baseUrl.getHost(), + baseUrl.getDefaultPort(), baseUrl.getProtocol()); + if (isCacheWarmupNeeded && abfsConfiguration.getApacheCacheWarmupCount() > 0 && kac.getFixedThreadPool() != null) { // Warm up the cache with connections. LOG.debug("Warming up the KeepAliveCache with {} connections", abfsConfiguration.getApacheCacheWarmupCount()); - this.baseHost = new HttpHost(baseUrl.getHost(), - baseUrl.getDefaultPort(), baseUrl.getProtocol()); HttpRoute route = new HttpRoute(baseHost, null, true); - cacheExtraConnection(route, + int totalConnectionsCreated = cacheExtraConnection(route, abfsConfiguration.getApacheCacheWarmupCount()); + if (totalConnectionsCreated == 0) { + AbfsApacheHttpClient.registerFallback(); + } else { + AbfsApacheHttpClient.setUsable(); + } } } @@ -271,7 +280,7 @@ public void connect(final HttpClientConnection conn, LOG.debug("Connection established: {}", conn); if (context instanceof AbfsManagedHttpClientContext) { ((AbfsManagedHttpClientContext) context).setConnectTime( - TimeUnit.MILLISECONDS.toMillis(System.nanoTime() - start)); + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)); } } @@ -318,8 +327,9 @@ public void shutdown() { * @param route the HTTP route for which connections are created * @param 
numberOfConnections the number of connections to create */ - private void cacheExtraConnection(final HttpRoute route, + private int cacheExtraConnection(final HttpRoute route, final int numberOfConnections) { + AtomicInteger totalConnectionCreated = new AtomicInteger(0); if (!isCacheRefreshInProgress.getAndSet(true)) { long start = System.nanoTime(); CountDownLatch latch = new CountDownLatch(numberOfConnections); @@ -333,6 +343,7 @@ private void cacheExtraConnection(final HttpRoute route, connect(conn, route, abfsConfiguration.getHttpConnectionTimeout(), new AbfsManagedHttpClientContext()); addConnectionToCache(conn); + totalConnectionCreated.incrementAndGet(); } catch (Exception e) { LOG.debug("Error creating connection: {}", e.getMessage()); if (conn != null) { @@ -350,7 +361,7 @@ private void cacheExtraConnection(final HttpRoute route, } catch (RejectedExecutionException e) { LOG.debug("Task rejected for connection creation: {}", e.getMessage()); - return; + return 0; } } @@ -370,6 +381,7 @@ private void cacheExtraConnection(final HttpRoute route, elapsedTimeMillis(start)); } } + return totalConnectionCreated.get(); } /** @@ -383,10 +395,10 @@ private void addConnectionToCache(HttpClientConnection conn) { if (((AbfsManagedApacheHttpConnection) conn).getTargetHost() .equals(baseHost)) { boolean connAddedInKac = kac.add(conn); - synchronized (connectionLock) { - connectionLock.notify(); // wake up one thread only - } if (connAddedInKac) { + synchronized (connectionLock) { + connectionLock.notify(); // wake up one thread only + } LOG.debug("Connection cached: {}", conn); } else { LOG.debug("Connection not cached, and is released: {}", conn); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java index f88df14c3423a..f574f4704ab5c 100644 --- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java @@ -53,6 +53,7 @@ import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore; import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ApiVersion; +import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType; import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException; @@ -160,7 +161,7 @@ public AbfsDfsClient(final URL baseUrl, final EncryptionContextProvider encryptionContextProvider, final AbfsClientContext abfsClientContext) throws IOException { super(baseUrl, sharedKeyCredentials, abfsConfiguration, tokenProvider, - encryptionContextProvider, abfsClientContext); + encryptionContextProvider, abfsClientContext, AbfsServiceType.DFS); } public AbfsDfsClient(final URL baseUrl, @@ -170,7 +171,7 @@ public AbfsDfsClient(final URL baseUrl, final EncryptionContextProvider encryptionContextProvider, final AbfsClientContext abfsClientContext) throws IOException { super(baseUrl, sharedKeyCredentials, abfsConfiguration, sasTokenProvider, - encryptionContextProvider, abfsClientContext); + encryptionContextProvider, abfsClientContext, AbfsServiceType.DFS); } /** diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestWasbAbfsCompatibility.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestWasbAbfsCompatibility.java index 967bf6272ab2b..d1ab0b3a1f8c7 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestWasbAbfsCompatibility.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestWasbAbfsCompatibility.java @@ -69,6 +69,8 
@@ public class ITestWasbAbfsCompatibility extends AbstractAbfsIntegrationTest { LoggerFactory.getLogger(ITestWasbAbfsCompatibility.class); public ITestWasbAbfsCompatibility() throws Exception { + // To ensure the wasb and abfs filesystem are initialized. + super.setup(); assumeThat(isIPAddress()).as("Emulator is not supported").isFalse(); assumeHnsDisabled(); assumeBlobServiceType(); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java index 14a8ca283c804..8505f5f3266f9 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java @@ -66,6 +66,8 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_IS_HNS_ENABLED; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_APACHE_HTTP_CLIENT_CACHE_WARMUP_COUNT; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CLUSTER_NAME; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CLUSTER_TYPE; @@ -890,8 +892,7 @@ public void testKeepAliveCacheInitializationWithApacheHttpClient() throws Except AbfsClient dfsClient = abfsClientHandler.getDfsClient(); AbfsClient blobClient = abfsClientHandler.getBlobClient(); - checkKacOnBothClientsAfterFSInit(dfsClient); - checkKacOnBothClientsAfterFSInit(blobClient); 
+ checkKacState(dfsClient, blobClient); } /** @@ -917,9 +918,7 @@ public void testStaleConnectionBehavior() throws Exception { AbfsClient dfsClient = abfsClientHandler.getDfsClient(); AbfsClient blobClient = abfsClientHandler.getBlobClient(); - checkKacOnBothClientsAfterFSInit(dfsClient); - checkKacOnBothClientsAfterFSInit(blobClient); - + checkKacState(dfsClient, blobClient); // Wait for 5 minutes to make the cached connections stale // This will ensure all the connections in the KeepAliveCache are stale // and will be removed by the Apache HttpClient's KeepAliveStrategy. @@ -949,11 +948,13 @@ public void testApacheConnectionReuse() throws Exception { AbfsClient dfsClient = abfsClientHandler.getDfsClient(); AbfsClient blobClient = abfsClientHandler.getBlobClient(); - checkKacOnBothClientsAfterFSInit(dfsClient); - checkKacOnBothClientsAfterFSInit(blobClient); + checkKacState(dfsClient, blobClient); - checkConnectionReuse(dfsClient); - checkConnectionReuse(blobClient); + if (getAbfsServiceType() == AbfsServiceType.DFS) { + checkConnectionReuse(dfsClient); + } else { + checkConnectionReuse(blobClient); + } } /** @@ -969,8 +970,8 @@ public void testConnectionNotReusedOnIOException() throws Exception { AzureBlobFileSystem fs = this.getFileSystem(); AbfsClientHandler abfsClientHandler = fs.getAbfsStore().getClientHandler(); - AbfsClient dfsClient = abfsClientHandler.getDfsClient(); - KeepAliveCache keepAliveCache = dfsClient.getKeepAliveCache(); + AbfsClient client = abfsClientHandler.getClient(); + KeepAliveCache keepAliveCache = client.getKeepAliveCache(); HttpClientConnection connection = keepAliveCache.pollFirst(); Assertions.assertThat(connection) @@ -988,7 +989,7 @@ public void testConnectionNotReusedOnIOException() throws Exception { // First list call fail with IOException exception and that connection will not be reused. // Subsequent retry call will use a new connection from the cache. 
- dfsClient.listPath("/", false, 1, + client.listPath("/", false, 1, null, getTestTracingContext(fs, true), null); // After the failed operation, connection should NOT be reused @@ -1019,11 +1020,15 @@ public void testNumberOfConnectionsInKacWithoutWarmup() throws Exception { AzureBlobFileSystem fs = this.getFileSystem(); final Configuration configuration = fs.getConf(); configuration.setInt(FS_AZURE_APACHE_HTTP_CLIENT_CACHE_WARMUP_COUNT, 0); + // To avoid any network calls during FS initialization + configuration.setBoolean(FS_AZURE_ACCOUNT_IS_HNS_ENABLED, false); + configuration.setBoolean(AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, false); fs = this.getFileSystem(configuration); AbfsClient dfsClient = fs.getAbfsStore().getClientHandler().getDfsClient(); AbfsClient blobClient = fs.getAbfsStore().getClientHandler().getBlobClient(); + // In case cache is not warmed up Assertions.assertThat(dfsClient.getKeepAliveCache().size()) .describedAs("KeepAliveCache will be empty when warmup count is set to 0") .isEqualTo(0); @@ -1032,13 +1037,32 @@ public void testNumberOfConnectionsInKacWithoutWarmup() throws Exception { .isEqualTo(0); } + /** + * Helper method to check the KeepAliveCache on both clients based on the + * configured service type. + * @param dfsClient AbfsClient instance for DFS endpoint + * @param blobClient AbfsClient instance for Blob endpoint + * + * @throws IOException if an error occurs while checking the cache + */ + private void checkKacState(AbfsClient dfsClient, AbfsClient blobClient) + throws IOException { + if (getAbfsServiceType() == AbfsServiceType.DFS) { + checkKacOnDefaultClientsAfterFSInit(dfsClient); + checkKacOnNonDefaultClientsAfterFSInit(blobClient); + } else { + checkKacOnDefaultClientsAfterFSInit(blobClient); + checkKacOnNonDefaultClientsAfterFSInit(dfsClient); + } + } + /** * Helper method to check the KeepAliveCache on both clients. 
* @param abfsClient AbfsClient instance to check * * @throws IOException if an error occurs while checking the cache */ - private void checkKacOnBothClientsAfterFSInit(AbfsClient abfsClient) throws IOException { + private void checkKacOnDefaultClientsAfterFSInit(AbfsClient abfsClient) throws IOException { AbfsApacheHttpClient abfsApacheHttpClient = abfsClient.getAbfsApacheHttpClient(); Assertions.assertThat(abfsApacheHttpClient) .describedAs("AbfsApacheHttpClient should not be null") @@ -1062,6 +1086,36 @@ private void checkKacOnBothClientsAfterFSInit(AbfsClient abfsClient) throws IOEx .isEqualTo(this.getConfiguration().getApacheCacheWarmupCount() - 1); } + /** + * Helper method to check that the KeepAliveCache of a non-default client is empty. + * @param abfsClient AbfsClient instance to check + * + * @throws IOException if an error occurs while checking the cache + */ + private void checkKacOnNonDefaultClientsAfterFSInit(AbfsClient abfsClient) throws IOException { + AbfsApacheHttpClient abfsApacheHttpClient = abfsClient.getAbfsApacheHttpClient(); + Assertions.assertThat(abfsApacheHttpClient) + .describedAs("AbfsApacheHttpClient should not be null") + .isNotNull(); + + KeepAliveCache keepAliveCache = abfsClient.getKeepAliveCache(); + + Assertions.assertThat(keepAliveCache.size()) + .describedAs("KeepAliveCache size should be 0 as non-default clients do not warmup") + .isEqualTo(0); + + Assertions.assertThat(keepAliveCache.get()) + .describedAs("KeepAliveCache should be null") + .isNull(); + + // The get call above returned null without taking a connection, + // so the cache must still be empty, since non-default clients + // are not warmed up. + Assertions.assertThat(keepAliveCache.size()) + .describedAs("KeepAliveCache size should be 0 as no new connection is added") + .isEqualTo(0); + } + /** * Helper method to check the KeepAliveCache after making connections stale. 
* @param abfsClient AbfsClient instance to check @@ -1088,10 +1142,10 @@ private void checkKacAfterMakingConnectionsStale(AbfsClient abfsClient) * @throws IOException if an error occurs while checking the cache */ private void checkConnectionReuse(AbfsClient abfsClient) throws IOException { - KeepAliveCache dfsKeepAliveCache = abfsClient.getKeepAliveCache(); + KeepAliveCache keepAliveCache = abfsClient.getKeepAliveCache(); for (int i = 0; i < this.getConfiguration().getApacheCacheWarmupCount(); i++) { // Check first connection in the cache before the operation - HttpClientConnection connection = dfsKeepAliveCache.peekFirst(); + HttpClientConnection connection = keepAliveCache.peekFirst(); // Perform a list operation to reuse the connection // This will use the first connection in the cache. abfsClient.listPath("/", false, 1, @@ -1099,7 +1153,7 @@ private void checkConnectionReuse(AbfsClient abfsClient) throws IOException { // After the operation, the connection should be kept back in the last position Assertions.assertThat(connection) .describedAs("Connection will be put back to the cache for reuse.") - .isEqualTo(dfsKeepAliveCache.peekLast()); + .isEqualTo(keepAliveCache.peekLast()); } } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestApacheClientConnectionPool.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestApacheClientConnectionPool.java index 05313b52172b9..c16ec2888a2c1 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestApacheClientConnectionPool.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestApacheClientConnectionPool.java @@ -19,6 +19,7 @@ package org.apache.hadoop.fs.azurebfs.services; import java.io.IOException; +import java.net.URL; import java.util.Map; import org.assertj.core.api.Assertions; @@ -28,6 +29,7 @@ import org.apache.hadoop.fs.ClosedIOException; import 
org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException; @@ -44,6 +46,7 @@ import org.apache.http.impl.conn.DefaultHttpClientConnectionOperator; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COLON; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.KEEP_ALIVE_CACHE_CLOSED; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_METRIC_FORMAT; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_NETWORKING_LIBRARY; @@ -118,6 +121,38 @@ public void testConnectedConnectionLogging() throws Exception { .isEqualTo(4); } + /** + * Test to verify that the ApacheHttpClient falls back to JDK client + * when connection warmup fails. + * This test is applicable only for ApacheHttpClient. + */ + @Test + public void testApacheClientFallbackDuringConnectionWarmup() + throws Exception { + try (KeepAliveCache keepAliveCache = new KeepAliveCache( + new AbfsConfiguration(new Configuration(), EMPTY_STRING))) { + // Create a connection manager with invalid URL to force fallback to JDK client + // during connection warmup. + // This is to simulate failure during connection warmup in the connection manager. + // The invalid URL will cause the connection manager to fail to create connections + // during warmup, forcing it to fall back to JDK client. 
+ final AbfsConnectionManager connMgr = new AbfsConnectionManager( + RegistryBuilder.create() + .register(HTTPS_SCHEME, new SSLConnectionSocketFactory( + DelegatingSSLSocketFactory.getDefaultFactory(), + getDefaultHostnameVerifier())) + .build(), + new AbfsHttpClientConnectionFactory(), keepAliveCache, + new AbfsConfiguration(new Configuration(), EMPTY_STRING), + new URL("https://test.com"), true); + + Assertions.assertThat(AbfsApacheHttpClient.usable()) + .describedAs("Apache HttpClient should be not usable") + .isFalse(); + AbfsApacheHttpClient.setUsable(); + } + } + private Map.Entry getTestConnection() throws IOException { HttpHost host = new HttpHost(getFileSystem().getUri().getHost(),