Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/uu/sort/src/check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ fn reader(
) -> UResult<()> {
let mut carry_over = vec![];
for recycled_chunk in receiver {
let should_continue = chunks::read(
let progress = chunks::read(
sender,
recycled_chunk,
None,
Expand All @@ -117,7 +117,7 @@ fn reader(
settings.line_ending.into(),
settings,
)?;
if !should_continue {
if matches!(progress, chunks::ReadProgress::Finished) {
break;
}
}
Expand Down
182 changes: 122 additions & 60 deletions src/uu/sort/src/chunks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

//! Utilities for reading files as chunks.

// spell-checker:ignore memrchr

#![allow(dead_code)]
// Ignores non-used warning for `borrow_buffer` in `Chunk`

Expand Down Expand Up @@ -119,38 +121,54 @@ impl RecycledChunk {
}
}

/// Read a chunk, parse lines and send them.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReadProgress {
/// At least one full line was read and sent as a chunk to the sorter.
SentChunk,
/// Buffer cap reached without a separator; caller should spill the current record.
NeedSpill,
/// No more input remains; nothing was sent.
Finished,
/// No complete line available yet, but more input may remain.
NoChunk,
}

/// Read a chunk, parse complete records, and send them to the sorter.
///
/// No empty chunk will be sent. If we reach the end of the input, `false` is returned.
/// However, if this function returns `true`, it is not guaranteed that there is still
/// input left: If the input fits _exactly_ into a buffer, we will only notice that there's
/// nothing more to read at the next invocation. In case there is no input left, nothing will
/// be sent.
/// This function attempts to read at least one complete record (delimited by `separator`).
/// Data after the last complete record is left in `carry_over` to be prefixed on the
/// next invocation. Memory growth is bounded by `max_buffer_size` when provided.
///
/// # Arguments
/// # Returns
/// - `SentChunk`: At least one full record was read; a `Chunk` was sent to `sender`.
/// - `NoChunk`: No full record yet, but more input may remain; call again.
/// - `NeedSpill`: The buffer hit the cap with no separator found; caller should spill the
/// current oversized record to a run file and then continue.
/// - `Finished`: No more input remains; nothing was sent.
///
/// (see also `read_to_chunk` for a more detailed documentation)
/// # Arguments
///
/// * `sender`: The sender to send the lines to the sorter.
/// * `recycled_chunk`: The recycled chunk, as returned by `Chunk::recycle`.
/// (i.e. `buffer.len()` should be equal to `buffer.capacity()`)
/// * `max_buffer_size`: How big `buffer` can be.
/// * `carry_over`: The bytes that must be carried over in between invocations.
/// * `file`: The current file.
/// * `next_files`: What `file` should be updated to next.
/// * `separator`: The line separator.
/// * `settings`: The global settings.
/// * `sender`: Channel to send the populated `Chunk` to the sorter thread.
/// * `recycled_chunk`: Result of `Chunk::recycle`, providing reusable vectors and buffer.
/// `buffer.len()` equals its current capacity and will be reused for reading.
/// * `max_buffer_size`: Maximum buffer size in bytes; if `Some`, reading respects this cap.
/// * `carry_over`: Bytes from the previous call after the last separator; they are copied
/// to the beginning of the buffer before reading.
/// * `file`: Current reader.
/// * `next_files`: Iterator to advance to the next file once `file` reaches EOF.
/// * `separator`: Record delimiter (e.g., `b'\n'` or `b'\0'`).
/// * `settings`: Global sort settings (used for tokenization decisions when building `Chunk`).
#[allow(clippy::too_many_arguments)]
pub fn read<T: Read>(
sender: &SyncSender<Chunk>,
recycled_chunk: RecycledChunk,
max_buffer_size: Option<usize>,
carry_over: &mut Vec<u8>,
file: &mut T,
next_files: &mut impl Iterator<Item = UResult<T>>,
next_files: &mut dyn Iterator<Item = UResult<T>>,
separator: u8,
settings: &GlobalSettings,
) -> UResult<bool> {
) -> UResult<ReadProgress> {
let RecycledChunk {
lines,
selections,
Expand All @@ -163,7 +181,7 @@ pub fn read<T: Read>(
buffer.resize(carry_over.len() + 10 * 1024, 0);
}
buffer[..carry_over.len()].copy_from_slice(carry_over);
let (read, should_continue) = read_to_buffer(
let (read, should_continue, need_spill) = read_to_buffer(
file,
next_files,
&mut buffer,
Expand All @@ -174,6 +192,10 @@ pub fn read<T: Read>(
carry_over.clear();
carry_over.extend_from_slice(&buffer[read..]);

if need_spill {
return Ok(ReadProgress::NeedSpill);
}

if read != 0 {
let payload: UResult<Chunk> = Chunk::try_new(buffer, |buffer| {
let selections = unsafe {
Expand All @@ -197,8 +219,16 @@ pub fn read<T: Read>(
Ok(ChunkContents { lines, line_data })
});
sender.send(payload?).unwrap();
return Ok(ReadProgress::SentChunk);
}
Ok(should_continue)
Ok(if should_continue {
// No full line could be sent now, but there might still be input.
// This case happens when the input exactly fits the buffer without a separator at the end.
// The next call will continue reading and eventually emit a chunk or finish.
ReadProgress::NoChunk
} else {
ReadProgress::Finished
})
}

/// Split `read` into `Line`s, and add them to `lines`.
Expand All @@ -224,88 +254,87 @@ fn parse_lines<'a>(
);
}

/// Read from `file` into `buffer`.
///
/// This function makes sure that at least two lines are read (unless we reach EOF and there's no next file),
/// growing the buffer if necessary.
/// The last line is likely to not have been fully read into the buffer. Its bytes must be copied to
/// the front of the buffer for the next invocation so that it can be continued to be read
/// (see the return values and `start_offset`).
///
/// # Arguments
/// Read from `file` into `buffer` until at least one complete record is present or EOF.
///
/// * `file`: The file to start reading from.
/// * `next_files`: When `file` reaches EOF, it is updated to `next_files.next()` if that is `Some`,
/// and this function continues reading.
/// * `buffer`: The buffer that is filled with bytes. Its contents will mostly be overwritten (see `start_offset`
/// as well). It will be grown up to `max_buffer_size` if necessary, but it will always grow to read at least two lines.
/// * `max_buffer_size`: Grow the buffer to at most this length. If None, the buffer will not grow, unless needed to read at least two lines.
/// * `start_offset`: The amount of bytes at the start of `buffer` that were carried over
/// from the previous read and should not be overwritten.
/// * `separator`: The byte that separates lines.
/// This function makes sure that at least one complete record (terminated by `separator`) is
/// available in `buffer` (unless we reach EOF and there is no next file). The buffer is grown
/// if necessary, respecting `max_buffer_size` when provided. The bytes after the last complete
/// record remain in `buffer` and should be carried over to the next invocation.
///
/// # Returns
/// Arguments:
/// - `file`: The file to read from initially.
/// - `next_files`: Iterator used to advance to the next file when `file` reaches EOF; reading continues seamlessly.
/// - `buffer`: The destination buffer. Contents from `start_offset` onward will be overwritten by new data.
/// - `max_buffer_size`: Optional cap for `buffer` growth in bytes.
/// - `start_offset`: Number of bytes at the start of `buffer` containing carry-over data that must be preserved.
/// - `separator`: Record delimiter byte.
///
/// * The amount of bytes in `buffer` that can now be interpreted as lines.
/// The remaining bytes must be copied to the start of the buffer for the next invocation,
/// if another invocation is necessary, which is determined by the other return value.
/// * Whether this function should be called again.
/// Returns `(read_len, should_continue, need_spill)`:
/// - `read_len`: The number of bytes in `buffer` that form complete records ready for parsing.
/// - `should_continue`: `true` if more input may remain and another call could read additional data.
/// - `need_spill`: `true` if the buffer reached `max_buffer_size` without encountering a separator,
/// indicating the caller should spill the current oversized record to disk.
fn read_to_buffer<T: Read>(
file: &mut T,
next_files: &mut impl Iterator<Item = UResult<T>>,
next_files: &mut dyn Iterator<Item = UResult<T>>,
buffer: &mut Vec<u8>,
max_buffer_size: Option<usize>,
start_offset: usize,
separator: u8,
) -> UResult<(usize, bool)> {
) -> UResult<(usize, bool, bool)> {
let mut read_target = &mut buffer[start_offset..];
let mut last_file_empty = true;
// Only search for newlines in regions we haven't scanned before to avoid quadratic behavior.
let mut newline_search_offset = 0;
let mut found_newline = false;
loop {
match file.read(read_target) {
Ok(0) => {
if read_target.is_empty() {
// chunk is full
if let Some(max_buffer_size) = max_buffer_size {
if max_buffer_size > buffer.len() {
// we can grow the buffer
// Buffer full
if let Some(max) = max_buffer_size {
if max > buffer.len() {
// We can grow the buffer
let prev_len = buffer.len();
if buffer.len() < max_buffer_size / 2 {
if buffer.len() < max / 2 {
buffer.resize(buffer.len() * 2, 0);
} else {
buffer.resize(max_buffer_size, 0);
buffer.resize(max, 0);
}
read_target = &mut buffer[prev_len..];
continue;
}
}

// Buffer cannot grow further or exactly filled: find the last newline seen so far
let mut sep_iter =
memchr_iter(separator, &buffer[newline_search_offset..buffer.len()]).rev();
newline_search_offset = buffer.len();
if let Some(last_line_end) = sep_iter.next() {
if found_newline || sep_iter.next().is_some() {
// We read enough lines.
// We want to include the separator here, because it shouldn't be carried over.
return Ok((last_line_end + 1, true));
// We read enough lines. Include the separator so it isn't carried over.
return Ok((last_line_end + 1, true, false));
}
found_newline = true;
}

// We need to read more lines
// Need more data for a full line
if let Some(max) = max_buffer_size {
if buffer.len() >= max {
// Hard cap hit and no newline yet: signal spill
return Ok((0, true, true));
}
}
let len = buffer.len();
// resize the vector to 10 KB more
buffer.resize(len + 1024 * 10, 0);
read_target = &mut buffer[len..];
} else {
// This file has been fully read.
let mut leftover_len = read_target.len();
if !last_file_empty {
// The file was not empty.
// The file was not empty: ensure a trailing separator
let read_len = buffer.len() - leftover_len;
if buffer[read_len - 1] != separator {
// The file did not end with a separator. We have to insert one.
buffer[read_len] = separator;
leftover_len -= 1;
}
Expand All @@ -319,7 +348,7 @@ fn read_to_buffer<T: Read>(
} else {
// This was the last file.
let read_len = buffer.len() - leftover_len;
return Ok((read_len, false));
return Ok((read_len, false, false));
}
}
}
Expand All @@ -334,3 +363,36 @@ fn read_to_buffer<T: Read>(
}
}
}

/// Grow `buffer` by at least a minimal increment, up to an optional cap.
///
/// If `max_buffer_size` is `Some`, the new length will not exceed it. Once the buffer
/// size reaches the cap, no further growth occurs.
/// Otherwise, the buffer grows approximately by doubling, with a minimum increment of 10 KiB.
///
/// Ensures monotonic growth: the resulting length is always greater than the current length.
fn grow_buffer(buffer: &mut Vec<u8>, max_buffer_size: Option<usize>) {
const MIN_GROW: usize = 10 * 1024;
let current_len = buffer.len();
let mut next_len = if current_len == 0 {
MIN_GROW
} else if let Some(max_buffer_size) = max_buffer_size {
if current_len < max_buffer_size {
std::cmp::min(current_len.saturating_mul(2), max_buffer_size)
.max(current_len.saturating_add(MIN_GROW))
} else {
// Respect the cap: do not grow further.
current_len
}
} else {
current_len
.saturating_mul(2)
.max(current_len.saturating_add(MIN_GROW))
};

if next_len <= current_len {
next_len = current_len.saturating_add(MIN_GROW.max(1));
}

buffer.resize(next_len, 0);
}
Loading
Loading