@@ -32,9 +32,13 @@ use subspace_farmer::single_disk_farm::{
3232 SingleDiskFarm , SingleDiskFarmError , SingleDiskFarmId , SingleDiskFarmInfo ,
3333 SingleDiskFarmOptions , SingleDiskFarmSummary ,
3434} ;
35+ use subspace_farmer:: thread_pool_manager:: PlottingThreadPoolManager ;
3536use subspace_farmer:: utils:: farmer_piece_getter:: FarmerPieceGetter ;
3637use subspace_farmer:: utils:: piece_validator:: SegmentCommitmentPieceValidator ;
3738use subspace_farmer:: utils:: readers_and_pieces:: ReadersAndPieces ;
39+ use subspace_farmer:: utils:: {
40+ all_cpu_cores, create_plotting_thread_pool_manager, thread_pool_core_indices,
41+ } ;
3842use subspace_farmer:: { Identity , KNOWN_PEERS_CACHE_SIZE } ;
3943use subspace_farmer_components:: plotting:: PlottedSector ;
4044use subspace_farmer_components:: sector:: { sector_size, SectorMetadataChecksummed } ;
@@ -43,7 +47,7 @@ use subspace_networking::utils::multihash::ToMultihash;
4347use subspace_networking:: KnownPeersManager ;
4448use subspace_rpc_primitives:: { FarmerAppInfo , SolutionResponse } ;
4549use tokio:: sync:: { mpsc, oneshot, watch, Mutex , Semaphore } ;
46- use tracing:: { debug, error, warn} ;
50+ use tracing:: { debug, error, info , warn} ;
4751use tracing_futures:: Instrument ;
4852
4953/// Description of the farm
@@ -77,26 +81,10 @@ mod builder {
7781 use sdk_traits:: Node ;
7882 use sdk_utils:: { ByteSize , PublicKey } ;
7983 use serde:: { Deserialize , Serialize } ;
80- use tracing:: warn;
8184
8285 use super :: BuildError ;
8386 use crate :: { FarmDescription , Farmer } ;
8487
85- fn available_parallelism ( ) -> usize {
86- match std:: thread:: available_parallelism ( ) {
87- Ok ( parallelism) => parallelism. get ( ) ,
88- Err ( error) => {
89- warn ! (
90- %error,
91- "Unable to identify available parallelism, you might want to configure thread pool sizes with CLI \
92- options manually"
93- ) ;
94-
95- 0
96- }
97- }
98- }
99-
10088 #[ derive(
10189 Debug ,
10290 Clone ,
@@ -175,38 +163,36 @@ mod builder {
175163 pub max_pieces_in_sector : Option < u16 > ,
176164 /// Size of PER FARM thread pool used for farming (mostly for blocking
177165 /// I/O, but also for some compute-intensive operations during
178- /// proving), defaults to number of CPU cores available in
179- /// the system
180- #[ builder( default = "available_parallelism()" ) ]
181- pub farming_thread_pool_size : usize ,
182- /// Size of PER FARM thread pool used for plotting, defaults to number
183- /// of CPU cores available in the system.
166+ /// proving), defaults to number of logical CPUs
167+ /// available on UMA system and number of logical CPUs in
168+ /// first NUMA node on NUMA system.
169+ #[ builder( default ) ]
170+ pub farming_thread_pool_size : Option < NonZeroUsize > ,
171+ /// Size of one thread pool used for plotting, defaults to number of
172+ /// logical CPUs available on UMA system and number of logical
173+ /// CPUs available in NUMA node on NUMA system.
184174 ///
185- /// NOTE: The fact that this parameter is per farm doesn't mean farmer
186- /// will plot multiple sectors concurrently, see
187- /// `sector_downloading_concurrency` and
188- /// `sector_encoding_concurrency` options.
189- #[ builder( default = "available_parallelism()" ) ]
190- pub plotting_thread_pool_size : usize ,
191- /// Size of PER FARM thread pool used for replotting, typically smaller
192- /// pool than for plotting to not affect farming as much,
193- /// defaults to half of the number of CPU cores available in the
194- /// system.
175+ /// Number of thread pools is defined by `--sector-encoding-concurrency`
176+ /// option, different thread pools might have different number
177+ /// of threads if NUMA nodes do not have the same size.
195178 ///
196- /// NOTE: The fact that this parameter is per farm doesn't mean farmer
197- /// will replot multiple sectors concurrently, see
198- /// `sector-downloading-concurrency` and
199- /// `sector-encoding-concurrency` options.
200- #[ builder( default = "available_parallelism() / 2" ) ]
201- pub replotting_thread_pool_size : usize ,
202- /// Sector downloading concurrency
203- #[ builder( default = "NonZeroUsize::new(2).expect(\" 2 > 0\" )" ) ]
204- #[ derivative( Default ( value = "NonZeroUsize::new(2).expect(\" 2 > 0\" )" ) ) ]
205- pub sector_downloading_concurrency : NonZeroUsize ,
206- /// Sector encoding concurrency
207- #[ builder( default = "NonZeroUsize::new(1).expect(\" 1 > 0\" )" ) ]
208- #[ derivative( Default ( value = "NonZeroUsize::new(1).expect(\" 1 > 0\" )" ) ) ]
209- pub sector_encoding_concurrency : NonZeroUsize ,
179+ /// Threads will be pinned to corresponding CPU cores at creation.
180+ #[ builder( default ) ]
181+ pub plotting_thread_pool_size : Option < NonZeroUsize > ,
182+ /// the plotting process, defaults to `--sector-downloading-concurrency`
183+ /// + 1 to download future sector ahead of time
184+ #[ builder( default ) ]
185+ pub sector_downloading_concurrency : Option < NonZeroUsize > ,
186+ /// Defines how many sectors farmer will encode concurrently, defaults
187+ /// to 1 on UMA system and number of NUMA nodes on NUMA system.
188+ /// It is further restricted by `sector_downloading_concurrency`
189+ /// and setting this option higher than
190+ /// `sector_downloading_concurrency` will have no effect.
191+ #[ builder( default ) ]
192+ pub sector_encoding_concurrency : Option < NonZeroUsize > ,
193+ /// Threads will be pinned to corresponding CPU cores at creation.
194+ #[ builder( default ) ]
195+ pub replotting_thread_pool_size : Option < NonZeroUsize > ,
210196 }
211197
212198 impl Builder {
@@ -484,8 +470,65 @@ impl Config {
484470 } ;
485471
486472 let mut plotting_delay_senders = Vec :: with_capacity ( farms. len ( ) ) ;
487- let downloading_semaphore = Arc :: new ( Semaphore :: new ( sector_downloading_concurrency. get ( ) ) ) ;
488- let encoding_semaphore = Arc :: new ( Semaphore :: new ( sector_encoding_concurrency. get ( ) ) ) ;
473+
474+ let plotting_thread_pool_core_indices =
475+ thread_pool_core_indices ( plotting_thread_pool_size, sector_encoding_concurrency) ;
476+ let replotting_thread_pool_core_indices = {
477+ let mut replotting_thread_pool_core_indices =
478+ thread_pool_core_indices ( replotting_thread_pool_size, sector_encoding_concurrency) ;
479+ if replotting_thread_pool_size. is_none ( ) {
480+ // The default behavior is to use all CPU cores, but for replotting we just want
481+ // half
482+ replotting_thread_pool_core_indices
483+ . iter_mut ( )
484+ . for_each ( |set| set. truncate ( set. cpu_cores ( ) . len ( ) / 2 ) ) ;
485+ }
486+ replotting_thread_pool_core_indices
487+ } ;
488+
489+ let downloading_semaphore = Arc :: new ( Semaphore :: new (
490+ sector_downloading_concurrency
491+ . map ( |sector_downloading_concurrency| sector_downloading_concurrency. get ( ) )
492+ . unwrap_or ( plotting_thread_pool_core_indices. len ( ) + 1 ) ,
493+ ) ) ;
494+
495+ let all_cpu_cores = all_cpu_cores ( ) ;
496+ let plotting_thread_pool_manager = create_plotting_thread_pool_manager (
497+ plotting_thread_pool_core_indices. into_iter ( ) . zip ( replotting_thread_pool_core_indices) ,
498+ ) ?;
499+ let farming_thread_pool_size = farming_thread_pool_size
500+ . map ( |farming_thread_pool_size| farming_thread_pool_size. get ( ) )
501+ . unwrap_or_else ( || {
502+ all_cpu_cores
503+ . first ( )
504+ . expect ( "Not empty according to function description; qed" )
505+ . cpu_cores ( )
506+ . len ( )
507+ } ) ;
508+
509+ if all_cpu_cores. len ( ) > 1 {
510+ info ! ( numa_nodes = %all_cpu_cores. len( ) , "NUMA system detected" ) ;
511+
512+ if all_cpu_cores. len ( ) > farms. len ( ) {
513+ warn ! (
514+ numa_nodes = %all_cpu_cores. len( ) ,
515+ farms_count = %farms. len( ) ,
516+ "Too few disk farms, CPU will not be utilized fully during plotting, same number of farms as NUMA \
517+ nodes or more is recommended"
518+ ) ;
519+ }
520+ }
521+
522+ // TODO: Remove code or environment variable once identified whether it helps or
523+ // not
524+ if std:: env:: var ( "NUMA_ALLOCATOR" ) . is_ok ( ) && all_cpu_cores. len ( ) > 1 {
525+ unsafe {
526+ libmimalloc_sys:: mi_option_set (
527+ libmimalloc_sys:: mi_option_use_numa_nodes,
528+ all_cpu_cores. len ( ) as std:: ffi:: c_long ,
529+ ) ;
530+ }
531+ }
489532
490533 for ( disk_farm_idx, description) in farms. iter ( ) . enumerate ( ) {
491534 let ( plotting_delay_sender, plotting_delay_receiver) =
@@ -503,11 +546,9 @@ impl Config {
503546 kzg : kzg. clone ( ) ,
504547 erasure_coding : erasure_coding. clone ( ) ,
505548 farming_thread_pool_size,
506- plotting_thread_pool_size,
507- replotting_thread_pool_size,
508549 plotting_delay : Some ( plotting_delay_receiver) ,
509550 downloading_semaphore : Arc :: clone ( & downloading_semaphore) ,
510- encoding_semaphore : Arc :: clone ( & encoding_semaphore ) ,
551+ plotting_thread_pool_manager : plotting_thread_pool_manager . clone ( ) ,
511552 } )
512553 . await ?;
513554
@@ -835,11 +876,9 @@ struct FarmOptions<'a, PG, N: sdk_traits::Node> {
835876 pub erasure_coding : ErasureCoding ,
836877 pub max_pieces_in_sector : u16 ,
837878 pub farming_thread_pool_size : usize ,
838- pub plotting_thread_pool_size : usize ,
839- pub replotting_thread_pool_size : usize ,
840879 pub plotting_delay : Option < futures:: channel:: oneshot:: Receiver < ( ) > > ,
841880 pub downloading_semaphore : Arc < Semaphore > ,
842- pub encoding_semaphore : Arc < Semaphore > ,
881+ pub plotting_thread_pool_manager : PlottingThreadPoolManager ,
843882}
844883
845884impl < T : subspace_proof_of_space:: Table > Farm < T > {
@@ -855,11 +894,9 @@ impl<T: subspace_proof_of_space::Table> Farm<T> {
855894 erasure_coding,
856895 max_pieces_in_sector,
857896 farming_thread_pool_size,
858- plotting_thread_pool_size,
859- replotting_thread_pool_size,
860897 plotting_delay,
861898 downloading_semaphore,
862- encoding_semaphore ,
899+ plotting_thread_pool_manager ,
863900 } : FarmOptions <
864901 ' _ ,
865902 impl subspace_farmer_components:: plotting:: PieceGetter + Clone + Send + Sync + ' static ,
@@ -884,11 +921,9 @@ impl<T: subspace_proof_of_space::Table> Farm<T> {
884921 piece_getter,
885922 cache_percentage,
886923 downloading_semaphore,
887- encoding_semaphore,
888924 farm_during_initial_plotting : false ,
889925 farming_thread_pool_size,
890- plotting_thread_pool_size,
891- replotting_thread_pool_size,
926+ plotting_thread_pool_manager,
892927 plotting_delay,
893928 } ;
894929 let single_disk_farm_fut = SingleDiskFarm :: new :: < _ , _ , T > ( description, disk_farm_idx) ;
0 commit comments