22 changes: 11 additions & 11 deletions datafusion-examples/examples/advanced_parquet_index.rs
@@ -27,6 +27,7 @@ use datafusion::catalog::Session;
use datafusion::common::{
internal_datafusion_err, DFSchema, DataFusionError, Result, ScalarValue,
};
use datafusion::config::TableParquetOptions;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::parquet::ParquetAccessPlan;
use datafusion::datasource::physical_plan::{
@@ -491,23 +492,22 @@ impl TableProvider for IndexTableProvider {
CachedParquetFileReaderFactory::new(Arc::clone(&self.object_store))
.with_file(indexed_file);

let file_source = Arc::new(
ParquetSource::default()
let file_scan_config = FileScanConfigBuilder::new(object_store_url, schema)
.with_limit(limit)
.with_projection(projection.cloned())
.with_file(partitioned_file)
.build();

let file_source =
ParquetSource::new(TableParquetOptions::default(), file_scan_config)
// provide the predicate so the DataSourceExec can try and prune
// row groups internally
.with_predicate(predicate)
// provide the factory to create parquet reader without re-reading metadata
.with_parquet_file_reader_factory(Arc::new(reader_factory)),
);
let file_scan_config =
FileScanConfigBuilder::new(object_store_url, schema, file_source)
.with_limit(limit)
.with_projection(projection.cloned())
.with_file(partitioned_file)
.build();
.with_parquet_file_reader_factory(Arc::new(reader_factory));

// Finally, put it all together into a DataSourceExec
Ok(DataSourceExec::from_data_source(file_scan_config))
Ok(DataSourceExec::from_data_source(file_source))
}

/// Tell DataFusion to push filters down to the scan method
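For reference, a minimal sketch of the post-change scan construction in this example, reassembled from the hunks above (the bindings object_store_url, schema, partitioned_file, predicate, reader_factory, limit, and projection are assumed to be in scope as in the surrounding file):

    let file_scan_config = FileScanConfigBuilder::new(object_store_url, schema)
        .with_limit(limit)
        .with_projection(projection.cloned())
        .with_file(partitioned_file)
        .build();

    // ParquetSource now takes the built FileScanConfig at construction time
    let file_source = ParquetSource::new(TableParquetOptions::default(), file_scan_config)
        // provide the predicate so the DataSourceExec can prune row groups
        .with_predicate(predicate)
        // reuse cached parquet metadata via the custom reader factory
        .with_parquet_file_reader_factory(Arc::new(reader_factory));

    Ok(DataSourceExec::from_data_source(file_source))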
27 changes: 11 additions & 16 deletions datafusion-examples/examples/csv_json_opener.rs
@@ -24,7 +24,7 @@ use datafusion::{
file_format::file_compression_type::FileCompressionType,
listing::PartitionedFile,
object_store::ObjectStoreUrl,
physical_plan::{CsvSource, FileSource, FileStream, JsonOpener, JsonSource},
physical_plan::{CsvSource, FileSource, FileStream, JsonOpener},
},
error::Result,
physical_plan::metrics::ExecutionPlanMetricsSet,
@@ -58,24 +58,22 @@ async fn csv_opener() -> Result<()> {
let scan_config = FileScanConfigBuilder::new(
ObjectStoreUrl::local_filesystem(),
Arc::clone(&schema),
Arc::new(CsvSource::default()),
)
.with_projection(Some(vec![12, 0]))
.with_limit(Some(5))
.with_file(PartitionedFile::new(path.display().to_string(), 10))
.build();

let config = CsvSource::new(true, b',', b'"')
let source = CsvSource::new(true, b',', b'"', scan_config)
.with_comment(Some(b'#'))
.with_schema(schema)
.with_batch_size(8192)
.with_projection(&scan_config);
.with_batch_size(8192);

let opener = config.create_file_opener(object_store, &scan_config, 0);
let opener = source.create_file_opener(object_store, 0);

let mut result = vec![];
let mut stream =
FileStream::new(&scan_config, 0, opener, &ExecutionPlanMetricsSet::new())?;
FileStream::new(&source.config(), 0, opener, &ExecutionPlanMetricsSet::new())?;
while let Some(batch) = stream.next().await.transpose()? {
result.push(batch);
}
@@ -121,15 +119,12 @@ async fn json_opener() -> Result<()> {
Arc::new(object_store),
);

let scan_config = FileScanConfigBuilder::new(
ObjectStoreUrl::local_filesystem(),
schema,
Arc::new(JsonSource::default()),
)
.with_projection(Some(vec![1, 0]))
.with_limit(Some(5))
.with_file(PartitionedFile::new(path.to_string(), 10))
.build();
let scan_config =
FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), schema)
.with_projection(Some(vec![1, 0]))
.with_limit(Some(5))
.with_file(PartitionedFile::new(path.to_string(), 10))
.build();

let mut stream = FileStream::new(
&scan_config,
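Similarly, a minimal sketch of the csv_opener path after this change, reassembled from the hunk above (object_store, schema, and path are the example's bindings; the four-argument CsvSource::new(has_header, delimiter, quote, scan_config) signature is taken from the diff, and any other builder calls from the original example are omitted here):

    let scan_config = FileScanConfigBuilder::new(
        ObjectStoreUrl::local_filesystem(),
        Arc::clone(&schema),
    )
    .with_projection(Some(vec![12, 0]))
    .with_limit(Some(5))
    .with_file(PartitionedFile::new(path.display().to_string(), 10))
    .build();

    // the source now owns its FileScanConfig, so the opener and stream are built from it directly
    let source = CsvSource::new(true, b',', b'"', scan_config)
        .with_comment(Some(b'#'))
        .with_batch_size(8192);

    let opener = source.create_file_opener(object_store, 0);
    let mut stream =
        FileStream::new(&source.config(), 0, opener, &ExecutionPlanMetricsSet::new())?;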
6 changes: 1 addition & 5 deletions datafusion-examples/examples/custom_file_format.rs
@@ -29,7 +29,7 @@ use datafusion::{
csv::CsvFormatFactory, file_compression_type::FileCompressionType,
FileFormat, FileFormatFactory,
},
physical_plan::{FileScanConfig, FileSinkConfig, FileSource},
physical_plan::{FileScanConfig, FileSinkConfig},
MemTable,
},
error::Result,
@@ -127,10 +127,6 @@ impl FileFormat for TSVFileFormat {
.create_writer_physical_plan(input, state, conf, order_requirements)
.await
}

fn file_source(&self) -> Arc<dyn FileSource> {
self.csv_file_format.file_source()
}
}

#[derive(Default, Debug)]
19 changes: 9 additions & 10 deletions datafusion-examples/examples/default_column_values.rs
@@ -29,6 +29,7 @@ use datafusion::catalog::{Session, TableProvider};
use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion::common::DFSchema;
use datafusion::common::{Result, ScalarValue};
use datafusion::config::TableParquetOptions;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource};
use datafusion::execution::context::SessionContext;
@@ -235,10 +236,6 @@ impl TableProvider for DefaultValueTableProvider {
&df_schema,
)?;

let parquet_source = ParquetSource::default()
.with_predicate(filter)
.with_pushdown_filters(true);

let object_store_url = ObjectStoreUrl::parse("memory://")?;
let store = state.runtime_env().object_store(object_store_url)?;

@@ -255,19 +252,21 @@
.map(|file| PartitionedFile::new(file.location.clone(), file.size))
.collect();

let file_scan_config = FileScanConfigBuilder::new(
let config = FileScanConfigBuilder::new(
ObjectStoreUrl::parse("memory://")?,
self.schema.clone(),
Arc::new(parquet_source),
)
.with_projection(projection.cloned())
.with_limit(limit)
.with_file_group(file_group)
.with_expr_adapter(Some(Arc::new(DefaultValuePhysicalExprAdapterFactory) as _));
.with_expr_adapter(Some(Arc::new(DefaultValuePhysicalExprAdapterFactory) as _))
.build();

let parquet_source = ParquetSource::new(TableParquetOptions::default(), config)
.with_predicate(filter)
.with_pushdown_filters(true);

Ok(Arc::new(DataSourceExec::new(Arc::new(
file_scan_config.build(),
))))
Ok(DataSourceExec::from_data_source(parquet_source))
}
}

34 changes: 16 additions & 18 deletions datafusion-examples/examples/json_shredding.rs
@@ -29,6 +29,7 @@ use datafusion::common::tree_node::{
Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
};
use datafusion::common::{assert_contains, DFSchema, Result};
use datafusion::config::TableParquetOptions;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource};
use datafusion::execution::context::SessionContext;
@@ -243,10 +244,6 @@ impl TableProvider for ExampleTableProvider {
&df_schema,
)?;

let parquet_source = ParquetSource::default()
.with_predicate(filter)
.with_pushdown_filters(true);

let object_store_url = ObjectStoreUrl::parse("memory://")?;

let store = state.runtime_env().object_store(object_store_url)?;
@@ -264,20 +261,21 @@
.map(|file| PartitionedFile::new(file.location.clone(), file.size))
.collect();

let file_scan_config = FileScanConfigBuilder::new(
ObjectStoreUrl::parse("memory://")?,
schema,
Arc::new(parquet_source),
)
.with_projection(projection.cloned())
.with_limit(limit)
.with_file_group(file_group)
// if the rewriter needs a reference to the table schema you can bind self.schema() here
.with_expr_adapter(Some(Arc::new(ShreddedJsonRewriterFactory) as _));

Ok(Arc::new(DataSourceExec::new(Arc::new(
file_scan_config.build(),
))))
let file_scan_config =
FileScanConfigBuilder::new(ObjectStoreUrl::parse("memory://")?, schema)
.with_projection(projection.cloned())
.with_limit(limit)
.with_file_group(file_group)
// if the rewriter needs a reference to the table schema you can bind self.schema() here
.with_expr_adapter(Some(Arc::new(ShreddedJsonRewriterFactory) as _))
.build();

let parquet_source =
ParquetSource::new(TableParquetOptions::default(), file_scan_config)
.with_predicate(filter)
.with_pushdown_filters(true);

Ok(DataSourceExec::from_data_source(parquet_source))
}
}

10 changes: 7 additions & 3 deletions datafusion-examples/examples/parquet_embedded_index.rs
@@ -117,6 +117,7 @@ use arrow_schema::{DataType, Field, Schema, SchemaRef};
use async_trait::async_trait;
use datafusion::catalog::{Session, TableProvider};
use datafusion::common::{exec_err, HashMap, HashSet, Result};
use datafusion::config::TableParquetOptions;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::memory::DataSourceExec;
use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource};
@@ -426,8 +427,8 @@ impl TableProvider for DistinctIndexTable {

// Build ParquetSource to actually read the files
let url = ObjectStoreUrl::parse("file://")?;
let source = Arc::new(ParquetSource::default().with_enable_page_index(true));
let mut builder = FileScanConfigBuilder::new(url, self.schema.clone(), source);

let mut builder = FileScanConfigBuilder::new(url, self.schema.clone());
for file in files_to_scan {
let path = self.dir.join(file);
let len = std::fs::metadata(&path)?.len();
@@ -438,7 +439,10 @@
PartitionedFile::new(path.to_str().unwrap().to_string(), len);
builder = builder.with_file(partitioned_file);
}
Ok(DataSourceExec::from_data_source(builder.build()))

let source = ParquetSource::new(TableParquetOptions::default(), builder.build())
.with_enable_page_index(true);
Ok(DataSourceExec::from_data_source(source))
}

/// Tell DataFusion that we can handle filters on the "category" column
8 changes: 5 additions & 3 deletions datafusion-examples/examples/parquet_exec_visitor.rs
@@ -19,7 +19,7 @@ use std::sync::Arc;

use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::ListingOptions;
use datafusion::datasource::physical_plan::{FileGroup, ParquetSource};
use datafusion::datasource::physical_plan::{FileGroup, FileSource, ParquetSource};
use datafusion::datasource::source::DataSourceExec;
use datafusion::error::DataFusionError;
use datafusion::execution::context::SessionContext;
@@ -98,9 +98,11 @@ impl ExecutionPlanVisitor for ParquetExecVisitor {
fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
// If needed match on a specific `ExecutionPlan` node type
if let Some(data_source_exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
if let Some((file_config, _)) =
data_source_exec.downcast_to_file_source::<ParquetSource>()
if let Some(parquet_source) =
data_source_exec.as_any().downcast_ref::<ParquetSource>()
{
let file_config = parquet_source.config();

self.file_groups = Some(file_config.file_groups.clone());

let metrics = match data_source_exec.metrics() {
13 changes: 8 additions & 5 deletions datafusion-examples/examples/parquet_index.rs
@@ -27,6 +27,7 @@ use datafusion::common::pruning::PruningStatistics;
use datafusion::common::{
internal_datafusion_err, DFSchema, DataFusionError, Result, ScalarValue,
};
use datafusion::config::TableParquetOptions;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::memory::DataSourceExec;
use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource};
@@ -243,9 +244,8 @@ impl TableProvider for IndexTableProvider {
let files = self.index.get_files(predicate.clone())?;

let object_store_url = ObjectStoreUrl::parse("file://")?;
let source = Arc::new(ParquetSource::default().with_predicate(predicate));
let mut file_scan_config_builder =
FileScanConfigBuilder::new(object_store_url, self.schema(), source)
FileScanConfigBuilder::new(object_store_url, self.schema())
.with_projection(projection.cloned())
.with_limit(limit);

@@ -258,9 +258,12 @@
PartitionedFile::new(canonical_path.display().to_string(), file_size),
);
}
Ok(DataSourceExec::from_data_source(
file_scan_config_builder.build(),
))

let config = file_scan_config_builder.build();
let source = ParquetSource::new(TableParquetOptions::default(), config)
.with_predicate(predicate);

Ok(DataSourceExec::from_data_source(source))
}

/// Tell DataFusion to push filters down to the scan method
16 changes: 4 additions & 12 deletions datafusion/core/src/datasource/file_format/arrow.rs
@@ -48,8 +48,7 @@ use datafusion_common::{
};
use datafusion_common_runtime::{JoinSet, SpawnedTask};
use datafusion_datasource::display::FileGroupDisplay;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_datasource::sink::{DataSink, DataSinkExec};
use datafusion_datasource::write::ObjectWriterBuilder;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
@@ -178,12 +177,9 @@ impl FileFormat for ArrowFormat {
_state: &dyn Session,
conf: FileScanConfig,
) -> Result<Arc<dyn ExecutionPlan>> {
let source = Arc::new(ArrowSource::default());
let config = FileScanConfigBuilder::from(conf)
.with_source(source)
.build();

Ok(DataSourceExec::from_data_source(config))
Ok(DataSourceExec::from_data_source(ArrowSource::new(
conf.clone(),
)))
}

async fn create_writer_physical_plan(
@@ -201,10 +197,6 @@

Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
}

fn file_source(&self) -> Arc<dyn FileSource> {
Arc::new(ArrowSource::default())
}
}

/// Implements [`FileSink`] for writing to arrow_ipc files
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/file_format/mod.rs
@@ -86,7 +86,7 @@ pub(crate) mod test_util {
FileScanConfigBuilder::new(
ObjectStoreUrl::local_filesystem(),
file_schema,
format.file_source(),
// format.file_source(),
)
.with_file_groups(file_groups)
.with_statistics(statistics)