Skip to content

Commit edb6cbd

Browse files
authored
Merge pull request #115 from http-rs/upgrade
init upgrade submodule
2 parents 73586c7 + 17ce3e5 commit edb6cbd

File tree

7 files changed

+253
-18
lines changed

7 files changed

+253
-18
lines changed

src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,11 +131,14 @@ mod status_code;
131131
mod version;
132132

133133
cfg_unstable! {
134+
pub mod upgrade;
135+
134136
mod client;
135137
mod server;
136138

137139
pub use client::Client;
138140
pub use server::Server;
141+
139142
}
140143

141144
pub use body::Body;

src/request.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,13 @@ pin_project_lite::pin_project! {
3333
url: Url,
3434
headers: Headers,
3535
version: Option<Version>,
36-
sender: Option<sync::Sender<Trailers>>,
37-
receiver: Option<sync::Receiver<Trailers>>,
3836
#[pin]
3937
body: Body,
4038
local_addr: Option<String>,
4139
peer_addr: Option<String>,
4240
ext: Extensions,
41+
trailers_sender: Option<sync::Sender<Trailers>>,
42+
trailers_receiver: Option<sync::Receiver<Trailers>>,
4343
}
4444
}
4545

@@ -51,18 +51,18 @@ impl Request {
5151
U::Error: std::fmt::Debug,
5252
{
5353
let url = url.try_into().expect("Could not convert into a valid url");
54-
let (sender, receiver) = sync::channel(1);
54+
let (trailers_sender, trailers_receiver) = sync::channel(1);
5555
Self {
5656
method,
5757
url,
5858
headers: Headers::new(),
5959
version: None,
6060
body: Body::empty(),
61-
sender: Some(sender),
62-
receiver: Some(receiver),
6361
ext: Extensions::new(),
6462
peer_addr: None,
6563
local_addr: None,
64+
trailers_receiver: Some(trailers_receiver),
65+
trailers_sender: Some(trailers_sender),
6666
}
6767
}
6868

@@ -543,7 +543,7 @@ impl Request {
543543
/// Sends trailers to the a receiver.
544544
pub fn send_trailers(&mut self) -> trailers::Sender {
545545
let sender = self
546-
.sender
546+
.trailers_sender
547547
.take()
548548
.expect("Trailers sender can only be constructed once");
549549
trailers::Sender::new(sender)
@@ -552,7 +552,7 @@ impl Request {
552552
/// Receive trailers from a sender.
553553
pub async fn recv_trailers(&mut self) -> trailers::Receiver {
554554
let receiver = self
555-
.receiver
555+
.trailers_receiver
556556
.take()
557557
.expect("Trailers receiver can only be constructed once");
558558
trailers::Receiver::new(receiver)
@@ -867,8 +867,8 @@ impl Clone for Request {
867867
url: self.url.clone(),
868868
headers: self.headers.clone(),
869869
version: self.version.clone(),
870-
sender: self.sender.clone(),
871-
receiver: self.receiver.clone(),
870+
trailers_sender: None,
871+
trailers_receiver: None,
872872
body: Body::empty(),
873873
ext: Extensions::new(),
874874
peer_addr: self.peer_addr.clone(),

src/response.rs

Lines changed: 114 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,42 @@ use crate::mime::Mime;
1717
use crate::trailers::{self, Trailers};
1818
use crate::{Body, Extensions, StatusCode, Version};
1919

20+
cfg_unstable! {
21+
use crate::upgrade;
22+
}
23+
24+
#[cfg(not(feature = "unstable"))]
25+
pin_project_lite::pin_project! {
26+
/// An HTTP response.
27+
///
28+
/// # Examples
29+
///
30+
/// ```
31+
/// # fn main() -> Result<(), http_types::Error> {
32+
/// #
33+
/// use http_types::{Response, StatusCode};
34+
///
35+
/// let mut res = Response::new(StatusCode::Ok);
36+
/// res.set_body("Hello, Nori!");
37+
/// #
38+
/// # Ok(()) }
39+
/// ```
40+
#[derive(Debug)]
41+
pub struct Response {
42+
status: StatusCode,
43+
headers: Headers,
44+
version: Option<Version>,
45+
trailers_sender: Option<sync::Sender<Trailers>>,
46+
trailers_receiver: Option<sync::Receiver<Trailers>>,
47+
#[pin]
48+
body: Body,
49+
ext: Extensions,
50+
local_addr: Option<String>,
51+
peer_addr: Option<String>,
52+
}
53+
}
54+
55+
#[cfg(feature = "unstable")]
2056
pin_project_lite::pin_project! {
2157
/// An HTTP response.
2258
///
@@ -37,8 +73,11 @@ pin_project_lite::pin_project! {
3773
status: StatusCode,
3874
headers: Headers,
3975
version: Option<Version>,
40-
sender: Option<sync::Sender<Trailers>>,
41-
receiver: Option<sync::Receiver<Trailers>>,
76+
trailers_sender: Option<sync::Sender<Trailers>>,
77+
trailers_receiver: Option<sync::Receiver<Trailers>>,
78+
upgrade_sender: Option<sync::Sender<upgrade::Connection>>,
79+
upgrade_receiver: Option<sync::Receiver<upgrade::Connection>>,
80+
has_upgrade: bool,
4281
#[pin]
4382
body: Body,
4483
ext: Extensions,
@@ -49,6 +88,7 @@ pin_project_lite::pin_project! {
4988

5089
impl Response {
5190
/// Create a new response.
91+
#[cfg(not(feature = "unstable"))]
5292
pub fn new<S>(status: S) -> Self
5393
where
5494
S: TryInto<StatusCode>,
@@ -57,14 +97,42 @@ impl Response {
5797
let status = status
5898
.try_into()
5999
.expect("Could not convert into a valid `StatusCode`");
60-
let (sender, receiver) = sync::channel(1);
100+
let (trailers_sender, trailers_receiver) = sync::channel(1);
61101
Self {
62102
status,
63103
headers: Headers::new(),
64104
version: None,
65105
body: Body::empty(),
66-
sender: Some(sender),
67-
receiver: Some(receiver),
106+
trailers_sender: Some(trailers_sender),
107+
trailers_receiver: Some(trailers_receiver),
108+
ext: Extensions::new(),
109+
peer_addr: None,
110+
local_addr: None,
111+
}
112+
}
113+
114+
/// Create a new response.
115+
#[cfg(feature = "unstable")]
116+
pub fn new<S>(status: S) -> Self
117+
where
118+
S: TryInto<StatusCode>,
119+
S::Error: Debug,
120+
{
121+
let status = status
122+
.try_into()
123+
.expect("Could not convert into a valid `StatusCode`");
124+
let (trailers_sender, trailers_receiver) = sync::channel(1);
125+
let (upgrade_sender, upgrade_receiver) = sync::channel(1);
126+
Self {
127+
status,
128+
headers: Headers::new(),
129+
version: None,
130+
body: Body::empty(),
131+
trailers_sender: Some(trailers_sender),
132+
trailers_receiver: Some(trailers_receiver),
133+
upgrade_sender: Some(upgrade_sender),
134+
upgrade_receiver: Some(upgrade_receiver),
135+
has_upgrade: false,
68136
ext: Extensions::new(),
69137
peer_addr: None,
70138
local_addr: None,
@@ -469,7 +537,7 @@ impl Response {
469537
/// Sends trailers to the a receiver.
470538
pub fn send_trailers(&mut self) -> trailers::Sender {
471539
let sender = self
472-
.sender
540+
.trailers_sender
473541
.take()
474542
.expect("Trailers sender can only be constructed once");
475543
trailers::Sender::new(sender)
@@ -478,12 +546,43 @@ impl Response {
478546
/// Receive trailers from a sender.
479547
pub async fn recv_trailers(&mut self) -> trailers::Receiver {
480548
let receiver = self
481-
.receiver
549+
.trailers_receiver
482550
.take()
483551
.expect("Trailers receiver can only be constructed once");
484552
trailers::Receiver::new(receiver)
485553
}
486554

555+
/// Sends an upgrade connection to the a receiver.
556+
#[cfg(feature = "unstable")]
557+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
558+
pub fn send_upgrade(&mut self) -> upgrade::Sender {
559+
self.has_upgrade = true;
560+
let sender = self
561+
.upgrade_sender
562+
.take()
563+
.expect("Upgrade sender can only be constructed once");
564+
upgrade::Sender::new(sender)
565+
}
566+
567+
/// Receive an upgraded connection from a sender.
568+
#[cfg(feature = "unstable")]
569+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
570+
pub async fn recv_upgrade(&mut self) -> upgrade::Receiver {
571+
self.has_upgrade = true;
572+
let receiver = self
573+
.upgrade_receiver
574+
.take()
575+
.expect("Upgrade receiver can only be constructed once");
576+
upgrade::Receiver::new(receiver)
577+
}
578+
579+
/// Returns `true` if a protocol upgrade is in progress.
580+
#[cfg(feature = "unstable")]
581+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
582+
pub fn has_upgrade(&self) -> bool {
583+
self.has_upgrade
584+
}
585+
487586
/// An iterator visiting all header pairs in arbitrary order.
488587
pub fn iter(&self) -> headers::Iter<'_> {
489588
self.headers.iter()
@@ -539,8 +638,14 @@ impl Clone for Response {
539638
status: self.status.clone(),
540639
headers: self.headers.clone(),
541640
version: self.version.clone(),
542-
sender: self.sender.clone(),
543-
receiver: self.receiver.clone(),
641+
trailers_sender: self.trailers_sender.clone(),
642+
trailers_receiver: self.trailers_receiver.clone(),
643+
#[cfg(feature = "unstable")]
644+
upgrade_sender: self.upgrade_sender.clone(),
645+
#[cfg(feature = "unstable")]
646+
upgrade_receiver: self.upgrade_receiver.clone(),
647+
#[cfg(feature = "unstable")]
648+
has_upgrade: false,
544649
body: Body::empty(),
545650
ext: Extensions::new(),
546651
peer_addr: self.peer_addr.clone(),

src/upgrade/connection.rs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
use async_std::io::{self, prelude::*};
2+
3+
use std::pin::Pin;
4+
use std::task::{Context, Poll};
5+
6+
/// An upgraded HTTP connection.
7+
#[derive(Debug, Clone)]
8+
pub struct RawConnection<Inner> {
9+
inner: Inner,
10+
}
11+
12+
/// A boxed upgraded HTTP connection.
13+
pub type Connection = RawConnection<Box<dyn InnerConnection + 'static>>;
14+
15+
/// Trait to signal the requirements for an underlying connection type.
16+
pub trait InnerConnection: Read + Write + Send + Sync + Unpin {}
17+
impl<T: Read + Write + Send + Sync + Unpin> InnerConnection for T {}
18+
19+
impl<Inner: Read + Unpin> Read for RawConnection<Inner> {
20+
fn poll_read(
21+
mut self: Pin<&mut Self>,
22+
cx: &mut Context<'_>,
23+
buf: &mut [u8],
24+
) -> Poll<io::Result<usize>> {
25+
Pin::new(&mut self.inner).poll_read(cx, buf)
26+
}
27+
}
28+
29+
impl<Inner: Write + Unpin> Write for RawConnection<Inner> {
30+
fn poll_write(
31+
mut self: Pin<&mut Self>,
32+
cx: &mut Context<'_>,
33+
buf: &[u8],
34+
) -> Poll<io::Result<usize>> {
35+
Pin::new(&mut self.inner).poll_write(cx, buf)
36+
}
37+
38+
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
39+
Pin::new(&mut self.inner).poll_flush(cx)
40+
}
41+
42+
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
43+
Pin::new(&mut self.inner).poll_close(cx)
44+
}
45+
}

src/upgrade/mod.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
//! HTTP protocol upgrades.
2+
//!
3+
//! In HTTP it's not uncommon to convert from one protocol to another. For
4+
//! example `HTTP/1.1` can upgrade a connection to websockets using the
5+
//! [upgrade header](https://developer.mozilla.org/en-US/docs/Web/HTTP/Protocol_upgrade_mechanism),
6+
//! while `HTTP/2` uses [a custom
7+
//! handshake](https://tools.ietf.org/html/rfc8441#section-5.1). Regardless of
8+
//! the HTTP version, changing protocols always involves some handshake,
9+
//! after which it is turned into a stream of bytes. This module provides
10+
//! primitives for upgrading from HTTP request-response pairs to alternate
11+
//! protocols.
12+
13+
mod connection;
14+
mod receiver;
15+
mod sender;
16+
17+
pub use connection::Connection;
18+
pub use receiver::Receiver;
19+
pub use sender::Sender;

src/upgrade/receiver.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
use async_std::prelude::*;
2+
use async_std::sync;
3+
4+
use std::future::Future;
5+
use std::pin::Pin;
6+
use std::task::{Context, Poll};
7+
8+
use crate::upgrade::Connection;
9+
10+
/// The receiving half of a channel to send an upgraded connection.
11+
///
12+
/// Unlike `async_std::sync::channel` the `send` method on this type can only be
13+
/// called once, and cannot be cloned. That's because only a single instance of
14+
/// `Connection` should be created.
15+
#[must_use = "Futures do nothing unless polled or .awaited"]
16+
#[derive(Debug)]
17+
pub struct Receiver {
18+
receiver: sync::Receiver<Connection>,
19+
}
20+
21+
impl Receiver {
22+
/// Create a new instance of `Receiver`.
23+
#[allow(unused)]
24+
pub(crate) fn new(receiver: sync::Receiver<Connection>) -> Self {
25+
Self { receiver }
26+
}
27+
}
28+
29+
impl Future for Receiver {
30+
type Output = Option<Connection>;
31+
32+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
33+
Pin::new(&mut self.receiver).poll_next(cx)
34+
}
35+
}

src/upgrade/sender.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
use async_std::sync;
2+
3+
use crate::upgrade::Connection;
4+
5+
/// The sending half of a channel to send an upgraded connection.
6+
///
7+
/// Unlike `async_std::sync::channel` the `send` method on this type can only be
8+
/// called once, and cannot be cloned. That's because only a single instance of
9+
/// `Connection` should be created.
10+
#[derive(Debug)]
11+
pub struct Sender {
12+
sender: sync::Sender<Connection>,
13+
}
14+
15+
impl Sender {
16+
/// Create a new instance of `Sender`.
17+
#[doc(hidden)]
18+
pub fn new(sender: sync::Sender<Connection>) -> Self {
19+
Self { sender }
20+
}
21+
22+
/// Send a `Trailer`.
23+
///
24+
/// The channel will be consumed after having sent trailers.
25+
pub async fn send(self, trailers: Connection) {
26+
self.sender.send(trailers).await
27+
}
28+
}

0 commit comments

Comments
 (0)