2 changes: 1 addition & 1 deletion datafusion-cli/tests/snapshots/[email protected]
@@ -16,7 +16,7 @@ exit_code: 1
 [CLI_VERSION]
 Error: Not enough memory to continue external sort. Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes
 caused by
-Resources exhausted: Additional allocation failed for ExternalSorter[0] with top memory consumers (across reservations) as:
+Resources exhausted: Additional allocation failed for ExternalSorter[partition=0] with top memory consumers (across reservations) as:
 Consumer(can spill: bool) consumed XB, peak XB,
 Consumer(can spill: bool) consumed XB, peak XB.
 Error: Failed to allocate
@@ -14,7 +14,7 @@ exit_code: 1
 [CLI_VERSION]
 Error: Not enough memory to continue external sort. Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes
 caused by
-Resources exhausted: Additional allocation failed for ExternalSorter[0] with top memory consumers (across reservations) as:
+Resources exhausted: Additional allocation failed for ExternalSorter[partition=0] with top memory consumers (across reservations) as:
 Consumer(can spill: bool) consumed XB, peak XB,
 Consumer(can spill: bool) consumed XB, peak XB,
 Consumer(can spill: bool) consumed XB, peak XB.
99 changes: 70 additions & 29 deletions datafusion/core/tests/memory_limit/mod.rs
@@ -85,8 +85,10 @@ async fn group_by_none() {
 TestCase::new()
 .with_query("select median(request_bytes) from t")
 .with_expected_errors(vec![
-"Resources exhausted: Additional allocation failed",
-"with top memory consumers (across reservations) as:\n AggregateStream",
+"Resources exhausted: Additional allocation failed for AggregateStream[partition=0] with top memory consumers (across reservations) as:
+AggregateStream[partition=0]#ID(can spill: false) consumed x B, peak x B,
+AggregateStream[partition=0]#ID(can spill: false) consumed x B, peak x B.
+Error: Failed to allocate additional x B for AggregateStream[partition=0] with x B already allocated for this reservation - x B remain available for the total pool",
 ])
 .with_memory_limit(2_000)
 .run()
@@ -98,7 +100,10 @@ async fn group_by_row_hash() {
 TestCase::new()
 .with_query("select count(*) from t GROUP BY response_bytes")
 .with_expected_errors(vec![
-"Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n GroupedHashAggregateStream"
+"Resources exhausted: Additional allocation failed for GroupedHashAggregateStream[0] (count(1)) with top memory consumers (across reservations) as:
+GroupedHashAggregateStream[0] (count(1))#ID(can spill: true) consumed x B, peak x B,
+GroupedHashAggregateStream[0] (count(1))#ID(can spill: true) consumed x B, peak x B.
+Error: Failed to allocate additional x KB for GroupedHashAggregateStream[0] (count(1)) with x B already allocated for this reservation - x B remain available for the total pool",
 ])
 .with_memory_limit(2_000)
 .run()
@@ -111,7 +116,10 @@ async fn group_by_hash() {
 // group by dict column
 .with_query("select count(*) from t GROUP BY service, host, pod, container")
 .with_expected_errors(vec![
-"Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n GroupedHashAggregateStream"
+"Resources exhausted: Additional allocation failed for GroupedHashAggregateStream[0] (count(1)) with top memory consumers (across reservations) as:
+GroupedHashAggregateStream[0] (count(1))#ID(can spill: true) consumed x B, peak x B,
+GroupedHashAggregateStream[0] (count(1))#ID(can spill: true) consumed x B, peak x B.
+Error: Failed to allocate additional x KB for GroupedHashAggregateStream[0] (count(1)) with x B already allocated for this reservation - x B remain available for the total pool"
 ])
 .with_memory_limit(1_000)
 .run()
@@ -124,8 +132,9 @@ async fn join_by_key_multiple_partitions() {
 TestCase::new()
 .with_query("select t1.* from t t1 JOIN t t2 ON t1.service = t2.service")
 .with_expected_errors(vec![
-"Resources exhausted: Additional allocation failed",
-"with top memory consumers (across reservations) as:\n HashJoinInput",
+"Resources exhausted: Additional allocation failed for HashJoinInput with top memory consumers (across reservations) as:
+HashJoinInput#ID(can spill: false) consumed x B, peak x B.
+Error: Failed to allocate additional x KB for HashJoinInput with x B already allocated for this reservation - x B remain available for the total pool",
 ])
 .with_memory_limit(1_000)
 .with_config(config)
@@ -139,8 +148,9 @@ async fn join_by_key_single_partition() {
 TestCase::new()
 .with_query("select t1.* from t t1 JOIN t t2 ON t1.service = t2.service")
 .with_expected_errors(vec![
-"Resources exhausted: Additional allocation failed",
-"with top memory consumers (across reservations) as:\n HashJoinInput",
+"Resources exhausted: Additional allocation failed for HashJoinInput with top memory consumers (across reservations) as:
+HashJoinInput#ID(can spill: false) consumed x B, peak x B.
+Error: Failed to allocate additional x KB for HashJoinInput with x B already allocated for this reservation - x B remain available for the total pool",
 ])
 .with_memory_limit(1_000)
 .with_config(config)
@@ -153,7 +163,9 @@ async fn join_by_expression() {
 TestCase::new()
 .with_query("select t1.* from t t1 JOIN t t2 ON t1.service != t2.service")
 .with_expected_errors(vec![
-"Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n NestedLoopJoinLoad[0]",
+"Resources exhausted: Additional allocation failed for NestedLoopJoinLoad[partition=0] with top memory consumers (across reservations) as:
+NestedLoopJoinLoad[partition=0]#ID(can spill: false) consumed x B, peak x B.
+Error: Failed to allocate additional x KB for NestedLoopJoinLoad[partition=0] with x B already allocated for this reservation - x B remain available for the total pool",
 ])
 .with_memory_limit(1_000)
 .run()
@@ -165,8 +177,9 @@ async fn cross_join() {
 TestCase::new()
 .with_query("select t1.*, t2.* from t t1 CROSS JOIN t t2")
 .with_expected_errors(vec![
-"Resources exhausted: Additional allocation failed",
-"with top memory consumers (across reservations) as:\n CrossJoinExec",
+"Resources exhausted: Additional allocation failed for CrossJoinExec with top memory consumers (across reservations) as:
+CrossJoinExec#ID(can spill: false) consumed x B, peak x B.
+Error: Failed to allocate additional x KB for CrossJoinExec with x B already allocated for this reservation - x B remain available for the total pool",
 ])
 .with_memory_limit(1_000)
 .run()
@@ -185,9 +198,9 @@ async fn sort_merge_join_no_spill() {
 "select t1.* from t t1 JOIN t t2 ON t1.pod = t2.pod AND t1.time = t2.time",
 )
 .with_expected_errors(vec![
-"Failed to allocate additional",
-"SMJStream",
-"Disk spilling disabled",
+"Execution error: Additional allocation failed for SMJStream[partition=0] with top memory consumers (across reservations) as:
+SMJStream[partition=0]#ID(can spill: false) consumed x B, peak x B.
+Error: Failed to allocate additional x KB for SMJStream[partition=0] with x B already allocated for this reservation - x B remain available for the total pool",
 ])
 .with_memory_limit(1_000)
 .with_config(config)
@@ -222,7 +235,9 @@ async fn symmetric_hash_join() {
 "select t1.* from t t1 JOIN t t2 ON t1.pod = t2.pod AND t1.time = t2.time",
 )
 .with_expected_errors(vec![
-"Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n SymmetricHashJoinStream",
+"Resources exhausted: Additional allocation failed for SymmetricHashJoinStream[partition=0] with top memory consumers (across reservations) as:
+SymmetricHashJoinStream[partition=0]#ID(can spill: false) consumed x B, peak x B.
+Error: Failed to allocate additional x KB for SymmetricHashJoinStream[partition=0] with x B already allocated for this reservation - x B remain available for the total pool",
 ])
 .with_memory_limit(1_000)
 .with_scenario(Scenario::AccessLogStreaming)
@@ -240,7 +255,9 @@ async fn sort_preserving_merge() {
 // so only a merge is needed
 .with_query("select * from t ORDER BY a ASC NULLS LAST, b ASC NULLS LAST LIMIT 10")
 .with_expected_errors(vec![
-"Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n SortPreservingMergeExec",
+"Resources exhausted: Additional allocation failed for SortPreservingMergeExec[partition=0] with top memory consumers (across reservations) as:
+SortPreservingMergeExec[partition=0]#ID(can spill: false) consumed x KB, peak x KB.
+Error: Failed to allocate additional x KB for SortPreservingMergeExec[partition=0] with x KB already allocated for this reservation - x B remain available for the total pool",
 ])
 // provide insufficient memory to merge
 .with_memory_limit(partition_size / 2)
@@ -319,9 +336,10 @@ async fn sort_spill_reservation() {

 test.clone()
 .with_expected_errors(vec![
-"Resources exhausted: Additional allocation failed",
-"with top memory consumers (across reservations) as:",
-"B for ExternalSorterMerge",
+"Resources exhausted: Additional allocation failed for ExternalSorterMerge[partition=0] with top memory consumers (across reservations) as:
+ExternalSorter[partition=0]#ID(can spill: true) consumed x KB, peak x KB,
+ExternalSorterMerge[partition=0]#ID(can spill: false) consumed x KB, peak x KB.
+Error: Failed to allocate additional x KB for ExternalSorterMerge[partition=0] with x B already allocated for this reservation - x B remain available for the total pool",
 ])
 .with_config(config)
 .run()
@@ -350,8 +368,9 @@ async fn oom_recursive_cte() {
 SELECT * FROM nodes;",
 )
 .with_expected_errors(vec![
-"Resources exhausted: Additional allocation failed",
-"with top memory consumers (across reservations) as:\n RecursiveQuery",
+"Resources exhausted: Additional allocation failed for RecursiveQuery with top memory consumers (across reservations) as:
+RecursiveQuery#ID(can spill: false) consumed x B, peak x B.
+Error: Failed to allocate additional x KB for RecursiveQuery with x B already allocated for this reservation - x B remain available for the total pool",
 ])
 .with_memory_limit(2_000)
 .run()
@@ -374,8 +393,13 @@ async fn oom_parquet_sink() {
 path.to_string_lossy()
 ))
 .with_expected_errors(vec![
-"Failed to allocate additional",
-"for ParquetSink(ArrowColumnWriter)",
+"Resources exhausted: Additional allocation failed for ParquetSink(ArrowColumnWriter) with top memory consumers (across reservations) as:
+ParquetSink(ArrowColumnWriter)#ID(can spill: false) consumed x KB, peak x KB,
+ParquetSink(ArrowColumnWriter)#ID(can spill: false) consumed x KB, peak x KB,
+ParquetSink(ArrowColumnWriter)#ID(can spill: false) consumed x KB, peak x KB,
+ParquetSink(ArrowColumnWriter)#ID(can spill: false) consumed x KB, peak x KB,
+ParquetSink(ArrowColumnWriter)#ID(can spill: false) consumed x KB, peak x KB.
+Error: Failed to allocate additional x KB for ParquetSink(ArrowColumnWriter) with x KB already allocated for this reservation - x KB remain available for the total pool",
 ])
 .with_memory_limit(200_000)
 .run()
@@ -401,9 +425,9 @@ async fn oom_with_tracked_consumer_pool() {
 path.to_string_lossy()
 ))
 .with_expected_errors(vec![
-"Failed to allocate additional",
-"for ParquetSink(ArrowColumnWriter)",
-"Additional allocation failed", "with top memory consumers (across reservations) as:\n ParquetSink(ArrowColumnWriter)"
+"Resources exhausted: Additional allocation failed for ParquetSink(ArrowColumnWriter) with top memory consumers (across reservations) as:
+ParquetSink(ArrowColumnWriter)#ID(can spill: false) consumed x KB, peak x KB.
+Error: Failed to allocate additional x KB for ParquetSink(ArrowColumnWriter) with x KB already allocated for this reservation - x KB remain available for the total pool"
 ])
 .with_memory_pool(Arc::new(
 TrackConsumersPool::new(
@@ -420,8 +444,10 @@ async fn oom_grouped_hash_aggregate() {
 TestCase::new()
 .with_query("SELECT COUNT(*), SUM(request_bytes) FROM t GROUP BY host")
 .with_expected_errors(vec![
-"Failed to allocate additional",
-"GroupedHashAggregateStream[0] (count(1), sum(t.request_bytes))",
+"Resources exhausted: Additional allocation failed for GroupedHashAggregateStream[0] (count(1), sum(t.request_bytes)) with top memory consumers (across reservations) as:
+GroupedHashAggregateStream[0] (count(1), sum(t.request_bytes))#ID(can spill: true) consumed x B, peak x B,
+GroupedHashAggregateStream[0] (count(1), sum(t.request_bytes))#ID(can spill: true) consumed x B, peak x B.
+Error: Failed to allocate additional x KB for GroupedHashAggregateStream[0] (count(1), sum(t.request_bytes)) with x B already allocated for this reservation - x B remain available for the total pool",
 ])
 .with_memory_limit(1_000)
 .run()
@@ -885,15 +911,30 @@ impl TestCase {
 "Unexpected failure when running, expected success but got: {e}"
 )
 } else {
+let err_msg = normalize_oom_errors(&e.to_string());
 for error_substring in expected_errors {
-assert_contains!(e.to_string(), error_substring);
+assert_contains!(&err_msg, error_substring);
 }
 }
 }
 }
 }
 }

+fn normalize_oom_errors(err: &str) -> String {
+let re = regex::Regex::new(r"\#\d+\(can spill:").unwrap();
+let mut err = re.replace_all(err, "#ID(can spill:").to_string();
+
+let re = regex::Regex::new(r"\d+\.\d+ KB").unwrap();
+err = re.replace_all(&err, "x KB").to_string();
+
+let re = regex::Regex::new(r"\d+\.\d+ MB").unwrap();
+err = re.replace_all(&err, "x MB").to_string();
+
+let re = regex::Regex::new(r"\d+\.\d+ B").unwrap();
+re.replace_all(&err, "x B").to_string()
+}
+
 /// 50 byte memory limit
 const MEMORY_FRACTION: f64 = 0.95;

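The tests above can assert the full error text because the new normalize_oom_errors helper rewrites the unstable parts of the message first. Below is a standalone sketch of that normalization for reviewers who want to try it outside the test harness: the helper body is copied from the hunk above, while the sample consumer line and the main() wrapper are illustrative assumptions rather than output from a real run. The only dependency is the regex crate the helper already uses.

// Standalone copy of the normalization performed by normalize_oom_errors.
fn normalize_oom_errors(err: &str) -> String {
    // Reservation ids such as "#42(can spill:" become a stable "#ID(can spill:".
    let re = regex::Regex::new(r"\#\d+\(can spill:").unwrap();
    let mut err = re.replace_all(err, "#ID(can spill:").to_string();

    // Concrete sizes such as "10.0 KB", "1.5 MB", "624.0 B" become placeholders.
    let re = regex::Regex::new(r"\d+\.\d+ KB").unwrap();
    err = re.replace_all(&err, "x KB").to_string();

    let re = regex::Regex::new(r"\d+\.\d+ MB").unwrap();
    err = re.replace_all(&err, "x MB").to_string();

    let re = regex::Regex::new(r"\d+\.\d+ B").unwrap();
    re.replace_all(&err, "x B").to_string()
}

fn main() {
    // Hypothetical consumer line in the style of the new error messages.
    let raw = "ExternalSorter[partition=0]#42(can spill: true) consumed 10.0 KB, peak 12.5 KB";
    assert_eq!(
        normalize_oom_errors(raw),
        "ExternalSorter[partition=0]#ID(can spill: true) consumed x KB, peak x KB"
    );
}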
5 changes: 3 additions & 2 deletions datafusion/datasource-parquet/src/file_format.rs
@@ -1268,8 +1268,9 @@ impl FileSink for ParquetSink {
 parquet_props.clone(),
 )
 .await?;
-let mut reservation = MemoryConsumer::new(format!("ParquetSink[{path}]"))
-.register(context.memory_pool());
+let mut reservation =
+MemoryConsumer::new(format!("ParquetSink[path={path}]"))
+.register(context.memory_pool());
 file_write_tasks.spawn(async move {
 while let Some(batch) = rx.recv().await {
 writer.write(&batch).await?;
5 changes: 3 additions & 2 deletions datafusion/physical-plan/src/aggregates/no_grouping.rs
@@ -88,8 +88,9 @@ impl AggregateStream {
 };
 let accumulators = create_accumulators(&agg.aggr_expr)?;

-let reservation = MemoryConsumer::new(format!("AggregateStream[{partition}]"))
-.register(context.memory_pool());
+let reservation =
+MemoryConsumer::new(format!("AggregateStream[partition={partition}]"))
+.register(context.memory_pool());

 let inner = AggregateStreamInner {
 schema: Arc::clone(&agg.schema),
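The renamings in this file and in the join operators below all follow the same convention: the bracket suffix now spells out a key=value pair such as [partition=0] (or [path=...] for ParquetSink). As a hedged illustration of how such a name surfaces to users, the sketch below registers a consumer against a TrackConsumersPool and deliberately overallocates. The operator name ExampleOperator, the pool sizes, and the exact wording of the printed error are assumptions made for this example; the types come from datafusion_execution::memory_pool.

use std::num::NonZeroUsize;
use std::sync::Arc;

use datafusion_execution::memory_pool::{
    GreedyMemoryPool, MemoryConsumer, MemoryPool, TrackConsumersPool,
};

fn main() {
    // A 1 KB pool that reports the top 2 consumers when an allocation fails.
    let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
        GreedyMemoryPool::new(1024),
        NonZeroUsize::new(2).unwrap(),
    ));

    let partition = 0;
    // Same naming pattern as the operators touched by this PR: Name[partition=N].
    let mut reservation =
        MemoryConsumer::new(format!("ExampleOperator[partition={partition}]"))
            .register(&pool);

    reservation.try_grow(512).unwrap(); // fits within the limit
    let err = reservation.try_grow(4096).unwrap_err(); // exceeds the limit
    // The error lists "ExampleOperator[partition=0]" among the top memory consumers.
    println!("{err}");
}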
6 changes: 3 additions & 3 deletions datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -953,7 +953,7 @@ impl ExecutionPlan for HashJoinExec {
 let left_stream = self.left.execute(partition, Arc::clone(&context))?;

 let reservation =
-MemoryConsumer::new(format!("HashJoinInput[{partition}]"))
+MemoryConsumer::new(format!("HashJoinInput[partition={partition}]"))
 .register(context.memory_pool());

 OnceFut::new(collect_left_input(
@@ -4381,13 +4381,13 @@ mod tests {
 // Asserting that stream-level reservation attempting to overallocate
 assert_contains!(
 err.to_string(),
-"Resources exhausted: Additional allocation failed for HashJoinInput[1] with top memory consumers (across reservations) as:\n HashJoinInput[1]"
+"Resources exhausted: Additional allocation failed for HashJoinInput[partition=1] with top memory consumers (across reservations) as:\n HashJoinInput[partition=1]"

 );

 assert_contains!(
 err.to_string(),
-"Failed to allocate additional 120.0 B for HashJoinInput[1]"
+"Failed to allocate additional 120.0 B for HashJoinInput[partition=1]"
 );
 }

4 changes: 2 additions & 2 deletions datafusion/physical-plan/src/joins/nested_loop_join.rs
@@ -496,7 +496,7 @@ impl ExecutionPlan for NestedLoopJoinExec {

 // Initialization reservation for load of inner table
 let load_reservation =
-MemoryConsumer::new(format!("NestedLoopJoinLoad[{partition}]"))
+MemoryConsumer::new(format!("NestedLoopJoinLoad[partition={partition}]"))
 .register(context.memory_pool());

 let build_side_data = self.build_side_data.try_once(|| {
@@ -2475,7 +2475,7 @@ pub(crate) mod tests {

 assert_contains!(
 err.to_string(),
-"Resources exhausted: Additional allocation failed for NestedLoopJoinLoad[0] with top memory consumers (across reservations) as:\n NestedLoopJoinLoad[0]"
+"Resources exhausted: Additional allocation failed for NestedLoopJoinLoad[partition=0] with top memory consumers (across reservations) as:\n NestedLoopJoinLoad[partition=0]"
 );
 }

5 changes: 3 additions & 2 deletions datafusion/physical-plan/src/joins/sort_merge_join/exec.rs
@@ -491,8 +491,9 @@ impl ExecutionPlan for SortMergeJoinExec {
 let batch_size = context.session_config().batch_size();

 // create memory reservation
-let reservation = MemoryConsumer::new(format!("SMJStream[{partition}]"))
-.register(context.memory_pool());
+let reservation =
+MemoryConsumer::new(format!("SMJStream[partition={partition}]"))
+.register(context.memory_pool());

 // create join stream
 Ok(Box::pin(SortMergeJoinStream::try_new(
4 changes: 2 additions & 2 deletions datafusion/physical-plan/src/joins/sort_merge_join/tests.rs
@@ -1799,7 +1799,7 @@ async fn overallocation_single_batch_no_spill() -> Result<()> {
 let err = common::collect(stream).await.unwrap_err();

 assert_contains!(err.to_string(), "Failed to allocate additional");
-assert_contains!(err.to_string(), "SMJStream[0]");
+assert_contains!(err.to_string(), "SMJStream[partition=0]");
 assert_contains!(err.to_string(), "Disk spilling disabled");
 assert!(join.metrics().is_some());
 assert_eq!(join.metrics().unwrap().spill_count(), Some(0));
@@ -1879,7 +1879,7 @@ async fn overallocation_multi_batch_no_spill() -> Result<()> {
 let err = common::collect(stream).await.unwrap_err();

 assert_contains!(err.to_string(), "Failed to allocate additional");
-assert_contains!(err.to_string(), "SMJStream[0]");
+assert_contains!(err.to_string(), "SMJStream[partition=0]");
 assert_contains!(err.to_string(), "Disk spilling disabled");
 assert!(join.metrics().is_some());
 assert_eq!(join.metrics().unwrap().spill_count(), Some(0));