Skip to content

Commit e30d232

Browse files
committed
container: Drop async_compression
This is basically just a workaround for Nullus157/async-compression#271. However, in practice I think we may as well just use a native blocking tokio thread here. There are a lot of shenanigans going on, though, because we're wrapping sync I/O with async and then converting back to sync, since the tar code we're using is still sync. It would be a lot better to move the compression inline with the sync tar parsing, but that would require some API changes and more code motion.
1 parent 3014069 commit e30d232

File tree

3 files changed

+78
-21
lines changed

3 files changed

+78
-21
lines changed

lib/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ rust-version = "1.74.0"
1212
[dependencies]
1313
anyhow = "1.0"
1414
containers-image-proxy = "0.5.5"
15-
async-compression = { version = "0.4", features = ["gzip", "tokio", "zstd"] }
1615
camino = "1.0.4"
1716
chrono = "0.4.19"
1817
olpc-cjson = "0.1.1"
@@ -43,6 +42,7 @@ tokio = { features = ["io-std", "time", "process", "rt", "net"], version = ">= 1
4342
tokio-util = { features = ["io-util"], version = "0.7" }
4443
tokio-stream = { features = ["sync"], version = "0.1.8" }
4544
tracing = "0.1"
45+
zstd = "0.13.1"
4646

4747
indoc = { version = "2", optional = true }
4848
xshell = { version = "0.2", optional = true }

lib/src/container/unencapsulate.rs

Lines changed: 72 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use crate::container::store::LayerProgress;
3636
use super::*;
3737
use containers_image_proxy::{ImageProxy, OpenedImage};
3838
use fn_error_context::context;
39-
use futures_util::{Future, FutureExt};
39+
use futures_util::{Future, FutureExt, TryFutureExt as _};
4040
use oci_spec::image as oci_image;
4141
use std::sync::{Arc, Mutex};
4242
use tokio::{
@@ -189,22 +189,76 @@ pub async fn unencapsulate(repo: &ostree::Repo, imgref: &OstreeImageReference) -
189189
importer.unencapsulate().await
190190
}
191191

192+
/// Take an async AsyncBufRead and handle decompression for it, returning
193+
/// a wrapped AsyncBufRead implementation.
194+
/// This is implemented with a background thread using a pipe-to-self,
195+
/// and so there is an additional Future object returned that is a "driver"
196+
/// task and must also be checked for errors.
197+
pub(crate) fn decompress_bridge<'a>(
198+
src: impl tokio::io::AsyncBufRead + Send + Unpin + 'static,
199+
is_zstd: bool,
200+
) -> Result<(
201+
// This one is the input reader
202+
impl tokio::io::AsyncBufRead + Send + Unpin + 'static,
203+
// And this represents the worker thread doing copying
204+
impl Future<Output = Result<()>> + Send + Unpin + 'static,
205+
)> {
206+
// We use a plain unix pipe() because it's just a very convenient
207+
// way to bridge arbitrarily between sync and async with a worker
208+
// thread. Yes, it involves going through the kernel, but
209+
// eventually we'll replace all this logic with podman anyways.
210+
let (tx, rx) = tokio::net::unix::pipe::pipe()?;
211+
let task = tokio::task::spawn_blocking(move || -> Result<()> {
212+
// Convert the write half of the pipe() into a regular blocking file descriptor
213+
let tx = tx.into_blocking_fd()?;
214+
let mut tx = std::fs::File::from(tx);
215+
// Convert the async input back to synchronous.
216+
let src = tokio_util::io::SyncIoBridge::new(src);
217+
let bufr = std::io::BufReader::new(src);
218+
// Wrap the input in a decompressor; I originally tried to make
219+
// this function take a function pointer, but yeah that was painful
220+
// with the type system.
221+
let mut src: Box<dyn std::io::Read> = if is_zstd {
222+
Box::new(zstd::stream::read::Decoder::new(bufr)?)
223+
} else {
224+
Box::new(flate2::bufread::GzDecoder::new(bufr))
225+
};
226+
// We don't care about the number of bytes copied
227+
let _n: u64 = std::io::copy(&mut src, &mut tx)?;
228+
Ok(())
229+
})
230+
// Flatten the nested Result<Result<>>
231+
.map(crate::tokio_util::flatten_anyhow);
232+
// And return the pair of futures
233+
Ok((tokio::io::BufReader::new(rx), task))
234+
}
235+
192236
/// Create a decompressor for this MIME type, given a stream of input.
193237
fn new_async_decompressor<'a>(
194238
media_type: &oci_image::MediaType,
195-
src: impl AsyncBufRead + Send + Unpin + 'a,
196-
) -> Result<Box<dyn AsyncBufRead + Send + Unpin + 'a>> {
197-
match media_type {
198-
oci_image::MediaType::ImageLayerGzip => Ok(Box::new(tokio::io::BufReader::new(
199-
async_compression::tokio::bufread::GzipDecoder::new(src),
200-
))),
201-
oci_image::MediaType::ImageLayerZstd => Ok(Box::new(tokio::io::BufReader::new(
202-
async_compression::tokio::bufread::ZstdDecoder::new(src),
203-
))),
204-
oci_image::MediaType::ImageLayer => Ok(Box::new(src)),
205-
oci_image::MediaType::Other(t) if t.as_str() == DOCKER_TYPE_LAYER_TAR => Ok(Box::new(src)),
206-
o => Err(anyhow::anyhow!("Unhandled layer type: {}", o)),
207-
}
239+
src: impl AsyncBufRead + Send + Unpin + 'static,
240+
) -> Result<(
241+
Box<dyn AsyncBufRead + Send + Unpin + 'static>,
242+
impl Future<Output = Result<()>> + Send + Unpin + 'static,
243+
)> {
244+
let r: (
245+
Box<dyn AsyncBufRead + Send + Unpin + 'static>,
246+
Box<dyn Future<Output = Result<()>> + Send + Unpin + 'static>,
247+
) = match media_type {
248+
m @ (oci_image::MediaType::ImageLayerGzip | oci_image::MediaType::ImageLayerZstd) => {
249+
let is_zstd = matches!(m, oci_image::MediaType::ImageLayerZstd);
250+
let (r, driver) = decompress_bridge(src, is_zstd)?;
251+
(Box::new(r), Box::new(driver) as _)
252+
}
253+
oci_image::MediaType::ImageLayer => {
254+
(Box::new(src), Box::new(futures_util::future::ready(Ok(()))))
255+
}
256+
oci_image::MediaType::Other(t) if t.as_str() == DOCKER_TYPE_LAYER_TAR => {
257+
(Box::new(src), Box::new(futures_util::future::ready(Ok(()))))
258+
}
259+
o => anyhow::bail!("Unhandled layer type: {}", o),
260+
};
261+
Ok(r)
208262
}
209263

210264
/// A wrapper for [`get_blob`] which fetches a layer and decompresses it.
@@ -262,11 +316,13 @@ pub(crate) async fn fetch_layer_decompress<'a>(
262316
progress.send_replace(Some(status));
263317
}
264318
};
265-
let reader = new_async_decompressor(media_type, readprogress)?;
319+
let (reader, compression_driver) = new_async_decompressor(media_type, readprogress)?;
320+
let driver = driver.and_then(|()| compression_driver);
266321
let driver = futures_util::future::join(readproxy, driver).map(|r| r.1);
267322
Ok((reader, Either::Left(driver)))
268323
} else {
269-
let blob = new_async_decompressor(media_type, blob)?;
324+
let (blob, compression_driver) = new_async_decompressor(media_type, blob)?;
325+
let driver = driver.and_then(|()| compression_driver);
270326
Ok((blob, Either::Right(driver)))
271327
}
272328
}

lib/tests/it/main.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -401,10 +401,11 @@ async fn test_tar_write() -> Result<()> {
401401
#[tokio::test]
402402
async fn test_tar_write_tar_layer() -> Result<()> {
403403
let fixture = Fixture::new_v1()?;
404-
let uncompressed_tar = tokio::io::BufReader::new(
405-
async_compression::tokio::bufread::GzipDecoder::new(EXAMPLE_TAR_LAYER),
406-
);
407-
ostree_ext::tar::write_tar(fixture.destrepo(), uncompressed_tar, "test", None).await?;
404+
let mut v = Vec::new();
405+
let mut dec = flate2::bufread::GzDecoder::new(std::io::Cursor::new(EXAMPLE_TAR_LAYER));
406+
let _n = std::io::copy(&mut dec, &mut v)?;
407+
let r = tokio::io::BufReader::new(std::io::Cursor::new(v));
408+
ostree_ext::tar::write_tar(fixture.destrepo(), r, "test", None).await?;
408409
Ok(())
409410
}
410411

0 commit comments

Comments
 (0)