Skip to content

Commit 6078188

Browse files
authored
[chore](param-refactor)Support independent AWS connection timeout settings for each object storage (apache#54882)
… ### What problem does this PR solve? Previously, connector properties such as use_path_style, connection.timeout, and request.timeout were shared across all object storage connectors. When multiple object storages (e.g., OSS, OBS, S3) were configured, their connection-related parameters could overwrite each other due to identical property names. After: Now, each object storage type supports prefixed property names to isolate configurations. For example: ``` # OSS-specific settings oss.connection.timeout=8000 oss.connection.request.timeout=15000 oss.use_path_style=true # OBS-specific settings obs.connection.timeout=12000 obs.connection.request.timeout=20000 obs.use_path_style=false ```
1 parent e999329 commit 6078188

17 files changed

+303
-116
lines changed

fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,11 @@
2222
import org.apache.doris.datasource.property.storage.StorageProperties;
2323

2424
import lombok.Getter;
25+
import org.apache.commons.lang3.StringUtils;
2526
import org.apache.iceberg.CatalogProperties;
2627
import org.apache.iceberg.catalog.Catalog;
2728

29+
import java.util.HashMap;
2830
import java.util.List;
2931
import java.util.Map;
3032

@@ -71,5 +73,26 @@ protected AbstractIcebergProperties(Map<String, String> props) {
7173
* This field is used to perform metadata operations like creating, querying,
7274
* and deleting Iceberg tables.
7375
*/
74-
public abstract Catalog initializeCatalog(String catalogName, List<StorageProperties> storagePropertiesList);
76+
public final Catalog initializeCatalog(String catalogName, List<StorageProperties> storagePropertiesList) {
77+
Map<String, String> catalogProps = new HashMap<>(getOrigProps());
78+
if (StringUtils.isNotBlank(warehouse)) {
79+
catalogProps.put(CatalogProperties.WAREHOUSE_LOCATION, warehouse);
80+
}
81+
82+
Catalog catalog = initCatalog(catalogName, catalogProps, storagePropertiesList);
83+
84+
if (catalog == null) {
85+
throw new IllegalStateException("Catalog must not be null after initialization.");
86+
}
87+
return catalog;
88+
}
89+
90+
/**
91+
* Subclasses must implement this to create the concrete Catalog instance.
92+
*/
93+
protected abstract Catalog initCatalog(
94+
String catalogName,
95+
Map<String, String> catalogProps,
96+
List<StorageProperties> storagePropertiesList
97+
);
7598
}

fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergAliyunDLFMetaStoreProperties.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,8 @@
2323

2424
import com.aliyun.datalake.metastore.common.DataLakeConfig;
2525
import org.apache.hadoop.conf.Configuration;
26-
import org.apache.iceberg.CatalogProperties;
2726
import org.apache.iceberg.catalog.Catalog;
2827

29-
import java.util.HashMap;
3028
import java.util.List;
3129
import java.util.Map;
3230

@@ -47,8 +45,8 @@ public String getIcebergCatalogType() {
4745
}
4846

4947
@Override
50-
public Catalog initializeCatalog(String catalogName, List<StorageProperties> storagePropertiesList) {
51-
48+
public Catalog initCatalog(String catalogName, Map<String, String> catalogProps,
49+
List<StorageProperties> storagePropertiesList) {
5250
DLFCatalog dlfCatalog = new DLFCatalog();
5351
// @see com.aliyun.datalake.metastore.hive.common.utils.ConfigUtils
5452
Configuration conf = new Configuration();
@@ -63,9 +61,7 @@ public Catalog initializeCatalog(String catalogName, List<StorageProperties> sto
6361
conf.set("hive.metastore.type", "dlf");
6462
conf.set("type", "hms");
6563
dlfCatalog.setConf(conf);
66-
Map<String, String> catalogProperties = new HashMap<>(origProps);
67-
catalogProperties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouse);
68-
dlfCatalog.initialize(catalogName, catalogProperties);
64+
dlfCatalog.initialize(catalogName, catalogProps);
6965
return dlfCatalog;
7066
}
7167
}

fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergFileSystemMetaStoreProperties.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,9 @@ public IcebergFileSystemMetaStoreProperties(Map<String, String> props) {
4444
}
4545

4646
@Override
47-
public Catalog initializeCatalog(String catalogName, List<StorageProperties> storagePropertiesList) {
47+
public Catalog initCatalog(String catalogName, Map<String, String> catalogProps,
48+
List<StorageProperties> storagePropertiesList) {
4849
Configuration configuration = buildConfiguration(storagePropertiesList);
49-
Map<String, String> catalogProps = buildCatalogProps(storagePropertiesList);
50-
5150
HadoopCatalog catalog = new HadoopCatalog();
5251
catalog.setConf(configuration);
5352
try {

fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergGlueMetaStoreProperties.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -58,20 +58,19 @@ public void initNormalizeAndCheckProps() {
5858
}
5959

6060
@Override
61-
public Catalog initializeCatalog(String catalogName, List<StorageProperties> storageProperties) {
62-
Map<String, String> props = prepareBaseCatalogProps();
63-
appendS3Props(props);
64-
appendGlueProps(props);
65-
props.put("client.region", glueProperties.glueRegion);
66-
61+
public Catalog initCatalog(String catalogName, Map<String, String> catalogProps,
62+
List<StorageProperties> storagePropertiesList) {
63+
appendS3Props(catalogProps);
64+
appendGlueProps(catalogProps);
65+
catalogProps.put("client.region", glueProperties.glueRegion);
6766
if (StringUtils.isNotBlank(warehouse)) {
68-
props.put(CatalogProperties.WAREHOUSE_LOCATION, warehouse);
67+
catalogProps.put(CatalogProperties.WAREHOUSE_LOCATION, warehouse);
6968
} else {
70-
props.put(CatalogProperties.WAREHOUSE_LOCATION, CHECKED_WAREHOUSE);
69+
catalogProps.put(CatalogProperties.WAREHOUSE_LOCATION, CHECKED_WAREHOUSE);
7170
}
7271

7372
GlueCatalog catalog = new GlueCatalog();
74-
catalog.initialize(catalogName, props);
73+
catalog.initialize(catalogName, catalogProps);
7574
return catalog;
7675
}
7776

fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergHMSMetaStoreProperties.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,20 +63,17 @@ public void initNormalizeAndCheckProps() {
6363
}
6464

6565
@Override
66-
public Catalog initializeCatalog(String catalogName, List<StorageProperties> storagePropertiesList) {
66+
public Catalog initCatalog(String catalogName, Map<String, String> catalogProps,
67+
List<StorageProperties> storagePropertiesList) {
6768
checkInitialized();
68-
6969
Configuration conf = buildHiveConfiguration(storagePropertiesList);
70-
Map<String, String> catalogProps = buildCatalogProperties();
71-
7270
HiveCatalog hiveCatalog = new HiveCatalog();
7371
hiveCatalog.setConf(conf);
7472
storagePropertiesList.forEach(sp -> {
7573
for (Map.Entry<String, String> entry : sp.getHadoopStorageConfig()) {
7674
catalogProps.put(entry.getKey(), entry.getValue());
7775
}
7876
});
79-
8077
try {
8178
this.executionAuthenticator.execute(() -> hiveCatalog.initialize(catalogName, catalogProps));
8279
return hiveCatalog;

fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergRestProperties.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,8 @@ public String getIcebergCatalogType() {
161161
}
162162

163163
@Override
164-
public Catalog initializeCatalog(String catalogName, List<StorageProperties> storagePropertiesList) {
164+
public Catalog initCatalog(String catalogName, Map<String, String> catalogProps,
165+
List<StorageProperties> storagePropertiesList) {
165166
Map<String, String> fileIOProperties = Maps.newHashMap();
166167
Configuration conf = new Configuration();
167168
toFileIOProperties(storagePropertiesList, fileIOProperties, conf);

fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergS3TablesMetaStoreProperties.java

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,9 @@
2222
import org.apache.doris.datasource.property.storage.S3Properties;
2323
import org.apache.doris.datasource.property.storage.StorageProperties;
2424

25-
import org.apache.commons.lang3.StringUtils;
26-
import org.apache.iceberg.CatalogProperties;
2725
import org.apache.iceberg.catalog.Catalog;
2826
import software.amazon.s3tables.iceberg.S3TablesCatalog;
2927

30-
import java.util.HashMap;
3128
import java.util.List;
3229
import java.util.Map;
3330

@@ -51,34 +48,28 @@ public void initNormalizeAndCheckProps() {
5148
}
5249

5350
@Override
54-
public Catalog initializeCatalog(String catalogName, List<StorageProperties> storagePropertiesList) {
51+
public Catalog initCatalog(String catalogName, Map<String, String> catalogProps,
52+
List<StorageProperties> storagePropertiesList) {
5553
checkInitialized();
5654

57-
Map<String, String> props = buildS3CatalogProperties();
55+
buildS3CatalogProperties(catalogProps);
5856

5957
S3TablesCatalog catalog = new S3TablesCatalog();
6058
try {
61-
catalog.initialize(catalogName, props);
59+
catalog.initialize(catalogName, catalogProps);
6260
return catalog;
6361
} catch (Exception e) {
6462
throw new RuntimeException("Failed to initialize S3TablesCatalog for Iceberg. "
6563
+ "CatalogName=" + catalogName + ", region=" + s3Properties.getRegion(), e);
6664
}
6765
}
6866

69-
private Map<String, String> buildS3CatalogProperties() {
70-
Map<String, String> props = new HashMap<>();
67+
private void buildS3CatalogProperties(Map<String, String> props) {
7168
props.put("client.credentials-provider", CustomAwsCredentialsProvider.class.getName());
7269
props.put("client.credentials-provider.s3.access-key-id", s3Properties.getAccessKey());
7370
props.put("client.credentials-provider.s3.secret-access-key", s3Properties.getSecretKey());
7471
props.put("client.credentials-provider.s3.session-token", s3Properties.getSessionToken());
7572
props.put("client.region", s3Properties.getRegion());
76-
77-
if (StringUtils.isNotBlank(warehouse)) {
78-
props.put(CatalogProperties.WAREHOUSE_LOCATION, warehouse);
79-
}
80-
81-
return props;
8273
}
8374

8475
private void checkInitialized() {

fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java

Lines changed: 7 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,8 @@
1818
package org.apache.doris.datasource.property.storage;
1919

2020
import org.apache.doris.common.UserException;
21-
import org.apache.doris.datasource.property.ConnectorProperty;
2221

2322
import com.google.common.base.Strings;
24-
import lombok.Getter;
25-
import lombok.Setter;
2623
import org.apache.commons.lang3.StringUtils;
2724
import org.apache.hadoop.conf.Configuration;
2825
import org.apache.logging.log4j.LogManager;
@@ -50,51 +47,6 @@
5047
*/
5148
public abstract class AbstractS3CompatibleProperties extends StorageProperties implements ObjectStorageProperties {
5249
private static final Logger LOG = LogManager.getLogger(AbstractS3CompatibleProperties.class);
53-
/**
54-
* The maximum number of concurrent connections that can be made to the object storage system.
55-
* This value is optional and can be configured by the user.
56-
*/
57-
@Getter
58-
@ConnectorProperty(names = {"connection.maximum"}, required = false, description = "Maximum number of connections.")
59-
protected String maxConnections = "100";
60-
61-
/**
62-
* The timeout (in milliseconds) for requests made to the object storage system.
63-
* This value is optional and can be configured by the user.
64-
*/
65-
@Getter
66-
@ConnectorProperty(names = {"connection.request.timeout"}, required = false,
67-
description = "Request timeout in seconds.")
68-
protected String requestTimeoutS = "10000";
69-
70-
/**
71-
* The timeout (in milliseconds) for establishing a connection to the object storage system.
72-
* This value is optional and can be configured by the user.
73-
*/
74-
@Getter
75-
@ConnectorProperty(names = {"connection.timeout"}, required = false, description = "Connection timeout in seconds.")
76-
protected String connectionTimeoutS = "10000";
77-
78-
/**
79-
* Flag indicating whether to use path-style URLs for the object storage system.
80-
* This value is optional and can be configured by the user.
81-
*/
82-
@Setter
83-
@Getter
84-
@ConnectorProperty(names = {"use_path_style", "s3.path-style-access"}, required = false,
85-
description = "Whether to use path style URL for the storage.")
86-
protected String usePathStyle = "false";
87-
@ConnectorProperty(names = {"force_parsing_by_standard_uri"}, required = false,
88-
description = "Whether to use path style URL for the storage.")
89-
@Setter
90-
@Getter
91-
protected String forceParsingByStandardUrl = "false";
92-
93-
@Getter
94-
@ConnectorProperty(names = {"s3.session_token", "session_token"},
95-
required = false,
96-
description = "The session token of S3.")
97-
protected String sessionToken = "";
9850

9951
/**
10052
* Constructor to initialize the object storage properties with the provided type and original properties map.
@@ -132,7 +84,8 @@ protected Map<String, String> generateBackendS3Configuration(String maxConnectio
13284
* @return a map containing AWS S3 configuration properties.
13385
*/
13486
protected Map<String, String> generateBackendS3Configuration() {
135-
return doBuildS3Configuration(maxConnections, requestTimeoutS, connectionTimeoutS, usePathStyle);
87+
return doBuildS3Configuration(getMaxConnections(), getRequestTimeoutS(), getConnectionTimeoutS(),
88+
getUsePathStyle());
13689
}
13790

13891
/**
@@ -164,11 +117,11 @@ public Map<String, String> getBackendConfigProperties() {
164117

165118
public AwsCredentialsProvider getAwsCredentialsProvider() {
166119
if (StringUtils.isNotBlank(getAccessKey()) && StringUtils.isNotBlank(getSecretKey())) {
167-
if (Strings.isNullOrEmpty(sessionToken)) {
120+
if (Strings.isNullOrEmpty(getSessionToken())) {
168121
return StaticCredentialsProvider.create(AwsBasicCredentials.create(getAccessKey(), getSecretKey()));
169122
} else {
170123
return StaticCredentialsProvider.create(AwsSessionCredentials.create(getAccessKey(), getSecretKey(),
171-
sessionToken));
124+
getSessionToken()));
172125
}
173126
}
174127
return null;
@@ -207,7 +160,8 @@ protected void setEndpointIfPossible() {
207160
String endpoint = null;
208161
// 1. try getting endpoint from uri
209162
try {
210-
endpoint = S3PropertyUtils.constructEndpointFromUrl(origProps, usePathStyle, forceParsingByStandardUrl);
163+
endpoint = S3PropertyUtils.constructEndpointFromUrl(origProps, getUsePathStyle(),
164+
getForceParsingByStandardUrl());
211165
} catch (Exception e) {
212166
if (LOG.isDebugEnabled()) {
213167
LOG.debug("Failed to construct endpoint from url: " + origProps, e);
@@ -309,7 +263,7 @@ private void appendS3HdfsProperties(Configuration hadoopStorageConfig) {
309263
hadoopStorageConfig.set("fs.s3a.connection.maximum", getMaxConnections());
310264
hadoopStorageConfig.set("fs.s3a.connection.request.timeout", getRequestTimeoutS());
311265
hadoopStorageConfig.set("fs.s3a.connection.timeout", getConnectionTimeoutS());
312-
hadoopStorageConfig.set("fs.s3a.path.style.access", usePathStyle);
266+
hadoopStorageConfig.set("fs.s3a.path.style.access", getUsePathStyle());
313267
}
314268

315269
@Override

fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/COSProperties.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,55 @@ public class COSProperties extends AbstractS3CompatibleProperties {
6363
description = "The secret key of COS.")
6464
protected String secretKey = "";
6565

66+
@Getter
67+
@ConnectorProperty(names = {"cos.session_token", "s3.session_token", "session_token"},
68+
required = false,
69+
description = "The session token of COS.")
70+
protected String sessionToken = "";
71+
72+
/**
73+
* The maximum number of concurrent connections that can be made to the object storage system.
74+
* This value is optional and can be configured by the user.
75+
*/
76+
@Getter
77+
@ConnectorProperty(names = {"cos.connection.maximum", "s3.connection.maximum"}, required = false,
78+
description = "Maximum number of connections.")
79+
protected String maxConnections = "100";
80+
81+
/**
82+
* The timeout (in milliseconds) for requests made to the object storage system.
83+
* This value is optional and can be configured by the user.
84+
*/
85+
@Getter
86+
@ConnectorProperty(names = {"cos.connection.request.timeout", "s3.connection.request.timeout"}, required = false,
87+
description = "Request timeout in seconds.")
88+
protected String requestTimeoutS = "10000";
89+
90+
/**
91+
* The timeout (in milliseconds) for establishing a connection to the object storage system.
92+
* This value is optional and can be configured by the user.
93+
*/
94+
@Getter
95+
@ConnectorProperty(names = {"cos.connection.timeout", "s3.connection.timeout"}, required = false,
96+
description = "Connection timeout in seconds.")
97+
protected String connectionTimeoutS = "10000";
98+
99+
/**
100+
* Flag indicating whether to use path-style URLs for the object storage system.
101+
* This value is optional and can be configured by the user.
102+
*/
103+
@Setter
104+
@Getter
105+
@ConnectorProperty(names = {"cos.use_path_style", "use_path_style", "s3.path-style-access"}, required = false,
106+
description = "Whether to use path style URL for the storage.")
107+
protected String usePathStyle = "false";
108+
109+
@ConnectorProperty(names = {"cos.force_parsing_by_standard_uri", "force_parsing_by_standard_uri"}, required = false,
110+
description = "Whether to use path style URL for the storage.")
111+
@Setter
112+
@Getter
113+
protected String forceParsingByStandardUrl = "false";
114+
66115
/**
67116
* Pattern to extract the region from a Tencent Cloud COS endpoint.
68117
* <p>

0 commit comments

Comments
 (0)