Merged
20 changes: 1 addition & 19 deletions native/core/src/execution/shuffle/shuffle_writer.rs
```diff
@@ -275,9 +275,6 @@ struct ShuffleRepartitionerMetrics {
     /// Time to perform repartitioning
     repart_time: Time,
 
-    /// Time interacting with memory pool
-    mempool_time: Time,
-
     /// Time encoding batches to IPC format
     encode_time: Time,
 
@@ -302,7 +299,6 @@ impl ShuffleRepartitionerMetrics {
         Self {
             baseline: BaselineMetrics::new(metrics, partition),
             repart_time: MetricBuilder::new(metrics).subset_time("repart_time", partition),
-            mempool_time: MetricBuilder::new(metrics).subset_time("mempool_time", partition),
             encode_time: MetricBuilder::new(metrics).subset_time("encode_time", partition),
             write_time: MetricBuilder::new(metrics).subset_time("write_time", partition),
             input_batches: MetricBuilder::new(metrics).counter("input_batches", partition),
@@ -648,13 +644,7 @@ impl MultiPartitionShuffleRepartitioner {
             mem_growth += after_size.saturating_sub(before_size);
         }
 
-        let grow_result = {
-            let mut timer = self.metrics.mempool_time.timer();
-            let result = self.reservation.try_grow(mem_growth);
-            timer.stop();
-            result
-        };
-        if grow_result.is_err() {
+        if self.reservation.try_grow(mem_growth).is_err() {
             self.spill()?;
         }
 
@@ -737,9 +727,7 @@ impl MultiPartitionShuffleRepartitioner {
             )?;
         }
 
-        let mut timer = self.metrics.mempool_time.timer();
         self.reservation.free();
-        timer.stop();
         self.metrics.spill_count.add(1);
         self.metrics.spilled_bytes.add(spilled_bytes);
         Ok(())
@@ -952,8 +940,6 @@ impl ShufflePartitioner for SinglePartitionShufflePartitioner {
         if num_rows >= self.batch_size || num_rows + self.num_buffered_rows > self.batch_size {
             let concatenated_batch = self.concat_buffered_batches()?;
 
-            let write_start_time = Instant::now();
-
             // Write the concatenated buffered batch
             if let Some(batch) = concatenated_batch {
                 self.output_data_writer.write(
```

Member Author: Removing this one is actually a bug fix rather than an optimization. The timer was already being passed into the functions that perform the writes, so this outer stopwatch recorded the write time a second time (see the sketch after this file's diff).

```diff
@@ -974,10 +960,6 @@
                 // Add the new batch to the buffer
                 self.add_buffered_batch(batch);
             }
-
-            self.metrics
-                .write_time
-                .add_duration(write_start_time.elapsed());
         } else {
             self.add_buffered_batch(batch);
         }
```
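To make the double counting concrete, here is a minimal sketch; the `Time` struct and `timed_write` function are simplified stand-ins invented for illustration, not Comet's actual types:

```rust
use std::time::{Duration, Instant};

// Illustrative stand-in for Comet's `Time` metric (an assumption, not the real type).
#[derive(Default)]
struct Time {
    nanos: u64,
}

impl Time {
    fn add_duration(&mut self, d: Duration) {
        self.nanos += d.as_nanos() as u64;
    }
}

// The inner write path records its own duration, mirroring the timer that was
// "already being passed into functions that perform the writes".
fn timed_write(write_time: &mut Time) {
    let start = Instant::now();
    // ... perform the actual write here ...
    write_time.add_duration(start.elapsed());
}

fn main() {
    let mut write_time = Time::default();

    // Before this PR: an outer stopwatch wrapped the already-timed write and
    // added its elapsed time to the same metric, so each write counted twice.
    let write_start_time = Instant::now();
    timed_write(&mut write_time);
    write_time.add_duration(write_start_time.elapsed()); // double count

    // After this PR: only the timer inside the write path records the time.
    timed_write(&mut write_time);
}
```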
```diff
@@ -247,7 +247,6 @@ object CometMetricNode {
   def shuffleMetrics(sc: SparkContext): Map[String, SQLMetric] = {
     Map(
       "elapsed_compute" -> SQLMetrics.createNanoTimingMetric(sc, "native shuffle writer time"),
-      "mempool_time" -> SQLMetrics.createNanoTimingMetric(sc, "memory pool time"),
       "repart_time" -> SQLMetrics.createNanoTimingMetric(sc, "repartition time"),
       "encode_time" -> SQLMetrics.createNanoTimingMetric(sc, "encoding and compression time"),
       "decode_time" -> SQLMetrics.createNanoTimingMetric(sc, "decoding and decompression time"),
```
```diff
@@ -79,7 +79,6 @@ class CometNativeShuffleWriter[K, V](
       "elapsed_compute",
       "encode_time",
       "repart_time",
-      "mempool_time",
       "input_batches",
       "spill_count",
       "spilled_bytes")
```