Skip to content

Commit b5bcfba

Browse files
committed
feat(query): enable runtime cast transform in loading parquet files
1 parent fdaac4e commit b5bcfba

File tree

1 file changed

+100
-0
lines changed

1 file changed

+100
-0
lines changed
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
// Copyright 2021 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::sync::Arc;
16+
17+
use common_exception::Result;
18+
use common_expression::BlockEntry;
19+
use common_expression::DataBlock;
20+
use common_expression::DataSchemaRef;
21+
use common_expression::Evaluator;
22+
use common_expression::Expr;
23+
use common_expression::FunctionContext;
24+
use common_functions::scalars::BUILTIN_FUNCTIONS;
25+
26+
use crate::pipelines::processors::port::InputPort;
27+
use crate::pipelines::processors::port::OutputPort;
28+
use crate::pipelines::processors::processor::ProcessorPtr;
29+
use crate::pipelines::processors::transforms::transform::Transform;
30+
use crate::pipelines::processors::transforms::transform::Transformer;
31+
32+
/// TransformRuntimeCastSchema is used to cast block to the specified schema.
33+
/// Different from `TransformCastSchema`, it is used at the runtime
34+
pub struct TransformRuntimeCastSchema {
35+
func_ctx: FunctionContext,
36+
insert_schema: DataSchemaRef,
37+
}
38+
39+
impl TransformRuntimeCastSchema
40+
where Self: Transform
41+
{
42+
pub fn try_create(
43+
input_port: Arc<InputPort>,
44+
output_port: Arc<OutputPort>,
45+
insert_schema: DataSchemaRef,
46+
func_ctx: FunctionContext,
47+
) -> Result<ProcessorPtr> {
48+
Ok(ProcessorPtr::create(Transformer::create(
49+
input_port,
50+
output_port,
51+
Self {
52+
func_ctx,
53+
insert_schema,
54+
},
55+
)))
56+
}
57+
}
58+
59+
impl Transform for TransformRuntimeCastSchema {
60+
const NAME: &'static str = "CastSchemaTransform";
61+
62+
fn transform(&mut self, data_block: DataBlock) -> Result<DataBlock> {
63+
let exprs: Vec<Expr> = data_block
64+
.columns()
65+
.iter()
66+
.zip(self.insert_schema.fields().iter().enumerate())
67+
.map(|(from, (index, to))| {
68+
let expr = Expr::ColumnRef {
69+
span: None,
70+
id: index,
71+
data_type: from.data_type.clone(),
72+
display_name: to.name().clone(),
73+
};
74+
if &from.data_type != to.data_type() {
75+
Expr::Cast {
76+
span: None,
77+
is_try: false,
78+
expr: Box::new(expr),
79+
dest_type: to.data_type().clone(),
80+
}
81+
} else {
82+
expr
83+
}
84+
})
85+
.collect();
86+
87+
let mut columns = Vec::with_capacity(exprs.len());
88+
let evaluator = Evaluator::new(&data_block, self.func_ctx, &BUILTIN_FUNCTIONS);
89+
90+
for (field, expr) in self.insert_schema.fields().iter().zip(exprs.iter()) {
91+
let value = evaluator.run(expr)?;
92+
let column = BlockEntry {
93+
data_type: field.data_type().clone(),
94+
value,
95+
};
96+
columns.push(column);
97+
}
98+
Ok(DataBlock::new(columns, data_block.num_rows()))
99+
}
100+
}

0 commit comments

Comments
 (0)