11 changes: 11 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -441,6 +441,17 @@ object CometConf extends ShimCometConf {
.intConf
.createWithDefault(8192)

val COMET_SHUFFLE_WRITE_BUFFER_SIZE: ConfigEntry[Long] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.writeBufferSize")
.category(CATEGORY_SHUFFLE)
.doc("Size of the write buffer in bytes used by the native shuffle writer when writing " +
"shuffle data to disk. Larger values may improve write performance by reducing " +
"the number of system calls, but will use more memory. " +
"The default is 1MB which provides a good balance between performance and memory usage.")
.bytesConf(ByteUnit.MiB)
.checkValue(v => v > 0, "Write buffer size must be positive")
.createWithDefault(1)

val COMET_SHUFFLE_PREFER_DICTIONARY_RATIO: ConfigEntry[Double] = conf(
"spark.comet.shuffle.preferDictionary.ratio")
.category(CATEGORY_SHUFFLE)
1 change: 1 addition & 0 deletions docs/source/user-guide/latest/configs.md
@@ -107,6 +107,7 @@ These settings can be used to determine which parts of the plan are accelerated
| `spark.comet.exec.shuffle.compression.codec` | The codec of Comet native shuffle used to compress shuffle data. lz4, zstd, and snappy are supported. Compression can be disabled by setting spark.shuffle.compress=false. | lz4 |
| `spark.comet.exec.shuffle.compression.zstd.level` | The compression level to use when compressing shuffle files with zstd. | 1 |
| `spark.comet.exec.shuffle.enabled` | Whether to enable Comet native shuffle. Note that this requires setting `spark.shuffle.manager` to `org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager`. `spark.shuffle.manager` must be set before starting the Spark application and cannot be changed during the application. | true |
| `spark.comet.exec.shuffle.writeBufferSize` | Size of the write buffer in bytes used by the native shuffle writer when writing shuffle data to disk. Larger values may improve write performance by reducing the number of system calls, but will use more memory. The default is 1MiB, which provides a good balance between performance and memory usage. | 1048576b |
| `spark.comet.native.shuffle.partitioning.hash.enabled` | Whether to enable hash partitioning for Comet native shuffle. | true |
| `spark.comet.native.shuffle.partitioning.range.enabled` | Whether to enable range partitioning for Comet native shuffle. | true |
| `spark.comet.shuffle.preferDictionary.ratio` | The ratio of total values to distinct values in a string column to decide whether to prefer dictionary encoding when shuffling the column. If the ratio is higher than this config, dictionary encoding will be used on shuffling string column. This config is effective if it is higher than 1.0. Note that this config is only used when `spark.comet.exec.shuffle.mode` is `jvm`. | 10.0 |
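
As a usage sketch (not part of the generated table above): the snippet below enables Comet native shuffle and raises the write buffer. The `4m` value and the app name are illustrative assumptions, not recommendations from this change.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical session setup; only the conf keys come from the table above.
val spark = SparkSession
  .builder()
  .appName("comet-shuffle-demo")
  // Must be set before the application starts and cannot be changed later.
  .config(
    "spark.shuffle.manager",
    "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
  .config("spark.comet.exec.shuffle.enabled", "true")
  // Larger buffer: fewer write syscalls, more memory per shuffle writer.
  .config("spark.comet.exec.shuffle.writeBufferSize", "4m")
  .getOrCreate()
```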
1 change: 1 addition & 0 deletions native/core/benches/shuffle_writer.rs
@@ -152,6 +152,7 @@ fn create_shuffle_writer_exec(
"/tmp/data.out".to_string(),
"/tmp/index.out".to_string(),
false,
1024 * 1024,
)
.unwrap()
}
2 changes: 2 additions & 0 deletions native/core/src/execution/planner.rs
@@ -1244,13 +1244,15 @@ impl PhysicalPlanner {
))),
}?;

let write_buffer_size = writer.write_buffer_size as usize;
let shuffle_writer = Arc::new(ShuffleWriterExec::try_new(
Arc::clone(&child.native_plan),
partitioning,
codec,
writer.output_data_file.clone(),
writer.output_index_file.clone(),
writer.tracing_enabled,
write_buffer_size,
)?);

Ok((
39 changes: 32 additions & 7 deletions native/core/src/execution/shuffle/shuffle_writer.rs
@@ -78,17 +78,21 @@ pub struct ShuffleWriterExec {
/// The compression codec to use when compressing shuffle blocks
codec: CompressionCodec,
tracing_enabled: bool,
/// Size of the write buffer in bytes
write_buffer_size: usize,
}

impl ShuffleWriterExec {
/// Create a new ShuffleWriterExec
#[allow(clippy::too_many_arguments)]
pub fn try_new(
input: Arc<dyn ExecutionPlan>,
partitioning: CometPartitioning,
codec: CompressionCodec,
output_data_file: String,
output_index_file: String,
tracing_enabled: bool,
write_buffer_size: usize,
) -> Result<Self> {
let cache = PlanProperties::new(
EquivalenceProperties::new(Arc::clone(&input.schema())),
@@ -106,6 +110,7 @@ impl ShuffleWriterExec {
cache,
codec,
tracing_enabled,
write_buffer_size,
})
}
}
@@ -169,6 +174,7 @@ impl ExecutionPlan for ShuffleWriterExec {
self.output_data_file.clone(),
self.output_index_file.clone(),
self.tracing_enabled,
self.write_buffer_size,
)?)),
_ => panic!("ShuffleWriterExec wrong number of children"),
}
@@ -195,6 +201,7 @@ impl ExecutionPlan for ShuffleWriterExec {
context,
self.codec.clone(),
self.tracing_enabled,
self.write_buffer_size,
)
.map_err(|e| ArrowError::ExternalError(Box::new(e))),
)
@@ -214,6 +221,7 @@ async fn external_shuffle(
context: Arc<TaskContext>,
codec: CompressionCodec,
tracing_enabled: bool,
write_buffer_size: usize,
) -> Result<SendableRecordBatchStream> {
with_trace_async("external_shuffle", tracing_enabled, || async {
let schema = input.schema();
@@ -227,6 +235,7 @@
metrics,
context.session_config().batch_size(),
codec,
write_buffer_size,
)?)
}
_ => Box::new(MultiPartitionShuffleRepartitioner::try_new(
@@ -240,6 +249,7 @@
context.session_config().batch_size(),
codec,
tracing_enabled,
write_buffer_size,
)?),
};

@@ -331,6 +341,8 @@ struct MultiPartitionShuffleRepartitioner {
/// Reservation for repartitioning
reservation: MemoryReservation,
tracing_enabled: bool,
/// Size of the write buffer in bytes
write_buffer_size: usize,
}

#[derive(Default)]
@@ -362,6 +374,7 @@ impl MultiPartitionShuffleRepartitioner {
batch_size: usize,
codec: CompressionCodec,
tracing_enabled: bool,
write_buffer_size: usize,
) -> Result<Self> {
let num_output_partitions = partitioning.partition_count();
assert_ne!(
@@ -407,6 +420,7 @@
batch_size,
reservation,
tracing_enabled,
write_buffer_size,
})
}

@@ -654,8 +668,10 @@
output_data: &mut BufWriter<File>,
encode_time: &Time,
write_time: &Time,
write_buffer_size: usize,
) -> Result<()> {
let mut buf_batch_writer = BufBatchWriter::new(shuffle_block_writer, output_data);
let mut buf_batch_writer =
BufBatchWriter::new(shuffle_block_writer, output_data, write_buffer_size);
for batch in partition_iter {
let batch = batch?;
buf_batch_writer.write(&batch, encode_time, write_time)?;
@@ -714,7 +730,12 @@
for partition_id in 0..num_output_partitions {
let partition_writer = &mut self.partition_writers[partition_id];
let mut iter = partitioned_batches.produce(partition_id);
spilled_bytes += partition_writer.spill(&mut iter, &self.runtime, &self.metrics)?;
spilled_bytes += partition_writer.spill(
&mut iter,
&self.runtime,
&self.metrics,
self.write_buffer_size,
)?;
}

let mut timer = self.metrics.mempool_time.timer();
@@ -795,6 +816,7 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner {
&mut output_data,
&self.metrics.encode_time,
&self.metrics.write_time,
self.write_buffer_size,
)?;
}

@@ -862,6 +884,7 @@ impl SinglePartitionShufflePartitioner {
metrics: ShuffleRepartitionerMetrics,
batch_size: usize,
codec: CompressionCodec,
write_buffer_size: usize,
) -> Result<Self> {
let shuffle_block_writer = ShuffleBlockWriter::try_new(schema.as_ref(), codec.clone())?;

@@ -872,7 +895,8 @@
.open(output_data_path)
.map_err(to_df_err)?;

let output_data_writer = BufBatchWriter::new(shuffle_block_writer, output_data_file);
let output_data_writer =
BufBatchWriter::new(shuffle_block_writer, output_data_file, write_buffer_size);

Ok(Self {
output_data_writer,
@@ -1131,6 +1155,7 @@ impl PartitionWriter {
iter: &mut PartitionedBatchIterator,
runtime: &RuntimeEnv,
metrics: &ShuffleRepartitionerMetrics,
write_buffer_size: usize,
) -> Result<usize> {
if let Some(batch) = iter.next() {
self.ensure_spill_file_created(runtime)?;
@@ -1139,6 +1164,7 @@
let mut buf_batch_writer = BufBatchWriter::new(
&mut self.shuffle_block_writer,
&mut self.spill_file.as_mut().unwrap().file,
write_buffer_size,
);
let mut bytes_written =
buf_batch_writer.write(&batch?, &metrics.encode_time, &metrics.write_time)?;
@@ -1194,10 +1220,7 @@ struct BufBatchWriter<S: Borrow<ShuffleBlockWriter>, W: Write> {
}

impl<S: Borrow<ShuffleBlockWriter>, W: Write> BufBatchWriter<S, W> {
fn new(shuffle_block_writer: S, writer: W) -> Self {
// 1MB should be good enough to avoid frequent system calls,
// and also won't cause too much memory usage
let buffer_max_size = 1024 * 1024;
fn new(shuffle_block_writer: S, writer: W, buffer_max_size: usize) -> Self {
Self {
shuffle_block_writer,
writer,
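
The `BufBatchWriter` change above threads the configured size through in place of the previously hard-coded 1MiB. For intuition about the buffer-size/syscall trade-off, here is a self-contained Scala sketch of the same pattern under stated assumptions: the object name, file path, and sizes are illustrative, and `BufferedOutputStream` stands in for the Rust writer's manual buffer.

```scala
import java.io.{BufferedOutputStream, FileOutputStream}

object BufferedWriteDemo {
  def main(args: Array[String]): Unit = {
    // Mirrors the 1MiB default: bytes accumulate in memory and reach the
    // OS in buffer-sized chunks instead of one syscall per write.
    val bufferSize = 1024 * 1024
    val out = new BufferedOutputStream(
      new FileOutputStream("/tmp/buffered-write-demo.bin"),
      bufferSize)
    try {
      val payload = Array.fill[Byte](256)(0x2a.toByte)
      // 10k small writes, but ~2.56MB / 1MiB means only about 3 flushes.
      (0 until 10000).foreach(_ => out.write(payload))
    } finally {
      out.close() // flushes whatever remains in the buffer
    }
  }
}
```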
@@ -1343,6 +1366,7 @@ mod test {
1024,
CompressionCodec::Lz4Frame,
false,
1024 * 1024, // write_buffer_size: 1MB default
)
.unwrap();

@@ -1439,6 +1463,7 @@
"/tmp/data.out".to_string(),
"/tmp/index.out".to_string(),
false,
1024 * 1024, // write_buffer_size: 1MB default
)
.unwrap();

3 changes: 3 additions & 0 deletions native/proto/src/proto/operator.proto
@@ -236,6 +236,9 @@ message ShuffleWriter {
CompressionCodec codec = 5;
int32 compression_level = 6;
bool tracing_enabled = 7;
// Size of the write buffer in bytes used when writing shuffle data to disk.
// Larger values may improve write performance but use more memory.
int32 write_buffer_size = 8;
}

message ParquetWriter {
@@ -183,6 +183,8 @@ class CometNativeShuffleWriter[K, V](
}
shuffleWriterBuilder.setCompressionLevel(
CometConf.COMET_EXEC_SHUFFLE_COMPRESSION_ZSTD_LEVEL.get)
shuffleWriterBuilder.setWriteBufferSize(
  // Convert the MiB-denominated config value to bytes and clamp to the
  // int32 range of the protobuf field.
  ByteUnit.MiB
    .toBytes(CometConf.COMET_SHUFFLE_WRITE_BUFFER_SIZE.get())
    .min(Int.MaxValue.toLong)
    .toInt)

outputPartitioning match {
case p if isSinglePartitioning(p) =>
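
Because the protobuf field above is an `int32` carrying bytes while the config is declared with `bytesConf(ByteUnit.MiB)`, the JVM side has to convert units and clamp before calling `setWriteBufferSize`. A minimal sketch of that arithmetic, assuming the MiB storage unit described in `CometConf` (the object and value names here are illustrative):

```scala
import org.apache.spark.network.util.ByteUnit

object WriteBufferSizeDemo {
  def main(args: Array[String]): Unit = {
    val configuredMiB = 1L // matches createWithDefault(1)
    val bytes = ByteUnit.MiB.toBytes(configuredMiB) // 1048576
    // Clamp toward Int.MaxValue with .min; .max(Int.MaxValue) would force
    // every value up to 2147483647 before the .toInt, hence the fix above.
    val fieldValue = bytes.min(Int.MaxValue.toLong).toInt
    println(s"write_buffer_size = $fieldValue") // prints 1048576
  }
}
```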