Skip to content

Commit 5ae6097

Browse files
HADOOP-19609. ABFS: Apache Client Connection Pool Relook (#7817)
Contributed by Manish Bhatt
1 parent 454f108 commit 5ae6097

18 files changed

+1050
-398
lines changed

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java

Lines changed: 51 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -474,19 +474,37 @@ public class AbfsConfiguration{
474474
FS_AZURE_BLOB_DIR_DELETE_MAX_THREAD, DefaultValue = DEFAULT_FS_AZURE_BLOB_DELETE_THREAD)
475475
private int blobDeleteDirConsumptionParallelism;
476476

477-
@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
478-
FS_AZURE_APACHE_HTTP_CLIENT_MAX_IO_EXCEPTION_RETRIES, DefaultValue = DEFAULT_APACHE_HTTP_CLIENT_MAX_IO_EXCEPTION_RETRIES)
477+
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_APACHE_HTTP_CLIENT_MAX_IO_EXCEPTION_RETRIES,
478+
DefaultValue = DEFAULT_APACHE_HTTP_CLIENT_MAX_IO_EXCEPTION_RETRIES)
479479
private int maxApacheHttpClientIoExceptionsRetries;
480480

481-
/**
482-
* Max idle TTL configuration for connection given in
483-
* {@value org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys#FS_AZURE_APACHE_HTTP_CLIENT_IDLE_CONNECTION_TTL}
484-
* with default of
485-
* {@value org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations#DEFAULT_HTTP_CLIENT_CONN_MAX_IDLE_TIME}
486-
*/
487-
@LongConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_APACHE_HTTP_CLIENT_IDLE_CONNECTION_TTL,
488-
DefaultValue = DEFAULT_HTTP_CLIENT_CONN_MAX_IDLE_TIME)
489-
private long maxApacheHttpClientConnectionIdleTime;
481+
@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
482+
FS_AZURE_APACHE_HTTP_CLIENT_MAX_CACHE_SIZE, DefaultValue = DEFAULT_APACHE_HTTP_CLIENT_MAX_CACHE_SIZE,
483+
MinValue = MIN_APACHE_HTTP_CLIENT_MAX_CACHE_SIZE, MaxValue = MAX_APACHE_HTTP_CLIENT_MAX_CACHE_SIZE)
484+
private int apacheMaxCacheSize;
485+
486+
@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
487+
FS_AZURE_APACHE_HTTP_CLIENT_CACHE_WARMUP_COUNT, DefaultValue = DEFAULT_APACHE_HTTP_CLIENT_CACHE_WARMUP_COUNT,
488+
MinValue = 0, MaxValue = MAX_APACHE_HTTP_CLIENT_CACHE_WARMUP_COUNT)
489+
private int apacheCacheWarmupCount;
490+
491+
@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
492+
FS_AZURE_APACHE_HTTP_CLIENT_CACHE_REFRESH_COUNT, DefaultValue = DEFAULT_APACHE_HTTP_CLIENT_CACHE_REFRESH_COUNT,
493+
MinValue = 0, MaxValue = MAX_APACHE_HTTP_CLIENT_CACHE_REFRESH_COUNT)
494+
private int apacheCacheRefreshCount;
495+
496+
@LongConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_APACHE_HTTP_CLIENT_MAX_REFRESH_WAIT_TIME_MILLIS,
497+
DefaultValue = DEFAULT_APACHE_HTTP_CLIENT_MAX_REFRESH_WAIT_TIME_MILLIS)
498+
private long apacheMaxRefreshWaitTimeInMillis;
499+
500+
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_APACHE_HTTP_CLIENT_MIN_TRIGGER_REFRESH_COUNT,
501+
DefaultValue = DEFAULT_APACHE_HTTP_CLIENT_MIN_TRIGGER_REFRESH_COUNT,
502+
MinValue = 0, MaxValue = MAX_APACHE_HTTP_CLIENT_MIN_TRIGGER_REFRESH_COUNT)
503+
private int apacheMinTriggerRefreshCount;
504+
505+
@LongConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_APACHE_HTTP_CLIENT_WARMUP_CACHE_TIMEOUT_MILLIS,
506+
DefaultValue = DEFAULT_APACHE_HTTP_CLIENT_WARMUP_CACHE_TIMEOUT_MILLIS)
507+
private long apacheWarmupCacheTimeoutInMillis;
490508

491509
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID,
492510
DefaultValue = DEFAULT_FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID)
@@ -1177,11 +1195,28 @@ public int getMaxApacheHttpClientIoExceptionsRetries() {
11771195
return maxApacheHttpClientIoExceptionsRetries;
11781196
}
11791197

1180-
/**
1181-
* @return {@link #maxApacheHttpClientConnectionIdleTime}.
1182-
*/
1183-
public long getMaxApacheHttpClientConnectionIdleTime() {
1184-
return maxApacheHttpClientConnectionIdleTime;
1198+
public int getApacheMaxCacheSize() {
1199+
return apacheMaxCacheSize;
1200+
}
1201+
1202+
public int getApacheCacheWarmupCount() {
1203+
return apacheCacheWarmupCount;
1204+
}
1205+
1206+
public int getApacheCacheRefreshCount() {
1207+
return apacheCacheRefreshCount;
1208+
}
1209+
1210+
public long getApacheMaxRefreshWaitTimeInMillis() {
1211+
return apacheMaxRefreshWaitTimeInMillis;
1212+
}
1213+
1214+
public int getApacheMinTriggerRefreshCount() {
1215+
return apacheMinTriggerRefreshCount;
1216+
}
1217+
1218+
public long getApacheWarmupCacheTimeoutInMillis() {
1219+
return apacheWarmupCacheTimeoutInMillis;
11851220
}
11861221

11871222
public boolean getIsClientTransactionIdEnabled() {

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -825,8 +825,7 @@ public synchronized void close() throws IOException {
825825
IOSTATISTICS_LOGGING_LEVEL_DEFAULT);
826826
logIOStatisticsAtLevel(LOG, iostatisticsLoggingLevel, getIOStatistics());
827827
}
828-
IOUtils.cleanupWithLogger(LOG, getAbfsStore(), delegationTokenManager,
829-
getAbfsClient());
828+
IOUtils.cleanupWithLogger(LOG, getAbfsStore(), delegationTokenManager);
830829
this.isClosed = true;
831830
if (LOG.isDebugEnabled()) {
832831
LOG.debug("Closing Abfs: {}", toString());

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,7 @@ public void close() throws IOException {
330330
} catch (ExecutionException e) {
331331
LOG.error("Error freeing leases", e);
332332
} finally {
333-
IOUtils.cleanupWithLogger(LOG, getClient());
333+
IOUtils.cleanupWithLogger(LOG, getClientHandler());
334334
}
335335
}
336336

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -401,9 +401,31 @@ public static String containerProperty(String property, String fsName, String ac
401401
*/
402402
public static final String FS_AZURE_APACHE_HTTP_CLIENT_MAX_IO_EXCEPTION_RETRIES = "fs.azure.apache.http.client.max.io.exception.retries";
403403
/**Maximum ApacheHttpClient-connection cache size at filesystem level: {@value}*/
404-
public static final String FS_AZURE_APACHE_HTTP_CLIENT_MAX_CACHE_CONNECTION_SIZE = "fs.azure.apache.http.client.max.cache.connection.size";
405-
/**Maximum idle time for a ApacheHttpClient-connection: {@value}*/
406-
public static final String FS_AZURE_APACHE_HTTP_CLIENT_IDLE_CONNECTION_TTL = "fs.azure.apache.http.client.idle.connection.ttl";
404+
public static final String FS_AZURE_APACHE_HTTP_CLIENT_MAX_CACHE_SIZE = "fs.azure.apache.http.client.max.cache.size";
405+
/**
406+
* Defines number of connections to establish during warmup phase
407+
* of ApacheHttpClient connection cache: {@value}
408+
*/
409+
public static final String FS_AZURE_APACHE_HTTP_CLIENT_CACHE_WARMUP_COUNT = "fs.azure.apache.http.client.cache.warmup.count";
410+
/**
411+
* Defines number of connections to establish during refresh phase
412+
* of ApacheHttpClient connection cache: {@value}
413+
*/
414+
public static final String FS_AZURE_APACHE_HTTP_CLIENT_CACHE_REFRESH_COUNT = "fs.azure.apache.http.client.cache.refresh.count";
415+
/**
416+
* Defines time duration to wait for ApacheHttpClient connection
417+
* cache to warmup/ refresh: {@value}
418+
*/
419+
public static final String FS_AZURE_APACHE_HTTP_CLIENT_MAX_REFRESH_WAIT_TIME_MILLIS = "fs.azure.apache.http.client.max.refresh.wait.time.millis";
420+
/**
421+
* Minimum number of cached connections in ApacheHttpClient cache
422+
* below which refresh will be triggered. {@value}
423+
*/
424+
public static final String FS_AZURE_APACHE_HTTP_CLIENT_MIN_TRIGGER_REFRESH_COUNT = "fs.azure.apache.http.client.min.trigger.refresh.count";
425+
/**
426+
* Time duration to wait for ApacheHttpClient connection cache to warmup/refresh: {@value}
427+
*/
428+
public static final String FS_AZURE_APACHE_HTTP_CLIENT_WARMUP_CACHE_TIMEOUT_MILLIS = "fs.azure.apache.http.client.warmup.cache.timeout.millis";
407429
/**
408430
* Blob copy API is an async API, this configuration defines polling duration
409431
* for checking copy status: {@value}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -218,9 +218,27 @@ public final class FileSystemConfigurations {
218218

219219
public static final int DEFAULT_APACHE_HTTP_CLIENT_MAX_IO_EXCEPTION_RETRIES = 3;
220220

221-
public static final long DEFAULT_HTTP_CLIENT_CONN_MAX_IDLE_TIME = 5_000L;
221+
public static final int DEFAULT_APACHE_HTTP_CLIENT_MAX_CACHE_SIZE = 10;
222222

223-
public static final int DEFAULT_HTTP_CLIENT_CONN_MAX_CACHED_CONNECTIONS = 5;
223+
public static final int MIN_APACHE_HTTP_CLIENT_MAX_CACHE_SIZE = 5;
224+
225+
public static final int MAX_APACHE_HTTP_CLIENT_MAX_CACHE_SIZE = 20;
226+
227+
public static final int DEFAULT_APACHE_HTTP_CLIENT_CACHE_WARMUP_COUNT = 5;
228+
229+
public static final int MAX_APACHE_HTTP_CLIENT_CACHE_WARMUP_COUNT = 5;
230+
231+
public static final int DEFAULT_APACHE_HTTP_CLIENT_CACHE_REFRESH_COUNT = 5;
232+
233+
public static final int MAX_APACHE_HTTP_CLIENT_CACHE_REFRESH_COUNT = 5;
234+
235+
public static final long DEFAULT_APACHE_HTTP_CLIENT_MAX_REFRESH_WAIT_TIME_MILLIS = 500L;
236+
237+
public static final int DEFAULT_APACHE_HTTP_CLIENT_MIN_TRIGGER_REFRESH_COUNT = 2;
238+
239+
public static final long DEFAULT_APACHE_HTTP_CLIENT_WARMUP_CACHE_TIMEOUT_MILLIS = 2_000L;
240+
241+
public static final int MAX_APACHE_HTTP_CLIENT_MIN_TRIGGER_REFRESH_COUNT = 5;
224242

225243
public static final long DEFAULT_AZURE_BLOB_COPY_PROGRESS_WAIT_MILLIS = 1_000L;
226244

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java

Lines changed: 25 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,8 @@ public AbfsAHCHttpOperation(final URL url,
9898
final Duration readTimeout,
9999
final AbfsApacheHttpClient abfsApacheHttpClient,
100100
final AbfsClient abfsClient) throws IOException {
101-
super(LOG, url, method, requestHeaders, connectionTimeout, readTimeout, abfsClient);
101+
super(LOG, url, method, requestHeaders, connectionTimeout, readTimeout,
102+
abfsClient);
102103
this.isPayloadRequest = HTTP_METHOD_PUT.equals(method)
103104
|| HTTP_METHOD_PATCH.equals(method)
104105
|| HTTP_METHOD_POST.equals(method);
@@ -139,6 +140,14 @@ public AbfsAHCHttpOperation(final URL url,
139140
throw new PathIOException(getUrl().toString(),
140141
"Unsupported HTTP method: " + getMethod());
141142
}
143+
144+
// Set the request headers in the http request object.
145+
// Earlier we were setting it just before sending the request.
146+
// Setting here ensures that same header will get used while signing
147+
// the request as well as validating the request at server's end.
148+
for (AbfsHttpHeader header : requestHeaders) {
149+
setRequestProperty(header.getName(), header.getValue());
150+
}
142151
}
143152

144153
/**
@@ -163,12 +172,11 @@ protected InputStream getErrorStream() throws IOException {
163172
/**{@inheritDoc}*/
164173
@Override
165174
String getConnProperty(final String key) {
166-
for (AbfsHttpHeader header : getRequestHeaders()) {
167-
if (header.getName().equals(key)) {
168-
return header.getValue();
169-
}
175+
Header header = httpRequestBase.getFirstHeader(key);
176+
if (header == null) {
177+
return null;
170178
}
171-
return null;
179+
return header.getValue();
172180
}
173181

174182
/**{@inheritDoc}*/
@@ -196,7 +204,6 @@ public void processResponse(final byte[] buffer,
196204
final int length) throws IOException {
197205
try {
198206
if (!isPayloadRequest) {
199-
prepareRequest();
200207
LOG.debug("Sending request: {}", httpRequestBase);
201208
httpResponse = executeRequest();
202209
LOG.debug("Request sent: {}; response {}", httpRequestBase,
@@ -280,17 +287,20 @@ HttpResponse executeRequest() throws IOException {
280287
/**{@inheritDoc}*/
281288
@Override
282289
public void setRequestProperty(final String key, final String value) {
283-
List<AbfsHttpHeader> headers = getRequestHeaders();
284-
if (headers != null) {
285-
headers.add(new AbfsHttpHeader(key, value));
290+
// Content-Length is managed by HttpClient for entity enclosing requests.
291+
// Setting it manually can lead to protocol errors.
292+
if (httpRequestBase instanceof HttpEntityEnclosingRequestBase
293+
&& CONTENT_LENGTH.equals(key)) {
294+
return;
286295
}
296+
httpRequestBase.setHeader(key, value);
287297
}
288298

289299
/**{@inheritDoc}*/
290300
@Override
291301
Map<String, List<String>> getRequestProperties() {
292302
Map<String, List<String>> map = new HashMap<>();
293-
for (AbfsHttpHeader header : getRequestHeaders()) {
303+
for (Header header : httpRequestBase.getAllHeaders()) {
294304
map.put(header.getName(),
295305
new ArrayList<String>() {{
296306
add(header.getValue());
@@ -306,10 +316,10 @@ public String getResponseHeader(final String headerName) {
306316
return null;
307317
}
308318
Header header = httpResponse.getFirstHeader(headerName);
309-
if (header != null) {
310-
return header.getValue();
319+
if (header == null) {
320+
return null;
311321
}
312-
return null;
322+
return header.getValue();
313323
}
314324

315325
/**{@inheritDoc}*/
@@ -370,7 +380,6 @@ public void sendPayload(final byte[] buffer,
370380
httpEntity);
371381
}
372382

373-
prepareRequest();
374383
try {
375384
LOG.debug("Sending request: {}", httpRequestBase);
376385
httpResponse = executeRequest();
@@ -398,24 +407,10 @@ public void sendPayload(final byte[] buffer,
398407
}
399408
}
400409

401-
/**
402-
* Sets the header on the request.
403-
*/
404-
private void prepareRequest() {
405-
final boolean isEntityBasedRequest
406-
= httpRequestBase instanceof HttpEntityEnclosingRequestBase;
407-
for (AbfsHttpHeader header : getRequestHeaders()) {
408-
if (CONTENT_LENGTH.equals(header.getName()) && isEntityBasedRequest) {
409-
continue;
410-
}
411-
httpRequestBase.setHeader(header.getName(), header.getValue());
412-
}
413-
}
414-
415410
/**{@inheritDoc}*/
416411
@Override
417412
public String getRequestProperty(String name) {
418-
for (AbfsHttpHeader header : getRequestHeaders()) {
413+
for (Header header : httpRequestBase.getAllHeaders()) {
419414
if (header.getName().equals(name)) {
420415
String val = header.getValue();
421416
val = val == null ? EMPTY_STRING : val;

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsApacheHttpClient.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020

2121
import java.io.Closeable;
2222
import java.io.IOException;
23+
import java.net.URL;
2324

25+
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
2426
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
2527
import org.apache.http.HttpResponse;
2628
import org.apache.http.client.config.RequestConfig;
@@ -71,15 +73,18 @@ static boolean usable() {
7173
}
7274

7375
AbfsApacheHttpClient(DelegatingSSLSocketFactory delegatingSSLSocketFactory,
74-
final int readTimeout, final KeepAliveCache keepAliveCache) {
76+
final AbfsConfiguration abfsConfiguration, final KeepAliveCache keepAliveCache,
77+
URL baseUrl) {
7578
final AbfsConnectionManager connMgr = new AbfsConnectionManager(
7679
createSocketFactoryRegistry(
7780
new SSLConnectionSocketFactory(delegatingSSLSocketFactory,
7881
getDefaultHostnameVerifier())),
79-
new AbfsHttpClientConnectionFactory(), keepAliveCache);
82+
new AbfsHttpClientConnectionFactory(), keepAliveCache,
83+
abfsConfiguration, baseUrl);
8084
final HttpClientBuilder builder = HttpClients.custom();
8185
builder.setConnectionManager(connMgr)
82-
.setRequestExecutor(new AbfsManagedHttpRequestExecutor(readTimeout))
86+
.setRequestExecutor(
87+
new AbfsManagedHttpRequestExecutor(abfsConfiguration.getHttpReadTimeout()))
8388
.disableContentCompression()
8489
.disableRedirectHandling()
8590
.disableAutomaticRetries()

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -254,8 +254,7 @@ private AbfsClient(final URL baseUrl,
254254

255255
abfsApacheHttpClient = new AbfsApacheHttpClient(
256256
DelegatingSSLSocketFactory.getDefaultFactory(),
257-
abfsConfiguration.getHttpReadTimeout(),
258-
keepAliveCache);
257+
abfsConfiguration, keepAliveCache, baseUrl);
259258
}
260259

261260
this.userAgent = initializeUserAgent(abfsConfiguration, sslProviderName);

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818

1919
package org.apache.hadoop.fs.azurebfs.services;
2020

21+
import java.io.Closeable;
2122
import java.io.IOException;
2223
import java.net.URL;
2324

25+
import org.apache.hadoop.io.IOUtils;
2426
import org.slf4j.Logger;
2527
import org.slf4j.LoggerFactory;
2628

@@ -37,7 +39,7 @@
3739
* AbfsClientHandler is a class that provides a way to get the AbfsClient
3840
* based on the service type.
3941
*/
40-
public class AbfsClientHandler {
42+
public class AbfsClientHandler implements Closeable {
4143
public static final Logger LOG = LoggerFactory.getLogger(AbfsClientHandler.class);
4244

4345
private AbfsServiceType defaultServiceType;
@@ -195,4 +197,9 @@ private AbfsBlobClient createBlobClient(final URL baseUrl,
195197
abfsClientContext);
196198
}
197199
}
200+
201+
@Override
202+
public void close() throws IOException {
203+
IOUtils.cleanupWithLogger(LOG, getDfsClient(), getBlobClient());
204+
}
198205
}

0 commit comments

Comments
 (0)