Skip to content

Commit 3e37ade

Browse files
andygrove and claude
committed
fix: Address clippy warnings in sort shuffle writer
- Add FinalizeResult type alias to reduce type complexity
- Add #[allow(clippy::too_many_arguments)] for finalize_output function
- Use iter_mut().enumerate() instead of index-based loop

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent df378c2 commit 3e37ade

File tree

1 file changed

+8
-3
lines changed
  • ballista/core/src/execution_plans/sort_shuffle

1 file changed

+8
-3
lines changed

ballista/core/src/execution_plans/sort_shuffle/writer.rs

Lines changed: 8 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -60,6 +60,10 @@ use log::{debug, info};
6060

6161
use crate::serde::scheduler::PartitionStats;
6262

63+
/// Result of finalizing shuffle output: (data_path, index_path, partition_write_stats)
64+
/// where partition_write_stats is (partition_id, num_batches, num_rows, num_bytes)
65+
type FinalizeResult = (PathBuf, PathBuf, Vec<(usize, u64, u64, u64)>);
66+
6367
/// Sort-based shuffle writer that produces a single consolidated output file
6468
/// per input partition with an index file for partition offsets.
6569
#[derive(Debug, Clone)]
@@ -352,6 +356,7 @@ fn spill_largest_buffers(
352356
///
353357
/// Returns (data_path, index_path, partition_stats) where partition_stats is
354358
/// a vector of (partition_id, num_batches, num_rows, num_bytes) tuples.
359+
#[allow(clippy::too_many_arguments)]
355360
fn finalize_output(
356361
work_dir: &str,
357362
job_id: &str,
@@ -361,7 +366,7 @@ fn finalize_output(
361366
spill_manager: &mut SpillManager,
362367
schema: &SchemaRef,
363368
config: &SortShuffleConfig,
364-
) -> Result<(PathBuf, PathBuf, Vec<(usize, u64, u64, u64)>)> {
369+
) -> Result<FinalizeResult> {
365370
let num_partitions = buffers.len();
366371
let mut index = ShuffleIndex::new(num_partitions);
367372
let mut partition_stats = Vec::with_capacity(num_partitions);
@@ -391,7 +396,7 @@ fn finalize_output(
391396
let mut cumulative_batch_count: i64 = 0;
392397

393398
// Write partitions in order
394-
for partition_id in 0..num_partitions {
399+
for (partition_id, buffer) in buffers.iter_mut().enumerate() {
395400
// Set the starting batch index for this partition
396401
index.set_offset(partition_id, cumulative_batch_count);
397402

@@ -414,7 +419,7 @@ fn finalize_output(
414419
}
415420

416421
// Then write remaining buffered data
417-
let buffered_batches = buffers[partition_id].take_batches();
422+
let buffered_batches = buffer.take_batches();
418423
for batch in buffered_batches {
419424
partition_rows += batch.num_rows() as u64;
420425
partition_bytes += batch.get_array_memory_size() as u64;

0 commit comments

Comments (0)