Skip to content

Commit 2b2238d

Browse files
committed
Fix bucket counters + implement rewrite during in compaction
1 parent ee3ae4c commit 2b2238d

File tree

1 file changed

+63
-19
lines changed

1 file changed

+63
-19
lines changed

crates/minimizer_bucketing/src/compactor.rs

Lines changed: 63 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,15 @@ use std::{
99
sync::atomic::{AtomicUsize, Ordering},
1010
};
1111

12+
use crate::resplit_bucket::RewriteBucketCompute;
1213
use crate::{
1314
queue_data::MinimizerBucketingQueueData, MinimizerBucketMode,
1415
MinimizerBucketingExecutionContext, MinimizerBucketingExecutorFactory,
1516
};
1617
use colors::non_colored::NonColoredManager;
1718
use config::{
1819
get_compression_level_info, get_memory_mode, MultiplicityCounterType, SwapPriority,
19-
DEFAULT_OUTPUT_BUFFER_SIZE, DEFAULT_PREFETCH_AMOUNT, KEEP_FILES,
20+
DEFAULT_OUTPUT_BUFFER_SIZE, DEFAULT_PREFETCH_AMOUNT, KEEP_FILES, MAXIMUM_SECOND_BUCKETS_COUNT,
2021
MINIMIZER_BUCKETS_CHECKPOINT_SIZE, WORKERS_PRIORITY_HIGH,
2122
};
2223
use io::{
@@ -174,20 +175,29 @@ impl<E: MinimizerBucketingExecutorFactory + Sync + Send + 'static> AsyncExecutor
174175

175176
drop(buckets);
176177

177-
// println!("Compacting bucket {} with total total size {} => chosen {} chunks with total size {}", bucket_index, total_size, chosen_buckets.len(), chosen_size);
178-
179178
let new_path = global_params.output_path.join(format!(
180179
"compacted-{}.dat",
181180
COMPACTED_INDEX.fetch_add(1, Ordering::Relaxed)
182181
));
183182

184-
let mut super_kmers_hashmap: FxHashMap<
185-
SuperKmerEntry,
186-
(u8, MultiplicityCounterType),
187-
> = FxHashMap::default();
183+
let mut super_kmers_hashmap: Vec<
184+
FxHashMap<SuperKmerEntry, (u8, MultiplicityCounterType)>,
185+
> = (0..MAXIMUM_SECOND_BUCKETS_COUNT)
186+
.map(|_| FxHashMap::default())
187+
.collect();
188+
// .try_into()
189+
// .unwrap();
188190
let mut kmers_storage = Vec::with_capacity(DEFAULT_OUTPUT_BUFFER_SIZE);
189191

190-
let mut sequences_delta = 0;
192+
let mut sequences_deltas = vec![0i64; MAXIMUM_SECOND_BUCKETS_COUNT];
193+
194+
let used_hash_bits = global_params.buckets.count().ilog2() as usize;
195+
let second_buckets_log_max = std::cmp::min(
196+
global_params.common.global_counters[bucket_index]
197+
.len()
198+
.ilog2() as usize,
199+
MAXIMUM_SECOND_BUCKETS_COUNT.ilog2() as usize,
200+
);
191201

192202
for bucket_path in chosen_buckets {
193203
// Reading the buckets
@@ -211,9 +221,18 @@ impl<E: MinimizerBucketingExecutorFactory + Sync + Send + 'static> AsyncExecutor
211221
read_thread.clone(),
212222
matches!(format_data, MinimizerBucketMode::Compacted),
213223
|data, _extra_buffer| {
224+
225+
let rewrite_bucket = E::RewriteBucketCompute::get_rewrite_bucket(global_params.common.k,
226+
global_params.common.m,
227+
&data,
228+
used_hash_bits,
229+
second_buckets_log_max,
230+
);
231+
sequences_deltas[rewrite_bucket as usize] += 1;
232+
214233
let (flags, _, _extra, read, multiplicity) = data;
215234

216-
sequences_delta -= 1;
235+
let super_kmers_hashmap = &mut super_kmers_hashmap[rewrite_bucket as usize];
217236

218237
if let Some(entry) = super_kmers_hashmap.get_mut(
219238
read.get_borrowable(),
@@ -260,23 +279,39 @@ impl<E: MinimizerBucketingExecutorFactory + Sync + Send + 'static> AsyncExecutor
260279
<NonColoredManager as SequenceExtraDataTempBufferManagement>::new_temp_buffer();
261280
let empty_extra = NonColoredManager::default();
262281

263-
for (read, (flags, multiplicity)) in super_kmers_hashmap {
264-
let read = read.get_read();
265-
sequences_delta += 1;
282+
for (rewrite_bucket, super_kmers_hashmap) in
283+
super_kmers_hashmap.into_iter().enumerate()
284+
{
285+
// Flush the buffer before changing checkpoint
286+
if buffer.len() > 0 {
287+
new_bucket.write_data(&buffer);
288+
buffer.clear();
289+
}
290+
// new_bucket.set_checkpoint_data(
291+
// &ReadsCheckpointData {
292+
// target_subbucket: rewrite_bucket as BucketIndexType,
293+
// },
294+
// CheckpointStrategy::Passtrough,
295+
// );
296+
for (read, (flags, multiplicity)) in super_kmers_hashmap {
297+
let read = read.get_read();
298+
sequences_deltas[rewrite_bucket as usize] -= 1;
266299

267-
for _ in 0..multiplicity {
268300
serializer.write_to(
269301
&CompressedReadsBucketData::new_packed_with_multiplicity(
270-
read, flags, 0, 1,
302+
read,
303+
flags,
304+
0,
305+
multiplicity,
271306
),
272307
&mut buffer,
273308
&empty_extra,
274309
&out_extra_buffer,
275310
);
276-
}
277-
if buffer.len() > DEFAULT_OUTPUT_BUFFER_SIZE {
278-
new_bucket.write_data(&buffer);
279-
buffer.clear();
311+
if buffer.len() > DEFAULT_OUTPUT_BUFFER_SIZE {
312+
new_bucket.write_data(&buffer);
313+
buffer.clear();
314+
}
280315
}
281316
}
282317

@@ -291,8 +326,17 @@ impl<E: MinimizerBucketingExecutorFactory + Sync + Send + 'static> AsyncExecutor
291326
let mut buckets = global_params.buckets.get_stored_buckets().lock();
292327
buckets[bucket_index].was_compacted = true;
293328
buckets[bucket_index].chunks.push(new_path);
329+
330+
for (counter, global_counter) in sequences_deltas
331+
.iter()
332+
.zip(global_params.common.global_counters[bucket_index].iter())
333+
{
334+
assert!(*counter >= 0);
335+
global_counter.fetch_sub(*counter as u64, Ordering::Relaxed);
336+
}
337+
294338
global_params.common.compaction_offsets[bucket_index]
295-
.fetch_add(sequences_delta, Ordering::Relaxed);
339+
.fetch_add(sequences_deltas.iter().sum::<i64>(), Ordering::Relaxed);
296340
}
297341
}
298342
}

0 commit comments

Comments
 (0)