Commit 0b72094

test: update OOM integration test to use full stdout
1 parent fe299a4 commit 0b72094

File tree: 1 file changed (+70 -29 lines)

  • datafusion/core/tests/memory_limit
datafusion/core/tests/memory_limit/mod.rs

Lines changed: 70 additions & 29 deletions
@@ -85,8 +85,10 @@ async fn group_by_none() {
    TestCase::new()
        .with_query("select median(request_bytes) from t")
        .with_expected_errors(vec![
-            "Resources exhausted: Additional allocation failed",
-            "with top memory consumers (across reservations) as:\n AggregateStream",
+            "Resources exhausted: Additional allocation failed for AggregateStream[partition=0] with top memory consumers (across reservations) as:
+AggregateStream[partition=0]#ID(can spill: false) consumed x B, peak x B,
+AggregateStream[partition=0]#ID(can spill: false) consumed x B, peak x B.
+Error: Failed to allocate additional x B for AggregateStream[partition=0] with x B already allocated for this reservation - x B remain available for the total pool",
        ])
        .with_memory_limit(2_000)
        .run()
@@ -98,7 +100,10 @@ async fn group_by_row_hash() {
    TestCase::new()
        .with_query("select count(*) from t GROUP BY response_bytes")
        .with_expected_errors(vec![
-            "Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n GroupedHashAggregateStream"
+            "Resources exhausted: Additional allocation failed for GroupedHashAggregateStream[0] (count(1)) with top memory consumers (across reservations) as:
+GroupedHashAggregateStream[0] (count(1))#ID(can spill: true) consumed x B, peak x B,
+GroupedHashAggregateStream[0] (count(1))#ID(can spill: true) consumed x B, peak x B.
+Error: Failed to allocate additional x KB for GroupedHashAggregateStream[0] (count(1)) with x B already allocated for this reservation - x B remain available for the total pool",
        ])
        .with_memory_limit(2_000)
        .run()
@@ -111,7 +116,10 @@ async fn group_by_hash() {
        // group by dict column
        .with_query("select count(*) from t GROUP BY service, host, pod, container")
        .with_expected_errors(vec![
-            "Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n GroupedHashAggregateStream"
+            "Resources exhausted: Additional allocation failed for GroupedHashAggregateStream[0] (count(1)) with top memory consumers (across reservations) as:
+GroupedHashAggregateStream[0] (count(1))#ID(can spill: true) consumed x B, peak x B,
+GroupedHashAggregateStream[0] (count(1))#ID(can spill: true) consumed x B, peak x B.
+Error: Failed to allocate additional x KB for GroupedHashAggregateStream[0] (count(1)) with x B already allocated for this reservation - x B remain available for the total pool"
        ])
        .with_memory_limit(1_000)
        .run()
@@ -124,8 +132,9 @@ async fn join_by_key_multiple_partitions() {
    TestCase::new()
        .with_query("select t1.* from t t1 JOIN t t2 ON t1.service = t2.service")
        .with_expected_errors(vec![
-            "Resources exhausted: Additional allocation failed",
-            "with top memory consumers (across reservations) as:\n HashJoinInput",
+            "Resources exhausted: Additional allocation failed for HashJoinInput with top memory consumers (across reservations) as:
+HashJoinInput#ID(can spill: false) consumed x B, peak x B.
+Error: Failed to allocate additional x KB for HashJoinInput with x B already allocated for this reservation - x B remain available for the total pool",
        ])
        .with_memory_limit(1_000)
        .with_config(config)
@@ -139,8 +148,9 @@ async fn join_by_key_single_partition() {
    TestCase::new()
        .with_query("select t1.* from t t1 JOIN t t2 ON t1.service = t2.service")
        .with_expected_errors(vec![
-            "Resources exhausted: Additional allocation failed",
-            "with top memory consumers (across reservations) as:\n HashJoinInput",
+            "Resources exhausted: Additional allocation failed for HashJoinInput with top memory consumers (across reservations) as:
+HashJoinInput#ID(can spill: false) consumed x B, peak x B.
+Error: Failed to allocate additional x KB for HashJoinInput with x B already allocated for this reservation - x B remain available for the total pool",
        ])
        .with_memory_limit(1_000)
        .with_config(config)
@@ -153,7 +163,9 @@ async fn join_by_expression() {
    TestCase::new()
        .with_query("select t1.* from t t1 JOIN t t2 ON t1.service != t2.service")
        .with_expected_errors(vec![
-            "Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n NestedLoopJoinLoad[partition=0]",
+            "Resources exhausted: Additional allocation failed for NestedLoopJoinLoad[partition=0] with top memory consumers (across reservations) as:
+NestedLoopJoinLoad[partition=0]#ID(can spill: false) consumed x B, peak x B.
+Error: Failed to allocate additional x KB for NestedLoopJoinLoad[partition=0] with x B already allocated for this reservation - x B remain available for the total pool",
        ])
        .with_memory_limit(1_000)
        .run()
@@ -165,8 +177,9 @@ async fn cross_join() {
    TestCase::new()
        .with_query("select t1.*, t2.* from t t1 CROSS JOIN t t2")
        .with_expected_errors(vec![
-            "Resources exhausted: Additional allocation failed",
-            "with top memory consumers (across reservations) as:\n CrossJoinExec",
+            "Resources exhausted: Additional allocation failed for CrossJoinExec with top memory consumers (across reservations) as:
+CrossJoinExec#ID(can spill: false) consumed x B, peak x B.
+Error: Failed to allocate additional x KB for CrossJoinExec with x B already allocated for this reservation - x B remain available for the total pool",
        ])
        .with_memory_limit(1_000)
        .run()
@@ -185,9 +198,9 @@ async fn sort_merge_join_no_spill() {
            "select t1.* from t t1 JOIN t t2 ON t1.pod = t2.pod AND t1.time = t2.time",
        )
        .with_expected_errors(vec![
-            "Failed to allocate additional",
-            "SMJStream",
-            "Disk spilling disabled",
+            "Execution error: Additional allocation failed for SMJStream[partition=0] with top memory consumers (across reservations) as:
+SMJStream[partition=0]#ID(can spill: false) consumed x B, peak x B.
+Error: Failed to allocate additional x KB for SMJStream[partition=0] with x B already allocated for this reservation - x B remain available for the total pool. Disk spilling disabled.",
        ])
        .with_memory_limit(1_000)
        .with_config(config)
@@ -222,7 +235,9 @@ async fn symmetric_hash_join() {
            "select t1.* from t t1 JOIN t t2 ON t1.pod = t2.pod AND t1.time = t2.time",
        )
        .with_expected_errors(vec![
-            "Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n SymmetricHashJoinStream",
+            "Resources exhausted: Additional allocation failed for SymmetricHashJoinStream[partition=0] with top memory consumers (across reservations) as:
+SymmetricHashJoinStream[partition=0]#ID(can spill: false) consumed x B, peak x B.
+Error: Failed to allocate additional x KB for SymmetricHashJoinStream[partition=0] with x B already allocated for this reservation - x B remain available for the total pool",
        ])
        .with_memory_limit(1_000)
        .with_scenario(Scenario::AccessLogStreaming)
@@ -240,7 +255,9 @@ async fn sort_preserving_merge() {
        // so only a merge is needed
        .with_query("select * from t ORDER BY a ASC NULLS LAST, b ASC NULLS LAST LIMIT 10")
        .with_expected_errors(vec![
-            "Resources exhausted: Additional allocation failed", "with top memory consumers (across reservations) as:\n SortPreservingMergeExec",
+            "Resources exhausted: Additional allocation failed for SortPreservingMergeExec[partition=0] with top memory consumers (across reservations) as:
+SortPreservingMergeExec[partition=0]#ID(can spill: false) consumed x KB, peak x KB.
+Error: Failed to allocate additional x KB for SortPreservingMergeExec[partition=0] with x KB already allocated for this reservation - x B remain available for the total pool",
        ])
        // provide insufficient memory to merge
        .with_memory_limit(partition_size / 2)
@@ -319,9 +336,10 @@ async fn sort_spill_reservation() {

    test.clone()
        .with_expected_errors(vec![
-            "Resources exhausted: Additional allocation failed",
-            "with top memory consumers (across reservations) as:",
-            "B for ExternalSorterMerge",
+            "Resources exhausted: Additional allocation failed for ExternalSorterMerge[partition=0] with top memory consumers (across reservations) as:
+ExternalSorter[partition=0]#ID(can spill: true) consumed x KB, peak x KB,
+ExternalSorterMerge[partition=0]#ID(can spill: false) consumed x KB, peak x KB.
+Error: Failed to allocate additional x KB for ExternalSorterMerge[partition=0] with x B already allocated for this reservation - x B remain available for the total pool",
        ])
        .with_config(config)
        .run()
@@ -350,8 +368,9 @@ async fn oom_recursive_cte() {
            SELECT * FROM nodes;",
        )
        .with_expected_errors(vec![
-            "Resources exhausted: Additional allocation failed",
-            "with top memory consumers (across reservations) as:\n RecursiveQuery",
+            "Resources exhausted: Additional allocation failed for RecursiveQuery with top memory consumers (across reservations) as:
+RecursiveQuery#ID(can spill: false) consumed x B, peak x B.
+Error: Failed to allocate additional x KB for RecursiveQuery with x B already allocated for this reservation - x B remain available for the total pool",
        ])
        .with_memory_limit(2_000)
        .run()
@@ -374,8 +393,13 @@ async fn oom_parquet_sink() {
            path.to_string_lossy()
        ))
        .with_expected_errors(vec![
-            "Failed to allocate additional",
-            "ParquetSink(ArrowColumnWriter(col=1))",
+            "Resources exhausted: Additional allocation failed for ParquetSink(ArrowColumnWriter(col=1)) with top memory consumers (across reservations) as:
+ParquetSink(ArrowColumnWriter(col=8))#ID(can spill: false) consumed x KB, peak x KB,
+ParquetSink(ArrowColumnWriter(col=14))#ID(can spill: false) consumed x KB, peak x KB,
+ParquetSink(ArrowColumnWriter(col=0))#ID(can spill: false) consumed x KB, peak x KB,
+ParquetSink(ArrowColumnWriter(col=2))#ID(can spill: false) consumed x KB, peak x KB,
+ParquetSink(ArrowColumnWriter(col=1))#ID(can spill: false) consumed x KB, peak x KB.
+Error: Failed to allocate additional x KB for ParquetSink(ArrowColumnWriter(col=1)) with x KB already allocated for this reservation - x KB remain available for the total pool",
        ])
        .with_memory_limit(200_000)
        .run()
@@ -401,9 +425,9 @@ async fn oom_with_tracked_consumer_pool() {
            path.to_string_lossy()
        ))
        .with_expected_errors(vec![
-            "Failed to allocate additional",
-            "for ParquetSink(ArrowColumnWriter(col=2))",
-            "Additional allocation failed", "with top memory consumers (across reservations) as:\n ParquetSink(ArrowColumnWriter(col=8))"
+            "Resources exhausted: Additional allocation failed for ParquetSink(ArrowColumnWriter(col=2)) with top memory consumers (across reservations) as:
+ParquetSink(ArrowColumnWriter(col=8))#ID(can spill: false) consumed x KB, peak x KB.
+Error: Failed to allocate additional x KB for ParquetSink(ArrowColumnWriter(col=2)) with x KB already allocated for this reservation - x KB remain available for the total pool"
        ])
        .with_memory_pool(Arc::new(
            TrackConsumersPool::new(
@@ -420,8 +444,10 @@ async fn oom_grouped_hash_aggregate() {
    TestCase::new()
        .with_query("SELECT COUNT(*), SUM(request_bytes) FROM t GROUP BY host")
        .with_expected_errors(vec![
-            "Failed to allocate additional",
-            "GroupedHashAggregateStream[0] (count(1), sum(t.request_bytes))",
+            "Resources exhausted: Additional allocation failed for GroupedHashAggregateStream[0] (count(1), sum(t.request_bytes)) with top memory consumers (across reservations) as:
+GroupedHashAggregateStream[0] (count(1), sum(t.request_bytes))#ID(can spill: true) consumed x B, peak x B,
+GroupedHashAggregateStream[0] (count(1), sum(t.request_bytes))#ID(can spill: true) consumed x B, peak x B.
+Error: Failed to allocate additional x KB for GroupedHashAggregateStream[0] (count(1), sum(t.request_bytes)) with x B already allocated for this reservation - x B remain available for the total pool",
        ])
        .with_memory_limit(1_000)
        .run()
@@ -885,15 +911,30 @@ impl TestCase {
                        "Unexpected failure when running, expected success but got: {e}"
                    )
                } else {
+                    let err_msg = normalize_oom_errors(&e.to_string());
                    for error_substring in expected_errors {
-                        assert_contains!(e.to_string(), error_substring);
+                        assert_contains!(&err_msg, error_substring);
                    }
                }
            }
        }
    }
}

+fn normalize_oom_errors(err: &str) -> String {
+    let re = regex::Regex::new(r"\#\d+\(can spill:").unwrap();
+    let mut err = re.replace_all(err, "#ID(can spill:").to_string();
+
+    let re = regex::Regex::new(r"\d+\.\d+ KB").unwrap();
+    err = re.replace_all(&err, "x KB").to_string();
+
+    let re = regex::Regex::new(r"\d+\.\d+ MB").unwrap();
+    err = re.replace_all(&err, "x MB").to_string();
+
+    let re = regex::Regex::new(r"\d+\.\d+ B").unwrap();
+    re.replace_all(&err, "x B").to_string()
+}
+
/// 50 byte memory limit
const MEMORY_FRACTION: f64 = 0.95;

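The expected strings above only match because the `normalize_oom_errors` helper added at the bottom of the diff rewrites reservation IDs and byte sizes before the assertion. The following standalone sketch (not part of the commit) applies the same normalization to a sample error; it assumes the `regex` crate is available, condenses the helper's three size patterns into a loop, and uses a made-up input message whose wording and numbers merely mirror the format asserted above.

// Standalone sketch of the normalization used by the updated test.
// Assumes the `regex` crate; the sample message below is illustrative only.
use regex::Regex;

fn normalize_oom_errors(err: &str) -> String {
    // Reservation IDs such as "#42(can spill:" become the stable token "#ID(can spill:".
    let re = Regex::new(r"\#\d+\(can spill:").unwrap();
    let mut err = re.replace_all(err, "#ID(can spill:").to_string();

    // Concrete sizes ("123.0 KB", "1.5 MB", "888.0 B") become "x KB" / "x MB" / "x B".
    for (pattern, placeholder) in [
        (r"\d+\.\d+ KB", "x KB"),
        (r"\d+\.\d+ MB", "x MB"),
        (r"\d+\.\d+ B", "x B"),
    ] {
        err = Regex::new(pattern).unwrap().replace_all(&err, placeholder).to_string();
    }
    err
}

fn main() {
    // Illustrative input modeled on the messages asserted in the diff above.
    let raw = "Resources exhausted: Additional allocation failed for HashJoinInput \
               with top memory consumers (across reservations) as:\n\
               HashJoinInput#42(can spill: false) consumed 888.0 B, peak 888.0 B.";
    let normalized = normalize_oom_errors(raw);
    assert_eq!(
        normalized,
        "Resources exhausted: Additional allocation failed for HashJoinInput \
         with top memory consumers (across reservations) as:\n\
         HashJoinInput#ID(can spill: false) consumed x B, peak x B."
    );
    println!("{normalized}");
}

Normalizing on the assertion side keeps the test deterministic: the full error text can be compared verbatim while the parts that vary between runs (reservation IDs and exact byte counts) are reduced to placeholders.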