Skip to content

Commit 05d7d9f

Browse files
author
Andrea Calabrese
committed
Stream input for decode/encode operations
As per title, this is the main feature of this patch set. First, by avoiding looking for the final padding, the tool gains the ability to read data streaming in before the stream has finished producing it. This also enables the tool to work with much less memory: essentially a fixed amount, instead of an amount depending on the file size. Signed-off-by: Andrea Calabrese <andrea.calabrese@amarulasolutions.com>
1 parent 021c839 commit 05d7d9f

File tree

1 file changed

+124
-157
lines changed

1 file changed

+124
-157
lines changed

src/uu/base32/src/base_common.rs

Lines changed: 124 additions & 157 deletions
Original file line numberDiff line numberDiff line change
@@ -171,52 +171,29 @@ pub fn get_input(config: &Config) -> UResult<Box<dyn ReadSeek>> {
171171
}
172172
}
173173

174-
/// Determines if the input buffer ends with padding ('=') after trimming trailing whitespace.
175-
fn read_and_has_padding<R: Read>(input: &mut R) -> UResult<(bool, Vec<u8>)> {
176-
let mut buf = Vec::new();
177-
input
178-
.read_to_end(&mut buf)
179-
.map_err(|err| USimpleError::new(1, format_read_error(err.kind())))?;
180-
181-
// Reverse iterator and skip trailing whitespace without extra collections
182-
let has_padding = buf
183-
.iter()
184-
.rfind(|&&byte| !byte.is_ascii_whitespace())
185-
.is_some_and(|&byte| byte == b'=');
186-
187-
Ok((has_padding, buf))
188-
}
189-
190174
pub fn handle_input<R: Read + Seek>(input: &mut R, format: Format, config: Config) -> UResult<()> {
191-
let (has_padding, read) = read_and_has_padding(input)?;
192-
193-
let supports_fast_decode_and_encode =
194-
get_supports_fast_decode_and_encode(format, config.decode, has_padding);
175+
let supports_fast_decode_and_encode = get_supports_fast_decode_and_encode(format);
195176

196177
let supports_fast_decode_and_encode_ref = supports_fast_decode_and_encode.as_ref();
197178
let mut stdout_lock = io::stdout().lock();
198179
if config.decode {
199180
fast_decode::fast_decode(
200-
read,
181+
input,
201182
&mut stdout_lock,
202183
supports_fast_decode_and_encode_ref,
203184
config.ignore_garbage,
204185
)
205186
} else {
206187
fast_encode::fast_encode(
207-
read,
188+
input,
208189
&mut stdout_lock,
209190
supports_fast_decode_and_encode_ref,
210191
config.wrap_cols,
211192
)
212193
}
213194
}
214195

215-
pub fn get_supports_fast_decode_and_encode(
216-
format: Format,
217-
decode: bool,
218-
has_padding: bool,
219-
) -> Box<dyn SupportsFastDecodeAndEncode> {
196+
pub fn get_supports_fast_decode_and_encode(format: Format) -> Box<dyn SupportsFastDecodeAndEncode> {
220197
const BASE16_VALID_DECODING_MULTIPLE: usize = 2;
221198
const BASE2_VALID_DECODING_MULTIPLE: usize = 8;
222199
const BASE32_VALID_DECODING_MULTIPLE: usize = 8;
@@ -263,13 +240,11 @@ pub fn get_supports_fast_decode_and_encode(
263240
// spell-checker:disable-next-line
264241
b"0123456789ABCDEFGHIJKLMNOPQRSTUV=",
265242
)),
266-
Format::Base64 => {
267-
Box::from(Base64SimdWrapper::new(
268-
BASE64_VALID_DECODING_MULTIPLE,
269-
BASE64_UNPADDED_MULTIPLE,
270-
b"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+/=",
271-
))
272-
}
243+
Format::Base64 => Box::from(Base64SimdWrapper::new(
244+
BASE64_VALID_DECODING_MULTIPLE,
245+
BASE64_UNPADDED_MULTIPLE,
246+
b"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+/=",
247+
)),
273248
Format::Base64Url => Box::from(EncodingWrapper::new(
274249
BASE64URL,
275250
BASE64_VALID_DECODING_MULTIPLE,
@@ -282,14 +257,16 @@ pub fn get_supports_fast_decode_and_encode(
282257
}
283258

284259
pub mod fast_encode {
285-
use crate::base_common::WRAP_DEFAULT;
260+
use crate::base_common::{WRAP_DEFAULT, format_read_error};
286261
use std::{
287-
cmp::min,
288262
collections::VecDeque,
289-
io::{self, Write},
263+
io::{self, BufRead, BufReader, ErrorKind, Read, Write},
290264
num::NonZeroUsize,
291265
};
292-
use uucore::{encoding::SupportsFastDecodeAndEncode, error::UResult};
266+
use uucore::{
267+
encoding::SupportsFastDecodeAndEncode,
268+
error::{UResult, USimpleError},
269+
};
293270

294271
struct LineWrapping {
295272
line_length: NonZeroUsize,
@@ -392,18 +369,16 @@ pub mod fast_encode {
392369
// End of helper functions
393370

394371
pub fn fast_encode(
395-
input: Vec<u8>,
372+
input: &mut dyn Read,
396373
output: &mut dyn Write,
397374
supports_fast_decode_and_encode: &dyn SupportsFastDecodeAndEncode,
398375
wrap: Option<usize>,
399376
) -> UResult<()> {
400377
// Based on performance testing
401378

402379
const ENCODE_IN_CHUNKS_OF_SIZE_MULTIPLE: usize = 1_024;
403-
404380
let encode_in_chunks_of_size =
405381
supports_fast_decode_and_encode.unpadded_multiple() * ENCODE_IN_CHUNKS_OF_SIZE_MULTIPLE;
406-
407382
assert!(encode_in_chunks_of_size > 0);
408383

409384
// The "data-encoding" crate supports line wrapping, but not arbitrary line wrapping, only certain widths, so
@@ -424,79 +399,73 @@ pub mod fast_encode {
424399
}),
425400
};
426401

427-
let input_size = input.len();
428-
429402
// Start of buffers
430403
// Data that was read from `input` but has not been encoded yet
431404
let mut leftover_buffer = VecDeque::<u8>::new();
432-
433405
// Encoded data that needs to be written to `output`
434406
let mut encoded_buffer = VecDeque::<u8>::new();
407+
let mut in_reader = BufReader::with_capacity(encode_in_chunks_of_size, input);
435408
// End of buffers
436-
437-
input
438-
.iter()
439-
.enumerate()
440-
.step_by(encode_in_chunks_of_size)
441-
.map(|(idx, _)| {
442-
// The part of `input_buffer` that was actually filled by the call
443-
// to `read`
444-
&input[idx..min(input_size, idx + encode_in_chunks_of_size)]
445-
})
446-
.map(|buffer| {
409+
loop {
410+
let buf_res = in_reader.fill_buf();
411+
if let Ok(buffer) = buf_res {
412+
let buffer_len = buffer.len();
447413
if buffer.len() < encode_in_chunks_of_size {
448414
leftover_buffer.extend(buffer);
449415
assert!(leftover_buffer.len() < encode_in_chunks_of_size);
450-
return None;
416+
break;
451417
}
452-
Some(buffer)
453-
})
454-
.for_each(|buffer| {
455-
if let Some(read_buffer) = buffer {
456-
// Encode data in chunks, then place it in `encoded_buffer`
457-
assert_eq!(read_buffer.len(), encode_in_chunks_of_size);
458-
encode_in_chunks_to_buffer(
459-
supports_fast_decode_and_encode,
460-
read_buffer,
461-
&mut encoded_buffer,
462-
)
463-
.unwrap();
464-
// Write all data in `encoded_buffer` to `output`
465-
write_to_output(
466-
&mut line_wrapping,
467-
&mut encoded_buffer,
468-
output,
469-
false,
470-
wrap == Some(0),
471-
)
472-
.unwrap();
418+
assert_eq!(buffer.len(), encode_in_chunks_of_size);
419+
encode_in_chunks_to_buffer(
420+
supports_fast_decode_and_encode,
421+
buffer,
422+
&mut encoded_buffer,
423+
)?;
424+
// Write all data in `encoded_buffer` to `output`
425+
write_to_output(
426+
&mut line_wrapping,
427+
&mut encoded_buffer,
428+
output,
429+
false,
430+
wrap == Some(0),
431+
)?;
432+
in_reader.consume(buffer_len);
433+
} else if let Err(err) = buf_res {
434+
let kind = err.kind();
435+
436+
if kind == ErrorKind::Interrupted {
437+
// Retry reading
438+
continue;
473439
}
474-
});
475440

441+
return Err(USimpleError::new(1, format_read_error(kind)));
442+
}
443+
}
476444
// Cleanup
477445
// `input` has finished producing data, so the data remaining in the buffers needs to be encoded and printed
478-
{
479-
// Encode all remaining unencoded bytes, placing them in `encoded_buffer`
480-
supports_fast_decode_and_encode
481-
.encode_to_vec_deque(leftover_buffer.make_contiguous(), &mut encoded_buffer)?;
482-
483-
// Write all data in `encoded_buffer` to output
484-
// `is_cleanup` triggers special cleanup-only logic
485-
write_to_output(
486-
&mut line_wrapping,
487-
&mut encoded_buffer,
488-
output,
489-
true,
490-
wrap == Some(0),
491-
)?;
492-
}
446+
// Encode all remaining unencoded bytes, placing them in `encoded_buffer`
447+
supports_fast_decode_and_encode
448+
.encode_to_vec_deque(leftover_buffer.make_contiguous(), &mut encoded_buffer)?;
449+
// Write all data in `encoded_buffer` to output
450+
// `is_cleanup` triggers special cleanup-only logic
451+
write_to_output(
452+
&mut line_wrapping,
453+
&mut encoded_buffer,
454+
output,
455+
true,
456+
wrap == Some(0),
457+
)?;
493458
Ok(())
494459
}
495460
}
496461

497462
pub mod fast_decode {
498-
use std::io::{self, Write};
499-
use uucore::{encoding::SupportsFastDecodeAndEncode, error::UResult};
463+
use crate::base_common::format_read_error;
464+
use std::io::{self, BufRead, BufReader, ErrorKind, Read, Write};
465+
use uucore::{
466+
encoding::SupportsFastDecodeAndEncode,
467+
error::{UResult, USimpleError},
468+
};
500469

501470
// Start of helper functions
502471
fn alphabet_to_table(alphabet: &[u8], ignore_garbage: bool) -> [bool; 256] {
@@ -555,17 +524,15 @@ pub mod fast_decode {
555524
// End of helper functions
556525

557526
pub fn fast_decode(
558-
input: Vec<u8>,
527+
input: &mut dyn Read,
559528
output: &mut dyn Write,
560529
supports_fast_decode_and_encode: &dyn SupportsFastDecodeAndEncode,
561530
ignore_garbage: bool,
562531
) -> UResult<()> {
563532
const DECODE_IN_CHUNKS_OF_SIZE_MULTIPLE: usize = 1_024;
564-
565533
let alphabet = supports_fast_decode_and_encode.alphabet();
566534
let decode_in_chunks_of_size = supports_fast_decode_and_encode.valid_decoding_multiple()
567535
* DECODE_IN_CHUNKS_OF_SIZE_MULTIPLE;
568-
569536
assert!(decode_in_chunks_of_size > 0);
570537

571538
// Note that it's not worth using "data-encoding"'s ignore functionality if `ignore_garbage` is true, because
@@ -584,42 +551,71 @@ pub mod fast_decode {
584551

585552
// Decoded data that needs to be written to `output`
586553
let mut decoded_buffer = Vec::<u8>::new();
587-
554+
let mut buffers = [
555+
Vec::with_capacity(decode_in_chunks_of_size),
556+
Vec::with_capacity(decode_in_chunks_of_size),
557+
];
558+
let mut current_buffer_index = 0usize;
559+
let mut in_reader = BufReader::with_capacity(decode_in_chunks_of_size, input);
588560
// End of buffers
561+
loop {
562+
while buffers[current_buffer_index].len() < decode_in_chunks_of_size {
563+
let read_res = in_reader.fill_buf();
564+
if let Ok(read_buffer) = read_res {
565+
let read_size = read_buffer.len();
566+
if read_size == 0 {
567+
break;
568+
}
569+
// Filter and fill the valid buffer. When it is filled, we
570+
// switch buffer to avoid reading again.
571+
read_buffer
572+
.iter()
573+
.filter(|ch| table[usize::from(**ch)])
574+
.for_each(|ch| {
575+
if buffers[current_buffer_index].len() < decode_in_chunks_of_size {
576+
buffers[current_buffer_index].push(*ch);
577+
} else {
578+
buffers[(current_buffer_index + 1) % buffers.len()].push(*ch);
579+
}
580+
});
581+
in_reader.consume(read_size);
582+
} else if let Err(err) = read_res {
583+
let kind = err.kind();
584+
585+
if kind == ErrorKind::Interrupted {
586+
// Retry reading
587+
continue;
588+
}
589589

590-
let mut buffer = Vec::with_capacity(decode_in_chunks_of_size);
591-
592-
input
593-
.iter()
594-
.filter(|ch| table[usize::from(**ch)])
595-
.for_each(|ch| {
596-
buffer.push(*ch);
597-
// How many bytes to steal from `read_buffer` to get
598-
// `leftover_buffer` to the right size
599-
if buffer.len() == decode_in_chunks_of_size {
600-
assert_eq!(decode_in_chunks_of_size, buffer.len());
601-
// Decode data in chunks, then place it in `decoded_buffer`
602-
decode_in_chunks_to_buffer(
603-
supports_fast_decode_and_encode,
604-
&buffer,
605-
&mut decoded_buffer,
606-
)
607-
.unwrap();
608-
// Write all data in `decoded_buffer` to `output`
609-
write_to_output(&mut decoded_buffer, output).unwrap();
610-
buffer.clear();
590+
return Err(USimpleError::new(1, format_read_error(kind)));
611591
}
612-
});
613-
// Cleanup
614-
// `input` has finished producing data, so the data remaining in the buffers needs to be decoded and printed
615-
{
616-
// Decode all remaining encoded bytes, placing them in `decoded_buffer`
617-
supports_fast_decode_and_encode.decode_into_vec(&buffer, &mut decoded_buffer)?;
618-
592+
}
593+
if buffers[current_buffer_index].len() < decode_in_chunks_of_size {
594+
break;
595+
}
596+
assert_eq!(
597+
buffers[current_buffer_index].len(),
598+
decode_in_chunks_of_size
599+
);
600+
// Decode data in chunks, then place it in `decoded_buffer`
601+
decode_in_chunks_to_buffer(
602+
supports_fast_decode_and_encode,
603+
&buffers[current_buffer_index],
604+
&mut decoded_buffer,
605+
)?;
619606
// Write all data in `decoded_buffer` to `output`
620607
write_to_output(&mut decoded_buffer, output)?;
621-
}
622608

609+
buffers[current_buffer_index].clear();
610+
current_buffer_index = (current_buffer_index + 1) % buffers.len();
611+
}
612+
// Cleanup
613+
// `input` has finished producing data, so the data remaining in the buffers needs to be decoded and printed
614+
// Decode all remaining encoded bytes, placing them in `decoded_buffer`
615+
supports_fast_decode_and_encode
616+
.decode_into_vec(&buffers[current_buffer_index], &mut decoded_buffer)?;
617+
// Write all data in `decoded_buffer` to `output`
618+
write_to_output(&mut decoded_buffer, output)?;
623619
Ok(())
624620
}
625621
}
@@ -642,32 +638,3 @@ fn format_read_error(kind: ErrorKind) -> String {
642638

643639
translate!("base-common-read-error", "error" => kind_string_capitalized)
644640
}
645-
646-
#[cfg(test)]
647-
mod tests {
648-
use crate::base_common::read_and_has_padding;
649-
use std::io::Cursor;
650-
651-
#[test]
652-
fn test_has_padding() {
653-
let test_cases = vec![
654-
("aGVsbG8sIHdvcmxkIQ==", true),
655-
("aGVsbG8sIHdvcmxkIQ== ", true),
656-
("aGVsbG8sIHdvcmxkIQ==\n", true),
657-
("aGVsbG8sIHdvcmxkIQ== \n", true),
658-
("aGVsbG8sIHdvcmxkIQ=", true),
659-
("aGVsbG8sIHdvcmxkIQ= ", true),
660-
("aGVsbG8sIHdvcmxkIQ \n", false),
661-
("aGVsbG8sIHdvcmxkIQ", false),
662-
];
663-
664-
for (input, expected) in test_cases {
665-
let mut cursor = Cursor::new(input.as_bytes());
666-
assert_eq!(
667-
read_and_has_padding(&mut cursor).unwrap().0,
668-
expected,
669-
"Failed for input: '{input}'"
670-
);
671-
}
672-
}
673-
}

0 commit comments

Comments
 (0)