Skip to content

Commit ee3ae4c

Browse files
committed
Prepare for compactor rewrite
1 parent 3f62f58 commit ee3ae4c

File tree

22 files changed

+179
-583
lines changed

22 files changed

+179
-583
lines changed

Cargo.lock

Lines changed: 22 additions & 443 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/assembler/src/pipeline/maximal_unitig_links/maximal_hash_entry.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ impl<H: Serialize + DeserializeOwned + Copy> BucketItemSerializer
7070
type ExtraDataBuffer = ();
7171
type ReadType<'a> = MaximalHashEntry<H>;
7272

73-
type ChunkData = ();
73+
type CheckpointData = ();
7474

7575
#[inline(always)]
7676
fn new() -> Self {

crates/assembler/src/pipeline/maximal_unitig_links/maximal_unitig_index.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ impl BucketItemSerializer for MaximalUnitigLinkSerializer {
123123
type ExtraDataBuffer = ();
124124
type ReadType<'a> = MaximalUnitigLink;
125125

126-
type ChunkData = ();
126+
type CheckpointData = ();
127127

128128
#[inline(always)]
129129
fn new() -> Self {

crates/assembler/src/structs/link_mapping.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ impl BucketItemSerializer for LinkMappingSerializer {
1919
type ExtraDataBuffer = ();
2020
type ReadType<'a> = LinkMapping;
2121

22-
type ChunkData = ();
22+
type CheckpointData = ();
2323

2424
#[inline(always)]
2525
fn new() -> Self {

crates/assembler_kmers_merge/src/lib.rs

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::final_executor::ParallelKmersMergeFinalExecutor;
22
use crate::map_processor::{ParallelKmersMergeMapProcessor, KMERGE_TEMP_DIR};
3-
use crate::preprocessor::ParallelKmersMergePreprocessor;
43
use crate::structs::{ResultsBucket, RetType};
4+
use assembler_minimizer_bucketing::rewrite_bucket::RewriteBucketComputeAssembler;
55
use assembler_minimizer_bucketing::AssemblerMinimizerBucketingExecutorFactory;
66
use colors::colors_manager::color_types::{
77
GlobalColorsTableWriter, MinimizerBucketingSeqColorDataType,
@@ -17,7 +17,9 @@ use hashes::HashFunctionFactory;
1717
use io::structs::hash_entry::HashEntry;
1818
use io::structs::hash_entry::{Direction, HashEntrySerializer};
1919
use kmers_transform::processor::KmersTransformProcessor;
20-
use kmers_transform::{KmersTransform, KmersTransformExecutorFactory};
20+
use kmers_transform::{
21+
KmersTransform, KmersTransformExecutorFactory, KmersTransformGlobalExtraData,
22+
};
2123
use minimizer_bucketing::{MinimizerBucketingCommonData, MinimizerBucketingExecutorFactory};
2224
use parallel_processor::buckets::bucket_writer::BucketItemSerializer;
2325
use parallel_processor::buckets::concurrent::BucketsThreadDispatcher;
@@ -37,7 +39,6 @@ use utils::owned_drop::OwnedDrop;
3739

3840
mod final_executor;
3941
mod map_processor;
40-
mod preprocessor;
4142
pub mod structs;
4243

4344
pub struct GlobalMergeData<CX: ColorsManager> {
@@ -55,6 +56,17 @@ pub struct GlobalMergeData<CX: ColorsManager> {
5556
kmer_batches_count: AtomicU64,
5657
}
5758

59+
// Implements `KmersTransformGlobalExtraData` for `GlobalMergeData` by
// returning the struct's own `k` and `m` fields.
// NOTE(review): `k` and `m` are presumably the k-mer length and the minimizer
// length -- confirm against the `GlobalMergeData` field definitions.
impl<CX: ColorsManager> KmersTransformGlobalExtraData for GlobalMergeData<CX> {
60+
/// Returns the value of the `k` field.
#[inline(always)]
61+
fn get_k(&self) -> usize {
62+
self.k
63+
}
64+
/// Returns the value of the `m` field.
#[inline(always)]
65+
fn get_m(&self) -> usize {
66+
self.m
67+
}
68+
}
69+
5870
pub struct ParallelKmersMergeFactory<
5971
MH: HashFunctionFactory,
6072
CX: ColorsManager,
@@ -68,7 +80,7 @@ impl<MH: HashFunctionFactory, CX: ColorsManager, const COMPUTE_SIMPLITIGS: bool>
6880
type GlobalExtraData = GlobalMergeData<CX>;
6981
type AssociatedExtraData = MinimizerBucketingSeqColorDataType<CX>;
7082

71-
type PreprocessorType = ParallelKmersMergePreprocessor<MH, CX, COMPUTE_SIMPLITIGS>;
83+
type PreprocessorType = RewriteBucketComputeAssembler;
7284
type MapProcessorType = ParallelKmersMergeMapProcessor<MH, CX, COMPUTE_SIMPLITIGS>;
7385
type FinalExecutorType = ParallelKmersMergeFinalExecutor<MH, CX, COMPUTE_SIMPLITIGS>;
7486

@@ -82,10 +94,6 @@ impl<MH: HashFunctionFactory, CX: ColorsManager, const COMPUTE_SIMPLITIGS: bool>
8294
AssemblerMinimizerBucketingExecutorFactory::new(&global_data.global_resplit_data)
8395
}
8496

85-
fn new_preprocessor(_global_data: &Arc<Self::GlobalExtraData>) -> Self::PreprocessorType {
86-
ParallelKmersMergePreprocessor::new()
87-
}
88-
8997
fn new_map_processor(
9098
_global_data: &Arc<Self::GlobalExtraData>,
9199
mem_tracker: MemoryTracker<KmersTransformProcessor<Self>>,

crates/assembler_kmers_merge/src/preprocessor.rs

Lines changed: 0 additions & 64 deletions
This file was deleted.

crates/assembler_minimizer_bucketing/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
pub mod rewrite_bucket;
2+
13
use ::dynamic_dispatch::dynamic_dispatch;
24
use colors::colors_manager::color_types::MinimizerBucketingSeqColorDataType;
35
use colors::colors_manager::{ColorsManager, MinimizerBucketingSeqColorData};
@@ -66,6 +68,7 @@ impl<CX: ColorsManager> MinimizerBucketingExecutorFactory
6668
type StreamInfo = InputFileInfo;
6769

6870
type ColorsManager = CX;
71+
type RewriteBucketCompute = rewrite_bucket::RewriteBucketComputeAssembler;
6972

7073
#[allow(non_camel_case_types)]
7174
type FLAGS_COUNT = typenum::U2;
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
use config::BucketIndexType;
2+
use config::MultiplicityCounterType;
3+
use config::READ_FLAG_INCL_END;
4+
use hashes::default::MNHFactory;
5+
use hashes::ExtendableHashTraitType;
6+
use hashes::HashFunction;
7+
use hashes::{HashFunctionFactory, HashableSequence, MinimizerHashFunctionFactory};
8+
use io::compressed_read::CompressedRead;
9+
use minimizer_bucketing::resplit_bucket::RewriteBucketCompute;
10+
11+
/// Field-less unit struct that carries the assembler's implementation of the
/// `RewriteBucketCompute` trait.
pub struct RewriteBucketComputeAssembler;
12+
13+
impl RewriteBucketCompute for RewriteBucketComputeAssembler {
14+
/// Computes the bucket index for one read: hashes a window of the read,
/// takes the hash with the smallest full minimizer value, and maps it to a
/// bucket with `MNHFactory::get_bucket`.
///
/// * `k`, `m` - `k` bounds the hashed window; `m` is forwarded to
///   `MNHFactory::new` (presumably the minimizer length -- confirm).
/// * `seq_data` - only element `.0` (used as the flags byte) and element
///   `.3` (the compressed read) are read here.
/// * `used_hash_bits`, `bucket_bits_count` - forwarded unchanged to
///   `MNHFactory::get_bucket`.
///
/// # Panics
///
/// Panics on the `.unwrap()` below if `min_by_key` returns `None`
/// (for a standard iterator, that happens when no hashes are produced).
fn get_rewrite_bucket<C>(
15+
k: usize,
16+
m: usize,
17+
seq_data: &(u8, u8, C, CompressedRead, MultiplicityCounterType),
18+
used_hash_bits: usize,
19+
bucket_bits_count: usize,
20+
) -> BucketIndexType {
21+
let read = &seq_data.3;
22+
let flags = seq_data.0;
23+
// 1 when the read has exactly `k` bases and READ_FLAG_INCL_END is not set
// in the flags, otherwise 0.
let decr_val = ((read.bases_count() == k) && (flags & READ_FLAG_INCL_END) == 0) as usize;
24+
25+
// The hashed window is always k - 1 bases long: bases [0, k - 1) when
// `decr_val` is 1, bases [1, k) when it is 0.
let hashes = MNHFactory::new(read.sub_slice((1 - decr_val)..(k - decr_val)), m);
26+
27+
let minimizer = hashes
28+
.iter()
29+
// NOTE: the closure parameter `k` shadows the function parameter `k`;
// inside the closure it is a hash item, not the k-mer length.
.min_by_key(|k| MNHFactory::get_full_minimizer(k.to_unextendable()))
30+
.unwrap();
31+
32+
MNHFactory::get_bucket(
33+
used_hash_bits,
34+
bucket_bits_count,
35+
minimizer.to_unextendable(),
36+
)
37+
}
38+
}

crates/dumper/src/pipeline/dumper_minimizer_bucketing.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use io::concurrent::temp_reads::extra_data::{
1111
use io::sequences_reader::{DnaSequence, DnaSequencesFileType};
1212
use io::sequences_stream::fasta::FastaFileSequencesStream;
1313
use io::sequences_stream::SequenceInfo;
14+
use minimizer_bucketing::resplit_bucket::RewriteBucketCompute;
1415
use minimizer_bucketing::{
1516
GenericMinimizerBucketing, MinimizerBucketingCommonData, MinimizerBucketingExecutor,
1617
MinimizerBucketingExecutorFactory, MinimizerInputSequence,
@@ -101,6 +102,26 @@ pub struct DumperMinimizerBucketingExecutor<CX: ColorsManager> {
101102
_phantom: PhantomData<CX>,
102103
}
103104

105+
/// Field-less unit struct that provides the dumper's `RewriteBucketCompute`
/// implementation; that implementation is currently an `unimplemented!()` stub.
pub struct RewriteBucketComputeDumper;
106+
107+
impl RewriteBucketCompute for RewriteBucketComputeDumper {
108+
/// Stub implementation: every argument is ignored (all parameters are
/// underscore-prefixed) and no bucket index is ever returned.
///
/// # Panics
///
/// Always panics, because the body is `unimplemented!()`.
fn get_rewrite_bucket<C>(
109+
_k: usize,
110+
_m: usize,
111+
_seq_data: &(
112+
u8,
113+
u8,
114+
C,
115+
io::compressed_read::CompressedRead,
116+
config::MultiplicityCounterType,
117+
),
118+
_used_hash_bits: usize,
119+
_bucket_bits_count: usize,
120+
) -> BucketIndexType {
121+
// Placeholder body; calling this function panics.
unimplemented!()
122+
}
123+
}
124+
104125
pub struct DumperMinimizerBucketingExecutorFactory<CX: ColorsManager>(PhantomData<CX>);
105126

106127
impl<CX: ColorsManager> MinimizerBucketingExecutorFactory
@@ -112,6 +133,7 @@ impl<CX: ColorsManager> MinimizerBucketingExecutorFactory
112133
type StreamInfo = ();
113134

114135
type ColorsManager = CX;
136+
type RewriteBucketCompute = RewriteBucketComputeDumper;
115137

116138
#[allow(non_camel_case_types)]
117139
type FLAGS_COUNT = typenum::U0;

crates/io/src/concurrent/temp_reads/creads_utils.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use crate::varint::{
44
VARINT_MAX_SIZE,
55
};
66
use byteorder::ReadBytesExt;
7-
use config::MultiplicityCounterType;
7+
use config::{BucketIndexType, MultiplicityCounterType};
88
use parallel_processor::buckets::bucket_writer::BucketItemSerializer;
99
use serde::{Deserialize, Serialize};
1010
use std::io::Read;
@@ -123,8 +123,8 @@ pub struct CompressedReadsBucketDataSerializer<
123123
}
124124

125125
// Serde-serializable record with a single public field, `target_subbucket`.
// In this hunk the struct is renamed from `ReadsChunkData` to
// `ReadsCheckpointData` and the field type changes from `u8` to
// `BucketIndexType`.
#[derive(Serialize, Deserialize)]
126-
pub struct ReadsChunkData {
127-
pub target_subbucket: u8,
126+
pub struct ReadsCheckpointData {
127+
pub target_subbucket: BucketIndexType,
128128
}
129129

130130
impl<
@@ -142,7 +142,7 @@ impl<
142142
type ExtraDataBuffer = E::TempBuffer;
143143
type ReadType<'b> = (u8, u8, E, CompressedRead<'b>, MultiplicityCounterType);
144144

145-
type ChunkData = ReadsChunkData;
145+
type CheckpointData = ReadsCheckpointData;
146146

147147
#[inline(always)]
148148
fn new() -> Self {

0 commit comments

Comments
 (0)