Skip to content
This repository was archived by the owner on Oct 30, 2019. It is now read-only.

Commit 42ed08b

Browse files
authored
New futures (#1)
* update deps Signed-off-by: Yoshua Wuyts <[email protected]> * update to Context API Signed-off-by: Yoshua Wuyts <[email protected]> * Pin Box own types Signed-off-by: Yoshua Wuyts <[email protected]> * more poll Signed-off-by: Yoshua Wuyts <[email protected]> * pin runtime-{native,tokio} Signed-off-by: Yoshua Wuyts <[email protected]> * fix benches & unsafe warning Signed-off-by: Yoshua Wuyts <[email protected]>
1 parent 3e132e9 commit 42ed08b

File tree

17 files changed

+165
-114
lines changed

17 files changed

+165
-114
lines changed

benches/baseline.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ mod baseline {
66
use futures::executor;
77
use futures::prelude::*;
88
use std::pin::Pin;
9-
use std::task::{Poll, Waker};
9+
use std::task::{Context, Poll};
1010

1111
#[bench]
1212
fn smoke(b: &mut test::Bencher) {
@@ -28,13 +28,13 @@ mod baseline {
2828
impl Future for Task {
2929
type Output = ();
3030

31-
fn poll(mut self: Pin<&mut Self>, w: &Waker) -> Poll<Self::Output> {
31+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3232
self.depth += 1;
3333

3434
if self.depth == 300 {
3535
Poll::Ready(())
3636
} else {
37-
w.wake();
37+
cx.waker().wake_by_ref();
3838
Poll::Pending
3939
}
4040
}
@@ -142,8 +142,8 @@ mod baseline {
142142
impl<T> Future for JoinHandle<T> {
143143
type Output = T;
144144

145-
fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> {
146-
match self.rx.poll_unpin(waker) {
145+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
146+
match self.rx.poll_unpin(cx) {
147147
Poll::Pending => Poll::Pending,
148148
Poll::Ready(Ok(t)) => Poll::Ready(t),
149149
Poll::Ready(Err(_)) => panic!(), // TODO: Is this OK? Print a better error message?

benches/common/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ macro_rules! benchmark_suite {
66
#[runtime::bench($rt)]
77
async fn notify_self() {
88
use futures::future::Future;
9-
use futures::task::{Poll, Waker};
9+
use futures::task::{Context, Poll};
1010
use std::pin::Pin;
1111

1212
struct Task {
@@ -16,13 +16,13 @@ macro_rules! benchmark_suite {
1616
impl Future for Task {
1717
type Output = ();
1818

19-
fn poll(mut self: Pin<&mut Self>, w: &Waker) -> Poll<Self::Output> {
19+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2020
self.depth += 1;
2121

2222
if self.depth == 300 {
2323
Poll::Ready(())
2424
} else {
25-
w.wake();
25+
cx.waker().wake_by_ref();
2626
Poll::Pending
2727
}
2828
}

runtime-native/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,6 @@ async-datagram = "2.0.0"
1212
futures-preview = "0.3.0-alpha.13"
1313
lazy_static = "1.0.0"
1414
mio = "0.6.16"
15-
romio = "0.3.0-alpha.3"
15+
romio = "0.3.0-alpha.4"
1616
runtime-raw = { path = "../runtime-raw" }
17-
juliex = "0.3.0-alpha.2"
17+
juliex = "0.3.0-alpha.3"

runtime-native/src/lib.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,12 @@ impl runtime_raw::Runtime for Native {
4646
fn connect_tcp_stream(
4747
&self,
4848
addr: &SocketAddr,
49-
) -> Pin<Box<dyn Future<Output = io::Result<Box<dyn runtime_raw::TcpStream>>> + Send>> {
49+
) -> Pin<Box<dyn Future<Output = io::Result<Pin<Box<dyn runtime_raw::TcpStream>>>> + Send>>
50+
{
5051
let romio_connect = romio::TcpStream::connect(addr);
5152
let connect = romio_connect.map(|res| {
5253
res.map(|romio_stream| {
53-
Box::new(TcpStream { romio_stream }) as Box<dyn runtime_raw::TcpStream>
54+
Box::pin(TcpStream { romio_stream }) as Pin<Box<dyn runtime_raw::TcpStream>>
5455
})
5556
});
5657
Box::pin(connect)
@@ -59,13 +60,16 @@ impl runtime_raw::Runtime for Native {
5960
fn bind_tcp_listener(
6061
&self,
6162
addr: &SocketAddr,
62-
) -> io::Result<Box<dyn runtime_raw::TcpListener>> {
63+
) -> io::Result<Pin<Box<dyn runtime_raw::TcpListener>>> {
6364
let romio_listener = romio::TcpListener::bind(&addr)?;
64-
Ok(Box::new(TcpListener { romio_listener }))
65+
Ok(Box::pin(TcpListener { romio_listener }))
6566
}
6667

67-
fn bind_udp_socket(&self, addr: &SocketAddr) -> io::Result<Box<dyn runtime_raw::UdpSocket>> {
68+
fn bind_udp_socket(
69+
&self,
70+
addr: &SocketAddr,
71+
) -> io::Result<Pin<Box<dyn runtime_raw::UdpSocket>>> {
6872
let romio_socket = romio::UdpSocket::bind(&addr)?;
69-
Ok(Box::new(UdpSocket { romio_socket }))
73+
Ok(Box::pin(UdpSocket { romio_socket }))
7074
}
7175
}

runtime-native/src/tcp.rs

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@ use romio::async_ready::{AsyncReadReady, AsyncReady, AsyncWriteReady};
33

44
use std::io;
55
use std::net::SocketAddr;
6-
use std::task::{Poll, Waker};
6+
use std::pin::Pin;
7+
use std::task::{Context, Poll};
78

89
#[derive(Debug)]
910
pub(crate) struct TcpStream {
@@ -16,12 +17,12 @@ pub(crate) struct TcpListener {
1617
}
1718

1819
impl runtime_raw::TcpStream for TcpStream {
19-
fn poll_write_ready(&self, waker: &Waker) -> Poll<io::Result<()>> {
20-
self.romio_stream.poll_write_ready(&waker).map_ok(|_| ())
20+
fn poll_write_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
21+
Pin::new(&mut self.romio_stream).poll_write_ready(cx).map_ok(|_| ())
2122
}
2223

23-
fn poll_read_ready(&self, waker: &Waker) -> Poll<io::Result<()>> {
24-
self.romio_stream.poll_read_ready(&waker).map_ok(|_| ())
24+
fn poll_read_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
25+
Pin::new(&mut self.romio_stream).poll_read_ready(cx).map_ok(|_| ())
2526
}
2627

2728
fn take_error(&self) -> io::Result<Option<io::Error>> {
@@ -48,22 +49,30 @@ impl runtime_raw::TcpStream for TcpStream {
4849
}
4950

5051
impl AsyncRead for TcpStream {
51-
fn poll_read(&mut self, waker: &Waker, mut buf: &mut [u8]) -> Poll<io::Result<usize>> {
52-
self.romio_stream.poll_read(&waker, &mut buf)
52+
fn poll_read(
53+
mut self: Pin<&mut Self>,
54+
cx: &mut Context<'_>,
55+
mut buf: &mut [u8],
56+
) -> Poll<io::Result<usize>> {
57+
Pin::new(&mut self.romio_stream).poll_read(cx, &mut buf)
5358
}
5459
}
5560

5661
impl AsyncWrite for TcpStream {
57-
fn poll_write(&mut self, waker: &Waker, buf: &[u8]) -> Poll<io::Result<usize>> {
58-
self.romio_stream.poll_write(&waker, &buf)
62+
fn poll_write(
63+
mut self: Pin<&mut Self>,
64+
cx: &mut Context<'_>,
65+
buf: &[u8],
66+
) -> Poll<io::Result<usize>> {
67+
Pin::new(&mut self.romio_stream).poll_write(cx, &buf)
5968
}
6069

61-
fn poll_flush(&mut self, waker: &Waker) -> Poll<io::Result<()>> {
62-
self.romio_stream.poll_flush(&waker)
70+
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
71+
Pin::new(&mut self.romio_stream).poll_flush(cx)
6372
}
6473

65-
fn poll_close(&mut self, waker: &Waker) -> Poll<io::Result<()>> {
66-
self.romio_stream.poll_close(&waker)
74+
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
75+
Pin::new(&mut self.romio_stream).poll_close(cx)
6776
}
6877
}
6978

@@ -72,11 +81,14 @@ impl runtime_raw::TcpListener for TcpListener {
7281
self.romio_listener.local_addr()
7382
}
7483

75-
fn poll_accept(&mut self, waker: &Waker) -> Poll<io::Result<Box<dyn runtime_raw::TcpStream>>> {
76-
self.romio_listener
77-
.poll_ready(&waker)
84+
fn poll_accept(
85+
mut self: Pin<&mut Self>,
86+
cx: &mut Context<'_>,
87+
) -> Poll<io::Result<Pin<Box<dyn runtime_raw::TcpStream>>>> {
88+
Pin::new(&mut self.romio_listener)
89+
.poll_ready(cx)
7890
.map_ok(|(romio_stream, _)| {
79-
Box::new(TcpStream { romio_stream }) as Box<dyn runtime_raw::TcpStream>
91+
Box::pin(TcpStream { romio_stream }) as Pin<Box<dyn runtime_raw::TcpStream>>
8092
})
8193
}
8294

runtime-native/src/udp.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@ use async_datagram::AsyncDatagram;
22

33
use std::io;
44
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
5-
use std::task::{Poll, Waker};
5+
use std::pin::Pin;
6+
use std::task::{Context, Poll};
67

78
#[derive(Debug)]
89
pub(crate) struct UdpSocket {
@@ -15,20 +16,20 @@ impl runtime_raw::UdpSocket for UdpSocket {
1516
}
1617

1718
fn poll_send_to(
18-
&mut self,
19-
waker: &Waker,
19+
mut self: Pin<&mut Self>,
20+
cx: &mut Context<'_>,
2021
buf: &[u8],
2122
receiver: &SocketAddr,
2223
) -> Poll<io::Result<usize>> {
23-
self.romio_socket.poll_send_to(waker, buf, receiver)
24+
Pin::new(&mut self.romio_socket).poll_send_to(cx, buf, receiver)
2425
}
2526

2627
fn poll_recv_from(
27-
&mut self,
28-
waker: &Waker,
28+
mut self: Pin<&mut Self>,
29+
cx: &mut Context<'_>,
2930
buf: &mut [u8],
3031
) -> Poll<io::Result<(usize, SocketAddr)>> {
31-
self.romio_socket.poll_recv_from(waker, buf)
32+
Pin::new(&mut self.romio_socket).poll_recv_from(cx, buf)
3233
}
3334

3435
/// Gets the value of the `SO_BROADCAST` option for this socket.

runtime-raw/src/lib.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,17 +83,17 @@ pub trait Runtime: Send + Sync + 'static {
8383
fn connect_tcp_stream(
8484
&self,
8585
addr: &SocketAddr,
86-
) -> Pin<Box<dyn Future<Output = io::Result<Box<dyn TcpStream>>> + Send>>;
86+
) -> Pin<Box<dyn Future<Output = io::Result<Pin<Box<dyn TcpStream>>>> + Send>>;
8787

8888
/// Create a new `TcpListener`.
8989
///
9090
/// This method is defined on the `Runtime` trait because defining it on
9191
/// `TcpListener` would prevent it from being a trait object.
92-
fn bind_tcp_listener(&self, addr: &SocketAddr) -> io::Result<Box<dyn TcpListener>>;
92+
fn bind_tcp_listener(&self, addr: &SocketAddr) -> io::Result<Pin<Box<dyn TcpListener>>>;
9393

9494
/// Create a new `UdpSocket`.
9595
///
9696
/// This method is defined on the `Runtime` trait because defining it on
9797
/// `UdpSocket` would prevent it from being a trait object.
98-
fn bind_udp_socket(&self, addr: &SocketAddr) -> io::Result<Box<dyn UdpSocket>>;
98+
fn bind_udp_socket(&self, addr: &SocketAddr) -> io::Result<Pin<Box<dyn UdpSocket>>>;
9999
}

runtime-raw/src/tcp.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,18 @@
11
use futures::prelude::*;
2-
use futures::task::{Poll, Waker};
2+
use futures::task::{Context, Poll};
33

44
use std::fmt::Debug;
55
use std::io;
66
use std::net::SocketAddr;
7+
use std::pin::Pin;
78

89
/// A TcpStream for this Runtime
910
pub trait TcpStream: AsyncRead + AsyncWrite + Debug + Send {
1011
/// Check if the stream can be written to.
11-
fn poll_write_ready(&self, waker: &Waker) -> Poll<io::Result<()>>;
12+
fn poll_write_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>;
1213

1314
/// Check if the stream can be read from.
14-
fn poll_read_ready(&self, waker: &Waker) -> Poll<io::Result<()>>;
15+
fn poll_read_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>;
1516

1617
/// Check if any socket errors exist on the `TcpStream`.
1718
///
@@ -39,7 +40,7 @@ pub trait TcpListener: Debug + Send {
3940
fn local_addr(&self) -> io::Result<SocketAddr>;
4041

4142
/// Check if the listener is ready to accept connections.
42-
fn poll_accept(&mut self, waker: &Waker) -> Poll<io::Result<Box<dyn TcpStream>>>;
43+
fn poll_accept(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<Pin<Box<dyn TcpStream>>>>;
4344

4445
/// Extracts the raw file descriptor.
4546
#[cfg(unix)]

runtime-raw/src/udp.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
use std::fmt::Debug;
22
use std::io;
33
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
4-
use std::task::{Poll, Waker};
4+
use std::pin::Pin;
5+
use std::task::{Context, Poll};
56

67
/// A UDP socket.
78
pub trait UdpSocket: Debug + Send {
@@ -15,8 +16,8 @@ pub trait UdpSocket: Debug + Send {
1516
///
1617
/// On success, returns the number of bytes written.
1718
fn poll_send_to(
18-
&mut self,
19-
waker: &Waker,
19+
self: Pin<&mut Self>,
20+
cx: &mut Context<'_>,
2021
buf: &[u8],
2122
receiver: &SocketAddr,
2223
) -> Poll<io::Result<usize>>;
@@ -26,8 +27,8 @@ pub trait UdpSocket: Debug + Send {
2627
/// On success, returns the number of bytes read and the target from whence
2728
/// the data came.
2829
fn poll_recv_from(
29-
&mut self,
30-
waker: &Waker,
30+
self: Pin<&mut Self>,
31+
cx: &mut Context<'_>,
3132
buf: &mut [u8],
3233
) -> Poll<io::Result<(usize, SocketAddr)>>;
3334

runtime-tokio/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,4 @@ futures01 = { package = "futures", version = "0.1" }
1313
lazy_static = "1.0.0"
1414
mio = "0.6.16"
1515
runtime-raw = { path = "../runtime-raw" }
16-
tokio = { version = "0.1", features = ["async-await-preview"] }
16+
tokio = { version = "0.1" }

0 commit comments

Comments
 (0)