Skip to content

Commit c8ea305

Browse files
authored
HADOOP-18562: S3A: support custom S3 and STS headers (#7379)
Contributed by Aditya Deshpande
1 parent c4abdd7 commit c8ea305

File tree

4 files changed

+228
-0
lines changed

4 files changed

+228
-0
lines changed

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
2626

2727
import java.time.Duration;
28+
import java.util.Locale;
2829
import java.util.concurrent.TimeUnit;
2930

3031
import static org.apache.hadoop.io.Sizes.S_128K;
@@ -1339,6 +1340,37 @@ private Constants() {
13391340
public static final String AWS_SERVICE_IDENTIFIER_DDB = "DDB";
13401341
public static final String AWS_SERVICE_IDENTIFIER_STS = "STS";
13411342

1343+
/** Prefix for S3A client-specific properties.
1344+
* value: {@value}
1345+
*/
1346+
public static final String FS_S3A_CLIENT_PREFIX = "fs.s3a.client.";
1347+
1348+
/** Custom headers postfix.
1349+
* value: {@value}
1350+
*/
1351+
public static final String CUSTOM_HEADERS_POSTFIX = ".custom.headers";
1352+
1353+
/**
1354+
* List of custom headers to be set on the service client.
1355+
* Multiple parameters can be used to specify custom headers.
1356+
* <pre>
1357+
* Usage:
1358+
* fs.s3a.client.s3.custom.headers - Headers to add on all the S3 requests.
1359+
* fs.s3a.client.sts.custom.headers - Headers to add on all the STS requests.
1360+
*
1361+
* Examples:
1362+
* CustomHeader {@literal ->} 'Header1:Value1'
1363+
* CustomHeaders {@literal ->} 'Header1=Value1;Value2,Header2=Value1'
1364+
* </pre>
1365+
*/
1366+
public static final String CUSTOM_HEADERS_STS =
1367+
FS_S3A_CLIENT_PREFIX + AWS_SERVICE_IDENTIFIER_STS.toLowerCase(Locale.ROOT)
1368+
+ CUSTOM_HEADERS_POSTFIX;
1369+
1370+
public static final String CUSTOM_HEADERS_S3 =
1371+
FS_S3A_CLIENT_PREFIX + AWS_SERVICE_IDENTIFIER_S3.toLowerCase(Locale.ROOT)
1372+
+ CUSTOM_HEADERS_POSTFIX;
1373+
13421374
/**
13431375
* How long to wait for the thread pool to terminate when cleaning up.
13441376
* Value: {@value} seconds.

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSClientConfig.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,11 @@
2222
import java.net.URI;
2323
import java.net.URISyntaxException;
2424
import java.time.Duration;
25+
import java.util.Arrays;
26+
import java.util.List;
27+
import java.util.Map;
2528
import java.util.concurrent.TimeUnit;
29+
import java.util.stream.Collectors;
2630

2731
import org.slf4j.Logger;
2832
import org.slf4j.LoggerFactory;
@@ -76,6 +80,8 @@
7680
import static org.apache.hadoop.fs.s3a.Constants.SIGNING_ALGORITHM_STS;
7781
import static org.apache.hadoop.fs.s3a.Constants.SOCKET_TIMEOUT;
7882
import static org.apache.hadoop.fs.s3a.Constants.USER_AGENT_PREFIX;
83+
import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_HEADERS_S3;
84+
import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_HEADERS_STS;
7985
import static org.apache.hadoop.fs.s3a.impl.ConfigurationHelper.enforceMinimumDuration;
8086
import static org.apache.hadoop.fs.s3a.impl.ConfigurationHelper.getDuration;
8187
import static org.apache.hadoop.util.Preconditions.checkArgument;
@@ -120,6 +126,8 @@ public static ClientOverrideConfiguration.Builder createClientConfigBuilder(Conf
120126

121127
initUserAgent(conf, overrideConfigBuilder);
122128

129+
initRequestHeaders(conf, overrideConfigBuilder, awsServiceIdentifier);
130+
123131
String signer = conf.getTrimmed(SIGNING_ALGORITHM, "");
124132
if (!signer.isEmpty()) {
125133
LOG.debug("Signer override = {}", signer);
@@ -412,6 +420,44 @@ private static void initSigner(Configuration conf,
412420
}
413421
}
414422

423+
/**
424+
* Initialize custom request headers for AWS clients.
425+
* @param conf hadoop configuration
426+
* @param clientConfig client configuration to update
427+
* @param awsServiceIdentifier service name
428+
*/
429+
private static void initRequestHeaders(Configuration conf,
430+
ClientOverrideConfiguration.Builder clientConfig, String awsServiceIdentifier) {
431+
String configKey = null;
432+
switch (awsServiceIdentifier) {
433+
case AWS_SERVICE_IDENTIFIER_S3:
434+
configKey = CUSTOM_HEADERS_S3;
435+
break;
436+
case AWS_SERVICE_IDENTIFIER_STS:
437+
configKey = CUSTOM_HEADERS_STS;
438+
break;
439+
default:
440+
// No known service.
441+
}
442+
if (configKey != null) {
443+
Map<String, String> awsClientCustomHeadersMap =
444+
S3AUtils.getTrimmedStringCollectionSplitByEquals(conf, configKey);
445+
awsClientCustomHeadersMap.forEach((header, valueString) -> {
446+
List<String> headerValues = Arrays.stream(valueString.split(";"))
447+
.map(String::trim)
448+
.filter(v -> !v.isEmpty())
449+
.collect(Collectors.toList());
450+
if (!headerValues.isEmpty()) {
451+
clientConfig.putHeader(header, headerValues);
452+
} else {
453+
LOG.warn("Ignoring header '{}' for {} client because no values were provided",
454+
header, awsServiceIdentifier);
455+
}
456+
});
457+
LOG.debug("headers for {} client = {}", awsServiceIdentifier, clientConfig.headers());
458+
}
459+
}
460+
415461
/**
416462
* Configures request timeout in the client configuration.
417463
* This is independent of the timeouts set in the sync and async HTTP clients;

hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -947,6 +947,31 @@ The switch to turn S3A auditing on or off.
947947
</property>
948948

949949
```
950+
951+
### Configuring Custom Headers for AWS Service Clients
952+
953+
You can set custom headers for S3 and STS requests. These headers are set on client level, and will be sent for all requests made to these services.
954+
955+
**Configuration Properties:**
956+
- `fs.s3a.client.s3.custom.headers`: Custom headers for S3 service requests.
957+
- `fs.s3a.client.sts.custom.headers`: Sets custom headers for all requests to AWS STS.
958+
959+
**Header Format:**
960+
Custom headers should be specified as key-value pairs, separated by `=`. Multiple values for a single header can be separated by `;`. Multiple headers can be separated by `,`.
961+
962+
963+
```xml
964+
<property>
965+
<name>fs.s3a.client.s3.custom.headers</name>
966+
<value>Header1=Value1</value>
967+
</property>
968+
969+
<property>
970+
<name>fs.s3a.client.sts.custom.headers</name>
971+
<value>Header1=Value1;Value2,Header2=Value1</value>
972+
</property>
973+
```
974+
950975
## <a name="retry_and_recovery"></a>Retry and Recovery
951976

952977
The S3A client makes a best-effort attempt at recovering from network failures;

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestAwsClientConfig.java

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.hadoop.fs.s3a.impl;
2020

21+
import java.io.IOException;
2122
import java.time.Duration;
2223
import java.util.Arrays;
2324

@@ -28,11 +29,16 @@
2829

2930
import org.apache.hadoop.conf.Configuration;
3031
import org.apache.hadoop.test.AbstractHadoopTestBase;
32+
import org.apache.hadoop.util.Lists;
3133

34+
import static org.apache.hadoop.fs.s3a.Constants.AWS_SERVICE_IDENTIFIER_S3;
35+
import static org.apache.hadoop.fs.s3a.Constants.AWS_SERVICE_IDENTIFIER_STS;
3236
import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_ACQUISITION_TIMEOUT;
3337
import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_IDLE_TIME;
3438
import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_KEEPALIVE;
3539
import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_TTL;
40+
import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_HEADERS_S3;
41+
import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_HEADERS_STS;
3642
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_CONNECTION_ACQUISITION_TIMEOUT_DURATION;
3743
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_CONNECTION_IDLE_TIME_DURATION;
3844
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_CONNECTION_KEEPALIVE;
@@ -47,6 +53,7 @@
4753
import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
4854
import static org.apache.hadoop.fs.s3a.Constants.SOCKET_TIMEOUT;
4955
import static org.apache.hadoop.fs.s3a.impl.AWSClientConfig.createApiConnectionSettings;
56+
import static org.apache.hadoop.fs.s3a.impl.AWSClientConfig.createClientConfigBuilder;
5057
import static org.apache.hadoop.fs.s3a.impl.AWSClientConfig.createConnectionSettings;
5158
import static org.apache.hadoop.fs.s3a.impl.ConfigurationHelper.enforceMinimumDuration;
5259
import static org.assertj.core.api.Assertions.assertThat;
@@ -201,4 +208,122 @@ public void testCreateApiConnectionSettingsDefault() {
201208
private void setOptionsToValue(String value, Configuration conf, String... keys) {
202209
Arrays.stream(keys).forEach(key -> conf.set(key, value));
203210
}
211+
212+
/**
213+
* if {@link org.apache.hadoop.fs.s3a.Constants#CUSTOM_HEADERS_STS} is set,
214+
* verify that returned client configuration has desired headers set.
215+
*/
216+
@Test
217+
public void testInitRequestHeadersForSTS() throws IOException {
218+
final Configuration conf = new Configuration();
219+
conf.set(CUSTOM_HEADERS_STS, "header1=value1;value2,header2=value3");
220+
221+
Assertions.assertThat(conf.get(CUSTOM_HEADERS_S3))
222+
.describedAs("Custom client headers for s3 %s", CUSTOM_HEADERS_S3)
223+
.isNull();
224+
225+
Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_S3)
226+
.headers().size())
227+
.describedAs("Count of S3 client headers")
228+
.isEqualTo(0);
229+
230+
Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_STS)
231+
.headers().size())
232+
.describedAs("Count of STS client headers")
233+
.isEqualTo(2);
234+
235+
Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_STS)
236+
.headers().get("header1"))
237+
.describedAs("STS client 'header1' header value")
238+
.isEqualTo(Lists.newArrayList("value1", "value2"));
239+
240+
Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_STS)
241+
.headers().get("header2"))
242+
.describedAs("STS client 'header2' header value")
243+
.isEqualTo(Lists.newArrayList("value3"));
244+
}
245+
246+
/**
247+
* if {@link org.apache.hadoop.fs.s3a.Constants#CUSTOM_HEADERS_S3} is set,
248+
* verify that returned client configuration has desired headers set.
249+
*/
250+
@Test
251+
public void testInitRequestHeadersForS3() throws IOException {
252+
final Configuration conf = new Configuration();
253+
conf.set(CUSTOM_HEADERS_S3, "header1=value1;value2,header2=value3");
254+
255+
Assertions.assertThat(conf.get(CUSTOM_HEADERS_STS))
256+
.describedAs("Custom client headers for STS %s", CUSTOM_HEADERS_STS)
257+
.isNull();
258+
259+
Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_STS)
260+
.headers().size())
261+
.describedAs("Count of STS client headers")
262+
.isEqualTo(0);
263+
264+
Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_S3)
265+
.headers().size())
266+
.describedAs("Count of S3 client headers")
267+
.isEqualTo(2);
268+
269+
Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_S3)
270+
.headers().get("header1"))
271+
.describedAs("S3 client 'header1' header value")
272+
.isEqualTo(Lists.newArrayList("value1", "value2"));
273+
274+
Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_S3)
275+
.headers().get("header2"))
276+
.describedAs("S3 client 'header2' header value")
277+
.isEqualTo(Lists.newArrayList("value3"));
278+
}
279+
280+
/**
281+
* if {@link org.apache.hadoop.fs.s3a.Constants#CUSTOM_HEADERS_S3} is set,
282+
* verify that returned client configuration has desired headers set with
283+
* whitespaces trimmed for headers and values.
284+
*/
285+
@Test
286+
public void testInitRequestHeadersForS3WithWhitespace() throws IOException {
287+
final Configuration conf = new Configuration();
288+
conf.set(CUSTOM_HEADERS_S3, " header1 = value1 ; value2 , header2= value3 ");
289+
290+
Assertions.assertThat(conf.get(CUSTOM_HEADERS_STS))
291+
.describedAs("Custom client headers for STS %s", CUSTOM_HEADERS_STS)
292+
.isNull();
293+
294+
Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_STS)
295+
.headers().size())
296+
.describedAs("Count of STS client headers")
297+
.isEqualTo(0);
298+
299+
Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_S3)
300+
.headers().size())
301+
.describedAs("Count of S3 client headers")
302+
.isEqualTo(2);
303+
304+
Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_S3)
305+
.headers().get("header1"))
306+
.describedAs("S3 client 'header1' header value")
307+
.isEqualTo(Lists.newArrayList("value1", "value2"));
308+
309+
Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_S3)
310+
.headers().get("header2"))
311+
.describedAs("S3 client 'header2' header value")
312+
.isEqualTo(Lists.newArrayList("value3"));
313+
}
314+
315+
/**
316+
* if {@link org.apache.hadoop.fs.s3a.Constants#CUSTOM_HEADERS_S3} is set with duplicate values,
317+
* verify that returned client configuration has desired headers with both values.
318+
*/
319+
@Test
320+
public void testInitRequestHeadersForS3WithDuplicateValues() throws IOException {
321+
Configuration conf = new Configuration();
322+
conf.set(CUSTOM_HEADERS_S3, "header1=duplicate;duplicate");
323+
324+
Assertions.assertThat(createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_S3)
325+
.headers().get("header1"))
326+
.describedAs("S3 client 'header1' header value")
327+
.isEqualTo(Lists.newArrayList("duplicate", "duplicate"));
328+
}
204329
}

0 commit comments

Comments
 (0)