Skip to content
Merged
15 changes: 9 additions & 6 deletions datafusion-examples/examples/advanced_parquet_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use datafusion::common::{
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::parquet::ParquetAccessPlan;
use datafusion::datasource::physical_plan::{
FileMeta, FileScanConfig, ParquetFileReaderFactory, ParquetSource,
FileMeta, FileScanConfigBuilder, ParquetFileReaderFactory, ParquetSource,
};
use datafusion::datasource::TableProvider;
use datafusion::execution::object_store::ObjectStoreUrl;
Expand All @@ -55,6 +55,7 @@ use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use bytes::Bytes;
use datafusion::datasource::memory::DataSourceExec;
use futures::future::BoxFuture;
use futures::FutureExt;
use object_store::ObjectStore;
Expand Down Expand Up @@ -498,13 +499,15 @@ impl TableProvider for IndexTableProvider {
// provide the factory to create parquet reader without re-reading metadata
.with_parquet_file_reader_factory(Arc::new(reader_factory)),
);
let file_scan_config = FileScanConfig::new(object_store_url, schema, file_source)
.with_limit(limit)
.with_projection(projection.cloned())
.with_file(partitioned_file);
let file_scan_config =
FileScanConfigBuilder::new(object_store_url, schema, file_source)
.with_limit(limit)
.with_projection(projection.cloned())
.with_file(partitioned_file)
.build();

// Finally, put it all together into a DataSourceExec
Ok(file_scan_config.build())
Ok(Arc::new(DataSourceExec::new(Arc::new(file_scan_config))))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't it possible to have this return as a function? Is it because of import cycles?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean as a DataSourceExec function? It also looks a bit verbose to me, but the inner Arc is needed for dynamic dispatch, and the outer one makes the return type more explicit. Happy to make it DataSourceExec::new_arc if you want, but I don't think we use that pattern much in datafusion.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant just a cosmetic change to avoid re-writing the whole Arc::new(DataSourceExec::new(Arc::new(file_scan))) each time. Maybe it can be something like DataSourceExec::from_file_source(file_scan) -> Arc<DataSourceExec>.

What would you think?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you like 748a335? It looks cleaner.

}

/// Tell DataFusion to push filters down to the scan method
Expand Down
7 changes: 4 additions & 3 deletions datafusion-examples/examples/parquet_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use datafusion::common::{
internal_datafusion_err, DFSchema, DataFusionError, Result, ScalarValue,
};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource};
use datafusion::datasource::TableProvider;
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::logical_expr::{
Expand Down Expand Up @@ -244,9 +244,10 @@ impl TableProvider for IndexTableProvider {
let source =
Arc::new(ParquetSource::default().with_predicate(self.schema(), predicate));
let mut file_scan_config =
FileScanConfig::new(object_store_url, self.schema(), source)
FileScanConfigBuilder::new(object_store_url, self.schema(), source)
.with_projection(projection.cloned())
.with_limit(limit);
.with_limit(limit)
.build();
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mertak-synnada, hey, this PR is still WIP, but I was wondering if you're happy with this approach. It's what we discussed in #14685 (comment).

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking good so far, thank you! I haven't been able to test it thoroughly yet, but the legacy ParquetExecBuilder could be helpful for understanding specific cases, just FYI.


// Transform to the format needed to pass to DataSourceExec
// Create one file group per file (default to scanning them all in parallel)
Expand Down
10 changes: 8 additions & 2 deletions datafusion/core/src/datasource/file_format/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use datafusion_common::{
use datafusion_common_runtime::{JoinSet, SpawnedTask};
use datafusion_datasource::display::FileGroupDisplay;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::dml::InsertOp;
use datafusion_physical_expr::PhysicalExpr;
Expand All @@ -58,6 +58,7 @@ use datafusion_physical_plan::insert::{DataSink, DataSinkExec};

use async_trait::async_trait;
use bytes::Bytes;
use datafusion_datasource::source::DataSourceExec;
use futures::stream::BoxStream;
use futures::StreamExt;
use object_store::{GetResultPayload, ObjectMeta, ObjectStore};
Expand Down Expand Up @@ -173,7 +174,12 @@ impl FileFormat for ArrowFormat {
conf: FileScanConfig,
_filters: Option<&Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(conf.with_source(Arc::new(ArrowSource::default())).build())
let source = Arc::new(ArrowSource::default());
let config = FileScanConfigBuilder::from(conf)
.with_source(source)
.build();

Ok(Arc::new(DataSourceExec::new(Arc::new(config))))
}

async fn create_writer_physical_plan(
Expand Down
10 changes: 5 additions & 5 deletions datafusion/core/src/datasource/file_format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,8 @@ pub(crate) mod test_util {

use datafusion_catalog::Session;
use datafusion_common::Result;
use datafusion_datasource::{
file_format::FileFormat, file_scan_config::FileScanConfig, PartitionedFile,
};
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_datasource::{file_format::FileFormat, PartitionedFile};
use datafusion_execution::object_store::ObjectStoreUrl;

use crate::test::object_store::local_unpartitioned_file;
Expand Down Expand Up @@ -78,15 +77,16 @@ pub(crate) mod test_util {
let exec = format
.create_physical_plan(
state,
FileScanConfig::new(
FileScanConfigBuilder::new(
ObjectStoreUrl::local_filesystem(),
file_schema,
format.file_source(),
)
.with_file_groups(file_groups)
.with_statistics(statistics)
.with_projection(projection)
.with_limit(limit),
.with_limit(limit)
.build(),
None,
)
.await?;
Expand Down
7 changes: 4 additions & 3 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::datasource::{
use crate::execution::context::SessionState;
use datafusion_catalog::TableProvider;
use datafusion_common::{config_err, DataFusionError, Result};
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_expr::dml::InsertOp;
use datafusion_expr::{utils::conjunction, Expr, TableProviderFilterPushDown};
use datafusion_expr::{SortExpr, TableType};
Expand Down Expand Up @@ -941,7 +941,7 @@ impl TableProvider for ListingTable {
.format
.create_physical_plan(
session_state,
FileScanConfig::new(
FileScanConfigBuilder::new(
object_store_url,
Arc::clone(&self.file_schema),
self.options.format.file_source(),
Expand All @@ -952,7 +952,8 @@ impl TableProvider for ListingTable {
.with_projection(projection.cloned())
.with_limit(limit)
.with_output_ordering(output_ordering)
.with_table_partition_cols(table_partition_cols),
.with_table_partition_cols(table_partition_cols)
.build(),
filters.as_ref(),
)
.await
Expand Down
16 changes: 11 additions & 5 deletions datafusion/core/src/datasource/physical_plan/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ mod tests {
use arrow::datatypes::{DataType, Field, SchemaBuilder};
use datafusion_common::{assert_batches_eq, test_util, Result, ScalarValue};
use datafusion_datasource::file_format::FileFormat;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_datasource::file_scan_config::{
FileScanConfig, FileScanConfigBuilder,
};
use datafusion_datasource::PartitionedFile;
use datafusion_datasource_avro::source::AvroSource;
use datafusion_datasource_avro::AvroFormat;
Expand Down Expand Up @@ -79,10 +81,14 @@ mod tests {
.await?;

let source = Arc::new(AvroSource::new());
let conf =
FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema, source)
.with_file(meta.into())
.with_projection(Some(vec![0, 1, 2]));
let conf = FileScanConfigBuilder::new(
ObjectStoreUrl::local_filesystem(),
file_schema,
source,
)
.with_file(meta.into())
.with_projection(Some(vec![0, 1, 2]))
.build();

let source_exec = conf.build();
assert_eq!(
Expand Down
7 changes: 4 additions & 3 deletions datafusion/core/src/datasource/physical_plan/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ mod tests {
use arrow::array::Array;
use arrow::datatypes::SchemaRef;
use arrow::datatypes::{Field, SchemaBuilder};
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use object_store::chunked::ChunkedStore;
use object_store::local::LocalFileSystem;
use object_store::ObjectStore;
Expand Down Expand Up @@ -329,7 +330,6 @@ mod tests {
async fn nd_json_exec_file_mixed_order_projection(
file_compression_type: FileCompressionType,
) -> Result<()> {
use datafusion_datasource::file_scan_config::FileScanConfig;
use futures::StreamExt;

let session_ctx = SessionContext::new();
Expand All @@ -340,10 +340,11 @@ mod tests {
prepare_store(&state, file_compression_type.to_owned(), tmp_dir.path()).await;

let source = Arc::new(JsonSource::new());
let conf = FileScanConfig::new(object_store_url, file_schema, source)
let conf = FileScanConfigBuilder::new(object_store_url, file_schema, source)
.with_file_groups(file_groups)
.with_projection(Some(vec![3, 0, 2]))
.with_file_compression_type(file_compression_type.to_owned());
.with_file_compression_type(file_compression_type.to_owned())
.build();
let exec = conf.build();
let inferred_schema = exec.schema();
assert_eq!(inferred_schema.fields().len(), 3);
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub use datafusion_datasource::file_groups::FileGroupPartitioner;
pub use datafusion_datasource::file_meta::FileMeta;
pub use datafusion_datasource::file_scan_config::{
wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig,
FileScanConfigBuilder,
};
pub use datafusion_datasource::file_sink_config::*;

Expand Down
15 changes: 10 additions & 5 deletions datafusion/datasource-csv/src/file_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_format::{
FileFormat, FileFormatFactory, DEFAULT_SCHEMA_INFER_MAX_RECORD,
};
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
use datafusion_datasource::write::demux::DemuxedStreamReceiver;
use datafusion_datasource::write::orchestration::spawn_writer_tasks_and_join;
Expand Down Expand Up @@ -406,10 +406,9 @@ impl FileFormat for CsvFormat {
async fn create_physical_plan(
&self,
state: &dyn Session,
mut conf: FileScanConfig,
conf: FileScanConfig,
_filters: Option<&Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn ExecutionPlan>> {
conf.file_compression_type = self.options.compression.into();
// Consult configuration options for default values
let has_header = self
.options
Expand All @@ -419,15 +418,21 @@ impl FileFormat for CsvFormat {
.options
.newlines_in_values
.unwrap_or(state.config_options().catalog.newlines_in_values);
conf.new_lines_in_values = newlines_in_values;

let conf_builder = FileScanConfigBuilder::from(conf)
.with_file_compression_type(self.options.compression.into())
.with_newlines_in_values(newlines_in_values);

let source = Arc::new(
CsvSource::new(has_header, self.options.delimiter, self.options.quote)
.with_escape(self.options.escape)
.with_terminator(self.options.terminator)
.with_comment(self.options.comment),
);
Ok(conf.with_source(source).build())

let config = conf_builder.with_source(source).build();

Ok(config.build())
}

async fn create_writer_physical_plan(
Expand Down
Loading
Loading