Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ impl TableProvider for CustomDataSource {
struct CustomExec {
db: CustomDataSource,
projected_schema: SchemaRef,
cache: PlanProperties,
cache: Arc<PlanProperties>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💯 for this change

}

impl CustomExec {
Expand All @@ -202,12 +202,13 @@ impl CustomExec {
schema: SchemaRef,
db: CustomDataSource,
) -> Self {
let projected_schema = project_schema(&schema, projections).unwrap();
let projected_schema =
project_schema(&schema, projections.map(AsRef::as_ref)).unwrap();
let cache = Self::compute_properties(projected_schema.clone());
Self {
db,
projected_schema,
cache,
cache: Arc::new(cache),
}
}

Expand Down Expand Up @@ -238,7 +239,7 @@ impl ExecutionPlan for CustomExec {
self
}

fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ impl ExternalBatchBufferer {
struct BufferingExecutionPlan {
schema: SchemaRef,
input: Arc<dyn ExecutionPlan>,
properties: PlanProperties,
properties: Arc<PlanProperties>,
}

impl BufferingExecutionPlan {
Expand Down Expand Up @@ -233,7 +233,7 @@ impl ExecutionPlan for BufferingExecutionPlan {
self.schema.clone()
}

fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
&self.properties
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ impl ExecutionPlan for ParentExec {
self
}

fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
fn properties(&self) -> &Arc<datafusion::physical_plan::PlanProperties> {
unreachable!()
}

Expand Down Expand Up @@ -182,7 +182,7 @@ impl ExecutionPlan for ChildExec {
self
}

fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
fn properties(&self) -> &Arc<datafusion::physical_plan::PlanProperties> {
unreachable!()
}

Expand Down
6 changes: 3 additions & 3 deletions datafusion-examples/examples/relation_planner/table_sample.rs
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,7 @@ pub struct SampleExec {
upper_bound: f64,
seed: u64,
metrics: ExecutionPlanMetricsSet,
cache: PlanProperties,
cache: Arc<PlanProperties>,
}

impl SampleExec {
Expand Down Expand Up @@ -656,7 +656,7 @@ impl SampleExec {
upper_bound,
seed,
metrics: ExecutionPlanMetricsSet::new(),
cache,
cache: Arc::new(cache),
})
}

Expand Down Expand Up @@ -686,7 +686,7 @@ impl ExecutionPlan for SampleExec {
self
}

fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion/catalog-listing/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ impl TableProvider for ListingTable {

// if no files need to be read, return an `EmptyExec`
if partitioned_file_lists.is_empty() {
let projected_schema = project_schema(&self.schema(), projection.as_ref())?;
let projected_schema = project_schema(&self.schema(), projection.as_deref())?;
return Ok(ScanResult::new(Arc::new(EmptyExec::new(projected_schema))));
}

Expand Down
6 changes: 3 additions & 3 deletions datafusion/catalog/src/memory/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,7 @@ fn evaluate_filters_to_mask(
struct DmlResultExec {
rows_affected: u64,
schema: SchemaRef,
properties: PlanProperties,
properties: Arc<PlanProperties>,
}

impl DmlResultExec {
Expand All @@ -570,7 +570,7 @@ impl DmlResultExec {
Self {
rows_affected,
schema,
properties,
properties: Arc::new(properties),
}
}
}
Expand Down Expand Up @@ -604,7 +604,7 @@ impl ExecutionPlan for DmlResultExec {
Arc::clone(&self.schema)
}

fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
&self.properties
}

Expand Down
17 changes: 8 additions & 9 deletions datafusion/common/src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ impl Statistics {
/// For example, if we had statistics for columns `{"a", "b", "c"}`,
/// projecting to `vec![2, 1]` would return statistics for columns `{"c",
/// "b"}`.
pub fn project(mut self, projection: Option<&Vec<usize>>) -> Self {
pub fn project(mut self, projection: Option<&[usize]>) -> Self {
let Some(projection) = projection else {
return self;
};
Expand Down Expand Up @@ -1066,29 +1066,28 @@ mod tests {

#[test]
fn test_project_none() {
let projection = None;
let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
let stats = make_stats(vec![10, 20, 30]).project(None);
assert_eq!(stats, make_stats(vec![10, 20, 30]));
}

#[test]
fn test_project_empty() {
let projection = Some(vec![]);
let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
let projection: Option<&[_]> = Some(&[]);
let stats = make_stats(vec![10, 20, 30]).project(projection);
assert_eq!(stats, make_stats(vec![]));
}

#[test]
fn test_project_swap() {
let projection = Some(vec![2, 1]);
let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
let projection: Option<&[_]> = Some(&[2, 1]);
let stats = make_stats(vec![10, 20, 30]).project(projection);
assert_eq!(stats, make_stats(vec![30, 20]));
}

#[test]
fn test_project_repeated() {
let projection = Some(vec![1, 2, 1, 1, 0, 2]);
let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
let projection: Option<&[_]> = Some(&[1, 2, 1, 1, 0, 2]);
let stats = make_stats(vec![10, 20, 30]).project(projection);
assert_eq!(stats, make_stats(vec![20, 30, 20, 20, 10, 30]));
}

Expand Down
4 changes: 2 additions & 2 deletions datafusion/common/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ use std::thread::available_parallelism;
///
/// // Pick columns 'c' and 'b'
/// let projection = Some(vec![2, 1]);
/// let projected_schema = project_schema(&schema, projection.as_ref()).unwrap();
/// let projected_schema = project_schema(&schema, projection.as_deref()).unwrap();
///
/// let expected_schema = SchemaRef::new(Schema::new(vec![
/// Field::new("c", DataType::Utf8, true),
Expand All @@ -70,7 +70,7 @@ use std::thread::available_parallelism;
/// ```
pub fn project_schema(
schema: &SchemaRef,
projection: Option<&Vec<usize>>,
projection: Option<&[usize]>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is also technically a public API change, but I think it just relaxes the constraints (you can still pass in a &Vec so all downstream code should continue to work

) -> Result<SchemaRef> {
let schema = match projection {
Some(columns) => Arc::new(schema.project(columns)?),
Expand Down
2 changes: 2 additions & 0 deletions datafusion/core/benches/reset_plan_states.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ fn run_reset_states(b: &mut criterion::Bencher, plan: &Arc<dyn ExecutionPlan>) {
/// making an independent instance of the execution plan to re-execute it, avoiding
/// re-planning stage.
fn bench_reset_plan_states(c: &mut Criterion) {
env_logger::init();

let rt = Runtime::new().unwrap();
let ctx = SessionContext::new();
ctx.register_table(
Expand Down
3 changes: 2 additions & 1 deletion datafusion/core/src/datasource/empty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ impl TableProvider for EmptyTable {
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
// even though there is no data, projections apply
let projected_schema = project_schema(&self.schema, projection)?;
let projected_schema =
project_schema(&self.schema, projection.map(AsRef::as_ref))?;
Ok(Arc::new(
EmptyExec::new(projected_schema).with_partitions(self.partitions),
))
Expand Down
14 changes: 8 additions & 6 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3661,13 +3661,15 @@ mod tests {

#[derive(Debug)]
struct NoOpExecutionPlan {
cache: PlanProperties,
cache: Arc<PlanProperties>,
}

impl NoOpExecutionPlan {
fn new(schema: SchemaRef) -> Self {
let cache = Self::compute_properties(schema);
Self { cache }
Self {
cache: Arc::new(cache),
}
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
Expand Down Expand Up @@ -3705,7 +3707,7 @@ mod tests {
self
}

fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}

Expand Down Expand Up @@ -3859,7 +3861,7 @@ digraph {
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
self.0.iter().collect::<Vec<_>>()
}
fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
unimplemented!()
}
fn execute(
Expand Down Expand Up @@ -3908,7 +3910,7 @@ digraph {
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
unimplemented!()
}
fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
unimplemented!()
}
fn execute(
Expand Down Expand Up @@ -4029,7 +4031,7 @@ digraph {
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![]
}
fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
unimplemented!()
}
fn execute(
Expand Down
11 changes: 7 additions & 4 deletions datafusion/core/tests/custom_sources_cases/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,19 @@ struct CustomTableProvider;
#[derive(Debug, Clone)]
struct CustomExecutionPlan {
projection: Option<Vec<usize>>,
cache: PlanProperties,
cache: Arc<PlanProperties>,
}

impl CustomExecutionPlan {
fn new(projection: Option<Vec<usize>>) -> Self {
let schema = TEST_CUSTOM_SCHEMA_REF!();
let schema =
project_schema(&schema, projection.as_ref()).expect("projected schema");
project_schema(&schema, projection.as_deref()).expect("projected schema");
let cache = Self::compute_properties(schema);
Self { projection, cache }
Self {
projection,
cache: Arc::new(cache),
}
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
Expand Down Expand Up @@ -157,7 +160,7 @@ impl ExecutionPlan for CustomExecutionPlan {
self
}

fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,16 @@ fn create_batch(value: i32, num_rows: usize) -> Result<RecordBatch> {
#[derive(Debug)]
struct CustomPlan {
batches: Vec<RecordBatch>,
cache: PlanProperties,
cache: Arc<PlanProperties>,
}

impl CustomPlan {
fn new(schema: SchemaRef, batches: Vec<RecordBatch>) -> Self {
let cache = Self::compute_properties(schema);
Self { batches, cache }
Self {
batches,
cache: Arc::new(cache),
}
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
Expand Down Expand Up @@ -109,7 +112,7 @@ impl ExecutionPlan for CustomPlan {
self
}

fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}

Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/tests/custom_sources_cases/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use async_trait::async_trait;
struct StatisticsValidation {
stats: Statistics,
schema: Arc<Schema>,
cache: PlanProperties,
cache: Arc<PlanProperties>,
}

impl StatisticsValidation {
Expand All @@ -59,7 +59,7 @@ impl StatisticsValidation {
Self {
stats,
schema,
cache,
cache: Arc::new(cache),
}
}

Expand Down Expand Up @@ -158,7 +158,7 @@ impl ExecutionPlan for StatisticsValidation {
self
}

fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}

Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/tests/fuzz_cases/once_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use std::sync::{Arc, Mutex};
pub struct OnceExec {
/// the results to send back
stream: Mutex<Option<SendableRecordBatchStream>>,
cache: PlanProperties,
cache: Arc<PlanProperties>,
}

impl Debug for OnceExec {
Expand All @@ -46,7 +46,7 @@ impl OnceExec {
let cache = Self::compute_properties(stream.schema());
Self {
stream: Mutex::new(Some(stream)),
cache,
cache: Arc::new(cache),
}
}

Expand Down Expand Up @@ -83,7 +83,7 @@ impl ExecutionPlan for OnceExec {
self
}

fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}

Expand Down
Loading