Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ tokio = { version = "1", features = ["sync"] }
# Optional

atomic-waker = { version = "1.1.2", optional = true }
futures-channel = { version = "0.3", optional = true }
futures-core = { version = "0.3.31", optional = true }
futures-util = { version = "0.3", default-features = false, features = ["alloc"], optional = true }
h2 = { version = "0.4.2", optional = true }
Expand All @@ -44,7 +43,6 @@ want = { version = "0.3", optional = true }

[dev-dependencies]
form_urlencoded = "1"
futures-channel = { version = "0.3", features = ["sink"] }
futures-util = { version = "0.3", default-features = false, features = ["alloc", "sink"] }
http-body-util = "0.1"
pretty_env_logger = "0.5"
Expand Down Expand Up @@ -80,8 +78,8 @@ full = [
]

# HTTP versions
http1 = ["dep:atomic-waker", "dep:futures-channel", "dep:futures-core", "dep:httparse", "dep:itoa", "dep:pin-utils"]
http2 = ["dep:futures-channel", "dep:futures-core", "dep:h2"]
http1 = ["dep:atomic-waker", "dep:futures-core", "dep:httparse", "dep:itoa", "dep:pin-utils"]
http2 = ["dep:futures-core", "dep:h2"]

# Client/Server
client = ["dep:want", "dep:pin-project-lite", "dep:smallvec"]
Expand Down
2 changes: 1 addition & 1 deletion benches/end_to_end.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ impl Opts {
let make_request = || {
let chunk_cnt = self.request_chunks;
let body = if chunk_cnt > 0 {
let (mut tx, rx) = futures_channel::mpsc::channel(0);
let (mut tx, rx) = tokio::sync::mpsc::channel(0);

let chunk = self
.request_body
Expand Down
18 changes: 9 additions & 9 deletions src/body/incoming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,16 @@ use std::pin::Pin;
use std::task::{Context, Poll};

use bytes::Bytes;
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
use futures_channel::{mpsc, oneshot};
#[cfg(all(
any(feature = "http1", feature = "http2"),
any(feature = "client", feature = "server")
))]
use futures_core::ready;
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
use futures_core::{stream::FusedStream, Stream}; // for mpsc::Receiver
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
use http::HeaderMap;
use http_body::{Body, Frame, SizeHint};
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
use tokio::sync::{mpsc, oneshot};

#[cfg(all(
any(feature = "http1", feature = "http2"),
Expand Down Expand Up @@ -219,8 +217,8 @@ impl Body for Incoming {
} => {
want_tx.send(WANT_READY);

if !data_rx.is_terminated() {
if let Some(chunk) = ready!(Pin::new(data_rx).poll_next(cx)?) {
if !data_rx.is_closed() {
if let Some(chunk) = ready!(Pin::new(data_rx).poll_recv(cx)?) {
len.sub_if(chunk.len() as u64);
return Poll::Ready(Some(Ok(Frame::data(chunk))));
}
Expand Down Expand Up @@ -353,9 +351,11 @@ impl Sender {
pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
// Check if the receiver end has tried polling for the body yet
ready!(self.poll_want(cx)?);
self.data_tx
.poll_ready(cx)
.map_err(|_| crate::Error::new_closed())
if self.data_tx.capacity() > 0 {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
}

fn poll_want(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
Expand Down
8 changes: 4 additions & 4 deletions src/proto/h2/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ use std::{

use crate::rt::{Read, Write};
use bytes::Bytes;
use futures_channel::mpsc::{Receiver, Sender};
use futures_channel::{mpsc, oneshot};
use futures_core::{ready, FusedFuture, FusedStream, Stream};
use futures_core::{ready, FusedFuture};
use h2::client::{Builder, Connection, SendRequest};
use h2::SendStream;
use http::{Method, StatusCode};
use pin_project_lite::pin_project;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{mpsc, oneshot};

use super::ping::{Ponger, Recorder};
use super::{ping, H2Upgraded, PipeToSendStream, SendBuf};
Expand Down Expand Up @@ -343,7 +343,7 @@ where
return Poll::Ready(());
}

if !this.drop_rx.is_terminated() && Pin::new(&mut this.drop_rx).poll_next(cx).is_ready() {
if !this.drop_rx.is_closed() && Pin::new(&mut this.drop_rx).poll_recv(cx).is_ready() {
// mpsc has been dropped, hopefully polling
// the connection some more should start shutdown
// and then close.
Expand Down
4 changes: 2 additions & 2 deletions tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ use hyper::header::{HeaderMap, HeaderName, HeaderValue};
use hyper::{Method, Request, StatusCode, Uri, Version};

use bytes::Bytes;
use futures_channel::oneshot;
use futures_util::future::{self, FutureExt, TryFuture, TryFutureExt};
use support::TokioIo;
use tokio::net::TcpStream;
use tokio::sync::oneshot;
mod support;

fn s(buf: &[u8]) -> &str {
Expand Down Expand Up @@ -1494,12 +1494,12 @@ mod conn {
use std::time::Duration;

use bytes::{Buf, Bytes};
use futures_channel::{mpsc, oneshot};
use futures_util::future::{self, poll_fn, FutureExt, TryFutureExt};
use http_body_util::{BodyExt, Empty, Full, StreamBody};
use hyper::rt::Timer;
use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, DuplexStream};
use tokio::net::{TcpListener as TkTcpListener, TcpStream};
use tokio::sync::{mpsc, oneshot};

use hyper::body::{Body, Frame};
use hyper::client::conn;
Expand Down
2 changes: 1 addition & 1 deletion tests/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use std::thread;
use std::time::Duration;

use bytes::Bytes;
use futures_channel::oneshot;
use futures_util::future::{self, Either, FutureExt};
use h2::client::SendRequest;
use h2::{RecvStream, SendStream};
Expand All @@ -25,6 +24,7 @@ use hyper::rt::{Read as AsyncRead, Write as AsyncWrite};
use support::{TokioExecutor, TokioIo, TokioTimer};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener as TkTcpListener, TcpListener, TcpStream as TkTcpStream};
use tokio::sync::oneshot;

use hyper::body::{Body, Incoming as IncomingBody};
use hyper::server::conn::{http1, http2};
Expand Down
Loading