Skip to content

Commit f7fab47

Browse files
committed
Add working sub-bucket passthrough
1 parent 7ff13f0 commit f7fab47

File tree

8 files changed

+108
-39
lines changed

8 files changed

+108
-39
lines changed

crates/assembler/src/pipeline/eulertigs.rs

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -15,7 +15,7 @@ use io::concurrent::{
1515
},
1616
temp_reads::creads_utils::CompressedReadsBucketDataSerializer,
1717
};
18-
use parallel_processor::buckets::CheckpointStrategy;
18+
use parallel_processor::buckets::readers::async_binary_reader::AllowedCheckpointStrategy;
1919
use parallel_processor::phase_times_monitor::PHASES_TIMES_MONITOR;
2020
use parallel_processor::{
2121
buckets::readers::compressed_binary_reader::CompressedBinaryReader, memory_fs::RemoveFileMode,
@@ -395,7 +395,7 @@ pub fn build_eulertigs<
395395
(),
396396
SequenceAbundanceType,
397397
)>::new_temp_buffer(),
398-
CheckpointStrategy::Decompress,
398+
AllowedCheckpointStrategy::DecompressOnly,
399399
|(_, _, (_index, mut color, _, abundance), read, _): (
400400
_,
401401
_,
@@ -543,7 +543,7 @@ pub fn build_eulertigs<
543543
(),
544544
SequenceAbundanceType,
545545
)>::new_temp_buffer(),
546-
CheckpointStrategy::Decompress,
546+
AllowedCheckpointStrategy::DecompressOnly,
547547
|(_, _, (_index, color, _, mut _abundance), read, _): (
548548
_,
549549
_,

crates/assembler/src/pipeline/links_compaction.rs

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -7,11 +7,12 @@ use io::structs::unitig_link::{UnitigFlags, UnitigIndex, UnitigLink, UnitigLinkS
77
use nightly_quirks::slice_group_by::SliceGroupBy;
88
use parallel_processor::buckets::bucket_writer::BucketItemSerializer;
99
use parallel_processor::buckets::concurrent::{BucketsThreadBuffer, BucketsThreadDispatcher};
10+
use parallel_processor::buckets::readers::async_binary_reader::AllowedCheckpointStrategy;
1011
use parallel_processor::buckets::readers::generic_binary_reader::ChunkReader;
1112
use parallel_processor::buckets::readers::lock_free_binary_reader::LockFreeBinaryReader;
1213
use parallel_processor::buckets::single::SingleBucketThreadDispatcher;
1314
use parallel_processor::buckets::writers::lock_free_binary_writer::LockFreeBinaryWriter;
14-
use parallel_processor::buckets::{CheckpointStrategy, MultiThreadBuckets, SingleBucket};
15+
use parallel_processor::buckets::{MultiThreadBuckets, SingleBucket};
1516
use parallel_processor::fast_smart_bucket_sort::{fast_smart_radix_sort, SortKey};
1617
use parallel_processor::memory_fs::RemoveFileMode;
1718
use parallel_processor::utils::scoped_thread_local::ScopedThreadLocal;
@@ -90,7 +91,7 @@ pub fn links_compaction(
9091
let mut deserializer = UnitigLinkSerializer::new();
9192

9293
while let Some(checkpoint) =
93-
file_reader.get_read_parallel_stream(CheckpointStrategy::Decompress)
94+
file_reader.get_read_parallel_stream(AllowedCheckpointStrategy::DecompressOnly)
9495
{
9596
match checkpoint {
9697
ChunkReader::Reader(mut stream, _) => {

crates/assembler/src/pipeline/maximal_unitig_links.rs

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -32,10 +32,11 @@ use io::concurrent::temp_reads::creads_utils::{
3232
use io::concurrent::temp_reads::extra_data::SequenceExtraDataTempBufferManagement;
3333
use nightly_quirks::slice_group_by::SliceGroupBy;
3434
use parallel_processor::buckets::concurrent::{BucketsThreadBuffer, BucketsThreadDispatcher};
35+
use parallel_processor::buckets::readers::async_binary_reader::AllowedCheckpointStrategy;
3536
use parallel_processor::buckets::readers::compressed_binary_reader::CompressedBinaryReader;
3637
use parallel_processor::buckets::readers::BucketReader;
3738
use parallel_processor::buckets::writers::compressed_binary_writer::CompressedBinaryWriter;
38-
use parallel_processor::buckets::{CheckpointStrategy, MultiThreadBuckets};
39+
use parallel_processor::buckets::MultiThreadBuckets;
3940
use parallel_processor::fast_smart_bucket_sort::fast_smart_radix_sort;
4041
use parallel_processor::memory_fs::RemoveFileMode;
4142
use parallel_processor::phase_times_monitor::PHASES_TIMES_MONITOR;
@@ -120,7 +121,7 @@ pub fn build_maximal_unitigs_links<
120121
(),
121122
SequenceAbundanceType,
122123
)>::new_temp_buffer(),
123-
CheckpointStrategy::Decompress,
124+
AllowedCheckpointStrategy::DecompressOnly,
124125
|(_, _, (index, _, _, _), read, _): (
125126
_,
126127
_,
@@ -385,7 +386,7 @@ pub fn build_maximal_unitigs_links<
385386
(),
386387
SequenceAbundanceType,
387388
)>::new_temp_buffer(),
388-
CheckpointStrategy::Decompress,
389+
AllowedCheckpointStrategy::DecompressOnly,
389390
|(_, _, (index, color, _, _abundance), read, _): (
390391
_,
391392
_,

crates/config/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -73,6 +73,7 @@ pub const READ_FLAG_INCL_END: u8 = 1 << 1;
7373
pub const COLORS_SINGLE_BATCH_SIZE: u64 = 20000;
7474
pub const QUERIES_COUNT_MIN_BATCH: u64 = 1000;
7575

76+
pub const DEFAULT_COMPACTION_MAP_SUBBUCKET_ELEMENTS: usize = 512;
7677
pub const MAX_COMPACTION_MAP_SUBBUCKET_ELEMENTS: usize = 1024 * 4;
7778

7879
pub struct SwapPriority {}

crates/io/src/concurrent/temp_reads/creads_utils.rs

Lines changed: 41 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -122,9 +122,10 @@ pub struct CompressedReadsBucketDataSerializer<
122122
_phantom: PhantomData<(FlagsCount, BucketMode, MultiplicityMode)>,
123123
}
124124

125-
#[derive(Serialize, Deserialize)]
125+
#[derive(Serialize, Deserialize, Clone, Copy)]
126126
pub struct ReadsCheckpointData {
127127
pub target_subbucket: BucketIndexType,
128+
pub sequences_count: usize,
128129
}
129130

130131
impl<
@@ -272,6 +273,8 @@ pub mod helpers {
272273
$reader:expr,
273274
$read_thread:expr,
274275
$with_multiplicity:expr,
276+
$allowed_passtrough:expr,
277+
|$passtrough_info:ident| $p:expr,
275278
|$checkpoint_data:ident| $c:expr,
276279
|$data: ident, $extra_buffer: ident| $f:expr
277280
);
@@ -281,6 +284,7 @@ pub mod helpers {
281284
BucketModeOption, CompressedReadsBucketDataSerializer, NoMultiplicity,
282285
WithMultiplicity,
283286
};
287+
use parallel_processor::buckets::readers::async_binary_reader::AsyncBinaryReaderIteratorData;
284288

285289
let reader = $reader;
286290
let read_thread = $read_thread;
@@ -293,12 +297,25 @@ pub mod helpers {
293297
$FlagsCount,
294298
$BucketMode,
295299
WithMultiplicity,
296-
>, false>(read_thread, Vec::new(), <$E>::new_temp_buffer());
297-
while let Some((items, $checkpoint_data)) = items.get_next_checkpoint() {
298-
$c
299-
while let Some(($data, $extra_buffer)) = items.next() {
300-
$f
300+
>>(read_thread, Vec::new(), <$E>::new_temp_buffer(), $allowed_passtrough);
301+
while let Some(checkpoint) = items.get_next_checkpoint_extended() {
302+
match checkpoint {
303+
AsyncBinaryReaderIteratorData::Stream(items, $checkpoint_data) => {
304+
$c
305+
while let Some(($data, $extra_buffer)) = items.next() {
306+
$f
307+
}
308+
},
309+
AsyncBinaryReaderIteratorData::Passtrough {
310+
file_range: $passtrough_info,
311+
checkpoint_data: $checkpoint_data,
312+
} => {
313+
#[allow(unused_assignments)]
314+
$c
315+
$p
316+
}
301317
}
318+
302319
}
303320
} else {
304321
let mut items =
@@ -307,11 +324,25 @@ pub mod helpers {
307324
$FlagsCount,
308325
$BucketMode,
309326
NoMultiplicity,
310-
>, false>(read_thread, Vec::new(), <$E>::new_temp_buffer());
311-
while let Some((items, _)) = items.get_next_checkpoint() {
312-
while let Some(($data, $extra_buffer)) = items.next() {
313-
$f
327+
>>(read_thread, Vec::new(), <$E>::new_temp_buffer(), $allowed_passtrough);
328+
while let Some(checkpoint) = items.get_next_checkpoint_extended() {
329+
match checkpoint {
330+
AsyncBinaryReaderIteratorData::Stream(items, $checkpoint_data) => {
331+
$c
332+
while let Some(($data, $extra_buffer)) = items.next() {
333+
$f
334+
}
335+
},
336+
AsyncBinaryReaderIteratorData::Passtrough {
337+
file_range: $passtrough_info,
338+
checkpoint_data: $checkpoint_data,
339+
} => {
340+
#[allow(unused_assignments)]
341+
$c
342+
$p
343+
}
314344
}
345+
315346
}
316347
}
317348
};

crates/kmers_transform/src/debug_bucket_stats.rs

Lines changed: 7 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -12,7 +12,7 @@ use io::concurrent::temp_reads::creads_utils::{
1212
BucketModeFromBoolean, CompressedReadsBucketDataSerializer, NoMultiplicity,
1313
};
1414
use parallel_processor::buckets::readers::async_binary_reader::{
15-
AsyncBinaryReader, AsyncReaderThread,
15+
AllowedCheckpointStrategy, AsyncBinaryReader, AsyncReaderThread,
1616
};
1717
use parallel_processor::memory_fs::RemoveFileMode;
1818
use std::collections::HashSet;
@@ -73,7 +73,12 @@ pub fn compute_stats_for_bucket<MH: HashFunctionFactory>(
7373
typenum::U2,
7474
BucketModeFromBoolean<USE_SECOND_BUCKET>,
7575
NoMultiplicity,
76-
>, false>(reader_thread.clone(), Vec::new(), ());
76+
>>(
77+
reader_thread.clone(),
78+
Vec::new(),
79+
(),
80+
AllowedCheckpointStrategy::DecompressOnly,
81+
);
7782

7883
let mut total_counters = vec![0; second_buckets_max];
7984

crates/kmers_transform/src/reader.rs

Lines changed: 32 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -25,7 +25,7 @@ use minimizer_bucketing::resplit_bucket::RewriteBucketCompute;
2525
use minimizer_bucketing::{MinimizerBucketMode, MinimizerBucketingExecutorFactory};
2626
use parallel_processor::buckets::bucket_writer::BucketItemSerializer;
2727
use parallel_processor::buckets::readers::async_binary_reader::{
28-
AsyncBinaryReader, AsyncReaderThread,
28+
AllowedCheckpointStrategy, AsyncBinaryReader, AsyncReaderThread,
2929
};
3030
use parallel_processor::buckets::writers::compressed_binary_writer::CompressedBinaryWriter;
3131
use parallel_processor::buckets::LockFreeBucket;
@@ -118,9 +118,9 @@ enum AddressMode {
118118
struct BucketsInfo {
119119
readers: Vec<AsyncBinaryReader>,
120120
concurrency: usize,
121-
addresses: Vec<AddressMode>,
121+
addresses: Arc<Vec<AddressMode>>,
122122
register_addresses: Vec<ExecutorAddress>,
123-
buckets_remapping: Vec<usize>,
123+
buckets_remapping: Arc<Vec<usize>>,
124124
second_buckets_log_max: usize,
125125
total_file_size: usize,
126126
used_hash_bits: usize,
@@ -331,9 +331,9 @@ impl<F: KmersTransformExecutorFactory> KmersTransformReader<F> {
331331
BucketsInfo {
332332
readers,
333333
concurrency,
334-
addresses,
334+
addresses: Arc::new(addresses),
335335
register_addresses,
336-
buckets_remapping,
336+
buckets_remapping: Arc::new(buckets_remapping),
337337
second_buckets_log_max,
338338
total_file_size,
339339
used_hash_bits: file.used_hash_bits,
@@ -426,7 +426,10 @@ impl<F: KmersTransformExecutorFactory> KmersTransformReader<F> {
426426
}
427427

428428
let data_format: MinimizerBucketMode = reader.get_data_format_info().unwrap();
429-
let mut checkpoint_rewrite_bucket = None;
429+
let mut checkpoint_rewrite_info;
430+
431+
let addresses_passtrough_check = bucket_info.addresses.clone();
432+
let buckets_remapping_passtrough_check = bucket_info.buckets_remapping.clone();
430433

431434
creads_helper! {
432435
helper_read_bucket_with_opt_multiplicity::<
@@ -437,12 +440,29 @@ impl<F: KmersTransformExecutorFactory> KmersTransformReader<F> {
437440
reader,
438441
async_reader_thread.clone(),
439442
matches!(data_format, MinimizerBucketMode::Compacted),
440-
|checkpoint_data| { checkpoint_rewrite_bucket = checkpoint_data.map(|d| d.target_subbucket); } ,
443+
AllowedCheckpointStrategy::AllowPasstrough(Arc::new(move |data| {
444+
if let Some(data) = data {
445+
matches!(&addresses_passtrough_check[buckets_remapping_passtrough_check[data.target_subbucket as usize]], AddressMode::Rewrite(_, _, _))
446+
} else {
447+
false
448+
}
449+
})),
450+
|passtrough| {
451+
let bucket = checkpoint_rewrite_info.unwrap().target_subbucket as usize;
452+
let sequences_count = checkpoint_rewrite_info.unwrap().sequences_count;
453+
if let AddressMode::Rewrite(writer, out_sequences_count, _) = &bucket_info.addresses[bucket_info.buckets_remapping[bucket]] {
454+
out_sequences_count.fetch_add(sequences_count as u64, Ordering::Relaxed);
455+
writer.set_checkpoint_data::<()>(None, Some(passtrough));
456+
} else {
457+
unreachable!();
458+
}
459+
},
460+
|checkpoint_data| { checkpoint_rewrite_info = checkpoint_data; } ,
441461
|read_info, extra_buffer| {
442462
let bucket = if has_single_addr {
443463
0
444464
} else {
445-
let orig_bucket = checkpoint_rewrite_bucket
465+
let orig_bucket = checkpoint_rewrite_info.map(|i| i.target_subbucket)
446466
.unwrap_or_else(|| F::PreprocessorType::get_rewrite_bucket(
447467
global_extra_data.get_k(),
448468
global_extra_data.get_m(),
@@ -620,7 +640,10 @@ impl<F: KmersTransformExecutorFactory> AsyncExecutor for KmersTransformReader<F>
620640
spawner.executors_await().await;
621641
drop(spawner);
622642

623-
for addr in buckets_info.addresses {
643+
for addr in Arc::try_unwrap(buckets_info.addresses)
644+
.map_err(|_| ())
645+
.unwrap()
646+
{
624647
if let AddressMode::Rewrite(writer, seq_count, init_data) = addr {
625648
let new_bucket_address =
626649
KmersTransformReader::<F>::generate_new_address(());

crates/minimizer_bucketing/src/compactor.rs

Lines changed: 17 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -17,9 +17,10 @@ use crate::{
1717
use colors::non_colored::NonColoredManager;
1818
use config::{
1919
get_compression_level_info, get_memory_mode, BucketIndexType, MultiplicityCounterType,
20-
SwapPriority, DEFAULT_OUTPUT_BUFFER_SIZE, DEFAULT_PREFETCH_AMOUNT, KEEP_FILES,
21-
MAXIMUM_SECOND_BUCKETS_COUNT, MAX_COMPACTION_MAP_SUBBUCKET_ELEMENTS,
22-
MINIMIZER_BUCKETS_CHECKPOINT_SIZE, WORKERS_PRIORITY_HIGH,
20+
SwapPriority, DEFAULT_COMPACTION_MAP_SUBBUCKET_ELEMENTS, DEFAULT_OUTPUT_BUFFER_SIZE,
21+
DEFAULT_PREFETCH_AMOUNT, KEEP_FILES, MAXIMUM_SECOND_BUCKETS_COUNT,
22+
MAX_COMPACTION_MAP_SUBBUCKET_ELEMENTS, MINIMIZER_BUCKETS_CHECKPOINT_SIZE,
23+
WORKERS_PRIORITY_HIGH,
2324
};
2425
use io::{
2526
compressed_read::CompressedReadIndipendent,
@@ -36,7 +37,10 @@ use io::{
3637
creads_helper,
3738
};
3839
use parallel_processor::{
39-
buckets::{bucket_writer::BucketItemSerializer, CheckpointStrategy},
40+
buckets::{
41+
bucket_writer::BucketItemSerializer,
42+
readers::async_binary_reader::AllowedCheckpointStrategy,
43+
},
4044
memory_fs::MemoryFs,
4145
};
4246
use parallel_processor::{
@@ -136,7 +140,7 @@ impl<E: MinimizerBucketingExecutorFactory + Sync + Send + 'static> AsyncExecutor
136140
> = (0..MAXIMUM_SECOND_BUCKETS_COUNT)
137141
.map(|_| {
138142
FxHashMap::with_capacity_and_hasher(
139-
MAX_COMPACTION_MAP_SUBBUCKET_ELEMENTS,
143+
DEFAULT_COMPACTION_MAP_SUBBUCKET_ELEMENTS,
140144
FxBuildHasher,
141145
)
142146
})
@@ -237,7 +241,7 @@ impl<E: MinimizerBucketingExecutorFactory + Sync + Send + 'static> AsyncExecutor
237241
processed_buckets += 1;
238242

239243
let format_data: MinimizerBucketMode = reader.get_data_format_info().unwrap();
240-
let mut checkpoint_rewrite_bucket = None;
244+
let mut checkpoint_rewrite_bucket;
241245
creads_helper! {
242246
helper_read_bucket_with_opt_multiplicity::<
243247
E::ExtraData,
@@ -247,6 +251,8 @@ impl<E: MinimizerBucketingExecutorFactory + Sync + Send + 'static> AsyncExecutor
247251
&reader,
248252
read_thread.clone(),
249253
matches!(format_data, MinimizerBucketMode::Compacted),
254+
AllowedCheckpointStrategy::DecompressOnly,
255+
|_passtrough| unreachable!(),
250256
|checkpoint_data| { checkpoint_rewrite_bucket = checkpoint_data.map(|d| d.target_subbucket); } ,
251257
|data, _extra_buffer| {
252258

@@ -330,10 +336,11 @@ impl<E: MinimizerBucketingExecutorFactory + Sync + Send + 'static> AsyncExecutor
330336
}
331337

332338
new_bucket.set_checkpoint_data(
333-
&ReadsCheckpointData {
339+
Some(&ReadsCheckpointData {
334340
target_subbucket: rewrite_bucket as BucketIndexType,
335-
},
336-
CheckpointStrategy::Decompress,
341+
sequences_count: super_kmers_hashmap.len(),
342+
}),
343+
None,
337344
);
338345

339346
for (read, (flags, multiplicity)) in super_kmers_hashmap.drain() {
@@ -360,7 +367,7 @@ impl<E: MinimizerBucketingExecutorFactory + Sync + Send + 'static> AsyncExecutor
360367
// Reset the hashmap capacity
361368
if super_kmers_hashmap.capacity() > MAX_COMPACTION_MAP_SUBBUCKET_ELEMENTS {
362369
*super_kmers_hashmap = FxHashMap::with_capacity_and_hasher(
363-
MAX_COMPACTION_MAP_SUBBUCKET_ELEMENTS,
370+
DEFAULT_COMPACTION_MAP_SUBBUCKET_ELEMENTS,
364371
FxBuildHasher,
365372
);
366373
}

0 commit comments

Comments
 (0)