@@ -36,7 +36,7 @@ use crate::container::store::LayerProgress;
3636use super :: * ;
3737use containers_image_proxy:: { ImageProxy , OpenedImage } ;
3838use fn_error_context:: context;
39- use futures_util:: { Future , FutureExt } ;
39+ use futures_util:: { Future , FutureExt , TryFutureExt as _ } ;
4040use oci_spec:: image as oci_image;
4141use std:: sync:: { Arc , Mutex } ;
4242use tokio:: {
@@ -189,22 +189,76 @@ pub async fn unencapsulate(repo: &ostree::Repo, imgref: &OstreeImageReference) -
189189 importer. unencapsulate ( ) . await
190190}
191191
192+ /// Take an async AsyncBufRead and handle decompression for it, returning
193+ /// a wrapped AsyncBufRead implementation.
194+ /// This is implemented with a background thread using a pipe-to-self,
195+ /// and so there is an additional Future object returned that is a "driver"
196+ /// task and must also be checked for errors.
197+ pub ( crate ) fn decompress_bridge < ' a > (
198+ src : impl tokio:: io:: AsyncBufRead + Send + Unpin + ' static ,
199+ is_zstd : bool ,
200+ ) -> Result < (
201+ // This one is the input reader
202+ impl tokio:: io:: AsyncBufRead + Send + Unpin + ' static ,
203+ // And this represents the worker thread doing copying
204+ impl Future < Output = Result < ( ) > > + Send + Unpin + ' static ,
205+ ) > {
206+ // We use a plain unix pipe() because it's just a very convenient
207+ // way to bridge arbitrarily between sync and async with a worker
208+ // thread. Yes, it involves going through the kernel, but
209+ // eventually we'll replace all this logic with podman anyways.
210+ let ( tx, rx) = tokio:: net:: unix:: pipe:: pipe ( ) ?;
211+ let task = tokio:: task:: spawn_blocking ( move || -> Result < ( ) > {
212+ // Convert the write half of the pipe() into a regular blocking file descriptor
213+ let tx = tx. into_blocking_fd ( ) ?;
214+ let mut tx = std:: fs:: File :: from ( tx) ;
215+ // Convert the async input back to synchronous.
216+ let src = tokio_util:: io:: SyncIoBridge :: new ( src) ;
217+ let bufr = std:: io:: BufReader :: new ( src) ;
218+ // Wrap the input in a decompressor; I originally tried to make
219+ // this function take a function pointer, but yeah that was painful
220+ // with the type system.
221+ let mut src: Box < dyn std:: io:: Read > = if is_zstd {
222+ Box :: new ( zstd:: stream:: read:: Decoder :: new ( bufr) ?)
223+ } else {
224+ Box :: new ( flate2:: bufread:: GzDecoder :: new ( bufr) )
225+ } ;
226+ // We don't care about the number of bytes copied
227+ let _n: u64 = std:: io:: copy ( & mut src, & mut tx) ?;
228+ Ok ( ( ) )
229+ } )
230+ // Flatten the nested Result<Result<>>
231+ . map ( crate :: tokio_util:: flatten_anyhow) ;
232+ // And return the pair of futures
233+ Ok ( ( tokio:: io:: BufReader :: new ( rx) , task) )
234+ }
235+
192236/// Create a decompressor for this MIME type, given a stream of input.
193237fn new_async_decompressor < ' a > (
194238 media_type : & oci_image:: MediaType ,
195- src : impl AsyncBufRead + Send + Unpin + ' a ,
196- ) -> Result < Box < dyn AsyncBufRead + Send + Unpin + ' a > > {
197- match media_type {
198- oci_image:: MediaType :: ImageLayerGzip => Ok ( Box :: new ( tokio:: io:: BufReader :: new (
199- async_compression:: tokio:: bufread:: GzipDecoder :: new ( src) ,
200- ) ) ) ,
201- oci_image:: MediaType :: ImageLayerZstd => Ok ( Box :: new ( tokio:: io:: BufReader :: new (
202- async_compression:: tokio:: bufread:: ZstdDecoder :: new ( src) ,
203- ) ) ) ,
204- oci_image:: MediaType :: ImageLayer => Ok ( Box :: new ( src) ) ,
205- oci_image:: MediaType :: Other ( t) if t. as_str ( ) == DOCKER_TYPE_LAYER_TAR => Ok ( Box :: new ( src) ) ,
206- o => Err ( anyhow:: anyhow!( "Unhandled layer type: {}" , o) ) ,
207- }
239+ src : impl AsyncBufRead + Send + Unpin + ' static ,
240+ ) -> Result < (
241+ Box < dyn AsyncBufRead + Send + Unpin + ' static > ,
242+ impl Future < Output = Result < ( ) > > + Send + Unpin + ' static ,
243+ ) > {
244+ let r: (
245+ Box < dyn AsyncBufRead + Send + Unpin + ' static > ,
246+ Box < dyn Future < Output = Result < ( ) > > + Send + Unpin + ' static > ,
247+ ) = match media_type {
248+ m @ ( oci_image:: MediaType :: ImageLayerGzip | oci_image:: MediaType :: ImageLayerZstd ) => {
249+ let is_zstd = matches ! ( m, oci_image:: MediaType :: ImageLayerZstd ) ;
250+ let ( r, driver) = decompress_bridge ( src, is_zstd) ?;
251+ ( Box :: new ( r) , Box :: new ( driver) as _ )
252+ }
253+ oci_image:: MediaType :: ImageLayer => {
254+ ( Box :: new ( src) , Box :: new ( futures_util:: future:: ready ( Ok ( ( ) ) ) ) )
255+ }
256+ oci_image:: MediaType :: Other ( t) if t. as_str ( ) == DOCKER_TYPE_LAYER_TAR => {
257+ ( Box :: new ( src) , Box :: new ( futures_util:: future:: ready ( Ok ( ( ) ) ) ) )
258+ }
259+ o => anyhow:: bail!( "Unhandled layer type: {}" , o) ,
260+ } ;
261+ Ok ( r)
208262}
209263
210264/// A wrapper for [`get_blob`] which fetches a layer and decompresses it.
@@ -262,11 +316,13 @@ pub(crate) async fn fetch_layer_decompress<'a>(
262316 progress. send_replace ( Some ( status) ) ;
263317 }
264318 } ;
265- let reader = new_async_decompressor ( media_type, readprogress) ?;
319+ let ( reader, compression_driver) = new_async_decompressor ( media_type, readprogress) ?;
320+ let driver = driver. and_then ( |( ) | compression_driver) ;
266321 let driver = futures_util:: future:: join ( readproxy, driver) . map ( |r| r. 1 ) ;
267322 Ok ( ( reader, Either :: Left ( driver) ) )
268323 } else {
269- let blob = new_async_decompressor ( media_type, blob) ?;
324+ let ( blob, compression_driver) = new_async_decompressor ( media_type, blob) ?;
325+ let driver = driver. and_then ( |( ) | compression_driver) ;
270326 Ok ( ( blob, Either :: Right ( driver) ) )
271327 }
272328}
0 commit comments