@@ -13,7 +13,7 @@ use std::{
1313 sync:: mpsc:: SyncSender ,
1414} ;
1515
16- use memchr:: memchr_iter ;
16+ use memchr:: memrchr ;
1717use self_cell:: self_cell;
1818use uucore:: error:: { UResult , USimpleError } ;
1919
@@ -119,38 +119,54 @@ impl RecycledChunk {
119119 }
120120}
121121
122- /// Read a chunk, parse lines and send them.
/// Outcome of a single `read` invocation, telling the caller how to proceed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReadProgress {
    /// At least one complete line was parsed and handed to the sorter as a chunk.
    SentChunk,
    /// The buffer cap was reached before any separator appeared; the caller
    /// should spill the current oversized record.
    NeedSpill,
    /// The input is exhausted; nothing was sent.
    Finished,
    /// No complete line is available yet, although more input may remain.
    NoChunk,
}
133+
134+ /// Read a chunk, parse complete records, and send them to the sorter.
123135///
124- /// No empty chunk will be sent. If we reach the end of the input, `false` is returned.
125- /// However, if this function returns `true`, it is not guaranteed that there is still
126- /// input left: If the input fits _exactly_ into a buffer, we will only notice that there's
127- /// nothing more to read at the next invocation. In case there is no input left, nothing will
128- /// be sent.
136+ /// This function attempts to read at least one complete record (delimited by `separator`).
137+ /// Data after the last complete record is left in `carry_over` to be prefixed on the
138+ /// next invocation. Memory growth is bounded by `max_buffer_size` when provided.
129139///
130- /// # Arguments
140+ /// # Returns
141+ /// - `SentChunk`: At least one full record was read; a `Chunk` was sent to `sender`.
142+ /// - `NoChunk`: No full record yet, but more input may remain; call again.
143+ /// - `NeedSpill`: The buffer hit the cap with no separator found; caller should spill the
144+ /// current oversized record to a run file and then continue.
145+ /// - `Finished`: No more input remains; nothing was sent.
131146///
132- /// (see also `read_to_chunk` for a more detailed documentation)
147+ /// # Arguments
133148///
134- /// * `sender`: The sender to send the lines to the sorter.
135- /// * `recycled_chunk`: The recycled chunk, as returned by `Chunk::recycle`.
136- /// (i.e. `buffer.len()` should be equal to `buffer.capacity()`)
137- /// * `max_buffer_size`: How big `buffer` can be.
138- /// * `carry_over`: The bytes that must be carried over in between invocations.
139- /// * `file`: The current file.
140- /// * `next_files`: What `file` should be updated to next.
141- /// * `separator`: The line separator.
142- /// * `settings`: The global settings.
149+ /// * `sender`: Channel to send the populated `Chunk` to the sorter thread.
150+ /// * `recycled_chunk`: Result of `Chunk::recycle`, providing reusable vectors and buffer.
151+ /// `buffer.len()` equals its current capacity and will be reused for reading.
152+ /// * `max_buffer_size`: Maximum buffer size in bytes; if `Some`, reading respects this cap.
153+ /// * `carry_over`: Bytes from the previous call after the last separator; they are copied
154+ /// to the beginning of the buffer before reading.
155+ /// * `file`: Current reader.
156+ /// * `next_files`: Iterator to advance to the next file once `file` reaches EOF.
157+ /// * `separator`: Record delimiter (e.g., `b'\n'` or `b'\0'`).
158+ /// * `settings`: Global sort settings (used for tokenization decisions when building `Chunk`).
143159#[ allow( clippy:: too_many_arguments) ]
144160pub fn read < T : Read > (
145161 sender : & SyncSender < Chunk > ,
146162 recycled_chunk : RecycledChunk ,
147163 max_buffer_size : Option < usize > ,
148164 carry_over : & mut Vec < u8 > ,
149165 file : & mut T ,
150- next_files : & mut impl Iterator < Item = UResult < T > > ,
166+ next_files : & mut dyn Iterator < Item = UResult < T > > ,
151167 separator : u8 ,
152168 settings : & GlobalSettings ,
153- ) -> UResult < bool > {
169+ ) -> UResult < ReadProgress > {
154170 let RecycledChunk {
155171 lines,
156172 selections,
@@ -163,7 +179,7 @@ pub fn read<T: Read>(
163179 buffer. resize ( carry_over. len ( ) + 10 * 1024 , 0 ) ;
164180 }
165181 buffer[ ..carry_over. len ( ) ] . copy_from_slice ( carry_over) ;
166- let ( read, should_continue) = read_to_buffer (
182+ let ( read, should_continue, need_spill ) = read_to_buffer (
167183 file,
168184 next_files,
169185 & mut buffer,
@@ -174,6 +190,10 @@ pub fn read<T: Read>(
174190 carry_over. clear ( ) ;
175191 carry_over. extend_from_slice ( & buffer[ read..] ) ;
176192
193+ if need_spill {
194+ return Ok ( ReadProgress :: NeedSpill ) ;
195+ }
196+
177197 if read != 0 {
178198 let payload: UResult < Chunk > = Chunk :: try_new ( buffer, |buffer| {
179199 let selections = unsafe {
@@ -197,8 +217,16 @@ pub fn read<T: Read>(
197217 Ok ( ChunkContents { lines, line_data } )
198218 } ) ;
199219 sender. send ( payload?) . unwrap ( ) ;
220+ return Ok ( ReadProgress :: SentChunk ) ;
200221 }
201- Ok ( should_continue)
222+ Ok ( if should_continue {
223+ // No full line could be sent now, but there might still be input.
224+ // This case happens when the input exactly fits the buffer without a separator at the end.
225+ // The next call will continue reading and eventually emit a chunk or finish.
226+ ReadProgress :: NoChunk
227+ } else {
228+ ReadProgress :: Finished
229+ } )
202230}
203231
204232/// Split `read` into `Line`s, and add them to `lines`.
@@ -224,101 +252,85 @@ fn parse_lines<'a>(
224252 ) ;
225253}
226254
227- /// Read from `file` into `buffer`.
255+ /// Read from `file` into `buffer` until at least one complete record is present or EOF .
228256///
229- /// This function makes sure that at least two lines are read (unless we reach EOF and there's no next file),
230- /// growing the buffer if necessary.
231- /// The last line is likely to not have been fully read into the buffer. Its bytes must be copied to
232- /// the front of the buffer for the next invocation so that it can be continued to be read
233- /// (see the return values and `start_offset`).
257+ /// This function makes sure that at least one complete record (terminated by `separator`) is
258+ /// available in `buffer` (unless we reach EOF and there is no next file). The buffer is grown
259+ /// if necessary, respecting `max_buffer_size` when provided. The bytes after the last complete
260+ /// record remain in `buffer` and should be carried over to the next invocation.
234261///
235- /// # Arguments
236- ///
237- /// * `file`: The file to start reading from.
238- /// * `next_files`: When `file` reaches EOF, it is updated to `next_files.next()` if that is `Some`,
239- /// and this function continues reading.
240- /// * `buffer`: The buffer that is filled with bytes. Its contents will mostly be overwritten (see `start_offset`
241- /// as well). It will be grown up to `max_buffer_size` if necessary, but it will always grow to read at least two lines.
242- /// * `max_buffer_size`: Grow the buffer to at most this length. If None, the buffer will not grow, unless needed to read at least two lines.
243- /// * `start_offset`: The amount of bytes at the start of `buffer` that were carried over
244- /// from the previous read and should not be overwritten.
245- /// * `separator`: The byte that separates lines.
262+ /// (see also `read_to_chunk` for a more detailed documentation)
246263///
247- /// # Returns
264+ /// Arguments:
265+ /// - `file`: The file to read from initially.
266+ /// - `next_files`: Iterator used to advance to the next file when `file` reaches EOF; reading continues seamlessly.
267+ /// - `buffer`: The destination buffer. Contents from `start_offset` onward will be overwritten by new data.
268+ /// - `max_buffer_size`: Optional cap for `buffer` growth in bytes.
269+ /// - `start_offset`: Number of bytes at the start of `buffer` containing carry-over data that must be preserved.
270+ /// - `separator`: Record delimiter byte.
248271///
249- /// * The amount of bytes in `buffer` that can now be interpreted as lines.
250- /// The remaining bytes must be copied to the start of the buffer for the next invocation,
251- /// if another invocation is necessary, which is determined by the other return value.
252- /// * Whether this function should be called again.
272+ /// Returns `(read_len, should_continue, need_spill)`:
273+ /// - `read_len`: The number of bytes in `buffer` that form complete records ready for parsing.
274+ /// - `should_continue`: `true` if more input may remain and another call could read additional data.
275+ /// - `need_spill`: `true` if the buffer reached `max_buffer_size` without encountering a separator,
276+ /// indicating the caller should spill the current oversized record to disk.
253277fn read_to_buffer < T : Read > (
254278 file : & mut T ,
255- next_files : & mut impl Iterator < Item = UResult < T > > ,
279+ next_files : & mut dyn Iterator < Item = UResult < T > > ,
256280 buffer : & mut Vec < u8 > ,
257281 max_buffer_size : Option < usize > ,
258282 start_offset : usize ,
259283 separator : u8 ,
260- ) -> UResult < ( usize , bool ) > {
261- let mut read_target = & mut buffer[ start_offset..] ;
262- let mut last_file_target_size = read_target. len ( ) ;
284+ ) -> UResult < ( usize , bool , bool ) > {
285+ let mut filled = start_offset;
286+ let mut last_separator = memrchr ( separator, & buffer[ ..filled] ) ;
287+ let mut current_file_had_data = filled > 0 ;
288+
263289 loop {
264- match file. read ( read_target) {
290+ if filled == buffer. len ( ) {
291+ if let Some ( last) = last_separator {
292+ return Ok ( ( last + 1 , true , false ) ) ;
293+ }
294+ // Buffer full and we haven't seen a separator yet
295+ if let Some ( max) = max_buffer_size {
296+ if buffer. len ( ) >= max {
297+ // Signal to caller that we need to spill this oversized record.
298+ return Ok ( ( 0 , true , true ) ) ;
299+ }
300+ }
301+ grow_buffer ( buffer, max_buffer_size) ;
302+ continue ;
303+ }
304+
305+ match file. read ( & mut buffer[ filled..] ) {
265306 Ok ( 0 ) => {
266- if read_target. is_empty ( ) {
267- // chunk is full
268- if let Some ( max_buffer_size) = max_buffer_size {
269- if max_buffer_size > buffer. len ( ) {
270- // we can grow the buffer
271- let prev_len = buffer. len ( ) ;
272- if buffer. len ( ) < max_buffer_size / 2 {
273- buffer. resize ( buffer. len ( ) * 2 , 0 ) ;
274- } else {
275- buffer. resize ( max_buffer_size, 0 ) ;
276- }
277- read_target = & mut buffer[ prev_len..] ;
278- continue ;
279- }
280- }
281- let mut sep_iter = memchr_iter ( separator, buffer) . rev ( ) ;
282- let last_line_end = sep_iter. next ( ) ;
283- if sep_iter. next ( ) . is_some ( ) {
284- // We read enough lines.
285- let end = last_line_end. unwrap ( ) ;
286- // We want to include the separator here, because it shouldn't be carried over.
287- return Ok ( ( end + 1 , true ) ) ;
307+ if current_file_had_data && ( filled == 0 || buffer[ filled - 1 ] != separator) {
308+ if filled == buffer. len ( ) {
309+ grow_buffer ( buffer, max_buffer_size) ;
288310 }
311+ buffer[ filled] = separator;
312+ filled += 1 ;
313+ last_separator = Some ( filled - 1 ) ;
314+ }
289315
290- // We need to read more lines
291- let len = buffer. len ( ) ;
292- // resize the vector to 10 KB more
293- buffer. resize ( len + 1024 * 10 , 0 ) ;
294- read_target = & mut buffer[ len..] ;
295- } else {
296- // This file has been fully read.
297- let mut leftover_len = read_target. len ( ) ;
298- if last_file_target_size != leftover_len {
299- // The file was not empty.
300- let read_len = buffer. len ( ) - leftover_len;
301- if buffer[ read_len - 1 ] != separator {
302- // The file did not end with a separator. We have to insert one.
303- buffer[ read_len] = separator;
304- leftover_len -= 1 ;
305- }
306- let read_len = buffer. len ( ) - leftover_len;
307- read_target = & mut buffer[ read_len..] ;
308- }
309- if let Some ( next_file) = next_files. next ( ) {
310- // There is another file.
311- last_file_target_size = leftover_len;
312- * file = next_file?;
313- } else {
314- // This was the last file.
315- let read_len = buffer. len ( ) - leftover_len;
316- return Ok ( ( read_len, false ) ) ;
317- }
316+ if let Some ( next_file) = next_files. next ( ) {
317+ * file = next_file?;
318+ current_file_had_data = false ;
319+ continue ;
318320 }
321+ let read_len = last_separator. map_or ( filled, |pos| pos + 1 ) ;
322+ return Ok ( ( read_len, false , false ) ) ;
319323 }
320324 Ok ( n) => {
321- read_target = & mut read_target[ n..] ;
325+ if n == 0 {
326+ continue ;
327+ }
328+ let start = filled;
329+ filled += n;
330+ current_file_had_data = true ;
331+ if let Some ( rel) = memrchr ( separator, & buffer[ start..filled] ) {
332+ last_separator = Some ( start + rel) ;
333+ }
322334 }
323335 Err ( e) if e. kind ( ) == ErrorKind :: Interrupted => {
324336 // retry
@@ -327,3 +339,36 @@ fn read_to_buffer<T: Read>(
327339 }
328340 }
329341}
342+
/// Grow `buffer`, approximately doubling it with a minimum increment of 10 KiB.
///
/// If `max_buffer_size` is `Some`, growth is clamped so the new length does not
/// exceed the cap — except when the buffer is already at or above the cap, in
/// which case it still grows by the minimum increment: callers rely on
/// monotonic growth to make progress (e.g. appending a trailing separator at
/// EOF), so the resulting length is always strictly greater than the current one.
fn grow_buffer(buffer: &mut Vec<u8>, max_buffer_size: Option<usize>) {
    const MIN_GROW: usize = 10 * 1024;
    let current_len = buffer.len();

    // Double, but never grow by less than MIN_GROW (avoids many tiny resizes
    // for small buffers).
    let target = current_len
        .saturating_mul(2)
        .max(current_len.saturating_add(MIN_GROW));

    // Clamp to the cap AFTER applying the minimum increment, so the cap is
    // actually respected while we are below it. `max.max(current_len)` keeps
    // the clamp a no-op once we are already past the cap.
    let mut next_len = match max_buffer_size {
        Some(max) => target.min(max.max(current_len)),
        None => target,
    };

    // Guarantee strict monotonic growth even at/above the cap.
    if next_len <= current_len {
        next_len = current_len.saturating_add(MIN_GROW.max(1));
    }

    buffer.resize(next_len, 0);
}
0 commit comments