|
| 1 | +/* |
| 2 | + * |
| 3 | + * Copyright 2021-2024 Software Radio Systems Limited |
| 4 | + * |
| 5 | + * By using this file, you agree to the terms and conditions set |
| 6 | + * forth in the LICENSE file which can be found at the top level of |
| 7 | + * the distribution. |
| 8 | + * |
| 9 | + */ |
| 10 | + |
| 11 | +#include "cu_up_connection_manager.h" |
| 12 | +#include "../cu_up_processor/cu_up_processor_repository.h" |
| 13 | +#include "common_task_scheduler.h" |
| 14 | +#include "srsran/e1ap/common/e1ap_message.h" |
| 15 | +#include "srsran/support/executors/sync_task_executor.h" |
| 16 | +#include <thread> |
| 17 | + |
| 18 | +using namespace srsran; |
| 19 | +using namespace srs_cu_cp; |
| 20 | + |
| 21 | +/// Context of a CU-UP connection which is shared between the cu_up_connection_manager and the e1ap_message_notifier. |
| 22 | +class cu_up_connection_manager::shared_cu_up_connection_context |
| 23 | +{ |
| 24 | +public: |
| 25 | + shared_cu_up_connection_context(cu_up_connection_manager& parent_) : parent(parent_) {} |
| 26 | + shared_cu_up_connection_context(const shared_cu_up_connection_context&) = delete; |
| 27 | + shared_cu_up_connection_context(shared_cu_up_connection_context&&) = delete; |
| 28 | + shared_cu_up_connection_context& operator=(const shared_cu_up_connection_context&) = delete; |
| 29 | + shared_cu_up_connection_context& operator=(shared_cu_up_connection_context&&) = delete; |
| 30 | + ~shared_cu_up_connection_context() { disconnect(); } |
| 31 | + |
| 32 | + /// Assign a CU-UP repository index to the context. This is called when the CU-UP repository is actually created. |
| 33 | + void connect_cu_up(cu_up_index_t cu_up_idx_) |
| 34 | + { |
| 35 | + cu_up_idx = cu_up_idx_; |
| 36 | + msg_handler = &parent.cu_ups.get_cu_up(cu_up_idx).get_message_handler(); |
| 37 | + } |
| 38 | + |
| 39 | + /// Determines whether a CU-UP repository has been created for this connection. |
| 40 | + bool connected() const { return msg_handler != nullptr; } |
| 41 | + |
| 42 | + /// Deletes the associated CU-UP repository, if it exists. |
| 43 | + void disconnect() |
| 44 | + { |
| 45 | + if (not connected()) { |
| 46 | + // CU-UP was never allocated or was already removed. |
| 47 | + return; |
| 48 | + } |
| 49 | + |
| 50 | + // Notify CU-UP that the connection is closed. |
| 51 | + parent.handle_e1_gw_connection_closed(cu_up_idx); |
| 52 | + |
| 53 | + cu_up_idx = cu_up_index_t::invalid; |
| 54 | + msg_handler = nullptr; |
| 55 | + } |
| 56 | + |
| 57 | + /// Handle E1AP message coming from the CU-UP. |
| 58 | + void handle_message(const e1ap_message& msg) |
| 59 | + { |
| 60 | + if (not connected()) { |
| 61 | + parent.logger.warning("Discarding CU-UP E1AP message. Cause: CU-UP connection has been closed."); |
| 62 | + } |
| 63 | + |
| 64 | + // Forward message. |
| 65 | + msg_handler->handle_message(msg); |
| 66 | + } |
| 67 | + |
| 68 | +private: |
| 69 | + cu_up_connection_manager& parent; |
| 70 | + cu_up_index_t cu_up_idx = cu_up_index_t::invalid; |
| 71 | + e1ap_message_handler* msg_handler = nullptr; |
| 72 | +}; |
| 73 | + |
| 74 | +/// Notifier used to forward Rx E1AP messages from the E1 GW to CU-CP in a thread safe manner. |
| 75 | +class cu_up_connection_manager::e1_gw_to_cu_cp_pdu_adapter final : public e1ap_message_notifier |
| 76 | +{ |
| 77 | +public: |
| 78 | + e1_gw_to_cu_cp_pdu_adapter(cu_up_connection_manager& parent_, |
| 79 | + std::shared_ptr<shared_cu_up_connection_context> ctxt_) : |
| 80 | + parent(parent_), ctxt(std::move(ctxt_)) |
| 81 | + { |
| 82 | + // Increment number of CU-UP connections. |
| 83 | + parent.cu_up_count.fetch_add(1, std::memory_order_release); |
| 84 | + } |
| 85 | + |
| 86 | + ~e1_gw_to_cu_cp_pdu_adapter() override |
| 87 | + { |
| 88 | + // Decrement the number of active CU-UP connections. |
| 89 | + parent.cu_up_count.fetch_sub(1, std::memory_order_release); |
| 90 | + |
| 91 | + // Defer destruction of context to CU-CP execution context. |
| 92 | + // Note: We make a copy of the shared_ptr of the context to extend its lifetime to when the defer callback actually |
| 93 | + // gets executed. |
| 94 | + // Note: We don't use move because the defer may fail. |
| 95 | + while (not parent.cu_cp_exec.defer([ctxt_cpy = ctxt]() { ctxt_cpy->disconnect(); })) { |
| 96 | + parent.logger.error("Failed to schedule CU-UP removal task. Retrying..."); |
| 97 | + std::this_thread::sleep_for(std::chrono::milliseconds(10)); |
| 98 | + } |
| 99 | + } |
| 100 | + |
| 101 | + void on_new_message(const e1ap_message& msg) override |
| 102 | + { |
| 103 | + // Dispatch the E1AP Rx message handling to the CU-CP executor. |
| 104 | + while (not parent.cu_cp_exec.execute([this, msg]() { ctxt->handle_message(msg); })) { |
| 105 | + parent.logger.error("Failed to dispatch E1AP message to CU-CP. Retrying..."); |
| 106 | + std::this_thread::sleep_for(std::chrono::milliseconds(10)); |
| 107 | + } |
| 108 | + } |
| 109 | + |
| 110 | +private: |
| 111 | + cu_up_connection_manager& parent; |
| 112 | + std::shared_ptr<shared_cu_up_connection_context> ctxt; |
| 113 | +}; |
| 114 | + |
| 115 | +cu_up_connection_manager::cu_up_connection_manager(unsigned max_nof_cu_ups_, |
| 116 | + cu_up_processor_repository& cu_ups_, |
| 117 | + task_executor& cu_cp_exec_, |
| 118 | + common_task_scheduler& common_task_sched_) : |
| 119 | + max_nof_cu_ups(max_nof_cu_ups_), |
| 120 | + cu_ups(cu_ups_), |
| 121 | + cu_cp_exec(cu_cp_exec_), |
| 122 | + common_task_sched(common_task_sched_), |
| 123 | + logger(srslog::fetch_basic_logger("CU-CP")) |
| 124 | +{ |
| 125 | +} |
| 126 | + |
| 127 | +std::unique_ptr<e1ap_message_notifier> |
| 128 | +cu_up_connection_manager::handle_new_cu_up_connection(std::unique_ptr<e1ap_message_notifier> e1ap_tx_pdu_notifier) |
| 129 | +{ |
| 130 | + // Note: This function may be called from a different execution context than the CU-CP. |
| 131 | + |
| 132 | + if (stopped.load(std::memory_order_acquire)) { |
| 133 | + // CU-CP is in the process of being stopped. |
| 134 | + return nullptr; |
| 135 | + } |
| 136 | + |
| 137 | + // Verify that there is space for new CU-UP connection. |
| 138 | + if (cu_up_count.load(std::memory_order_acquire) >= max_nof_cu_ups) { |
| 139 | + logger.warning("Rejecting new CU-UP connection. Cause: Maximum number of CU-UPs {} reached.", max_nof_cu_ups); |
| 140 | + return nullptr; |
| 141 | + } |
| 142 | + |
| 143 | + // We create a "detached" notifier, that has no associated CU-UP processor yet. |
| 144 | + auto shared_ctxt = std::make_shared<shared_cu_up_connection_context>(*this); |
| 145 | + auto rx_pdu_notifier = std::make_unique<e1_gw_to_cu_cp_pdu_adapter>(*this, shared_ctxt); |
| 146 | + |
| 147 | + // We dispatch the task to allocate a CU-UP processor and "attach" it to the notifier |
| 148 | + while (not cu_cp_exec.execute([this, shared_ctxt, sender_notifier = std::move(e1ap_tx_pdu_notifier)]() mutable { |
| 149 | + // Create a new CU-UP processor. |
| 150 | + cu_up_index_t cu_up_index = cu_ups.add_cu_up(std::move(sender_notifier)); |
| 151 | + if (cu_up_index == cu_up_index_t::invalid) { |
| 152 | + logger.warning("Rejecting new CU-UP TNL connection. Cause: Failed to create a new CU-UP."); |
| 153 | + return; |
| 154 | + } |
| 155 | + |
| 156 | + // Register the allocated CU-UP processor index in the CU-UP connection context. |
| 157 | + shared_ctxt->connect_cu_up(cu_up_index); |
| 158 | + |
| 159 | + if (not cu_up_connections.insert(std::make_pair(cu_up_index, std::move(shared_ctxt))).second) { |
| 160 | + logger.error("Failed to store new CU-UP connection {}", cu_up_index); |
| 161 | + return; |
| 162 | + } |
| 163 | + |
| 164 | + logger.info("Added TNL connection to CU-UP {}", cu_up_index); |
| 165 | + })) { |
| 166 | + logger.debug("Failed to dispatch CU-CP CU-UP connection task. Retrying..."); |
| 167 | + std::this_thread::sleep_for(std::chrono::milliseconds(10)); |
| 168 | + } |
| 169 | + |
| 170 | + return rx_pdu_notifier; |
| 171 | +} |
| 172 | + |
| 173 | +void cu_up_connection_manager::handle_e1_gw_connection_closed(cu_up_index_t cu_up_idx) |
| 174 | +{ |
| 175 | + // Note: Called from within CU-CP execution context. |
| 176 | + |
| 177 | + common_task_sched.schedule_async_task(launch_async([this, cu_up_idx](coro_context<async_task<void>>& ctx) { |
| 178 | + CORO_BEGIN(ctx); |
| 179 | + if (cu_up_connections.find(cu_up_idx) == cu_up_connections.end()) { |
| 180 | + // CU-UP was already removed. |
| 181 | + CORO_EARLY_RETURN(); |
| 182 | + } |
| 183 | + |
| 184 | + // Await for clean removal of the CU-UP from the CU-UP repository. |
| 185 | + CORO_AWAIT(cu_ups.remove_cu_up(cu_up_idx)); |
| 186 | + |
| 187 | + // Mark the connection as closed. |
| 188 | + cu_up_connections.erase(cu_up_idx); |
| 189 | + |
| 190 | + // Flag that all CU-UPs got removed. |
| 191 | + if (stopped and cu_up_connections.empty()) { |
| 192 | + std::unique_lock<std::mutex> lock(stop_mutex); |
| 193 | + stop_completed = true; |
| 194 | + stop_cvar.notify_one(); |
| 195 | + } |
| 196 | + |
| 197 | + CORO_RETURN(); |
| 198 | + })); |
| 199 | +} |
| 200 | + |
| 201 | +void cu_up_connection_manager::stop() |
| 202 | +{ |
| 203 | + // Note: Called from outside of the CU-CP execution context. |
| 204 | + stop_completed = false; |
| 205 | + stopped = true; |
| 206 | + |
| 207 | + while (not cu_cp_exec.execute([this]() mutable { |
| 208 | + if (cu_up_connections.empty()) { |
| 209 | + // No CU-UPs connected. Notify completion. |
| 210 | + std::unique_lock<std::mutex> lock(stop_mutex); |
| 211 | + stop_completed = true; |
| 212 | + stop_cvar.notify_one(); |
| 213 | + return; |
| 214 | + } |
| 215 | + |
| 216 | + // For each created CU-UP connection context, launch the deletion routine. |
| 217 | + std::vector<cu_up_index_t> cu_up_idxs; |
| 218 | + cu_up_idxs.reserve(cu_up_connections.size()); |
| 219 | + for (const auto& [cu_up_idx, ctxt] : cu_up_connections) { |
| 220 | + cu_up_idxs.push_back(cu_up_idx); |
| 221 | + } |
| 222 | + for (cu_up_index_t cu_up_idx : cu_up_idxs) { |
| 223 | + // Disconnect CU-UP notifier. |
| 224 | + cu_up_connections[cu_up_idx]->disconnect(); |
| 225 | + } |
| 226 | + })) { |
| 227 | + logger.debug("Failed to dispatch CU-CP CU-UP disconnection task. Retrying..."); |
| 228 | + std::this_thread::sleep_for(std::chrono::milliseconds(10)); |
| 229 | + } |
| 230 | + |
| 231 | + // Wait for CU-UP stop to complete. |
| 232 | + { |
| 233 | + std::unique_lock<std::mutex> lock(stop_mutex); |
| 234 | + stop_cvar.wait(lock, [this] { return stop_completed; }); |
| 235 | + } |
| 236 | +} |
0 commit comments