Skip to content

Commit 80e26ca

Browse files
authored
[FLINK-37924][table] Introduce Built-in Function to Access field or element in the Variant (#27330)
1 parent 00e4c8f commit 80e26ca

File tree

11 files changed

+635
-18
lines changed

11 files changed

+635
-18
lines changed

docs/data/sql_functions.yml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1167,6 +1167,16 @@ variant:
11671167
parser will keep the last occurrence of all fields with the same key, otherwise when
11681168
`allowDuplicateKeys` is false it will throw an error. The default value of
11691169
`allowDuplicateKeys` is false.
1170+
- sql: variant '[' INT ']'
1171+
table: VARIANT.at(INT)
1172+
description: |
1173+
If the VARIANT is an ARRAY value, returns a VARIANT whose value is the element at
1174+
the specified index. The index starts from 1. If the index is out of range, it returns NULL.
1175+
- sql: variant '[' STRING ']'
1176+
table: VARIANT.at(STRING)
1177+
description: |
1178+
If the VARIANT is a MAP value that has an element with this key, a VARIANT holding
1179+
the associated value is returned. Otherwise, NULL is returned.
11701180
11711181
valueconstruction:
11721182
- sql: |

docs/data/sql_functions_zh.yml

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1241,7 +1241,6 @@ variant:
12411241
同键的字段,否则当 allowDuplicateKeys 为 false 时,它会抛出一个错误。默认情况下,
12421242
allowDuplicateKeys 的值为 false。
12431243
1244-
12451244
- sql: TRY_PARSE_JSON(json_string[, allow_duplicate_keys])
12461245
description: |
12471246
尽可能将 JSON 字符串解析为 Variant。如果 JSON 字符串无效,则返回 NULL。如果希望抛出错误而不是返回 NULL,
@@ -1251,6 +1250,16 @@ variant:
12511250
同键的字段,否则当 allowDuplicateKeys 为 false 时,它会抛出一个错误。默认情况下,
12521251
allowDuplicateKeys 的值为 false。
12531252
1253+
- sql: variant '[' INT ']'
1254+
table: VARIANT.at(INT)
1255+
description: |
1256+
如果这是一个 ARRAY 类型的 VARIANT,则返回一个 VARIANT,其值为指定索引处的元素。索引从 1 开始,如果索引超出范围,则返回 NULL。
1257+
1258+
- sql: variant '[' STRING ']'
1259+
table: VARIANT.at(STRING)
1260+
description: |
1261+
如果这是一个 MAP 类型的 VARIANT,则返回一个 VARIANT,其值为指定 key 对应的值。否则返回 NULL。
1262+
12541263
12551264
valueconstruction:
12561265
- sql: |
Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to you under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.calcite.sql.fun;

import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlCallBinding;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperandCountRange;
import org.apache.calcite.sql.SqlOperatorBinding;
import org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.type.OperandTypes;
import org.apache.calcite.sql.type.SqlOperandCountRanges;
import org.apache.calcite.sql.type.SqlSingleOperandTypeChecker;
import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.type.SqlTypeUtil;

import java.util.Arrays;

import static java.util.Objects.requireNonNull;
import static org.apache.calcite.sql.type.NonNullableAccessors.getComponentTypeOrThrow;
import static org.apache.calcite.sql.validate.SqlNonNullableAccessors.getOperandLiteralValueOrThrow;

/**
 * The item operator {@code [ ... ]}, used to access a given element of an array, map or struct. For
 * example, {@code myArray[3]}, {@code "myMap['foo']"}, {@code myStruct[2]} or {@code
 * myStruct['fieldName']}.
 *
 * <p>This class was copied over from the Calcite 1.39.0 version to support accessing a variant
 * (FLINK-37924).
 *
 * <p>The "FLINK MODIFICATION" block in {@link #getAllowedSignatures(String)} (CALCITE-7325) should
 * be removed after upgrading Calcite to 1.42.0.
 */
public class SqlItemOperator extends SqlSpecialOperator {

    // NOTE(review): not read anywhere in this class; presumably consumed during code
    // generation / Rex translation of the operator — confirm before changing.
    public final int offset;

    // NOTE(review): not read anywhere in this class; presumably marks "safe" access variants
    // that return NULL instead of erroring on out-of-range access — confirm.
    public final boolean safe;

    /**
     * Creates an item operator.
     *
     * @param name operator name (e.g. {@code "ITEM"}); also drives {@link #unparse} formatting
     * @param operandTypeChecker checker applied to the collection operand (operand 0); the
     *     index/key operand (operand 1) is checked separately in {@link #checkOperandTypes}
     * @param offset index offset carried for downstream consumers
     * @param safe whether this is a "safe" access operator
     */
    public SqlItemOperator(
            String name, SqlSingleOperandTypeChecker operandTypeChecker, int offset, boolean safe) {
        // Precedence 100, left-associative; return-type inference is done by the
        // inferReturnType override below rather than an inference strategy object.
        super(name, SqlKind.ITEM, 100, true, null, null, operandTypeChecker);
        this.offset = offset;
        this.safe = safe;
    }

    /**
     * Collapses {@code expr [ index ]} in the parser's token sequence into a single {@link
     * SqlCall} whose position spans both operands and the brackets.
     */
    @Override
    public ReduceResult reduceExpr(int ordinal, TokenSequence list) {
        SqlNode left = list.node(ordinal - 1);
        SqlNode right = list.node(ordinal + 1);
        return new ReduceResult(
                ordinal - 1,
                ordinal + 2,
                createCall(
                        SqlParserPos.sum(
                                Arrays.asList(
                                        requireNonNull(left, "left").getParserPosition(),
                                        requireNonNull(right, "right").getParserPosition(),
                                        list.pos(ordinal))),
                        left,
                        right));
    }

    /**
     * Unparses as {@code operand0[operand1]}; for named (non-ITEM) operators the index is
     * wrapped in a function call, e.g. {@code myArray[OFFSET(1)]}.
     */
    @Override
    public void unparse(SqlWriter writer, SqlCall call, int leftPrec, int rightPrec) {
        call.operand(0).unparse(writer, leftPrec, 0);
        final SqlWriter.Frame frame = writer.startList("[", "]");
        if (!this.getName().equals("ITEM")) {
            final SqlWriter.Frame offsetFrame = writer.startFunCall(this.getName());
            call.operand(1).unparse(writer, 0, 0);
            writer.endFunCall(offsetFrame);
        } else {
            call.operand(1).unparse(writer, 0, 0);
        }
        writer.endList(frame);
    }

    @Override
    public SqlOperandCountRange getOperandCountRange() {
        // Always exactly two operands: the collection and the index/key.
        return SqlOperandCountRanges.of(2);
    }

    /**
     * Validates the collection operand with the configured checker, then validates the index/key
     * operand with a checker chosen from the collection's actual type.
     */
    @Override
    public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFailure) {
        final SqlNode left = callBinding.operand(0);
        final SqlNode right = callBinding.operand(1);
        if (!getOperandTypeChecker().checkSingleOperandType(callBinding, left, 0, throwOnFailure)) {
            return false;
        }
        final SqlSingleOperandTypeChecker checker = getChecker(callBinding);
        return checker.checkSingleOperandType(callBinding, right, 0, throwOnFailure);
    }

    @Override
    public SqlSingleOperandTypeChecker getOperandTypeChecker() {
        return (SqlSingleOperandTypeChecker)
                requireNonNull(super.getOperandTypeChecker(), "operandTypeChecker");
    }

    /**
     * Returns the checker for the index/key operand based on the collection operand's type:
     * INTEGER for arrays, the map's key-type family for maps, and INTEGER-or-CHARACTER for
     * ROW, ANY, DYNAMIC_STAR and VARIANT.
     */
    private static SqlSingleOperandTypeChecker getChecker(SqlCallBinding callBinding) {
        final RelDataType operandType = callBinding.getOperandType(0);
        switch (operandType.getSqlTypeName()) {
            case ARRAY:
                return OperandTypes.family(SqlTypeFamily.INTEGER);
            case MAP:
                RelDataType keyType =
                        requireNonNull(operandType.getKeyType(), "operandType.getKeyType()");
                SqlTypeName sqlTypeName = keyType.getSqlTypeName();
                return OperandTypes.family(
                        requireNonNull(
                                sqlTypeName.getFamily(),
                                () ->
                                        "keyType.getSqlTypeName().getFamily() null, type is "
                                                + sqlTypeName));
            case ROW:
            case ANY:
            case DYNAMIC_STAR:
            case VARIANT:
                return OperandTypes.family(SqlTypeFamily.INTEGER)
                        .or(OperandTypes.family(SqlTypeFamily.CHARACTER));
            default:
                throw callBinding.newValidationSignatureError();
        }
    }

    @Override
    public String getAllowedSignatures(String name) {
        if (name.equals("ITEM")) {
            // FLINK MODIFICATION BEGIN
            return "<ARRAY>[<INTEGER>]\n"
                    + "<MAP>[<ANY>]\n"
                    + "<ROW>[<CHARACTER>|<INTEGER>]\n"
                    + "<VARIANT>[<CHARACTER>|<INTEGER>]";
            // FLINK MODIFICATION END
        } else {
            return "<ARRAY>[" + name + "(<INTEGER>)]";
        }
    }

    /**
     * Infers the result type of the access: nullable VARIANT for variants, the nullable element
     * type for arrays, the nullable value type for maps, the addressed field's type for rows
     * (made nullable if the row itself is nullable), and nullable ANY for ANY/DYNAMIC_STAR.
     *
     * @throws AssertionError if a ROW field reference cannot be resolved, the index operand has
     *     an unsupported type, or the collection operand's type is not handled
     */
    @Override
    public RelDataType inferReturnType(SqlOperatorBinding opBinding) {
        final RelDataTypeFactory typeFactory = opBinding.getTypeFactory();
        final RelDataType operandType = opBinding.getOperandType(0);
        switch (operandType.getSqlTypeName()) {
            case VARIANT:
                // Return type is always nullable VARIANT
                return typeFactory.createTypeWithNullability(operandType, true);
            case ARRAY:
                return typeFactory.createTypeWithNullability(
                        getComponentTypeOrThrow(operandType), true);
            case MAP:
                return typeFactory.createTypeWithNullability(
                        requireNonNull(
                                operandType.getValueType(),
                                () -> "operandType.getValueType() is null for " + operandType),
                        true);
            case ROW:
                RelDataType fieldType;
                RelDataType indexType = opBinding.getOperandType(1);

                if (SqlTypeUtil.isString(indexType)) {
                    // ROW['fieldName'] -- the field name must be a literal.
                    final String fieldName =
                            getOperandLiteralValueOrThrow(opBinding, 1, String.class);
                    RelDataTypeField field = operandType.getField(fieldName, false, false);
                    if (field == null) {
                        throw new AssertionError(
                                "Cannot infer type of field '"
                                        + fieldName
                                        + "' within ROW type: "
                                        + operandType);
                    } else {
                        fieldType = field.getType();
                    }
                } else if (SqlTypeUtil.isIntType(indexType)) {
                    // ROW[n] -- positional access with a literal 1-based index.
                    Integer index = opBinding.getOperandLiteralValue(1, Integer.class);
                    if (index == null || index < 1 || index > operandType.getFieldCount()) {
                        throw new AssertionError(
                                "Cannot infer type of field at position "
                                        + index
                                        + " within ROW type: "
                                        + operandType);
                    } else {
                        fieldType =
                                operandType.getFieldList().get(index - 1).getType(); // 1 indexed
                    }
                } else {
                    throw new AssertionError(
                            "Unsupported field identifier type: '" + indexType + "'");
                }
                if (operandType.isNullable()) {
                    fieldType = typeFactory.createTypeWithNullability(fieldType, true);
                }
                return fieldType;
            case ANY:
            case DYNAMIC_STAR:
                return typeFactory.createTypeWithNullability(
                        typeFactory.createSqlType(SqlTypeName.ANY), true);
            default:
                // Was a messageless AssertionError; include the offending type for diagnosis.
                throw new AssertionError("Unsupported operand type: " + operandType);
        }
    }
}

flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2455,7 +2455,8 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
24552455
sequence(
24562456
or(
24572457
logical(LogicalTypeRoot.ARRAY),
2458-
logical(LogicalTypeRoot.MAP)),
2458+
logical(LogicalTypeRoot.MAP),
2459+
logical(LogicalTypeRoot.VARIANT)),
24592460
InputTypeStrategies.ITEM_AT_INDEX))
24602461
.outputTypeStrategy(SpecificTypeStrategies.ITEM_AT)
24612462
.build();

flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ItemAtIndexArgumentTypeStrategy.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,10 @@
4141
* or {@link LogicalTypeRoot#MULTISET}
4242
*
4343
* <p>the type to be equal to the key type of {@link LogicalTypeRoot#MAP} if the first argument is a
44-
* map.
44+
* map
45+
*
46+
* <p>a {@link LogicalTypeFamily#NUMERIC} type or {@link LogicalTypeFamily#CHARACTER_STRING} type if
47+
* the first argument is a {@link LogicalTypeRoot#VARIANT}.
4548
*/
4649
@Internal
4750
public final class ItemAtIndexArgumentTypeStrategy implements ArgumentTypeStrategy {
@@ -86,12 +89,36 @@ public Optional<DataType> inferArgumentType(
8689
}
8790
}
8891

92+
if (collectionType.is(LogicalTypeRoot.VARIANT)) {
93+
if (indexType.getLogicalType().is(LogicalTypeFamily.INTEGER_NUMERIC)) {
94+
95+
if (callContext.isArgumentLiteral(1)) {
96+
Optional<Integer> literalVal = callContext.getArgumentValue(1, Integer.class);
97+
if (literalVal.isPresent() && literalVal.get() <= 0) {
98+
return callContext.fail(
99+
throwOnFailure,
100+
"The provided index must be a valid SQL index starting from 1, but was '%s'",
101+
literalVal.get());
102+
}
103+
}
104+
105+
return Optional.of(indexType);
106+
} else if (indexType.getLogicalType().is(LogicalTypeFamily.CHARACTER_STRING)) {
107+
return Optional.of(indexType);
108+
} else {
109+
return callContext.fail(
110+
throwOnFailure,
111+
"Incorrect type %s supplied for the variant value. Variant values can only be accessed with a CHARACTER STRING map key or an INTEGER NUMERIC array index.",
112+
indexType.getLogicalType().toString());
113+
}
114+
}
115+
89116
return Optional.empty();
90117
}
91118

92119
@Override
93120
public Signature.Argument getExpectedArgument(
94121
FunctionDefinition functionDefinition, int argumentPos) {
95-
return Signature.Argument.of("[<INTEGER NUMERIC> | <MAP_KEY_TYPE>]");
122+
return Signature.Argument.of("[<CHARACTER STRING> | <INTEGER NUMERIC> | <MAP_KEY_TYPE>]");
96123
}
97124
}

flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ItemAtTypeStrategy.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.flink.annotation.Internal;
2222
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
23+
import org.apache.flink.table.types.AtomicDataType;
2324
import org.apache.flink.table.types.CollectionDataType;
2425
import org.apache.flink.table.types.DataType;
2526
import org.apache.flink.table.types.KeyValueDataType;
@@ -33,27 +34,30 @@
3334
/**
3435
* An output type strategy for {@link BuiltInFunctionDefinitions#AT}.
3536
*
36-
* <p>Returns either the element of an {@link LogicalTypeFamily#COLLECTION} type or the value of
37-
* {@link LogicalTypeRoot#MAP}.
37+
* <p>Returns either the element of an {@link LogicalTypeFamily#COLLECTION} type, the value of
38+
* {@link LogicalTypeRoot#MAP}, or another {@link LogicalTypeRoot#VARIANT} value obtained by
39+
* accessing the input {@link LogicalTypeRoot#VARIANT}.
3840
*/
3941
@Internal
4042
public final class ItemAtTypeStrategy implements TypeStrategy {
4143
@Override
4244
public Optional<DataType> inferType(CallContext callContext) {
4345

44-
DataType arrayOrMapType = callContext.getArgumentDataTypes().get(0);
46+
DataType containerType = callContext.getArgumentDataTypes().get(0);
4547
final Optional<DataType> legacyArrayElement =
46-
StrategyUtils.extractLegacyArrayElement(arrayOrMapType);
48+
StrategyUtils.extractLegacyArrayElement(containerType);
4749

4850
if (legacyArrayElement.isPresent()) {
4951
return legacyArrayElement;
5052
}
5153

52-
if (arrayOrMapType.getLogicalType().is(LogicalTypeRoot.ARRAY)) {
54+
if (containerType.getLogicalType().is(LogicalTypeRoot.ARRAY)) {
5355
return Optional.of(
54-
((CollectionDataType) arrayOrMapType).getElementDataType().nullable());
55-
} else if (arrayOrMapType instanceof KeyValueDataType) {
56-
return Optional.of(((KeyValueDataType) arrayOrMapType).getValueDataType().nullable());
56+
((CollectionDataType) containerType).getElementDataType().nullable());
57+
} else if (containerType instanceof KeyValueDataType) {
58+
return Optional.of(((KeyValueDataType) containerType).getValueDataType().nullable());
59+
} else if (containerType.getLogicalType().is(LogicalTypeRoot.VARIANT)) {
60+
return Optional.of(((AtomicDataType) containerType).nullable());
5761
}
5862

5963
return Optional.empty();

0 commit comments

Comments
 (0)