Skip to content

Commit dda78fa

Browse files
committed
feat: udtf supports return types conversion
1 parent 928cc7b commit dda78fa

File tree

15 files changed

+286
-69
lines changed

15 files changed

+286
-69
lines changed

src/meta/app/src/principal/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,6 @@ pub use task::Status;
9696
pub use task::Task;
9797
pub use task::TaskMessage;
9898
pub use task::TaskRun;
99-
pub use user_defined_function::UDTF;
10099
pub use task::WarehouseOptions;
101100
pub use task_ident::TaskIdent;
102101
pub use task_ident::TaskIdentRaw;
@@ -114,6 +113,7 @@ pub use user_defined_function::UDFDefinition;
114113
pub use user_defined_function::UDFScript;
115114
pub use user_defined_function::UDFServer;
116115
pub use user_defined_function::UserDefinedFunction;
116+
pub use user_defined_function::UDTF;
117117
pub use user_grant::GrantEntry;
118118
pub use user_grant::GrantObject;
119119
pub use user_grant::UserGrantSet;

src/meta/app/src/principal/user_defined_function.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,8 @@ pub struct UDFScript {
5353

5454
#[derive(Clone, Debug, Eq, PartialEq)]
5555
pub struct UDTF {
56-
pub arg_types: BTreeMap<String, DataType>,
57-
pub return_types: BTreeMap<String, DataType>,
56+
pub arg_types: Vec<(String, DataType)>,
57+
pub return_types: Vec<(String, DataType)>,
5858
pub sql: String,
5959
}
6060

@@ -304,7 +304,7 @@ impl Display for UDFDefinition {
304304
write!(f, " }} RETURNS {return_type} LANGUAGE {language} IMPORTS = {imports:?} PACKAGES = {packages:?} RUNTIME_VERSION = {runtime_version} AS $${code}$$")?;
305305
}
306306
UDFDefinition::UDTF(UDTF {
307-
arg_types,
307+
arg_types,
308308
return_types,
309309
sql,
310310
}) => {

src/meta/proto-conv/src/udf_from_to_protobuf_impl.rs

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::collections::BTreeMap;
1615
use chrono::DateTime;
1716
use chrono::Utc;
1817
use databend_common_expression::infer_schema_type;
@@ -21,6 +20,7 @@ use databend_common_expression::TableDataType;
2120
use databend_common_expression::TableField;
2221
use databend_common_meta_app::principal as mt;
2322
use databend_common_protos::pb;
23+
use databend_common_protos::pb::UdtfArg;
2424

2525
use crate::reader_check_msg;
2626
use crate::FromToProto;
@@ -284,27 +284,27 @@ impl FromToProto for mt::UDTF {
284284
}
285285

286286
fn from_pb(p: Self::PB) -> Result<Self, Incompatible>
287-
where
288-
Self: Sized
289-
{
287+
where Self: Sized {
290288
reader_check_msg(p.ver, p.min_reader_ver)?;
291289

292-
let mut arg_types = BTreeMap::new();
293-
for (arg_name, arg_ty) in p
294-
.arg_names
295-
.into_iter().zip(p.arg_types.into_iter()) {
296-
let ty = (&TableDataType::from_pb(arg_ty)?).into();
290+
let mut arg_types = Vec::new();
291+
for arg_ty in p.arg_types {
292+
let ty = (&TableDataType::from_pb(arg_ty.r#type.ok_or_else(|| {
293+
Incompatible::new("UDTF.arg_types.ty can not be None".to_string())
294+
})?)?)
295+
.into();
297296

298-
arg_types.insert(arg_name, ty);
297+
arg_types.push((arg_ty.name, ty));
299298
}
300299

301-
let mut return_types = BTreeMap::new();
302-
for (return_name, arg_ty) in p
303-
.return_names
304-
.into_iter().zip(p.return_types.into_iter()) {
305-
let ty = (&TableDataType::from_pb(arg_ty)?).into();
300+
let mut return_types = Vec::new();
301+
for return_ty in p.return_types {
302+
let ty = (&TableDataType::from_pb(return_ty.r#type.ok_or_else(|| {
303+
Incompatible::new("UDTF.arg_types.ty can not be None".to_string())
304+
})?)?)
305+
.into();
306306

307-
return_types.insert(return_name, ty);
307+
return_types.push((return_ty.name, ty));
308308
}
309309

310310
Ok(Self {
@@ -315,7 +315,6 @@ impl FromToProto for mt::UDTF {
315315
}
316316

317317
fn to_pb(&self) -> Result<Self::PB, Incompatible> {
318-
let mut arg_names = Vec::with_capacity(self.arg_types.len());
319318
let mut arg_types = Vec::with_capacity(self.arg_types.len());
320319
for (arg_name, arg_type) in self.arg_types.iter() {
321320
let arg_type = infer_schema_type(arg_type)
@@ -326,11 +325,12 @@ impl FromToProto for mt::UDTF {
326325
))
327326
})?
328327
.to_pb()?;
329-
arg_names.push(arg_name.clone());
330-
arg_types.push(arg_type);
328+
arg_types.push(UdtfArg {
329+
name: arg_name.clone(),
330+
r#type: Some(arg_type),
331+
});
331332
}
332333

333-
let mut return_names = Vec::with_capacity(self.return_types.len());
334334
let mut return_types = Vec::with_capacity(self.return_types.len());
335335
for (return_name, return_type) in self.return_types.iter() {
336336
let return_type = infer_schema_type(return_type)
@@ -341,16 +341,16 @@ impl FromToProto for mt::UDTF {
341341
))
342342
})?
343343
.to_pb()?;
344-
return_names.push(return_name.clone());
345-
return_types.push(return_type);
344+
return_types.push(UdtfArg {
345+
name: return_name.clone(),
346+
r#type: Some(return_type),
347+
});
346348
}
347349

348350
Ok(pb::Udtf {
349351
ver: VER,
350352
min_reader_ver: MIN_READER_VER,
351-
arg_names,
352353
arg_types,
353-
return_names,
354354
return_types,
355355
sql: self.sql.clone(),
356356
})

src/meta/proto-conv/src/util.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ const META_CHANGE_LOG: &[(u64, &str)] = &[
171171
(139, "2025-07-25: Add: Grant/OwnershipSequenceObject and UserPrivilegeType AccessSequence, AccessSequence"),
172172
(140, "2025-07-24: Add: TaskMessage::Delete add WarehouseOptions"),
173173
(141, "2025-08-06: Add: row_access.proto"),
174+
(142, "2025-08-11: Add: add UDTF"),
174175
// Dear developer:
175176
// If you're gonna add a new metadata version, you'll have to add a test for it.
176177
// You could just copy an existing test file(e.g., `../tests/it/v024_table_meta.rs`)

src/meta/proto-conv/tests/it/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,3 +133,4 @@ mod v138_table_statistics;
133133
mod v139_add_grant_ownership_object_sequence;
134134
mod v140_task_message;
135135
mod v141_row_access_policy;
136+
mod v142_udtf;
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
// Copyright 2023 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use chrono::DateTime;
16+
use chrono::Utc;
17+
use databend_common_meta_app::principal::UDFDefinition;
18+
use databend_common_meta_app::principal::UserDefinedFunction;
19+
use databend_common_meta_app::principal::UDTF;
20+
use fastrace::func_name;
21+
22+
use crate::common;
23+
24+
// These bytes are built when a new version is introduced,
25+
// and are kept for backward compatibility test.
26+
//
27+
// *************************************************************
28+
// * These messages should never be updated, *
29+
// * only be added when a new version is added, *
30+
// * or be removed when an old version is no longer supported. *
31+
// *************************************************************
32+
//
33+
// The message bytes are built from the output of `test_pb_from_to()`
34+
#[test]
35+
fn test_decode_v142_udtf() -> anyhow::Result<()> {
36+
let bytes = vec![
37+
10, 9, 116, 101, 115, 116, 95, 117, 100, 116, 102, 18, 21, 84, 104, 105, 115, 32, 105, 115,
38+
32, 97, 32, 100, 101, 115, 99, 114, 105, 112, 116, 105, 111, 110, 66, 25, 26, 16, 115, 101,
39+
108, 101, 99, 116, 32, 42, 32, 102, 114, 111, 109, 32, 116, 49, 160, 6, 142, 1, 168, 6, 24,
40+
42, 23, 50, 48, 50, 51, 45, 49, 50, 45, 49, 53, 32, 48, 49, 58, 50, 54, 58, 48, 57, 32, 85,
41+
84, 67, 160, 6, 142, 1, 168, 6, 24,
42+
];
43+
44+
let want = || UserDefinedFunction {
45+
name: "test_udtf".to_string(),
46+
description: "This is a description".to_string(),
47+
definition: UDFDefinition::UDTF(UDTF {
48+
arg_types: vec![],
49+
return_types: vec![],
50+
sql: "select * from t1".to_string(),
51+
}),
52+
created_on: DateTime::<Utc>::from_timestamp(1702603569, 0).unwrap(),
53+
};
54+
55+
common::test_pb_from_to(func_name!(), want())?;
56+
common::test_load_old(func_name!(), bytes.as_slice(), 142, want())
57+
}

src/meta/protos/proto/udf.proto

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,15 +69,18 @@ message UDAFScript {
6969
repeated string packages = 8;
7070
}
7171

72+
message UDTFArg {
73+
string name = 1;
74+
DataType type = 2;
75+
}
76+
7277
message UDTF {
7378
uint64 ver = 100;
7479
uint64 min_reader_ver = 101;
7580

76-
repeated string arg_names = 1;
77-
repeated DataType arg_types = 2;
78-
repeated string return_names = 3;
79-
repeated DataType return_types = 4;
80-
string sql = 5;
81+
repeated UDTFArg arg_types = 1;
82+
repeated UDTFArg return_types = 2;
83+
string sql = 3;
8184
}
8285

8386
message UserDefinedFunction {

src/query/ast/src/ast/statements/udf.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,10 @@ pub enum UDFDefinition {
7272
runtime_version: String,
7373
},
7474
UDTFSql {
75-
arg_types: BTreeMap<Identifier, TypeName>,
76-
return_types: BTreeMap<Identifier, TypeName>,
75+
arg_types: Vec<(Identifier, TypeName)>,
76+
return_types: Vec<(Identifier, TypeName)>,
7777
sql: String,
78-
}
78+
},
7979
}
8080

8181
impl Display for UDFDefinition {
@@ -184,12 +184,18 @@ impl Display for UDFDefinition {
184184
UDFDefinition::UDTFSql {
185185
arg_types,
186186
return_types,
187-
sql
187+
sql,
188188
} => {
189189
write!(f, "(")?;
190-
write_comma_separated_list(f, arg_types.iter().map(|(name, ty)| format!("{name} {ty}")))?;
190+
write_comma_separated_list(
191+
f,
192+
arg_types.iter().map(|(name, ty)| format!("{name} {ty}")),
193+
)?;
191194
write!(f, ") RETURNS TABLE (")?;
192-
write_comma_separated_list(f, return_types.iter().map(|(name, ty)| format!("{name} {ty}")))?;
195+
write_comma_separated_list(
196+
f,
197+
return_types.iter().map(|(name, ty)| format!("{name} {ty}")),
198+
)?;
193199
write!(f, ") AS $$\n{sql}\n$$")?;
194200
}
195201
UDFDefinition::UDAFScript {

src/query/ast/src/parser/statement.rs

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5024,13 +5024,11 @@ pub fn udf_definition(i: Input) -> IResult<UDFDefinition> {
50245024
~ RETURNS ~ TABLE ~ "(" ~ #comma_separated_list0(udtf_arg) ~ ")"
50255025
~ AS ~ ^#code_string
50265026
},
5027-
|(_, arg_types, _, _, _, _, return_types, _, _, sql)| {
5028-
UDFDefinition::UDTFSql {
5029-
arg_types: BTreeMap::from_iter(arg_types),
5030-
return_types: BTreeMap::from_iter(return_types),
5031-
sql,
5032-
}
5033-
}
5027+
|(_, arg_types, _, _, _, _, return_types, _, _, sql)| UDFDefinition::UDTFSql {
5028+
arg_types,
5029+
return_types,
5030+
sql,
5031+
},
50345032
);
50355033

50365034
let udaf = map(
@@ -5101,11 +5099,8 @@ pub fn udf_definition(i: Input) -> IResult<UDFDefinition> {
51015099
)(i)
51025100
}
51035101

5104-
fn udtf_arg(i:Input) -> IResult<(Identifier, TypeName)> {
5105-
map(
5106-
rule! { #ident ~ ^#type_name },
5107-
|(name, ty)| (name, ty),
5108-
)(i)
5102+
fn udtf_arg(i: Input) -> IResult<(Identifier, TypeName)> {
5103+
map(rule! { #ident ~ ^#type_name }, |(name, ty)| (name, ty))(i)
51095104
}
51105105

51115106
fn udf_immutable(i: Input) -> IResult<bool> {

src/query/ast/tests/it/parser.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -826,6 +826,7 @@ SELECT * from s;"#,
826826
r#"CREATE OR REPLACE FUNCTION isnotempty_test_replace AS(p) -> not(is_null(p)) DESC = 'This is a description';"#,
827827
r#"CREATE FUNCTION binary_reverse (BINARY) RETURNS BINARY LANGUAGE python HANDLER = 'binary_reverse' ADDRESS = 'http://0.0.0.0:8815';"#,
828828
r#"CREATE FUNCTION binary_reverse (BINARY) RETURNS BINARY LANGUAGE python HANDLER = 'binary_reverse' HEADERS = ('X-Authorization' = '123') ADDRESS = 'http://0.0.0.0:8815';"#,
829+
r#"CREATE FUNCTION binary_reverse_table () RETURNS TABLE (c1 int) AS $$ select * from binary_reverse $$;"#,
829830
r#"ALTER FUNCTION binary_reverse (BINARY) RETURNS BINARY LANGUAGE python HANDLER = 'binary_reverse' ADDRESS = 'http://0.0.0.0:8815';"#,
830831
r#"CREATE OR REPLACE FUNCTION binary_reverse (BINARY) RETURNS BINARY LANGUAGE python HANDLER = 'binary_reverse' ADDRESS = 'http://0.0.0.0:8815';"#,
831832
r#"CREATE file format my_orc type = orc"#,

0 commit comments

Comments
 (0)