Skip to content

Commit fd7e2a1

Browse files
authored
[FLINK-38682][table-planner] Support unknown -> RAW cast during type inference in limited validator scope
1 parent 7de3392 commit fd7e2a1

File tree

2 files changed

+91
-5
lines changed

2 files changed

+91
-5
lines changed

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceOperandChecker.java

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@
1919
package org.apache.flink.table.planner.functions.inference;
2020

2121
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.sql.parser.type.SqlRawTypeNameSpec;
2223
import org.apache.flink.table.api.ValidationException;
2324
import org.apache.flink.table.catalog.DataTypeFactory;
2425
import org.apache.flink.table.functions.FunctionDefinition;
2526
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
27+
import org.apache.flink.table.planner.plan.schema.RawRelDataType;
2628
import org.apache.flink.table.types.DataType;
2729
import org.apache.flink.table.types.inference.ArgumentCount;
2830
import org.apache.flink.table.types.inference.CallContext;
@@ -32,16 +34,20 @@
3234
import org.apache.flink.table.types.inference.TypeInference;
3335
import org.apache.flink.table.types.inference.TypeInferenceUtil;
3436
import org.apache.flink.table.types.logical.LogicalType;
37+
import org.apache.flink.table.types.logical.RawType;
3538
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
3639

3740
import org.apache.calcite.rel.type.RelDataType;
3841
import org.apache.calcite.rel.type.RelDataTypeFactory;
3942
import org.apache.calcite.rel.type.StructKind;
4043
import org.apache.calcite.sql.SqlCallBinding;
44+
import org.apache.calcite.sql.SqlDataTypeSpec;
4145
import org.apache.calcite.sql.SqlKind;
46+
import org.apache.calcite.sql.SqlLiteral;
4247
import org.apache.calcite.sql.SqlNode;
4348
import org.apache.calcite.sql.SqlOperandCountRange;
4449
import org.apache.calcite.sql.SqlOperator;
50+
import org.apache.calcite.sql.SqlTypeNameSpec;
4551
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
4652
import org.apache.calcite.sql.parser.SqlParserPos;
4753
import org.apache.calcite.sql.type.SqlOperandMetadata;
@@ -240,10 +246,28 @@ private void insertImplicitCasts(SqlCallBinding callBinding, List<DataType> expe
240246

241247
/** Adopted from {@link org.apache.calcite.sql.validate.implicit.AbstractTypeCoercion}. */
242248
private SqlNode castTo(SqlNode node, RelDataType type) {
243-
return SqlStdOperatorTable.CAST.createCall(
244-
SqlParserPos.ZERO,
245-
node,
246-
SqlTypeUtil.convertTypeToSpec(type).withNullable(type.isNullable()));
249+
final SqlDataTypeSpec dataType;
250+
if (type instanceof RawRelDataType) {
251+
dataType = createRawDataTypeSpec((RawRelDataType) type);
252+
} else {
253+
dataType = SqlTypeUtil.convertTypeToSpec(type).withNullable(type.isNullable());
254+
}
255+
256+
return SqlStdOperatorTable.CAST.createCall(SqlParserPos.ZERO, node, dataType);
257+
}
258+
259+
private SqlDataTypeSpec createRawDataTypeSpec(RawRelDataType type) {
260+
final RawType<?> rawType = type.getRawType();
261+
262+
SqlNode className =
263+
SqlLiteral.createCharString(
264+
rawType.getOriginatingClass().getName(), SqlParserPos.ZERO);
265+
SqlNode serializer =
266+
SqlLiteral.createCharString(rawType.getSerializerString(), SqlParserPos.ZERO);
267+
268+
SqlTypeNameSpec rawSpec = new SqlRawTypeNameSpec(className, serializer, SqlParserPos.ZERO);
269+
270+
return new SqlDataTypeSpec(rawSpec, null, type.isNullable(), SqlParserPos.ZERO);
247271
}
248272

249273
/** Adopted from {@link org.apache.calcite.sql.validate.implicit.AbstractTypeCoercion}. */

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/FunctionITCase.java

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818

1919
package org.apache.flink.table.planner.runtime.batch.sql;
2020

21+
import org.apache.flink.table.annotation.DataTypeHint;
2122
import org.apache.flink.table.api.Table;
23+
import org.apache.flink.table.functions.ScalarFunction;
2224
import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
2325
import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
2426
import org.apache.flink.types.Row;
@@ -99,6 +101,49 @@ void testUserDefinedTemporarySystemFunctionByUsingJar() throws Exception {
99101
testUserDefinedFunctionByUsingJar(functionDDL, dropFunctionDDL);
100102
}
101103

104+
@Test
105+
void testOrderByScopeRawTypeCast() throws Exception {
106+
final List<Row> sourceData = List.of(Row.of(1), Row.of(2), Row.of(3), Row.of(4), Row.of(5));
107+
TestCollectionTableFactory.reset();
108+
TestCollectionTableFactory.initData(sourceData);
109+
110+
tEnv().executeSql("CREATE TABLE Source(i INT) WITH ('connector' = 'COLLECTION')");
111+
tEnv().executeSql("CREATE TABLE Sink(i INT) WITH ('connector' = 'COLLECTION')");
112+
113+
tEnv().createTemporarySystemFunction("CustomIntUdf", new CustomIntUdf());
114+
115+
tEnv().executeSql(
116+
"INSERT INTO Sink"
117+
+ " SELECT i FROM Source"
118+
+ " ORDER BY CustomIntUdf(NULL)")
119+
.await();
120+
121+
assertThat(TestCollectionTableFactory.getResult()).hasSize(5);
122+
}
123+
124+
@Test
125+
void testHavingScopeRawTypeCast() throws Exception {
126+
final List<Row> sourceData = List.of(Row.of(1), Row.of(2), Row.of(3), Row.of(4), Row.of(5));
127+
TestCollectionTableFactory.reset();
128+
TestCollectionTableFactory.initData(sourceData);
129+
130+
tEnv().executeSql("CREATE TABLE Source(i INT) WITH ('connector' = 'COLLECTION')");
131+
tEnv().executeSql("CREATE TABLE Sink(i INT) WITH ('connector' = 'COLLECTION')");
132+
133+
tEnv().createTemporarySystemFunction("CustomIntUdf", new CustomIntUdf());
134+
135+
tEnv().executeSql(
136+
"INSERT INTO Sink"
137+
+ " SELECT SUM(i) AS s FROM Source"
138+
+ " HAVING CustomIntUdf(NULL) = 0")
139+
.await();
140+
141+
assertThat(TestCollectionTableFactory.getResult())
142+
.singleElement()
143+
.asString()
144+
.contains("15");
145+
}
146+
102147
private void testUserDefinedFunctionByUsingJar(String createFunctionDDL, String dropFunctionDDL)
103148
throws Exception {
104149
List<Row> sourceData =
@@ -123,7 +168,7 @@ private void testUserDefinedFunctionByUsingJar(String createFunctionDDL, String
123168
Table t2 = tEnv().sqlQuery(query);
124169
t2.executeInsert("t2").await();
125170

126-
List<Row> result = TestCollectionTableFactory.RESULT();
171+
List<Row> result = TestCollectionTableFactory.getResult();
127172
List<Row> expected =
128173
Arrays.asList(
129174
Row.of(1, "jark"),
@@ -139,4 +184,21 @@ private void testUserDefinedFunctionByUsingJar(String createFunctionDDL, String
139184
// delete the function
140185
tEnv().executeSql(dropFunctionDDL);
141186
}
187+
188+
// ----- Test types / UDF -----
189+
190+
@DataTypeHint(value = "RAW", bridgedTo = CustomInt.class)
191+
public static class CustomInt {
192+
public Integer value;
193+
194+
public CustomInt(Integer v) {
195+
this.value = v;
196+
}
197+
}
198+
199+
public static class CustomIntUdf extends ScalarFunction {
200+
public Integer eval(CustomInt v) {
201+
return 0;
202+
}
203+
}
142204
}

0 commit comments

Comments
 (0)