119 changes: 30 additions & 89 deletions crates/file_store/src/entry_iter.rs
@@ -7,17 +7,16 @@ use std::{
 
 use crate::bincode_options;
 
-type EntryReader<'t> = CountingReader<BufReader<&'t mut File>>;
-
 /// Iterator over entries in a file store.
 ///
 /// Reads and returns an entry each time [`next`] is called. If an error occurs while reading the
 /// iterator will yield a `Result::Err(_)` instead and then `None` for the next call to `next`.
 ///
 /// [`next`]: Self::next
 pub struct EntryIter<'t, T> {
-    db_file: Option<EntryReader<'t>>,
-
+    /// Buffered reader around the file
+    db_file: BufReader<&'t mut File>,
+    finished: bool,
     /// The file position for the first read of `db_file`.
     start_pos: Option<u64>,
     types: PhantomData<T>,
@@ -26,8 +25,9 @@ pub struct EntryIter<'t, T> {
 impl<'t, T> EntryIter<'t, T> {
     pub fn new(start_pos: u64, db_file: &'t mut File) -> Self {
         Self {
-            db_file: Some(CountingReader::new(BufReader::new(db_file))),
+            db_file: BufReader::new(db_file),
             start_pos: Some(start_pos),
+            finished: false,
             types: PhantomData,
         }
     }
@@ -40,45 +40,34 @@ where
     type Item = Result<T, IterError>;
 
     fn next(&mut self) -> Option<Self::Item> {
-        // closure which reads a single entry starting from `self.pos`
-        let read_one =
-            |f: &mut EntryReader, start_pos: Option<u64>| -> Result<Option<T>, IterError> {
-                if let Some(pos) = start_pos {
-                    f.seek(io::SeekFrom::Start(pos))?;
-                }
-                match bincode_options().deserialize_from(&mut *f) {
-                    Ok(changeset) => {
-                        f.clear_count();
-                        Ok(Some(changeset))
-                    }
-                    Err(e) => {
-                        // allow unexpected EOF if 0 bytes were read
-                        if let bincode::ErrorKind::Io(inner) = &*e {
-                            if inner.kind() == io::ErrorKind::UnexpectedEof && f.count() == 0 {
-                                f.clear_count();
-                                return Ok(None);
-                            }
-                        }
-                        f.rewind()?;
-                        Err(IterError::Bincode(*e))
-                    }
-                }
-            };
-        let result = read_one(self.db_file.as_mut()?, self.start_pos.take());
-        if result.is_err() {
-            self.db_file = None;
-        }
-        result.transpose()
-    }
-}
-
-impl<'t, T> Drop for EntryIter<'t, T> {
-    fn drop(&mut self) {
-        if let Some(r) = self.db_file.as_mut() {
-            // This syncs the underlying file's offset with the buffer's position. This way, no data
-            // is lost with future reads.
-            let _ = r.stream_position();
-        }
+        if self.finished {
+            return None;
+        }
+        (|| {
+            if let Some(start) = self.start_pos.take() {
+                self.db_file.seek(io::SeekFrom::Start(start))?;
+            }
+
+            let pos_before_read = self.db_file.stream_position()?;
+            match bincode_options().deserialize_from(&mut self.db_file) {
+                Ok(changeset) => Ok(Some(changeset)),
+                Err(e) => {
+                    self.finished = true;
+                    let pos_after_read = self.db_file.stream_position()?;
+                    // allow unexpected EOF if 0 bytes were read
+                    if let bincode::ErrorKind::Io(inner) = &*e {
+                        if inner.kind() == io::ErrorKind::UnexpectedEof
+                            && pos_after_read == pos_before_read
+                        {
+                            return Ok(None);
+                        }
+                    }
+                    self.db_file.seek(io::SeekFrom::Start(pos_before_read))?;
+                    Err(IterError::Bincode(*e))
+                }
+            }
+        })()
+        .transpose()
     }
 }
 
@evanlinjin (Member) commented on Jan 22, 2024:

I think we still need a Drop implementation that syncs the underlying file's offset. However, we should use self.seek(SeekFrom::Current(0)).
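
For concreteness, a minimal sketch of what the suggested Drop impl could look like under the new BufReader-based design (not part of this diff; it assumes the module's existing io imports, including the Seek trait, are in scope):

impl<'t, T> Drop for EntryIter<'t, T> {
    fn drop(&mut self) {
        // The BufReader may have read ahead past the last entry that was
        // actually parsed. Seeking by 0 from the current logical position
        // forces BufReader to discard its buffer and move the underlying
        // File's offset back to that logical position, so a later write
        // through the same handle starts where iteration stopped rather
        // than wherever read-ahead ended.
        // Hypothetical sketch only; errors are deliberately ignored in drop.
        let _ = self.db_file.seek(io::SeekFrom::Current(0));
    }
}

Whether this is actually needed in practice is exactly what the test discussed below would demonstrate.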

@LLFourn (Collaborator, Author) commented on Jan 22, 2024:

Can we write a test that demonstrates this need? I think I understand the rationale; I'll see if I can come up with one.

[EDIT] Or maybe we can use a BufReader in the main type (not just the iterator).

Member commented:

@LLFourn I don't think using a BufReader with the internal file getting written to is a good idea. We'll need to do some crazy stuff with .get_mut() and flush the buffer with every write.

I thought that the test I added demonstrated the need for the Drop impl, but clearly it does not. We need a test that writes after a failed read without closing the file in between.
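
A rough sketch of the kind of test being discussed, written as a child module of entry_iter.rs (hypothetical: it assumes a tempfile dev-dependency, uses u64 as a stand-in entry type, and leans on the crate-internal bincode_options helper). The idea: write valid entries plus trailing garbage, iterate until the read fails, then, without reopening the file, truncate the garbage and append a new entry through the same handle.

#[cfg(test)]
mod write_after_failed_read_sketch {
    use super::*;
    use crate::bincode_options;
    use bincode::Options;
    use std::io::{Seek, Write};

    #[test]
    fn can_write_after_failed_read_without_reopening() {
        let mut file = tempfile::tempfile().expect("create temp file");

        // Two valid entries followed by bytes that cannot decode as a `u64`.
        bincode_options().serialize_into(&mut file, &1u64).unwrap();
        bincode_options().serialize_into(&mut file, &2u64).unwrap();
        file.write_all(&[0xff; 3]).unwrap();

        // Iterate: two Ok items, then an Err for the trailing garbage.
        let mut iter = EntryIter::<u64>::new(0, &mut file);
        assert_eq!(iter.next().unwrap().unwrap(), 1);
        assert_eq!(iter.next().unwrap().unwrap(), 2);
        assert!(iter.next().unwrap().is_err());
        drop(iter);

        // Without closing the file, the offset should now sit at the end of
        // the last good entry, so we can truncate the garbage and keep
        // appending through the same handle.
        let pos = file.stream_position().unwrap();
        file.set_len(pos).unwrap();
        bincode_options().serialize_into(&mut file, &3u64).unwrap();

        // All three entries should round-trip on a fresh pass.
        let entries = EntryIter::<u64>::new(0, &mut file)
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(entries, vec![1, 2, 3]);
    }
}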


@@ -107,51 +96,3 @@ impl From<io::Error> for IterError {
 }
 
 impl std::error::Error for IterError {}
-
-/// A wrapped [`Reader`] which counts total bytes read.
-struct CountingReader<R> {
-    r: R,
-    n: u64,
-}
-
-impl<R> CountingReader<R> {
-    fn new(file: R) -> Self {
-        Self { r: file, n: 0 }
-    }
-
-    /// Counted bytes read.
-    fn count(&self) -> u64 {
-        self.n
-    }
-
-    /// Clear read count.
-    fn clear_count(&mut self) {
-        self.n = 0;
-    }
-}
-
-impl<R: io::Seek> CountingReader<R> {
-    /// Rewind file descriptor offset to before all counted read operations. Then clear the read
-    /// count.
-    fn rewind(&mut self) -> io::Result<u64> {
-        let read = self.r.seek(std::io::SeekFrom::Current(-(self.n as i64)))?;
-        self.n = 0;
-        Ok(read)
-    }
-}
-
-impl<R: io::Read> io::Read for CountingReader<R> {
-    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
-        let read = self.r.read(&mut *buf)?;
-        self.n += read as u64;
-        Ok(read)
-    }
-}
-
-impl<R: io::Seek> io::Seek for CountingReader<R> {
-    fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
-        let res = self.r.seek(pos);
-        self.n = 0;
-        res
-    }
-}