Skip to content

Commit a03d904

Browse files
committed
f1ap-du: use blocking execute_on based on timers
1 parent b0d1cc0 commit a03d904

20 files changed

+209
-96
lines changed

include/srsran/adt/noop_functor.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@ struct noop_operation {
1010
{
1111
// Do nothing.
1212
}
13+
void operator()() const
14+
{
15+
// Do nothing.
16+
}
1317
};
1418

1519
} // namespace srsran

include/srsran/f1ap/du/f1ap_du_factory.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
namespace srsran {
77

88
class du_high_ue_executor_mapper;
9+
class timer_manager;
910

1011
namespace srs_du {
1112

@@ -16,7 +17,8 @@ std::unique_ptr<f1ap_du> create_f1ap(f1c_connection_client& f1c_client_hand
1617
f1ap_du_configurator& du_mng,
1718
task_executor& ctrl_exec,
1819
du_high_ue_executor_mapper& ue_exec_mapper,
19-
f1ap_du_paging_notifier& paging_notifier);
20+
f1ap_du_paging_notifier& paging_notifier,
21+
srsran::timer_manager& timers);
2022

2123
} // namespace srs_du
2224
} // namespace srsran

include/srsran/f1ap/du/f1c_bearer.h

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -77,10 +77,5 @@ class f1c_rx_pdu_handler
7777
class f1c_bearer : public f1c_tx_sdu_handler, public f1c_tx_delivery_handler, public f1c_rx_pdu_handler
7878
{};
7979

80-
std::unique_ptr<f1ap_du> create_f1ap(f1c_connection_client& f1c_client_handler,
81-
f1ap_du_configurator& du_mng,
82-
task_executor& ctrl_exec,
83-
du_high_ue_executor_mapper& ue_exec_mapper,
84-
f1ap_du_paging_notifier& paging_notifier);
8580
} // namespace srs_du
8681
} // namespace srsran

include/srsran/support/async/execute_on_blocking.h

Lines changed: 51 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,22 @@
1010

1111
#pragma once
1212

13+
#include "srsran/adt/noop_functor.h"
1314
#include "srsran/support/async/execute_on.h"
1415
#include "srsran/support/timers.h"
1516

1617
namespace srsran {
1718

1819
namespace detail {
1920

20-
template <typename TaskExecutor, bool IsExecute>
21-
auto dispatch_on_blocking(TaskExecutor& exec, timer_manager& timers)
21+
template <typename TaskExecutor, typename OnFailureToDispatch, bool IsExecute>
22+
auto dispatch_on_blocking(TaskExecutor& exec, timer_manager& timers, OnFailureToDispatch&& on_failure)
2223
{
2324
struct blocking_dispatch_on_awaiter {
24-
blocking_dispatch_on_awaiter(TaskExecutor& exec_, timer_manager& timers_) : exec(exec_), timers(timers_) {}
25+
blocking_dispatch_on_awaiter(TaskExecutor& exec_, timer_manager& timers_, OnFailureToDispatch&& on_failure_) :
26+
exec(exec_), timers(timers_), on_failure(std::forward<OnFailureToDispatch>(on_failure_))
27+
{
28+
}
2529

2630
bool await_ready() noexcept { return false; }
2731

@@ -41,6 +45,8 @@ auto dispatch_on_blocking(TaskExecutor& exec, timer_manager& timers)
4145
}
4246

4347
// Task execute/defer failed (potentially because task executor queue is full).
48+
on_failure();
49+
4450
// Leverage timer infrastructure to run task.
4551
// Note: Even if the timer expiry fails to invoke task in executor, it keeps trying on every tick.
4652
retry_timer = timers.create_unique_timer(exec);
@@ -54,56 +60,70 @@ auto dispatch_on_blocking(TaskExecutor& exec, timer_manager& timers)
5460
blocking_dispatch_on_awaiter& get_awaiter() { return *this; }
5561

5662
private:
57-
TaskExecutor& exec;
58-
timer_manager& timers;
59-
unique_timer retry_timer;
63+
TaskExecutor& exec;
64+
timer_manager& timers;
65+
OnFailureToDispatch on_failure;
66+
unique_timer retry_timer;
6067
};
6168

62-
return blocking_dispatch_on_awaiter{exec, timers};
69+
return blocking_dispatch_on_awaiter{exec, timers, std::forward<OnFailureToDispatch>(on_failure)};
6370
}
6471

6572
} // namespace detail
6673

6774
/// \brief Returns an awaitable that resumes the suspended coroutine in a different execution context. If the call
6875
/// to execute fails, the awaitable yields and will retry the dispatch at a later point, until it succeeds.
69-
template <typename TaskExecutor>
70-
auto execute_on_blocking(TaskExecutor& exec, timer_manager& timers)
76+
/// \param[in] exec Executor used to dispatch coroutine to a new execution context.
77+
/// \param[in] timers Timer service used to handle reattempts to dispatch task to new execution context.
78+
/// \param[in] on_failure Callback invoked in case the dispatch to executor fails at first attempt.
79+
template <typename TaskExecutor, typename OnFailureToDispatch = noop_operation>
80+
auto execute_on_blocking(TaskExecutor& exec, timer_manager& timers, OnFailureToDispatch&& on_failure = noop_operation{})
7181
{
72-
return detail::dispatch_on_blocking<TaskExecutor, true>(exec, timers);
82+
return detail::dispatch_on_blocking<TaskExecutor, OnFailureToDispatch, true>(
83+
exec, timers, std::forward<OnFailureToDispatch>(on_failure));
7384
}
7485

7586
/// \brief Returns an awaitable that resumes the suspended coroutine in a different execution context. If the call
7687
/// to defer fails, the awaitable yields and will retry the dispatch at a later point, until it succeeds.
77-
template <typename TaskExecutor>
78-
auto defer_on_blocking(TaskExecutor& exec, timer_manager& timers)
88+
/// \param[in] exec Executor used to dispatch coroutine to a new execution context.
89+
/// \param[in] timers Timer service used to handle reattempts to dispatch task to new execution context.
90+
/// \param[in] on_failure Callback invoked in case the dispatch to executor fails at first attempt.
91+
template <typename TaskExecutor, typename OnFailureToDispatch = noop_operation>
92+
auto defer_on_blocking(TaskExecutor& exec, timer_manager& timers, OnFailureToDispatch&& on_failure = noop_operation{})
7993
{
80-
return detail::dispatch_on_blocking<TaskExecutor, false>(exec, timers);
94+
return detail::dispatch_on_blocking<TaskExecutor, OnFailureToDispatch, false>(
95+
exec, timers, std::forward<OnFailureToDispatch>(on_failure));
8196
}
8297

8398
/// \brief Returns an async_task<void> that runs a given invocable task in a \c dispatch_exec executor, and once the
8499
/// task is complete, it resumes the suspended coroutine in a \c return_exec executor.
85100
template <typename DispatchTaskExecutor,
86101
typename CurrentTaskExecutor,
87102
typename Callable,
88-
typename ReturnType = detail::function_return_t<decltype(&Callable::operator())>>
103+
typename OnFailureToDispatch = noop_operation,
104+
typename ReturnType = detail::function_return_t<decltype(&Callable::operator())>>
89105
std::enable_if_t<std::is_same_v<ReturnType, void>, async_task<void>>
90106
execute_and_continue_on_blocking(DispatchTaskExecutor& dispatch_exec,
91107
CurrentTaskExecutor& return_exec,
92108
timer_manager& timers,
93-
Callable&& callable)
109+
Callable&& callable,
110+
OnFailureToDispatch&& on_failure = noop_operation{})
94111
{
95-
return launch_async([&return_exec, &dispatch_exec, task = std::forward<Callable>(callable), &timers](
96-
coro_context<async_task<void>>& ctx) mutable {
112+
return launch_async([&return_exec,
113+
&dispatch_exec,
114+
task = std::forward<Callable>(callable),
115+
on_failure = std::forward<OnFailureToDispatch>(on_failure),
116+
&timers](coro_context<async_task<void>>& ctx) mutable {
97117
CORO_BEGIN(ctx);
98118

99119
// Dispatch execution context switch.
100-
CORO_AWAIT(execute_on_blocking(dispatch_exec, timers));
120+
CORO_AWAIT(execute_on_blocking(dispatch_exec, timers, on_failure));
101121

102122
// Run task.
103123
task();
104124

105125
// Continuation in the original executor.
106-
CORO_AWAIT(execute_on_blocking(return_exec, timers));
126+
CORO_AWAIT(execute_on_blocking(return_exec, timers, on_failure));
107127

108128
CORO_RETURN();
109129
});
@@ -114,26 +134,32 @@ execute_and_continue_on_blocking(DispatchTaskExecutor& dispatch_exec,
114134
template <typename DispatchTaskExecutor,
115135
typename CurrentTaskExecutor,
116136
typename Callable,
117-
typename ReturnType = detail::function_return_t<decltype(&Callable::operator())>>
137+
typename OnFailureToDispatch = noop_operation,
138+
typename ReturnType = detail::function_return_t<decltype(&Callable::operator())>>
118139
std::enable_if_t<not std::is_same_v<ReturnType, void>, async_task<ReturnType>>
119140
execute_and_continue_on_blocking(DispatchTaskExecutor& dispatch_exec,
120141
CurrentTaskExecutor& return_exec,
121142
timer_manager& timers,
122-
Callable&& callable)
143+
Callable&& callable,
144+
OnFailureToDispatch&& on_failure = noop_operation{})
123145
{
124146
ReturnType ret{};
125-
return launch_async([&return_exec, &dispatch_exec, task = std::forward<Callable>(callable), &timers, ret](
126-
coro_context<async_task<ReturnType>>& ctx) mutable {
147+
return launch_async([&return_exec,
148+
&dispatch_exec,
149+
task = std::forward<Callable>(callable),
150+
on_failure = std::forward<OnFailureToDispatch>(on_failure),
151+
&timers,
152+
ret](coro_context<async_task<ReturnType>>& ctx) mutable {
127153
CORO_BEGIN(ctx);
128154

129155
// Dispatch execution context switch.
130-
CORO_AWAIT(execute_on_blocking(dispatch_exec, timers));
156+
CORO_AWAIT(execute_on_blocking(dispatch_exec, timers, on_failure));
131157

132158
// Run task.
133159
ret = task();
134160

135161
// Continuation in the original executor.
136-
CORO_AWAIT(execute_on_blocking(return_exec, timers));
162+
CORO_AWAIT(execute_on_blocking(return_exec, timers, on_failure));
137163

138164
CORO_RETURN(ret);
139165
});

lib/du/du_high/adapters/f1ap_test_mode_adapter.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -234,14 +234,15 @@ std::unique_ptr<f1ap_du> srsran::srs_du::create_du_high_f1ap(f1c_connection_clie
234234
task_executor& ctrl_exec,
235235
du_high_ue_executor_mapper& ue_exec_mapper,
236236
f1ap_du_paging_notifier& paging_notifier,
237+
timer_manager& timers,
237238
const du_test_mode_config& test_cfg)
238239
{
239240
if (not test_cfg.test_ue.has_value()) {
240-
return create_f1ap(f1c_client_handler, du_mng, ctrl_exec, ue_exec_mapper, paging_notifier);
241+
return create_f1ap(f1c_client_handler, du_mng, ctrl_exec, ue_exec_mapper, paging_notifier, timers);
241242
}
242243

243244
// Create a F1AP test mode adapter that wraps the real F1AP and intercepts messages to the F1-C client.
244245
auto f1ap_testmode = std::make_unique<f1ap_test_mode_adapter>(*test_cfg.test_ue, f1c_client_handler, ctrl_exec);
245-
f1ap_testmode->connect(create_f1ap(*f1ap_testmode, du_mng, ctrl_exec, ue_exec_mapper, paging_notifier));
246+
f1ap_testmode->connect(create_f1ap(*f1ap_testmode, du_mng, ctrl_exec, ue_exec_mapper, paging_notifier, timers));
246247
return f1ap_testmode;
247248
}

lib/du/du_high/adapters/f1ap_test_mode_adapter.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ std::unique_ptr<f1ap_du> create_du_high_f1ap(f1c_connection_client& f1c_cli
2525
task_executor& ctrl_exec,
2626
du_high_ue_executor_mapper& ue_exec_mapper,
2727
f1ap_du_paging_notifier& paging_notifier,
28+
timer_manager& timers,
2829
const du_test_mode_config& test_cfg);
2930

3031
} // namespace srs_du

lib/du/du_high/du_high_impl.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ du_high_impl::du_high_impl(const du_high_configuration& config_) :
120120
cfg.exec_mapper->du_control_executor(),
121121
cfg.exec_mapper->ue_mapper(),
122122
adapters->f1ap_paging_notifier,
123+
timers,
123124
cfg.test_cfg);
124125

125126
du_manager = create_du_manager(du_manager_params{

lib/du/du_high/du_manager/du_ue/du_ue_controller_impl.cpp

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -318,20 +318,27 @@ async_task<void> du_ue_controller_impl::create_stop_traffic_task()
318318

319319
async_task<void> du_ue_controller_impl::run_in_ue_executor(unique_task task)
320320
{
321-
return launch_async([this, task = std::move(task)](coro_context<async_task<void>>& ctx) {
321+
auto log_dispatch_retry = [this]() {
322+
logger.warning("ue={}: Postpone dispatching of control task to executor. Cause: Task queue is full", ue_index);
323+
};
324+
325+
return launch_async([this, task = std::move(task), log_dispatch_retry](coro_context<async_task<void>>& ctx) {
322326
CORO_BEGIN(ctx);
323327

324328
// Sync with UE control executor to run provided task.
325-
CORO_AWAIT(defer_on_blocking(cfg.services.ue_execs.ctrl_executor(ue_index), cfg.services.timers));
329+
CORO_AWAIT(
330+
defer_on_blocking(cfg.services.ue_execs.ctrl_executor(ue_index), cfg.services.timers, log_dispatch_retry));
326331
task();
327-
CORO_AWAIT(defer_on_blocking(cfg.services.du_mng_exec, cfg.services.timers));
332+
CORO_AWAIT(execute_on_blocking(cfg.services.du_mng_exec, cfg.services.timers, log_dispatch_retry));
328333

329334
// Sync with remaining UE executors, as there might be still pending tasks dispatched to those.
330335
// TODO: use when_all awaiter
331-
CORO_AWAIT(defer_on_blocking(cfg.services.ue_execs.mac_ul_pdu_executor(ue_index), cfg.services.timers));
332-
CORO_AWAIT(defer_on_blocking(cfg.services.du_mng_exec, cfg.services.timers));
333-
CORO_AWAIT(defer_on_blocking(cfg.services.ue_execs.f1u_dl_pdu_executor(ue_index), cfg.services.timers));
334-
CORO_AWAIT(defer_on_blocking(cfg.services.du_mng_exec, cfg.services.timers));
336+
CORO_AWAIT(defer_on_blocking(
337+
cfg.services.ue_execs.mac_ul_pdu_executor(ue_index), cfg.services.timers, log_dispatch_retry));
338+
CORO_AWAIT(execute_on_blocking(cfg.services.du_mng_exec, cfg.services.timers, log_dispatch_retry));
339+
CORO_AWAIT(defer_on_blocking(
340+
cfg.services.ue_execs.f1u_dl_pdu_executor(ue_index), cfg.services.timers, log_dispatch_retry));
341+
CORO_AWAIT(execute_on_blocking(cfg.services.du_mng_exec, cfg.services.timers, log_dispatch_retry));
335342

336343
CORO_RETURN();
337344
});

lib/f1ap/du/f1ap_du_factory.cpp

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,6 @@
1010

1111
#include "srsran/f1ap/du/f1ap_du_factory.h"
1212
#include "f1ap_du_impl.h"
13-
#include "srsran/du/du_high/du_manager/du_manager.h"
14-
15-
/// Notice this would be the only place were we include concrete class implementation files.
1613

1714
using namespace srsran;
1815
using namespace srs_du;
@@ -21,8 +18,10 @@ std::unique_ptr<f1ap_du> srsran::srs_du::create_f1ap(f1c_connection_client&
2118
f1ap_du_configurator& du_mng,
2219
task_executor& ctrl_exec,
2320
du_high_ue_executor_mapper& ue_exec_mapper,
24-
f1ap_du_paging_notifier& paging_notifier)
21+
f1ap_du_paging_notifier& paging_notifier,
22+
srsran::timer_manager& timers)
2523
{
26-
auto f1ap_du = std::make_unique<f1ap_du_impl>(f1c_client_handler, du_mng, ctrl_exec, ue_exec_mapper, paging_notifier);
24+
auto f1ap_du =
25+
std::make_unique<f1ap_du_impl>(f1c_client_handler, du_mng, ctrl_exec, ue_exec_mapper, paging_notifier, timers);
2726
return f1ap_du;
2827
}

lib/f1ap/du/f1ap_du_impl.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,13 +71,14 @@ f1ap_du_impl::f1ap_du_impl(f1c_connection_client& f1c_client_handler_,
7171
f1ap_du_configurator& du_mng_,
7272
task_executor& ctrl_exec_,
7373
du_high_ue_executor_mapper& ue_exec_mapper_,
74-
f1ap_du_paging_notifier& paging_notifier_) :
74+
f1ap_du_paging_notifier& paging_notifier_,
75+
timer_manager& timers_) :
7576
logger(srslog::fetch_basic_logger("DU-F1")),
7677
ctrl_exec(ctrl_exec_),
7778
du_mng(du_mng_),
7879
paging_notifier(paging_notifier_),
7980
connection_handler(f1c_client_handler_, *this, du_mng, ctrl_exec),
80-
ues(du_mng, ctrl_exec, ue_exec_mapper_),
81+
ues(du_mng, ctrl_exec, ue_exec_mapper_, timers_),
8182
events(std::make_unique<f1ap_event_manager>(du_mng.get_timer_factory()))
8283
{
8384
}

0 commit comments

Comments
 (0)