
Commit 6618f29

Parallelize writing of splitstream
Add dependencies on Rayon and Crossbeam. Give the Zstd encoder two modes, single-threaded and multi-threaded. Spawn threads in the splitstream writer for writing external objects, and spawn separate threads for the Zstd encoders; communication between these threads is handled with channels. Each of an image's layers is handed off to one of the encoder threads. For now there is only one channel for writing external objects but multiple channels for the encoders, because the encoder threads are usually CPU bound while the object writer threads are more IO bound.

Signed-off-by: Pragyan Poudyal <[email protected]>
1 parent 3111cf1 commit 6618f29
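
The commit message describes two kinds of workers wired together with channels: CPU-bound Zstd encoder threads, each owning its own channel so every message for a given layer stays on one thread, and IO-bound external-object writers that all pull from a single shared channel. Below is a minimal standalone sketch of that shape using the newly added crossbeam and rayon crates; the message types and routing are simplified stand-ins, not this repository's EnsureObjectMessages/WriterMessages.

```rust
use crossbeam::channel::{unbounded, Receiver, Sender};

// Simplified stand-ins for the commit's EnsureObjectMessages / WriterMessages.
enum ObjectMsg {
    Data { layer_num: usize, seq_num: usize, bytes: Vec<u8> },
}
enum WriterMsg {
    Written { seq_num: usize, object_id: u64 },
}

fn main() {
    let encoder_threads = 2;
    let writer_threads = 4;
    let layers = 3usize;

    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(encoder_threads + writer_threads)
        .build()
        .unwrap();

    // One channel per encoder thread: all messages for a given layer land on the same encoder.
    let encoder_channels: Vec<(Sender<WriterMsg>, Receiver<WriterMsg>)> =
        (0..encoder_threads).map(|_| unbounded()).collect();

    // A single shared channel for the IO-bound external-object writers.
    let (object_tx, object_rx) = unbounded::<ObjectMsg>();

    // Results flow back to the caller over a std mpsc channel, as in ensure_config().
    let (done_tx, done_rx) = std::sync::mpsc::channel::<(usize, u64)>();

    for i in 0..encoder_threads {
        let rx = encoder_channels[i].1.clone();
        let done_tx = done_tx.clone();
        pool.spawn(move || {
            // CPU-bound side: fold finished objects into this encoder's output.
            while let Ok(WriterMsg::Written { seq_num, object_id }) = rx.recv() {
                done_tx.send((seq_num, object_id)).ok();
            }
        });
    }

    for _ in 0..writer_threads {
        let rx = object_rx.clone();
        let encoder_txs: Vec<_> = encoder_channels.iter().map(|(s, _)| s.clone()).collect();
        pool.spawn(move || {
            // IO-bound side: "write" the object, then route the result to the layer's encoder.
            while let Ok(ObjectMsg::Data { layer_num, seq_num, bytes }) = rx.recv() {
                let object_id = bytes.len() as u64; // placeholder for a real object digest
                let target = layer_num % encoder_txs.len();
                encoder_txs[target]
                    .send(WriterMsg::Written { seq_num, object_id })
                    .ok();
            }
        });
    }

    for layer_num in 0..layers {
        object_tx
            .send(ObjectMsg::Data { layer_num, seq_num: 0, bytes: vec![0u8; 64 * (layer_num + 1)] })
            .unwrap();
    }

    // Dropping the producer-side senders lets the worker loops drain and exit.
    drop(object_tx);
    drop(encoder_channels);
    drop(done_tx);
    for (seq_num, object_id) in done_rx {
        println!("layer object written: id={object_id} seq={seq_num}");
    }
}
```

The same drop-the-sender pattern appears in ensure_config() below, where `drop(done_chan_sender)` is what lets the result-collection loop terminate.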

File tree: 6 files changed (+647, -77 lines)


Cargo.toml

Lines changed: 2 additions & 0 deletions
@@ -21,11 +21,13 @@ anyhow = { version = "1.0.97", default-features = false }
 async-compression = { version = "0.4.22", default-features = false, features = ["tokio", "zstd", "gzip"] }
 clap = { version = "4.5.32", default-features = false, features = ["std", "help", "usage", "derive"] }
 containers-image-proxy = "0.7.0"
+crossbeam = "0.8.4"
 env_logger = "0.11.7"
 hex = "0.4.3"
 indicatif = { version = "0.17.11", features = ["tokio"] }
 log = "0.4.27"
 oci-spec = "0.7.1"
+rayon = "1.10.0"
 regex-automata = { version = "0.4.9", default-features = false }
 rustix = { version = "1.0.3", features = ["fs", "mount", "process"] }
 serde = "1.0.219"

src/oci/mod.rs

Lines changed: 185 additions & 47 deletions
@@ -18,8 +18,12 @@ use crate::{
     fsverity::Sha256HashValue,
     oci::tar::{get_entry, split_async},
     repository::Repository,
-    splitstream::DigestMap,
+    splitstream::{
+        handle_external_object, DigestMap, EnsureObjectMessages, ResultChannelReceiver,
+        ResultChannelSender, WriterMessages,
+    },
     util::parse_sha256,
+    zstd_encoder,
 };
 
 pub fn import_layer(
@@ -83,6 +87,7 @@ impl<'repo> ImageOp<'repo> {
         let proxy = containers_image_proxy::ImageProxy::new_with_config(config).await?;
         let img = proxy.open_image(imgref).await.context("Opening image")?;
         let progress = MultiProgress::new();
+
         Ok(ImageOp {
             repo,
             proxy,
@@ -95,47 +100,49 @@ impl<'repo> ImageOp<'repo> {
         &self,
         layer_sha256: &Sha256HashValue,
         descriptor: &Descriptor,
-    ) -> Result<Sha256HashValue> {
+        layer_num: usize,
+        object_sender: crossbeam::channel::Sender<EnsureObjectMessages>,
+    ) -> Result<()> {
         // We need to use the per_manifest descriptor to download the compressed layer but it gets
         // stored in the repository via the per_config descriptor. Our return value is the
         // fsverity digest for the corresponding splitstream.
 
-        if let Some(layer_id) = self.repo.check_stream(layer_sha256)? {
-            self.progress
-                .println(format!("Already have layer {}", hex::encode(layer_sha256)))?;
-            Ok(layer_id)
-        } else {
-            // Otherwise, we need to fetch it...
-            let (blob_reader, driver) = self.proxy.get_descriptor(&self.img, descriptor).await?;
-
-            // See https://github.com/containers/containers-image-proxy-rs/issues/71
-            let blob_reader = blob_reader.take(descriptor.size());
-
-            let bar = self.progress.add(ProgressBar::new(descriptor.size()));
-            bar.set_style(ProgressStyle::with_template("[eta {eta}] {bar:40.cyan/blue} {decimal_bytes:>7}/{decimal_total_bytes:7} {msg}")
-                .unwrap()
-                .progress_chars("##-"));
-            let progress = bar.wrap_async_read(blob_reader);
-            self.progress
-                .println(format!("Fetching layer {}", hex::encode(layer_sha256)))?;
+        // Otherwise, we need to fetch it...
+        let (blob_reader, driver) = self.proxy.get_descriptor(&self.img, descriptor).await?;
+
+        // See https://github.com/containers/containers-image-proxy-rs/issues/71
+        let blob_reader = blob_reader.take(descriptor.size());
+
+        let bar = self.progress.add(ProgressBar::new(descriptor.size()));
+        bar.set_style(
+            ProgressStyle::with_template(
+                "[eta {eta}] {bar:40.cyan/blue} {decimal_bytes:>7}/{decimal_total_bytes:7} {msg}",
+            )
+            .unwrap()
+            .progress_chars("##-"),
+        );
+        let progress = bar.wrap_async_read(blob_reader);
+        self.progress
+            .println(format!("Fetching layer {}", hex::encode(layer_sha256)))?;
+
+        let mut splitstream =
+            self.repo
+                .create_stream(Some(*layer_sha256), None, Some(object_sender));
+        match descriptor.media_type() {
+            MediaType::ImageLayer => {
+                split_async(progress, &mut splitstream, layer_num).await?;
+            }
+            MediaType::ImageLayerGzip => {
+                split_async(GzipDecoder::new(progress), &mut splitstream, layer_num).await?;
+            }
+            MediaType::ImageLayerZstd => {
+                split_async(ZstdDecoder::new(progress), &mut splitstream, layer_num).await?;
+            }
+            other => bail!("Unsupported layer media type {:?}", other),
+        };
+        driver.await?;
 
-            let mut splitstream = self.repo.create_stream(Some(*layer_sha256), None);
-            match descriptor.media_type() {
-                MediaType::ImageLayer => {
-                    split_async(progress, &mut splitstream).await?;
-                }
-                MediaType::ImageLayerGzip => {
-                    split_async(GzipDecoder::new(progress), &mut splitstream).await?;
-                }
-                MediaType::ImageLayerZstd => {
-                    split_async(ZstdDecoder::new(progress), &mut splitstream).await?;
-                }
-                other => bail!("Unsupported layer media type {:?}", other),
-            };
-            let layer_id = self.repo.write_stream(splitstream, None)?;
-            driver.await?;
-            Ok(layer_id)
-        }
+        Ok(())
     }
 
     pub async fn ensure_config(
@@ -154,32 +161,162 @@ impl<'repo> ImageOp<'repo> {
         } else {
             // We need to add the config to the repo. We need to parse the config and make sure we
             // have all of the layers first.
-            //
             self.progress
                 .println(format!("Fetching config {}", hex::encode(config_sha256)))?;
             let raw_config = self.proxy.fetch_config_raw(&self.img).await?;
             let config = ImageConfiguration::from_reader(raw_config.as_slice())?;
 
+            let (done_chan_sender, done_chan_recver, object_sender) = self.spawn_threads(&config);
+
             let mut config_maps = DigestMap::new();
-            for (mld, cld) in zip(manifest_layers, config.rootfs().diff_ids()) {
+
+            for (idx, (mld, cld)) in zip(manifest_layers, config.rootfs().diff_ids()).enumerate() {
                 let layer_sha256 = sha256_from_digest(cld)?;
-                let layer_id = self
-                    .ensure_layer(&layer_sha256, mld)
-                    .await
-                    .with_context(|| format!("Failed to fetch layer {cld} via {mld:?}"))?;
+
+                if let Some(layer_id) = self.repo.check_stream(&layer_sha256)? {
+                    self.progress
+                        .println(format!("Already have layer {}", hex::encode(layer_sha256)))?;
+
+                    config_maps.insert(&layer_sha256, &layer_id);
+                } else {
+                    self.ensure_layer(&layer_sha256, mld, idx, object_sender.clone())
+                        .await
+                        .with_context(|| format!("Failed to fetch layer {cld} via {mld:?}"))?;
+                }
+            }
+
+            drop(done_chan_sender);
+
+            while let Ok(res) = done_chan_recver.recv() {
+                let (layer_sha256, layer_id) = res?;
                 config_maps.insert(&layer_sha256, &layer_id);
             }
 
-            let mut splitstream = self
-                .repo
-                .create_stream(Some(config_sha256), Some(config_maps));
+            let mut splitstream =
+                self.repo
+                    .create_stream(Some(config_sha256), Some(config_maps), None);
             splitstream.write_inline(&raw_config);
             let config_id = self.repo.write_stream(splitstream, None)?;
 
             Ok((config_sha256, config_id))
         }
     }
 
+    fn spawn_threads(
+        &self,
+        config: &ImageConfiguration,
+    ) -> (
+        ResultChannelSender,
+        ResultChannelReceiver,
+        crossbeam::channel::Sender<EnsureObjectMessages>,
+    ) {
+        use crossbeam::channel::{unbounded, Receiver, Sender};
+
+        let encoder_threads = 2;
+        let external_object_writer_threads = 4;
+
+        let pool = rayon::ThreadPoolBuilder::new()
+            .num_threads(encoder_threads + external_object_writer_threads)
+            .build()
+            .unwrap();
+
+        // We need this as writers have internal state that can't be shared between threads
+        //
+        // We'll actually need as many writers (not writer threads, but writer instances) as there are layers.
+        let zstd_writer_channels: Vec<(Sender<WriterMessages>, Receiver<WriterMessages>)> =
+            (0..encoder_threads).map(|_| unbounded()).collect();
+
+        let (object_sender, object_receiver) = unbounded::<EnsureObjectMessages>();
+
+        // (layer_sha256, layer_id)
+        let (done_chan_sender, done_chan_recver) =
+            std::sync::mpsc::channel::<Result<(Sha256HashValue, Sha256HashValue)>>();
+
+        let chunk_len = (config.rootfs().diff_ids().len() + encoder_threads - 1) / encoder_threads;
+
+        // Divide the layers into chunks of some specific size so each worker
+        // thread can work on multiple deterministic layers
+        let mut chunks: Vec<Vec<Sha256HashValue>> = config
+            .rootfs()
+            .diff_ids()
+            .iter()
+            .map(|x| sha256_from_digest(x).unwrap())
+            .collect::<Vec<Sha256HashValue>>()
+            .chunks(chunk_len)
+            .map(|x| x.to_vec())
+            .collect();
+
+        // Mapping from layer_id -> index in writer_channels
+        // This is to make sure that all messages relating to a particular layer
+        // always reach the same writer
+        let layers_to_chunks = chunks
+            .iter()
+            .enumerate()
+            .map(|(i, chunk)| std::iter::repeat(i).take(chunk.len()).collect::<Vec<_>>())
+            .flatten()
+            .collect::<Vec<_>>();
+
+        let _ = (0..encoder_threads)
+            .map(|i| {
+                let repository = self.repo.try_clone().unwrap();
+                let object_sender = object_sender.clone();
+                let done_chan_sender = done_chan_sender.clone();
+                let chunk = std::mem::take(&mut chunks[i]);
+                let receiver = zstd_writer_channels[i].1.clone();
+
+                pool.spawn({
+                    move || {
+                        let start = i * (chunk_len);
+                        let end = start + chunk_len;
+
+                        let enc = zstd_encoder::MultipleZstdWriters::new(
+                            chunk,
+                            repository,
+                            object_sender,
+                            done_chan_sender,
+                        );
+
+                        if let Err(e) = enc.recv_data(receiver, start, end) {
+                            eprintln!("zstd_encoder returned with error: {}", e.to_string());
+                            return;
+                        }
+                    }
+                });
+            })
+            .collect::<Vec<()>>();
+
+        let _ = (0..external_object_writer_threads)
+            .map(|_| {
+                pool.spawn({
+                    let repository = self.repo.try_clone().unwrap();
+                    let zstd_writer_channels = zstd_writer_channels
+                        .iter()
+                        .map(|(s, _)| s.clone())
+                        .collect::<Vec<_>>();
+                    let layers_to_chunks = layers_to_chunks.clone();
+                    let external_object_receiver = object_receiver.clone();
+
+                    move || {
+                        if let Err(e) = handle_external_object(
+                            repository,
+                            external_object_receiver,
+                            zstd_writer_channels,
+                            layers_to_chunks,
+                        ) {
+                            eprintln!(
+                                "handle_external_object returned with error: {}",
+                                e.to_string()
+                            );
+                            return;
+                        }
+                    }
+                });
+            })
+            .collect::<Vec<_>>();
+
+        return (done_chan_sender, done_chan_recver, object_sender);
+    }
+
     pub async fn pull(&self) -> Result<(Sha256HashValue, Sha256HashValue)> {
         let (_manifest_digest, raw_manifest) = self
             .proxy
@@ -192,6 +329,7 @@ impl<'repo> ImageOp<'repo> {
         let manifest = ImageManifest::from_reader(raw_manifest.as_slice())?;
         let config_descriptor = manifest.config();
         let layers = manifest.layers();
+
         self.ensure_config(layers, config_descriptor)
            .await
            .with_context(|| format!("Failed to pull config {config_descriptor:?}"))
@@ -271,7 +409,7 @@ pub fn write_config(
     let json = config.to_string()?;
     let json_bytes = json.as_bytes();
     let sha256 = hash(json_bytes);
-    let mut stream = repo.create_stream(Some(sha256), Some(refs));
+    let mut stream = repo.create_stream(Some(sha256), Some(refs), None);
     stream.write_inline(json_bytes);
     let id = repo.write_stream(stream, None)?;
     Ok((sha256, id))
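
The chunking in spawn_threads() is plain ceiling division: for example, 5 layers across 2 encoder threads gives chunk_len = (5 + 2 - 1) / 2 = 3, chunks of 3 and 2 layers, and a layers_to_chunks table of [0, 0, 0, 1, 1], so every message for a given layer index is routed to the same encoder channel. A small self-contained check of that arithmetic, using plain integers in place of Sha256HashValue (illustrative only):

```rust
fn main() {
    let layer_ids: Vec<u32> = (0..5).collect(); // stand-ins for the layers' diff_ids
    let encoder_threads = 2;

    // Ceiling division, as in spawn_threads().
    let chunk_len = (layer_ids.len() + encoder_threads - 1) / encoder_threads;
    assert_eq!(chunk_len, 3);

    let chunks: Vec<Vec<u32>> = layer_ids.chunks(chunk_len).map(|c| c.to_vec()).collect();
    assert_eq!(chunks, vec![vec![0, 1, 2], vec![3, 4]]);

    // layer index -> encoder (chunk) index: all messages for one layer hit one writer.
    let layers_to_chunks: Vec<usize> = chunks
        .iter()
        .enumerate()
        .flat_map(|(i, chunk)| std::iter::repeat(i).take(chunk.len()))
        .collect();
    assert_eq!(layers_to_chunks, vec![0, 0, 0, 1, 1]);
}
```

The flat_map here is equivalent to the map(...).flatten() chain used in the diff.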

src/oci/tar.rs

Lines changed: 18 additions & 3 deletions
@@ -16,7 +16,9 @@ use tokio::io::{AsyncRead, AsyncReadExt};
 use crate::{
     dumpfile,
     image::{LeafContent, Stat},
-    splitstream::{SplitStreamData, SplitStreamReader, SplitStreamWriter},
+    splitstream::{
+        EnsureObjectMessages, FinishMessage, SplitStreamData, SplitStreamReader, SplitStreamWriter,
+    },
     util::{read_exactish, read_exactish_async},
     INLINE_CONTENT_MAX,
 };
@@ -60,7 +62,7 @@ pub fn split<R: Read>(tar_stream: &mut R, writer: &mut SplitStreamWriter) -> Res
         if header.entry_type() == EntryType::Regular && actual_size > INLINE_CONTENT_MAX {
             // non-empty regular file: store the data in the object store
             let padding = buffer.split_off(actual_size);
-            writer.write_external(&buffer, padding)?;
+            writer.write_external(buffer, padding, 0, 0)?;
         } else {
             // else: store the data inline in the split stream
             writer.write_inline(&buffer);
@@ -72,7 +74,10 @@ pub async fn split_async(
 pub async fn split_async(
     mut tar_stream: impl AsyncRead + Unpin,
     writer: &mut SplitStreamWriter<'_>,
+    layer_num: usize,
 ) -> Result<()> {
+    let mut seq_num = 0;
+
     while let Some(header) = read_header_async(&mut tar_stream).await? {
         // the header always gets stored as inline data
         writer.write_inline(header.as_bytes());
@@ -90,12 +95,22 @@ pub async fn split_async(
         if header.entry_type() == EntryType::Regular && actual_size > INLINE_CONTENT_MAX {
             // non-empty regular file: store the data in the object store
             let padding = buffer.split_off(actual_size);
-            writer.write_external(&buffer, padding)?;
+            writer.write_external(buffer, padding, seq_num, layer_num)?;
+            seq_num += 1;
         } else {
             // else: store the data inline in the split stream
             writer.write_inline(&buffer);
         }
     }
+
+    if let Some(sender) = &writer.object_sender {
+        sender.send(EnsureObjectMessages::Finish(FinishMessage {
+            data: std::mem::take(&mut writer.inline_content),
+            total_msgs: seq_num,
+            layer_num,
+        }))?;
+    }
+
     Ok(())
 }
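
The seq_num/total_msgs bookkeeping above is what lets the receiving side put a layer back together when its external objects complete out of order: every write_external call carries its sequence number, and the final FinishMessage carries the total count plus the remaining inline data. A simplified sketch of how a per-layer reassembler might use those fields; the struct shapes are illustrative, not the exact types from splitstream.rs:

```rust
use std::collections::BTreeMap;

// Illustrative stand-ins; the real FinishMessage also carries layer_num so it
// can be routed to the right encoder channel.
struct DoneMsg {
    seq_num: usize,
    object_id: [u8; 32],
}
struct Finish {
    total_msgs: usize,
    tail: Vec<u8>,
}

struct LayerAssembler {
    results: BTreeMap<usize, [u8; 32]>,
    finish: Option<Finish>,
}

impl LayerAssembler {
    fn new() -> Self {
        Self { results: BTreeMap::new(), finish: None }
    }

    fn on_object_written(&mut self, msg: DoneMsg) {
        self.results.insert(msg.seq_num, msg.object_id);
    }

    fn on_finish(&mut self, finish: Finish) {
        self.finish = Some(finish);
    }

    // The layer can be encoded once the finish message has arrived and every
    // numbered external object has been accounted for.
    fn ready(&self) -> bool {
        self.finish
            .as_ref()
            .map(|f| self.results.len() == f.total_msgs)
            .unwrap_or(false)
    }

    // Emit object references in seq_num order, followed by the trailing inline data.
    fn into_ordered(self) -> (Vec<[u8; 32]>, Vec<u8>) {
        let tail = self.finish.map(|f| f.tail).unwrap_or_default();
        (self.results.into_values().collect(), tail)
    }
}

fn main() {
    let mut layer = LayerAssembler::new();
    // Object writes can complete out of order across the IO-bound writer threads.
    layer.on_object_written(DoneMsg { seq_num: 1, object_id: [1; 32] });
    layer.on_object_written(DoneMsg { seq_num: 0, object_id: [0; 32] });
    layer.on_finish(Finish { total_msgs: 2, tail: b"inline tail".to_vec() });
    assert!(layer.ready());

    let (ordered, tail) = layer.into_ordered();
    assert_eq!(ordered, vec![[0u8; 32], [1u8; 32]]);
    assert_eq!(tail, b"inline tail".to_vec());
}
```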
