Skip to content

Commit 06b5368

Browse files
oci: make oci pull multi-threaded async
Add an async version of `Repository::ensure_object()` and wire it through `SplitStreamWriter::write_external()`. Call that when we're splitting OCI layer tarballs to offload the writing of external objects (and the `fdatasync()` that goes with it) to a separate thread. This is preparatory work toward what we've been trying to accomplish for a while in #62, but it doesn't come close to the complete picture (since it still writes the objects sequentially). Modify the (already) async code in oci::ImageOp to download layers in parallel. This is a big deal for images with many layers (as is often the case for bootc images, due to the splitting heuristics). This reduces the time to pull the Fedora Silverblue 42 container image (when pulled from a local `oci-dir`) from ~90s to ~8.5s on my laptop. Unfortunately, container images made from large single layers are not substantially improved. In order to make this change we need to depend on a new version of containers-image-proxy-rs which makes ImageProxy: Send + Sync, so bump our required version to the one released today. Signed-off-by: Allison Karlitskaya <[email protected]>
1 parent d902df4 commit 06b5368

File tree

6 files changed

+46
-15
lines changed

6 files changed

+46
-15
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -20,7 +20,7 @@ rhel9 = ['pre-6.15']
2020
anyhow = { version = "1.0.87", default-features = false }
2121
async-compression = { version = "0.4.0", default-features = false, features = ["tokio", "zstd", "gzip"] }
2222
clap = { version = "4.0.1", default-features = false, features = ["std", "help", "usage", "derive"] }
23-
containers-image-proxy = "0.7.0"
23+
containers-image-proxy = "0.7.1"
2424
env_logger = "0.11.0"
2525
hex = "0.4.0"
2626
indicatif = { version = "0.17.0", features = ["tokio"] }

src/fsverity/hashvalue.rs

Lines changed: 1 addition & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -7,11 +7,11 @@ use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout, Unaligned};
77
pub trait FsVerityHashValue
88
where
99
Self: Clone,
10-
Self: Send + Sync + 'static,
1110
Self: From<Output<Self::Digest>>,
1211
Self: FromBytes + Immutable + IntoBytes + KnownLayout + Unaligned,
1312
Self: Hash + Eq,
1413
Self: fmt::Debug,
14+
Self: Send + Sync + Unpin + 'static,
1515
{
1616
type Digest: Digest + FixedOutputReset + fmt::Debug;
1717
const ALGORITHM: u8;

src/oci/mod.rs

Lines changed: 29 additions & 12 deletions
Original file line number · Diff line number · Diff line change
@@ -1,4 +1,4 @@
1-
use std::process::Command;
1+
use std::{cmp::Reverse, process::Command, thread::available_parallelism};
22

33
pub mod image;
44
pub mod tar;
@@ -11,7 +11,7 @@ use containers_image_proxy::{ImageProxy, ImageProxyConfig, OpenedImage};
1111
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
1212
use oci_spec::image::{Descriptor, ImageConfiguration, ImageManifest, MediaType};
1313
use sha2::{Digest, Sha256};
14-
use tokio::io::AsyncReadExt;
14+
use tokio::{io::AsyncReadExt, sync::Semaphore};
1515

1616
use crate::{
1717
fs::write_to_path,
@@ -96,14 +96,14 @@ impl<ObjectID: FsVerityHashValue> ImageOp<ObjectID> {
9696

9797
pub async fn ensure_layer(
9898
&self,
99-
layer_sha256: &Sha256Digest,
99+
layer_sha256: Sha256Digest,
100100
descriptor: &Descriptor,
101101
) -> Result<ObjectID> {
102102
// We need to use the per_manifest descriptor to download the compressed layer but it gets
103103
// stored in the repository via the per_config descriptor. Our return value is the
104104
// fsverity digest for the corresponding splitstream.
105105

106-
if let Some(layer_id) = self.repo.check_stream(layer_sha256)? {
106+
if let Some(layer_id) = self.repo.check_stream(&layer_sha256)? {
107107
self.progress
108108
.println(format!("Already have layer {}", hex::encode(layer_sha256)))?;
109109
Ok(layer_id)
@@ -122,7 +122,7 @@ impl<ObjectID: FsVerityHashValue> ImageOp<ObjectID> {
122122
self.progress
123123
.println(format!("Fetching layer {}", hex::encode(layer_sha256)))?;
124124

125-
let mut splitstream = self.repo.create_stream(Some(*layer_sha256), None);
125+
let mut splitstream = self.repo.create_stream(Some(layer_sha256), None);
126126
match descriptor.media_type() {
127127
MediaType::ImageLayer => {
128128
split_async(progress, &mut splitstream).await?;
@@ -172,14 +172,31 @@ impl<ObjectID: FsVerityHashValue> ImageOp<ObjectID> {
172172
let raw_config = config?;
173173
let config = ImageConfiguration::from_reader(&raw_config[..])?;
174174

175+
// We want to sort the layers based on size so we can get started on the big layers
176+
// first. The last thing we want is to start on the biggest layer right at the end.
177+
let mut layers: Vec<_> = zip(manifest_layers, config.rootfs().diff_ids()).collect();
178+
layers.sort_by_key(|(mld, ..)| Reverse(mld.size()));
179+
180+
// Bound the number of tasks to the available parallelism.
181+
let threads = available_parallelism()?;
182+
let sem = Arc::new(Semaphore::new(threads.into()));
183+
let mut entries = vec![];
184+
for (mld, diff_id) in layers {
185+
let self_ = Arc::clone(self);
186+
let permit = Arc::clone(&sem).acquire_owned().await?;
187+
let layer_sha256 = sha256_from_digest(diff_id)?;
188+
let descriptor = mld.clone();
189+
let future = tokio::spawn(async move {
190+
let _permit = permit;
191+
self_.ensure_layer(layer_sha256, &descriptor).await
192+
});
193+
entries.push((layer_sha256, future));
194+
}
195+
196+
// Collect the results.
175197
let mut config_maps = DigestMap::new();
176-
for (mld, cld) in zip(manifest_layers, config.rootfs().diff_ids()) {
177-
let layer_sha256 = sha256_from_digest(cld)?;
178-
let layer_id = self
179-
.ensure_layer(&layer_sha256, mld)
180-
.await
181-
.with_context(|| format!("Failed to fetch layer {cld} via {mld:?}"))?;
182-
config_maps.insert(&layer_sha256, &layer_id);
198+
for (layer_sha256, future) in entries {
199+
config_maps.insert(&layer_sha256, &future.await??);
183200
}
184201

185202
let mut splitstream = self

src/oci/tar.rs

Lines changed: 1 addition & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -94,7 +94,7 @@ pub async fn split_async(
9494
if header.entry_type() == EntryType::Regular && actual_size > INLINE_CONTENT_MAX {
9595
// non-empty regular file: store the data in the object store
9696
let padding = buffer.split_off(actual_size);
97-
writer.write_external(&buffer, padding)?;
97+
writer.write_external_async(buffer, padding).await?;
9898
} else {
9999
// else: store the data inline in the split stream
100100
writer.write_inline(&buffer);

src/repository.rs

Lines changed: 5 additions & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -106,6 +106,11 @@ impl<ObjectID: FsVerityHashValue> Repository<ObjectID> {
106106
})
107107
}
108108

109+
pub async fn ensure_object_async(self: &Arc<Self>, data: Vec<u8>) -> Result<ObjectID> {
110+
let self_ = Arc::clone(self);
111+
tokio::task::spawn_blocking(move || self_.ensure_object(&data)).await?
112+
}
113+
109114
pub fn ensure_object(&self, data: &[u8]) -> Result<ObjectID> {
110115
let dirfd = self.objects_dir()?;
111116
let id: ObjectID = compute_verity(data);

src/splitstream.rs

Lines changed: 9 additions & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -155,6 +155,15 @@ impl<ObjectID: FsVerityHashValue> SplitStreamWriter<ObjectID> {
155155
self.write_reference(&id, padding)
156156
}
157157

158+
pub async fn write_external_async(&mut self, data: Vec<u8>, padding: Vec<u8>) -> Result<()> {
159+
if let Some((ref mut sha256, ..)) = self.sha256 {
160+
sha256.update(&data);
161+
sha256.update(&padding);
162+
}
163+
let id = self.repo.ensure_object_async(data).await?;
164+
self.write_reference(&id, padding)
165+
}
166+
158167
pub fn done(mut self) -> Result<ObjectID> {
159168
self.flush_inline(vec![])?;
160169

0 commit comments

Comments (0)