Skip to content

Commit 5494e91

Browse files
authored
[FLINK-38198] Support WITH Clause in CREATE FUNCTION Statement (#26874)
1 parent cae5fb4 commit 5494e91

File tree

21 files changed

+699
-131
lines changed

21 files changed

+699
-131
lines changed

docs/content.zh/docs/dev/table/sql/create.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -863,6 +863,7 @@ CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION
863863
[IF NOT EXISTS] [[catalog_name.]db_name.]function_name
864864
AS identifier [LANGUAGE JAVA|SCALA|PYTHON]
865865
[USING JAR '<path_to_filename>.jar' [, JAR '<path_to_filename>.jar']* ]
866+
[WITH (key1=val1, key2=val2, ...)]
866867
```
867868

868869
创建一个有 catalog 和数据库命名空间的 catalog function ,需要指定一个 identifier ,可指定 language tag 。 若 catalog 中,已经有同名的函数注册了,则无法注册。

docs/content/docs/dev/table/sql/create.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -860,6 +860,7 @@ CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION
860860
[IF NOT EXISTS] [catalog_name.][db_name.]function_name
861861
AS identifier [LANGUAGE JAVA|SCALA|PYTHON]
862862
[USING JAR '<path_to_filename>.jar' [, JAR '<path_to_filename>.jar']* ]
863+
[WITH (key1=val1, key2=val2, ...)]
863864
```
864865

865866
Create a catalog function that has catalog and database namespaces with the identifier and optional language tag. If a function with the same name already exists in the catalog, an exception is thrown.

flink-python/pyflink/table/catalog.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1118,6 +1118,16 @@ def get_function_language(self):
11181118
"""
11191119
return self._j_catalog_function.getFunctionLanguage()
11201120

1121+
def get_options(self) -> Dict[str, str]:
1122+
"""
1123+
Returns a map of string-based options.
1124+
1125+
:return: Property map of the function.
1126+
1127+
.. versionadded:: 2.2.0
1128+
"""
1129+
return dict(self._j_catalog_function.getOptions())
1130+
11211131

11221132
@PublicEvolving()
11231133
class CatalogModel(object):

flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,7 @@ SqlCreate SqlCreateFunction(Span s, boolean replace, boolean isTemporary) :
396396
boolean ifNotExists = false;
397397
boolean isSystemFunction = false;
398398
SqlNodeList resourceInfos = SqlNodeList.EMPTY;
399+
SqlNodeList propertyList = SqlNodeList.EMPTY;
399400
SqlParserPos functionLanguagePos = null;
400401
}
401402
{
@@ -466,6 +467,10 @@ SqlCreate SqlCreateFunction(Span s, boolean replace, boolean isTemporary) :
466467
)*
467468
{ resourceInfos = new SqlNodeList(resourceList, s.pos()); }
468469
]
470+
[
471+
<WITH>
472+
propertyList = Properties()
473+
]
469474
{
470475
return new SqlCreateFunction(
471476
s.pos(),
@@ -475,7 +480,8 @@ SqlCreate SqlCreateFunction(Span s, boolean replace, boolean isTemporary) :
475480
ifNotExists,
476481
isTemporary,
477482
isSystemFunction,
478-
resourceInfos);
483+
resourceInfos,
484+
propertyList);
479485
}
480486
}
481487

flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateFunction.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
package org.apache.flink.sql.parser.ddl;
2020

21+
import org.apache.flink.sql.parser.SqlUnparseUtils;
22+
2123
import org.apache.calcite.sql.SqlCharStringLiteral;
2224
import org.apache.calcite.sql.SqlCreate;
2325
import org.apache.calcite.sql.SqlIdentifier;
@@ -53,6 +55,7 @@ public class SqlCreateFunction extends SqlCreate {
5355
private final boolean isSystemFunction;
5456

5557
private final SqlNodeList resourceInfos;
58+
private final SqlNodeList propertyList;
5659

5760
public SqlCreateFunction(
5861
SqlParserPos pos,
@@ -62,14 +65,16 @@ public SqlCreateFunction(
6265
boolean ifNotExists,
6366
boolean isTemporary,
6467
boolean isSystemFunction,
65-
SqlNodeList resourceInfos) {
68+
SqlNodeList resourceInfos,
69+
SqlNodeList propertyList) {
6670
super(OPERATOR, pos, false, ifNotExists);
6771
this.functionIdentifier = requireNonNull(functionIdentifier);
6872
this.functionClassName = requireNonNull(functionClassName);
6973
this.isSystemFunction = isSystemFunction;
7074
this.isTemporary = isTemporary;
7175
this.functionLanguage = functionLanguage;
7276
this.resourceInfos = resourceInfos;
77+
this.propertyList = requireNonNull(propertyList);
7378
}
7479

7580
@Override
@@ -103,7 +108,7 @@ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
103108
writer.keyword("LANGUAGE");
104109
writer.keyword(functionLanguage);
105110
}
106-
if (resourceInfos.size() > 0) {
111+
if (!resourceInfos.isEmpty()) {
107112
writer.keyword("USING");
108113
SqlWriter.Frame withFrame = writer.startList("", "");
109114
for (SqlNode resourcePath : resourceInfos) {
@@ -112,6 +117,16 @@ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
112117
}
113118
writer.endList(withFrame);
114119
}
120+
if (!this.propertyList.isEmpty()) {
121+
writer.keyword("WITH");
122+
SqlWriter.Frame withFrame = writer.startList("(", ")");
123+
for (SqlNode property : propertyList) {
124+
SqlUnparseUtils.printIndent(writer);
125+
property.unparse(writer, leftPrec, rightPrec);
126+
}
127+
writer.newlineAndIndent();
128+
writer.endList(withFrame);
129+
}
115130
}
116131

117132
public boolean isIfNotExists() {
@@ -141,4 +156,8 @@ public String[] getFunctionIdentifier() {
141156
public List<SqlNode> getResourceInfos() {
142157
return resourceInfos.getList();
143158
}
159+
160+
public SqlNodeList getPropertyList() {
161+
return propertyList;
162+
}
144163
}

flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2491,6 +2491,20 @@ void testCreateFunction() {
24912491
+ "Was expecting:\n"
24922492
+ " \"JAR\" ...\n"
24932493
+ " .*");
2494+
2495+
sql("create function function1 as 'org.apache.flink.function.function1' language java using jar 'file:///path/to/test.jar' WITH ('k1' = 'v1', 'k2' = 'v2')")
2496+
.ok(
2497+
"CREATE FUNCTION `FUNCTION1` AS 'org.apache.flink.function.function1' LANGUAGE JAVA USING JAR 'file:///path/to/test.jar' WITH (\n"
2498+
+ " 'k1' = 'v1',\n"
2499+
+ " 'k2' = 'v2'\n"
2500+
+ ")");
2501+
2502+
sql("create temporary function function1 as 'org.apache.flink.function.function1' language java using jar 'file:///path/to/test.jar' WITH ('k1' = 'v1', 'k2' = 'v2')")
2503+
.ok(
2504+
"CREATE TEMPORARY FUNCTION `FUNCTION1` AS 'org.apache.flink.function.function1' LANGUAGE JAVA USING JAR 'file:///path/to/test.jar' WITH (\n"
2505+
+ " 'k1' = 'v1',\n"
2506+
+ " 'k2' = 'v2'\n"
2507+
+ ")");
24942508
}
24952509

24962510
@Test
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.api;
20+
21+
import org.apache.flink.annotation.PublicEvolving;
22+
import org.apache.flink.table.catalog.CatalogFunction;
23+
import org.apache.flink.table.catalog.FunctionLanguage;
24+
import org.apache.flink.table.functions.UserDefinedFunction;
25+
import org.apache.flink.table.functions.UserDefinedFunctionHelper;
26+
import org.apache.flink.table.resource.ResourceUri;
27+
import org.apache.flink.util.Preconditions;
28+
29+
import java.util.ArrayList;
30+
import java.util.Collections;
31+
import java.util.HashMap;
32+
import java.util.List;
33+
import java.util.Map;
34+
35+
/**
36+
* Describes a {@link CatalogFunction}.
37+
*
38+
* <p>A {@link FunctionDescriptor} is a template for creating a {@link CatalogFunction} instance. It
39+
* closely resembles the "CREATE FUNCTION" SQL DDL statement.
40+
*
41+
* <p>This can be used to register a table in the Table API, see {@link
42+
* TableEnvironment#createFunction(String, FunctionDescriptor)}.
43+
*/
44+
@PublicEvolving
45+
public class FunctionDescriptor {
46+
47+
private final String className;
48+
private final FunctionLanguage language;
49+
private final List<ResourceUri> resourceUris;
50+
private final Map<String, String> options;
51+
52+
private FunctionDescriptor(
53+
String className,
54+
FunctionLanguage language,
55+
List<ResourceUri> resourceUris,
56+
Map<String, String> options) {
57+
this.className = className;
58+
this.language = language;
59+
this.resourceUris = resourceUris;
60+
this.options = options;
61+
}
62+
63+
/** Creates a {@link Builder} for a function descriptor with the given class name. */
64+
public static Builder forClassName(String className) {
65+
return new Builder(className);
66+
}
67+
68+
/** Creates a {@link Builder} for a function descriptor for the given function class. */
69+
public static Builder forFunctionClass(Class<? extends UserDefinedFunction> functionClass) {
70+
try {
71+
UserDefinedFunctionHelper.validateClass(functionClass);
72+
} catch (Throwable t) {
73+
throw new ValidationException(
74+
String.format(
75+
"Can not create a function '%s' due to implementation errors.",
76+
functionClass.getName()),
77+
t);
78+
}
79+
return new Builder(functionClass.getName()).language(FunctionLanguage.JAVA);
80+
}
81+
82+
public String getClassName() {
83+
return className;
84+
}
85+
86+
public FunctionLanguage getLanguage() {
87+
return language;
88+
}
89+
90+
public List<ResourceUri> getResourceUris() {
91+
return resourceUris;
92+
}
93+
94+
public Map<String, String> getOptions() {
95+
return options;
96+
}
97+
98+
/** Builder for {@link FunctionDescriptor}. */
99+
@PublicEvolving
100+
public static final class Builder {
101+
private final String className;
102+
private FunctionLanguage language = FunctionLanguage.JAVA;
103+
private final List<ResourceUri> resourceUris = new ArrayList<>();
104+
private final Map<String, String> options = new HashMap<>();
105+
106+
private Builder(String className) {
107+
this.className = className;
108+
}
109+
110+
/**
111+
* Sets the language of the function. Equivalent to the {@code LANGUAGE} clause in the
112+
* "CREATE FUNCTION" SQL DDL statement.
113+
*/
114+
public Builder language(FunctionLanguage language) {
115+
Preconditions.checkNotNull(language, "Function language must not be null.");
116+
this.language = language;
117+
return this;
118+
}
119+
120+
/**
121+
* Adds a list of resource URIs to the function descriptor. Equivalent to the {@code USING}
122+
* clause in the "CREATE FUNCTION" SQL DDL statement.
123+
*/
124+
public Builder resourceUris(List<ResourceUri> uri) {
125+
Preconditions.checkNotNull(uri, "Resource URIs must not be null.");
126+
this.resourceUris.addAll(uri);
127+
return this;
128+
}
129+
130+
/**
131+
* Adds a single resource URI to the function descriptor. Equivalent to the {@code USING}
132+
* clause in the "CREATE FUNCTION" SQL DDL statement.
133+
*/
134+
public Builder resourceUri(ResourceUri uri) {
135+
Preconditions.checkNotNull(uri, "Resource URI must not be null.");
136+
this.resourceUris.add(uri);
137+
return this;
138+
}
139+
140+
/**
141+
* Adds an option to the function descriptor. Equivalent to the {@code WITH} clause in the
142+
* "CREATE FUNCTION" SQL DDL statement.
143+
*/
144+
public Builder option(String key, String value) {
145+
Preconditions.checkNotNull(key, "Option key must not be null.");
146+
Preconditions.checkNotNull(value, "Option value must not be null.");
147+
this.options.put(key, value);
148+
return this;
149+
}
150+
151+
/**
152+
* Adds multiple options to the function descriptor. Equivalent to the {@code WITH} clause
153+
* in the "CREATE FUNCTION" SQL DDL statement.
154+
*/
155+
public Builder options(Map<String, String> options) {
156+
Preconditions.checkNotNull(options, "Options must not be null.");
157+
this.options.putAll(options);
158+
return this;
159+
}
160+
161+
public FunctionDescriptor build() {
162+
return new FunctionDescriptor(
163+
className,
164+
language,
165+
Collections.unmodifiableList(resourceUris),
166+
Collections.unmodifiableMap(options));
167+
}
168+
}
169+
}

0 commit comments

Comments
 (0)