Skip to content

Commit 1756692

Browse files
authored
feat: Track peak value in tracked consumer (apache#17327)
* feat: Track peak value in tracked consumer * fmt * fix test
1 parent 7cc54d1 commit 1756692

File tree

5 files changed

+45
-34
lines changed

5 files changed

+45
-34
lines changed

datafusion-cli/tests/cli_integration.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -219,8 +219,8 @@ fn test_cli_top_memory_consumers<'a>(
219219
settings.set_snapshot_suffix(snapshot_name);
220220

221221
settings.add_filter(
222-
r"[^\s]+\#\d+\(can spill: (true|false)\) consumed .*?B",
223-
"Consumer(can spill: bool) consumed XB",
222+
r"[^\s]+\#\d+\(can spill: (true|false)\) consumed .*?B, peak .*?B",
223+
"Consumer(can spill: bool) consumed XB, peak XB",
224224
);
225225
settings.add_filter(
226226
r"Error: Failed to allocate additional .*? for .*? with .*? already allocated for this reservation - .*? remain available for the total pool",

datafusion-cli/tests/snapshots/[email protected]

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ exit_code: 1
1717
Error: Not enough memory to continue external sort. Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes
1818
caused by
1919
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
20-
Consumer(can spill: bool) consumed XB,
21-
Consumer(can spill: bool) consumed XB.
20+
Consumer(can spill: bool) consumed XB, peak XB,
21+
Consumer(can spill: bool) consumed XB, peak XB.
2222
Error: Failed to allocate
2323

2424
----- stderr -----

datafusion-cli/tests/snapshots/cli_top_memory_consumers@top3_default.snap

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@ exit_code: 1
1515
Error: Not enough memory to continue external sort. Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes
1616
caused by
1717
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
18-
Consumer(can spill: bool) consumed XB,
19-
Consumer(can spill: bool) consumed XB,
20-
Consumer(can spill: bool) consumed XB.
18+
Consumer(can spill: bool) consumed XB, peak XB,
19+
Consumer(can spill: bool) consumed XB, peak XB,
20+
Consumer(can spill: bool) consumed XB, peak XB.
2121
Error: Failed to allocate
2222

2323
----- stderr -----

datafusion-examples/examples/memory_pool_tracking.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -110,11 +110,11 @@ async fn automatic_usage_example() -> datafusion::error::Result<()> {
110110
Error: Not enough memory to continue external sort. Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes
111111
caused by
112112
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
113-
ExternalSorterMerge[3]#112(can spill: false) consumed 10.0 MB,
114-
ExternalSorterMerge[10]#147(can spill: false) consumed 10.0 MB,
115-
ExternalSorter[1]#93(can spill: true) consumed 69.0 KB,
116-
ExternalSorter[13]#155(can spill: true) consumed 67.6 KB,
117-
ExternalSorter[8]#140(can spill: true) consumed 67.2 KB.
113+
ExternalSorterMerge[3]#112(can spill: false) consumed 10.0 MB, peak 10.0 MB,
114+
ExternalSorterMerge[10]#147(can spill: false) consumed 10.0 MB, peak 10.0 MB,
115+
ExternalSorter[1]#93(can spill: true) consumed 69.0 KB, peak 69.0 KB,
116+
ExternalSorter[13]#155(can spill: true) consumed 67.6 KB, peak 67.6 KB,
117+
ExternalSorter[8]#140(can spill: true) consumed 67.2 KB, peak 67.2 KB.
118118
Error: Failed to allocate additional 10.0 MB for ExternalSorterMerge[0] with 0.0 B already allocated for this reservation - 7.1 MB remain available for the total pool
119119
*/
120120
}

datafusion/execution/src/memory_pool/pool.rs

Lines changed: 33 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,7 @@ struct TrackedConsumer {
269269
name: String,
270270
can_spill: bool,
271271
reserved: AtomicUsize,
272+
peak: AtomicUsize,
272273
}
273274

274275
impl TrackedConsumer {
@@ -277,10 +278,16 @@ impl TrackedConsumer {
277278
self.reserved.load(Ordering::Relaxed)
278279
}
279280

281+
/// Return the peak value
282+
fn peak(&self) -> usize {
283+
self.peak.load(Ordering::Relaxed)
284+
}
285+
280286
/// Grows the tracked consumer's reserved size,
281287
/// should be called after the pool has successfully performed the grow().
282288
fn grow(&self, additional: usize) {
283289
self.reserved.fetch_add(additional, Ordering::Relaxed);
290+
self.peak.fetch_max(self.reserved(), Ordering::Relaxed);
284291
}
285292

286293
/// Reduce the tracked consumer's reserved size,
@@ -379,6 +386,7 @@ impl<I: MemoryPool> TrackConsumersPool<I> {
379386
*consumer_id,
380387
tracked_consumer.name.to_owned(),
381388
tracked_consumer.can_spill,
389+
tracked_consumer.peak(),
382390
),
383391
tracked_consumer.reserved(),
384392
)
@@ -388,10 +396,11 @@ impl<I: MemoryPool> TrackConsumersPool<I> {
388396

389397
consumers[0..std::cmp::min(top, consumers.len())]
390398
.iter()
391-
.map(|((id, name, can_spill), size)| {
399+
.map(|((id, name, can_spill, peak), size)| {
392400
format!(
393-
" {name}#{id}(can spill: {can_spill}) consumed {}",
394-
human_readable_size(*size)
401+
" {name}#{id}(can spill: {can_spill}) consumed {}, peak {}",
402+
human_readable_size(*size),
403+
human_readable_size(*peak),
395404
)
396405
})
397406
.collect::<Vec<_>>()
@@ -411,6 +420,7 @@ impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
411420
name: consumer.name().to_string(),
412421
can_spill: consumer.can_spill(),
413422
reserved: Default::default(),
423+
peak: Default::default(),
414424
},
415425
);
416426

@@ -581,7 +591,8 @@ mod tests {
581591

582592
// set r1=50, using grow and shrink
583593
let mut r1 = MemoryConsumer::new("r1").register(&pool);
584-
r1.grow(70);
594+
r1.grow(50);
595+
r1.grow(20);
585596
r1.shrink(20);
586597

587598
// set r2=15 using try_grow
@@ -609,9 +620,9 @@ mod tests {
609620
let error = res.unwrap_err().strip_backtrace();
610621
assert_snapshot!(error, @r"
611622
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
612-
r1#[ID](can spill: false) consumed 50.0 B,
613-
r3#[ID](can spill: false) consumed 20.0 B,
614-
r2#[ID](can spill: false) consumed 15.0 B.
623+
r1#[ID](can spill: false) consumed 50.0 B, peak 70.0 B,
624+
r3#[ID](can spill: false) consumed 20.0 B, peak 25.0 B,
625+
r2#[ID](can spill: false) consumed 15.0 B, peak 15.0 B.
615626
Error: Failed to allocate additional 150.0 B for r5 with 0.0 B already allocated for this reservation - 5.0 B remain available for the total pool
616627
");
617628
}
@@ -634,7 +645,7 @@ mod tests {
634645
let error = res.unwrap_err().strip_backtrace();
635646
assert_snapshot!(error, @r"
636647
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
637-
foo#[ID](can spill: false) consumed 0.0 B.
648+
foo#[ID](can spill: false) consumed 0.0 B, peak 0.0 B.
638649
Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 100.0 B remain available for the total pool
639650
");
640651

@@ -651,8 +662,8 @@ mod tests {
651662
let error = res.unwrap_err().strip_backtrace();
652663
assert_snapshot!(error, @r"
653664
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
654-
foo#[ID](can spill: false) consumed 10.0 B,
655-
foo#[ID](can spill: false) consumed 0.0 B.
665+
foo#[ID](can spill: false) consumed 10.0 B, peak 10.0 B,
666+
foo#[ID](can spill: false) consumed 0.0 B, peak 0.0 B.
656667
Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 90.0 B remain available for the total pool
657668
");
658669

@@ -664,8 +675,8 @@ mod tests {
664675
let error = res.unwrap_err().strip_backtrace();
665676
assert_snapshot!(error, @r"
666677
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
667-
foo#[ID](can spill: false) consumed 20.0 B,
668-
foo#[ID](can spill: false) consumed 10.0 B.
678+
foo#[ID](can spill: false) consumed 20.0 B, peak 20.0 B,
679+
foo#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
669680
Error: Failed to allocate additional 150.0 B for foo with 20.0 B already allocated for this reservation - 70.0 B remain available for the total pool
670681
");
671682

@@ -679,9 +690,9 @@ mod tests {
679690
let error = res.unwrap_err().strip_backtrace();
680691
assert_snapshot!(error, @r"
681692
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
682-
foo#[ID](can spill: false) consumed 20.0 B,
683-
foo#[ID](can spill: false) consumed 10.0 B,
684-
foo#[ID](can spill: true) consumed 0.0 B.
693+
foo#[ID](can spill: false) consumed 20.0 B, peak 20.0 B,
694+
foo#[ID](can spill: false) consumed 10.0 B, peak 10.0 B,
695+
foo#[ID](can spill: true) consumed 0.0 B, peak 0.0 B.
685696
Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 70.0 B remain available for the total pool
686697
");
687698
}
@@ -703,8 +714,8 @@ mod tests {
703714
let error = res.unwrap_err().strip_backtrace();
704715
allow_duplicates!(assert_snapshot!(error, @r"
705716
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
706-
r1#[ID](can spill: false) consumed 20.0 B,
707-
r0#[ID](can spill: false) consumed 10.0 B.
717+
r1#[ID](can spill: false) consumed 20.0 B, peak 20.0 B,
718+
r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
708719
Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 70.0 B remain available for the total pool
709720
"));
710721

@@ -716,7 +727,7 @@ mod tests {
716727
let error = res.unwrap_err().strip_backtrace();
717728
allow_duplicates!(assert_snapshot!(error, @r"
718729
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
719-
r0#[ID](can spill: false) consumed 10.0 B.
730+
r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
720731
Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool
721732
"));
722733

@@ -727,7 +738,7 @@ mod tests {
727738
let error = res.unwrap_err().strip_backtrace();
728739
allow_duplicates!(assert_snapshot!(error, @r"
729740
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
730-
r0#[ID](can spill: false) consumed 10.0 B.
741+
r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
731742
Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool
732743
"));
733744

@@ -738,7 +749,7 @@ mod tests {
738749
let error = res.unwrap_err().strip_backtrace();
739750
allow_duplicates!(assert_snapshot!(error, @r"
740751
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
741-
r0#[ID](can spill: false) consumed 10.0 B.
752+
r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
742753
Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool
743754
"));
744755
}
@@ -785,8 +796,8 @@ mod tests {
785796
// Test: can get runtime metrics, even without an error thrown
786797
let res = downcasted.report_top(2);
787798
assert_snapshot!(res, @r"
788-
r3#[ID](can spill: false) consumed 45.0 B,
789-
r1#[ID](can spill: false) consumed 20.0 B.
799+
r3#[ID](can spill: false) consumed 45.0 B, peak 45.0 B,
800+
r1#[ID](can spill: false) consumed 20.0 B, peak 20.0 B.
790801
");
791802
}
792803
}

0 commit comments

Comments
 (0)