@@ -13,7 +13,7 @@ use std::{
1313 sync:: mpsc:: SyncSender ,
1414} ;
1515
16- use memchr:: memchr_iter ;
16+ use memchr:: memrchr ;
1717use self_cell:: self_cell;
1818use uucore:: error:: { UResult , USimpleError } ;
1919
@@ -119,38 +119,54 @@ impl RecycledChunk {
119119 }
120120}
121121
122- /// Read a chunk, parse lines and send them.
/// Outcome of a single `read` invocation, telling the caller how to proceed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReadProgress {
    /// At least one complete line was read and handed to the sorter as a chunk.
    SentChunk,
    /// The buffer reached its size cap before any separator appeared; the
    /// caller should spill the current oversized record before continuing.
    NeedSpill,
    /// The input is exhausted; nothing was sent.
    Finished,
    /// No complete line is available yet, although more input may remain;
    /// the caller should invoke `read` again.
    NoChunk,
}
133+
134+ /// Read a chunk, parse complete records, and send them to the sorter.
123135///
124- /// No empty chunk will be sent. If we reach the end of the input, `false` is returned.
125- /// However, if this function returns `true`, it is not guaranteed that there is still
126- /// input left: If the input fits _exactly_ into a buffer, we will only notice that there's
127- /// nothing more to read at the next invocation. In case there is no input left, nothing will
128- /// be sent.
136+ /// This function attempts to read at least one complete record (delimited by `separator`).
137+ /// Data after the last complete record is left in `carry_over` to be prefixed on the
138+ /// next invocation. Memory growth is bounded by `max_buffer_size` when provided.
129139///
130- /// # Arguments
140+ /// # Returns
141+ /// - `SentChunk`: At least one full record was read; a `Chunk` was sent to `sender`.
142+ /// - `NoChunk`: No full record yet, but more input may remain; call again.
143+ /// - `NeedSpill`: The buffer hit the cap with no separator found; caller should spill the
144+ /// current oversized record to a run file and then continue.
145+ /// - `Finished`: No more input remains; nothing was sent.
131146///
132- /// (see also `read_to_chunk` for a more detailed documentation)
147+ /// # Arguments
133148///
134- /// * `sender`: The sender to send the lines to the sorter.
135- /// * `recycled_chunk`: The recycled chunk, as returned by `Chunk::recycle`.
136- /// (i.e. `buffer.len()` should be equal to `buffer.capacity()`)
137- /// * `max_buffer_size`: How big `buffer` can be.
138- /// * `carry_over`: The bytes that must be carried over in between invocations.
139- /// * `file`: The current file.
140- /// * `next_files`: What `file` should be updated to next.
141- /// * `separator`: The line separator.
142- /// * `settings`: The global settings.
149+ /// * `sender`: Channel to send the populated `Chunk` to the sorter thread.
150+ /// * `recycled_chunk`: Result of `Chunk::recycle`, providing reusable vectors and buffer.
151+ /// `buffer.len()` equals its current capacity and will be reused for reading.
152+ /// * `max_buffer_size`: Maximum buffer size in bytes; if `Some`, reading respects this cap.
153+ /// * `carry_over`: Bytes from the previous call after the last separator; they are copied
154+ /// to the beginning of the buffer before reading.
155+ /// * `file`: Current reader.
156+ /// * `next_files`: Iterator to advance to the next file once `file` reaches EOF.
157+ /// * `separator`: Record delimiter (e.g., `b'\n'` or `b'\0'`).
158+ /// * `settings`: Global sort settings (used for tokenization decisions when building `Chunk`).
143159#[ allow( clippy:: too_many_arguments) ]
144160pub fn read < T : Read > (
145161 sender : & SyncSender < Chunk > ,
146162 recycled_chunk : RecycledChunk ,
147163 max_buffer_size : Option < usize > ,
148164 carry_over : & mut Vec < u8 > ,
149165 file : & mut T ,
150- next_files : & mut impl Iterator < Item = UResult < T > > ,
166+ next_files : & mut dyn Iterator < Item = UResult < T > > ,
151167 separator : u8 ,
152168 settings : & GlobalSettings ,
153- ) -> UResult < bool > {
169+ ) -> UResult < ReadProgress > {
154170 let RecycledChunk {
155171 lines,
156172 selections,
@@ -163,7 +179,7 @@ pub fn read<T: Read>(
163179 buffer. resize ( carry_over. len ( ) + 10 * 1024 , 0 ) ;
164180 }
165181 buffer[ ..carry_over. len ( ) ] . copy_from_slice ( carry_over) ;
166- let ( read, should_continue) = read_to_buffer (
182+ let ( read, should_continue, need_spill ) = read_to_buffer (
167183 file,
168184 next_files,
169185 & mut buffer,
@@ -174,6 +190,10 @@ pub fn read<T: Read>(
174190 carry_over. clear ( ) ;
175191 carry_over. extend_from_slice ( & buffer[ read..] ) ;
176192
193+ if need_spill {
194+ return Ok ( ReadProgress :: NeedSpill ) ;
195+ }
196+
177197 if read != 0 {
178198 let payload: UResult < Chunk > = Chunk :: try_new ( buffer, |buffer| {
179199 let selections = unsafe {
@@ -197,8 +217,16 @@ pub fn read<T: Read>(
197217 Ok ( ChunkContents { lines, line_data } )
198218 } ) ;
199219 sender. send ( payload?) . unwrap ( ) ;
220+ return Ok ( ReadProgress :: SentChunk ) ;
200221 }
201- Ok ( should_continue)
222+ Ok ( if should_continue {
223+ // No full line could be sent now, but there might still be input.
224+ // This case happens when the input exactly fits the buffer without a separator at the end.
225+ // The next call will continue reading and eventually emit a chunk or finish.
226+ ReadProgress :: NoChunk
227+ } else {
228+ ReadProgress :: Finished
229+ } )
202230}
203231
204232/// Split `read` into `Line`s, and add them to `lines`.
@@ -224,101 +252,83 @@ fn parse_lines<'a>(
224252 ) ;
225253}
226254
227- /// Read from `file` into `buffer`.
228- ///
229- /// This function makes sure that at least two lines are read (unless we reach EOF and there's no next file),
230- /// growing the buffer if necessary.
231- /// The last line is likely to not have been fully read into the buffer. Its bytes must be copied to
232- /// the front of the buffer for the next invocation so that it can be continued to be read
233- /// (see the return values and `start_offset`).
255+ /// Read from `file` into `buffer` until at least one complete record is present or EOF.
234256///
235- /// # Arguments
236- ///
237- /// * `file`: The file to start reading from.
238- /// * `next_files`: When `file` reaches EOF, it is updated to `next_files.next()` if that is `Some`,
239- /// and this function continues reading.
240- /// * `buffer`: The buffer that is filled with bytes. Its contents will mostly be overwritten (see `start_offset`
241- /// as well). It will be grown up to `max_buffer_size` if necessary, but it will always grow to read at least two lines.
242- /// * `max_buffer_size`: Grow the buffer to at most this length. If None, the buffer will not grow, unless needed to read at least two lines.
243- /// * `start_offset`: The amount of bytes at the start of `buffer` that were carried over
244- /// from the previous read and should not be overwritten.
245- /// * `separator`: The byte that separates lines.
257+ /// This function makes sure that at least one complete record (terminated by `separator`) is
258+ /// available in `buffer` (unless we reach EOF and there is no next file). The buffer is grown
259+ /// if necessary, respecting `max_buffer_size` when provided. The bytes after the last complete
260+ /// record remain in `buffer` and should be carried over to the next invocation.
246261///
247- /// # Returns
262+ /// Arguments:
263+ /// - `file`: The file to read from initially.
264+ /// - `next_files`: Iterator used to advance to the next file when `file` reaches EOF; reading continues seamlessly.
265+ /// - `buffer`: The destination buffer. Contents from `start_offset` onward will be overwritten by new data.
266+ /// - `max_buffer_size`: Optional cap for `buffer` growth in bytes.
267+ /// - `start_offset`: Number of bytes at the start of `buffer` containing carry-over data that must be preserved.
268+ /// - `separator`: Record delimiter byte.
248269///
249- /// * The amount of bytes in `buffer` that can now be interpreted as lines.
250- /// The remaining bytes must be copied to the start of the buffer for the next invocation,
251- /// if another invocation is necessary, which is determined by the other return value.
252- /// * Whether this function should be called again.
270+ /// Returns `(read_len, should_continue, need_spill)`:
271+ /// - `read_len`: The number of bytes in `buffer` that form complete records ready for parsing.
272+ /// - `should_continue`: `true` if more input may remain and another call could read additional data.
273+ /// - `need_spill`: `true` if the buffer reached `max_buffer_size` without encountering a separator,
274+ /// indicating the caller should spill the current oversized record to disk.
253275fn read_to_buffer < T : Read > (
254276 file : & mut T ,
255- next_files : & mut impl Iterator < Item = UResult < T > > ,
277+ next_files : & mut dyn Iterator < Item = UResult < T > > ,
256278 buffer : & mut Vec < u8 > ,
257279 max_buffer_size : Option < usize > ,
258280 start_offset : usize ,
259281 separator : u8 ,
260- ) -> UResult < ( usize , bool ) > {
261- let mut read_target = & mut buffer[ start_offset..] ;
262- let mut last_file_target_size = read_target. len ( ) ;
282+ ) -> UResult < ( usize , bool , bool ) > {
283+ let mut filled = start_offset;
284+ let mut last_separator = memrchr ( separator, & buffer[ ..filled] ) ;
285+ let mut current_file_had_data = filled > 0 ;
286+
263287 loop {
264- match file. read ( read_target) {
288+ if filled == buffer. len ( ) {
289+ if let Some ( last) = last_separator {
290+ return Ok ( ( last + 1 , true , false ) ) ;
291+ }
292+ // Buffer full and we haven't seen a separator yet
293+ if let Some ( max) = max_buffer_size {
294+ if buffer. len ( ) >= max {
295+ // Signal to caller that we need to spill this oversized record.
296+ return Ok ( ( 0 , true , true ) ) ;
297+ }
298+ }
299+ grow_buffer ( buffer, max_buffer_size) ;
300+ continue ;
301+ }
302+
303+ match file. read ( & mut buffer[ filled..] ) {
265304 Ok ( 0 ) => {
266- if read_target. is_empty ( ) {
267- // chunk is full
268- if let Some ( max_buffer_size) = max_buffer_size {
269- if max_buffer_size > buffer. len ( ) {
270- // we can grow the buffer
271- let prev_len = buffer. len ( ) ;
272- if buffer. len ( ) < max_buffer_size / 2 {
273- buffer. resize ( buffer. len ( ) * 2 , 0 ) ;
274- } else {
275- buffer. resize ( max_buffer_size, 0 ) ;
276- }
277- read_target = & mut buffer[ prev_len..] ;
278- continue ;
279- }
280- }
281- let mut sep_iter = memchr_iter ( separator, buffer) . rev ( ) ;
282- let last_line_end = sep_iter. next ( ) ;
283- if sep_iter. next ( ) . is_some ( ) {
284- // We read enough lines.
285- let end = last_line_end. unwrap ( ) ;
286- // We want to include the separator here, because it shouldn't be carried over.
287- return Ok ( ( end + 1 , true ) ) ;
305+ if current_file_had_data && ( filled == 0 || buffer[ filled - 1 ] != separator) {
306+ if filled == buffer. len ( ) {
307+ grow_buffer ( buffer, max_buffer_size) ;
288308 }
309+ buffer[ filled] = separator;
310+ filled += 1 ;
311+ last_separator = Some ( filled - 1 ) ;
312+ }
289313
290- // We need to read more lines
291- let len = buffer. len ( ) ;
292- // resize the vector to 10 KB more
293- buffer. resize ( len + 1024 * 10 , 0 ) ;
294- read_target = & mut buffer[ len..] ;
295- } else {
296- // This file has been fully read.
297- let mut leftover_len = read_target. len ( ) ;
298- if last_file_target_size != leftover_len {
299- // The file was not empty.
300- let read_len = buffer. len ( ) - leftover_len;
301- if buffer[ read_len - 1 ] != separator {
302- // The file did not end with a separator. We have to insert one.
303- buffer[ read_len] = separator;
304- leftover_len -= 1 ;
305- }
306- let read_len = buffer. len ( ) - leftover_len;
307- read_target = & mut buffer[ read_len..] ;
308- }
309- if let Some ( next_file) = next_files. next ( ) {
310- // There is another file.
311- last_file_target_size = leftover_len;
312- * file = next_file?;
313- } else {
314- // This was the last file.
315- let read_len = buffer. len ( ) - leftover_len;
316- return Ok ( ( read_len, false ) ) ;
317- }
314+ if let Some ( next_file) = next_files. next ( ) {
315+ * file = next_file?;
316+ current_file_had_data = false ;
317+ continue ;
318318 }
319+ let read_len = last_separator. map_or ( filled, |pos| pos + 1 ) ;
320+ return Ok ( ( read_len, false , false ) ) ;
319321 }
320322 Ok ( n) => {
321- read_target = & mut read_target[ n..] ;
323+ if n == 0 {
324+ continue ;
325+ }
326+ let start = filled;
327+ filled += n;
328+ current_file_had_data = true ;
329+ if let Some ( rel) = memrchr ( separator, & buffer[ start..filled] ) {
330+ last_separator = Some ( start + rel) ;
331+ }
322332 }
323333 Err ( e) if e. kind ( ) == ErrorKind :: Interrupted => {
324334 // retry
@@ -327,3 +337,36 @@ fn read_to_buffer<T: Read>(
327337 }
328338 }
329339}
340+
/// Grow `buffer` so the caller gains at least one more writable byte.
///
/// Growth is approximately doubling, with a minimum increment of 10 KiB so
/// tiny buffers do not trigger many small resizes.
///
/// If `max_buffer_size` is `Some` and the buffer is still below the cap, the
/// new length is clamped to the cap and never overshoots it. If the buffer is
/// already at (or above) the cap, it still grows by the minimum increment:
/// callers rely on `grow_buffer` always making room (e.g. appending a trailing
/// separator at EOF when the buffer is exactly full), so monotonic growth takes
/// precedence over the cap in that edge case.
///
/// Guarantees that the resulting length is strictly greater than the current
/// length (barring `usize` saturation, which is unreachable in practice).
fn grow_buffer(buffer: &mut Vec<u8>, max_buffer_size: Option<usize>) {
    const MIN_GROW: usize = 10 * 1024;
    let current_len = buffer.len();
    // Doubling with a floor of `current_len + MIN_GROW`; for an empty buffer
    // this yields exactly MIN_GROW.
    let unbounded = current_len
        .saturating_mul(2)
        .max(current_len.saturating_add(MIN_GROW));
    let mut next_len = match max_buffer_size {
        // Below the cap: grow, but never past the cap. (Clamping must happen
        // AFTER applying the MIN_GROW floor, otherwise the floor can push the
        // length past the cap by up to MIN_GROW bytes.)
        Some(max) if current_len < max => unbounded.min(max),
        // At or above the cap: grow minimally so pending appends still succeed.
        Some(_) => current_len.saturating_add(MIN_GROW),
        None => unbounded,
    };
    // Safety net: guarantee monotonic growth even on pathological inputs.
    if next_len <= current_len {
        next_len = current_len.saturating_add(MIN_GROW.max(1));
    }
    buffer.resize(next_len, 0);
}
0 commit comments