Skip to content

Commit 6c34c7a

Browse files
authored
fix: native shuffle now reports spill metrics correctly (apache#3197)
1 parent 4bd664e commit 6c34c7a

File tree

2 files changed

+9
-1
lines changed

2 files changed

+9
-1
lines changed

native/core/src/execution/shuffle/shuffle_writer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -700,7 +700,7 @@ impl MultiPartitionShuffleRepartitioner {
700700
}
701701

702702
fn spill(&mut self) -> Result<()> {
703-
log::debug!(
703+
log::info!(
704704
"ShuffleRepartitioner spilling shuffle data of {} to disk while inserting ({} time(s) so far)",
705705
self.used(),
706706
self.spill_count()

spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,14 @@ class CometNativeShuffleWriter[K, V](
132132
metricsReporter.incRecordsWritten(metricsOutputRows.value)
133133
metricsReporter.incWriteTime(metricsWriteTime.value)
134134

135+
// Report spill metrics to Spark's task metrics so they appear in
136+
// Spark UI task summaries (not just SQL metrics)
137+
val spilledBytes = nativeSQLMetrics.get("spilled_bytes").map(_.value).getOrElse(0L)
138+
if (spilledBytes > 0) {
139+
context.taskMetrics().incMemoryBytesSpilled(spilledBytes)
140+
context.taskMetrics().incDiskBytesSpilled(spilledBytes)
141+
}
142+
135143
// commit
136144
shuffleBlockResolver.writeMetadataFileAndCommit(
137145
shuffleId,

0 commit comments

Comments
 (0)