Skip to content

Commit 7b7eff6

Browse files
authored
Merge pull request #1248 from jeckersb/more-decompressor
unencapsulate: more Decompressor improvements
2 parents 7593910 + d50ca14 commit 7b7eff6

File tree

3 files changed

+90
-26
lines changed

3 files changed

+90
-26
lines changed

ostree-ext/src/container/store.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
88
use super::*;
99
use crate::chunking::{self, Chunk};
10+
use crate::container::Decompressor;
1011
use crate::logging::system_repo_journal_print;
1112
use crate::refescape;
1213
use crate::sysroot::SysrootLock;
@@ -779,8 +780,8 @@ impl ImageImporter {
779780
let txn = repo.auto_transaction(Some(cancellable))?;
780781
let mut importer = crate::tar::Importer::new_for_object_set(&repo);
781782
let blob = tokio_util::io::SyncIoBridge::new(blob);
782-
let blob = super::unencapsulate::decompressor(&media_type, blob)?;
783-
let mut archive = tar::Archive::new(blob);
783+
let mut blob = Decompressor::new(&media_type, blob)?;
784+
let mut archive = tar::Archive::new(&mut blob);
784785
importer.import_objects(&mut archive, Some(cancellable))?;
785786
let commit = if write_refs {
786787
let commit = importer.finish_import_object_set()?;
@@ -791,6 +792,7 @@ impl ImageImporter {
791792
None
792793
};
793794
txn.commit(Some(cancellable))?;
795+
blob.finish()?;
794796
Ok::<_, anyhow::Error>(commit)
795797
})
796798
.map_err(|e| e.context(format!("Layer {}", layer.layer.digest())));
@@ -825,8 +827,8 @@ impl ImageImporter {
825827
let txn = repo.auto_transaction(Some(cancellable))?;
826828
let mut importer = crate::tar::Importer::new_for_commit(&repo, remote);
827829
let blob = tokio_util::io::SyncIoBridge::new(blob);
828-
let blob = super::unencapsulate::decompressor(&media_type, blob)?;
829-
let mut archive = tar::Archive::new(blob);
830+
let mut blob = Decompressor::new(&media_type, blob)?;
831+
let mut archive = tar::Archive::new(&mut blob);
830832
importer.import_commit(&mut archive, Some(cancellable))?;
831833
let (commit, verify_text) = importer.finish_import_commit();
832834
if write_refs {
@@ -835,6 +837,7 @@ impl ImageImporter {
835837
}
836838
repo.mark_commit_partial(&commit, false)?;
837839
txn.commit(Some(cancellable))?;
840+
blob.finish()?;
838841
Ok::<_, anyhow::Error>((commit, verify_text))
839842
});
840843
let (commit, verify_text) =

ostree-ext/src/container/unencapsulate.rs

Lines changed: 79 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@ pub async fn unencapsulate(repo: &ostree::Repo, imgref: &OstreeImageReference) -
193193

194194
pub(crate) struct Decompressor {
195195
inner: Box<dyn Read + Send + 'static>,
196+
finished: bool,
196197
}
197198

198199
impl Read for Decompressor {
@@ -203,6 +204,50 @@ impl Read for Decompressor {
203204

204205
impl Drop for Decompressor {
205206
fn drop(&mut self) {
207+
if self.finished {
208+
return;
209+
}
210+
211+
// We really should not get here; users are required to call
212+
// `finish()` to clean up the stream. But we'll give
213+
// best-effort to clean things up nonetheless. If things go
214+
// wrong, then panic, because we're in a bad state and it's
215+
// likely that we end up with a broken pipe error or a
216+
// deadlock.
217+
self._finish().expect("Decompressor::finish MUST be called")
218+
}
219+
}
220+
221+
impl Decompressor {
222+
/// Create a decompressor for this MIME type, given a stream of input.
223+
pub(crate) fn new(
224+
media_type: &oci_image::MediaType,
225+
src: impl Read + Send + 'static,
226+
) -> Result<Self> {
227+
let r: Box<dyn std::io::Read + Send + 'static> = match media_type {
228+
oci_image::MediaType::ImageLayerZstd => {
229+
Box::new(zstd::stream::read::Decoder::new(src)?)
230+
}
231+
oci_image::MediaType::ImageLayerGzip => Box::new(flate2::bufread::GzDecoder::new(
232+
std::io::BufReader::new(src),
233+
)),
234+
oci_image::MediaType::ImageLayer => Box::new(src),
235+
oci_image::MediaType::Other(t) if t.as_str() == DOCKER_TYPE_LAYER_TAR => Box::new(src),
236+
o => anyhow::bail!("Unhandled layer type: {}", o),
237+
};
238+
Ok(Self {
239+
inner: r,
240+
finished: false,
241+
})
242+
}
243+
244+
pub(crate) fn finish(mut self) -> Result<()> {
245+
self._finish()
246+
}
247+
248+
fn _finish(&mut self) -> Result<()> {
249+
self.finished = true;
250+
206251
// We need to make sure to flush out the decompressor and/or
207252
// tar stream here. For tar, we might not read through the
208253
// entire stream, because the archive has zero-block-markers
@@ -219,29 +264,14 @@ impl Drop for Decompressor {
219264
// https://github.com/bootc-dev/bootc/issues/1204
220265

221266
let mut sink = std::io::sink();
222-
match std::io::copy(&mut self.inner, &mut sink) {
223-
Err(e) => tracing::debug!("Ignoring error while dropping decompressor: {e}"),
224-
Ok(0) => { /* We already read everything and are happy */ }
225-
Ok(n) => tracing::debug!("Read extra {n} bytes at end of decompressor stream"),
267+
let n = std::io::copy(&mut self.inner, &mut sink)?;
268+
269+
if n > 0 {
270+
tracing::debug!("Read extra {n} bytes at end of decompressor stream");
226271
}
227-
}
228-
}
229272

230-
/// Create a decompressor for this MIME type, given a stream of input.
231-
pub(crate) fn decompressor(
232-
media_type: &oci_image::MediaType,
233-
src: impl Read + Send + 'static,
234-
) -> Result<Decompressor> {
235-
let r: Box<dyn std::io::Read + Send + 'static> = match media_type {
236-
oci_image::MediaType::ImageLayerZstd => Box::new(zstd::stream::read::Decoder::new(src)?),
237-
oci_image::MediaType::ImageLayerGzip => Box::new(flate2::bufread::GzDecoder::new(
238-
std::io::BufReader::new(src),
239-
)),
240-
oci_image::MediaType::ImageLayer => Box::new(src),
241-
oci_image::MediaType::Other(t) if t.as_str() == DOCKER_TYPE_LAYER_TAR => Box::new(src),
242-
o => anyhow::bail!("Unhandled layer type: {}", o),
243-
};
244-
Ok(Decompressor { inner: r })
273+
Ok(())
274+
}
245275
}
246276

247277
/// A wrapper for [`get_blob`] which fetches a layer and decompresses it.
@@ -305,3 +335,31 @@ pub(crate) async fn fetch_layer<'a>(
305335
Ok((Box::new(blob), Either::Right(driver), media_type))
306336
}
307337
}
338+
339+
#[cfg(test)]
340+
mod tests {
341+
use super::*;
342+
343+
struct BrokenPipe;
344+
345+
impl Read for BrokenPipe {
346+
fn read(&mut self, _buf: &mut [u8]) -> std::io::Result<usize> {
347+
std::io::Result::Err(std::io::ErrorKind::BrokenPipe.into())
348+
}
349+
}
350+
351+
#[test]
352+
#[should_panic(expected = "Decompressor::finish MUST be called")]
353+
fn test_drop_decompressor_with_finish_error_should_panic() {
354+
let broken = BrokenPipe;
355+
let d = Decompressor::new(&oci_image::MediaType::ImageLayer, broken).unwrap();
356+
drop(d)
357+
}
358+
359+
#[test]
360+
fn test_drop_decompressor_with_successful_finish() {
361+
let empty = std::io::empty();
362+
let d = Decompressor::new(&oci_image::MediaType::ImageLayer, empty).unwrap();
363+
drop(d)
364+
}
365+
}

ostree-ext/src/tar/write.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
//! In the future, this may also evolve into parsing the tar
88
//! stream in Rust, not in C.
99
10+
use crate::container::Decompressor;
1011
use crate::Result;
1112
use anyhow::{anyhow, Context};
1213
use camino::{Utf8Component, Utf8Path, Utf8PathBuf};
@@ -362,11 +363,13 @@ async fn filter_tar_async(
362363
let config = config.clone();
363364
let tar_transformer = crate::tokio_util::spawn_blocking_flatten(move || {
364365
let src = tokio_util::io::SyncIoBridge::new(src);
365-
let mut src = crate::container::decompressor(&media_type, src)?;
366+
let mut src = Decompressor::new(&media_type, src)?;
366367
let dest = tokio_util::io::SyncIoBridge::new(tx_buf);
367368

368369
let r = filter_tar(&mut src, dest, &config, &repo_tmpdir);
369370

371+
src.finish()?;
372+
370373
Ok(r)
371374
});
372375
let copier = tokio::io::copy(&mut rx_buf, &mut dest);

0 commit comments

Comments
 (0)