Skip to content

Commit b9fd050

Browse files
authored
chore: Reduce timer overhead in native shuffle writer (#2941)
1 parent 60c0f1e commit b9fd050

File tree

3 files changed

+1
-21
lines changed

3 files changed

+1
-21
lines changed

native/core/src/execution/shuffle/shuffle_writer.rs

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -275,9 +275,6 @@ struct ShuffleRepartitionerMetrics {
275275
/// Time to perform repartitioning
276276
repart_time: Time,
277277

278-
/// Time interacting with memory pool
279-
mempool_time: Time,
280-
281278
/// Time encoding batches to IPC format
282279
encode_time: Time,
283280

@@ -302,7 +299,6 @@ impl ShuffleRepartitionerMetrics {
302299
Self {
303300
baseline: BaselineMetrics::new(metrics, partition),
304301
repart_time: MetricBuilder::new(metrics).subset_time("repart_time", partition),
305-
mempool_time: MetricBuilder::new(metrics).subset_time("mempool_time", partition),
306302
encode_time: MetricBuilder::new(metrics).subset_time("encode_time", partition),
307303
write_time: MetricBuilder::new(metrics).subset_time("write_time", partition),
308304
input_batches: MetricBuilder::new(metrics).counter("input_batches", partition),
@@ -648,13 +644,7 @@ impl MultiPartitionShuffleRepartitioner {
648644
mem_growth += after_size.saturating_sub(before_size);
649645
}
650646

651-
let grow_result = {
652-
let mut timer = self.metrics.mempool_time.timer();
653-
let result = self.reservation.try_grow(mem_growth);
654-
timer.stop();
655-
result
656-
};
657-
if grow_result.is_err() {
647+
if self.reservation.try_grow(mem_growth).is_err() {
658648
self.spill()?;
659649
}
660650

@@ -737,9 +727,7 @@ impl MultiPartitionShuffleRepartitioner {
737727
)?;
738728
}
739729

740-
let mut timer = self.metrics.mempool_time.timer();
741730
self.reservation.free();
742-
timer.stop();
743731
self.metrics.spill_count.add(1);
744732
self.metrics.spilled_bytes.add(spilled_bytes);
745733
Ok(())
@@ -952,8 +940,6 @@ impl ShufflePartitioner for SinglePartitionShufflePartitioner {
952940
if num_rows >= self.batch_size || num_rows + self.num_buffered_rows > self.batch_size {
953941
let concatenated_batch = self.concat_buffered_batches()?;
954942

955-
let write_start_time = Instant::now();
956-
957943
// Write the concatenated buffered batch
958944
if let Some(batch) = concatenated_batch {
959945
self.output_data_writer.write(
@@ -974,10 +960,6 @@ impl ShufflePartitioner for SinglePartitionShufflePartitioner {
974960
// Add the new batch to the buffer
975961
self.add_buffered_batch(batch);
976962
}
977-
978-
self.metrics
979-
.write_time
980-
.add_duration(write_start_time.elapsed());
981963
} else {
982964
self.add_buffered_batch(batch);
983965
}

spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,6 @@ object CometMetricNode {
247247
def shuffleMetrics(sc: SparkContext): Map[String, SQLMetric] = {
248248
Map(
249249
"elapsed_compute" -> SQLMetrics.createNanoTimingMetric(sc, "native shuffle writer time"),
250-
"mempool_time" -> SQLMetrics.createNanoTimingMetric(sc, "memory pool time"),
251250
"repart_time" -> SQLMetrics.createNanoTimingMetric(sc, "repartition time"),
252251
"encode_time" -> SQLMetrics.createNanoTimingMetric(sc, "encoding and compression time"),
253252
"decode_time" -> SQLMetrics.createNanoTimingMetric(sc, "decoding and decompression time"),

spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,6 @@ class CometNativeShuffleWriter[K, V](
7979
"elapsed_compute",
8080
"encode_time",
8181
"repart_time",
82-
"mempool_time",
8382
"input_batches",
8483
"spill_count",
8584
"spilled_bytes")

0 commit comments

Comments
 (0)