Skip to content

Commit 87b7796

Browse files
Parallelize writing of splitstream
Add dependencies on Rayon and Crossbeam. Have two modes, single and multi-threaded, for the Zstd Encoder. Spawn threads in the splitstream writer for writing external objects, and spawn separate threads for the Zstd Encoder. We handle communication between these threads using channels. Any image's layers will be handed off to one of the Encoder threads. For now we only have one channel for external object writing, but multiple for Encoders; the reasoning is that Encoder threads are usually CPU-bound while the object-writer threads are more IO-bound. Signed-off-by: Pragyan Poudyal <[email protected]>
1 parent cf63476 commit 87b7796

File tree

6 files changed

+647
-77
lines changed

6 files changed

+647
-77
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,13 @@ anyhow = { version = "1.0.97", default-features = false }
2121
async-compression = { version = "0.4.22", default-features = false, features = ["tokio", "zstd", "gzip"] }
2222
clap = { version = "4.5.32", default-features = false, features = ["std", "help", "usage", "derive"] }
2323
containers-image-proxy = "0.7.0"
24+
crossbeam = "0.8.4"
2425
env_logger = "0.11.7"
2526
hex = "0.4.3"
2627
indicatif = { version = "0.17.11", features = ["tokio"] }
2728
log = "0.4.27"
2829
oci-spec = "0.7.1"
30+
rayon = "1.10.0"
2931
regex-automata = { version = "0.4.9", default-features = false }
3032
rustix = { version = "1.0.3", features = ["fs", "mount", "process"] }
3133
serde = "1.0.219"

src/oci/mod.rs

Lines changed: 185 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,12 @@ use crate::{
1818
fsverity::Sha256HashValue,
1919
oci::tar::{get_entry, split_async},
2020
repository::Repository,
21-
splitstream::DigestMap,
21+
splitstream::{
22+
handle_external_object, DigestMap, EnsureObjectMessages, ResultChannelReceiver,
23+
ResultChannelSender, WriterMessages,
24+
},
2225
util::parse_sha256,
26+
zstd_encoder,
2327
};
2428

2529
pub fn import_layer(
@@ -83,6 +87,7 @@ impl<'repo> ImageOp<'repo> {
8387
let proxy = containers_image_proxy::ImageProxy::new_with_config(config).await?;
8488
let img = proxy.open_image(imgref).await.context("Opening image")?;
8589
let progress = MultiProgress::new();
90+
8691
Ok(ImageOp {
8792
repo,
8893
proxy,
@@ -95,47 +100,49 @@ impl<'repo> ImageOp<'repo> {
95100
&self,
96101
layer_sha256: &Sha256HashValue,
97102
descriptor: &Descriptor,
98-
) -> Result<Sha256HashValue> {
103+
layer_num: usize,
104+
object_sender: crossbeam::channel::Sender<EnsureObjectMessages>,
105+
) -> Result<()> {
99106
// We need to use the per_manifest descriptor to download the compressed layer but it gets
100107
// stored in the repository via the per_config descriptor. Our return value is the
101108
// fsverity digest for the corresponding splitstream.
102109

103-
if let Some(layer_id) = self.repo.check_stream(layer_sha256)? {
104-
self.progress
105-
.println(format!("Already have layer {}", hex::encode(layer_sha256)))?;
106-
Ok(layer_id)
107-
} else {
108-
// Otherwise, we need to fetch it...
109-
let (blob_reader, driver) = self.proxy.get_descriptor(&self.img, descriptor).await?;
110-
111-
// See https://github.com/containers/containers-image-proxy-rs/issues/71
112-
let blob_reader = blob_reader.take(descriptor.size());
113-
114-
let bar = self.progress.add(ProgressBar::new(descriptor.size()));
115-
bar.set_style(ProgressStyle::with_template("[eta {eta}] {bar:40.cyan/blue} {decimal_bytes:>7}/{decimal_total_bytes:7} {msg}")
116-
.unwrap()
117-
.progress_chars("##-"));
118-
let progress = bar.wrap_async_read(blob_reader);
119-
self.progress
120-
.println(format!("Fetching layer {}", hex::encode(layer_sha256)))?;
110+
// Otherwise, we need to fetch it...
111+
let (blob_reader, driver) = self.proxy.get_descriptor(&self.img, descriptor).await?;
112+
113+
// See https://github.com/containers/containers-image-proxy-rs/issues/71
114+
let blob_reader = blob_reader.take(descriptor.size());
115+
116+
let bar = self.progress.add(ProgressBar::new(descriptor.size()));
117+
bar.set_style(
118+
ProgressStyle::with_template(
119+
"[eta {eta}] {bar:40.cyan/blue} {decimal_bytes:>7}/{decimal_total_bytes:7} {msg}",
120+
)
121+
.unwrap()
122+
.progress_chars("##-"),
123+
);
124+
let progress = bar.wrap_async_read(blob_reader);
125+
self.progress
126+
.println(format!("Fetching layer {}", hex::encode(layer_sha256)))?;
127+
128+
let mut splitstream =
129+
self.repo
130+
.create_stream(Some(*layer_sha256), None, Some(object_sender));
131+
match descriptor.media_type() {
132+
MediaType::ImageLayer => {
133+
split_async(progress, &mut splitstream, layer_num).await?;
134+
}
135+
MediaType::ImageLayerGzip => {
136+
split_async(GzipDecoder::new(progress), &mut splitstream, layer_num).await?;
137+
}
138+
MediaType::ImageLayerZstd => {
139+
split_async(ZstdDecoder::new(progress), &mut splitstream, layer_num).await?;
140+
}
141+
other => bail!("Unsupported layer media type {:?}", other),
142+
};
143+
driver.await?;
121144

122-
let mut splitstream = self.repo.create_stream(Some(*layer_sha256), None);
123-
match descriptor.media_type() {
124-
MediaType::ImageLayer => {
125-
split_async(progress, &mut splitstream).await?;
126-
}
127-
MediaType::ImageLayerGzip => {
128-
split_async(GzipDecoder::new(progress), &mut splitstream).await?;
129-
}
130-
MediaType::ImageLayerZstd => {
131-
split_async(ZstdDecoder::new(progress), &mut splitstream).await?;
132-
}
133-
other => bail!("Unsupported layer media type {:?}", other),
134-
};
135-
let layer_id = self.repo.write_stream(splitstream, None)?;
136-
driver.await?;
137-
Ok(layer_id)
138-
}
145+
Ok(())
139146
}
140147

141148
pub async fn ensure_config(
@@ -154,7 +161,6 @@ impl<'repo> ImageOp<'repo> {
154161
} else {
155162
// We need to add the config to the repo. We need to parse the config and make sure we
156163
// have all of the layers first.
157-
//
158164
self.progress
159165
.println(format!("Fetching config {}", hex::encode(config_sha256)))?;
160166

@@ -169,26 +175,157 @@ impl<'repo> ImageOp<'repo> {
169175
let raw_config = config?;
170176
let config = ImageConfiguration::from_reader(&raw_config[..])?;
171177

178+
let (done_chan_sender, done_chan_recver, object_sender) = self.spawn_threads(&config);
179+
172180
let mut config_maps = DigestMap::new();
173-
for (mld, cld) in zip(manifest_layers, config.rootfs().diff_ids()) {
181+
182+
for (idx, (mld, cld)) in zip(manifest_layers, config.rootfs().diff_ids()).enumerate() {
174183
let layer_sha256 = sha256_from_digest(cld)?;
175-
let layer_id = self
176-
.ensure_layer(&layer_sha256, mld)
177-
.await
178-
.with_context(|| format!("Failed to fetch layer {cld} via {mld:?}"))?;
184+
185+
if let Some(layer_id) = self.repo.check_stream(&layer_sha256)? {
186+
self.progress
187+
.println(format!("Already have layer {}", hex::encode(layer_sha256)))?;
188+
189+
config_maps.insert(&layer_sha256, &layer_id);
190+
} else {
191+
self.ensure_layer(&layer_sha256, mld, idx, object_sender.clone())
192+
.await
193+
.with_context(|| format!("Failed to fetch layer {cld} via {mld:?}"))?;
194+
}
195+
}
196+
197+
drop(done_chan_sender);
198+
199+
while let Ok(res) = done_chan_recver.recv() {
200+
let (layer_sha256, layer_id) = res?;
179201
config_maps.insert(&layer_sha256, &layer_id);
180202
}
181203

182-
let mut splitstream = self
183-
.repo
184-
.create_stream(Some(config_sha256), Some(config_maps));
204+
let mut splitstream =
205+
self.repo
206+
.create_stream(Some(config_sha256), Some(config_maps), None);
185207
splitstream.write_inline(&raw_config);
186208
let config_id = self.repo.write_stream(splitstream, None)?;
187209

188210
Ok((config_sha256, config_id))
189211
}
190212
}
191213

214+
fn spawn_threads(
215+
&self,
216+
config: &ImageConfiguration,
217+
) -> (
218+
ResultChannelSender,
219+
ResultChannelReceiver,
220+
crossbeam::channel::Sender<EnsureObjectMessages>,
221+
) {
222+
use crossbeam::channel::{unbounded, Receiver, Sender};
223+
224+
let encoder_threads = 2;
225+
let external_object_writer_threads = 4;
226+
227+
let pool = rayon::ThreadPoolBuilder::new()
228+
.num_threads(encoder_threads + external_object_writer_threads)
229+
.build()
230+
.unwrap();
231+
232+
// We need this as writers have internal state that can't be shared between threads
233+
//
234+
// We'll actually need as many writers (not writer threads, but writer instances) as there are layers.
235+
let zstd_writer_channels: Vec<(Sender<WriterMessages>, Receiver<WriterMessages>)> =
236+
(0..encoder_threads).map(|_| unbounded()).collect();
237+
238+
let (object_sender, object_receiver) = unbounded::<EnsureObjectMessages>();
239+
240+
// (layer_sha256, layer_id)
241+
let (done_chan_sender, done_chan_recver) =
242+
std::sync::mpsc::channel::<Result<(Sha256HashValue, Sha256HashValue)>>();
243+
244+
let chunk_len = (config.rootfs().diff_ids().len() + encoder_threads - 1) / encoder_threads;
245+
246+
// Divide the layers into chunks of some specific size so each worker
247+
// thread can work on multiple deterministic layers
248+
let mut chunks: Vec<Vec<Sha256HashValue>> = config
249+
.rootfs()
250+
.diff_ids()
251+
.iter()
252+
.map(|x| sha256_from_digest(x).unwrap())
253+
.collect::<Vec<Sha256HashValue>>()
254+
.chunks(chunk_len)
255+
.map(|x| x.to_vec())
256+
.collect();
257+
258+
// Mapping from layer_id -> index in writer_channels
259+
// This is to make sure that all messages relating to a particular layer
260+
// always reach the same writer
261+
let layers_to_chunks = chunks
262+
.iter()
263+
.enumerate()
264+
.map(|(i, chunk)| std::iter::repeat(i).take(chunk.len()).collect::<Vec<_>>())
265+
.flatten()
266+
.collect::<Vec<_>>();
267+
268+
let _ = (0..encoder_threads)
269+
.map(|i| {
270+
let repository = self.repo.try_clone().unwrap();
271+
let object_sender = object_sender.clone();
272+
let done_chan_sender = done_chan_sender.clone();
273+
let chunk = std::mem::take(&mut chunks[i]);
274+
let receiver = zstd_writer_channels[i].1.clone();
275+
276+
pool.spawn({
277+
move || {
278+
let start = i * (chunk_len);
279+
let end = start + chunk_len;
280+
281+
let enc = zstd_encoder::MultipleZstdWriters::new(
282+
chunk,
283+
repository,
284+
object_sender,
285+
done_chan_sender,
286+
);
287+
288+
if let Err(e) = enc.recv_data(receiver, start, end) {
289+
eprintln!("zstd_encoder returned with error: {}", e.to_string());
290+
return;
291+
}
292+
}
293+
});
294+
})
295+
.collect::<Vec<()>>();
296+
297+
let _ = (0..external_object_writer_threads)
298+
.map(|_| {
299+
pool.spawn({
300+
let repository = self.repo.try_clone().unwrap();
301+
let zstd_writer_channels = zstd_writer_channels
302+
.iter()
303+
.map(|(s, _)| s.clone())
304+
.collect::<Vec<_>>();
305+
let layers_to_chunks = layers_to_chunks.clone();
306+
let external_object_receiver = object_receiver.clone();
307+
308+
move || {
309+
if let Err(e) = handle_external_object(
310+
repository,
311+
external_object_receiver,
312+
zstd_writer_channels,
313+
layers_to_chunks,
314+
) {
315+
eprintln!(
316+
"handle_external_object returned with error: {}",
317+
e.to_string()
318+
);
319+
return;
320+
}
321+
}
322+
});
323+
})
324+
.collect::<Vec<_>>();
325+
326+
return (done_chan_sender, done_chan_recver, object_sender);
327+
}
328+
192329
pub async fn pull(&self) -> Result<(Sha256HashValue, Sha256HashValue)> {
193330
let (_manifest_digest, raw_manifest) = self
194331
.proxy
@@ -201,6 +338,7 @@ impl<'repo> ImageOp<'repo> {
201338
let manifest = ImageManifest::from_reader(raw_manifest.as_slice())?;
202339
let config_descriptor = manifest.config();
203340
let layers = manifest.layers();
341+
204342
self.ensure_config(layers, config_descriptor)
205343
.await
206344
.with_context(|| format!("Failed to pull config {config_descriptor:?}"))
@@ -280,7 +418,7 @@ pub fn write_config(
280418
let json = config.to_string()?;
281419
let json_bytes = json.as_bytes();
282420
let sha256 = hash(json_bytes);
283-
let mut stream = repo.create_stream(Some(sha256), Some(refs));
421+
let mut stream = repo.create_stream(Some(sha256), Some(refs), None);
284422
stream.write_inline(json_bytes);
285423
let id = repo.write_stream(stream, None)?;
286424
Ok((sha256, id))

src/oci/tar.rs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@ use tokio::io::{AsyncRead, AsyncReadExt};
1616
use crate::{
1717
dumpfile,
1818
image::{LeafContent, RegularFile, Stat},
19-
splitstream::{SplitStreamData, SplitStreamReader, SplitStreamWriter},
19+
splitstream::{
20+
EnsureObjectMessages, FinishMessage, SplitStreamData, SplitStreamReader, SplitStreamWriter,
21+
},
2022
util::{read_exactish, read_exactish_async},
2123
INLINE_CONTENT_MAX,
2224
};
@@ -60,7 +62,7 @@ pub fn split<R: Read>(tar_stream: &mut R, writer: &mut SplitStreamWriter) -> Res
6062
if header.entry_type() == EntryType::Regular && actual_size > INLINE_CONTENT_MAX {
6163
// non-empty regular file: store the data in the object store
6264
let padding = buffer.split_off(actual_size);
63-
writer.write_external(&buffer, padding)?;
65+
writer.write_external(buffer, padding, 0, 0)?;
6466
} else {
6567
// else: store the data inline in the split stream
6668
writer.write_inline(&buffer);
@@ -72,7 +74,10 @@ pub fn split<R: Read>(tar_stream: &mut R, writer: &mut SplitStreamWriter) -> Res
7274
pub async fn split_async(
7375
mut tar_stream: impl AsyncRead + Unpin,
7476
writer: &mut SplitStreamWriter<'_>,
77+
layer_num: usize,
7578
) -> Result<()> {
79+
let mut seq_num = 0;
80+
7681
while let Some(header) = read_header_async(&mut tar_stream).await? {
7782
// the header always gets stored as inline data
7883
writer.write_inline(header.as_bytes());
@@ -90,12 +95,22 @@ pub async fn split_async(
9095
if header.entry_type() == EntryType::Regular && actual_size > INLINE_CONTENT_MAX {
9196
// non-empty regular file: store the data in the object store
9297
let padding = buffer.split_off(actual_size);
93-
writer.write_external(&buffer, padding)?;
98+
writer.write_external(buffer, padding, seq_num, layer_num)?;
99+
seq_num += 1;
94100
} else {
95101
// else: store the data inline in the split stream
96102
writer.write_inline(&buffer);
97103
}
98104
}
105+
106+
if let Some(sender) = &writer.object_sender {
107+
sender.send(EnsureObjectMessages::Finish(FinishMessage {
108+
data: std::mem::take(&mut writer.inline_content),
109+
total_msgs: seq_num,
110+
layer_num,
111+
}))?;
112+
}
113+
99114
Ok(())
100115
}
101116

0 commit comments

Comments
 (0)