diff --git a/Cargo.toml b/Cargo.toml index 8334230937..2e5488deed 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,7 +28,6 @@ tokio = { version = "1", features = ["sync"] } # Optional atomic-waker = { version = "1.1.2", optional = true } -futures-channel = { version = "0.3", optional = true } futures-core = { version = "0.3.31", optional = true } futures-util = { version = "0.3", default-features = false, features = ["alloc"], optional = true } h2 = { version = "0.4.2", optional = true } @@ -44,7 +43,6 @@ want = { version = "0.3", optional = true } [dev-dependencies] form_urlencoded = "1" -futures-channel = { version = "0.3", features = ["sink"] } futures-util = { version = "0.3", default-features = false, features = ["alloc", "sink"] } http-body-util = "0.1" pretty_env_logger = "0.5" @@ -80,8 +78,8 @@ full = [ ] # HTTP versions -http1 = ["dep:atomic-waker", "dep:futures-channel", "dep:futures-core", "dep:httparse", "dep:itoa", "dep:pin-utils"] -http2 = ["dep:futures-channel", "dep:futures-core", "dep:h2"] +http1 = ["dep:atomic-waker", "dep:futures-core", "dep:httparse", "dep:itoa", "dep:pin-utils"] +http2 = ["dep:futures-core", "dep:h2"] # Client/Server client = ["dep:want", "dep:pin-project-lite", "dep:smallvec"] diff --git a/benches/end_to_end.rs b/benches/end_to_end.rs index 07f08c186b..3a5fc6c202 100644 --- a/benches/end_to_end.rs +++ b/benches/end_to_end.rs @@ -341,7 +341,7 @@ impl Opts { let make_request = || { let chunk_cnt = self.request_chunks; let body = if chunk_cnt > 0 { - let (mut tx, rx) = futures_channel::mpsc::channel(0); + let (mut tx, rx) = tokio::sync::mpsc::channel(0); let chunk = self .request_body diff --git a/src/body/incoming.rs b/src/body/incoming.rs index 64ee5001a9..4b6cd20e76 100644 --- a/src/body/incoming.rs +++ b/src/body/incoming.rs @@ -5,18 +5,16 @@ use std::pin::Pin; use std::task::{Context, Poll}; use bytes::Bytes; -#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] -use futures_channel::{mpsc, oneshot}; #[cfg(all( any(feature = "http1", feature = "http2"), any(feature = "client", feature = "server") ))] use futures_core::ready; #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] -use futures_core::{stream::FusedStream, Stream}; // for mpsc::Receiver -#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] use http::HeaderMap; use http_body::{Body, Frame, SizeHint}; +#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] +use tokio::sync::{mpsc, oneshot}; #[cfg(all( any(feature = "http1", feature = "http2"), @@ -219,8 +217,8 @@ impl Body for Incoming { } => { want_tx.send(WANT_READY); - if !data_rx.is_terminated() { - if let Some(chunk) = ready!(Pin::new(data_rx).poll_next(cx)?) { + if !data_rx.is_closed() { + if let Some(chunk) = ready!(Pin::new(data_rx).poll_recv(cx)?) { len.sub_if(chunk.len() as u64); return Poll::Ready(Some(Ok(Frame::data(chunk)))); } @@ -353,9 +351,11 @@ impl Sender { pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { // Check if the receiver end has tried polling for the body yet ready!(self.poll_want(cx)?); - self.data_tx - .poll_ready(cx) - .map_err(|_| crate::Error::new_closed()) + if self.data_tx.capacity() > 0 { + Poll::Ready(Ok(())) + } else { + Poll::Pending + } } fn poll_want(&mut self, cx: &mut Context<'_>) -> Poll> { diff --git a/src/proto/h2/client.rs b/src/proto/h2/client.rs index 3860a5afaf..9163c2de71 100644 --- a/src/proto/h2/client.rs +++ b/src/proto/h2/client.rs @@ -9,13 +9,13 @@ use std::{ use crate::rt::{Read, Write}; use bytes::Bytes; -use futures_channel::mpsc::{Receiver, Sender}; -use futures_channel::{mpsc, oneshot}; -use futures_core::{ready, FusedFuture, FusedStream, Stream}; +use futures_core::{ready, FusedFuture}; use h2::client::{Builder, Connection, SendRequest}; use h2::SendStream; use http::{Method, StatusCode}; use pin_project_lite::pin_project; +use tokio::sync::mpsc::{Receiver, Sender}; +use tokio::sync::{mpsc, oneshot}; use super::ping::{Ponger, Recorder}; use super::{ping, H2Upgraded, PipeToSendStream, SendBuf}; @@ -343,7 +343,7 @@ where return Poll::Ready(()); } - if !this.drop_rx.is_terminated() && Pin::new(&mut this.drop_rx).poll_next(cx).is_ready() { + if !this.drop_rx.is_closed() && Pin::new(&mut this.drop_rx).poll_recv(cx).is_ready() { // mpsc has been dropped, hopefully polling // the connection some more should start shutdown // and then close. diff --git a/tests/client.rs b/tests/client.rs index d5a0e4e005..430e861525 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -17,10 +17,10 @@ use hyper::header::{HeaderMap, HeaderName, HeaderValue}; use hyper::{Method, Request, StatusCode, Uri, Version}; use bytes::Bytes; -use futures_channel::oneshot; use futures_util::future::{self, FutureExt, TryFuture, TryFutureExt}; use support::TokioIo; use tokio::net::TcpStream; +use tokio::sync::oneshot; mod support; fn s(buf: &[u8]) -> &str { @@ -1494,12 +1494,12 @@ mod conn { use std::time::Duration; use bytes::{Buf, Bytes}; - use futures_channel::{mpsc, oneshot}; use futures_util::future::{self, poll_fn, FutureExt, TryFutureExt}; use http_body_util::{BodyExt, Empty, Full, StreamBody}; use hyper::rt::Timer; use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, DuplexStream}; use tokio::net::{TcpListener as TkTcpListener, TcpStream}; + use tokio::sync::{mpsc, oneshot}; use hyper::body::{Body, Frame}; use hyper::client::conn; diff --git a/tests/server.rs b/tests/server.rs index 2ba6f92ca3..a733b874f9 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -14,7 +14,6 @@ use std::thread; use std::time::Duration; use bytes::Bytes; -use futures_channel::oneshot; use futures_util::future::{self, Either, FutureExt}; use h2::client::SendRequest; use h2::{RecvStream, SendStream}; @@ -25,6 +24,7 @@ use hyper::rt::{Read as AsyncRead, Write as AsyncWrite}; use support::{TokioExecutor, TokioIo, TokioTimer}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::{TcpListener as TkTcpListener, TcpListener, TcpStream as TkTcpStream}; +use tokio::sync::oneshot; use hyper::body::{Body, Incoming as IncomingBody}; use hyper::server::conn::{http1, http2};