Skip to content

Commit adeebb9

Browse files
committed
app: remove unused features of task_executor_manager
1 parent 90ab6e9 commit adeebb9

File tree

12 files changed

+151
-367
lines changed

12 files changed

+151
-367
lines changed

apps/examples/ofh/ru_emulator.cpp

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -851,8 +851,7 @@ struct worker_manager {
851851
const std::string exec_name = "ru_rx_exec_#" + std::to_string(i);
852852

853853
const single_worker ru_worker{name,
854-
{concurrent_queue_policy::lockfree_spsc, 2},
855-
{{exec_name}},
854+
{exec_name, concurrent_queue_policy::lockfree_spsc, 2},
856855
std::chrono::microseconds{1},
857856
os_thread_realtime_priority::max() - 1};
858857
if (!exec_mng.add_execution_context(create_execution_context(ru_worker))) {
@@ -866,8 +865,7 @@ struct worker_manager {
866865
const std::string name = "ru_emu_#" + std::to_string(i);
867866
const std::string exec_name = "ru_emu_exec_#" + std::to_string(i);
868867
const single_worker ru_worker{name,
869-
{concurrent_queue_policy::lockfree_spsc, task_worker_queue_size},
870-
{{exec_name}},
868+
{exec_name, concurrent_queue_policy::lockfree_spsc, task_worker_queue_size},
871869
std::chrono::microseconds{1},
872870
os_thread_realtime_priority::max() - 1};
873871
if (!exec_mng.add_execution_context(create_execution_context(ru_worker))) {
@@ -883,8 +881,7 @@ struct worker_manager {
883881
const std::string exec_name = "ru_timing_exec";
884882

885883
const single_worker ru_worker{name,
886-
{concurrent_queue_policy::lockfree_spsc, 4},
887-
{{exec_name}},
884+
{exec_name, concurrent_queue_policy::lockfree_spsc, 4},
888885
std::chrono::microseconds{1},
889886
os_thread_realtime_priority::max() - 0};
890887
if (!exec_mng.add_execution_context(create_execution_context(ru_worker))) {

apps/services/worker_manager/worker_manager.cpp

Lines changed: 50 additions & 63 deletions
Large diffs are not rendered by default.

apps/services/worker_manager/worker_manager.h

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -152,17 +152,17 @@ struct worker_manager {
152152
std::vector<os_sched_affinity_manager> affinity_mng;
153153

154154
/// Helper method to create workers with non zero priority.
155-
void create_prio_worker(const std::string& name,
156-
unsigned queue_size,
157-
const std::vector<execution_config_helper::executor>& execs,
158-
const os_sched_affinity_bitmask& mask,
159-
os_thread_realtime_priority prio = os_thread_realtime_priority::no_realtime());
155+
void create_prio_worker(const std::string& name,
156+
const std::string& exec_name,
157+
unsigned queue_size,
158+
const os_sched_affinity_bitmask& mask,
159+
os_thread_realtime_priority prio = os_thread_realtime_priority::no_realtime());
160160

161161
/// Helper method to create worker pool.
162-
void create_worker_pool(const std::string& name,
163-
unsigned nof_workers,
164-
unsigned queue_size,
165-
const std::vector<execution_config_helper::executor>& execs,
162+
void create_worker_pool(const std::string& name,
163+
unsigned nof_workers,
164+
const std::string& exec_name,
165+
unsigned queue_size,
166166
os_thread_realtime_priority prio = os_thread_realtime_priority::no_realtime(),
167167
span<const os_sched_affinity_bitmask> cpu_masks = {},
168168
concurrent_queue_policy queue_policy = concurrent_queue_policy::locking_mpmc);

include/srsran/adt/detail/concurrent_queue_params.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#pragma once
1212

1313
#include <limits>
14+
#include <optional>
1415

1516
namespace srsran {
1617

@@ -46,6 +47,8 @@ struct concurrent_queue_params {
4647
concurrent_queue_policy policy;
4748
/// Task queue size.
4849
unsigned size;
50+
/// In case of moodycamel MPMC, the number of pre-reserved producers can be set.
51+
unsigned nof_prereserved_producers = 2;
4952
};
5053

5154
/// \brief Queue priority used to map to specific queue of the \c priority_multiqueue_task_worker. The higher the

include/srsran/adt/moodycamel_mpmc_queue.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,12 @@ class concurrent_queue<T, concurrent_queue_policy::moodycamel_lockfree_mpmc, con
160160
blocking_ext_base(sleep_time_), non_block_queue_base(qsize)
161161
{
162162
}
163+
explicit concurrent_queue(size_t qsize,
164+
size_t nof_prereserved_producers = 2,
165+
std::chrono::microseconds sleep_time_ = std::chrono::microseconds{0}) :
166+
blocking_ext_base(sleep_time_), non_block_queue_base(qsize, nof_prereserved_producers)
167+
{
168+
}
163169

164170
/// Creates a sequential consumer for this queue.
165171
consumer_type create_consumer() { return consumer_type(*this); }

include/srsran/support/executors/task_execution_manager.h

Lines changed: 8 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -26,58 +26,15 @@ namespace execution_config_helper {
2626
using task_priority = enqueue_priority;
2727

2828
/// Parameters of a queue of tasks.
29-
using task_queue = concurrent_queue_params;
30-
31-
/// Parameters of a strand executor.
32-
struct strand {
33-
struct executor {
34-
/// \brief Name of the strand executor.
35-
std::string name;
36-
/// \brief Queueing policy associated with this strand executor.
37-
concurrent_queue_policy policy;
38-
/// \brief Size of the queue used.
39-
unsigned size;
40-
/// \brief Whether the caller blocks waiting for task to complete.
41-
bool synchronous = false;
42-
};
43-
/// Queues of different priorities. The lower the index, the higher the priority.
44-
std::vector<executor> queues;
45-
};
46-
47-
/// Parameters of the task executor, including name and decorators.
48-
struct executor {
49-
/// Name of the executor.
29+
struct task_queue {
30+
/// Name attributed to this task queue.
5031
std::string name;
51-
/// Priority assigned to the tasks dispatched through this executor.
52-
task_priority priority = task_priority::min;
53-
/// Strands instantiated on top of this executor.
54-
std::vector<strand> strands;
55-
/// \brief Present if the executor works as a strand, serializing all the enqueued tasks. The value is the size of
56-
/// the strand queue size.
57-
std::optional<unsigned> strand_queue_size;
58-
/// \brief Whether to make an executor synchronous. If true, the executor will be blocking, until the pushed task is
59-
/// fully executed. This will have a negative impact on performance, but can be useful for debugging.
60-
bool synchronous = false;
61-
62-
executor(const std::string& name_,
63-
const std::vector<strand>& strands_ = {},
64-
std::optional<unsigned> strand_queue_size_ = std::nullopt,
65-
bool synchronous_ = false) :
66-
name(name_), strands(strands_), strand_queue_size(strand_queue_size_), synchronous(synchronous_)
67-
{
68-
}
69-
executor(const std::string& name_,
70-
task_priority priority_,
71-
const std::vector<strand>& strands_ = {},
72-
std::optional<unsigned> strand_queue_size_ = std::nullopt,
73-
bool synchronous_ = false) :
74-
name(name_),
75-
priority(priority_),
76-
strands(strands_),
77-
strand_queue_size(strand_queue_size_),
78-
synchronous(synchronous_)
79-
{
80-
}
32+
/// Queueing policy used by this task queue.
33+
concurrent_queue_policy policy;
34+
/// Size of the queue used.
35+
unsigned size;
36+
/// Number of pre-reserved producers in the case of the moodycamel lockfree MPMC queue.
37+
unsigned nof_prereserved_producers = 2;
8138
};
8239

8340
/// Arguments for a single task worker creation.
@@ -86,17 +43,13 @@ struct single_worker {
8643
std::string name;
8744
/// Queue used by the task worker.
8845
task_queue queue;
89-
/// Executors associated with this execution context.
90-
std::vector<executor> executors;
9146
/// \brief Wait time in microseconds, when task queue has no pending tasks. If not set, a condition variable is
9247
/// used to wake up the worker when a new task is pushed.
9348
std::optional<std::chrono::microseconds> wait_sleep_time;
9449
/// OS priority of the worker thread.
9550
os_thread_realtime_priority prio = os_thread_realtime_priority::no_realtime();
9651
/// Bit mask to set worker cpu affinity.
9752
os_sched_affinity_bitmask mask = {};
98-
/// Non null in case tracing of the worker executors is enabled.
99-
file_event_tracer<true>* tracer = nullptr;
10053
};
10154

10255
/// Arguments for a task worker pool creation.
@@ -107,16 +60,12 @@ struct worker_pool {
10760
unsigned nof_workers;
10861
/// Queue(s) used by the task worker. The lower the index, the higher the priority.
10962
std::vector<task_queue> queues;
110-
/// Executors associated with this execution context.
111-
std::vector<executor> executors;
11263
/// \brief Wait time in microseconds, when task queue has no pending tasks.
11364
std::chrono::microseconds sleep_time;
11465
/// OS priority of the worker thread.
11566
os_thread_realtime_priority prio = os_thread_realtime_priority::no_realtime();
11667
/// Array of CPU bitmasks to assign to each worker in the pool.
11768
std::vector<os_sched_affinity_bitmask> masks;
118-
/// Non null in case tracing of the worker executors is enabled.
119-
file_event_tracer<true>* tracer = nullptr;
12069
};
12170

12271
/// Arguments for the creation of a priority multiqueue worker.
@@ -128,14 +77,10 @@ struct priority_multiqueue_worker {
12877
std::vector<task_queue> queues;
12978
/// \brief Wait time in microseconds, when task queue has no pending tasks.
13079
std::chrono::microseconds spin_sleep_time;
131-
/// Executors associated with this execution context.
132-
std::vector<executor> executors;
13380
/// OS priority of the worker thread.
13481
os_thread_realtime_priority prio = os_thread_realtime_priority::no_realtime();
13582
/// Bit mask to set worker cpu affinity.
13683
os_sched_affinity_bitmask mask = {};
137-
/// Non null in case tracing of the worker executors is enabled.
138-
file_event_tracer<true>* tracer = nullptr;
13984
};
14085

14186
} // namespace execution_config_helper

include/srsran/support/executors/task_worker_pool.h

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,10 @@ template <>
7373
class base_task_queue<concurrent_queue_policy::lockfree_mpmc>
7474
{
7575
protected:
76-
base_task_queue(size_t qsize, std::chrono::microseconds wait_sleep_time) : queue(qsize, wait_sleep_time) {}
76+
base_task_queue(size_t qsize, std::chrono::microseconds wait_sleep_time, unsigned /* unused */) :
77+
queue(qsize, wait_sleep_time)
78+
{
79+
}
7780

7881
// Queue of pending tasks.
7982
concurrent_queue<unique_task, concurrent_queue_policy::lockfree_mpmc, concurrent_queue_wait_policy::sleep> queue;
@@ -83,9 +86,7 @@ template <>
8386
class base_task_queue<concurrent_queue_policy::locking_mpmc>
8487
{
8588
protected:
86-
base_task_queue(size_t qsize, std::chrono::microseconds wait_sleep_time = std::chrono::microseconds{0}) : queue(qsize)
87-
{
88-
}
89+
base_task_queue(size_t qsize, std::chrono::microseconds wait_sleep_time, unsigned /* unused */) : queue(qsize) {}
8990

9091
// Queue of pending tasks.
9192
concurrent_queue<unique_task, concurrent_queue_policy::locking_mpmc, concurrent_queue_wait_policy::condition_variable>
@@ -96,8 +97,8 @@ template <>
9697
class base_task_queue<concurrent_queue_policy::moodycamel_lockfree_mpmc>
9798
{
9899
protected:
99-
base_task_queue(size_t qsize, std::chrono::microseconds wait_sleep_time = std::chrono::microseconds{0}) :
100-
queue(qsize, wait_sleep_time)
100+
base_task_queue(size_t qsize, std::chrono::microseconds wait_sleep_time, unsigned nof_prereserved_producers) :
101+
queue(qsize, nof_prereserved_producers, wait_sleep_time)
101102
{
102103
}
103104

@@ -175,9 +176,10 @@ class task_worker_pool : public detail::base_task_queue<QueuePolicy>, public det
175176
task_worker_pool(std::string worker_pool_name,
176177
unsigned nof_workers_,
177178
unsigned qsize_,
178-
std::chrono::microseconds wait_sleep_time = std::chrono::microseconds{100},
179-
os_thread_realtime_priority prio = os_thread_realtime_priority::no_realtime(),
180-
span<const os_sched_affinity_bitmask> cpu_masks = {});
179+
std::chrono::microseconds wait_sleep_time = std::chrono::microseconds{100},
180+
unsigned nof_prereserved_producers = 2,
181+
os_thread_realtime_priority prio = os_thread_realtime_priority::no_realtime(),
182+
span<const os_sched_affinity_bitmask> cpu_masks = {});
181183
~task_worker_pool();
182184

183185
/// \brief Push a new task to be processed by the worker pool. If the task queue is full, it skips the task and

lib/support/executors/priority_task_queue.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,6 @@ class any_task_queue_impl final : public detail::any_task_queue
106106
queue_impl q;
107107
};
108108

109-
template <typename... Args>
110109
std::unique_ptr<detail::any_task_queue> make_any_task_queue(const concurrent_queue_params& params)
111110
{
112111
switch (params.policy) {
@@ -121,7 +120,8 @@ std::unique_ptr<detail::any_task_queue> make_any_task_queue(const concurrent_que
121120
any_task_queue_impl<concurrent_queue_policy::locking_mpmc, concurrent_queue_wait_policy::condition_variable>>(
122121
params.size);
123122
case concurrent_queue_policy::moodycamel_lockfree_mpmc:
124-
return std::make_unique<any_task_queue_impl<concurrent_queue_policy::moodycamel_lockfree_mpmc>>(params.size);
123+
return std::make_unique<any_task_queue_impl<concurrent_queue_policy::moodycamel_lockfree_mpmc>>(
124+
params.size, params.nof_prereserved_producers);
125125
default:
126126
report_fatal_error("Unknown concurrent_queue_policy");
127127
}

0 commit comments

Comments
 (0)