Skip to content

Commit 96bc942

Browse files
Merge pull request #61 from pangenome/parallelize
Parallelization and a bit less memory usage
2 parents e266067 + dbc03e4 commit 96bc942

File tree

3 files changed

+154
-124
lines changed

3 files changed

+154
-124
lines changed

src/impg.rs

Lines changed: 40 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -297,29 +297,32 @@ pub struct Impg {
297297

298298
impl Impg {
299299
pub fn from_multi_paf_records(
300-
records_by_file: &[(Vec<PafRecord>, String, u32)],
300+
records_by_file: &[(Vec<PafRecord>, String)],
301301
seq_index: SequenceIndex
302302
) -> Result<Self, ParseErr> {
303-
let mut paf_files = Vec::with_capacity(records_by_file.len());
304-
let mut paf_gzi_indices = Vec::with_capacity(records_by_file.len());
305-
306-
for (_, paf_file, _) in records_by_file {
307-
let paf_gzi_index = if [".gz", ".bgz"].iter().any(|e| paf_file.ends_with(e)) {
308-
let paf_gzi_file = paf_file.to_owned() + ".gzi";
309-
Some(
310-
bgzf::gzi::read(paf_gzi_file.clone())
311-
.unwrap_or_else(|_| panic!("Could not open {}", paf_gzi_file)),
312-
)
313-
} else {
314-
None
315-
};
316-
paf_files.push(paf_file.clone());
317-
paf_gzi_indices.push(paf_gzi_index);
318-
}
303+
// Use par_iter to process the files in parallel and collect both pieces of information
304+
let (paf_files, paf_gzi_indices): (Vec<String>, Vec<Option<bgzf::gzi::Index>>) = records_by_file
305+
.par_iter()
306+
.map(|(_, paf_file)| {
307+
let paf_gzi_index = if [".gz", ".bgz"].iter().any(|e| paf_file.ends_with(e)) {
308+
let paf_gzi_file = paf_file.to_owned() + ".gzi";
309+
Some(
310+
bgzf::gzi::read(paf_gzi_file.clone())
311+
.unwrap_or_else(|_| panic!("Could not open {}", paf_gzi_file)),
312+
)
313+
} else {
314+
None
315+
};
316+
317+
// Return both values as a tuple
318+
(paf_file.clone(), paf_gzi_index)
319+
})
320+
.unzip(); // Separate the tuples into two vectors
319321

320322
let intervals: FxHashMap<u32, Vec<Interval<QueryMetadata>>> = records_by_file
321323
.par_iter()
322-
.flat_map(|(records, _, file_index)| {
324+
.enumerate() // Add enumeration to get the position as index
325+
.flat_map(|(file_index, (records, _))| {
323326
records
324327
.par_iter()
325328
.filter_map(|record| {
@@ -329,7 +332,7 @@ impl Impg {
329332
target_end: record.target_end as i32,
330333
query_start: record.query_start as i32,
331334
query_end: record.query_end as i32,
332-
paf_file_index: *file_index,
335+
paf_file_index: file_index as u32,
333336
strand_and_cigar_offset: record.strand_and_cigar_offset, // Already includes strand bit
334337
cigar_bytes: record.cigar_bytes,
335338
};
@@ -360,7 +363,7 @@ impl Impg {
360363
});
361364

362365
let trees: TreeMap = intervals
363-
.into_iter()
366+
.into_par_iter()
364367
.map(|(target_id, interval_nodes)| {
365368
(target_id, BasicCOITree::new(interval_nodes.as_slice()))
366369
})
@@ -373,7 +376,7 @@ impl Impg {
373376
paf_gzi_indices,
374377
})
375378
}
376-
379+
377380
pub fn to_serializable(&self) -> SerializableImpg {
378381
let serializable_trees = self
379382
.trees
@@ -396,22 +399,23 @@ impl Impg {
396399
pub fn from_multi_paf_and_serializable(paf_files: &[String], serializable: SerializableImpg) -> Self {
397400
let (serializable_trees, seq_index) = serializable;
398401

399-
let mut paf_gzi_indices = Vec::with_capacity(paf_files.len());
400-
for paf_file in paf_files {
401-
let paf_gzi_index = if [".gz", ".bgz"].iter().any(|e| paf_file.ends_with(e)) {
402-
let paf_gzi_file = paf_file.to_owned() + ".gzi";
403-
Some(
404-
bgzf::gzi::read(paf_gzi_file.clone())
405-
.unwrap_or_else(|_| panic!("Could not open {}", paf_gzi_file)),
406-
)
407-
} else {
408-
None
409-
};
410-
paf_gzi_indices.push(paf_gzi_index);
411-
}
402+
let paf_gzi_indices: Vec<_> = paf_files
403+
.par_iter()
404+
.map(|paf_file| {
405+
if [".gz", ".bgz"].iter().any(|e| paf_file.ends_with(e)) {
406+
let paf_gzi_file = paf_file.to_owned() + ".gzi";
407+
Some(
408+
bgzf::gzi::read(paf_gzi_file.clone())
409+
.unwrap_or_else(|_| panic!("Could not open {}", paf_gzi_file)),
410+
)
411+
} else {
412+
None
413+
}
414+
})
415+
.collect();
412416

413-
let trees = serializable_trees
414-
.into_iter()
417+
let trees: TreeMap = serializable_trees
418+
.into_par_iter()
415419
.map(|(target_id, intervals)| {
416420
let tree = BasicCOITree::new(
417421
intervals

src/main.rs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -374,14 +374,15 @@ fn load_multi_index(paf_files: &[String], custom_index: Option<&str>) -> io::Res
374374
let index_file_ts = index_file_metadata.modified().ok();
375375

376376
if let Some(index_ts) = index_file_ts {
377-
for paf_file in paf_files {
378-
let paf_file_metadata = std::fs::metadata(paf_file)?;
379-
if let Ok(paf_file_ts) = paf_file_metadata.modified() {
380-
if paf_file_ts > index_ts {
381-
warn!("WARNING:\tPAF file {} has been modified since impg index creation.", paf_file);
377+
paf_files.par_iter().for_each(|paf_file| {
378+
if let Ok(paf_file_metadata) = std::fs::metadata(paf_file) {
379+
if let Ok(paf_file_ts) = paf_file_metadata.modified() {
380+
if paf_file_ts > index_ts {
381+
warn!("WARNING:\tPAF file {} has been modified since impg index creation.", paf_file);
382+
}
382383
}
383384
}
384-
}
385+
});
385386
}
386387

387388
let file = File::open(index_file)?;
@@ -410,7 +411,7 @@ fn generate_multi_index(paf_files: &[String], num_threads: NonZeroUsize, custom_
410411
// Process PAF files in parallel using Rayon
411412
let records_by_file: Vec<_> = (0..paf_files.len())
412413
.into_par_iter()
413-
.map(|file_index| -> io::Result<(Vec<PafRecord>, String, u32)> {
414+
.map(|file_index| -> io::Result<(Vec<PafRecord>, String)> {
414415
let paf_file = &paf_files[file_index];
415416

416417
// Increment the counter and get the new value atomically
@@ -438,7 +439,7 @@ fn generate_multi_index(paf_files: &[String], num_threads: NonZeroUsize, custom_
438439
)
439440
})?;
440441

441-
Ok((records, paf_file.clone(), file_index as u32))
442+
Ok((records, paf_file.clone()))
442443
})
443444
.collect::<Result<Vec<_>, _>>()?; // Propagate any errors
444445

@@ -717,6 +718,7 @@ fn print_stats(impg: &Impg) {
717718
// Basic stats
718719
let num_sequences = impg.seq_index.len();
719720
let total_sequence_length: usize = (0..num_sequences as u32)
721+
.into_par_iter()
720722
.filter_map(|id| impg.seq_index.get_len_from_id(id))
721723
.sum();
722724
let num_overlaps = impg.trees.values().map(|tree| tree.len()).sum::<usize>();
@@ -735,7 +737,10 @@ fn print_stats(impg: &Impg) {
735737

736738
if !entries.is_empty() {
737739
// Calculate mean and median overlaps
738-
let sum: usize = entries.iter().map(|(_, count)| count).sum();
740+
let sum: usize = entries
741+
.par_iter()
742+
.map(|(_, count)| count)
743+
.sum();
739744
let mean = sum as f64 / entries.len() as f64;
740745

741746
let median = if entries.is_empty() {

0 commit comments

Comments (0)