Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public final class FileSystemConfigurations {
public static final long THOUSAND = 1000L;

public static final HttpOperationType DEFAULT_NETWORKING_LIBRARY
= HttpOperationType.JDK_HTTP_URL_CONNECTION;
= HttpOperationType.APACHE_HTTP_CLIENT;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: We should change this in our md file as well

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will make the change in .md file as well


public static final int DEFAULT_APACHE_HTTP_CLIENT_MAX_IO_EXCEPTION_RETRIES = 3;

Expand All @@ -228,7 +228,7 @@ public final class FileSystemConfigurations {

public static final int MAX_APACHE_HTTP_CLIENT_CACHE_WARMUP_COUNT = 5;

public static final int DEFAULT_APACHE_HTTP_CLIENT_CACHE_REFRESH_COUNT = 5;
public static final int DEFAULT_APACHE_HTTP_CLIENT_CACHE_REFRESH_COUNT = 3;

public static final int MAX_APACHE_HTTP_CLIENT_CACHE_REFRESH_COUNT = 5;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,13 @@ static void registerFallback() {
usable = false;
}

/**
* Marks ApacheHttpClient as usable by setting the usable flag to true.
* Meant to be called once a successful response has been received from the
* Apache client.
*/
static void setUsable() {
usable = true;
}

/**
* @return if ApacheHttpClient is usable.
*/
Expand All @@ -73,18 +80,21 @@ static boolean usable() {
}

AbfsApacheHttpClient(DelegatingSSLSocketFactory delegatingSSLSocketFactory,
final AbfsConfiguration abfsConfiguration, final KeepAliveCache keepAliveCache,
URL baseUrl) {
final AbfsConfiguration abfsConfiguration,
final KeepAliveCache keepAliveCache,
URL baseUrl,
final boolean isCacheWarmupNeeded) {
final AbfsConnectionManager connMgr = new AbfsConnectionManager(
createSocketFactoryRegistry(
new SSLConnectionSocketFactory(delegatingSSLSocketFactory,
getDefaultHostnameVerifier())),
new AbfsHttpClientConnectionFactory(), keepAliveCache,
abfsConfiguration, baseUrl);
abfsConfiguration, baseUrl, isCacheWarmupNeeded);
final HttpClientBuilder builder = HttpClients.custom();
builder.setConnectionManager(connMgr)
.setRequestExecutor(
new AbfsManagedHttpRequestExecutor(abfsConfiguration.getHttpReadTimeout()))
new AbfsManagedHttpRequestExecutor(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we discussed last time, if we keep this tied to the read timeout, the 100-continue timeout would become 30 seconds; there should be a separate config for the 100-continue timeout.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will create a new config for read timeout and use that when 100 continue is enabled.

abfsConfiguration.getHttpReadTimeout()))
.disableContentCompression()
.disableRedirectHandling()
.disableAutomaticRetries()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ApiVersion;
import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams;
Expand Down Expand Up @@ -188,7 +189,7 @@ public AbfsBlobClient(final URL baseUrl,
final EncryptionContextProvider encryptionContextProvider,
final AbfsClientContext abfsClientContext) throws IOException {
super(baseUrl, sharedKeyCredentials, abfsConfiguration, tokenProvider,
encryptionContextProvider, abfsClientContext);
encryptionContextProvider, abfsClientContext, AbfsServiceType.BLOB);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As described in the previous comment, we need to determine the default client, and to do that we compare these values with the configured default value.

this.azureAtomicRenameDirSet = new HashSet<>(Arrays.asList(
abfsConfiguration.getAzureAtomicRenameDirs()
.split(AbfsHttpConstants.COMMA)));
Expand All @@ -201,7 +202,7 @@ public AbfsBlobClient(final URL baseUrl,
final EncryptionContextProvider encryptionContextProvider,
final AbfsClientContext abfsClientContext) throws IOException {
super(baseUrl, sharedKeyCredentials, abfsConfiguration, sasTokenProvider,
encryptionContextProvider, abfsClientContext);
encryptionContextProvider, abfsClientContext, AbfsServiceType.BLOB);
this.azureAtomicRenameDirSet = new HashSet<>(Arrays.asList(
abfsConfiguration.getAzureAtomicRenameDirs()
.split(AbfsHttpConstants.COMMA)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.Permissions;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ApiVersion;
import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.azurebfs.constants.HttpOperationType;
Expand Down Expand Up @@ -199,6 +200,8 @@ public abstract class AbfsClient implements Closeable {

private AbfsApacheHttpClient abfsApacheHttpClient;

private AbfsServiceType abfsServiceType;

/**
* logging the rename failure if metadata is in an incomplete state.
*/
Expand All @@ -208,7 +211,8 @@ private AbfsClient(final URL baseUrl,
final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration,
final EncryptionContextProvider encryptionContextProvider,
final AbfsClientContext abfsClientContext) throws IOException {
final AbfsClientContext abfsClientContext,
final AbfsServiceType abfsServiceType) throws IOException {
this.baseUrl = baseUrl;
this.sharedKeyCredentials = sharedKeyCredentials;
String baseUrlString = baseUrl.toString();
Expand All @@ -220,6 +224,7 @@ private AbfsClient(final URL baseUrl,
this.authType = abfsConfiguration.getAuthType(accountName);
this.intercept = AbfsThrottlingInterceptFactory.getInstance(accountName, abfsConfiguration);
this.renameResilience = abfsConfiguration.getRenameResilience();
this.abfsServiceType = abfsServiceType;

if (encryptionContextProvider != null) {
this.encryptionContextProvider = encryptionContextProvider;
Expand Down Expand Up @@ -254,7 +259,8 @@ private AbfsClient(final URL baseUrl,

abfsApacheHttpClient = new AbfsApacheHttpClient(
DelegatingSSLSocketFactory.getDefaultFactory(),
abfsConfiguration, keepAliveCache, baseUrl);
abfsConfiguration, keepAliveCache, baseUrl,
abfsConfiguration.getFsConfiguredServiceType() == abfsServiceType);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add some comments around this change explaining how it affects the need for cache warmup?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1
What are we trying to achieve here?

Copy link
Contributor Author

@bhattmanish98 bhattmanish98 Oct 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason for this change: the keep-alive cache is at the client level, and we were doing cache warmup for both clients separately. With this change, we will do cache warmup only for the default client, not for both clients.
Will add the comment in the code as well

}

this.userAgent = initializeUserAgent(abfsConfiguration, sslProviderName);
Expand Down Expand Up @@ -328,25 +334,29 @@ private AbfsClient(final URL baseUrl,
LOG.trace("primaryUserGroup is {}", this.primaryUserGroup);
}

public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration,
final AccessTokenProvider tokenProvider,
final EncryptionContextProvider encryptionContextProvider,
final AbfsClientContext abfsClientContext)
public AbfsClient(final URL baseUrl,
final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration,
final AccessTokenProvider tokenProvider,
final EncryptionContextProvider encryptionContextProvider,
final AbfsClientContext abfsClientContext,
final AbfsServiceType abfsServiceType)
throws IOException {
this(baseUrl, sharedKeyCredentials, abfsConfiguration,
encryptionContextProvider, abfsClientContext);
encryptionContextProvider, abfsClientContext, abfsServiceType);
this.tokenProvider = tokenProvider;
}

public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration,
final SASTokenProvider sasTokenProvider,
final EncryptionContextProvider encryptionContextProvider,
final AbfsClientContext abfsClientContext)
public AbfsClient(final URL baseUrl,
final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration,
final SASTokenProvider sasTokenProvider,
final EncryptionContextProvider encryptionContextProvider,
final AbfsClientContext abfsClientContext,
final AbfsServiceType abfsServiceType)
throws IOException {
this(baseUrl, sharedKeyCredentials, abfsConfiguration,
encryptionContextProvider, abfsClientContext);
encryptionContextProvider, abfsClientContext, abfsServiceType);
this.sasTokenProvider = sasTokenProvider;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,13 @@ public AbfsClientHandler(final URL baseUrl,
final SASTokenProvider sasTokenProvider,
final EncryptionContextProvider encryptionContextProvider,
final AbfsClientContext abfsClientContext) throws IOException {
initServiceType(abfsConfiguration);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will initialize the default and ingress service types. This is needed before creating the clients so that we can do cache warmup only for the default client.

this.dfsAbfsClient = createDfsClient(baseUrl, sharedKeyCredentials,
abfsConfiguration, null, sasTokenProvider, encryptionContextProvider,
abfsClientContext);
this.blobAbfsClient = createBlobClient(baseUrl, sharedKeyCredentials,
abfsConfiguration, null, sasTokenProvider, encryptionContextProvider,
abfsClientContext);
initServiceType(abfsConfiguration);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.UUID;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -91,26 +92,34 @@ class AbfsConnectionManager implements HttpClientConnectionManager {
/**
* The base host for which connections are managed.
*/
private HttpHost baseHost;
private final HttpHost baseHost;

AbfsConnectionManager(Registry<ConnectionSocketFactory> socketFactoryRegistry,
AbfsHttpClientConnectionFactory connectionFactory, KeepAliveCache kac,
final AbfsConfiguration abfsConfiguration, final URL baseUrl) {
AbfsHttpClientConnectionFactory connectionFactory,
KeepAliveCache kac,
final AbfsConfiguration abfsConfiguration,
final URL baseUrl,
final boolean isCacheWarmupNeeded) {
this.httpConnectionFactory = connectionFactory;
this.kac = kac;
this.connectionOperator = new DefaultHttpClientConnectionOperator(
socketFactoryRegistry, null, null);
this.abfsConfiguration = abfsConfiguration;
if (abfsConfiguration.getApacheCacheWarmupCount() > 0
this.baseHost = new HttpHost(baseUrl.getHost(),
baseUrl.getDefaultPort(), baseUrl.getProtocol());
if (isCacheWarmupNeeded && abfsConfiguration.getApacheCacheWarmupCount() > 0
&& kac.getFixedThreadPool() != null) {
// Warm up the cache with connections.
LOG.debug("Warming up the KeepAliveCache with {} connections",
abfsConfiguration.getApacheCacheWarmupCount());
this.baseHost = new HttpHost(baseUrl.getHost(),
baseUrl.getDefaultPort(), baseUrl.getProtocol());
HttpRoute route = new HttpRoute(baseHost, null, true);
cacheExtraConnection(route,
int totalConnectionsCreated = cacheExtraConnection(route,
abfsConfiguration.getApacheCacheWarmupCount());
if (totalConnectionsCreated == 0) {
Copy link
Contributor

@anmolanmol1234 anmolanmol1234 Sep 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even if we fail or catch a rejected exception for any one of the tasks, do we want to register fallback? The successfully submitted tasks might already have increased the count of totalConnectionsCreated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, makes sense. I have updated the returned value in the case of a rejected exception; everything else will remain the same.

AbfsApacheHttpClient.registerFallback();
} else {
AbfsApacheHttpClient.setUsable();
}
}
}

Expand Down Expand Up @@ -271,7 +280,7 @@ public void connect(final HttpClientConnection conn,
LOG.debug("Connection established: {}", conn);
if (context instanceof AbfsManagedHttpClientContext) {
((AbfsManagedHttpClientContext) context).setConnectTime(
TimeUnit.MILLISECONDS.toMillis(System.nanoTime() - start));
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
}
}

Expand Down Expand Up @@ -318,8 +327,9 @@ public void shutdown() {
* @param route the HTTP route for which connections are created
* @param numberOfConnections the number of connections to create
*/
private void cacheExtraConnection(final HttpRoute route,
private int cacheExtraConnection(final HttpRoute route,
final int numberOfConnections) {
AtomicInteger totalConnectionCreated = new AtomicInteger(0);
if (!isCacheRefreshInProgress.getAndSet(true)) {
long start = System.nanoTime();
CountDownLatch latch = new CountDownLatch(numberOfConnections);
Expand All @@ -333,6 +343,7 @@ private void cacheExtraConnection(final HttpRoute route,
connect(conn, route, abfsConfiguration.getHttpConnectionTimeout(),
new AbfsManagedHttpClientContext());
addConnectionToCache(conn);
totalConnectionCreated.incrementAndGet();
} catch (Exception e) {
LOG.debug("Error creating connection: {}", e.getMessage());
if (conn != null) {
Expand All @@ -350,7 +361,7 @@ private void cacheExtraConnection(final HttpRoute route,
} catch (RejectedExecutionException e) {
LOG.debug("Task rejected for connection creation: {}",
e.getMessage());
return;
return 0;
}
}

Expand All @@ -370,6 +381,7 @@ private void cacheExtraConnection(final HttpRoute route,
elapsedTimeMillis(start));
}
}
return totalConnectionCreated.get();
}

/**
Expand All @@ -383,10 +395,10 @@ private void addConnectionToCache(HttpClientConnection conn) {
if (((AbfsManagedApacheHttpConnection) conn).getTargetHost()
.equals(baseHost)) {
boolean connAddedInKac = kac.add(conn);
synchronized (connectionLock) {
connectionLock.notify(); // wake up one thread only
}
if (connAddedInKac) {
synchronized (connectionLock) {
connectionLock.notify(); // wake up one thread only
}
LOG.debug("Connection cached: {}", conn);
} else {
LOG.debug("Connection not cached, and is released: {}", conn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ApiVersion;
import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException;
Expand Down Expand Up @@ -160,7 +161,7 @@ public AbfsDfsClient(final URL baseUrl,
final EncryptionContextProvider encryptionContextProvider,
final AbfsClientContext abfsClientContext) throws IOException {
super(baseUrl, sharedKeyCredentials, abfsConfiguration, tokenProvider,
encryptionContextProvider, abfsClientContext);
encryptionContextProvider, abfsClientContext, AbfsServiceType.DFS);
}

public AbfsDfsClient(final URL baseUrl,
Expand All @@ -170,7 +171,7 @@ public AbfsDfsClient(final URL baseUrl,
final EncryptionContextProvider encryptionContextProvider,
final AbfsClientContext abfsClientContext) throws IOException {
super(baseUrl, sharedKeyCredentials, abfsConfiguration, sasTokenProvider,
encryptionContextProvider, abfsClientContext);
encryptionContextProvider, abfsClientContext, AbfsServiceType.DFS);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ public class ITestWasbAbfsCompatibility extends AbstractAbfsIntegrationTest {
LoggerFactory.getLogger(ITestWasbAbfsCompatibility.class);

public ITestWasbAbfsCompatibility() throws Exception {
// To ensure the wasb and abfs filesystems are initialized.
super.setup();
assumeThat(isIPAddress()).as("Emulator is not supported").isFalse();
assumeHnsDisabled();
assumeBlobServiceType();
Expand Down
Loading