diff --git a/crates/composefs-oci/Cargo.toml b/crates/composefs-oci/Cargo.toml index 659fba72..61652688 100644 --- a/crates/composefs-oci/Cargo.toml +++ b/crates/composefs-oci/Cargo.toml @@ -13,15 +13,22 @@ version.workspace = true [dependencies] anyhow = { version = "1.0.87", default-features = false } async-compression = { version = "0.4.0", default-features = false, features = ["tokio", "zstd", "gzip"] } +async-stream = "0.3.6" +bytes = "1.10.1" composefs = { workspace = true } containers-image-proxy = { version = "0.7.1", default-features = false } +futures = "0.3.31" hex = { version = "0.4.0", default-features = false } indicatif = { version = "0.17.0", default-features = false, features = ["tokio"] } +oci-client = "0.15.0" oci-spec = { version = "0.7.0", default-features = false } rustix = { version = "1.0.0", features = ["fs"] } sha2 = { version = "0.10.1", default-features = false } tar = { version = "0.4.38", default-features = false } tokio = { version = "1.24.2", features = ["rt-multi-thread"] } +tokio-util = "0.7.15" +zstd = "0.13.3" +zstd-chunked = "0.2.0" [dev-dependencies] similar-asserts = "1.7.0" diff --git a/crates/composefs-oci/src/client.rs b/crates/composefs-oci/src/client.rs new file mode 100644 index 00000000..79d3e6d1 --- /dev/null +++ b/crates/composefs-oci/src/client.rs @@ -0,0 +1,485 @@ +// Pull an OCI image using oci-client and zstd-chunked (if applicable) + +use async_compression::tokio::bufread::{GzipDecoder, ZstdDecoder}; +use std::{ + collections::HashMap, + fmt, + iter::zip, + ops::Range, + sync::Arc, + sync::Mutex, + thread, + time::{Duration, Instant}, + pin::pin +}; + +use anyhow::{bail, ensure, Context, Result}; +use async_stream::stream; +use bytes::Bytes; +use futures::{ + channel::oneshot, + stream::{self, Stream, StreamExt, TryStreamExt}, + try_join, +}; +use indicatif::{ProgressBar, ProgressStyle}; +use oci_client::{ + client::{BlobResponse, ClientConfig}, + manifest::OciDescriptor, + secrets::RegistryAuth, + Client, Reference, +}; +use oci_spec::image::ImageConfiguration; +use rustix::{ + buffer::spare_capacity, + fs::{readlinkat, symlinkat}, + io::{read, Errno}, +}; +use tokio_util::io::StreamReader; +use zstd_chunked::{ + Chunk, ContentReference, MetadataReference, MetadataReferences, Stream as Metadata, +}; + +use crate::{sha256_from_digest, tar::split_async, ContentAndVerity}; + +use composefs::{ + fsverity::FsVerityHashValue, repository::Repository, splitstream::DigestMap, util::Sha256Digest, +}; + +// The Chameleon keeps track of how well the download is going. Each byte successfully downloaded +// increases the karma by 1 and each network failure decreases it by 1. The passage of time also +// decreases karma, with exponential decay. This means that as long as progress is steady, +// even with really slow download speeds (think 10bytes/sec), we can tolerate a large number of +// network errors, but once we stop making forward progress and exponential decay sets in, our +// patience for errors decreases rapidly. It also means that a single error at the start is +// immediately fatal, which feels correct. 
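+//
+// A worked example with made-up numbers: karma decays by a factor of e per second, so a
+// connection that has banked 100 bytes of karma and then stalls is left with roughly
+// 100/e ≈ 36.8 after one second, ≈ 13.5 after two and ≈ 0.67 after five; at that point a
+// single further error pushes it negative and the download is aborted.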
+struct Chameleon { + // 🌈🦎📊 + karma: f64, + updated: Instant, +} + +impl Chameleon { + fn get(&self, now: &Instant) -> f64 { + // first order exponential decay, time constant = 1s (ie: drops to 36.79% after 1 sec) + self.karma / now.duration_since(self.updated).as_secs_f64().exp() + } + + fn update(&mut self, delta: impl Into) -> f64 { + let now = Instant::now(); + self.karma = self.get(&now) + delta.into(); + self.updated = now; + self.karma + } +} + +impl fmt::Debug for Chameleon { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "Chameleon {{ value: {}, updated: {:?} }} -> {}", + self.karma, + self.updated, + self.get(&Instant::now()) + ) + } +} + +impl Default for Chameleon { + fn default() -> Self { + Self { + karma: 0., + updated: Instant::now(), + } + } +} + +pub(super) struct PullOp { + repository: Arc>, + client: Client, + image: Reference, + progress: ProgressBar, + karma: Mutex, // could be RefCell but then PullOp isn't Send +} + +async fn run_in_thread( + f: impl FnOnce() -> Result + Send + 'static, +) -> Result { + let (tx, rx) = oneshot::channel(); + thread::spawn(move || tx.send(f())); + rx.await.context("Thread panicked or sender dropped")? +} + +impl PullOp { + async fn softfail(&self, err: impl Into) -> std::io::Result<()> { + #[allow(clippy::unwrap_used)] + if self.karma.lock().unwrap().update(-1.) < 0. { + // Karma went negative: let the error bubble out. + Err(err.into()) + } else { + // Give it a second... + tokio::time::sleep(Duration::from_secs(1)).await; + Ok(()) + } + } + + // To simplify progress tracking, if this function fails, the entire operation needs to be + // aborted, so it tries really hard not to fail... it will also never download any byte that it + // has already successfully received (ie: it will make the range request smaller before trying + // again). + fn stream_range<'a>( + &self, + desc: &'a OciDescriptor, + range: &Range, + ) -> impl Stream> + use<'_, 'a, ObjectId> { + let (mut start, end) = (range.start, range.end); + + // https://github.com/rust-lang/rust/issues/43122 + stream! { + 'send_request: while start < end { + let resp = match self + .client + .pull_blob_stream_partial(&self.image, desc, start, Some(end - start)) + .await + { + Ok(resp) => resp, + Err(err) => { + self.softfail(std::io::Error::other(err)).await?; + continue 'send_request; + } + }; + + // Maybe some servers would respond with a full request if we give the complete range + // but let's wait until someone actually encounters that before we try to handle it... + let BlobResponse::Partial(mut stream) = resp else { + yield Err(std::io::Error::other(anyhow::anyhow!("Server has no range support"))); + return; + }; + + // Iterate over the stream of bytes... + while let Some(result) = stream.next().await { + match result { + Ok(bytes) => { + let n_bytes = bytes.len() as u64; + + #[allow(clippy::cast_precision_loss, clippy::unwrap_used)] + self.karma.lock().unwrap().update(n_bytes as f64); + self.progress.inc(n_bytes); + start += n_bytes; + yield Ok(bytes); + } + Err(err) => { + self.softfail(err).await?; + continue 'send_request; + } + } + } + } + } + } + + async fn download_range(&self, desc: &OciDescriptor, range: &Range) -> Result> { + let stream = self.stream_range(desc, range); + // TODO: find a better way... 
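+        //
+        // One possible "better way" (an untested sketch, not part of this change): fold the
+        // stream into a single pre-sized buffer and skip the intermediate Vec<Bytes>:
+        //
+        //     let data: Vec<u8> = stream
+        //         .try_fold(
+        //             Vec::with_capacity((range.end - range.start) as usize),
+        //             |mut buf, bytes| async move {
+        //                 buf.extend_from_slice(&bytes);
+        //                 Ok(buf)
+        //             },
+        //         )
+        //         .await?;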
+ let bytes_vec: Vec = stream.try_collect().await?; + let data: Vec = bytes_vec.into_iter().flatten().collect(); + Ok(data) + } + + async fn download_all(&self, desc: &OciDescriptor) -> Result> { + let everything = 0..desc.size.try_into()?; + self.download_range(desc, &everything).await + } + + async fn check_and_save( + &self, + digest: String, + decompress: bool, + mut data: Vec, + ) -> Result { + let repository = Arc::clone(&self.repository); + + run_in_thread(move || { + // decompressing here is slightly awkward but we want it in the thread + if decompress { + data = zstd::decode_all(&data[..])?; + } + + // TODO: validate... + let _ = digest; + + // TODO: put this in a more reasonable place... + let id = repository.ensure_object(&data)?; + match symlinkat(id.to_object_pathname(), repository.objects_dir()?, digest) { + Ok(()) | Err(Errno::EXIST) => Ok(()), + Err(err) => Err(err), + }?; + + Ok(id) + }) + .await + } + + fn check_cached(&self, digest: &str) -> Result> { + let dir = self.repository.objects_dir()?; + match readlinkat(dir, digest, []) { + Ok(path) => Ok(Some(ObjectId::from_object_pathname(path.as_bytes())?)), + Err(Errno::NOENT) => Ok(None), + Err(other) => Err(other.into()), + } + } + + fn read_object(&self, id: &ObjectId, size: u64) -> Result> { + let mut data = Vec::with_capacity(size.try_into()?); + read(self.repository.open_object(id)?, spare_capacity(&mut data))?; + ensure!(data.len() as u64 == size, "Short read?"); + Ok(data) + } + + fn read_cached(&self, digest: &str, size: u64) -> Result>> { + self.check_cached(digest)? + .map(|id| self.read_object(&id, size)) + .transpose() + } + + async fn download_metadata( + &self, + layer: &OciDescriptor, + reference: &MetadataReference, + ) -> Result> { + if let Some(digest) = &reference.digest { + if let Some(data) = + self.read_cached(digest, reference.range.end - reference.range.start)? + { + self.progress.dec_length(data.len() as u64); + return Ok(data); + } + } + + let result = self.download_range(layer, &reference.range).await?; + + if let Some(digest) = &reference.digest { + // Caching metadata might not make sense for the "incremental updates" case (since it's + // definitely going to be different next time) but it definitely makes sense from the + // "bad network connection and my download got interrupted" case. + self.check_and_save(digest.clone(), false, result.clone()) + .await?; + } + + Ok(result) + } + + async fn ensure_content( + &self, + layer: &OciDescriptor, + reference: &ContentReference, + ) -> Result { + if let Some(id) = self.check_cached(&reference.digest)? { + self.progress + .dec_length(reference.range.end - reference.range.start); + Ok(id) + } else { + let result = self.download_range(layer, &reference.range).await?; + self.check_and_save(reference.digest.clone(), true, result) + .await + } + } + + async fn download_zstd_chunked_layer( + &self, + layer: &OciDescriptor, + metadata: &MetadataReferences, + diff_id: &str, + ) -> Result { + let (manifest, tarsplit) = try_join!( + self.download_metadata(layer, &metadata.manifest), + self.download_metadata(layer, &metadata.tarsplit) + )?; + + let stream = Metadata::new_from_frames(&manifest[..], &tarsplit[..])?; + + // Remove the parts of the file that we know we won't need (tar headers, etc.) + // We get that by summing up the parts we do need and subtracting it from the total size. 
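+        //
+        // For example (illustrative numbers only): for a 100 MiB zstd:chunked layer whose
+        // manifest and tarsplit frames total 2 MiB and whose referenced content chunks sum
+        // to 90 MiB, the remaining 8 MiB is framing and tar headers that we will never
+        // request, so it gets subtracted from the progress bar total up front.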
+        let already_accounted = (manifest.len() + tarsplit.len()) as u64;
+        let needed: u64 = stream
+            .references()
+            .map(|r| r.range.end - r.range.start)
+            .sum();
+        let unneeded = TryInto::<u64>::try_into(layer.size)? - needed - already_accounted;
+        self.progress.dec_length(unneeded);
+
+        // Ensure all external references are in the repository and build an ObjectId map
+        // Doing this pass first simplifies the async bookkeeping: we can use .buffer_unordered()
+        // and collect the results as they come in instead of letting large jobs block the queue.
+        let map: HashMap<String, ObjectId> = stream::iter(stream.references())
+            .map(async move |reference| {
+                let id = self.ensure_content(layer, reference).await?;
+                Ok::<_, anyhow::Error>((reference.digest.clone(), id))
+            })
+            .buffer_unordered(100)
+            .try_collect()
+            .await?;
+
+        let digest = sha256_from_digest(diff_id)?;
+        let mut writer = self.repository.create_stream(Some(digest), None);
+        for chunk in stream.chunks {
+            match chunk {
+                Chunk::Inline(data) => {
+                    writer.write_inline(&data);
+                }
+                Chunk::External(reference) => {
+                    // SAFETY: We downloaded and mapped all of the references above...
+                    // We could actually avoid building the map if we relied on the sha256
+                    // symlinks...
+                    let id = &map[&reference.digest];
+
+                    // Unfortunately we have to read the data here: we could build the splitstream
+                    // using only our knowledge of the 'id' but we also need to take a body content
+                    // sha256 of the entire .tar stream to ensure it matches the diff_id...
+                    let data = self.read_object(id, reference.size)?;
+                    writer.write_external(&data, vec![])?;
+                }
+            }
+        }
+
+        self.repository.write_stream(writer, None)
+    }
+
+    #[allow(clippy::unused_async)]
+    async fn download_tar_layer(
+        &self,
+        layer: &OciDescriptor,
+        diff_id: &Sha256Digest,
+    ) -> Result<ObjectId> {
+        // We need to use the layer descriptor to download the compressed layer but it gets
+        // stored in the repository via the diff_id. Our return value is the
+        // fsverity digest for the corresponding splitstream.
+        if let Some(layer_id) = self.repository.check_stream(diff_id)? {
+            self.progress.dec_length(layer.size.try_into()?);
+            Ok(layer_id)
+        } else {
+            // Otherwise, we need to fetch it...
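+            // (With hypothetical digests: the network request uses the layer descriptor,
+            // e.g. a tar+gzip blob sha256:aaaa..., while the resulting splitstream is keyed
+            // by the diff_id sha256:bbbb..., the digest of the *decompressed* tar.)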
+            let stream = self.stream_range(layer, &(0..layer.size as u64));
+            let reader = pin!(StreamReader::new(stream));
+
+            let mut splitstream = self.repository.create_stream(Some(*diff_id), None);
+            match layer.media_type.as_ref() {
+                "application/vnd.oci.image.layer.v1.tar" => {
+                    split_async(reader, &mut splitstream).await?;
+                }
+                "application/vnd.oci.image.layer.v1.tar+gzip" => {
+                    split_async(GzipDecoder::new(reader), &mut splitstream).await?;
+                }
+                "application/vnd.oci.image.layer.v1.tar+zstd" => {
+                    split_async(ZstdDecoder::new(reader), &mut splitstream).await?;
+                }
+                other => bail!("Unsupported layer media type {:?}", other),
+            };
+
+            self.repository.write_stream(splitstream, None)
+        }
+    }
+
+    async fn download_layer(&self, layer: &OciDescriptor, diff_id: &str) -> Result<ObjectId> {
+        let tar_digest = sha256_from_digest(diff_id)?;
+
+        if let Some(metadata) = layer
+            .annotations
+            .as_ref()
+            .and_then(|annotations| MetadataReferences::from_oci(|key| annotations.get(key)))
+        {
+            self.download_zstd_chunked_layer(layer, &metadata, diff_id)
+                .await
+        } else {
+            self.download_tar_layer(layer, &tar_digest).await
+        }
+    }
+
+    pub async fn ensure_config(
+        &self,
+        manifest_layers: &[OciDescriptor],
+        descriptor: &OciDescriptor,
+    ) -> Result<ContentAndVerity<ObjectId>> {
+        let config_sha256 = sha256_from_digest(&descriptor.digest)?;
+        if let Some(config_id) = self.repository.check_stream(&config_sha256)? {
+            // We already got this config? Nice.
+            self.progress.println(format!(
+                "Already have container config {}",
+                hex::encode(config_sha256)
+            ));
+            self.progress.dec_length(descriptor.size as u64);
+            for layer in manifest_layers {
+                self.progress.dec_length(layer.size as u64);
+            }
+            Ok((config_sha256, config_id))
+        } else {
+            // We need to add the config to the repo. We need to parse the config and make sure we
+            // have all of the layers first.
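+            // (The manifest's `layers` array and the config's `rootfs.diff_ids` are parallel
+            // lists: entry N of each describes the same layer, as the compressed blob and the
+            // uncompressed tar respectively, which is why the zip() below pairs them directly.)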
+ // + self.progress + .println(format!("Fetching config {}", hex::encode(config_sha256))); + + let raw_config = self.download_all(descriptor).await?; + let config = ImageConfiguration::from_reader(&raw_config[..])?; + + let mut config_maps = DigestMap::new(); + let layers = zip(manifest_layers, config.rootfs().diff_ids()); + for (descriptor, diff_id) in layers { + let layer_sha256 = sha256_from_digest(diff_id)?; + let id = self.download_layer(descriptor, diff_id).await?; + config_maps.insert(&layer_sha256, &id); + } + + let mut splitstream = self + .repository + .create_stream(Some(config_sha256), Some(config_maps)); + splitstream.write_inline(&raw_config); + let config_id = self.repository.write_stream(splitstream, None)?; + + Ok((config_sha256, config_id)) + } + } + + pub(super) async fn pull( + image: Reference, + repository: Arc>, + ) -> Result> { + let client = Client::new(ClientConfig { + connect_timeout: Some(Duration::from_secs(1)), + read_timeout: Some(Duration::from_secs(1)), + ..Default::default() + }); + + // We don't bother accounting for this in our progress bar + let (manifest, _) = client + .pull_image_manifest(&image, &RegistryAuth::Anonymous) + .await?; + + // But we do include the config + let mut total: i64 = manifest.config.size; + for layer in &manifest.layers { + total += layer.size; + } + + let progress = ProgressBar::new(total.try_into()?); + progress.set_style(ProgressStyle::with_template( + "[eta {eta}] {bar:40.cyan/blue} {decimal_bytes:>7}/{decimal_total_bytes:7} {decimal_bytes_per_sec} {msg}", + )?); + progress.enable_steady_tick(Duration::from_millis(100)); + + let this = Self { + repository, + client, + image, + progress, + karma: Chameleon::default().into(), + }; + + let id = this + .ensure_config(&manifest.layers, &manifest.config) + .await?; + + this.progress.finish(); + + Ok(id) + } +} diff --git a/crates/composefs-oci/src/lib.rs b/crates/composefs-oci/src/lib.rs index a2cb6276..0f153503 100644 --- a/crates/composefs-oci/src/lib.rs +++ b/crates/composefs-oci/src/lib.rs @@ -1,17 +1,13 @@ -use std::{cmp::Reverse, process::Command, thread::available_parallelism}; - +pub mod client; pub mod image; +pub mod skopeo; pub mod tar; -use std::{collections::HashMap, io::Read, iter::zip, sync::Arc}; +use std::{collections::HashMap, io::Read, sync::Arc}; use anyhow::{bail, ensure, Context, Result}; -use async_compression::tokio::bufread::{GzipDecoder, ZstdDecoder}; -use containers_image_proxy::{ImageProxy, ImageProxyConfig, OpenedImage}; -use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; -use oci_spec::image::{Descriptor, ImageConfiguration, ImageManifest, MediaType}; +use oci_spec::image::{Descriptor, ImageConfiguration}; use sha2::{Digest, Sha256}; -use tokio::{io::AsyncReadExt, sync::Semaphore}; use composefs::{ fsverity::FsVerityHashValue, @@ -20,7 +16,23 @@ use composefs::{ util::{parse_sha256, Sha256Digest}, }; -use crate::tar::{get_entry, split_async}; +use crate::tar::get_entry; + +type ContentAndVerity = (Sha256Digest, ObjectID); + +pub(crate) fn sha256_from_descriptor(descriptor: &Descriptor) -> Result { + let Some(digest) = descriptor.as_digest_sha256() else { + bail!("Descriptor in oci config is not sha256"); + }; + Ok(parse_sha256(digest)?) 
+} + +pub(crate) fn sha256_from_digest(digest: &str) -> Result { + match digest.strip_prefix("sha256:") { + Some(rest) => Ok(parse_sha256(rest)?), + None => bail!("Manifest has non-sha256 digest"), + } +} pub fn import_layer( repo: &Arc>, @@ -44,195 +56,6 @@ pub fn ls_layer( Ok(()) } -struct ImageOp { - repo: Arc>, - proxy: ImageProxy, - img: OpenedImage, - progress: MultiProgress, -} - -fn sha256_from_descriptor(descriptor: &Descriptor) -> Result { - let Some(digest) = descriptor.as_digest_sha256() else { - bail!("Descriptor in oci config is not sha256"); - }; - Ok(parse_sha256(digest)?) -} - -fn sha256_from_digest(digest: &str) -> Result { - match digest.strip_prefix("sha256:") { - Some(rest) => Ok(parse_sha256(rest)?), - None => bail!("Manifest has non-sha256 digest"), - } -} - -type ContentAndVerity = (Sha256Digest, ObjectID); - -impl ImageOp { - async fn new(repo: &Arc>, imgref: &str) -> Result { - // See https://github.com/containers/skopeo/issues/2563 - let skopeo_cmd = if imgref.starts_with("containers-storage:") { - let mut cmd = Command::new("podman"); - cmd.args(["unshare", "skopeo"]); - Some(cmd) - } else { - None - }; - - let config = ImageProxyConfig { - skopeo_cmd, - // auth_anonymous: true, debug: true, insecure_skip_tls_verification: Some(true), - ..ImageProxyConfig::default() - }; - let proxy = containers_image_proxy::ImageProxy::new_with_config(config).await?; - let img = proxy.open_image(imgref).await.context("Opening image")?; - let progress = MultiProgress::new(); - Ok(ImageOp { - repo: Arc::clone(repo), - proxy, - img, - progress, - }) - } - - pub async fn ensure_layer( - &self, - layer_sha256: Sha256Digest, - descriptor: &Descriptor, - ) -> Result { - // We need to use the per_manifest descriptor to download the compressed layer but it gets - // stored in the repository via the per_config descriptor. Our return value is the - // fsverity digest for the corresponding splitstream. - - if let Some(layer_id) = self.repo.check_stream(&layer_sha256)? { - self.progress - .println(format!("Already have layer {}", hex::encode(layer_sha256)))?; - Ok(layer_id) - } else { - // Otherwise, we need to fetch it... - let (blob_reader, driver) = self.proxy.get_descriptor(&self.img, descriptor).await?; - - // See https://github.com/containers/containers-image-proxy-rs/issues/71 - let blob_reader = blob_reader.take(descriptor.size()); - - let bar = self.progress.add(ProgressBar::new(descriptor.size())); - bar.set_style(ProgressStyle::with_template("[eta {eta}] {bar:40.cyan/blue} {decimal_bytes:>7}/{decimal_total_bytes:7} {msg}") - .unwrap() - .progress_chars("##-")); - let progress = bar.wrap_async_read(blob_reader); - self.progress - .println(format!("Fetching layer {}", hex::encode(layer_sha256)))?; - - let mut splitstream = self.repo.create_stream(Some(layer_sha256), None); - match descriptor.media_type() { - MediaType::ImageLayer => { - split_async(progress, &mut splitstream).await?; - } - MediaType::ImageLayerGzip => { - split_async(GzipDecoder::new(progress), &mut splitstream).await?; - } - MediaType::ImageLayerZstd => { - split_async(ZstdDecoder::new(progress), &mut splitstream).await?; - } - other => bail!("Unsupported layer media type {:?}", other), - }; - let layer_id = self.repo.write_stream(splitstream, None)?; - - // We intentionally explicitly ignore this, even though we're supposed to check it. - // See https://github.com/containers/containers-image-proxy-rs/issues/80 for discussion - // about why. 
Note: we only care about the uncompressed layer tar, and we checksum it - // ourselves. - drop(driver); - - Ok(layer_id) - } - } - - pub async fn ensure_config( - self: &Arc, - manifest_layers: &[Descriptor], - descriptor: &Descriptor, - ) -> Result> { - let config_sha256 = sha256_from_descriptor(descriptor)?; - if let Some(config_id) = self.repo.check_stream(&config_sha256)? { - // We already got this config? Nice. - self.progress.println(format!( - "Already have container config {}", - hex::encode(config_sha256) - ))?; - Ok((config_sha256, config_id)) - } else { - // We need to add the config to the repo. We need to parse the config and make sure we - // have all of the layers first. - // - self.progress - .println(format!("Fetching config {}", hex::encode(config_sha256)))?; - - let (mut config, driver) = self.proxy.get_descriptor(&self.img, descriptor).await?; - let config = async move { - let mut s = Vec::new(); - config.read_to_end(&mut s).await?; - anyhow::Ok(s) - }; - let (config, driver) = tokio::join!(config, driver); - let _: () = driver?; - let raw_config = config?; - let config = ImageConfiguration::from_reader(&raw_config[..])?; - - // We want to sort the layers based on size so we can get started on the big layers - // first. The last thing we want is to start on the biggest layer right at the end. - let mut layers: Vec<_> = zip(manifest_layers, config.rootfs().diff_ids()).collect(); - layers.sort_by_key(|(mld, ..)| Reverse(mld.size())); - - // Bound the number of tasks to the available parallelism. - let threads = available_parallelism()?; - let sem = Arc::new(Semaphore::new(threads.into())); - let mut entries = vec![]; - for (mld, diff_id) in layers { - let self_ = Arc::clone(self); - let permit = Arc::clone(&sem).acquire_owned().await?; - let layer_sha256 = sha256_from_digest(diff_id)?; - let descriptor = mld.clone(); - let future = tokio::spawn(async move { - let _permit = permit; - self_.ensure_layer(layer_sha256, &descriptor).await - }); - entries.push((layer_sha256, future)); - } - - // Collect the results. - let mut config_maps = DigestMap::new(); - for (layer_sha256, future) in entries { - config_maps.insert(&layer_sha256, &future.await??); - } - - let mut splitstream = self - .repo - .create_stream(Some(config_sha256), Some(config_maps)); - splitstream.write_inline(&raw_config); - let config_id = self.repo.write_stream(splitstream, None)?; - - Ok((config_sha256, config_id)) - } - } - - pub async fn pull(self: &Arc) -> Result> { - let (_manifest_digest, raw_manifest) = self - .proxy - .fetch_manifest_raw_oci(&self.img) - .await - .context("Fetching manifest")?; - - // We need to add the manifest to the repo. We need to parse the manifest and make - // sure we have the config first (which will also pull in the layers). - let manifest = ImageManifest::from_reader(raw_manifest.as_slice())?; - let config_descriptor = manifest.config(); - let layers = manifest.layers(); - self.ensure_config(layers, config_descriptor) - .await - .with_context(|| format!("Failed to pull config {config_descriptor:?}")) - } -} - /// Pull the target image, and add the provided tag. If this is a mountable /// image (i.e. not an artifact), it is *not* unpacked by default. 
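+///
+/// The reference is dispatched on its shape; the examples below are illustrative rather
+/// than taken from a test suite:
+/// - `./dir` or `/abs/path`: rejected for now (reserved for a future oci-dir backend);
+/// - `docker://quay.io/fedora/fedora-bootc:42` (first punctuation is `:`): skopeo backend;
+/// - `quay.io/fedora/fedora-bootc:42` (first punctuation is `.`): oci-client backend.
+///
+/// A rough usage sketch, assuming `repo` is an already-opened `Arc<Repository<ObjectID>>`
+/// and with an arbitrary tag name:
+/// ```ignore
+/// let (sha256, verity) = pull(&repo, "quay.io/fedora/fedora-bootc:42", Some("bootc")).await?;
+/// ```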
pub async fn pull( @@ -240,16 +63,16 @@ pub async fn pull( imgref: &str, reference: Option<&str>, ) -> Result<(Sha256Digest, ObjectID)> { - let op = Arc::new(ImageOp::new(repo, imgref).await?); - let (sha256, id) = op - .pull() - .await - .with_context(|| format!("Unable to pull container image {imgref}"))?; - - if let Some(name) = reference { - repo.name_stream(sha256, name)?; + if imgref.starts_with('.') || imgref.starts_with('/') { + // Reserved for future oci-dir backend: eg. /abs/path, ./rel/path, ../parent/path + bail!("Invalid reference format"); + } else if Some(':') == imgref.chars().filter(char::is_ascii_punctuation).next() { + // If the first punctuation is ':' then this is a skopeo reference (`transport:...`) + skopeo::pull(repo, imgref, reference).await + } else { + // Otherwise treat it as an oci-client reference (like quay.io/fedora/fedora-bootc:42) + client::PullOp::pull(imgref.try_into()?, repo.clone()).await } - Ok((sha256, id)) } pub fn open_config( diff --git a/crates/composefs-oci/src/skopeo.rs b/crates/composefs-oci/src/skopeo.rs new file mode 100644 index 00000000..65c485a7 --- /dev/null +++ b/crates/composefs-oci/src/skopeo.rs @@ -0,0 +1,208 @@ +use std::{cmp::Reverse, process::Command, thread::available_parallelism}; + +use std::{iter::zip, sync::Arc}; + +use anyhow::{bail, Context, Result}; +use async_compression::tokio::bufread::{GzipDecoder, ZstdDecoder}; +use containers_image_proxy::{ImageProxy, ImageProxyConfig, OpenedImage}; +use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; +use oci_spec::image::{Descriptor, ImageConfiguration, ImageManifest, MediaType}; +use tokio::{io::AsyncReadExt, sync::Semaphore}; + +use composefs::{ + fsverity::FsVerityHashValue, repository::Repository, splitstream::DigestMap, util::Sha256Digest, +}; + +use crate::{sha256_from_descriptor, sha256_from_digest, tar::split_async, ContentAndVerity}; + +struct ImageOp { + repo: Arc>, + proxy: ImageProxy, + img: OpenedImage, + progress: MultiProgress, +} + +impl ImageOp { + async fn new(repo: &Arc>, imgref: &str) -> Result { + // See https://github.com/containers/skopeo/issues/2563 + let skopeo_cmd = if imgref.starts_with("containers-storage:") { + let mut cmd = Command::new("podman"); + cmd.args(["unshare", "skopeo"]); + Some(cmd) + } else { + None + }; + + let config = ImageProxyConfig { + skopeo_cmd, + // auth_anonymous: true, debug: true, insecure_skip_tls_verification: Some(true), + ..ImageProxyConfig::default() + }; + let proxy = containers_image_proxy::ImageProxy::new_with_config(config).await?; + let img = proxy.open_image(imgref).await.context("Opening image")?; + let progress = MultiProgress::new(); + Ok(ImageOp { + repo: Arc::clone(repo), + proxy, + img, + progress, + }) + } + + pub async fn ensure_layer( + &self, + layer_sha256: Sha256Digest, + descriptor: &Descriptor, + ) -> Result { + // We need to use the per_manifest descriptor to download the compressed layer but it gets + // stored in the repository via the per_config descriptor. Our return value is the + // fsverity digest for the corresponding splitstream. + + if let Some(layer_id) = self.repo.check_stream(&layer_sha256)? { + self.progress + .println(format!("Already have layer {}", hex::encode(layer_sha256)))?; + Ok(layer_id) + } else { + // Otherwise, we need to fetch it... 
+ let (blob_reader, driver) = self.proxy.get_descriptor(&self.img, descriptor).await?; + + // See https://github.com/containers/containers-image-proxy-rs/issues/71 + let blob_reader = blob_reader.take(descriptor.size()); + + let bar = self.progress.add(ProgressBar::new(descriptor.size())); + bar.set_style(ProgressStyle::with_template("[eta {eta}] {bar:40.cyan/blue} {decimal_bytes:>7}/{decimal_total_bytes:7} {msg}") + .unwrap() + .progress_chars("##-")); + let progress = bar.wrap_async_read(blob_reader); + self.progress + .println(format!("Fetching layer {}", hex::encode(layer_sha256)))?; + + let mut splitstream = self.repo.create_stream(Some(layer_sha256), None); + match descriptor.media_type() { + MediaType::ImageLayer => { + split_async(progress, &mut splitstream).await?; + } + MediaType::ImageLayerGzip => { + split_async(GzipDecoder::new(progress), &mut splitstream).await?; + } + MediaType::ImageLayerZstd => { + split_async(ZstdDecoder::new(progress), &mut splitstream).await?; + } + other => bail!("Unsupported layer media type {:?}", other), + }; + let layer_id = self.repo.write_stream(splitstream, None)?; + + // We intentionally explicitly ignore this, even though we're supposed to check it. + // See https://github.com/containers/containers-image-proxy-rs/issues/80 for discussion + // about why. Note: we only care about the uncompressed layer tar, and we checksum it + // ourselves. + drop(driver); + + Ok(layer_id) + } + } + + pub async fn ensure_config( + self: &Arc, + manifest_layers: &[Descriptor], + descriptor: &Descriptor, + ) -> Result> { + let config_sha256 = sha256_from_descriptor(descriptor)?; + if let Some(config_id) = self.repo.check_stream(&config_sha256)? { + // We already got this config? Nice. + self.progress.println(format!( + "Already have container config {}", + hex::encode(config_sha256) + ))?; + Ok((config_sha256, config_id)) + } else { + // We need to add the config to the repo. We need to parse the config and make sure we + // have all of the layers first. + // + self.progress + .println(format!("Fetching config {}", hex::encode(config_sha256)))?; + + let (mut config, driver) = self.proxy.get_descriptor(&self.img, descriptor).await?; + let config = async move { + let mut s = Vec::new(); + config.read_to_end(&mut s).await?; + anyhow::Ok(s) + }; + let (config, driver) = tokio::join!(config, driver); + let _: () = driver?; + let raw_config = config?; + let config = ImageConfiguration::from_reader(&raw_config[..])?; + + // We want to sort the layers based on size so we can get started on the big layers + // first. The last thing we want is to start on the biggest layer right at the end. + let mut layers: Vec<_> = zip(manifest_layers, config.rootfs().diff_ids()).collect(); + layers.sort_by_key(|(mld, ..)| Reverse(mld.size())); + + // Bound the number of tasks to the available parallelism. + let threads = available_parallelism()?; + let sem = Arc::new(Semaphore::new(threads.into())); + let mut entries = vec![]; + for (mld, diff_id) in layers { + let self_ = Arc::clone(self); + let permit = Arc::clone(&sem).acquire_owned().await?; + let layer_sha256 = sha256_from_digest(diff_id)?; + let descriptor = mld.clone(); + let future = tokio::spawn(async move { + let _permit = permit; + self_.ensure_layer(layer_sha256, &descriptor).await + }); + entries.push((layer_sha256, future)); + } + + // Collect the results. 
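+            // (`future.await??` propagates two kinds of failure: the outer `?` surfaces a
+            // panicked or cancelled task via its JoinError, the inner `?` the layer download
+            // error itself.)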
+ let mut config_maps = DigestMap::new(); + for (layer_sha256, future) in entries { + config_maps.insert(&layer_sha256, &future.await??); + } + + let mut splitstream = self + .repo + .create_stream(Some(config_sha256), Some(config_maps)); + splitstream.write_inline(&raw_config); + let config_id = self.repo.write_stream(splitstream, None)?; + + Ok((config_sha256, config_id)) + } + } + + pub async fn pull(self: &Arc) -> Result> { + let (_manifest_digest, raw_manifest) = self + .proxy + .fetch_manifest_raw_oci(&self.img) + .await + .context("Fetching manifest")?; + + // We need to add the manifest to the repo. We need to parse the manifest and make + // sure we have the config first (which will also pull in the layers). + let manifest = ImageManifest::from_reader(raw_manifest.as_slice())?; + let config_descriptor = manifest.config(); + let layers = manifest.layers(); + self.ensure_config(layers, config_descriptor) + .await + .with_context(|| format!("Failed to pull config {config_descriptor:?}")) + } +} + +/// Pull the target image, and add the provided tag. If this is a mountable +/// image (i.e. not an artifact), it is *not* unpacked by default. +pub async fn pull( + repo: &Arc>, + imgref: &str, + reference: Option<&str>, +) -> Result<(Sha256Digest, ObjectID)> { + let op = Arc::new(ImageOp::new(repo, imgref).await?); + let (sha256, id) = op + .pull() + .await + .with_context(|| format!("Unable to pull container image {imgref}"))?; + + if let Some(name) = reference { + repo.name_stream(sha256, name)?; + } + Ok((sha256, id)) +}