Skip to content

Commit 02ff642

Browse files
committed
Async support with tokio
1 parent ff2242a commit 02ff642

File tree

10 files changed

+559
-422
lines changed

10 files changed

+559
-422
lines changed

Cargo.toml

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,21 @@
11
[package]
2-
name = "ftp"
3-
version = "3.0.1"
4-
authors = ["Matt McCoy <[email protected]>"]
5-
documentation = "https://docs.rs/ftp/"
6-
repository = "https://github.com/mattnenterprise/rust-ftp"
2+
name = "async_ftp"
3+
version = "4.0.0"
4+
authors = ["Daniel García <[email protected]>", "Matt McCoy <[email protected]>"]
5+
documentation = "https://docs.rs/async_ftp/"
6+
repository = "https://github.com/dani-garcia/rust_async_ftp"
77
description = "FTP client for Rust"
88
readme = "README.md"
99
license = "Apache-2.0/MIT"
1010
edition = "2018"
1111
keywords = ["ftp"]
1212
categories = ["network-programming"]
1313

14-
[badges]
15-
travis-ci = { repository = "mattnenterprise/rust-ftp" }
16-
17-
[lib]
18-
name ="ftp"
19-
path = "src/lib.rs"
20-
2114
[features]
15+
# default = ["secure"]
16+
2217
# Enable support of FTPS which requires openssl
23-
secure = ["openssl"]
18+
secure = ["tokio-rustls"]
2419

2520
# Add debug output (to STDOUT) of commands sent to the server
2621
# and lines read from the server
@@ -31,6 +26,6 @@ lazy_static = "1.4.0"
3126
regex = "1.3.9"
3227
chrono = "0.4.11"
3328

34-
[dependencies.openssl]
35-
version = "0.10.29"
36-
optional = true
29+
tokio = { version = "0.2.21", features = ["tcp", "dns", "io-util"] }
30+
tokio-rustls = { version = "0.13.1", optional = true }
31+
pin-project = "0.4.17"

README.md

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,36 +21,43 @@ ftp = { version = "<version>", features = ["secure"] }
2121

2222
## Usage
2323
```rust
24-
extern crate ftp;
25-
2624
use std::str;
2725
use std::io::Cursor;
28-
use ftp::FtpStream;
26+
use async_ftp::FtpStream;
2927

30-
fn main() {
28+
async fn async_main() -> Result<(), Box<dyn std::error::Error>> {
3129
// Create a connection to an FTP server and authenticate to it.
32-
let mut ftp_stream = FtpStream::connect("127.0.0.1:21").unwrap();
33-
let _ = ftp_stream.login("username", "password").unwrap();
30+
let mut ftp_stream = FtpStream::connect("172.25.82.139:21").await?;
31+
let _ = ftp_stream.login("username", "password").await?;
3432

3533
// Get the current directory that the client will be reading from and writing to.
36-
println!("Current directory: {}", ftp_stream.pwd().unwrap());
34+
println!("Current directory: {}", ftp_stream.pwd().await?);
3735

3836
// Change into a new directory, relative to the one we are currently in.
39-
let _ = ftp_stream.cwd("test_data").unwrap();
37+
let _ = ftp_stream.cwd("test_data").await?;
4038

4139
// Retrieve (GET) a file from the FTP server in the current working directory.
42-
let remote_file = ftp_stream.simple_retr("ftpext-charter.txt").unwrap();
43-
println!("Read file with contents\n{}\n", str::from_utf8(&remote_file.into_inner()).unwrap());
40+
let remote_file = ftp_stream.simple_retr("ftpext-charter.txt").await?;
41+
println!("Read file with contents\n{}\n", str::from_utf8(&remote_file.into_inner()).await?);
4442

4543
// Store (PUT) a file from the client to the current working directory of the server.
4644
let mut reader = Cursor::new("Hello from the Rust \"ftp\" crate!".as_bytes());
47-
let _ = ftp_stream.put("greeting.txt", &mut reader);
45+
let _ = ftp_stream.put("greeting.txt", &mut reader).await?;
4846
println!("Successfully wrote greeting.txt");
4947

5048
// Terminate the connection to the server.
5149
let _ = ftp_stream.quit();
5250
}
5351

52+
fn main() -> Result<(), Box<dyn std::error::Error>> {
53+
tokio::runtime::Builder::new()
54+
.threaded_scheduler()
55+
.enable_all()
56+
.build()
57+
.unwrap()
58+
.block_on(async_main())
59+
}
60+
5461
```
5562

5663
## License

examples/connecting.rs

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,37 @@
1-
extern crate ftp;
2-
3-
use std::str;
1+
use async_ftp::{FtpError, FtpStream};
42
use std::io::Cursor;
5-
use ftp::{FtpStream, FtpError};
3+
use std::str;
64

7-
fn test_ftp(addr: &str, user: &str, pass: &str) -> Result<(), FtpError> {
8-
let mut ftp_stream = FtpStream::connect((addr, 21)).unwrap();
9-
ftp_stream.login(user, pass).unwrap();
10-
println!("current dir: {}", ftp_stream.pwd().unwrap());
5+
async fn test_ftp(addr: &str, user: &str, pass: &str) -> Result<(), FtpError> {
6+
let mut ftp_stream = FtpStream::connect((addr, 21)).await?;
7+
ftp_stream.login(user, pass).await?;
8+
println!("current dir: {}", ftp_stream.pwd().await?);
119

12-
ftp_stream.cwd("test_data").unwrap();
10+
ftp_stream.cwd("test_data").await?;
1311

1412
// An easy way to retrieve a file
15-
let cursor = ftp_stream.simple_retr("ftpext-charter.txt").unwrap();
13+
let cursor = ftp_stream.simple_retr("ftpext-charter.txt").await?;
1614
let vec = cursor.into_inner();
1715
let text = str::from_utf8(&vec).unwrap();
1816
println!("got data: {}", text);
1917

2018
// Store a file
2119
let file_data = format!("Some awesome file data man!!");
2220
let mut reader = Cursor::new(file_data.into_bytes());
23-
ftp_stream.put("my_random_file.txt", &mut reader).unwrap();
21+
ftp_stream.put("my_random_file.txt", &mut reader).await?;
2422

25-
ftp_stream.quit()
23+
ftp_stream.quit().await
2624
}
2725

2826
fn main() {
29-
test_ftp("127.0.0.1", "anonymous", "[email protected]").unwrap();
27+
let future = test_ftp("172.25.82.139", "anonymous", "[email protected]");
28+
29+
tokio::runtime::Builder::new()
30+
.enable_all()
31+
.build()
32+
.unwrap()
33+
.block_on(future)
34+
.unwrap();
35+
3036
println!("test successful")
3137
}

src/data_stream.rs

Lines changed: 43 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
1-
use std::io::{Read, Write, Result};
2-
use std::net::TcpStream;
1+
use std::io;
2+
use std::pin::Pin;
3+
use std::task::{Context, Poll};
4+
use tokio::io::{AsyncRead, AsyncWrite};
5+
use tokio::net::TcpStream;
36
#[cfg(feature = "secure")]
4-
use openssl::ssl::SslStream;
5-
7+
use tokio_rustls::client::TlsStream;
68

79
/// Data Stream used for communications
8-
#[derive(Debug)]
10+
#[pin_project::pin_project(project = DataStreamProj)]
911
pub enum DataStream {
10-
Tcp(TcpStream),
12+
Tcp(#[pin] TcpStream),
1113
#[cfg(feature = "secure")]
12-
Ssl(SslStream<TcpStream>),
14+
Ssl(#[pin] TlsStream<TcpStream>),
1315
}
1416

1517
impl DataStream {
@@ -18,54 +20,69 @@ impl DataStream {
1820
match self {
1921
DataStream::Tcp(stream) => stream,
2022
#[cfg(feature = "secure")]
21-
DataStream::Ssl(stream) => stream.get_ref().try_clone().unwrap(),
23+
DataStream::Ssl(stream) => stream.into_inner().0,
2224
}
2325
}
2426

2527
/// Test if the stream is secured
2628
pub fn is_ssl(&self) -> bool {
2729
match self {
2830
#[cfg(feature = "secure")]
29-
&DataStream::Ssl(_) => true,
30-
_ => false
31+
DataStream::Ssl(_) => true,
32+
_ => false,
3133
}
3234
}
3335

3436
/// Returns a reference to the underlying TcpStream.
3537
pub fn get_ref(&self) -> &TcpStream {
3638
match self {
37-
&DataStream::Tcp(ref stream) => stream,
39+
DataStream::Tcp(ref stream) => stream,
3840
#[cfg(feature = "secure")]
39-
&DataStream::Ssl(ref stream) => stream.get_ref(),
41+
DataStream::Ssl(ref stream) => stream.get_ref().0,
4042
}
4143
}
4244
}
4345

44-
impl Read for DataStream {
45-
fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
46-
match self {
47-
&mut DataStream::Tcp(ref mut stream) => stream.read(buf),
46+
impl AsyncRead for DataStream {
47+
fn poll_read(
48+
self: Pin<&mut Self>,
49+
cx: &mut Context<'_>,
50+
buf: &mut [u8],
51+
) -> Poll<io::Result<usize>> {
52+
match self.project() {
53+
DataStreamProj::Tcp(stream) => stream.poll_read(cx, buf),
4854
#[cfg(feature = "secure")]
49-
&mut DataStream::Ssl(ref mut stream) => stream.read(buf),
55+
DataStreamProj::Ssl(stream) => stream.poll_read(cx, buf),
5056
}
5157
}
5258
}
5359

60+
impl AsyncWrite for DataStream {
61+
fn poll_write(
62+
self: Pin<&mut Self>,
63+
cx: &mut Context<'_>,
64+
buf: &[u8],
65+
) -> Poll<io::Result<usize>> {
66+
match self.project() {
67+
DataStreamProj::Tcp(stream) => stream.poll_write(cx, buf),
68+
#[cfg(feature = "secure")]
69+
DataStreamProj::Ssl(stream) => stream.poll_write(cx, buf),
70+
}
71+
}
5472

55-
impl Write for DataStream {
56-
fn write(&mut self, buf: &[u8]) -> Result<usize> {
57-
match self {
58-
&mut DataStream::Tcp(ref mut stream) => stream.write(buf),
73+
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
74+
match self.project() {
75+
DataStreamProj::Tcp(stream) => stream.poll_flush(cx),
5976
#[cfg(feature = "secure")]
60-
&mut DataStream::Ssl(ref mut stream) => stream.write(buf),
77+
DataStreamProj::Ssl(stream) => stream.poll_flush(cx),
6178
}
6279
}
6380

64-
fn flush(&mut self) -> Result<()> {
65-
match self {
66-
&mut DataStream::Tcp(ref mut stream) => stream.flush(),
81+
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
82+
match self.project() {
83+
DataStreamProj::Tcp(stream) => stream.poll_shutdown(cx),
6784
#[cfg(feature = "secure")]
68-
&mut DataStream::Ssl(ref mut stream) => stream.flush(),
85+
DataStreamProj::Ssl(stream) => stream.poll_shutdown(cx),
6986
}
7087
}
7188
}

0 commit comments

Comments
 (0)