Skip to content

Commit 92c582e

Browse files
committed
reimplement async_flush and asyncwrite
1 parent e00dd8b commit 92c582e

File tree

5 files changed

+71
-40
lines changed

5 files changed

+71
-40
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ rustls = { version = "0.22.0" } # keep in sync with actix-web, awc, rustls-acme,
6262
rustls-native-certs = "0.7.0"
6363
awc = { version = "3", features = ["rustls-0_22-webpki-roots"] }
6464
clap = { version = "4.5.17", features = ["derive"] }
65+
tokio-util = "0.7.12"
6566

6667
[build-dependencies]
6768
awc = { version = "3", features = ["rustls-0_22-webpki-roots"] }

src/app_config.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ impl AppConfig {
129129
));
130130
}
131131
}
132+
anyhow::ensure!(self.max_pending_rows > 0, "max_pending_rows cannot be null");
132133
Ok(())
133134
}
134135
}
@@ -470,7 +471,7 @@ fn default_https_acme_directory_url() -> String {
470471
}
471472

472473
fn default_max_pending_rows() -> usize {
473-
256
474+
1
474475
}
475476

476477
fn default_compress_responses() -> bool {

src/render.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::templates::SplitTemplate;
2-
use crate::webserver::http::{RequestContext, ResponseWriter};
2+
use crate::webserver::http::{AsyncResponseWriter, RequestContext, ResponseWriter};
33
use crate::webserver::ErrorWithStatus;
44
use crate::AppState;
55
use actix_web::cookie::time::format_description::well_known::Rfc3339;
@@ -452,15 +452,16 @@ impl<W: std::io::Write> JsonBodyRenderer<W> {
452452
}
453453

454454
pub struct CsvBodyRenderer {
455-
writer: csv_async::AsyncWriter<ResponseWriter>,
455+
writer: csv_async::AsyncWriter<AsyncResponseWriter>,
456456
is_first: bool,
457457
}
458458

459459
impl CsvBodyRenderer {
460-
pub async fn new(mut writer: ResponseWriter) -> anyhow::Result<CsvBodyRenderer> {
461-
tokio::io::AsyncWriteExt::flush(&mut writer).await?;
460+
pub async fn new(writer: ResponseWriter) -> anyhow::Result<CsvBodyRenderer> {
461+
let mut async_writer = AsyncResponseWriter::new(writer);
462+
tokio::io::AsyncWriteExt::flush(&mut async_writer).await?;
462463
Ok(CsvBodyRenderer {
463-
writer: csv_async::AsyncWriter::from_writer(writer),
464+
writer: csv_async::AsyncWriter::from_writer(async_writer),
464465
is_first: true,
465466
})
466467
}
@@ -497,6 +498,7 @@ impl CsvBodyRenderer {
497498
.into_inner()
498499
.await
499500
.expect("Failed to get inner writer")
501+
.into_inner()
500502
}
501503
}
502504

src/webserver/http.rs

Lines changed: 60 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use std::mem;
2929
use std::path::PathBuf;
3030
use std::pin::Pin;
3131
use std::sync::Arc;
32+
use std::task::Poll;
3233
use std::time::SystemTime;
3334
use tokio::sync::mpsc;
3435

@@ -38,7 +39,7 @@ use tokio::sync::mpsc;
3839
#[derive(Clone)]
3940
pub struct ResponseWriter {
4041
buffer: Vec<u8>,
41-
response_bytes: mpsc::Sender<actix_web::Result<Bytes>>,
42+
response_bytes: mpsc::Sender<Bytes>,
4243
}
4344

4445
#[derive(Clone)]
@@ -48,7 +49,7 @@ pub struct RequestContext {
4849
}
4950

5051
impl ResponseWriter {
51-
fn new(response_bytes: mpsc::Sender<actix_web::Result<Bytes>>) -> Self {
52+
fn new(response_bytes: mpsc::Sender<Bytes>) -> Self {
5253
Self {
5354
response_bytes,
5455
buffer: Vec::new(),
@@ -57,20 +58,30 @@ impl ResponseWriter {
5758
async fn close_with_error(&mut self, mut msg: String) {
5859
if !self.response_bytes.is_closed() {
5960
if let Err(e) = self.async_flush().await {
60-
msg.push_str(&format!("Unable to flush data: {e}"));
61+
use std::fmt::Write;
62+
write!(&mut msg, "Unable to flush data: {e}").unwrap();
6163
}
62-
if let Err(e) = self
63-
.response_bytes
64-
.send(Err(ErrorInternalServerError(msg)))
65-
.await
66-
{
64+
if let Err(e) = self.response_bytes.send(msg.into()).await {
6765
log::error!("Unable to send error back to client: {e}");
6866
}
6967
}
7068
}
7169

7270
pub async fn async_flush(&mut self) -> std::io::Result<()> {
73-
tokio::io::AsyncWriteExt::flush(self).await
71+
if self.buffer.is_empty() {
72+
return Ok(());
73+
}
74+
log::trace!(
75+
"Flushing data to client: {}",
76+
String::from_utf8_lossy(&self.buffer)
77+
);
78+
let sender = self
79+
.response_bytes
80+
.reserve()
81+
.await
82+
.map_err(|_| std::io::ErrorKind::WouldBlock)?;
83+
sender.send(std::mem::take(&mut self.buffer).into());
84+
Ok(())
7485
}
7586
}
7687

@@ -89,7 +100,7 @@ impl Write for ResponseWriter {
89100
String::from_utf8_lossy(&self.buffer)
90101
);
91102
self.response_bytes
92-
.try_send(Ok(mem::take(&mut self.buffer).into()))
103+
.try_send(mem::take(&mut self.buffer).into())
93104
.map_err(|e|
94105
std::io::Error::new(
95106
std::io::ErrorKind::WouldBlock,
@@ -99,35 +110,49 @@ impl Write for ResponseWriter {
99110
}
100111
}
101112

102-
impl tokio::io::AsyncWrite for ResponseWriter {
113+
pub struct AsyncResponseWriter {
114+
poll_sender: tokio_util::sync::PollSender<Bytes>,
115+
writer: ResponseWriter,
116+
}
117+
118+
impl AsyncResponseWriter {
119+
#[must_use]
120+
pub fn new(writer: ResponseWriter) -> Self {
121+
let sender = writer.response_bytes.clone();
122+
Self {
123+
poll_sender: tokio_util::sync::PollSender::new(sender),
124+
writer,
125+
}
126+
}
127+
#[must_use]
128+
pub fn into_inner(self) -> ResponseWriter {
129+
self.writer
130+
}
131+
}
132+
impl tokio::io::AsyncWrite for AsyncResponseWriter {
103133
fn poll_write(
104-
self: Pin<&mut Self>,
134+
mut self: Pin<&mut Self>,
105135
_cx: &mut std::task::Context<'_>,
106136
buf: &[u8],
107-
) -> std::task::Poll<Result<usize, std::io::Error>> {
108-
self.get_mut().buffer.extend_from_slice(buf);
109-
std::task::Poll::Ready(Ok(buf.len()))
137+
) -> std::task::Poll<std::io::Result<usize>> {
138+
Poll::Ready(self.as_mut().writer.write(buf))
110139
}
111140

112141
fn poll_flush(
113-
mut self: Pin<&mut Self>,
114-
_cx: &mut std::task::Context<'_>,
115-
) -> std::task::Poll<Result<(), std::io::Error>> {
116-
if self.buffer.is_empty() {
117-
return std::task::Poll::Ready(Ok(()));
118-
}
119-
log::trace!(
120-
"Async flushing data to client: {}",
121-
String::from_utf8_lossy(&self.buffer)
122-
);
123-
let capacity = self.response_bytes.max_capacity();
124-
let buffer = mem::take(&mut self.buffer);
125-
match self.get_mut().response_bytes.try_send(Ok(buffer.into())) {
126-
Ok(()) => std::task::Poll::Ready(Ok(())),
127-
Err(e) => std::task::Poll::Ready(Err(std::io::Error::new(
128-
std::io::ErrorKind::WouldBlock,
129-
format!("{e}: Row limit exceeded. The server cannot store more than {capacity} pending messages in memory. Try again later or increase max_pending_rows in the configuration.")
130-
)))
142+
self: Pin<&mut Self>,
143+
cx: &mut std::task::Context<'_>,
144+
) -> std::task::Poll<std::io::Result<()>> {
145+
let Self {
146+
poll_sender,
147+
writer,
148+
} = self.get_mut();
149+
match poll_sender.poll_reserve(cx) {
150+
Poll::Ready(Ok(())) => {
151+
let res = poll_sender.send_item(std::mem::take(&mut writer.buffer).into());
152+
Poll::Ready(res.map_err(|_| std::io::ErrorKind::BrokenPipe.into()))
153+
}
154+
Poll::Pending => Poll::Pending,
155+
Poll::Ready(Err(_e)) => Poll::Ready(Err(std::io::ErrorKind::BrokenPipe.into())),
131156
}
132157
}
133158

@@ -227,7 +252,8 @@ async fn build_response_header_and_stream<S: Stream<Item = DbItem>>(
227252
renderer,
228253
} => {
229254
let body_stream = tokio_stream::wrappers::ReceiverStream::new(receiver);
230-
let http_response = http_response.streaming(body_stream);
255+
let result_stream = body_stream.map(Ok::<_, actix_web::Error>);
256+
let http_response = http_response.streaming(result_stream);
231257
return Ok(ResponseWithWriter::RenderStream {
232258
http_response,
233259
renderer,

0 commit comments

Comments
 (0)