diff --git a/src/meta/app/src/principal/mod.rs b/src/meta/app/src/principal/mod.rs index b0402f5eefe27..2e1e334fe61f7 100644 --- a/src/meta/app/src/principal/mod.rs +++ b/src/meta/app/src/principal/mod.rs @@ -113,6 +113,7 @@ pub use user_defined_function::UDFDefinition; pub use user_defined_function::UDFScript; pub use user_defined_function::UDFServer; pub use user_defined_function::UserDefinedFunction; +pub use user_defined_function::UDTF; pub use user_grant::GrantEntry; pub use user_grant::GrantObject; pub use user_grant::UserGrantSet; diff --git a/src/meta/app/src/principal/user_defined_function.rs b/src/meta/app/src/principal/user_defined_function.rs index 441fd2ec27a04..98e46f2f1022a 100644 --- a/src/meta/app/src/principal/user_defined_function.rs +++ b/src/meta/app/src/principal/user_defined_function.rs @@ -51,6 +51,15 @@ pub struct UDFScript { pub immutable: Option, } +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct UDTF { + // arg name with data type + pub arg_types: Vec<(String, DataType)>, + // return column name with data type + pub return_types: Vec<(String, DataType)>, + pub sql: String, +} + #[derive(Clone, Debug, Eq, PartialEq)] pub struct UDAFScript { pub code: String, @@ -71,6 +80,7 @@ pub enum UDFDefinition { UDFServer(UDFServer), UDFScript(UDFScript), UDAFScript(UDAFScript), + UDTF(UDTF), } impl UDFDefinition { @@ -80,6 +90,7 @@ impl UDFDefinition { Self::UDFServer(_) => "UDFServer", Self::UDFScript(_) => "UDFScript", Self::UDAFScript(_) => "UDAFScript", + Self::UDTF(_) => "UDTF", } } @@ -88,6 +99,7 @@ impl UDFDefinition { Self::LambdaUDF(_) => false, Self::UDFServer(_) => false, Self::UDFScript(_) => false, + Self::UDTF(_) => false, Self::UDAFScript(_) => true, } } @@ -95,6 +107,7 @@ impl UDFDefinition { pub fn language(&self) -> &str { match self { Self::LambdaUDF(_) => "SQL", + Self::UDTF(_) => "SQL", Self::UDFServer(x) => x.language.as_str(), Self::UDFScript(x) => x.language.as_str(), Self::UDAFScript(x) => x.language.as_str(), @@ -292,6 
+305,26 @@ impl Display for UDFDefinition { } write!(f, " }} RETURNS {return_type} LANGUAGE {language} IMPORTS = {imports:?} PACKAGES = {packages:?} RUNTIME_VERSION = {runtime_version} AS $${code}$$")?; } + UDFDefinition::UDTF(UDTF { + arg_types, + return_types, + sql, + }) => { + for (i, (name, ty)) in arg_types.iter().enumerate() { + if i > 0 { + write!(f, ", ")?; + } + write!(f, "{name} {ty}")?; + } + write!(f, ") RETURNS (")?; + for (i, (name, ty)) in return_types.iter().enumerate() { + if i > 0 { + write!(f, ", ")?; + } + write!(f, "{name} {ty}")?; + } + write!(f, ") AS $${sql}$$")?; + } } Ok(()) } diff --git a/src/meta/proto-conv/src/udf_from_to_protobuf_impl.rs b/src/meta/proto-conv/src/udf_from_to_protobuf_impl.rs index ac4805219d3ea..72cbecb6b21d7 100644 --- a/src/meta/proto-conv/src/udf_from_to_protobuf_impl.rs +++ b/src/meta/proto-conv/src/udf_from_to_protobuf_impl.rs @@ -20,6 +20,7 @@ use databend_common_expression::TableDataType; use databend_common_expression::TableField; use databend_common_meta_app::principal as mt; use databend_common_protos::pb; +use databend_common_protos::pb::UdtfArg; use crate::reader_check_msg; use crate::FromToProto; @@ -275,6 +276,85 @@ impl FromToProto for mt::UDAFScript { } } +impl FromToProto for mt::UDTF { + type PB = pb::Udtf; + + fn get_pb_ver(p: &Self::PB) -> u64 { + p.ver + } + + fn from_pb(p: Self::PB) -> Result + where Self: Sized { + reader_check_msg(p.ver, p.min_reader_ver)?; + + let mut arg_types = Vec::new(); + for arg_ty in p.arg_types { + let ty = TableDataType::from_pb(arg_ty.ty.ok_or_else(|| { + Incompatible::new("UDTF.arg_types.ty can not be None".to_string()) + })?)?; + + arg_types.push((arg_ty.name, (&ty).into())); + } + + let mut return_types = Vec::new(); + for return_ty in p.return_types { + let ty = TableDataType::from_pb(return_ty.ty.ok_or_else(|| { + Incompatible::new("UDTF.return_types.ty can not be None".to_string()) + })?)?; + + return_types.push((return_ty.name, (&ty).into())); + } + + Ok(Self { + 
arg_types, + return_types, + sql: p.sql, + }) + } + + fn to_pb(&self) -> Result { + let mut arg_types = Vec::with_capacity(self.arg_types.len()); + for (arg_name, arg_type) in self.arg_types.iter() { + let arg_type = infer_schema_type(arg_type) + .map_err(|e| { + Incompatible::new(format!( + "Convert DataType to TableDataType failed: {}", + e.message() + )) + })? + .to_pb()?; + arg_types.push(UdtfArg { + name: arg_name.clone(), + ty: Some(arg_type), + }); + } + + let mut return_types = Vec::with_capacity(self.return_types.len()); + for (return_name, return_type) in self.return_types.iter() { + let return_type = infer_schema_type(return_type) + .map_err(|e| { + Incompatible::new(format!( + "Convert DataType to TableDataType failed: {}", + e.message() + )) + })? + .to_pb()?; + return_types.push(UdtfArg { + name: return_name.clone(), + ty: Some(return_type), + }); + } + + Ok(pb::Udtf { + ver: VER, + min_reader_ver: MIN_READER_VER, + arg_types, + return_types, + sql: self.sql.clone(), + }) + } +} + impl FromToProto for mt::UserDefinedFunction { type PB = pb::UserDefinedFunction; fn get_pb_ver(p: &Self::PB) -> u64 { @@ -295,6 +375,9 @@ impl FromToProto for mt::UserDefinedFunction { Some(pb::user_defined_function::Definition::UdafScript(udaf_script)) => { mt::UDFDefinition::UDAFScript(mt::UDAFScript::from_pb(udaf_script)?) } + Some(pb::user_defined_function::Definition::Udtf(udtf)) => { + mt::UDFDefinition::UDTF(mt::UDTF::from_pb(udtf)?) + } None => { return Err(Incompatible::new( "UserDefinedFunction.definition cannot be None".to_string(), @@ -327,6 +410,9 @@ impl FromToProto for mt::UserDefinedFunction { mt::UDFDefinition::UDAFScript(udaf_script) => { pb::user_defined_function::Definition::UdafScript(udaf_script.to_pb()?) } + mt::UDFDefinition::UDTF(udtf) => { + pb::user_defined_function::Definition::Udtf(udtf.to_pb()?) 
+ } }; Ok(pb::UserDefinedFunction { diff --git a/src/meta/proto-conv/src/util.rs b/src/meta/proto-conv/src/util.rs index ee5aafa78936a..ade59e929baca 100644 --- a/src/meta/proto-conv/src/util.rs +++ b/src/meta/proto-conv/src/util.rs @@ -171,6 +171,7 @@ const META_CHANGE_LOG: &[(u64, &str)] = &[ (139, "2025-07-25: Add: Grant/OwnershipSequenceObject and UserPrivilegeType AccessSequence, AccessSequence"), (140, "2025-07-24: Add: TaskMessage::Delete add WarehouseOptions"), (141, "2025-08-06: Add: row_access.proto"), + (142, "2025-08-11: Add: add UDTF"), // Dear developer: // If you're gonna add a new metadata version, you'll have to add a test for it. // You could just copy an existing test file(e.g., `../tests/it/v024_table_meta.rs`) diff --git a/src/meta/proto-conv/tests/it/main.rs b/src/meta/proto-conv/tests/it/main.rs index a78442804ffa0..018b0db4473bc 100644 --- a/src/meta/proto-conv/tests/it/main.rs +++ b/src/meta/proto-conv/tests/it/main.rs @@ -133,3 +133,4 @@ mod v138_table_statistics; mod v139_add_grant_ownership_object_sequence; mod v140_task_message; mod v141_row_access_policy; +mod v142_udtf; diff --git a/src/meta/proto-conv/tests/it/v142_udtf.rs b/src/meta/proto-conv/tests/it/v142_udtf.rs new file mode 100644 index 0000000000000..c7dc2f0bbaeaa --- /dev/null +++ b/src/meta/proto-conv/tests/it/v142_udtf.rs @@ -0,0 +1,64 @@ +// Copyright 2023 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use chrono::DateTime; +use chrono::Utc; +use databend_common_expression::types::DataType; +use databend_common_meta_app::principal::UDFDefinition; +use databend_common_meta_app::principal::UserDefinedFunction; +use databend_common_meta_app::principal::UDTF; +use fastrace::func_name; + +use crate::common; + +// These bytes are built when a new version is introduced, +// and are kept for backward compatibility test. +// +// ************************************************************* +// * These messages should never be updated, * +// * only be added when a new version is added, * +// * or be removed when an old version is no longer supported. * +// ************************************************************* +// +// The message bytes are built from the output of `test_pb_from_to()` +#[test] +fn test_decode_v142_udtf() -> anyhow::Result<()> { + let bytes = vec![ + 10, 9, 116, 101, 115, 116, 95, 117, 100, 116, 102, 18, 21, 84, 104, 105, 115, 32, 105, 115, + 32, 97, 32, 100, 101, 115, 99, 114, 105, 112, 116, 105, 111, 110, 66, 79, 10, 16, 10, 2, + 99, 49, 18, 10, 146, 2, 0, 160, 6, 142, 1, 168, 6, 24, 10, 16, 10, 2, 99, 50, 18, 10, 138, + 2, 0, 160, 6, 142, 1, 168, 6, 24, 18, 16, 10, 2, 99, 51, 18, 10, 170, 2, 0, 160, 6, 142, 1, + 168, 6, 24, 26, 16, 115, 101, 108, 101, 99, 116, 32, 42, 32, 102, 114, 111, 109, 32, 116, + 49, 160, 6, 142, 1, 168, 6, 24, 42, 23, 50, 48, 50, 51, 45, 49, 50, 45, 49, 53, 32, 48, 49, + 58, 50, 54, 58, 48, 57, 32, 85, 84, 67, 160, 6, 142, 1, 168, 6, 24, + ]; + + let want = || UserDefinedFunction { + name: "test_udtf".to_string(), + description: "This is a description".to_string(), + definition: UDFDefinition::UDTF(UDTF { + arg_types: vec![(s("c1"), DataType::String), (s("c2"), DataType::Boolean)], + return_types: vec![(s("c3"), DataType::Date)], + sql: "select * from t1".to_string(), + }), + created_on: DateTime::<Utc>::from_timestamp(1702603569, 0).unwrap(), + }; + + common::test_pb_from_to(func_name!(), want())?; + 
common::test_load_old(func_name!(), bytes.as_slice(), 142, want()) +} + +fn s(ss: impl ToString) -> String { + ss.to_string() +} diff --git a/src/meta/protos/proto/udf.proto b/src/meta/protos/proto/udf.proto index 25dac1e67573a..e68301a96b0f1 100644 --- a/src/meta/protos/proto/udf.proto +++ b/src/meta/protos/proto/udf.proto @@ -69,6 +69,20 @@ message UDAFScript { repeated string packages = 8; } +message UDTFArg { + string name = 1; + DataType ty = 2; +} + +message UDTF { + uint64 ver = 100; + uint64 min_reader_ver = 101; + + repeated UDTFArg arg_types = 1; + repeated UDTFArg return_types = 2; + string sql = 3; +} + message UserDefinedFunction { uint64 ver = 100; uint64 min_reader_ver = 101; @@ -80,6 +94,7 @@ message UserDefinedFunction { UDFServer udf_server = 4; UDFScript udf_script = 6; UDAFScript udaf_script = 7; + UDTF udtf = 8; } // The time udf created. optional string created_on = 5; diff --git a/src/query/ast/src/ast/statements/udf.rs b/src/query/ast/src/ast/statements/udf.rs index e4606840df6b0..0d0f6c4181cd3 100644 --- a/src/query/ast/src/ast/statements/udf.rs +++ b/src/query/ast/src/ast/statements/udf.rs @@ -71,6 +71,11 @@ pub enum UDFDefinition { language: String, runtime_version: String, }, + UDTFSql { + arg_types: Vec<(Identifier, TypeName)>, + return_types: Vec<(Identifier, TypeName)>, + sql: String, + }, } impl Display for UDFDefinition { @@ -176,6 +181,23 @@ impl Display for UDFDefinition { } write!(f, " ADDRESS = '{address}'")?; } + UDFDefinition::UDTFSql { + arg_types, + return_types, + sql, + } => { + write!(f, "(")?; + write_comma_separated_list( + f, + arg_types.iter().map(|(name, ty)| format!("{name} {ty}")), + )?; + write!(f, ") RETURNS TABLE (")?; + write_comma_separated_list( + f, + return_types.iter().map(|(name, ty)| format!("{name} {ty}")), + )?; + write!(f, ") AS $$\n{sql}\n$$")?; + } UDFDefinition::UDAFScript { arg_types, state_fields: state_types, diff --git a/src/query/ast/src/parser/statement.rs 
b/src/query/ast/src/parser/statement.rs index e99157c28c3fe..2a142335edfc5 100644 --- a/src/query/ast/src/parser/statement.rs +++ b/src/query/ast/src/parser/statement.rs @@ -5018,6 +5018,19 @@ pub fn udf_definition(i: Input) -> IResult { }, ); + let udtf = map( + rule! { + "(" ~ #comma_separated_list0(udtf_arg) ~ ")" + ~ RETURNS ~ TABLE ~ "(" ~ #comma_separated_list0(udtf_arg) ~ ")" + ~ AS ~ ^#code_string + }, + |(_, arg_types, _, _, _, _, return_types, _, _, sql)| UDFDefinition::UDTFSql { + arg_types, + return_types, + sql, + }, + ); + let udaf = map( rule! { "(" ~ #comma_separated_list0(type_name) ~ ")" @@ -5082,10 +5095,14 @@ pub fn udf_definition(i: Input) -> IResult { #lambda_udf: "AS (, ...) -> " | #udaf: "(, ...) STATE {, ...} RETURNS LANGUAGE { ADDRESS= | AS } " | #udf: "(, ...) RETURNS LANGUAGE HANDLER= { ADDRESS= | AS } " - + | #udtf: "(, ...) RETURNS TABLE (, ...) AS }" )(i) } +fn udtf_arg(i: Input) -> IResult<(Identifier, TypeName)> { + map(rule! { #ident ~ ^#type_name }, |(name, ty)| (name, ty))(i) +} + fn udf_immutable(i: Input) -> IResult { alt(( value(false, rule! 
{ VOLATILE }), diff --git a/src/query/ast/tests/it/parser.rs b/src/query/ast/tests/it/parser.rs index f20f163be19fc..4a5e6dfb803a3 100644 --- a/src/query/ast/tests/it/parser.rs +++ b/src/query/ast/tests/it/parser.rs @@ -826,6 +826,7 @@ SELECT * from s;"#, r#"CREATE OR REPLACE FUNCTION isnotempty_test_replace AS(p) -> not(is_null(p)) DESC = 'This is a description';"#, r#"CREATE FUNCTION binary_reverse (BINARY) RETURNS BINARY LANGUAGE python HANDLER = 'binary_reverse' ADDRESS = 'http://0.0.0.0:8815';"#, r#"CREATE FUNCTION binary_reverse (BINARY) RETURNS BINARY LANGUAGE python HANDLER = 'binary_reverse' HEADERS = ('X-Authorization' = '123') ADDRESS = 'http://0.0.0.0:8815';"#, + r#"CREATE FUNCTION binary_reverse_table () RETURNS TABLE (c1 int) AS $$ select * from binary_reverse $$;"#, r#"ALTER FUNCTION binary_reverse (BINARY) RETURNS BINARY LANGUAGE python HANDLER = 'binary_reverse' ADDRESS = 'http://0.0.0.0:8815';"#, r#"CREATE OR REPLACE FUNCTION binary_reverse (BINARY) RETURNS BINARY LANGUAGE python HANDLER = 'binary_reverse' ADDRESS = 'http://0.0.0.0:8815';"#, r#"CREATE file format my_orc type = orc"#, diff --git a/src/query/ast/tests/it/testdata/stmt.txt b/src/query/ast/tests/it/testdata/stmt.txt index b895683f25904..267308812289b 100644 --- a/src/query/ast/tests/it/testdata/stmt.txt +++ b/src/query/ast/tests/it/testdata/stmt.txt @@ -25417,6 +25417,46 @@ CreateUDF( ) +---------- Input ---------- +CREATE FUNCTION binary_reverse_table () RETURNS TABLE (c1 int) AS $$ select * from binary_reverse $$; +---------- Output --------- +CREATE FUNCTION binary_reverse_table () RETURNS TABLE (c1 Int32) AS $$ +select * from binary_reverse +$$ +---------- AST ------------ +CreateUDF( + CreateUDFStmt { + create_option: Create, + udf_name: Identifier { + span: Some( + 16..36, + ), + name: "binary_reverse_table", + quote: None, + ident_type: None, + }, + description: None, + definition: UDTFSql { + arg_types: [], + return_types: [ + ( + Identifier { + span: Some( + 55..57, + ), + 
name: "c1", + quote: None, + ident_type: None, + }, + Int32, + ), + ], + sql: "select * from binary_reverse", + }, + }, +) + + ---------- Input ---------- ALTER FUNCTION binary_reverse (BINARY) RETURNS BINARY LANGUAGE python HANDLER = 'binary_reverse' ADDRESS = 'http://0.0.0.0:8815'; ---------- Output --------- diff --git a/src/query/sql/src/planner/binder/bind_table_reference/bind_table_function.rs b/src/query/sql/src/planner/binder/bind_table_reference/bind_table_function.rs index 31c9b8bcbd0df..2996e439a9ed0 100644 --- a/src/query/sql/src/planner/binder/bind_table_reference/bind_table_function.rs +++ b/src/query/sql/src/planner/binder/bind_table_reference/bind_table_function.rs @@ -25,18 +25,25 @@ use databend_common_ast::ast::SelectTarget; use databend_common_ast::ast::TableAlias; use databend_common_ast::ast::TableReference; use databend_common_ast::Span; +use databend_common_catalog::catalog::CatalogManager; use databend_common_catalog::catalog_kind::CATALOG_DEFAULT; use databend_common_catalog::table_args::TableArgs; use databend_common_catalog::table_function::TableFunction; use databend_common_exception::ErrorCode; use databend_common_exception::Result; +use databend_common_expression::types::convert_to_type_name; use databend_common_expression::types::NumberScalar; use databend_common_expression::FunctionKind; use databend_common_expression::Scalar; use databend_common_functions::BUILTIN_FUNCTIONS; +use databend_common_meta_app::principal::UDFDefinition; +use databend_common_meta_app::principal::UDTF; use databend_common_storages_result_cache::ResultCacheMetaManager; use databend_common_storages_result_cache::ResultScan; use databend_common_users::UserApiProvider; +use derive_visitor::DriveMut; +use derive_visitor::VisitorMut; +use itertools::Itertools; use crate::binder::scalar::ScalarBinder; use crate::binder::table_args::bind_table_args; @@ -45,11 +52,15 @@ use crate::binder::ColumnBindingBuilder; use crate::binder::Visibility; use 
crate::optimizer::ir::SExpr; use crate::planner::semantic::normalize_identifier; +use crate::plans::BoundColumnRef; +use crate::plans::CastExpr; use crate::plans::EvalScalar; use crate::plans::FunctionCall; +use crate::plans::Plan; use crate::plans::RelOperator; use crate::plans::ScalarItem; use crate::BindContext; +use crate::Planner; use crate::ScalarExpr; impl Binder { @@ -64,6 +75,41 @@ impl Binder { alias: &Option, sample: &Option, ) -> Result<(SExpr, BindContext)> { + #[derive(VisitorMut)] + #[visitor(Expr(enter))] + struct UDTFArgVisitor<'a> { + udtf: &'a UDTF, + table_args: &'a TableArgs, + } + + impl UDTFArgVisitor<'_> { + fn enter_expr(&mut self, expr: &mut Expr) { + if let Expr::ColumnRef { span, column } = expr { + if column.database.is_some() || column.table.is_some() { + return; + } + assert_eq!(self.udtf.arg_types.len(), self.table_args.positioned.len()); + let Some((pos, (_, ty))) = self + .udtf + .arg_types + .iter() + .find_position(|(name, _)| name == column.column.name()) + else { + return; + }; + *expr = Expr::Cast { + span: *span, + expr: Box::new(Expr::Literal { + span: *span, + value: Literal::String(self.table_args.positioned[pos].to_string()), + }), + target_type: convert_to_type_name(ty), + pg_style: false, + } + } + } + } + let func_name = normalize_identifier(name, &self.name_resolution_ctx); if BUILTIN_FUNCTIONS @@ -129,6 +175,115 @@ impl Binder { ); let table_args = bind_table_args(&mut scalar_binder, params, named_params)?; + let tenant = self.ctx.get_tenant(); + let udtf_result = databend_common_base::runtime::block_on(async { + if let Some(UDFDefinition::UDTF(udtf)) = UserApiProvider::instance() + .get_udf(&tenant, &func_name.name) + .await? + .map(|udf| udf.definition) + { + let mut stmt = Planner::new(self.ctx.clone()) + .parse_sql(&udtf.sql)? 
+ .statement; + + if udtf.arg_types.len() != table_args.positioned.len() { + return Err(ErrorCode::UDFSchemaMismatch(format!( + "UDTF '{}' argument types length {} does not match input arguments length {}", + func_name, + udtf.arg_types.len(), + table_args.positioned.len() + ))); + } + let mut visitor = UDTFArgVisitor { + udtf: &udtf, + table_args: &table_args, + }; + stmt.drive_mut(&mut visitor); + + let binder = Binder::new( + self.ctx.clone(), + CatalogManager::instance(), + self.name_resolution_ctx.clone(), + self.metadata.clone(), + ) + .with_subquery_executor(self.subquery_executor.clone()); + let plan = binder.bind(&stmt).await?; + + let Plan::Query { + s_expr, + mut bind_context, + .. + } = plan + else { + return Err(ErrorCode::UDFRuntimeError( + "Query in UDTF returned no result set", + )); + }; + let mut output_bindings = Vec::with_capacity(bind_context.columns.len()); + let mut output_items = Vec::with_capacity(bind_context.columns.len()); + + if udtf.return_types.len() != bind_context.columns.len() { + return Err(ErrorCode::UDFSchemaMismatch(format!( + "UDTF '{}' return types length {} does not match output columns length {}", + func_name, + udtf.return_types.len(), + bind_context.columns.len() + ))); + } + + for ((return_name, return_type), output_binding) in udtf + .return_types + .into_iter() + .zip(bind_context.columns.iter()) + { + let input_expr = ScalarExpr::BoundColumnRef(BoundColumnRef { + span: None, + column: output_binding.clone(), + }); + let cast_expr = ScalarExpr::CastExpr(CastExpr { + span: None, + is_try: false, + argument: Box::new(input_expr), + target_type: Box::new(return_type.clone()), + }); + let index = self.metadata.write().add_derived_column( + return_name.clone(), + return_type.clone(), + Some(cast_expr.clone()), + ); + let output_binding = ColumnBindingBuilder::new( + return_name, + index, + Box::new(return_type), + Visibility::Visible, + ) + .build(); + + output_items.push(ScalarItem { + scalar: cast_expr, + index: 
output_binding.index, + }); + output_bindings.push(output_binding); + } + bind_context.columns = output_bindings; + let s_expr = SExpr::create_unary( + Arc::new( + EvalScalar { + items: output_items, + } + .into(), + ), + s_expr, + ); + + return Ok(Some((s_expr, *bind_context))); + } + Ok(None) + }); + if let Some(result) = udtf_result? { + return Ok(result); + } + if func_name.name.eq_ignore_ascii_case("result_scan") { self.bind_result_scan(bind_context, span, alias, &table_args) } else { diff --git a/src/query/sql/src/planner/binder/udf.rs b/src/query/sql/src/planner/binder/udf.rs index 3512e24230623..fe5583dcfc2bc 100644 --- a/src/query/sql/src/planner/binder/udf.rs +++ b/src/query/sql/src/planner/binder/udf.rs @@ -32,6 +32,7 @@ use databend_common_meta_app::principal::UDFDefinition as PlanUDFDefinition; use databend_common_meta_app::principal::UDFScript; use databend_common_meta_app::principal::UDFServer; use databend_common_meta_app::principal::UserDefinedFunction; +use databend_common_meta_app::principal::UDTF; use databend_common_version::UDF_CLIENT_USER_AGENT; use crate::normalize_identifier; @@ -202,6 +203,40 @@ impl Binder { created_on: Utc::now(), }) } + UDFDefinition::UDTFSql { + arg_types, + return_types, + sql, + } => { + let arg_types = arg_types + .iter() + .map(|(name, arg_type)| { + let column = normalize_identifier(name, &self.name_resolution_ctx).name; + let ty = DataType::from(&resolve_type_name_udf(arg_type)?); + Ok((column, ty)) + }) + .collect::>>()?; + + let return_types = return_types + .iter() + .map(|(name, arg_type)| { + let column = normalize_identifier(name, &self.name_resolution_ctx).name; + let ty = DataType::from(&resolve_type_name_udf(arg_type)?); + Ok((column, ty)) + }) + .collect::>>()?; + + Ok(UserDefinedFunction { + name, + description, + definition: PlanUDFDefinition::UDTF(UDTF { + arg_types, + return_types, + sql: sql.to_string(), + }), + created_on: Utc::now(), + }) + } } } diff --git 
a/src/query/sql/src/planner/semantic/type_check.rs b/src/query/sql/src/planner/semantic/type_check.rs index 48073b7e37ba8..c35ebfd287420 100644 --- a/src/query/sql/src/planner/semantic/type_check.rs +++ b/src/query/sql/src/planner/semantic/type_check.rs @@ -4762,6 +4762,7 @@ impl<'a> TypeChecker<'a> { UDFDefinition::UDAFScript(udf_def) => Ok(Some( self.resolve_udaf_script(span, name, arguments, udf_def)?, )), + UDFDefinition::UDTF(_) => Err(databend_common_exception::ErrorCode::SemanticError(format!("UDTF '{name}' can only be used as a table function in the FROM clause, not as a scalar function"))), } } diff --git a/src/query/storages/system/src/user_functions_table.rs b/src/query/storages/system/src/user_functions_table.rs index 1b3a1949b50de..758df89bf70d7 100644 --- a/src/query/storages/system/src/user_functions_table.rs +++ b/src/query/storages/system/src/user_functions_table.rs @@ -211,6 +211,23 @@ impl UserFunctionsTable { .collect(), immutable: None, }, + UDFDefinition::UDTF(x) => UserFunctionArguments { + arg_types: x + .arg_types + .iter() + .map(|(name, ty)| format!("{name} {ty}")) + .collect(), + return_type: Some( + x.return_types + .iter() + .map(|(name, ty)| format!("{name} {ty}")) + .collect::<Vec<_>>().join(", "), + ), + server: None, + parameters: vec![], + states: BTreeMap::new(), + immutable: None, + }, }, }) .collect()) diff --git a/tests/sqllogictests/suites/base/03_common/03_0013_select_udf.test b/tests/sqllogictests/suites/base/03_common/03_0013_select_udf.test index e877c22807aa0..94d08a02110ce 100644 --- a/tests/sqllogictests/suites/base/03_common/03_0013_select_udf.test +++ b/tests/sqllogictests/suites/base/03_common/03_0013_select_udf.test @@ -205,3 +205,40 @@ select date_add_days(to_date('2022-01-02T01:12:00'), 12); statement ok DROP FUNCTION date_add_days; + +statement ok +create or replace table t1 (c1 int); + +statement ok +insert into t1 values(0), (1); + +statement ok +create or replace function scan_t1 () RETURNS TABLE (c1_string string) as $$ select * from t1 $$; + +query T +select * from scan_t1(); +---- +0 +1 + +statement ok +create or replace function filter_t1 (arg0 int) RETURNS TABLE 
(c1_string string) as $$ select * from t1 where c1 = arg0 $$; + +query T +select * from filter_t1(0); +---- +0 + +query T +select * from filter_t1(1); +---- +1 + +statement error +select * from filter_t1(); + +statement ok +create or replace function invalid_udtf_0 () RETURNS TABLE (c1_string string, c2 int) as $$ select * from t1 $$; + +statement error +select * from invalid_udtf_0();