Skip to content

Commit 79e0208

Browse files
committed
feat(file_store)!: optimize EntryIter by reducing syscalls
* Wrap file reader with `BufReader`. This reduces calls to `read`. * Wrap file reader with `CountingReader`. This counts the bytes read by the underlying reader. We can rewind without seeking first.
1 parent 04809e9 commit 79e0208

File tree

1 file changed

+83
-26
lines changed

1 file changed

+83
-26
lines changed
Lines changed: 83 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,22 @@
11
use bincode::Options;
22
use std::{
33
fs::File,
4-
io::{self, Seek},
4+
io::{self, BufReader, Seek},
55
marker::PhantomData,
66
};
77

88
use crate::bincode_options;
99

10+
type EntryReader<'t> = CountingReader<BufReader<&'t mut File>>;
11+
1012
/// Iterator over entries in a file store.
1113
///
1214
/// Reads and returns an entry each time [`next`] is called. If an error occurs while reading the
1315
/// iterator will yield a `Result::Err(_)` instead and then `None` for the next call to `next`.
1416
///
1517
/// [`next`]: Self::next
1618
pub struct EntryIter<'t, T> {
17-
db_file: Option<&'t mut File>,
19+
db_file: Option<EntryReader<'t>>,
1820

1921
/// The file position for the first read of `db_file`.
2022
start_pos: Option<u64>,
@@ -24,7 +26,7 @@ pub struct EntryIter<'t, T> {
2426
impl<'t, T> EntryIter<'t, T> {
2527
pub fn new(start_pos: u64, db_file: &'t mut File) -> Self {
2628
Self {
27-
db_file: Some(db_file),
29+
db_file: Some(CountingReader::new(BufReader::new(db_file))),
2830
start_pos: Some(start_pos),
2931
types: PhantomData,
3032
}
@@ -39,32 +41,29 @@ where
3941

4042
fn next(&mut self) -> Option<Self::Item> {
4143
// closure which reads a single entry starting from `self.pos`
42-
let read_one = |f: &mut File, start_pos: Option<u64>| -> Result<Option<T>, IterError> {
43-
let pos = match start_pos {
44-
Some(pos) => f.seek(io::SeekFrom::Start(pos))?,
45-
None => f.stream_position()?,
46-
};
47-
48-
match bincode_options().deserialize_from(&*f) {
49-
Ok(changeset) => {
50-
f.stream_position()?;
51-
Ok(Some(changeset))
44+
let read_one =
45+
|f: &mut EntryReader, start_pos: Option<u64>| -> Result<Option<T>, IterError> {
46+
if let Some(pos) = start_pos {
47+
f.seek(io::SeekFrom::Start(pos))?;
5248
}
53-
Err(e) => {
54-
if let bincode::ErrorKind::Io(inner) = &*e {
55-
if inner.kind() == io::ErrorKind::UnexpectedEof {
56-
let eof = f.seek(io::SeekFrom::End(0))?;
57-
if pos == eof {
49+
match bincode_options().deserialize_from(&mut *f) {
50+
Ok(changeset) => {
51+
f.clear_count();
52+
Ok(Some(changeset))
53+
}
54+
Err(e) => {
55+
// allow unexpected EOF if 0 bytes were read
56+
if let bincode::ErrorKind::Io(inner) = &*e {
57+
if inner.kind() == io::ErrorKind::UnexpectedEof && f.count() == 0 {
58+
f.clear_count();
5859
return Ok(None);
5960
}
6061
}
62+
f.rewind()?;
63+
Err(IterError::Bincode(*e))
6164
}
62-
f.seek(io::SeekFrom::Start(pos))?;
63-
Err(IterError::Bincode(*e))
6465
}
65-
}
66-
};
67-
66+
};
6867
let result = read_one(self.db_file.as_mut()?, self.start_pos.take());
6968
if result.is_err() {
7069
self.db_file = None;
@@ -73,9 +72,13 @@ where
7372
}
7473
}
7574

76-
impl From<io::Error> for IterError {
77-
fn from(value: io::Error) -> Self {
78-
IterError::Io(value)
75+
impl<'t, T> Drop for EntryIter<'t, T> {
76+
fn drop(&mut self) {
77+
if let Some(r) = self.db_file.as_mut() {
78+
// This syncs the underlying file's offset with the buffer's position. This way, no data
79+
// is lost with future reads.
80+
let _ = r.stream_position();
81+
}
7982
}
8083
}
8184

@@ -97,4 +100,58 @@ impl core::fmt::Display for IterError {
97100
}
98101
}
99102

103+
impl From<io::Error> for IterError {
104+
fn from(value: io::Error) -> Self {
105+
IterError::Io(value)
106+
}
107+
}
108+
100109
impl std::error::Error for IterError {}
110+
111+
/// A wrapped [`Reader`] which counts total bytes read.
112+
struct CountingReader<R> {
113+
r: R,
114+
n: u64,
115+
}
116+
117+
impl<R> CountingReader<R> {
118+
fn new(file: R) -> Self {
119+
Self { r: file, n: 0 }
120+
}
121+
122+
/// Counted bytes read.
123+
fn count(&self) -> u64 {
124+
self.n
125+
}
126+
127+
/// Clear read count.
128+
fn clear_count(&mut self) {
129+
self.n = 0;
130+
}
131+
}
132+
133+
impl<R: io::Seek> CountingReader<R> {
134+
/// Rewind file descriptor offset to before all counted read operations. Then clear the read
135+
/// count.
136+
fn rewind(&mut self) -> io::Result<u64> {
137+
let read = self.r.seek(std::io::SeekFrom::Current(-(self.n as i64)))?;
138+
self.n = 0;
139+
Ok(read)
140+
}
141+
}
142+
143+
impl<R: io::Read> io::Read for CountingReader<R> {
144+
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
145+
let read = self.r.read(&mut *buf)?;
146+
self.n += read as u64;
147+
Ok(read)
148+
}
149+
}
150+
151+
impl<R: io::Seek> io::Seek for CountingReader<R> {
152+
fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
153+
let res = self.r.seek(pos);
154+
self.n = 0;
155+
res
156+
}
157+
}

0 commit comments

Comments
 (0)