diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs index 44fc06d16d..c4dcddc54a 100644 --- a/native/core/src/execution/shuffle/shuffle_writer.rs +++ b/native/core/src/execution/shuffle/shuffle_writer.rs @@ -275,9 +275,6 @@ struct ShuffleRepartitionerMetrics { /// Time to perform repartitioning repart_time: Time, - /// Time interacting with memory pool - mempool_time: Time, - /// Time encoding batches to IPC format encode_time: Time, @@ -302,7 +299,6 @@ impl ShuffleRepartitionerMetrics { Self { baseline: BaselineMetrics::new(metrics, partition), repart_time: MetricBuilder::new(metrics).subset_time("repart_time", partition), - mempool_time: MetricBuilder::new(metrics).subset_time("mempool_time", partition), encode_time: MetricBuilder::new(metrics).subset_time("encode_time", partition), write_time: MetricBuilder::new(metrics).subset_time("write_time", partition), input_batches: MetricBuilder::new(metrics).counter("input_batches", partition), @@ -648,13 +644,7 @@ impl MultiPartitionShuffleRepartitioner { mem_growth += after_size.saturating_sub(before_size); } - let grow_result = { - let mut timer = self.metrics.mempool_time.timer(); - let result = self.reservation.try_grow(mem_growth); - timer.stop(); - result - }; - if grow_result.is_err() { + if self.reservation.try_grow(mem_growth).is_err() { self.spill()?; } @@ -737,9 +727,7 @@ impl MultiPartitionShuffleRepartitioner { )?; } - let mut timer = self.metrics.mempool_time.timer(); self.reservation.free(); - timer.stop(); self.metrics.spill_count.add(1); self.metrics.spilled_bytes.add(spilled_bytes); Ok(()) @@ -952,8 +940,6 @@ impl ShufflePartitioner for SinglePartitionShufflePartitioner { if num_rows >= self.batch_size || num_rows + self.num_buffered_rows > self.batch_size { let concatenated_batch = self.concat_buffered_batches()?; - let write_start_time = Instant::now(); - // Write the concatenated buffered batch if let Some(batch) = concatenated_batch { self.output_data_writer.write( @@ -974,10 +960,6 @@ impl ShufflePartitioner for SinglePartitionShufflePartitioner { // Add the new batch to the buffer self.add_buffered_batch(batch); } - - self.metrics - .write_time - .add_duration(write_start_time.elapsed()); } else { self.add_buffered_batch(batch); } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala index 41c5e9ea76..8c75df1d45 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometMetricNode.scala @@ -247,7 +247,6 @@ object CometMetricNode { def shuffleMetrics(sc: SparkContext): Map[String, SQLMetric] = { Map( "elapsed_compute" -> SQLMetrics.createNanoTimingMetric(sc, "native shuffle writer time"), - "mempool_time" -> SQLMetrics.createNanoTimingMetric(sc, "memory pool time"), "repart_time" -> SQLMetrics.createNanoTimingMetric(sc, "repartition time"), "encode_time" -> SQLMetrics.createNanoTimingMetric(sc, "encoding and compression time"), "decode_time" -> SQLMetrics.createNanoTimingMetric(sc, "decoding and decompression time"), diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala index cb2041983e..9fd1018544 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala @@ -79,7 +79,6 @@ class CometNativeShuffleWriter[K, V]( "elapsed_compute", "encode_time", "repart_time", - "mempool_time", "input_batches", "spill_count", "spilled_bytes")