Skip to content

Commit 0506856

Browse files
committed
feat: make FanoutWriter usage configurable via table properties
Signed-off-by: StandingMan <[email protected]>
1 parent 99ca196 commit 0506856

File tree

2 files changed

+35
-2
lines changed

2 files changed

+35
-2
lines changed

crates/iceberg/src/spec/table_properties.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ pub struct TableProperties {
4949
pub write_format_default: String,
5050
/// The target file size for files.
5151
pub write_target_file_size_bytes: usize,
52+
/// Whether to use `FanoutWriter` (rather than `ClusteredWriter`) when writing partitioned tables.
53+
pub write_datafusion_fanout_enabled: bool,
5254
}
5355

5456
impl TableProperties {
@@ -137,6 +139,11 @@ impl TableProperties {
137139
pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES: &str = "write.target-file-size-bytes";
138140
/// Default target file size
139141
pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT: usize = 512 * 1024 * 1024; // 512 MB
142+
/// Whether to use `FanoutWriter` for partitioned tables (handles unsorted data).
143+
/// If false, uses `ClusteredWriter` (requires sorted data, more memory efficient).
144+
pub const PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED: &str = "write.datafusion.fanout.enabled";
145+
/// Default for `write.datafusion.fanout.enabled`: `FanoutWriter` is used unless the property is set to false.
146+
pub const PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT: bool = true;
140147
}
141148

142149
impl TryFrom<&HashMap<String, String>> for TableProperties {
@@ -175,6 +182,11 @@ impl TryFrom<&HashMap<String, String>> for TableProperties {
175182
TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES,
176183
TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
177184
)?,
185+
write_datafusion_fanout_enabled: parse_property(
186+
props,
187+
TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED,
188+
TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT,
189+
)?,
178190
})
179191
}
180192
}

crates/integrations/datafusion/src/physical_plan/write.rs

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ use iceberg::writer::file_writer::location_generator::{
4444
DefaultFileNameGenerator, DefaultLocationGenerator,
4545
};
4646
use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
47+
use iceberg::writer::partitioning::fanout_writer;
4748
use iceberg::{Error, ErrorKind};
4849
use parquet::file::properties::WriterProperties;
4950
use uuid::Uuid;
@@ -266,8 +267,28 @@ impl ExecutionPlan for IcebergWriteExec {
266267
let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder);
267268

268269
// Create TaskWriter
269-
// TODO: Make fanout_enabled configurable via table properties
270-
let fanout_enabled = true;
270+
let fanout_enabled = self
271+
.table
272+
.metadata()
273+
.properties()
274+
.get(TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED)
275+
.map(|value| {
276+
value
277+
.parse::<bool>()
278+
.map_err(|e| {
279+
Error::new(
280+
ErrorKind::DataInvalid,
281+
format!(
282+
"Invalid value for {}, expected 'true' or 'false'",
283+
TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED
284+
),
285+
)
286+
.with_source(e)
287+
})
288+
.map_err(to_datafusion_error)
289+
})
290+
.transpose()?
291+
.unwrap_or(TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT);
271292
let schema = self.table.metadata().current_schema().clone();
272293
let partition_spec = self.table.metadata().default_partition_spec().clone();
273294
let task_writer = TaskWriter::try_new(

0 commit comments

Comments
 (0)