Skip to content

Commit 8d54e7b

Browse files
adriangb and alamb authored
Add TableSchema helper to encapsulate file schema + partition fields (#18178)
Hoping this helps with #14993 --------- Co-authored-by: Andrew Lamb <[email protected]>
1 parent 347b2b6 commit 8d54e7b

File tree

8 files changed

+216
-63
lines changed

8 files changed

+216
-63
lines changed

datafusion/core/src/datasource/physical_plan/csv.rs

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ mod tests {
121121
.with_projection(Some(vec![0, 2, 4]))
122122
.build();
123123

124-
assert_eq!(13, config.file_schema.fields().len());
124+
assert_eq!(13, config.file_schema().fields().len());
125125
let csv = DataSourceExec::from_data_source(config);
126126

127127
assert_eq!(3, csv.schema().fields().len());
@@ -185,7 +185,7 @@ mod tests {
185185
.with_file_compression_type(file_compression_type.to_owned())
186186
.with_projection(Some(vec![4, 0, 2]))
187187
.build();
188-
assert_eq!(13, config.file_schema.fields().len());
188+
assert_eq!(13, config.file_schema().fields().len());
189189
let csv = DataSourceExec::from_data_source(config);
190190
assert_eq!(3, csv.schema().fields().len());
191191

@@ -250,7 +250,7 @@ mod tests {
250250
.with_file_compression_type(file_compression_type.to_owned())
251251
.with_limit(Some(5))
252252
.build();
253-
assert_eq!(13, config.file_schema.fields().len());
253+
assert_eq!(13, config.file_schema().fields().len());
254254
let csv = DataSourceExec::from_data_source(config);
255255
assert_eq!(13, csv.schema().fields().len());
256256

@@ -313,7 +313,7 @@ mod tests {
313313
.with_file_compression_type(file_compression_type.to_owned())
314314
.with_limit(Some(5))
315315
.build();
316-
assert_eq!(14, config.file_schema.fields().len());
316+
assert_eq!(14, config.file_schema().fields().len());
317317
let csv = DataSourceExec::from_data_source(config);
318318
assert_eq!(14, csv.schema().fields().len());
319319

@@ -349,38 +349,37 @@ mod tests {
349349
let filename = "aggregate_test_100.csv";
350350
let tmp_dir = TempDir::new()?;
351351

352-
let file_groups = partitioned_file_groups(
352+
let mut file_groups = partitioned_file_groups(
353353
path.as_str(),
354354
filename,
355355
1,
356356
Arc::new(CsvFormat::default()),
357357
file_compression_type.to_owned(),
358358
tmp_dir.path(),
359359
)?;
360+
// Add partition columns / values
361+
file_groups[0][0].partition_values = vec![ScalarValue::from("2021-10-26")];
362+
363+
let num_file_schema_fields = file_schema.fields().len();
360364

361365
let source = Arc::new(CsvSource::new(true, b',', b'"'));
362-
let mut config = FileScanConfigBuilder::from(partitioned_csv_config(
366+
let config = FileScanConfigBuilder::from(partitioned_csv_config(
363367
file_schema,
364368
file_groups,
365369
source,
366370
))
367371
.with_newlines_in_values(false)
368372
.with_file_compression_type(file_compression_type.to_owned())
369-
.build();
370-
371-
// Add partition columns
372-
config.table_partition_cols =
373-
vec![Arc::new(Field::new("date", DataType::Utf8, false))];
374-
config.file_groups[0][0].partition_values = vec![ScalarValue::from("2021-10-26")];
375-
373+
.with_table_partition_cols(vec![Field::new("date", DataType::Utf8, false)])
376374
// We should be able to project on the partition column
377375
// Which is supposed to be after the file fields
378-
config.projection = Some(vec![0, config.file_schema.fields().len()]);
376+
.with_projection(Some(vec![0, num_file_schema_fields]))
377+
.build();
379378

380379
// we don't have `/date=xx/` in the path but that is ok because
381380
// partitions are resolved during scan anyway
382381

383-
assert_eq!(13, config.file_schema.fields().len());
382+
assert_eq!(13, config.file_schema().fields().len());
384383
let csv = DataSourceExec::from_data_source(config);
385384
assert_eq!(2, csv.schema().fields().len());
386385

datafusion/datasource-parquet/src/source.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -497,7 +497,7 @@ impl FileSource for ParquetSource {
497497
) -> Arc<dyn FileOpener> {
498498
let projection = base_config
499499
.file_column_projection_indices()
500-
.unwrap_or_else(|| (0..base_config.file_schema.fields().len()).collect());
500+
.unwrap_or_else(|| (0..base_config.file_schema().fields().len()).collect());
501501

502502
let (expr_adapter_factory, schema_adapter_factory) = match (
503503
base_config.expr_adapter_factory.as_ref(),
@@ -566,8 +566,8 @@ impl FileSource for ParquetSource {
566566
.expect("Batch size must set before creating ParquetOpener"),
567567
limit: base_config.limit,
568568
predicate: self.predicate.clone(),
569-
logical_file_schema: Arc::clone(&base_config.file_schema),
570-
partition_fields: base_config.table_partition_cols.clone(),
569+
logical_file_schema: Arc::clone(base_config.file_schema()),
570+
partition_fields: base_config.table_partition_cols().clone(),
571571
metadata_size_hint: self.metadata_size_hint,
572572
metrics: self.metrics().clone(),
573573
parquet_file_reader_factory,

datafusion/datasource/src/file_scan_config.rs

Lines changed: 45 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use crate::schema_adapter::SchemaAdapterFactory;
2424
use crate::{
2525
display::FileGroupsDisplay, file::FileSource,
2626
file_compression_type::FileCompressionType, file_stream::FileStream,
27-
source::DataSource, statistics::MinMaxStatistics, PartitionedFile,
27+
source::DataSource, statistics::MinMaxStatistics, PartitionedFile, TableSchema,
2828
};
2929
use arrow::datatypes::FieldRef;
3030
use arrow::{
@@ -153,15 +153,11 @@ pub struct FileScanConfig {
153153
/// [`RuntimeEnv::register_object_store`]: datafusion_execution::runtime_env::RuntimeEnv::register_object_store
154154
/// [`RuntimeEnv::object_store`]: datafusion_execution::runtime_env::RuntimeEnv::object_store
155155
pub object_store_url: ObjectStoreUrl,
156-
/// Schema before `projection` is applied. It contains the all columns that may
157-
/// appear in the files. It does not include table partition columns
158-
/// that may be added.
159-
/// Note that this is **not** the schema of the physical files.
160-
/// This is the schema that the physical file schema will be
161-
/// mapped onto, and the schema that the [`DataSourceExec`] will return.
156+
/// Schema information including the file schema, table partition columns,
157+
/// and the combined table schema.
162158
///
163159
/// [`DataSourceExec`]: crate::source::DataSourceExec
164-
pub file_schema: SchemaRef,
160+
pub table_schema: TableSchema,
165161
/// List of files to be processed, grouped into partitions
166162
///
167163
/// Each file must have a schema of `file_schema` or a subset. If
@@ -180,8 +176,6 @@ pub struct FileScanConfig {
180176
/// The maximum number of records to read from this plan. If `None`,
181177
/// all records after filtering are returned.
182178
pub limit: Option<usize>,
183-
/// The partitioning columns
184-
pub table_partition_cols: Vec<FieldRef>,
185179
/// All equivalent lexicographical orderings that describe the schema.
186180
pub output_ordering: Vec<LexOrdering>,
187181
/// File compression type
@@ -459,13 +453,15 @@ impl FileScanConfigBuilder {
459453
file_compression_type.unwrap_or(FileCompressionType::UNCOMPRESSED);
460454
let new_lines_in_values = new_lines_in_values.unwrap_or(false);
461455

456+
// Create TableSchema from file_schema and table_partition_cols
457+
let table_schema = TableSchema::new(file_schema, table_partition_cols);
458+
462459
FileScanConfig {
463460
object_store_url,
464-
file_schema,
461+
table_schema,
465462
file_source,
466463
limit,
467464
projection,
468-
table_partition_cols,
469465
constraints,
470466
file_groups,
471467
output_ordering,
@@ -481,7 +477,7 @@ impl From<FileScanConfig> for FileScanConfigBuilder {
481477
fn from(config: FileScanConfig) -> Self {
482478
Self {
483479
object_store_url: config.object_store_url,
484-
file_schema: config.file_schema,
480+
file_schema: Arc::clone(config.table_schema.file_schema()),
485481
file_source: Arc::<dyn FileSource>::clone(&config.file_source),
486482
file_groups: config.file_groups,
487483
statistics: config.file_source.statistics().ok(),
@@ -490,7 +486,7 @@ impl From<FileScanConfig> for FileScanConfigBuilder {
490486
new_lines_in_values: Some(config.new_lines_in_values),
491487
limit: config.limit,
492488
projection: config.projection,
493-
table_partition_cols: config.table_partition_cols,
489+
table_partition_cols: config.table_schema.table_partition_cols().clone(),
494490
constraints: Some(config.constraints),
495491
batch_size: config.batch_size,
496492
expr_adapter_factory: config.expr_adapter_factory,
@@ -635,7 +631,7 @@ impl DataSource for FileScanConfig {
635631
.expr
636632
.as_any()
637633
.downcast_ref::<Column>()
638-
.map(|expr| expr.index() >= self.file_schema.fields().len())
634+
.map(|expr| expr.index() >= self.file_schema().fields().len())
639635
.unwrap_or(false)
640636
});
641637

@@ -650,7 +646,7 @@ impl DataSource for FileScanConfig {
650646
&file_scan
651647
.projection
652648
.clone()
653-
.unwrap_or_else(|| (0..self.file_schema.fields().len()).collect()),
649+
.unwrap_or_else(|| (0..self.file_schema().fields().len()).collect()),
654650
);
655651

656652
Arc::new(
@@ -691,11 +687,21 @@ impl DataSource for FileScanConfig {
691687
}
692688

693689
impl FileScanConfig {
690+
/// Get the file schema (schema of the files without partition columns)
691+
pub fn file_schema(&self) -> &SchemaRef {
692+
self.table_schema.file_schema()
693+
}
694+
695+
/// Get the table partition columns
696+
pub fn table_partition_cols(&self) -> &Vec<FieldRef> {
697+
self.table_schema.table_partition_cols()
698+
}
699+
694700
fn projection_indices(&self) -> Vec<usize> {
695701
match &self.projection {
696702
Some(proj) => proj.clone(),
697-
None => (0..self.file_schema.fields().len()
698-
+ self.table_partition_cols.len())
703+
None => (0..self.file_schema().fields().len()
704+
+ self.table_partition_cols().len())
699705
.collect(),
700706
}
701707
}
@@ -707,7 +713,7 @@ impl FileScanConfig {
707713
.projection_indices()
708714
.into_iter()
709715
.map(|idx| {
710-
if idx < self.file_schema.fields().len() {
716+
if idx < self.file_schema().fields().len() {
711717
statistics.column_statistics[idx].clone()
712718
} else {
713719
// TODO provide accurate stat for partition column (#1186)
@@ -729,20 +735,20 @@ impl FileScanConfig {
729735
.projection_indices()
730736
.into_iter()
731737
.map(|idx| {
732-
if idx < self.file_schema.fields().len() {
733-
self.file_schema.field(idx).clone()
738+
if idx < self.file_schema().fields().len() {
739+
self.file_schema().field(idx).clone()
734740
} else {
735-
let partition_idx = idx - self.file_schema.fields().len();
741+
let partition_idx = idx - self.file_schema().fields().len();
736742
Arc::unwrap_or_clone(Arc::clone(
737-
&self.table_partition_cols[partition_idx],
743+
&self.table_partition_cols()[partition_idx],
738744
))
739745
}
740746
})
741747
.collect();
742748

743749
Arc::new(Schema::new_with_metadata(
744750
table_fields,
745-
self.file_schema.metadata().clone(),
751+
self.file_schema().metadata().clone(),
746752
))
747753
}
748754

@@ -790,9 +796,9 @@ impl FileScanConfig {
790796

791797
/// Project the schema, constraints, and the statistics on the given column indices
792798
pub fn project(&self) -> (SchemaRef, Constraints, Statistics, Vec<LexOrdering>) {
793-
if self.projection.is_none() && self.table_partition_cols.is_empty() {
799+
if self.projection.is_none() && self.table_partition_cols().is_empty() {
794800
return (
795-
Arc::clone(&self.file_schema),
801+
Arc::clone(self.file_schema()),
796802
self.constraints.clone(),
797803
self.file_source.statistics().unwrap().clone(),
798804
self.output_ordering.clone(),
@@ -811,8 +817,8 @@ impl FileScanConfig {
811817
pub fn projected_file_column_names(&self) -> Option<Vec<String>> {
812818
self.projection.as_ref().map(|p| {
813819
p.iter()
814-
.filter(|col_idx| **col_idx < self.file_schema.fields().len())
815-
.map(|col_idx| self.file_schema.field(*col_idx).name())
820+
.filter(|col_idx| **col_idx < self.file_schema().fields().len())
821+
.map(|col_idx| self.file_schema().field(*col_idx).name())
816822
.cloned()
817823
.collect()
818824
})
@@ -823,17 +829,17 @@ impl FileScanConfig {
823829
let fields = self.file_column_projection_indices().map(|indices| {
824830
indices
825831
.iter()
826-
.map(|col_idx| self.file_schema.field(*col_idx))
832+
.map(|col_idx| self.file_schema().field(*col_idx))
827833
.cloned()
828834
.collect::<Vec<_>>()
829835
});
830836

831837
fields.map_or_else(
832-
|| Arc::clone(&self.file_schema),
838+
|| Arc::clone(self.file_schema()),
833839
|f| {
834840
Arc::new(Schema::new_with_metadata(
835841
f,
836-
self.file_schema.metadata.clone(),
842+
self.file_schema().metadata.clone(),
837843
))
838844
},
839845
)
@@ -842,7 +848,7 @@ impl FileScanConfig {
842848
pub fn file_column_projection_indices(&self) -> Option<Vec<usize>> {
843849
self.projection.as_ref().map(|p| {
844850
p.iter()
845-
.filter(|col_idx| **col_idx < self.file_schema.fields().len())
851+
.filter(|col_idx| **col_idx < self.file_schema().fields().len())
846852
.copied()
847853
.collect()
848854
})
@@ -2182,11 +2188,11 @@ mod tests {
21822188

21832189
// Verify the built config has all the expected values
21842190
assert_eq!(config.object_store_url, object_store_url);
2185-
assert_eq!(config.file_schema, file_schema);
2191+
assert_eq!(*config.file_schema(), file_schema);
21862192
assert_eq!(config.limit, Some(1000));
21872193
assert_eq!(config.projection, Some(vec![0, 1]));
2188-
assert_eq!(config.table_partition_cols.len(), 1);
2189-
assert_eq!(config.table_partition_cols[0].name(), "date");
2194+
assert_eq!(config.table_partition_cols().len(), 1);
2195+
assert_eq!(config.table_partition_cols()[0].name(), "date");
21902196
assert_eq!(config.file_groups.len(), 1);
21912197
assert_eq!(config.file_groups[0].len(), 1);
21922198
assert_eq!(
@@ -2265,10 +2271,10 @@ mod tests {
22652271

22662272
// Verify default values
22672273
assert_eq!(config.object_store_url, object_store_url);
2268-
assert_eq!(config.file_schema, file_schema);
2274+
assert_eq!(*config.file_schema(), file_schema);
22692275
assert_eq!(config.limit, None);
22702276
assert_eq!(config.projection, None);
2271-
assert!(config.table_partition_cols.is_empty());
2277+
assert!(config.table_partition_cols().is_empty());
22722278
assert!(config.file_groups.is_empty());
22732279
assert_eq!(
22742280
config.file_compression_type,
@@ -2339,10 +2345,10 @@ mod tests {
23392345
// Verify properties match
23402346
let partition_cols = partition_cols.into_iter().map(Arc::new).collect::<Vec<_>>();
23412347
assert_eq!(new_config.object_store_url, object_store_url);
2342-
assert_eq!(new_config.file_schema, schema);
2348+
assert_eq!(*new_config.file_schema(), schema);
23432349
assert_eq!(new_config.projection, Some(vec![0, 2]));
23442350
assert_eq!(new_config.limit, Some(10));
2345-
assert_eq!(new_config.table_partition_cols, partition_cols);
2351+
assert_eq!(*new_config.table_partition_cols(), partition_cols);
23462352
assert_eq!(new_config.file_groups.len(), 1);
23472353
assert_eq!(new_config.file_groups[0].len(), 1);
23482354
assert_eq!(

datafusion/datasource/src/file_stream.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ impl FileStream {
8080
let pc_projector = PartitionColumnProjector::new(
8181
Arc::clone(&projected_schema),
8282
&config
83-
.table_partition_cols
83+
.table_partition_cols()
8484
.iter()
8585
.map(|x| x.name().clone())
8686
.collect::<Vec<_>>(),

datafusion/datasource/src/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ pub mod schema_adapter;
4141
pub mod sink;
4242
pub mod source;
4343
mod statistics;
44+
pub mod table_schema;
4445

4546
#[cfg(test)]
4647
pub mod test_util;
@@ -57,6 +58,7 @@ use datafusion_common::{ScalarValue, Statistics};
5758
use futures::{Stream, StreamExt};
5859
use object_store::{path::Path, ObjectMeta};
5960
use object_store::{GetOptions, GetRange, ObjectStore};
61+
pub use table_schema::TableSchema;
6062
// Remove when add_row_stats is remove
6163
#[allow(deprecated)]
6264
pub use statistics::add_row_stats;

0 commit comments

Comments (0)