Skip to content

Commit b054e01

Browse files
committed
Fix compaction buckets priority
1 parent 8b3bb04 commit b054e01

File tree

3 files changed

+26
-4
lines changed

3 files changed

+26
-4
lines changed

crates/api/src/lib.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use colors::colors_manager::ColorsManager;
55
use colors::{
66
bundles::multifile_building::ColorBundleMultifileBuilding, non_colored::NonColoredManager,
77
};
8+
use config::{MAX_BUCKET_CHUNK_SIZE, MIN_BUCKET_CHUNK_SIZE};
89
pub use ggcat_logging::MessageLevel;
910
use ggcat_logging::UnrecoverableErrorLogging;
1011
use io::concurrent::structured_sequences::fasta::FastaWriterWrapper;
@@ -272,7 +273,11 @@ impl GGCATInstance {
272273
None
273274
} else {
274275
// Heuristic for chunks used for maximum disk usage
275-
Some((estimated_bases_count as u64) / (disk_optimization_level as u64 + 1))
276+
Some(
277+
((estimated_bases_count as u64) / (disk_optimization_level as u64 + 1))
278+
.min(MAX_BUCKET_CHUNK_SIZE)
279+
.max(MIN_BUCKET_CHUNK_SIZE),
280+
)
276281
}
277282
};
278283

crates/config/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,9 @@ pub const PRIORITY_SCHEDULING_HIGH: usize = 0;
8080
pub const PRIORITY_SCHEDULING_BASE: usize = 1;
8181
pub const PRIORITY_SCHEDULING_LOW: usize = 2;
8282

83+
pub const MIN_BUCKET_CHUNK_SIZE: u64 = 1024 * 1024 * 8;
84+
pub const MAX_BUCKET_CHUNK_SIZE: u64 = 1024 * 1024 * 1024;
85+
8386
pub struct SwapPriority {}
8487
#[allow(non_upper_case_globals)]
8588
impl SwapPriority {

crates/minimizer_bucketing/src/compactor.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -170,20 +170,31 @@ impl<E: MinimizerBucketingExecutorFactory + Sync + Send + 'static> AsyncExecutor
170170
buckets[bucket_index].chunks.sort_by_cached_key(|c| {
171171
let file_size = MemoryFs::get_file_size(c).unwrap();
172172
total_size += file_size;
173-
Reverse(file_size)
173+
let is_compacted = c
174+
.file_name()
175+
.unwrap()
176+
.to_str()
177+
.unwrap()
178+
.contains("compacted");
179+
Reverse((is_compacted, file_size))
174180
});
175181

176182
let mut chosen_size = 0;
177183

178184
// Choose the buckets to compact, taking buckets while their cumulative size stays strictly below half of the total size of all buckets.
179185
// This keeps the time complexity linear.
180186

187+
// println!(
188+
// "Buckets tot size: {} tbc: {:?}",
189+
// total_size, buckets[bucket_index].chunks
190+
// );
191+
181192
let mut last = buckets[bucket_index].chunks.pop().unwrap();
182193
let mut last_size = MemoryFs::get_file_size(&last).unwrap();
183194

184195
// Choose buckets until one of two conditions is met:
185196
// 1. The next bucket would bring the chosen size to at least half of the total size
186-
// 2. Two buckets were already selected and the number of sequences is greater than the maximum amount
197+
// 2. Four buckets were already selected and the number of sequences is greater than the maximum amount
187198
// The second condition is checked below, after the processing of each bucket
188199
while chosen_size + last_size < total_size / 2 {
189200
chosen_size += last_size;
@@ -201,6 +212,9 @@ impl<E: MinimizerBucketingExecutorFactory + Sync + Send + 'static> AsyncExecutor
201212
buckets[bucket_index]
202213
.chunks
203214
.extend(chosen_buckets.drain(..));
215+
}
216+
217+
if chosen_buckets.is_empty() {
204218
continue;
205219
}
206220

@@ -295,7 +309,7 @@ impl<E: MinimizerBucketingExecutorFactory + Sync + Send + 'static> AsyncExecutor
295309

296310
// Do not process more buckets if it will increase the maximum number of allowed sequences
297311
if !global_params.common.is_active.load(Ordering::Relaxed)
298-
|| processed_buckets >= 2 && total_sequences > MAXIMUM_SEQUENCES
312+
|| processed_buckets >= 4 && total_sequences > MAXIMUM_SEQUENCES
299313
{
300314
break;
301315
}

0 commit comments

Comments
 (0)