From a8e8fc771c3a2037ee10074e3c91ab1e5ee9229a Mon Sep 17 00:00:00 2001 From: Colin Walters Date: Sat, 10 May 2025 15:45:13 -0400 Subject: [PATCH 1/7] Add get_raw_blob This is needed for my "registry frontend for containers-storage" project, but should also be useful in general as we've discussed moving checksum verification into callers. Then there's no need for a driver etc. Assisted-by: Gemini Code Assist (for unit tests) Signed-off-by: Colin Walters Signed-off-by: Allison Karlitskaya Signed-off-by: Colin Walters --- examples/client.rs | 61 ++++++-- src/imageproxy.rs | 353 +++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 378 insertions(+), 36 deletions(-) diff --git a/examples/client.rs b/examples/client.rs index b18ad9b..a8ac375 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -28,13 +28,24 @@ struct GetBlobOpts { size: u64, } +#[derive(clap::Parser, Debug)] +struct FetchContainerToDevNullOpts { + #[clap(flatten)] + metaopts: GetMetadataOpts, + + /// Use the "raw" path for fetching blobs + #[clap(long)] + raw_blobs: bool, +} + /// Simple program to greet a person #[derive(clap::Parser, Debug)] #[command(version, about, long_about = None)] enum Opt { GetMetadata(GetMetadataOpts), GetBlob(GetBlobOpts), - FetchContainerToDevNull(GetMetadataOpts), + GetBlobRaw(GetBlobOpts), + FetchContainerToDevNull(FetchContainerToDevNullOpts), } #[derive(serde::Serialize, Debug)] @@ -86,18 +97,49 @@ async fn get_blob(o: GetBlobOpts) -> Result<()> { Ok(()) } -async fn fetch_container_to_devnull(o: GetMetadataOpts) -> Result<()> { - let config = o.proxy_opts(); +async fn get_blob_raw(o: GetBlobOpts) -> Result<()> { + let proxy = containers_image_proxy::ImageProxy::new().await?; + let img = proxy.open_image(&o.reference).await?; + let (_, mut datafd, err) = proxy.get_raw_blob(&img, &o.digest).await?; + + let mut stdout = std::io::stdout().lock(); + let reader = async move { + let mut buffer = [0u8; 8192]; + loop { + let n = datafd.read(&mut buffer).await?; + if n == 0 { + return anyhow::Ok(()); + } + stdout.write_all(&buffer[..n])?; + } + }; + + let (a, b) = tokio::join!(reader, err); + a?; + b?; + Ok(()) +} + +async fn fetch_container_to_devnull(o: FetchContainerToDevNullOpts) -> Result<()> { + let config = o.metaopts.proxy_opts(); let proxy = containers_image_proxy::ImageProxy::new_with_config(config).await?; - let img = &proxy.open_image(&o.reference).await?; + let img = &proxy.open_image(&o.metaopts.reference).await?; let manifest = proxy.fetch_manifest(img).await?.1; for layer in manifest.layers() { - let (mut blob, driver) = proxy.get_descriptor(img, layer).await?; let mut devnull = tokio::io::sink(); - let copier = tokio::io::copy(&mut blob, &mut devnull); - let (copier, driver) = tokio::join!(copier, driver); - copier?; - driver?; + if o.raw_blobs { + let (_, mut blob, err) = proxy.get_raw_blob(img, layer.digest()).await?; + let copier = tokio::io::copy(&mut blob, &mut devnull); + let (copier, err) = tokio::join!(copier, err); + copier?; + err?; + } else { + let (mut blob, driver) = proxy.get_descriptor(img, layer).await?; + let copier = tokio::io::copy(&mut blob, &mut devnull); + let (copier, driver) = tokio::join!(copier, driver); + copier?; + driver?; + } } Ok(()) } @@ -106,6 +148,7 @@ async fn run() -> Result<()> { match Opt::parse() { Opt::GetMetadata(o) => get_metadata(o).await, Opt::GetBlob(o) => get_blob(o).await, + Opt::GetBlobRaw(o) => get_blob_raw(o).await, Opt::FetchContainerToDevNull(o) => fetch_container_to_devnull(o).await, } } diff --git a/src/imageproxy.rs b/src/imageproxy.rs index 53d3b72..a882664 100644 --- a/src/imageproxy.rs +++ b/src/imageproxy.rs @@ -6,10 +6,11 @@ use cap_std_ext::prelude::CapStdExtCommandExt; use cap_std_ext::{cap_std, cap_tempfile}; -use futures_util::Future; +use futures_util::{Future, FutureExt}; use oci_spec::image::{Descriptor, Digest}; use serde::{Deserialize, Serialize}; use std::fs::File; +use std::num::NonZeroU32; use std::ops::Range; use std::os::fd::OwnedFd; use std::os::unix::prelude::CommandExt; @@ -64,6 +65,18 @@ impl Error { } } +/// Errors returned by get_raw_blob +#[derive(Error, Debug)] +#[non_exhaustive] +pub enum GetBlobError { + /// A client may reasonably retry on this type of error. + #[error("retryable error")] + Retryable(Box), + #[error("error")] + /// An unknown other error + Other(Box), +} + impl From for Error { fn from(value: rustix::io::Errno) -> Self { Self::Io(value.into()) @@ -318,6 +331,65 @@ pub struct ConvertedLayerInfo { pub media_type: oci_spec::image::MediaType, } +/// Maps the two types of return values from the proxy. +/// For more information, see +#[derive(Debug)] +enum FileDescriptors { + /// There is a data file descriptor, and the calling + /// process must invoke FinishPipe to check for errors. + /// The client will not get EOF until FinishPipe has been invoked. + FinishPipe { pipeid: NonZeroU32, datafd: OwnedFd }, + /// There is a data FD and an error FD. The error FD will + /// be JSON. + DualFds { datafd: OwnedFd, errfd: OwnedFd }, +} + +impl FileDescriptors { + /// Given a return value from the proxy, parse it into one of the three + /// possible cases: + /// - No file descriptors + /// - A FinishPipe instance + /// - A DualFds instance + fn new_from_raw_values( + fds: impl Iterator, + pipeid: u32, + ) -> Result> { + let mut fds = fds.fuse(); + let first_fd = fds.next(); + let second_fd = fds.next(); + if fds.next().is_some() { + return Err(Error::Other("got more than two file descriptors".into())); + } + let pipeid = NonZeroU32::new(pipeid); + let r = match (first_fd, second_fd, pipeid) { + // No fds, no pipeid + (None, None, None) => None, + // A FinishPipe instance + (Some(datafd), None, Some(pipeid)) => { + Some(FileDescriptors::FinishPipe { pipeid, datafd }) + } + // A dualfd instance + (Some(datafd), Some(errfd), None) => Some(FileDescriptors::DualFds { datafd, errfd }), + // Everything after here is error cases + (Some(_), None, None) => { + return Err(Error::Other("got fd with zero pipeid".into())); + } + (None, Some(_), _) => { + return Err(Error::Other("got errfd with no datafd".into())); + } + (Some(_), Some(_), Some(n)) => { + return Err(Error::Other( + format!("got pipeid {} with both datafd and errfd", n).into(), + )); + } + (None, _, Some(n)) => { + return Err(Error::Other(format!("got no fd with pipeid {n}").into())); + } + }; + Ok(r) + } +} + impl ImageProxy { /// Create an image proxy that fetches the target image, using default configuration. pub async fn new() -> Result { @@ -373,7 +445,7 @@ impl ImageProxy { async fn impl_request_raw( sockfd: Arc>, req: Request, - ) -> Result<(T, Option<(OwnedFd, u32)>)> { + ) -> Result<(T, Option)> { tracing::trace!("sending request {}", req.method.as_str()); // TODO: Investigate https://crates.io/crates/uds for SOCK_SEQPACKET tokio let r = tokio::task::spawn_blocking(move || { @@ -402,7 +474,7 @@ impl ImageProxy { _ => None, }) .flatten() - .next(); + .fuse(); let buf = &buf[..nread]; let reply: Reply = serde_json::from_slice(buf)?; if !reply.success { @@ -411,21 +483,8 @@ impl ImageProxy { error: reply.error.into(), }); } - let fdret = match (fdret, reply.pipeid) { - (Some(fd), n) => { - if n == 0 { - return Err(Error::Other("got fd but no pipeid".into())); - } - Some((fd, n)) - } - (None, n) => { - if n != 0 { - return Err(Error::Other(format!("got no fd with pipeid {}", n).into())); - } - None - } - }; - let reply = serde_json::from_value(reply.value)?; + let fdret = FileDescriptors::new_from_raw_values(fdret, reply.pipeid)?; + let reply: T = serde_json::from_value(reply.value)?; Ok((reply, fdret)) }) .await @@ -439,7 +498,7 @@ impl ImageProxy { &self, method: &str, args: T, - ) -> Result<(R, Option<(OwnedFd, u32)>)> + ) -> Result<(R, Option)> where T: IntoIterator, I: Into, @@ -457,9 +516,9 @@ impl ImageProxy { } #[instrument] - async fn finish_pipe(&self, pipeid: u32) -> Result<()> { + async fn finish_pipe(&self, pipeid: NonZeroU32) -> Result<()> { tracing::debug!("closing pipe"); - let (r, fd) = self.impl_request("FinishPipe", [pipeid]).await?; + let (r, fd) = self.impl_request("FinishPipe", [pipeid.get()]).await?; if fd.is_some() { return Err(Error::Other("Unexpected fd in finish_pipe reply".into())); } @@ -495,9 +554,12 @@ impl ImageProxy { Ok(r) } - async fn read_all_fd(&self, fd: Option<(OwnedFd, u32)>) -> Result> { - let (fd, pipeid) = fd.ok_or_else(|| Error::Other("Missing fd from reply".into()))?; - let fd = tokio::fs::File::from_std(std::fs::File::from(fd)); + async fn read_all_fd(&self, fd: Option) -> Result> { + let fd = fd.ok_or_else(|| Error::Other("Missing fd from reply".into()))?; + let FileDescriptors::FinishPipe { pipeid, datafd } = fd else { + return Err(Error::Other("got dualfds, expecting FinishPipe fd".into())); + }; + let fd = tokio::fs::File::from_std(std::fs::File::from(datafd)); let mut fd = tokio::io::BufReader::new(fd); let mut r = Vec::new(); let reader = fd.read_to_end(&mut r); @@ -570,13 +632,67 @@ impl ImageProxy { let args: Vec = vec![img.0.into(), digest.to_string().into(), size.into()]; let (_bloblen, fd) = self.impl_request::("GetBlob", args).await?; - let (fd, pipeid) = fd.ok_or_else(|| Error::new_other("Missing fd from reply"))?; - let fd = tokio::fs::File::from_std(std::fs::File::from(fd)); + let fd = fd.ok_or_else(|| Error::Other("Missing fd from reply".into()))?; + let FileDescriptors::FinishPipe { pipeid, datafd } = fd else { + return Err(Error::Other("got dualfds, expecting FinishPipe fd".into())); + }; + let fd = tokio::fs::File::from_std(std::fs::File::from(datafd)); let fd = tokio::io::BufReader::new(fd); let finish = Box::pin(self.finish_pipe(pipeid)); Ok((fd, finish)) } + async fn read_blob_error(fd: OwnedFd) -> std::result::Result<(), GetBlobError> { + let fd = tokio::fs::File::from_std(std::fs::File::from(fd)); + let mut errfd = tokio::io::BufReader::new(fd); + let mut buf = Vec::new(); + errfd + .read_to_end(&mut buf) + .await + .map_err(|e| GetBlobError::Other(e.to_string().into_boxed_str()))?; + if buf.is_empty() { + return Ok(()); + } + #[derive(Deserialize)] + struct RemoteError { + code: String, + message: String, + } + let e: RemoteError = serde_json::from_slice(&buf) + .map_err(|e| GetBlobError::Other(e.to_string().into_boxed_str()))?; + match e.code.as_str() { + // Actually this is OK + "EPIPE" => Ok(()), + "retryable" => Err(GetBlobError::Retryable(e.message.into_boxed_str())), + _ => Err(GetBlobError::Other(e.message.into_boxed_str())), + } + } + + /// Fetch a blob identified by e.g. `sha256:`; does not perform + /// any verification that the blob matches the digest. The size of the + /// blob and a pipe file descriptor are returned. + #[instrument] + pub async fn get_raw_blob( + &self, + img: &OpenedImage, + digest: &Digest, + ) -> Result<( + u64, + tokio::fs::File, + impl Future> + Unpin + '_, + )> { + tracing::debug!("fetching blob"); + let args: Vec = vec![img.0.into(), digest.to_string().into()]; + let (bloblen, fd) = self.impl_request::("GetRawBlob", args).await?; + let fd = fd.ok_or_else(|| Error::new_other("Missing fd from reply"))?; + let FileDescriptors::DualFds { datafd, errfd } = fd else { + return Err(Error::Other("got single fd, expecting dual fds".into())); + }; + let fd = tokio::fs::File::from_std(std::fs::File::from(datafd)); + let err = Self::read_blob_error(errfd).boxed(); + Ok((bloblen, fd, err)) + } + /// Fetch a descriptor. The requested size and digest are verified (by the proxy process). #[instrument] pub async fn get_descriptor( @@ -642,9 +758,13 @@ impl ImageProxy { #[cfg(test)] mod tests { - use std::io::{Seek, Write}; + use std::io::{BufWriter, Seek, Write}; + use std::num::NonZeroU32; + use std::os::fd::{AsRawFd, OwnedFd}; use super::*; + use cap_std_ext::cap_std::fs::Dir; + use rustix::fs::{memfd_create, MemfdFlags}; fn validate(c: Command, contains: &[&str], not_contains: &[&str]) { // Format via debug, because @@ -745,4 +865,183 @@ mod tests { assert_send_sync(&opened); assert_send_sync(opened); } + + fn generate_err_fd(v: serde_json::Value) -> Result { + let tmp = Dir::open_ambient_dir("/tmp", cap_std::ambient_authority())?; + let mut tf = cap_tempfile::TempFile::new_anonymous(&tmp).map(BufWriter::new)?; + serde_json::to_writer(&mut tf, &v)?; + let mut tf = tf.into_inner().map_err(|e| e.into_error())?; + tf.seek(std::io::SeekFrom::Start(0))?; + let r = tf.into_std().into(); + Ok(r) + } + + #[tokio::test] + async fn test_read_blob_error_retryable() -> Result<()> { + let retryable = serde_json::json!({ + "code": "retryable", + "message": "foo", + }); + let retryable = generate_err_fd(retryable)?; + let err = ImageProxy::read_blob_error(retryable).boxed(); + let e = err.await.unwrap_err(); + match e { + GetBlobError::Retryable(s) => assert_eq!(s.as_ref(), "foo"), + _ => panic!("Unexpected error {e:?}"), + } + Ok(()) + } + + #[tokio::test] + async fn test_read_blob_error_none() -> Result<()> { + let tmp = Dir::open_ambient_dir("/tmp", cap_std::ambient_authority())?; + let tf = cap_tempfile::TempFile::new_anonymous(&tmp)?.into_std(); + let err = ImageProxy::read_blob_error(tf.into()).boxed(); + err.await.unwrap(); + Ok(()) + } + + #[tokio::test] + async fn test_read_blob_error_other() -> Result<()> { + let other = serde_json::json!({ + "code": "other", + "message": "bar", + }); + let other = generate_err_fd(other)?; + let err = ImageProxy::read_blob_error(other).boxed(); + let e = err.await.unwrap_err(); + match e { + GetBlobError::Other(s) => assert_eq!(s.as_ref(), "bar"), + _ => panic!("Unexpected error {e:?}"), + } + Ok(()) + } + + #[tokio::test] + async fn test_read_blob_error_epipe() -> Result<()> { + let epipe = serde_json::json!({ + "code": "EPIPE", + "message": "baz", + }); + let epipe = generate_err_fd(epipe)?; + let err = ImageProxy::read_blob_error(epipe).boxed(); + err.await.unwrap(); + Ok(()) + } + + // Helper to create a dummy OwnedFd using memfd_create for testing. + fn create_dummy_fd() -> OwnedFd { + memfd_create(c"test-fd", MemfdFlags::CLOEXEC).unwrap() + } + + #[test] + fn test_new_from_raw_values_no_fds_no_pipeid() { + let fds: Vec = vec![]; + let result = FileDescriptors::new_from_raw_values(fds.into_iter(), 0); + assert!(result.is_ok()); + assert!(result.unwrap().is_none()); + } + + #[test] + fn test_new_from_raw_values_finish_pipe() { + let datafd = create_dummy_fd(); + // Keep a raw fd to compare later, as into_iter consumes datafd + let raw_datafd_val = datafd.as_raw_fd(); + let fds = vec![datafd]; + let pipeid = NonZeroU32::new(1).unwrap(); + let result = FileDescriptors::new_from_raw_values(fds.into_iter(), pipeid.get()); + assert!(result.is_ok()); + match result.unwrap() { + Some(FileDescriptors::FinishPipe { + pipeid: res_pipeid, + datafd: res_datafd, + }) => { + assert_eq!(res_pipeid, pipeid); + assert_eq!(res_datafd.as_raw_fd(), raw_datafd_val); + } + _ => panic!("Expected FinishPipe variant"), + } + } + + #[test] + fn test_new_from_raw_values_dual_fds() { + let datafd = create_dummy_fd(); + let errfd = create_dummy_fd(); + let raw_datafd_val = datafd.as_raw_fd(); + let raw_errfd_val = errfd.as_raw_fd(); + let fds = vec![datafd, errfd]; + let result = FileDescriptors::new_from_raw_values(fds.into_iter(), 0); + assert!(result.is_ok()); + match result.unwrap() { + Some(FileDescriptors::DualFds { + datafd: res_datafd, + errfd: res_errfd, + }) => { + assert_eq!( + rustix::fd::AsFd::as_fd(&res_datafd).as_raw_fd(), + raw_datafd_val + ); + assert_eq!( + rustix::fd::AsFd::as_fd(&res_errfd).as_raw_fd(), + raw_errfd_val + ); + } + _ => panic!("Expected DualFds variant"), + } + } + + #[test] + fn test_new_from_raw_values_error_too_many_fds() { + let fds = vec![create_dummy_fd(), create_dummy_fd(), create_dummy_fd()]; + let result = FileDescriptors::new_from_raw_values(fds.into_iter(), 0); + assert!(result.is_err()); + match result.unwrap_err() { + Error::Other(msg) => assert_eq!(msg.as_ref(), "got more than two file descriptors"), + _ => panic!("Expected Other error variant"), + } + } + + #[test] + fn test_new_from_raw_values_error_fd_with_zero_pipeid() { + let fds = vec![create_dummy_fd()]; + let result = FileDescriptors::new_from_raw_values(fds.into_iter(), 0); + assert!(result.is_err()); + match result.unwrap_err() { + Error::Other(msg) => assert_eq!(msg.as_ref(), "got fd with zero pipeid"), + _ => panic!("Expected Other error variant"), + } + } + + #[test] + fn test_new_from_raw_values_error_errfd_no_datafd() { + // This case is tricky because the logic first checks for first_fd. + // To simulate this, we'd need an iterator that returns None then Some. + // The current implementation path makes this specific error message hard to hit directly + // if fds is a simple Vec. The `(None, Some(_), _)` pattern in `match` + // is more of a safeguard for potential iterator behaviors. + } + + #[test] + fn test_new_from_raw_values_error_pipeid_with_both_fds() { + let fds = vec![create_dummy_fd(), create_dummy_fd()]; + let result = FileDescriptors::new_from_raw_values(fds.into_iter(), 1); + assert!(result.is_err()); + match result.unwrap_err() { + Error::Other(msg) => { + assert_eq!(msg.as_ref(), "got pipeid 1 with both datafd and errfd") + } + _ => panic!("Expected Other error variant"), + } + } + + #[test] + fn test_new_from_raw_values_error_no_fd_with_pipeid() { + let fds: Vec = vec![]; + let result = FileDescriptors::new_from_raw_values(fds.into_iter(), 1); + assert!(result.is_err()); + match result.unwrap_err() { + Error::Other(msg) => assert_eq!(msg.as_ref(), "got no fd with pipeid 1"), + _ => panic!("Expected Other error variant"), + } + } } From a0ab1e1e6b0d27059da7f5cbac01f54400b7981a Mon Sep 17 00:00:00 2001 From: Allison Karlitskaya Date: Thu, 29 May 2025 16:39:59 +0200 Subject: [PATCH 2/7] imageproxy: drop generic parameters from impl_request() These can be expressed with `impl Trait` argument syntax, making things a bit easier to read. Signed-off-by: Allison Karlitskaya Signed-off-by: Colin Walters --- src/imageproxy.rs | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/src/imageproxy.rs b/src/imageproxy.rs index a882664..626c739 100644 --- a/src/imageproxy.rs +++ b/src/imageproxy.rs @@ -426,7 +426,7 @@ impl ImageProxy { }; // Verify semantic version - let protover = r.impl_request::("Initialize", []).await?.0; + let protover = r.impl_request::("Initialize", [(); 0]).await?.0; tracing::debug!("Remote protocol version: {protover}"); let protover = semver::Version::parse(protover.as_str())?; // Previously we had a feature to opt-in to requiring newer versions using `if cfg!()`. @@ -494,15 +494,11 @@ impl ImageProxy { } #[instrument(skip(args))] - async fn impl_request( + async fn impl_request( &self, method: &str, - args: T, - ) -> Result<(R, Option)> - where - T: IntoIterator, - I: Into, - { + args: impl IntoIterator>, + ) -> Result<(R, Option)> { let req = Self::impl_request_raw(Arc::clone(&self.sockfd), Request::new(method, args)); let mut childwait = self.childwait.lock().await; tokio::select! { @@ -529,7 +525,7 @@ impl ImageProxy { pub async fn open_image(&self, imgref: &str) -> Result { tracing::debug!("opening image"); let (imgid, _) = self - .impl_request::("OpenImage", [imgref]) + .impl_request::("OpenImage", [imgref]) .await?; Ok(OpenedImage(imgid)) } @@ -538,7 +534,7 @@ impl ImageProxy { pub async fn open_image_optional(&self, imgref: &str) -> Result> { tracing::debug!("opening image"); let (imgid, _) = self - .impl_request::("OpenImageOptional", [imgref]) + .impl_request::("OpenImageOptional", [imgref]) .await?; if imgid == 0 { Ok(None) @@ -592,7 +588,7 @@ impl ImageProxy { /// For more information on OCI config, see pub async fn fetch_config_raw(&self, img: &OpenedImage) -> Result> { let (_, fd) = self - .impl_request::<(), _, _>("GetFullConfig", [img.0]) + .impl_request::<()>("GetFullConfig", [img.0]) .await?; self.read_all_fd(fd).await } @@ -631,7 +627,7 @@ impl ImageProxy { tracing::debug!("fetching blob"); let args: Vec = vec![img.0.into(), digest.to_string().into(), size.into()]; - let (_bloblen, fd) = self.impl_request::("GetBlob", args).await?; + let (_bloblen, fd) = self.impl_request::("GetBlob", args).await?; let fd = fd.ok_or_else(|| Error::Other("Missing fd from reply".into()))?; let FileDescriptors::FinishPipe { pipeid, datafd } = fd else { return Err(Error::Other("got dualfds, expecting FinishPipe fd".into())); @@ -683,7 +679,7 @@ impl ImageProxy { )> { tracing::debug!("fetching blob"); let args: Vec = vec![img.0.into(), digest.to_string().into()]; - let (bloblen, fd) = self.impl_request::("GetRawBlob", args).await?; + let (bloblen, fd) = self.impl_request::("GetRawBlob", args).await?; let fd = fd.ok_or_else(|| Error::new_other("Missing fd from reply"))?; let FileDescriptors::DualFds { datafd, errfd } = fd else { return Err(Error::Other("got single fd, expecting dual fds".into())); @@ -716,7 +712,7 @@ impl ImageProxy { tracing::debug!("Getting layer info"); if layer_info_piped_proto_version().matches(&self.protover) { let (_, fd) = self - .impl_request::<(), _, _>("GetLayerInfoPiped", [img.0]) + .impl_request::<()>("GetLayerInfoPiped", [img.0]) .await?; let buf = self.read_all_fd(fd).await?; return Ok(Some(serde_json::from_slice(&buf)?)); From a5f6ba7385908b177bd8752366e5b3d9ba462f51 Mon Sep 17 00:00:00 2001 From: Allison Karlitskaya Date: Thu, 29 May 2025 19:27:58 +0200 Subject: [PATCH 3/7] imageproxy: kill some turbofish It's always nicer to directly hint the return type or let it be inferred by its surroundings. There's only one case where we need to be a bit more explicit about it, and even this explicitness is nicer. Signed-off-by: Allison Karlitskaya Signed-off-by: Colin Walters --- src/imageproxy.rs | 23 ++++++++--------------- 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/src/imageproxy.rs b/src/imageproxy.rs index 626c739..43f4827 100644 --- a/src/imageproxy.rs +++ b/src/imageproxy.rs @@ -426,7 +426,7 @@ impl ImageProxy { }; // Verify semantic version - let protover = r.impl_request::("Initialize", [(); 0]).await?.0; + let protover: String = r.impl_request("Initialize", [(); 0]).await?.0; tracing::debug!("Remote protocol version: {protover}"); let protover = semver::Version::parse(protover.as_str())?; // Previously we had a feature to opt-in to requiring newer versions using `if cfg!()`. @@ -524,18 +524,14 @@ impl ImageProxy { #[instrument] pub async fn open_image(&self, imgref: &str) -> Result { tracing::debug!("opening image"); - let (imgid, _) = self - .impl_request::("OpenImage", [imgref]) - .await?; + let (imgid, _) = self.impl_request("OpenImage", [imgref]).await?; Ok(OpenedImage(imgid)) } #[instrument] pub async fn open_image_optional(&self, imgref: &str) -> Result> { tracing::debug!("opening image"); - let (imgid, _) = self - .impl_request::("OpenImageOptional", [imgref]) - .await?; + let (imgid, _) = self.impl_request("OpenImageOptional", [imgref]).await?; if imgid == 0 { Ok(None) } else { @@ -587,9 +583,7 @@ impl ImageProxy { /// Fetch the config. /// For more information on OCI config, see pub async fn fetch_config_raw(&self, img: &OpenedImage) -> Result> { - let (_, fd) = self - .impl_request::<()>("GetFullConfig", [img.0]) - .await?; + let ((), fd) = self.impl_request("GetFullConfig", [img.0]).await?; self.read_all_fd(fd).await } @@ -627,7 +621,8 @@ impl ImageProxy { tracing::debug!("fetching blob"); let args: Vec = vec![img.0.into(), digest.to_string().into(), size.into()]; - let (_bloblen, fd) = self.impl_request::("GetBlob", args).await?; + let (bloblen, fd) = self.impl_request("GetBlob", args).await?; + let _: u64 = bloblen; let fd = fd.ok_or_else(|| Error::Other("Missing fd from reply".into()))?; let FileDescriptors::FinishPipe { pipeid, datafd } = fd else { return Err(Error::Other("got dualfds, expecting FinishPipe fd".into())); @@ -679,7 +674,7 @@ impl ImageProxy { )> { tracing::debug!("fetching blob"); let args: Vec = vec![img.0.into(), digest.to_string().into()]; - let (bloblen, fd) = self.impl_request::("GetRawBlob", args).await?; + let (bloblen, fd) = self.impl_request("GetRawBlob", args).await?; let fd = fd.ok_or_else(|| Error::new_other("Missing fd from reply"))?; let FileDescriptors::DualFds { datafd, errfd } = fd else { return Err(Error::Other("got single fd, expecting dual fds".into())); @@ -711,9 +706,7 @@ impl ImageProxy { ) -> Result>> { tracing::debug!("Getting layer info"); if layer_info_piped_proto_version().matches(&self.protover) { - let (_, fd) = self - .impl_request::<()>("GetLayerInfoPiped", [img.0]) - .await?; + let ((), fd) = self.impl_request("GetLayerInfoPiped", [img.0]).await?; let buf = self.read_all_fd(fd).await?; return Ok(Some(serde_json::from_slice(&buf)?)); } From f2118a3533d5bf18038cea0e55aea6d65efb21c6 Mon Sep 17 00:00:00 2001 From: Allison Karlitskaya Date: Thu, 29 May 2025 22:58:54 +0200 Subject: [PATCH 4/7] imageproxy: make .impl_request() generic over fds ...and the presence of a pipe ID. Create a new internal PipeId structure to keep track of pipe ids on the server. This is a very small wrapper around NonZeroU32. Have .impl_request() return a Result<> of a 3-tuple: - a deserialized result, as before - a fixed-sized array of fds ([OwnedFd; N]) - a fixed-sized array of PipeIds ([PipeId; M]) Introduce a bit of generics magic to make sure that we got the expected number of fds and pipeids from the other side, or an error. Then you can make calls like: - `let ((), [], []) = self.impl_request("CloseImage", [img.0]).await?;` - `let (digest, [datafd], [pipeid]) = self.impl_request("GetManifest", [img.0]).await?;` - `let (bloblen, [datafd, errfd], []) = self.impl_request("GetRawBlob", args).await?;` and you'll get an Ok() with the statically-correct number of fds and pipeids, or you'll get an Err() with a message why, like: Expected 1 OwnedFd but got 0 Of course, we can only ever have 0 or 1 PipeIds, but the fixed-sized array syntax has rather nice ergonomics here and its lets us share logic with the fds. Signed-off-by: Allison Karlitskaya Signed-off-by: Colin Walters --- src/imageproxy.rs | 274 +++++++++++++++++++--------------------------- 1 file changed, 110 insertions(+), 164 deletions(-) diff --git a/src/imageproxy.rs b/src/imageproxy.rs index 43f4827..25eaecb 100644 --- a/src/imageproxy.rs +++ b/src/imageproxy.rs @@ -10,6 +10,7 @@ use futures_util::{Future, FutureExt}; use oci_spec::image::{Descriptor, Digest}; use serde::{Deserialize, Serialize}; use std::fs::File; +use std::iter::FusedIterator; use std::num::NonZeroU32; use std::ops::Range; use std::os::fd::OwnedFd; @@ -173,6 +174,15 @@ impl std::fmt::Debug for ImageProxy { #[derive(Debug, PartialEq, Eq)] pub struct OpenedImage(u32); +#[derive(Debug, PartialEq, Eq)] +struct PipeId(NonZeroU32); + +impl PipeId { + fn try_new(pipeid: u32) -> Option { + Some(Self(NonZeroU32::new(pipeid)?)) + } +} + /// Configuration for the proxy. #[derive(Debug, Default)] pub struct ImageProxyConfig { @@ -331,62 +341,30 @@ pub struct ConvertedLayerInfo { pub media_type: oci_spec::image::MediaType, } -/// Maps the two types of return values from the proxy. -/// For more information, see -#[derive(Debug)] -enum FileDescriptors { - /// There is a data file descriptor, and the calling - /// process must invoke FinishPipe to check for errors. - /// The client will not get EOF until FinishPipe has been invoked. - FinishPipe { pipeid: NonZeroU32, datafd: OwnedFd }, - /// There is a data FD and an error FD. The error FD will - /// be JSON. - DualFds { datafd: OwnedFd, errfd: OwnedFd }, -} - -impl FileDescriptors { - /// Given a return value from the proxy, parse it into one of the three - /// possible cases: - /// - No file descriptors - /// - A FinishPipe instance - /// - A DualFds instance - fn new_from_raw_values( - fds: impl Iterator, - pipeid: u32, - ) -> Result> { - let mut fds = fds.fuse(); - let first_fd = fds.next(); - let second_fd = fds.next(); - if fds.next().is_some() { - return Err(Error::Other("got more than two file descriptors".into())); - } - let pipeid = NonZeroU32::new(pipeid); - let r = match (first_fd, second_fd, pipeid) { - // No fds, no pipeid - (None, None, None) => None, - // A FinishPipe instance - (Some(datafd), None, Some(pipeid)) => { - Some(FileDescriptors::FinishPipe { pipeid, datafd }) - } - // A dualfd instance - (Some(datafd), Some(errfd), None) => Some(FileDescriptors::DualFds { datafd, errfd }), - // Everything after here is error cases - (Some(_), None, None) => { - return Err(Error::Other("got fd with zero pipeid".into())); - } - (None, Some(_), _) => { - return Err(Error::Other("got errfd with no datafd".into())); - } - (Some(_), Some(_), Some(n)) => { - return Err(Error::Other( - format!("got pipeid {} with both datafd and errfd", n).into(), - )); - } - (None, _, Some(n)) => { - return Err(Error::Other(format!("got no fd with pipeid {n}").into())); - } - }; - Ok(r) +// Consumes an iterable and tries to convert it to a fixed-size array. Returns Ok([T; N]) if the +// number of items in the iterable was correct, else an error describing the mismatch. +fn fixed_from_iterable( + iterable: impl IntoIterator, +) -> Result<[T; N]> { + let mut iter = iterable.into_iter(); + // We make use of the fact that [_; N].map() returns [_; N]. That makes this a bit more + // awkward than it would otherwise be, but it's not too bad. + let collected = [(); N].map(|_| iter.next()); + // Count the Some() in `collected` plus leftovers in the iter. + let actual = collected.iter().flatten().count() + iter.count(); + if actual == N { + // SAFETY: This is a fused iter, so all N items are in our array + Ok(collected.map(Option::unwrap)) + } else { + let type_name = std::any::type_name::(); + let basename = type_name + .rsplit_once("::") + .map(|(_path, name)| name) + .unwrap_or(type_name); + + Err(Error::Other( + format!("Expected {N} {basename} but got {actual}").into(), + )) } } @@ -426,7 +404,7 @@ impl ImageProxy { }; // Verify semantic version - let protover: String = r.impl_request("Initialize", [(); 0]).await?.0; + let (protover, [], []): (String, _, _) = r.impl_request("Initialize", [(); 0]).await?; tracing::debug!("Remote protocol version: {protover}"); let protover = semver::Version::parse(protover.as_str())?; // Previously we had a feature to opt-in to requiring newer versions using `if cfg!()`. @@ -442,10 +420,14 @@ impl ImageProxy { Ok(r) } - async fn impl_request_raw( + async fn impl_request_raw< + T: serde::de::DeserializeOwned + Send + 'static, + const N: usize, + const M: usize, + >( sockfd: Arc>, req: Request, - ) -> Result<(T, Option)> { + ) -> Result<(T, [OwnedFd; N], [PipeId; M])> { tracing::trace!("sending request {}", req.method.as_str()); // TODO: Investigate https://crates.io/crates/uds for SOCK_SEQPACKET tokio let r = tokio::task::spawn_blocking(move || { @@ -473,8 +455,7 @@ impl ImageProxy { rustix::net::RecvAncillaryMessage::ScmRights(f) => Some(f), _ => None, }) - .flatten() - .fuse(); + .flatten(); let buf = &buf[..nread]; let reply: Reply = serde_json::from_slice(buf)?; if !reply.success { @@ -483,9 +464,11 @@ impl ImageProxy { error: reply.error.into(), }); } - let fdret = FileDescriptors::new_from_raw_values(fdret, reply.pipeid)?; - let reply: T = serde_json::from_value(reply.value)?; - Ok((reply, fdret)) + Ok(( + serde_json::from_value(reply.value)?, + fixed_from_iterable(fdret)?, + fixed_from_iterable(PipeId::try_new(reply.pipeid))?, + )) }) .await .map_err(|e| Error::Other(e.to_string().into()))??; @@ -494,11 +477,15 @@ impl ImageProxy { } #[instrument(skip(args))] - async fn impl_request( + async fn impl_request< + T: serde::de::DeserializeOwned + Send + 'static, + const N: usize, + const M: usize, + >( &self, method: &str, args: impl IntoIterator>, - ) -> Result<(R, Option)> { + ) -> Result<(T, [OwnedFd; N], [PipeId; M])> { let req = Self::impl_request_raw(Arc::clone(&self.sockfd), Request::new(method, args)); let mut childwait = self.childwait.lock().await; tokio::select! { @@ -512,26 +499,23 @@ impl ImageProxy { } #[instrument] - async fn finish_pipe(&self, pipeid: NonZeroU32) -> Result<()> { + async fn finish_pipe(&self, pipeid: PipeId) -> Result<()> { tracing::debug!("closing pipe"); - let (r, fd) = self.impl_request("FinishPipe", [pipeid.get()]).await?; - if fd.is_some() { - return Err(Error::Other("Unexpected fd in finish_pipe reply".into())); - } + let (r, [], []) = self.impl_request("FinishPipe", [pipeid.0.get()]).await?; Ok(r) } #[instrument] pub async fn open_image(&self, imgref: &str) -> Result { tracing::debug!("opening image"); - let (imgid, _) = self.impl_request("OpenImage", [imgref]).await?; + let (imgid, [], []) = self.impl_request("OpenImage", [imgref]).await?; Ok(OpenedImage(imgid)) } #[instrument] pub async fn open_image_optional(&self, imgref: &str) -> Result> { tracing::debug!("opening image"); - let (imgid, _) = self.impl_request("OpenImageOptional", [imgref]).await?; + let (imgid, [], []) = self.impl_request("OpenImageOptional", [imgref]).await?; if imgid == 0 { Ok(None) } else { @@ -542,15 +526,11 @@ impl ImageProxy { #[instrument] pub async fn close_image(&self, img: &OpenedImage) -> Result<()> { tracing::debug!("closing image"); - let (r, _) = self.impl_request("CloseImage", [img.0]).await?; + let (r, [], []) = self.impl_request("CloseImage", [img.0]).await?; Ok(r) } - async fn read_all_fd(&self, fd: Option) -> Result> { - let fd = fd.ok_or_else(|| Error::Other("Missing fd from reply".into()))?; - let FileDescriptors::FinishPipe { pipeid, datafd } = fd else { - return Err(Error::Other("got dualfds, expecting FinishPipe fd".into())); - }; + async fn read_all_fd(&self, datafd: OwnedFd, pipeid: PipeId) -> Result> { let fd = tokio::fs::File::from_std(std::fs::File::from(datafd)); let mut fd = tokio::io::BufReader::new(fd); let mut r = Vec::new(); @@ -565,8 +545,8 @@ impl ImageProxy { /// The original digest of the unconverted manifest is also returned. /// For more information on OCI manifests, see pub async fn fetch_manifest_raw_oci(&self, img: &OpenedImage) -> Result<(String, Vec)> { - let (digest, fd) = self.impl_request("GetManifest", [img.0]).await?; - Ok((digest, self.read_all_fd(fd).await?)) + let (digest, [datafd], [pipeid]) = self.impl_request("GetManifest", [img.0]).await?; + Ok((digest, self.read_all_fd(datafd, pipeid).await?)) } /// Fetch the manifest. @@ -583,8 +563,8 @@ impl ImageProxy { /// Fetch the config. /// For more information on OCI config, see pub async fn fetch_config_raw(&self, img: &OpenedImage) -> Result> { - let ((), fd) = self.impl_request("GetFullConfig", [img.0]).await?; - self.read_all_fd(fd).await + let ((), [datafd], [pipeid]) = self.impl_request("GetFullConfig", [img.0]).await?; + self.read_all_fd(datafd, pipeid).await } /// Fetch the config. @@ -621,12 +601,8 @@ impl ImageProxy { tracing::debug!("fetching blob"); let args: Vec = vec![img.0.into(), digest.to_string().into(), size.into()]; - let (bloblen, fd) = self.impl_request("GetBlob", args).await?; + let (bloblen, [datafd], [pipeid]) = self.impl_request("GetBlob", args).await?; let _: u64 = bloblen; - let fd = fd.ok_or_else(|| Error::Other("Missing fd from reply".into()))?; - let FileDescriptors::FinishPipe { pipeid, datafd } = fd else { - return Err(Error::Other("got dualfds, expecting FinishPipe fd".into())); - }; let fd = tokio::fs::File::from_std(std::fs::File::from(datafd)); let fd = tokio::io::BufReader::new(fd); let finish = Box::pin(self.finish_pipe(pipeid)); @@ -674,11 +650,7 @@ impl ImageProxy { )> { tracing::debug!("fetching blob"); let args: Vec = vec![img.0.into(), digest.to_string().into()]; - let (bloblen, fd) = self.impl_request("GetRawBlob", args).await?; - let fd = fd.ok_or_else(|| Error::new_other("Missing fd from reply"))?; - let FileDescriptors::DualFds { datafd, errfd } = fd else { - return Err(Error::Other("got single fd, expecting dual fds".into())); - }; + let (bloblen, [datafd, errfd], []) = self.impl_request("GetRawBlob", args).await?; let fd = tokio::fs::File::from_std(std::fs::File::from(datafd)); let err = Self::read_blob_error(errfd).boxed(); Ok((bloblen, fd, err)) @@ -706,15 +678,14 @@ impl ImageProxy { ) -> Result>> { tracing::debug!("Getting layer info"); if layer_info_piped_proto_version().matches(&self.protover) { - let ((), fd) = self.impl_request("GetLayerInfoPiped", [img.0]).await?; - let buf = self.read_all_fd(fd).await?; + let ((), [datafd], [pipeid]) = self.impl_request("GetLayerInfoPiped", [img.0]).await?; + let buf = self.read_all_fd(datafd, pipeid).await?; return Ok(Some(serde_json::from_slice(&buf)?)); } if !layer_info_proto_version().matches(&self.protover) { return Ok(None); } - let reply = self.impl_request("GetLayerInfo", [img.0]).await?; - let layers: Vec = reply.0; + let (layers, [], []) = self.impl_request("GetLayerInfo", [img.0]).await?; Ok(Some(layers)) } @@ -748,7 +719,6 @@ impl ImageProxy { #[cfg(test)] mod tests { use std::io::{BufWriter, Seek, Write}; - use std::num::NonZeroU32; use std::os::fd::{AsRawFd, OwnedFd}; use super::*; @@ -923,33 +893,31 @@ mod tests { memfd_create(c"test-fd", MemfdFlags::CLOEXEC).unwrap() } + fn fds_and_pipeid( + fds: impl IntoIterator, + pipeid: u32, + ) -> Result<([OwnedFd; N], [PipeId; M])> { + Ok(( + fixed_from_iterable(fds)?, + fixed_from_iterable(PipeId::try_new(pipeid))?, + )) + } + #[test] fn test_new_from_raw_values_no_fds_no_pipeid() { - let fds: Vec = vec![]; - let result = FileDescriptors::new_from_raw_values(fds.into_iter(), 0); - assert!(result.is_ok()); - assert!(result.unwrap().is_none()); + let ([], []) = fds_and_pipeid([], 0).unwrap(); } #[test] fn test_new_from_raw_values_finish_pipe() { let datafd = create_dummy_fd(); - // Keep a raw fd to compare later, as into_iter consumes datafd + // Keep a raw fd to compare later, as fds_and_pipeid consumes datafd let raw_datafd_val = datafd.as_raw_fd(); let fds = vec![datafd]; - let pipeid = NonZeroU32::new(1).unwrap(); - let result = FileDescriptors::new_from_raw_values(fds.into_iter(), pipeid.get()); - assert!(result.is_ok()); - match result.unwrap() { - Some(FileDescriptors::FinishPipe { - pipeid: res_pipeid, - datafd: res_datafd, - }) => { - assert_eq!(res_pipeid, pipeid); - assert_eq!(res_datafd.as_raw_fd(), raw_datafd_val); - } - _ => panic!("Expected FinishPipe variant"), - } + let pipeid = PipeId::try_new(1).unwrap(); + let ([res_datafd], [res_pipeid]) = fds_and_pipeid(fds, pipeid.0.get()).unwrap(); + assert_eq!(res_pipeid, pipeid); + assert_eq!(res_datafd.as_raw_fd(), raw_datafd_val); } #[test] @@ -959,78 +927,56 @@ mod tests { let raw_datafd_val = datafd.as_raw_fd(); let raw_errfd_val = errfd.as_raw_fd(); let fds = vec![datafd, errfd]; - let result = FileDescriptors::new_from_raw_values(fds.into_iter(), 0); - assert!(result.is_ok()); - match result.unwrap() { - Some(FileDescriptors::DualFds { - datafd: res_datafd, - errfd: res_errfd, - }) => { - assert_eq!( - rustix::fd::AsFd::as_fd(&res_datafd).as_raw_fd(), - raw_datafd_val - ); - assert_eq!( - rustix::fd::AsFd::as_fd(&res_errfd).as_raw_fd(), - raw_errfd_val - ); - } - _ => panic!("Expected DualFds variant"), - } + let ([res_datafd, res_errfd], []) = fds_and_pipeid(fds, 0).unwrap(); + assert_eq!(res_datafd.as_raw_fd(), raw_datafd_val); + assert_eq!(res_errfd.as_raw_fd(), raw_errfd_val); } #[test] fn test_new_from_raw_values_error_too_many_fds() { let fds = vec![create_dummy_fd(), create_dummy_fd(), create_dummy_fd()]; - let result = FileDescriptors::new_from_raw_values(fds.into_iter(), 0); - assert!(result.is_err()); - match result.unwrap_err() { - Error::Other(msg) => assert_eq!(msg.as_ref(), "got more than two file descriptors"), - _ => panic!("Expected Other error variant"), + match fds_and_pipeid(fds, 0) { + Ok(([datafd, errfd], [])) => unreachable!("{datafd:?} {errfd:?}"), + Err(Error::Other(msg)) => { + assert_eq!(msg.as_ref(), "Expected 2 OwnedFd but got 3") + } + Err(other) => unreachable!("{other}"), } } #[test] fn test_new_from_raw_values_error_fd_with_zero_pipeid() { let fds = vec![create_dummy_fd()]; - let result = FileDescriptors::new_from_raw_values(fds.into_iter(), 0); - assert!(result.is_err()); - match result.unwrap_err() { - Error::Other(msg) => assert_eq!(msg.as_ref(), "got fd with zero pipeid"), - _ => panic!("Expected Other error variant"), + match fds_and_pipeid(fds, 0) { + Ok(([datafd], [pipeid])) => unreachable!("{datafd:?} {pipeid:?}"), + Err(Error::Other(msg)) => { + assert_eq!(msg.as_ref(), "Expected 1 PipeId but got 0") + } + Err(other) => unreachable!("{other}"), } } - #[test] - fn test_new_from_raw_values_error_errfd_no_datafd() { - // This case is tricky because the logic first checks for first_fd. - // To simulate this, we'd need an iterator that returns None then Some. - // The current implementation path makes this specific error message hard to hit directly - // if fds is a simple Vec. The `(None, Some(_), _)` pattern in `match` - // is more of a safeguard for potential iterator behaviors. - } - #[test] fn test_new_from_raw_values_error_pipeid_with_both_fds() { let fds = vec![create_dummy_fd(), create_dummy_fd()]; - let result = FileDescriptors::new_from_raw_values(fds.into_iter(), 1); - assert!(result.is_err()); - match result.unwrap_err() { - Error::Other(msg) => { - assert_eq!(msg.as_ref(), "got pipeid 1 with both datafd and errfd") + match fds_and_pipeid(fds, 1) { + Ok(([datafd, errfd], [])) => unreachable!("{datafd:?} {errfd:?}"), + Err(Error::Other(msg)) => { + assert_eq!(msg.as_ref(), "Expected 0 PipeId but got 1") } - _ => panic!("Expected Other error variant"), + Err(other) => unreachable!("{other}"), } } #[test] fn test_new_from_raw_values_error_no_fd_with_pipeid() { let fds: Vec = vec![]; - let result = FileDescriptors::new_from_raw_values(fds.into_iter(), 1); - assert!(result.is_err()); - match result.unwrap_err() { - Error::Other(msg) => assert_eq!(msg.as_ref(), "got no fd with pipeid 1"), - _ => panic!("Expected Other error variant"), + match fds_and_pipeid(fds, 1) { + Ok(([datafd], [pipeid])) => unreachable!("{datafd:?} {pipeid:?}"), + Err(Error::Other(msg)) => { + assert_eq!(msg.as_ref(), "Expected 1 OwnedFd but got 0") + } + Err(other) => unreachable!("{other}"), } } } From 4487174e1fb008867f85c059a2022a381dd9dad1 Mon Sep 17 00:00:00 2001 From: Colin Walters Date: Thu, 29 May 2025 19:15:16 -0400 Subject: [PATCH 5/7] proxy: Reintroduce structures Instead of a generic array, use a trait on different structs. This ensures even more type safety for callers. Signed-off-by: Colin Walters --- src/imageproxy.rs | 216 +++++++++++++++++++++++++--------------------- 1 file changed, 120 insertions(+), 96 deletions(-) diff --git a/src/imageproxy.rs b/src/imageproxy.rs index 25eaecb..337bfc9 100644 --- a/src/imageproxy.rs +++ b/src/imageproxy.rs @@ -341,30 +341,81 @@ pub struct ConvertedLayerInfo { pub media_type: oci_spec::image::MediaType, } -// Consumes an iterable and tries to convert it to a fixed-size array. Returns Ok([T; N]) if the -// number of items in the iterable was correct, else an error describing the mismatch. -fn fixed_from_iterable( - iterable: impl IntoIterator, -) -> Result<[T; N]> { - let mut iter = iterable.into_iter(); - // We make use of the fact that [_; N].map() returns [_; N]. That makes this a bit more - // awkward than it would otherwise be, but it's not too bad. - let collected = [(); N].map(|_| iter.next()); - // Count the Some() in `collected` plus leftovers in the iter. - let actual = collected.iter().flatten().count() + iter.count(); - if actual == N { - // SAFETY: This is a fused iter, so all N items are in our array - Ok(collected.map(Option::unwrap)) - } else { - let type_name = std::any::type_name::(); - let basename = type_name - .rsplit_once("::") - .map(|(_path, name)| name) - .unwrap_or(type_name); - - Err(Error::Other( - format!("Expected {N} {basename} but got {actual}").into(), - )) +/// A single fd; requires invoking FinishPipe +#[derive(Debug)] +struct FinishPipe { + pipeid: PipeId, + datafd: OwnedFd, +} + +/// There is a data FD and an error FD. The error FD will be JSON. +#[derive(Debug)] +struct DualFds { + datafd: OwnedFd, + errfd: OwnedFd, +} + +/// Helper trait for parsing the pipeid and/or file descriptors of a reply +trait FromReplyFds: Send + 'static +where + Self: Sized, +{ + fn from_reply( + iterable: impl IntoIterator, + pipeid: u32, + ) -> Result; +} + +/// No file descriptors or pipeid expected +impl FromReplyFds for () { + fn from_reply(fds: impl IntoIterator, pipeid: u32) -> Result { + if fds.into_iter().next().is_some() { + return Err(Error::Other("expected no fds".into())); + } + if pipeid != 0 { + return Err(Error::Other("unexpected pipeid".into())); + } + Ok(()) + } +} + +/// A FinishPipe instance +impl FromReplyFds for FinishPipe { + fn from_reply(fds: impl IntoIterator, pipeid: u32) -> Result { + let mut fds = fds.into_iter(); + let Some(first_fd) = fds.next() else { + return Err(Error::Other("Expected fd for FinishPipe".into())); + }; + if fds.next().is_some() { + return Err(Error::Other("More than one fd for FinishPipe".into())); + } + let Some(pipeid) = PipeId::try_new(pipeid) else { + return Err(Error::Other("Expected pipeid for FinishPipe".into())); + }; + Ok(Self { + pipeid, + datafd: first_fd, + }) + } +} + +/// A DualFds instance +impl FromReplyFds for DualFds { + fn from_reply(fds: impl IntoIterator, pipeid: u32) -> Result { + let mut fds = fds.into_iter(); + let Some(datafd) = fds.next() else { + return Err(Error::Other("Expected data fd for DualFds".into())); + }; + let Some(errfd) = fds.next() else { + return Err(Error::Other("Expected err fd for DualFds".into())); + }; + if fds.next().is_some() { + return Err(Error::Other("More than two fds for DualFds".into())); + } + if pipeid != 0 { + return Err(Error::Other("Unexpected pipeid with DualFds".into())); + } + Ok(Self { datafd, errfd }) } } @@ -404,7 +455,7 @@ impl ImageProxy { }; // Verify semantic version - let (protover, [], []): (String, _, _) = r.impl_request("Initialize", [(); 0]).await?; + let (protover, _): (String, ()) = r.impl_request("Initialize", [(); 0]).await?; tracing::debug!("Remote protocol version: {protover}"); let protover = semver::Version::parse(protover.as_str())?; // Previously we had a feature to opt-in to requiring newer versions using `if cfg!()`. @@ -420,14 +471,10 @@ impl ImageProxy { Ok(r) } - async fn impl_request_raw< - T: serde::de::DeserializeOwned + Send + 'static, - const N: usize, - const M: usize, - >( + async fn impl_request_raw( sockfd: Arc>, req: Request, - ) -> Result<(T, [OwnedFd; N], [PipeId; M])> { + ) -> Result<(T, F)> { tracing::trace!("sending request {}", req.method.as_str()); // TODO: Investigate https://crates.io/crates/uds for SOCK_SEQPACKET tokio let r = tokio::task::spawn_blocking(move || { @@ -464,11 +511,8 @@ impl ImageProxy { error: reply.error.into(), }); } - Ok(( - serde_json::from_value(reply.value)?, - fixed_from_iterable(fdret)?, - fixed_from_iterable(PipeId::try_new(reply.pipeid))?, - )) + let fds = FromReplyFds::from_reply(fdret, reply.pipeid)?; + Ok((serde_json::from_value(reply.value)?, fds)) }) .await .map_err(|e| Error::Other(e.to_string().into()))??; @@ -477,15 +521,11 @@ impl ImageProxy { } #[instrument(skip(args))] - async fn impl_request< - T: serde::de::DeserializeOwned + Send + 'static, - const N: usize, - const M: usize, - >( + async fn impl_request( &self, method: &str, args: impl IntoIterator>, - ) -> Result<(T, [OwnedFd; N], [PipeId; M])> { + ) -> Result<(T, F)> { let req = Self::impl_request_raw(Arc::clone(&self.sockfd), Request::new(method, args)); let mut childwait = self.childwait.lock().await; tokio::select! { @@ -501,21 +541,21 @@ impl ImageProxy { #[instrument] async fn finish_pipe(&self, pipeid: PipeId) -> Result<()> { tracing::debug!("closing pipe"); - let (r, [], []) = self.impl_request("FinishPipe", [pipeid.0.get()]).await?; + let (r, ()) = self.impl_request("FinishPipe", [pipeid.0.get()]).await?; Ok(r) } #[instrument] pub async fn open_image(&self, imgref: &str) -> Result { tracing::debug!("opening image"); - let (imgid, [], []) = self.impl_request("OpenImage", [imgref]).await?; + let (imgid, ()) = self.impl_request("OpenImage", [imgref]).await?; Ok(OpenedImage(imgid)) } #[instrument] pub async fn open_image_optional(&self, imgref: &str) -> Result> { tracing::debug!("opening image"); - let (imgid, [], []) = self.impl_request("OpenImageOptional", [imgref]).await?; + let (imgid, ()) = self.impl_request("OpenImageOptional", [imgref]).await?; if imgid == 0 { Ok(None) } else { @@ -526,16 +566,16 @@ impl ImageProxy { #[instrument] pub async fn close_image(&self, img: &OpenedImage) -> Result<()> { tracing::debug!("closing image"); - let (r, [], []) = self.impl_request("CloseImage", [img.0]).await?; + let (r, ()) = self.impl_request("CloseImage", [img.0]).await?; Ok(r) } - async fn read_all_fd(&self, datafd: OwnedFd, pipeid: PipeId) -> Result> { - let fd = tokio::fs::File::from_std(std::fs::File::from(datafd)); + async fn read_finish_pipe(&self, pipe: FinishPipe) -> Result> { + let fd = tokio::fs::File::from_std(std::fs::File::from(pipe.datafd)); let mut fd = tokio::io::BufReader::new(fd); let mut r = Vec::new(); let reader = fd.read_to_end(&mut r); - let (nbytes, finish) = tokio::join!(reader, self.finish_pipe(pipeid)); + let (nbytes, finish) = tokio::join!(reader, self.finish_pipe(pipe.pipeid)); finish?; assert_eq!(nbytes?, r.len()); Ok(r) @@ -545,8 +585,8 @@ impl ImageProxy { /// The original digest of the unconverted manifest is also returned. /// For more information on OCI manifests, see pub async fn fetch_manifest_raw_oci(&self, img: &OpenedImage) -> Result<(String, Vec)> { - let (digest, [datafd], [pipeid]) = self.impl_request("GetManifest", [img.0]).await?; - Ok((digest, self.read_all_fd(datafd, pipeid).await?)) + let (digest, pipefd) = self.impl_request("GetManifest", [img.0]).await?; + Ok((digest, self.read_finish_pipe(pipefd).await?)) } /// Fetch the manifest. @@ -563,8 +603,8 @@ impl ImageProxy { /// Fetch the config. /// For more information on OCI config, see pub async fn fetch_config_raw(&self, img: &OpenedImage) -> Result> { - let ((), [datafd], [pipeid]) = self.impl_request("GetFullConfig", [img.0]).await?; - self.read_all_fd(datafd, pipeid).await + let ((), pipe) = self.impl_request("GetFullConfig", [img.0]).await?; + self.read_finish_pipe(pipe).await } /// Fetch the config. @@ -601,11 +641,11 @@ impl ImageProxy { tracing::debug!("fetching blob"); let args: Vec = vec![img.0.into(), digest.to_string().into(), size.into()]; - let (bloblen, [datafd], [pipeid]) = self.impl_request("GetBlob", args).await?; + let (bloblen, pipe): (u64, FinishPipe) = self.impl_request("GetBlob", args).await?; let _: u64 = bloblen; - let fd = tokio::fs::File::from_std(std::fs::File::from(datafd)); + let fd = tokio::fs::File::from_std(std::fs::File::from(pipe.datafd)); let fd = tokio::io::BufReader::new(fd); - let finish = Box::pin(self.finish_pipe(pipeid)); + let finish = Box::pin(self.finish_pipe(pipe.pipeid)); Ok((fd, finish)) } @@ -650,9 +690,9 @@ impl ImageProxy { )> { tracing::debug!("fetching blob"); let args: Vec = vec![img.0.into(), digest.to_string().into()]; - let (bloblen, [datafd, errfd], []) = self.impl_request("GetRawBlob", args).await?; - let fd = tokio::fs::File::from_std(std::fs::File::from(datafd)); - let err = Self::read_blob_error(errfd).boxed(); + let (bloblen, fds): (u64, DualFds) = self.impl_request("GetRawBlob", args).await?; + let fd = tokio::fs::File::from_std(std::fs::File::from(fds.datafd)); + let err = Self::read_blob_error(fds.errfd).boxed(); Ok((bloblen, fd, err)) } @@ -678,14 +718,14 @@ impl ImageProxy { ) -> Result>> { tracing::debug!("Getting layer info"); if layer_info_piped_proto_version().matches(&self.protover) { - let ((), [datafd], [pipeid]) = self.impl_request("GetLayerInfoPiped", [img.0]).await?; - let buf = self.read_all_fd(datafd, pipeid).await?; + let ((), pipe) = self.impl_request("GetLayerInfoPiped", [img.0]).await?; + let buf = self.read_finish_pipe(pipe).await?; return Ok(Some(serde_json::from_slice(&buf)?)); } if !layer_info_proto_version().matches(&self.protover) { return Ok(None); } - let (layers, [], []) = self.impl_request("GetLayerInfo", [img.0]).await?; + let (layers, ()) = self.impl_request("GetLayerInfo", [img.0]).await?; Ok(Some(layers)) } @@ -893,31 +933,15 @@ mod tests { memfd_create(c"test-fd", MemfdFlags::CLOEXEC).unwrap() } - fn fds_and_pipeid( - fds: impl IntoIterator, - pipeid: u32, - ) -> Result<([OwnedFd; N], [PipeId; M])> { - Ok(( - fixed_from_iterable(fds)?, - fixed_from_iterable(PipeId::try_new(pipeid))?, - )) - } - - #[test] - fn test_new_from_raw_values_no_fds_no_pipeid() { - let ([], []) = fds_and_pipeid([], 0).unwrap(); - } - #[test] fn test_new_from_raw_values_finish_pipe() { let datafd = create_dummy_fd(); // Keep a raw fd to compare later, as fds_and_pipeid consumes datafd let raw_datafd_val = datafd.as_raw_fd(); let fds = vec![datafd]; - let pipeid = PipeId::try_new(1).unwrap(); - let ([res_datafd], [res_pipeid]) = fds_and_pipeid(fds, pipeid.0.get()).unwrap(); - assert_eq!(res_pipeid, pipeid); - assert_eq!(res_datafd.as_raw_fd(), raw_datafd_val); + let v = FinishPipe::from_reply(fds, 1).unwrap(); + assert_eq!(v.pipeid.0.get(), 1); + assert_eq!(v.datafd.as_raw_fd(), raw_datafd_val); } #[test] @@ -927,18 +951,18 @@ mod tests { let raw_datafd_val = datafd.as_raw_fd(); let raw_errfd_val = errfd.as_raw_fd(); let fds = vec![datafd, errfd]; - let ([res_datafd, res_errfd], []) = fds_and_pipeid(fds, 0).unwrap(); - assert_eq!(res_datafd.as_raw_fd(), raw_datafd_val); - assert_eq!(res_errfd.as_raw_fd(), raw_errfd_val); + let v = DualFds::from_reply(fds, 0).unwrap(); + assert_eq!(v.datafd.as_raw_fd(), raw_datafd_val); + assert_eq!(v.errfd.as_raw_fd(), raw_errfd_val); } #[test] fn test_new_from_raw_values_error_too_many_fds() { let fds = vec![create_dummy_fd(), create_dummy_fd(), create_dummy_fd()]; - match fds_and_pipeid(fds, 0) { - Ok(([datafd, errfd], [])) => unreachable!("{datafd:?} {errfd:?}"), + match DualFds::from_reply(fds, 0) { + Ok(v) => unreachable!("{v:?}"), Err(Error::Other(msg)) => { - assert_eq!(msg.as_ref(), "Expected 2 OwnedFd but got 3") + assert_eq!(msg.as_ref(), "More than two fds for DualFds") } Err(other) => unreachable!("{other}"), } @@ -947,10 +971,10 @@ mod tests { #[test] fn test_new_from_raw_values_error_fd_with_zero_pipeid() { let fds = vec![create_dummy_fd()]; - match fds_and_pipeid(fds, 0) { - Ok(([datafd], [pipeid])) => unreachable!("{datafd:?} {pipeid:?}"), + match FinishPipe::from_reply(fds, 0) { + Ok(v) => unreachable!("{v:?}"), Err(Error::Other(msg)) => { - assert_eq!(msg.as_ref(), "Expected 1 PipeId but got 0") + assert_eq!(msg.as_ref(), "Expected pipeid for FinishPipe") } Err(other) => unreachable!("{other}"), } @@ -959,10 +983,10 @@ mod tests { #[test] fn test_new_from_raw_values_error_pipeid_with_both_fds() { let fds = vec![create_dummy_fd(), create_dummy_fd()]; - match fds_and_pipeid(fds, 1) { - Ok(([datafd, errfd], [])) => unreachable!("{datafd:?} {errfd:?}"), + match DualFds::from_reply(fds, 1) { + Ok(v) => unreachable!("{v:?}"), Err(Error::Other(msg)) => { - assert_eq!(msg.as_ref(), "Expected 0 PipeId but got 1") + assert_eq!(msg.as_ref(), "Unexpected pipeid with DualFds") } Err(other) => unreachable!("{other}"), } @@ -971,10 +995,10 @@ mod tests { #[test] fn test_new_from_raw_values_error_no_fd_with_pipeid() { let fds: Vec = vec![]; - match fds_and_pipeid(fds, 1) { - Ok(([datafd], [pipeid])) => unreachable!("{datafd:?} {pipeid:?}"), + match FinishPipe::from_reply(fds, 1) { + Ok(v) => unreachable!("{v:?}"), Err(Error::Other(msg)) => { - assert_eq!(msg.as_ref(), "Expected 1 OwnedFd but got 0") + assert_eq!(msg.as_ref(), "Expected fd for FinishPipe") } Err(other) => unreachable!("{other}"), } From d5dc71a14d42f3cae1b4279cc9a2167354194803 Mon Sep 17 00:00:00 2001 From: Colin Walters Date: Fri, 30 May 2025 09:13:30 -0400 Subject: [PATCH 6/7] Use itertools to reduce manual iterator parsing Signed-off-by: Colin Walters --- Cargo.toml | 1 + src/imageproxy.rs | 35 ++++++++++++----------------------- 2 files changed, 13 insertions(+), 23 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 84eed9c..8c696d9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ tokio = { features = ["fs", "io-util", "macros", "process", "rt", "sync"], versi tracing = "0.1" # We support versions 2, 3 and 4 cap-std-ext = ">= 2.0, <= 4.0" +itertools = "0.14.0" [dev-dependencies] anyhow = "1.0" diff --git a/src/imageproxy.rs b/src/imageproxy.rs index 337bfc9..46b3af6 100644 --- a/src/imageproxy.rs +++ b/src/imageproxy.rs @@ -7,6 +7,7 @@ use cap_std_ext::prelude::CapStdExtCommandExt; use cap_std_ext::{cap_std, cap_tempfile}; use futures_util::{Future, FutureExt}; +use itertools::Itertools; use oci_spec::image::{Descriptor, Digest}; use serde::{Deserialize, Serialize}; use std::fs::File; @@ -382,39 +383,27 @@ impl FromReplyFds for () { /// A FinishPipe instance impl FromReplyFds for FinishPipe { fn from_reply(fds: impl IntoIterator, pipeid: u32) -> Result { - let mut fds = fds.into_iter(); - let Some(first_fd) = fds.next() else { - return Err(Error::Other("Expected fd for FinishPipe".into())); - }; - if fds.next().is_some() { - return Err(Error::Other("More than one fd for FinishPipe".into())); - } let Some(pipeid) = PipeId::try_new(pipeid) else { return Err(Error::Other("Expected pipeid for FinishPipe".into())); }; - Ok(Self { - pipeid, - datafd: first_fd, - }) + let datafd = fds + .into_iter() + .exactly_one() + .map_err(|_| Error::Other("Expected exactly one fd for FinishPipe".into()))?; + Ok(Self { pipeid, datafd }) } } /// A DualFds instance impl FromReplyFds for DualFds { fn from_reply(fds: impl IntoIterator, pipeid: u32) -> Result { - let mut fds = fds.into_iter(); - let Some(datafd) = fds.next() else { - return Err(Error::Other("Expected data fd for DualFds".into())); - }; - let Some(errfd) = fds.next() else { - return Err(Error::Other("Expected err fd for DualFds".into())); - }; - if fds.next().is_some() { - return Err(Error::Other("More than two fds for DualFds".into())); - } if pipeid != 0 { return Err(Error::Other("Unexpected pipeid with DualFds".into())); } + let [datafd, errfd] = fds + .into_iter() + .collect_array() + .ok_or_else(|| Error::Other("Expected two fds for DualFds".into()))?; Ok(Self { datafd, errfd }) } } @@ -962,7 +951,7 @@ mod tests { match DualFds::from_reply(fds, 0) { Ok(v) => unreachable!("{v:?}"), Err(Error::Other(msg)) => { - assert_eq!(msg.as_ref(), "More than two fds for DualFds") + assert_eq!(msg.as_ref(), "Expected two fds for DualFds") } Err(other) => unreachable!("{other}"), } @@ -998,7 +987,7 @@ mod tests { match FinishPipe::from_reply(fds, 1) { Ok(v) => unreachable!("{v:?}"), Err(Error::Other(msg)) => { - assert_eq!(msg.as_ref(), "Expected fd for FinishPipe") + assert_eq!(msg.as_ref(), "Expected exactly one fd for FinishPipe") } Err(other) => unreachable!("{other}"), } From 6f6622b5c5cb04acb039332035df7100d4422dd8 Mon Sep 17 00:00:00 2001 From: Colin Walters Date: Fri, 30 May 2025 11:21:09 -0400 Subject: [PATCH 7/7] Cleanly split fd-vs-nofds request APIs Most requests don't have fds, so let's optimize it a bit and avoid the callers needing to have `()` spelled out. Signed-off-by: Colin Walters --- src/imageproxy.rs | 47 +++++++++++++++++++++++++++++++++-------------- 1 file changed, 33 insertions(+), 14 deletions(-) diff --git a/src/imageproxy.rs b/src/imageproxy.rs index 46b3af6..d3fe99e 100644 --- a/src/imageproxy.rs +++ b/src/imageproxy.rs @@ -444,7 +444,7 @@ impl ImageProxy { }; // Verify semantic version - let (protover, _): (String, ()) = r.impl_request("Initialize", [(); 0]).await?; + let protover: String = r.impl_request("Initialize", [(); 0]).await?; tracing::debug!("Remote protocol version: {protover}"); let protover = semver::Version::parse(protover.as_str())?; // Previously we had a feature to opt-in to requiring newer versions using `if cfg!()`. @@ -460,6 +460,7 @@ impl ImageProxy { Ok(r) } + /// Create and send a request. Should only be used by impl_request. async fn impl_request_raw( sockfd: Arc>, req: Request, @@ -509,8 +510,13 @@ impl ImageProxy { Ok(r) } + /// Create a request that may return file descriptors, and also check for an unexpected + /// termination of the child process. #[instrument(skip(args))] - async fn impl_request( + async fn impl_request_with_fds< + T: serde::de::DeserializeOwned + Send + 'static, + F: FromReplyFds, + >( &self, method: &str, args: impl IntoIterator>, @@ -527,24 +533,36 @@ impl ImageProxy { } } + /// A synchronous invocation which does not return any file descriptors. + async fn impl_request( + &self, + method: &str, + args: impl IntoIterator>, + ) -> Result { + let (r, ()) = self.impl_request_with_fds(method, args).await?; + Ok(r) + } + #[instrument] async fn finish_pipe(&self, pipeid: PipeId) -> Result<()> { tracing::debug!("closing pipe"); - let (r, ()) = self.impl_request("FinishPipe", [pipeid.0.get()]).await?; + let (r, ()) = self + .impl_request_with_fds("FinishPipe", [pipeid.0.get()]) + .await?; Ok(r) } #[instrument] pub async fn open_image(&self, imgref: &str) -> Result { tracing::debug!("opening image"); - let (imgid, ()) = self.impl_request("OpenImage", [imgref]).await?; + let imgid = self.impl_request("OpenImage", [imgref]).await?; Ok(OpenedImage(imgid)) } #[instrument] pub async fn open_image_optional(&self, imgref: &str) -> Result> { tracing::debug!("opening image"); - let (imgid, ()) = self.impl_request("OpenImageOptional", [imgref]).await?; + let imgid = self.impl_request("OpenImageOptional", [imgref]).await?; if imgid == 0 { Ok(None) } else { @@ -554,9 +572,7 @@ impl ImageProxy { #[instrument] pub async fn close_image(&self, img: &OpenedImage) -> Result<()> { - tracing::debug!("closing image"); - let (r, ()) = self.impl_request("CloseImage", [img.0]).await?; - Ok(r) + self.impl_request("CloseImage", [img.0]).await } async fn read_finish_pipe(&self, pipe: FinishPipe) -> Result> { @@ -574,7 +590,7 @@ impl ImageProxy { /// The original digest of the unconverted manifest is also returned. /// For more information on OCI manifests, see pub async fn fetch_manifest_raw_oci(&self, img: &OpenedImage) -> Result<(String, Vec)> { - let (digest, pipefd) = self.impl_request("GetManifest", [img.0]).await?; + let (digest, pipefd) = self.impl_request_with_fds("GetManifest", [img.0]).await?; Ok((digest, self.read_finish_pipe(pipefd).await?)) } @@ -592,7 +608,7 @@ impl ImageProxy { /// Fetch the config. /// For more information on OCI config, see pub async fn fetch_config_raw(&self, img: &OpenedImage) -> Result> { - let ((), pipe) = self.impl_request("GetFullConfig", [img.0]).await?; + let ((), pipe) = self.impl_request_with_fds("GetFullConfig", [img.0]).await?; self.read_finish_pipe(pipe).await } @@ -630,7 +646,8 @@ impl ImageProxy { tracing::debug!("fetching blob"); let args: Vec = vec![img.0.into(), digest.to_string().into(), size.into()]; - let (bloblen, pipe): (u64, FinishPipe) = self.impl_request("GetBlob", args).await?; + let (bloblen, pipe): (u64, FinishPipe) = + self.impl_request_with_fds("GetBlob", args).await?; let _: u64 = bloblen; let fd = tokio::fs::File::from_std(std::fs::File::from(pipe.datafd)); let fd = tokio::io::BufReader::new(fd); @@ -679,7 +696,7 @@ impl ImageProxy { )> { tracing::debug!("fetching blob"); let args: Vec = vec![img.0.into(), digest.to_string().into()]; - let (bloblen, fds): (u64, DualFds) = self.impl_request("GetRawBlob", args).await?; + let (bloblen, fds): (u64, DualFds) = self.impl_request_with_fds("GetRawBlob", args).await?; let fd = tokio::fs::File::from_std(std::fs::File::from(fds.datafd)); let err = Self::read_blob_error(fds.errfd).boxed(); Ok((bloblen, fd, err)) @@ -707,14 +724,16 @@ impl ImageProxy { ) -> Result>> { tracing::debug!("Getting layer info"); if layer_info_piped_proto_version().matches(&self.protover) { - let ((), pipe) = self.impl_request("GetLayerInfoPiped", [img.0]).await?; + let ((), pipe) = self + .impl_request_with_fds("GetLayerInfoPiped", [img.0]) + .await?; let buf = self.read_finish_pipe(pipe).await?; return Ok(Some(serde_json::from_slice(&buf)?)); } if !layer_info_proto_version().matches(&self.protover) { return Ok(None); } - let (layers, ()) = self.impl_request("GetLayerInfo", [img.0]).await?; + let layers = self.impl_request("GetLayerInfo", [img.0]).await?; Ok(Some(layers)) }