Skip to content

Commit b820ffd

Browse files
committed
Add StrealReader to allow stream download/unpack in client
1 parent 5fe22b9 commit b820ffd

File tree

2 files changed

+37
-0
lines changed

2 files changed

+37
-0
lines changed

mithril-client/src/utils/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22
//! This module contains tools needed mostly in services layers.
33
44
mod progress_reporter;
5+
mod stream_reader;
56
mod unpacker;
67

78
pub use progress_reporter::*;
9+
pub use stream_reader::*;
810
pub use unpacker::*;
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
use flume::Receiver;
2+
use std::io;
3+
4+
// Credits and many thanks to https://stackoverflow.com/a/69967522 for most of this code
5+
6+
/// A channel receiver that implement [io::Read].
7+
pub struct StreamReader {
8+
receiver: Receiver<Vec<u8>>,
9+
current: io::Cursor<Vec<u8>>,
10+
}
11+
12+
impl StreamReader {
13+
/// [StreamReader] factory
14+
pub fn new(receiver: Receiver<Vec<u8>>) -> Self {
15+
Self {
16+
receiver,
17+
current: io::Cursor::new(vec![]),
18+
}
19+
}
20+
}
21+
22+
impl io::Read for StreamReader {
23+
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
24+
if self.current.position() == self.current.get_ref().len() as u64 {
25+
// We've exhausted the previous chunk, get a new one.
26+
if let Ok(vec) = self.receiver.recv() {
27+
self.current = io::Cursor::new(vec);
28+
}
29+
// If recv() "fails", it means the sender closed its part of
30+
// the channel, which means EOF. Propagate EOF by allowing
31+
// a read from the exhausted cursor.
32+
}
33+
self.current.read(buf)
34+
}
35+
}

0 commit comments

Comments
 (0)