Skip to content

feat: implement Tabular SQL UDFs #18511

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/meta/app/src/principal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ pub use user_defined_function::UDFDefinition;
pub use user_defined_function::UDFScript;
pub use user_defined_function::UDFServer;
pub use user_defined_function::UserDefinedFunction;
pub use user_defined_function::UDTF;
pub use user_grant::GrantEntry;
pub use user_grant::GrantObject;
pub use user_grant::UserGrantSet;
Expand Down
33 changes: 33 additions & 0 deletions src/meta/app/src/principal/user_defined_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ pub struct UDFScript {
pub immutable: Option<bool>,
}

#[derive(Clone, Debug, Eq, PartialEq)]
pub struct UDTF {
// arg name with data type
pub arg_types: Vec<(String, DataType)>,
// return column name with data type
pub return_types: Vec<(String, DataType)>,
pub sql: String,
}

#[derive(Clone, Debug, Eq, PartialEq)]
pub struct UDAFScript {
pub code: String,
Expand All @@ -71,6 +80,7 @@ pub enum UDFDefinition {
UDFServer(UDFServer),
UDFScript(UDFScript),
UDAFScript(UDAFScript),
UDTF(UDTF),
}

impl UDFDefinition {
Expand All @@ -80,6 +90,7 @@ impl UDFDefinition {
Self::UDFServer(_) => "UDFServer",
Self::UDFScript(_) => "UDFScript",
Self::UDAFScript(_) => "UDAFScript",
Self::UDTF(_) => "UDTF",
}
}

Expand All @@ -88,13 +99,15 @@ impl UDFDefinition {
Self::LambdaUDF(_) => false,
Self::UDFServer(_) => false,
Self::UDFScript(_) => false,
Self::UDTF(_) => false,
Self::UDAFScript(_) => true,
}
}

pub fn language(&self) -> &str {
match self {
Self::LambdaUDF(_) => "SQL",
Self::UDTF(_) => "SQL",
Self::UDFServer(x) => x.language.as_str(),
Self::UDFScript(x) => x.language.as_str(),
Self::UDAFScript(x) => x.language.as_str(),
Expand Down Expand Up @@ -292,6 +305,26 @@ impl Display for UDFDefinition {
}
write!(f, " }} RETURNS {return_type} LANGUAGE {language} IMPORTS = {imports:?} PACKAGES = {packages:?} RUNTIME_VERSION = {runtime_version} AS $${code}$$")?;
}
UDFDefinition::UDTF(UDTF {
arg_types,
return_types,
sql,
}) => {
for (i, (name, ty)) in arg_types.iter().enumerate() {
if i > 0 {
write!(f, ", ")?;
}
write!(f, "{name} {ty}")?;
}
write!(f, ") RETURNS (")?;
for (i, (name, ty)) in return_types.iter().enumerate() {
if i > 0 {
write!(f, ", ")?;
}
write!(f, "{name} {ty}")?;
}
write!(f, ") AS $${sql}$$")?;
}
}
Ok(())
}
Expand Down
86 changes: 86 additions & 0 deletions src/meta/proto-conv/src/udf_from_to_protobuf_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use databend_common_expression::TableDataType;
use databend_common_expression::TableField;
use databend_common_meta_app::principal as mt;
use databend_common_protos::pb;
use databend_common_protos::pb::UdtfArg;

use crate::reader_check_msg;
use crate::FromToProto;
Expand Down Expand Up @@ -275,6 +276,85 @@ impl FromToProto for mt::UDAFScript {
}
}

impl FromToProto for mt::UDTF {
type PB = pb::Udtf;

fn get_pb_ver(p: &Self::PB) -> u64 {
p.ver
}

fn from_pb(p: Self::PB) -> Result<Self, Incompatible>
where Self: Sized {
reader_check_msg(p.ver, p.min_reader_ver)?;

let mut arg_types = Vec::new();
for arg_ty in p.arg_types {
let ty = TableDataType::from_pb(arg_ty.ty.ok_or_else(|| {
Incompatible::new("UDTF.arg_types.ty can not be None".to_string())
})?)?;

arg_types.push((arg_ty.name, (&ty).into()));
}

let mut return_types = Vec::new();
for return_ty in p.return_types {
let ty = TableDataType::from_pb(return_ty.ty.ok_or_else(|| {
Incompatible::new("UDTF.arg_types.ty can not be None".to_string())
})?)?;

return_types.push((return_ty.name, (&ty).into()));
}

Ok(Self {
arg_types,
return_types,
sql: p.sql,
})
}

fn to_pb(&self) -> Result<Self::PB, Incompatible> {
let mut arg_types = Vec::with_capacity(self.arg_types.len());
for (arg_name, arg_type) in self.arg_types.iter() {
let arg_type = infer_schema_type(arg_type)
.map_err(|e| {
Incompatible::new(format!(
"Convert DataType to TableDataType failed: {}",
e.message()
))
})?
.to_pb()?;
arg_types.push(UdtfArg {
name: arg_name.clone(),
ty: Some(arg_type),
});
}

let mut return_types = Vec::with_capacity(self.return_types.len());
for (return_name, return_type) in self.return_types.iter() {
let return_type = infer_schema_type(return_type)
.map_err(|e| {
Incompatible::new(format!(
"Convert DataType to TableDataType failed: {}",
e.message()
))
})?
.to_pb()?;
return_types.push(UdtfArg {
name: return_name.clone(),
ty: Some(return_type),
});
}

Ok(pb::Udtf {
ver: VER,
min_reader_ver: MIN_READER_VER,
arg_types,
return_types,
sql: self.sql.clone(),
})
}
}

impl FromToProto for mt::UserDefinedFunction {
type PB = pb::UserDefinedFunction;
fn get_pb_ver(p: &Self::PB) -> u64 {
Expand All @@ -295,6 +375,9 @@ impl FromToProto for mt::UserDefinedFunction {
Some(pb::user_defined_function::Definition::UdafScript(udaf_script)) => {
mt::UDFDefinition::UDAFScript(mt::UDAFScript::from_pb(udaf_script)?)
}
Some(pb::user_defined_function::Definition::Udtf(udtf)) => {
mt::UDFDefinition::UDTF(mt::UDTF::from_pb(udtf)?)
}
None => {
return Err(Incompatible::new(
"UserDefinedFunction.definition cannot be None".to_string(),
Expand Down Expand Up @@ -327,6 +410,9 @@ impl FromToProto for mt::UserDefinedFunction {
mt::UDFDefinition::UDAFScript(udaf_script) => {
pb::user_defined_function::Definition::UdafScript(udaf_script.to_pb()?)
}
mt::UDFDefinition::UDTF(udtf) => {
pb::user_defined_function::Definition::Udtf(udtf.to_pb()?)
}
};

Ok(pb::UserDefinedFunction {
Expand Down
1 change: 1 addition & 0 deletions src/meta/proto-conv/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ const META_CHANGE_LOG: &[(u64, &str)] = &[
(139, "2025-07-25: Add: Grant/OwnershipSequenceObject and UserPrivilegeType AccessSequence, AccessSequence"),
(140, "2025-07-24: Add: TaskMessage::Delete add WarehouseOptions"),
(141, "2025-08-06: Add: row_access.proto"),
(142, "2025-08-11: Add: add UDTF"),
// Dear developer:
// If you're gonna add a new metadata version, you'll have to add a test for it.
// You could just copy an existing test file(e.g., `../tests/it/v024_table_meta.rs`)
Expand Down
1 change: 1 addition & 0 deletions src/meta/proto-conv/tests/it/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,4 @@ mod v138_table_statistics;
mod v139_add_grant_ownership_object_sequence;
mod v140_task_message;
mod v141_row_access_policy;
mod v142_udtf;
64 changes: 64 additions & 0 deletions src/meta/proto-conv/tests/it/v142_udtf.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2023 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use chrono::DateTime;
use chrono::Utc;
use databend_common_expression::types::DataType;
use databend_common_meta_app::principal::UDFDefinition;
use databend_common_meta_app::principal::UserDefinedFunction;
use databend_common_meta_app::principal::UDTF;
use fastrace::func_name;

use crate::common;

// These bytes are built when a new version in introduced,
// and are kept for backward compatibility test.
//
// *************************************************************
// * These messages should never be updated, *
// * only be added when a new version is added, *
// * or be removed when an old version is no longer supported. *
// *************************************************************
//
// The message bytes are built from the output of `test_pb_from_to()`
#[test]
fn test_decode_v142_udtf() -> anyhow::Result<()> {
let bytes = vec![
10, 9, 116, 101, 115, 116, 95, 117, 100, 116, 102, 18, 21, 84, 104, 105, 115, 32, 105, 115,
32, 97, 32, 100, 101, 115, 99, 114, 105, 112, 116, 105, 111, 110, 66, 79, 10, 16, 10, 2,
99, 49, 18, 10, 146, 2, 0, 160, 6, 142, 1, 168, 6, 24, 10, 16, 10, 2, 99, 50, 18, 10, 138,
2, 0, 160, 6, 142, 1, 168, 6, 24, 18, 16, 10, 2, 99, 51, 18, 10, 170, 2, 0, 160, 6, 142, 1,
168, 6, 24, 26, 16, 115, 101, 108, 101, 99, 116, 32, 42, 32, 102, 114, 111, 109, 32, 116,
49, 160, 6, 142, 1, 168, 6, 24, 42, 23, 50, 48, 50, 51, 45, 49, 50, 45, 49, 53, 32, 48, 49,
58, 50, 54, 58, 48, 57, 32, 85, 84, 67, 160, 6, 142, 1, 168, 6, 24,
];

let want = || UserDefinedFunction {
name: "test_udtf".to_string(),
description: "This is a description".to_string(),
definition: UDFDefinition::UDTF(UDTF {
arg_types: vec![(s("c1"), DataType::String), (s("c2"), DataType::Boolean)],
return_types: vec![(s("c3"), DataType::Date)],
sql: "select * from t1".to_string(),
}),
created_on: DateTime::<Utc>::from_timestamp(1702603569, 0).unwrap(),
};

common::test_pb_from_to(func_name!(), want())?;
common::test_load_old(func_name!(), bytes.as_slice(), 142, want())
}

fn s(ss: impl ToString) -> String {
ss.to_string()
}
15 changes: 15 additions & 0 deletions src/meta/protos/proto/udf.proto
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,20 @@ message UDAFScript {
repeated string packages = 8;
}

message UDTFArg {
string name = 1;
DataType ty = 2;
}

message UDTF {
uint64 ver = 100;
uint64 min_reader_ver = 101;

repeated UDTFArg arg_types = 1;
repeated UDTFArg return_types = 2;
string sql = 3;
}

message UserDefinedFunction {
uint64 ver = 100;
uint64 min_reader_ver = 101;
Expand All @@ -80,6 +94,7 @@ message UserDefinedFunction {
UDFServer udf_server = 4;
UDFScript udf_script = 6;
UDAFScript udaf_script = 7;
UDTF udtf = 8;
}
// The time udf created.
optional string created_on = 5;
Expand Down
22 changes: 22 additions & 0 deletions src/query/ast/src/ast/statements/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ pub enum UDFDefinition {
language: String,
runtime_version: String,
},
UDTFSql {
arg_types: Vec<(Identifier, TypeName)>,
return_types: Vec<(Identifier, TypeName)>,
sql: String,
},
}

impl Display for UDFDefinition {
Expand Down Expand Up @@ -176,6 +181,23 @@ impl Display for UDFDefinition {
}
write!(f, " ADDRESS = '{address}'")?;
}
UDFDefinition::UDTFSql {
arg_types,
return_types,
sql,
} => {
write!(f, "(")?;
write_comma_separated_list(
f,
arg_types.iter().map(|(name, ty)| format!("{name} {ty}")),
)?;
write!(f, ") RETURNS TABLE (")?;
write_comma_separated_list(
f,
return_types.iter().map(|(name, ty)| format!("{name} {ty}")),
)?;
write!(f, ") AS $$\n{sql}\n$$")?;
}
UDFDefinition::UDAFScript {
arg_types,
state_fields: state_types,
Expand Down
19 changes: 18 additions & 1 deletion src/query/ast/src/parser/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5018,6 +5018,19 @@ pub fn udf_definition(i: Input) -> IResult<UDFDefinition> {
},
);

let udtf = map(
rule! {
"(" ~ #comma_separated_list0(udtf_arg) ~ ")"
~ RETURNS ~ TABLE ~ "(" ~ #comma_separated_list0(udtf_arg) ~ ")"
~ AS ~ ^#code_string
},
|(_, arg_types, _, _, _, _, return_types, _, _, sql)| UDFDefinition::UDTFSql {
arg_types,
return_types,
sql,
},
);

let udaf = map(
rule! {
"(" ~ #comma_separated_list0(type_name) ~ ")"
Expand Down Expand Up @@ -5082,10 +5095,14 @@ pub fn udf_definition(i: Input) -> IResult<UDFDefinition> {
#lambda_udf: "AS (<parameter>, ...) -> <definition expr>"
| #udaf: "(<arg_type>, ...) STATE {<state_field>, ...} RETURNS <return_type> LANGUAGE <language> { ADDRESS=<udf_server_address> | AS <language_codes> } "
| #udf: "(<arg_type>, ...) RETURNS <return_type> LANGUAGE <language> HANDLER=<handler> { ADDRESS=<udf_server_address> | AS <language_codes> } "

| #udtf: "(<arg_type>, ...) RETURNS TABLE (<return_type>, ...) AS <sql> }"
)(i)
}

fn udtf_arg(i: Input) -> IResult<(Identifier, TypeName)> {
map(rule! { #ident ~ ^#type_name }, |(name, ty)| (name, ty))(i)
}

fn udf_immutable(i: Input) -> IResult<bool> {
alt((
value(false, rule! { VOLATILE }),
Expand Down
1 change: 1 addition & 0 deletions src/query/ast/tests/it/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,7 @@ SELECT * from s;"#,
r#"CREATE OR REPLACE FUNCTION isnotempty_test_replace AS(p) -> not(is_null(p)) DESC = 'This is a description';"#,
r#"CREATE FUNCTION binary_reverse (BINARY) RETURNS BINARY LANGUAGE python HANDLER = 'binary_reverse' ADDRESS = 'http://0.0.0.0:8815';"#,
r#"CREATE FUNCTION binary_reverse (BINARY) RETURNS BINARY LANGUAGE python HANDLER = 'binary_reverse' HEADERS = ('X-Authorization' = '123') ADDRESS = 'http://0.0.0.0:8815';"#,
r#"CREATE FUNCTION binary_reverse_table () RETURNS TABLE (c1 int) AS $$ select * from binary_reverse $$;"#,
r#"ALTER FUNCTION binary_reverse (BINARY) RETURNS BINARY LANGUAGE python HANDLER = 'binary_reverse' ADDRESS = 'http://0.0.0.0:8815';"#,
r#"CREATE OR REPLACE FUNCTION binary_reverse (BINARY) RETURNS BINARY LANGUAGE python HANDLER = 'binary_reverse' ADDRESS = 'http://0.0.0.0:8815';"#,
r#"CREATE file format my_orc type = orc"#,
Expand Down
Loading