|
32 | 32 | // which is exactly what is exported by the [`crate::tar::export`] process. |
33 | 33 |
|
34 | 34 | use crate::container::store::LayerProgress; |
35 | | -use crate::generic_decompress::{ |
36 | | - Decompressable, GzipDecompressor, TransparentDecompressor, ZstdDecompressor, |
37 | | -}; |
38 | 35 |
|
39 | 36 | use super::*; |
40 | 37 | use containers_image_proxy::{ImageProxy, OpenedImage}; |
41 | 38 | use fn_error_context::context; |
42 | 39 | use futures_util::{Future, FutureExt}; |
43 | 40 | use oci_spec::image::{self as oci_image, Digest}; |
44 | | -use std::io::Read; |
45 | 41 | use std::sync::{Arc, Mutex}; |
46 | 42 | use tokio::{ |
47 | 43 | io::{AsyncBufRead, AsyncRead}, |
48 | 44 | sync::watch::{Receiver, Sender}, |
49 | 45 | }; |
50 | 46 | use tracing::instrument; |
51 | 47 |
|
52 | | -/// The legacy MIME type returned by the skopeo/(containers/storage) code |
53 | | -/// when we have local uncompressed docker-formatted image. |
54 | | -/// TODO: change the skopeo code to shield us from this correctly |
55 | | -const DOCKER_TYPE_LAYER_TAR: &str = "application/vnd.docker.image.rootfs.diff.tar"; |
56 | | - |
57 | 48 | type Progress = tokio::sync::watch::Sender<u64>; |
58 | 49 |
|
59 | 50 | /// A read wrapper that updates the download progress. |
@@ -194,91 +185,6 @@ pub async fn unencapsulate(repo: &ostree::Repo, imgref: &OstreeImageReference) - |
194 | 185 | importer.unencapsulate().await |
195 | 186 | } |
196 | 187 |
|
197 | | -pub(crate) struct Decompressor { |
198 | | - inner: Box<dyn Decompressable>, |
199 | | - finished: bool, |
200 | | -} |
201 | | - |
202 | | -impl Read for Decompressor { |
203 | | - fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> { |
204 | | - self.inner.read(buf) |
205 | | - } |
206 | | -} |
207 | | - |
208 | | -impl Drop for Decompressor { |
209 | | - fn drop(&mut self) { |
210 | | - if self.finished { |
211 | | - return; |
212 | | - } |
213 | | - |
214 | | - // We really should not get here; users are required to call |
215 | | - // `finish()` to clean up the stream. But we'll give |
216 | | - // best-effort to clean things up nonetheless. If things go |
217 | | - // wrong, then panic, because we're in a bad state and it's |
218 | | - // likely that we end up with a broken pipe error or a |
219 | | - // deadlock. |
220 | | - self._finish().expect("Decompressor::finish MUST be called") |
221 | | - } |
222 | | -} |
223 | | - |
224 | | -impl Decompressor { |
225 | | - /// Create a decompressor for this MIME type, given a stream of input. |
226 | | - pub(crate) fn new( |
227 | | - media_type: &oci_image::MediaType, |
228 | | - src: impl Read + Send + 'static, |
229 | | - ) -> Result<Self> { |
230 | | - let r: Box<dyn Decompressable> = match media_type { |
231 | | - oci_image::MediaType::ImageLayerZstd => { |
232 | | - Box::new(ZstdDecompressor(zstd::stream::read::Decoder::new(src)?)) |
233 | | - } |
234 | | - oci_image::MediaType::ImageLayerGzip => Box::new(GzipDecompressor( |
235 | | - flate2::bufread::GzDecoder::new(std::io::BufReader::new(src)), |
236 | | - )), |
237 | | - oci_image::MediaType::ImageLayer => Box::new(TransparentDecompressor(src)), |
238 | | - oci_image::MediaType::Other(t) if t.as_str() == DOCKER_TYPE_LAYER_TAR => { |
239 | | - Box::new(TransparentDecompressor(src)) |
240 | | - } |
241 | | - o => anyhow::bail!("Unhandled layer type: {}", o), |
242 | | - }; |
243 | | - Ok(Self { |
244 | | - inner: r, |
245 | | - finished: false, |
246 | | - }) |
247 | | - } |
248 | | - |
249 | | - pub(crate) fn finish(mut self) -> Result<()> { |
250 | | - self._finish() |
251 | | - } |
252 | | - |
253 | | - fn _finish(&mut self) -> Result<()> { |
254 | | - self.finished = true; |
255 | | - |
256 | | - // We need to make sure to flush out the decompressor and/or |
257 | | - // tar stream here. For tar, we might not read through the |
258 | | - // entire stream, because the archive has zero-block-markers |
259 | | - // at the end; or possibly because the final entry is filtered |
260 | | - // in filter_tar so we don't advance to read the data. For |
261 | | - // decompressor, zstd:chunked layers will have |
262 | | - // metadata/skippable frames at the end of the stream. That |
263 | | - // data isn't relevant to the tar stream, but if we don't read |
264 | | - // it here then on the skopeo proxy we'll block trying to |
265 | | - // write the end of the stream. That in turn will block our |
266 | | - // client end trying to call FinishPipe, and we end up |
267 | | - // deadlocking ourselves through skopeo. |
268 | | - // |
269 | | - // https://github.com/bootc-dev/bootc/issues/1204 |
270 | | - |
271 | | - let mut sink = std::io::sink(); |
272 | | - let n = std::io::copy(self.inner.get_inner_mut(), &mut sink)?; |
273 | | - |
274 | | - if n > 0 { |
275 | | - tracing::debug!("Read extra {n} bytes at end of decompressor stream"); |
276 | | - } |
277 | | - |
278 | | - Ok(()) |
279 | | - } |
280 | | -} |
281 | | - |
282 | 188 | /// A wrapper for [`get_blob`] which fetches a layer and decompresses it. |
283 | 189 | pub(crate) async fn fetch_layer<'a>( |
284 | 190 | proxy: &'a ImageProxy, |
@@ -340,67 +246,3 @@ pub(crate) async fn fetch_layer<'a>( |
340 | 246 | Ok((Box::new(blob), Either::Right(driver), media_type)) |
341 | 247 | } |
342 | 248 | } |
343 | | - |
344 | | -#[cfg(test)] |
345 | | -mod tests { |
346 | | - use super::*; |
347 | | - |
348 | | - struct BrokenPipe; |
349 | | - |
350 | | - impl Read for BrokenPipe { |
351 | | - fn read(&mut self, _buf: &mut [u8]) -> std::io::Result<usize> { |
352 | | - std::io::Result::Err(std::io::ErrorKind::BrokenPipe.into()) |
353 | | - } |
354 | | - } |
355 | | - |
356 | | - #[test] |
357 | | - #[should_panic(expected = "Decompressor::finish MUST be called")] |
358 | | - fn test_drop_decompressor_with_finish_error_should_panic() { |
359 | | - let broken = BrokenPipe; |
360 | | - let d = Decompressor::new(&oci_image::MediaType::ImageLayer, broken).unwrap(); |
361 | | - drop(d) |
362 | | - } |
363 | | - |
364 | | - #[test] |
365 | | - fn test_drop_decompressor_with_successful_finish() { |
366 | | - let empty = std::io::empty(); |
367 | | - let d = Decompressor::new(&oci_image::MediaType::ImageLayer, empty).unwrap(); |
368 | | - drop(d) |
369 | | - } |
370 | | - |
371 | | - #[test] |
372 | | - fn test_drop_decompressor_with_incomplete_gzip_data() { |
373 | | - let empty = std::io::empty(); |
374 | | - let d = Decompressor::new(&oci_image::MediaType::ImageLayerGzip, empty).unwrap(); |
375 | | - drop(d) |
376 | | - } |
377 | | - |
378 | | - #[test] |
379 | | - fn test_drop_decompressor_with_incomplete_zstd_data() { |
380 | | - let empty = std::io::empty(); |
381 | | - let d = Decompressor::new(&oci_image::MediaType::ImageLayerZstd, empty).unwrap(); |
382 | | - drop(d) |
383 | | - } |
384 | | - |
385 | | - #[test] |
386 | | - fn test_gzip_decompressor_with_garbage_input() { |
387 | | - let garbage = b"This is not valid gzip data"; |
388 | | - let mut d = Decompressor::new(&oci_image::MediaType::ImageLayerGzip, &garbage[..]).unwrap(); |
389 | | - let mut buf = [0u8; 32]; |
390 | | - let e = d.read(&mut buf).unwrap_err(); |
391 | | - assert!(matches!(e.kind(), std::io::ErrorKind::InvalidInput)); |
392 | | - assert_eq!(e.to_string(), "invalid gzip header".to_string()); |
393 | | - drop(d) |
394 | | - } |
395 | | - |
396 | | - #[test] |
397 | | - fn test_zstd_decompressor_with_garbage_input() { |
398 | | - let garbage = b"This is not valid zstd data"; |
399 | | - let mut d = Decompressor::new(&oci_image::MediaType::ImageLayerZstd, &garbage[..]).unwrap(); |
400 | | - let mut buf = [0u8; 32]; |
401 | | - let e = d.read(&mut buf).unwrap_err(); |
402 | | - assert!(matches!(e.kind(), std::io::ErrorKind::Other)); |
403 | | - assert_eq!(e.to_string(), "Unknown frame descriptor".to_string()); |
404 | | - drop(d) |
405 | | - } |
406 | | -} |
0 commit comments