From 6039b3666343f6e884f4cf641311c38479ef5a6a Mon Sep 17 00:00:00 2001 From: Konstantin Ilichev Date: Fri, 16 May 2025 10:19:30 +0000 Subject: [PATCH 1/2] Add notification flow and shutdown handling in SDK * Improve closed connection handling in the FFmpeg MCM plugin. * Add SDK client registry in Media Proxy. * Add event broker in Media Proxy. * Add SDK client shutdown notification flow. * Improve local connection manager shutdown handling in Media Proxy. * Modify connection activation flow to return the link status and return only when the link to multipoint group is confirmed. * Add SDK client registration flow in Media Proxy. * Add global context and shutdown flow in SDK. Signed-off-by: Konstantin Ilichev --- docs/SDK_API_Definition.md | 3 +- ffmpeg-plugin/mcm_audio_rx.c | 21 +- ffmpeg-plugin/mcm_video_rx.c | 18 +- media-proxy/CMakeLists.txt | 1 + media-proxy/include/mesh/client.h | 29 +++ media-proxy/include/mesh/client_registry.h | 56 ++++++ media-proxy/include/mesh/conn.h | 5 + media-proxy/include/mesh/conn_registry.h | 7 +- media-proxy/include/mesh/event.h | 106 ++++++++++ media-proxy/include/mesh/manager_local.h | 7 +- media-proxy/include/mesh/st2110.h | 2 +- media-proxy/src/media_proxy.cc | 15 +- media-proxy/src/mesh/client_registry.cc | 13 ++ media-proxy/src/mesh/conn.cc | 17 +- media-proxy/src/mesh/event.cc | 13 ++ media-proxy/src/mesh/manager_local.cc | 89 +++++---- media-proxy/src/mesh/sdk_api.cc | 188 ++++++++++++++---- protos/sdk.proto | 34 +++- protos/type.proto | 12 ++ sdk/CMakeLists.txt | 1 + sdk/include/mesh_client.h | 7 + sdk/include/mesh_concurrency.h | 221 +++++++++++++++++++++ sdk/include/mesh_conn.h | 5 +- sdk/include/mesh_dp.h | 19 +- sdk/include/mesh_sdk_api.h | 3 +- sdk/src/memif_impl.c | 15 ++ sdk/src/mesh_buf.cc | 17 +- sdk/src/mesh_client.cc | 44 +++- sdk/src/mesh_concurrency.cc | 150 ++++++++++++++ sdk/src/mesh_dp.cc | 2 + sdk/src/mesh_sdk_api.cc | 142 ++++++++++++- sdk/tests/mesh_dp_api_tests.cc | 22 +- 32 files changed, 1131 
insertions(+), 153 deletions(-) create mode 100644 media-proxy/include/mesh/client.h create mode 100644 media-proxy/include/mesh/client_registry.h create mode 100644 media-proxy/include/mesh/event.h create mode 100644 media-proxy/src/mesh/client_registry.cc create mode 100644 media-proxy/src/mesh/event.cc create mode 100644 protos/type.proto create mode 100644 sdk/include/mesh_concurrency.h create mode 100644 sdk/src/mesh_concurrency.cc diff --git a/docs/SDK_API_Definition.md b/docs/SDK_API_Definition.md index 98560b646..cd893aec9 100644 --- a/docs/SDK_API_Definition.md +++ b/docs/SDK_API_Definition.md @@ -441,9 +441,10 @@ NOTE: The codes are negative integer values. | `-MESH_ERR_BAD_CONFIG_PTR` | Bad configuration pointer | The configuration pointer is NULL. | | `-MESH_ERR_BAD_BUF_PTR` | Bad buffer pointer | The buffer pointer is NULL. | | `-MESH_ERR_BAD_BUF_LEN` | Bad buffer length | **Rx connection**: The buffer length is corrupted.
**Tx connection**: The buffer length is bigger than maximum. | +| `-MESH_ERR_CLIENT_FAILED` | Client creation failed | An error occurred while creating an SDK client. | | `-MESH_ERR_CLIENT_CONFIG_INVAL` | Invalid client config | JSON client configuration string is malformed. | | `-MESH_ERR_MAX_CONN` | Reached max connections number | An attempt to create a connection failed due to reaching the maximum number of connections defined in `"maxMediaConnections"`. | -| `-MESH_ERR_FOUND_ALLOCATED` | Found allocated resources | When deleting a client, some connections were found not closed. Delete all connections explicitly before deleting the client. | +| `-MESH_ERR_FOUND_ALLOCATED` | Found allocated resources | When deleting an SDK client, some connections were found not closed. Delete all connections explicitly before deleting the client. | | `-MESH_ERR_CONN_FAILED` | Connection creation failed | An error occurred while creating a connection. | | `-MESH_ERR_CONN_CONFIG_INVAL` | Invalid connection config | JSON connection configuration string is malformed or one of parameters has an incorrect value. | | `-MESH_ERR_CONN_CONFIG_INCOMPAT` | Incompatible connection config | Incompatible parameters found in the JSON connection configuration string. 
| diff --git a/ffmpeg-plugin/mcm_audio_rx.c b/ffmpeg-plugin/mcm_audio_rx.c index b2271dea6..e58d67822 100644 --- a/ffmpeg-plugin/mcm_audio_rx.c +++ b/ffmpeg-plugin/mcm_audio_rx.c @@ -133,19 +133,17 @@ static int mcm_audio_read_packet(AVFormatContext* avctx, AVPacket* pkt) s->first_frame = false; err = mesh_get_buffer_timeout(s->conn, &buf, timeout); - if (err == -MESH_ERR_CONN_CLOSED) { - ret = AVERROR_EOF; - goto error_close_conn; - } + if (err == -MESH_ERR_CONN_CLOSED) + return AVERROR_EOF; + if (err) { if (mcm_shutdown_requested()) { - ret = AVERROR_EXIT; + return AVERROR_EXIT; } else { av_log(avctx, AV_LOG_ERROR, "Get buffer error: %s (%d)\n", mesh_err2str(err), err); - ret = AVERROR(EIO); + return AVERROR(EIO); } - goto error_close_conn; } if (mcm_shutdown_requested()) { @@ -166,8 +164,7 @@ static int mcm_audio_read_packet(AVFormatContext* avctx, AVPacket* pkt) if (err) { av_log(avctx, AV_LOG_ERROR, "Put buffer error: %s (%d)\n", mesh_err2str(err), err); - ret = AVERROR(EIO); - goto error_close_conn; + return AVERROR(EIO); } return len; @@ -175,12 +172,6 @@ static int mcm_audio_read_packet(AVFormatContext* avctx, AVPacket* pkt) error_put_buf: mesh_put_buffer(&buf); -error_close_conn: - err = mesh_delete_connection(&s->conn); - if (err) - av_log(avctx, AV_LOG_ERROR, "Delete mesh connection failed: %s (%d)\n", - mesh_err2str(err), err); - return ret; } diff --git a/ffmpeg-plugin/mcm_video_rx.c b/ffmpeg-plugin/mcm_video_rx.c index 4aabd18d1..61ee6606a 100644 --- a/ffmpeg-plugin/mcm_video_rx.c +++ b/ffmpeg-plugin/mcm_video_rx.c @@ -128,19 +128,17 @@ static int mcm_video_read_packet(AVFormatContext* avctx, AVPacket* pkt) s->first_frame = false; err = mesh_get_buffer_timeout(s->conn, &buf, timeout); - if (err == -MESH_ERR_CONN_CLOSED) { - ret = AVERROR_EOF; - goto error_close_conn; - } + if (err == -MESH_ERR_CONN_CLOSED) + return AVERROR_EOF; + if (err) { if (mcm_shutdown_requested()) { - ret = AVERROR_EXIT; + return AVERROR_EXIT; } else { av_log(avctx, 
AV_LOG_ERROR, "Get buffer error: %s (%d)\n", mesh_err2str(err), err); - ret = AVERROR(EIO); + return AVERROR(EIO); } - goto error_close_conn; } if (mcm_shutdown_requested()) { @@ -161,8 +159,7 @@ static int mcm_video_read_packet(AVFormatContext* avctx, AVPacket* pkt) if (err) { av_log(avctx, AV_LOG_ERROR, "Put buffer error: %s (%d)\n", mesh_err2str(err), err); - ret = AVERROR(EIO); - goto error_close_conn; + return AVERROR(EIO); } return len; @@ -170,9 +167,6 @@ static int mcm_video_read_packet(AVFormatContext* avctx, AVPacket* pkt) error_put_buf: mesh_put_buffer(&buf); -error_close_conn: - mesh_delete_connection(&s->conn); - return ret; } diff --git a/media-proxy/CMakeLists.txt b/media-proxy/CMakeLists.txt index 5c2b423fb..2d7e0ed85 100644 --- a/media-proxy/CMakeLists.txt +++ b/media-proxy/CMakeLists.txt @@ -66,6 +66,7 @@ endif() # Define the .proto files set(PROTO_FILES + ${PROTO_DIR}/type.proto ${PROTO_DIR}/conn-config.proto ${PROTO_DIR}/sdk.proto ${PROTO_DIR}/mediaproxy.proto diff --git a/media-proxy/include/mesh/client.h b/media-proxy/include/mesh/client.h new file mode 100644 index 000000000..13e9c8a53 --- /dev/null +++ b/media-proxy/include/mesh/client.h @@ -0,0 +1,29 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025 Intel Corporation + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#ifndef CLIENT_H +#define CLIENT_H + +#include + +namespace mesh::client { + +/** + * Client + * + * SDK client implementation. 
+ */ +class Client { + +public: + Client() {} + + std::string id; +}; + +} // namespace mesh::client + +#endif // CLIENT_H diff --git a/media-proxy/include/mesh/client_registry.h b/media-proxy/include/mesh/client_registry.h new file mode 100644 index 000000000..9769c61f4 --- /dev/null +++ b/media-proxy/include/mesh/client_registry.h @@ -0,0 +1,56 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025 Intel Corporation + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#ifndef CLIENT_REGISTRY_H +#define CLIENT_REGISTRY_H + +#include "client.h" +#include +#include +#include +#include "uuid.h" + +namespace mesh::client { + +/** + * Registry + * + * Thread-safe registry to store pointers to SDK client instances. + */ +class Registry { +public: + int add(const std::string& id, Client *client) { + std::unique_lock lk(mx); + if (clients.contains(id)) + return -1; + clients[id] = client; + return 0; + } + + bool remove(const std::string& id) { + std::unique_lock lk(mx); + return clients.erase(id) > 0; + } + + Client * get(const std::string& id) { + std::shared_lock lk(mx); + auto it = clients.find(id); + if (it != clients.end()) { + return it->second; + } + return nullptr; + } + +private: + std::unordered_map clients; + std::shared_mutex mx; +}; + +extern Registry registry; + +} // namespace mesh::client + +#endif // CLIENT_REGISTRY_H diff --git a/media-proxy/include/mesh/conn.h b/media-proxy/include/mesh/conn.h index 1ebd4bf9d..066326ea8 100644 --- a/media-proxy/include/mesh/conn.h +++ b/media-proxy/include/mesh/conn.h @@ -206,6 +206,9 @@ class Connection : public telemetry::MetricsProvider { Connection * link(); void set_config(const Config& cfg); + void set_parent(const std::string& parent_id); + + void notify_parent_conn_unlink_requested(context::Context& ctx); Result establish(context::Context& ctx); Result establish_async(context::Context& ctx); @@ -227,6 +230,7 @@ class Connection : public telemetry::MetricsProvider { } info; Config config; + std::string 
legacy_sdk_id; protected: void set_state(context::Context& ctx, State new_state); @@ -268,6 +272,7 @@ class Connection : public telemetry::MetricsProvider { context::Context establish_ctx = context::WithCancel(context::Background()); std::jthread establish_th; std::jthread shutdown_th; + std::string parent_id; }; const char * kind2str(Kind kind, bool brief = false); diff --git a/media-proxy/include/mesh/conn_registry.h b/media-proxy/include/mesh/conn_registry.h index 7628d8e51..4e879ed7c 100644 --- a/media-proxy/include/mesh/conn_registry.h +++ b/media-proxy/include/mesh/conn_registry.h @@ -57,6 +57,11 @@ class Registry { return ids; } + int size() { + std::shared_lock lk(mx); + return conns.size(); + } + private: std::unordered_map conns; @@ -68,8 +73,6 @@ class Registry { std::shared_mutex mx; }; -extern Registry registry; - } // namespace mesh::connection #endif // CONN_REGISTRY_H diff --git a/media-proxy/include/mesh/event.h b/media-proxy/include/mesh/event.h new file mode 100644 index 000000000..0ac6176de --- /dev/null +++ b/media-proxy/include/mesh/event.h @@ -0,0 +1,106 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025 Intel Corporation + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#ifndef EVENT_H +#define EVENT_H + +#include +#include +#include +#include "concurrency.h" +#include "logger.h" +#include +#include + +namespace mesh::event { + +enum class Type { + empty_event, + conn_unlink_requested, +}; + +typedef void (* Handler)(const Type& type); + +class Event { +public: + std::string consumer_id; + Type type; + std::unordered_map params; +}; + +/** + * EventBroker + * + * Subsystem for passing events from producers to consumers. + * Enables software component decoupling. 
+ */ +class EventBroker { +public: + EventBroker() {} + + thread::Channel * subscribe(const std::string& consumer_id, int queue_sz = 1) { + auto ch = new(std::nothrow) thread::Channel(queue_sz); + if (ch) { + std::unique_lock lk(mx); + channels[ch] = consumer_id; + } + return ch; + } + + bool unsubscribe(thread::Channel *ch) { + std::unique_lock lk(mx); + return channels.erase(ch) > 0; + } + + bool send(context::Context& ctx, const std::string& consumer_id, const Type type, + const std::unordered_map& params = {}) { + Event evt = { + .consumer_id = consumer_id, + .type = type, + .params = params, + }; + return events.send(ctx, evt); + } + + void run(context::Context& ctx) { + for (;;) { + auto v = events.receive(ctx); + if (ctx.cancelled()) + return; + + if (!v.has_value()) + continue; + + auto evt = v.value(); + for (const auto& kv : channels) { + if (kv.second == evt.consumer_id) { + auto tctx = context::WithTimeout(ctx, std::chrono::milliseconds(3000)); + if (ctx.cancelled()) + return; + + if (tctx.cancelled()) { + log::error("Event sending timeout")("type", (int)evt.type) + ("consumer_id", evt.consumer_id); + } else if (!kv.first->send(tctx, evt)) { + log::error("Event sending failed")("type", (int)evt.type) + ("consumer_id", evt.consumer_id); + } + } + } + } + } + +private: + std::unordered_map *, std::string> channels; + std::mutex mx; + thread::Channel events = thread::Channel(100); +}; + +extern EventBroker broker; + +} // namespace mesh::event + +#endif // EVENT_H diff --git a/media-proxy/include/mesh/manager_local.h b/media-proxy/include/mesh/manager_local.h index a5d53ebe9..a6e52967a 100644 --- a/media-proxy/include/mesh/manager_local.h +++ b/media-proxy/include/mesh/manager_local.h @@ -17,10 +17,11 @@ namespace mesh::connection { class LocalManager { public: int create_connection_sdk(context::Context& ctx, std::string& id, - mcm_conn_param *param, memif_conn_param *memif_param, + const std::string& client_id, mcm_conn_param *param, + memif_conn_param 
*memif_param, const Config& conn_config, std::string& err_str); - int activate_connection_sdk(context::Context& ctx, const std::string& id); + Result activate_connection_sdk(context::Context& ctx, const std::string& id); int delete_connection_sdk(context::Context& ctx, const std::string& id, bool do_unregister = true); @@ -29,6 +30,8 @@ class LocalManager { int reregister_all_connections(context::Context& ctx); + int notify_all_shutdown_wait(context::Context& ctx); + void shutdown(context::Context& ctx); void lock(); diff --git a/media-proxy/include/mesh/st2110.h b/media-proxy/include/mesh/st2110.h index 5d985f536..fd1e941a5 100644 --- a/media-proxy/include/mesh/st2110.h +++ b/media-proxy/include/mesh/st2110.h @@ -132,7 +132,7 @@ template class ST2110 : public C mtl_session = create_session(mtl_device, &ops); if (!mtl_session) { - log::error("Failed to create session"); + log::error("Failed to create MTL session"); set_state(ctx, State::closed); return set_result(Result::error_general_failure); } diff --git a/media-proxy/src/media_proxy.cc b/media-proxy/src/media_proxy.cc index f299e72e8..28ac87c5d 100644 --- a/media-proxy/src/media_proxy.cc +++ b/media-proxy/src/media_proxy.cc @@ -18,6 +18,7 @@ #include "metrics_collector.h" #include "proxy_config.h" #include "mcm-version.h" +#include "event.h" #include #include @@ -209,6 +210,12 @@ int main(int argc, char* argv[]) std::signal(SIGINT, signal_handler); std::signal(SIGTERM, signal_handler); + // Start event broker + auto event_ctx = context::WithCancel(context::Background()); + std::thread eventBrokerThread([&]() { + event::broker.run(event_ctx); + }); + // Start ProxyAPI client auto err = RunProxyAPIClient(ctx); if (err) @@ -233,8 +240,8 @@ int main(int argc, char* argv[]) // Stop Local connection manager log::info("Shutting down Local conn manager"); auto tctx = context::WithTimeout(context::Background(), - std::chrono::milliseconds(3000)); - connection::local_manager.shutdown(ctx); + 
std::chrono::milliseconds(5000)); + connection::local_manager.shutdown(tctx); metricsCollectorThread.join(); @@ -245,6 +252,10 @@ int main(int argc, char* argv[]) sdk_ctx.cancel(); sdkApiThread.join(); + // Shutdown event broker + event_ctx.cancel(); + eventBrokerThread.join(); + log::info("Media Proxy exited"); exit(0); diff --git a/media-proxy/src/mesh/client_registry.cc b/media-proxy/src/mesh/client_registry.cc new file mode 100644 index 000000000..9375cd2b9 --- /dev/null +++ b/media-proxy/src/mesh/client_registry.cc @@ -0,0 +1,13 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025 Intel Corporation + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include "client_registry.h" + +namespace mesh::client { + +Registry registry; + +} // namespace mesh::client diff --git a/media-proxy/src/mesh/conn.cc b/media-proxy/src/mesh/conn.cc index 23ec64ae8..9a32b179d 100644 --- a/media-proxy/src/mesh/conn.cc +++ b/media-proxy/src/mesh/conn.cc @@ -7,6 +7,7 @@ #include "conn.h" #include #include "logger.h" +#include "event.h" namespace mesh::connection { @@ -120,6 +121,18 @@ void Connection::set_config(const Config& cfg) } } +void Connection::set_parent(const std::string& parent_id) +{ + this->parent_id = parent_id; +} + +void Connection::notify_parent_conn_unlink_requested(context::Context& ctx) +{ + if (!parent_id.empty()) + event::broker.send(ctx, parent_id, event::Type::conn_unlink_requested, + {{"conn_id", id}}); +} + Result Connection::establish(context::Context& ctx) { switch (state()) { @@ -316,8 +329,8 @@ Result Connection::set_link(context::Context& ctx, Connection *new_link, dp_link.store_wait(new_link); - // TODO: generate a post Event (conn_link_changed). - // Use context to cancel sending the Event. 
+ if (!new_link) + notify_parent_conn_unlink_requested(ctx); return set_result(Result::success); } diff --git a/media-proxy/src/mesh/event.cc b/media-proxy/src/mesh/event.cc new file mode 100644 index 000000000..5c6695fda --- /dev/null +++ b/media-proxy/src/mesh/event.cc @@ -0,0 +1,13 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025 Intel Corporation + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include "event.h" + +namespace mesh::event { + +EventBroker broker; + +} // namespace mesh::event diff --git a/media-proxy/src/mesh/manager_local.cc b/media-proxy/src/mesh/manager_local.cc index 89bb36180..3ab4b5387 100644 --- a/media-proxy/src/mesh/manager_local.cc +++ b/media-proxy/src/mesh/manager_local.cc @@ -18,10 +18,8 @@ namespace mesh::connection { LocalManager local_manager; -// Temporary Multipoint group business logic. -// std::string tx_id, rx_id; - int LocalManager::create_connection_sdk(context::Context& ctx, std::string& id, + const std::string& client_id, mcm_conn_param *param, memif_conn_param *memif_param, const Config& conn_config, @@ -47,7 +45,6 @@ int LocalManager::create_connection_sdk(context::Context& ctx, std::string& id, memif_ops_t memif_ops = {}; memif_ops.interface_id = 0; - // const char *type_str = param->type == is_tx ? 
"tx" : "rx"; const char *type_str = conn_config.kind2str(); snprintf(memif_ops.app_name, sizeof(memif_ops.app_name), @@ -57,17 +54,10 @@ int LocalManager::create_connection_sdk(context::Context& ctx, std::string& id, snprintf(memif_ops.socket_path, sizeof(memif_ops.socket_path), "/run/mcm/media_proxy_%s_%s.sock", type_str, id.c_str()); - // size_t frame_size = st_frame_size(ST_FRAME_FMT_YUV422PLANAR10LE, - // param->width, param->height, false); - // size_t frame_size = st_frame_size(ST_FRAME_FMT_YUV422PLANAR10LE, - // conn_config.payload.video.width, - // conn_config.payload.video.height, false); - size_t frame_size = conn_config.buf_parts.total_size(); Local *conn; - // if (param->type == is_tx) if (conn_config.kind == sdk::CONN_KIND_TRANSMITTER) conn = new(std::nothrow) LocalRx; else @@ -79,6 +69,7 @@ int LocalManager::create_connection_sdk(context::Context& ctx, std::string& id, } conn->set_config(conn_config); + conn->set_parent(client_id); uint8_t log2_ring_size = conn_config.buf_queue_capacity ? std::log2(conn_config.buf_queue_capacity) : 0; @@ -90,7 +81,6 @@ int LocalManager::create_connection_sdk(context::Context& ctx, std::string& id, } // Prepare parameters to register in Media Proxy - // std::string kind = param->type == is_tx ? "rx" : "tx"; std::string kind = conn_config.kind == sdk::CONN_KIND_TRANSMITTER ? "rx" : "tx"; lock(); @@ -113,10 +103,6 @@ int LocalManager::create_connection_sdk(context::Context& ctx, std::string& id, return -1; } - // log::debug("MCM PARAMS") - // ("remote", std::string(param->remote_addr.ip) + ":" + std::string(param->remote_addr.port)) - // ("local", std::string(param->local_addr.ip) + ":" + std::string(param->local_addr.port)); - conn->get_params(memif_param); // Assign id accessed by metrics collector. @@ -125,34 +111,21 @@ int LocalManager::create_connection_sdk(context::Context& ctx, std::string& id, // TODO: Rethink the flow to avoid using two registries with different ids. 
registry_sdk.replace(id, conn); registry.add(agent_assigned_id, conn); - // log::debug("Added local conn")("conn_id", conn->id)("id", id); - - // // Temporary Multipoint group business logic. - // // TODO: Remove when Multipoint Groups are implemented. - // if (param->type == is_tx) { - // auto tx_conn = registry_sdk.get(tx_id); - // if (tx_conn) { - // conn->set_link(ctx, tx_conn); - // tx_conn->set_link(ctx, conn); - // } - // rx_id = id; - // } else { - // auto rx_conn = registry_sdk.get(rx_id); - // if (rx_conn) { - // conn->set_link(ctx, rx_conn); - // rx_conn->set_link(ctx, conn); - // } - // tx_id = id; - // } + + conn->legacy_sdk_id = id; // Remember the id generated in Media Proxy. + id = agent_assigned_id; // Let SDK use the Agent provided id. return 0; } -int LocalManager::activate_connection_sdk(context::Context& ctx, const std::string& id) +Result LocalManager::activate_connection_sdk(context::Context& ctx, const std::string& id) { - auto conn = registry_sdk.get(id); + auto conn = registry.get(id); if (!conn) - return -1; + return Result::error_bad_argument; + + if (!conn->link()) + return Result::error_no_link_assigned; log::debug("Activate local conn")("conn_id", conn->id)("id", id); @@ -163,13 +136,13 @@ int LocalManager::activate_connection_sdk(context::Context& ctx, const std::stri conn->resume(ctx); } - return 0; + return Result::success; } int LocalManager::delete_connection_sdk(context::Context& ctx, const std::string& id, bool do_unregister) { - auto conn = registry_sdk.get(id); + auto conn = registry.get(id); if (!conn) return -1; @@ -193,7 +166,7 @@ int LocalManager::delete_connection_sdk(context::Context& ctx, const std::string } registry.remove(conn->id); - registry_sdk.remove(id); + registry_sdk.remove(conn->legacy_sdk_id); } auto res = conn->shutdown(ctx); @@ -210,13 +183,13 @@ Connection * LocalManager::get_connection(context::Context& ctx, int LocalManager::reregister_all_connections(context::Context& ctx) { - auto ids = 
registry_sdk.get_all_ids(); + auto ids = registry.get_all_ids(); Config conn_config; log::debug("Re-register all conns"); for (std::string id : ids) { - auto conn = registry_sdk.get(id); + auto conn = registry.get(id); if (!conn) continue; @@ -244,9 +217,37 @@ int LocalManager::reregister_all_connections(context::Context& ctx) return 0; } +int LocalManager::notify_all_shutdown_wait(context::Context& ctx) +{ + auto ids = registry.get_all_ids(); + + lock(); + + for (const std::string& id : ids) { + auto conn = registry.get(id); + if (conn) + conn->notify_parent_conn_unlink_requested(ctx); + } + + unlock(); + + while (!ctx.cancelled()) { + if (registry.size() == 0) + return 0; + + thread::Sleep(ctx, std::chrono::milliseconds(100)); + } + + return -1; +} + void LocalManager::shutdown(context::Context& ctx) { - auto ids = registry_sdk.get_all_ids(); + auto err = notify_all_shutdown_wait(ctx); + if (err) + log::error("Shutdown notification timeout"); + + auto ids = registry.get_all_ids(); for (const std::string& id : ids) { auto err = delete_connection_sdk(ctx, id); diff --git a/media-proxy/src/mesh/sdk_api.cc b/media-proxy/src/mesh/sdk_api.cc index ba4d0f547..b12f8e8c3 100644 --- a/media-proxy/src/mesh/sdk_api.cc +++ b/media-proxy/src/mesh/sdk_api.cc @@ -15,37 +15,48 @@ #include "mcm_dp.h" #include "manager_local.h" #include "proxy_config.h" +#include "client_registry.h" +#include "uuid.h" +#include "event.h" namespace mesh { -using grpc::Server; -using grpc::ServerBuilder; -using grpc::ServerContext; -using grpc::Status; -using grpc::StatusCode; -using sdk::SDKAPI; -using sdk::CreateConnectionRequest; -using sdk::CreateConnectionResponse; -using sdk::ActivateConnectionRequest; -using sdk::ActivateConnectionResponse; -using sdk::DeleteConnectionRequest; -using sdk::DeleteConnectionResponse; -using sdk::ConnectionConfig; -using sdk::ConfigMultipointGroup; -using sdk::ConfigST2110; -using sdk::ConfigRDMA; -using sdk::ConfigVideo; -using sdk::ConfigAudio; -using 
sdk::ST2110Transport; -using sdk::VideoPixelFormat; -using sdk::AudioSampleRate; -using sdk::AudioFormat; -using sdk::AudioPacketTime; +using ::grpc::Server; +using ::grpc::ServerBuilder; +using ::grpc::ServerWriter; +using ::grpc::ServerContext; +using ::grpc::Status; +using ::grpc::StatusCode; +using ::sdk::SDKAPI; +using ::sdk::CreateConnectionRequest; +using ::sdk::CreateConnectionResponse; +using ::sdk::ActivateConnectionRequest; +using ::sdk::ActivateConnectionResponse; +using ::sdk::DeleteConnectionRequest; +using ::sdk::DeleteConnectionResponse; +using ::sdk::RegisterRequest; +using ::sdk::Event; +using ::sdk::ConnectionConfig; +using ::sdk::ConfigMultipointGroup; +using ::sdk::ConfigST2110; +using ::sdk::ConfigRDMA; +using ::sdk::ConfigVideo; +using ::sdk::ConfigAudio; +using ::sdk::ST2110Transport; +using ::sdk::VideoPixelFormat; +using ::sdk::AudioSampleRate; +using ::sdk::AudioFormat; +using ::sdk::AudioPacketTime; class SDKAPIServiceImpl final : public SDKAPI::Service { public: Status CreateConnection(ServerContext* sctx, const CreateConnectionRequest* req, CreateConnectionResponse* resp) override { + const auto& client_id = req->client_id(); + auto client = client::registry.get(client_id); + if (!client) + return Status(StatusCode::INVALID_ARGUMENT, "client not registered"); + memif_conn_param memif_param = {}; mcm_conn_param param = {}; @@ -77,8 +88,8 @@ class SDKAPIServiceImpl final : public SDKAPI::Service { std::string err_str; auto& mgr = connection::local_manager; - int err = mgr.create_connection_sdk(ctx, conn_id, ¶m, &memif_param, - conn_config, err_str); + int err = mgr.create_connection_sdk(ctx, conn_id, client_id, ¶m, + &memif_param, conn_config, err_str); if (err) { log::error("create_local_conn() failed (%d)", err); if (err_str.empty()) @@ -90,35 +101,49 @@ class SDKAPIServiceImpl final : public SDKAPI::Service { memif_param.conn_args.is_master = 0; // SDK client is to be secondary resp->set_conn_id(conn_id); - 
resp->set_client_id("default-client"); std::string memif_param_str(reinterpret_cast(&memif_param), sizeof(memif_conn_param)); resp->set_memif_conn_param(memif_param_str); log::info("[SDK] Connection created")("id", resp->conn_id()) - ("client_id", resp->client_id()); + ("client_id", client_id); return Status::OK; } Status ActivateConnection(ServerContext* sctx, const ActivateConnectionRequest* req, ActivateConnectionResponse* resp) override { - + const auto& client_id = req->client_id(); + auto client = client::registry.get(client_id); + if (!client) + return Status(StatusCode::INVALID_ARGUMENT, "client not registered"); + auto ctx = context::WithCancel(context::Background()); auto conn_id = req->conn_id(); auto& mgr = connection::local_manager; - int err = mgr.activate_connection_sdk(ctx, conn_id); - if (err) - ; // log::error("activate_local_conn err (%d)", err); - else + auto ret = mgr.activate_connection_sdk(ctx, conn_id); + switch (ret) { + case connection::Result::success: log::info("[SDK] Connection active")("id", req->conn_id()) - ("client_id", req->client_id()); + ("client_id", client_id); + resp->set_linked(true); + return Status::OK; + + case connection::Result::error_no_link_assigned: + resp->set_linked(false); + return Status::OK; - return Status::OK; + default: + return Status(StatusCode::INTERNAL, connection::result2str(ret)); + } } Status DeleteConnection(ServerContext* sctx, const DeleteConnectionRequest* req, DeleteConnectionResponse* resp) override { + const auto& client_id = req->client_id(); + auto client = client::registry.get(client_id); + if (!client) + return Status(StatusCode::INVALID_ARGUMENT, "client not registered"); auto ctx = context::WithCancel(context::Background()); auto conn_id = req->conn_id(); @@ -129,16 +154,106 @@ class SDKAPIServiceImpl final : public SDKAPI::Service { ; // log::error("delete_local_conn err (%d)", err); else log::info("[SDK] Connection deleted")("id", req->conn_id()) - ("client_id", req->client_id()); + 
("client_id", client_id); return Status::OK; } + + Status RegisterAndStreamEvents(ServerContext* sctx, const RegisterRequest* req, + ServerWriter* writer) override { + std::string id; + auto status = register_client(id); + if (!status.ok()) { + log::error("SDK client registration err: %s", status.error_message().c_str()); + return status; + } + + auto ch = event::broker.subscribe(id); + if (!ch) { + log::error("SDK client event subscription err: %s", status.error_message().c_str()); + return status; + } + + Event event; + event.mutable_client_registered()->set_client_id(id); + writer->Write(event); + + while (!sctx->IsCancelled() && !ctx.cancelled()) { + auto tctx = context::WithTimeout(ctx, std::chrono::milliseconds(500)); + auto v = ch->receive(tctx); + if (!v.has_value()) + continue; + + auto evt = v.value(); + log::debug("Sending event")("type", (int)evt.type); + + Event event; + + if (evt.type == event::Type::conn_unlink_requested) { + auto e = event.mutable_conn_unlink_requested(); + if (evt.params.contains("conn_id")) { + auto v = evt.params["conn_id"]; + if (v.type() == typeid(std::string)) { + auto conn_id = std::any_cast(v); + e->set_conn_id(conn_id); + } + } + } + + writer->Write(event); + } + + event::broker.unsubscribe(ch); + + unregister_client(id); + + return Status::OK; + } + + context::Context ctx = context::WithCancel(context::Background()); + +private: + Status register_client(std::string& id) { + auto client = new(std::nothrow) client::Client; + if (!client) + return Status(StatusCode::INTERNAL, "out of memory"); + + bool found = false; + for (int i = 0; i < 5; i++) { + id = generate_uuid_v4(); + client->id = id; + int err = client::registry.add(id, client); + if (!err) { + found = true; + break; + } + } + if (!found) { + delete client; + log::error("SDK client registry contains UUID, max attempts."); + return Status(StatusCode::INTERNAL, "UUID max attempts"); + } + + return Status::OK; + } + + void unregister_client(const std::string& id) { + 
auto client = client::registry.get(id); + if (!client) { + log::error("SDK client unregister: id not found"); + return; + } + + client::registry.remove(id); + delete client; + } }; void RunSDKAPIServer(context::Context& ctx) { std::string server_address("0.0.0.0:"); // gRPC default 50051 server_address += std::to_string(config::proxy.sdk_api_port); SDKAPIServiceImpl service; + service.ctx = context::WithCancel(ctx); ServerBuilder builder; builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); @@ -147,12 +262,13 @@ void RunSDKAPIServer(context::Context& ctx) { log::info("SDK API Server listening on %s", server_address.c_str()); std::jthread th([&]() { - ctx.done(); + service.ctx.done(); log::info("Shutting down SDK API Server"); server->Shutdown(); }); server->Wait(); + service.ctx.cancel(); } } // namespace mesh diff --git a/protos/sdk.proto b/protos/sdk.proto index 4f85fe96c..9815bcf0a 100644 --- a/protos/sdk.proto +++ b/protos/sdk.proto @@ -6,14 +6,42 @@ syntax = "proto3"; package sdk; +import "type.proto"; import "conn-config.proto"; service SDKAPI { + rpc RegisterAndStreamEvents (RegisterRequest) returns (stream Event); + rpc CreateConnection (CreateConnectionRequest) returns (CreateConnectionResponse); rpc ActivateConnection (ActivateConnectionRequest) returns (ActivateConnectionResponse); rpc DeleteConnection (DeleteConnectionRequest) returns (DeleteConnectionResponse); } +message RegisterRequest { + repeated type.Tag tags = 1; +} + +message ClientRegisteredEvent { + string client_id = 1; +} + +message ConnectionUnlinkRequestedEvent { + string conn_id = 1; +} + +message LoggerConfigChangedEvent { + repeated string filters = 1; +} + +message Event { + oneof event { + ClientRegisteredEvent client_registered = 1; + ConnectionUnlinkRequestedEvent conn_unlink_requested = 2; + + LoggerConfigChangedEvent logger_config_changed = 100; + } +} + message CreateConnectionRequest { string client_id = 1; bytes mcm_conn_param = 2; @@ -21,9 +49,8 @@ 
message CreateConnectionRequest { } message CreateConnectionResponse { - string client_id = 1; - string conn_id = 2; - bytes memif_conn_param = 3; + string conn_id = 1; + bytes memif_conn_param = 2; } message ActivateConnectionRequest { @@ -32,6 +59,7 @@ message ActivateConnectionRequest { } message ActivateConnectionResponse { + bool linked = 1; } message DeleteConnectionRequest { diff --git a/protos/type.proto b/protos/type.proto new file mode 100644 index 000000000..ca33b9466 --- /dev/null +++ b/protos/type.proto @@ -0,0 +1,12 @@ +// SPDX-FileCopyrightText: Copyright (c) 2025 Intel Corporation +// +// SPDX-License-Identifier: BSD-3-Clause + +syntax = "proto3"; + +package type; + +message Tag { + string name = 1; + string value = 2; +} diff --git a/sdk/CMakeLists.txt b/sdk/CMakeLists.txt index c9a299f75..8b818a7a1 100644 --- a/sdk/CMakeLists.txt +++ b/sdk/CMakeLists.txt @@ -85,6 +85,7 @@ endif() # Define the .proto files set(PROTO_FILES + ${PROTO_DIR}/type.proto ${PROTO_DIR}/conn-config.proto ${PROTO_DIR}/sdk.proto ${PROTO_DIR}/mediaproxy.proto diff --git a/sdk/include/mesh_client.h b/sdk/include/mesh_client.h index 632d228b9..ca66c844d 100644 --- a/sdk/include/mesh_client.h +++ b/sdk/include/mesh_client.h @@ -10,6 +10,7 @@ #include #include #include "mesh_dp.h" +#include "mesh_concurrency.h" namespace mesh { @@ -45,6 +46,12 @@ class ClientContext { void *grpc_client = nullptr; }; +/** + * Global context for managing SDK client life cycle. + * Termination signals trigger immediate closing of this context. 
+ */ +extern context::Context gctx; + } // namespace mesh /** diff --git a/sdk/include/mesh_concurrency.h b/sdk/include/mesh_concurrency.h new file mode 100644 index 000000000..b1aead109 --- /dev/null +++ b/sdk/include/mesh_concurrency.h @@ -0,0 +1,221 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025 Intel Corporation + * + * SPDX-License-Identifier: BSD-3-Clause + */ + + #ifndef MESH_CONCURRENCY_H + #define MESH_CONCURRENCY_H + + #include + #include + #include + #include + #include + + #include + #include + + namespace mesh { + + namespace thread { + + template + class Channel; + + } // namespace thread + + namespace context { + + /** + * mesh::context::Context + * + * Thread-safe context carrying a cancellation signal to be passed to threads + * and blocking calls. Useful for graceful shutdown. + */ + class Context { + public: + Context(); + virtual ~Context(); + Context& operator=(Context&& other) noexcept; + + void cancel(); + bool cancelled(); + std::stop_token stop_token(); + bool done(); + + std::stop_source ss; + + protected: + Context(Context& parent); + Context(Context& parent, std::chrono::milliseconds timeout_ms); + + Context *parent = nullptr; + thread::Channel *ch = nullptr; + std::future async_cb; + std::chrono::milliseconds timeout_ms; + std::unique_ptr>> cb = nullptr; + + friend Context& Background(); + friend class WithCancel; + friend Context WithCancel(Context& parent); + friend Context WithTimeout(Context& parent, + std::chrono::milliseconds timeout_ms); + }; + + /** + * mesh::context::Background() + * + * Returns an empty Context with cancellation. It is typically used in the + * main function and initialization. It should be the parent context for + * context::WithCancel and context::WithTimeout. + */ + Context& Background(); + + /** + * mesh::context::WithCancel + * + * Creates a new Context with cancellation. 
When the parent context is + * cancelled, it triggers cancellation of the child context and all descending + * child contexts in the chain. On the other hand, if the context is cancelled + * by calling cancel(), the parent context is not affected because the + * cancellation signal propagates only down to the child contexts in the chain. + * + * This concept enables propagation of the cancellation signal from the top + * parent context down to the very bottom child context and stopping all + * threads and blocking I/O calls that depend on any context in the chain. + */ + Context WithCancel(Context& parent); + + /** + * mesh::context::WithTimeout + * + * Creates a new Context with cancellation and a timeout. The time interval + * starts to count down at creation. When the time is out, the context is + * cancelled, which triggers cancellation of all child contexts in the chain. + */ + Context WithTimeout(Context& parent, std::chrono::milliseconds timeout_ms); + + } // namespace context + + namespace thread { + + /** + * Defer + * + * Allows registering a callback to be called when the defer object leaves + * the scope of visibility. Deferred callbacks can be used for automatic + * deallocation of resources created in a function when the function exits. + * Example: + * + * void func() { + * printf("Enter the function\n"); + * ... + * Defer d1([]{ printf("Deferred action 1"); }); + * ... + * Defer d2([]{ printf("Deferred action 2"); }); + * ... + * printf("Exit the function\n"); + * } + * + * Output: + * Enter the function + * Exit the function + * Deferred action 2 + * Deferred action 1 + */ + class Defer { + public: + Defer(std::function callback) : cb(callback) {} + ~Defer() { if (cb) cb(); } + + private: + std::function cb; + }; + + /** + * Sleep + * + * Sleeps for the given interval of time, which interrupts immediately when + * the context is cancelled.
+ */ + void Sleep(context::Context& ctx, std::chrono::milliseconds interval_ms); + + /** + * Channel + * + * A thread-safe queue template supporting cancellation of blocking calls + * send() and receive() by checking context::Context. To set a timeout on the + * blocking calls, context::WithTimeout is supposed to be used. + * Example: + * ... + */ + template + class Channel { + public: + Channel() : Channel(1) {} + Channel(size_t capacity) : cap(capacity), _closed(false) {} + + bool send(context::Context& ctx, T value) { + std::unique_lock lk(mx); + if (!cv_full.wait(lk, ctx.stop_token(), [this] { return !_closed && q.size() < cap; })) { + return false; + } + q.push(std::move(value)); + cv_empty.notify_one(); + return true; + } + + std::optional receive(context::Context& ctx) { + std::unique_lock lk(mx); + cv_empty.wait(lk, ctx.stop_token(), [this] { return !q.empty() || _closed; }); + if (ctx.stop_token().stop_requested()) { + return std::nullopt; + } + if (q.empty()) { + return std::nullopt; + } + T value = std::move(q.front()); + q.pop(); + cv_full.notify_one(); + return value; + } + + std::optional try_receive() { + std::unique_lock lk(mx); + if (q.empty()) { + return std::nullopt; + } + T value = std::move(q.front()); + q.pop(); + cv_full.notify_one(); + return value; + } + + void close() { + std::unique_lock lk(mx); + _closed = true; + cv_empty.notify_all(); + cv_full.notify_all(); + } + + bool closed() { + std::unique_lock lk(mx); + return _closed; + } + + private: + std::queue q; + std::mutex mx; + std::condition_variable_any cv_empty; + std::condition_variable_any cv_full; + size_t cap; + bool _closed; + }; + + } // namespace thread + + } // namespace mesh + + #endif // MESH_CONCURRENCY_H + \ No newline at end of file diff --git a/sdk/include/mesh_conn.h b/sdk/include/mesh_conn.h index 4e80585a3..0897020fe 100644 --- a/sdk/include/mesh_conn.h +++ b/sdk/include/mesh_conn.h @@ -26,7 +26,8 @@ struct mesh_internal_ops_t { int (*enqueue_buf)(mcm_conn_context 
*pctx, mcm_buffer *buf); void * (*grpc_create_client)(); - void * (*grpc_create_client_json)(const std::string& endpoint); + void * (*grpc_create_client_json)(const std::string& endpoint, + mesh::ClientContext *parent); void (*grpc_destroy_client)(void *client); void * (*grpc_create_conn)(void *client, mcm_conn_param *param); void * (*grpc_create_conn_json)(void *client, const mesh::ConnectionConfig& cfg); @@ -176,6 +177,8 @@ class ConnectionContext { void *grpc_conn = nullptr; ConnectionConfig cfg; + + context::Context ctx = context::WithCancel(context::Background()); }; } // namespace mesh diff --git a/sdk/include/mesh_dp.h b/sdk/include/mesh_dp.h index 2ddfa8e1f..a5a403b96 100644 --- a/sdk/include/mesh_dp.h +++ b/sdk/include/mesh_dp.h @@ -96,15 +96,16 @@ typedef struct{ #define MESH_ERR_BAD_CONFIG_PTR 1002 ///< Bad configuration pointer #define MESH_ERR_BAD_BUF_PTR 1003 ///< Bad buffer pointer #define MESH_ERR_BAD_BUF_LEN 1004 ///< Bad buffer length -#define MESH_ERR_CLIENT_CONFIG_INVAL 1005 ///< Invalid client config -#define MESH_ERR_MAX_CONN 1006 ///< Reached max connections number -#define MESH_ERR_FOUND_ALLOCATED 1007 ///< Found allocated resources -#define MESH_ERR_CONN_FAILED 1008 ///< Connection creation failed -#define MESH_ERR_CONN_CONFIG_INVAL 1009 ///< Invalid connection config -#define MESH_ERR_CONN_CONFIG_INCOMPAT 1010 ///< Incompatible connection config -#define MESH_ERR_CONN_CLOSED 1011 ///< Connection is closed -#define MESH_ERR_TIMEOUT 1012 ///< Timeout occurred -#define MESH_ERR_NOT_IMPLEMENTED 1013 ///< Feature not implemented yet +#define MESH_ERR_CLIENT_FAILED 1005 ///< Client creation failed +#define MESH_ERR_CLIENT_CONFIG_INVAL 1006 ///< Invalid client config +#define MESH_ERR_MAX_CONN 1007 ///< Reached max connections number +#define MESH_ERR_FOUND_ALLOCATED 1008 ///< Found allocated resources +#define MESH_ERR_CONN_FAILED 1009 ///< Connection creation failed +#define MESH_ERR_CONN_CONFIG_INVAL 1010 ///< Invalid connection config 
+#define MESH_ERR_CONN_CONFIG_INCOMPAT 1011 ///< Incompatible connection config +#define MESH_ERR_CONN_CLOSED 1012 ///< Connection is closed +#define MESH_ERR_TIMEOUT 1013 ///< Timeout occurred +#define MESH_ERR_NOT_IMPLEMENTED 1014 ///< Feature not implemented yet /** * @brief Create a new mesh client. diff --git a/sdk/include/mesh_sdk_api.h b/sdk/include/mesh_sdk_api.h index 2ca2a4c59..0e7cdf1ed 100644 --- a/sdk/include/mesh_sdk_api.h +++ b/sdk/include/mesh_sdk_api.h @@ -12,7 +12,8 @@ #include "mesh_conn.h" void * mesh_grpc_create_client(); -void * mesh_grpc_create_client_json(const std::string& endpoint); +void * mesh_grpc_create_client_json(const std::string& endpoint, + mesh::ClientContext *parent); void mesh_grpc_destroy_client(void *client); void * mesh_grpc_create_conn(void *client, mcm_conn_param *param); void * mesh_grpc_create_conn_json(void *client, const mesh::ConnectionConfig& cfg); diff --git a/sdk/src/memif_impl.c b/sdk/src/memif_impl.c index 93a7d18c6..8f971de2e 100644 --- a/sdk/src/memif_impl.c +++ b/sdk/src/memif_impl.c @@ -482,6 +482,21 @@ void mcm_destroy_connection_memif(memif_conn_context* pctx) return; } +void mcm_cancel_poll_event_memif(void *pctx) { + memif_conn_context *pmemif = (memif_conn_context *)pctx; + int err; + + if (!pmemif) + return; + + err = memif_cancel_poll_event(memif_get_socket_handle(pmemif->conn)); + if (err != MEMIF_ERR_SUCCESS) { + log_error("Error cancelling memif poll event"); + } else { + log_info("Memif poll event cancelled"); + } +} + // /* Alloc buffer from queue. 
*/ // void* memif_alloc_buffer(void* conn_ctx, size_t size) // { diff --git a/sdk/src/mesh_buf.cc b/sdk/src/mesh_buf.cc index 146b5ea4f..ed4296a3f 100644 --- a/sdk/src/mesh_buf.cc +++ b/sdk/src/mesh_buf.cc @@ -21,10 +21,20 @@ int BufferContext::dequeue(int timeout_ms) if (!conn) return -MESH_ERR_BAD_CONN_PTR; + if (conn->ctx.cancelled()) + return -MESH_ERR_CONN_CLOSED; + int err; buf = mesh_internal_ops.dequeue_buf(conn->handle, timeout_ms, &err); - if (!buf) - return err ? err : -MESH_ERR_CONN_CLOSED; + if (!buf) { + if (!err || + err == MEMIF_ERR_POLL_CANCEL || + err == MEMIF_ERR_DISCONNECT || + err == MEMIF_ERR_DISCONNECTED) + return -MESH_ERR_CONN_CLOSED; + else + return err; + } if (buf->len != conn->cfg.buf_parts.total_size()) { mesh_internal_ops.enqueue_buf(conn->handle, buf); @@ -60,6 +70,9 @@ int BufferContext::enqueue(int timeout_ms) if (!conn) return -MESH_ERR_BAD_CONN_PTR; + if (conn->ctx.cancelled()) + return -MESH_ERR_CONN_CLOSED; + if (conn->cfg.kind == MESH_CONN_KIND_SENDER) { auto base_ptr = (char *)buf->data; auto sysdata = (BufferSysData *)(base_ptr + conn->cfg.buf_parts.sysdata.offset); diff --git a/sdk/src/mesh_client.cc b/sdk/src/mesh_client.cc index 5285f90f0..363f0db5e 100644 --- a/sdk/src/mesh_client.cc +++ b/sdk/src/mesh_client.cc @@ -5,6 +5,7 @@ */ #include "mesh_client.h" #include +#include #include "mesh_conn.h" #include "mesh_logger.h" #include "json.hpp" @@ -12,6 +13,44 @@ namespace mesh { +static volatile __sighandler_t prev_SIGINT_handler; +static volatile __sighandler_t prev_SIGTERM_handler; +context::Context gctx = context::WithCancel(context::Background()); + +static void HandleSignal(int signal) { + log::warn("Shutdown signal received"); + gctx.cancel(); + + if (signal == SIGINT && prev_SIGINT_handler) + prev_SIGINT_handler(signal); + else if (signal == SIGTERM && prev_SIGTERM_handler) + prev_SIGTERM_handler(signal); +} + +static void RegisterSigActionsOnce() { + static std::atomic initialized = false; + if (initialized) + 
return; + + struct sigaction action = { 0 }; + + sigfillset(&action.sa_mask); + + action.sa_flags = SA_RESTART; + action.sa_handler = HandleSignal; + + if (!prev_SIGINT_handler) { + prev_SIGINT_handler = signal(SIGINT, SIG_DFL); + sigaction(SIGINT, &action, NULL); + } + if (!prev_SIGTERM_handler) { + prev_SIGTERM_handler = signal(SIGTERM, SIG_DFL); + sigaction(SIGTERM, &action, NULL); + } + + initialized = true; +} + class KeyValueString { public: KeyValueString() = default; @@ -95,6 +134,7 @@ int ClientConfig::parse_from_json(const char *str) ClientContext::ClientContext() { + RegisterSigActionsOnce(); } int ClientContext::shutdown() @@ -123,7 +163,9 @@ int ClientContext::init(const char *config) return err; std::string endpoint = cfg.proxy_ip + ":" + cfg.proxy_port; - grpc_client = mesh_internal_ops.grpc_create_client_json(endpoint); + grpc_client = mesh_internal_ops.grpc_create_client_json(endpoint, this); + if (!grpc_client) + return -MESH_ERR_CLIENT_FAILED; return 0; } diff --git a/sdk/src/mesh_concurrency.cc b/sdk/src/mesh_concurrency.cc new file mode 100644 index 000000000..ff8c65549 --- /dev/null +++ b/sdk/src/mesh_concurrency.cc @@ -0,0 +1,150 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025 Intel Corporation + * + * SPDX-License-Identifier: BSD-3-Clause + */ + + #include "mesh_concurrency.h" + + namespace mesh { + + namespace context { + + Context::Context() : ss(std::stop_source()), + timeout_ms(std::chrono::milliseconds(0)) + { + } + + Context::Context(Context& parent) : ss(std::stop_source()), + parent(&parent), + ch(new thread::Channel(1)), + timeout_ms(std::chrono::milliseconds(0)) + { + cb = std::make_unique>>( + parent.ss.get_token(), [this] { cancel(); } + ); + } + + Context::Context(Context& parent, + std::chrono::milliseconds timeout_ms) : ss(std::stop_source()), + parent(&parent), + ch(new thread::Channel(1)), + timeout_ms(timeout_ms) + { + cb = std::make_unique>>( + parent.ss.get_token(), [this] { cancel(); } + ); + + if (timeout_ms != 
std::chrono::milliseconds(0)) { + async_cb = std::async(std::launch::async, [this] { + std::mutex mx; + std::unique_lock lk(mx); + std::condition_variable_any cv; + cv.wait_for(lk, ss.get_token(), this->timeout_ms, [] { return false; }); + cancel(); + }); + } + } + + Context& Context::operator=(Context&& other) noexcept { + if (this != &other) { + ss = std::stop_source(); + parent = other.parent; + + if (parent) { + cb = std::make_unique>>( + parent->ss.get_token(), [this] { cancel(); } + ); + } + + timeout_ms = other.timeout_ms; + + if (timeout_ms != std::chrono::milliseconds(0)) { + async_cb = std::async(std::launch::async, [this] { + std::mutex mx; + std::unique_lock lk(mx); + std::condition_variable_any cv; + cv.wait_for(lk, ss.get_token(), timeout_ms, [] { return false; }); + cancel(); + }); + } + + if (ch) { + delete ch; + } + + ch = other.ch; + other.ch = nullptr; + } + return *this; + } + + Context::~Context() + { + cancel(); + + if (async_cb.valid()) + async_cb.wait(); + + if (ch) + delete ch; + } + + void Context::cancel() + { + if (ch) + ch->close(); + + ss.request_stop(); + } + + bool Context::cancelled() + { + return ss.stop_requested(); + } + + std::stop_token Context::stop_token() + { + return ss.get_token(); + } + + bool Context::done() + { + if (ch) + ch->receive(*this); + + return true; + } + + Context& Background() + { + static Context backgroundContext; + return backgroundContext; + } + + Context WithCancel(Context& parent) + { + return Context(parent); + } + + Context WithTimeout(Context& parent, std::chrono::milliseconds timeout_ms) + { + return Context(parent, timeout_ms); + } + + } // namespace context + + namespace thread { + + void Sleep(context::Context& ctx, std::chrono::milliseconds interval_ms) + { + std::mutex mx; + std::unique_lock lk(mx); + std::condition_variable_any cv; + cv.wait_for(lk, ctx.stop_token(), interval_ms, [] { return false; }); + } + + } // namespace thread + + } // namespace mesh + \ No newline at end of file diff 
--git a/sdk/src/mesh_dp.cc b/sdk/src/mesh_dp.cc index 5547b159e..0aec4209f 100644 --- a/sdk/src/mesh_dp.cc +++ b/sdk/src/mesh_dp.cc @@ -233,6 +233,8 @@ const char * mesh_err2str(int err) return "Bad buffer pointer"; case -MESH_ERR_BAD_BUF_LEN: return "Bad buffer length"; + case -MESH_ERR_CLIENT_FAILED: + return "Client creation failed"; case -MESH_ERR_CLIENT_CONFIG_INVAL: return "Invalid parameters in client configuration"; case -MESH_ERR_MAX_CONN: diff --git a/sdk/src/mesh_sdk_api.cc b/sdk/src/mesh_sdk_api.cc index 163495856..a99cba1fa 100644 --- a/sdk/src/mesh_sdk_api.cc +++ b/sdk/src/mesh_sdk_api.cc @@ -17,6 +17,7 @@ #include "mesh_logger.h" #include "mcm-version.h" #include "mesh_dp_legacy.h" +#include "mesh_concurrency.h" using grpc::Channel; using grpc::Status; @@ -27,6 +28,8 @@ using sdk::ActivateConnectionRequest; using sdk::ActivateConnectionResponse; using sdk::DeleteConnectionRequest; using sdk::DeleteConnectionResponse; +using sdk::RegisterRequest; +using sdk::Event; using sdk::BufferPartition; using sdk::BufferPartitions; using sdk::ConnectionKind; @@ -42,12 +45,14 @@ using sdk::AudioSampleRate; using sdk::AudioFormat; using sdk::AudioPacketTime; +extern "C" void mcm_cancel_poll_event_memif(void *pctx); + using namespace mesh; class SDKAPIClient { public: - SDKAPIClient(std::shared_ptr channel) - : stub_(SDKAPI::NewStub(channel)) {} + SDKAPIClient(std::shared_ptr channel, mesh::ClientContext *parent) + : stub_(SDKAPI::NewStub(channel)), parent(parent) {} int CreateConnection(std::string& conn_id, mcm_conn_param *param, memif_conn_param *memif_param) { @@ -69,7 +74,6 @@ class SDKAPIClient { Status status = stub_->CreateConnection(&context, req, &resp); if (status.ok()) { - client_id = resp.client_id(); conn_id = resp.conn_id(); int sz = resp.memif_conn_param().size(); @@ -183,7 +187,6 @@ class SDKAPIClient { Status status = stub_->CreateConnection(&context, req, &resp); if (status.ok()) { - client_id = resp.client_id(); conn_id = resp.conn_id(); int sz 
= resp.memif_conn_param().size(); @@ -216,7 +219,10 @@ class SDKAPIClient { Status status = stub_->ActivateConnection(&context, req, &resp); if (status.ok()) { - return 0; + if (!resp.linked()) + return -EAGAIN; + else + return 0; } else { log::error("ActivateConnection RPC failed: %s", status.error_message().c_str()); @@ -245,10 +251,108 @@ class SDKAPIClient { } } + void RegisterAndStreamEvents() { + RegisterRequest req; + + grpc::ClientContext context; + + std::jthread shutdown_thread([&]() { + ctx.done(); + context.TryCancel(); + }); + + thread::Defer d([&]{ ctx.cancel(); }); + + std::unique_ptr> reader( + stub_->RegisterAndStreamEvents(&context, req)); + + Event event; + while (reader->Read(&event)) { + if (event.has_client_registered()) { + client_id = event.client_registered().client_id(); + registered_ch.send(ctx, true); + } else if (event.has_conn_unlink_requested()) { + auto conn_id = event.conn_unlink_requested().conn_id(); + log::debug("[EVENT] Conn unlink requested")("id", conn_id); + + if (parent) { + std::lock_guard lk(parent->mx); + for (const auto conn : parent->conns) { + conn->ctx.cancel(); + if (conn->grpc_conn) { + auto handle = *(mcm_conn_context **)conn->grpc_conn; + if (handle) + mcm_cancel_poll_event_memif(handle->priv); + } + } + } + } else if (event.has_logger_config_changed()) { + log::debug("[EVENT] Logger config changed"); + } else { + log::info("Received unknown event type"); + } + } + + Status status = reader->Finish(); + registered_ch.send(ctx, false); + if (!status.ok()) { + if (status.error_code() == grpc::StatusCode::CANCELLED) + return; + + log::error("RegisterAndStreamEvents RPC failed: %s", + status.error_message().c_str()); + } + } + + int Run() { + try { + th = std::jthread([this]() { + RegisterAndStreamEvents(); + }); + } + catch (const std::system_error& e) { + log::error("SDK client background thread creation failed"); + Shutdown(); + return -ENOMEM; + } + + auto tctx = context::WithTimeout(gctx, 
std::chrono::milliseconds(15000)); + auto registered = registered_ch.receive(tctx); + if (!registered.has_value()) { + if (tctx.cancelled() && !gctx.cancelled()) + log::error("SDK client registration timeout"); + Shutdown(); + return -EIO; + } + if (!registered.value()) { + log::error("SDK client registration failed"); + Shutdown(); + return -EIO; + } + + log::info("SDK client registered successfully")("client_id", client_id); + + return 0; + } + + void Shutdown() { + ctx.cancel(); + if (th.joinable()) + th.join(); + } + std::string client_id; private: + // void BackgroundProcess() { + // thread::Sleep(ctx, std::chrono::milliseconds(1000)); + // } + std::unique_ptr stub_; + std::jthread th; + thread::Channel registered_ch; + context::Context ctx = context::WithCancel(context::Background()); + mesh::ClientContext *parent = nullptr; }; // DEBUG @@ -269,18 +373,28 @@ void * mesh_grpc_create_client() auto client = new(std::nothrow) SDKAPIClient(grpc::CreateChannel("localhost:" + sdk_port, // gRPC default 50051 - grpc::InsecureChannelCredentials())); + grpc::InsecureChannelCredentials()), nullptr); return client; } -void * mesh_grpc_create_client_json(const std::string& endpoint) { +void * mesh_grpc_create_client_json(const std::string& endpoint, + mesh::ClientContext *parent) { log::info("Media Communications Mesh SDK version %s #%s", VERSION_TAG, VERSION_HASH) ("endpoint", endpoint); auto client = new(std::nothrow) SDKAPIClient(grpc::CreateChannel(endpoint, - grpc::InsecureChannelCredentials())); + grpc::InsecureChannelCredentials()), parent); + + if (client) { + auto err = client->Run(); + if (err) { + delete client; + return NULL; + } + } + return client; } @@ -288,6 +402,7 @@ void mesh_grpc_destroy_client(void *client) { if (client) { auto cli = static_cast(client); + cli->Shutdown(); delete cli; } } @@ -400,7 +515,16 @@ void * mesh_grpc_create_conn_json(void *client, const mesh::ConnectionConfig& cf return NULL; } - err = cli->ActivateConnection(conn->conn_id); + 
while (!gctx.cancelled()) { + err = cli->ActivateConnection(conn->conn_id); + if (err == -EAGAIN) { + // Repeat after a small delay + thread::Sleep(gctx, std::chrono::milliseconds(50)); + continue; + } + break; + } + if (err) { log::error("Activate gRPC connection failed (%d)", err); mesh_grpc_destroy_conn(conn); diff --git a/sdk/tests/mesh_dp_api_tests.cc b/sdk/tests/mesh_dp_api_tests.cc index 85ced8d61..a5fb26e04 100644 --- a/sdk/tests/mesh_dp_api_tests.cc +++ b/sdk/tests/mesh_dp_api_tests.cc @@ -100,7 +100,8 @@ void * mock_grpc_create_client() return NULL; } -void * mock_grpc_create_client_json(const std::string& endpoint) +void * mock_grpc_create_client_json(const std::string& endpoint, + mesh::ClientContext *parent) { return NULL; } @@ -1533,15 +1534,16 @@ TEST(APITests_MeshBuffer, Test_ImportantConstants) { EXPECT_EQ(MESH_ERR_BAD_CONFIG_PTR, 1002); EXPECT_EQ(MESH_ERR_BAD_BUF_PTR, 1003); EXPECT_EQ(MESH_ERR_BAD_BUF_LEN, 1004); - EXPECT_EQ(MESH_ERR_CLIENT_CONFIG_INVAL, 1005); - EXPECT_EQ(MESH_ERR_MAX_CONN, 1006); - EXPECT_EQ(MESH_ERR_FOUND_ALLOCATED, 1007); - EXPECT_EQ(MESH_ERR_CONN_FAILED, 1008); - EXPECT_EQ(MESH_ERR_CONN_CONFIG_INVAL, 1009); - EXPECT_EQ(MESH_ERR_CONN_CONFIG_INCOMPAT, 1010); - EXPECT_EQ(MESH_ERR_CONN_CLOSED, 1011); - EXPECT_EQ(MESH_ERR_TIMEOUT, 1012); - EXPECT_EQ(MESH_ERR_NOT_IMPLEMENTED, 1013); + EXPECT_EQ(MESH_ERR_CLIENT_FAILED, 1005); + EXPECT_EQ(MESH_ERR_CLIENT_CONFIG_INVAL, 1006); + EXPECT_EQ(MESH_ERR_MAX_CONN, 1007); + EXPECT_EQ(MESH_ERR_FOUND_ALLOCATED, 1008); + EXPECT_EQ(MESH_ERR_CONN_FAILED, 1009); + EXPECT_EQ(MESH_ERR_CONN_CONFIG_INVAL, 1010); + EXPECT_EQ(MESH_ERR_CONN_CONFIG_INCOMPAT, 1011); + EXPECT_EQ(MESH_ERR_CONN_CLOSED, 1012); + EXPECT_EQ(MESH_ERR_TIMEOUT, 1013); + EXPECT_EQ(MESH_ERR_NOT_IMPLEMENTED, 1014); EXPECT_EQ(MESH_TIMEOUT_DEFAULT, -2); EXPECT_EQ(MESH_TIMEOUT_INFINITE, -1); From 43b6d8f9105f4f114f5ac724bd42e1ddc78a1c25 Mon Sep 17 00:00:00 2001 From: Konstantin Ilichev Date: Mon, 26 May 2025 10:54:54 +0000 Subject: [PATCH 
2/2] Fix event channel deallocation Signed-off-by: Konstantin Ilichev --- media-proxy/include/mesh/event.h | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/media-proxy/include/mesh/event.h b/media-proxy/include/mesh/event.h index 0ac6176de..85a2c8447 100644 --- a/media-proxy/include/mesh/event.h +++ b/media-proxy/include/mesh/event.h @@ -52,7 +52,9 @@ class EventBroker { bool unsubscribe(thread::Channel *ch) { std::unique_lock lk(mx); - return channels.erase(ch) > 0; + auto ret = channels.erase(ch) > 0; + delete ch; + return ret; } bool send(context::Context& ctx, const std::string& consumer_id, const Type type,