Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/meta/app/src/principal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ pub use user_auth::AuthType;
pub use user_auth::PasswordHashMethod;
pub use user_defined_file_format::UserDefinedFileFormat;
pub use user_defined_function::LambdaUDF;
pub use user_defined_function::ScalarUDF;
pub use user_defined_function::UDAFScript;
pub use user_defined_function::UDFDefinition;
pub use user_defined_function::UDFScript;
Expand Down
30 changes: 30 additions & 0 deletions src/meta/app/src/principal/user_defined_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,19 @@ pub struct UDTF {
pub sql: String,
}

/// User Defined Scalar Function (ScalarUDF)
///
/// A SQL-defined scalar UDF: `definition` holds the expression (the text
/// between `AS $$ ... $$`) that is evaluated against the declared arguments
/// and yields a single value of `return_type`.
///
/// # Fields
/// - `arg_types`: argument name paired with its data type, in declaration order
/// - `return_type`: data type of the value the function produces
/// - `definition`: typically including the code or expression implementing the function logic
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ScalarUDF {
    // Argument name together with its data type, in declaration order.
    pub arg_types: Vec<(String, DataType)>,
    // Data type of the function's result.
    pub return_type: DataType,
    // Function body, e.g. the SQL expression after `AS $$ ... $$`.
    pub definition: String,
}

#[derive(Clone, Debug, Eq, PartialEq)]
pub struct UDAFScript {
pub code: String,
Expand All @@ -85,6 +98,7 @@ pub enum UDFDefinition {
UDFScript(UDFScript),
UDAFScript(UDAFScript),
UDTF(UDTF),
ScalarUDF(ScalarUDF),
}

impl UDFDefinition {
Expand All @@ -95,6 +109,7 @@ impl UDFDefinition {
Self::UDFScript(_) => "UDFScript",
Self::UDAFScript(_) => "UDAFScript",
Self::UDTF(_) => "UDTF",
UDFDefinition::ScalarUDF(_) => "ScalarUDF",
}
}

Expand All @@ -104,6 +119,7 @@ impl UDFDefinition {
Self::UDFServer(_) => false,
Self::UDFScript(_) => false,
Self::UDTF(_) => false,
Self::ScalarUDF(_) => false,
Self::UDAFScript(_) => true,
}
}
Expand All @@ -112,6 +128,7 @@ impl UDFDefinition {
match self {
Self::LambdaUDF(_) => "SQL",
Self::UDTF(_) => "SQL",
Self::ScalarUDF(_) => "SQL",
Self::UDFServer(x) => x.language.as_str(),
Self::UDFScript(x) => x.language.as_str(),
Self::UDAFScript(x) => x.language.as_str(),
Expand Down Expand Up @@ -329,6 +346,19 @@ impl Display for UDFDefinition {
}
write!(f, ") AS $${sql}$$")?;
}
UDFDefinition::ScalarUDF(ScalarUDF {
arg_types,
return_type,
definition,
}) => {
for (i, (name, ty)) in arg_types.iter().enumerate() {
if i > 0 {
write!(f, ", ")?;
}
write!(f, "{name} {ty}")?;
}
write!(f, ") RETURNS {return_type} AS $${definition}$$")?;
}
}
Ok(())
}
Expand Down
75 changes: 75 additions & 0 deletions src/meta/proto-conv/src/udf_from_to_protobuf_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,75 @@ impl FromToProto for mt::UDTF {
}
}

/// Protobuf conversion for [`mt::ScalarUDF`].
///
/// Argument entries reuse the `UdtfArg` protobuf message (name + type), and
/// data types are bridged through `TableDataType`, mirroring the other UDF
/// variants in this module.
impl FromToProto for mt::ScalarUDF {
    type PB = pb::ScalarUdf;

    fn get_pb_ver(p: &Self::PB) -> u64 {
        p.ver
    }

    fn from_pb(p: Self::PB) -> Result<Self, Incompatible>
    where Self: Sized {
        reader_check_msg(p.ver, p.min_reader_ver)?;

        // Decode every (name, type) argument pair, failing fast on the
        // first missing or incompatible type.
        let arg_types = p
            .arg_types
            .into_iter()
            .map(|arg| {
                let ty_pb = arg.ty.ok_or_else(|| {
                    Incompatible::new("ScalarUDF.arg_types.ty can not be None".to_string())
                })?;
                let table_ty = TableDataType::from_pb(ty_pb)?;
                Ok((arg.name, (&table_ty).into()))
            })
            .collect::<Result<Vec<_>, Incompatible>>()?;

        let return_ty_pb = p.return_type.ok_or_else(|| {
            Incompatible::new("ScalarUDF.return_type can not be None".to_string())
        })?;
        let return_ty = TableDataType::from_pb(return_ty_pb)?;

        Ok(Self {
            arg_types,
            return_type: (&return_ty).into(),
            definition: p.definition,
        })
    }

    fn to_pb(&self) -> Result<Self::PB, Incompatible> {
        // Shared DataType -> protobuf DataType conversion, used for both
        // the arguments and the return type.
        let convert = |ty| {
            infer_schema_type(ty)
                .map_err(|e| {
                    Incompatible::new(format!(
                        "Convert DataType to TableDataType failed: {}",
                        e.message()
                    ))
                })
                .and_then(|table_ty| table_ty.to_pb())
        };

        let mut arg_types = Vec::with_capacity(self.arg_types.len());
        for (name, ty) in &self.arg_types {
            arg_types.push(UdtfArg {
                name: name.clone(),
                ty: Some(convert(ty)?),
            });
        }

        Ok(pb::ScalarUdf {
            ver: VER,
            min_reader_ver: MIN_READER_VER,
            arg_types,
            return_type: Some(convert(&self.return_type)?),
            definition: self.definition.clone(),
        })
    }
}

impl FromToProto for mt::UserDefinedFunction {
type PB = pb::UserDefinedFunction;
fn get_pb_ver(p: &Self::PB) -> u64 {
Expand All @@ -380,6 +449,9 @@ impl FromToProto for mt::UserDefinedFunction {
Some(pb::user_defined_function::Definition::Udtf(udtf)) => {
mt::UDFDefinition::UDTF(mt::UDTF::from_pb(udtf)?)
}
Some(pb::user_defined_function::Definition::ScalarUdf(scalar_udf)) => {
mt::UDFDefinition::ScalarUDF(mt::ScalarUDF::from_pb(scalar_udf)?)
}
None => {
return Err(Incompatible::new(
"UserDefinedFunction.definition cannot be None".to_string(),
Expand Down Expand Up @@ -415,6 +487,9 @@ impl FromToProto for mt::UserDefinedFunction {
mt::UDFDefinition::UDTF(udtf) => {
pb::user_defined_function::Definition::Udtf(udtf.to_pb()?)
}
mt::UDFDefinition::ScalarUDF(scalar_udf) => {
pb::user_defined_function::Definition::ScalarUdf(scalar_udf.to_pb()?)
}
};

Ok(pb::UserDefinedFunction {
Expand Down
1 change: 1 addition & 0 deletions src/meta/proto-conv/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ const META_CHANGE_LOG: &[(u64, &str)] = &[
(141, "2025-08-06: Add: row_access.proto"),
(142, "2025-08-15: Add: table_meta add row_access_policy"),
(143, "2025-08-18: Add: add UDTF"),
(144, "2025-08-18: Add: add ScalarUDF"),
// Dear developer:
// If you're gonna add a new metadata version, you'll have to add a test for it.
// You could just copy an existing test file(e.g., `../tests/it/v024_table_meta.rs`)
Expand Down
1 change: 1 addition & 0 deletions src/meta/proto-conv/tests/it/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,4 @@ mod v140_task_message;
mod v141_row_access_policy;
mod v142_table_row_access_policy;
mod v143_udtf;
mod v144_scalar_udf;
64 changes: 64 additions & 0 deletions src/meta/proto-conv/tests/it/v144_scalar_udf.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2023 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use chrono::DateTime;
use chrono::Utc;
use databend_common_expression::types::DataType;
use databend_common_meta_app::principal::ScalarUDF;
use databend_common_meta_app::principal::UDFDefinition;
use databend_common_meta_app::principal::UserDefinedFunction;
use fastrace::func_name;

use crate::common;

// These bytes are built when a new version in introduced,
// and are kept for backward compatibility test.
//
// *************************************************************
// * These messages should never be updated, *
// * only be added when a new version is added, *
// * or be removed when an old version is no longer supported. *
// *************************************************************
//
// The message bytes are built from the output of `test_pb_from_to()`
#[test]
fn test_decode_v144_scalar_udf() -> anyhow::Result<()> {
    // Frozen compatibility fixture: the serialized `UserDefinedFunction`
    // message as produced at metadata version 144. Per the header comment
    // in this file, these bytes must never be regenerated or edited —
    // they prove that newer readers can still decode v144 data.
    let bytes = vec![
        10, 15, 116, 101, 115, 116, 95, 115, 99, 97, 108, 97, 114, 95, 117, 100, 102, 18, 21, 84,
        104, 105, 115, 32, 105, 115, 32, 97, 32, 100, 101, 115, 99, 114, 105, 112, 116, 105, 111,
        110, 74, 69, 10, 16, 10, 2, 99, 49, 18, 10, 146, 2, 0, 160, 6, 144, 1, 168, 6, 24, 10, 16,
        10, 2, 99, 50, 18, 10, 138, 2, 0, 160, 6, 144, 1, 168, 6, 24, 18, 10, 170, 2, 0, 160, 6,
        144, 1, 168, 6, 24, 26, 12, 67, 85, 82, 82, 69, 78, 84, 95, 68, 65, 84, 69, 160, 6, 144, 1,
        168, 6, 24, 42, 23, 50, 48, 50, 51, 45, 49, 50, 45, 49, 53, 32, 48, 49, 58, 50, 54, 58, 48,
        57, 32, 85, 84, 67, 160, 6, 144, 1, 168, 6, 24,
    ];

    // In-memory value the fixture bytes are expected to decode into: a
    // ScalarUDF taking (c1 STRING, c2 BOOLEAN), returning DATE, defined
    // by the SQL expression `CURRENT_DATE`.
    let want = || UserDefinedFunction {
        name: "test_scalar_udf".to_string(),
        description: "This is a description".to_string(),
        definition: UDFDefinition::ScalarUDF(ScalarUDF {
            arg_types: vec![(s("c1"), DataType::String), (s("c2"), DataType::Boolean)],
            return_type: DataType::Date,
            definition: "CURRENT_DATE".to_string(),
        }),
        created_on: DateTime::<Utc>::from_timestamp(1702603569, 0).unwrap(),
    };

    // Round-trip the value through the current protobuf encoding...
    common::test_pb_from_to(func_name!(), want())?;
    // ...and check the frozen v144 bytes still decode to the same value.
    common::test_load_old(func_name!(), bytes.as_slice(), 144, want())
}

/// Shorthand used by the fixtures above: turn any `ToString` value into an
/// owned `String`.
fn s(ss: impl ToString) -> String {
    ToString::to_string(&ss)
}
14 changes: 14 additions & 0 deletions src/meta/protos/proto/udf.proto
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,19 @@ message UDTF {
string sql = 3;
}

// A scalar UDF defined as an expression over named, typed arguments.
message ScalarUDF {
  uint64 ver = 100;
  uint64 min_reader_ver = 101;

  // Argument name with its data type (reuses the UDTFArg message shape).
  repeated UDTFArg arg_types = 1;
  // Data type of the value the function returns.
  DataType return_type = 2;
  // Typically including the code or expression implementing the function logic.
  string definition = 3;
}


message UserDefinedFunction {
uint64 ver = 100;
uint64 min_reader_ver = 101;
Expand All @@ -97,6 +110,7 @@ message UserDefinedFunction {
UDFScript udf_script = 6;
UDAFScript udaf_script = 7;
UDTF udtf = 8;
ScalarUDF scalar_udf = 9;
}
// The time udf created.
optional string created_on = 5;
Expand Down
17 changes: 17 additions & 0 deletions src/query/ast/src/ast/statements/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ pub enum UDFDefinition {
return_types: Vec<(Identifier, TypeName)>,
sql: String,
},
ScalarUDF {
arg_types: Vec<(Identifier, TypeName)>,
definition: String,
return_type: TypeName,
},
}

impl Display for UDFDefinition {
Expand Down Expand Up @@ -198,6 +203,18 @@ impl Display for UDFDefinition {
)?;
write!(f, ") AS $$\n{sql}\n$$")?;
}
UDFDefinition::ScalarUDF {
arg_types,
definition,
return_type,
} => {
write!(f, "(")?;
write_comma_separated_list(
f,
arg_types.iter().map(|(name, ty)| format!("{name} {ty}")),
)?;
write!(f, ") RETURNS {return_type} AS $$\n{definition}\n$$")?;
}
UDFDefinition::UDAFScript {
arg_types,
state_fields: state_types,
Expand Down
46 changes: 39 additions & 7 deletions src/query/ast/src/parser/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4975,6 +4975,31 @@ pub fn udf_script_or_address(i: Input) -> IResult<(String, bool)> {
}

pub fn udf_definition(i: Input) -> IResult<UDFDefinition> {
    // Distinguishes the two `RETURNS` forms a SQL-defined UDF can declare:
    // a single scalar type, or a table schema (which makes it a UDTF).
    enum ReturnBody {
        // `RETURNS <type>` — one value per invocation.
        Scalar(TypeName),
        // `RETURNS TABLE (<name> <type>, ...)` — a set of rows.
        Table(Vec<(Identifier, TypeName)>),
    }

    // Parses the tokens following `RETURNS`: either a bare type name
    // (scalar UDF) or `TABLE ( ... )` (table function).
    //
    // NOTE(review): the `scalar` alternative is tried before `table`; this
    // assumes `type_name` can never match the `TABLE` keyword — confirm,
    // otherwise `RETURNS TABLE (...)` could be mis-parsed as a scalar type.
    fn return_body(i: Input) -> IResult<ReturnBody> {
        let scalar = map(
            rule! {
                #type_name
            },
            ReturnBody::Scalar,
        );
        let table = map(
            rule! {
                TABLE ~ "(" ~ #comma_separated_list0(udtf_arg) ~ ")"
            },
            |(_, _, arg_types, _)| ReturnBody::Table(arg_types),
        );

        rule!(
            #scalar: "<return_type>"
            | #table: "TABLE (<return_type>, ...)"
        )(i)
    }

let lambda_udf = map(
rule! {
AS ~ "(" ~ #comma_separated_list0(ident) ~ ")"
Expand Down Expand Up @@ -5049,16 +5074,23 @@ pub fn udf_definition(i: Input) -> IResult<UDFDefinition> {
},
);

let udtf = map(
let scalar_udf_or_udtf = map(
rule! {
"(" ~ #comma_separated_list0(udtf_arg) ~ ")"
~ RETURNS ~ TABLE ~ "(" ~ #comma_separated_list0(udtf_arg) ~ ")"
~ RETURNS ~ ^#return_body
~ AS ~ ^#code_string
},
|(_, arg_types, _, _, _, _, return_types, _, _, sql)| UDFDefinition::UDTFSql {
arg_types,
return_types,
sql,
|(_, arg_types, _, _, return_body, _, sql)| match return_body {
ReturnBody::Scalar(return_type) => UDFDefinition::ScalarUDF {
arg_types,
definition: sql,
return_type,
},
ReturnBody::Table(return_types) => UDFDefinition::UDTFSql {
arg_types,
return_types,
sql,
},
},
);

Expand Down Expand Up @@ -5126,7 +5158,7 @@ pub fn udf_definition(i: Input) -> IResult<UDFDefinition> {
#lambda_udf: "AS (<parameter>, ...) -> <definition expr>"
| #udaf: "(<arg_type>, ...) STATE {<state_field>, ...} RETURNS <return_type> LANGUAGE <language> { ADDRESS=<udf_server_address> | AS <language_codes> } "
| #udf: "(<arg_type>, ...) RETURNS <return_type> LANGUAGE <language> HANDLER=<handler> { ADDRESS=<udf_server_address> | AS <language_codes> } "
| #udtf: "(<arg_type>, ...) RETURNS TABLE (<return_type>, ...) AS <sql> }"
| #scalar_udf_or_udtf: "(<arg_type>, ...) RETURNS <return body> AS <sql> }"
)(i)
}

Expand Down
1 change: 1 addition & 0 deletions src/query/ast/tests/it/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -830,6 +830,7 @@ SELECT * from s;"#,
r#"attach table t 's3://a' connection=(access_key_id ='x' secret_access_key ='y' endpoint_url='http://127.0.0.1:9900')"#,
r#"CREATE FUNCTION IF NOT EXISTS isnotempty AS(p) -> not(is_null(p));"#,
r#"CREATE OR REPLACE FUNCTION isnotempty_test_replace AS(p) -> not(is_null(p)) DESC = 'This is a description';"#,
r#"CREATE OR REPLACE FUNCTION isnotempty_test_replace (p STRING) RETURNS BOOL AS $$ not(is_null(p)) $$;"#,
r#"CREATE FUNCTION binary_reverse (BINARY) RETURNS BINARY LANGUAGE python HANDLER = 'binary_reverse' ADDRESS = 'http://0.0.0.0:8815';"#,
r#"CREATE FUNCTION binary_reverse (BINARY) RETURNS BINARY LANGUAGE python HANDLER = 'binary_reverse' HEADERS = ('X-Authorization' = '123') ADDRESS = 'http://0.0.0.0:8815';"#,
r#"CREATE FUNCTION binary_reverse_table () RETURNS TABLE (c1 int) AS $$ select * from binary_reverse $$;"#,
Expand Down
Loading