Skip to content

Commit 05dd65f

Browse files
authored
Merge pull request ostreedev#677 from jeckersb/decompress
Remove decompress_bridge and move decompression inline
2 parents b6992cc + 25894c5 commit 05dd65f

File tree

5 files changed

+78
-87
lines changed

5 files changed

+78
-87
lines changed

lib/src/container/store.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -716,7 +716,7 @@ impl ImageImporter {
716716
p.send(ImportProgress::OstreeChunkStarted(layer.layer.clone()))
717717
.await?;
718718
}
719-
let (blob, driver) = fetch_layer_decompress(
719+
let (blob, driver, media_type) = fetch_layer(
720720
&self.proxy,
721721
&self.proxy_img,
722722
&import.manifest,
@@ -733,6 +733,7 @@ impl ImageImporter {
733733
let txn = repo.auto_transaction(Some(cancellable))?;
734734
let mut importer = crate::tar::Importer::new_for_object_set(&repo);
735735
let blob = tokio_util::io::SyncIoBridge::new(blob);
736+
let blob = super::unencapsulate::decompressor(&media_type, blob)?;
736737
let mut archive = tar::Archive::new(blob);
737738
importer.import_objects(&mut archive, Some(cancellable))?;
738739
let commit = if write_refs {
@@ -761,7 +762,7 @@ impl ImageImporter {
761762
))
762763
.await?;
763764
}
764-
let (blob, driver) = fetch_layer_decompress(
765+
let (blob, driver, media_type) = fetch_layer(
765766
&self.proxy,
766767
&self.proxy_img,
767768
&import.manifest,
@@ -778,6 +779,7 @@ impl ImageImporter {
778779
let txn = repo.auto_transaction(Some(cancellable))?;
779780
let mut importer = crate::tar::Importer::new_for_commit(&repo, remote);
780781
let blob = tokio_util::io::SyncIoBridge::new(blob);
782+
let blob = super::unencapsulate::decompressor(&media_type, blob)?;
781783
let mut archive = tar::Archive::new(blob);
782784
importer.import_commit(&mut archive, Some(cancellable))?;
783785
let commit = importer.finish_import_commit();
@@ -873,7 +875,7 @@ impl ImageImporter {
873875
p.send(ImportProgress::DerivedLayerStarted(layer.layer.clone()))
874876
.await?;
875877
}
876-
let (blob, driver) = super::unencapsulate::fetch_layer_decompress(
878+
let (blob, driver, media_type) = super::unencapsulate::fetch_layer(
877879
&proxy,
878880
&proxy_img,
879881
&import.manifest,
@@ -891,8 +893,13 @@ impl ImageImporter {
891893
allow_nonusr: root_is_transient,
892894
retain_var: self.ostree_v2024_3,
893895
};
894-
let r =
895-
crate::tar::write_tar(&self.repo, blob, layer.ostree_ref.as_str(), Some(opts));
896+
let r = crate::tar::write_tar(
897+
&self.repo,
898+
blob,
899+
media_type,
900+
layer.ostree_ref.as_str(),
901+
Some(opts),
902+
);
896903
let r = super::unencapsulate::join_fetch(r, driver)
897904
.await
898905
.with_context(|| format!("Parsing layer blob {}", layer.layer.digest()))?;

lib/src/container/unencapsulate.rs

Lines changed: 23 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,11 @@
3434
use crate::container::store::LayerProgress;
3535

3636
use super::*;
37-
use anyhow::Context as _;
3837
use containers_image_proxy::{ImageProxy, OpenedImage};
3938
use fn_error_context::context;
40-
use futures_util::{Future, FutureExt, TryFutureExt as _};
39+
use futures_util::{Future, FutureExt};
4140
use oci_spec::image::{self as oci_image, Digest};
41+
use std::io::Read;
4242
use std::sync::{Arc, Mutex};
4343
use tokio::{
4444
io::{AsyncBufRead, AsyncRead},
@@ -191,80 +191,30 @@ pub async fn unencapsulate(repo: &ostree::Repo, imgref: &OstreeImageReference) -
191191
importer.unencapsulate().await
192192
}
193193

194-
/// Take an async AsyncBufRead and handle decompression for it, returning
195-
/// a wrapped AsyncBufRead implementation.
196-
/// This is implemented with a background thread using a pipe-to-self,
197-
/// and so there is an additional Future object returned that is a "driver"
198-
/// task and must also be checked for errors.
199-
pub(crate) fn decompress_bridge(
200-
src: impl tokio::io::AsyncBufRead + Send + Unpin + 'static,
201-
is_zstd: bool,
202-
) -> Result<(
203-
// This one is the input reader
204-
impl tokio::io::AsyncBufRead + Send + Unpin + 'static,
205-
// And this represents the worker thread doing copying
206-
impl Future<Output = Result<()>> + Send + Unpin + 'static,
207-
)> {
208-
// We use a plain unix pipe() because it's just a very convenient
209-
// way to bridge arbitrarily between sync and async with a worker
210-
// thread. Yes, it involves going through the kernel, but
211-
// eventually we'll replace all this logic with podman anyways.
212-
let (tx, rx) = tokio::net::unix::pipe::pipe()?;
213-
let task = tokio::task::spawn_blocking(move || -> Result<()> {
214-
// Convert the write half of the pipe() into a regular blocking file descriptor
215-
let tx = tx.into_blocking_fd()?;
216-
let mut tx = std::fs::File::from(tx);
217-
// Convert the async input back to synchronous.
218-
let src = tokio_util::io::SyncIoBridge::new(src);
219-
let bufr = std::io::BufReader::new(src);
220-
// Wrap the input in a decompressor; I originally tried to make
221-
// this function take a function pointer, but yeah that was painful
222-
// with the type system.
223-
let mut src: Box<dyn std::io::Read> = if is_zstd {
224-
Box::new(zstd::stream::read::Decoder::new(bufr)?)
225-
} else {
226-
Box::new(flate2::bufread::GzDecoder::new(bufr))
227-
};
228-
// We don't care about the number of bytes copied
229-
let _n: u64 = std::io::copy(&mut src, &mut tx).context("Copying for decompression")?;
230-
Ok(())
231-
})
232-
// Flatten the nested Result<Result<>>
233-
.map(crate::tokio_util::flatten_anyhow);
234-
// And return the pair of futures
235-
Ok((tokio::io::BufReader::new(rx), task))
236-
}
237-
238194
/// Create a decompressor for this MIME type, given a stream of input.
239-
fn new_async_decompressor(
195+
pub(crate) fn decompressor(
240196
media_type: &oci_image::MediaType,
241-
src: impl AsyncBufRead + Send + Unpin + 'static,
242-
) -> Result<(
243-
Box<dyn AsyncBufRead + Send + Unpin + 'static>,
244-
impl Future<Output = Result<()>> + Send + Unpin + 'static,
245-
)> {
246-
let r: (
247-
Box<dyn AsyncBufRead + Send + Unpin + 'static>,
248-
Box<dyn Future<Output = Result<()>> + Send + Unpin + 'static>,
249-
) = match media_type {
197+
src: impl Read + Send + 'static,
198+
) -> Result<Box<dyn Read + Send + 'static>> {
199+
let r: Box<dyn std::io::Read + Send + 'static> = match media_type {
250200
m @ (oci_image::MediaType::ImageLayerGzip | oci_image::MediaType::ImageLayerZstd) => {
251-
let is_zstd = matches!(m, oci_image::MediaType::ImageLayerZstd);
252-
let (r, driver) = decompress_bridge(src, is_zstd)?;
253-
(Box::new(r), Box::new(driver) as _)
254-
}
255-
oci_image::MediaType::ImageLayer => {
256-
(Box::new(src), Box::new(futures_util::future::ready(Ok(()))))
257-
}
258-
oci_image::MediaType::Other(t) if t.as_str() == DOCKER_TYPE_LAYER_TAR => {
259-
(Box::new(src), Box::new(futures_util::future::ready(Ok(()))))
201+
if matches!(m, oci_image::MediaType::ImageLayerZstd) {
202+
Box::new(zstd::stream::read::Decoder::new(src)?)
203+
} else {
204+
Box::new(flate2::bufread::GzDecoder::new(std::io::BufReader::new(
205+
src,
206+
)))
207+
}
260208
}
209+
oci_image::MediaType::ImageLayer => Box::new(src),
210+
oci_image::MediaType::Other(t) if t.as_str() == DOCKER_TYPE_LAYER_TAR => Box::new(src),
261211
o => anyhow::bail!("Unhandled layer type: {}", o),
262212
};
263213
Ok(r)
264214
}
265215

266216
/// A wrapper for [`get_blob`] which fetches a layer and decompresses it.
267-
pub(crate) async fn fetch_layer_decompress<'a>(
217+
pub(crate) async fn fetch_layer<'a>(
268218
proxy: &'a ImageProxy,
269219
img: &OpenedImage,
270220
manifest: &oci_image::ImageManifest,
@@ -275,12 +225,13 @@ pub(crate) async fn fetch_layer_decompress<'a>(
275225
) -> Result<(
276226
Box<dyn AsyncBufRead + Send + Unpin>,
277227
impl Future<Output = Result<()>> + 'a,
228+
oci_image::MediaType,
278229
)> {
279230
use futures_util::future::Either;
280231
tracing::debug!("fetching {}", layer.digest());
281232
let layer_index = manifest.layers().iter().position(|x| x == layer).unwrap();
282233
let (blob, driver, size);
283-
let media_type: &oci_image::MediaType;
234+
let media_type: oci_image::MediaType;
284235
match transport_src {
285236
Transport::ContainerStorage => {
286237
let layer_info = layer_info
@@ -290,12 +241,12 @@ pub(crate) async fn fetch_layer_decompress<'a>(
290241
anyhow!("blobid position {layer_index} exceeds diffid count {n_layers}")
291242
})?;
292243
size = layer_blob.size;
293-
media_type = &layer_blob.media_type;
244+
media_type = layer_blob.media_type.clone();
294245
(blob, driver) = proxy.get_blob(img, &layer_blob.digest, size).await?;
295246
}
296247
_ => {
297248
size = layer.size();
298-
media_type = layer.media_type();
249+
media_type = layer.media_type().clone();
299250
(blob, driver) = proxy.get_blob(img, layer.digest(), size).await?;
300251
}
301252
};
@@ -316,13 +267,10 @@ pub(crate) async fn fetch_layer_decompress<'a>(
316267
progress.send_replace(Some(status));
317268
}
318269
};
319-
let (reader, compression_driver) = new_async_decompressor(media_type, readprogress)?;
320-
let driver = driver.and_then(|()| compression_driver);
270+
let reader = Box::new(readprogress);
321271
let driver = futures_util::future::join(readproxy, driver).map(|r| r.1);
322-
Ok((reader, Either::Left(driver)))
272+
Ok((reader, Either::Left(driver), media_type))
323273
} else {
324-
let (blob, compression_driver) = new_async_decompressor(media_type, blob)?;
325-
let driver = driver.and_then(|()| compression_driver);
326-
Ok((blob, Either::Right(driver)))
274+
Ok((Box::new(blob), Either::Right(driver), media_type))
327275
}
328276
}

lib/src/tar/write.rs

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use cap_std::io_lifetimes;
1515
use cap_std_ext::cap_std::fs::Dir;
1616
use cap_std_ext::cmdext::CapStdExtCommandExt;
1717
use cap_std_ext::{cap_std, cap_tempfile};
18+
use containers_image_proxy::oci_spec::image as oci_image;
1819
use fn_error_context::context;
1920
use ostree::gio;
2021
use ostree::prelude::FileExt;
@@ -337,6 +338,7 @@ pub(crate) fn filter_tar(
337338
#[context("Filtering tar stream")]
338339
async fn filter_tar_async(
339340
src: impl AsyncRead + Send + 'static,
341+
media_type: oci_image::MediaType,
340342
mut dest: impl AsyncWrite + Send + Unpin,
341343
config: &TarImportConfig,
342344
repo_tmpdir: Dir,
@@ -345,12 +347,14 @@ async fn filter_tar_async(
345347
// The source must be moved to the heap so we know it is stable for passing to the worker thread
346348
let src = Box::pin(src);
347349
let config = config.clone();
348-
let tar_transformer = tokio::task::spawn_blocking(move || {
349-
let mut src = tokio_util::io::SyncIoBridge::new(src);
350+
let tar_transformer = crate::tokio_util::spawn_blocking_flatten(move || {
351+
let src = tokio_util::io::SyncIoBridge::new(src);
352+
let mut src = crate::container::decompressor(&media_type, src)?;
350353
let dest = tokio_util::io::SyncIoBridge::new(tx_buf);
354+
351355
let r = filter_tar(&mut src, dest, &config, &repo_tmpdir);
352356
// Pass ownership of the input stream back to the caller - see below.
353-
(r, src)
357+
Ok((r, src))
354358
});
355359
let copier = tokio::io::copy(&mut rx_buf, &mut dest);
356360
let (r, v) = tokio::join!(tar_transformer, copier);
@@ -373,6 +377,7 @@ async fn filter_tar_async(
373377
pub async fn write_tar(
374378
repo: &ostree::Repo,
375379
src: impl tokio::io::AsyncRead + Send + Unpin + 'static,
380+
media_type: oci_image::MediaType,
376381
refname: &str,
377382
options: Option<WriteTarOptions>,
378383
) -> Result<WriteTarResult> {
@@ -430,7 +435,8 @@ pub async fn write_tar(
430435
let repo_tmpdir = Dir::reopen_dir(&repo.dfd_borrow())?
431436
.open_dir("tmp")
432437
.context("Getting repo tmpdir")?;
433-
let filtered_result = filter_tar_async(src, child_stdin, &import_config, repo_tmpdir);
438+
let filtered_result =
439+
filter_tar_async(src, media_type, child_stdin, &import_config, repo_tmpdir);
434440
let output_copier = async move {
435441
// Gather stdout/stderr to buffers
436442
let mut child_stdout_buf = String::new();
@@ -585,7 +591,14 @@ mod tests {
585591
let mut dest = Vec::new();
586592
let src = tokio::io::BufReader::new(tokio::fs::File::open(rootfs_tar_path).await?);
587593
let cap_tmpdir = Dir::open_ambient_dir(&tempd, cap_std::ambient_authority())?;
588-
filter_tar_async(src, &mut dest, &Default::default(), cap_tmpdir).await?;
594+
filter_tar_async(
595+
src,
596+
oci_image::MediaType::ImageLayer,
597+
&mut dest,
598+
&Default::default(),
599+
cap_tmpdir,
600+
)
601+
.await?;
589602
let dest = dest.as_slice();
590603
let mut final_tar = tar::Archive::new(Cursor::new(dest));
591604
let destdir = &tempd.path().join("destdir");

lib/src/tokio_util.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,15 @@ where
7373
spawn_blocking_cancellable(f).map(flatten_anyhow)
7474
}
7575

76+
/// A wrapper around [`tokio::task::spawn_blocking`] that flattens nested results.
77+
pub fn spawn_blocking_flatten<F, T>(f: F) -> impl Future<Output = Result<T>>
78+
where
79+
F: FnOnce() -> Result<T> + Send + 'static,
80+
T: Send + 'static,
81+
{
82+
tokio::task::spawn_blocking(f).map(flatten_anyhow)
83+
}
84+
7685
#[cfg(test)]
7786
mod tests {
7887
use super::*;

lib/tests/it/main.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -385,7 +385,14 @@ async fn test_tar_write() -> Result<()> {
385385
let src = fixture.dir.open(tmptar)?;
386386
fixture.dir.remove_file(tmptar)?;
387387
let src = tokio::fs::File::from_std(src.into_std());
388-
let r = ostree_ext::tar::write_tar(fixture.destrepo(), src, "layer", None).await?;
388+
let r = ostree_ext::tar::write_tar(
389+
fixture.destrepo(),
390+
src,
391+
oci_image::MediaType::ImageLayer,
392+
"layer",
393+
None,
394+
)
395+
.await?;
389396
let layer_commit = r.commit.as_str();
390397
cmd!(
391398
sh,
@@ -409,7 +416,14 @@ async fn test_tar_write_tar_layer() -> Result<()> {
409416
let mut dec = flate2::bufread::GzDecoder::new(std::io::Cursor::new(EXAMPLE_TAR_LAYER));
410417
let _n = std::io::copy(&mut dec, &mut v)?;
411418
let r = tokio::io::BufReader::new(std::io::Cursor::new(v));
412-
ostree_ext::tar::write_tar(fixture.destrepo(), r, "test", None).await?;
419+
ostree_ext::tar::write_tar(
420+
fixture.destrepo(),
421+
r,
422+
oci_image::MediaType::ImageLayer,
423+
"test",
424+
None,
425+
)
426+
.await?;
413427
Ok(())
414428
}
415429

0 commit comments

Comments
 (0)