
Commit 5ec12d4

feat: Make shuffle writer buffer size configurable (#2899)
1 parent e594aa3 commit 5ec12d4

7 files changed: +52 −7 lines changed

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 11 additions & 0 deletions

@@ -441,6 +441,17 @@ object CometConf extends ShimCometConf {
     .intConf
     .createWithDefault(8192)
 
+  val COMET_SHUFFLE_WRITE_BUFFER_SIZE: ConfigEntry[Long] =
+    conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.writeBufferSize")
+      .category(CATEGORY_SHUFFLE)
+      .doc("Size of the write buffer in bytes used by the native shuffle writer when writing " +
+        "shuffle data to disk. Larger values may improve write performance by reducing " +
+        "the number of system calls, but will use more memory. " +
+        "The default is 1MB which provides a good balance between performance and memory usage.")
+      .bytesConf(ByteUnit.MiB)
+      .checkValue(v => v > 0, "Write buffer size must be positive")
+      .createWithDefault(1)
+
   val COMET_SHUFFLE_PREFER_DICTIONARY_RATIO: ConfigEntry[Double] = conf(
     "spark.comet.shuffle.preferDictionary.ratio")
     .category(CATEGORY_SHUFFLE)
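
Because the entry is declared with `bytesConf`, it accepts Spark-style byte-size strings such as `4m` or `4194304b`. A minimal usage sketch, assuming a session that already has Comet enabled (the `4m` value is purely illustrative):

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical sketch: override the shuffle write buffer for a session.
// Assumes Comet is on the classpath and enabled for this SparkSession.
val spark = SparkSession.builder().appName("comet-demo").getOrCreate()

// bytesConf entries accept size strings; this raises the buffer to 4 MiB.
spark.conf.set("spark.comet.exec.shuffle.writeBufferSize", "4m")
```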

docs/source/user-guide/latest/configs.md

Lines changed: 1 addition & 0 deletions

@@ -107,6 +107,7 @@ These settings can be used to determine which parts of the plan are accelerated
 | `spark.comet.exec.shuffle.compression.codec` | The codec of Comet native shuffle used to compress shuffle data. lz4, zstd, and snappy are supported. Compression can be disabled by setting spark.shuffle.compress=false. | lz4 |
 | `spark.comet.exec.shuffle.compression.zstd.level` | The compression level to use when compressing shuffle files with zstd. | 1 |
 | `spark.comet.exec.shuffle.enabled` | Whether to enable Comet native shuffle. Note that this requires setting `spark.shuffle.manager` to `org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager`. `spark.shuffle.manager` must be set before starting the Spark application and cannot be changed during the application. | true |
+| `spark.comet.exec.shuffle.writeBufferSize` | Size of the write buffer in bytes used by the native shuffle writer when writing shuffle data to disk. Larger values may improve write performance by reducing the number of system calls, but will use more memory. The default is 1MB which provides a good balance between performance and memory usage. | 1048576b |
 | `spark.comet.native.shuffle.partitioning.hash.enabled` | Whether to enable hash partitioning for Comet native shuffle. | true |
 | `spark.comet.native.shuffle.partitioning.range.enabled` | Whether to enable range partitioning for Comet native shuffle. | true |
 | `spark.comet.shuffle.preferDictionary.ratio` | The ratio of total values to distinct values in a string column to decide whether to prefer dictionary encoding when shuffling the column. If the ratio is higher than this config, dictionary encoding will be used on shuffling string column. This config is effective if it is higher than 1.0. Note that this config is only used when `spark.comet.exec.shuffle.mode` is `jvm`. | 10.0 |
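
Since native shuffle already requires `spark.shuffle.manager` to be fixed before the application starts, the new option can be supplied the same way. A hedged startup sketch combining the settings from this table (the `2m` buffer value is illustrative, not a recommendation):

```scala
import org.apache.spark.SparkConf

// Hypothetical startup configuration wiring the new buffer size in
// alongside the required Comet shuffle manager and shuffle toggle.
val conf = new SparkConf()
  .set(
    "spark.shuffle.manager",
    "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
  .set("spark.comet.exec.shuffle.enabled", "true")
  .set("spark.comet.exec.shuffle.writeBufferSize", "2m")
```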

native/core/benches/shuffle_writer.rs

Lines changed: 1 addition & 0 deletions

@@ -152,6 +152,7 @@ fn create_shuffle_writer_exec(
         "/tmp/data.out".to_string(),
         "/tmp/index.out".to_string(),
         false,
+        1024 * 1024,
     )
     .unwrap()
 }

native/core/src/execution/planner.rs

Lines changed: 2 additions & 0 deletions

@@ -1244,13 +1244,15 @@ impl PhysicalPlanner {
             ))),
         }?;
 
+        let write_buffer_size = writer.write_buffer_size as usize;
         let shuffle_writer = Arc::new(ShuffleWriterExec::try_new(
             Arc::clone(&child.native_plan),
             partitioning,
             codec,
             writer.output_data_file.clone(),
             writer.output_index_file.clone(),
             writer.tracing_enabled,
+            write_buffer_size,
         )?);
 
         Ok((

native/core/src/execution/shuffle/shuffle_writer.rs

Lines changed: 32 additions & 7 deletions

@@ -78,17 +78,21 @@ pub struct ShuffleWriterExec {
     /// The compression codec to use when compressing shuffle blocks
     codec: CompressionCodec,
     tracing_enabled: bool,
+    /// Size of the write buffer in bytes
+    write_buffer_size: usize,
 }
 
 impl ShuffleWriterExec {
     /// Create a new ShuffleWriterExec
+    #[allow(clippy::too_many_arguments)]
     pub fn try_new(
         input: Arc<dyn ExecutionPlan>,
         partitioning: CometPartitioning,
         codec: CompressionCodec,
         output_data_file: String,
         output_index_file: String,
         tracing_enabled: bool,
+        write_buffer_size: usize,
     ) -> Result<Self> {
         let cache = PlanProperties::new(
             EquivalenceProperties::new(Arc::clone(&input.schema())),
@@ -106,6 +110,7 @@ impl ShuffleWriterExec {
             cache,
             codec,
             tracing_enabled,
+            write_buffer_size,
         })
     }
 }
@@ -169,6 +174,7 @@ impl ExecutionPlan for ShuffleWriterExec {
                 self.output_data_file.clone(),
                 self.output_index_file.clone(),
                 self.tracing_enabled,
+                self.write_buffer_size,
             )?)),
             _ => panic!("ShuffleWriterExec wrong number of children"),
         }
@@ -195,6 +201,7 @@ impl ExecutionPlan for ShuffleWriterExec {
                 context,
                 self.codec.clone(),
                 self.tracing_enabled,
+                self.write_buffer_size,
             )
             .map_err(|e| ArrowError::ExternalError(Box::new(e))),
         )
@@ -214,6 +221,7 @@ async fn external_shuffle(
     context: Arc<TaskContext>,
     codec: CompressionCodec,
     tracing_enabled: bool,
+    write_buffer_size: usize,
 ) -> Result<SendableRecordBatchStream> {
     with_trace_async("external_shuffle", tracing_enabled, || async {
         let schema = input.schema();
@@ -227,6 +235,7 @@ async fn external_shuffle(
                 metrics,
                 context.session_config().batch_size(),
                 codec,
+                write_buffer_size,
             )?)
         }
         _ => Box::new(MultiPartitionShuffleRepartitioner::try_new(
@@ -240,6 +249,7 @@ async fn external_shuffle(
             context.session_config().batch_size(),
             codec,
             tracing_enabled,
+            write_buffer_size,
         )?),
     };
 
@@ -331,6 +341,8 @@ struct MultiPartitionShuffleRepartitioner {
     /// Reservation for repartitioning
     reservation: MemoryReservation,
     tracing_enabled: bool,
+    /// Size of the write buffer in bytes
+    write_buffer_size: usize,
 }
 
 #[derive(Default)]
@@ -362,6 +374,7 @@ impl MultiPartitionShuffleRepartitioner {
         batch_size: usize,
         codec: CompressionCodec,
         tracing_enabled: bool,
+        write_buffer_size: usize,
     ) -> Result<Self> {
         let num_output_partitions = partitioning.partition_count();
         assert_ne!(
@@ -407,6 +420,7 @@ impl MultiPartitionShuffleRepartitioner {
             batch_size,
             reservation,
             tracing_enabled,
+            write_buffer_size,
         })
     }
 
@@ -654,8 +668,10 @@ impl MultiPartitionShuffleRepartitioner {
         output_data: &mut BufWriter<File>,
         encode_time: &Time,
         write_time: &Time,
+        write_buffer_size: usize,
     ) -> Result<()> {
-        let mut buf_batch_writer = BufBatchWriter::new(shuffle_block_writer, output_data);
+        let mut buf_batch_writer =
+            BufBatchWriter::new(shuffle_block_writer, output_data, write_buffer_size);
         for batch in partition_iter {
             let batch = batch?;
             buf_batch_writer.write(&batch, encode_time, write_time)?;
@@ -714,7 +730,12 @@ impl MultiPartitionShuffleRepartitioner {
         for partition_id in 0..num_output_partitions {
             let partition_writer = &mut self.partition_writers[partition_id];
             let mut iter = partitioned_batches.produce(partition_id);
-            spilled_bytes += partition_writer.spill(&mut iter, &self.runtime, &self.metrics)?;
+            spilled_bytes += partition_writer.spill(
+                &mut iter,
+                &self.runtime,
+                &self.metrics,
+                self.write_buffer_size,
+            )?;
         }
 
         let mut timer = self.metrics.mempool_time.timer();
@@ -795,6 +816,7 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner {
                 &mut output_data,
                 &self.metrics.encode_time,
                 &self.metrics.write_time,
+                self.write_buffer_size,
             )?;
         }
 
@@ -862,6 +884,7 @@ impl SinglePartitionShufflePartitioner {
         metrics: ShuffleRepartitionerMetrics,
         batch_size: usize,
         codec: CompressionCodec,
+        write_buffer_size: usize,
     ) -> Result<Self> {
         let shuffle_block_writer = ShuffleBlockWriter::try_new(schema.as_ref(), codec.clone())?;
 
@@ -872,7 +895,8 @@ impl SinglePartitionShufflePartitioner {
             .open(output_data_path)
             .map_err(to_df_err)?;
 
-        let output_data_writer = BufBatchWriter::new(shuffle_block_writer, output_data_file);
+        let output_data_writer =
+            BufBatchWriter::new(shuffle_block_writer, output_data_file, write_buffer_size);
 
         Ok(Self {
             output_data_writer,
@@ -1131,6 +1155,7 @@ impl PartitionWriter {
         iter: &mut PartitionedBatchIterator,
         runtime: &RuntimeEnv,
         metrics: &ShuffleRepartitionerMetrics,
+        write_buffer_size: usize,
     ) -> Result<usize> {
         if let Some(batch) = iter.next() {
             self.ensure_spill_file_created(runtime)?;
@@ -1139,6 +1164,7 @@ impl PartitionWriter {
             let mut buf_batch_writer = BufBatchWriter::new(
                 &mut self.shuffle_block_writer,
                 &mut self.spill_file.as_mut().unwrap().file,
+                write_buffer_size,
             );
             let mut bytes_written =
                 buf_batch_writer.write(&batch?, &metrics.encode_time, &metrics.write_time)?;
@@ -1194,10 +1220,7 @@ struct BufBatchWriter<S: Borrow<ShuffleBlockWriter>, W: Write> {
 }
 
 impl<S: Borrow<ShuffleBlockWriter>, W: Write> BufBatchWriter<S, W> {
-    fn new(shuffle_block_writer: S, writer: W) -> Self {
-        // 1MB should be good enough to avoid frequent system calls,
-        // and also won't cause too much memory usage
-        let buffer_max_size = 1024 * 1024;
+    fn new(shuffle_block_writer: S, writer: W, buffer_max_size: usize) -> Self {
         Self {
             shuffle_block_writer,
             writer,
@@ -1343,6 +1366,7 @@ mod test {
             1024,
             CompressionCodec::Lz4Frame,
             false,
+            1024 * 1024, // write_buffer_size: 1MB default
         )
         .unwrap();
 
@@ -1439,6 +1463,7 @@ mod test {
             "/tmp/data.out".to_string(),
             "/tmp/index.out".to_string(),
             false,
+            1024 * 1024, // write_buffer_size: 1MB default
        )
        .unwrap();

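The heart of the change is that `BufBatchWriter::new` now takes the buffer cap as a parameter instead of hardcoding 1 MiB. The pattern it implements — accumulate encoded batches in memory and flush to the underlying file only once the buffer exceeds the cap — can be sketched as below. This is an illustrative Scala analogue with hypothetical names, not the Rust implementation above:

```scala
import java.io.{ByteArrayOutputStream, OutputStream}

// Illustrative analogue of the buffered-writer pattern: encoded batches
// accumulate in memory and are pushed to the underlying stream only when
// the buffer grows past bufferMaxSize, trading memory for fewer
// (potentially syscall-backed) writes. All names here are hypothetical.
final class BufferedBatchWriter(out: OutputStream, bufferMaxSize: Int) {
  private val buffer = new ByteArrayOutputStream(bufferMaxSize)

  /** Append one encoded batch; flush if the buffer has passed the cap. */
  def write(encodedBatch: Array[Byte]): Unit = {
    buffer.write(encodedBatch)
    if (buffer.size() >= bufferMaxSize) flush()
  }

  /** Push buffered bytes to the underlying stream in a single write. */
  def flush(): Unit = {
    buffer.writeTo(out)
    buffer.reset()
    out.flush()
  }
}
```

With the default 1 MiB cap, many small batches coalesce into a single write call; a larger cap trims write calls further at the cost of per-task memory, which is exactly the trade-off the new config exposes.
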
native/proto/src/proto/operator.proto

Lines changed: 3 additions & 0 deletions

@@ -236,6 +236,9 @@ message ShuffleWriter {
   CompressionCodec codec = 5;
   int32 compression_level = 6;
   bool tracing_enabled = 7;
+  // Size of the write buffer in bytes used when writing shuffle data to disk.
+  // Larger values may improve write performance but use more memory.
+  int32 write_buffer_size = 8;
 }
 
 message ParquetWriter {

spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala

Lines changed: 2 additions & 0 deletions

@@ -183,6 +183,8 @@ class CometNativeShuffleWriter[K, V](
     }
     shuffleWriterBuilder.setCompressionLevel(
       CometConf.COMET_EXEC_SHUFFLE_COMPRESSION_ZSTD_LEVEL.get)
+    shuffleWriterBuilder.setWriteBufferSize(
+      CometConf.COMET_SHUFFLE_WRITE_BUFFER_SIZE.get().max(Int.MaxValue).toInt)
 
     outputPartitioning match {
       case p if isSinglePartitioning(p) =>
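
One detail worth noting: the config yields a `Long` number of bytes, while the protobuf field `write_buffer_size` is an `int32`, so the JVM side must narrow the value before handing it to the builder. A minimal sketch of an overflow-safe narrowing (hypothetical helper, not part of this commit):

```scala
// Hypothetical helper: narrow a Long byte count into the proto's int32
// field by capping at Int.MaxValue so the cast cannot overflow or wrap.
def toIntBytes(sizeInBytes: Long): Int =
  math.min(sizeInBytes, Int.MaxValue.toLong).toInt
```

Any value above Int.MaxValue bytes (roughly 2 GiB) is capped rather than wrapped by the cast.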
