Skip to content

Commit 6b72d14

Browse files
ZhitongYanLeo YanejeffrliAbdulR3hman
authored
Add Query Federation Support with Substrait Integration and Credential Federation for JDBC Based Connectors (#3199)
Co-authored-by: Leo Yan <leozy@amazon.com> Co-authored-by: ejeffrli <144148373+ejeffrli@users.noreply.github.com> Co-authored-by: AbdulRehman <ar.al-faraj@outlook.com>
1 parent 136b460 commit 6b72d14

File tree

22 files changed

+2161
-191
lines changed

22 files changed

+2161
-191
lines changed

athena-cloudera-hive/src/main/java/com/amazonaws/athena/connectors/cloudera/HiveMetadataHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import org.apache.arrow.vector.types.pojo.Schema;
5757
import org.slf4j.Logger;
5858
import org.slf4j.LoggerFactory;
59+
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
5960
import software.amazon.awssdk.services.athena.AthenaClient;
6061
import software.amazon.awssdk.services.secretsmanager.SecretsManagerClient;
6162

@@ -298,7 +299,7 @@ private String encodeContinuationToken(int partition)
298299
* @throws Exception An Exception should be thrown for database connection failures , query syntax errors and so on.
299300
*/
300301
@Override
301-
protected Schema getSchema(Connection jdbcConnection, TableName tableName, Schema partitionSchema) throws Exception
302+
protected Schema getSchema(Connection jdbcConnection, TableName tableName, Schema partitionSchema, AwsRequestOverrideConfiguration requestOverrideConfiguration) throws Exception
302303
{
303304
SchemaBuilder schemaBuilder = SchemaBuilder.newBuilder();
304305
try (ResultSet resultSet = getColumns(jdbcConnection.getCatalog(), tableName, jdbcConnection.getMetaData());

athena-cloudera-impala/src/main/java/com/amazonaws/athena/connectors/cloudera/ImpalaMetadataHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.apache.arrow.vector.types.pojo.Schema;
5656
import org.slf4j.Logger;
5757
import org.slf4j.LoggerFactory;
58+
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
5859
import software.amazon.awssdk.services.athena.AthenaClient;
5960
import software.amazon.awssdk.services.secretsmanager.SecretsManagerClient;
6061

@@ -287,7 +288,7 @@ private String encodeContinuationToken(int partition)
287288
* @throws Exception An Exception should be thrown for database connection failures , query syntax errors and so on.
288289
*/
289290
@Override
290-
protected Schema getSchema(Connection jdbcConnection, TableName tableName, Schema partitionSchema) throws Exception
291+
protected Schema getSchema(Connection jdbcConnection, TableName tableName, Schema partitionSchema, AwsRequestOverrideConfiguration requestOverrideConfiguration) throws Exception
291292
{
292293
SchemaBuilder schemaBuilder = SchemaBuilder.newBuilder();
293294
try (ResultSet resultSet = getColumns(jdbcConnection.getCatalog(), tableName, jdbcConnection.getMetaData());

athena-datalakegen2/src/main/java/com/amazonaws/athena/connectors/datalakegen2/DataLakeGen2MetadataHandler.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@
77
* Licensed under the Apache License, Version 2.0 (the "License");
88
* you may not use this file except in compliance with the License.
99
* You may obtain a copy of the License at
10-
*
10+
*
1111
* http://www.apache.org/licenses/LICENSE-2.0
12-
*
12+
*
1313
* Unless required by applicable law or agreed to in writing, software
1414
* distributed under the License is distributed on an "AS IS" BASIS,
1515
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -58,6 +58,7 @@
5858
import org.apache.arrow.vector.types.pojo.Schema;
5959
import org.slf4j.Logger;
6060
import org.slf4j.LoggerFactory;
61+
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
6162
import software.amazon.awssdk.services.athena.AthenaClient;
6263
import software.amazon.awssdk.services.secretsmanager.SecretsManagerClient;
6364

@@ -216,7 +217,7 @@ protected Optional<ArrowType> convertDatasourceTypeToArrow(int columnIndex, int
216217
* @throws Exception
217218
*/
218219
@Override
219-
protected Schema getSchema(Connection jdbcConnection, TableName tableName, Schema partitionSchema)
220+
protected Schema getSchema(Connection jdbcConnection, TableName tableName, Schema partitionSchema, AwsRequestOverrideConfiguration requestOverrideConfiguration)
220221
throws Exception
221222
{
222223
LOGGER.info("Inside getSchema");
@@ -251,7 +252,7 @@ protected Schema getSchema(Connection jdbcConnection, TableName tableName, Schem
251252
}
252253

253254
String environment = DataLakeGen2Util.checkEnvironment(jdbcConnection.getMetaData().getURL());
254-
255+
255256
if (DataLakeGen2Constants.SQL_POOL.equalsIgnoreCase(environment)) {
256257
// getColumns() method from SQL Server driver is causing an exception in case of Azure Serverless environment.
257258
// so doing explicit data type conversion

athena-db2-as400/src/main/java/com/amazonaws/athena/connectors/db2as400/Db2As400MetadataHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.apache.arrow.vector.types.pojo.Schema;
5656
import org.slf4j.Logger;
5757
import org.slf4j.LoggerFactory;
58+
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
5859
import software.amazon.awssdk.services.athena.AthenaClient;
5960
import software.amazon.awssdk.services.secretsmanager.SecretsManagerClient;
6061

@@ -396,7 +397,7 @@ private List<String> getTableList(final Connection connection, String query, Str
396397
* @throws Exception
397398
*/
398399
@Override
399-
protected Schema getSchema(Connection jdbcConnection, TableName tableName, Schema partitionSchema)
400+
protected Schema getSchema(Connection jdbcConnection, TableName tableName, Schema partitionSchema, AwsRequestOverrideConfiguration requestOverrideConfiguration)
400401
throws Exception
401402
{
402403
SchemaBuilder schemaBuilder = SchemaBuilder.newBuilder();

athena-db2/src/main/java/com/amazonaws/athena/connectors/db2/Db2MetadataHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import org.apache.arrow.vector.types.pojo.Schema;
6666
import org.slf4j.Logger;
6767
import org.slf4j.LoggerFactory;
68+
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
6869
import software.amazon.awssdk.services.athena.AthenaClient;
6970
import software.amazon.awssdk.services.secretsmanager.SecretsManagerClient;
7071

@@ -461,7 +462,7 @@ private List<String> getTableList(final Connection connection, String schemaName
461462
* @throws Exception
462463
*/
463464
@Override
464-
protected Schema getSchema(Connection jdbcConnection, TableName tableName, Schema partitionSchema)
465+
protected Schema getSchema(Connection jdbcConnection, TableName tableName, Schema partitionSchema, AwsRequestOverrideConfiguration requestOverrideConfiguration)
465466
throws Exception
466467
{
467468
String typeName;
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/*-
2+
* #%L
3+
* athena-jdbc
4+
* %%
5+
* Copyright (C) 2019 - 2025 Amazon Web Services
6+
* %%
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
* #L%
19+
*/
20+
package com.amazonaws.athena.connector.substrait;
21+
22+
import org.apache.arrow.vector.types.FloatingPointPrecision;
23+
import org.apache.arrow.vector.types.pojo.ArrowType;
24+
import org.apache.arrow.vector.types.pojo.Field;
25+
import org.apache.arrow.vector.types.pojo.Schema;
26+
import org.apache.calcite.sql.SqlDynamicParam;
27+
import org.apache.calcite.sql.SqlIdentifier;
28+
import org.apache.calcite.sql.SqlLiteral;
29+
import org.apache.calcite.sql.SqlNode;
30+
import org.apache.calcite.sql.type.SqlTypeName;
31+
import org.apache.calcite.sql.util.SqlShuttle;
32+
import org.apache.calcite.util.NlsString;
33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
35+
36+
import java.util.List;
37+
import java.util.Map;
38+
39+
public class SubstraitAccumulatorVisitor extends SqlShuttle
40+
{
41+
private static final Logger LOGGER = LoggerFactory.getLogger(SubstraitAccumulatorVisitor.class);
42+
43+
private final List<SubstraitTypeAndValue> accumulator;
44+
private final Map<String, String> splitProperties;
45+
private final Schema schema;
46+
private String currentColumn;
47+
48+
public SubstraitAccumulatorVisitor(final List<SubstraitTypeAndValue> accumulator, final Map<String, String> splitProperties, final Schema schema)
49+
{
50+
this.accumulator = accumulator;
51+
this.splitProperties = splitProperties;
52+
this.schema = schema;
53+
}
54+
55+
@Override
56+
public SqlNode visit(final SqlIdentifier id)
57+
{
58+
if (id.isSimple()) {
59+
currentColumn = id.getSimple();
60+
}
61+
return super.visit(id);
62+
}
63+
64+
@Override
65+
public SqlNode visit(final SqlLiteral literal)
66+
{
67+
if (currentColumn == null) {
68+
// such as LIMIT
69+
LOGGER.info("literal value {} doesn't have an associated column. skipping", literal.toValue());
70+
return literal;
71+
}
72+
73+
Field arrowField = schema.findField(currentColumn);
74+
75+
if (arrowField == null) {
76+
throw new IllegalArgumentException("field " + currentColumn + " not found in " + schema.getFields());
77+
}
78+
79+
final SqlTypeName typeName = mapArrowTypeToSqlTypeName(arrowField.getType());
80+
if (literal.getValue() instanceof NlsString) {
81+
accumulator.add(new SubstraitTypeAndValue(typeName, ((NlsString) literal.getValue()).getValue(), currentColumn));
82+
}
83+
else {
84+
accumulator.add(new SubstraitTypeAndValue(typeName, literal.getValue(), currentColumn));
85+
}
86+
return new SqlDynamicParam(0, literal.getParserPosition());
87+
}
88+
89+
private SqlTypeName mapArrowTypeToSqlTypeName(final ArrowType arrowType)
90+
{
91+
if (arrowType instanceof ArrowType.Int) {
92+
final int bitWidth = ((ArrowType.Int) arrowType).getBitWidth();
93+
if (bitWidth <= 32) {
94+
return SqlTypeName.INTEGER;
95+
}
96+
return SqlTypeName.BIGINT;
97+
}
98+
else if (arrowType instanceof ArrowType.FloatingPoint) {
99+
final ArrowType.FloatingPoint fp = (ArrowType.FloatingPoint) arrowType;
100+
if (fp.getPrecision() == FloatingPointPrecision.SINGLE) {
101+
return SqlTypeName.FLOAT;
102+
}
103+
return SqlTypeName.DOUBLE;
104+
}
105+
else if (arrowType instanceof ArrowType.Null) {
106+
return SqlTypeName.NULL;
107+
}
108+
else if (arrowType instanceof ArrowType.Utf8 || arrowType instanceof ArrowType.LargeUtf8) {
109+
return SqlTypeName.VARCHAR;
110+
}
111+
else if (arrowType instanceof ArrowType.Bool) {
112+
return SqlTypeName.BOOLEAN;
113+
}
114+
else if (arrowType instanceof ArrowType.Decimal) {
115+
return SqlTypeName.DECIMAL;
116+
}
117+
else if (arrowType instanceof ArrowType.Date) {
118+
return SqlTypeName.DATE;
119+
}
120+
else if (arrowType instanceof ArrowType.Time) {
121+
return SqlTypeName.TIME;
122+
}
123+
else if (arrowType instanceof ArrowType.Timestamp) {
124+
return SqlTypeName.TIMESTAMP;
125+
}
126+
else if (arrowType instanceof ArrowType.Binary || arrowType instanceof ArrowType.LargeBinary) {
127+
return SqlTypeName.VARBINARY;
128+
}
129+
else {
130+
return SqlTypeName.VARCHAR;
131+
}
132+
}
133+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*-
2+
* #%L
3+
* athena-jdbc
4+
* %%
5+
* Copyright (C) 2019 - 2023 Amazon Web Services
6+
* %%
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
* #L%
19+
*/
20+
package com.amazonaws.athena.connector.substrait;
21+
22+
import org.apache.calcite.sql.type.SqlTypeName;
23+
import org.apache.commons.lang3.Validate;
24+
25+
public class SubstraitTypeAndValue
26+
{
27+
private final SqlTypeName type;
28+
private final Object value;
29+
private final String columnName;
30+
31+
public SubstraitTypeAndValue(final SqlTypeName type, final Object value, final String columnName)
32+
{
33+
this.type = Validate.notNull(type, "type is null");
34+
this.value = Validate.notNull(value, "value is null");
35+
this.columnName = Validate.notNull(columnName, "value is null");
36+
}
37+
38+
public SqlTypeName getType()
39+
{
40+
return type;
41+
}
42+
43+
public Object getValue()
44+
{
45+
return value;
46+
}
47+
48+
public String getColumnName()
49+
{
50+
return columnName;
51+
}
52+
53+
@Override
54+
public String toString()
55+
{
56+
return "TypeAndValue{" +
57+
"type=" + type +
58+
", value=" + value +
59+
'}';
60+
}
61+
}

0 commit comments

Comments
 (0)