
Commit 1576911

chore(cubestore): Upgrade DF: Pass physical predicate for Parquet row group pruning
1 parent bf90bb3 commit 1576911

File tree

1 file changed (+20, -4 lines)


rust/cubestore/cubestore/src/queryplanner/query_executor.rs

Lines changed: 20 additions & 4 deletions
@@ -33,6 +33,7 @@ use datafusion::arrow::ipc::reader::StreamReader;
 use datafusion::arrow::ipc::writer::StreamWriter;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::catalog::Session;
+use datafusion::common::ToDFSchema;
 use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::object_store::ObjectStoreUrl;
 use datafusion::datasource::physical_plan::parquet::ParquetExecBuilder;
@@ -542,6 +543,7 @@ impl CubeTable {
 
     fn async_scan(
         &self,
+        state: &dyn Session,
         table_projection: Option<&Vec<usize>>,
         filters: &[Expr],
     ) -> Result<Arc<dyn ExecutionPlan>, CubeError> {
@@ -637,6 +639,15 @@ impl CubeTable {
         };
 
         let predicate = combine_filters(filters);
+        let physical_predicate =
+            if let Some(pred) = &predicate {
+                Some(state.create_physical_expr(
+                    pred.clone(),
+                    &index_schema.as_ref().clone().to_dfschema()?,
+                )?)
+            } else {
+                None
+            };
         for partition_snapshot in partition_snapshots {
             let partition = partition_snapshot.partition();
             let filter = self
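For context, here is a minimal standalone sketch of the logical-to-physical predicate conversion added above, assuming a recent DataFusion release. The `id` schema and the `id > 100` filter are made-up placeholders, and a `SessionState` obtained from a `SessionContext` stands in for the `&dyn Session` that `async_scan` receives in the commit.

use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::common::ToDFSchema;
use datafusion::error::Result;
use datafusion::physical_expr::PhysicalExpr;
use datafusion::prelude::{col, lit, SessionContext};

fn main() -> Result<()> {
    // Stand-in for the index schema of the scanned partitions (placeholder).
    let index_schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));

    // Logical filter as it would arrive through TableProvider::scan, e.g. `WHERE id > 100`.
    let predicate = col("id").gt(lit(100i64));

    // SessionState implements the Session trait, so this mirrors the
    // `state.create_physical_expr(...)` call in the commit.
    let state = SessionContext::new().state();
    let physical_predicate: Arc<dyn PhysicalExpr> =
        state.create_physical_expr(predicate, &index_schema.as_ref().clone().to_dfschema()?)?;

    println!("physical predicate: {physical_predicate}");
    Ok(())
}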
@@ -672,9 +683,14 @@ impl CubeTable {
                     ))
                 })
                 .collect::<Result<Vec<_>, _>>()?]);
-            let parquet_exec = ParquetExecBuilder::new(file_scan)
-                .with_parquet_file_reader_factory(self.parquet_metadata_cache.clone())
-                .build();
+            let parquet_exec_builder = ParquetExecBuilder::new(file_scan)
+                .with_parquet_file_reader_factory(self.parquet_metadata_cache.clone());
+            let parquet_exec_builder = if let Some(phys_pred) = &physical_predicate {
+                parquet_exec_builder.with_predicate(phys_pred.clone())
+            } else {
+                parquet_exec_builder
+            };
+            let parquet_exec = parquet_exec_builder.build();
 
             let arc: Arc<dyn ExecutionPlan> = Arc::new(parquet_exec);
             let arc = FilterByKeyRangeExec::issue_filters(arc, filter.clone(), key_len);
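And a rough sketch of how such a predicate reaches ParquetExec, assuming a DataFusion version where ParquetExecBuilder and the two-argument FileScanConfig::new are available. The file path `part-0.parquet`, its size, and the schema are hypothetical, and the custom Parquet reader factory used by CubeStore is omitted; attaching the predicate is what allows the scan to skip row groups whose column statistics cannot satisfy the filter.

use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::common::ToDFSchema;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::parquet::ParquetExecBuilder;
use datafusion::datasource::physical_plan::FileScanConfig;
use datafusion::error::Result;
use datafusion::prelude::{col, lit, SessionContext};

fn main() -> Result<()> {
    // Placeholder file schema.
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));

    // Physical predicate built as in the previous sketch.
    let state = SessionContext::new().state();
    let physical_predicate = state.create_physical_expr(
        col("id").gt(lit(100i64)),
        &schema.as_ref().clone().to_dfschema()?,
    )?;

    // Describe a single (hypothetical) Parquet file to scan.
    let file_scan = FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema)
        .with_file(PartitionedFile::new("part-0.parquet".to_string(), 1024));

    // With the predicate attached, the scan can prune row groups (and pages)
    // whose min/max statistics rule out any matching rows.
    let parquet_exec = ParquetExecBuilder::new(file_scan)
        .with_predicate(physical_predicate)
        .build();

    println!("{:?}", parquet_exec);
    Ok(())
}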
@@ -1635,7 +1651,7 @@ impl TableProvider for CubeTable {
         filters: &[Expr],
         _limit: Option<usize>, // TODO: propagate limit
     ) -> DFResult<Arc<dyn ExecutionPlan>> {
-        let res = self.async_scan(projection, filters)?;
+        let res = self.async_scan(state, projection, filters)?;
         Ok(res)
     }
     fn table_type(&self) -> TableType {
