Skip to content

Commit 9844638

Browse files
authored
feat: make FanoutWriter writer configurable (#1962)
## Which issue does this PR close? - Closes #1834. ## What changes are included in this PR? - Follow-up to #1872. ## Are these changes tested? --------- Signed-off-by: StandingMan <[email protected]>
1 parent 4d09ba2 commit 9844638

File tree

2 files changed

+34
-2
lines changed

2 files changed

+34
-2
lines changed

crates/iceberg/src/spec/table_properties.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ pub struct TableProperties {
4949
pub write_format_default: String,
5050
/// The target file size for files.
5151
pub write_target_file_size_bytes: usize,
52+
/// Whether to use `FanoutWriter` for partitioned tables.
53+
pub write_datafusion_fanout_enabled: bool,
5254
}
5355

5456
impl TableProperties {
@@ -137,6 +139,11 @@ impl TableProperties {
137139
pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES: &str = "write.target-file-size-bytes";
138140
/// Default target file size
139141
pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT: usize = 512 * 1024 * 1024; // 512 MB
142+
/// Whether to use `FanoutWriter` for partitioned tables (handles unsorted data).
143+
/// If false, uses `ClusteredWriter` (requires sorted data, more memory efficient).
144+
pub const PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED: &str = "write.datafusion.fanout.enabled";
145+
/// Default value for fanout writer enabled
146+
pub const PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT: bool = true;
140147
}
141148

142149
impl TryFrom<&HashMap<String, String>> for TableProperties {
@@ -175,6 +182,11 @@ impl TryFrom<&HashMap<String, String>> for TableProperties {
175182
TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES,
176183
TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
177184
)?,
185+
write_datafusion_fanout_enabled: parse_property(
186+
props,
187+
TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED,
188+
TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT,
189+
)?,
178190
})
179191
}
180192
}

crates/integrations/datafusion/src/physical_plan/write.rs

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -266,8 +266,28 @@ impl ExecutionPlan for IcebergWriteExec {
266266
let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder);
267267

268268
// Create TaskWriter
269-
// TODO: Make fanout_enabled configurable via table properties
270-
let fanout_enabled = true;
269+
let fanout_enabled = self
270+
.table
271+
.metadata()
272+
.properties()
273+
.get(TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED)
274+
.map(|value| {
275+
value
276+
.parse::<bool>()
277+
.map_err(|e| {
278+
Error::new(
279+
ErrorKind::DataInvalid,
280+
format!(
281+
"Invalid value for {}, expected 'true' or 'false'",
282+
TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED
283+
),
284+
)
285+
.with_source(e)
286+
})
287+
.map_err(to_datafusion_error)
288+
})
289+
.transpose()?
290+
.unwrap_or(TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT);
271291
let schema = self.table.metadata().current_schema().clone();
272292
let partition_spec = self.table.metadata().default_partition_spec().clone();
273293
let task_writer = TaskWriter::try_new(

0 commit comments

Comments
 (0)