Skip to content

Commit 8b3bb04

Browse files
committed
Add WIP priority scheduler
1 parent f7fab47 commit 8b3bb04

File tree

15 files changed

+121
-50
lines changed

15 files changed

+121
-50
lines changed

crates/api/src/lib.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use parallel_processor::enable_counters_logging;
1616
use parallel_processor::memory_data_size::MemoryDataSize;
1717
use parallel_processor::memory_fs::MemoryFs;
1818
use parallel_processor::phase_times_monitor::PHASES_TIMES_MONITOR;
19+
use parallel_processor::scheduler::PriorityScheduler;
1920
use parking_lot::Mutex;
2021
use std::cmp::max;
2122
use std::fs::create_dir_all;
@@ -233,7 +234,7 @@ impl GGCATInstance {
233234

234235
disk_optimization_level: u32,
235236
) -> anyhow::Result<PathBuf> {
236-
// PriorityScheduler::set_max_threads_count(threads_count);
237+
PriorityScheduler::set_max_threads_count(threads_count);
237238

238239
let merging_hash_dispatch = utils::get_hash_static_id(
239240
debug::DEBUG_HASH_TYPE.lock().clone(),
@@ -334,7 +335,7 @@ impl GGCATInstance {
334335
// Query output format
335336
color_output_format: ColoredQueryOutputFormat,
336337
) -> anyhow::Result<PathBuf> {
337-
// PriorityScheduler::set_max_threads_count(threads_count);
338+
PriorityScheduler::set_max_threads_count(threads_count);
338339

339340
let merging_hash_dispatch = utils::get_hash_static_id(
340341
debug::DEBUG_HASH_TYPE.lock().clone(),
@@ -436,7 +437,7 @@ impl GGCATInstance {
436437
single_thread_output_function: bool,
437438
output_function: impl Fn(&[u8], &[ColorIndexType], bool) + Send + Sync,
438439
) -> anyhow::Result<()> {
439-
// PriorityScheduler::set_max_threads_count(threads_count);
440+
PriorityScheduler::set_max_threads_count(threads_count);
440441

441442
let temp_dir = create_tempdir(self.0.temp_dir.clone());
442443

crates/assembler/src/lib.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use colors::colors_manager::ColorsMergeManager;
1414
use config::{
1515
get_compression_level_info, get_memory_mode, SwapPriority, DEFAULT_PER_CPU_BUFFER_SIZE,
1616
INTERMEDIATE_COMPRESSION_LEVEL_FAST, INTERMEDIATE_COMPRESSION_LEVEL_SLOW, KEEP_FILES,
17-
MAXIMUM_SECOND_BUCKETS_LOG, MINIMUM_LOG_DELTA_TIME,
17+
MAXIMUM_SECOND_BUCKETS_LOG, MINIMUM_LOG_DELTA_TIME, PRIORITY_SCHEDULING_BASE,
1818
};
1919
use hashes::HashFunctionFactory;
2020
use io::concurrent::structured_sequences::binary::StructSeqBinaryWriter;
@@ -33,6 +33,7 @@ use parallel_processor::buckets::MultiThreadBuckets;
3333
use parallel_processor::memory_data_size::MemoryDataSize;
3434
use parallel_processor::memory_fs::{MemoryFs, RemoveFileMode};
3535
use parallel_processor::phase_times_monitor::PHASES_TIMES_MONITOR;
36+
use parallel_processor::scheduler::PriorityScheduler;
3637
use parallel_processor::utils::scoped_thread_local::ScopedThreadLocal;
3738
use pipeline::eulertigs::build_eulertigs;
3839
use std::fs::remove_file;
@@ -187,13 +188,15 @@ pub fn run_assembler<
187188
buckets.par_iter().enumerate().for_each(|(index, bucket)| {
188189
ggcat_logging::info!("Stats for bucket index: {}", index);
189190
for chunk in &bucket.chunks {
191+
let thread_handle = PriorityScheduler::declare_thread(PRIORITY_SCHEDULING_BASE);
190192
kmers_transform::debug_bucket_stats::compute_stats_for_bucket::<MergingHash>(
191193
chunk.clone(),
192194
index,
193195
buckets.len(),
194196
MAXIMUM_SECOND_BUCKETS_LOG,
195197
k,
196198
m,
199+
&thread_handle,
197200
);
198201
}
199202
});

crates/cmdline/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ ggcat-logging = { version = "1.1.1", path = "../logging" }
6565
[features]
6666
mem-analysis = ["parallel-processor/track-usage"]
6767
no-stats = ["parallel-processor/no-stats"]
68-
process-stats = ["parallel-processor/process-stats"]
68+
process-stats = ["parallel-processor/process-stats", "ggcat-logging/stats"]
6969
tracing = ["instrumenter/enabled"]
7070
devel-build = ["assembler/devel-build", "querier/devel-build"]
7171
kmer-counters = ["assembler/support_kmer_counters"]

crates/config/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,10 @@ pub const QUERIES_COUNT_MIN_BATCH: u64 = 1000;
7676
pub const DEFAULT_COMPACTION_MAP_SUBBUCKET_ELEMENTS: usize = 512;
7777
pub const MAX_COMPACTION_MAP_SUBBUCKET_ELEMENTS: usize = 1024 * 4;
7878

79+
pub const PRIORITY_SCHEDULING_HIGH: usize = 0;
80+
pub const PRIORITY_SCHEDULING_BASE: usize = 1;
81+
pub const PRIORITY_SCHEDULING_LOW: usize = 2;
82+
7983
pub struct SwapPriority {}
8084
#[allow(non_upper_case_globals)]
8185
impl SwapPriority {

crates/io/src/concurrent/temp_reads/creads_utils.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,8 @@ pub mod helpers {
276276
$allowed_passtrough:expr,
277277
|$passtrough_info:ident| $p:expr,
278278
|$checkpoint_data:ident| $c:expr,
279-
|$data: ident, $extra_buffer: ident| $f:expr
279+
|$data: ident, $extra_buffer: ident| $f:expr,
280+
$thread_handle:ident
280281
);
281282

282283
) => {
@@ -297,7 +298,7 @@ pub mod helpers {
297298
$FlagsCount,
298299
$BucketMode,
299300
WithMultiplicity,
300-
>>(read_thread, Vec::new(), <$E>::new_temp_buffer(), $allowed_passtrough);
301+
>>(read_thread, Vec::new(), <$E>::new_temp_buffer(), $allowed_passtrough, &$thread_handle);
301302
while let Some(checkpoint) = items.get_next_checkpoint_extended() {
302303
match checkpoint {
303304
AsyncBinaryReaderIteratorData::Stream(items, $checkpoint_data) => {
@@ -324,7 +325,7 @@ pub mod helpers {
324325
$FlagsCount,
325326
$BucketMode,
326327
NoMultiplicity,
327-
>>(read_thread, Vec::new(), <$E>::new_temp_buffer(), $allowed_passtrough);
328+
>>(read_thread, Vec::new(), <$E>::new_temp_buffer(), $allowed_passtrough, &$thread_handle);
328329
while let Some(checkpoint) = items.get_next_checkpoint_extended() {
329330
match checkpoint {
330331
AsyncBinaryReaderIteratorData::Stream(items, $checkpoint_data) => {

crates/kmers_transform/src/debug_bucket_stats.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use parallel_processor::buckets::readers::async_binary_reader::{
1515
AllowedCheckpointStrategy, AsyncBinaryReader, AsyncReaderThread,
1616
};
1717
use parallel_processor::memory_fs::RemoveFileMode;
18+
use parallel_processor::scheduler::ThreadPriorityHandle;
1819
use std::collections::HashSet;
1920
use std::path::PathBuf;
2021

@@ -50,6 +51,7 @@ pub fn compute_stats_for_bucket<MH: HashFunctionFactory>(
5051
second_buckets_log_max: usize,
5152
k: usize,
5253
m: usize,
54+
thread_handle: &ThreadPriorityHandle,
5355
) {
5456
let reader = AsyncBinaryReader::new(
5557
&bucket,
@@ -78,6 +80,7 @@ pub fn compute_stats_for_bucket<MH: HashFunctionFactory>(
7880
Vec::new(),
7981
(),
8082
AllowedCheckpointStrategy::DecompressOnly,
83+
thread_handle,
8184
);
8285

8386
let mut total_counters = vec![0; second_buckets_max];

crates/kmers_transform/src/processor.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,14 @@ use crate::{
33
KmersTransformContext, KmersTransformExecutorFactory, KmersTransformFinalExecutor,
44
KmersTransformMapProcessor,
55
};
6-
use config::WORKERS_PRIORITY_BASE;
6+
use config::{PRIORITY_SCHEDULING_HIGH, WORKERS_PRIORITY_BASE};
77
use parallel_processor::execution_manager::executor::{AsyncExecutor, ExecutorReceiver};
88
use parallel_processor::execution_manager::memory_tracker::MemoryTracker;
99
use parallel_processor::execution_manager::objects_pool::PoolObjectTrait;
1010
use parallel_processor::execution_manager::packet::{Packet, PacketTrait};
1111
use parallel_processor::mt_debug_counters::counter::{AtomicCounter, SumMode};
1212
use parallel_processor::mt_debug_counters::declare_counter_i64;
13+
use parallel_processor::scheduler::PriorityScheduler;
1314
use std::future::Future;
1415
use std::marker::PhantomData;
1516
use std::path::PathBuf;
@@ -57,9 +58,11 @@ impl<F: KmersTransformExecutorFactory> AsyncExecutor for KmersTransformProcessor
5758
<F::MapProcessorType as KmersTransformMapProcessor<F>>::MapStruct::allocate_new(&()),
5859
);
5960

61+
let thread_handle = PriorityScheduler::declare_thread(PRIORITY_SCHEDULING_HIGH);
62+
6063
while let Ok((address, proc_info)) = track!(
6164
receiver
62-
.obtain_address_with_priority(WORKERS_PRIORITY_BASE)
65+
.obtain_address_with_priority(WORKERS_PRIORITY_BASE, &thread_handle)
6366
.await,
6467
ADDR_WAITING_COUNTER
6568
) {
@@ -69,9 +72,10 @@ impl<F: KmersTransformExecutorFactory> AsyncExecutor for KmersTransformProcessor
6972
let mut total_kmers = 0;
7073
let mut unique_kmers = 0;
7174

72-
while let Some(input_packet) =
73-
track!(address.receive_packet().await, PACKET_WAITING_COUNTER)
74-
{
75+
while let Some(input_packet) = track!(
76+
address.receive_packet(&thread_handle).await,
77+
PACKET_WAITING_COUNTER
78+
) {
7579
real_size += input_packet.reads.len() as usize;
7680
let stats = map_processor.process_group_batch_sequences(
7781
&global_context.global_extra_data,

crates/kmers_transform/src/reader.rs

Lines changed: 26 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use config::{
1010
DEFAULT_PER_CPU_BUFFER_SIZE, DEFAULT_PREFETCH_AMOUNT, KEEP_FILES,
1111
MAXIMUM_JIT_PROCESSED_BUCKETS, MAX_INTERMEDIATE_MAP_SIZE, MIN_BUCKET_CHUNKS_FOR_READING_THREAD,
1212
PACKETS_PRIORITY_DEFAULT, PACKETS_PRIORITY_REWRITTEN, PARTIAL_VECS_CHECKPOINT_SIZE,
13-
USE_SECOND_BUCKET, WORKERS_PRIORITY_BASE,
13+
PRIORITY_SCHEDULING_BASE, PRIORITY_SCHEDULING_LOW, USE_SECOND_BUCKET, WORKERS_PRIORITY_BASE,
1414
};
1515
use instrumenter::local_setup_instrumenter;
1616
use io::compressed_read::CompressedReadIndipendent;
@@ -39,6 +39,7 @@ use parallel_processor::execution_manager::packet::{Packet, PacketTrait, Packets
3939
use parallel_processor::memory_fs::RemoveFileMode;
4040
use parallel_processor::mt_debug_counters::counter::{AtomicCounter, SumMode};
4141
use parallel_processor::mt_debug_counters::declare_counter_i64;
42+
use parallel_processor::scheduler::{PriorityScheduler, ThreadPriorityHandle};
4243
use parallel_processor::utils::replace_with_async::replace_with_async;
4344
use std::cmp::{max, min, Reverse};
4445
use std::collections::{BinaryHeap, VecDeque};
@@ -58,6 +59,7 @@ pub(crate) struct KmersTransformReader<F: KmersTransformExecutorFactory> {
5859
pub struct InputBucketDesc {
5960
pub(crate) paths: Vec<PathBuf>,
6061
pub(crate) sub_bucket_counters: Vec<BucketCounter>,
62+
#[allow(dead_code)]
6163
pub(crate) compaction_delta: i64,
6264
pub(crate) out_data_format: MinimizerBucketMode,
6365
pub(crate) resplitted: bool,
@@ -168,11 +170,6 @@ impl<F: KmersTransformExecutorFactory> KmersTransformReader<F> {
168170
})
169171
.collect();
170172

171-
let compaction_ratio = sequences_count
172-
.saturating_add_signed(file.compaction_delta)
173-
.max(16) as f64
174-
/ sequences_count as f64;
175-
176173
let total_file_size = readers.iter().map(|r| r.get_file_size()).sum();
177174

178175
bucket_sizes.make_contiguous().sort();
@@ -202,7 +199,7 @@ impl<F: KmersTransformExecutorFactory> KmersTransformReader<F> {
202199

203200
let is_outlier = !file.resplitted
204201
&& (total_sequences > 0)
205-
&& (biggest_sub_bucket.0.count as f64 * unique_estimator_factor * compaction_ratio
202+
&& (biggest_sub_bucket.0.count as f64 * unique_estimator_factor
206203
>= (MAX_INTERMEDIATE_MAP_SIZE / F::MapProcessorType::MAP_SIZE as u64) as f64);
207204

208205
// if is_outlier {
@@ -398,6 +395,7 @@ impl<F: KmersTransformExecutorFactory> KmersTransformReader<F> {
398395
bucket_info: &BucketsInfo,
399396
async_reader_thread: Arc<AsyncReaderThread>,
400397
packets_pool: Arc<PoolObject<PacketsPool<ReadsBuffer<F::AssociatedExtraData>>>>,
398+
thread_handle: &ThreadPriorityHandle,
401399
) {
402400
if bucket_info.readers.iter().all(|r| r.is_finished()) {
403401
return;
@@ -494,9 +492,9 @@ impl<F: KmersTransformExecutorFactory> KmersTransformReader<F> {
494492
if buffers[bucket].reads.len() == buffers[bucket].reads.capacity() {
495493
match &bucket_info.addresses[bucket] {
496494
AddressMode::Send(address) => {
497-
replace_with_async(&mut buffers[bucket], |mut buffer| async move {
495+
replace_with_async(&mut buffers[bucket], |mut buffer| async {
498496
buffer.sub_bucket = bucket;
499-
ops.packet_send(address.clone(), buffer);
497+
ops.packet_send(address.clone(), buffer, thread_handle);
500498
track!(packets_pool.alloc_packet().await, PACKET_ALLOC_COUNTER)
501499
})
502500
.await;
@@ -512,7 +510,8 @@ impl<F: KmersTransformExecutorFactory> KmersTransformReader<F> {
512510
}
513511
}
514512
F::AssociatedExtraData::clear_temp_buffer(extra_buffer);
515-
}
513+
},
514+
thread_handle
516515
);
517516
}
518517
}
@@ -526,7 +525,7 @@ impl<F: KmersTransformExecutorFactory> KmersTransformReader<F> {
526525
packet.sub_bucket = bucket;
527526
match address {
528527
AddressMode::Send(address) => {
529-
ops.packet_send(address.clone(), packet);
528+
ops.packet_send(address.clone(), packet, &thread_handle);
530529
}
531530
AddressMode::Rewrite(writer, seq_count, _) => {
532531
Self::flush_rewrite_bucket::<MultiplicityModeFromBoolean<WITH_MULTIPLICITY>>(
@@ -563,14 +562,16 @@ impl<F: KmersTransformExecutorFactory> AsyncExecutor for KmersTransformReader<F>
563562
async move {
564563
let mut async_threads = Vec::new();
565564

565+
let thread_handle = PriorityScheduler::declare_thread(PRIORITY_SCHEDULING_BASE);
566+
566567
while let Ok((address, _)) = track!(
567568
receiver
568-
.obtain_address_with_priority(WORKERS_PRIORITY_BASE)
569+
.obtain_address_with_priority(WORKERS_PRIORITY_BASE, &thread_handle)
569570
.await,
570571
ADDR_WAITING_COUNTER
571572
) {
572573
let file = track!(
573-
address.receive_packet().await.unwrap(),
574+
address.receive_packet(&thread_handle).await.unwrap(),
574575
PACKET_WAITING_COUNTER
575576
);
576577
let is_main_bucket = !file.resplitted && !file.rewritten;
@@ -604,13 +605,18 @@ impl<F: KmersTransformExecutorFactory> AsyncExecutor for KmersTransformReader<F>
604605
let address = &address;
605606
let buckets_info = &buckets_info;
606607
let packets_pool = address
607-
.pool_alloc_await(max(
608-
global_context.max_buckets / 2,
609-
2 * buckets_info.addresses.len(),
610-
))
608+
.pool_alloc_await(
609+
max(
610+
global_context.max_buckets / 2,
611+
2 * buckets_info.addresses.len(),
612+
),
613+
&thread_handle,
614+
)
611615
.await;
612616

613617
spawner.spawn_executor(async move {
618+
let thread_handle =
619+
PriorityScheduler::declare_thread(PRIORITY_SCHEDULING_LOW);
614620
match buckets_info.data_format {
615621
MinimizerBucketMode::Single => {
616622
Self::read_bucket::<false>(
@@ -619,6 +625,7 @@ impl<F: KmersTransformExecutorFactory> AsyncExecutor for KmersTransformReader<F>
619625
buckets_info,
620626
async_thread,
621627
packets_pool,
628+
&thread_handle,
622629
)
623630
.await;
624631
}
@@ -629,6 +636,7 @@ impl<F: KmersTransformExecutorFactory> AsyncExecutor for KmersTransformReader<F>
629636
buckets_info,
630637
async_thread,
631638
packets_pool,
639+
&thread_handle,
632640
)
633641
.await;
634642
}
@@ -695,7 +703,7 @@ impl<F: KmersTransformExecutorFactory> AsyncExecutor for KmersTransformReader<F>
695703
}
696704

697705
assert!(track!(
698-
address.receive_packet().await.is_none(),
706+
address.receive_packet(&thread_handle).await.is_none(),
699707
PACKET_WAITING_COUNTER
700708
));
701709
}

0 commit comments

Comments
 (0)