Skip to content

Commit 022c639

Browse files
authored
feat: add async function read_file. (#19426)
1 parent 1cc88fb commit 022c639

File tree

14 files changed

+824
-36
lines changed

14 files changed

+824
-36
lines changed

src/query/functions/src/lib.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,11 @@ pub fn is_cacheable_function(name: &str) -> bool {
5454
#[ctor]
5555
pub static BUILTIN_FUNCTIONS: FunctionRegistry = builtin_functions();
5656

57-
pub const ASYNC_FUNCTIONS: [Ascii<&str>; 2] = [Ascii::new("nextval"), Ascii::new("dict_get")];
57+
pub const ASYNC_FUNCTIONS: [Ascii<&str>; 3] = [
58+
Ascii::new("nextval"),
59+
Ascii::new("dict_get"),
60+
Ascii::new("read_file"),
61+
];
5862

5963
pub const GENERAL_WITHIN_GROUP_FUNCTIONS: [Ascii<&str>; 5] = [
6064
Ascii::new("array_agg"),

src/query/service/src/physical_plans/physical_async_func.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,14 +114,14 @@ impl IPhysicalPlan for AsyncFunction {
114114
let sequence_counters =
115115
TransformAsyncFunction::create_sequence_counters(self.async_func_descs.len());
116116

117-
builder.main_pipeline.add_async_transformer(|| {
117+
builder.main_pipeline.try_add_async_transformer(|| {
118118
TransformAsyncFunction::new(
119119
builder.ctx.clone(),
120120
self.async_func_descs.clone(),
121121
operators.clone(),
122122
sequence_counters.clone(),
123123
)
124-
});
124+
})?;
125125

126126
Ok(())
127127
}

src/query/service/src/physical_plans/physical_multi_table_insert.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -552,15 +552,14 @@ impl IPhysicalPlan for ChunkFillAndReorder {
552552
TransformAsyncFunction::create_sequence_counters(async_funcs.len());
553553
let ctx = builder.ctx.clone();
554554
plan.async_builder = Some(Box::new(move |input, output| {
555+
let transform = TransformAsyncFunction::new(
556+
ctx.clone(),
557+
async_funcs.clone(),
558+
BTreeMap::new(),
559+
counters.clone(),
560+
)?;
555561
Ok(ProcessorPtr::create(AsyncTransformer::create(
556-
input,
557-
output,
558-
TransformAsyncFunction::new(
559-
ctx.clone(),
560-
async_funcs.clone(),
561-
BTreeMap::new(),
562-
counters.clone(),
563-
),
562+
input, output, transform,
564563
)))
565564
}));
566565

src/query/service/src/pipelines/builders/builder_fill_missing_columns.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,12 @@ impl PipelineBuilder {
5757
TransformAsyncFunction::create_sequence_counters(async_funcs.len());
5858

5959
pipeline.try_add_async_transformer(|| {
60-
Ok(TransformAsyncFunction::new(
60+
TransformAsyncFunction::new(
6161
ctx.clone(),
6262
async_funcs.clone(),
6363
BTreeMap::new(),
6464
sequence_counters.clone(),
65-
))
65+
)
6666
})?;
6767
if new_default_schema != new_default_schema_no_cast {
6868
pipeline.try_add_transformer(|| {

src/query/service/src/pipelines/builders/builder_mutation.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ use databend_common_storages_fuse::operations::UnMatchedExprs;
4141

4242
use crate::pipelines::PipelineBuilder;
4343
use crate::pipelines::processors::transforms::AsyncFunctionBranch;
44+
use crate::pipelines::processors::transforms::ReadFileContext;
4445
use crate::pipelines::processors::transforms::TransformAsyncFunction;
4546
use crate::pipelines::processors::transforms::TransformBranchedAsyncFunction;
4647
use crate::pipelines::processors::transforms::TransformResortAddOnWithoutSourceSchema;
@@ -125,6 +126,7 @@ impl PipelineBuilder {
125126
Ok(TransformBranchedAsyncFunction {
126127
ctx: self.ctx.clone(),
127128
branches: branches.clone(),
129+
read_file_ctx: ReadFileContext::try_new(&self.ctx)?,
128130
})
129131
},
130132
transform_len,

src/query/service/src/pipelines/processors/transforms/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,10 @@ pub use new_hash_join::Join;
4848
pub use new_hash_join::TransformHashJoin;
4949
pub use new_hash_join::*;
5050
pub use sort::*;
51+
pub(crate) use transform_async_function::AutoIncrementNextValFetcher;
52+
pub(crate) use transform_async_function::ReadFileContext;
5153
pub use transform_async_function::SequenceCounters;
54+
pub(crate) use transform_async_function::SequenceNextValFetcher;
5255
pub use transform_async_function::TransformAsyncFunction;
5356
pub use transform_branched_async_function::AsyncFunctionBranch;
5457
pub use transform_branched_async_function::TransformBranchedAsyncFunction;

0 commit comments

Comments
 (0)