Skip to content

Commit 87de0ee

Browse files
committed
implemented Stream for Body; refactor logger
1 parent 2734f08 commit 87de0ee

File tree

3 files changed

+150
-85
lines changed

3 files changed

+150
-85
lines changed

roa-core/src/body.rs

Lines changed: 31 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -122,13 +122,6 @@ impl Body {
122122
}
123123
self
124124
}
125-
126-
/// Wrap self with a wrapper.
127-
#[inline]
128-
pub fn wrapped(&mut self, wrapper: impl FnOnce(Self) -> Self) -> &mut Self {
129-
*self = wrapper(std::mem::take(self));
130-
self
131-
}
132125
}
133126

134127
impl BodyStream {
@@ -170,6 +163,12 @@ impl BodyBytes {
170163
}
171164
}
172165
}
166+
167+
/// Get size hint.
168+
#[inline]
169+
pub fn size_hint(&self) -> usize {
170+
self.size_hint
171+
}
173172
}
174173

175174
impl From<Body> for hyper::Body {
@@ -251,26 +250,37 @@ impl Stream for BodyStream {
251250
}
252251
}
253252

253+
impl Stream for Body {
254+
type Item = io::Result<Bytes>;
255+
#[inline]
256+
fn poll_next(
257+
mut self: Pin<&mut Self>,
258+
cx: &mut Context<'_>,
259+
) -> Poll<Option<Self::Item>> {
260+
match &mut *self {
261+
Body::Bytes(bytes) => {
262+
if bytes.size_hint == 0 {
263+
Poll::Ready(None)
264+
} else {
265+
Poll::Ready(Some(Ok(mem::take(bytes).bytes())))
266+
}
267+
}
268+
Body::Stream(stream) => Pin::new(stream).poll_next(cx),
269+
}
270+
}
271+
}
272+
254273
#[cfg(test)]
255274
mod tests {
256-
use super::{Body, BodyBytes};
275+
use super::Body;
257276
use async_std::fs::File;
258-
use futures::StreamExt;
277+
use futures::{AsyncReadExt, TryStreamExt};
259278
use std::io;
260279

261280
async fn read_body(body: Body) -> io::Result<String> {
262-
use Body::*;
263-
let data = match body {
264-
Bytes(bytes) => bytes,
265-
Stream(mut stream) => {
266-
let mut bytes = BodyBytes::default();
267-
while let Some(item) = stream.next().await {
268-
bytes.write(item?);
269-
}
270-
bytes
271-
}
272-
};
273-
Ok(String::from_utf8_lossy(&*data.bytes()).to_string())
281+
let mut data = String::new();
282+
body.into_async_read().read_to_string(&mut data).await?;
283+
Ok(data)
274284
}
275285

276286
#[async_std::test]

roa-core/src/response.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ pub struct Response {
1515
/// Raw header map.
1616
pub headers: HeaderMap<HeaderValue>,
1717

18-
body: Body,
18+
/// Response body.
19+
pub body: Body,
1920
}
2021

2122
impl Response {

roa/src/logger.rs

Lines changed: 117 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -26,37 +26,63 @@
2626
//! }
2727
//! ```
2828
29-
use crate::{Context, Executor, JoinHandle, Next, Result, State};
29+
use crate::{Body, Context, Executor, JoinHandle, Next, Result, State};
3030
use bytes::Bytes;
3131
use bytesize::ByteSize;
3232
use futures::task::{self, Poll};
3333
use futures::{Future, Stream};
3434
use log::{error, info};
35+
use roa_core::http::{Method, StatusCode};
3536
use std::io;
3637
use std::pin::Pin;
3738
use std::sync::Arc;
3839
use std::time::Instant;
3940

40-
/// A finite-state machine to log information in each response.
41-
enum Logger<S, F>
42-
where
43-
F: FnOnce(u64),
44-
{
45-
Polling {
46-
counter: u64,
47-
stream: S,
48-
exec: Executor,
49-
task: Arc<F>,
50-
},
41+
/// A finite-state machine to log success information in each streaming response.
42+
enum StreamLogger<S> {
43+
Polling { stream: S, task: Box<LogTask> },
5144

5245
Logging(JoinHandle<()>),
5346

5447
Complete,
5548
}
5649

57-
impl<S, F> Stream for Logger<S, F>
50+
/// A task structure to log when polling is complete.
51+
#[derive(Clone)]
52+
struct LogTask {
53+
counter: u64,
54+
method: Method,
55+
status_code: StatusCode,
56+
path: String,
57+
start: Instant,
58+
exec: Executor,
59+
}
60+
61+
impl LogTask {
62+
fn log(self) -> JoinHandle<()> {
63+
let LogTask {
64+
counter,
65+
method,
66+
status_code,
67+
path,
68+
start,
69+
exec,
70+
} = self;
71+
exec.spawn_blocking(|| {
72+
info!(
73+
"<-- {} {} {}ms {} {}",
74+
method,
75+
path,
76+
start.elapsed().as_millis(),
77+
ByteSize(counter),
78+
status_code,
79+
)
80+
})
81+
}
82+
}
83+
84+
impl<S> Stream for StreamLogger<S>
5885
where
59-
F: 'static + Send + Sync + Unpin + Fn(u64),
6086
S: 'static + Send + Send + Unpin + Stream<Item = io::Result<Bytes>>,
6187
{
6288
type Item = io::Result<Bytes>;
@@ -66,33 +92,28 @@ where
6692
cx: &mut task::Context<'_>,
6793
) -> Poll<Option<Self::Item>> {
6894
match &mut *self {
69-
Logger::Polling {
70-
stream,
71-
exec,
72-
counter,
73-
task,
74-
} => match futures::ready!(Pin::new(stream).poll_next(cx)) {
75-
Some(Ok(bytes)) => {
76-
*counter += bytes.len() as u64;
77-
Poll::Ready(Some(Ok(bytes)))
95+
StreamLogger::Polling { stream, task } => {
96+
match futures::ready!(Pin::new(stream).poll_next(cx)) {
97+
Some(Ok(bytes)) => {
98+
task.counter += bytes.len() as u64;
99+
Poll::Ready(Some(Ok(bytes)))
100+
}
101+
None => {
102+
let handler = task.clone().log();
103+
*self = StreamLogger::Logging(handler);
104+
self.poll_next(cx)
105+
}
106+
err => Poll::Ready(err),
78107
}
79-
None => {
80-
let counter = *counter;
81-
let task = task.clone();
82-
let handler = exec.spawn_blocking(move || task(counter));
83-
*self = Logger::Logging(handler);
84-
self.poll_next(cx)
85-
}
86-
err => Poll::Ready(err),
87-
},
108+
}
88109

89-
Logger::Logging(handler) => {
110+
StreamLogger::Logging(handler) => {
90111
futures::ready!(Pin::new(handler).poll(cx));
91-
*self = Logger::Complete;
112+
*self = StreamLogger::Complete;
92113
self.poll_next(cx)
93114
}
94115

95-
Logger::Complete => Poll::Ready(None),
116+
StreamLogger::Complete => Poll::Ready(None),
96117
}
97118
}
98119
}
@@ -108,47 +129,57 @@ pub async fn logger<S: State>(mut ctx: Context<S>, next: Next) -> Result {
108129

109130
let method = ctx.method().clone();
110131
let path = ctx.uri().path().to_string();
111-
let counter = 0;
112132
let exec = ctx.exec.clone();
113-
match result {
114-
Ok(()) => {
115-
let status_code = ctx.status();
116-
ctx.resp_mut().wrapped(move |stream| Logger::Polling {
117-
counter,
118-
stream,
119-
exec,
120-
task: Arc::new(move |counter| {
121-
info!(
122-
"<-- {} {} {}ms {} {}",
133+
let status_code = ctx.status();
134+
135+
match (&result, &mut ctx.resp_mut().body) {
136+
(Err(err), _) => {
137+
let message = err.message.clone();
138+
ctx.exec
139+
.spawn_blocking(move || {
140+
error!(
141+
"<-- {} {} {}ms {}\n{}",
123142
method,
124143
path,
125144
start.elapsed().as_millis(),
126-
ByteSize(counter),
127145
status_code,
146+
message,
128147
);
129-
}),
130-
});
148+
})
149+
.await
131150
}
132-
Err(ref err) => {
133-
let message = err.message.clone();
134-
let status_code = err.status_code;
135-
ctx.resp_mut().wrapped(move |stream| Logger::Polling {
136-
counter,
137-
stream,
138-
exec,
139-
task: Arc::new(move |_counter| {
140-
error!(
141-
"<-- {} {} {}ms {}\n{}",
151+
(OK(_), Body::Bytes(bytes)) => {
152+
let size = bytes.size_hint();
153+
ctx.exec
154+
.spawn_blocking(move || {
155+
info!(
156+
"<-- {} {} {}ms {} {}",
142157
method,
143158
path,
144159
start.elapsed().as_millis(),
160+
ByteSize(size as u64),
145161
status_code,
146-
message,
147162
);
148-
}),
163+
})
164+
.await
165+
}
166+
(Ok(_), Body::Stream(stream)) => {
167+
let task = Box::new(LogTask {
168+
counter: 0,
169+
method,
170+
path,
171+
status_code,
172+
start,
173+
exec: ctx.exec.clone(),
149174
});
175+
let logger = StreamLogger::Polling {
176+
stream: std::mem::take(stream),
177+
task,
178+
};
179+
ctx.resp_mut().write_stream(logger);
150180
}
151-
};
181+
}
182+
152183
result
153184
}
154185

@@ -158,6 +189,7 @@ mod tests {
158189
use crate::http::StatusCode;
159190
use crate::preload::*;
160191
use crate::{throw, App};
192+
use async_std::fs::File;
161193
use async_std::task::spawn;
162194
use lazy_static::lazy_static;
163195
use log::{Level, LevelFilter, Metadata, Record, SetLoggerError};
@@ -193,7 +225,7 @@ mod tests {
193225
async fn log() -> Result<(), Box<dyn std::error::Error>> {
194226
init()?;
195227

196-
// info
228+
// bytes info
197229
let (addr, server) = App::new(())
198230
.gate_fn(logger)
199231
.end(move |mut ctx| async move {
@@ -232,6 +264,28 @@ mod tests {
232264
assert_eq!("ERROR", records[3].0);
233265
assert!(records[3].1.starts_with("<-- GET /"));
234266
assert!(records[3].1.ends_with("Hello, World!"));
267+
268+
// stream info
269+
let (addr, server) = App::new(())
270+
.gate_fn(logger)
271+
.end(move |mut ctx| async move {
272+
ctx.resp_mut()
273+
.write_reader(File::open("../assets/welcome.html").await?);
274+
Ok(())
275+
})
276+
.run()?;
277+
spawn(server);
278+
let resp = reqwest::get(&format!("http://{}", addr)).await?;
279+
assert_eq!(StatusCode::OK, resp.status());
280+
assert_eq!(236, resp.text().await?.len());
281+
let records = LOGGER.records.read().unwrap().clone();
282+
assert_eq!(6, records.len());
283+
assert_eq!("INFO", records[4].0);
284+
assert_eq!("--> GET /", records[4].1);
285+
assert_eq!("INFO", records[5].0);
286+
assert!(records[5].1.starts_with("<-- GET /"));
287+
assert!(records[5].1.contains("236 B"));
288+
assert!(records[5].1.ends_with("200 OK"));
235289
Ok(())
236290
}
237291
}

0 commit comments

Comments
 (0)