Skip to content

Commit ba0aa34

Browse files
committed
Make FtpStream::get read the final response
Once the client has finished reading a file from the server, the server sends a status message to the client. The current API of FtpStream::get doesn't allow us to read that response, since we return a BufReader that reads directly from the data stream. Instead of doing this, we can return a new AsyncRead type (FileReader) that wraps the data stream and reads the status message once the data stream is finished. There is one caveat to this - if the FileReader is dropped then the data stream will be closed, and the server will send us an error message, which still needs to be read manually. There isn't really a nice way around this without an AsyncDrop trait. Since this is already a breaking change, we also no longer use a BufReader at all - this lets the caller decide whether to buffer or not.
1 parent da79249 commit ba0aa34

File tree

3 files changed

+104
-5
lines changed

3 files changed

+104
-5
lines changed

src/file_reader.rs

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
use std::{
2+
future::Future,
3+
io, mem,
4+
pin::Pin,
5+
task::{Context, Poll},
6+
};
7+
8+
use tokio::io::{AsyncRead, ReadBuf};
9+
10+
use crate::{status, DataStream, FtpStream};
11+
12+
pub struct FileReader<'a> {
13+
state: State<'a>,
14+
}
15+
16+
enum State<'a> {
17+
Stream {
18+
data_stream: DataStream,
19+
ftp_stream: &'a mut FtpStream,
20+
},
21+
FinalRead(Pin<Box<dyn 'a + Future<Output = io::Result<()>>>>),
22+
Finished,
23+
}
24+
25+
impl FileReader<'_> {
26+
pub(crate) fn new(data_stream: DataStream, ftp_stream: &mut FtpStream) -> FileReader {
27+
FileReader {
28+
state: State::Stream {
29+
data_stream,
30+
ftp_stream,
31+
},
32+
}
33+
}
34+
}
35+
36+
impl AsyncRead for FileReader<'_> {
37+
fn poll_read(
38+
mut self: Pin<&mut Self>,
39+
cx: &mut Context<'_>,
40+
buf: &mut ReadBuf<'_>,
41+
) -> Poll<io::Result<()>> {
42+
let bytes_read_before = buf.filled().len();
43+
let (state, result) = match mem::replace(&mut self.state, State::Finished) {
44+
State::Stream {
45+
mut data_stream,
46+
ftp_stream,
47+
} => match Pin::new(&mut data_stream).poll_read(cx, buf) {
48+
Poll::Ready(result) => {
49+
let bytes_read_after = buf.filled().len();
50+
if bytes_read_after == bytes_read_before {
51+
// finished reading the file, wait for a status message from the server
52+
let mut status_fut = Box::pin(async move {
53+
ftp_stream
54+
.read_response_in(&[
55+
status::CLOSING_DATA_CONNECTION,
56+
status::REQUESTED_FILE_ACTION_OK,
57+
])
58+
.await
59+
.map(|_| ())
60+
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
61+
.and(result)
62+
});
63+
match Pin::new(&mut status_fut).poll(cx) {
64+
Poll::Ready(r) => (State::Finished, Poll::Ready(r)),
65+
Poll::Pending => (State::FinalRead(status_fut), Poll::Pending),
66+
}
67+
} else {
68+
(
69+
State::Stream {
70+
data_stream,
71+
ftp_stream,
72+
},
73+
Poll::Ready(result),
74+
)
75+
}
76+
}
77+
Poll::Pending => (
78+
State::Stream {
79+
data_stream,
80+
ftp_stream,
81+
},
82+
Poll::Pending,
83+
),
84+
},
85+
State::FinalRead(mut status_fut) => match Pin::new(&mut status_fut).poll(cx) {
86+
Poll::Ready(r) => (State::Finished, Poll::Ready(r)),
87+
Poll::Pending => (State::FinalRead(status_fut), Poll::Pending),
88+
},
89+
State::Finished => panic!("poll called on finished FileReader"),
90+
};
91+
92+
self.state = state;
93+
result
94+
}
95+
}

src/ftp.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use tokio::net::{TcpStream, ToSocketAddrs};
1717
use tokio_rustls::{rustls::ClientConfig, rustls::ServerName, TlsConnector};
1818

1919
use crate::data_stream::DataStream;
20+
use crate::file_reader::FileReader;
2021
use crate::status;
2122
use crate::types::{FileType, FtpError, Line, Result};
2223

@@ -316,14 +317,15 @@ impl FtpStream {
316317

317318
/// Retrieves the file name specified from the server.
318319
/// This method is a more complicated way to retrieve a file.
319-
/// The reader returned should be dropped.
320-
/// Also you will have to read the response to make sure it has the correct value.
321-
pub async fn get(&mut self, file_name: &str) -> Result<BufReader<DataStream>> {
320+
///
321+
/// If the reader is dropped before the file is fully read, the server will send a error message that
322+
/// should be read with [`Self::read_response`]/[`Self::read_response_in`].
323+
pub async fn get(&mut self, file_name: &str) -> Result<FileReader<'_>> {
322324
let retr_command = format!("RETR {}\r\n", file_name);
323-
let data_stream = BufReader::new(self.data_command(&retr_command).await?);
325+
let data_stream = self.data_command(&retr_command).await?;
324326
self.read_response_in(&[status::ABOUT_TO_SEND, status::ALREADY_OPEN])
325327
.await?;
326-
Ok(data_stream)
328+
Ok(FileReader::new(data_stream, self))
327329
}
328330

329331
/// Renames the file from_name to to_name

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,12 @@
5454
//!
5555
5656
mod data_stream;
57+
mod file_reader;
5758
mod ftp;
5859
pub mod status;
5960
pub mod types;
6061

6162
pub use self::data_stream::DataStream;
63+
pub use self::file_reader::FileReader;
6264
pub use self::ftp::FtpStream;
6365
pub use self::types::FtpError;

0 commit comments

Comments
 (0)