Skip to content

Commit 2bfbb69

Browse files
committed
[feature](maxcompute)support maxcompute ram_role_arn and ecs_ram_role.
1 parent bad06d3 commit 2bfbb69

File tree

10 files changed

+139
-68
lines changed

10 files changed

+139
-68
lines changed

be/src/runtime/descriptors.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,13 @@ MaxComputeTableDescriptor::MaxComputeTableDescriptor(const TTableDescriptor& tde
310310
_init_status =
311311
Status::InvalidArgument("fail to init MaxComputeTableDescriptor, missing quota.");
312312
}
313+
314+
if (tdesc.mcTable.__isset.properties) {
315+
_props = tdesc.mcTable.properties;
316+
} else {
317+
_init_status = Status::InvalidArgument(
318+
"fail to init MaxComputeTableDescriptor, missing properties.");
319+
}
313320
}
314321

315322
MaxComputeTableDescriptor::~MaxComputeTableDescriptor() = default;

be/src/runtime/descriptors.h

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -245,18 +245,20 @@ class MaxComputeTableDescriptor : public TableDescriptor {
245245
std::string endpoint() const { return _endpoint; }
246246
std::string quota() const { return _quota; }
247247
Status init_status() const { return _init_status; }
248+
std::map<std::string, std::string> properties() const { return _props; }
248249

249250
private:
250251
std::string _region; //deprecated
251252
std::string _project;
252253
std::string _table;
253-
std::string _odps_url; //deprecated
254-
std::string _tunnel_url; //deprecated
255-
std::string _access_key;
256-
std::string _secret_key;
254+
std::string _odps_url; //deprecated
255+
std::string _tunnel_url; //deprecated
256+
std::string _access_key; //deprecated
257+
std::string _secret_key; //deprecated
257258
std::string _public_access; //deprecated
258259
std::string _endpoint;
259260
std::string _quota;
261+
std::map<std::string, std::string> _props;
260262
Status _init_status = Status::OK();
261263
};
262264

be/src/vec/exec/format/table/max_compute_jni_reader.cpp

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -62,27 +62,31 @@ MaxComputeJniReader::MaxComputeJniReader(const MaxComputeTableDescriptor* mc_des
6262
}
6363
index++;
6464
}
65-
std::map<String, String> params = {
66-
{"access_key", _table_desc->access_key()},
67-
{"secret_key", _table_desc->secret_key()},
68-
{"endpoint", _table_desc->endpoint()},
69-
{"quota", _table_desc->quota()},
70-
{"project", _table_desc->project()},
71-
{"table", _table_desc->table()},
7265

73-
{"session_id", _max_compute_params.session_id},
74-
{"scan_serializer", _max_compute_params.table_batch_read_session},
66+
auto properties = _table_desc->properties();
67+
for (auto [a, b] : properties) {
68+
std::cout << "" << a << ":" << b << std::endl;
69+
}
70+
71+
properties["endpoint"] = _table_desc->endpoint();
72+
properties["quota"] = _table_desc->quota();
73+
properties["project"] = _table_desc->project();
74+
properties["table"] = _table_desc->table();
75+
76+
properties["session_id"] = _max_compute_params.session_id;
77+
properties["scan_serializer"] = _max_compute_params.table_batch_read_session;
78+
79+
properties["start_offset"] = std::to_string(_range.start_offset);
80+
properties["split_size"] = std::to_string(_range.size);
81+
properties["required_fields"] = required_fields.str();
82+
properties["columns_types"] = columns_types.str();
7583

76-
{"start_offset", std::to_string(_range.start_offset)},
77-
{"split_size", std::to_string(_range.size)},
78-
{"required_fields", required_fields.str()},
79-
{"columns_types", columns_types.str()},
84+
properties["connect_timeout"] = std::to_string(_max_compute_params.connect_timeout);
85+
properties["read_timeout"] = std::to_string(_max_compute_params.read_timeout);
86+
properties["retry_count"] = std::to_string(_max_compute_params.retry_times);
8087

81-
{"connect_timeout", std::to_string(_max_compute_params.connect_timeout)},
82-
{"read_timeout", std::to_string(_max_compute_params.read_timeout)},
83-
{"retry_count", std::to_string(_max_compute_params.retry_times)}};
8488
_jni_connector = std::make_unique<JniConnector>(
85-
"org/apache/doris/maxcompute/MaxComputeJniScanner", params, column_names);
89+
"org/apache/doris/maxcompute/MaxComputeJniScanner", properties, column_names);
8690
}
8791

8892
Status MaxComputeJniReader::init_reader() {

fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,9 @@
1919

2020
import org.apache.doris.common.jni.JniScanner;
2121
import org.apache.doris.common.jni.vec.ColumnType;
22+
import org.apache.doris.common.maxcompute.MCUtils;
2223

2324
import com.aliyun.odps.Odps;
24-
import com.aliyun.odps.account.Account;
25-
import com.aliyun.odps.account.AliyunAccount;
2625
import com.aliyun.odps.table.configuration.CompressionCodec;
2726
import com.aliyun.odps.table.configuration.ReaderOptions;
2827
import com.aliyun.odps.table.configuration.RestOptions;
@@ -120,8 +119,6 @@ public MaxComputeJniScanner(int batchSize, Map<String, String> params) {
120119
}
121120
}
122121

123-
String accessKey = Objects.requireNonNull(params.get(ACCESS_KEY), "required property '" + ACCESS_KEY + "'.");
124-
String secretKey = Objects.requireNonNull(params.get(SECRET_KEY), "required property '" + SECRET_KEY + "'.");
125122
String endpoint = Objects.requireNonNull(params.get(ENDPOINT), "required property '" + ENDPOINT + "'.");
126123
String quota = Objects.requireNonNull(params.get(QUOTA), "required property '" + QUOTA + "'.");
127124
String scanSerializer = Objects.requireNonNull(params.get(SCAN_SERIALIZER),
@@ -137,10 +134,7 @@ public MaxComputeJniScanner(int batchSize, Map<String, String> params) {
137134
timeZone = ZoneId.systemDefault();
138135
}
139136

140-
141-
Account account = new AliyunAccount(accessKey, secretKey);
142-
Odps odps = new Odps(account);
143-
137+
Odps odps = MCUtils.createMcClient(params);
144138
odps.setDefaultProject(project);
145139
odps.setEndpoint(endpoint);
146140

fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/MCProperties.java renamed to fe/fe-common/src/main/java/org/apache/doris/common/maxcompute/MCProperties.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,12 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
package org.apache.doris.datasource.property.constants;
19-
20-
import org.apache.doris.common.credentials.CloudCredential;
21-
22-
import java.util.Map;
18+
package org.apache.doris.common.maxcompute;
2319

2420
/**
2521
* properties for aliyun max compute
2622
*/
27-
public class MCProperties extends BaseProperties {
23+
public class MCProperties {
2824

2925
//To be compatible with previous versions of the catalog.
3026
public static final String REGION = "mc.region";
@@ -99,7 +95,12 @@ public class MCProperties extends BaseProperties {
9995
public static final String ENABLE_NAMESPACE_SCHEMA = "mc.enable.namespace.schema";
10096
public static final String DEFAULT_ENABLE_NAMESPACE_SCHEMA = "false";
10197

102-
public static CloudCredential getCredential(Map<String, String> props) {
103-
return getCloudCredential(props, ACCESS_KEY, SECRET_KEY, SESSION_TOKEN);
104-
}
98+
public static final String AUTH_TYPE = "mc.auth.type";
99+
public static final String AUTH_TYPE_AK_SK = "ak_sk";
100+
public static final String AUTH_TYPE_RAM_ROLE_ARN = "ram_role_arn";
101+
public static final String AUTH_TYPE_ECS_RAM_ROLE = "ecs_ram_role";
102+
public static final String DEFAULT_AUTH_TYPE = AUTH_TYPE_AK_SK;
103+
104+
public static final String RAM_ROLE_ARN = "mc.ram_role_arn";
105+
public static final String ECS_RAM_ROLE = "mc.ecs_ram_role";
105106
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.common.maxcompute;
19+
20+
import com.aliyun.auth.credentials.Credential;
21+
import com.aliyun.auth.credentials.provider.EcsRamRoleCredentialProvider;
22+
import com.aliyun.auth.credentials.provider.RamRoleArnCredentialProvider;
23+
import com.aliyun.odps.Odps;
24+
import com.aliyun.odps.account.Account;
25+
import com.aliyun.odps.account.AklessAccount;
26+
import com.aliyun.odps.account.AliyunAccount;
27+
28+
import java.util.Map;
29+
30+
public class MCUtils {
31+
public static void checkAuthProperties(Map<String, String> properties) {
32+
String authType = properties.getOrDefault(MCProperties.AUTH_TYPE, MCProperties.DEFAULT_AUTH_TYPE);
33+
if (authType.equalsIgnoreCase(MCProperties.AUTH_TYPE_AK_SK)) {
34+
if (!properties.containsKey(MCProperties.ACCESS_KEY) || !properties.containsKey(MCProperties.SECRET_KEY)) {
35+
throw new RuntimeException("Missing access key or secret key for AK/SK auth type");
36+
}
37+
} else if (authType.equalsIgnoreCase(MCProperties.AUTH_TYPE_RAM_ROLE_ARN)) {
38+
if (!properties.containsKey(MCProperties.ACCESS_KEY) || !properties.containsKey(MCProperties.SECRET_KEY)
39+
|| !properties.containsKey(MCProperties.RAM_ROLE_ARN)) {
40+
throw new RuntimeException("Missing access key, secret key or role arn for RAM Role ARN auth type");
41+
}
42+
} else if (authType.equalsIgnoreCase(MCProperties.AUTH_TYPE_ECS_RAM_ROLE)) {
43+
if (!properties.containsKey(MCProperties.ECS_RAM_ROLE)) {
44+
throw new RuntimeException("Missing role name for ECS RAM Role auth type");
45+
}
46+
} else {
47+
throw new RuntimeException("Unsupported auth type: " + authType);
48+
}
49+
}
50+
51+
public static Odps createMcClient(Map<String, String> properties) {
52+
String authType = properties.getOrDefault(MCProperties.AUTH_TYPE, MCProperties.DEFAULT_AUTH_TYPE);
53+
if (authType.equalsIgnoreCase(MCProperties.AUTH_TYPE_AK_SK)) {
54+
String accessKey = properties.get(MCProperties.ACCESS_KEY);
55+
String secretKey = properties.get(MCProperties.SECRET_KEY);
56+
Account account = new AliyunAccount(accessKey, secretKey);
57+
return new Odps(account);
58+
} else if (authType.equalsIgnoreCase(MCProperties.AUTH_TYPE_RAM_ROLE_ARN)) {
59+
String accessKey = properties.get(MCProperties.ACCESS_KEY);
60+
String secretKey = properties.get(MCProperties.SECRET_KEY);
61+
String roleArn = properties.get(MCProperties.RAM_ROLE_ARN);
62+
RamRoleArnCredentialProvider ramRoleArnCredentialProvider =
63+
RamRoleArnCredentialProvider.builder().credential(
64+
Credential.builder().accessKeyId(accessKey)
65+
.accessKeySecret(secretKey).build())
66+
.roleArn(roleArn).build();
67+
AklessAccount aklessAccount = new AklessAccount(ramRoleArnCredentialProvider);
68+
return new Odps(aklessAccount);
69+
} else if (authType.equalsIgnoreCase(MCProperties.AUTH_TYPE_ECS_RAM_ROLE)) {
70+
String roleName = properties.get(MCProperties.ECS_RAM_ROLE);
71+
EcsRamRoleCredentialProvider credentialProvider = EcsRamRoleCredentialProvider.create(roleName);
72+
AklessAccount aklessAccount = new AklessAccount(credentialProvider);
73+
return new Odps(aklessAccount);
74+
} else {
75+
throw new RuntimeException("Unsupported auth type: " + authType);
76+
}
77+
}
78+
}

fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java

Lines changed: 8 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,16 @@
1919

2020

2121
import org.apache.doris.common.DdlException;
22-
import org.apache.doris.common.credentials.CloudCredential;
22+
import org.apache.doris.common.maxcompute.MCProperties;
23+
import org.apache.doris.common.maxcompute.MCUtils;
2324
import org.apache.doris.datasource.CatalogProperty;
2425
import org.apache.doris.datasource.ExternalCatalog;
2526
import org.apache.doris.datasource.InitCatalogLog;
2627
import org.apache.doris.datasource.SessionContext;
27-
import org.apache.doris.datasource.property.constants.MCProperties;
2828

2929
import com.aliyun.odps.Odps;
3030
import com.aliyun.odps.Partition;
31-
import com.aliyun.odps.account.Account;
3231
import com.aliyun.odps.account.AccountFormat;
33-
import com.aliyun.odps.account.AliyunAccount;
3432
import com.aliyun.odps.table.TableIdentifier;
3533
import com.aliyun.odps.table.configuration.RestOptions;
3634
import com.aliyun.odps.table.configuration.SplitOptions;
@@ -54,9 +52,8 @@ public class MaxComputeExternalCatalog extends ExternalCatalog {
5452
// you can ref : https://help.aliyun.com/zh/maxcompute/user-guide/endpoints
5553
private static final String endpointTemplate = "http://service.{}.maxcompute.aliyun-inc.com/api";
5654

55+
private Map<String, String> props;
5756
private Odps odps;
58-
private String accessKey;
59-
private String secretKey;
6057
private String endpoint;
6158
private String defaultProject;
6259
private String quota;
@@ -158,7 +155,7 @@ protected void generatorEndpoint() {
158155

159156
@Override
160157
protected void initLocalObjectsImpl() {
161-
Map<String, String> props = catalogProperty.getProperties();
158+
props = catalogProperty.getProperties();
162159

163160
generatorEndpoint();
164161

@@ -198,16 +195,11 @@ protected void initLocalObjectsImpl() {
198195
.withReadTimeout(readTimeout)
199196
.withRetryTimes(retryTimes).build();
200197

201-
CloudCredential credential = MCProperties.getCredential(props);
202-
accessKey = credential.getAccessKey();
203-
secretKey = credential.getSecretKey();
204-
205198
dateTimePredicatePushDown = Boolean.parseBoolean(
206199
props.getOrDefault(MCProperties.DATETIME_PREDICATE_PUSH_DOWN,
207200
MCProperties.DEFAULT_DATETIME_PREDICATE_PUSH_DOWN));
208201

209-
Account account = new AliyunAccount(accessKey, secretKey);
210-
this.odps = new Odps(account);
202+
odps = MCUtils.createMcClient(props);
211203
odps.setDefaultProject(defaultProject);
212204
odps.setEndpoint(endpoint);
213205

@@ -288,14 +280,9 @@ public List<String> listTableNames(SessionContext ctx, String dbName) {
288280
return mcStructureHelper.listTableNames(getClient(), dbName);
289281
}
290282

291-
public String getAccessKey() {
292-
makeSureInitialized();
293-
return accessKey;
294-
}
295-
296-
public String getSecretKey() {
283+
public Map<String, String> getProperties() {
297284
makeSureInitialized();
298-
return secretKey;
285+
return props;
299286
}
300287

301288
public String getEndpoint() {
@@ -449,10 +436,6 @@ public void checkProperties() throws DdlException {
449436
+ MCProperties.READ_TIMEOUT + "/" + MCProperties.RETRY_COUNT + "must be an integer");
450437
}
451438

452-
CloudCredential credential = MCProperties.getCredential(props);
453-
if (!credential.isWhole()) {
454-
throw new DdlException("Max-Compute credential properties '"
455-
+ MCProperties.ACCESS_KEY + "' and '" + MCProperties.SECRET_KEY + "' are required.");
456-
}
439+
MCUtils.checkAuthProperties(props);
457440
}
458441
}

fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -322,8 +322,9 @@ public TTableDescriptor toThrift() {
322322
TMCTable tMcTable = new TMCTable();
323323
MaxComputeExternalCatalog mcCatalog = ((MaxComputeExternalCatalog) catalog);
324324

325-
tMcTable.setAccessKey(mcCatalog.getAccessKey());
326-
tMcTable.setSecretKey(mcCatalog.getSecretKey());
325+
tMcTable.setAccessKey("deprecated");
326+
tMcTable.setSecretKey("deprecated");
327+
tMcTable.setProperties(mcCatalog.getProperties());
327328
tMcTable.setOdpsUrl("deprecated");
328329
tMcTable.setRegion("deprecated");
329330
tMcTable.setEndpoint(mcCatalog.getEndpoint());

fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,13 @@
3333
import org.apache.doris.catalog.TableIf;
3434
import org.apache.doris.common.AnalysisException;
3535
import org.apache.doris.common.UserException;
36+
import org.apache.doris.common.maxcompute.MCProperties;
3637
import org.apache.doris.common.util.LocationPath;
3738
import org.apache.doris.datasource.FileQueryScanNode;
3839
import org.apache.doris.datasource.TableFormatType;
3940
import org.apache.doris.datasource.maxcompute.MaxComputeExternalCatalog;
4041
import org.apache.doris.datasource.maxcompute.MaxComputeExternalTable;
4142
import org.apache.doris.datasource.maxcompute.source.MaxComputeSplit.SplitType;
42-
import org.apache.doris.datasource.property.constants.MCProperties;
4343
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
4444
import org.apache.doris.nereids.util.DateUtils;
4545
import org.apache.doris.planner.PlanNodeId;

gensrc/thrift/Descriptors.thrift

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -442,13 +442,14 @@ struct TMCTable {
442442
1: optional string region // deprecated
443443
2: optional string project
444444
3: optional string table
445-
4: optional string access_key
446-
5: optional string secret_key
445+
4: optional string access_key // deprecated
446+
5: optional string secret_key // deprecated
447447
6: optional string public_access // deprecated
448448
7: optional string odps_url // deprecated
449449
8: optional string tunnel_url // deprecated
450450
9: optional string endpoint
451451
10: optional string quota
452+
11: optional map<string, string> properties // contains authentication properties
452453
}
453454

454455
struct TTrinoConnectorTable {

0 commit comments

Comments
 (0)