1313#include " srsran/adt/detail/concurrent_queue_params.h"
1414#include " srsran/adt/moodycamel_mpmc_queue.h"
1515#include " srsran/srslog/srslog.h"
16- #include " srsran/support/cpu_architecture_info.h"
1716#include " srsran/support/executors/detail/task_executor_utils.h"
1817#include " srsran/support/executors/task_executor.h"
1918
2019namespace srsran {
20+
21+ template <typename ExecType>
22+ class task_fork_limiter ;
23+
24+ // / Executor of a task_fork_limiter with a specific priority level.
25+ template <typename ExecType>
26+ class task_fork_limiter_executor final : public task_executor
27+ {
28+ public:
29+ task_fork_limiter_executor (task_fork_limiter<ExecType>& fork_limiter_, enqueue_priority prio_) :
30+ fork_limiter (&fork_limiter_), prio(prio_)
31+ {
32+ }
33+
34+ [[nodiscard]] bool execute (unique_task task) override { return fork_limiter->execute (prio, std::move (task)); }
35+ [[nodiscard]] bool defer (unique_task task) override { return fork_limiter->defer (prio, std::move (task)); }
36+
37+ private:
38+ task_fork_limiter<ExecType>* fork_limiter = nullptr ;
39+ enqueue_priority prio;
40+ };
41+
2142template <typename ExecType>
2243class task_fork_limiter final : public task_executor
2344{
2445public:
46+ using executor_type = task_fork_limiter_executor<ExecType>;
47+
48+ template <typename E = ExecType>
49+ task_fork_limiter (E&& exec,
50+ unsigned max_forks_,
51+ span<const unsigned > qsizes_,
52+ unsigned max_batch_ = std::numeric_limits<unsigned >::max()) :
53+ max_forks (max_forks_), max_batch(max_batch_), out_exec(std::forward<E>(exec))
54+ {
55+ report_fatal_error_if_not (max_forks > 0 , " Fork limit executor must have a positive max_forks value." );
56+ report_fatal_error_if_not (max_batch > 0 , " Fork limit executor must have a positive max_batch value." );
57+ report_fatal_error_if_not (not qsizes_.empty (), " Fork limit executor must have at least one queue." );
58+ queues.reserve (qsizes_.size ());
59+ exec_list.reserve (qsizes_.size ());
60+ for (const auto & qsize : qsizes_) {
61+ report_fatal_error_if_not (qsize > 0 , " Fork limit executor must have a positive queue size." );
62+ queues.emplace_back (qsize, std::thread::hardware_concurrency ());
63+ enqueue_priority prio = enqueue_priority::max - exec_list.size ();
64+ exec_list.emplace_back (*this , prio);
65+ }
66+ }
2567 template <typename E = ExecType>
2668 task_fork_limiter (E&& exec,
2769 unsigned max_forks_,
2870 unsigned qsize_,
2971 unsigned max_batch_ = std::numeric_limits<unsigned >::max()) :
30- max_forks (max_forks_),
31- max_batch (max_batch_),
32- out_exec (std::forward<E>(exec)),
33- queue (qsize_, cpu_architecture_info::get().get_host_nof_available_cpus())
72+ task_fork_limiter (std::forward<E>(exec), max_forks_, std::array<unsigned , 1 >{qsize_}, max_batch_)
3473 {
35- report_fatal_error_if_not (max_forks > 0 , " Fork limit executor must have a positive max_forks value." );
36- report_fatal_error_if_not (max_batch > 0 , " Fork limit executor must have a positive max_batch value." );
3774 }
3875
39- [[nodiscard]] bool execute (unique_task task) override { return dispatch<true >(std::move (task)); }
76+ [[nodiscard]] bool execute (enqueue_priority prio, unique_task task) { return dispatch<true >(prio, std::move (task)); }
77+ [[nodiscard]] bool defer (enqueue_priority prio, unique_task task) { return dispatch<false >(prio, std::move (task)); }
78+
79+ [[nodiscard]] bool execute (unique_task task) override
80+ {
81+ return dispatch<true >(enqueue_priority::min, std::move (task));
82+ }
83+
84+ [[nodiscard]] bool defer (unique_task task) override
85+ {
86+ return dispatch<false >(enqueue_priority::min, std::move (task));
87+ }
4088
41- [[nodiscard]] bool defer (unique_task task) override { return dispatch<false >(std::move (task)); }
89+ // / Retrieves the executors of different priority levels provided by this task fork limiter. The first executor in the
90+ // / returned span is the one with the highest priority.
91+ span<executor_type> get_executors () { return exec_list; }
4292
4393private:
4494 // / Value representative of one single fork in the state.
4595 static constexpr uint64_t one_fork_mask = static_cast <uint64_t >(1U ) << 32U ;
96+ using queue_type = concurrent_queue<unique_task,
97+ concurrent_queue_policy::moodycamel_lockfree_mpmc,
98+ concurrent_queue_wait_policy::non_blocking>;
4699
47100 template <bool Execute>
48- bool dispatch (unique_task task)
101+ bool dispatch (enqueue_priority prio, unique_task task)
49102 {
103+ size_t queue_index =
104+ std::min (static_cast <size_t >(enqueue_priority::max) - static_cast <size_t >(prio), queues.size () - 1 );
105+
50106 // Save task to process in internal queue.
51- if (not queue .try_push (std::move (task))) {
107+ if (not queues[queue_index] .try_push (std::move (task))) {
52108 return false ;
53109 }
54110
@@ -61,9 +117,9 @@ class task_fork_limiter final : public task_executor
61117
62118 bool dispatch_successful = false ;
63119 if constexpr (Execute) {
64- dispatch_successful = detail::get_task_executor_ref (out_exec).execute ([this ]() { handle_fork_tasks (); });
120+ dispatch_successful = detail::get_task_executor_ref (out_exec).execute ([this ]() { handle_fork_tasks (false ); });
65121 } else {
66- dispatch_successful = detail::get_task_executor_ref (out_exec).defer ([this ]() { handle_fork_tasks (); });
122+ dispatch_successful = detail::get_task_executor_ref (out_exec).defer ([this ]() { handle_fork_tasks (false ); });
67123 }
68124 if (not dispatch_successful) {
69125 // Failed to dispatch the task, handle it accordingly.
@@ -73,15 +129,16 @@ class task_fork_limiter final : public task_executor
73129 return true ;
74130 }
75131
76- void handle_fork_tasks ()
132+ // / Called to handle the tasks of a fork.
133+ void handle_fork_tasks (bool job_already_reserved)
77134 {
78135 unique_task task;
79136
80137 unsigned max_pops = max_batch;
81- while (fork_has_tasks ()) {
138+ while (std::exchange (job_already_reserved, false ) or reserve_fork_task ()) {
82139 if (not pop_task (task)) {
83140 // We were unable to pop a task, but there are still jobs in the queue. We will defer the remaining tasks.
84- defer_remaining_tasks ();
141+ defer_remaining_tasks (true );
85142 return ;
86143 }
87144
@@ -90,16 +147,17 @@ class task_fork_limiter final : public task_executor
90147
91148 // Check if we should yield back control to the worker because we reached the batch limit.
92149 if (--max_pops == 0 ) {
93- defer_remaining_tasks ();
150+ defer_remaining_tasks (false );
94151 return ;
95152 }
96153 }
97154 }
98155
99- bool defer_remaining_tasks ()
156+ bool defer_remaining_tasks (bool job_already_reserved )
100157 {
101158 // Dispatch batch dequeue job.
102- bool dispatch_successful = detail::get_task_executor_ref (out_exec).defer ([this ]() { handle_fork_tasks (); });
159+ bool dispatch_successful = detail::get_task_executor_ref (out_exec).defer (
160+ [this , job_already_reserved]() { handle_fork_tasks (job_already_reserved); });
103161 if (not dispatch_successful) {
104162 handle_failed_dispatch ();
105163 return false ;
@@ -117,16 +175,16 @@ class task_fork_limiter final : public task_executor
117175
118176 bool pop_task (unique_task& task)
119177 {
178+ // Note: We retry several times because, even if we know that there are jobs pending, tasks might be concurrently
179+ // being enqueued and dequeued at different priority levels.
120180 static constexpr unsigned max_pop_failures = 10 ;
121- static constexpr unsigned max_attempts = 5 ;
122-
123- unsigned attempts_count = 0 ;
124- for (; attempts_count < max_attempts and not queue.try_pop (task); ++attempts_count) {
125- }
126-
127- if (attempts_count < max_attempts) {
128- // Job successfully dequeued.
129- return true ;
181+ for (unsigned attempt = 0 ; attempt != max_pop_failures; ++attempt) {
182+ // Starting from the queue with the highest priority, try to pop a task.
183+ for (auto & queue : queues) {
184+ if (queue.try_pop (task)) {
185+ return true ;
186+ }
187+ }
130188 }
131189
132190 // There are still jobs left in the queue but we cannot dequeue them at the moment. We will re-try later.
@@ -168,8 +226,10 @@ class task_fork_limiter final : public task_executor
168226 return false ;
169227 }
170228
171- // / Called when a job runs to completion.
172- bool fork_has_tasks ()
229+ // / Called when a fork wants to reserve a task to be run.
230+ // / \return True if the fork can continue processing tasks, false if it should yield back control to the worker,
231+ // / because there are not more pending tasks.
232+ [[nodiscard]] bool reserve_fork_task ()
173233 {
174234 auto prev = state.load (std::memory_order_acquire);
175235 while (true ) {
@@ -193,17 +253,17 @@ class task_fork_limiter final : public task_executor
193253 const unsigned max_forks;
194254 // / Maximum number of tasks that can be processed by one fork, before it yields control back to the worker.
195255 const unsigned max_batch;
256+ // / Logger used to report errors and warnings.
257+ srslog::basic_logger& logger = srslog::fetch_basic_logger(" ALL" );
196258
197259 // Executor to which tasks are dispatched in serialized manner.
198260 ExecType out_exec;
199261
200- // / Queue of pending tasks.
201- concurrent_queue<unique_task,
202- concurrent_queue_policy::moodycamel_lockfree_mpmc,
203- concurrent_queue_wait_policy::non_blocking>
204- queue;
262+ // / Queue(s) of pending tasks.
263+ std::vector<queue_type> queues;
205264
206- srslog::basic_logger& logger = srslog::fetch_basic_logger(" ALL" );
265+ // / Executor list that contains the executors of different priority levels provided by this task fork limiter.
266+ std::vector<executor_type> exec_list;
207267
208268 // / \brief Current state of the task fork limiter. It contains two parts: (i) 32 LSBs for the jobs stored in the
209269 // / queue, and (ii) 32 MSBs for the number of active forks.
@@ -213,14 +273,20 @@ class task_fork_limiter final : public task_executor
213273 std::atomic<unsigned > count_pop_fails{0 };
214274};
215275
216- // / \brief Create an adaptor of the task_executor that limits the number of concurrent tasks to \c max_fork_size.
276+ // / \brief Create an adaptor of the task_executor that limits the number of concurrent tasks to \c max_fork_size. This
277+ // / adaptor only provides a single priority level for the tasks.
217278// / \param[in] out_exec Executor to which tasks are ultimately dispatched.
218279// / \param[in] max_fork_size Maximum number of concurrent tasks that can run concurrently in the wrapped executor.
219- // / \param[in] qsize Size of the internal queue
280+ // / \param[in] qsize Sizes of the internal queue.
281+ // / \param[in] max_batch Maximum number of tasks that can be processed by one fork, before it yields control back to the
282+ // / worker.
220283template <typename OutExec = task_executor*>
221- std::unique_ptr<task_executor> make_task_fork_limiter_ptr (OutExec&& out_exec, unsigned max_fork_size, unsigned qsize)
284+ std::unique_ptr<task_executor> make_task_fork_limiter_ptr (OutExec&& out_exec,
285+ unsigned max_fork_size,
286+ unsigned qsize,
287+ unsigned max_batch = std::numeric_limits<unsigned >::max())
222288{
223- return std::make_unique<task_fork_limiter<OutExec>>(std::forward<OutExec>(out_exec), max_fork_size, qsize);
289+ return std::make_unique<task_fork_limiter<OutExec>>(std::forward<OutExec>(out_exec), max_fork_size, qsize, max_batch );
224290}
225291
226292} // namespace srsran
0 commit comments