@@ -27,6 +27,19 @@ worker_manager::worker_manager(const gnb_appconfig& appcfg)
2727 is_blocking_mode_active = sdr_cfg.device_driver == " zmq" ;
2828 }
2929
30+ if (appcfg.expert_config .enable_tuned_affinity_profile ) {
31+ use_tuned_profile = true ;
32+ affinity_manager = std::make_unique<affinity_mask_manager>(appcfg.expert_config .nof_threads_per_cpu ,
33+ appcfg.expert_config .nof_cores_for_non_prio_workers );
34+ } else {
35+ affinity_manager = std::make_unique<affinity_mask_manager>();
36+ }
37+
38+ // If an OFH RU is configured, create its executors first.
39+ if (variant_holds_alternative<ru_ofh_appconfig>(appcfg.ru_cfg )) {
40+ create_ofh_executors (appcfg.cells_cfg , variant_get<ru_ofh_appconfig>(appcfg.ru_cfg ).is_downlink_parallelized );
41+ }
42+
3043 create_du_cu_executors (is_blocking_mode_active,
3144 appcfg.expert_phy_cfg .nof_ul_threads ,
3245 appcfg.expert_phy_cfg .nof_dl_threads ,
@@ -62,16 +75,26 @@ void worker_manager::create_worker(const std::string& name, Args&&... args)
6275 srsran_assert (ret.second , " Unable to create worker {}." , name);
6376}
6477
65- void worker_manager::create_worker_pool (const std::string& name,
66- size_t nof_workers,
67- size_t queue_size,
68- os_thread_realtime_priority prio)
78+ void worker_manager::create_worker_pool (const std::string& name,
79+ size_t nof_workers,
80+ size_t queue_size,
81+ os_thread_realtime_priority prio,
82+ span<const os_sched_affinity_bitmask> cpu_masks)
6983{
7084 auto ret = worker_pools.insert (
71- std::make_pair (name, std::make_unique<task_worker_pool>(nof_workers, queue_size, name, prio)));
85+ std::make_pair (name, std::make_unique<task_worker_pool>(nof_workers, queue_size, name, prio, cpu_masks )));
7286 srsran_assert (ret.second , " Unable to create worker pool {}." , name);
7387}
7488
89+ void worker_manager::create_prio_worker (const std::string& name, size_t queue_size, os_thread_realtime_priority prio)
90+ {
91+ os_sched_affinity_bitmask cpu_mask;
92+ if (use_tuned_profile) {
93+ cpu_mask = affinity_manager->reserve_cpu (name, prio);
94+ }
95+ create_worker (name, queue_size, prio, cpu_mask);
96+ }
97+
7598void worker_manager::create_du_cu_executors (bool is_blocking_mode_active,
7699 unsigned nof_ul_workers,
77100 unsigned nof_dl_workers,
@@ -80,17 +103,24 @@ void worker_manager::create_du_cu_executors(bool is_blocki
80103 unsigned pipeline_depth)
81104{
82105 // Instantiate workers
83- create_worker (" gnb_ue" , 512 );
84- gnb_ctrl_worker = std::make_unique<gnb_ctrl_worker_type>(" gnb_ctrl" ,
85- std::array<unsigned , 2 >{64 , task_worker_queue_size},
86- std::chrono::microseconds{200 },
87- os_thread_realtime_priority::max () - 20 ,
88- os_sched_affinity_bitmask{});
89- du_cell_worker = std::make_unique<du_cell_worker_type>(" du_cell" ,
106+ create_prio_worker (" gnb_ue" , 512 );
107+ os_sched_affinity_bitmask cpu_mask;
108+ if (use_tuned_profile) {
109+ cpu_mask = affinity_manager->reserve_cpu (" gnb_ctrl" , os_thread_realtime_priority::max () - 20 );
110+ }
111+ gnb_ctrl_worker = std::make_unique<du_cell_worker_type>(" gnb_ctrl" ,
112+ std::array<unsigned , 2 >{64 , task_worker_queue_size},
113+ std::chrono::microseconds{200 },
114+ os_thread_realtime_priority::max () - 20 ,
115+ cpu_mask);
116+ if (use_tuned_profile) {
117+ cpu_mask = affinity_manager->reserve_cpu (" du_cell" , os_thread_realtime_priority::max () - 2 );
118+ }
119+ du_cell_worker = std::make_unique<du_cell_worker_type>(" du_cell" ,
90120 std::array<unsigned , 2 >{8 , task_worker_queue_size},
91121 std::chrono::microseconds{10 },
92122 os_thread_realtime_priority::max () - 2 ,
93- os_sched_affinity_bitmask{} );
123+ cpu_mask );
94124
95125 // Instantiate task executors
96126 cu_cp_exec = make_priority_task_executor_ptr<task_queue_priority::min>(*gnb_ctrl_worker);
@@ -151,26 +181,33 @@ void worker_manager::create_du_low_executors(bool is_block
151181 for (unsigned cell_id = 0 , cell_end = cells_cfg.size (); cell_id != cell_end; ++cell_id) {
152182 if (is_blocking_mode_active) {
153183 if (cell_id == 0 ) {
154- create_worker (" phy_worker" , task_worker_queue_size, os_thread_realtime_priority::max ());
184+ create_prio_worker (" phy_worker" , task_worker_queue_size, os_thread_realtime_priority::max ());
155185 }
156186 task_worker& phy_worker = *workers.at (" phy_worker" );
157187 upper_pusch_exec.push_back (std::make_unique<task_worker_executor>(phy_worker));
158188 upper_pucch_exec.push_back (std::make_unique<task_worker_executor>(phy_worker));
159189 upper_prach_exec.push_back (std::make_unique<task_worker_executor>(phy_worker));
160190 du_low_dl_executors[cell_id].emplace_back (std::make_unique<task_worker_executor>(phy_worker));
161191 } else {
162- const std::string& name_ul = " up_phy_ul#" + std::to_string (cell_id);
163- create_worker_pool (name_ul, nof_ul_workers, task_worker_queue_size, os_thread_realtime_priority::max () - 20 );
192+ const std::string& name_ul = " up_phy_ul#" + std::to_string (cell_id);
193+ const auto prio = os_thread_realtime_priority::max () - 20 ;
194+ std::vector<os_sched_affinity_bitmask> cpu_masks;
195+ if (use_tuned_profile) {
196+ for (unsigned w = 0 ; w != nof_ul_workers; ++w) {
197+ cpu_masks.push_back (affinity_manager->reserve_cpu (name_ul, prio));
198+ }
199+ }
200+ create_worker_pool (name_ul, nof_ul_workers, task_worker_queue_size, prio, cpu_masks);
164201 upper_pusch_exec.push_back (std::make_unique<task_worker_pool_executor>(*worker_pools.at (name_ul)));
165202 upper_pucch_exec.push_back (std::make_unique<task_worker_pool_executor>(*worker_pools.at (name_ul)));
166203
167204 const std::string& name_prach = " phy_prach#" + std::to_string (cell_id);
168- create_worker (name_prach, task_worker_queue_size, os_thread_realtime_priority::max () - 2 );
205+ create_prio_worker (name_prach, task_worker_queue_size, os_thread_realtime_priority::max () - 2 );
169206 upper_prach_exec.push_back (std::make_unique<task_worker_executor>(*workers.at (name_prach)));
170207 for (unsigned i_dl_worker = 0 ; i_dl_worker != nof_dl_workers; ++i_dl_worker) {
171208 // Create upper PHY DL executors.
172209 std::string worker_name = " up_phy_dl#" + std::to_string (cell_id) + " #" + std::to_string (i_dl_worker);
173- create_worker (worker_name, task_worker_queue_size, os_thread_realtime_priority::max () - 10 );
210+ create_prio_worker (worker_name, task_worker_queue_size, os_thread_realtime_priority::max () - 10 );
174211 du_low_dl_executors[cell_id].emplace_back (std::make_unique<task_worker_executor>(*workers.at (worker_name)));
175212 }
176213 }
@@ -182,22 +219,27 @@ void worker_manager::create_du_low_executors(bool is_block
182219 cells_cfg[cell_id].cell .nof_antennas_dl ) /
183220 ldpc::MAX_MESSAGE_SIZE) *
184221 pipeline_depth;
185- create_worker_pool (name_pdsch, nof_pdsch_workers, max_nof_pdsch_cb_slot, os_thread_realtime_priority::max () - 10 );
222+
223+ const auto prio = os_thread_realtime_priority::max () - 10 ;
224+ std::vector<os_sched_affinity_bitmask> cpu_masks;
225+ if (use_tuned_profile) {
226+ for (unsigned w = 0 ; w != nof_pdsch_workers; ++w) {
227+ cpu_masks.push_back (affinity_manager->reserve_cpu (name_pdsch, prio));
228+ }
229+ }
230+ create_worker_pool (name_pdsch, nof_pdsch_workers, max_nof_pdsch_cb_slot, prio, cpu_masks);
186231
187232 upper_pdsch_exec[cell_id] = std::make_unique<task_worker_pool_executor>(*worker_pools.at (name_pdsch));
188233 }
189234 }
190235}
191236
192237// / Returns an affinity bitmask using the given affinity mask manager.
193- static os_sched_affinity_bitmask get_affinity_mask (affinity_mask_manager& manager, const std::string& name)
238+ static os_sched_affinity_bitmask
239+ get_affinity_mask (affinity_mask_manager& manager, const std::string& name, unsigned priority_from_max)
194240{
195- auto cpu_idx = manager.reserve_cpu_index ();
196- if (cpu_idx.is_error ()) {
197- fmt::print (" Could not set the affinity for the {} executor\n " , name);
198- }
199-
200- return (cpu_idx.has_value ()) ? os_sched_affinity_bitmask (cpu_idx.value ()) : os_sched_affinity_bitmask ();
241+ const os_thread_realtime_priority priority = os_thread_realtime_priority::max () - priority_from_max;
242+ return manager.reserve_cpu (name, priority);
201243}
202244
203245void worker_manager::create_ofh_executors (span<const cell_appconfig> cells, bool is_downlink_parallelized)
@@ -222,7 +264,7 @@ void worker_manager::create_ofh_executors(span<const cell_appconfig> cells, bool
222264 4 ,
223265 std::chrono::microseconds{0 },
224266 os_thread_realtime_priority::max () - 0 ,
225- get_affinity_mask (affinity_manager, name)));
267+ get_affinity_mask (* affinity_manager, name, 0 )));
226268 ru_timing_exec = std::make_unique<
227269 general_task_worker_executor<concurrent_queue_policy::lockfree_spsc, concurrent_queue_wait_policy::sleep>>(
228270 *ru_spsc_workers.back ());
@@ -237,7 +279,7 @@ void worker_manager::create_ofh_executors(span<const cell_appconfig> cells, bool
237279 ru_mpsc_workers.push_back (std::make_unique<ru_mpsc_worker_type>(name,
238280 task_worker_queue_size,
239281 os_thread_realtime_priority::max () - 5 ,
240- get_affinity_mask (affinity_manager, name)));
282+ get_affinity_mask (* affinity_manager, name, 5 )));
241283 ru_dl_exec[i].push_back (make_task_executor (*ru_mpsc_workers.back ()));
242284 }
243285
@@ -248,7 +290,7 @@ void worker_manager::create_ofh_executors(span<const cell_appconfig> cells, bool
248290 task_worker_queue_size,
249291 std::chrono::microseconds{1 },
250292 os_thread_realtime_priority::max () - 1 ,
251- get_affinity_mask (affinity_manager, name)));
293+ get_affinity_mask (* affinity_manager, name, 1 )));
252294 ru_tx_exec.push_back (make_task_executor (*ru_spsc_workers.back ()));
253295 }
254296
@@ -259,7 +301,7 @@ void worker_manager::create_ofh_executors(span<const cell_appconfig> cells, bool
259301 2 ,
260302 std::chrono::microseconds{1 },
261303 os_thread_realtime_priority::max () - 1 ,
262- get_affinity_mask (affinity_manager, name)));
304+ get_affinity_mask (* affinity_manager, name, 1 )));
263305 ru_rx_exec.push_back (make_task_executor (*ru_spsc_workers.back ()));
264306 }
265307 }
@@ -268,11 +310,11 @@ void worker_manager::create_ofh_executors(span<const cell_appconfig> cells, bool
268310void worker_manager::create_lower_phy_executors (lower_phy_thread_profile lower_phy_profile, unsigned nof_cells)
269311{
270312 // Radio Unit worker and executor.
271- create_worker (" radio" , task_worker_queue_size);
313+ create_prio_worker (" radio" , task_worker_queue_size);
272314 radio_exec = std::make_unique<task_worker_executor>(*workers.at (" radio" ));
273315
274316 // Radio Unit statistics worker and executor.
275- create_worker (" ru_stats_worker" , 1 );
317+ create_prio_worker (" ru_stats_worker" , 1 );
276318 ru_printer_exec = std::make_unique<task_worker_executor>(*workers.at (" ru_stats_worker" ));
277319
278320 for (unsigned cell_id = 0 ; cell_id != nof_cells; ++cell_id) {
@@ -296,7 +338,7 @@ void worker_manager::create_lower_phy_executors(lower_phy_thread_profile lower_p
296338 case lower_phy_thread_profile::single: {
297339 fmt::print (" Lower PHY in single executor mode.\n " );
298340 const std::string& name = " lower_phy#" + std::to_string (cell_id);
299- create_worker (name, 128 , os_thread_realtime_priority::max ());
341+ create_prio_worker (name, 128 , os_thread_realtime_priority::max ());
300342 task_worker& lower_phy_worker = *workers.at (name);
301343 lower_phy_tx_exec.push_back (std::make_unique<task_worker_executor>(lower_phy_worker));
302344 lower_phy_rx_exec.push_back (std::make_unique<task_worker_executor>(lower_phy_worker));
@@ -311,8 +353,8 @@ void worker_manager::create_lower_phy_executors(lower_phy_thread_profile lower_p
311353 fmt::print (" Lower PHY in dual executor mode.\n " );
312354 const std::string& name_dl = " lower_phy_dl#" + std::to_string (cell_id);
313355 const std::string& name_ul = " lower_phy_ul#" + std::to_string (cell_id);
314- create_worker (name_dl, 128 , os_thread_realtime_priority::max ());
315- create_worker (name_ul, 2 , os_thread_realtime_priority::max () - 1 );
356+ create_prio_worker (name_dl, 128 , os_thread_realtime_priority::max ());
357+ create_prio_worker (name_ul, 2 , os_thread_realtime_priority::max () - 1 );
316358 lower_phy_tx_exec.push_back (std::make_unique<task_worker_executor>(*workers.at (name_dl)));
317359 lower_phy_rx_exec.push_back (std::make_unique<task_worker_executor>(*workers.at (name_ul)));
318360 lower_phy_dl_exec.push_back (std::make_unique<task_worker_executor>(*workers.at (name_dl)));
@@ -328,10 +370,10 @@ void worker_manager::create_lower_phy_executors(lower_phy_thread_profile lower_p
328370 const std::string& name_ul = " lower_phy_ul#" + std::to_string (cell_id);
329371 const std::string& name_tx = " lower_phy_tx#" + std::to_string (cell_id);
330372 const std::string& name_rx = " lower_phy_rx#" + std::to_string (cell_id);
331- create_worker (name_tx, 128 , os_thread_realtime_priority::max ());
332- create_worker (name_rx, 1 , os_thread_realtime_priority::max () - 2 );
333- create_worker (name_dl, 128 , os_thread_realtime_priority::max () - 1 );
334- create_worker (name_ul, 128 , os_thread_realtime_priority::max () - 3 );
373+ create_prio_worker (name_tx, 128 , os_thread_realtime_priority::max ());
374+ create_prio_worker (name_rx, 1 , os_thread_realtime_priority::max () - 2 );
375+ create_prio_worker (name_dl, 128 , os_thread_realtime_priority::max () - 1 );
376+ create_prio_worker (name_ul, 128 , os_thread_realtime_priority::max () - 3 );
335377 lower_phy_tx_exec.push_back (std::make_unique<task_worker_executor>(*workers.at (name_tx)));
336378 lower_phy_rx_exec.push_back (std::make_unique<task_worker_executor>(*workers.at (name_rx)));
337379 lower_phy_dl_exec.push_back (std::make_unique<task_worker_executor>(*workers.at (name_dl)));
@@ -348,8 +390,6 @@ void worker_manager::create_lower_phy_executors(lower_phy_thread_profile lower_p
348390void worker_manager::create_ru_executors (const gnb_appconfig& appcfg)
349391{
350392 if (variant_holds_alternative<ru_ofh_appconfig>(appcfg.ru_cfg )) {
351- create_ofh_executors (appcfg.cells_cfg , variant_get<ru_ofh_appconfig>(appcfg.ru_cfg ).is_downlink_parallelized );
352-
353393 return ;
354394 }
355395
0 commit comments