Skip to content
Merged
15 changes: 9 additions & 6 deletions datafusion-examples/examples/advanced_parquet_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use datafusion::common::{
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::parquet::ParquetAccessPlan;
use datafusion::datasource::physical_plan::{
FileMeta, FileScanConfig, ParquetFileReaderFactory, ParquetSource,
FileMeta, FileScanConfigBuilder, ParquetFileReaderFactory, ParquetSource,
};
use datafusion::datasource::TableProvider;
use datafusion::execution::object_store::ObjectStoreUrl;
Expand All @@ -55,6 +55,7 @@ use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use bytes::Bytes;
use datafusion::datasource::memory::DataSourceExec;
use futures::future::BoxFuture;
use futures::FutureExt;
use object_store::ObjectStore;
Expand Down Expand Up @@ -498,13 +499,15 @@ impl TableProvider for IndexTableProvider {
// provide the factory to create parquet reader without re-reading metadata
.with_parquet_file_reader_factory(Arc::new(reader_factory)),
);
let file_scan_config = FileScanConfig::new(object_store_url, schema, file_source)
.with_limit(limit)
.with_projection(projection.cloned())
.with_file(partitioned_file);
let file_scan_config =
FileScanConfigBuilder::new(object_store_url, schema, file_source)
.with_limit(limit)
.with_projection(projection.cloned())
.with_file(partitioned_file)
.build();

// Finally, put it all together into a DataSourceExec
Ok(file_scan_config.build())
Ok(Arc::new(DataSourceExec::new(Arc::new(file_scan_config))))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't it possible to have this returned from a helper function? Or is that not feasible because of import cycles?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean as a `DataSourceExec` function? It also looks a bit verbose to me, but the inner `Arc` is needed for dynamic dispatch, and the outer one makes the return type more explicit. Happy to make it `DataSourceExec::new_arc` if you want, but I don't think we use that pattern a lot in DataFusion.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant just a cosmetic change, to avoid re-writing the whole `Arc::new(DataSourceExec::new(Arc::new(file_scan)))` each time. Maybe it could be something like `DataSourceExec::from_file_source(file_scan) -> Arc<DataSourceExec>`.

What would you think?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you like 748a335? It looks cleaner.

}

/// Tell DataFusion to push filters down to the scan method
Expand Down
15 changes: 8 additions & 7 deletions datafusion-examples/examples/csv_json_opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,14 @@ use datafusion::{
file_format::file_compression_type::FileCompressionType,
listing::PartitionedFile,
object_store::ObjectStoreUrl,
physical_plan::{
CsvSource, FileScanConfig, FileSource, FileStream, JsonOpener, JsonSource,
},
physical_plan::{CsvSource, FileSource, FileStream, JsonOpener, JsonSource},
},
error::Result,
physical_plan::metrics::ExecutionPlanMetricsSet,
test_util::aggr_test_schema,
};

use datafusion::datasource::physical_plan::FileScanConfigBuilder;
use futures::StreamExt;
use object_store::{local::LocalFileSystem, memory::InMemory, ObjectStore};

Expand All @@ -56,14 +55,15 @@ async fn csv_opener() -> Result<()> {

let path = std::path::Path::new(&path).canonicalize()?;

let scan_config = FileScanConfig::new(
let scan_config = FileScanConfigBuilder::new(
ObjectStoreUrl::local_filesystem(),
Arc::clone(&schema),
Arc::new(CsvSource::default()),
)
.with_projection(Some(vec![12, 0]))
.with_limit(Some(5))
.with_file(PartitionedFile::new(path.display().to_string(), 10));
.with_file(PartitionedFile::new(path.display().to_string(), 10))
.build();

let config = CsvSource::new(true, b',', b'"')
.with_comment(Some(b'#'))
Expand Down Expand Up @@ -121,14 +121,15 @@ async fn json_opener() -> Result<()> {
Arc::new(object_store),
);

let scan_config = FileScanConfig::new(
let scan_config = FileScanConfigBuilder::new(
ObjectStoreUrl::local_filesystem(),
schema,
Arc::new(JsonSource::default()),
)
.with_projection(Some(vec![1, 0]))
.with_limit(Some(5))
.with_file(PartitionedFile::new(path.to_string(), 10));
.with_file(PartitionedFile::new(path.to_string(), 10))
.build();

let mut stream = FileStream::new(
&scan_config,
Expand Down
10 changes: 6 additions & 4 deletions datafusion-examples/examples/parquet_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ use datafusion::common::{
internal_datafusion_err, DFSchema, DataFusionError, Result, ScalarValue,
};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
use datafusion::datasource::memory::DataSourceExec;
use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource};
use datafusion::datasource::TableProvider;
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::logical_expr::{
Expand Down Expand Up @@ -244,9 +245,10 @@ impl TableProvider for IndexTableProvider {
let source =
Arc::new(ParquetSource::default().with_predicate(self.schema(), predicate));
let mut file_scan_config =
FileScanConfig::new(object_store_url, self.schema(), source)
FileScanConfigBuilder::new(object_store_url, self.schema(), source)
.with_projection(projection.cloned())
.with_limit(limit);
.with_limit(limit)
.build();
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mertak-synnada, hey, this PR is still WIP but I was wondering if you're happy with this approach. That's what we've discussed in #14685 (comment)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking good so far, thank you! I haven't been able to test it thoroughly yet, but the legacy ParquetExecBuilder could be helpful for understanding specific cases, just FYI.


// Transform to the format needed to pass to DataSourceExec
// Create one file group per file (default to scanning them all in parallel)
Expand All @@ -258,7 +260,7 @@ impl TableProvider for IndexTableProvider {
file_size,
));
}
Ok(file_scan_config.build())
Ok(Arc::new(DataSourceExec::new(Arc::new(file_scan_config))))
}

/// Tell DataFusion to push filters down to the scan method
Expand Down
10 changes: 8 additions & 2 deletions datafusion/core/src/datasource/file_format/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use datafusion_common::{
use datafusion_common_runtime::{JoinSet, SpawnedTask};
use datafusion_datasource::display::FileGroupDisplay;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::dml::InsertOp;
use datafusion_physical_expr::PhysicalExpr;
Expand All @@ -58,6 +58,7 @@ use datafusion_physical_plan::insert::{DataSink, DataSinkExec};

use async_trait::async_trait;
use bytes::Bytes;
use datafusion_datasource::source::DataSourceExec;
use futures::stream::BoxStream;
use futures::StreamExt;
use object_store::{GetResultPayload, ObjectMeta, ObjectStore};
Expand Down Expand Up @@ -173,7 +174,12 @@ impl FileFormat for ArrowFormat {
conf: FileScanConfig,
_filters: Option<&Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(conf.with_source(Arc::new(ArrowSource::default())).build())
let source = Arc::new(ArrowSource::default());
let config = FileScanConfigBuilder::from(conf)
.with_source(source)
.build();

Ok(Arc::new(DataSourceExec::new(Arc::new(config))))
}

async fn create_writer_physical_plan(
Expand Down
10 changes: 5 additions & 5 deletions datafusion/core/src/datasource/file_format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,8 @@ pub(crate) mod test_util {

use datafusion_catalog::Session;
use datafusion_common::Result;
use datafusion_datasource::{
file_format::FileFormat, file_scan_config::FileScanConfig, PartitionedFile,
};
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_datasource::{file_format::FileFormat, PartitionedFile};
use datafusion_execution::object_store::ObjectStoreUrl;

use crate::test::object_store::local_unpartitioned_file;
Expand Down Expand Up @@ -78,15 +77,16 @@ pub(crate) mod test_util {
let exec = format
.create_physical_plan(
state,
FileScanConfig::new(
FileScanConfigBuilder::new(
ObjectStoreUrl::local_filesystem(),
file_schema,
format.file_source(),
)
.with_file_groups(file_groups)
.with_statistics(statistics)
.with_projection(projection)
.with_limit(limit),
.with_limit(limit)
.build(),
None,
)
.await?;
Expand Down
7 changes: 4 additions & 3 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::datasource::{
use crate::execution::context::SessionState;
use datafusion_catalog::TableProvider;
use datafusion_common::{config_err, DataFusionError, Result};
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_expr::dml::InsertOp;
use datafusion_expr::{utils::conjunction, Expr, TableProviderFilterPushDown};
use datafusion_expr::{SortExpr, TableType};
Expand Down Expand Up @@ -941,7 +941,7 @@ impl TableProvider for ListingTable {
.format
.create_physical_plan(
session_state,
FileScanConfig::new(
FileScanConfigBuilder::new(
object_store_url,
Arc::clone(&self.file_schema),
self.options.format.file_source(),
Expand All @@ -952,7 +952,8 @@ impl TableProvider for ListingTable {
.with_projection(projection.cloned())
.with_limit(limit)
.with_output_ordering(output_ordering)
.with_table_partition_cols(table_partition_cols),
.with_table_partition_cols(table_partition_cols)
.build(),
filters.as_ref(),
)
.await
Expand Down
15 changes: 10 additions & 5 deletions datafusion/core/src/datasource/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ mod tests {
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion_common::assert_batches_sorted_eq;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_datasource::schema_adapter::{
DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
};
Expand All @@ -71,6 +71,7 @@ mod tests {

use ::object_store::path::Path;
use ::object_store::ObjectMeta;
use datafusion_datasource::source::DataSourceExec;
use datafusion_physical_plan::collect;
use tempfile::TempDir;

Expand Down Expand Up @@ -127,11 +128,15 @@ mod tests {
ParquetSource::default()
.with_schema_adapter_factory(Arc::new(TestSchemaAdapterFactory {})),
);
let base_conf =
FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema, source)
.with_file(partitioned_file);
let base_conf = FileScanConfigBuilder::new(
ObjectStoreUrl::local_filesystem(),
schema,
source,
)
.with_file(partitioned_file)
.build();

let parquet_exec = base_conf.build();
let parquet_exec = Arc::new(DataSourceExec::new(Arc::new(base_conf)));

let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
Expand Down
33 changes: 20 additions & 13 deletions datafusion/core/src/datasource/physical_plan/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,14 @@ mod tests {
use arrow::datatypes::{DataType, Field, SchemaBuilder};
use datafusion_common::{assert_batches_eq, test_util, Result, ScalarValue};
use datafusion_datasource::file_format::FileFormat;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_datasource::PartitionedFile;
use datafusion_datasource_avro::source::AvroSource;
use datafusion_datasource_avro::AvroFormat;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_physical_plan::ExecutionPlan;

use datafusion_datasource::source::DataSourceExec;
use futures::StreamExt;
use object_store::chunked::ChunkedStore;
use object_store::local::LocalFileSystem;
Expand Down Expand Up @@ -79,12 +80,16 @@ mod tests {
.await?;

let source = Arc::new(AvroSource::new());
let conf =
FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema, source)
.with_file(meta.into())
.with_projection(Some(vec![0, 1, 2]));

let source_exec = conf.build();
let conf = FileScanConfigBuilder::new(
ObjectStoreUrl::local_filesystem(),
file_schema,
source,
)
.with_file(meta.into())
.with_projection(Some(vec![0, 1, 2]))
.build();

let source_exec = Arc::new(DataSourceExec::new(Arc::new(conf)));
assert_eq!(
source_exec
.properties()
Expand Down Expand Up @@ -153,11 +158,12 @@ mod tests {
let projection = Some(vec![0, 1, 2, actual_schema.fields().len()]);

let source = Arc::new(AvroSource::new());
let conf = FileScanConfig::new(object_store_url, file_schema, source)
let conf = FileScanConfigBuilder::new(object_store_url, file_schema, source)
.with_file(meta.into())
.with_projection(projection);
.with_projection(projection)
.build();

let source_exec = conf.build();
let source_exec = Arc::new(DataSourceExec::new(Arc::new(conf)));
assert_eq!(
source_exec
.properties()
Expand Down Expand Up @@ -224,14 +230,15 @@ mod tests {

let projection = Some(vec![0, 1, file_schema.fields().len(), 2]);
let source = Arc::new(AvroSource::new());
let conf = FileScanConfig::new(object_store_url, file_schema, source)
let conf = FileScanConfigBuilder::new(object_store_url, file_schema, source)
// select specific columns of the files as well as the partitioning
// column which is supposed to be the last column in the table schema.
.with_projection(projection)
.with_file(partitioned_file)
.with_table_partition_cols(vec![Field::new("date", DataType::Utf8, false)]);
.with_table_partition_cols(vec![Field::new("date", DataType::Utf8, false)])
.build();

let source_exec = conf.build();
let source_exec = Arc::new(DataSourceExec::new(Arc::new(conf)));

assert_eq!(
source_exec
Expand Down
Loading