Skip to content

Commit 778ec6a

Browse files
authored
Merge pull request #1415 from jeckersb/decompressor_finish_on_inner
unencapsulate: use "inner" stream when finishing Decompressor
2 parents 0f3d02e + 75d5e71 commit 778ec6a

File tree

5 files changed

+234
-119
lines changed

5 files changed

+234
-119
lines changed

crates/ostree-ext/src/container/store.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
88
use super::*;
99
use crate::chunking::{self, Chunk};
10-
use crate::container::Decompressor;
10+
use crate::generic_decompress::Decompressor;
1111
use crate::logging::system_repo_journal_print;
1212
use crate::refescape;
1313
use crate::sysroot::SysrootLock;

crates/ostree-ext/src/container/unencapsulate.rs

Lines changed: 0 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -38,19 +38,13 @@ use containers_image_proxy::{ImageProxy, OpenedImage};
3838
use fn_error_context::context;
3939
use futures_util::{Future, FutureExt};
4040
use oci_spec::image::{self as oci_image, Digest};
41-
use std::io::Read;
4241
use std::sync::{Arc, Mutex};
4342
use tokio::{
4443
io::{AsyncBufRead, AsyncRead},
4544
sync::watch::{Receiver, Sender},
4645
};
4746
use tracing::instrument;
4847

49-
/// The legacy MIME type returned by the skopeo/(containers/storage) code
50-
/// when we have local uncompressed docker-formatted image.
51-
/// TODO: change the skopeo code to shield us from this correctly
52-
const DOCKER_TYPE_LAYER_TAR: &str = "application/vnd.docker.image.rootfs.diff.tar";
53-
5448
type Progress = tokio::sync::watch::Sender<u64>;
5549

5650
/// A read wrapper that updates the download progress.
@@ -191,89 +185,6 @@ pub async fn unencapsulate(repo: &ostree::Repo, imgref: &OstreeImageReference) -
191185
importer.unencapsulate().await
192186
}
193187

194-
pub(crate) struct Decompressor {
195-
inner: Box<dyn Read + Send + 'static>,
196-
finished: bool,
197-
}
198-
199-
impl Read for Decompressor {
200-
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
201-
self.inner.read(buf)
202-
}
203-
}
204-
205-
impl Drop for Decompressor {
206-
fn drop(&mut self) {
207-
if self.finished {
208-
return;
209-
}
210-
211-
// We really should not get here; users are required to call
212-
// `finish()` to clean up the stream. But we'll give
213-
// best-effort to clean things up nonetheless. If things go
214-
// wrong, then panic, because we're in a bad state and it's
215-
// likely that we end up with a broken pipe error or a
216-
// deadlock.
217-
self._finish().expect("Decompressor::finish MUST be called")
218-
}
219-
}
220-
221-
impl Decompressor {
222-
/// Create a decompressor for this MIME type, given a stream of input.
223-
pub(crate) fn new(
224-
media_type: &oci_image::MediaType,
225-
src: impl Read + Send + 'static,
226-
) -> Result<Self> {
227-
let r: Box<dyn std::io::Read + Send + 'static> = match media_type {
228-
oci_image::MediaType::ImageLayerZstd => {
229-
Box::new(zstd::stream::read::Decoder::new(src)?)
230-
}
231-
oci_image::MediaType::ImageLayerGzip => Box::new(flate2::bufread::GzDecoder::new(
232-
std::io::BufReader::new(src),
233-
)),
234-
oci_image::MediaType::ImageLayer => Box::new(src),
235-
oci_image::MediaType::Other(t) if t.as_str() == DOCKER_TYPE_LAYER_TAR => Box::new(src),
236-
o => anyhow::bail!("Unhandled layer type: {}", o),
237-
};
238-
Ok(Self {
239-
inner: r,
240-
finished: false,
241-
})
242-
}
243-
244-
pub(crate) fn finish(mut self) -> Result<()> {
245-
self._finish()
246-
}
247-
248-
fn _finish(&mut self) -> Result<()> {
249-
self.finished = true;
250-
251-
// We need to make sure to flush out the decompressor and/or
252-
// tar stream here. For tar, we might not read through the
253-
// entire stream, because the archive has zero-block-markers
254-
// at the end; or possibly because the final entry is filtered
255-
// in filter_tar so we don't advance to read the data. For
256-
// decompressor, zstd:chunked layers will have
257-
// metadata/skippable frames at the end of the stream. That
258-
// data isn't relevant to the tar stream, but if we don't read
259-
// it here then on the skopeo proxy we'll block trying to
260-
// write the end of the stream. That in turn will block our
261-
// client end trying to call FinishPipe, and we end up
262-
// deadlocking ourselves through skopeo.
263-
//
264-
// https://github.com/bootc-dev/bootc/issues/1204
265-
266-
let mut sink = std::io::sink();
267-
let n = std::io::copy(&mut self.inner, &mut sink)?;
268-
269-
if n > 0 {
270-
tracing::debug!("Read extra {n} bytes at end of decompressor stream");
271-
}
272-
273-
Ok(())
274-
}
275-
}
276-
277188
/// A wrapper for [`get_blob`] which fetches a layer and decompresses it.
278189
pub(crate) async fn fetch_layer<'a>(
279190
proxy: &'a ImageProxy,
@@ -335,31 +246,3 @@ pub(crate) async fn fetch_layer<'a>(
335246
Ok((Box::new(blob), Either::Right(driver), media_type))
336247
}
337248
}
338-
339-
#[cfg(test)]
340-
mod tests {
341-
use super::*;
342-
343-
struct BrokenPipe;
344-
345-
impl Read for BrokenPipe {
346-
fn read(&mut self, _buf: &mut [u8]) -> std::io::Result<usize> {
347-
std::io::Result::Err(std::io::ErrorKind::BrokenPipe.into())
348-
}
349-
}
350-
351-
#[test]
352-
#[should_panic(expected = "Decompressor::finish MUST be called")]
353-
fn test_drop_decompressor_with_finish_error_should_panic() {
354-
let broken = BrokenPipe;
355-
let d = Decompressor::new(&oci_image::MediaType::ImageLayer, broken).unwrap();
356-
drop(d)
357-
}
358-
359-
#[test]
360-
fn test_drop_decompressor_with_successful_finish() {
361-
let empty = std::io::empty();
362-
let d = Decompressor::new(&oci_image::MediaType::ImageLayer, empty).unwrap();
363-
drop(d)
364-
}
365-
}
Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
//! This module primarily contains the `Decompressor` struct which is
2+
//! used to decompress a stream based on its OCI media type.
3+
//!
4+
//! It also contains the `ReadWithGetInnerMut` trait and related
5+
//! concrete implementations thereof. These provide a means for each
6+
//! specific decompressor to give mutable access to the inner reader.
7+
//!
8+
//! For example, the GzipDecompressor would give the underlying
9+
//! compressed stream.
10+
//!
11+
//! We need a common way to access this stream so that we can flush
12+
//! the data during cleanup.
13+
//!
14+
//! See: <https://github.com/bootc-dev/bootc/issues/1407>
15+
16+
use std::io::Read;
17+
18+
use crate::oci_spec::image as oci_image;
19+
20+
/// The legacy MIME type returned by the skopeo/(containers/storage) code
21+
/// when we have local uncompressed docker-formatted image.
22+
/// TODO: change the skopeo code to shield us from this correctly
23+
const DOCKER_TYPE_LAYER_TAR: &str = "application/vnd.docker.image.rootfs.diff.tar";
24+
25+
/// A [`Read`] implementation that can additionally hand out mutable access
/// to the reader it wraps.
///
/// Implementors are wrappers around some other reader; `get_inner_mut`
/// exposes that wrapped reader as a trait object so callers can read from
/// it directly, bypassing the wrapper's own `read`.
trait ReadWithGetInnerMut: Read + Send + 'static {
    /// Borrow the wrapped ("inner") reader mutably.
    fn get_inner_mut(&mut self) -> &mut dyn Read;
}
29+
30+
// TransparentDecompressor
31+
32+
struct TransparentDecompressor<R: Read + Send + 'static>(R);
33+
34+
impl<R: Read + Send + 'static> Read for TransparentDecompressor<R> {
35+
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
36+
self.0.read(buf)
37+
}
38+
}
39+
40+
impl<R: Read + Send + 'static> ReadWithGetInnerMut for TransparentDecompressor<R> {
41+
fn get_inner_mut(&mut self) -> &mut (dyn Read) {
42+
&mut self.0
43+
}
44+
}
45+
46+
// GzipDecompressor
47+
48+
struct GzipDecompressor<R: std::io::BufRead>(flate2::bufread::GzDecoder<R>);
49+
50+
impl<R: std::io::BufRead + Send + 'static> Read for GzipDecompressor<R> {
51+
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
52+
self.0.read(buf)
53+
}
54+
}
55+
56+
impl<R: std::io::BufRead + Send + 'static> ReadWithGetInnerMut for GzipDecompressor<R> {
57+
fn get_inner_mut(&mut self) -> &mut (dyn Read) {
58+
self.0.get_mut()
59+
}
60+
}
61+
62+
// ZstdDecompressor
63+
64+
struct ZstdDecompressor<'a, R: std::io::BufRead>(zstd::stream::read::Decoder<'a, R>);
65+
66+
impl<'a: 'static, R: std::io::BufRead + Send + 'static> Read for ZstdDecompressor<'a, R> {
67+
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
68+
self.0.read(buf)
69+
}
70+
}
71+
72+
impl<'a: 'static, R: std::io::BufRead + Send + 'static> ReadWithGetInnerMut
73+
for ZstdDecompressor<'a, R>
74+
{
75+
fn get_inner_mut(&mut self) -> &mut (dyn Read) {
76+
self.0.get_mut()
77+
}
78+
}
79+
80+
pub(crate) struct Decompressor {
81+
inner: Box<dyn ReadWithGetInnerMut>,
82+
finished: bool,
83+
}
84+
85+
impl Read for Decompressor {
86+
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
87+
self.inner.read(buf)
88+
}
89+
}
90+
91+
impl Drop for Decompressor {
92+
fn drop(&mut self) {
93+
if self.finished {
94+
return;
95+
}
96+
97+
// Ideally we should not get here; users should call
98+
// `finish()` to clean up the stream. But in reality there's
99+
// codepaths that can and will short-circuit error out while
100+
// processing the stream, and the Decompressor will get
101+
// dropped before it's finished in those cases. We'll give
102+
// best-effort to clean things up nonetheless. If things go
103+
// wrong, then panic, because we're in a bad state and it's
104+
// likely that we end up with a broken pipe error or a
105+
// deadlock.
106+
self._finish()
107+
.expect("Failed to flush pipe while dropping Decompressor")
108+
}
109+
}
110+
111+
impl Decompressor {
112+
/// Create a decompressor for this MIME type, given a stream of input.
113+
pub(crate) fn new(
114+
media_type: &oci_image::MediaType,
115+
src: impl Read + Send + 'static,
116+
) -> anyhow::Result<Self> {
117+
let r: Box<dyn ReadWithGetInnerMut> = match media_type {
118+
oci_image::MediaType::ImageLayerZstd => {
119+
Box::new(ZstdDecompressor(zstd::stream::read::Decoder::new(src)?))
120+
}
121+
oci_image::MediaType::ImageLayerGzip => Box::new(GzipDecompressor(
122+
flate2::bufread::GzDecoder::new(std::io::BufReader::new(src)),
123+
)),
124+
oci_image::MediaType::ImageLayer => Box::new(TransparentDecompressor(src)),
125+
oci_image::MediaType::Other(t) if t.as_str() == DOCKER_TYPE_LAYER_TAR => {
126+
Box::new(TransparentDecompressor(src))
127+
}
128+
o => anyhow::bail!("Unhandled layer type: {}", o),
129+
};
130+
Ok(Self {
131+
inner: r,
132+
finished: false,
133+
})
134+
}
135+
136+
pub(crate) fn finish(mut self) -> anyhow::Result<()> {
137+
self._finish()
138+
}
139+
140+
fn _finish(&mut self) -> anyhow::Result<()> {
141+
self.finished = true;
142+
143+
// We need to make sure to flush out the decompressor and/or
144+
// tar stream here. For tar, we might not read through the
145+
// entire stream, because the archive has zero-block-markers
146+
// at the end; or possibly because the final entry is filtered
147+
// in filter_tar so we don't advance to read the data. For
148+
// decompressor, zstd:chunked layers will have
149+
// metadata/skippable frames at the end of the stream. That
150+
// data isn't relevant to the tar stream, but if we don't read
151+
// it here then on the skopeo proxy we'll block trying to
152+
// write the end of the stream. That in turn will block our
153+
// client end trying to call FinishPipe, and we end up
154+
// deadlocking ourselves through skopeo.
155+
//
156+
// https://github.com/bootc-dev/bootc/issues/1204
157+
158+
let mut sink = std::io::sink();
159+
let n = std::io::copy(self.inner.get_inner_mut(), &mut sink)?;
160+
161+
if n > 0 {
162+
tracing::debug!("Read extra {n} bytes at end of decompressor stream");
163+
}
164+
165+
Ok(())
166+
}
167+
}
168+
169+
#[cfg(test)]
mod tests {
    use super::*;

    /// A reader whose every `read` fails with `BrokenPipe`.
    struct BrokenPipe;

    impl Read for BrokenPipe {
        fn read(&mut self, _buf: &mut [u8]) -> std::io::Result<usize> {
            Err(std::io::Error::from(std::io::ErrorKind::BrokenPipe))
        }
    }

    /// Build a decompressor for `media_type` over `src` and drop it without
    /// ever calling `finish()`, so that the `Drop` cleanup path runs.
    fn drop_unfinished(media_type: oci_image::MediaType, src: impl Read + Send + 'static) {
        let d = Decompressor::new(&media_type, src).unwrap();
        drop(d)
    }

    #[test]
    #[should_panic(expected = "Failed to flush pipe while dropping Decompressor")]
    fn test_drop_decompressor_with_finish_error_should_panic() {
        drop_unfinished(oci_image::MediaType::ImageLayer, BrokenPipe)
    }

    #[test]
    fn test_drop_decompressor_with_successful_finish() {
        drop_unfinished(oci_image::MediaType::ImageLayer, std::io::empty())
    }

    #[test]
    fn test_drop_decompressor_with_incomplete_gzip_data() {
        drop_unfinished(oci_image::MediaType::ImageLayerGzip, std::io::empty())
    }

    #[test]
    fn test_drop_decompressor_with_incomplete_zstd_data() {
        drop_unfinished(oci_image::MediaType::ImageLayerZstd, std::io::empty())
    }

    #[test]
    fn test_gzip_decompressor_with_garbage_input() {
        let garbage: &'static [u8] = b"This is not valid gzip data";
        let mut d = Decompressor::new(&oci_image::MediaType::ImageLayerGzip, garbage).unwrap();
        let mut buf = [0u8; 32];
        let e = d.read(&mut buf).unwrap_err();
        assert_eq!(e.kind(), std::io::ErrorKind::InvalidInput);
        assert_eq!(e.to_string(), "invalid gzip header");
        drop(d)
    }

    #[test]
    fn test_zstd_decompressor_with_garbage_input() {
        let garbage: &'static [u8] = b"This is not valid zstd data";
        let mut d = Decompressor::new(&oci_image::MediaType::ImageLayerZstd, garbage).unwrap();
        let mut buf = [0u8; 32];
        let e = d.read(&mut buf).unwrap_err();
        assert_eq!(e.kind(), std::io::ErrorKind::Other);
        assert_eq!(e.to_string(), "Unknown frame descriptor");
        drop(d)
    }
}

crates/ostree-ext/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ pub mod cli;
3737
pub mod container;
3838
pub mod container_utils;
3939
pub mod diff;
40+
pub(crate) mod generic_decompress;
4041
pub mod ima;
4142
pub mod keyfileext;
4243
pub(crate) mod logging;

crates/ostree-ext/src/tar/write.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
//! In the future, this may also evolve into parsing the tar
88
//! stream in Rust, not in C.
99
10-
use crate::container::Decompressor;
10+
use crate::generic_decompress::Decompressor;
1111
use crate::Result;
1212
use anyhow::{anyhow, Context};
1313
use camino::{Utf8Component, Utf8Path, Utf8PathBuf};

0 commit comments

Comments
 (0)