Skip to content

Commit 882f464

Browse files
authored
refactor(fuse): runtime filter into two files: expr_bloom_filter.rs and expr_runtime_pruner.rs (#17804)
refactor(fuse): runtime filter into two files:expr_bloom_filter.rs and expr_runtime_pruner.rs
1 parent 88e85b8 commit 882f464

File tree

9 files changed

+308
-270
lines changed

9 files changed

+308
-270
lines changed

โ€Žsrc/query/storages/fuse/src/operations/read/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ mod parquet_data_source;
2222
mod parquet_data_source_deserializer;
2323
mod parquet_data_transform_reader;
2424
mod parquet_rows_fetcher;
25-
mod runtime_filter_prunner;
2625

2726
mod block_partition_meta;
2827
mod block_partition_receiver_source;

โ€Žsrc/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ use crate::io::AggIndexReader;
7070
use crate::io::BlockReader;
7171
use crate::io::VirtualColumnReader;
7272
use crate::operations::read::data_source_with_meta::DataSourceWithMeta;
73-
use crate::operations::read::runtime_filter_prunner::update_bitmap_with_bloom_filter;
73+
use crate::pruning::ExprBloomFilter;
7474
use crate::DEFAULT_ROW_PER_PAGE;
7575

7676
/// A helper struct to store the intermediate state while reading a native partition.
@@ -857,7 +857,9 @@ impl NativeDeserializeDataTransform {
857857
let probe_block = self.block_reader.build_block(&[column], None)?;
858858
let mut bitmap = MutableBitmap::from_len_zeroed(probe_block.num_rows());
859859
let probe_column = probe_block.get_last_column().clone();
860-
update_bitmap_with_bloom_filter(probe_column, filter, &mut bitmap)?;
860+
// Apply the filter to the probe column.
861+
ExprBloomFilter::new(filter.clone()).apply(probe_column, &mut bitmap)?;
862+
861863
let unset_bits = bitmap.null_count();
862864
if unset_bits == bitmap.len() {
863865
// skip current page.

โ€Žsrc/query/storages/fuse/src/operations/read/native_data_transform_reader.rs

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ use crate::io::TableMetaLocationGenerator;
3838
use crate::io::VirtualColumnReader;
3939
use crate::operations::read::block_partition_meta::BlockPartitionMeta;
4040
use crate::operations::read::data_source_with_meta::DataSourceWithMeta;
41-
use crate::operations::read::runtime_filter_prunner::runtime_filter_pruner;
41+
use crate::pruning::ExprRuntimePruner;
4242
use crate::FuseBlockPartInfo;
4343

4444
pub struct ReadNativeDataTransform<const BLOCKING_IO: bool> {
@@ -123,14 +123,12 @@ impl Transform for ReadNativeDataTransform<true> {
123123
self.context
124124
.get_min_max_runtime_filter_with_id(self.scan_id),
125125
);
126-
if runtime_filter_pruner(
127-
self.table_schema.clone(),
128-
&part,
129-
&filters,
130-
&self.func_ctx,
131-
)? {
126+
127+
let runtime_filter = ExprRuntimePruner::new(filters.clone());
128+
if runtime_filter.prune(&self.func_ctx, self.table_schema.clone(), &part)? {
132129
return Ok(DataBlock::empty());
133130
}
131+
134132
if let Some(index_reader) = self.index_reader.as_ref() {
135133
let fuse_part = FuseBlockPartInfo::from_part(&part)?;
136134
let loc =
@@ -201,13 +199,9 @@ impl AsyncTransform for ReadNativeDataTransform<false> {
201199
.get_min_max_runtime_filter_with_id(self.scan_id),
202200
);
203201
let mut native_part_infos = Vec::with_capacity(parts.len());
202+
let runtime_filter = ExprRuntimePruner::new(filters.clone());
204203
for part in parts.into_iter() {
205-
if runtime_filter_pruner(
206-
self.table_schema.clone(),
207-
&part,
208-
&filters,
209-
&self.func_ctx,
210-
)? {
204+
if runtime_filter.prune(&self.func_ctx, self.table_schema.clone(), &part)? {
211205
continue;
212206
}
213207

โ€Žsrc/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ use crate::io::AggIndexReader;
5353
use crate::io::BlockReader;
5454
use crate::io::VirtualColumnReader;
5555
use crate::operations::read::data_source_with_meta::DataSourceWithMeta;
56-
use crate::operations::read::runtime_filter_prunner::update_bitmap_with_bloom_filter;
56+
use crate::pruning::ExprBloomFilter;
5757

5858
pub struct DeserializeDataTransform {
5959
ctx: Arc<dyn TableContext>,
@@ -165,7 +165,9 @@ impl DeserializeDataTransform {
165165
let probe_column = probe_block_entry
166166
.value
167167
.convert_to_full_column(&probe_block_entry.data_type, data_block.num_rows());
168-
update_bitmap_with_bloom_filter(probe_column, filter, &mut bitmap)?;
168+
169+
// Apply bloom filter
170+
ExprBloomFilter::new(filter.clone()).apply(probe_column, &mut bitmap)?;
169171
bitmaps.push(bitmap);
170172
}
171173
if !bitmaps.is_empty() {

โ€Žsrc/query/storages/fuse/src/operations/read/parquet_data_transform_reader.rs

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ use crate::io::TableMetaLocationGenerator;
4040
use crate::io::VirtualColumnReader;
4141
use crate::operations::read::block_partition_meta::BlockPartitionMeta;
4242
use crate::operations::read::data_source_with_meta::DataSourceWithMeta;
43-
use crate::operations::read::runtime_filter_prunner::runtime_filter_pruner;
43+
use crate::pruning::ExprRuntimePruner;
4444

4545
pub struct ReadParquetDataTransform<const BLOCKING_IO: bool> {
4646
func_ctx: FunctionContext,
@@ -124,12 +124,9 @@ impl Transform for ReadParquetDataTransform<true> {
124124
self.context
125125
.get_min_max_runtime_filter_with_id(self.scan_id),
126126
);
127-
if runtime_filter_pruner(
128-
self.table_schema.clone(),
129-
&part,
130-
&filters,
131-
&self.func_ctx,
132-
)? {
127+
128+
let runtime_filter = ExprRuntimePruner::new(filters.clone());
129+
if runtime_filter.prune(&self.func_ctx, self.table_schema.clone(), &part)? {
133130
return Ok(DataBlock::empty());
134131
}
135132

@@ -207,13 +204,10 @@ impl AsyncTransform for ReadParquetDataTransform<false> {
207204
.get_min_max_runtime_filter_with_id(self.scan_id),
208205
);
209206
let mut fuse_part_infos = Vec::with_capacity(parts.len());
207+
208+
let runtime_filter = ExprRuntimePruner::new(filters.clone());
210209
for part in parts.into_iter() {
211-
if runtime_filter_pruner(
212-
self.table_schema.clone(),
213-
&part,
214-
&filters,
215-
&self.func_ctx,
216-
)? {
210+
if runtime_filter.prune(&self.func_ctx, self.table_schema.clone(), &part)? {
217211
continue;
218212
}
219213

โ€Žsrc/query/storages/fuse/src/operations/read/runtime_filter_prunner.rs

Lines changed: 0 additions & 239 deletions
This file was deleted.

0 commit comments

Comments
ย (0)