Skip to content

Commit 32ad060

Browse files
committed
init upgrade submodule
1 parent a9357ed commit 32ad060

File tree

6 files changed

+193
-9
lines changed

6 files changed

+193
-9
lines changed

src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,11 +131,14 @@ mod status_code;
131131
mod version;
132132

133133
cfg_unstable! {
134+
pub mod upgrade;
135+
134136
mod client;
135137
mod server;
136138

137139
pub use client::Client;
138140
pub use server::Server;
141+
139142
}
140143

141144
pub use body::Body;

src/request.rs

Lines changed: 66 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@ use crate::mime::Mime;
1616
use crate::trailers::{self, Trailers};
1717
use crate::{Body, Extensions, Method, StatusCode, Url, Version};
1818

19+
cfg_unstable! {
20+
use crate::upgrade;
21+
}
22+
1923
pin_project_lite::pin_project! {
2024
/// An HTTP request.
2125
///
@@ -33,8 +37,10 @@ pin_project_lite::pin_project! {
3337
url: Url,
3438
headers: Headers,
3539
version: Option<Version>,
36-
sender: Option<sync::Sender<Trailers>>,
37-
receiver: Option<sync::Receiver<Trailers>>,
40+
trailers_sender: Option<sync::Sender<Trailers>>,
41+
trailers_receiver: Option<sync::Receiver<Trailers>>,
42+
upgrade_sender: Option<sync::Sender<upgrade::Connection>>,
43+
upgrade_receiver: Option<sync::Receiver<upgrade::Connection>>,
3844
#[pin]
3945
body: Body,
4046
local_addr: Option<String>,
@@ -45,24 +51,51 @@ pin_project_lite::pin_project! {
4551

4652
impl Request {
4753
/// Create a new request.
54+
#[cfg(feature = "unstable")]
4855
pub fn new<U>(method: Method, url: U) -> Self
4956
where
5057
U: TryInto<Url>,
5158
U::Error: std::fmt::Debug,
5259
{
5360
let url = url.try_into().expect("Could not convert into a valid url");
54-
let (sender, receiver) = sync::channel(1);
61+
let (trailers_sender, trailers_receiver) = sync::channel(1);
62+
let (upgrade_sender, upgrade_receiver) = sync::channel(1);
5563
Self {
5664
method,
5765
url,
5866
headers: Headers::new(),
5967
version: None,
6068
body: Body::empty(),
61-
sender: Some(sender),
62-
receiver: Some(receiver),
6369
ext: Extensions::new(),
6470
peer_addr: None,
6571
local_addr: None,
72+
trailers_receiver: Some(trailers_receiver),
73+
trailers_sender: Some(trailers_sender),
74+
upgrade_sender: Some(upgrade_sender),
75+
upgrade_receiver: Some(upgrade_receiver),
76+
}
77+
}
78+
79+
/// Create a new request.
80+
#[cfg(not(feature = "unstable"))]
81+
pub fn new<U>(method: Method, url: U) -> Self
82+
where
83+
U: TryInto<Url>,
84+
U::Error: std::fmt::Debug,
85+
{
86+
let url = url.try_into().expect("Could not convert into a valid url");
87+
let (trailers_sender, trailers_receiver) = sync::channel(1);
88+
Self {
89+
method,
90+
url,
91+
headers: Headers::new(),
92+
version: None,
93+
body: Body::empty(),
94+
ext: Extensions::new(),
95+
peer_addr: None,
96+
local_addr: None,
97+
trailers_receiver: Some(trailers_receiver),
98+
trailers_sender: Some(trailers_sender),
6699
}
67100
}
68101

@@ -543,7 +576,7 @@ impl Request {
543576
/// Sends trailers to the a receiver.
544577
pub fn send_trailers(&mut self) -> trailers::Sender {
545578
let sender = self
546-
.sender
579+
.trailers_sender
547580
.take()
548581
.expect("Trailers sender can only be constructed once");
549582
trailers::Sender::new(sender)
@@ -552,12 +585,34 @@ impl Request {
552585
/// Receive trailers from a sender.
553586
pub async fn recv_trailers(&mut self) -> trailers::Receiver {
554587
let receiver = self
555-
.receiver
588+
.trailers_receiver
556589
.take()
557590
.expect("Trailers receiver can only be constructed once");
558591
trailers::Receiver::new(receiver)
559592
}
560593

594+
/// Sends an upgrade connection to the a receiver.
595+
#[cfg(feature = "unstable")]
596+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
597+
pub fn send_upgrade(&mut self) -> upgrade::Sender {
598+
let sender = self
599+
.upgrade_sender
600+
.take()
601+
.expect("Upgrade sender can only be constructed once");
602+
upgrade::Sender::new(sender)
603+
}
604+
605+
/// Receive an upgraded connection from a sender.
606+
#[cfg(feature = "unstable")]
607+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
608+
pub async fn recv_upgrade(&mut self) -> upgrade::Receiver {
609+
let receiver = self
610+
.upgrade_receiver
611+
.take()
612+
.expect("Upgrade receiver can only be constructed once");
613+
upgrade::Receiver::new(receiver)
614+
}
615+
561616
/// An iterator visiting all header pairs in arbitrary order.
562617
pub fn iter(&self) -> headers::Iter<'_> {
563618
self.headers.iter()
@@ -867,8 +922,10 @@ impl Clone for Request {
867922
url: self.url.clone(),
868923
headers: self.headers.clone(),
869924
version: self.version.clone(),
870-
sender: self.sender.clone(),
871-
receiver: self.receiver.clone(),
925+
trailers_sender: None,
926+
trailers_receiver: None,
927+
upgrade_sender: None,
928+
upgrade_receiver: None,
872929
body: Body::empty(),
873930
ext: Extensions::new(),
874931
peer_addr: self.peer_addr.clone(),

src/upgrade/connection.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
use async_std::io::{self, prelude::*};
2+
3+
use std::pin::Pin;
4+
use std::task::{Context, Poll};
5+
6+
/// An upgraded HTTP connection.
7+
#[derive(Debug, Clone)]
8+
pub struct Connection {
9+
inner: Pin<Box<dyn Read + Write + Clone + Send + Sync + Unpin + 'static>>,
10+
}
11+
12+
impl Read for Connection {
13+
fn poll_read(
14+
self: Pin<&mut Self>,
15+
cx: &mut Context<'_>,
16+
buf: &mut [u8],
17+
) -> Poll<io::Result<usize>> {
18+
let this = self.project();
19+
Pin::new(this.inner).poll_read(cx, buf)
20+
}
21+
}
22+
23+
impl Write for Connection {
24+
fn poll_write(
25+
self: Pin<&mut Self>,
26+
cx: &mut Context<'_>,
27+
buf: &[u8],
28+
) -> Poll<io::Result<usize>> {
29+
let this = self.project();
30+
Pin::new(this.inner).poll_write(cx, buf)
31+
}
32+
33+
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
34+
let this = self.project();
35+
Pin::new(this.inner).poll_flush(cx)
36+
}
37+
38+
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
39+
let this = self.project();
40+
Pin::new(this.inner).poll_close(cx)
41+
}
42+
}

src/upgrade/mod.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
//! HTTP protocol upgrades.
2+
//!
3+
//! In HTTP it's not uncommon to convert from one protocol to another. For
4+
//! example `HTTP/1.1` can upgrade a connection to websockets using the
5+
//! [upgrade header](https://developer.mozilla.org/en-US/docs/Web/HTTP/Protocol_upgrade_mechanism),
6+
//! while `HTTP/2` uses [a custom
7+
//! handshake](https://tools.ietf.org/html/rfc8441#section-5.1). Regardless of
8+
//! the HTTP version, changing protocols always involves some handshake,
9+
//! after which it is turned into a stream of bytes. This module provides
10+
//! primitives for upgrading from HTTP request-response pairs to alternate
11+
//! protocols.
12+
13+
mod connection;
14+
mod receiver;
15+
mod sender;
16+
17+
pub use connection::Connection;
18+
pub use receiver::Receiver;
19+
pub use sender::Sender;

src/upgrade/receiver.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
use async_std::prelude::*;
2+
use async_std::sync;
3+
4+
use std::future::Future;
5+
use std::pin::Pin;
6+
use std::task::{Context, Poll};
7+
8+
use crate::upgrade::Connection;
9+
10+
/// The receiving half of a channel to send an upgraded connection.
11+
///
12+
/// Unlike `async_std::sync::channel` the `send` method on this type can only be
13+
/// called once, and cannot be cloned. That's because only a single instance of
14+
/// `Connection` should be created.
15+
#[must_use = "Futures do nothing unless polled or .awaited"]
16+
#[derive(Debug)]
17+
pub struct Receiver {
18+
receiver: sync::Receiver<Connection>,
19+
}
20+
21+
impl Receiver {
22+
/// Create a new instance of `Receiver`.
23+
#[allow(unused)]
24+
pub(crate) fn new(receiver: sync::Receiver<Connection>) -> Self {
25+
Self { receiver }
26+
}
27+
}
28+
29+
impl Future for Receiver {
30+
type Output = Option<Connection>;
31+
32+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
33+
Pin::new(&mut self.receiver).poll_next(cx)
34+
}
35+
}

src/upgrade/sender.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
use async_std::sync;
2+
3+
use crate::upgrade::Connection;
4+
5+
/// The sending half of a channel to send an upgraded connection.
6+
///
7+
/// Unlike `async_std::sync::channel` the `send` method on this type can only be
8+
/// called once, and cannot be cloned. That's because only a single instance of
9+
/// `Connection` should be created.
10+
#[derive(Debug)]
11+
pub struct Sender {
12+
sender: sync::Sender<Connection>,
13+
}
14+
15+
impl Sender {
16+
/// Create a new instance of `Sender`.
17+
#[doc(hidden)]
18+
pub fn new(sender: sync::Sender<Connection>) -> Self {
19+
Self { sender }
20+
}
21+
22+
/// Send a `Trailer`.
23+
///
24+
/// The channel will be consumed after having sent trailers.
25+
pub async fn send(self, trailers: Connection) {
26+
self.sender.send(trailers).await
27+
}
28+
}

0 commit comments

Comments
 (0)