Skip to content

Commit 60ada62

Browse files
committed
move responsewriter to its own module
1 parent 794da10 commit 60ada62

File tree

5 files changed

+157
-138
lines changed

5 files changed

+157
-138
lines changed

src/app_config.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,7 @@ pub struct AppConfig {
230230
pub site_prefix: String,
231231

232232
/// Maximum number of messages that can be stored in memory before sending them to the client.
233+
/// This prevents a single request from using up all available memory.
233234
#[serde(default = "default_max_pending_rows")]
234235
pub max_pending_rows: usize,
235236

@@ -470,6 +471,8 @@ fn default_https_acme_directory_url() -> String {
470471
"https://acme-v02.api.letsencrypt.org/directory".to_string()
471472
}
472473

474+
/// If the sending queue exceeds this number of outgoing messages, an error will be thrown
475+
/// This prevents a single request from using up all available memory
473476
fn default_max_pending_rows() -> usize {
474477
256
475478
}

src/render.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::templates::SplitTemplate;
2-
use crate::webserver::http::{AsyncResponseWriter, RequestContext, ResponseWriter};
2+
use crate::webserver::http::RequestContext;
3+
use crate::webserver::response_writer::{AsyncResponseWriter, ResponseWriter};
34
use crate::webserver::ErrorWithStatus;
45
use crate::AppState;
56
use actix_web::cookie::time::format_description::well_known::Rfc3339;

src/webserver/http.rs

Lines changed: 2 additions & 137 deletions
Original file line numberDiff line numberDiff line change
@@ -11,167 +11,32 @@ use actix_web::http::header::{ContentType, Header, HttpDate, IfModifiedSince, La
1111
use actix_web::http::{header, StatusCode, Uri};
1212
use actix_web::web::PayloadConfig;
1313
use actix_web::{
14-
dev::ServiceResponse, middleware, middleware::Logger, web, web::Bytes, App, HttpResponse,
15-
HttpServer,
14+
dev::ServiceResponse, middleware, middleware::Logger, web, App, HttpResponse, HttpServer,
1615
};
1716
use actix_web::{HttpResponseBuilder, ResponseError};
1817

1918
use super::https::make_auto_rustls_config;
19+
use super::response_writer::ResponseWriter;
2020
use super::static_content;
2121
use actix_web::body::MessageBody;
2222
use anyhow::{bail, Context};
2323
use chrono::{DateTime, Utc};
2424
use futures_util::stream::Stream;
2525
use futures_util::StreamExt;
2626
use std::borrow::Cow;
27-
use std::io::Write;
2827
use std::mem;
2928
use std::path::PathBuf;
3029
use std::pin::Pin;
3130
use std::sync::Arc;
32-
use std::task::Poll;
3331
use std::time::SystemTime;
3432
use tokio::sync::mpsc;
3533

36-
/// If the sending queue exceeds this number of outgoing messages, an error will be thrown
37-
/// This prevents a single request from using up all available memory
38-
39-
#[derive(Clone)]
40-
pub struct ResponseWriter {
41-
buffer: Vec<u8>,
42-
response_bytes: mpsc::Sender<Bytes>,
43-
}
44-
4534
#[derive(Clone)]
4635
pub struct RequestContext {
4736
pub is_embedded: bool,
4837
pub content_security_policy: ContentSecurityPolicy,
4938
}
5039

51-
impl ResponseWriter {
52-
fn new(response_bytes: mpsc::Sender<Bytes>) -> Self {
53-
Self {
54-
response_bytes,
55-
buffer: Vec::new(),
56-
}
57-
}
58-
async fn close_with_error(&mut self, mut msg: String) {
59-
if !self.response_bytes.is_closed() {
60-
if let Err(e) = self.async_flush().await {
61-
use std::fmt::Write;
62-
write!(&mut msg, "Unable to flush data: {e}").unwrap();
63-
}
64-
if let Err(e) = self.response_bytes.send(msg.into()).await {
65-
log::error!("Unable to send error back to client: {e}");
66-
}
67-
}
68-
}
69-
70-
pub async fn async_flush(&mut self) -> std::io::Result<()> {
71-
if self.buffer.is_empty() {
72-
return Ok(());
73-
}
74-
log::trace!(
75-
"Flushing data to client: {}",
76-
String::from_utf8_lossy(&self.buffer)
77-
);
78-
let sender = self
79-
.response_bytes
80-
.reserve()
81-
.await
82-
.map_err(|_| std::io::ErrorKind::WouldBlock)?;
83-
sender.send(std::mem::take(&mut self.buffer).into());
84-
Ok(())
85-
}
86-
}
87-
88-
impl Write for ResponseWriter {
89-
#[inline]
90-
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
91-
self.buffer.extend_from_slice(buf);
92-
Ok(buf.len())
93-
}
94-
fn flush(&mut self) -> std::io::Result<()> {
95-
if self.buffer.is_empty() {
96-
return Ok(());
97-
}
98-
log::trace!(
99-
"Flushing data to client: {}",
100-
String::from_utf8_lossy(&self.buffer)
101-
);
102-
self.response_bytes
103-
.try_send(mem::take(&mut self.buffer).into())
104-
.map_err(|e|
105-
std::io::Error::new(
106-
std::io::ErrorKind::WouldBlock,
107-
format!("{e}: Row limit exceeded. The server cannot store more than {} pending messages in memory. Try again later or increase max_pending_rows in the configuration.", self.response_bytes.max_capacity())
108-
)
109-
)
110-
}
111-
}
112-
113-
pub struct AsyncResponseWriter {
114-
poll_sender: tokio_util::sync::PollSender<Bytes>,
115-
writer: ResponseWriter,
116-
}
117-
118-
impl AsyncResponseWriter {
119-
#[must_use]
120-
pub fn new(writer: ResponseWriter) -> Self {
121-
let sender = writer.response_bytes.clone();
122-
Self {
123-
poll_sender: tokio_util::sync::PollSender::new(sender),
124-
writer,
125-
}
126-
}
127-
#[must_use]
128-
pub fn into_inner(self) -> ResponseWriter {
129-
self.writer
130-
}
131-
}
132-
impl tokio::io::AsyncWrite for AsyncResponseWriter {
133-
fn poll_write(
134-
mut self: Pin<&mut Self>,
135-
_cx: &mut std::task::Context<'_>,
136-
buf: &[u8],
137-
) -> std::task::Poll<std::io::Result<usize>> {
138-
Poll::Ready(self.as_mut().writer.write(buf))
139-
}
140-
141-
fn poll_flush(
142-
self: Pin<&mut Self>,
143-
cx: &mut std::task::Context<'_>,
144-
) -> std::task::Poll<std::io::Result<()>> {
145-
let Self {
146-
poll_sender,
147-
writer,
148-
} = self.get_mut();
149-
match poll_sender.poll_reserve(cx) {
150-
Poll::Ready(Ok(())) => {
151-
let res = poll_sender.send_item(std::mem::take(&mut writer.buffer).into());
152-
Poll::Ready(res.map_err(|_| std::io::ErrorKind::BrokenPipe.into()))
153-
}
154-
Poll::Pending => Poll::Pending,
155-
Poll::Ready(Err(_e)) => Poll::Ready(Err(std::io::ErrorKind::BrokenPipe.into())),
156-
}
157-
}
158-
159-
fn poll_shutdown(
160-
self: Pin<&mut Self>,
161-
cx: &mut std::task::Context<'_>,
162-
) -> std::task::Poll<Result<(), std::io::Error>> {
163-
self.poll_flush(cx)
164-
}
165-
}
166-
167-
impl Drop for ResponseWriter {
168-
fn drop(&mut self) {
169-
if let Err(e) = std::io::Write::flush(self) {
170-
log::error!("Could not flush data to client: {e}");
171-
}
172-
}
173-
}
174-
17540
async fn stream_response(stream: impl Stream<Item = DbItem>, mut renderer: AnyRenderBodyContext) {
17641
let mut stream = Box::pin(stream);
17742

src/webserver/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,5 @@ pub use error_with_status::ErrorWithStatus;
1111

1212
pub use database::make_placeholder;
1313
pub use database::migrations::apply;
14+
pub mod response_writer;
1415
mod static_content;

src/webserver/response_writer.rs

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
use actix_web::web::Bytes;
2+
use std::io::Write;
3+
use std::mem;
4+
use std::pin::Pin;
5+
use tokio::sync::mpsc;
6+
7+
/// The response writer is a buffered async writer that sends data to the client.
8+
/// Writing to it just appends to an in-memory buffer, which is flushed to the client asynchronously
9+
/// when `async_flush()` is called.
10+
/// This allows streaming data to the client without blocking, and has built-in back-pressure:
11+
/// if the client cannot keep up with the data, `async_flush()` will fill the sending queue,
12+
/// then block until the client has consumed some data.
13+
#[derive(Clone)]
14+
pub struct ResponseWriter {
15+
buffer: Vec<u8>,
16+
response_bytes: mpsc::Sender<Bytes>,
17+
}
18+
19+
impl ResponseWriter {
20+
#[must_use]
21+
pub fn new(response_bytes: mpsc::Sender<Bytes>) -> Self {
22+
Self {
23+
response_bytes,
24+
buffer: Vec::new(),
25+
}
26+
}
27+
28+
pub async fn close_with_error(&mut self, mut msg: String) {
29+
if !self.response_bytes.is_closed() {
30+
if let Err(e) = self.async_flush().await {
31+
use std::fmt::Write;
32+
write!(&mut msg, "Unable to flush data: {e}").unwrap();
33+
}
34+
if let Err(e) = self.response_bytes.send(msg.into()).await {
35+
log::error!("Unable to send error back to client: {e}");
36+
}
37+
}
38+
}
39+
40+
pub async fn async_flush(&mut self) -> std::io::Result<()> {
41+
if self.buffer.is_empty() {
42+
return Ok(());
43+
}
44+
log::trace!(
45+
"Flushing data to client: {}",
46+
String::from_utf8_lossy(&self.buffer)
47+
);
48+
let sender = self
49+
.response_bytes
50+
.reserve()
51+
.await
52+
.map_err(|_| std::io::ErrorKind::WouldBlock)?;
53+
sender.send(std::mem::take(&mut self.buffer).into());
54+
Ok(())
55+
}
56+
}
57+
58+
impl Write for ResponseWriter {
59+
#[inline]
60+
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
61+
self.buffer.extend_from_slice(buf);
62+
Ok(buf.len())
63+
}
64+
65+
fn flush(&mut self) -> std::io::Result<()> {
66+
if self.buffer.is_empty() {
67+
return Ok(());
68+
}
69+
log::trace!(
70+
"Flushing data to client: {}",
71+
String::from_utf8_lossy(&self.buffer)
72+
);
73+
self.response_bytes
74+
.try_send(mem::take(&mut self.buffer).into())
75+
.map_err(|e|
76+
std::io::Error::new(
77+
std::io::ErrorKind::WouldBlock,
78+
format!("{e}: Row limit exceeded. The server cannot store more than {} pending messages in memory. Try again later or increase max_pending_rows in the configuration.", self.response_bytes.max_capacity())
79+
)
80+
)
81+
}
82+
}
83+
84+
#[allow(clippy::module_name_repetitions)]
85+
pub struct AsyncResponseWriter {
86+
poll_sender: tokio_util::sync::PollSender<Bytes>,
87+
writer: ResponseWriter,
88+
}
89+
90+
impl AsyncResponseWriter {
91+
#[must_use]
92+
pub fn new(writer: ResponseWriter) -> Self {
93+
let sender = writer.response_bytes.clone();
94+
Self {
95+
poll_sender: tokio_util::sync::PollSender::new(sender),
96+
writer,
97+
}
98+
}
99+
100+
#[must_use]
101+
pub fn into_inner(self) -> ResponseWriter {
102+
self.writer
103+
}
104+
}
105+
106+
impl tokio::io::AsyncWrite for AsyncResponseWriter {
107+
fn poll_write(
108+
mut self: Pin<&mut Self>,
109+
_cx: &mut std::task::Context<'_>,
110+
buf: &[u8],
111+
) -> std::task::Poll<std::io::Result<usize>> {
112+
std::task::Poll::Ready(self.as_mut().writer.write(buf))
113+
}
114+
115+
fn poll_flush(
116+
self: Pin<&mut Self>,
117+
cx: &mut std::task::Context<'_>,
118+
) -> std::task::Poll<std::io::Result<()>> {
119+
let Self {
120+
poll_sender,
121+
writer,
122+
} = self.get_mut();
123+
match poll_sender.poll_reserve(cx) {
124+
std::task::Poll::Ready(Ok(())) => {
125+
let res = poll_sender.send_item(std::mem::take(&mut writer.buffer).into());
126+
std::task::Poll::Ready(res.map_err(|_| std::io::ErrorKind::BrokenPipe.into()))
127+
}
128+
std::task::Poll::Pending => std::task::Poll::Pending,
129+
std::task::Poll::Ready(Err(_e)) => {
130+
std::task::Poll::Ready(Err(std::io::ErrorKind::BrokenPipe.into()))
131+
}
132+
}
133+
}
134+
135+
fn poll_shutdown(
136+
self: Pin<&mut Self>,
137+
cx: &mut std::task::Context<'_>,
138+
) -> std::task::Poll<Result<(), std::io::Error>> {
139+
self.poll_flush(cx)
140+
}
141+
}
142+
143+
impl Drop for ResponseWriter {
144+
fn drop(&mut self) {
145+
if let Err(e) = std::io::Write::flush(self) {
146+
log::error!("Could not flush data to client: {e}");
147+
}
148+
}
149+
}

0 commit comments

Comments
 (0)