Skip to content

Commit 04f0378

Browse files
committed
Improve compaction heuristic
1 parent bdcbc3f commit 04f0378

File tree

2 files changed

+44
-4
lines changed

2 files changed

+44
-4
lines changed

crates/config/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ pub const READ_FLAG_INCL_END: u8 = 1 << 1;
7373
pub const COLORS_SINGLE_BATCH_SIZE: u64 = 20000;
7474
pub const QUERIES_COUNT_MIN_BATCH: u64 = 1000;
7575

76+
pub const MAX_COMPACTION_MAP_SUBBUCKET_ELEMENTS: usize = 1024 * 4;
77+
7678
pub struct SwapPriority {}
7779
#[allow(non_upper_case_globals)]
7880
impl SwapPriority {

crates/minimizer_bucketing/src/compactor.rs

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ use colors::non_colored::NonColoredManager;
1818
use config::{
1919
get_compression_level_info, get_memory_mode, BucketIndexType, MultiplicityCounterType,
2020
SwapPriority, DEFAULT_OUTPUT_BUFFER_SIZE, DEFAULT_PREFETCH_AMOUNT, KEEP_FILES,
21-
MAXIMUM_SECOND_BUCKETS_COUNT, MINIMIZER_BUCKETS_CHECKPOINT_SIZE, WORKERS_PRIORITY_HIGH,
21+
MAXIMUM_SECOND_BUCKETS_COUNT, MAX_COMPACTION_MAP_SUBBUCKET_ELEMENTS,
22+
MINIMIZER_BUCKETS_CHECKPOINT_SIZE, WORKERS_PRIORITY_HIGH,
2223
};
2324
use io::{
2425
compressed_read::CompressedReadIndipendent,
@@ -54,7 +55,7 @@ use parallel_processor::{
5455
declare_counter_i64,
5556
},
5657
};
57-
use rustc_hash::FxHashMap;
58+
use rustc_hash::{FxBuildHasher, FxHashMap};
5859
use utils::track;
5960

6061
pub struct MinimizerBucketingCompactor<E: MinimizerBucketingExecutorFactory + Sync + Send + 'static>
@@ -127,10 +128,18 @@ impl<E: MinimizerBucketingExecutorFactory + Sync + Send + 'static> AsyncExecutor
127128

128129
static COMPACTED_INDEX: AtomicUsize = AtomicUsize::new(0);
129130

131+
const MAXIMUM_SEQUENCES: usize =
132+
MAXIMUM_SECOND_BUCKETS_COUNT * MAX_COMPACTION_MAP_SUBBUCKET_ELEMENTS;
133+
130134
let mut super_kmers_hashmap: Vec<
131135
FxHashMap<SuperKmerEntry, (u8, MultiplicityCounterType)>,
132136
> = (0..MAXIMUM_SECOND_BUCKETS_COUNT)
133-
.map(|_| FxHashMap::default())
137+
.map(|_| {
138+
FxHashMap::with_capacity_and_hasher(
139+
MAX_COMPACTION_MAP_SUBBUCKET_ELEMENTS,
140+
FxBuildHasher,
141+
)
142+
})
134143
.collect();
135144

136145
while let Ok((_, init_data)) = track!(
@@ -165,6 +174,10 @@ impl<E: MinimizerBucketingExecutorFactory + Sync + Send + 'static> AsyncExecutor
165174
let mut last = buckets[bucket_index].chunks.pop().unwrap();
166175
let mut last_size = MemoryFs::get_file_size(&last).unwrap();
167176

177+
// Choose buckets until one of two conditions is met:
178+
// 1. The next bucket would add up to a size greater than half of the total size
179+
// 2. Two buckets were already selected and the number of sequences is greater than the maximum amount
180+
// The second condition is checked below, after the processing of each bucket
168181
while chosen_size + last_size < total_size / 2 {
169182
chosen_size += last_size;
170183
chosen_buckets.push(last);
@@ -186,6 +199,8 @@ impl<E: MinimizerBucketingExecutorFactory + Sync + Send + 'static> AsyncExecutor
186199

187200
drop(buckets);
188201

202+
chosen_buckets.reverse();
203+
189204
let new_path = global_params.output_path.join(format!(
190205
"compacted-{}.dat",
191206
COMPACTED_INDEX.fetch_add(1, Ordering::Relaxed)
@@ -205,7 +220,10 @@ impl<E: MinimizerBucketingExecutorFactory + Sync + Send + 'static> AsyncExecutor
205220
MAXIMUM_SECOND_BUCKETS_COUNT.ilog2() as usize,
206221
);
207222

208-
for bucket_path in chosen_buckets {
223+
let mut total_sequences = 0;
224+
let mut processed_buckets = 0;
225+
226+
while let Some(bucket_path) = chosen_buckets.pop() {
209227
// Reading the buckets
210228
let reader = AsyncBinaryReader::new(
211229
&bucket_path,
@@ -216,6 +234,8 @@ impl<E: MinimizerBucketingExecutorFactory + Sync + Send + 'static> AsyncExecutor
216234
DEFAULT_PREFETCH_AMOUNT,
217235
);
218236

237+
processed_buckets += 1;
238+
219239
let format_data: MinimizerBucketMode = reader.get_data_format_info().unwrap();
220240
creads_helper! {
221241
helper_read_bucket_with_opt_multiplicity::<
@@ -254,10 +274,16 @@ impl<E: MinimizerBucketingExecutorFactory + Sync + Send + 'static> AsyncExecutor
254274
SuperKmerEntry(&kmers_storage as *const _, new_read),
255275
(flags, multiplicity),
256276
);
277+
total_sequences += 1;
257278
}
258279
}
259280
);
260281
}
282+
283+
// Stop processing further buckets once the number of sequences exceeds the maximum allowed
284+
if processed_buckets >= 2 && total_sequences > MAXIMUM_SEQUENCES {
285+
break;
286+
}
261287
}
262288

263289
let new_bucket = CompressedBinaryWriter::new(
@@ -325,6 +351,14 @@ impl<E: MinimizerBucketingExecutorFactory + Sync + Send + 'static> AsyncExecutor
325351
buffer.clear();
326352
}
327353
}
354+
355+
// Reset the hashmap capacity
356+
if super_kmers_hashmap.capacity() > MAX_COMPACTION_MAP_SUBBUCKET_ELEMENTS {
357+
*super_kmers_hashmap = FxHashMap::with_capacity_and_hasher(
358+
MAX_COMPACTION_MAP_SUBBUCKET_ELEMENTS,
359+
FxBuildHasher,
360+
);
361+
}
328362
}
329363

330364
if buffer.len() > 0 {
@@ -339,6 +373,10 @@ impl<E: MinimizerBucketingExecutorFactory + Sync + Send + 'static> AsyncExecutor
339373
buckets[bucket_index].was_compacted = true;
340374
buckets[bucket_index].chunks.push(new_path);
341375

376+
for unused_bucket in chosen_buckets {
377+
buckets[bucket_index].chunks.push(unused_bucket);
378+
}
379+
342380
for (counter, global_counter) in sequences_deltas
343381
.iter()
344382
.zip(global_params.common.global_counters[bucket_index].iter())

0 commit comments

Comments
 (0)