55//! More information: <https://github.com/containers/skopeo/pull/1476>
66
77use anyhow:: { anyhow, Context , Result } ;
8- use futures_util:: { Future , FutureExt , TryFutureExt } ;
8+ use futures_util:: Future ;
99use nix:: sys:: socket:: { self as nixsocket, ControlMessageOwned } ;
1010use nix:: sys:: uio:: IoVec ;
1111use serde:: { Deserialize , Serialize } ;
1212use std:: fs:: File ;
1313use std:: os:: unix:: io:: AsRawFd ;
1414use std:: os:: unix:: prelude:: { FromRawFd , RawFd } ;
1515use std:: pin:: Pin ;
16- use std:: process:: { ExitStatus , Stdio } ;
16+ use std:: process:: Stdio ;
1717use std:: sync:: { Arc , Mutex } ;
1818use tokio:: io:: { AsyncBufRead , AsyncReadExt } ;
1919
@@ -65,13 +65,12 @@ struct Reply {
6565 value : serde_json:: Value ,
6666}
6767
68- type JoinFuture < T > = Pin < Box < dyn Future < Output = Result < Result < T > > > > > ;
68+ type ChildFuture = Pin < Box < dyn Future < Output = std :: io :: Result < std :: process :: Output > > > > ;
6969
7070/// Manage a child process proxy to fetch container images.
7171pub struct ImageProxy {
7272 sockfd : Arc < Mutex < File > > ,
73- stderr : JoinFuture < String > ,
74- procwait : Pin < Box < dyn Future < Output = Result < ExitStatus > > > > ,
73+ childwait : ChildFuture ,
7574}
7675
7776impl std:: fmt:: Debug for ImageProxy {
@@ -117,35 +116,15 @@ impl ImageProxy {
117116 c. stdin ( Stdio :: from ( theirsock) ) ;
118117 let mut c = tokio:: process:: Command :: from ( c) ;
119118 c. kill_on_drop ( true ) ;
120- let mut proc = c. spawn ( ) . context ( "Failed to spawn skopeo" ) ?;
121-
122- // Safety: We passed `Stdio::piped()` above
123- let mut child_stderr = proc. stderr . take ( ) . unwrap ( ) ;
124-
125- let stderr = tokio:: spawn ( async move {
126- let mut buf = String :: new ( ) ;
127- child_stderr. read_to_string ( & mut buf) . await ?;
128- Ok ( buf)
129- } )
130- . map_err ( anyhow:: Error :: msg)
131- . boxed ( ) ;
132-
133- let mut procwait = Box :: pin ( async move { proc. wait ( ) . map_err ( anyhow:: Error :: msg) . await } ) ;
119+ let child = c. spawn ( ) . context ( "Failed to spawn skopeo" ) ?;
120+ let childwait = Box :: pin ( child. wait_with_output ( ) ) ;
134121
135122 let sockfd = Arc :: new ( Mutex :: new ( mysock) ) ;
136123
124+ let mut r = Self { sockfd, childwait } ;
125+
137126 // Verify semantic version
138- let protoreq =
139- Self :: impl_request_raw :: < String > ( Arc :: clone ( & sockfd) , Request :: new_bare ( "Initialize" ) ) ;
140- let protover = tokio:: select! {
141- r = protoreq => {
142- r?. 0
143- }
144- r = & mut procwait => {
145- let errmsg = stderr. await ??;
146- return Err ( anyhow!( "skopeo exited unexpectedly (no support for `experimental-image-proxy`?): {}\n {}" , r?, errmsg) ) ;
147- }
148- } ;
127+ let protover = r. impl_request :: < String , _ , ( ) > ( "Initialize" , [ ] ) . await ?. 0 ;
149128 let protover = semver:: Version :: parse ( protover. as_str ( ) ) ?;
150129 let supported = & * SUPPORTED_PROTO_VERSION ;
151130 if !supported. matches ( & protover) {
@@ -156,11 +135,6 @@ impl ImageProxy {
156135 ) ) ;
157136 }
158137
159- let r = Self {
160- stderr,
161- sockfd,
162- procwait,
163- } ;
164138 Ok ( r)
165139 }
166140
@@ -217,41 +191,50 @@ impl ImageProxy {
217191 }
218192
219193 async fn impl_request < R : serde:: de:: DeserializeOwned + Send + ' static , T , I > (
220- & self ,
194+ & mut self ,
221195 method : & str ,
222196 args : T ,
223197 ) -> Result < ( R , Option < ( File , u32 ) > ) >
224198 where
225199 T : IntoIterator < Item = I > ,
226200 I : Into < serde_json:: Value > ,
227201 {
228- let req = Request :: new ( method, args) ;
229- Self :: impl_request_raw ( Arc :: clone ( & self . sockfd ) , req) . await
202+ let req = Self :: impl_request_raw ( Arc :: clone ( & self . sockfd ) , Request :: new ( method, args) ) ;
203+ tokio:: select! {
204+ r = req => {
205+ Ok ( r?)
206+ }
207+ r = & mut self . childwait => {
208+ let r = r?;
209+ let stderr = String :: from_utf8_lossy( & r. stderr) ;
210+ return Err ( anyhow:: anyhow!( "proxy unexpectedly exited during request method {}: {}\n {}" , method, r. status, stderr) )
211+ }
212+ }
230213 }
231214
232- async fn finish_pipe ( & self , pipeid : u32 ) -> Result < ( ) > {
215+ async fn finish_pipe ( & mut self , pipeid : u32 ) -> Result < ( ) > {
233216 let ( r, fd) = self . impl_request ( "FinishPipe" , [ pipeid] ) . await ?;
234217 if fd. is_some ( ) {
235218 return Err ( anyhow ! ( "Unexpected fd in finish_pipe reply" ) ) ;
236219 }
237220 Ok ( r)
238221 }
239222
240- pub async fn open_image ( & self , imgref : & str ) -> Result < OpenedImage > {
223+ pub async fn open_image ( & mut self , imgref : & str ) -> Result < OpenedImage > {
241224 let ( imgid, _) = self
242225 . impl_request :: < u32 , _ , _ > ( "OpenImage" , [ imgref] )
243226 . await ?;
244227 Ok ( OpenedImage ( imgid) )
245228 }
246229
247- pub async fn close_image ( & self , img : & OpenedImage ) -> Result < ( ) > {
230+ pub async fn close_image ( & mut self , img : & OpenedImage ) -> Result < ( ) > {
248231 let ( r, _) = self . impl_request ( "CloseImage" , [ img. 0 ] ) . await ?;
249232 Ok ( r)
250233 }
251234
252235 /// Fetch the manifest.
253236 /// https://github.com/opencontainers/image-spec/blob/main/manifest.md
254- pub async fn fetch_manifest ( & self , img : & OpenedImage ) -> Result < ( String , Vec < u8 > ) > {
237+ pub async fn fetch_manifest ( & mut self , img : & OpenedImage ) -> Result < ( String , Vec < u8 > ) > {
255238 let ( digest, fd) = self . impl_request ( "GetManifest" , [ img. 0 ] ) . await ?;
256239 let ( fd, pipeid) = fd. ok_or_else ( || anyhow ! ( "Missing fd from reply" ) ) ?;
257240 let mut fd = tokio:: io:: BufReader :: new ( tokio:: fs:: File :: from_std ( fd) ) ;
@@ -268,7 +251,7 @@ impl ImageProxy {
268251 /// Note that right now the proxy does verification of the digest:
269252 /// https://github.com/cgwalters/container-image-proxy/issues/1#issuecomment-926712009
270253 pub async fn get_blob (
271- & self ,
254+ & mut self ,
272255 img : & OpenedImage ,
273256 digest : & str ,
274257 size : u64 ,
@@ -293,13 +276,10 @@ impl ImageProxy {
293276 let sockfd = Arc :: try_unwrap ( self . sockfd ) . unwrap ( ) . into_inner ( ) . unwrap ( ) ;
294277 nixsocket:: send ( sockfd. as_raw_fd ( ) , & sendbuf, nixsocket:: MsgFlags :: empty ( ) ) ?;
295278 drop ( sendbuf) ;
296- let status = self . procwait . await ?;
297- if !status. success ( ) {
298- if let Some ( stderr) = self . stderr . await . map ( |v| v. ok ( ) ) . ok ( ) . flatten ( ) {
299- anyhow:: bail!( "proxy failed: {}\n {}" , status, stderr)
300- } else {
301- anyhow:: bail!( "proxy failed: {} (failed to fetch stderr)" , status)
302- }
279+ let output = self . childwait . await ?;
280+ if !output. status . success ( ) {
281+ let stderr = String :: from_utf8_lossy ( & output. stderr ) ;
282+ anyhow:: bail!( "proxy failed: {}\n {}" , output. status, stderr)
303283 }
304284 Ok ( ( ) )
305285 }
0 commit comments