Skip to content

Commit c5c84e3

Browse files
author
Andrea Calabrese
committed
Stream input for decode/encode operations
As per the title, this is the main feature of this patch set. First, by avoiding looking for the final padding, the tool gains the ability to read data streaming in before the stream has finished producing it. This also enables the tool to work with much less memory, essentially using a fixed amount instead of an amount depending on the file size. Signed-off-by: Andrea Calabrese <andrea.calabrese@amarulasolutions.com>
1 parent 06f59ba commit c5c84e3

File tree

1 file changed

+124
-157
lines changed

1 file changed

+124
-157
lines changed

src/uu/base32/src/base_common.rs

Lines changed: 124 additions & 157 deletions
Original file line numberDiff line numberDiff line change
@@ -171,52 +171,29 @@ pub fn get_input(config: &Config) -> UResult<Box<dyn ReadSeek>> {
171171
}
172172
}
173173

174-
/// Determines if the input buffer ends with padding ('=') after trimming trailing whitespace.
175-
fn read_and_has_padding<R: Read>(input: &mut R) -> UResult<(bool, Vec<u8>)> {
176-
let mut buf = Vec::new();
177-
input
178-
.read_to_end(&mut buf)
179-
.map_err(|err| USimpleError::new(1, format_read_error(err.kind())))?;
180-
181-
// Reverse iterator and skip trailing whitespace without extra collections
182-
let has_padding = buf
183-
.iter()
184-
.rfind(|&&byte| !byte.is_ascii_whitespace())
185-
.is_some_and(|&byte| byte == b'=');
186-
187-
Ok((has_padding, buf))
188-
}
189-
190174
pub fn handle_input<R: Read + Seek>(input: &mut R, format: Format, config: Config) -> UResult<()> {
191-
let (has_padding, read) = read_and_has_padding(input)?;
192-
193-
let supports_fast_decode_and_encode =
194-
get_supports_fast_decode_and_encode(format, config.decode, has_padding);
175+
let supports_fast_decode_and_encode = get_supports_fast_decode_and_encode(format);
195176

196177
let supports_fast_decode_and_encode_ref = supports_fast_decode_and_encode.as_ref();
197178
let mut stdout_lock = io::stdout().lock();
198179
if config.decode {
199180
fast_decode::fast_decode(
200-
read,
181+
input,
201182
&mut stdout_lock,
202183
supports_fast_decode_and_encode_ref,
203184
config.ignore_garbage,
204185
)
205186
} else {
206187
fast_encode::fast_encode(
207-
read,
188+
input,
208189
&mut stdout_lock,
209190
supports_fast_decode_and_encode_ref,
210191
config.wrap_cols,
211192
)
212193
}
213194
}
214195

215-
pub fn get_supports_fast_decode_and_encode(
216-
format: Format,
217-
decode: bool,
218-
has_padding: bool,
219-
) -> Box<dyn SupportsFastDecodeAndEncode> {
196+
pub fn get_supports_fast_decode_and_encode(format: Format) -> Box<dyn SupportsFastDecodeAndEncode> {
220197
const BASE16_VALID_DECODING_MULTIPLE: usize = 2;
221198
const BASE2_VALID_DECODING_MULTIPLE: usize = 8;
222199
const BASE32_VALID_DECODING_MULTIPLE: usize = 8;
@@ -263,13 +240,11 @@ pub fn get_supports_fast_decode_and_encode(
263240
// spell-checker:disable-next-line
264241
b"0123456789ABCDEFGHIJKLMNOPQRSTUV=",
265242
)),
266-
Format::Base64 => {
267-
Box::from(Base64SimdWrapper::new(
268-
BASE64_VALID_DECODING_MULTIPLE,
269-
BASE64_UNPADDED_MULTIPLE,
270-
b"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+/=",
271-
))
272-
}
243+
Format::Base64 => Box::from(Base64SimdWrapper::new(
244+
BASE64_VALID_DECODING_MULTIPLE,
245+
BASE64_UNPADDED_MULTIPLE,
246+
b"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+/=",
247+
)),
273248
Format::Base64Url => Box::from(EncodingWrapper::new(
274249
BASE64URL,
275250
BASE64_VALID_DECODING_MULTIPLE,
@@ -283,14 +258,16 @@ pub fn get_supports_fast_decode_and_encode(
283258
}
284259

285260
pub mod fast_encode {
286-
use crate::base_common::WRAP_DEFAULT;
261+
use crate::base_common::{WRAP_DEFAULT, format_read_error};
287262
use std::{
288-
cmp::min,
289263
collections::VecDeque,
290-
io::{self, Write},
264+
io::{self, BufRead, BufReader, ErrorKind, Read, Write},
291265
num::NonZeroUsize,
292266
};
293-
use uucore::{encoding::SupportsFastDecodeAndEncode, error::UResult};
267+
use uucore::{
268+
encoding::SupportsFastDecodeAndEncode,
269+
error::{UResult, USimpleError},
270+
};
294271

295272
struct LineWrapping {
296273
line_length: NonZeroUsize,
@@ -393,18 +370,16 @@ pub mod fast_encode {
393370
// End of helper functions
394371

395372
pub fn fast_encode(
396-
input: Vec<u8>,
373+
input: &mut dyn Read,
397374
output: &mut dyn Write,
398375
supports_fast_decode_and_encode: &dyn SupportsFastDecodeAndEncode,
399376
wrap: Option<usize>,
400377
) -> UResult<()> {
401378
// Based on performance testing
402379

403380
const ENCODE_IN_CHUNKS_OF_SIZE_MULTIPLE: usize = 1_024;
404-
405381
let encode_in_chunks_of_size =
406382
supports_fast_decode_and_encode.unpadded_multiple() * ENCODE_IN_CHUNKS_OF_SIZE_MULTIPLE;
407-
408383
assert!(encode_in_chunks_of_size > 0);
409384

410385
// The "data-encoding" crate supports line wrapping, but not arbitrary line wrapping, only certain widths, so
@@ -425,79 +400,73 @@ pub mod fast_encode {
425400
}),
426401
};
427402

428-
let input_size = input.len();
429-
430403
// Start of buffers
431404
// Data that was read from `input` but has not been encoded yet
432405
let mut leftover_buffer = VecDeque::<u8>::new();
433-
434406
// Encoded data that needs to be written to `output`
435407
let mut encoded_buffer = VecDeque::<u8>::new();
408+
let mut in_reader = BufReader::with_capacity(encode_in_chunks_of_size, input);
436409
// End of buffers
437-
438-
input
439-
.iter()
440-
.enumerate()
441-
.step_by(encode_in_chunks_of_size)
442-
.map(|(idx, _)| {
443-
// The part of `input_buffer` that was actually filled by the call
444-
// to `read`
445-
&input[idx..min(input_size, idx + encode_in_chunks_of_size)]
446-
})
447-
.map(|buffer| {
410+
loop {
411+
let buf_res = in_reader.fill_buf();
412+
if let Ok(buffer) = buf_res {
413+
let buffer_len = buffer.len();
448414
if buffer.len() < encode_in_chunks_of_size {
449415
leftover_buffer.extend(buffer);
450416
assert!(leftover_buffer.len() < encode_in_chunks_of_size);
451-
return None;
417+
break;
452418
}
453-
Some(buffer)
454-
})
455-
.for_each(|buffer| {
456-
if let Some(read_buffer) = buffer {
457-
// Encode data in chunks, then place it in `encoded_buffer`
458-
assert_eq!(read_buffer.len(), encode_in_chunks_of_size);
459-
encode_in_chunks_to_buffer(
460-
supports_fast_decode_and_encode,
461-
read_buffer,
462-
&mut encoded_buffer,
463-
)
464-
.unwrap();
465-
// Write all data in `encoded_buffer` to `output`
466-
write_to_output(
467-
&mut line_wrapping,
468-
&mut encoded_buffer,
469-
output,
470-
false,
471-
wrap == Some(0),
472-
)
473-
.unwrap();
419+
assert_eq!(buffer.len(), encode_in_chunks_of_size);
420+
encode_in_chunks_to_buffer(
421+
supports_fast_decode_and_encode,
422+
buffer,
423+
&mut encoded_buffer,
424+
)?;
425+
// Write all data in `encoded_buffer` to `output`
426+
write_to_output(
427+
&mut line_wrapping,
428+
&mut encoded_buffer,
429+
output,
430+
false,
431+
wrap == Some(0),
432+
)?;
433+
in_reader.consume(buffer_len);
434+
} else if let Err(err) = buf_res {
435+
let kind = err.kind();
436+
437+
if kind == ErrorKind::Interrupted {
438+
// Retry reading
439+
continue;
474440
}
475-
});
476441

442+
return Err(USimpleError::new(1, format_read_error(kind)));
443+
}
444+
}
477445
// Cleanup
478446
// `input` has finished producing data, so the data remaining in the buffers needs to be encoded and printed
479-
{
480-
// Encode all remaining unencoded bytes, placing them in `encoded_buffer`
481-
supports_fast_decode_and_encode
482-
.encode_to_vec_deque(leftover_buffer.make_contiguous(), &mut encoded_buffer)?;
483-
484-
// Write all data in `encoded_buffer` to output
485-
// `is_cleanup` triggers special cleanup-only logic
486-
write_to_output(
487-
&mut line_wrapping,
488-
&mut encoded_buffer,
489-
output,
490-
true,
491-
wrap == Some(0),
492-
)?;
493-
}
447+
// Encode all remaining unencoded bytes, placing them in `encoded_buffer`
448+
supports_fast_decode_and_encode
449+
.encode_to_vec_deque(leftover_buffer.make_contiguous(), &mut encoded_buffer)?;
450+
// Write all data in `encoded_buffer` to output
451+
// `is_cleanup` triggers special cleanup-only logic
452+
write_to_output(
453+
&mut line_wrapping,
454+
&mut encoded_buffer,
455+
output,
456+
true,
457+
wrap == Some(0),
458+
)?;
494459
Ok(())
495460
}
496461
}
497462

498463
pub mod fast_decode {
499-
use std::io::{self, Write};
500-
use uucore::{encoding::SupportsFastDecodeAndEncode, error::UResult};
464+
use crate::base_common::format_read_error;
465+
use std::io::{self, BufRead, BufReader, ErrorKind, Read, Write};
466+
use uucore::{
467+
encoding::SupportsFastDecodeAndEncode,
468+
error::{UResult, USimpleError},
469+
};
501470

502471
// Start of helper functions
503472
fn alphabet_to_table(alphabet: &[u8], ignore_garbage: bool) -> [bool; 256] {
@@ -556,17 +525,15 @@ pub mod fast_decode {
556525
// End of helper functions
557526

558527
pub fn fast_decode(
559-
input: Vec<u8>,
528+
input: &mut dyn Read,
560529
output: &mut dyn Write,
561530
supports_fast_decode_and_encode: &dyn SupportsFastDecodeAndEncode,
562531
ignore_garbage: bool,
563532
) -> UResult<()> {
564533
const DECODE_IN_CHUNKS_OF_SIZE_MULTIPLE: usize = 1_024;
565-
566534
let alphabet = supports_fast_decode_and_encode.alphabet();
567535
let decode_in_chunks_of_size = supports_fast_decode_and_encode.valid_decoding_multiple()
568536
* DECODE_IN_CHUNKS_OF_SIZE_MULTIPLE;
569-
570537
assert!(decode_in_chunks_of_size > 0);
571538

572539
// Note that it's not worth using "data-encoding"'s ignore functionality if `ignore_garbage` is true, because
@@ -585,42 +552,71 @@ pub mod fast_decode {
585552

586553
// Decoded data that needs to be written to `output`
587554
let mut decoded_buffer = Vec::<u8>::new();
588-
555+
let mut buffers = [
556+
Vec::with_capacity(decode_in_chunks_of_size),
557+
Vec::with_capacity(decode_in_chunks_of_size),
558+
];
559+
let mut current_buffer_index = 0usize;
560+
let mut in_reader = BufReader::with_capacity(decode_in_chunks_of_size, input);
589561
// End of buffers
562+
loop {
563+
while buffers[current_buffer_index].len() < decode_in_chunks_of_size {
564+
let read_res = in_reader.fill_buf();
565+
if let Ok(read_buffer) = read_res {
566+
let read_size = read_buffer.len();
567+
if read_size == 0 {
568+
break;
569+
}
570+
// Filter and fill the valid buffer. When it is filled, we
571+
// switch buffer to avoid reading again.
572+
read_buffer
573+
.iter()
574+
.filter(|ch| table[usize::from(**ch)])
575+
.for_each(|ch| {
576+
if buffers[current_buffer_index].len() < decode_in_chunks_of_size {
577+
buffers[current_buffer_index].push(*ch);
578+
} else {
579+
buffers[(current_buffer_index + 1) % buffers.len()].push(*ch);
580+
}
581+
});
582+
in_reader.consume(read_size);
583+
} else if let Err(err) = read_res {
584+
let kind = err.kind();
585+
586+
if kind == ErrorKind::Interrupted {
587+
// Retry reading
588+
continue;
589+
}
590590

591-
let mut buffer = Vec::with_capacity(decode_in_chunks_of_size);
592-
593-
input
594-
.iter()
595-
.filter(|ch| table[usize::from(**ch)])
596-
.for_each(|ch| {
597-
buffer.push(*ch);
598-
// How many bytes to steal from `read_buffer` to get
599-
// `leftover_buffer` to the right size
600-
if buffer.len() == decode_in_chunks_of_size {
601-
assert_eq!(decode_in_chunks_of_size, buffer.len());
602-
// Decode data in chunks, then place it in `decoded_buffer`
603-
decode_in_chunks_to_buffer(
604-
supports_fast_decode_and_encode,
605-
&buffer,
606-
&mut decoded_buffer,
607-
)
608-
.unwrap();
609-
// Write all data in `decoded_buffer` to `output`
610-
write_to_output(&mut decoded_buffer, output).unwrap();
611-
buffer.clear();
591+
return Err(USimpleError::new(1, format_read_error(kind)));
612592
}
613-
});
614-
// Cleanup
615-
// `input` has finished producing data, so the data remaining in the buffers needs to be decoded and printed
616-
{
617-
// Decode all remaining encoded bytes, placing them in `decoded_buffer`
618-
supports_fast_decode_and_encode.decode_into_vec(&buffer, &mut decoded_buffer)?;
619-
593+
}
594+
if buffers[current_buffer_index].len() < decode_in_chunks_of_size {
595+
break;
596+
}
597+
assert_eq!(
598+
buffers[current_buffer_index].len(),
599+
decode_in_chunks_of_size
600+
);
601+
// Decode data in chunks, then place it in `decoded_buffer`
602+
decode_in_chunks_to_buffer(
603+
supports_fast_decode_and_encode,
604+
&buffers[current_buffer_index],
605+
&mut decoded_buffer,
606+
)?;
620607
// Write all data in `decoded_buffer` to `output`
621608
write_to_output(&mut decoded_buffer, output)?;
622-
}
623609

610+
buffers[current_buffer_index].clear();
611+
current_buffer_index = (current_buffer_index + 1) % buffers.len();
612+
}
613+
// Cleanup
614+
// `input` has finished producing data, so the data remaining in the buffers needs to be decoded and printed
615+
// Decode all remaining encoded bytes, placing them in `decoded_buffer`
616+
supports_fast_decode_and_encode
617+
.decode_into_vec(&buffers[current_buffer_index], &mut decoded_buffer)?;
618+
// Write all data in `decoded_buffer` to `output`
619+
write_to_output(&mut decoded_buffer, output)?;
624620
Ok(())
625621
}
626622
}
@@ -643,32 +639,3 @@ fn format_read_error(kind: ErrorKind) -> String {
643639

644640
translate!("base-common-read-error", "error" => kind_string_capitalized)
645641
}
646-
647-
#[cfg(test)]
648-
mod tests {
649-
use crate::base_common::read_and_has_padding;
650-
use std::io::Cursor;
651-
652-
#[test]
653-
fn test_has_padding() {
654-
let test_cases = vec![
655-
("aGVsbG8sIHdvcmxkIQ==", true),
656-
("aGVsbG8sIHdvcmxkIQ== ", true),
657-
("aGVsbG8sIHdvcmxkIQ==\n", true),
658-
("aGVsbG8sIHdvcmxkIQ== \n", true),
659-
("aGVsbG8sIHdvcmxkIQ=", true),
660-
("aGVsbG8sIHdvcmxkIQ= ", true),
661-
("aGVsbG8sIHdvcmxkIQ \n", false),
662-
("aGVsbG8sIHdvcmxkIQ", false),
663-
];
664-
665-
for (input, expected) in test_cases {
666-
let mut cursor = Cursor::new(input.as_bytes());
667-
assert_eq!(
668-
read_and_has_padding(&mut cursor).unwrap().0,
669-
expected,
670-
"Failed for input: '{input}'"
671-
);
672-
}
673-
}
674-
}

0 commit comments

Comments
 (0)