
Commit 0b5b9a9

perf(iceberg): [WRA-11] optimise direct insert for large dataset (#526)
* perf(iceberg): optimise direct insert for large dataset
* perf(iceberg): add comment for direct insert and cleanup for local buffer
1 parent: 2cede32 · commit: 0b5b9a9
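In short: for an unpartitioned, unsorted table the FDW now flushes buffered rows every batch_size rows instead of holding the whole statement's rows in memory. The following is a minimal, self-contained Rust sketch of that buffering pattern only; the Writer type, its flush method and the String rows are illustrative stand-ins rather than the FDW's actual API, while the is_direct_insert / batch_size flush condition mirrors the committed code.

// Hypothetical stand-in for the FDW's insert path; only the flush condition
// is taken from the commit, everything else is simplified for illustration.
struct Writer {
    batch_size: usize,      // flush threshold, analogous to IcebergFdw::batch_size
    is_direct_insert: bool, // true when the table is unpartitioned and unsorted
    buffer: Vec<String>,    // stands in for the FDW's input_rows buffer
    files_written: usize,
}

impl Writer {
    // Flush whatever is currently buffered as one "data file".
    fn flush(&mut self) {
        if self.buffer.is_empty() {
            return;
        }
        self.files_written += 1;
        println!("wrote file #{} with {} rows", self.files_written, self.buffer.len());
        self.buffer.clear();
    }

    // Called once per inserted row. For direct inserts the buffer is flushed as
    // soon as it reaches batch_size, so memory stays bounded; otherwise all rows
    // are kept until the end so they can be sorted/partitioned in one pass.
    fn insert_row(&mut self, row: String) {
        self.buffer.push(row);
        if self.is_direct_insert && self.buffer.len() >= self.batch_size {
            self.flush();
        }
    }

    // Called after the last row: write whatever is left over.
    fn end_modify(&mut self) {
        self.flush();
    }
}

fn main() {
    let mut w = Writer {
        batch_size: 3,
        is_direct_insert: true,
        buffer: Vec::new(),
        files_written: 0,
    };
    for i in 0..7 {
        w.insert_row(format!("row {i}"));
    }
    w.end_modify();
}

Running the sketch with batch_size = 3 and 7 rows produces three flushes (3 + 3 + 1 rows). In the real FDW each flush becomes a separate parquet data file, which is why the committed comment recommends a larger batch size when inserting large datasets.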


wrappers/src/fdw/iceberg_fdw/iceberg_fdw.rs

Lines changed: 104 additions & 83 deletions
@@ -47,7 +47,7 @@ pub(crate) struct IcebergFdw {
    catalog: Box<dyn Catalog>,
    table: Option<Table>,
    predicate: Option<Predicate>,
-    batch_size: Option<usize>,
+    batch_size: usize,

    // copy of target columns
    tgt_cols: Vec<Column>,
@@ -65,6 +65,9 @@ pub(crate) struct IcebergFdw {
    num_rows: usize,
    bytes_in: usize,

+    // for insertion: if it is a direct insertion, i.e. no partitioning and no sorting
+    is_direct_insert: bool,
+
    // for insertion: buffer for accumulating input rows before sorting
    input_rows: Vec<InputRow>,
}
@@ -139,7 +142,7 @@ impl IcebergFdw {
        let mut scan_builder = table
            .scan()
            .select(self.tgt_cols.iter().map(|c| c.name.clone()))
-            .with_batch_size(self.batch_size);
+            .with_batch_size(Some(self.batch_size));
        if let Some(predicate) = &self.predicate {
            scan_builder = scan_builder.with_filter(predicate.clone());
        }
@@ -278,6 +281,86 @@ impl IcebergFdw {

        Ok(record_batch)
    }
+
+    fn write_rows_to_iceberg(&mut self) -> IcebergFdwResult<()> {
+        // only write if we have rows
+        if self.input_rows.is_empty() {
+            return Ok(());
+        }
+
+        // clone the table to avoid borrowing conflicts
+        let table = match &self.table {
+            Some(table) => table.clone(),
+            None => return Ok(()),
+        };
+
+        let metadata = table.metadata();
+        let schema = metadata.current_schema();
+
+        // sort input_rows by partition column values
+        let sorted_rows = self.sort_rows_by_partition(metadata, schema)?;
+
+        // build record batch from sorted rows
+        let record_batch = self.build_record_batch_from_rows(schema, &sorted_rows)?;
+
+        // split the record batch by partition values
+        let partition_batches = utils::split_record_batch_by_partition(metadata, record_batch)?;
+
+        let mut data_files = Vec::new();
+
+        // write each partition batch separately
+        for partition_batch in partition_batches.iter() {
+            let location_generator = LocationGenerator::new(metadata, partition_batch)?;
+            let file_name_generator = FileNameGenerator::new(DataFileFormat::Parquet);
+
+            // get partition value from location generator
+            let partition_value = location_generator.partition_value();
+
+            let parquet_writer_builder = ParquetWriterBuilder::new(
+                WriterProperties::default(),
+                schema.clone(),
+                table.file_io().clone(),
+                location_generator,
+                file_name_generator,
+            );
+            let data_file_writer_builder = DataFileWriterBuilder::new(
+                parquet_writer_builder,
+                partition_value,
+                metadata.default_partition_spec().spec_id(),
+            );
+            let mut data_file_writer = self.rt.block_on(data_file_writer_builder.build())?;
+
+            // write the record batch to Iceberg and close the writer and get
+            // the data file
+            self.rt
+                .block_on(data_file_writer.write(partition_batch.clone()))?;
+            let mut part_data_files = self.rt.block_on(data_file_writer.close())?;
+
+            data_files.append(&mut part_data_files);
+        }
+
+        // create transaction and commit the changes to update table metadata
+        let tx = Transaction::new(&table);
+        let append_action = tx.fast_append().add_data_files(data_files.clone());
+        let tx = append_action.apply(tx)?;
+        let updated_table = self.rt.block_on(tx.commit(self.catalog.as_ref()))?;
+
+        // update the cached table reference with the new metadata
+        self.table = Some(updated_table);
+
+        if cfg!(debug_assertions) {
+            for data_file in &data_files {
+                report_info(&format!(
+                    "Data file: {}, records: {}, size: {} bytes",
+                    data_file.file_path(),
+                    data_file.record_count(),
+                    data_file.file_size_in_bytes()
+                ));
+            }
+        }
+
+        Ok(())
+    }
}

impl ForeignDataWrapper<IcebergFdwError> for IcebergFdw {
@@ -346,14 +429,15 @@ impl ForeignDataWrapper<IcebergFdwError> for IcebergFdw {
            catalog,
            table: None,
            predicate: None,
-            batch_size: batch_size.into(),
+            batch_size,
            tgt_cols: Vec::new(),
            stream: None,
            row_data: VecDeque::new(),
            src_fields: Vec::new(),
            mapper: Mapper::default(),
            num_rows: 0,
            bytes_in: 0,
+            is_direct_insert: false,
            input_rows: Vec::new(),
        })
    }
@@ -416,10 +500,12 @@ impl ForeignDataWrapper<IcebergFdwError> for IcebergFdw {

    fn begin_modify(&mut self, options: &HashMap<String, String>) -> IcebergFdwResult<()> {
        let tbl_ident = TableIdent::from_strs(require_option("table", options)?.split("."))?;
-        self.table = self
-            .rt
-            .block_on(self.catalog.load_table(&tbl_ident))?
-            .into();
+        let table = self.rt.block_on(self.catalog.load_table(&tbl_ident))?;
+        let metadata = table.metadata();
+
+        self.is_direct_insert = metadata.default_partition_spec().is_unpartitioned()
+            && metadata.default_sort_order().is_unsorted();
+        self.table = table.into();
        self.input_rows.clear();

        Ok(())
@@ -431,86 +517,21 @@ impl ForeignDataWrapper<IcebergFdwError> for IcebergFdw {
            cells: src.cells.clone(),
        });

+        // Direct insert optimization: when the table has no partitioning and no sorting,
+        // rows can be written in batches to avoid buffering the entire dataset in memory.
+        // Each batch is written to a separate parquet file, so larger batch sizes are
+        // recommended for better performance when inserting large datasets.
+        if self.is_direct_insert && self.input_rows.len() >= self.batch_size {
+            self.write_rows_to_iceberg()?;
+            self.input_rows.clear();
+        }
+
        Ok(())
    }

    fn end_modify(&mut self) -> IcebergFdwResult<()> {
-        // only write if we have rows
-        if self.input_rows.is_empty() {
-            return Ok(());
-        }
-
-        // clone the table to avoid borrowing conflicts
-        let table = match &self.table {
-            Some(table) => table.clone(),
-            None => return Ok(()),
-        };
-
-        let metadata = table.metadata();
-        let schema = metadata.current_schema();
-
-        // sort input_rows by partition column values
-        let sorted_rows = self.sort_rows_by_partition(metadata, schema)?;
-
-        // build record batch from sorted rows
-        let record_batch = self.build_record_batch_from_rows(schema, &sorted_rows)?;
-
-        // split the record batch by partition values
-        let partition_batches = utils::split_record_batch_by_partition(metadata, record_batch)?;
-
-        let mut data_files = Vec::new();
-
-        // write each partition batch separately
-        for partition_batch in partition_batches.iter() {
-            let location_generator = LocationGenerator::new(metadata, partition_batch)?;
-            let file_name_generator = FileNameGenerator::new(DataFileFormat::Parquet);
-
-            // get partition value from location generator
-            let partition_value = location_generator.partition_value();
-
-            let parquet_writer_builder = ParquetWriterBuilder::new(
-                WriterProperties::default(),
-                schema.clone(),
-                table.file_io().clone(),
-                location_generator,
-                file_name_generator,
-            );
-            let data_file_writer_builder = DataFileWriterBuilder::new(
-                parquet_writer_builder,
-                partition_value,
-                metadata.default_partition_spec().spec_id(),
-            );
-            let mut data_file_writer = self.rt.block_on(data_file_writer_builder.build())?;
-
-            // write the record batch to Iceberg and close the writer and get
-            // the data file
-            self.rt
-                .block_on(data_file_writer.write(partition_batch.clone()))?;
-            let mut part_data_files = self.rt.block_on(data_file_writer.close())?;
-
-            data_files.append(&mut part_data_files);
-        }
-
-        // create transaction and commit the changes to update table metadata
-        let tx = Transaction::new(&table);
-        let append_action = tx.fast_append().add_data_files(data_files.clone());
-        let tx = append_action.apply(tx)?;
-        let updated_table = self.rt.block_on(tx.commit(self.catalog.as_ref()))?;
-
-        // update the cached table reference with the new metadata
-        self.table = Some(updated_table);
-
-        if cfg!(debug_assertions) {
-            for data_file in &data_files {
-                report_info(&format!(
-                    "Data file: {}, records: {}, size: {} bytes",
-                    data_file.file_path(),
-                    data_file.record_count(),
-                    data_file.file_size_in_bytes()
-                ));
-            }
-        }
-
+        self.write_rows_to_iceberg()?;
+        self.input_rows.clear();
        Ok(())
    }
