Skip to content

Commit 7a40dbb

Browse files
author
Devdutt Shenoi
committed
feat: replace StreamReader/StreamWriter with FileReader/FileWriter
1 parent 523ecc7 commit 7a40dbb

File tree

3 files changed

+46
-230
lines changed

3 files changed

+46
-230
lines changed

src/parseable/staging/reader.rs

Lines changed: 38 additions & 215 deletions
Original file line number | Diff line number | Diff line change
@@ -19,16 +19,14 @@
1919

2020
use std::{
2121
fs::{remove_file, File},
22-
io::{self, BufReader, Read, Seek, SeekFrom},
22+
io::BufReader,
2323
path::PathBuf,
2424
sync::Arc,
25-
vec::IntoIter,
2625
};
2726

2827
use arrow_array::{RecordBatch, TimestampMillisecondArray};
29-
use arrow_ipc::{reader::StreamReader, root_as_message_unchecked, MessageHeader};
28+
use arrow_ipc::reader::FileReader;
3029
use arrow_schema::Schema;
31-
use byteorder::{LittleEndian, ReadBytesExt};
3230
use itertools::kmerge_by;
3331
use tracing::error;
3432

@@ -39,7 +37,7 @@ use crate::{
3937

4038
#[derive(Debug)]
4139
pub struct MergedRecordReader {
42-
pub readers: Vec<StreamReader<BufReader<File>>>,
40+
pub readers: Vec<FileReader<BufReader<File>>>,
4341
}
4442

4543
impl MergedRecordReader {
@@ -53,7 +51,7 @@ impl MergedRecordReader {
5351
remove_file(file).unwrap();
5452
} else {
5553
let Ok(reader) =
56-
StreamReader::try_new(BufReader::new(File::open(file).unwrap()), None)
54+
FileReader::try_new(BufReader::new(File::open(file).unwrap()), None)
5755
else {
5856
error!("Invalid file detected, ignoring it: {:?}", file);
5957
continue;
@@ -74,27 +72,6 @@ impl MergedRecordReader {
7472
)
7573
.unwrap()
7674
}
77-
}
78-
79-
#[derive(Debug)]
80-
pub struct MergedReverseRecordReader {
81-
pub readers: Vec<StreamReader<BufReader<OffsetReader<File>>>>,
82-
}
83-
84-
impl MergedReverseRecordReader {
85-
pub fn try_new(files: &[PathBuf]) -> Self {
86-
let mut readers = Vec::with_capacity(files.len());
87-
for file in files {
88-
let Ok(reader) = get_reverse_reader(File::open(file).unwrap()) else {
89-
error!("Invalid file detected, ignoring it: {:?}", file);
90-
continue;
91-
};
92-
93-
readers.push(reader);
94-
}
95-
96-
Self { readers }
97-
}
9875

9976
pub fn merged_iter(
10077
self,
@@ -111,15 +88,6 @@ impl MergedReverseRecordReader {
11188
.map(|batch| reverse(&batch))
11289
.map(move |batch| adapt_batch(&schema, &batch))
11390
}
114-
115-
pub fn merged_schema(&self) -> Schema {
116-
Schema::try_merge(
117-
self.readers
118-
.iter()
119-
.map(|reader| reader.schema().as_ref().clone()),
120-
)
121-
.unwrap()
122-
}
12391
}
12492

12593
fn get_timestamp_millis(batch: &RecordBatch, time_partition: Option<String>) -> i64 {
@@ -138,6 +106,7 @@ fn get_timestamp_millis(batch: &RecordBatch, time_partition: Option<String>) ->
138106
None => get_default_timestamp_millis(batch),
139107
}
140108
}
109+
141110
fn get_default_timestamp_millis(batch: &RecordBatch) -> i64 {
142111
match batch
143112
.column(0)
@@ -157,172 +126,18 @@ fn get_default_timestamp_millis(batch: &RecordBatch) -> i64 {
157126
}
158127
}
159128

160-
/// OffsetReader takes in a reader and list of offset and sizes and
161-
/// provides a reader over the file by reading only the offsets
162-
/// from start of the list to end.
163-
///
164-
/// Safety Invariant: Reader is already validated and all offset and limit are valid to read.
165-
///
166-
/// On empty list the reader returns no bytes read.
167-
pub struct OffsetReader<R: Read + Seek> {
168-
reader: R,
169-
offset_list: IntoIter<(u64, usize)>,
170-
current_offset: u64,
171-
current_size: usize,
172-
buffer: Vec<u8>,
173-
buffer_position: usize,
174-
finished: bool,
175-
}
176-
177-
impl<R: Read + Seek> OffsetReader<R> {
178-
fn new(reader: R, offset_list: Vec<(u64, usize)>) -> Self {
179-
let mut offset_list = offset_list.into_iter();
180-
let mut finished = false;
181-
182-
let (current_offset, current_size) = offset_list.next().unwrap_or_default();
183-
if current_offset == 0 && current_size == 0 {
184-
finished = true
185-
}
186-
187-
OffsetReader {
188-
reader,
189-
offset_list,
190-
current_offset,
191-
current_size,
192-
buffer: vec![0; 4096],
193-
buffer_position: 0,
194-
finished,
195-
}
196-
}
197-
}
198-
199-
impl<R: Read + Seek> Read for OffsetReader<R> {
200-
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
201-
let offset = self.current_offset;
202-
let size = self.current_size;
203-
204-
if self.finished {
205-
return Ok(0);
206-
}
207-
// on empty buffer load current data represented by
208-
// current_offset and current_size into self buffer
209-
if self.buffer_position == 0 {
210-
self.reader.seek(SeekFrom::Start(offset))?;
211-
// resize for current message
212-
if self.buffer.len() < size {
213-
self.buffer.resize(size, 0)
214-
}
215-
self.reader.read_exact(&mut self.buffer[0..size])?;
216-
}
217-
218-
let remaining_bytes = size - self.buffer_position;
219-
let max_read = usize::min(remaining_bytes, buf.len());
220-
221-
// Copy data from the buffer to the provided buffer
222-
let read_data = &self.buffer[self.buffer_position..self.buffer_position + max_read];
223-
buf[..max_read].copy_from_slice(read_data);
224-
225-
self.buffer_position += max_read;
226-
227-
if self.buffer_position >= size {
228-
// If we've read the entire section, move to the next offset
229-
match self.offset_list.next() {
230-
Some((offset, size)) => {
231-
self.current_offset = offset;
232-
self.current_size = size;
233-
self.buffer_position = 0;
234-
}
235-
None => {
236-
// iter is exhausted, no more read can be done
237-
self.finished = true
238-
}
239-
}
240-
}
241-
242-
Ok(max_read)
243-
}
244-
}
245-
246-
pub fn get_reverse_reader<T: Read + Seek>(
247-
mut reader: T,
248-
) -> Result<StreamReader<BufReader<OffsetReader<T>>>, io::Error> {
249-
let mut offset = 0;
250-
let mut messages = Vec::new();
251-
252-
while let Some(res) = find_limit_and_type(&mut reader).transpose() {
253-
match res {
254-
Ok((header, size)) => {
255-
messages.push((header, offset, size));
256-
offset += size;
257-
}
258-
Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => break,
259-
Err(err) => return Err(err),
260-
}
261-
}
262-
263-
// reverse everything leaving the first because it has schema message.
264-
messages[1..].reverse();
265-
let messages = messages
266-
.into_iter()
267-
.map(|(_, offset, size)| (offset as u64, size))
268-
.collect();
269-
270-
// reset reader
271-
reader.rewind()?;
272-
273-
Ok(StreamReader::try_new(BufReader::new(OffsetReader::new(reader, messages)), None).unwrap())
274-
}
275-
276-
// return limit for
277-
fn find_limit_and_type(
278-
reader: &mut (impl Read + Seek),
279-
) -> Result<Option<(MessageHeader, usize)>, io::Error> {
280-
let mut size = 0;
281-
let marker = reader.read_u32::<LittleEndian>()?;
282-
size += 4;
283-
284-
if marker != 0xFFFFFFFF {
285-
return Err(io::Error::new(
286-
io::ErrorKind::InvalidData,
287-
"Invalid Continuation Marker",
288-
));
289-
}
290-
291-
let metadata_size = reader.read_u32::<LittleEndian>()? as usize;
292-
size += 4;
293-
294-
if metadata_size == 0x00000000 {
295-
return Ok(None);
296-
}
297-
298-
let mut message = vec![0u8; metadata_size];
299-
reader.read_exact(&mut message)?;
300-
size += metadata_size;
301-
302-
let message = unsafe { root_as_message_unchecked(&message) };
303-
let header = message.header_type();
304-
let message_size = message.bodyLength();
305-
size += message_size as usize;
306-
307-
let padding = (8 - (size % 8)) % 8;
308-
reader.seek(SeekFrom::Current(padding as i64 + message_size))?;
309-
size += padding;
310-
311-
Ok(Some((header, size)))
312-
}
313-
314129
#[cfg(test)]
315130
mod tests {
316-
use std::{io::Cursor, sync::Arc};
131+
use std::{fs::File, path::Path, sync::Arc};
317132

318133
use arrow_array::{
319134
cast::AsArray, types::Int64Type, Array, Float64Array, Int64Array, RecordBatch, StringArray,
320135
};
321-
use arrow_ipc::writer::{
322-
write_message, DictionaryTracker, IpcDataGenerator, IpcWriteOptions, StreamWriter,
136+
use arrow_ipc::{
137+
reader::FileReader,
138+
writer::{write_message, DictionaryTracker, FileWriter, IpcDataGenerator, IpcWriteOptions},
323139
};
324-
325-
use super::get_reverse_reader;
140+
use temp_dir::TempDir;
326141

327142
fn rb(rows: usize) -> RecordBatch {
328143
let array1: Arc<dyn Array> = Arc::new(Int64Array::from_iter(0..(rows as i64)));
@@ -339,42 +154,48 @@ mod tests {
339154
.unwrap()
340155
}
341156

342-
fn write_mem(rbs: &[RecordBatch]) -> Vec<u8> {
343-
let buf = Vec::new();
344-
let mut writer = StreamWriter::try_new(buf, &rbs[0].schema()).unwrap();
157+
fn write_file(rbs: &[RecordBatch], path: &Path) {
158+
let file = File::create(path).unwrap();
159+
let mut writer = FileWriter::try_new_buffered(file, &rbs[0].schema()).unwrap();
345160

346161
for rb in rbs {
347162
writer.write(rb).unwrap()
348163
}
349164

350-
writer.into_inner().unwrap()
165+
writer.finish().unwrap();
351166
}
352167

353168
#[test]
354169
fn test_empty_row() {
170+
let temp_dir = TempDir::new().unwrap();
171+
let path = temp_dir.path().join("test.arrows");
355172
let rb = rb(0);
356-
let buf = write_mem(&[rb]);
357-
let reader = Cursor::new(buf);
358-
let mut reader = get_reverse_reader(reader).unwrap();
173+
write_file(&[rb], &path);
174+
let reader = File::open(path).unwrap();
175+
let mut reader = FileReader::try_new_buffered(reader, None).unwrap();
359176
let rb = reader.next().unwrap().unwrap();
360177
assert_eq!(rb.num_rows(), 0);
361178
}
362179

363180
#[test]
364181
fn test_one_row() {
182+
let temp_dir = TempDir::new().unwrap();
183+
let path = temp_dir.path().join("test.arrows");
365184
let rb = rb(1);
366-
let buf = write_mem(&[rb]);
367-
let reader = Cursor::new(buf);
368-
let mut reader = get_reverse_reader(reader).unwrap();
185+
write_file(&[rb], &path);
186+
let reader = File::open(path).unwrap();
187+
let mut reader = FileReader::try_new_buffered(reader, None).unwrap();
369188
let rb = reader.next().unwrap().unwrap();
370189
assert_eq!(rb.num_rows(), 1);
371190
}
372191

373192
#[test]
374193
fn test_multiple_row_multiple_rbs() {
375-
let buf = write_mem(&[rb(1), rb(2), rb(3)]);
376-
let reader = Cursor::new(buf);
377-
let mut reader = get_reverse_reader(reader).unwrap();
194+
let temp_dir = TempDir::new().unwrap();
195+
let path = temp_dir.path().join("test.arrows");
196+
write_file(&[rb(1), rb(2), rb(3)], &path);
197+
let reader = File::open(path).unwrap();
198+
let mut reader = FileReader::try_new_buffered(reader, None).unwrap();
378199
let rb = reader.next().unwrap().unwrap();
379200
assert_eq!(rb.num_rows(), 3);
380201
let col1_val: Vec<i64> = rb
@@ -394,40 +215,42 @@ mod tests {
394215

395216
#[test]
396217
fn manual_write() {
218+
let temp_dir = TempDir::new().unwrap();
219+
let path = temp_dir.path().join("test.arrows");
397220
let error_on_replacement = true;
398221
let options = IpcWriteOptions::default();
399222
let mut dictionary_tracker = DictionaryTracker::new(error_on_replacement);
400223
let data_gen = IpcDataGenerator {};
401-
402-
let mut buf = Vec::new();
224+
let mut file = File::create(&path).unwrap();
403225
let rb1 = rb(1);
404226

405227
let schema = data_gen.schema_to_bytes_with_dictionary_tracker(
406228
&rb1.schema(),
407229
&mut dictionary_tracker,
408230
&options,
409231
);
410-
write_message(&mut buf, schema, &options).unwrap();
232+
write_message(&mut file, schema, &options).unwrap();
411233

412234
for i in (1..=3).cycle().skip(1).take(10000) {
413235
let (_, encoded_message) = data_gen
414236
.encoded_batch(&rb(i), &mut dictionary_tracker, &options)
415237
.unwrap();
416-
write_message(&mut buf, encoded_message, &options).unwrap();
238+
write_message(&mut file, encoded_message, &options).unwrap();
417239
}
418240

419241
let schema = data_gen.schema_to_bytes_with_dictionary_tracker(
420242
&rb1.schema(),
421243
&mut dictionary_tracker,
422244
&options,
423245
);
424-
write_message(&mut buf, schema, &options).unwrap();
246+
write_message(&mut file, schema, &options).unwrap();
425247

426-
let buf = Cursor::new(buf);
427-
let reader = get_reverse_reader(buf).unwrap().flatten();
248+
let reader = File::open(path).unwrap();
249+
let reader = FileReader::try_new_buffered(reader, None).unwrap();
428250

429251
let mut sum = 0;
430252
for rb in reader {
253+
let rb = rb.unwrap();
431254
sum += 1;
432255
assert!(rb.num_rows() > 0);
433256
}

0 commit comments

Comments
 (0)