Skip to content

Commit 75d5e71

Browse files
committed
ostree-ext: Refactor decompression
This moves all of the code related to handling decompression out of container/unencapsulate.rs and into a new module `generic_decompress`. The only exposed API is via the existing (relocated) `Decompressor` type. Internal to `generic_decompress` this adds a new trait `ReadWithGetInnerMut`, which allows access to the original, inner, un-decompressed stream. This is used when finishing the decompressor, whether explicitly through calling its `finish()` method, or implicitly by dropping it. For things like GzDecoder, we don't want to read via the actual decompression reader because we don't care about decompressing at this point. Plus, the inner reader may have encountered an error partway through, and trying to decode via decompression will error with UnexpectedEof. Instead, wrap a reader for each content type which implements `ReadWithGetInnerMut`. When we finish decompressing, use the trait method `get_inner_mut()` to read directly from inner stream to flush any data. Resolves: #1407 Signed-off-by: John Eckersberg <[email protected]>
1 parent 0f3d02e commit 75d5e71

File tree

5 files changed

+234
-119
lines changed

5 files changed

+234
-119
lines changed

crates/ostree-ext/src/container/store.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
88
use super::*;
99
use crate::chunking::{self, Chunk};
10-
use crate::container::Decompressor;
10+
use crate::generic_decompress::Decompressor;
1111
use crate::logging::system_repo_journal_print;
1212
use crate::refescape;
1313
use crate::sysroot::SysrootLock;

crates/ostree-ext/src/container/unencapsulate.rs

Lines changed: 0 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -38,19 +38,13 @@ use containers_image_proxy::{ImageProxy, OpenedImage};
3838
use fn_error_context::context;
3939
use futures_util::{Future, FutureExt};
4040
use oci_spec::image::{self as oci_image, Digest};
41-
use std::io::Read;
4241
use std::sync::{Arc, Mutex};
4342
use tokio::{
4443
io::{AsyncBufRead, AsyncRead},
4544
sync::watch::{Receiver, Sender},
4645
};
4746
use tracing::instrument;
4847

49-
/// The legacy MIME type returned by the skopeo/(containers/storage) code
50-
/// when we have local uncompressed docker-formatted image.
51-
/// TODO: change the skopeo code to shield us from this correctly
52-
const DOCKER_TYPE_LAYER_TAR: &str = "application/vnd.docker.image.rootfs.diff.tar";
53-
5448
type Progress = tokio::sync::watch::Sender<u64>;
5549

5650
/// A read wrapper that updates the download progress.
@@ -191,89 +185,6 @@ pub async fn unencapsulate(repo: &ostree::Repo, imgref: &OstreeImageReference) -
191185
importer.unencapsulate().await
192186
}
193187

194-
pub(crate) struct Decompressor {
195-
inner: Box<dyn Read + Send + 'static>,
196-
finished: bool,
197-
}
198-
199-
impl Read for Decompressor {
200-
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
201-
self.inner.read(buf)
202-
}
203-
}
204-
205-
impl Drop for Decompressor {
206-
fn drop(&mut self) {
207-
if self.finished {
208-
return;
209-
}
210-
211-
// We really should not get here; users are required to call
212-
// `finish()` to clean up the stream. But we'll give
213-
// best-effort to clean things up nonetheless. If things go
214-
// wrong, then panic, because we're in a bad state and it's
215-
// likely that we end up with a broken pipe error or a
216-
// deadlock.
217-
self._finish().expect("Decompressor::finish MUST be called")
218-
}
219-
}
220-
221-
impl Decompressor {
222-
/// Create a decompressor for this MIME type, given a stream of input.
223-
pub(crate) fn new(
224-
media_type: &oci_image::MediaType,
225-
src: impl Read + Send + 'static,
226-
) -> Result<Self> {
227-
let r: Box<dyn std::io::Read + Send + 'static> = match media_type {
228-
oci_image::MediaType::ImageLayerZstd => {
229-
Box::new(zstd::stream::read::Decoder::new(src)?)
230-
}
231-
oci_image::MediaType::ImageLayerGzip => Box::new(flate2::bufread::GzDecoder::new(
232-
std::io::BufReader::new(src),
233-
)),
234-
oci_image::MediaType::ImageLayer => Box::new(src),
235-
oci_image::MediaType::Other(t) if t.as_str() == DOCKER_TYPE_LAYER_TAR => Box::new(src),
236-
o => anyhow::bail!("Unhandled layer type: {}", o),
237-
};
238-
Ok(Self {
239-
inner: r,
240-
finished: false,
241-
})
242-
}
243-
244-
pub(crate) fn finish(mut self) -> Result<()> {
245-
self._finish()
246-
}
247-
248-
fn _finish(&mut self) -> Result<()> {
249-
self.finished = true;
250-
251-
// We need to make sure to flush out the decompressor and/or
252-
// tar stream here. For tar, we might not read through the
253-
// entire stream, because the archive has zero-block-markers
254-
// at the end; or possibly because the final entry is filtered
255-
// in filter_tar so we don't advance to read the data. For
256-
// decompressor, zstd:chunked layers will have
257-
// metadata/skippable frames at the end of the stream. That
258-
// data isn't relevant to the tar stream, but if we don't read
259-
// it here then on the skopeo proxy we'll block trying to
260-
// write the end of the stream. That in turn will block our
261-
// client end trying to call FinishPipe, and we end up
262-
// deadlocking ourselves through skopeo.
263-
//
264-
// https://github.com/bootc-dev/bootc/issues/1204
265-
266-
let mut sink = std::io::sink();
267-
let n = std::io::copy(&mut self.inner, &mut sink)?;
268-
269-
if n > 0 {
270-
tracing::debug!("Read extra {n} bytes at end of decompressor stream");
271-
}
272-
273-
Ok(())
274-
}
275-
}
276-
277188
/// A wrapper for [`get_blob`] which fetches a layer and decompresses it.
278189
pub(crate) async fn fetch_layer<'a>(
279190
proxy: &'a ImageProxy,
@@ -335,31 +246,3 @@ pub(crate) async fn fetch_layer<'a>(
335246
Ok((Box::new(blob), Either::Right(driver), media_type))
336247
}
337248
}
338-
339-
#[cfg(test)]
340-
mod tests {
341-
use super::*;
342-
343-
struct BrokenPipe;
344-
345-
impl Read for BrokenPipe {
346-
fn read(&mut self, _buf: &mut [u8]) -> std::io::Result<usize> {
347-
std::io::Result::Err(std::io::ErrorKind::BrokenPipe.into())
348-
}
349-
}
350-
351-
#[test]
352-
#[should_panic(expected = "Decompressor::finish MUST be called")]
353-
fn test_drop_decompressor_with_finish_error_should_panic() {
354-
let broken = BrokenPipe;
355-
let d = Decompressor::new(&oci_image::MediaType::ImageLayer, broken).unwrap();
356-
drop(d)
357-
}
358-
359-
#[test]
360-
fn test_drop_decompressor_with_successful_finish() {
361-
let empty = std::io::empty();
362-
let d = Decompressor::new(&oci_image::MediaType::ImageLayer, empty).unwrap();
363-
drop(d)
364-
}
365-
}
Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
//! This module primarily contains the `Decompressor` struct which is
2+
//! used to decompress a stream based on its OCI media type.
3+
//!
4+
//! It also contains the `ReadWithGetInnerMut` trait and related
5+
//! concrete implementations thereof. These provide a means for each
6+
//! specific decompressor to give mutable access to the inner reader.
7+
//!
8+
//! For example, the GzipDecompressor would give the underlying
9+
//! compressed stream.
10+
//!
11+
//! We need a common way to access this stream so that we can flush
12+
//! the data during cleanup.
13+
//!
14+
//! See: <https://github.com/bootc-dev/bootc/issues/1407>
15+
16+
use std::io::Read;
17+
18+
use crate::oci_spec::image as oci_image;
19+
20+
/// The legacy MIME type returned by the skopeo/(containers/storage) code
21+
/// when we have local uncompressed docker-formatted image.
22+
/// TODO: change the skopeo code to shield us from this correctly
23+
const DOCKER_TYPE_LAYER_TAR: &str = "application/vnd.docker.image.rootfs.diff.tar";
24+
25+
/// Extends the `Read` trait with another method to get mutable access to the inner reader
26+
trait ReadWithGetInnerMut: Read + Send + 'static {
27+
fn get_inner_mut(&mut self) -> &mut (dyn Read);
28+
}
29+
30+
// TransparentDecompressor
31+
32+
struct TransparentDecompressor<R: Read + Send + 'static>(R);
33+
34+
impl<R: Read + Send + 'static> Read for TransparentDecompressor<R> {
35+
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
36+
self.0.read(buf)
37+
}
38+
}
39+
40+
impl<R: Read + Send + 'static> ReadWithGetInnerMut for TransparentDecompressor<R> {
41+
fn get_inner_mut(&mut self) -> &mut (dyn Read) {
42+
&mut self.0
43+
}
44+
}
45+
46+
// GzipDecompressor
47+
48+
struct GzipDecompressor<R: std::io::BufRead>(flate2::bufread::GzDecoder<R>);
49+
50+
impl<R: std::io::BufRead + Send + 'static> Read for GzipDecompressor<R> {
51+
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
52+
self.0.read(buf)
53+
}
54+
}
55+
56+
impl<R: std::io::BufRead + Send + 'static> ReadWithGetInnerMut for GzipDecompressor<R> {
57+
fn get_inner_mut(&mut self) -> &mut (dyn Read) {
58+
self.0.get_mut()
59+
}
60+
}
61+
62+
// ZstdDecompressor
63+
64+
struct ZstdDecompressor<'a, R: std::io::BufRead>(zstd::stream::read::Decoder<'a, R>);
65+
66+
impl<'a: 'static, R: std::io::BufRead + Send + 'static> Read for ZstdDecompressor<'a, R> {
67+
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
68+
self.0.read(buf)
69+
}
70+
}
71+
72+
impl<'a: 'static, R: std::io::BufRead + Send + 'static> ReadWithGetInnerMut
73+
for ZstdDecompressor<'a, R>
74+
{
75+
fn get_inner_mut(&mut self) -> &mut (dyn Read) {
76+
self.0.get_mut()
77+
}
78+
}
79+
80+
pub(crate) struct Decompressor {
81+
inner: Box<dyn ReadWithGetInnerMut>,
82+
finished: bool,
83+
}
84+
85+
impl Read for Decompressor {
86+
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
87+
self.inner.read(buf)
88+
}
89+
}
90+
91+
impl Drop for Decompressor {
92+
fn drop(&mut self) {
93+
if self.finished {
94+
return;
95+
}
96+
97+
// Ideally we should not get here; users should call
98+
// `finish()` to clean up the stream. But in reality there's
99+
// codepaths that can and will short-circuit error out while
100+
// processing the stream, and the Decompressor will get
101+
// dropped before it's finished in those cases. We'll give
102+
// best-effort to clean things up nonetheless. If things go
103+
// wrong, then panic, because we're in a bad state and it's
104+
// likely that we end up with a broken pipe error or a
105+
// deadlock.
106+
self._finish()
107+
.expect("Failed to flush pipe while dropping Decompressor")
108+
}
109+
}
110+
111+
impl Decompressor {
112+
/// Create a decompressor for this MIME type, given a stream of input.
113+
pub(crate) fn new(
114+
media_type: &oci_image::MediaType,
115+
src: impl Read + Send + 'static,
116+
) -> anyhow::Result<Self> {
117+
let r: Box<dyn ReadWithGetInnerMut> = match media_type {
118+
oci_image::MediaType::ImageLayerZstd => {
119+
Box::new(ZstdDecompressor(zstd::stream::read::Decoder::new(src)?))
120+
}
121+
oci_image::MediaType::ImageLayerGzip => Box::new(GzipDecompressor(
122+
flate2::bufread::GzDecoder::new(std::io::BufReader::new(src)),
123+
)),
124+
oci_image::MediaType::ImageLayer => Box::new(TransparentDecompressor(src)),
125+
oci_image::MediaType::Other(t) if t.as_str() == DOCKER_TYPE_LAYER_TAR => {
126+
Box::new(TransparentDecompressor(src))
127+
}
128+
o => anyhow::bail!("Unhandled layer type: {}", o),
129+
};
130+
Ok(Self {
131+
inner: r,
132+
finished: false,
133+
})
134+
}
135+
136+
pub(crate) fn finish(mut self) -> anyhow::Result<()> {
137+
self._finish()
138+
}
139+
140+
fn _finish(&mut self) -> anyhow::Result<()> {
141+
self.finished = true;
142+
143+
// We need to make sure to flush out the decompressor and/or
144+
// tar stream here. For tar, we might not read through the
145+
// entire stream, because the archive has zero-block-markers
146+
// at the end; or possibly because the final entry is filtered
147+
// in filter_tar so we don't advance to read the data. For
148+
// decompressor, zstd:chunked layers will have
149+
// metadata/skippable frames at the end of the stream. That
150+
// data isn't relevant to the tar stream, but if we don't read
151+
// it here then on the skopeo proxy we'll block trying to
152+
// write the end of the stream. That in turn will block our
153+
// client end trying to call FinishPipe, and we end up
154+
// deadlocking ourselves through skopeo.
155+
//
156+
// https://github.com/bootc-dev/bootc/issues/1204
157+
158+
let mut sink = std::io::sink();
159+
let n = std::io::copy(self.inner.get_inner_mut(), &mut sink)?;
160+
161+
if n > 0 {
162+
tracing::debug!("Read extra {n} bytes at end of decompressor stream");
163+
}
164+
165+
Ok(())
166+
}
167+
}
168+
169+
#[cfg(test)]
170+
mod tests {
171+
use super::*;
172+
173+
struct BrokenPipe;
174+
175+
impl Read for BrokenPipe {
176+
fn read(&mut self, _buf: &mut [u8]) -> std::io::Result<usize> {
177+
std::io::Result::Err(std::io::ErrorKind::BrokenPipe.into())
178+
}
179+
}
180+
181+
#[test]
182+
#[should_panic(expected = "Failed to flush pipe while dropping Decompressor")]
183+
fn test_drop_decompressor_with_finish_error_should_panic() {
184+
let broken = BrokenPipe;
185+
let d = Decompressor::new(&oci_image::MediaType::ImageLayer, broken).unwrap();
186+
drop(d)
187+
}
188+
189+
#[test]
190+
fn test_drop_decompressor_with_successful_finish() {
191+
let empty = std::io::empty();
192+
let d = Decompressor::new(&oci_image::MediaType::ImageLayer, empty).unwrap();
193+
drop(d)
194+
}
195+
196+
#[test]
197+
fn test_drop_decompressor_with_incomplete_gzip_data() {
198+
let empty = std::io::empty();
199+
let d = Decompressor::new(&oci_image::MediaType::ImageLayerGzip, empty).unwrap();
200+
drop(d)
201+
}
202+
203+
#[test]
204+
fn test_drop_decompressor_with_incomplete_zstd_data() {
205+
let empty = std::io::empty();
206+
let d = Decompressor::new(&oci_image::MediaType::ImageLayerZstd, empty).unwrap();
207+
drop(d)
208+
}
209+
210+
#[test]
211+
fn test_gzip_decompressor_with_garbage_input() {
212+
let garbage = b"This is not valid gzip data";
213+
let mut d = Decompressor::new(&oci_image::MediaType::ImageLayerGzip, &garbage[..]).unwrap();
214+
let mut buf = [0u8; 32];
215+
let e = d.read(&mut buf).unwrap_err();
216+
assert!(matches!(e.kind(), std::io::ErrorKind::InvalidInput));
217+
assert_eq!(e.to_string(), "invalid gzip header".to_string());
218+
drop(d)
219+
}
220+
221+
#[test]
222+
fn test_zstd_decompressor_with_garbage_input() {
223+
let garbage = b"This is not valid zstd data";
224+
let mut d = Decompressor::new(&oci_image::MediaType::ImageLayerZstd, &garbage[..]).unwrap();
225+
let mut buf = [0u8; 32];
226+
let e = d.read(&mut buf).unwrap_err();
227+
assert!(matches!(e.kind(), std::io::ErrorKind::Other));
228+
assert_eq!(e.to_string(), "Unknown frame descriptor".to_string());
229+
drop(d)
230+
}
231+
}

crates/ostree-ext/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ pub mod cli;
3737
pub mod container;
3838
pub mod container_utils;
3939
pub mod diff;
40+
pub(crate) mod generic_decompress;
4041
pub mod ima;
4142
pub mod keyfileext;
4243
pub(crate) mod logging;

crates/ostree-ext/src/tar/write.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
//! In the future, this may also evolve into parsing the tar
88
//! stream in Rust, not in C.
99
10-
use crate::container::Decompressor;
10+
use crate::generic_decompress::Decompressor;
1111
use crate::Result;
1212
use anyhow::{anyhow, Context};
1313
use camino::{Utf8Component, Utf8Path, Utf8PathBuf};

0 commit comments

Comments
 (0)