Skip to content

Commit 3cd2e3f

Browse files
committed
vendor encoders from async-compression
This integrates the encoders from https://github.com/Nemo157/async-compression so we can use them other a stream that flushes data regularly
1 parent 0c6b312 commit 3cd2e3f

File tree

20 files changed

+911
-0
lines changed

20 files changed

+911
-0
lines changed

Cargo.lock

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,7 @@ dependencies = [
286286
"axum",
287287
"backtrace",
288288
"base64 0.20.0",
289+
"brotli",
289290
"buildstructor 0.5.2",
290291
"bytes",
291292
"ci_info",
@@ -401,6 +402,8 @@ dependencies = [
401402
"wiremock",
402403
"wsl",
403404
"yaml-rust",
405+
"zstd",
406+
"zstd-safe",
404407
]
405408

406409
[[package]]
@@ -7073,3 +7076,33 @@ dependencies = [
70737076
"quote",
70747077
"syn 2.0.13",
70757078
]
7079+
7080+
[[package]]
7081+
name = "zstd"
7082+
version = "0.12.3+zstd.1.5.2"
7083+
source = "registry+https://github.com/rust-lang/crates.io-index"
7084+
checksum = "76eea132fb024e0e13fd9c2f5d5d595d8a967aa72382ac2f9d39fcc95afd0806"
7085+
dependencies = [
7086+
"zstd-safe",
7087+
]
7088+
7089+
[[package]]
7090+
name = "zstd-safe"
7091+
version = "6.0.5+zstd.1.5.4"
7092+
source = "registry+https://github.com/rust-lang/crates.io-index"
7093+
checksum = "d56d9e60b4b1758206c238a10165fbcae3ca37b01744e394c463463f6529d23b"
7094+
dependencies = [
7095+
"libc",
7096+
"zstd-sys",
7097+
]
7098+
7099+
[[package]]
7100+
name = "zstd-sys"
7101+
version = "2.0.8+zstd.1.5.5"
7102+
source = "registry+https://github.com/rust-lang/crates.io-index"
7103+
checksum = "5556e6ee25d32df2586c098bbfa278803692a20d0ab9565e049480d52707ec8c"
7104+
dependencies = [
7105+
"cc",
7106+
"libc",
7107+
"pkg-config",
7108+
]

apollo-router/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,10 @@ yaml-rust = "0.4.5"
203203
wsl = "0.1.0"
204204
tokio-rustls = "0.23.4"
205205
http-serde = "1.1.2"
206+
memchr = "2.5.0"
207+
brotli = "3.3.4"
208+
zstd = "0.12.3"
209+
zstd-safe = "6.0.5"
206210

207211
[target.'cfg(macos)'.dependencies]
208212
uname = "0.1.1"
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
// All code from this module is extracted from https://github.com/Nemo157/async-compression and is under MIT or Apache-2 licence
2+
// it will be removed when we find a long lasting solution to https://github.com/Nemo157/async-compression/issues/154
3+
use crate::axum_factory::compression::{codec::Encode, util::PartialBuffer};
4+
use std::{
5+
fmt,
6+
io::{Error, ErrorKind, Result},
7+
};
8+
9+
use brotli::enc::{
10+
backward_references::BrotliEncoderParams,
11+
encode::{
12+
BrotliEncoderCompressStream, BrotliEncoderCreateInstance, BrotliEncoderHasMoreOutput,
13+
BrotliEncoderIsFinished, BrotliEncoderOperation, BrotliEncoderStateStruct,
14+
},
15+
StandardAlloc,
16+
};
17+
18+
pub(crate) struct BrotliEncoder {
19+
state: BrotliEncoderStateStruct<StandardAlloc>,
20+
}
21+
22+
impl BrotliEncoder {
23+
pub(crate) fn new(params: BrotliEncoderParams) -> Self {
24+
let mut state = BrotliEncoderCreateInstance(StandardAlloc::default());
25+
state.params = params;
26+
Self { state }
27+
}
28+
29+
fn encode(
30+
&mut self,
31+
input: &mut PartialBuffer<impl AsRef<[u8]>>,
32+
output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
33+
op: BrotliEncoderOperation,
34+
) -> Result<()> {
35+
let in_buf = input.unwritten();
36+
let mut out_buf = output.unwritten_mut();
37+
38+
let mut input_len = 0;
39+
let mut output_len = 0;
40+
41+
if BrotliEncoderCompressStream(
42+
&mut self.state,
43+
op,
44+
&mut in_buf.len(),
45+
in_buf,
46+
&mut input_len,
47+
&mut out_buf.len(),
48+
out_buf,
49+
&mut output_len,
50+
&mut None,
51+
&mut |_, _, _, _| (),
52+
) <= 0
53+
{
54+
return Err(Error::new(ErrorKind::Other, "brotli error"));
55+
}
56+
57+
input.advance(input_len);
58+
output.advance(output_len);
59+
60+
Ok(())
61+
}
62+
}
63+
64+
impl Encode for BrotliEncoder {
65+
fn encode(
66+
&mut self,
67+
input: &mut PartialBuffer<impl AsRef<[u8]>>,
68+
output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
69+
) -> Result<()> {
70+
self.encode(
71+
input,
72+
output,
73+
BrotliEncoderOperation::BROTLI_OPERATION_PROCESS,
74+
)
75+
}
76+
77+
fn flush(
78+
&mut self,
79+
output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
80+
) -> Result<bool> {
81+
self.encode(
82+
&mut PartialBuffer::new(&[][..]),
83+
output,
84+
BrotliEncoderOperation::BROTLI_OPERATION_FLUSH,
85+
)?;
86+
87+
Ok(BrotliEncoderHasMoreOutput(&self.state) == 0)
88+
}
89+
90+
fn finish(
91+
&mut self,
92+
output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
93+
) -> Result<bool> {
94+
self.encode(
95+
&mut PartialBuffer::new(&[][..]),
96+
output,
97+
BrotliEncoderOperation::BROTLI_OPERATION_FINISH,
98+
)?;
99+
100+
Ok(BrotliEncoderIsFinished(&self.state) == 1)
101+
}
102+
}
103+
104+
impl fmt::Debug for BrotliEncoder {
105+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
106+
f.debug_struct("BrotliEncoder")
107+
.field("compress", &"<no debug>")
108+
.finish()
109+
}
110+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
// All code from this module is extracted from https://github.com/Nemo157/async-compression and is under MIT or Apache-2 licence
2+
// it will be removed when we find a long lasting solution to https://github.com/Nemo157/async-compression/issues/154
3+
mod encoder;
4+
5+
pub(crate) use self::encoder::BrotliEncoder;
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
// All code from this module is extracted from https://github.com/Nemo157/async-compression and is under MIT or Apache-2 licence
2+
// it will be removed when we find a long lasting solution to https://github.com/Nemo157/async-compression/issues/154
3+
use crate::axum_factory::compression::{
4+
codec::{Encode, FlateEncoder},
5+
util::PartialBuffer,
6+
};
7+
use std::io::Result;
8+
9+
use flate2::Compression;
10+
11+
#[derive(Debug)]
12+
pub(crate) struct DeflateEncoder {
13+
inner: FlateEncoder,
14+
}
15+
16+
impl DeflateEncoder {
17+
pub(crate) fn new(level: Compression) -> Self {
18+
Self {
19+
inner: FlateEncoder::new(level, false),
20+
}
21+
}
22+
}
23+
24+
impl Encode for DeflateEncoder {
25+
fn encode(
26+
&mut self,
27+
input: &mut PartialBuffer<impl AsRef<[u8]>>,
28+
output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
29+
) -> Result<()> {
30+
self.inner.encode(input, output)
31+
}
32+
33+
fn flush(
34+
&mut self,
35+
output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
36+
) -> Result<bool> {
37+
self.inner.flush(output)
38+
}
39+
40+
fn finish(
41+
&mut self,
42+
output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
43+
) -> Result<bool> {
44+
self.inner.finish(output)
45+
}
46+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
// All code from this module is extracted from https://github.com/Nemo157/async-compression and is under MIT or Apache-2 licence
2+
// it will be removed when we find a long lasting solution to https://github.com/Nemo157/async-compression/issues/154
3+
mod encoder;
4+
5+
pub(crate) use self::encoder::DeflateEncoder;
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
// All code from this module is extracted from https://github.com/Nemo157/async-compression and is under MIT or Apache-2 licence
2+
// it will be removed when we find a long lasting solution to https://github.com/Nemo157/async-compression/issues/154
3+
use std::io::{Error, ErrorKind, Result};
4+
5+
use flate2::{Compress, Compression, FlushCompress, Status};
6+
7+
use crate::axum_factory::compression::{codec::Encode, util::PartialBuffer};
8+
9+
#[derive(Debug)]
10+
pub(crate) struct FlateEncoder {
11+
compress: Compress,
12+
flushed: bool,
13+
}
14+
15+
impl FlateEncoder {
16+
pub(crate) fn new(level: Compression, zlib_header: bool) -> Self {
17+
Self {
18+
compress: Compress::new(level, zlib_header),
19+
flushed: true,
20+
}
21+
}
22+
23+
fn encode(
24+
&mut self,
25+
input: &mut PartialBuffer<impl AsRef<[u8]>>,
26+
output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
27+
flush: FlushCompress,
28+
) -> Result<Status> {
29+
let prior_in = self.compress.total_in();
30+
let prior_out = self.compress.total_out();
31+
32+
let status = self
33+
.compress
34+
.compress(input.unwritten(), output.unwritten_mut(), flush)?;
35+
36+
input.advance((self.compress.total_in() - prior_in) as usize);
37+
output.advance((self.compress.total_out() - prior_out) as usize);
38+
39+
Ok(status)
40+
}
41+
}
42+
43+
impl Encode for FlateEncoder {
44+
fn encode(
45+
&mut self,
46+
input: &mut PartialBuffer<impl AsRef<[u8]>>,
47+
output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
48+
) -> Result<()> {
49+
self.flushed = false;
50+
match self.encode(input, output, FlushCompress::None)? {
51+
Status::Ok => Ok(()),
52+
Status::StreamEnd => unreachable!(),
53+
Status::BufError => Err(Error::new(ErrorKind::Other, "unexpected BufError")),
54+
}
55+
}
56+
57+
fn flush(
58+
&mut self,
59+
output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
60+
) -> Result<bool> {
61+
// We need to keep track of whether we've already flushed otherwise we'll just keep writing
62+
// out sync blocks continuously and probably never complete flushing.
63+
if self.flushed {
64+
return Ok(true);
65+
}
66+
67+
self.encode(
68+
&mut PartialBuffer::new(&[][..]),
69+
output,
70+
FlushCompress::Sync,
71+
)?;
72+
73+
loop {
74+
let old_len = output.written().len();
75+
self.encode(
76+
&mut PartialBuffer::new(&[][..]),
77+
output,
78+
FlushCompress::None,
79+
)?;
80+
if output.written().len() == old_len {
81+
break;
82+
}
83+
}
84+
85+
self.flushed = true;
86+
Ok(!output.unwritten().is_empty())
87+
}
88+
89+
fn finish(
90+
&mut self,
91+
output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
92+
) -> Result<bool> {
93+
self.flushed = false;
94+
match self.encode(
95+
&mut PartialBuffer::new(&[][..]),
96+
output,
97+
FlushCompress::Finish,
98+
)? {
99+
Status::Ok => Ok(false),
100+
Status::StreamEnd => Ok(true),
101+
Status::BufError => Err(Error::new(ErrorKind::Other, "unexpected BufError")),
102+
}
103+
}
104+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
// All code from this module is extracted from https://github.com/Nemo157/async-compression and is under MIT or Apache-2 licence
2+
// it will be removed when we find a long lasting solution to https://github.com/Nemo157/async-compression/issues/154
3+
mod encoder;
4+
5+
pub(crate) use self::encoder::FlateEncoder;

0 commit comments

Comments
 (0)