Skip to content

Commit 1b8ef43

Browse files
authored
Merge branch 'main' into test_SpillPool
2 parents ccde430 + ed0a060 commit 1b8ef43

File tree

28 files changed

+1202
-297
lines changed

28 files changed

+1202
-297
lines changed

datafusion-examples/examples/data_io/parquet_encrypted.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -55,7 +55,7 @@ pub async fn parquet_encrypted() -> datafusion::common::Result<()> {
5555

5656
// Create a temporary file location for the encrypted parquet file
5757
let tmp_source = TempDir::new()?;
58-
let tempfile = tmp_source.path().join("cars_encrypted");
58+
let tempfile = tmp_source.path().join("cars_encrypted.parquet");
5959

6060
// Write encrypted parquet
6161
let mut options = TableParquetOptions::default();

datafusion/catalog-listing/src/table.rs

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -28,7 +28,7 @@ use datafusion_common::{
2828
use datafusion_datasource::file::FileSource;
2929
use datafusion_datasource::file_groups::FileGroup;
3030
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
31-
use datafusion_datasource::file_sink_config::FileSinkConfig;
31+
use datafusion_datasource::file_sink_config::{FileOutputMode, FileSinkConfig};
3232
#[expect(deprecated)]
3333
use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
3434
use datafusion_datasource::{
@@ -674,6 +674,7 @@ impl TableProvider for ListingTable {
674674
insert_op,
675675
keep_partition_by_columns,
676676
file_extension: self.options().format.get_ext(),
677+
file_output_mode: FileOutputMode::Automatic,
677678
};
678679

679680
// For writes, we only use user-specified ordering (no file groups to derive from)

datafusion/core/src/dataframe/mod.rs

Lines changed: 27 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -78,9 +78,11 @@ pub struct DataFrameWriteOptions {
7878
/// Controls how new data should be written to the table, determining whether
7979
/// to append, overwrite, or replace existing data.
8080
insert_op: InsertOp,
81-
/// Controls if all partitions should be coalesced into a single output file
82-
/// Generally will have slower performance when set to true.
83-
single_file_output: bool,
81+
/// Controls if all partitions should be coalesced into a single output file.
82+
/// - `None`: Use automatic mode (extension-based heuristic)
83+
/// - `Some(true)`: Force single file output at exact path
84+
/// - `Some(false)`: Force directory output with generated filenames
85+
single_file_output: Option<bool>,
8486
/// Sets which columns should be used for hive-style partitioned writes by name.
8587
/// Can be set to empty vec![] for non-partitioned writes.
8688
partition_by: Vec<String>,
@@ -94,7 +96,7 @@ impl DataFrameWriteOptions {
9496
pub fn new() -> Self {
9597
DataFrameWriteOptions {
9698
insert_op: InsertOp::Append,
97-
single_file_output: false,
99+
single_file_output: None,
98100
partition_by: vec![],
99101
sort_by: vec![],
100102
}
@@ -108,9 +110,13 @@ impl DataFrameWriteOptions {
108110

109111
/// Set the single_file_output value to true or false
110112
///
111-
/// When set to true, an output file will always be created even if the DataFrame is empty
113+
/// - `true`: Force single file output at the exact path specified
114+
/// - `false`: Force directory output with generated filenames
115+
///
116+
/// When not called, automatic mode is used (extension-based heuristic).
117+
/// When set to true, an output file will always be created even if the DataFrame is empty.
112118
pub fn with_single_file_output(mut self, single_file_output: bool) -> Self {
113-
self.single_file_output = single_file_output;
119+
self.single_file_output = Some(single_file_output);
114120
self
115121
}
116122

@@ -125,6 +131,15 @@ impl DataFrameWriteOptions {
125131
self.sort_by = sort_by;
126132
self
127133
}
134+
135+
/// Build the options HashMap to pass to CopyTo for sink configuration.
136+
fn build_sink_options(&self) -> HashMap<String, String> {
137+
let mut options = HashMap::new();
138+
if let Some(single_file) = self.single_file_output {
139+
options.insert("single_file_output".to_string(), single_file.to_string());
140+
}
141+
options
142+
}
128143
}
129144

130145
impl Default for DataFrameWriteOptions {
@@ -2040,6 +2055,8 @@ impl DataFrame {
20402055

20412056
let file_type = format_as_file_type(format);
20422057

2058+
let copy_options = options.build_sink_options();
2059+
20432060
let plan = if options.sort_by.is_empty() {
20442061
self.plan
20452062
} else {
@@ -2052,7 +2069,7 @@ impl DataFrame {
20522069
plan,
20532070
path.into(),
20542071
file_type,
2055-
HashMap::new(),
2072+
copy_options,
20562073
options.partition_by,
20572074
)?
20582075
.build()?;
@@ -2108,6 +2125,8 @@ impl DataFrame {
21082125

21092126
let file_type = format_as_file_type(format);
21102127

2128+
let copy_options = options.build_sink_options();
2129+
21112130
let plan = if options.sort_by.is_empty() {
21122131
self.plan
21132132
} else {
@@ -2120,7 +2139,7 @@ impl DataFrame {
21202139
plan,
21212140
path.into(),
21222141
file_type,
2123-
Default::default(),
2142+
copy_options,
21242143
options.partition_by,
21252144
)?
21262145
.build()?;

datafusion/core/src/dataframe/parquet.rs

Lines changed: 155 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -76,6 +76,8 @@ impl DataFrame {
7676

7777
let file_type = format_as_file_type(format);
7878

79+
let copy_options = options.build_sink_options();
80+
7981
let plan = if options.sort_by.is_empty() {
8082
self.plan
8183
} else {
@@ -88,7 +90,7 @@ impl DataFrame {
8890
plan,
8991
path.into(),
9092
file_type,
91-
Default::default(),
93+
copy_options,
9294
options.partition_by,
9395
)?
9496
.build()?;
@@ -324,4 +326,156 @@ mod tests {
324326

325327
Ok(())
326328
}
329+
330+
/// Test FileOutputMode::SingleFile - explicitly request single file output
331+
/// for paths WITHOUT file extensions. This verifies the fix for the regression
332+
/// where extension heuristics ignored the explicit with_single_file_output(true).
333+
#[tokio::test]
334+
async fn test_file_output_mode_single_file() -> Result<()> {
335+
use arrow::array::Int32Array;
336+
use arrow::datatypes::{DataType, Field, Schema};
337+
use arrow::record_batch::RecordBatch;
338+
339+
let ctx = SessionContext::new();
340+
let tmp_dir = TempDir::new()?;
341+
342+
// Path WITHOUT .parquet extension - this is the key scenario
343+
let output_path = tmp_dir.path().join("data_no_ext");
344+
let output_path_str = output_path.to_str().unwrap();
345+
346+
let df = ctx.read_batch(RecordBatch::try_new(
347+
Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])),
348+
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
349+
)?)?;
350+
351+
// Explicitly request single file output
352+
df.write_parquet(
353+
output_path_str,
354+
DataFrameWriteOptions::new().with_single_file_output(true),
355+
None,
356+
)
357+
.await?;
358+
359+
// Verify: output should be a FILE, not a directory
360+
assert!(
361+
output_path.is_file(),
362+
"Expected single file at {:?}, but got is_file={}, is_dir={}",
363+
output_path,
364+
output_path.is_file(),
365+
output_path.is_dir()
366+
);
367+
368+
// Verify the file is readable as parquet
369+
let file = std::fs::File::open(&output_path)?;
370+
let reader = parquet::file::reader::SerializedFileReader::new(file)?;
371+
let metadata = reader.metadata();
372+
assert_eq!(metadata.num_row_groups(), 1);
373+
assert_eq!(metadata.file_metadata().num_rows(), 3);
374+
375+
Ok(())
376+
}
377+
378+
/// Test FileOutputMode::Automatic - uses extension heuristic.
379+
/// Path WITH extension -> single file; path WITHOUT extension -> directory.
380+
#[tokio::test]
381+
async fn test_file_output_mode_automatic() -> Result<()> {
382+
use arrow::array::Int32Array;
383+
use arrow::datatypes::{DataType, Field, Schema};
384+
use arrow::record_batch::RecordBatch;
385+
386+
let ctx = SessionContext::new();
387+
let tmp_dir = TempDir::new()?;
388+
389+
let schema =
390+
Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
391+
let batch = RecordBatch::try_new(
392+
schema,
393+
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
394+
)?;
395+
396+
// Case 1: Path WITH extension -> should create single file (Automatic mode)
397+
let output_with_ext = tmp_dir.path().join("data.parquet");
398+
let df = ctx.read_batch(batch.clone())?;
399+
df.write_parquet(
400+
output_with_ext.to_str().unwrap(),
401+
DataFrameWriteOptions::new(), // Automatic mode (default)
402+
None,
403+
)
404+
.await?;
405+
406+
assert!(
407+
output_with_ext.is_file(),
408+
"Path with extension should be a single file, got is_file={}, is_dir={}",
409+
output_with_ext.is_file(),
410+
output_with_ext.is_dir()
411+
);
412+
413+
// Case 2: Path WITHOUT extension -> should create directory (Automatic mode)
414+
let output_no_ext = tmp_dir.path().join("data_dir");
415+
let df = ctx.read_batch(batch)?;
416+
df.write_parquet(
417+
output_no_ext.to_str().unwrap(),
418+
DataFrameWriteOptions::new(), // Automatic mode (default)
419+
None,
420+
)
421+
.await?;
422+
423+
assert!(
424+
output_no_ext.is_dir(),
425+
"Path without extension should be a directory, got is_file={}, is_dir={}",
426+
output_no_ext.is_file(),
427+
output_no_ext.is_dir()
428+
);
429+
430+
Ok(())
431+
}
432+
433+
/// Test FileOutputMode::Directory - explicitly request directory output
434+
/// even for paths WITH file extensions.
435+
#[tokio::test]
436+
async fn test_file_output_mode_directory() -> Result<()> {
437+
use arrow::array::Int32Array;
438+
use arrow::datatypes::{DataType, Field, Schema};
439+
use arrow::record_batch::RecordBatch;
440+
441+
let ctx = SessionContext::new();
442+
let tmp_dir = TempDir::new()?;
443+
444+
// Path WITH .parquet extension but explicitly requesting directory output
445+
let output_path = tmp_dir.path().join("output.parquet");
446+
let output_path_str = output_path.to_str().unwrap();
447+
448+
let df = ctx.read_batch(RecordBatch::try_new(
449+
Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])),
450+
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
451+
)?)?;
452+
453+
// Explicitly request directory output (single_file_output = false)
454+
df.write_parquet(
455+
output_path_str,
456+
DataFrameWriteOptions::new().with_single_file_output(false),
457+
None,
458+
)
459+
.await?;
460+
461+
// Verify: output should be a DIRECTORY, not a single file
462+
assert!(
463+
output_path.is_dir(),
464+
"Expected directory at {:?}, but got is_file={}, is_dir={}",
465+
output_path,
466+
output_path.is_file(),
467+
output_path.is_dir()
468+
);
469+
470+
// Verify the directory contains parquet file(s)
471+
let entries: Vec<_> = std::fs::read_dir(&output_path)?
472+
.filter_map(|e| e.ok())
473+
.collect();
474+
assert!(
475+
!entries.is_empty(),
476+
"Directory should contain at least one file"
477+
);
478+
479+
Ok(())
480+
}
327481
}

datafusion/core/src/datasource/file_format/parquet.rs

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -130,7 +130,9 @@ mod tests {
130130
use datafusion_common::test_util::batches_to_string;
131131
use datafusion_common::{Result, ScalarValue};
132132
use datafusion_datasource::file_format::FileFormat;
133-
use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
133+
use datafusion_datasource::file_sink_config::{
134+
FileOutputMode, FileSink, FileSinkConfig,
135+
};
134136
use datafusion_datasource::{ListingTableUrl, PartitionedFile};
135137
use datafusion_datasource_parquet::{
136138
ParquetFormat, ParquetFormatFactory, ParquetSink,
@@ -1547,6 +1549,7 @@ mod tests {
15471549
insert_op: InsertOp::Overwrite,
15481550
keep_partition_by_columns: false,
15491551
file_extension: "parquet".into(),
1552+
file_output_mode: FileOutputMode::Automatic,
15501553
};
15511554
let parquet_sink = Arc::new(ParquetSink::new(
15521555
file_sink_config,
@@ -1638,6 +1641,7 @@ mod tests {
16381641
insert_op: InsertOp::Overwrite,
16391642
keep_partition_by_columns: false,
16401643
file_extension: "parquet".into(),
1644+
file_output_mode: FileOutputMode::Automatic,
16411645
};
16421646
let parquet_sink = Arc::new(ParquetSink::new(
16431647
file_sink_config,
@@ -1728,6 +1732,7 @@ mod tests {
17281732
insert_op: InsertOp::Overwrite,
17291733
keep_partition_by_columns: false,
17301734
file_extension: "parquet".into(),
1735+
file_output_mode: FileOutputMode::Automatic,
17311736
};
17321737
let parquet_sink = Arc::new(ParquetSink::new(
17331738
file_sink_config,

datafusion/core/src/physical_planner.rs

Lines changed: 25 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -23,7 +23,7 @@ use std::sync::Arc;
2323

2424
use crate::datasource::file_format::file_type_to_format;
2525
use crate::datasource::listing::ListingTableUrl;
26-
use crate::datasource::physical_plan::FileSinkConfig;
26+
use crate::datasource::physical_plan::{FileOutputMode, FileSinkConfig};
2727
use crate::datasource::{DefaultTableSource, source_as_provider};
2828
use crate::error::{DataFusionError, Result};
2929
use crate::execution::context::{ExecutionProps, SessionState};
@@ -549,8 +549,30 @@ impl DefaultPhysicalPlanner {
549549
}
550550
};
551551

552+
// Parse single_file_output option if explicitly set
553+
let file_output_mode = match source_option_tuples
554+
.get("single_file_output")
555+
.map(|v| v.trim())
556+
{
557+
None => FileOutputMode::Automatic,
558+
Some("true") => FileOutputMode::SingleFile,
559+
Some("false") => FileOutputMode::Directory,
560+
Some(value) => {
561+
return Err(DataFusionError::Configuration(format!(
562+
"provided value for 'single_file_output' was not recognized: \"{value}\""
563+
)));
564+
}
565+
};
566+
567+
// Filter out sink-related options that are not format options
568+
let format_options: HashMap<String, String> = source_option_tuples
569+
.iter()
570+
.filter(|(k, _)| k.as_str() != "single_file_output")
571+
.map(|(k, v)| (k.clone(), v.clone()))
572+
.collect();
573+
552574
let sink_format = file_type_to_format(file_type)?
553-
.create(session_state, source_option_tuples)?;
575+
.create(session_state, &format_options)?;
554576

555577
// Determine extension based on format extension and compression
556578
let file_extension = match sink_format.compression_type() {
@@ -571,6 +593,7 @@ impl DefaultPhysicalPlanner {
571593
insert_op: InsertOp::Append,
572594
keep_partition_by_columns,
573595
file_extension,
596+
file_output_mode,
574597
};
575598

576599
let ordering = input_exec.properties().output_ordering().cloned();

0 commit comments

Comments (0)