Skip to content

Commit 2202cac

Browse files
committed
Make create command multithreaded
1 parent 22fff51 commit 2202cac

File tree

1 file changed

+34
-2
lines changed

1 file changed

+34
-2
lines changed

factoria/src/create.rs

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,15 @@ fn create(cmd: &Command, json_file: &Path) -> eyre::Result<()> {
8080
let ind = indicatif::ProgressBar::new(entries.iter().filter(|a| a.is_some()).count() as _)
8181
.with_style(style)
8282
.with_prefix(out_dir.display().to_string());
83-
for (id, e) in entries.into_iter().progress_with(ind.clone()).enumerate() {
84-
let (mut ent, data) = process_entry(e, json_file)?;
83+
let iter = par_map(
84+
entries.into_iter(),
85+
{
86+
let json_file = json_file.to_owned();
87+
move |e| process_entry(e, &json_file)
88+
},
89+
).progress_with(ind.clone());
90+
for (id, e) in iter.enumerate() {
91+
let (mut ent, data) = e?;
8592

8693
if let Some(data) = data {
8794
let pos = out_dat.seek(SeekFrom::End(0))?;
@@ -148,6 +155,31 @@ fn process_entry(e: Option<Entry>, json_file: &Path) -> eyre::Result<(DirEntry,
148155
Ok((ent, data))
149156
}
150157

158+
fn par_map<T, U>(
159+
iter: impl Iterator<Item=T> + Send + 'static,
160+
map: impl Fn(T) -> U + Send + Sync + 'static,
161+
) -> impl Iterator<Item=U> where
162+
T: Send + 'static,
163+
U: Send + 'static,
164+
{
165+
use std::sync::mpsc;
166+
use rayon::prelude::*;
167+
let (channel_send, channel_recv) = mpsc::channel();
168+
std::thread::spawn(move || {
169+
iter.map_while(move |item| {
170+
let (result_send, result_recv) = mpsc::sync_channel(0);
171+
channel_send.send(result_recv).ok()?;
172+
Some((item, result_send))
173+
})
174+
.par_bridge()
175+
.try_for_each(|(item, result_send)| {
176+
result_send.send(map(item))
177+
})
178+
.ok()
179+
});
180+
channel_recv.into_iter().map_while(|a| a.recv().ok())
181+
}
182+
151183
fn parse_compress_mode<'de, D: serde::Deserializer<'de>>(des: D) -> Result<Option<bzip::CompressMode>, D::Error> {
152184
match <Option<u8>>::deserialize(des)? {
153185
Some(1) => Ok(Some(bzip::CompressMode::Mode1)),

0 commit comments

Comments
 (0)