Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ impl TableProvider for CustomDataSource {
struct CustomExec {
db: CustomDataSource,
projected_schema: SchemaRef,
cache: PlanProperties,
cache: Arc<PlanProperties>,
}

impl CustomExec {
Expand All @@ -202,12 +202,13 @@ impl CustomExec {
schema: SchemaRef,
db: CustomDataSource,
) -> Self {
let projected_schema = project_schema(&schema, projections).unwrap();
let projected_schema =
project_schema(&schema, projections.map(AsRef::as_ref)).unwrap();
let cache = Self::compute_properties(projected_schema.clone());
Self {
db,
projected_schema,
cache,
cache: Arc::new(cache),
}
}

Expand Down Expand Up @@ -238,7 +239,7 @@ impl ExecutionPlan for CustomExec {
self
}

fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ impl ExternalBatchBufferer {
struct BufferingExecutionPlan {
schema: SchemaRef,
input: Arc<dyn ExecutionPlan>,
properties: PlanProperties,
properties: Arc<PlanProperties>,
}

impl BufferingExecutionPlan {
Expand Down Expand Up @@ -233,7 +233,7 @@ impl ExecutionPlan for BufferingExecutionPlan {
self.schema.clone()
}

fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
&self.properties
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ impl ExecutionPlan for ParentExec {
self
}

fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
fn properties(&self) -> &Arc<datafusion::physical_plan::PlanProperties> {
unreachable!()
}

Expand Down Expand Up @@ -182,7 +182,7 @@ impl ExecutionPlan for ChildExec {
self
}

fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
fn properties(&self) -> &Arc<datafusion::physical_plan::PlanProperties> {
unreachable!()
}

Expand Down
6 changes: 3 additions & 3 deletions datafusion-examples/examples/relation_planner/table_sample.rs
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,7 @@ pub struct SampleExec {
upper_bound: f64,
seed: u64,
metrics: ExecutionPlanMetricsSet,
cache: PlanProperties,
cache: Arc<PlanProperties>,
}

impl SampleExec {
Expand Down Expand Up @@ -656,7 +656,7 @@ impl SampleExec {
upper_bound,
seed,
metrics: ExecutionPlanMetricsSet::new(),
cache,
cache: Arc::new(cache),
})
}

Expand Down Expand Up @@ -686,7 +686,7 @@ impl ExecutionPlan for SampleExec {
self
}

fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion/catalog-listing/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ impl TableProvider for ListingTable {

// if no files need to be read, return an `EmptyExec`
if partitioned_file_lists.is_empty() {
let projected_schema = project_schema(&self.schema(), projection.as_ref())?;
let projected_schema = project_schema(&self.schema(), projection.as_deref())?;
return Ok(ScanResult::new(Arc::new(EmptyExec::new(projected_schema))));
}

Expand Down
6 changes: 3 additions & 3 deletions datafusion/catalog/src/memory/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,7 @@ fn evaluate_filters_to_mask(
struct DmlResultExec {
rows_affected: u64,
schema: SchemaRef,
properties: PlanProperties,
properties: Arc<PlanProperties>,
}

impl DmlResultExec {
Expand All @@ -570,7 +570,7 @@ impl DmlResultExec {
Self {
rows_affected,
schema,
properties,
properties: Arc::new(properties),
}
}
}
Expand Down Expand Up @@ -604,7 +604,7 @@ impl ExecutionPlan for DmlResultExec {
Arc::clone(&self.schema)
}

fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
&self.properties
}

Expand Down
17 changes: 8 additions & 9 deletions datafusion/common/src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ impl Statistics {
/// For example, if we had statistics for columns `{"a", "b", "c"}`,
/// projecting to `vec![2, 1]` would return statistics for columns `{"c",
/// "b"}`.
pub fn project(mut self, projection: Option<&Vec<usize>>) -> Self {
pub fn project(mut self, projection: Option<&[usize]>) -> Self {
let Some(projection) = projection else {
return self;
};
Expand Down Expand Up @@ -1066,29 +1066,28 @@ mod tests {

#[test]
fn test_project_none() {
let projection = None;
let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
let stats = make_stats(vec![10, 20, 30]).project(None);
assert_eq!(stats, make_stats(vec![10, 20, 30]));
}

#[test]
fn test_project_empty() {
let projection = Some(vec![]);
let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
let projection: Option<&[_]> = Some(&[]);
let stats = make_stats(vec![10, 20, 30]).project(projection);
assert_eq!(stats, make_stats(vec![]));
}

#[test]
fn test_project_swap() {
let projection = Some(vec![2, 1]);
let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
let projection: Option<&[_]> = Some(&[2, 1]);
let stats = make_stats(vec![10, 20, 30]).project(projection);
assert_eq!(stats, make_stats(vec![30, 20]));
}

#[test]
fn test_project_repeated() {
let projection = Some(vec![1, 2, 1, 1, 0, 2]);
let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
let projection: Option<&[_]> = Some(&[1, 2, 1, 1, 0, 2]);
let stats = make_stats(vec![10, 20, 30]).project(projection);
assert_eq!(stats, make_stats(vec![20, 30, 20, 20, 10, 30]));
}

Expand Down
4 changes: 2 additions & 2 deletions datafusion/common/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ use std::thread::available_parallelism;
///
/// // Pick columns 'c' and 'b'
/// let projection = Some(vec![2, 1]);
/// let projected_schema = project_schema(&schema, projection.as_ref()).unwrap();
/// let projected_schema = project_schema(&schema, projection.as_deref()).unwrap();
///
/// let expected_schema = SchemaRef::new(Schema::new(vec![
/// Field::new("c", DataType::Utf8, true),
Expand All @@ -70,7 +70,7 @@ use std::thread::available_parallelism;
/// ```
pub fn project_schema(
schema: &SchemaRef,
projection: Option<&Vec<usize>>,
projection: Option<&[usize]>,
) -> Result<SchemaRef> {
let schema = match projection {
Some(columns) => Arc::new(schema.project(columns)?),
Expand Down
2 changes: 2 additions & 0 deletions datafusion/core/benches/reset_plan_states.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ fn run_reset_states(b: &mut criterion::Bencher, plan: &Arc<dyn ExecutionPlan>) {
/// making an independent instance of the execution plan to re-execute it, avoiding
/// re-planning stage.
fn bench_reset_plan_states(c: &mut Criterion) {
env_logger::init();

let rt = Runtime::new().unwrap();
let ctx = SessionContext::new();
ctx.register_table(
Expand Down
5 changes: 5 additions & 0 deletions datafusion/core/benches/sql_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ fn register_clickbench_hits_table(rt: &Runtime) -> SessionContext {

let sql = format!("CREATE EXTERNAL TABLE hits STORED AS PARQUET LOCATION '{path}'");

// ClickBench partitioned dataset was written by an ancient version of pyarrow that
// that wrote strings with the wrong logical type. To read it correctly, we must
// automatically convert binary to string.
rt.block_on(ctx.sql("SET datafusion.execution.parquet.binary_as_string = true;"))
.unwrap();
rt.block_on(ctx.sql(&sql)).unwrap();

let count =
Expand Down
3 changes: 2 additions & 1 deletion datafusion/core/src/datasource/empty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ impl TableProvider for EmptyTable {
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
// even though there is no data, projections apply
let projected_schema = project_schema(&self.schema, projection)?;
let projected_schema =
project_schema(&self.schema, projection.map(AsRef::as_ref))?;
Ok(Arc::new(
EmptyExec::new(projected_schema).with_partitions(self.partitions),
))
Expand Down
14 changes: 8 additions & 6 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3661,13 +3661,15 @@ mod tests {

#[derive(Debug)]
struct NoOpExecutionPlan {
cache: PlanProperties,
cache: Arc<PlanProperties>,
}

impl NoOpExecutionPlan {
fn new(schema: SchemaRef) -> Self {
let cache = Self::compute_properties(schema);
Self { cache }
Self {
cache: Arc::new(cache),
}
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
Expand Down Expand Up @@ -3705,7 +3707,7 @@ mod tests {
self
}

fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}

Expand Down Expand Up @@ -3859,7 +3861,7 @@ digraph {
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
self.0.iter().collect::<Vec<_>>()
}
fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
unimplemented!()
}
fn execute(
Expand Down Expand Up @@ -3908,7 +3910,7 @@ digraph {
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
unimplemented!()
}
fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
unimplemented!()
}
fn execute(
Expand Down Expand Up @@ -4029,7 +4031,7 @@ digraph {
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![]
}
fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
unimplemented!()
}
fn execute(
Expand Down
11 changes: 7 additions & 4 deletions datafusion/core/tests/custom_sources_cases/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,19 @@ struct CustomTableProvider;
#[derive(Debug, Clone)]
struct CustomExecutionPlan {
projection: Option<Vec<usize>>,
cache: PlanProperties,
cache: Arc<PlanProperties>,
}

impl CustomExecutionPlan {
fn new(projection: Option<Vec<usize>>) -> Self {
let schema = TEST_CUSTOM_SCHEMA_REF!();
let schema =
project_schema(&schema, projection.as_ref()).expect("projected schema");
project_schema(&schema, projection.as_deref()).expect("projected schema");
let cache = Self::compute_properties(schema);
Self { projection, cache }
Self {
projection,
cache: Arc::new(cache),
}
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
Expand Down Expand Up @@ -157,7 +160,7 @@ impl ExecutionPlan for CustomExecutionPlan {
self
}

fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,16 @@ fn create_batch(value: i32, num_rows: usize) -> Result<RecordBatch> {
#[derive(Debug)]
struct CustomPlan {
batches: Vec<RecordBatch>,
cache: PlanProperties,
cache: Arc<PlanProperties>,
}

impl CustomPlan {
fn new(schema: SchemaRef, batches: Vec<RecordBatch>) -> Self {
let cache = Self::compute_properties(schema);
Self { batches, cache }
Self {
batches,
cache: Arc::new(cache),
}
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
Expand Down Expand Up @@ -109,7 +112,7 @@ impl ExecutionPlan for CustomPlan {
self
}

fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}

Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/tests/custom_sources_cases/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use async_trait::async_trait;
struct StatisticsValidation {
stats: Statistics,
schema: Arc<Schema>,
cache: PlanProperties,
cache: Arc<PlanProperties>,
}

impl StatisticsValidation {
Expand All @@ -59,7 +59,7 @@ impl StatisticsValidation {
Self {
stats,
schema,
cache,
cache: Arc::new(cache),
}
}

Expand Down Expand Up @@ -158,7 +158,7 @@ impl ExecutionPlan for StatisticsValidation {
self
}

fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}

Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/tests/fuzz_cases/once_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use std::sync::{Arc, Mutex};
pub struct OnceExec {
/// the results to send back
stream: Mutex<Option<SendableRecordBatchStream>>,
cache: PlanProperties,
cache: Arc<PlanProperties>,
}

impl Debug for OnceExec {
Expand All @@ -46,7 +46,7 @@ impl OnceExec {
let cache = Self::compute_properties(stream.schema());
Self {
stream: Mutex::new(Some(stream)),
cache,
cache: Arc::new(cache),
}
}

Expand Down Expand Up @@ -83,7 +83,7 @@ impl ExecutionPlan for OnceExec {
self
}

fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}

Expand Down
Loading
Loading