From f6c1f18fd42e84654060df437e09968df7fe358c Mon Sep 17 00:00:00 2001 From: Denise Wiedl Date: Mon, 6 Oct 2025 13:00:42 +0300 Subject: [PATCH 01/10] chore: make memory consumer naming more readable --- .../tests/snapshots/cli_top_memory_consumers@top2.snap | 2 +- .../snapshots/cli_top_memory_consumers@top3_default.snap | 2 +- datafusion/core/tests/memory_limit/mod.rs | 8 ++++---- datafusion/datasource-parquet/src/file_format.rs | 5 +++-- datafusion/physical-plan/src/aggregates/no_grouping.rs | 5 +++-- datafusion/physical-plan/src/joins/hash_join/exec.rs | 2 +- datafusion/physical-plan/src/joins/nested_loop_join.rs | 2 +- .../physical-plan/src/joins/sort_merge_join/exec.rs | 5 +++-- .../physical-plan/src/joins/symmetric_hash_join.rs | 6 ++++-- datafusion/physical-plan/src/repartition/mod.rs | 2 +- datafusion/physical-plan/src/sorts/sort.rs | 9 +++++---- .../physical-plan/src/sorts/sort_preserving_merge.rs | 7 ++++--- datafusion/physical-plan/src/topk/mod.rs | 2 +- 13 files changed, 32 insertions(+), 25 deletions(-)
diff --git a/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top2.snap b/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top2.snap index 62f864b3adb6..b04863eb1508 100644 --- a/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top2.snap +++ b/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top2.snap @@ -16,7 +16,7 @@ exit_code: 1 [CLI_VERSION] Error: Not enough memory to continue external sort. Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes caused by
-Resources exhausted: Additional allocation failed for ExternalSorter[0] with top memory consumers (across reservations) as:
+Resources exhausted: Additional allocation failed for ExternalSorter[partition=0] with top memory consumers (across reservations) as:
 Consumer(can spill: bool) consumed XB, peak XB, Consumer(can spill: bool) consumed XB, peak XB. Error: Failed to allocate
diff --git a/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top3_default.snap b/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top3_default.snap index 9845d095c918..5d3b809d7e05 100644 --- a/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top3_default.snap +++ b/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top3_default.snap @@ -14,7 +14,7 @@ exit_code: 1 [CLI_VERSION] Error: Not enough memory to continue external sort. Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes caused by
-Resources exhausted: Additional allocation failed for ExternalSorter[0] with top memory consumers (across reservations) as:
+Resources exhausted: Additional allocation failed for ExternalSorter[partition=0] with top memory consumers (across reservations) as:
 Consumer(can spill: bool) consumed XB, peak XB, Consumer(can spill: bool) consumed XB, peak XB, Consumer(can spill: bool) consumed XB, peak XB. 
diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index 89bc48b1e634..1b14a997cada 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -153,7 +153,7 @@ async fn join_by_expression() { TestCase::new() .with_query("select t1.* from t t1 JOIN t t2 ON t1.service != t2.service") .with_expected_errors(vec![ - "Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n NestedLoopJoinLoad[0]", + "Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n NestedLoopJoinLoad[partition=0]", ]) .with_memory_limit(1_000) .run() @@ -375,7 +375,7 @@ async fn oom_parquet_sink() { )) .with_expected_errors(vec![ "Failed to allocate additional", - "for ParquetSink(ArrowColumnWriter)", + "ParquetSink(ArrowColumnWriter(col=1))", ]) .with_memory_limit(200_000) .run() @@ -402,8 +402,8 @@ async fn oom_with_tracked_consumer_pool() { )) .with_expected_errors(vec![ "Failed to allocate additional", - "for ParquetSink(ArrowColumnWriter)", - "Additional allocation failed", "with top memory consumers (across reservations) as:\n ParquetSink(ArrowColumnWriter)" + "for ParquetSink(ArrowColumnWriter(col=2))", + "Additional allocation failed", "with top memory consumers (across reservations) as:\n ParquetSink(ArrowColumnWriter(col=8))" ]) .with_memory_pool(Arc::new( TrackConsumersPool::new( diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index 963c1d77950c..d4a84e42ce45 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -1268,8 +1268,9 @@ impl FileSink for ParquetSink { parquet_props.clone(), ) .await?; - let mut reservation = MemoryConsumer::new(format!("ParquetSink[{path}]")) - .register(context.memory_pool()); + let mut reservation = + MemoryConsumer::new(format!("ParquetSink[path={path}]")) + .register(context.memory_pool()); file_write_tasks.spawn(async move { while let Some(batch) = rx.recv().await { writer.write(&batch).await?; diff --git a/datafusion/physical-plan/src/aggregates/no_grouping.rs b/datafusion/physical-plan/src/aggregates/no_grouping.rs index 9474a5f88c92..19e1929c919f 100644 --- a/datafusion/physical-plan/src/aggregates/no_grouping.rs +++ b/datafusion/physical-plan/src/aggregates/no_grouping.rs @@ -88,8 +88,9 @@ impl AggregateStream { }; let accumulators = create_accumulators(&agg.aggr_expr)?; - let reservation = MemoryConsumer::new(format!("AggregateStream[{partition}]")) - .register(context.memory_pool()); + let reservation = + MemoryConsumer::new(format!("AggregateStream[partition={partition}]")) + .register(context.memory_pool()); let inner = AggregateStreamInner { schema: Arc::clone(&agg.schema), diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 4c293b0498e7..da3fef672d55 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -953,7 +953,7 @@ impl ExecutionPlan for HashJoinExec { let left_stream = self.left.execute(partition, Arc::clone(&context))?; let reservation = - MemoryConsumer::new(format!("HashJoinInput[{partition}]")) + MemoryConsumer::new(format!("HashJoinInput[partition={partition}]")) .register(context.memory_pool()); OnceFut::new(collect_left_input( diff --git 
a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index 0974b3a9114e..b953fc219cd6 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -496,7 +496,7 @@ impl ExecutionPlan for NestedLoopJoinExec { // Initialization reservation for load of inner table let load_reservation = - MemoryConsumer::new(format!("NestedLoopJoinLoad[{partition}]")) + MemoryConsumer::new(format!("NestedLoopJoinLoad[partition={partition}]")) .register(context.memory_pool()); let build_side_data = self.build_side_data.try_once(|| { diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs index 592878a3bb1c..1a66079b9d1b 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs @@ -491,8 +491,9 @@ impl ExecutionPlan for SortMergeJoinExec { let batch_size = context.session_config().batch_size(); // create memory reservation - let reservation = MemoryConsumer::new(format!("SMJStream[{partition}]")) - .register(context.memory_pool()); + let reservation = + MemoryConsumer::new(format!("SMJStream[partition={partition}]")) + .register(context.memory_pool()); // create join stream Ok(Box::pin(SortMergeJoinStream::try_new( diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index b55b7e15f194..5f8ec5f81db4 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -524,8 +524,10 @@ impl ExecutionPlan for SymmetricHashJoinExec { context.session_config().enforce_batch_size_in_joins(); let reservation = Arc::new(Mutex::new( - MemoryConsumer::new(format!("SymmetricHashJoinStream[{partition}]")) - .register(context.memory_pool()), + MemoryConsumer::new(format!( + "SymmetricHashJoinStream[partition={partition}]" + )) + .register(context.memory_pool()), )); if let Some(g) = graph.as_ref() { reservation.lock().try_grow(g.size())?; diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 22bc1b5cf924..3670a6c5daa4 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -204,7 +204,7 @@ impl RepartitionExecState { let mut channels = HashMap::with_capacity(txs.len()); for (partition, (tx, rx)) in txs.into_iter().zip(rxs).enumerate() { let reservation = Arc::new(Mutex::new( - MemoryConsumer::new(format!("{name}[{partition}]")) + MemoryConsumer::new(format!("{name}[partition={partition}]")) .register(context.memory_pool()), )); channels.insert(partition, (tx, rx, reservation)); diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 7f47d60c735a..c5a2a3af48dd 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -280,12 +280,13 @@ impl ExternalSorter { runtime: Arc, ) -> Result { let metrics = ExternalSorterMetrics::new(metrics, partition_id); - let reservation = MemoryConsumer::new(format!("ExternalSorter[{partition_id}]")) - .with_can_spill(true) - .register(&runtime.memory_pool); + let reservation = + MemoryConsumer::new(format!("ExternalSorter[partition={partition_id}]")) + .with_can_spill(true) + .register(&runtime.memory_pool); let merge_reservation = - 
MemoryConsumer::new(format!("ExternalSorterMerge[{partition_id}]")) + MemoryConsumer::new(format!("ExternalSorterMerge[partition={partition_id}]")) .register(&runtime.memory_pool); let spill_manager = SpillManager::new( diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index 09ad71974e6c..6d3c34693b90 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -292,9 +292,10 @@ impl ExecutionPlan for SortPreservingMergeExec { ); let schema = self.schema(); - let reservation = - MemoryConsumer::new(format!("SortPreservingMergeExec[{partition}]")) - .register(&context.runtime_env().memory_pool); + let reservation = MemoryConsumer::new(format!( + "SortPreservingMergeExec[partition={partition}]" + )) + .register(&context.runtime_env().memory_pool); match input_partitions { 0 => internal_err!( diff --git a/datafusion/physical-plan/src/topk/mod.rs b/datafusion/physical-plan/src/topk/mod.rs index 9435de1cc448..1c72d1defad7 100644 --- a/datafusion/physical-plan/src/topk/mod.rs +++ b/datafusion/physical-plan/src/topk/mod.rs @@ -188,7 +188,7 @@ impl TopK { metrics: &ExecutionPlanMetricsSet, filter: Arc>, ) -> Result { - let reservation = MemoryConsumer::new(format!("TopK[{partition_id}]")) + let reservation = MemoryConsumer::new(format!("TopK[partition={partition_id}]")) .register(&runtime.memory_pool); let sort_fields = build_sort_fields(&expr, &schema)?; From f4cd2db4b2ebb029eb589decce148738b857e73b Mon Sep 17 00:00:00 2001 From: Denise Wiedl Date: Mon, 6 Oct 2025 21:41:38 +0300 Subject: [PATCH 02/10] test: update OOM integration test to use full stdout --- datafusion/core/tests/memory_limit/mod.rs | 99 ++++++++++++++++------- 1 file changed, 70 insertions(+), 29 deletions(-) diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index 1b14a997cada..314fd77b66ba 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -85,8 +85,10 @@ async fn group_by_none() { TestCase::new() .with_query("select median(request_bytes) from t") .with_expected_errors(vec![ - "Resources exhausted: Additional allocation failed", - "with top memory consumers (across reservations) as:\n AggregateStream", + "Resources exhausted: Additional allocation failed for AggregateStream[partition=0] with top memory consumers (across reservations) as: + AggregateStream[partition=0]#ID(can spill: false) consumed x B, peak x B, + AggregateStream[partition=0]#ID(can spill: false) consumed x B, peak x B. +Error: Failed to allocate additional x B for AggregateStream[partition=0] with x B already allocated for this reservation - x B remain available for the total pool", ]) .with_memory_limit(2_000) .run() @@ -98,7 +100,10 @@ async fn group_by_row_hash() { TestCase::new() .with_query("select count(*) from t GROUP BY response_bytes") .with_expected_errors(vec![ - "Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n GroupedHashAggregateStream" + "Resources exhausted: Additional allocation failed for GroupedHashAggregateStream[0] (count(1)) with top memory consumers (across reservations) as: + GroupedHashAggregateStream[0] (count(1))#ID(can spill: true) consumed x B, peak x B, + GroupedHashAggregateStream[0] (count(1))#ID(can spill: true) consumed x B, peak x B. 
+Error: Failed to allocate additional x KB for GroupedHashAggregateStream[0] (count(1)) with x B already allocated for this reservation - x B remain available for the total pool", ]) .with_memory_limit(2_000) .run() @@ -111,7 +116,10 @@ async fn group_by_hash() { // group by dict column .with_query("select count(*) from t GROUP BY service, host, pod, container") .with_expected_errors(vec![ - "Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n GroupedHashAggregateStream" + "Resources exhausted: Additional allocation failed for GroupedHashAggregateStream[0] (count(1)) with top memory consumers (across reservations) as: + GroupedHashAggregateStream[0] (count(1))#ID(can spill: true) consumed x B, peak x B, + GroupedHashAggregateStream[0] (count(1))#ID(can spill: true) consumed x B, peak x B. +Error: Failed to allocate additional x KB for GroupedHashAggregateStream[0] (count(1)) with x B already allocated for this reservation - x B remain available for the total pool" ]) .with_memory_limit(1_000) .run() @@ -124,8 +132,9 @@ async fn join_by_key_multiple_partitions() { TestCase::new() .with_query("select t1.* from t t1 JOIN t t2 ON t1.service = t2.service") .with_expected_errors(vec![ - "Resources exhausted: Additional allocation failed", - "with top memory consumers (across reservations) as:\n HashJoinInput", + "Resources exhausted: Additional allocation failed for HashJoinInput with top memory consumers (across reservations) as: + HashJoinInput#ID(can spill: false) consumed x B, peak x B. +Error: Failed to allocate additional x KB for HashJoinInput with x B already allocated for this reservation - x B remain available for the total pool", ]) .with_memory_limit(1_000) .with_config(config) @@ -139,8 +148,9 @@ async fn join_by_key_single_partition() { TestCase::new() .with_query("select t1.* from t t1 JOIN t t2 ON t1.service = t2.service") .with_expected_errors(vec![ - "Resources exhausted: Additional allocation failed", - "with top memory consumers (across reservations) as:\n HashJoinInput", + "Resources exhausted: Additional allocation failed for HashJoinInput with top memory consumers (across reservations) as: + HashJoinInput#ID(can spill: false) consumed x B, peak x B. +Error: Failed to allocate additional x KB for HashJoinInput with x B already allocated for this reservation - x B remain available for the total pool", ]) .with_memory_limit(1_000) .with_config(config) @@ -153,7 +163,9 @@ async fn join_by_expression() { TestCase::new() .with_query("select t1.* from t t1 JOIN t t2 ON t1.service != t2.service") .with_expected_errors(vec![ - "Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n NestedLoopJoinLoad[partition=0]", + "Resources exhausted: Additional allocation failed for NestedLoopJoinLoad[partition=0] with top memory consumers (across reservations) as: + NestedLoopJoinLoad[partition=0]#ID(can spill: false) consumed x B, peak x B. 
+Error: Failed to allocate additional x KB for NestedLoopJoinLoad[partition=0] with x B already allocated for this reservation - x B remain available for the total pool", ]) .with_memory_limit(1_000) .run() @@ -165,8 +177,9 @@ async fn cross_join() { TestCase::new() .with_query("select t1.*, t2.* from t t1 CROSS JOIN t t2") .with_expected_errors(vec![ - "Resources exhausted: Additional allocation failed", - "with top memory consumers (across reservations) as:\n CrossJoinExec", + "Resources exhausted: Additional allocation failed for CrossJoinExec with top memory consumers (across reservations) as: + CrossJoinExec#ID(can spill: false) consumed x B, peak x B. +Error: Failed to allocate additional x KB for CrossJoinExec with x B already allocated for this reservation - x B remain available for the total pool", ]) .with_memory_limit(1_000) .run() @@ -185,9 +198,9 @@ async fn sort_merge_join_no_spill() { "select t1.* from t t1 JOIN t t2 ON t1.pod = t2.pod AND t1.time = t2.time", ) .with_expected_errors(vec![ - "Failed to allocate additional", - "SMJStream", - "Disk spilling disabled", + "Execution error: Additional allocation failed for SMJStream[partition=0] with top memory consumers (across reservations) as: + SMJStream[partition=0]#ID(can spill: false) consumed x B, peak x B. +Error: Failed to allocate additional x KB for SMJStream[partition=0] with x B already allocated for this reservation - x B remain available for the total pool. Disk spilling disabled.", ]) .with_memory_limit(1_000) .with_config(config) @@ -222,7 +235,9 @@ async fn symmetric_hash_join() { "select t1.* from t t1 JOIN t t2 ON t1.pod = t2.pod AND t1.time = t2.time", ) .with_expected_errors(vec![ - "Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n SymmetricHashJoinStream", + "Resources exhausted: Additional allocation failed for SymmetricHashJoinStream[partition=0] with top memory consumers (across reservations) as: + SymmetricHashJoinStream[partition=0]#ID(can spill: false) consumed x B, peak x B. +Error: Failed to allocate additional x KB for SymmetricHashJoinStream[partition=0] with x B already allocated for this reservation - x B remain available for the total pool", ]) .with_memory_limit(1_000) .with_scenario(Scenario::AccessLogStreaming) @@ -240,7 +255,9 @@ async fn sort_preserving_merge() { // so only a merge is needed .with_query("select * from t ORDER BY a ASC NULLS LAST, b ASC NULLS LAST LIMIT 10") .with_expected_errors(vec![ - "Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n SortPreservingMergeExec", + "Resources exhausted: Additional allocation failed for SortPreservingMergeExec[partition=0] with top memory consumers (across reservations) as: + SortPreservingMergeExec[partition=0]#ID(can spill: false) consumed x KB, peak x KB. 
+Error: Failed to allocate additional x KB for SortPreservingMergeExec[partition=0] with x KB already allocated for this reservation - x B remain available for the total pool", ]) // provide insufficient memory to merge .with_memory_limit(partition_size / 2) @@ -319,9 +336,10 @@ async fn sort_spill_reservation() { test.clone() .with_expected_errors(vec![ - "Resources exhausted: Additional allocation failed", - "with top memory consumers (across reservations) as:", - "B for ExternalSorterMerge", + "Resources exhausted: Additional allocation failed for ExternalSorterMerge[partition=0] with top memory consumers (across reservations) as: + ExternalSorter[partition=0]#ID(can spill: true) consumed x KB, peak x KB, + ExternalSorterMerge[partition=0]#ID(can spill: false) consumed x KB, peak x KB. +Error: Failed to allocate additional x KB for ExternalSorterMerge[partition=0] with x B already allocated for this reservation - x B remain available for the total pool", ]) .with_config(config) .run() @@ -350,8 +368,9 @@ async fn oom_recursive_cte() { SELECT * FROM nodes;", ) .with_expected_errors(vec![ - "Resources exhausted: Additional allocation failed", - "with top memory consumers (across reservations) as:\n RecursiveQuery", + "Resources exhausted: Additional allocation failed for RecursiveQuery with top memory consumers (across reservations) as: + RecursiveQuery#ID(can spill: false) consumed x B, peak x B. +Error: Failed to allocate additional x KB for RecursiveQuery with x B already allocated for this reservation - x B remain available for the total pool", ]) .with_memory_limit(2_000) .run() @@ -374,8 +393,13 @@ async fn oom_parquet_sink() { path.to_string_lossy() )) .with_expected_errors(vec![ - "Failed to allocate additional", - "ParquetSink(ArrowColumnWriter(col=1))", + "Resources exhausted: Additional allocation failed for ParquetSink(ArrowColumnWriter) with top memory consumers (across reservations) as: + ParquetSink(ArrowColumnWriter)#ID(can spill: false) consumed x KB, peak x KB, + ParquetSink(ArrowColumnWriter)#ID(can spill: false) consumed x KB, peak x KB, + ParquetSink(ArrowColumnWriter)#ID(can spill: false) consumed x KB, peak x KB, + ParquetSink(ArrowColumnWriter)#ID(can spill: false) consumed x KB, peak x KB, + ParquetSink(ArrowColumnWriter)#ID(can spill: false) consumed x KB, peak x KB. +Error: Failed to allocate additional x KB for ParquetSink(ArrowColumnWriter) with x KB already allocated for this reservation - x KB remain available for the total pool", ]) .with_memory_limit(200_000) .run() @@ -401,9 +425,9 @@ async fn oom_with_tracked_consumer_pool() { path.to_string_lossy() )) .with_expected_errors(vec![ - "Failed to allocate additional", - "for ParquetSink(ArrowColumnWriter(col=2))", - "Additional allocation failed", "with top memory consumers (across reservations) as:\n ParquetSink(ArrowColumnWriter(col=8))" + "Resources exhausted: Additional allocation failed for ParquetSink(ArrowColumnWriter) with top memory consumers (across reservations) as: + ParquetSink(ArrowColumnWriter)#ID(can spill: false) consumed x KB, peak x KB. 
+Error: Failed to allocate additional x KB for ParquetSink(ArrowColumnWriter) with x KB already allocated for this reservation - x KB remain available for the total pool" ]) .with_memory_pool(Arc::new( TrackConsumersPool::new( @@ -420,8 +444,10 @@ async fn oom_grouped_hash_aggregate() { TestCase::new() .with_query("SELECT COUNT(*), SUM(request_bytes) FROM t GROUP BY host") .with_expected_errors(vec![ - "Failed to allocate additional", - "GroupedHashAggregateStream[0] (count(1), sum(t.request_bytes))", + "Resources exhausted: Additional allocation failed for GroupedHashAggregateStream[0] (count(1), sum(t.request_bytes)) with top memory consumers (across reservations) as: + GroupedHashAggregateStream[0] (count(1), sum(t.request_bytes))#ID(can spill: true) consumed x B, peak x B, + GroupedHashAggregateStream[0] (count(1), sum(t.request_bytes))#ID(can spill: true) consumed x B, peak x B. +Error: Failed to allocate additional x KB for GroupedHashAggregateStream[0] (count(1), sum(t.request_bytes)) with x B already allocated for this reservation - x B remain available for the total pool", ]) .with_memory_limit(1_000) .run() @@ -885,8 +911,9 @@ impl TestCase { "Unexpected failure when running, expected success but got: {e}" ) } else { + let err_msg = normalize_oom_errors(&e.to_string()); for error_substring in expected_errors { - assert_contains!(e.to_string(), error_substring); + assert_contains!(&err_msg, error_substring); } } } @@ -894,6 +921,20 @@ impl TestCase { } } +fn normalize_oom_errors(err: &str) -> String { + let re = regex::Regex::new(r"\#\d+\(can spill:").unwrap(); + let mut err = re.replace_all(err, "#ID(can spill:").to_string(); + + let re = regex::Regex::new(r"\d+\.\d+ KB").unwrap(); + err = re.replace_all(&err, "x KB").to_string(); + + let re = regex::Regex::new(r"\d+\.\d+ MB").unwrap(); + err = re.replace_all(&err, "x MB").to_string(); + + let re = regex::Regex::new(r"\d+\.\d+ B").unwrap(); + re.replace_all(&err, "x B").to_string() +} + /// 50 byte memory limit const MEMORY_FRACTION: f64 = 0.95; From 4115b5cbd5da718d5eb793ec1a49e8ebf7345d2e Mon Sep 17 00:00:00 2001 From: Denise Wiedl Date: Mon, 6 Oct 2025 23:12:38 +0300 Subject: [PATCH 03/10] chore: handle CI error --- datafusion/core/tests/memory_limit/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index 314fd77b66ba..15de08eb437c 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -200,7 +200,7 @@ async fn sort_merge_join_no_spill() { .with_expected_errors(vec![ "Execution error: Additional allocation failed for SMJStream[partition=0] with top memory consumers (across reservations) as: SMJStream[partition=0]#ID(can spill: false) consumed x B, peak x B. -Error: Failed to allocate additional x KB for SMJStream[partition=0] with x B already allocated for this reservation - x B remain available for the total pool. 
Disk spilling disabled.",
+Error: Failed to allocate additional x KB for SMJStream[partition=0] with x B already allocated for this reservation - x B remain available for the total pool",
 ])
 .with_memory_limit(1_000)
 .with_config(config)
From 987192cbec6814253eaab9eedd42c812b2f08024 Mon Sep 17 00:00:00 2001 From: Denise Wiedl Date: Mon, 6 Oct 2025 14:10:07 +0300 Subject: [PATCH 04/10] feat: add lineage information to a given MemoryConsumer, and enable reservations to make child reservations --- datafusion/execution/src/memory_pool/mod.rs | 41 +++++++++++++++++++++ 1 file changed, 41 insertions(+)
diff --git a/datafusion/execution/src/memory_pool/mod.rs b/datafusion/execution/src/memory_pool/mod.rs index e620b2326796..93b3fd58131b 100644 --- a/datafusion/execution/src/memory_pool/mod.rs +++ b/datafusion/execution/src/memory_pool/mod.rs @@ -240,6 +240,7 @@ pub struct MemoryConsumer { name: String, can_spill: bool, id: usize, +    parent_id: Option<usize>, } impl PartialEq for MemoryConsumer { @@ -250,6 +251,7 @@ impl PartialEq for MemoryConsumer { if is_same_id { assert_eq!(self.name, other.name); assert_eq!(self.can_spill, other.can_spill); +            assert_eq!(self.parent_id, other.parent_id); } is_same_id @@ -263,6 +265,7 @@ impl Hash for MemoryConsumer { self.id.hash(state); self.name.hash(state); self.can_spill.hash(state); +        self.parent_id.hash(state); } } @@ -278,6 +281,17 @@ impl MemoryConsumer { name: name.into(), can_spill: false, id: Self::new_unique_id(), +            parent_id: None, + } + } + +    /// Create a new [`MemoryConsumer`] with a parent consumer ID for lineage tracking +    pub fn new_with_parent(name: impl Into<String>, parent_id: usize) -> Self { +        Self { +            name: name.into(), +            can_spill: false, +            id: Self::new_unique_id(), +            parent_id: Some(parent_id), + } + } @@ -289,6 +303,7 @@ impl MemoryConsumer { name: self.name.clone(), can_spill: self.can_spill, id: Self::new_unique_id(), +            parent_id: self.parent_id, } } @@ -312,6 +327,11 @@ impl MemoryConsumer { &self.name } +    /// Returns the parent consumer ID if this consumer has a parent +    pub fn parent_id(&self) -> Option<usize> { +        self.parent_id + } + /// Registers this [`MemoryConsumer`] with the provided [`MemoryPool`] returning /// a [`MemoryReservation`] that can be used to grow or shrink the memory reservation pub fn register(self, pool: &Arc<dyn MemoryPool>) -> MemoryReservation { @@ -462,6 +482,27 @@ impl MemoryReservation { } } +    /// Create a new [`MemoryReservation`] with a new [`MemoryConsumer`] that +    /// is a child of this reservation's consumer. +    /// +    /// This is useful for creating memory consumers with lineage tracking. +    pub fn new_child_reservation(&self, name: impl Into<String>) -> MemoryReservation { +        MemoryConsumer::new_with_parent(name, self.consumer().id()) +            .register(&self.registration.pool) +    } + +    /// Create a new [`MemoryReservation`] which is a clone of the current +    /// [`MemoryReservation`]. This means that it's a cloned [`MemoryConsumer`] +    /// with the same configuration, but a new unique ID. +    /// +    /// This is useful for creating memory consumers with lineage tracking, +    /// while dealing with multithreaded scenarios. 
+    pub fn cloned_reservation(&self) -> MemoryReservation { +        self.consumer() +            .clone_with_new_id() +            .register(&self.registration.pool) +    } + /// Splits off all the bytes from this [`MemoryReservation`] into /// a new [`MemoryReservation`] with the same [`MemoryConsumer`] pub fn take(&mut self) -> MemoryReservation {
From 0bc7630e56c62fe16218a26c06cc7c04b03ba28d Mon Sep 17 00:00:00 2001 From: Denise Wiedl Date: Mon, 6 Oct 2025 19:53:47 +0300 Subject: [PATCH 05/10] refactor: make a separate ReportedConsumer which represents a snapshot --- datafusion/execution/src/memory_pool/pool.rs | 59 +++++++++++++------- 1 file changed, 40 insertions(+), 19 deletions(-)
diff --git a/datafusion/execution/src/memory_pool/pool.rs b/datafusion/execution/src/memory_pool/pool.rs index 306df3defdbb..66a09fdeabb2 100644 --- a/datafusion/execution/src/memory_pool/pool.rs +++ b/datafusion/execution/src/memory_pool/pool.rs @@ -268,10 +268,47 @@ fn insufficient_capacity_err( struct TrackedConsumer { name: String, can_spill: bool, +    parent_id: Option<usize>, reserved: AtomicUsize, peak: AtomicUsize, } +/// A snapshot of a TrackedConsumer with static values and consumer ID +#[derive(Debug, Clone)] +struct ReportedConsumer { +    consumer_id: usize, +    name: String, +    can_spill: bool, +    reserved: usize, +    peak: usize, +} + +impl std::fmt::Display for ReportedConsumer { +    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +        write!( +            f, +            "{}#{}(can spill: {}) consumed {}, peak {}", +            self.name, +            self.consumer_id, +            self.can_spill, +            human_readable_size(self.reserved), +            human_readable_size(self.peak) +        ) +    } +} + +impl From<(usize, &TrackedConsumer)> for ReportedConsumer { +    fn from((consumer_id, tracked): (usize, &TrackedConsumer)) -> Self { +        Self { +            consumer_id, +            name: tracked.name.clone(), +            can_spill: tracked.can_spill, +            reserved: tracked.reserved(), +            peak: tracked.peak(), +        } +    } +} + impl TrackedConsumer { /// Shorthand to return the currently reserved value fn reserved(&self) -> usize { @@ -380,29 +417,13 @@ impl TrackConsumersPool { .tracked_consumers .lock() .iter() -            .map(|(consumer_id, tracked_consumer)| { -                ( -                    ( -                        *consumer_id, -                        tracked_consumer.name.to_owned(), -                        tracked_consumer.can_spill, -                        tracked_consumer.peak(), -                    ), -                    tracked_consumer.reserved(), -                ) -            }) +            .map(|(id, consumer_stats)| ReportedConsumer::from((*id, consumer_stats))) .collect::<Vec<_>>(); -        consumers.sort_by(|a, b| b.1.cmp(&a.1)); // inverse ordering +        consumers.sort_by(|a, b| b.reserved.cmp(&a.reserved)); // inverse ordering consumers[0..std::cmp::min(top, consumers.len())] .iter() -            .map(|((id, name, can_spill, peak), size)| { -                format!( -                    " {name}#{id}(can spill: {can_spill}) consumed {}, peak {}", -                    human_readable_size(*size), -                    human_readable_size(*peak), -                ) -            }) +            .map(|reported_consumer| format!(" {reported_consumer}")) .collect::<Vec<_>>() .join(",\n") + "." 
From ab62fcb30f596fd7714d3f00b20670a3df2cebbc Mon Sep 17 00:00:00 2001 From: Denise Wiedl Date: Mon, 6 Oct 2025 20:50:11 +0300 Subject: [PATCH 06/10] feat: add consumer stacktrace --- datafusion/execution/src/memory_pool/pool.rs | 129 +++++++++++++++++++ 1 file changed, 129 insertions(+) diff --git a/datafusion/execution/src/memory_pool/pool.rs b/datafusion/execution/src/memory_pool/pool.rs index 66a09fdeabb2..0f6e09184a20 100644 --- a/datafusion/execution/src/memory_pool/pool.rs +++ b/datafusion/execution/src/memory_pool/pool.rs @@ -277,6 +277,7 @@ struct TrackedConsumer { #[derive(Debug, Clone)] struct ReportedConsumer { consumer_id: usize, + parent_id: Option, name: String, can_spill: bool, reserved: usize, @@ -301,6 +302,7 @@ impl From<(usize, &TrackedConsumer)> for ReportedConsumer { fn from((consumer_id, tracked): (usize, &TrackedConsumer)) -> Self { Self { consumer_id, + parent_id: tracked.parent_id, name: tracked.name.clone(), can_spill: tracked.can_spill, reserved: tracked.reserved(), @@ -309,6 +311,62 @@ impl From<(usize, &TrackedConsumer)> for ReportedConsumer { } } +/// A stack trace representation of a memory consumer's lineage +#[derive(Debug, Clone)] +struct ConsumerStackTrace { + /// The consumer for which we're building the stack trace + consumer: ReportedConsumer, + /// The trace from immediate parent to oldest ancestor (excluding the consumer itself) + trace: Vec, +} + +impl std::fmt::Display for ConsumerStackTrace { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if self.trace.is_empty() { + // If no trace, just display the consumer + write!(f, "{}", self.consumer)?; + } else { + // If there's a trace, show the full stack backtrace + writeln!(f, "{}:", self.consumer)?; + writeln!(f, "stack backtrace:")?; + writeln!(f, " 0: {}", self.consumer)?; + for (i, consumer) in self.trace.iter().enumerate() { + writeln!(f, " {}: {}", i + 1, consumer)?; + } + } + Ok(()) + } +} + +/// Builds a stack trace for a consumer, following parent relationships until reaching a root +/// +/// # Arguments +/// * `consumer` - The consumer to build a stack trace for +/// * `map` - HashMap mapping consumer_id to ReportedConsumer +/// +/// # Returns +/// A ConsumerStackTrace containing the consumer and its parent lineage +fn build_consumer_stack_trace( + consumer: ReportedConsumer, + map: &HashMap, +) -> ConsumerStackTrace { + let mut trace = Vec::new(); + let mut current_parent_id = consumer.parent_id; + + // Follow the parent chain until we reach a root (parent_id is None) + while let Some(parent_id) = current_parent_id { + if let Some(parent_consumer) = map.get(&parent_id) { + trace.push(parent_consumer.clone()); + current_parent_id = parent_consumer.parent_id; + } else { + // Parent not found in map, stop traversal + break; + } + } + + ConsumerStackTrace { consumer, trace } +} + impl TrackedConsumer { /// Shorthand to return the currently reserved value fn reserved(&self) -> usize { @@ -440,6 +498,7 @@ impl MemoryPool for TrackConsumersPool { TrackedConsumer { name: consumer.name().to_string(), can_spill: consumer.can_spill(), + parent_id: consumer.parent_id(), reserved: Default::default(), peak: Default::default(), }, @@ -823,4 +882,74 @@ mod tests { r1#[ID](can spill: false) consumed 20.0 B, peak 20.0 B. 
"); } + + #[test] + fn test_build_consumer_stack_trace_no_parent() { + // Test case 1: Consumer with no parent (root consumer) + let mut reported_consumers = HashMap::new(); + + let root_consumer = ReportedConsumer { + consumer_id: 1, + name: "root".to_string(), + can_spill: false, + parent_id: None, + reserved: 100, + peak: 150, + }; + + reported_consumers.insert(1, root_consumer.clone()); + + let stack_trace = + build_consumer_stack_trace(root_consumer.clone(), &reported_consumers); + + assert_eq!(stack_trace.consumer.consumer_id, 1); + assert_eq!(stack_trace.trace.len(), 0); // No parents + + assert_eq!( + format!("{stack_trace}"), + "root#1(can spill: false) consumed 100.0 B, peak 150.0 B" + ); + } + + #[test] + fn test_build_consumer_stack_trace_hierarchy() { + let mut reported_consumers = HashMap::new(); + + // Create: great_grandparent(1) -> grandparent(2) -> parent(3) -> child(4) -> grandchild(5) + for i in 1..=5 { + let parent_id = if i == 1 { None } else { Some(i - 1) }; + let consumer = ReportedConsumer { + consumer_id: i, + name: format!("consumer_{i}"), + can_spill: i % 2 == 0, + parent_id, + reserved: i * 10, + peak: i * 15, + }; + reported_consumers.insert(i, consumer); + } + + let grandchild = reported_consumers.get(&5).unwrap().clone(); + let stack_trace = build_consumer_stack_trace(grandchild, &reported_consumers); + + assert_eq!(stack_trace.consumer.consumer_id, 5); + assert_eq!(stack_trace.trace.len(), 4); + // Verify the order: immediate parent (4) to oldest ancestor (1) + assert_eq!(stack_trace.trace[0].consumer_id, 4); // immediate parent + assert_eq!(stack_trace.trace[1].consumer_id, 3); // grandparent + assert_eq!(stack_trace.trace[2].consumer_id, 2); // great-grandparent + assert_eq!(stack_trace.trace[3].consumer_id, 1); // oldest ancestor + + assert_eq!( + format!("{stack_trace}"), + "consumer_5#5(can spill: false) consumed 50.0 B, peak 75.0 B: +stack backtrace: + 0: consumer_5#5(can spill: false) consumed 50.0 B, peak 75.0 B + 1: consumer_4#4(can spill: true) consumed 40.0 B, peak 60.0 B + 2: consumer_3#3(can spill: false) consumed 30.0 B, peak 45.0 B + 3: consumer_2#2(can spill: true) consumed 20.0 B, peak 30.0 B + 4: consumer_1#1(can spill: false) consumed 10.0 B, peak 15.0 B +" + ); + } } From 3bc68207d921f00986759cfe893d5597058e3bef Mon Sep 17 00:00:00 2001 From: Denise Wiedl Date: Mon, 6 Oct 2025 15:43:58 +0300 Subject: [PATCH 07/10] feat: switch parquet writer to use the new lineage-based memory consumers --- datafusion/core/tests/memory_limit/mod.rs | 20 +++---- .../datasource-parquet/src/file_format.rs | 55 +++++++++++-------- 2 files changed, 43 insertions(+), 32 deletions(-) diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index 15de08eb437c..38db8d0efd01 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -393,13 +393,13 @@ async fn oom_parquet_sink() { path.to_string_lossy() )) .with_expected_errors(vec![ - "Resources exhausted: Additional allocation failed for ParquetSink(ArrowColumnWriter) with top memory consumers (across reservations) as: - ParquetSink(ArrowColumnWriter)#ID(can spill: false) consumed x KB, peak x KB, - ParquetSink(ArrowColumnWriter)#ID(can spill: false) consumed x KB, peak x KB, - ParquetSink(ArrowColumnWriter)#ID(can spill: false) consumed x KB, peak x KB, - ParquetSink(ArrowColumnWriter)#ID(can spill: false) consumed x KB, peak x KB, - ParquetSink(ArrowColumnWriter)#ID(can spill: false) consumed x KB, peak x KB. 
-Error: Failed to allocate additional x KB for ParquetSink(ArrowColumnWriter) with x KB already allocated for this reservation - x KB remain available for the total pool", + "Resources exhausted: Additional allocation failed for ParquetSink(ArrowColumnWriter(col=1)) with top memory consumers (across reservations) as: + ParquetSink(ArrowColumnWriter(col=8))#ID(can spill: false) consumed x KB, peak x KB, + ParquetSink(ArrowColumnWriter(col=14))#ID(can spill: false) consumed x KB, peak x KB, + ParquetSink(ArrowColumnWriter(col=0))#ID(can spill: false) consumed x KB, peak x KB, + ParquetSink(ArrowColumnWriter(col=2))#ID(can spill: false) consumed x KB, peak x KB, + ParquetSink(ArrowColumnWriter(col=1))#ID(can spill: false) consumed x KB, peak x KB. +Error: Failed to allocate additional x KB for ParquetSink(ArrowColumnWriter(col=1)) with x KB already allocated for this reservation - x KB remain available for the total pool", ]) .with_memory_limit(200_000) .run() @@ -425,9 +425,9 @@ async fn oom_with_tracked_consumer_pool() { path.to_string_lossy() )) .with_expected_errors(vec![ - "Resources exhausted: Additional allocation failed for ParquetSink(ArrowColumnWriter) with top memory consumers (across reservations) as: - ParquetSink(ArrowColumnWriter)#ID(can spill: false) consumed x KB, peak x KB. -Error: Failed to allocate additional x KB for ParquetSink(ArrowColumnWriter) with x KB already allocated for this reservation - x KB remain available for the total pool" + "Resources exhausted: Additional allocation failed for ParquetSink(ArrowColumnWriter(col=2)) with top memory consumers (across reservations) as: + ParquetSink(ArrowColumnWriter(col=8))#ID(can spill: false) consumed x KB, peak x KB. +Error: Failed to allocate additional x KB for ParquetSink(ArrowColumnWriter(col=2)) with x KB already allocated for this reservation - x KB remain available for the total pool" ]) .with_memory_pool(Arc::new( TrackConsumersPool::new( diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index d4a84e42ce45..a68ce1c271af 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -52,7 +52,7 @@ use datafusion_datasource::display::FileGroupDisplay; use datafusion_datasource::file::FileSource; use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder}; use datafusion_datasource::sink::{DataSink, DataSinkExec}; -use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation}; +use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_expr::dml::InsertOp; use datafusion_physical_expr_common::sort_expr::LexRequirement; @@ -1268,6 +1268,7 @@ impl FileSink for ParquetSink { parquet_props.clone(), ) .await?; + // Create a reservation for non-parallel parquet writing let mut reservation = MemoryConsumer::new(format!("ParquetSink[path={path}]")) .register(context.memory_pool()); @@ -1302,7 +1303,9 @@ impl FileSink for ParquetSink { let props = parquet_props.clone(); let skip_arrow_metadata = self.parquet_options.global.skip_arrow_metadata; let parallel_options_clone = parallel_options.clone(); - let pool = Arc::clone(context.memory_pool()); + // Create a reservation for the parallel parquet writing + let reservation = MemoryConsumer::new("ParquetSink(ParallelWriter)") + .register(context.memory_pool()); file_write_tasks.spawn(async move { let file_metadata 
= output_single_parquet_file_parallelized( writer, @@ -1311,7 +1314,7 @@ impl FileSink for ParquetSink { &props, skip_arrow_metadata, parallel_options_clone, - pool, + reservation, ) .await?; Ok((path, file_metadata)) @@ -1392,20 +1395,20 @@ type ColSender = Sender; fn spawn_column_parallel_row_group_writer( col_writers: Vec, max_buffer_size: usize, - pool: &Arc, + parent_reservation: &MemoryReservation, ) -> Result<(Vec, Vec)> { let num_columns = col_writers.len(); let mut col_writer_tasks = Vec::with_capacity(num_columns); let mut col_array_channels = Vec::with_capacity(num_columns); - for writer in col_writers.into_iter() { + for (i, writer) in col_writers.into_iter().enumerate() { // Buffer size of this channel limits the number of arrays queued up for column level serialization let (send_array, receive_array) = mpsc::channel::(max_buffer_size); col_array_channels.push(send_array); - let reservation = - MemoryConsumer::new("ParquetSink(ArrowColumnWriter)").register(pool); + let reservation = parent_reservation + .new_child_reservation(format!("ParquetSink(ArrowColumnWriter(col={i}))")); let task = SpawnedTask::spawn(column_serializer_task( receive_array, writer, @@ -1457,21 +1460,19 @@ async fn send_arrays_to_col_writers( fn spawn_rg_join_and_finalize_task( column_writer_tasks: Vec, rg_rows: usize, - pool: &Arc, + mut rg_reservation: MemoryReservation, ) -> SpawnedTask { - let mut rg_reservation = - MemoryConsumer::new("ParquetSink(SerializedRowGroupWriter)").register(pool); - SpawnedTask::spawn(async move { let num_cols = column_writer_tasks.len(); let mut finalized_rg = Vec::with_capacity(num_cols); + let reservation = &mut rg_reservation; for task in column_writer_tasks.into_iter() { let (writer, _col_reservation) = task .join_unwind() .await .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??; let encoded_size = writer.get_estimated_total_bytes(); - rg_reservation.grow(encoded_size); + reservation.grow(encoded_size); finalized_rg.push(writer.close()?); } @@ -1494,8 +1495,14 @@ fn spawn_parquet_parallel_serialization_task( schema: Arc, writer_props: Arc, parallel_options: ParallelParquetWriterOptions, - pool: Arc, + parent_reservation: &MemoryReservation, ) -> SpawnedTask> { + let cols_reservation = + parent_reservation.new_child_reservation("ParquetSink(ParallelColumnWriters)"); + + let rg_reservation = + parent_reservation.new_child_reservation("ParquetSink(SerializedRowGroupWriter)"); + SpawnedTask::spawn(async move { let max_buffer_rb = parallel_options.max_buffered_record_batches_per_stream; let max_row_group_rows = writer_props.max_row_group_size(); @@ -1503,7 +1510,11 @@ fn spawn_parquet_parallel_serialization_task( let col_writers = row_group_writer_factory.create_column_writers(row_group_index)?; let (mut column_writer_handles, mut col_array_channels) = - spawn_column_parallel_row_group_writer(col_writers, max_buffer_rb, &pool)?; + spawn_column_parallel_row_group_writer( + col_writers, + max_buffer_rb, + &cols_reservation, + )?; let mut current_rg_rows = 0; while let Some(mut rb) = data.recv().await { @@ -1537,7 +1548,7 @@ fn spawn_parquet_parallel_serialization_task( let finalize_rg_task = spawn_rg_join_and_finalize_task( column_writer_handles, max_row_group_rows, - &pool, + rg_reservation.cloned_reservation(), ); // Do not surface error from closed channel (means something @@ -1556,7 +1567,7 @@ fn spawn_parquet_parallel_serialization_task( spawn_column_parallel_row_group_writer( col_writers, max_buffer_rb, - &pool, + &cols_reservation, )?; } } @@ -1568,7 +1579,7 
@@ fn spawn_parquet_parallel_serialization_task( let finalize_rg_task = spawn_rg_join_and_finalize_task( column_writer_handles, current_rg_rows, - &pool, + rg_reservation.cloned_reservation(), ); // Do not surface error from closed channel (means something @@ -1589,10 +1600,10 @@ async fn concatenate_parallel_row_groups( merged_buff: SharedBuffer, mut serialize_rx: Receiver>, mut object_store_writer: Box, - pool: Arc, + parent_reservation: &MemoryReservation, ) -> Result { let mut file_reservation = - MemoryConsumer::new("ParquetSink(SerializedFileWriter)").register(&pool); + parent_reservation.new_child_reservation("ParquetSink(SerializedFileWriter)"); while let Some(task) = serialize_rx.recv().await { let result = task.join_unwind().await; @@ -1639,7 +1650,7 @@ async fn output_single_parquet_file_parallelized( parquet_props: &WriterProperties, skip_arrow_metadata: bool, parallel_options: ParallelParquetWriterOptions, - pool: Arc, + parent_reservation: MemoryReservation, ) -> Result { let max_rowgroups = parallel_options.max_parallel_row_groups; // Buffer size of this channel limits maximum number of RowGroups being worked on in parallel @@ -1665,14 +1676,14 @@ async fn output_single_parquet_file_parallelized( Arc::clone(&output_schema), Arc::clone(&arc_props), parallel_options, - Arc::clone(&pool), + &parent_reservation, ); let file_metadata = concatenate_parallel_row_groups( writer, merged_buff, serialize_rx, object_store_writer, - pool, + &parent_reservation, ) .await?; From c7e869fcf673c4d51027ab766acd3be42d8785fb Mon Sep 17 00:00:00 2001 From: Denise Wiedl Date: Mon, 6 Oct 2025 22:17:57 +0300 Subject: [PATCH 08/10] feat: turn on stacktrace (if available) in error reporting --- datafusion/core/tests/memory_limit/mod.rs | 42 +++++++++++++++++--- datafusion/execution/src/memory_pool/pool.rs | 11 ++++- 2 files changed, 46 insertions(+), 7 deletions(-) diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index 38db8d0efd01..8becd550fbda 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -394,11 +394,36 @@ async fn oom_parquet_sink() { )) .with_expected_errors(vec![ "Resources exhausted: Additional allocation failed for ParquetSink(ArrowColumnWriter(col=1)) with top memory consumers (across reservations) as: - ParquetSink(ArrowColumnWriter(col=8))#ID(can spill: false) consumed x KB, peak x KB, - ParquetSink(ArrowColumnWriter(col=14))#ID(can spill: false) consumed x KB, peak x KB, - ParquetSink(ArrowColumnWriter(col=0))#ID(can spill: false) consumed x KB, peak x KB, - ParquetSink(ArrowColumnWriter(col=2))#ID(can spill: false) consumed x KB, peak x KB, - ParquetSink(ArrowColumnWriter(col=1))#ID(can spill: false) consumed x KB, peak x KB. 
+ ParquetSink(ArrowColumnWriter(col=8))#ID(can spill: false) consumed x KB, peak x KB: +stack backtrace: + 0: ParquetSink(ArrowColumnWriter(col=8))#ID(can spill: false) consumed x KB, peak x KB + 1: ParquetSink(ParallelColumnWriters)#ID(can spill: false) consumed x B, peak x B + 2: ParquetSink(ParallelWriter)#ID(can spill: false) consumed x B, peak x B +, + ParquetSink(ArrowColumnWriter(col=14))#ID(can spill: false) consumed x KB, peak x KB: +stack backtrace: + 0: ParquetSink(ArrowColumnWriter(col=14))#ID(can spill: false) consumed x KB, peak x KB + 1: ParquetSink(ParallelColumnWriters)#ID(can spill: false) consumed x B, peak x B + 2: ParquetSink(ParallelWriter)#ID(can spill: false) consumed x B, peak x B +, + ParquetSink(ArrowColumnWriter(col=0))#ID(can spill: false) consumed x KB, peak x KB: +stack backtrace: + 0: ParquetSink(ArrowColumnWriter(col=0))#ID(can spill: false) consumed x KB, peak x KB + 1: ParquetSink(ParallelColumnWriters)#ID(can spill: false) consumed x B, peak x B + 2: ParquetSink(ParallelWriter)#ID(can spill: false) consumed x B, peak x B +, + ParquetSink(ArrowColumnWriter(col=2))#ID(can spill: false) consumed x KB, peak x KB: +stack backtrace: + 0: ParquetSink(ArrowColumnWriter(col=2))#ID(can spill: false) consumed x KB, peak x KB + 1: ParquetSink(ParallelColumnWriters)#ID(can spill: false) consumed x B, peak x B + 2: ParquetSink(ParallelWriter)#ID(can spill: false) consumed x B, peak x B +, + ParquetSink(ArrowColumnWriter(col=1))#ID(can spill: false) consumed x KB, peak x KB: +stack backtrace: + 0: ParquetSink(ArrowColumnWriter(col=1))#ID(can spill: false) consumed x KB, peak x KB + 1: ParquetSink(ParallelColumnWriters)#ID(can spill: false) consumed x B, peak x B + 2: ParquetSink(ParallelWriter)#ID(can spill: false) consumed x B, peak x B +. Error: Failed to allocate additional x KB for ParquetSink(ArrowColumnWriter(col=1)) with x KB already allocated for this reservation - x KB remain available for the total pool", ]) .with_memory_limit(200_000) @@ -426,7 +451,12 @@ async fn oom_with_tracked_consumer_pool() { )) .with_expected_errors(vec![ "Resources exhausted: Additional allocation failed for ParquetSink(ArrowColumnWriter(col=2)) with top memory consumers (across reservations) as: - ParquetSink(ArrowColumnWriter(col=8))#ID(can spill: false) consumed x KB, peak x KB. + ParquetSink(ArrowColumnWriter(col=8))#ID(can spill: false) consumed x KB, peak x KB: +stack backtrace: + 0: ParquetSink(ArrowColumnWriter(col=8))#ID(can spill: false) consumed x KB, peak x KB + 1: ParquetSink(ParallelColumnWriters)#ID(can spill: false) consumed x B, peak x B + 2: ParquetSink(ParallelWriter)#ID(can spill: false) consumed x B, peak x B +. 
Error: Failed to allocate additional x KB for ParquetSink(ArrowColumnWriter(col=2)) with x KB already allocated for this reservation - x KB remain available for the total pool" ]) .with_memory_pool(Arc::new( diff --git a/datafusion/execution/src/memory_pool/pool.rs b/datafusion/execution/src/memory_pool/pool.rs index 0f6e09184a20..efeebec94159 100644 --- a/datafusion/execution/src/memory_pool/pool.rs +++ b/datafusion/execution/src/memory_pool/pool.rs @@ -479,9 +479,18 @@ impl TrackConsumersPool { .collect::>(); consumers.sort_by(|a, b| b.reserved.cmp(&a.reserved)); // inverse ordering + let consumers_map: HashMap = consumers + .iter() + .map(|consumer| (consumer.consumer_id, consumer.clone())) + .collect(); + consumers[0..std::cmp::min(top, consumers.len())] .iter() - .map(|reported_consumer| format!(" {reported_consumer}")) + .map(|reported_consumer| { + let trace = + build_consumer_stack_trace(reported_consumer.clone(), &consumers_map); + format!(" {trace}") + }) .collect::>() .join(",\n") + "." From 7f45ee8b453a933eace75dce8374ce257ffc106e Mon Sep 17 00:00:00 2001 From: Denise Wiedl Date: Mon, 6 Oct 2025 23:41:45 +0300 Subject: [PATCH 09/10] test: update tests for OOM output --- datafusion/physical-plan/src/joins/hash_join/exec.rs | 4 ++-- datafusion/physical-plan/src/joins/nested_loop_join.rs | 2 +- datafusion/physical-plan/src/joins/sort_merge_join/tests.rs | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index da3fef672d55..e917319efbba 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -4381,13 +4381,13 @@ mod tests { // Asserting that stream-level reservation attempting to overallocate assert_contains!( err.to_string(), - "Resources exhausted: Additional allocation failed for HashJoinInput[1] with top memory consumers (across reservations) as:\n HashJoinInput[1]" + "Resources exhausted: Additional allocation failed for HashJoinInput[partition=1] with top memory consumers (across reservations) as:\n HashJoinInput[partition=1]" ); assert_contains!( err.to_string(), - "Failed to allocate additional 120.0 B for HashJoinInput[1]" + "Failed to allocate additional 120.0 B for HashJoinInput[partition=1]" ); } diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index b953fc219cd6..db5fb369d547 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -2475,7 +2475,7 @@ pub(crate) mod tests { assert_contains!( err.to_string(), - "Resources exhausted: Additional allocation failed for NestedLoopJoinLoad[0] with top memory consumers (across reservations) as:\n NestedLoopJoinLoad[0]" + "Resources exhausted: Additional allocation failed for NestedLoopJoinLoad[partition=0] with top memory consumers (across reservations) as:\n NestedLoopJoinLoad[partition=0]" ); } diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs b/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs index 83a5c4041cc0..4e7b2b85ea8a 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs @@ -1799,7 +1799,7 @@ async fn overallocation_single_batch_no_spill() -> Result<()> { let err = common::collect(stream).await.unwrap_err(); assert_contains!(err.to_string(), "Failed to 
allocate additional"); - assert_contains!(err.to_string(), "SMJStream[0]"); + assert_contains!(err.to_string(), "SMJStream[partition=0]"); assert_contains!(err.to_string(), "Disk spilling disabled"); assert!(join.metrics().is_some()); assert_eq!(join.metrics().unwrap().spill_count(), Some(0)); @@ -1879,7 +1879,7 @@ async fn overallocation_multi_batch_no_spill() -> Result<()> { let err = common::collect(stream).await.unwrap_err(); assert_contains!(err.to_string(), "Failed to allocate additional"); - assert_contains!(err.to_string(), "SMJStream[0]"); + assert_contains!(err.to_string(), "SMJStream[partition=0]"); assert_contains!(err.to_string(), "Disk spilling disabled"); assert!(join.metrics().is_some()); assert_eq!(join.metrics().unwrap().spill_count(), Some(0)); From a77e316fc9253f7d15900e55e81f5e502b8ec051 Mon Sep 17 00:00:00 2001 From: Denise Wiedl Date: Tue, 7 Oct 2025 20:27:31 +0300 Subject: [PATCH 10/10] refactor: enable can_spill config setting when creating child reservations --- .../datasource-parquet/src/file_format.rs | 18 ++++++++++-------- datafusion/execution/src/memory_pool/mod.rs | 7 ++++++- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index a68ce1c271af..49e2bf2eb4aa 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -1407,8 +1407,10 @@ fn spawn_column_parallel_row_group_writer( mpsc::channel::(max_buffer_size); col_array_channels.push(send_array); - let reservation = parent_reservation - .new_child_reservation(format!("ParquetSink(ArrowColumnWriter(col={i}))")); + let reservation = parent_reservation.new_child_reservation( + format!("ParquetSink(ArrowColumnWriter(col={i}))"), + false, + ); let task = SpawnedTask::spawn(column_serializer_task( receive_array, writer, @@ -1497,11 +1499,11 @@ fn spawn_parquet_parallel_serialization_task( parallel_options: ParallelParquetWriterOptions, parent_reservation: &MemoryReservation, ) -> SpawnedTask> { - let cols_reservation = - parent_reservation.new_child_reservation("ParquetSink(ParallelColumnWriters)"); + let cols_reservation = parent_reservation + .new_child_reservation("ParquetSink(ParallelColumnWriters)", false); - let rg_reservation = - parent_reservation.new_child_reservation("ParquetSink(SerializedRowGroupWriter)"); + let rg_reservation = parent_reservation + .new_child_reservation("ParquetSink(SerializedRowGroupWriter)", false); SpawnedTask::spawn(async move { let max_buffer_rb = parallel_options.max_buffered_record_batches_per_stream; @@ -1602,8 +1604,8 @@ async fn concatenate_parallel_row_groups( mut object_store_writer: Box, parent_reservation: &MemoryReservation, ) -> Result { - let mut file_reservation = - parent_reservation.new_child_reservation("ParquetSink(SerializedFileWriter)"); + let mut file_reservation = parent_reservation + .new_child_reservation("ParquetSink(SerializedFileWriter)", false); while let Some(task) = serialize_rx.recv().await { let result = task.join_unwind().await; diff --git a/datafusion/execution/src/memory_pool/mod.rs b/datafusion/execution/src/memory_pool/mod.rs index 93b3fd58131b..21015b89e870 100644 --- a/datafusion/execution/src/memory_pool/mod.rs +++ b/datafusion/execution/src/memory_pool/mod.rs @@ -486,8 +486,13 @@ impl MemoryReservation { /// is a child of this reservation's consumer. /// /// This is useful for creating memory consumers with lineage tracking. 
- pub fn new_child_reservation(&self, name: impl Into) -> MemoryReservation { + pub fn new_child_reservation( + &self, + name: impl Into, + can_spill: bool, + ) -> MemoryReservation { MemoryConsumer::new_with_parent(name, self.consumer().id()) + .with_can_spill(can_spill) .register(&self.registration.pool) }
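Illustrative usage sketch (not part of the patch series): how the lineage-tracking API added in patches 04 and 10 might be wired up. The pool size, consumer names, and the wrapper function are invented for illustration, and the import paths assume the datafusion_execution crate layout used elsewhere in these patches; the reservation calls follow the signatures introduced above.

use std::num::NonZeroUsize;
use std::sync::Arc;

use datafusion_execution::memory_pool::{
    GreedyMemoryPool, MemoryConsumer, MemoryPool, TrackConsumersPool,
};

fn lineage_example() -> datafusion_common::Result<()> {
    // Report the top 3 consumers (with their parent lineage) on allocation failure.
    let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
        GreedyMemoryPool::new(10 * 1024),
        NonZeroUsize::new(3).unwrap(),
    ));

    // Root consumer for an operator, e.g. a sink writing one output file.
    let parent = MemoryConsumer::new("ExampleSink").register(&pool);

    // A child reservation records the parent's consumer id, so OOM reports can
    // show "ExampleSink(ColumnWriter)" as a descendant of "ExampleSink".
    let mut child = parent.new_child_reservation("ExampleSink(ColumnWriter)", false);
    child.try_grow(1024)?;

    // A cloned reservation keeps the same name and lineage but gets a fresh
    // consumer id, useful when the same logical consumer runs on several tasks.
    let mut sibling = child.cloned_reservation();
    sibling.try_grow(512)?;

    Ok(())
}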