Skip to content

Commit d703964

Browse files
committed
feat: turn on stacktrace (if available) in error reporting
1 parent 0b72094 commit d703964

File tree

2 files changed

+46
-7
lines changed
  • datafusion

2 files changed

+46
-7
lines changed

datafusion/core/tests/memory_limit/mod.rs

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -394,11 +394,36 @@ async fn oom_parquet_sink() {
394394
))
395395
.with_expected_errors(vec![
396396
"Resources exhausted: Additional allocation failed for ParquetSink(ArrowColumnWriter(col=1)) with top memory consumers (across reservations) as:
397-
ParquetSink(ArrowColumnWriter(col=8))#ID(can spill: false) consumed x KB, peak x KB,
398-
ParquetSink(ArrowColumnWriter(col=14))#ID(can spill: false) consumed x KB, peak x KB,
399-
ParquetSink(ArrowColumnWriter(col=0))#ID(can spill: false) consumed x KB, peak x KB,
400-
ParquetSink(ArrowColumnWriter(col=2))#ID(can spill: false) consumed x KB, peak x KB,
401-
ParquetSink(ArrowColumnWriter(col=1))#ID(can spill: false) consumed x KB, peak x KB.
397+
ParquetSink(ArrowColumnWriter(col=8))#ID(can spill: false) consumed x KB, peak x KB:
398+
stack backtrace:
399+
0: ParquetSink(ArrowColumnWriter(col=8))#ID(can spill: false) consumed x KB, peak x KB
400+
1: ParquetSink(ParallelColumnWriters)#ID(can spill: false) consumed x B, peak x B
401+
2: ParquetSink(ParallelWriter)#ID(can spill: false) consumed x B, peak x B
402+
,
403+
ParquetSink(ArrowColumnWriter(col=14))#ID(can spill: false) consumed x KB, peak x KB:
404+
stack backtrace:
405+
0: ParquetSink(ArrowColumnWriter(col=14))#ID(can spill: false) consumed x KB, peak x KB
406+
1: ParquetSink(ParallelColumnWriters)#ID(can spill: false) consumed x B, peak x B
407+
2: ParquetSink(ParallelWriter)#ID(can spill: false) consumed x B, peak x B
408+
,
409+
ParquetSink(ArrowColumnWriter(col=0))#ID(can spill: false) consumed x KB, peak x KB:
410+
stack backtrace:
411+
0: ParquetSink(ArrowColumnWriter(col=0))#ID(can spill: false) consumed x KB, peak x KB
412+
1: ParquetSink(ParallelColumnWriters)#ID(can spill: false) consumed x B, peak x B
413+
2: ParquetSink(ParallelWriter)#ID(can spill: false) consumed x B, peak x B
414+
,
415+
ParquetSink(ArrowColumnWriter(col=2))#ID(can spill: false) consumed x KB, peak x KB:
416+
stack backtrace:
417+
0: ParquetSink(ArrowColumnWriter(col=2))#ID(can spill: false) consumed x KB, peak x KB
418+
1: ParquetSink(ParallelColumnWriters)#ID(can spill: false) consumed x B, peak x B
419+
2: ParquetSink(ParallelWriter)#ID(can spill: false) consumed x B, peak x B
420+
,
421+
ParquetSink(ArrowColumnWriter(col=1))#ID(can spill: false) consumed x KB, peak x KB:
422+
stack backtrace:
423+
0: ParquetSink(ArrowColumnWriter(col=1))#ID(can spill: false) consumed x KB, peak x KB
424+
1: ParquetSink(ParallelColumnWriters)#ID(can spill: false) consumed x B, peak x B
425+
2: ParquetSink(ParallelWriter)#ID(can spill: false) consumed x B, peak x B
426+
.
402427
Error: Failed to allocate additional x KB for ParquetSink(ArrowColumnWriter(col=1)) with x KB already allocated for this reservation - x KB remain available for the total pool",
403428
])
404429
.with_memory_limit(200_000)
@@ -426,7 +451,12 @@ async fn oom_with_tracked_consumer_pool() {
426451
))
427452
.with_expected_errors(vec![
428453
"Resources exhausted: Additional allocation failed for ParquetSink(ArrowColumnWriter(col=2)) with top memory consumers (across reservations) as:
429-
ParquetSink(ArrowColumnWriter(col=8))#ID(can spill: false) consumed x KB, peak x KB.
454+
ParquetSink(ArrowColumnWriter(col=8))#ID(can spill: false) consumed x KB, peak x KB:
455+
stack backtrace:
456+
0: ParquetSink(ArrowColumnWriter(col=8))#ID(can spill: false) consumed x KB, peak x KB
457+
1: ParquetSink(ParallelColumnWriters)#ID(can spill: false) consumed x B, peak x B
458+
2: ParquetSink(ParallelWriter)#ID(can spill: false) consumed x B, peak x B
459+
.
430460
Error: Failed to allocate additional x KB for ParquetSink(ArrowColumnWriter(col=2)) with x KB already allocated for this reservation - x KB remain available for the total pool"
431461
])
432462
.with_memory_pool(Arc::new(

datafusion/execution/src/memory_pool/pool.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -479,9 +479,18 @@ impl<I: MemoryPool> TrackConsumersPool<I> {
479479
.collect::<Vec<_>>();
480480
consumers.sort_by(|a, b| b.reserved.cmp(&a.reserved)); // inverse ordering
481481

482+
let consumers_map: HashMap<usize, ReportedConsumer> = consumers
483+
.iter()
484+
.map(|consumer| (consumer.consumer_id, consumer.clone()))
485+
.collect();
486+
482487
consumers[0..std::cmp::min(top, consumers.len())]
483488
.iter()
484-
.map(|reported_consumer| format!(" {reported_consumer}"))
489+
.map(|reported_consumer| {
490+
let trace =
491+
build_consumer_stack_trace(reported_consumer.clone(), &consumers_map);
492+
format!(" {trace}")
493+
})
485494
.collect::<Vec<_>>()
486495
.join(",\n")
487496
+ "."

0 commit comments

Comments
 (0)