diff --git a/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top2.snap b/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top2.snap
index 62f864b3adb6..b04863eb1508 100644
--- a/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top2.snap
+++ b/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top2.snap
@@ -16,7 +16,7 @@ exit_code: 1
 [CLI_VERSION]
 Error: Not enough memory to continue external sort. Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes
 caused by
-Resources exhausted: Additional allocation failed for ExternalSorter[0] with top memory consumers (across reservations) as:
+Resources exhausted: Additional allocation failed for ExternalSorter[partition=0] with top memory consumers (across reservations) as:
   Consumer(can spill: bool) consumed XB, peak XB,
   Consumer(can spill: bool) consumed XB, peak XB.
 Error: Failed to allocate
diff --git a/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top3_default.snap b/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top3_default.snap
index 9845d095c918..5d3b809d7e05 100644
--- a/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top3_default.snap
+++ b/datafusion-cli/tests/snapshots/cli_top_memory_consumers@top3_default.snap
@@ -14,7 +14,7 @@ exit_code: 1
 [CLI_VERSION]
 Error: Not enough memory to continue external sort. Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes
 caused by
-Resources exhausted: Additional allocation failed for ExternalSorter[0] with top memory consumers (across reservations) as:
+Resources exhausted: Additional allocation failed for ExternalSorter[partition=0] with top memory consumers (across reservations) as:
   Consumer(can spill: bool) consumed XB, peak XB,
   Consumer(can spill: bool) consumed XB, peak XB,
   Consumer(can spill: bool) consumed XB, peak XB.
diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs
index 89bc48b1e634..15de08eb437c 100644
--- a/datafusion/core/tests/memory_limit/mod.rs
+++ b/datafusion/core/tests/memory_limit/mod.rs
@@ -85,8 +85,10 @@ async fn group_by_none() {
     TestCase::new()
         .with_query("select median(request_bytes) from t")
         .with_expected_errors(vec![
-            "Resources exhausted: Additional allocation failed",
-            "with top memory consumers (across reservations) as:\n  AggregateStream",
+            "Resources exhausted: Additional allocation failed for AggregateStream[partition=0] with top memory consumers (across reservations) as:
+  AggregateStream[partition=0]#ID(can spill: false) consumed x B, peak x B,
+  AggregateStream[partition=0]#ID(can spill: false) consumed x B, peak x B.
+Error: Failed to allocate additional x B for AggregateStream[partition=0] with x B already allocated for this reservation - x B remain available for the total pool",
         ])
         .with_memory_limit(2_000)
         .run()
@@ -98,7 +100,10 @@ async fn group_by_row_hash() {
     TestCase::new()
         .with_query("select count(*) from t GROUP BY response_bytes")
         .with_expected_errors(vec![
-            "Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n  GroupedHashAggregateStream"
+            "Resources exhausted: Additional allocation failed for GroupedHashAggregateStream[0] (count(1)) with top memory consumers (across reservations) as:
+  GroupedHashAggregateStream[0] (count(1))#ID(can spill: true) consumed x B, peak x B,
+  GroupedHashAggregateStream[0] (count(1))#ID(can spill: true) consumed x B, peak x B.
+Error: Failed to allocate additional x KB for GroupedHashAggregateStream[0] (count(1)) with x B already allocated for this reservation - x B remain available for the total pool",
         ])
         .with_memory_limit(2_000)
         .run()
@@ -111,7 +116,10 @@ async fn group_by_hash() {
         // group by dict column
         .with_query("select count(*) from t GROUP BY service, host, pod, container")
         .with_expected_errors(vec![
-            "Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n  GroupedHashAggregateStream"
+            "Resources exhausted: Additional allocation failed for GroupedHashAggregateStream[0] (count(1)) with top memory consumers (across reservations) as:
+  GroupedHashAggregateStream[0] (count(1))#ID(can spill: true) consumed x B, peak x B,
+  GroupedHashAggregateStream[0] (count(1))#ID(can spill: true) consumed x B, peak x B.
+Error: Failed to allocate additional x KB for GroupedHashAggregateStream[0] (count(1)) with x B already allocated for this reservation - x B remain available for the total pool"
         ])
         .with_memory_limit(1_000)
         .run()
@@ -124,8 +132,9 @@ async fn join_by_key_multiple_partitions() {
     TestCase::new()
         .with_query("select t1.* from t t1 JOIN t t2 ON t1.service = t2.service")
         .with_expected_errors(vec![
-            "Resources exhausted: Additional allocation failed",
-            "with top memory consumers (across reservations) as:\n  HashJoinInput",
+            "Resources exhausted: Additional allocation failed for HashJoinInput with top memory consumers (across reservations) as:
+  HashJoinInput#ID(can spill: false) consumed x B, peak x B.
+Error: Failed to allocate additional x KB for HashJoinInput with x B already allocated for this reservation - x B remain available for the total pool",
         ])
         .with_memory_limit(1_000)
         .with_config(config)
@@ -139,8 +148,9 @@ async fn join_by_key_single_partition() {
     TestCase::new()
         .with_query("select t1.* from t t1 JOIN t t2 ON t1.service = t2.service")
         .with_expected_errors(vec![
-            "Resources exhausted: Additional allocation failed",
-            "with top memory consumers (across reservations) as:\n  HashJoinInput",
+            "Resources exhausted: Additional allocation failed for HashJoinInput with top memory consumers (across reservations) as:
+  HashJoinInput#ID(can spill: false) consumed x B, peak x B.
+Error: Failed to allocate additional x KB for HashJoinInput with x B already allocated for this reservation - x B remain available for the total pool",
         ])
         .with_memory_limit(1_000)
         .with_config(config)
@@ -153,7 +163,9 @@ async fn join_by_expression() {
     TestCase::new()
         .with_query("select t1.* from t t1 JOIN t t2 ON t1.service != t2.service")
         .with_expected_errors(vec![
-            "Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n  NestedLoopJoinLoad[0]",
+            "Resources exhausted: Additional allocation failed for NestedLoopJoinLoad[partition=0] with top memory consumers (across reservations) as:
+  NestedLoopJoinLoad[partition=0]#ID(can spill: false) consumed x B, peak x B.
+Error: Failed to allocate additional x KB for NestedLoopJoinLoad[partition=0] with x B already allocated for this reservation - x B remain available for the total pool",
         ])
         .with_memory_limit(1_000)
         .run()
@@ -165,8 +177,9 @@ async fn cross_join() {
     TestCase::new()
         .with_query("select t1.*, t2.* from t t1 CROSS JOIN t t2")
         .with_expected_errors(vec![
-            "Resources exhausted: Additional allocation failed",
-            "with top memory consumers (across reservations) as:\n  CrossJoinExec",
+            "Resources exhausted: Additional allocation failed for CrossJoinExec with top memory consumers (across reservations) as:
+  CrossJoinExec#ID(can spill: false) consumed x B, peak x B.
+Error: Failed to allocate additional x KB for CrossJoinExec with x B already allocated for this reservation - x B remain available for the total pool",
         ])
         .with_memory_limit(1_000)
         .run()
@@ -185,9 +198,9 @@ async fn sort_merge_join_no_spill() {
             "select t1.* from t t1 JOIN t t2 ON t1.pod = t2.pod AND t1.time = t2.time",
         )
         .with_expected_errors(vec![
-            "Failed to allocate additional",
-            "SMJStream",
-            "Disk spilling disabled",
+            "Execution error: Additional allocation failed for SMJStream[partition=0] with top memory consumers (across reservations) as:
+  SMJStream[partition=0]#ID(can spill: false) consumed x B, peak x B.
+Error: Failed to allocate additional x KB for SMJStream[partition=0] with x B already allocated for this reservation - x B remain available for the total pool",
         ])
         .with_memory_limit(1_000)
         .with_config(config)
@@ -222,7 +235,9 @@ async fn symmetric_hash_join() {
             "select t1.* from t t1 JOIN t t2 ON t1.pod = t2.pod AND t1.time = t2.time",
         )
         .with_expected_errors(vec![
-            "Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n  SymmetricHashJoinStream",
+            "Resources exhausted: Additional allocation failed for SymmetricHashJoinStream[partition=0] with top memory consumers (across reservations) as:
+  SymmetricHashJoinStream[partition=0]#ID(can spill: false) consumed x B, peak x B.
+Error: Failed to allocate additional x KB for SymmetricHashJoinStream[partition=0] with x B already allocated for this reservation - x B remain available for the total pool",
         ])
         .with_memory_limit(1_000)
         .with_scenario(Scenario::AccessLogStreaming)
@@ -240,7 +255,9 @@ async fn sort_preserving_merge() {
         // so only a merge is needed
        .with_query("select * from t ORDER BY a ASC NULLS LAST, b ASC NULLS LAST LIMIT 10")
        .with_expected_errors(vec![
-            "Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n  SortPreservingMergeExec",
+            "Resources exhausted: Additional allocation failed for SortPreservingMergeExec[partition=0] with top memory consumers (across reservations) as:
+  SortPreservingMergeExec[partition=0]#ID(can spill: false) consumed x KB, peak x KB.
+Error: Failed to allocate additional x KB for SortPreservingMergeExec[partition=0] with x KB already allocated for this reservation - x B remain available for the total pool",
         ])
         // provide insufficient memory to merge
         .with_memory_limit(partition_size / 2)
@@ -319,9 +336,10 @@ async fn sort_spill_reservation() {
 
     test.clone()
         .with_expected_errors(vec![
-            "Resources exhausted: Additional allocation failed",
-            "with top memory consumers (across reservations) as:",
-            "B for ExternalSorterMerge",
+            "Resources exhausted: Additional allocation failed for ExternalSorterMerge[partition=0] with top memory consumers (across reservations) as:
+  ExternalSorter[partition=0]#ID(can spill: true) consumed x KB, peak x KB,
+  ExternalSorterMerge[partition=0]#ID(can spill: false) consumed x KB, peak x KB.
+Error: Failed to allocate additional x KB for ExternalSorterMerge[partition=0] with x B already allocated for this reservation - x B remain available for the total pool",
         ])
         .with_config(config)
         .run()
@@ -350,8 +368,9 @@ async fn oom_recursive_cte() {
             SELECT * FROM nodes;",
         )
         .with_expected_errors(vec![
-            "Resources exhausted: Additional allocation failed",
-            "with top memory consumers (across reservations) as:\n  RecursiveQuery",
+            "Resources exhausted: Additional allocation failed for RecursiveQuery with top memory consumers (across reservations) as:
+  RecursiveQuery#ID(can spill: false) consumed x B, peak x B.
+Error: Failed to allocate additional x KB for RecursiveQuery with x B already allocated for this reservation - x B remain available for the total pool",
         ])
         .with_memory_limit(2_000)
         .run()
@@ -374,8 +393,13 @@ async fn oom_parquet_sink() {
             path.to_string_lossy()
         ))
         .with_expected_errors(vec![
-            "Failed to allocate additional",
-            "for ParquetSink(ArrowColumnWriter)",
+            "Resources exhausted: Additional allocation failed for ParquetSink(ArrowColumnWriter) with top memory consumers (across reservations) as:
+  ParquetSink(ArrowColumnWriter)#ID(can spill: false) consumed x KB, peak x KB,
+  ParquetSink(ArrowColumnWriter)#ID(can spill: false) consumed x KB, peak x KB,
+  ParquetSink(ArrowColumnWriter)#ID(can spill: false) consumed x KB, peak x KB,
+  ParquetSink(ArrowColumnWriter)#ID(can spill: false) consumed x KB, peak x KB,
+  ParquetSink(ArrowColumnWriter)#ID(can spill: false) consumed x KB, peak x KB.
+Error: Failed to allocate additional x KB for ParquetSink(ArrowColumnWriter) with x KB already allocated for this reservation - x KB remain available for the total pool",
         ])
         .with_memory_limit(200_000)
         .run()
@@ -401,9 +425,9 @@ async fn oom_with_tracked_consumer_pool() {
             path.to_string_lossy()
         ))
         .with_expected_errors(vec![
-            "Failed to allocate additional",
-            "for ParquetSink(ArrowColumnWriter)",
-            "Additional allocation failed", "with top memory consumers (across reservations) as:\n  ParquetSink(ArrowColumnWriter)"
+            "Resources exhausted: Additional allocation failed for ParquetSink(ArrowColumnWriter) with top memory consumers (across reservations) as:
+  ParquetSink(ArrowColumnWriter)#ID(can spill: false) consumed x KB, peak x KB.
+Error: Failed to allocate additional x KB for ParquetSink(ArrowColumnWriter) with x KB already allocated for this reservation - x KB remain available for the total pool"
         ])
         .with_memory_pool(Arc::new(
             TrackConsumersPool::new(
@@ -420,8 +444,10 @@ async fn oom_grouped_hash_aggregate() {
     TestCase::new()
         .with_query("SELECT COUNT(*), SUM(request_bytes) FROM t GROUP BY host")
         .with_expected_errors(vec![
-            "Failed to allocate additional",
-            "GroupedHashAggregateStream[0] (count(1), sum(t.request_bytes))",
+            "Resources exhausted: Additional allocation failed for GroupedHashAggregateStream[0] (count(1), sum(t.request_bytes)) with top memory consumers (across reservations) as:
+  GroupedHashAggregateStream[0] (count(1), sum(t.request_bytes))#ID(can spill: true) consumed x B, peak x B,
+  GroupedHashAggregateStream[0] (count(1), sum(t.request_bytes))#ID(can spill: true) consumed x B, peak x B.
+Error: Failed to allocate additional x KB for GroupedHashAggregateStream[0] (count(1), sum(t.request_bytes)) with x B already allocated for this reservation - x B remain available for the total pool",
         ])
         .with_memory_limit(1_000)
         .run()
@@ -885,8 +911,9 @@ impl TestCase {
                         "Unexpected failure when running, expected success but got: {e}"
                     )
                 } else {
+                    let err_msg = normalize_oom_errors(&e.to_string());
                     for error_substring in expected_errors {
-                        assert_contains!(e.to_string(), error_substring);
+                        assert_contains!(&err_msg, error_substring);
                     }
                 }
             }
@@ -894,6 +921,20 @@
     }
 }
 
+fn normalize_oom_errors(err: &str) -> String {
+    let re = regex::Regex::new(r"\#\d+\(can spill:").unwrap();
+    let mut err = re.replace_all(err, "#ID(can spill:").to_string();
+
+    let re = regex::Regex::new(r"\d+\.\d+ KB").unwrap();
+    err = re.replace_all(&err, "x KB").to_string();
+
+    let re = regex::Regex::new(r"\d+\.\d+ MB").unwrap();
+    err = re.replace_all(&err, "x MB").to_string();
+
+    let re = regex::Regex::new(r"\d+\.\d+ B").unwrap();
+    re.replace_all(&err, "x B").to_string()
+}
+
 /// 50 byte memory limit
 const MEMORY_FRACTION: f64 = 0.95;
diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs
index 963c1d77950c..d4a84e42ce45 100644
--- a/datafusion/datasource-parquet/src/file_format.rs
+++ b/datafusion/datasource-parquet/src/file_format.rs
@@ -1268,8 +1268,9 @@ impl FileSink for ParquetSink {
                 parquet_props.clone(),
             )
             .await?;
-            let mut reservation = MemoryConsumer::new(format!("ParquetSink[{path}]"))
-                .register(context.memory_pool());
+            let mut reservation =
+                MemoryConsumer::new(format!("ParquetSink[path={path}]"))
+                    .register(context.memory_pool());
             file_write_tasks.spawn(async move {
                 while let Some(batch) = rx.recv().await {
                     writer.write(&batch).await?;
diff --git a/datafusion/physical-plan/src/aggregates/no_grouping.rs b/datafusion/physical-plan/src/aggregates/no_grouping.rs
index 9474a5f88c92..19e1929c919f 100644
--- a/datafusion/physical-plan/src/aggregates/no_grouping.rs
+++ b/datafusion/physical-plan/src/aggregates/no_grouping.rs
@@ -88,8 +88,9 @@ impl AggregateStream {
         };
 
         let accumulators = create_accumulators(&agg.aggr_expr)?;
-        let reservation = MemoryConsumer::new(format!("AggregateStream[{partition}]"))
-            .register(context.memory_pool());
+        let reservation =
+            MemoryConsumer::new(format!("AggregateStream[partition={partition}]"))
+                .register(context.memory_pool());
 
         let inner = AggregateStreamInner {
             schema: Arc::clone(&agg.schema),
diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs
index 4c293b0498e7..e917319efbba 100644
--- a/datafusion/physical-plan/src/joins/hash_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -953,7 +953,7 @@ impl ExecutionPlan for HashJoinExec {
                 let left_stream = self.left.execute(partition, Arc::clone(&context))?;
 
                 let reservation =
-                    MemoryConsumer::new(format!("HashJoinInput[{partition}]"))
+                    MemoryConsumer::new(format!("HashJoinInput[partition={partition}]"))
                         .register(context.memory_pool());
 
                 OnceFut::new(collect_left_input(
@@ -4381,13 +4381,13 @@ mod tests {
 
         // Asserting that stream-level reservation attempting to overallocate
         assert_contains!(
             err.to_string(),
-            "Resources exhausted: Additional allocation failed for HashJoinInput[1] with top memory consumers (across reservations) as:\n  HashJoinInput[1]"
+            "Resources exhausted: Additional allocation failed for HashJoinInput[partition=1] with top memory consumers (across reservations) as:\n  HashJoinInput[partition=1]"
         );
 
         assert_contains!(
             err.to_string(),
-            "Failed to allocate additional 120.0 B for HashJoinInput[1]"
+            "Failed to allocate additional 120.0 B for HashJoinInput[partition=1]"
         );
     }
diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs
index 0974b3a9114e..db5fb369d547 100644
--- a/datafusion/physical-plan/src/joins/nested_loop_join.rs
+++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs
@@ -496,7 +496,7 @@ impl ExecutionPlan for NestedLoopJoinExec {
         // Initialization reservation for load of inner table
         let load_reservation =
-            MemoryConsumer::new(format!("NestedLoopJoinLoad[{partition}]"))
+            MemoryConsumer::new(format!("NestedLoopJoinLoad[partition={partition}]"))
                 .register(context.memory_pool());
 
         let build_side_data = self.build_side_data.try_once(|| {
@@ -2475,7 +2475,7 @@ pub(crate) mod tests {
 
         assert_contains!(
             err.to_string(),
-            "Resources exhausted: Additional allocation failed for NestedLoopJoinLoad[0] with top memory consumers (across reservations) as:\n  NestedLoopJoinLoad[0]"
+            "Resources exhausted: Additional allocation failed for NestedLoopJoinLoad[partition=0] with top memory consumers (across reservations) as:\n  NestedLoopJoinLoad[partition=0]"
         );
     }
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs
index 592878a3bb1c..1a66079b9d1b 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs
@@ -491,8 +491,9 @@ impl ExecutionPlan for SortMergeJoinExec {
         let batch_size = context.session_config().batch_size();
 
         // create memory reservation
-        let reservation = MemoryConsumer::new(format!("SMJStream[{partition}]"))
-            .register(context.memory_pool());
+        let reservation =
+            MemoryConsumer::new(format!("SMJStream[partition={partition}]"))
+                .register(context.memory_pool());
 
         // create join stream
         Ok(Box::pin(SortMergeJoinStream::try_new(
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs b/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs
index 83a5c4041cc0..4e7b2b85ea8a 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs
@@ -1799,7 +1799,7 @@ async fn overallocation_single_batch_no_spill() -> Result<()> {
     let err = common::collect(stream).await.unwrap_err();
 
     assert_contains!(err.to_string(), "Failed to allocate additional");
-    assert_contains!(err.to_string(), "SMJStream[0]");
+    assert_contains!(err.to_string(), "SMJStream[partition=0]");
     assert_contains!(err.to_string(), "Disk spilling disabled");
     assert!(join.metrics().is_some());
     assert_eq!(join.metrics().unwrap().spill_count(), Some(0));
@@ -1879,7 +1879,7 @@ async fn overallocation_multi_batch_no_spill() -> Result<()> {
     let err = common::collect(stream).await.unwrap_err();
 
     assert_contains!(err.to_string(), "Failed to allocate additional");
-    assert_contains!(err.to_string(), "SMJStream[0]");
+    assert_contains!(err.to_string(), "SMJStream[partition=0]");
     assert_contains!(err.to_string(), "Disk spilling disabled");
     assert!(join.metrics().is_some());
     assert_eq!(join.metrics().unwrap().spill_count(), Some(0));
diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
index b55b7e15f194..5f8ec5f81db4 100644
--- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
+++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
@@ -524,8 +524,10 @@ impl ExecutionPlan for SymmetricHashJoinExec {
             context.session_config().enforce_batch_size_in_joins();
 
         let reservation = Arc::new(Mutex::new(
-            MemoryConsumer::new(format!("SymmetricHashJoinStream[{partition}]"))
-                .register(context.memory_pool()),
+            MemoryConsumer::new(format!(
+                "SymmetricHashJoinStream[partition={partition}]"
+            ))
+            .register(context.memory_pool()),
         ));
         if let Some(g) = graph.as_ref() {
             reservation.lock().try_grow(g.size())?;
diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs
index 22bc1b5cf924..3670a6c5daa4 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -204,7 +204,7 @@ impl RepartitionExecState {
         let mut channels = HashMap::with_capacity(txs.len());
         for (partition, (tx, rx)) in txs.into_iter().zip(rxs).enumerate() {
             let reservation = Arc::new(Mutex::new(
-                MemoryConsumer::new(format!("{name}[{partition}]"))
+                MemoryConsumer::new(format!("{name}[partition={partition}]"))
                     .register(context.memory_pool()),
             ));
             channels.insert(partition, (tx, rx, reservation));
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index 7f47d60c735a..c5a2a3af48dd 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -280,12 +280,13 @@ impl ExternalSorter {
         runtime: Arc<RuntimeEnv>,
     ) -> Result<Self> {
         let metrics = ExternalSorterMetrics::new(metrics, partition_id);
-        let reservation = MemoryConsumer::new(format!("ExternalSorter[{partition_id}]"))
-            .with_can_spill(true)
-            .register(&runtime.memory_pool);
+        let reservation =
+            MemoryConsumer::new(format!("ExternalSorter[partition={partition_id}]"))
+                .with_can_spill(true)
+                .register(&runtime.memory_pool);
 
         let merge_reservation =
-            MemoryConsumer::new(format!("ExternalSorterMerge[{partition_id}]"))
+            MemoryConsumer::new(format!("ExternalSorterMerge[partition={partition_id}]"))
                 .register(&runtime.memory_pool);
 
         let spill_manager = SpillManager::new(
diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
index 09ad71974e6c..6d3c34693b90 100644
--- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
+++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
@@ -292,9 +292,10 @@ impl ExecutionPlan for SortPreservingMergeExec {
         );
 
         let schema = self.schema();
-        let reservation =
-            MemoryConsumer::new(format!("SortPreservingMergeExec[{partition}]"))
-                .register(&context.runtime_env().memory_pool);
+        let reservation = MemoryConsumer::new(format!(
+            "SortPreservingMergeExec[partition={partition}]"
+        ))
+        .register(&context.runtime_env().memory_pool);
 
         match input_partitions {
             0 => internal_err!(
diff --git a/datafusion/physical-plan/src/topk/mod.rs b/datafusion/physical-plan/src/topk/mod.rs
index 9435de1cc448..1c72d1defad7 100644
--- a/datafusion/physical-plan/src/topk/mod.rs
+++ b/datafusion/physical-plan/src/topk/mod.rs
@@ -188,7 +188,7 @@ impl TopK {
         metrics: &ExecutionPlanMetricsSet,
         filter: Arc>,
     ) -> Result<Self> {
-        let reservation = MemoryConsumer::new(format!("TopK[{partition_id}]"))
+        let reservation = MemoryConsumer::new(format!("TopK[partition={partition_id}]"))
             .register(&runtime.memory_pool);
 
         let sort_fields = build_sort_fields(&expr, &schema)?;