Skip to content

Commit 3b6c649

Browse files
oci: split pulling code to new file, skopeo.rs
This code pulls images via containers-image-proxy-rs (which is a wrapper around skopeo). It's a bit too large to have it in the top-level lib.rs, and we're about to add an alternate implementation, so split it out. We keep a pull() function in place for compatibility reasons to avoid needing to update all callers: we will start using this function to dispatch to the correct implementation depending on the reference format. Signed-off-by: Allison Karlitskaya <[email protected]>
1 parent 7a0ad6c commit 3b6c649

File tree

2 files changed

+229
-208
lines changed

2 files changed

+229
-208
lines changed

crates/composefs-oci/src/lib.rs

Lines changed: 21 additions & 208 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,12 @@
1-
use std::{cmp::Reverse, process::Command, thread::available_parallelism};
2-
31
pub mod image;
2+
pub mod skopeo;
43
pub mod tar;
54

6-
use std::{collections::HashMap, io::Read, iter::zip, sync::Arc};
5+
use std::{collections::HashMap, io::Read, sync::Arc};
76

87
use anyhow::{bail, ensure, Context, Result};
9-
use async_compression::tokio::bufread::{GzipDecoder, ZstdDecoder};
10-
use containers_image_proxy::{ImageProxy, ImageProxyConfig, OpenedImage};
11-
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
12-
use oci_spec::image::{Descriptor, ImageConfiguration, ImageManifest, MediaType};
8+
use oci_spec::image::{Descriptor, ImageConfiguration};
139
use sha2::{Digest, Sha256};
14-
use tokio::{io::AsyncReadExt, sync::Semaphore};
1510

1611
use composefs::{
1712
fsverity::FsVerityHashValue,
@@ -20,7 +15,23 @@ use composefs::{
2015
util::{parse_sha256, Sha256Digest},
2116
};
2217

23-
use crate::tar::{get_entry, split_async};
18+
use crate::tar::get_entry;
19+
20+
type ContentAndVerity<ObjectID> = (Sha256Digest, ObjectID);
21+
22+
pub(crate) fn sha256_from_descriptor(descriptor: &Descriptor) -> Result<Sha256Digest> {
23+
let Some(digest) = descriptor.as_digest_sha256() else {
24+
bail!("Descriptor in oci config is not sha256");
25+
};
26+
Ok(parse_sha256(digest)?)
27+
}
28+
29+
pub(crate) fn sha256_from_digest(digest: &str) -> Result<Sha256Digest> {
30+
match digest.strip_prefix("sha256:") {
31+
Some(rest) => Ok(parse_sha256(rest)?),
32+
None => bail!("Manifest has non-sha256 digest"),
33+
}
34+
}
2435

2536
pub fn import_layer<ObjectID: FsVerityHashValue>(
2637
repo: &Arc<Repository<ObjectID>>,
@@ -44,212 +55,14 @@ pub fn ls_layer<ObjectID: FsVerityHashValue>(
4455
Ok(())
4556
}
4657

47-
struct ImageOp<ObjectID: FsVerityHashValue> {
48-
repo: Arc<Repository<ObjectID>>,
49-
proxy: ImageProxy,
50-
img: OpenedImage,
51-
progress: MultiProgress,
52-
}
53-
54-
fn sha256_from_descriptor(descriptor: &Descriptor) -> Result<Sha256Digest> {
55-
let Some(digest) = descriptor.as_digest_sha256() else {
56-
bail!("Descriptor in oci config is not sha256");
57-
};
58-
Ok(parse_sha256(digest)?)
59-
}
60-
61-
fn sha256_from_digest(digest: &str) -> Result<Sha256Digest> {
62-
match digest.strip_prefix("sha256:") {
63-
Some(rest) => Ok(parse_sha256(rest)?),
64-
None => bail!("Manifest has non-sha256 digest"),
65-
}
66-
}
67-
68-
type ContentAndVerity<ObjectID> = (Sha256Digest, ObjectID);
69-
70-
impl<ObjectID: FsVerityHashValue> ImageOp<ObjectID> {
71-
async fn new(repo: &Arc<Repository<ObjectID>>, imgref: &str) -> Result<Self> {
72-
// See https://github.com/containers/skopeo/issues/2563
73-
let skopeo_cmd = if imgref.starts_with("containers-storage:") {
74-
let mut cmd = Command::new("podman");
75-
cmd.args(["unshare", "skopeo"]);
76-
Some(cmd)
77-
} else {
78-
None
79-
};
80-
81-
let config = ImageProxyConfig {
82-
skopeo_cmd,
83-
// auth_anonymous: true, debug: true, insecure_skip_tls_verification: Some(true),
84-
..ImageProxyConfig::default()
85-
};
86-
let proxy = containers_image_proxy::ImageProxy::new_with_config(config).await?;
87-
let img = proxy.open_image(imgref).await.context("Opening image")?;
88-
let progress = MultiProgress::new();
89-
Ok(ImageOp {
90-
repo: Arc::clone(repo),
91-
proxy,
92-
img,
93-
progress,
94-
})
95-
}
96-
97-
pub async fn ensure_layer(
98-
&self,
99-
layer_sha256: Sha256Digest,
100-
descriptor: &Descriptor,
101-
) -> Result<ObjectID> {
102-
// We need to use the per_manifest descriptor to download the compressed layer but it gets
103-
// stored in the repository via the per_config descriptor. Our return value is the
104-
// fsverity digest for the corresponding splitstream.
105-
106-
if let Some(layer_id) = self.repo.check_stream(&layer_sha256)? {
107-
self.progress
108-
.println(format!("Already have layer {}", hex::encode(layer_sha256)))?;
109-
Ok(layer_id)
110-
} else {
111-
// Otherwise, we need to fetch it...
112-
let (blob_reader, driver) = self.proxy.get_descriptor(&self.img, descriptor).await?;
113-
114-
// See https://github.com/containers/containers-image-proxy-rs/issues/71
115-
let blob_reader = blob_reader.take(descriptor.size());
116-
117-
let bar = self.progress.add(ProgressBar::new(descriptor.size()));
118-
bar.set_style(ProgressStyle::with_template("[eta {eta}] {bar:40.cyan/blue} {decimal_bytes:>7}/{decimal_total_bytes:7} {msg}")
119-
.unwrap()
120-
.progress_chars("##-"));
121-
let progress = bar.wrap_async_read(blob_reader);
122-
self.progress
123-
.println(format!("Fetching layer {}", hex::encode(layer_sha256)))?;
124-
125-
let mut splitstream = self.repo.create_stream(Some(layer_sha256), None);
126-
match descriptor.media_type() {
127-
MediaType::ImageLayer => {
128-
split_async(progress, &mut splitstream).await?;
129-
}
130-
MediaType::ImageLayerGzip => {
131-
split_async(GzipDecoder::new(progress), &mut splitstream).await?;
132-
}
133-
MediaType::ImageLayerZstd => {
134-
split_async(ZstdDecoder::new(progress), &mut splitstream).await?;
135-
}
136-
other => bail!("Unsupported layer media type {:?}", other),
137-
};
138-
let layer_id = self.repo.write_stream(splitstream, None)?;
139-
140-
// We intentionally explicitly ignore this, even though we're supposed to check it.
141-
// See https://github.com/containers/containers-image-proxy-rs/issues/80 for discussion
142-
// about why. Note: we only care about the uncompressed layer tar, and we checksum it
143-
// ourselves.
144-
drop(driver);
145-
146-
Ok(layer_id)
147-
}
148-
}
149-
150-
pub async fn ensure_config(
151-
self: &Arc<Self>,
152-
manifest_layers: &[Descriptor],
153-
descriptor: &Descriptor,
154-
) -> Result<ContentAndVerity<ObjectID>> {
155-
let config_sha256 = sha256_from_descriptor(descriptor)?;
156-
if let Some(config_id) = self.repo.check_stream(&config_sha256)? {
157-
// We already got this config? Nice.
158-
self.progress.println(format!(
159-
"Already have container config {}",
160-
hex::encode(config_sha256)
161-
))?;
162-
Ok((config_sha256, config_id))
163-
} else {
164-
// We need to add the config to the repo. We need to parse the config and make sure we
165-
// have all of the layers first.
166-
//
167-
self.progress
168-
.println(format!("Fetching config {}", hex::encode(config_sha256)))?;
169-
170-
let (mut config, driver) = self.proxy.get_descriptor(&self.img, descriptor).await?;
171-
let config = async move {
172-
let mut s = Vec::new();
173-
config.read_to_end(&mut s).await?;
174-
anyhow::Ok(s)
175-
};
176-
let (config, driver) = tokio::join!(config, driver);
177-
let _: () = driver?;
178-
let raw_config = config?;
179-
let config = ImageConfiguration::from_reader(&raw_config[..])?;
180-
181-
// We want to sort the layers based on size so we can get started on the big layers
182-
// first. The last thing we want is to start on the biggest layer right at the end.
183-
let mut layers: Vec<_> = zip(manifest_layers, config.rootfs().diff_ids()).collect();
184-
layers.sort_by_key(|(mld, ..)| Reverse(mld.size()));
185-
186-
// Bound the number of tasks to the available parallelism.
187-
let threads = available_parallelism()?;
188-
let sem = Arc::new(Semaphore::new(threads.into()));
189-
let mut entries = vec![];
190-
for (mld, diff_id) in layers {
191-
let self_ = Arc::clone(self);
192-
let permit = Arc::clone(&sem).acquire_owned().await?;
193-
let layer_sha256 = sha256_from_digest(diff_id)?;
194-
let descriptor = mld.clone();
195-
let future = tokio::spawn(async move {
196-
let _permit = permit;
197-
self_.ensure_layer(layer_sha256, &descriptor).await
198-
});
199-
entries.push((layer_sha256, future));
200-
}
201-
202-
// Collect the results.
203-
let mut config_maps = DigestMap::new();
204-
for (layer_sha256, future) in entries {
205-
config_maps.insert(&layer_sha256, &future.await??);
206-
}
207-
208-
let mut splitstream = self
209-
.repo
210-
.create_stream(Some(config_sha256), Some(config_maps));
211-
splitstream.write_inline(&raw_config);
212-
let config_id = self.repo.write_stream(splitstream, None)?;
213-
214-
Ok((config_sha256, config_id))
215-
}
216-
}
217-
218-
pub async fn pull(self: &Arc<Self>) -> Result<ContentAndVerity<ObjectID>> {
219-
let (_manifest_digest, raw_manifest) = self
220-
.proxy
221-
.fetch_manifest_raw_oci(&self.img)
222-
.await
223-
.context("Fetching manifest")?;
224-
225-
// We need to add the manifest to the repo. We need to parse the manifest and make
226-
// sure we have the config first (which will also pull in the layers).
227-
let manifest = ImageManifest::from_reader(raw_manifest.as_slice())?;
228-
let config_descriptor = manifest.config();
229-
let layers = manifest.layers();
230-
self.ensure_config(layers, config_descriptor)
231-
.await
232-
.with_context(|| format!("Failed to pull config {config_descriptor:?}"))
233-
}
234-
}
235-
23658
/// Pull the target image, and add the provided tag. If this is a mountable
23759
/// image (i.e. not an artifact), it is *not* unpacked by default.
23860
pub async fn pull<ObjectID: FsVerityHashValue>(
23961
repo: &Arc<Repository<ObjectID>>,
24062
imgref: &str,
24163
reference: Option<&str>,
24264
) -> Result<(Sha256Digest, ObjectID)> {
243-
let op = Arc::new(ImageOp::new(repo, imgref).await?);
244-
let (sha256, id) = op
245-
.pull()
246-
.await
247-
.with_context(|| format!("Unable to pull container image {imgref}"))?;
248-
249-
if let Some(name) = reference {
250-
repo.name_stream(sha256, name)?;
251-
}
252-
Ok((sha256, id))
65+
skopeo::pull(repo, imgref, reference).await
25366
}
25467

25568
pub fn open_config<ObjectID: FsVerityHashValue>(

0 commit comments

Comments
 (0)