
Commit 3bc6820

feat: switch parquet writer to use the new lineage-based memory consumers
1 parent ab62fcb commit 3bc6820

2 files changed: +43 −32 lines changed


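In this commit, "lineage-based memory consumers" means the parquet sink no longer registers each `ArrowColumnWriter` as an unrelated top-level `MemoryConsumer` against the pool; instead it registers one parent reservation per parallel write and derives child reservations from it, so per-column allocations share a lineage and error messages can name the specific column writer. Below is a minimal sketch of that pattern; the method names (`new_child_reservation`, the consumer labels) are taken from the diff that follows, while the pool setup, size, and wrapper function are illustrative assumptions, not code from this commit.

```rust
use std::sync::Arc;

use datafusion_common::Result;
use datafusion_execution::memory_pool::{
    GreedyMemoryPool, MemoryConsumer, MemoryPool, MemoryReservation,
};

fn lineage_sketch() -> Result<()> {
    // Illustrative pool and size; the real sink takes its pool from the TaskContext.
    let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(200_000));

    // Before this commit: every column writer registered an independent,
    // top-level consumer directly against the pool.
    let _old_style: MemoryReservation =
        MemoryConsumer::new("ParquetSink(ArrowColumnWriter)").register(&pool);

    // After this commit: one parent consumer per parallel write, with
    // per-column child reservations derived from it.
    let parent = MemoryConsumer::new("ParquetSink(ParallelWriter)").register(&pool);
    let mut col0 = parent.new_child_reservation("ParquetSink(ArrowColumnWriter(col=0))");

    // try_grow returns a "Resources exhausted" error if the pool cannot
    // satisfy the request, naming this reservation as the consumer.
    col0.try_grow(64 * 1024)?;
    Ok(())
}
```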
datafusion/core/tests/memory_limit/mod.rs

Lines changed: 10 additions & 10 deletions
```diff
@@ -393,13 +393,13 @@ async fn oom_parquet_sink() {
 path.to_string_lossy()
 ))
 .with_expected_errors(vec![
-"Resources exhausted: Additional allocation failed for ParquetSink(ArrowColumnWriter) with top memory consumers (across reservations) as:
-ParquetSink(ArrowColumnWriter)#ID(can spill: false) consumed x KB, peak x KB,
-ParquetSink(ArrowColumnWriter)#ID(can spill: false) consumed x KB, peak x KB,
-ParquetSink(ArrowColumnWriter)#ID(can spill: false) consumed x KB, peak x KB,
-ParquetSink(ArrowColumnWriter)#ID(can spill: false) consumed x KB, peak x KB,
-ParquetSink(ArrowColumnWriter)#ID(can spill: false) consumed x KB, peak x KB.
-Error: Failed to allocate additional x KB for ParquetSink(ArrowColumnWriter) with x KB already allocated for this reservation - x KB remain available for the total pool",
+"Resources exhausted: Additional allocation failed for ParquetSink(ArrowColumnWriter(col=1)) with top memory consumers (across reservations) as:
+ParquetSink(ArrowColumnWriter(col=8))#ID(can spill: false) consumed x KB, peak x KB,
+ParquetSink(ArrowColumnWriter(col=14))#ID(can spill: false) consumed x KB, peak x KB,
+ParquetSink(ArrowColumnWriter(col=0))#ID(can spill: false) consumed x KB, peak x KB,
+ParquetSink(ArrowColumnWriter(col=2))#ID(can spill: false) consumed x KB, peak x KB,
+ParquetSink(ArrowColumnWriter(col=1))#ID(can spill: false) consumed x KB, peak x KB.
+Error: Failed to allocate additional x KB for ParquetSink(ArrowColumnWriter(col=1)) with x KB already allocated for this reservation - x KB remain available for the total pool",
 ])
 .with_memory_limit(200_000)
 .run()
@@ -425,9 +425,9 @@ async fn oom_with_tracked_consumer_pool() {
 path.to_string_lossy()
 ))
 .with_expected_errors(vec![
-"Resources exhausted: Additional allocation failed for ParquetSink(ArrowColumnWriter) with top memory consumers (across reservations) as:
-ParquetSink(ArrowColumnWriter)#ID(can spill: false) consumed x KB, peak x KB.
-Error: Failed to allocate additional x KB for ParquetSink(ArrowColumnWriter) with x KB already allocated for this reservation - x KB remain available for the total pool"
+"Resources exhausted: Additional allocation failed for ParquetSink(ArrowColumnWriter(col=2)) with top memory consumers (across reservations) as:
+ParquetSink(ArrowColumnWriter(col=8))#ID(can spill: false) consumed x KB, peak x KB.
+Error: Failed to allocate additional x KB for ParquetSink(ArrowColumnWriter(col=2)) with x KB already allocated for this reservation - x KB remain available for the total pool"
 ])
 .with_memory_pool(Arc::new(
 TrackConsumersPool::new(
```

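The only change in these tests is the expected error text: with the lineage-based consumers, the OOM report names the individual `ArrowColumnWriter(col=N)` reservations instead of several indistinguishable `ArrowColumnWriter` entries. The second test relies on a consumer-tracking pool to produce that report; a rough sketch of such a setup is below. `TrackConsumersPool` and `GreedyMemoryPool` are real DataFusion memory-pool types, but the inner pool choice, the size, and the `NonZeroUsize::new(5)` top-consumer count are assumptions, since the test's actual arguments are truncated in this hunk.

```rust
use std::{num::NonZeroUsize, sync::Arc};

use datafusion_execution::memory_pool::{GreedyMemoryPool, MemoryPool, TrackConsumersPool};

// A pool that remembers the top N consumers so that "Resources exhausted"
// errors can list them, similar to the `.with_memory_pool(...)` setup above.
fn tracked_pool() -> Arc<dyn MemoryPool> {
    Arc::new(TrackConsumersPool::new(
        GreedyMemoryPool::new(200_000),        // assumed inner pool and size
        NonZeroUsize::new(5).unwrap(),         // assumed number of consumers to report
    ))
}
```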
datafusion/datasource-parquet/src/file_format.rs

Lines changed: 33 additions & 22 deletions
```diff
@@ -52,7 +52,7 @@ use datafusion_datasource::display::FileGroupDisplay;
 use datafusion_datasource::file::FileSource;
 use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
 use datafusion_datasource::sink::{DataSink, DataSinkExec};
-use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
+use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use datafusion_expr::dml::InsertOp;
 use datafusion_physical_expr_common::sort_expr::LexRequirement;
@@ -1268,6 +1268,7 @@ impl FileSink for ParquetSink {
 parquet_props.clone(),
 )
 .await?;
+// Create a reservation for non-parallel parquet writing
 let mut reservation =
 MemoryConsumer::new(format!("ParquetSink[path={path}]"))
 .register(context.memory_pool());
@@ -1302,7 +1303,9 @@ impl FileSink for ParquetSink {
 let props = parquet_props.clone();
 let skip_arrow_metadata = self.parquet_options.global.skip_arrow_metadata;
 let parallel_options_clone = parallel_options.clone();
-let pool = Arc::clone(context.memory_pool());
+// Create a reservation for the parallel parquet writing
+let reservation = MemoryConsumer::new("ParquetSink(ParallelWriter)")
+.register(context.memory_pool());
 file_write_tasks.spawn(async move {
 let file_metadata = output_single_parquet_file_parallelized(
 writer,
@@ -1311,7 +1314,7 @@ impl FileSink for ParquetSink {
 &props,
 skip_arrow_metadata,
 parallel_options_clone,
-pool,
+reservation,
 )
 .await?;
 Ok((path, file_metadata))
@@ -1392,20 +1395,20 @@ type ColSender = Sender<ArrowLeafColumn>;
 fn spawn_column_parallel_row_group_writer(
 col_writers: Vec<ArrowColumnWriter>,
 max_buffer_size: usize,
-pool: &Arc<dyn MemoryPool>,
+parent_reservation: &MemoryReservation,
 ) -> Result<(Vec<ColumnWriterTask>, Vec<ColSender>)> {
 let num_columns = col_writers.len();
 
 let mut col_writer_tasks = Vec::with_capacity(num_columns);
 let mut col_array_channels = Vec::with_capacity(num_columns);
-for writer in col_writers.into_iter() {
+for (i, writer) in col_writers.into_iter().enumerate() {
 // Buffer size of this channel limits the number of arrays queued up for column level serialization
 let (send_array, receive_array) =
 mpsc::channel::<ArrowLeafColumn>(max_buffer_size);
 col_array_channels.push(send_array);
 
-let reservation =
-MemoryConsumer::new("ParquetSink(ArrowColumnWriter)").register(pool);
+let reservation = parent_reservation
+.new_child_reservation(format!("ParquetSink(ArrowColumnWriter(col={i}))"));
 let task = SpawnedTask::spawn(column_serializer_task(
 receive_array,
 writer,
@@ -1457,21 +1460,19 @@ async fn send_arrays_to_col_writers(
 fn spawn_rg_join_and_finalize_task(
 column_writer_tasks: Vec<ColumnWriterTask>,
 rg_rows: usize,
-pool: &Arc<dyn MemoryPool>,
+mut rg_reservation: MemoryReservation,
 ) -> SpawnedTask<RBStreamSerializeResult> {
-let mut rg_reservation =
-MemoryConsumer::new("ParquetSink(SerializedRowGroupWriter)").register(pool);
-
 SpawnedTask::spawn(async move {
 let num_cols = column_writer_tasks.len();
 let mut finalized_rg = Vec::with_capacity(num_cols);
+let reservation = &mut rg_reservation;
 for task in column_writer_tasks.into_iter() {
 let (writer, _col_reservation) = task
 .join_unwind()
 .await
 .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;
 let encoded_size = writer.get_estimated_total_bytes();
-rg_reservation.grow(encoded_size);
+reservation.grow(encoded_size);
 finalized_rg.push(writer.close()?);
 }
 
@@ -1494,16 +1495,26 @@ fn spawn_parquet_parallel_serialization_task(
 schema: Arc<Schema>,
 writer_props: Arc<WriterProperties>,
 parallel_options: ParallelParquetWriterOptions,
-pool: Arc<dyn MemoryPool>,
+parent_reservation: &MemoryReservation,
 ) -> SpawnedTask<Result<(), DataFusionError>> {
+let cols_reservation =
+parent_reservation.new_child_reservation("ParquetSink(ParallelColumnWriters)");
+
+let rg_reservation =
+parent_reservation.new_child_reservation("ParquetSink(SerializedRowGroupWriter)");
+
 SpawnedTask::spawn(async move {
 let max_buffer_rb = parallel_options.max_buffered_record_batches_per_stream;
 let max_row_group_rows = writer_props.max_row_group_size();
 let mut row_group_index = 0;
 let col_writers =
 row_group_writer_factory.create_column_writers(row_group_index)?;
 let (mut column_writer_handles, mut col_array_channels) =
-spawn_column_parallel_row_group_writer(col_writers, max_buffer_rb, &pool)?;
+spawn_column_parallel_row_group_writer(
+col_writers,
+max_buffer_rb,
+&cols_reservation,
+)?;
 let mut current_rg_rows = 0;
 
 while let Some(mut rb) = data.recv().await {
@@ -1537,7 +1548,7 @@ fn spawn_parquet_parallel_serialization_task(
 let finalize_rg_task = spawn_rg_join_and_finalize_task(
 column_writer_handles,
 max_row_group_rows,
-&pool,
+rg_reservation.cloned_reservation(),
 );
 
 // Do not surface error from closed channel (means something
@@ -1556,7 +1567,7 @@ fn spawn_parquet_parallel_serialization_task(
 spawn_column_parallel_row_group_writer(
 col_writers,
 max_buffer_rb,
-&pool,
+&cols_reservation,
 )?;
 }
 }
@@ -1568,7 +1579,7 @@ fn spawn_parquet_parallel_serialization_task(
 let finalize_rg_task = spawn_rg_join_and_finalize_task(
 column_writer_handles,
 current_rg_rows,
-&pool,
+rg_reservation.cloned_reservation(),
 );
 
 // Do not surface error from closed channel (means something
@@ -1589,10 +1600,10 @@ async fn concatenate_parallel_row_groups(
 merged_buff: SharedBuffer,
 mut serialize_rx: Receiver<SpawnedTask<RBStreamSerializeResult>>,
 mut object_store_writer: Box<dyn AsyncWrite + Send + Unpin>,
-pool: Arc<dyn MemoryPool>,
+parent_reservation: &MemoryReservation,
 ) -> Result<FileMetaData> {
 let mut file_reservation =
-MemoryConsumer::new("ParquetSink(SerializedFileWriter)").register(&pool);
+parent_reservation.new_child_reservation("ParquetSink(SerializedFileWriter)");
 
 while let Some(task) = serialize_rx.recv().await {
 let result = task.join_unwind().await;
@@ -1639,7 +1650,7 @@ async fn output_single_parquet_file_parallelized(
 parquet_props: &WriterProperties,
 skip_arrow_metadata: bool,
 parallel_options: ParallelParquetWriterOptions,
-pool: Arc<dyn MemoryPool>,
+parent_reservation: MemoryReservation,
 ) -> Result<FileMetaData> {
 let max_rowgroups = parallel_options.max_parallel_row_groups;
 // Buffer size of this channel limits maximum number of RowGroups being worked on in parallel
@@ -1665,14 +1676,14 @@ async fn output_single_parquet_file_parallelized(
 Arc::clone(&output_schema),
 Arc::clone(&arc_props),
 parallel_options,
-Arc::clone(&pool),
+&parent_reservation,
 );
 let file_metadata = concatenate_parallel_row_groups(
 writer,
 merged_buff,
 serialize_rx,
 object_store_writer,
-pool,
+&parent_reservation,
 )
 .await?;
 
```

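Taken together, the file_format.rs changes stop threading an `Arc<dyn MemoryPool>` through the parallel writer and instead pass reservations down: the sink registers a single `ParquetSink(ParallelWriter)` consumer, the serialization task derives children for the column writers and the row-group finalizer (handing each spawned finalize task its own handle via `cloned_reservation()`), and the file concatenation step derives a `SerializedFileWriter` child. The sketch below reconstructs that reservation tree; the helper function itself is illustrative and not part of the crate, while the consumer names and the `new_child_reservation` method come from the diff above.

```rust
use std::sync::Arc;

use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};

/// Illustrative helper: build the reservation lineage used by the parallel
/// parquet write path after this commit.
fn parallel_writer_reservations(
    pool: &Arc<dyn MemoryPool>,
    num_columns: usize,
) -> Vec<MemoryReservation> {
    // Registered once per parallel parquet write in the FileSink impl.
    let parent = MemoryConsumer::new("ParquetSink(ParallelWriter)").register(pool);

    // Children created in spawn_parquet_parallel_serialization_task and
    // concatenate_parallel_row_groups.
    let cols = parent.new_child_reservation("ParquetSink(ParallelColumnWriters)");
    let rg = parent.new_child_reservation("ParquetSink(SerializedRowGroupWriter)");
    let file = parent.new_child_reservation("ParquetSink(SerializedFileWriter)");

    // One grandchild per column writer, mirroring
    // spawn_column_parallel_row_group_writer.
    let per_column: Vec<MemoryReservation> = (0..num_columns)
        .map(|i| cols.new_child_reservation(format!("ParquetSink(ArrowColumnWriter(col={i}))")))
        .collect();

    let mut all = vec![parent, cols, rg, file];
    all.extend(per_column);
    all
}
```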