Skip to content

Commit fb2a560

Browse files
authored
feat: implement Tabular SQL UDFs (#18511)
* feat: Implement UDTF * feat: udtf supports return types conversion * chore: codefmt * feat: impl UDTF Args replace * chore: codefmt * chore: codefmt * chore: rebase * chore: rebase
1 parent ff04924 commit fb2a560

File tree

16 files changed

+535
-1
lines changed

16 files changed

+535
-1
lines changed

src/meta/app/src/principal/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ pub use user_defined_function::UDFDefinition;
114114
pub use user_defined_function::UDFScript;
115115
pub use user_defined_function::UDFServer;
116116
pub use user_defined_function::UserDefinedFunction;
117+
pub use user_defined_function::UDTF;
117118
pub use user_grant::GrantEntry;
118119
pub use user_grant::GrantObject;
119120
pub use user_grant::UserGrantSet;

src/meta/app/src/principal/user_defined_function.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,19 @@ pub struct UDFScript {
5151
pub immutable: Option<bool>,
5252
}
5353

54+
/// User Defined Table Function (UDTF)
55+
///
56+
/// # Fields
57+
/// - `arg_types`: arg name with data type
58+
/// - `return_types`: return column name with data type
59+
/// - `sql`: SQL implementing the UDTF
60+
#[derive(Clone, Debug, Eq, PartialEq)]
61+
pub struct UDTF {
62+
pub arg_types: Vec<(String, DataType)>,
63+
pub return_types: Vec<(String, DataType)>,
64+
pub sql: String,
65+
}
66+
5467
#[derive(Clone, Debug, Eq, PartialEq)]
5568
pub struct UDAFScript {
5669
pub code: String,
@@ -71,6 +84,7 @@ pub enum UDFDefinition {
7184
UDFServer(UDFServer),
7285
UDFScript(UDFScript),
7386
UDAFScript(UDAFScript),
87+
UDTF(UDTF),
7488
}
7589

7690
impl UDFDefinition {
@@ -80,6 +94,7 @@ impl UDFDefinition {
8094
Self::UDFServer(_) => "UDFServer",
8195
Self::UDFScript(_) => "UDFScript",
8296
Self::UDAFScript(_) => "UDAFScript",
97+
Self::UDTF(_) => "UDTF",
8398
}
8499
}
85100

@@ -88,13 +103,15 @@ impl UDFDefinition {
88103
Self::LambdaUDF(_) => false,
89104
Self::UDFServer(_) => false,
90105
Self::UDFScript(_) => false,
106+
Self::UDTF(_) => false,
91107
Self::UDAFScript(_) => true,
92108
}
93109
}
94110

95111
pub fn language(&self) -> &str {
96112
match self {
97113
Self::LambdaUDF(_) => "SQL",
114+
Self::UDTF(_) => "SQL",
98115
Self::UDFServer(x) => x.language.as_str(),
99116
Self::UDFScript(x) => x.language.as_str(),
100117
Self::UDAFScript(x) => x.language.as_str(),
@@ -292,6 +309,26 @@ impl Display for UDFDefinition {
292309
}
293310
write!(f, " }} RETURNS {return_type} LANGUAGE {language} IMPORTS = {imports:?} PACKAGES = {packages:?} RUNTIME_VERSION = {runtime_version} AS $${code}$$")?;
294311
}
312+
UDFDefinition::UDTF(UDTF {
313+
arg_types,
314+
return_types,
315+
sql,
316+
}) => {
317+
for (i, (name, ty)) in arg_types.iter().enumerate() {
318+
if i > 0 {
319+
write!(f, ", ")?;
320+
}
321+
write!(f, "{name} {ty}")?;
322+
}
323+
write!(f, ") RETURNS (")?;
324+
for (i, (name, ty)) in return_types.iter().enumerate() {
325+
if i > 0 {
326+
write!(f, ", ")?;
327+
}
328+
write!(f, "{name} {ty}")?;
329+
}
330+
write!(f, ") AS $${sql}$$")?;
331+
}
295332
}
296333
Ok(())
297334
}

src/meta/proto-conv/src/udf_from_to_protobuf_impl.rs

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use databend_common_expression::TableDataType;
2020
use databend_common_expression::TableField;
2121
use databend_common_meta_app::principal as mt;
2222
use databend_common_protos::pb;
23+
use databend_common_protos::pb::UdtfArg;
2324

2425
use crate::reader_check_msg;
2526
use crate::FromToProto;
@@ -275,6 +276,87 @@ impl FromToProto for mt::UDAFScript {
275276
}
276277
}
277278

279+
impl FromToProto for mt::UDTF {
280+
type PB = pb::Udtf;
281+
282+
fn get_pb_ver(p: &Self::PB) -> u64 {
283+
p.ver
284+
}
285+
286+
fn from_pb(p: Self::PB) -> Result<Self, Incompatible>
287+
where Self: Sized {
288+
reader_check_msg(p.ver, p.min_reader_ver)?;
289+
290+
let mut arg_types = Vec::new();
291+
for arg_ty in p.arg_types {
292+
let ty_pb = arg_ty.ty.ok_or_else(|| {
293+
Incompatible::new("UDTF.arg_types.ty can not be None".to_string())
294+
})?;
295+
let ty = TableDataType::from_pb(ty_pb)?;
296+
297+
arg_types.push((arg_ty.name, (&ty).into()));
298+
}
299+
300+
let mut return_types = Vec::new();
301+
for return_ty in p.return_types {
302+
let ty_pb = return_ty.ty.ok_or_else(|| {
303+
Incompatible::new("UDTF.arg_types.ty can not be None".to_string())
304+
})?;
305+
let ty = TableDataType::from_pb(ty_pb)?;
306+
307+
return_types.push((return_ty.name, (&ty).into()));
308+
}
309+
310+
Ok(Self {
311+
arg_types,
312+
return_types,
313+
sql: p.sql,
314+
})
315+
}
316+
317+
fn to_pb(&self) -> Result<Self::PB, Incompatible> {
318+
let mut arg_types = Vec::with_capacity(self.arg_types.len());
319+
for (arg_name, arg_type) in self.arg_types.iter() {
320+
let arg_type = infer_schema_type(arg_type)
321+
.map_err(|e| {
322+
Incompatible::new(format!(
323+
"Convert DataType to TableDataType failed: {}",
324+
e.message()
325+
))
326+
})?
327+
.to_pb()?;
328+
arg_types.push(UdtfArg {
329+
name: arg_name.clone(),
330+
ty: Some(arg_type),
331+
});
332+
}
333+
334+
let mut return_types = Vec::with_capacity(self.return_types.len());
335+
for (return_name, return_type) in self.return_types.iter() {
336+
let return_type = infer_schema_type(return_type)
337+
.map_err(|e| {
338+
Incompatible::new(format!(
339+
"Convert DataType to TableDataType failed: {}",
340+
e.message()
341+
))
342+
})?
343+
.to_pb()?;
344+
return_types.push(UdtfArg {
345+
name: return_name.clone(),
346+
ty: Some(return_type),
347+
});
348+
}
349+
350+
Ok(pb::Udtf {
351+
ver: VER,
352+
min_reader_ver: MIN_READER_VER,
353+
arg_types,
354+
return_types,
355+
sql: self.sql.clone(),
356+
})
357+
}
358+
}
359+
278360
impl FromToProto for mt::UserDefinedFunction {
279361
type PB = pb::UserDefinedFunction;
280362
fn get_pb_ver(p: &Self::PB) -> u64 {
@@ -295,6 +377,9 @@ impl FromToProto for mt::UserDefinedFunction {
295377
Some(pb::user_defined_function::Definition::UdafScript(udaf_script)) => {
296378
mt::UDFDefinition::UDAFScript(mt::UDAFScript::from_pb(udaf_script)?)
297379
}
380+
Some(pb::user_defined_function::Definition::Udtf(udtf)) => {
381+
mt::UDFDefinition::UDTF(mt::UDTF::from_pb(udtf)?)
382+
}
298383
None => {
299384
return Err(Incompatible::new(
300385
"UserDefinedFunction.definition cannot be None".to_string(),
@@ -327,6 +412,9 @@ impl FromToProto for mt::UserDefinedFunction {
327412
mt::UDFDefinition::UDAFScript(udaf_script) => {
328413
pb::user_defined_function::Definition::UdafScript(udaf_script.to_pb()?)
329414
}
415+
mt::UDFDefinition::UDTF(udtf) => {
416+
pb::user_defined_function::Definition::Udtf(udtf.to_pb()?)
417+
}
330418
};
331419

332420
Ok(pb::UserDefinedFunction {

src/meta/proto-conv/src/util.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ const META_CHANGE_LOG: &[(u64, &str)] = &[
172172
(140, "2025-07-24: Add: TaskMessage::Delete add WarehouseOptions"),
173173
(141, "2025-08-06: Add: row_access.proto"),
174174
(142, "2025-08-15: Add: table_meta add row_access_policy"),
175+
(143, "2025-08-18: Add: add UDTF"),
175176
// Dear developer:
176177
// If you're gonna add a new metadata version, you'll have to add a test for it.
177178
// You could just copy an existing test file(e.g., `../tests/it/v024_table_meta.rs`)

src/meta/proto-conv/tests/it/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,3 +134,4 @@ mod v139_add_grant_ownership_object_sequence;
134134
mod v140_task_message;
135135
mod v141_row_access_policy;
136136
mod v142_table_row_access_policy;
137+
mod v143_udtf;
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
// Copyright 2023 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use chrono::DateTime;
16+
use chrono::Utc;
17+
use databend_common_expression::types::DataType;
18+
use databend_common_meta_app::principal::UDFDefinition;
19+
use databend_common_meta_app::principal::UserDefinedFunction;
20+
use databend_common_meta_app::principal::UDTF;
21+
use fastrace::func_name;
22+
23+
use crate::common;
24+
25+
// These bytes are built when a new version is introduced,
26+
// and are kept for backward compatibility test.
27+
//
28+
// *************************************************************
29+
// * These messages should never be updated, *
30+
// * only be added when a new version is added, *
31+
// * or be removed when an old version is no longer supported. *
32+
// *************************************************************
33+
//
34+
// The message bytes are built from the output of `test_pb_from_to()`
35+
#[test]
fn test_decode_v143_udtf() -> anyhow::Result<()> {
    // Serialized `UserDefinedFunction` holding a UDTF definition, as written at
    // meta version 143. This fixture must stay byte-for-byte unchanged.
    let bytes = vec![
        10, 9, 116, 101, 115, 116, 95, 117, 100, 116, 102, 18, 21, 84, 104, 105, 115, 32, 105, 115,
        32, 97, 32, 100, 101, 115, 99, 114, 105, 112, 116, 105, 111, 110, 66, 79, 10, 16, 10, 2,
        99, 49, 18, 10, 146, 2, 0, 160, 6, 143, 1, 168, 6, 24, 10, 16, 10, 2, 99, 50, 18, 10, 138,
        2, 0, 160, 6, 143, 1, 168, 6, 24, 18, 16, 10, 2, 99, 51, 18, 10, 170, 2, 0, 160, 6, 143, 1,
        168, 6, 24, 26, 16, 115, 101, 108, 101, 99, 116, 32, 42, 32, 102, 114, 111, 109, 32, 116,
        49, 160, 6, 143, 1, 168, 6, 24, 42, 23, 50, 48, 50, 51, 45, 49, 50, 45, 49, 53, 32, 48, 49,
        58, 50, 54, 58, 48, 57, 32, 85, 84, 67, 160, 6, 143, 1, 168, 6, 24,
    ];

    // Builds a fresh copy of the expected value; each check below consumes one.
    let expected = || {
        let udtf = UDTF {
            arg_types: vec![(s("c1"), DataType::String), (s("c2"), DataType::Boolean)],
            return_types: vec![(s("c3"), DataType::Date)],
            sql: "select * from t1".to_string(),
        };

        UserDefinedFunction {
            name: "test_udtf".to_string(),
            description: "This is a description".to_string(),
            definition: UDFDefinition::UDTF(udtf),
            created_on: DateTime::<Utc>::from_timestamp(1702603569, 0).unwrap(),
        }
    };

    // Round-trip through the current protobuf encoding first, then decode the
    // stored v143 bytes.
    common::test_pb_from_to(func_name!(), expected())?;
    common::test_load_old(func_name!(), bytes.as_slice(), 143, expected())?;

    Ok(())
}
61+
62+
/// Shorthand that renders any `ToString` value as an owned `String`.
fn s(ss: impl ToString) -> String {
    ToString::to_string(&ss)
}

src/meta/protos/proto/udf.proto

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,22 @@ message UDAFScript {
6969
repeated string packages = 8;
7070
}
7171

72+
// A name paired with a data type.
// `UDTF` uses it for both its arguments and its return columns.
message UDTFArg {
  string name = 1;
  DataType ty = 2;
}
76+
77+
// User defined table function implemented in SQL.
message UDTF {
  uint64 ver = 100;
  uint64 min_reader_ver = 101;

  // Arguments: each entry is an argument name with its data type.
  repeated UDTFArg arg_types = 1;
  // Return columns: each entry is a column name with its data type.
  repeated UDTFArg return_types = 2;
  // SQL text of the function.
  string sql = 3;
}
87+
7288
message UserDefinedFunction {
7389
uint64 ver = 100;
7490
uint64 min_reader_ver = 101;
@@ -80,6 +96,7 @@ message UserDefinedFunction {
8096
UDFServer udf_server = 4;
8197
UDFScript udf_script = 6;
8298
UDAFScript udaf_script = 7;
99+
UDTF udtf = 8;
83100
}
84101
// The time udf created.
85102
optional string created_on = 5;

src/query/ast/src/ast/statements/udf.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,11 @@ pub enum UDFDefinition {
7171
language: String,
7272
runtime_version: String,
7373
},
74+
UDTFSql {
75+
arg_types: Vec<(Identifier, TypeName)>,
76+
return_types: Vec<(Identifier, TypeName)>,
77+
sql: String,
78+
},
7479
}
7580

7681
impl Display for UDFDefinition {
@@ -176,6 +181,23 @@ impl Display for UDFDefinition {
176181
}
177182
write!(f, " ADDRESS = '{address}'")?;
178183
}
184+
UDFDefinition::UDTFSql {
185+
arg_types,
186+
return_types,
187+
sql,
188+
} => {
189+
write!(f, "(")?;
190+
write_comma_separated_list(
191+
f,
192+
arg_types.iter().map(|(name, ty)| format!("{name} {ty}")),
193+
)?;
194+
write!(f, ") RETURNS TABLE (")?;
195+
write_comma_separated_list(
196+
f,
197+
return_types.iter().map(|(name, ty)| format!("{name} {ty}")),
198+
)?;
199+
write!(f, ") AS $$\n{sql}\n$$")?;
200+
}
179201
UDFDefinition::UDAFScript {
180202
arg_types,
181203
state_fields: state_types,

src/query/ast/src/parser/statement.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5049,6 +5049,19 @@ pub fn udf_definition(i: Input) -> IResult<UDFDefinition> {
50495049
},
50505050
);
50515051

5052+
let udtf = map(
5053+
rule! {
5054+
"(" ~ #comma_separated_list0(udtf_arg) ~ ")"
5055+
~ RETURNS ~ TABLE ~ "(" ~ #comma_separated_list0(udtf_arg) ~ ")"
5056+
~ AS ~ ^#code_string
5057+
},
5058+
|(_, arg_types, _, _, _, _, return_types, _, _, sql)| UDFDefinition::UDTFSql {
5059+
arg_types,
5060+
return_types,
5061+
sql,
5062+
},
5063+
);
5064+
50525065
let udaf = map(
50535066
rule! {
50545067
"(" ~ #comma_separated_list0(type_name) ~ ")"
@@ -5113,10 +5126,14 @@ pub fn udf_definition(i: Input) -> IResult<UDFDefinition> {
51135126
#lambda_udf: "AS (<parameter>, ...) -> <definition expr>"
51145127
| #udaf: "(<arg_type>, ...) STATE {<state_field>, ...} RETURNS <return_type> LANGUAGE <language> { ADDRESS=<udf_server_address> | AS <language_codes> } "
51155128
| #udf: "(<arg_type>, ...) RETURNS <return_type> LANGUAGE <language> HANDLER=<handler> { ADDRESS=<udf_server_address> | AS <language_codes> } "
5116-
5129+
| #udtf: "(<arg_type>, ...) RETURNS TABLE (<return_type>, ...) AS <sql> }"
51175130
)(i)
51185131
}
51195132

5133+
fn udtf_arg(i: Input) -> IResult<(Identifier, TypeName)> {
5134+
map(rule! { #ident ~ ^#type_name }, |(name, ty)| (name, ty))(i)
5135+
}
5136+
51205137
fn udf_immutable(i: Input) -> IResult<bool> {
51215138
alt((
51225139
value(false, rule! { VOLATILE }),

src/query/ast/tests/it/parser.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -830,6 +830,7 @@ SELECT * from s;"#,
830830
r#"CREATE OR REPLACE FUNCTION isnotempty_test_replace AS(p) -> not(is_null(p)) DESC = 'This is a description';"#,
831831
r#"CREATE FUNCTION binary_reverse (BINARY) RETURNS BINARY LANGUAGE python HANDLER = 'binary_reverse' ADDRESS = 'http://0.0.0.0:8815';"#,
832832
r#"CREATE FUNCTION binary_reverse (BINARY) RETURNS BINARY LANGUAGE python HANDLER = 'binary_reverse' HEADERS = ('X-Authorization' = '123') ADDRESS = 'http://0.0.0.0:8815';"#,
833+
r#"CREATE FUNCTION binary_reverse_table () RETURNS TABLE (c1 int) AS $$ select * from binary_reverse $$;"#,
833834
r#"ALTER FUNCTION binary_reverse (BINARY) RETURNS BINARY LANGUAGE python HANDLER = 'binary_reverse' ADDRESS = 'http://0.0.0.0:8815';"#,
834835
r#"CREATE OR REPLACE FUNCTION binary_reverse (BINARY) RETURNS BINARY LANGUAGE python HANDLER = 'binary_reverse' ADDRESS = 'http://0.0.0.0:8815';"#,
835836
r#"CREATE file format my_orc type = orc"#,

0 commit comments

Comments
 (0)