Skip to content

Commit 7ff13f0

Browse files
committed
Add checkpoint bucket usage + compactors early termination
1 parent 04f0378 commit 7ff13f0

File tree

4 files changed

+27
-7
lines changed

4 files changed

+27
-7
lines changed

crates/io/src/concurrent/temp_reads/creads_utils.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,7 @@ pub mod helpers {
272272
$reader:expr,
273273
$read_thread:expr,
274274
$with_multiplicity:expr,
275+
|$checkpoint_data:ident| $c:expr,
275276
|$data: ident, $extra_buffer: ident| $f:expr
276277
);
277278

@@ -293,7 +294,8 @@ pub mod helpers {
293294
$BucketMode,
294295
WithMultiplicity,
295296
>, false>(read_thread, Vec::new(), <$E>::new_temp_buffer());
296-
while let Some((items, _)) = items.get_next_checkpoint() {
297+
while let Some((items, $checkpoint_data)) = items.get_next_checkpoint() {
298+
$c
297299
while let Some(($data, $extra_buffer)) = items.next() {
298300
$f
299301
}

crates/kmers_transform/src/reader.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -426,6 +426,7 @@ impl<F: KmersTransformExecutorFactory> KmersTransformReader<F> {
426426
}
427427

428428
let data_format: MinimizerBucketMode = reader.get_data_format_info().unwrap();
429+
let mut checkpoint_rewrite_bucket = None;
429430

430431
creads_helper! {
431432
helper_read_bucket_with_opt_multiplicity::<
@@ -436,17 +437,19 @@ impl<F: KmersTransformExecutorFactory> KmersTransformReader<F> {
436437
reader,
437438
async_reader_thread.clone(),
438439
matches!(data_format, MinimizerBucketMode::Compacted),
440+
|checkpoint_data| { checkpoint_rewrite_bucket = checkpoint_data.map(|d| d.target_subbucket); } ,
439441
|read_info, extra_buffer| {
440442
let bucket = if has_single_addr {
441443
0
442444
} else {
443-
let orig_bucket = F::PreprocessorType::get_rewrite_bucket(
445+
let orig_bucket = checkpoint_rewrite_bucket
446+
.unwrap_or_else(|| F::PreprocessorType::get_rewrite_bucket(
444447
global_extra_data.get_k(),
445448
global_extra_data.get_m(),
446449
&read_info,
447450
bucket_info.used_hash_bits,
448451
bucket_info.second_buckets_log_max,
449-
) as usize;
452+
)) as usize;
450453

451454
bucket_info.buckets_remapping[orig_bucket]
452455
};

crates/minimizer_bucketing/src/compactor.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,7 @@ impl<E: MinimizerBucketingExecutorFactory + Sync + Send + 'static> AsyncExecutor
237237
processed_buckets += 1;
238238

239239
let format_data: MinimizerBucketMode = reader.get_data_format_info().unwrap();
240+
let mut checkpoint_rewrite_bucket = None;
240241
creads_helper! {
241242
helper_read_bucket_with_opt_multiplicity::<
242243
E::ExtraData,
@@ -246,14 +247,16 @@ impl<E: MinimizerBucketingExecutorFactory + Sync + Send + 'static> AsyncExecutor
246247
&reader,
247248
read_thread.clone(),
248249
matches!(format_data, MinimizerBucketMode::Compacted),
250+
|checkpoint_data| { checkpoint_rewrite_bucket = checkpoint_data.map(|d| d.target_subbucket); } ,
249251
|data, _extra_buffer| {
250252

251-
let rewrite_bucket = E::RewriteBucketCompute::get_rewrite_bucket(global_params.common.k,
253+
let rewrite_bucket = checkpoint_rewrite_bucket
254+
.unwrap_or_else(|| E::RewriteBucketCompute::get_rewrite_bucket(global_params.common.k,
252255
global_params.common.m,
253256
&data,
254257
used_hash_bits,
255258
second_buckets_log_max,
256-
);
259+
));
257260
sequences_deltas[rewrite_bucket as usize] += 1;
258261

259262
let (flags, _, _extra, read, multiplicity) = data;
@@ -281,7 +284,9 @@ impl<E: MinimizerBucketingExecutorFactory + Sync + Send + 'static> AsyncExecutor
281284
}
282285

283286
// Stop processing further buckets once the phase is no longer active, or when at least two buckets have been processed and the total number of sequences exceeds the allowed maximum
284-
if processed_buckets >= 2 && total_sequences > MAXIMUM_SEQUENCES {
287+
if !global_params.common.is_active.load(Ordering::Relaxed)
288+
|| processed_buckets >= 2 && total_sequences > MAXIMUM_SEQUENCES
289+
{
285290
break;
286291
}
287292
}

crates/minimizer_bucketing/src/lib.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ use std::marker::PhantomData;
5252
use std::ops::Deref;
5353
use std::ops::Range;
5454
use std::path::{Path, PathBuf};
55-
use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
55+
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering};
5656
use std::sync::Arc;
5757

5858
#[derive(Serialize, Deserialize, Clone, Copy, Debug)]
@@ -167,6 +167,7 @@ pub struct MinimizerBucketingCommonData<GlobalData> {
167167
pub global_counters: Vec<Vec<AtomicU64>>,
168168
pub compaction_offsets: Vec<AtomicI64>,
169169
pub global_data: GlobalData,
170+
pub is_active: AtomicBool,
170171
}
171172

172173
impl<GlobalData> MinimizerBucketingCommonData<GlobalData> {
@@ -197,6 +198,7 @@ impl<GlobalData> MinimizerBucketingCommonData<GlobalData> {
197198
.collect(),
198199
compaction_offsets: (0..buckets_count).map(|_| AtomicI64::new(0)).collect(),
199200
global_data,
201+
is_active: AtomicBool::new(true),
200202
}
201203
}
202204
}
@@ -607,6 +609,14 @@ impl GenericMinimizerBucketing {
607609
global_context.executor_group_address.write().take();
608610

609611
execution_context.wait_for_completion(writer_executors);
612+
613+
// Let compactors know that the phase is finishing,
614+
// so they can shortcut and avoid processing other buckets
615+
global_context
616+
.common
617+
.is_active
618+
.store(false, Ordering::Relaxed);
619+
610620
execution_context.wait_for_completion(compactor_executors);
611621

612622
execution_context.join_all();

0 commit comments

Comments
 (0)