Skip to content

Commit bdcbc3f

Browse files
committed
Fix checkpoints
1 parent 2b2238d commit bdcbc3f

File tree

2 files changed

+31
-19
lines changed

2 files changed

+31
-19
lines changed

crates/minimizer_bucketing/src/compactor.rs

Lines changed: 30 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -16,16 +16,16 @@ use crate::{
1616
};
1717
use colors::non_colored::NonColoredManager;
1818
use config::{
19-
get_compression_level_info, get_memory_mode, MultiplicityCounterType, SwapPriority,
20-
DEFAULT_OUTPUT_BUFFER_SIZE, DEFAULT_PREFETCH_AMOUNT, KEEP_FILES, MAXIMUM_SECOND_BUCKETS_COUNT,
21-
MINIMIZER_BUCKETS_CHECKPOINT_SIZE, WORKERS_PRIORITY_HIGH,
19+
get_compression_level_info, get_memory_mode, BucketIndexType, MultiplicityCounterType,
20+
SwapPriority, DEFAULT_OUTPUT_BUFFER_SIZE, DEFAULT_PREFETCH_AMOUNT, KEEP_FILES,
21+
MAXIMUM_SECOND_BUCKETS_COUNT, MINIMIZER_BUCKETS_CHECKPOINT_SIZE, WORKERS_PRIORITY_HIGH,
2222
};
2323
use io::{
2424
compressed_read::CompressedReadIndipendent,
2525
concurrent::temp_reads::{
2626
creads_utils::{
2727
CompressedReadsBucketData, CompressedReadsBucketDataSerializer, NoSecondBucket,
28-
WithMultiplicity,
28+
ReadsCheckpointData, WithMultiplicity,
2929
},
3030
extra_data::SequenceExtraDataTempBufferManagement,
3131
},
@@ -34,7 +34,10 @@ use io::{
3434
compressed_read::{BorrowableCompressedRead, CompressedRead},
3535
creads_helper,
3636
};
37-
use parallel_processor::{buckets::bucket_writer::BucketItemSerializer, memory_fs::MemoryFs};
37+
use parallel_processor::{
38+
buckets::{bucket_writer::BucketItemSerializer, CheckpointStrategy},
39+
memory_fs::MemoryFs,
40+
};
3841
use parallel_processor::{
3942
buckets::{
4043
readers::async_binary_reader::{AsyncBinaryReader, AsyncReaderThread},
@@ -68,6 +71,8 @@ pub struct CompactorInitData {
6871
}
6972

7073
struct SuperKmerEntry(*const Vec<u8>, CompressedReadIndipendent);
74+
unsafe impl Sync for SuperKmerEntry {}
75+
unsafe impl Send for SuperKmerEntry {}
7176

7277
impl SuperKmerEntry {
7378
fn get_read(&self) -> CompressedRead {
@@ -122,6 +127,12 @@ impl<E: MinimizerBucketingExecutorFactory + Sync + Send + 'static> AsyncExecutor
122127

123128
static COMPACTED_INDEX: AtomicUsize = AtomicUsize::new(0);
124129

130+
let mut super_kmers_hashmap: Vec<
131+
FxHashMap<SuperKmerEntry, (u8, MultiplicityCounterType)>,
132+
> = (0..MAXIMUM_SECOND_BUCKETS_COUNT)
133+
.map(|_| FxHashMap::default())
134+
.collect();
135+
125136
while let Ok((_, init_data)) = track!(
126137
receiver
127138
.obtain_address_with_priority(WORKERS_PRIORITY_HIGH)
@@ -180,11 +191,6 @@ impl<E: MinimizerBucketingExecutorFactory + Sync + Send + 'static> AsyncExecutor
180191
COMPACTED_INDEX.fetch_add(1, Ordering::Relaxed)
181192
));
182193

183-
let mut super_kmers_hashmap: Vec<
184-
FxHashMap<SuperKmerEntry, (u8, MultiplicityCounterType)>,
185-
> = (0..MAXIMUM_SECOND_BUCKETS_COUNT)
186-
.map(|_| FxHashMap::default())
187-
.collect();
188194
// .try_into()
189195
// .unwrap();
190196
let mut kmers_storage = Vec::with_capacity(DEFAULT_OUTPUT_BUFFER_SIZE);
@@ -280,20 +286,26 @@ impl<E: MinimizerBucketingExecutorFactory + Sync + Send + 'static> AsyncExecutor
280286
let empty_extra = NonColoredManager::default();
281287

282288
for (rewrite_bucket, super_kmers_hashmap) in
283-
super_kmers_hashmap.into_iter().enumerate()
289+
super_kmers_hashmap.iter_mut().enumerate()
284290
{
285291
// Flush the buffer before changing checkpoint
286292
if buffer.len() > 0 {
287293
new_bucket.write_data(&buffer);
288294
buffer.clear();
289295
}
290-
// new_bucket.set_checkpoint_data(
291-
// &ReadsCheckpointData {
292-
// target_subbucket: rewrite_bucket as BucketIndexType,
293-
// },
294-
// CheckpointStrategy::Passtrough,
295-
// );
296-
for (read, (flags, multiplicity)) in super_kmers_hashmap {
296+
297+
if super_kmers_hashmap.is_empty() {
298+
continue;
299+
}
300+
301+
new_bucket.set_checkpoint_data(
302+
&ReadsCheckpointData {
303+
target_subbucket: rewrite_bucket as BucketIndexType,
304+
},
305+
CheckpointStrategy::Decompress,
306+
);
307+
308+
for (read, (flags, multiplicity)) in super_kmers_hashmap.drain() {
297309
let read = read.get_read();
298310
sequences_deltas[rewrite_bucket as usize] -= 1;
299311

0 commit comments

Comments
 (0)