diff --git a/docs/SDK_API_Definition.md b/docs/SDK_API_Definition.md index cd893aec9..98560b646 100644 --- a/docs/SDK_API_Definition.md +++ b/docs/SDK_API_Definition.md @@ -441,10 +441,9 @@ NOTE: The codes are negative integer values. | `-MESH_ERR_BAD_CONFIG_PTR` | Bad configuration pointer | The configuration pointer is NULL. | | `-MESH_ERR_BAD_BUF_PTR` | Bad buffer pointer | The buffer pointer is NULL. | | `-MESH_ERR_BAD_BUF_LEN` | Bad buffer length | **Rx connection**: The buffer length is corrupted.
**Tx connection**: The buffer length is bigger than maximum. | -| `-MESH_ERR_CLIENT_FAILED` | Client creation failed | An error occurred while creating an SDK client. | | `-MESH_ERR_CLIENT_CONFIG_INVAL` | Invalid client config | JSON client configuration string is malformed. | | `-MESH_ERR_MAX_CONN` | Reached max connections number | An attempt to create a connection failed due to reaching the maximum number of connections defined in `"maxMediaConnections"`. | -| `-MESH_ERR_FOUND_ALLOCATED` | Found allocated resources | When deleting an SDK client, some connections were found not closed. Delete all connections explicitly before deleting the client. | +| `-MESH_ERR_FOUND_ALLOCATED` | Found allocated resources | When deleting a client, some connections were found not closed. Delete all connections explicitly before deleting the client. | | `-MESH_ERR_CONN_FAILED` | Connection creation failed | An error occurred while creating a connection. | | `-MESH_ERR_CONN_CONFIG_INVAL` | Invalid connection config | JSON connection configuration string is malformed or one of parameters has an incorrect value. | | `-MESH_ERR_CONN_CONFIG_INCOMPAT` | Incompatible connection config | Incompatible parameters found in the JSON connection configuration string. | diff --git a/ffmpeg-plugin/mcm_audio_rx.c b/ffmpeg-plugin/mcm_audio_rx.c index e58d67822..b2271dea6 100644 --- a/ffmpeg-plugin/mcm_audio_rx.c +++ b/ffmpeg-plugin/mcm_audio_rx.c @@ -133,17 +133,19 @@ static int mcm_audio_read_packet(AVFormatContext* avctx, AVPacket* pkt) s->first_frame = false; err = mesh_get_buffer_timeout(s->conn, &buf, timeout); - if (err == -MESH_ERR_CONN_CLOSED) - return AVERROR_EOF; - + if (err == -MESH_ERR_CONN_CLOSED) { + ret = AVERROR_EOF; + goto error_close_conn; + } if (err) { if (mcm_shutdown_requested()) { - return AVERROR_EXIT; + ret = AVERROR_EXIT; } else { av_log(avctx, AV_LOG_ERROR, "Get buffer error: %s (%d)\n", mesh_err2str(err), err); - return AVERROR(EIO); + ret = AVERROR(EIO); } + goto error_close_conn; } if (mcm_shutdown_requested()) { @@ -164,7 +166,8 @@ static int mcm_audio_read_packet(AVFormatContext* avctx, AVPacket* pkt) if (err) { av_log(avctx, AV_LOG_ERROR, "Put buffer error: %s (%d)\n", mesh_err2str(err), err); - return AVERROR(EIO); + ret = AVERROR(EIO); + goto error_close_conn; } return len; @@ -172,6 +175,12 @@ static int mcm_audio_read_packet(AVFormatContext* avctx, AVPacket* pkt) error_put_buf: mesh_put_buffer(&buf); +error_close_conn: + err = mesh_delete_connection(&s->conn); + if (err) + av_log(avctx, AV_LOG_ERROR, "Delete mesh connection failed: %s (%d)\n", + mesh_err2str(err), err); + return ret; } diff --git a/ffmpeg-plugin/mcm_video_rx.c b/ffmpeg-plugin/mcm_video_rx.c index 61ee6606a..4aabd18d1 100644 --- a/ffmpeg-plugin/mcm_video_rx.c +++ b/ffmpeg-plugin/mcm_video_rx.c @@ -128,17 +128,19 @@ static int mcm_video_read_packet(AVFormatContext* avctx, AVPacket* pkt) s->first_frame = false; err = mesh_get_buffer_timeout(s->conn, &buf, timeout); - if (err == -MESH_ERR_CONN_CLOSED) - return AVERROR_EOF; - + if (err == -MESH_ERR_CONN_CLOSED) { + ret = AVERROR_EOF; + goto error_close_conn; + } if (err) { if (mcm_shutdown_requested()) { - return AVERROR_EXIT; + ret = AVERROR_EXIT; } else { av_log(avctx, AV_LOG_ERROR, "Get buffer error: %s (%d)\n", mesh_err2str(err), err); - return AVERROR(EIO); + ret = AVERROR(EIO); } + goto error_close_conn; } if (mcm_shutdown_requested()) { @@ -159,7 +161,8 @@ static int mcm_video_read_packet(AVFormatContext* avctx, AVPacket* pkt) if (err) { av_log(avctx, AV_LOG_ERROR, "Put buffer error: %s (%d)\n", mesh_err2str(err), err); - return AVERROR(EIO); + ret = AVERROR(EIO); + goto error_close_conn; } return len; @@ -167,6 +170,9 @@ static int mcm_video_read_packet(AVFormatContext* avctx, AVPacket* pkt) error_put_buf: mesh_put_buffer(&buf); +error_close_conn: + mesh_delete_connection(&s->conn); + return ret; } diff --git a/media-proxy/CMakeLists.txt b/media-proxy/CMakeLists.txt index 2d7e0ed85..5c2b423fb 100644 --- a/media-proxy/CMakeLists.txt +++ b/media-proxy/CMakeLists.txt @@ -66,7 +66,6 @@ endif() # Define the .proto files set(PROTO_FILES - ${PROTO_DIR}/type.proto ${PROTO_DIR}/conn-config.proto ${PROTO_DIR}/sdk.proto ${PROTO_DIR}/mediaproxy.proto diff --git a/media-proxy/include/mesh/client.h b/media-proxy/include/mesh/client.h deleted file mode 100644 index 13e9c8a53..000000000 --- a/media-proxy/include/mesh/client.h +++ /dev/null @@ -1,29 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2025 Intel Corporation - * - * SPDX-License-Identifier: BSD-3-Clause - */ - -#ifndef CLIENT_H -#define CLIENT_H - -#include - -namespace mesh::client { - -/** - * Client - * - * SDK client implementation. - */ -class Client { - -public: - Client() {} - - std::string id; -}; - -} // namespace mesh::client - -#endif // CLIENT_H diff --git a/media-proxy/include/mesh/client_registry.h b/media-proxy/include/mesh/client_registry.h deleted file mode 100644 index 9769c61f4..000000000 --- a/media-proxy/include/mesh/client_registry.h +++ /dev/null @@ -1,56 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2025 Intel Corporation - * - * SPDX-License-Identifier: BSD-3-Clause - */ - -#ifndef CLIENT_REGISTRY_H -#define CLIENT_REGISTRY_H - -#include "client.h" -#include -#include -#include -#include "uuid.h" - -namespace mesh::client { - -/** - * Registry - * - * Thread-safe registry to store pointers to SDK client instances. - */ -class Registry { -public: - int add(const std::string& id, Client *client) { - std::unique_lock lk(mx); - if (clients.contains(id)) - return -1; - clients[id] = client; - return 0; - } - - bool remove(const std::string& id) { - std::unique_lock lk(mx); - return clients.erase(id) > 0; - } - - Client * get(const std::string& id) { - std::shared_lock lk(mx); - auto it = clients.find(id); - if (it != clients.end()) { - return it->second; - } - return nullptr; - } - -private: - std::unordered_map clients; - std::shared_mutex mx; -}; - -extern Registry registry; - -} // namespace mesh::client - -#endif // CLIENT_REGISTRY_H diff --git a/media-proxy/include/mesh/conn.h b/media-proxy/include/mesh/conn.h index 066326ea8..1ebd4bf9d 100644 --- a/media-proxy/include/mesh/conn.h +++ b/media-proxy/include/mesh/conn.h @@ -206,9 +206,6 @@ class Connection : public telemetry::MetricsProvider { Connection * link(); void set_config(const Config& cfg); - void set_parent(const std::string& parent_id); - - void notify_parent_conn_unlink_requested(context::Context& ctx); Result establish(context::Context& ctx); Result establish_async(context::Context& ctx); @@ -230,7 +227,6 @@ class Connection : public telemetry::MetricsProvider { } info; Config config; - std::string legacy_sdk_id; protected: void set_state(context::Context& ctx, State new_state); @@ -272,7 +268,6 @@ class Connection : public telemetry::MetricsProvider { context::Context establish_ctx = context::WithCancel(context::Background()); std::jthread establish_th; std::jthread shutdown_th; - std::string parent_id; }; const char * kind2str(Kind kind, bool brief = false); diff --git a/media-proxy/include/mesh/conn_registry.h b/media-proxy/include/mesh/conn_registry.h index 4e879ed7c..7628d8e51 100644 --- a/media-proxy/include/mesh/conn_registry.h +++ b/media-proxy/include/mesh/conn_registry.h @@ -57,11 +57,6 @@ class Registry { return ids; } - int size() { - std::shared_lock lk(mx); - return conns.size(); - } - private: std::unordered_map conns; @@ -73,6 +68,8 @@ class Registry { std::shared_mutex mx; }; +extern Registry registry; + } // namespace mesh::connection #endif // CONN_REGISTRY_H diff --git a/media-proxy/include/mesh/event.h b/media-proxy/include/mesh/event.h deleted file mode 100644 index 85a2c8447..000000000 --- a/media-proxy/include/mesh/event.h +++ /dev/null @@ -1,108 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2025 Intel Corporation - * - * SPDX-License-Identifier: BSD-3-Clause - */ - -#ifndef EVENT_H -#define EVENT_H - -#include -#include -#include -#include "concurrency.h" -#include "logger.h" -#include -#include - -namespace mesh::event { - -enum class Type { - empty_event, - conn_unlink_requested, -}; - -typedef void (* Handler)(const Type& type); - -class Event { -public: - std::string consumer_id; - Type type; - std::unordered_map params; -}; - -/** - * EventBroker - * - * Subsystem for passing events from producers to consumers. - * Enables software component decoupling. - */ -class EventBroker { -public: - EventBroker() {} - - thread::Channel * subscribe(const std::string& consumer_id, int queue_sz = 1) { - auto ch = new(std::nothrow) thread::Channel(queue_sz); - if (ch) { - std::unique_lock lk(mx); - channels[ch] = consumer_id; - } - return ch; - } - - bool unsubscribe(thread::Channel *ch) { - std::unique_lock lk(mx); - auto ret = channels.erase(ch) > 0; - delete ch; - return ret; - } - - bool send(context::Context& ctx, const std::string& consumer_id, const Type type, - const std::unordered_map& params = {}) { - Event evt = { - .consumer_id = consumer_id, - .type = type, - .params = params, - }; - return events.send(ctx, evt); - } - - void run(context::Context& ctx) { - for (;;) { - auto v = events.receive(ctx); - if (ctx.cancelled()) - return; - - if (!v.has_value()) - continue; - - auto evt = v.value(); - for (const auto& kv : channels) { - if (kv.second == evt.consumer_id) { - auto tctx = context::WithTimeout(ctx, std::chrono::milliseconds(3000)); - if (ctx.cancelled()) - return; - - if (tctx.cancelled()) { - log::error("Event sending timeout")("type", (int)evt.type) - ("consumer_id", evt.consumer_id); - } else if (!kv.first->send(tctx, evt)) { - log::error("Event sending failed")("type", (int)evt.type) - ("consumer_id", evt.consumer_id); - } - } - } - } - } - -private: - std::unordered_map *, std::string> channels; - std::mutex mx; - thread::Channel events = thread::Channel(100); -}; - -extern EventBroker broker; - -} // namespace mesh::event - -#endif // EVENT_H diff --git a/media-proxy/include/mesh/manager_local.h b/media-proxy/include/mesh/manager_local.h index a6e52967a..a5d53ebe9 100644 --- a/media-proxy/include/mesh/manager_local.h +++ b/media-proxy/include/mesh/manager_local.h @@ -17,11 +17,10 @@ namespace mesh::connection { class LocalManager { public: int create_connection_sdk(context::Context& ctx, std::string& id, - const std::string& client_id, mcm_conn_param *param, - memif_conn_param *memif_param, + mcm_conn_param *param, memif_conn_param *memif_param, const Config& conn_config, std::string& err_str); - Result activate_connection_sdk(context::Context& ctx, const std::string& id); + int activate_connection_sdk(context::Context& ctx, const std::string& id); int delete_connection_sdk(context::Context& ctx, const std::string& id, bool do_unregister = true); @@ -30,8 +29,6 @@ class LocalManager { int reregister_all_connections(context::Context& ctx); - int notify_all_shutdown_wait(context::Context& ctx); - void shutdown(context::Context& ctx); void lock(); diff --git a/media-proxy/include/mesh/st2110.h b/media-proxy/include/mesh/st2110.h index fd1e941a5..5d985f536 100644 --- a/media-proxy/include/mesh/st2110.h +++ b/media-proxy/include/mesh/st2110.h @@ -132,7 +132,7 @@ template class ST2110 : public C mtl_session = create_session(mtl_device, &ops); if (!mtl_session) { - log::error("Failed to create MTL session"); + log::error("Failed to create session"); set_state(ctx, State::closed); return set_result(Result::error_general_failure); } diff --git a/media-proxy/src/media_proxy.cc b/media-proxy/src/media_proxy.cc index 28ac87c5d..f299e72e8 100644 --- a/media-proxy/src/media_proxy.cc +++ b/media-proxy/src/media_proxy.cc @@ -18,7 +18,6 @@ #include "metrics_collector.h" #include "proxy_config.h" #include "mcm-version.h" -#include "event.h" #include #include @@ -210,12 +209,6 @@ int main(int argc, char* argv[]) std::signal(SIGINT, signal_handler); std::signal(SIGTERM, signal_handler); - // Start event broker - auto event_ctx = context::WithCancel(context::Background()); - std::thread eventBrokerThread([&]() { - event::broker.run(event_ctx); - }); - // Start ProxyAPI client auto err = RunProxyAPIClient(ctx); if (err) @@ -240,8 +233,8 @@ int main(int argc, char* argv[]) // Stop Local connection manager log::info("Shutting down Local conn manager"); auto tctx = context::WithTimeout(context::Background(), - std::chrono::milliseconds(5000)); - connection::local_manager.shutdown(tctx); + std::chrono::milliseconds(3000)); + connection::local_manager.shutdown(ctx); metricsCollectorThread.join(); @@ -252,10 +245,6 @@ int main(int argc, char* argv[]) sdk_ctx.cancel(); sdkApiThread.join(); - // Shutdown event broker - event_ctx.cancel(); - eventBrokerThread.join(); - log::info("Media Proxy exited"); exit(0); diff --git a/media-proxy/src/mesh/client_registry.cc b/media-proxy/src/mesh/client_registry.cc deleted file mode 100644 index 9375cd2b9..000000000 --- a/media-proxy/src/mesh/client_registry.cc +++ /dev/null @@ -1,13 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2025 Intel Corporation - * - * SPDX-License-Identifier: BSD-3-Clause - */ - -#include "client_registry.h" - -namespace mesh::client { - -Registry registry; - -} // namespace mesh::client diff --git a/media-proxy/src/mesh/conn.cc b/media-proxy/src/mesh/conn.cc index 9a32b179d..23ec64ae8 100644 --- a/media-proxy/src/mesh/conn.cc +++ b/media-proxy/src/mesh/conn.cc @@ -7,7 +7,6 @@ #include "conn.h" #include #include "logger.h" -#include "event.h" namespace mesh::connection { @@ -121,18 +120,6 @@ void Connection::set_config(const Config& cfg) } } -void Connection::set_parent(const std::string& parent_id) -{ - this->parent_id = parent_id; -} - -void Connection::notify_parent_conn_unlink_requested(context::Context& ctx) -{ - if (!parent_id.empty()) - event::broker.send(ctx, parent_id, event::Type::conn_unlink_requested, - {{"conn_id", id}}); -} - Result Connection::establish(context::Context& ctx) { switch (state()) { @@ -329,8 +316,8 @@ Result Connection::set_link(context::Context& ctx, Connection *new_link, dp_link.store_wait(new_link); - if (!new_link) - notify_parent_conn_unlink_requested(ctx); + // TODO: generate a post Event (conn_link_changed). + // Use context to cancel sending the Event. return set_result(Result::success); } diff --git a/media-proxy/src/mesh/event.cc b/media-proxy/src/mesh/event.cc deleted file mode 100644 index 5c6695fda..000000000 --- a/media-proxy/src/mesh/event.cc +++ /dev/null @@ -1,13 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2025 Intel Corporation - * - * SPDX-License-Identifier: BSD-3-Clause - */ - -#include "event.h" - -namespace mesh::event { - -EventBroker broker; - -} // namespace mesh::event diff --git a/media-proxy/src/mesh/manager_local.cc b/media-proxy/src/mesh/manager_local.cc index 3ab4b5387..89bb36180 100644 --- a/media-proxy/src/mesh/manager_local.cc +++ b/media-proxy/src/mesh/manager_local.cc @@ -18,8 +18,10 @@ namespace mesh::connection { LocalManager local_manager; +// Temporary Multipoint group business logic. +// std::string tx_id, rx_id; + int LocalManager::create_connection_sdk(context::Context& ctx, std::string& id, - const std::string& client_id, mcm_conn_param *param, memif_conn_param *memif_param, const Config& conn_config, @@ -45,6 +47,7 @@ int LocalManager::create_connection_sdk(context::Context& ctx, std::string& id, memif_ops_t memif_ops = {}; memif_ops.interface_id = 0; + // const char *type_str = param->type == is_tx ? "tx" : "rx"; const char *type_str = conn_config.kind2str(); snprintf(memif_ops.app_name, sizeof(memif_ops.app_name), @@ -54,10 +57,17 @@ int LocalManager::create_connection_sdk(context::Context& ctx, std::string& id, snprintf(memif_ops.socket_path, sizeof(memif_ops.socket_path), "/run/mcm/media_proxy_%s_%s.sock", type_str, id.c_str()); + // size_t frame_size = st_frame_size(ST_FRAME_FMT_YUV422PLANAR10LE, + // param->width, param->height, false); + // size_t frame_size = st_frame_size(ST_FRAME_FMT_YUV422PLANAR10LE, + // conn_config.payload.video.width, + // conn_config.payload.video.height, false); + size_t frame_size = conn_config.buf_parts.total_size(); Local *conn; + // if (param->type == is_tx) if (conn_config.kind == sdk::CONN_KIND_TRANSMITTER) conn = new(std::nothrow) LocalRx; else @@ -69,7 +79,6 @@ int LocalManager::create_connection_sdk(context::Context& ctx, std::string& id, } conn->set_config(conn_config); - conn->set_parent(client_id); uint8_t log2_ring_size = conn_config.buf_queue_capacity ? std::log2(conn_config.buf_queue_capacity) : 0; @@ -81,6 +90,7 @@ int LocalManager::create_connection_sdk(context::Context& ctx, std::string& id, } // Prepare parameters to register in Media Proxy + // std::string kind = param->type == is_tx ? "rx" : "tx"; std::string kind = conn_config.kind == sdk::CONN_KIND_TRANSMITTER ? "rx" : "tx"; lock(); @@ -103,6 +113,10 @@ int LocalManager::create_connection_sdk(context::Context& ctx, std::string& id, return -1; } + // log::debug("MCM PARAMS") + // ("remote", std::string(param->remote_addr.ip) + ":" + std::string(param->remote_addr.port)) + // ("local", std::string(param->local_addr.ip) + ":" + std::string(param->local_addr.port)); + conn->get_params(memif_param); // Assign id accessed by metrics collector. @@ -111,21 +125,34 @@ int LocalManager::create_connection_sdk(context::Context& ctx, std::string& id, // TODO: Rethink the flow to avoid using two registries with different ids. registry_sdk.replace(id, conn); registry.add(agent_assigned_id, conn); - - conn->legacy_sdk_id = id; // Remember the id generated in Media Proxy. - id = agent_assigned_id; // Let SDK use the Agent provided id. + // log::debug("Added local conn")("conn_id", conn->id)("id", id); + + // // Temporary Multipoint group business logic. + // // TODO: Remove when Multipoint Groups are implemented. + // if (param->type == is_tx) { + // auto tx_conn = registry_sdk.get(tx_id); + // if (tx_conn) { + // conn->set_link(ctx, tx_conn); + // tx_conn->set_link(ctx, conn); + // } + // rx_id = id; + // } else { + // auto rx_conn = registry_sdk.get(rx_id); + // if (rx_conn) { + // conn->set_link(ctx, rx_conn); + // rx_conn->set_link(ctx, conn); + // } + // tx_id = id; + // } return 0; } -Result LocalManager::activate_connection_sdk(context::Context& ctx, const std::string& id) +int LocalManager::activate_connection_sdk(context::Context& ctx, const std::string& id) { - auto conn = registry.get(id); + auto conn = registry_sdk.get(id); if (!conn) - return Result::error_bad_argument; - - if (!conn->link()) - return Result::error_no_link_assigned; + return -1; log::debug("Activate local conn")("conn_id", conn->id)("id", id); @@ -136,13 +163,13 @@ Result LocalManager::activate_connection_sdk(context::Context& ctx, const std::s conn->resume(ctx); } - return Result::success; + return 0; } int LocalManager::delete_connection_sdk(context::Context& ctx, const std::string& id, bool do_unregister) { - auto conn = registry.get(id); + auto conn = registry_sdk.get(id); if (!conn) return -1; @@ -166,7 +193,7 @@ int LocalManager::delete_connection_sdk(context::Context& ctx, const std::string } registry.remove(conn->id); - registry_sdk.remove(conn->legacy_sdk_id); + registry_sdk.remove(id); } auto res = conn->shutdown(ctx); @@ -183,13 +210,13 @@ Connection * LocalManager::get_connection(context::Context& ctx, int LocalManager::reregister_all_connections(context::Context& ctx) { - auto ids = registry.get_all_ids(); + auto ids = registry_sdk.get_all_ids(); Config conn_config; log::debug("Re-register all conns"); for (std::string id : ids) { - auto conn = registry.get(id); + auto conn = registry_sdk.get(id); if (!conn) continue; @@ -217,37 +244,9 @@ int LocalManager::reregister_all_connections(context::Context& ctx) return 0; } -int LocalManager::notify_all_shutdown_wait(context::Context& ctx) -{ - auto ids = registry.get_all_ids(); - - lock(); - - for (const std::string& id : ids) { - auto conn = registry.get(id); - if (conn) - conn->notify_parent_conn_unlink_requested(ctx); - } - - unlock(); - - while (!ctx.cancelled()) { - if (registry.size() == 0) - return 0; - - thread::Sleep(ctx, std::chrono::milliseconds(100)); - } - - return -1; -} - void LocalManager::shutdown(context::Context& ctx) { - auto err = notify_all_shutdown_wait(ctx); - if (err) - log::error("Shutdown notification timeout"); - - auto ids = registry.get_all_ids(); + auto ids = registry_sdk.get_all_ids(); for (const std::string& id : ids) { auto err = delete_connection_sdk(ctx, id); diff --git a/media-proxy/src/mesh/sdk_api.cc b/media-proxy/src/mesh/sdk_api.cc index b12f8e8c3..ba4d0f547 100644 --- a/media-proxy/src/mesh/sdk_api.cc +++ b/media-proxy/src/mesh/sdk_api.cc @@ -15,48 +15,37 @@ #include "mcm_dp.h" #include "manager_local.h" #include "proxy_config.h" -#include "client_registry.h" -#include "uuid.h" -#include "event.h" namespace mesh { -using ::grpc::Server; -using ::grpc::ServerBuilder; -using ::grpc::ServerWriter; -using ::grpc::ServerContext; -using ::grpc::Status; -using ::grpc::StatusCode; -using ::sdk::SDKAPI; -using ::sdk::CreateConnectionRequest; -using ::sdk::CreateConnectionResponse; -using ::sdk::ActivateConnectionRequest; -using ::sdk::ActivateConnectionResponse; -using ::sdk::DeleteConnectionRequest; -using ::sdk::DeleteConnectionResponse; -using ::sdk::RegisterRequest; -using ::sdk::Event; -using ::sdk::ConnectionConfig; -using ::sdk::ConfigMultipointGroup; -using ::sdk::ConfigST2110; -using ::sdk::ConfigRDMA; -using ::sdk::ConfigVideo; -using ::sdk::ConfigAudio; -using ::sdk::ST2110Transport; -using ::sdk::VideoPixelFormat; -using ::sdk::AudioSampleRate; -using ::sdk::AudioFormat; -using ::sdk::AudioPacketTime; +using grpc::Server; +using grpc::ServerBuilder; +using grpc::ServerContext; +using grpc::Status; +using grpc::StatusCode; +using sdk::SDKAPI; +using sdk::CreateConnectionRequest; +using sdk::CreateConnectionResponse; +using sdk::ActivateConnectionRequest; +using sdk::ActivateConnectionResponse; +using sdk::DeleteConnectionRequest; +using sdk::DeleteConnectionResponse; +using sdk::ConnectionConfig; +using sdk::ConfigMultipointGroup; +using sdk::ConfigST2110; +using sdk::ConfigRDMA; +using sdk::ConfigVideo; +using sdk::ConfigAudio; +using sdk::ST2110Transport; +using sdk::VideoPixelFormat; +using sdk::AudioSampleRate; +using sdk::AudioFormat; +using sdk::AudioPacketTime; class SDKAPIServiceImpl final : public SDKAPI::Service { public: Status CreateConnection(ServerContext* sctx, const CreateConnectionRequest* req, CreateConnectionResponse* resp) override { - const auto& client_id = req->client_id(); - auto client = client::registry.get(client_id); - if (!client) - return Status(StatusCode::INVALID_ARGUMENT, "client not registered"); - memif_conn_param memif_param = {}; mcm_conn_param param = {}; @@ -88,8 +77,8 @@ class SDKAPIServiceImpl final : public SDKAPI::Service { std::string err_str; auto& mgr = connection::local_manager; - int err = mgr.create_connection_sdk(ctx, conn_id, client_id, ¶m, - &memif_param, conn_config, err_str); + int err = mgr.create_connection_sdk(ctx, conn_id, ¶m, &memif_param, + conn_config, err_str); if (err) { log::error("create_local_conn() failed (%d)", err); if (err_str.empty()) @@ -101,49 +90,35 @@ class SDKAPIServiceImpl final : public SDKAPI::Service { memif_param.conn_args.is_master = 0; // SDK client is to be secondary resp->set_conn_id(conn_id); + resp->set_client_id("default-client"); std::string memif_param_str(reinterpret_cast(&memif_param), sizeof(memif_conn_param)); resp->set_memif_conn_param(memif_param_str); log::info("[SDK] Connection created")("id", resp->conn_id()) - ("client_id", client_id); + ("client_id", resp->client_id()); return Status::OK; } Status ActivateConnection(ServerContext* sctx, const ActivateConnectionRequest* req, ActivateConnectionResponse* resp) override { - const auto& client_id = req->client_id(); - auto client = client::registry.get(client_id); - if (!client) - return Status(StatusCode::INVALID_ARGUMENT, "client not registered"); - + auto ctx = context::WithCancel(context::Background()); auto conn_id = req->conn_id(); auto& mgr = connection::local_manager; - auto ret = mgr.activate_connection_sdk(ctx, conn_id); - switch (ret) { - case connection::Result::success: + int err = mgr.activate_connection_sdk(ctx, conn_id); + if (err) + ; // log::error("activate_local_conn err (%d)", err); + else log::info("[SDK] Connection active")("id", req->conn_id()) - ("client_id", client_id); - resp->set_linked(true); - return Status::OK; - - case connection::Result::error_no_link_assigned: - resp->set_linked(false); - return Status::OK; + ("client_id", req->client_id()); - default: - return Status(StatusCode::INTERNAL, connection::result2str(ret)); - } + return Status::OK; } Status DeleteConnection(ServerContext* sctx, const DeleteConnectionRequest* req, DeleteConnectionResponse* resp) override { - const auto& client_id = req->client_id(); - auto client = client::registry.get(client_id); - if (!client) - return Status(StatusCode::INVALID_ARGUMENT, "client not registered"); auto ctx = context::WithCancel(context::Background()); auto conn_id = req->conn_id(); @@ -154,106 +129,16 @@ class SDKAPIServiceImpl final : public SDKAPI::Service { ; // log::error("delete_local_conn err (%d)", err); else log::info("[SDK] Connection deleted")("id", req->conn_id()) - ("client_id", client_id); + ("client_id", req->client_id()); return Status::OK; } - - Status RegisterAndStreamEvents(ServerContext* sctx, const RegisterRequest* req, - ServerWriter* writer) override { - std::string id; - auto status = register_client(id); - if (!status.ok()) { - log::error("SDK client registration err: %s", status.error_message().c_str()); - return status; - } - - auto ch = event::broker.subscribe(id); - if (!ch) { - log::error("SDK client event subscription err: %s", status.error_message().c_str()); - return status; - } - - Event event; - event.mutable_client_registered()->set_client_id(id); - writer->Write(event); - - while (!sctx->IsCancelled() && !ctx.cancelled()) { - auto tctx = context::WithTimeout(ctx, std::chrono::milliseconds(500)); - auto v = ch->receive(tctx); - if (!v.has_value()) - continue; - - auto evt = v.value(); - log::debug("Sending event")("type", (int)evt.type); - - Event event; - - if (evt.type == event::Type::conn_unlink_requested) { - auto e = event.mutable_conn_unlink_requested(); - if (evt.params.contains("conn_id")) { - auto v = evt.params["conn_id"]; - if (v.type() == typeid(std::string)) { - auto conn_id = std::any_cast(v); - e->set_conn_id(conn_id); - } - } - } - - writer->Write(event); - } - - event::broker.unsubscribe(ch); - - unregister_client(id); - - return Status::OK; - } - - context::Context ctx = context::WithCancel(context::Background()); - -private: - Status register_client(std::string& id) { - auto client = new(std::nothrow) client::Client; - if (!client) - return Status(StatusCode::INTERNAL, "out of memory"); - - bool found = false; - for (int i = 0; i < 5; i++) { - id = generate_uuid_v4(); - client->id = id; - int err = client::registry.add(id, client); - if (!err) { - found = true; - break; - } - } - if (!found) { - delete client; - log::error("SDK client registry contains UUID, max attempts."); - return Status(StatusCode::INTERNAL, "UUID max attempts"); - } - - return Status::OK; - } - - void unregister_client(const std::string& id) { - auto client = client::registry.get(id); - if (!client) { - log::error("SDK client unregister: id not found"); - return; - } - - client::registry.remove(id); - delete client; - } }; void RunSDKAPIServer(context::Context& ctx) { std::string server_address("0.0.0.0:"); // gRPC default 50051 server_address += std::to_string(config::proxy.sdk_api_port); SDKAPIServiceImpl service; - service.ctx = context::WithCancel(ctx); ServerBuilder builder; builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); @@ -262,13 +147,12 @@ void RunSDKAPIServer(context::Context& ctx) { log::info("SDK API Server listening on %s", server_address.c_str()); std::jthread th([&]() { - service.ctx.done(); + ctx.done(); log::info("Shutting down SDK API Server"); server->Shutdown(); }); server->Wait(); - service.ctx.cancel(); } } // namespace mesh diff --git a/protos/sdk.proto b/protos/sdk.proto index 9815bcf0a..4f85fe96c 100644 --- a/protos/sdk.proto +++ b/protos/sdk.proto @@ -6,42 +6,14 @@ syntax = "proto3"; package sdk; -import "type.proto"; import "conn-config.proto"; service SDKAPI { - rpc RegisterAndStreamEvents (RegisterRequest) returns (stream Event); - rpc CreateConnection (CreateConnectionRequest) returns (CreateConnectionResponse); rpc ActivateConnection (ActivateConnectionRequest) returns (ActivateConnectionResponse); rpc DeleteConnection (DeleteConnectionRequest) returns (DeleteConnectionResponse); } -message RegisterRequest { - repeated type.Tag tags = 1; -} - -message ClientRegisteredEvent { - string client_id = 1; -} - -message ConnectionUnlinkRequestedEvent { - string conn_id = 1; -} - -message LoggerConfigChangedEvent { - repeated string filters = 1; -} - -message Event { - oneof event { - ClientRegisteredEvent client_registered = 1; - ConnectionUnlinkRequestedEvent conn_unlink_requested = 2; - - LoggerConfigChangedEvent logger_config_changed = 100; - } -} - message CreateConnectionRequest { string client_id = 1; bytes mcm_conn_param = 2; @@ -49,8 +21,9 @@ message CreateConnectionRequest { } message CreateConnectionResponse { - string conn_id = 1; - bytes memif_conn_param = 2; + string client_id = 1; + string conn_id = 2; + bytes memif_conn_param = 3; } message ActivateConnectionRequest { @@ -59,7 +32,6 @@ message ActivateConnectionRequest { } message ActivateConnectionResponse { - bool linked = 1; } message DeleteConnectionRequest { diff --git a/protos/type.proto b/protos/type.proto deleted file mode 100644 index ca33b9466..000000000 --- a/protos/type.proto +++ /dev/null @@ -1,12 +0,0 @@ -// SPDX-FileCopyrightText: Copyright (c) 2025 Intel Corporation -// -// SPDX-License-Identifier: BSD-3-Clause - -syntax = "proto3"; - -package type; - -message Tag { - string name = 1; - string value = 2; -} diff --git a/sdk/CMakeLists.txt b/sdk/CMakeLists.txt index 8b818a7a1..c9a299f75 100644 --- a/sdk/CMakeLists.txt +++ b/sdk/CMakeLists.txt @@ -85,7 +85,6 @@ endif() # Define the .proto files set(PROTO_FILES - ${PROTO_DIR}/type.proto ${PROTO_DIR}/conn-config.proto ${PROTO_DIR}/sdk.proto ${PROTO_DIR}/mediaproxy.proto diff --git a/sdk/include/mesh_client.h b/sdk/include/mesh_client.h index ca66c844d..632d228b9 100644 --- a/sdk/include/mesh_client.h +++ b/sdk/include/mesh_client.h @@ -10,7 +10,6 @@ #include #include #include "mesh_dp.h" -#include "mesh_concurrency.h" namespace mesh { @@ -46,12 +45,6 @@ class ClientContext { void *grpc_client = nullptr; }; -/** - * Global context for managing SDK client life cycle. - * Termination signals trigger immediate closing of this context. - */ -extern context::Context gctx; - } // namespace mesh /** diff --git a/sdk/include/mesh_concurrency.h b/sdk/include/mesh_concurrency.h deleted file mode 100644 index b1aead109..000000000 --- a/sdk/include/mesh_concurrency.h +++ /dev/null @@ -1,221 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2025 Intel Corporation - * - * SPDX-License-Identifier: BSD-3-Clause - */ - - #ifndef MESH_CONCURRENCY_H - #define MESH_CONCURRENCY_H - - #include - #include - #include - #include - #include - - #include - #include - - namespace mesh { - - namespace thread { - - template - class Channel; - - } // namespace thread - - namespace context { - - /** - * mesh::context::Context - * - * Thread-safe context carrying a cancellation signal to be passed to threads - * and blocking calls. Useful for graceful shutdown. - */ - class Context { - public: - Context(); - virtual ~Context(); - Context& operator=(Context&& other) noexcept; - - void cancel(); - bool cancelled(); - std::stop_token stop_token(); - bool done(); - - std::stop_source ss; - - protected: - Context(Context& parent); - Context(Context& parent, std::chrono::milliseconds timeout_ms); - - Context *parent = nullptr; - thread::Channel *ch = nullptr; - std::future async_cb; - std::chrono::milliseconds timeout_ms; - std::unique_ptr>> cb = nullptr; - - friend Context& Background(); - friend class WithCancel; - friend Context WithCancel(Context& parent); - friend Context WithTimeout(Context& parent, - std::chrono::milliseconds timeout_ms); - }; - - /** - * mesh::context::Background() - * - * Returns an empty Context with cancellation. It is typically used in the - * main function and initialization. It should be the parent context for - * context::WithCancel and context::WithTimeout. - */ - Context& Background(); - - /** - * mesh::context::WithCancel - * - * Creates a new Context with cancellation. When the parent context is - * cancelled, it triggers cancellation of the child context and all descending - * child contexts in the chain. On the other hand, if the context is cancelled - * by calling stop(), the parent context is not affected because the - * cancellation signal propagates only down to the child contexts in the chain. - * - * This concept enables propagation of the cancellation signal from the top - * parent context down to the very bottom child context and stopping all - * threads and blocking I/O calls that depend on any context in the chain. - */ - Context WithCancel(Context& parent); - - /** - * mesh::context::WithTimeout - * - * Creates a new Context with cancellation and a timeout. The time interval - * starts to count down at creation. When the time is out, the context is - * cancelled, which triggers cancellation of all child contexts in the chain. - */ - Context WithTimeout(Context& parent, std::chrono::milliseconds timeout_ms); - - } // namespace context - - namespace thread { - - /** - * Defer - * - * Allows to register a callback to be called when the defer object leaves - * the scope of visibility. Deferred callbacks can be used for automatic - * deallocation of resources created in a function when the function exits. - * Example: - * - * void func() { - * printf("Enter the function\n"); - * ... - * Defer d1([]{ printf("Deferred action 1"); }); - * ... - * Defer d2([]{ printf("Deferred action 2"); }); - * ... - * printf("Exit the function\n"); - * } - * - * Output: - * Enter the function - * Exit the function - * Deferred action 2 - * Deferred action 1 - */ - class Defer { - public: - Defer(std::function callback) : cb(callback) {} - ~Defer() { if (cb) cb(); } - - private: - std::function cb; - }; - - /** - * Sleep - * - * Sleeps for the given interval of time, which interrupts immediately when - * the context is cancelled. - */ - void Sleep(context::Context& ctx, std::chrono::milliseconds interval_ms); - - /** - * Channel - * - * A thread-safe queue template supporting cancellation of blocking calls - * send() and receive() by checking context::Context. To set a timeout on the - * blocking calls, context::WithTimeout is supposed to be used. - * Example: - * ... - */ - template - class Channel { - public: - Channel() : Channel(1) {} - Channel(size_t capacity) : cap(capacity), _closed(false) {} - - bool send(context::Context& ctx, T value) { - std::unique_lock lk(mx); - if (!cv_full.wait(lk, ctx.stop_token(), [this] { return !_closed && q.size() < cap; })) { - return false; - } - q.push(std::move(value)); - cv_empty.notify_one(); - return true; - } - - std::optional receive(context::Context& ctx) { - std::unique_lock lk(mx); - cv_empty.wait(lk, ctx.stop_token(), [this] { return !q.empty() || _closed; }); - if (ctx.stop_token().stop_requested()) { - return std::nullopt; - } - if (q.empty()) { - return std::nullopt; - } - T value = std::move(q.front()); - q.pop(); - cv_full.notify_one(); - return value; - } - - std::optional try_receive() { - std::unique_lock lk(mx); - if (q.empty()) { - return std::nullopt; - } - T value = std::move(q.front()); - q.pop(); - cv_full.notify_one(); - return value; - } - - void close() { - std::unique_lock lk(mx); - _closed = true; - cv_empty.notify_all(); - cv_full.notify_all(); - } - - bool closed() { - std::unique_lock lk(mx); - return _closed; - } - - private: - std::queue q; - std::mutex mx; - std::condition_variable_any cv_empty; - std::condition_variable_any cv_full; - size_t cap; - bool _closed; - }; - - } // namespace thread - - } // namespace mesh - - #endif // MESH_CONCURRENCY_H - \ No newline at end of file diff --git a/sdk/include/mesh_conn.h b/sdk/include/mesh_conn.h index 0897020fe..4e80585a3 100644 --- a/sdk/include/mesh_conn.h +++ b/sdk/include/mesh_conn.h @@ -26,8 +26,7 @@ struct mesh_internal_ops_t { int (*enqueue_buf)(mcm_conn_context *pctx, mcm_buffer *buf); void * (*grpc_create_client)(); - void * (*grpc_create_client_json)(const std::string& endpoint, - mesh::ClientContext *parent); + void * (*grpc_create_client_json)(const std::string& endpoint); void (*grpc_destroy_client)(void *client); void * (*grpc_create_conn)(void *client, mcm_conn_param *param); void * (*grpc_create_conn_json)(void *client, const mesh::ConnectionConfig& cfg); @@ -177,8 +176,6 @@ class ConnectionContext { void *grpc_conn = nullptr; ConnectionConfig cfg; - - context::Context ctx = context::WithCancel(context::Background()); }; } // namespace mesh diff --git a/sdk/include/mesh_dp.h b/sdk/include/mesh_dp.h index a5a403b96..2ddfa8e1f 100644 --- a/sdk/include/mesh_dp.h +++ b/sdk/include/mesh_dp.h @@ -96,16 +96,15 @@ typedef struct{ #define MESH_ERR_BAD_CONFIG_PTR 1002 ///< Bad configuration pointer #define MESH_ERR_BAD_BUF_PTR 1003 ///< Bad buffer pointer #define MESH_ERR_BAD_BUF_LEN 1004 ///< Bad buffer length -#define MESH_ERR_CLIENT_FAILED 1005 ///< Client creation failed -#define MESH_ERR_CLIENT_CONFIG_INVAL 1006 ///< Invalid client config -#define MESH_ERR_MAX_CONN 1007 ///< Reached max connections number -#define MESH_ERR_FOUND_ALLOCATED 1008 ///< Found allocated resources -#define MESH_ERR_CONN_FAILED 1009 ///< Connection creation failed -#define MESH_ERR_CONN_CONFIG_INVAL 1010 ///< Invalid connection config -#define MESH_ERR_CONN_CONFIG_INCOMPAT 1011 ///< Incompatible connection config -#define MESH_ERR_CONN_CLOSED 1012 ///< Connection is closed -#define MESH_ERR_TIMEOUT 1013 ///< Timeout occurred -#define MESH_ERR_NOT_IMPLEMENTED 1014 ///< Feature not implemented yet +#define MESH_ERR_CLIENT_CONFIG_INVAL 1005 ///< Invalid client config +#define MESH_ERR_MAX_CONN 1006 ///< Reached max connections number +#define MESH_ERR_FOUND_ALLOCATED 1007 ///< Found allocated resources +#define MESH_ERR_CONN_FAILED 1008 ///< Connection creation failed +#define MESH_ERR_CONN_CONFIG_INVAL 1009 ///< Invalid connection config +#define MESH_ERR_CONN_CONFIG_INCOMPAT 1010 ///< Incompatible connection config +#define MESH_ERR_CONN_CLOSED 1011 ///< Connection is closed +#define MESH_ERR_TIMEOUT 1012 ///< Timeout occurred +#define MESH_ERR_NOT_IMPLEMENTED 1013 ///< Feature not implemented yet /** * @brief Create a new mesh client. diff --git a/sdk/include/mesh_sdk_api.h b/sdk/include/mesh_sdk_api.h index 0e7cdf1ed..2ca2a4c59 100644 --- a/sdk/include/mesh_sdk_api.h +++ b/sdk/include/mesh_sdk_api.h @@ -12,8 +12,7 @@ #include "mesh_conn.h" void * mesh_grpc_create_client(); -void * mesh_grpc_create_client_json(const std::string& endpoint, - mesh::ClientContext *parent); +void * mesh_grpc_create_client_json(const std::string& endpoint); void mesh_grpc_destroy_client(void *client); void * mesh_grpc_create_conn(void *client, mcm_conn_param *param); void * mesh_grpc_create_conn_json(void *client, const mesh::ConnectionConfig& cfg); diff --git a/sdk/src/memif_impl.c b/sdk/src/memif_impl.c index 8f971de2e..93a7d18c6 100644 --- a/sdk/src/memif_impl.c +++ b/sdk/src/memif_impl.c @@ -482,21 +482,6 @@ void mcm_destroy_connection_memif(memif_conn_context* pctx) return; } -void mcm_cancel_poll_event_memif(void *pctx) { - memif_conn_context *pmemif = (memif_conn_context *)pctx; - int err; - - if (!pmemif) - return; - - err = memif_cancel_poll_event(memif_get_socket_handle(pmemif->conn)); - if (err != MEMIF_ERR_SUCCESS) { - log_error("Error cancelling memif poll event"); - } else { - log_info("Memif poll event cancelled"); - } -} - // /* Alloc buffer from queue. */ // void* memif_alloc_buffer(void* conn_ctx, size_t size) // { diff --git a/sdk/src/mesh_buf.cc b/sdk/src/mesh_buf.cc index ed4296a3f..146b5ea4f 100644 --- a/sdk/src/mesh_buf.cc +++ b/sdk/src/mesh_buf.cc @@ -21,20 +21,10 @@ int BufferContext::dequeue(int timeout_ms) if (!conn) return -MESH_ERR_BAD_CONN_PTR; - if (conn->ctx.cancelled()) - return -MESH_ERR_CONN_CLOSED; - int err; buf = mesh_internal_ops.dequeue_buf(conn->handle, timeout_ms, &err); - if (!buf) { - if (!err || - err == MEMIF_ERR_POLL_CANCEL || - err == MEMIF_ERR_DISCONNECT || - err == MEMIF_ERR_DISCONNECTED) - return -MESH_ERR_CONN_CLOSED; - else - return err; - } + if (!buf) + return err ? err : -MESH_ERR_CONN_CLOSED; if (buf->len != conn->cfg.buf_parts.total_size()) { mesh_internal_ops.enqueue_buf(conn->handle, buf); @@ -70,9 +60,6 @@ int BufferContext::enqueue(int timeout_ms) if (!conn) return -MESH_ERR_BAD_CONN_PTR; - if (conn->ctx.cancelled()) - return -MESH_ERR_CONN_CLOSED; - if (conn->cfg.kind == MESH_CONN_KIND_SENDER) { auto base_ptr = (char *)buf->data; auto sysdata = (BufferSysData *)(base_ptr + conn->cfg.buf_parts.sysdata.offset); diff --git a/sdk/src/mesh_client.cc b/sdk/src/mesh_client.cc index 363f0db5e..5285f90f0 100644 --- a/sdk/src/mesh_client.cc +++ b/sdk/src/mesh_client.cc @@ -5,7 +5,6 @@ */ #include "mesh_client.h" #include -#include #include "mesh_conn.h" #include "mesh_logger.h" #include "json.hpp" @@ -13,44 +12,6 @@ namespace mesh { -static volatile __sighandler_t prev_SIGINT_handler; -static volatile __sighandler_t prev_SIGTERM_handler; -context::Context gctx = context::WithCancel(context::Background()); - -static void HandleSignal(int signal) { - log::warn("Shutdown signal received"); - gctx.cancel(); - - if (signal == SIGINT && prev_SIGINT_handler) - prev_SIGINT_handler(signal); - else if (signal == SIGTERM && prev_SIGTERM_handler) - prev_SIGTERM_handler(signal); -} - -static void RegisterSigActionsOnce() { - static std::atomic initialized = false; - if (initialized) - return; - - struct sigaction action = { 0 }; - - sigfillset(&action.sa_mask); - - action.sa_flags = SA_RESTART; - action.sa_handler = HandleSignal; - - if (!prev_SIGINT_handler) { - prev_SIGINT_handler = signal(SIGINT, SIG_DFL); - sigaction(SIGINT, &action, NULL); - } - if (!prev_SIGTERM_handler) { - prev_SIGTERM_handler = signal(SIGTERM, SIG_DFL); - sigaction(SIGTERM, &action, NULL); - } - - initialized = true; -} - class KeyValueString { public: KeyValueString() = default; @@ -134,7 +95,6 @@ int ClientConfig::parse_from_json(const char *str) ClientContext::ClientContext() { - RegisterSigActionsOnce(); } int ClientContext::shutdown() @@ -163,9 +123,7 @@ int ClientContext::init(const char *config) return err; std::string endpoint = cfg.proxy_ip + ":" + cfg.proxy_port; - grpc_client = mesh_internal_ops.grpc_create_client_json(endpoint, this); - if (!grpc_client) - return -MESH_ERR_CLIENT_FAILED; + grpc_client = mesh_internal_ops.grpc_create_client_json(endpoint); return 0; } diff --git a/sdk/src/mesh_concurrency.cc b/sdk/src/mesh_concurrency.cc deleted file mode 100644 index ff8c65549..000000000 --- a/sdk/src/mesh_concurrency.cc +++ /dev/null @@ -1,150 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2025 Intel Corporation - * - * SPDX-License-Identifier: BSD-3-Clause - */ - - #include "mesh_concurrency.h" - - namespace mesh { - - namespace context { - - Context::Context() : ss(std::stop_source()), - timeout_ms(std::chrono::milliseconds(0)) - { - } - - Context::Context(Context& parent) : ss(std::stop_source()), - parent(&parent), - ch(new thread::Channel(1)), - timeout_ms(std::chrono::milliseconds(0)) - { - cb = std::make_unique>>( - parent.ss.get_token(), [this] { cancel(); } - ); - } - - Context::Context(Context& parent, - std::chrono::milliseconds timeout_ms) : ss(std::stop_source()), - parent(&parent), - ch(new thread::Channel(1)), - timeout_ms(timeout_ms) - { - cb = std::make_unique>>( - parent.ss.get_token(), [this] { cancel(); } - ); - - if (timeout_ms != std::chrono::milliseconds(0)) { - async_cb = std::async(std::launch::async, [this] { - std::mutex mx; - std::unique_lock lk(mx); - std::condition_variable_any cv; - cv.wait_for(lk, ss.get_token(), this->timeout_ms, [] { return false; }); - cancel(); - }); - } - } - - Context& Context::operator=(Context&& other) noexcept { - if (this != &other) { - ss = std::stop_source(); - parent = other.parent; - - if (parent) { - cb = std::make_unique>>( - parent->ss.get_token(), [this] { cancel(); } - ); - } - - timeout_ms = other.timeout_ms; - - if (timeout_ms != std::chrono::milliseconds(0)) { - async_cb = std::async(std::launch::async, [this] { - std::mutex mx; - std::unique_lock lk(mx); - std::condition_variable_any cv; - cv.wait_for(lk, ss.get_token(), timeout_ms, [] { return false; }); - cancel(); - }); - } - - if (ch) { - delete ch; - } - - ch = other.ch; - other.ch = nullptr; - } - return *this; - } - - Context::~Context() - { - cancel(); - - if (async_cb.valid()) - async_cb.wait(); - - if (ch) - delete ch; - } - - void Context::cancel() - { - if (ch) - ch->close(); - - ss.request_stop(); - } - - bool Context::cancelled() - { - return ss.stop_requested(); - } - - std::stop_token Context::stop_token() - { - return ss.get_token(); - } - - bool Context::done() - { - if (ch) - ch->receive(*this); - - return true; - } - - Context& Background() - { - static Context backgroundContext; - return backgroundContext; - } - - Context WithCancel(Context& parent) - { - return Context(parent); - } - - Context WithTimeout(Context& parent, std::chrono::milliseconds timeout_ms) - { - return Context(parent, timeout_ms); - } - - } // namespace context - - namespace thread { - - void Sleep(context::Context& ctx, std::chrono::milliseconds interval_ms) - { - std::mutex mx; - std::unique_lock lk(mx); - std::condition_variable_any cv; - cv.wait_for(lk, ctx.stop_token(), interval_ms, [] { return false; }); - } - - } // namespace thread - - } // namespace mesh - \ No newline at end of file diff --git a/sdk/src/mesh_dp.cc b/sdk/src/mesh_dp.cc index 0aec4209f..5547b159e 100644 --- a/sdk/src/mesh_dp.cc +++ b/sdk/src/mesh_dp.cc @@ -233,8 +233,6 @@ const char * mesh_err2str(int err) return "Bad buffer pointer"; case -MESH_ERR_BAD_BUF_LEN: return "Bad buffer length"; - case -MESH_ERR_CLIENT_FAILED: - return "Client creation failed"; case -MESH_ERR_CLIENT_CONFIG_INVAL: return "Invalid parameters in client configuration"; case -MESH_ERR_MAX_CONN: diff --git a/sdk/src/mesh_sdk_api.cc b/sdk/src/mesh_sdk_api.cc index a99cba1fa..163495856 100644 --- a/sdk/src/mesh_sdk_api.cc +++ b/sdk/src/mesh_sdk_api.cc @@ -17,7 +17,6 @@ #include "mesh_logger.h" #include "mcm-version.h" #include "mesh_dp_legacy.h" -#include "mesh_concurrency.h" using grpc::Channel; using grpc::Status; @@ -28,8 +27,6 @@ using sdk::ActivateConnectionRequest; using sdk::ActivateConnectionResponse; using sdk::DeleteConnectionRequest; using sdk::DeleteConnectionResponse; -using sdk::RegisterRequest; -using sdk::Event; using sdk::BufferPartition; using sdk::BufferPartitions; using sdk::ConnectionKind; @@ -45,14 +42,12 @@ using sdk::AudioSampleRate; using sdk::AudioFormat; using sdk::AudioPacketTime; -extern "C" void mcm_cancel_poll_event_memif(void *pctx); - using namespace mesh; class SDKAPIClient { public: - SDKAPIClient(std::shared_ptr channel, mesh::ClientContext *parent) - : stub_(SDKAPI::NewStub(channel)), parent(parent) {} + SDKAPIClient(std::shared_ptr channel) + : stub_(SDKAPI::NewStub(channel)) {} int CreateConnection(std::string& conn_id, mcm_conn_param *param, memif_conn_param *memif_param) { @@ -74,6 +69,7 @@ class SDKAPIClient { Status status = stub_->CreateConnection(&context, req, &resp); if (status.ok()) { + client_id = resp.client_id(); conn_id = resp.conn_id(); int sz = resp.memif_conn_param().size(); @@ -187,6 +183,7 @@ class SDKAPIClient { Status status = stub_->CreateConnection(&context, req, &resp); if (status.ok()) { + client_id = resp.client_id(); conn_id = resp.conn_id(); int sz = resp.memif_conn_param().size(); @@ -219,10 +216,7 @@ class SDKAPIClient { Status status = stub_->ActivateConnection(&context, req, &resp); if (status.ok()) { - if (!resp.linked()) - return -EAGAIN; - else - return 0; + return 0; } else { log::error("ActivateConnection RPC failed: %s", status.error_message().c_str()); @@ -251,108 +245,10 @@ class SDKAPIClient { } } - void RegisterAndStreamEvents() { - RegisterRequest req; - - grpc::ClientContext context; - - std::jthread shutdown_thread([&]() { - ctx.done(); - context.TryCancel(); - }); - - thread::Defer d([&]{ ctx.cancel(); }); - - std::unique_ptr> reader( - stub_->RegisterAndStreamEvents(&context, req)); - - Event event; - while (reader->Read(&event)) { - if (event.has_client_registered()) { - client_id = event.client_registered().client_id(); - registered_ch.send(ctx, true); - } else if (event.has_conn_unlink_requested()) { - auto conn_id = event.conn_unlink_requested().conn_id(); - log::debug("[EVENT] Conn unlink requested")("id", conn_id); - - if (parent) { - std::lock_guard lk(parent->mx); - for (const auto conn : parent->conns) { - conn->ctx.cancel(); - if (conn->grpc_conn) { - auto handle = *(mcm_conn_context **)conn->grpc_conn; - if (handle) - mcm_cancel_poll_event_memif(handle->priv); - } - } - } - } else if (event.has_logger_config_changed()) { - log::debug("[EVENT] Logger config changed"); - } else { - log::info("Received unknown event type"); - } - } - - Status status = reader->Finish(); - registered_ch.send(ctx, false); - if (!status.ok()) { - if (status.error_code() == grpc::StatusCode::CANCELLED) - return; - - log::error("RegisterAndStreamEvents RPC failed: %s", - status.error_message().c_str()); - } - } - - int Run() { - try { - th = std::jthread([this]() { - RegisterAndStreamEvents(); - }); - } - catch (const std::system_error& e) { - log::error("SDK client background thread creation failed"); - Shutdown(); - return -ENOMEM; - } - - auto tctx = context::WithTimeout(gctx, std::chrono::milliseconds(15000)); - auto registered = registered_ch.receive(tctx); - if (!registered.has_value()) { - if (tctx.cancelled() && !gctx.cancelled()) - log::error("SDK client registration timeout"); - Shutdown(); - return -EIO; - } - if (!registered.value()) { - log::error("SDK client registration failed"); - Shutdown(); - return -EIO; - } - - log::info("SDK client registered successfully")("client_id", client_id); - - return 0; - } - - void Shutdown() { - ctx.cancel(); - if (th.joinable()) - th.join(); - } - std::string client_id; private: - // void BackgroundProcess() { - // thread::Sleep(ctx, std::chrono::milliseconds(1000)); - // } - std::unique_ptr stub_; - std::jthread th; - thread::Channel registered_ch; - context::Context ctx = context::WithCancel(context::Background()); - mesh::ClientContext *parent = nullptr; }; // DEBUG @@ -373,28 +269,18 @@ void * mesh_grpc_create_client() auto client = new(std::nothrow) SDKAPIClient(grpc::CreateChannel("localhost:" + sdk_port, // gRPC default 50051 - grpc::InsecureChannelCredentials()), nullptr); + grpc::InsecureChannelCredentials())); return client; } -void * mesh_grpc_create_client_json(const std::string& endpoint, - mesh::ClientContext *parent) { +void * mesh_grpc_create_client_json(const std::string& endpoint) { log::info("Media Communications Mesh SDK version %s #%s", VERSION_TAG, VERSION_HASH) ("endpoint", endpoint); auto client = new(std::nothrow) SDKAPIClient(grpc::CreateChannel(endpoint, - grpc::InsecureChannelCredentials()), parent); - - if (client) { - auto err = client->Run(); - if (err) { - delete client; - return NULL; - } - } - + grpc::InsecureChannelCredentials())); return client; } @@ -402,7 +288,6 @@ void mesh_grpc_destroy_client(void *client) { if (client) { auto cli = static_cast(client); - cli->Shutdown(); delete cli; } } @@ -515,16 +400,7 @@ void * mesh_grpc_create_conn_json(void *client, const mesh::ConnectionConfig& cf return NULL; } - while (!gctx.cancelled()) { - err = cli->ActivateConnection(conn->conn_id); - if (err == -EAGAIN) { - // Repeat after a small delay - thread::Sleep(gctx, std::chrono::milliseconds(50)); - continue; - } - break; - } - + err = cli->ActivateConnection(conn->conn_id); if (err) { log::error("Activate gRPC connection failed (%d)", err); mesh_grpc_destroy_conn(conn); diff --git a/sdk/tests/mesh_dp_api_tests.cc b/sdk/tests/mesh_dp_api_tests.cc index a5fb26e04..85ced8d61 100644 --- a/sdk/tests/mesh_dp_api_tests.cc +++ b/sdk/tests/mesh_dp_api_tests.cc @@ -100,8 +100,7 @@ void * mock_grpc_create_client() return NULL; } -void * mock_grpc_create_client_json(const std::string& endpoint, - mesh::ClientContext *parent) +void * mock_grpc_create_client_json(const std::string& endpoint) { return NULL; } @@ -1534,16 +1533,15 @@ TEST(APITests_MeshBuffer, Test_ImportantConstants) { EXPECT_EQ(MESH_ERR_BAD_CONFIG_PTR, 1002); EXPECT_EQ(MESH_ERR_BAD_BUF_PTR, 1003); EXPECT_EQ(MESH_ERR_BAD_BUF_LEN, 1004); - EXPECT_EQ(MESH_ERR_CLIENT_FAILED, 1005); - EXPECT_EQ(MESH_ERR_CLIENT_CONFIG_INVAL, 1006); - EXPECT_EQ(MESH_ERR_MAX_CONN, 1007); - EXPECT_EQ(MESH_ERR_FOUND_ALLOCATED, 1008); - EXPECT_EQ(MESH_ERR_CONN_FAILED, 1009); - EXPECT_EQ(MESH_ERR_CONN_CONFIG_INVAL, 1010); - EXPECT_EQ(MESH_ERR_CONN_CONFIG_INCOMPAT, 1011); - EXPECT_EQ(MESH_ERR_CONN_CLOSED, 1012); - EXPECT_EQ(MESH_ERR_TIMEOUT, 1013); - EXPECT_EQ(MESH_ERR_NOT_IMPLEMENTED, 1014); + EXPECT_EQ(MESH_ERR_CLIENT_CONFIG_INVAL, 1005); + EXPECT_EQ(MESH_ERR_MAX_CONN, 1006); + EXPECT_EQ(MESH_ERR_FOUND_ALLOCATED, 1007); + EXPECT_EQ(MESH_ERR_CONN_FAILED, 1008); + EXPECT_EQ(MESH_ERR_CONN_CONFIG_INVAL, 1009); + EXPECT_EQ(MESH_ERR_CONN_CONFIG_INCOMPAT, 1010); + EXPECT_EQ(MESH_ERR_CONN_CLOSED, 1011); + EXPECT_EQ(MESH_ERR_TIMEOUT, 1012); + EXPECT_EQ(MESH_ERR_NOT_IMPLEMENTED, 1013); EXPECT_EQ(MESH_TIMEOUT_DEFAULT, -2); EXPECT_EQ(MESH_TIMEOUT_INFINITE, -1);