Skip to content

Commit 3a3412c

Browse files
committed
implement streaming compression
this makes an assumption that async-compression cannot make: every chunk in the body stream has to be compressed and flushed directly; we should not wait for more data to arrive in hope of better compression. This is due to the multipart protocol for defer: we know each chunk represents either the primary or a deferred response, and it should be sent as soon as possible
1 parent 3cd2e3f commit 3a3412c

File tree

2 files changed

+163
-10
lines changed

2 files changed

+163
-10
lines changed

apollo-router/src/axum_factory/axum_http_server_factory.rs

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,14 @@ use axum::middleware::Next;
1515
use axum::response::*;
1616
use axum::routing::get;
1717
use axum::Router;
18+
use flate2::Compression;
1819
use futures::channel::oneshot;
1920
use futures::future::join;
2021
use futures::future::join_all;
2122
use futures::prelude::*;
23+
use http::header::ACCEPT_ENCODING;
24+
use http::header::CONTENT_ENCODING;
25+
use http::HeaderValue;
2226
use http::Request;
2327
use http_body::combinators::UnsyncBoxBody;
2428
use hyper::Body;
@@ -32,16 +36,17 @@ use tokio_rustls::TlsAcceptor;
3236
use tower::service_fn;
3337
use tower::BoxError;
3438
use tower::ServiceExt;
35-
use tower_http::compression::CompressionLayer;
3639
use tower_http::trace::TraceLayer;
3740

41+
use super::compression::codec::DeflateEncoder;
3842
use super::listeners::ensure_endpoints_consistency;
3943
use super::listeners::ensure_listenaddrs_consistency;
4044
use super::listeners::extra_endpoints;
4145
use super::listeners::ListenersAndRouters;
4246
use super::utils::decompress_request_body;
4347
use super::utils::PropagatingMakeSpan;
4448
use super::ListenAddrAndRouter;
49+
use crate::axum_factory::compression::Compressor;
4550
use crate::axum_factory::listeners::get_extra_listeners;
4651
use crate::axum_factory::listeners::serve_router_on_listen_addr;
4752
use crate::configuration::Configuration;
@@ -326,10 +331,7 @@ where
326331
))
327332
.layer(TraceLayer::new_for_http().make_span_with(PropagatingMakeSpan { entitlement }))
328333
.layer(Extension(service_factory))
329-
.layer(cors)
330-
// Compress the response body, except for multipart responses such as with `@defer`.
331-
// This is a work-around for https://github.com/apollographql/router/issues/1572
332-
.layer(CompressionLayer::new());
334+
.layer(cors);
333335

334336
let route = endpoints_on_main_listener
335337
.into_iter()
@@ -429,6 +431,11 @@ async fn handle_graphql(
429431

430432
let request: router::Request = http_request.into();
431433
let context = request.context.clone();
434+
let accept_encoding = request
435+
.router_request
436+
.headers()
437+
.get(ACCEPT_ENCODING)
438+
.cloned();
432439

433440
let res = service.oneshot(request).await;
434441
let dur = context.busy_time().await;
@@ -462,7 +469,25 @@ async fn handle_graphql(
462469
}
463470
Ok(response) => {
464471
tracing::info!(counter.apollo_router_session_count_active = -1,);
465-
response.response.into_response()
472+
let (mut parts, body) = response.response.into_parts();
473+
474+
println!("will compress response, accept-encoding == {accept_encoding:?}");
475+
let first: Option<&str> = accept_encoding
476+
.as_ref()
477+
.and_then(|value| value.to_str().ok())
478+
.and_then(|v| v.split(',').map(|s| s.trim()).next());
479+
println!("first: {first:?}");
480+
let body = if first.is_none() {
481+
body
482+
} else {
483+
let compressor = Compressor::Deflate(DeflateEncoder::new(Compression::fast()));
484+
parts
485+
.headers
486+
.insert(CONTENT_ENCODING, HeaderValue::from_static("deflate"));
487+
Body::wrap_stream(compressor.process(body))
488+
};
489+
490+
http::Response::from_parts(parts, body).into_response()
466491
}
467492
}
468493
}
Lines changed: 132 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,135 @@
1-
// All code from this module is extracted from https://github.com/Nemo157/async-compression and is under MIT or Apache-2 licence
2-
// it will be removed when we find a long lasting solution to https://github.com/Nemo157/async-compression/issues/154
3-
//#[macro_use]
4-
//mod macros;
1+
use bytes::{Bytes, BytesMut};
2+
use futures::{Stream, StreamExt};
3+
use tokio::sync::mpsc;
4+
use tokio_stream::wrappers::ReceiverStream;
5+
use tower::BoxError;
6+
7+
use self::{
8+
codec::{DeflateEncoder, Encode, GzipEncoder},
9+
util::PartialBuffer,
10+
};
11+
512
pub(crate) mod codec;
613
pub(crate) mod unshared;
714
pub(crate) mod util;
15+
16+
/// The response compression algorithms supported by the router.
///
/// Each variant wraps the corresponding vendored encoder; callers pick a
/// variant based on the request's `accept-encoding` header.
pub(crate) enum Compressor {
    //Identity,
    /// DEFLATE compression, backed by the vendored `DeflateEncoder`.
    Deflate(DeflateEncoder),
    /// Gzip compression, backed by the vendored `GzipEncoder`.
    Gzip(GzipEncoder),
    //Brotli(BrotliEncoder),
    //Zstd,
    //others?
}
24+
25+
//FIXME: we should call finish at the end
26+
impl Compressor {
27+
pub(crate) fn process(
28+
mut self,
29+
mut stream: hyper::Body,
30+
) -> impl Stream<Item = Result<Bytes, BoxError>>
31+
where {
32+
let (tx, rx) = mpsc::channel(10);
33+
34+
tokio::task::spawn(async move {
35+
while let Some(data) = stream.next().await {
36+
match data {
37+
Err(e) => {
38+
tx.send(Err(e.into())).await;
39+
}
40+
Ok(data) => {
41+
let mut buf = BytesMut::zeroed(1024);
42+
let mut written = 0usize;
43+
44+
let mut partial_input = PartialBuffer::new(&*data);
45+
loop {
46+
let mut partial_output = PartialBuffer::new(&mut buf);
47+
partial_output.advance(written);
48+
49+
match self.encode(&mut partial_input, &mut partial_output) {
50+
Err(e) => panic!("{e:?}"),
51+
Ok(()) => {}
52+
}
53+
54+
let read = partial_input.written().len();
55+
written += partial_output.written().len();
56+
println!("encode: read from input: {read}, written = {written}");
57+
58+
if !partial_input.unwritten().is_empty() {
59+
// there was not enough space in the output buffer to compress everything,
60+
// so we resize and add more data
61+
if partial_output.unwritten().is_empty() {
62+
let _ = partial_output.into_inner();
63+
buf.reserve(written);
64+
}
65+
} else {
66+
// FIXME: what happens if we try to flush in a full buffer
67+
match self.flush(&mut partial_output) {
68+
Err(e) => panic!("{e:?}"),
69+
Ok(_) => {
70+
let flushed = partial_output.written().len() - written;
71+
println!("flush with buffer of size {flushed}");
72+
let _ = partial_output.into_inner();
73+
buf.resize(flushed, 0);
74+
tx.send(Ok(buf.freeze())).await;
75+
break;
76+
}
77+
}
78+
}
79+
}
80+
}
81+
}
82+
}
83+
84+
let buf = BytesMut::zeroed(64);
85+
let mut partial_output = PartialBuffer::new(buf);
86+
87+
match self.finish(&mut partial_output) {
88+
Err(e) => panic!("{e:?}"),
89+
Ok(b) => {
90+
let len = partial_output.written().len();
91+
println!("finish with buffer of size {}", len);
92+
93+
let mut buf = partial_output.into_inner();
94+
buf.resize(len, 0);
95+
tx.send(Ok(buf.freeze())).await;
96+
}
97+
}
98+
//tx.send(partial_output.into_inner().freeze());
99+
});
100+
ReceiverStream::new(rx)
101+
}
102+
}
103+
104+
impl Encode for Compressor {
105+
fn encode(
106+
&mut self,
107+
input: &mut PartialBuffer<impl AsRef<[u8]>>,
108+
output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
109+
) -> std::io::Result<()> {
110+
match self {
111+
Compressor::Deflate(e) => e.encode(input, output),
112+
Compressor::Gzip(e) => e.encode(input, output),
113+
}
114+
}
115+
116+
fn flush(
117+
&mut self,
118+
output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
119+
) -> std::io::Result<bool> {
120+
match self {
121+
Compressor::Deflate(e) => e.flush(output),
122+
Compressor::Gzip(e) => e.flush(output),
123+
}
124+
}
125+
126+
fn finish(
127+
&mut self,
128+
output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
129+
) -> std::io::Result<bool> {
130+
match self {
131+
Compressor::Deflate(e) => e.finish(output),
132+
Compressor::Gzip(e) => e.finish(output),
133+
}
134+
}
135+
}

0 commit comments

Comments
 (0)