Skip to content

Commit 6104a31

Browse files
committed
backport Collect
1 parent 6f722a1 commit 6104a31

File tree

2 files changed

+233
-0
lines changed

2 files changed

+233
-0
lines changed

src/collect.rs

Lines changed: 222 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,222 @@
1+
use std::{
2+
collections::VecDeque,
3+
future::Future,
4+
pin::Pin,
5+
task::{Context, Poll},
6+
};
7+
8+
use super::Body;
9+
10+
use bytes::{Buf, Bytes};
11+
use http::HeaderMap;
12+
use pin_project_lite::pin_project;
13+
14+
pin_project! {
15+
/// Future that resolves into a [`Collected`].
16+
pub struct Collect<T>
17+
where
18+
T: Body,
19+
{
20+
#[pin]
21+
body: T,
22+
collected: Option<Collected<T::Data>>,
23+
is_data_done: bool,
24+
}
25+
}
26+
27+
impl<T: Body> Collect<T> {
28+
pub(crate) fn new(body: T) -> Self {
29+
Self {
30+
body,
31+
collected: Some(Collected::default()),
32+
is_data_done: false,
33+
}
34+
}
35+
}
36+
37+
impl<T: Body> Future for Collect<T> {
38+
type Output = Result<Collected<T::Data>, T::Error>;
39+
40+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
41+
let mut me = self.project();
42+
43+
loop {
44+
if !*me.is_data_done {
45+
match me.body.as_mut().poll_data(cx) {
46+
Poll::Ready(Some(Ok(data))) => {
47+
me.collected.as_mut().unwrap().push_data(data);
48+
}
49+
Poll::Ready(Some(Err(err))) => {
50+
return Poll::Ready(Err(err));
51+
}
52+
Poll::Ready(None) => {
53+
*me.is_data_done = true;
54+
}
55+
Poll::Pending => return Poll::Pending,
56+
}
57+
} else {
58+
match me.body.as_mut().poll_trailers(cx) {
59+
Poll::Ready(Ok(Some(trailers))) => {
60+
me.collected.as_mut().unwrap().push_trailers(trailers);
61+
break;
62+
}
63+
Poll::Ready(Err(err)) => {
64+
return Poll::Ready(Err(err));
65+
}
66+
Poll::Ready(Ok(None)) => break,
67+
Poll::Pending => return Poll::Pending,
68+
}
69+
}
70+
}
71+
72+
Poll::Ready(Ok(me.collected.take().expect("polled after complete")))
73+
}
74+
}
75+
76+
/// A collected body produced by [`Body::collect`] which collects all the DATA frames
77+
/// and trailers.
78+
#[derive(Debug)]
79+
pub struct Collected<B> {
80+
bufs: BufList<B>,
81+
trailers: Option<HeaderMap>,
82+
}
83+
84+
impl<B: Buf> Collected<B> {
85+
/// If there is a trailers frame buffered, returns a reference to it.
86+
///
87+
/// Returns `None` if the body contained no trailers.
88+
pub fn trailers(&self) -> Option<&HeaderMap> {
89+
self.trailers.as_ref()
90+
}
91+
92+
/// Aggregate this buffered into a [`Buf`].
93+
pub fn aggregate(self) -> impl Buf {
94+
self.bufs
95+
}
96+
97+
/// Convert this body into a [`Bytes`].
98+
pub fn to_bytes(mut self) -> Bytes {
99+
self.bufs.copy_to_bytes(self.bufs.remaining())
100+
}
101+
102+
fn push_data(&mut self, data: B) {
103+
// Only push this frame if it has some data in it, to avoid crashing on
104+
// `BufList::push`.
105+
if data.has_remaining() {
106+
self.bufs.push(data);
107+
}
108+
}
109+
110+
fn push_trailers(&mut self, trailers: HeaderMap) {
111+
if let Some(current) = &mut self.trailers {
112+
current.extend(trailers);
113+
} else {
114+
self.trailers = Some(trailers);
115+
}
116+
}
117+
}
118+
119+
impl<B> Default for Collected<B> {
120+
fn default() -> Self {
121+
Self {
122+
bufs: BufList::default(),
123+
trailers: None,
124+
}
125+
}
126+
}
127+
128+
impl<B> Unpin for Collected<B> {}
129+
130+
#[derive(Debug)]
131+
struct BufList<T> {
132+
bufs: VecDeque<T>,
133+
}
134+
135+
impl<T: Buf> BufList<T> {
136+
#[inline]
137+
pub(crate) fn push(&mut self, buf: T) {
138+
debug_assert!(buf.has_remaining());
139+
self.bufs.push_back(buf);
140+
}
141+
142+
/*
143+
#[inline]
144+
pub(crate) fn pop(&mut self) -> Option<T> {
145+
self.bufs.pop_front()
146+
}
147+
*/
148+
}
149+
150+
impl<T: Buf> Buf for BufList<T> {
151+
#[inline]
152+
fn remaining(&self) -> usize {
153+
self.bufs.iter().map(|buf| buf.remaining()).sum()
154+
}
155+
156+
#[inline]
157+
fn chunk(&self) -> &[u8] {
158+
self.bufs.front().map(Buf::chunk).unwrap_or_default()
159+
}
160+
161+
#[inline]
162+
fn advance(&mut self, mut cnt: usize) {
163+
while cnt > 0 {
164+
{
165+
let front = &mut self.bufs[0];
166+
let rem = front.remaining();
167+
if rem > cnt {
168+
front.advance(cnt);
169+
return;
170+
} else {
171+
front.advance(rem);
172+
cnt -= rem;
173+
}
174+
}
175+
self.bufs.pop_front();
176+
}
177+
}
178+
179+
#[inline]
180+
fn chunks_vectored<'t>(&'t self, dst: &mut [std::io::IoSlice<'t>]) -> usize {
181+
if dst.is_empty() {
182+
return 0;
183+
}
184+
let mut vecs = 0;
185+
for buf in &self.bufs {
186+
vecs += buf.chunks_vectored(&mut dst[vecs..]);
187+
if vecs == dst.len() {
188+
break;
189+
}
190+
}
191+
vecs
192+
}
193+
194+
#[inline]
195+
fn copy_to_bytes(&mut self, len: usize) -> Bytes {
196+
use bytes::{BufMut, BytesMut};
197+
// Our inner buffer may have an optimized version of copy_to_bytes, and if the whole
198+
// request can be fulfilled by the front buffer, we can take advantage.
199+
match self.bufs.front_mut() {
200+
Some(front) if front.remaining() == len => {
201+
let b = front.copy_to_bytes(len);
202+
self.bufs.pop_front();
203+
b
204+
}
205+
Some(front) if front.remaining() > len => front.copy_to_bytes(len),
206+
_ => {
207+
assert!(len <= self.remaining(), "`len` greater than remaining");
208+
let mut bm = BytesMut::with_capacity(len);
209+
bm.put(self.take(len));
210+
bm.freeze()
211+
}
212+
}
213+
}
214+
}
215+
216+
impl<T> Default for BufList<T> {
217+
fn default() -> Self {
218+
BufList {
219+
bufs: VecDeque::new(),
220+
}
221+
}
222+
}

src/lib.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
//!
1414
//! [`Body`]: trait.Body.html
1515
16+
mod collect;
1617
mod empty;
1718
mod full;
1819
mod limited;
@@ -21,6 +22,7 @@ mod size_hint;
2122

2223
pub mod combinators;
2324

25+
pub use self::collect::Collected;
2426
pub use self::empty::Empty;
2527
pub use self::full::Full;
2628
pub use self::limited::{LengthLimitError, Limited};
@@ -118,6 +120,15 @@ pub trait Body {
118120
MapErr::new(self, f)
119121
}
120122

123+
/// Turn this body into [`Collected`] body which will collect all the DATA frames
124+
/// and trailers.
125+
fn collect(self) -> crate::collect::Collect<Self>
126+
where
127+
Self: Sized,
128+
{
129+
collect::Collect::new(self)
130+
}
131+
121132
/// Turn this body into a boxed trait object.
122133
fn boxed(self) -> BoxBody<Self::Data, Self::Error>
123134
where

0 commit comments

Comments
 (0)