Skip to content

Commit 890c452

Browse files
committed
Add get_raw_blob
This is needed for my "registry frontend for containers-storage" project, but should also be useful in general as we've discussed moving checksum verification into callers. Then there's no need for a driver etc. Signed-off-by: Colin Walters <[email protected]>
1 parent b5a74f7 commit 890c452

File tree

2 files changed

+173
-33
lines changed

2 files changed

+173
-33
lines changed

examples/client.rs

Lines changed: 52 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,24 @@ struct GetBlobOpts {
2828
size: u64,
2929
}
3030

31+
#[derive(clap::Parser, Debug)]
32+
struct FetchContainerToDevNullOpts {
33+
#[clap(flatten)]
34+
metaopts: GetMetadataOpts,
35+
36+
/// Use the "raw" path for fetching blobs
37+
#[clap(long)]
38+
raw_blobs: bool,
39+
}
40+
3141
/// Simple program to greet a person
3242
#[derive(clap::Parser, Debug)]
3343
#[command(version, about, long_about = None)]
3444
enum Opt {
3545
GetMetadata(GetMetadataOpts),
3646
GetBlob(GetBlobOpts),
37-
FetchContainerToDevNull(GetMetadataOpts),
47+
GetBlobRaw(GetBlobOpts),
48+
FetchContainerToDevNull(FetchContainerToDevNullOpts),
3849
}
3950

4051
#[derive(serde::Serialize, Debug)]
@@ -86,18 +97,49 @@ async fn get_blob(o: GetBlobOpts) -> Result<()> {
8697
Ok(())
8798
}
8899

89-
async fn fetch_container_to_devnull(o: GetMetadataOpts) -> Result<()> {
90-
let config = o.proxy_opts();
100+
async fn get_blob_raw(o: GetBlobOpts) -> Result<()> {
101+
let proxy = containers_image_proxy::ImageProxy::new().await?;
102+
let img = proxy.open_image(&o.reference).await?;
103+
let (_, mut datafd, err) = proxy.get_raw_blob(&img, &o.digest).await?;
104+
105+
let mut stdout = std::io::stdout().lock();
106+
let reader = async move {
107+
let mut buffer = [0u8; 8192];
108+
loop {
109+
let n = datafd.read(&mut buffer).await?;
110+
if n == 0 {
111+
return anyhow::Ok(());
112+
}
113+
stdout.write_all(&buffer[..n])?;
114+
}
115+
};
116+
117+
let (a, b) = tokio::join!(reader, err);
118+
a?;
119+
b?;
120+
Ok(())
121+
}
122+
123+
async fn fetch_container_to_devnull(o: FetchContainerToDevNullOpts) -> Result<()> {
124+
let config = o.metaopts.proxy_opts();
91125
let proxy = containers_image_proxy::ImageProxy::new_with_config(config).await?;
92-
let img = &proxy.open_image(&o.reference).await?;
126+
let img = &proxy.open_image(&o.metaopts.reference).await?;
93127
let manifest = proxy.fetch_manifest(img).await?.1;
94128
for layer in manifest.layers() {
95-
let (mut blob, driver) = proxy.get_descriptor(img, layer).await?;
96129
let mut devnull = tokio::io::sink();
97-
let copier = tokio::io::copy(&mut blob, &mut devnull);
98-
let (copier, driver) = tokio::join!(copier, driver);
99-
copier?;
100-
driver?;
130+
if o.raw_blobs {
131+
let (_, mut blob, err) = proxy.get_raw_blob(img, layer.digest()).await?;
132+
let copier = tokio::io::copy(&mut blob, &mut devnull);
133+
let (copier, err) = tokio::join!(copier, err);
134+
copier?;
135+
err?;
136+
} else {
137+
let (mut blob, driver) = proxy.get_descriptor(img, layer).await?;
138+
let copier = tokio::io::copy(&mut blob, &mut devnull);
139+
let (copier, driver) = tokio::join!(copier, driver);
140+
copier?;
141+
driver?;
142+
}
101143
}
102144
Ok(())
103145
}
@@ -106,6 +148,7 @@ async fn run() -> Result<()> {
106148
match Opt::parse() {
107149
Opt::GetMetadata(o) => get_metadata(o).await,
108150
Opt::GetBlob(o) => get_blob(o).await,
151+
Opt::GetBlobRaw(o) => get_blob_raw(o).await,
109152
Opt::FetchContainerToDevNull(o) => fetch_container_to_devnull(o).await,
110153
}
111154
}

src/imageproxy.rs

Lines changed: 121 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,11 @@
66
77
use cap_std_ext::prelude::CapStdExtCommandExt;
88
use cap_std_ext::{cap_std, cap_tempfile};
9-
use futures_util::Future;
9+
use futures_util::{Future, FutureExt};
1010
use oci_spec::image::{Descriptor, Digest};
1111
use serde::{Deserialize, Serialize};
1212
use std::fs::File;
13+
use std::num::NonZeroU32;
1314
use std::ops::Range;
1415
use std::os::fd::OwnedFd;
1516
use std::os::unix::prelude::CommandExt;
@@ -64,6 +65,18 @@ impl Error {
6465
}
6566
}
6667

68+
/// Errors returned by get_raw_blob
69+
#[derive(Error, Debug)]
70+
#[non_exhaustive]
71+
pub enum GetBlobError {
72+
/// A client may reasonably retry on this type of error.
73+
#[error("retryable error")]
74+
Retryable(Box<str>),
75+
#[error("error")]
76+
/// An unknown other error
77+
Other(Box<str>),
78+
}
79+
6780
impl From<rustix::io::Errno> for Error {
6881
fn from(value: rustix::io::Errno) -> Self {
6982
Self::Io(value.into())
@@ -318,6 +331,12 @@ pub struct ConvertedLayerInfo {
318331
pub media_type: oci_spec::image::MediaType,
319332
}
320333

334+
/// Maps the two types of return values from the proxy
335+
enum FileDescriptors {
336+
FinishPipe { pipeid: NonZeroU32, datafd: OwnedFd },
337+
DualFds { datafd: OwnedFd, errfd: OwnedFd },
338+
}
339+
321340
impl ImageProxy {
322341
/// Create an image proxy that fetches the target image, using default configuration.
323342
pub async fn new() -> Result<Self> {
@@ -373,7 +392,7 @@ impl ImageProxy {
373392
async fn impl_request_raw<T: serde::de::DeserializeOwned + Send + 'static>(
374393
sockfd: Arc<Mutex<OwnedFd>>,
375394
req: Request,
376-
) -> Result<(T, Option<(OwnedFd, u32)>)> {
395+
) -> Result<(T, Option<FileDescriptors>)> {
377396
tracing::trace!("sending request {}", req.method.as_str());
378397
// TODO: Investigate https://crates.io/crates/uds for SOCK_SEQPACKET tokio
379398
let r = tokio::task::spawn_blocking(move || {
@@ -394,14 +413,14 @@ impl ImageProxy {
394413
rustix::net::RecvFlags::CMSG_CLOEXEC,
395414
)?
396415
.bytes;
397-
let fdret = cmsg_buffer
416+
let mut fdret = cmsg_buffer
398417
.drain()
399418
.filter_map(|m| match m {
400419
rustix::net::RecvAncillaryMessage::ScmRights(f) => Some(f),
401420
_ => None,
402421
})
403422
.flatten()
404-
.next();
423+
.fuse();
405424
let buf = &buf[..nread];
406425
let reply: Reply = serde_json::from_slice(buf)?;
407426
if !reply.success {
@@ -410,21 +429,42 @@ impl ImageProxy {
410429
error: reply.error.into(),
411430
});
412431
}
413-
let fdret = match (fdret, reply.pipeid) {
414-
(Some(fd), n) => {
415-
if n == 0 {
416-
return Err(Error::Other("got fd but no pipeid".into()));
417-
}
418-
Some((fd, n))
432+
let first_fd = fdret.next();
433+
let second_fd = fdret.next();
434+
if fdret.next().is_some() {
435+
return Err(Error::Other(
436+
format!("got more than two file descriptors").into(),
437+
));
438+
}
439+
let pipeid = NonZeroU32::new(reply.pipeid);
440+
let fdret = match (first_fd, second_fd, pipeid) {
441+
// No fds, no pipeid
442+
(None, None, None) => None,
443+
// A FinishPipe instance
444+
(Some(datafd), None, Some(pipeid)) => {
445+
Some(FileDescriptors::FinishPipe { pipeid, datafd })
446+
}
447+
// A dualfd instance
448+
(Some(datafd), Some(errfd), None) => {
449+
Some(FileDescriptors::DualFds { datafd, errfd })
450+
}
451+
// Everything after here is error cases
452+
(Some(_), None, None) => {
453+
return Err(Error::Other(format!("got fd with zero pipeid").into()));
454+
}
455+
(None, Some(_), _) => {
456+
return Err(Error::Other(format!("got errfd with no datafd").into()));
457+
}
458+
(Some(_), Some(_), Some(n)) => {
459+
return Err(Error::Other(
460+
format!("got pipeid {} with both datafd and errfd", n).into(),
461+
));
419462
}
420-
(None, n) => {
421-
if n != 0 {
422-
return Err(Error::Other(format!("got no fd with pipeid {}", n).into()));
423-
}
424-
None
463+
(None, _, Some(n)) => {
464+
return Err(Error::Other(format!("got no fd with pipeid {n}").into()));
425465
}
426466
};
427-
let reply = serde_json::from_value(reply.value)?;
467+
let reply: T = serde_json::from_value(reply.value)?;
428468
Ok((reply, fdret))
429469
})
430470
.await
@@ -438,7 +478,7 @@ impl ImageProxy {
438478
&self,
439479
method: &str,
440480
args: T,
441-
) -> Result<(R, Option<(OwnedFd, u32)>)>
481+
) -> Result<(R, Option<FileDescriptors>)>
442482
where
443483
T: IntoIterator<Item = I>,
444484
I: Into<serde_json::Value>,
@@ -456,9 +496,9 @@ impl ImageProxy {
456496
}
457497

458498
#[instrument]
459-
async fn finish_pipe(&self, pipeid: u32) -> Result<()> {
499+
async fn finish_pipe(&self, pipeid: NonZeroU32) -> Result<()> {
460500
tracing::debug!("closing pipe");
461-
let (r, fd) = self.impl_request("FinishPipe", [pipeid]).await?;
501+
let (r, fd) = self.impl_request("FinishPipe", [pipeid.get()]).await?;
462502
if fd.is_some() {
463503
return Err(Error::Other("Unexpected fd in finish_pipe reply".into()));
464504
}
@@ -494,9 +534,12 @@ impl ImageProxy {
494534
Ok(r)
495535
}
496536

497-
async fn read_all_fd(&self, fd: Option<(OwnedFd, u32)>) -> Result<Vec<u8>> {
498-
let (fd, pipeid) = fd.ok_or_else(|| Error::Other("Missing fd from reply".into()))?;
499-
let fd = tokio::fs::File::from_std(std::fs::File::from(fd));
537+
async fn read_all_fd(&self, fd: Option<FileDescriptors>) -> Result<Vec<u8>> {
538+
let fd = fd.ok_or_else(|| Error::Other("Missing fd from reply".into()))?;
539+
let FileDescriptors::FinishPipe { pipeid, datafd } = fd else {
540+
return Err(Error::Other("got dualfds, expecting FinishPipe fd".into()));
541+
};
542+
let fd = tokio::fs::File::from_std(std::fs::File::from(datafd));
500543
let mut fd = tokio::io::BufReader::new(fd);
501544
let mut r = Vec::new();
502545
let reader = fd.read_to_end(&mut r);
@@ -569,13 +612,67 @@ impl ImageProxy {
569612
let args: Vec<serde_json::Value> =
570613
vec![img.0.into(), digest.to_string().into(), size.into()];
571614
let (_bloblen, fd) = self.impl_request::<i64, _, _>("GetBlob", args).await?;
572-
let (fd, pipeid) = fd.ok_or_else(|| Error::new_other("Missing fd from reply"))?;
573-
let fd = tokio::fs::File::from_std(std::fs::File::from(fd));
615+
let fd = fd.ok_or_else(|| Error::Other("Missing fd from reply".into()))?;
616+
let FileDescriptors::FinishPipe { pipeid, datafd } = fd else {
617+
return Err(Error::Other("got dualfds, expecting FinishPipe fd".into()));
618+
};
619+
let fd = tokio::fs::File::from_std(std::fs::File::from(datafd));
574620
let fd = tokio::io::BufReader::new(fd);
575621
let finish = Box::pin(self.finish_pipe(pipeid));
576622
Ok((fd, finish))
577623
}
578624

625+
async fn read_blob_error(fd: OwnedFd) -> std::result::Result<(), GetBlobError> {
626+
let fd = tokio::fs::File::from_std(std::fs::File::from(fd));
627+
let mut errfd = tokio::io::BufReader::new(fd);
628+
let mut buf = Vec::new();
629+
errfd
630+
.read_to_end(&mut buf)
631+
.await
632+
.map_err(|e| GetBlobError::Other(e.to_string().into_boxed_str()))?;
633+
if buf.is_empty() {
634+
return Ok(());
635+
}
636+
#[derive(Deserialize)]
637+
struct RemoteError {
638+
code: String,
639+
message: String,
640+
}
641+
let e: RemoteError = serde_json::from_slice(&buf)
642+
.map_err(|e| GetBlobError::Other(e.to_string().into_boxed_str()))?;
643+
match e.code.as_str() {
644+
// Actually this is OK
645+
"EPIPE" => Ok(()),
646+
"retryable" => Err(GetBlobError::Retryable(e.message.into_boxed_str())),
647+
_ => Err(GetBlobError::Other(e.message.into_boxed_str())),
648+
}
649+
}
650+
651+
/// Fetch a blob identified by e.g. `sha256:<digest>`; does not perform
652+
/// any verification that the blob matches the digest. The size of the
653+
/// blob and a pipe file descriptor are returned.
654+
#[instrument]
655+
pub async fn get_raw_blob(
656+
&self,
657+
img: &OpenedImage,
658+
digest: &Digest,
659+
) -> Result<(
660+
u64,
661+
tokio::fs::File,
662+
impl Future<Output = std::result::Result<(), GetBlobError>> + Unpin + '_,
663+
)> {
664+
tracing::debug!("fetching blob");
665+
let args: Vec<serde_json::Value> = vec![img.0.into(), digest.to_string().into()];
666+
let (bloblen, fd) = self.impl_request::<u64, _, _>("GetRawBlob", args).await?;
667+
let fd = fd.ok_or_else(|| Error::new_other("Missing fd from reply"))?;
668+
let FileDescriptors::DualFds { datafd, errfd } = fd else {
669+
return Err(Error::Other("got single fd, expecting dual fds".into()));
670+
};
671+
let fd = tokio::fs::File::from_std(std::fs::File::from(datafd));
672+
let err = Self::read_blob_error(errfd).boxed();
673+
Ok((bloblen, fd, err))
674+
}
675+
579676
/// Fetch a descriptor. The requested size and digest are verified (by the proxy process).
580677
#[instrument]
581678
pub async fn get_descriptor(

0 commit comments

Comments
 (0)