diff --git a/docs/SDK_API_Definition.md b/docs/SDK_API_Definition.md
index 98560b646..cd893aec9 100644
--- a/docs/SDK_API_Definition.md
+++ b/docs/SDK_API_Definition.md
@@ -441,9 +441,10 @@ NOTE: The codes are negative integer values.
| `-MESH_ERR_BAD_CONFIG_PTR` | Bad configuration pointer | The configuration pointer is NULL. |
| `-MESH_ERR_BAD_BUF_PTR` | Bad buffer pointer | The buffer pointer is NULL. |
| `-MESH_ERR_BAD_BUF_LEN` | Bad buffer length | **Rx connection**: The buffer length is corrupted.
**Tx connection**: The buffer length is bigger than maximum. |
+| `-MESH_ERR_CLIENT_FAILED` | Client creation failed | An error occurred while creating an SDK client. |
| `-MESH_ERR_CLIENT_CONFIG_INVAL` | Invalid client config | JSON client configuration string is malformed. |
| `-MESH_ERR_MAX_CONN` | Reached max connections number | An attempt to create a connection failed due to reaching the maximum number of connections defined in `"maxMediaConnections"`. |
-| `-MESH_ERR_FOUND_ALLOCATED` | Found allocated resources | When deleting a client, some connections were found not closed. Delete all connections explicitly before deleting the client. |
+| `-MESH_ERR_FOUND_ALLOCATED` | Found allocated resources | When deleting an SDK client, some connections were found not closed. Delete all connections explicitly before deleting the client. |
| `-MESH_ERR_CONN_FAILED` | Connection creation failed | An error occurred while creating a connection. |
| `-MESH_ERR_CONN_CONFIG_INVAL` | Invalid connection config | JSON connection configuration string is malformed or one of parameters has an incorrect value. |
| `-MESH_ERR_CONN_CONFIG_INCOMPAT` | Incompatible connection config | Incompatible parameters found in the JSON connection configuration string. |
diff --git a/ffmpeg-plugin/mcm_audio_rx.c b/ffmpeg-plugin/mcm_audio_rx.c
index b2271dea6..e58d67822 100644
--- a/ffmpeg-plugin/mcm_audio_rx.c
+++ b/ffmpeg-plugin/mcm_audio_rx.c
@@ -133,19 +133,17 @@ static int mcm_audio_read_packet(AVFormatContext* avctx, AVPacket* pkt)
s->first_frame = false;
err = mesh_get_buffer_timeout(s->conn, &buf, timeout);
- if (err == -MESH_ERR_CONN_CLOSED) {
- ret = AVERROR_EOF;
- goto error_close_conn;
- }
+ if (err == -MESH_ERR_CONN_CLOSED)
+ return AVERROR_EOF;
+
if (err) {
if (mcm_shutdown_requested()) {
- ret = AVERROR_EXIT;
+ return AVERROR_EXIT;
} else {
av_log(avctx, AV_LOG_ERROR, "Get buffer error: %s (%d)\n",
mesh_err2str(err), err);
- ret = AVERROR(EIO);
+ return AVERROR(EIO);
}
- goto error_close_conn;
}
if (mcm_shutdown_requested()) {
@@ -166,8 +164,7 @@ static int mcm_audio_read_packet(AVFormatContext* avctx, AVPacket* pkt)
if (err) {
av_log(avctx, AV_LOG_ERROR, "Put buffer error: %s (%d)\n",
mesh_err2str(err), err);
- ret = AVERROR(EIO);
- goto error_close_conn;
+ return AVERROR(EIO);
}
return len;
@@ -175,12 +172,6 @@ static int mcm_audio_read_packet(AVFormatContext* avctx, AVPacket* pkt)
error_put_buf:
mesh_put_buffer(&buf);
-error_close_conn:
- err = mesh_delete_connection(&s->conn);
- if (err)
- av_log(avctx, AV_LOG_ERROR, "Delete mesh connection failed: %s (%d)\n",
- mesh_err2str(err), err);
-
return ret;
}
diff --git a/ffmpeg-plugin/mcm_video_rx.c b/ffmpeg-plugin/mcm_video_rx.c
index 4aabd18d1..61ee6606a 100644
--- a/ffmpeg-plugin/mcm_video_rx.c
+++ b/ffmpeg-plugin/mcm_video_rx.c
@@ -128,19 +128,17 @@ static int mcm_video_read_packet(AVFormatContext* avctx, AVPacket* pkt)
s->first_frame = false;
err = mesh_get_buffer_timeout(s->conn, &buf, timeout);
- if (err == -MESH_ERR_CONN_CLOSED) {
- ret = AVERROR_EOF;
- goto error_close_conn;
- }
+ if (err == -MESH_ERR_CONN_CLOSED)
+ return AVERROR_EOF;
+
if (err) {
if (mcm_shutdown_requested()) {
- ret = AVERROR_EXIT;
+ return AVERROR_EXIT;
} else {
av_log(avctx, AV_LOG_ERROR, "Get buffer error: %s (%d)\n",
mesh_err2str(err), err);
- ret = AVERROR(EIO);
+ return AVERROR(EIO);
}
- goto error_close_conn;
}
if (mcm_shutdown_requested()) {
@@ -161,8 +159,7 @@ static int mcm_video_read_packet(AVFormatContext* avctx, AVPacket* pkt)
if (err) {
av_log(avctx, AV_LOG_ERROR, "Put buffer error: %s (%d)\n",
mesh_err2str(err), err);
- ret = AVERROR(EIO);
- goto error_close_conn;
+ return AVERROR(EIO);
}
return len;
@@ -170,9 +167,6 @@ static int mcm_video_read_packet(AVFormatContext* avctx, AVPacket* pkt)
error_put_buf:
mesh_put_buffer(&buf);
-error_close_conn:
- mesh_delete_connection(&s->conn);
-
return ret;
}
diff --git a/media-proxy/CMakeLists.txt b/media-proxy/CMakeLists.txt
index 5c2b423fb..2d7e0ed85 100644
--- a/media-proxy/CMakeLists.txt
+++ b/media-proxy/CMakeLists.txt
@@ -66,6 +66,7 @@ endif()
# Define the .proto files
set(PROTO_FILES
+ ${PROTO_DIR}/type.proto
${PROTO_DIR}/conn-config.proto
${PROTO_DIR}/sdk.proto
${PROTO_DIR}/mediaproxy.proto
diff --git a/media-proxy/include/mesh/client.h b/media-proxy/include/mesh/client.h
new file mode 100644
index 000000000..13e9c8a53
--- /dev/null
+++ b/media-proxy/include/mesh/client.h
@@ -0,0 +1,29 @@
+/*
+ * SPDX-FileCopyrightText: Copyright (c) 2025 Intel Corporation
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ */
+
+#ifndef CLIENT_H
+#define CLIENT_H
+
+#include
+
+namespace mesh::client {
+
+/**
+ * Client
+ *
+ * SDK client implementation.
+ */
+class Client {
+
+public:
+ Client() {}
+
+ std::string id;
+};
+
+} // namespace mesh::client
+
+#endif // CLIENT_H
diff --git a/media-proxy/include/mesh/client_registry.h b/media-proxy/include/mesh/client_registry.h
new file mode 100644
index 000000000..9769c61f4
--- /dev/null
+++ b/media-proxy/include/mesh/client_registry.h
@@ -0,0 +1,56 @@
+/*
+ * SPDX-FileCopyrightText: Copyright (c) 2025 Intel Corporation
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ */
+
+#ifndef CLIENT_REGISTRY_H
+#define CLIENT_REGISTRY_H
+
+#include "client.h"
+#include
+#include
+#include
+#include "uuid.h"
+
+namespace mesh::client {
+
+/**
+ * Registry
+ *
+ * Thread-safe registry to store pointers to SDK client instances.
+ */
+class Registry {
+public:
+ int add(const std::string& id, Client *client) {
+ std::unique_lock lk(mx);
+ if (clients.contains(id))
+ return -1;
+ clients[id] = client;
+ return 0;
+ }
+
+ bool remove(const std::string& id) {
+ std::unique_lock lk(mx);
+ return clients.erase(id) > 0;
+ }
+
+ Client * get(const std::string& id) {
+ std::shared_lock lk(mx);
+ auto it = clients.find(id);
+ if (it != clients.end()) {
+ return it->second;
+ }
+ return nullptr;
+ }
+
+private:
+ std::unordered_map clients;
+ std::shared_mutex mx;
+};
+
+extern Registry registry;
+
+} // namespace mesh::client
+
+#endif // CLIENT_REGISTRY_H
diff --git a/media-proxy/include/mesh/conn.h b/media-proxy/include/mesh/conn.h
index 1ebd4bf9d..066326ea8 100644
--- a/media-proxy/include/mesh/conn.h
+++ b/media-proxy/include/mesh/conn.h
@@ -206,6 +206,9 @@ class Connection : public telemetry::MetricsProvider {
Connection * link();
void set_config(const Config& cfg);
+ void set_parent(const std::string& parent_id);
+
+ void notify_parent_conn_unlink_requested(context::Context& ctx);
Result establish(context::Context& ctx);
Result establish_async(context::Context& ctx);
@@ -227,6 +230,7 @@ class Connection : public telemetry::MetricsProvider {
} info;
Config config;
+ std::string legacy_sdk_id;
protected:
void set_state(context::Context& ctx, State new_state);
@@ -268,6 +272,7 @@ class Connection : public telemetry::MetricsProvider {
context::Context establish_ctx = context::WithCancel(context::Background());
std::jthread establish_th;
std::jthread shutdown_th;
+ std::string parent_id;
};
const char * kind2str(Kind kind, bool brief = false);
diff --git a/media-proxy/include/mesh/conn_registry.h b/media-proxy/include/mesh/conn_registry.h
index 7628d8e51..4e879ed7c 100644
--- a/media-proxy/include/mesh/conn_registry.h
+++ b/media-proxy/include/mesh/conn_registry.h
@@ -57,6 +57,11 @@ class Registry {
return ids;
}
+ int size() {
+ std::shared_lock lk(mx);
+ return conns.size();
+ }
+
private:
std::unordered_map conns;
@@ -68,8 +73,6 @@ class Registry {
std::shared_mutex mx;
};
-extern Registry registry;
-
} // namespace mesh::connection
#endif // CONN_REGISTRY_H
diff --git a/media-proxy/include/mesh/event.h b/media-proxy/include/mesh/event.h
new file mode 100644
index 000000000..85a2c8447
--- /dev/null
+++ b/media-proxy/include/mesh/event.h
@@ -0,0 +1,108 @@
+/*
+ * SPDX-FileCopyrightText: Copyright (c) 2025 Intel Corporation
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ */
+
+#ifndef EVENT_H
+#define EVENT_H
+
+#include
+#include
+#include
+#include "concurrency.h"
+#include "logger.h"
+#include
+#include
+
+namespace mesh::event {
+
+enum class Type {
+ empty_event,
+ conn_unlink_requested,
+};
+
+typedef void (* Handler)(const Type& type);
+
+class Event {
+public:
+ std::string consumer_id;
+ Type type;
+ std::unordered_map params;
+};
+
+/**
+ * EventBroker
+ *
+ * Subsystem for passing events from producers to consumers.
+ * Enables software component decoupling.
+ */
+class EventBroker {
+public:
+ EventBroker() {}
+
+ thread::Channel * subscribe(const std::string& consumer_id, int queue_sz = 1) {
+ auto ch = new(std::nothrow) thread::Channel(queue_sz);
+ if (ch) {
+ std::unique_lock lk(mx);
+ channels[ch] = consumer_id;
+ }
+ return ch;
+ }
+
+ bool unsubscribe(thread::Channel *ch) {
+ std::unique_lock lk(mx);
+ auto ret = channels.erase(ch) > 0;
+ delete ch;
+ return ret;
+ }
+
+ bool send(context::Context& ctx, const std::string& consumer_id, const Type type,
+ const std::unordered_map& params = {}) {
+ Event evt = {
+ .consumer_id = consumer_id,
+ .type = type,
+ .params = params,
+ };
+ return events.send(ctx, evt);
+ }
+
+ void run(context::Context& ctx) {
+ for (;;) {
+ auto v = events.receive(ctx);
+ if (ctx.cancelled())
+ return;
+
+ if (!v.has_value())
+ continue;
+
+ auto evt = v.value();
+ for (const auto& kv : channels) {
+ if (kv.second == evt.consumer_id) {
+ auto tctx = context::WithTimeout(ctx, std::chrono::milliseconds(3000));
+ if (ctx.cancelled())
+ return;
+
+ if (tctx.cancelled()) {
+ log::error("Event sending timeout")("type", (int)evt.type)
+ ("consumer_id", evt.consumer_id);
+ } else if (!kv.first->send(tctx, evt)) {
+ log::error("Event sending failed")("type", (int)evt.type)
+ ("consumer_id", evt.consumer_id);
+ }
+ }
+ }
+ }
+ }
+
+private:
+ std::unordered_map *, std::string> channels;
+ std::mutex mx;
+ thread::Channel events = thread::Channel(100);
+};
+
+extern EventBroker broker;
+
+} // namespace mesh::event
+
+#endif // EVENT_H
diff --git a/media-proxy/include/mesh/manager_local.h b/media-proxy/include/mesh/manager_local.h
index a5d53ebe9..a6e52967a 100644
--- a/media-proxy/include/mesh/manager_local.h
+++ b/media-proxy/include/mesh/manager_local.h
@@ -17,10 +17,11 @@ namespace mesh::connection {
class LocalManager {
public:
int create_connection_sdk(context::Context& ctx, std::string& id,
- mcm_conn_param *param, memif_conn_param *memif_param,
+ const std::string& client_id, mcm_conn_param *param,
+ memif_conn_param *memif_param,
const Config& conn_config, std::string& err_str);
- int activate_connection_sdk(context::Context& ctx, const std::string& id);
+ Result activate_connection_sdk(context::Context& ctx, const std::string& id);
int delete_connection_sdk(context::Context& ctx, const std::string& id,
bool do_unregister = true);
@@ -29,6 +30,8 @@ class LocalManager {
int reregister_all_connections(context::Context& ctx);
+ int notify_all_shutdown_wait(context::Context& ctx);
+
void shutdown(context::Context& ctx);
void lock();
diff --git a/media-proxy/include/mesh/st2110.h b/media-proxy/include/mesh/st2110.h
index 5d985f536..fd1e941a5 100644
--- a/media-proxy/include/mesh/st2110.h
+++ b/media-proxy/include/mesh/st2110.h
@@ -132,7 +132,7 @@ template class ST2110 : public C
mtl_session = create_session(mtl_device, &ops);
if (!mtl_session) {
- log::error("Failed to create session");
+ log::error("Failed to create MTL session");
set_state(ctx, State::closed);
return set_result(Result::error_general_failure);
}
diff --git a/media-proxy/src/media_proxy.cc b/media-proxy/src/media_proxy.cc
index f299e72e8..28ac87c5d 100644
--- a/media-proxy/src/media_proxy.cc
+++ b/media-proxy/src/media_proxy.cc
@@ -18,6 +18,7 @@
#include "metrics_collector.h"
#include "proxy_config.h"
#include "mcm-version.h"
+#include "event.h"
#include
#include
@@ -209,6 +210,12 @@ int main(int argc, char* argv[])
std::signal(SIGINT, signal_handler);
std::signal(SIGTERM, signal_handler);
+ // Start event broker
+ auto event_ctx = context::WithCancel(context::Background());
+ std::thread eventBrokerThread([&]() {
+ event::broker.run(event_ctx);
+ });
+
// Start ProxyAPI client
auto err = RunProxyAPIClient(ctx);
if (err)
@@ -233,8 +240,8 @@ int main(int argc, char* argv[])
// Stop Local connection manager
log::info("Shutting down Local conn manager");
auto tctx = context::WithTimeout(context::Background(),
- std::chrono::milliseconds(3000));
- connection::local_manager.shutdown(ctx);
+ std::chrono::milliseconds(5000));
+ connection::local_manager.shutdown(tctx);
metricsCollectorThread.join();
@@ -245,6 +252,10 @@ int main(int argc, char* argv[])
sdk_ctx.cancel();
sdkApiThread.join();
+ // Shutdown event broker
+ event_ctx.cancel();
+ eventBrokerThread.join();
+
log::info("Media Proxy exited");
exit(0);
diff --git a/media-proxy/src/mesh/client_registry.cc b/media-proxy/src/mesh/client_registry.cc
new file mode 100644
index 000000000..9375cd2b9
--- /dev/null
+++ b/media-proxy/src/mesh/client_registry.cc
@@ -0,0 +1,13 @@
+/*
+ * SPDX-FileCopyrightText: Copyright (c) 2025 Intel Corporation
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ */
+
+#include "client_registry.h"
+
+namespace mesh::client {
+
+Registry registry;
+
+} // namespace mesh::client
diff --git a/media-proxy/src/mesh/conn.cc b/media-proxy/src/mesh/conn.cc
index 23ec64ae8..9a32b179d 100644
--- a/media-proxy/src/mesh/conn.cc
+++ b/media-proxy/src/mesh/conn.cc
@@ -7,6 +7,7 @@
#include "conn.h"
#include
#include "logger.h"
+#include "event.h"
namespace mesh::connection {
@@ -120,6 +121,18 @@ void Connection::set_config(const Config& cfg)
}
}
+void Connection::set_parent(const std::string& parent_id)
+{
+ this->parent_id = parent_id;
+}
+
+void Connection::notify_parent_conn_unlink_requested(context::Context& ctx)
+{
+ if (!parent_id.empty())
+ event::broker.send(ctx, parent_id, event::Type::conn_unlink_requested,
+ {{"conn_id", id}});
+}
+
Result Connection::establish(context::Context& ctx)
{
switch (state()) {
@@ -316,8 +329,8 @@ Result Connection::set_link(context::Context& ctx, Connection *new_link,
dp_link.store_wait(new_link);
- // TODO: generate a post Event (conn_link_changed).
- // Use context to cancel sending the Event.
+ if (!new_link)
+ notify_parent_conn_unlink_requested(ctx);
return set_result(Result::success);
}
diff --git a/media-proxy/src/mesh/event.cc b/media-proxy/src/mesh/event.cc
new file mode 100644
index 000000000..5c6695fda
--- /dev/null
+++ b/media-proxy/src/mesh/event.cc
@@ -0,0 +1,13 @@
+/*
+ * SPDX-FileCopyrightText: Copyright (c) 2025 Intel Corporation
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ */
+
+#include "event.h"
+
+namespace mesh::event {
+
+EventBroker broker;
+
+} // namespace mesh::event
diff --git a/media-proxy/src/mesh/manager_local.cc b/media-proxy/src/mesh/manager_local.cc
index 89bb36180..3ab4b5387 100644
--- a/media-proxy/src/mesh/manager_local.cc
+++ b/media-proxy/src/mesh/manager_local.cc
@@ -18,10 +18,8 @@ namespace mesh::connection {
LocalManager local_manager;
-// Temporary Multipoint group business logic.
-// std::string tx_id, rx_id;
-
int LocalManager::create_connection_sdk(context::Context& ctx, std::string& id,
+ const std::string& client_id,
mcm_conn_param *param,
memif_conn_param *memif_param,
const Config& conn_config,
@@ -47,7 +45,6 @@ int LocalManager::create_connection_sdk(context::Context& ctx, std::string& id,
memif_ops_t memif_ops = {};
memif_ops.interface_id = 0;
- // const char *type_str = param->type == is_tx ? "tx" : "rx";
const char *type_str = conn_config.kind2str();
snprintf(memif_ops.app_name, sizeof(memif_ops.app_name),
@@ -57,17 +54,10 @@ int LocalManager::create_connection_sdk(context::Context& ctx, std::string& id,
snprintf(memif_ops.socket_path, sizeof(memif_ops.socket_path),
"/run/mcm/media_proxy_%s_%s.sock", type_str, id.c_str());
- // size_t frame_size = st_frame_size(ST_FRAME_FMT_YUV422PLANAR10LE,
- // param->width, param->height, false);
- // size_t frame_size = st_frame_size(ST_FRAME_FMT_YUV422PLANAR10LE,
- // conn_config.payload.video.width,
- // conn_config.payload.video.height, false);
-
size_t frame_size = conn_config.buf_parts.total_size();
Local *conn;
- // if (param->type == is_tx)
if (conn_config.kind == sdk::CONN_KIND_TRANSMITTER)
conn = new(std::nothrow) LocalRx;
else
@@ -79,6 +69,7 @@ int LocalManager::create_connection_sdk(context::Context& ctx, std::string& id,
}
conn->set_config(conn_config);
+ conn->set_parent(client_id);
uint8_t log2_ring_size = conn_config.buf_queue_capacity ?
std::log2(conn_config.buf_queue_capacity) : 0;
@@ -90,7 +81,6 @@ int LocalManager::create_connection_sdk(context::Context& ctx, std::string& id,
}
// Prepare parameters to register in Media Proxy
- // std::string kind = param->type == is_tx ? "rx" : "tx";
std::string kind = conn_config.kind == sdk::CONN_KIND_TRANSMITTER ? "rx" : "tx";
lock();
@@ -113,10 +103,6 @@ int LocalManager::create_connection_sdk(context::Context& ctx, std::string& id,
return -1;
}
- // log::debug("MCM PARAMS")
- // ("remote", std::string(param->remote_addr.ip) + ":" + std::string(param->remote_addr.port))
- // ("local", std::string(param->local_addr.ip) + ":" + std::string(param->local_addr.port));
-
conn->get_params(memif_param);
// Assign id accessed by metrics collector.
@@ -125,34 +111,21 @@ int LocalManager::create_connection_sdk(context::Context& ctx, std::string& id,
// TODO: Rethink the flow to avoid using two registries with different ids.
registry_sdk.replace(id, conn);
registry.add(agent_assigned_id, conn);
- // log::debug("Added local conn")("conn_id", conn->id)("id", id);
-
- // // Temporary Multipoint group business logic.
- // // TODO: Remove when Multipoint Groups are implemented.
- // if (param->type == is_tx) {
- // auto tx_conn = registry_sdk.get(tx_id);
- // if (tx_conn) {
- // conn->set_link(ctx, tx_conn);
- // tx_conn->set_link(ctx, conn);
- // }
- // rx_id = id;
- // } else {
- // auto rx_conn = registry_sdk.get(rx_id);
- // if (rx_conn) {
- // conn->set_link(ctx, rx_conn);
- // rx_conn->set_link(ctx, conn);
- // }
- // tx_id = id;
- // }
+
+ conn->legacy_sdk_id = id; // Remember the id generated in Media Proxy.
+ id = agent_assigned_id; // Let SDK use the Agent provided id.
return 0;
}
-int LocalManager::activate_connection_sdk(context::Context& ctx, const std::string& id)
+Result LocalManager::activate_connection_sdk(context::Context& ctx, const std::string& id)
{
- auto conn = registry_sdk.get(id);
+ auto conn = registry.get(id);
if (!conn)
- return -1;
+ return Result::error_bad_argument;
+
+ if (!conn->link())
+ return Result::error_no_link_assigned;
log::debug("Activate local conn")("conn_id", conn->id)("id", id);
@@ -163,13 +136,13 @@ int LocalManager::activate_connection_sdk(context::Context& ctx, const std::stri
conn->resume(ctx);
}
- return 0;
+ return Result::success;
}
int LocalManager::delete_connection_sdk(context::Context& ctx, const std::string& id,
bool do_unregister)
{
- auto conn = registry_sdk.get(id);
+ auto conn = registry.get(id);
if (!conn)
return -1;
@@ -193,7 +166,7 @@ int LocalManager::delete_connection_sdk(context::Context& ctx, const std::string
}
registry.remove(conn->id);
- registry_sdk.remove(id);
+ registry_sdk.remove(conn->legacy_sdk_id);
}
auto res = conn->shutdown(ctx);
@@ -210,13 +183,13 @@ Connection * LocalManager::get_connection(context::Context& ctx,
int LocalManager::reregister_all_connections(context::Context& ctx)
{
- auto ids = registry_sdk.get_all_ids();
+ auto ids = registry.get_all_ids();
Config conn_config;
log::debug("Re-register all conns");
for (std::string id : ids) {
- auto conn = registry_sdk.get(id);
+ auto conn = registry.get(id);
if (!conn)
continue;
@@ -244,9 +217,37 @@ int LocalManager::reregister_all_connections(context::Context& ctx)
return 0;
}
+int LocalManager::notify_all_shutdown_wait(context::Context& ctx)
+{
+ auto ids = registry.get_all_ids();
+
+ lock();
+
+ for (const std::string& id : ids) {
+ auto conn = registry.get(id);
+ if (conn)
+ conn->notify_parent_conn_unlink_requested(ctx);
+ }
+
+ unlock();
+
+ while (!ctx.cancelled()) {
+ if (registry.size() == 0)
+ return 0;
+
+ thread::Sleep(ctx, std::chrono::milliseconds(100));
+ }
+
+ return -1;
+}
+
void LocalManager::shutdown(context::Context& ctx)
{
- auto ids = registry_sdk.get_all_ids();
+ auto err = notify_all_shutdown_wait(ctx);
+ if (err)
+ log::error("Shutdown notification timeout");
+
+ auto ids = registry.get_all_ids();
for (const std::string& id : ids) {
auto err = delete_connection_sdk(ctx, id);
diff --git a/media-proxy/src/mesh/sdk_api.cc b/media-proxy/src/mesh/sdk_api.cc
index ba4d0f547..b12f8e8c3 100644
--- a/media-proxy/src/mesh/sdk_api.cc
+++ b/media-proxy/src/mesh/sdk_api.cc
@@ -15,37 +15,48 @@
#include "mcm_dp.h"
#include "manager_local.h"
#include "proxy_config.h"
+#include "client_registry.h"
+#include "uuid.h"
+#include "event.h"
namespace mesh {
-using grpc::Server;
-using grpc::ServerBuilder;
-using grpc::ServerContext;
-using grpc::Status;
-using grpc::StatusCode;
-using sdk::SDKAPI;
-using sdk::CreateConnectionRequest;
-using sdk::CreateConnectionResponse;
-using sdk::ActivateConnectionRequest;
-using sdk::ActivateConnectionResponse;
-using sdk::DeleteConnectionRequest;
-using sdk::DeleteConnectionResponse;
-using sdk::ConnectionConfig;
-using sdk::ConfigMultipointGroup;
-using sdk::ConfigST2110;
-using sdk::ConfigRDMA;
-using sdk::ConfigVideo;
-using sdk::ConfigAudio;
-using sdk::ST2110Transport;
-using sdk::VideoPixelFormat;
-using sdk::AudioSampleRate;
-using sdk::AudioFormat;
-using sdk::AudioPacketTime;
+using ::grpc::Server;
+using ::grpc::ServerBuilder;
+using ::grpc::ServerWriter;
+using ::grpc::ServerContext;
+using ::grpc::Status;
+using ::grpc::StatusCode;
+using ::sdk::SDKAPI;
+using ::sdk::CreateConnectionRequest;
+using ::sdk::CreateConnectionResponse;
+using ::sdk::ActivateConnectionRequest;
+using ::sdk::ActivateConnectionResponse;
+using ::sdk::DeleteConnectionRequest;
+using ::sdk::DeleteConnectionResponse;
+using ::sdk::RegisterRequest;
+using ::sdk::Event;
+using ::sdk::ConnectionConfig;
+using ::sdk::ConfigMultipointGroup;
+using ::sdk::ConfigST2110;
+using ::sdk::ConfigRDMA;
+using ::sdk::ConfigVideo;
+using ::sdk::ConfigAudio;
+using ::sdk::ST2110Transport;
+using ::sdk::VideoPixelFormat;
+using ::sdk::AudioSampleRate;
+using ::sdk::AudioFormat;
+using ::sdk::AudioPacketTime;
class SDKAPIServiceImpl final : public SDKAPI::Service {
public:
Status CreateConnection(ServerContext* sctx, const CreateConnectionRequest* req,
CreateConnectionResponse* resp) override {
+ const auto& client_id = req->client_id();
+ auto client = client::registry.get(client_id);
+ if (!client)
+ return Status(StatusCode::INVALID_ARGUMENT, "client not registered");
+
memif_conn_param memif_param = {};
mcm_conn_param param = {};
@@ -77,8 +88,8 @@ class SDKAPIServiceImpl final : public SDKAPI::Service {
std::string err_str;
auto& mgr = connection::local_manager;
- int err = mgr.create_connection_sdk(ctx, conn_id, ¶m, &memif_param,
- conn_config, err_str);
+ int err = mgr.create_connection_sdk(ctx, conn_id, client_id, ¶m,
+ &memif_param, conn_config, err_str);
if (err) {
log::error("create_local_conn() failed (%d)", err);
if (err_str.empty())
@@ -90,35 +101,49 @@ class SDKAPIServiceImpl final : public SDKAPI::Service {
memif_param.conn_args.is_master = 0; // SDK client is to be secondary
resp->set_conn_id(conn_id);
- resp->set_client_id("default-client");
std::string memif_param_str(reinterpret_cast(&memif_param),
sizeof(memif_conn_param));
resp->set_memif_conn_param(memif_param_str);
log::info("[SDK] Connection created")("id", resp->conn_id())
- ("client_id", resp->client_id());
+ ("client_id", client_id);
return Status::OK;
}
Status ActivateConnection(ServerContext* sctx, const ActivateConnectionRequest* req,
ActivateConnectionResponse* resp) override {
-
+ const auto& client_id = req->client_id();
+ auto client = client::registry.get(client_id);
+ if (!client)
+ return Status(StatusCode::INVALID_ARGUMENT, "client not registered");
+
auto ctx = context::WithCancel(context::Background());
auto conn_id = req->conn_id();
auto& mgr = connection::local_manager;
- int err = mgr.activate_connection_sdk(ctx, conn_id);
- if (err)
- ; // log::error("activate_local_conn err (%d)", err);
- else
+ auto ret = mgr.activate_connection_sdk(ctx, conn_id);
+ switch (ret) {
+ case connection::Result::success:
log::info("[SDK] Connection active")("id", req->conn_id())
- ("client_id", req->client_id());
+ ("client_id", client_id);
+ resp->set_linked(true);
+ return Status::OK;
+
+ case connection::Result::error_no_link_assigned:
+ resp->set_linked(false);
+ return Status::OK;
- return Status::OK;
+ default:
+ return Status(StatusCode::INTERNAL, connection::result2str(ret));
+ }
}
Status DeleteConnection(ServerContext* sctx, const DeleteConnectionRequest* req,
DeleteConnectionResponse* resp) override {
+ const auto& client_id = req->client_id();
+ auto client = client::registry.get(client_id);
+ if (!client)
+ return Status(StatusCode::INVALID_ARGUMENT, "client not registered");
auto ctx = context::WithCancel(context::Background());
auto conn_id = req->conn_id();
@@ -129,16 +154,106 @@ class SDKAPIServiceImpl final : public SDKAPI::Service {
; // log::error("delete_local_conn err (%d)", err);
else
log::info("[SDK] Connection deleted")("id", req->conn_id())
- ("client_id", req->client_id());
+ ("client_id", client_id);
return Status::OK;
}
+
+ Status RegisterAndStreamEvents(ServerContext* sctx, const RegisterRequest* req,
+ ServerWriter* writer) override {
+ std::string id;
+ auto status = register_client(id);
+ if (!status.ok()) {
+ log::error("SDK client registration err: %s", status.error_message().c_str());
+ return status;
+ }
+
+ auto ch = event::broker.subscribe(id);
+ if (!ch) {
+ log::error("SDK client event subscription err: %s", status.error_message().c_str());
+ return status;
+ }
+
+ Event event;
+ event.mutable_client_registered()->set_client_id(id);
+ writer->Write(event);
+
+ while (!sctx->IsCancelled() && !ctx.cancelled()) {
+ auto tctx = context::WithTimeout(ctx, std::chrono::milliseconds(500));
+ auto v = ch->receive(tctx);
+ if (!v.has_value())
+ continue;
+
+ auto evt = v.value();
+ log::debug("Sending event")("type", (int)evt.type);
+
+ Event event;
+
+ if (evt.type == event::Type::conn_unlink_requested) {
+ auto e = event.mutable_conn_unlink_requested();
+ if (evt.params.contains("conn_id")) {
+ auto v = evt.params["conn_id"];
+ if (v.type() == typeid(std::string)) {
+ auto conn_id = std::any_cast(v);
+ e->set_conn_id(conn_id);
+ }
+ }
+ }
+
+ writer->Write(event);
+ }
+
+ event::broker.unsubscribe(ch);
+
+ unregister_client(id);
+
+ return Status::OK;
+ }
+
+ context::Context ctx = context::WithCancel(context::Background());
+
+private:
+ Status register_client(std::string& id) {
+ auto client = new(std::nothrow) client::Client;
+ if (!client)
+ return Status(StatusCode::INTERNAL, "out of memory");
+
+ bool found = false;
+ for (int i = 0; i < 5; i++) {
+ id = generate_uuid_v4();
+ client->id = id;
+ int err = client::registry.add(id, client);
+ if (!err) {
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ delete client;
+ log::error("SDK client registry contains UUID, max attempts.");
+ return Status(StatusCode::INTERNAL, "UUID max attempts");
+ }
+
+ return Status::OK;
+ }
+
+ void unregister_client(const std::string& id) {
+ auto client = client::registry.get(id);
+ if (!client) {
+ log::error("SDK client unregister: id not found");
+ return;
+ }
+
+ client::registry.remove(id);
+ delete client;
+ }
};
void RunSDKAPIServer(context::Context& ctx) {
std::string server_address("0.0.0.0:"); // gRPC default 50051
server_address += std::to_string(config::proxy.sdk_api_port);
SDKAPIServiceImpl service;
+ service.ctx = context::WithCancel(ctx);
ServerBuilder builder;
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
@@ -147,12 +262,13 @@ void RunSDKAPIServer(context::Context& ctx) {
log::info("SDK API Server listening on %s", server_address.c_str());
std::jthread th([&]() {
- ctx.done();
+ service.ctx.done();
log::info("Shutting down SDK API Server");
server->Shutdown();
});
server->Wait();
+ service.ctx.cancel();
}
} // namespace mesh
diff --git a/protos/sdk.proto b/protos/sdk.proto
index 4f85fe96c..9815bcf0a 100644
--- a/protos/sdk.proto
+++ b/protos/sdk.proto
@@ -6,14 +6,42 @@ syntax = "proto3";
package sdk;
+import "type.proto";
import "conn-config.proto";
service SDKAPI {
+ rpc RegisterAndStreamEvents (RegisterRequest) returns (stream Event);
+
rpc CreateConnection (CreateConnectionRequest) returns (CreateConnectionResponse);
rpc ActivateConnection (ActivateConnectionRequest) returns (ActivateConnectionResponse);
rpc DeleteConnection (DeleteConnectionRequest) returns (DeleteConnectionResponse);
}
+message RegisterRequest {
+ repeated type.Tag tags = 1;
+}
+
+message ClientRegisteredEvent {
+ string client_id = 1;
+}
+
+message ConnectionUnlinkRequestedEvent {
+ string conn_id = 1;
+}
+
+message LoggerConfigChangedEvent {
+ repeated string filters = 1;
+}
+
+message Event {
+ oneof event {
+ ClientRegisteredEvent client_registered = 1;
+ ConnectionUnlinkRequestedEvent conn_unlink_requested = 2;
+
+ LoggerConfigChangedEvent logger_config_changed = 100;
+ }
+}
+
message CreateConnectionRequest {
string client_id = 1;
bytes mcm_conn_param = 2;
@@ -21,9 +49,8 @@ message CreateConnectionRequest {
}
message CreateConnectionResponse {
- string client_id = 1;
- string conn_id = 2;
- bytes memif_conn_param = 3;
+ string conn_id = 1;
+ bytes memif_conn_param = 2;
}
message ActivateConnectionRequest {
@@ -32,6 +59,7 @@ message ActivateConnectionRequest {
}
message ActivateConnectionResponse {
+ bool linked = 1;
}
message DeleteConnectionRequest {
diff --git a/protos/type.proto b/protos/type.proto
new file mode 100644
index 000000000..ca33b9466
--- /dev/null
+++ b/protos/type.proto
@@ -0,0 +1,12 @@
+// SPDX-FileCopyrightText: Copyright (c) 2025 Intel Corporation
+//
+// SPDX-License-Identifier: BSD-3-Clause
+
+syntax = "proto3";
+
+package type;
+
+message Tag {
+ string name = 1;
+ string value = 2;
+}
diff --git a/sdk/CMakeLists.txt b/sdk/CMakeLists.txt
index c9a299f75..8b818a7a1 100644
--- a/sdk/CMakeLists.txt
+++ b/sdk/CMakeLists.txt
@@ -85,6 +85,7 @@ endif()
# Define the .proto files
set(PROTO_FILES
+ ${PROTO_DIR}/type.proto
${PROTO_DIR}/conn-config.proto
${PROTO_DIR}/sdk.proto
${PROTO_DIR}/mediaproxy.proto
diff --git a/sdk/include/mesh_client.h b/sdk/include/mesh_client.h
index 632d228b9..ca66c844d 100644
--- a/sdk/include/mesh_client.h
+++ b/sdk/include/mesh_client.h
@@ -10,6 +10,7 @@
#include
#include
#include "mesh_dp.h"
+#include "mesh_concurrency.h"
namespace mesh {
@@ -45,6 +46,12 @@ class ClientContext {
void *grpc_client = nullptr;
};
+/**
+ * Global context for managing SDK client life cycle.
+ * Termination signals trigger immediate closing of this context.
+ */
+extern context::Context gctx;
+
} // namespace mesh
/**
diff --git a/sdk/include/mesh_concurrency.h b/sdk/include/mesh_concurrency.h
new file mode 100644
index 000000000..b1aead109
--- /dev/null
+++ b/sdk/include/mesh_concurrency.h
@@ -0,0 +1,221 @@
+/*
+ * SPDX-FileCopyrightText: Copyright (c) 2025 Intel Corporation
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ */
+
+ #ifndef MESH_CONCURRENCY_H
+ #define MESH_CONCURRENCY_H
+
+ #include
+ #include
+ #include
+ #include
+ #include
+
+ #include
+ #include
+
+ namespace mesh {
+
+ namespace thread {
+
+ template
+ class Channel;
+
+ } // namespace thread
+
+ namespace context {
+
+ /**
+ * mesh::context::Context
+ *
+ * Thread-safe context carrying a cancellation signal to be passed to threads
+ * and blocking calls. Useful for graceful shutdown.
+ */
+ class Context {
+ public:
+ Context();
+ virtual ~Context();
+ Context& operator=(Context&& other) noexcept;
+
+ void cancel();
+ bool cancelled();
+ std::stop_token stop_token();
+ bool done();
+
+ std::stop_source ss;
+
+ protected:
+ Context(Context& parent);
+ Context(Context& parent, std::chrono::milliseconds timeout_ms);
+
+ Context *parent = nullptr;
+ thread::Channel *ch = nullptr;
+ std::future async_cb;
+ std::chrono::milliseconds timeout_ms;
+ std::unique_ptr>> cb = nullptr;
+
+ friend Context& Background();
+ friend class WithCancel;
+ friend Context WithCancel(Context& parent);
+ friend Context WithTimeout(Context& parent,
+ std::chrono::milliseconds timeout_ms);
+ };
+
+ /**
+ * mesh::context::Background()
+ *
+ * Returns an empty Context with cancellation. It is typically used in the
+ * main function and initialization. It should be the parent context for
+ * context::WithCancel and context::WithTimeout.
+ */
+ Context& Background();
+
+ /**
+ * mesh::context::WithCancel
+ *
+ * Creates a new Context with cancellation. When the parent context is
+ * cancelled, it triggers cancellation of the child context and all descending
+ * child contexts in the chain. On the other hand, if the context is cancelled
+ * by calling stop(), the parent context is not affected because the
+ * cancellation signal propagates only down to the child contexts in the chain.
+ *
+ * This concept enables propagation of the cancellation signal from the top
+ * parent context down to the very bottom child context and stopping all
+ * threads and blocking I/O calls that depend on any context in the chain.
+ */
+ Context WithCancel(Context& parent);
+
+ /**
+ * mesh::context::WithTimeout
+ *
+ * Creates a new Context with cancellation and a timeout. The time interval
+ * starts to count down at creation. When the time is out, the context is
+ * cancelled, which triggers cancellation of all child contexts in the chain.
+ */
+ Context WithTimeout(Context& parent, std::chrono::milliseconds timeout_ms);
+
+ } // namespace context
+
+ namespace thread {
+
+ /**
+ * Defer
+ *
+ * Allows to register a callback to be called when the defer object leaves
+ * the scope of visibility. Deferred callbacks can be used for automatic
+ * deallocation of resources created in a function when the function exits.
+ * Example:
+ *
+ * void func() {
+ * printf("Enter the function\n");
+ * ...
+ * Defer d1([]{ printf("Deferred action 1"); });
+ * ...
+ * Defer d2([]{ printf("Deferred action 2"); });
+ * ...
+ * printf("Exit the function\n");
+ * }
+ *
+ * Output:
+ * Enter the function
+ * Exit the function
+ * Deferred action 2
+ * Deferred action 1
+ */
+ class Defer {
+ public:
+ Defer(std::function callback) : cb(callback) {}
+ ~Defer() { if (cb) cb(); }
+
+ private:
+ std::function cb;
+ };
+
+ /**
+ * Sleep
+ *
+ * Sleeps for the given interval of time, which interrupts immediately when
+ * the context is cancelled.
+ */
+ void Sleep(context::Context& ctx, std::chrono::milliseconds interval_ms);
+
+ /**
+ * Channel
+ *
+ * A thread-safe queue template supporting cancellation of blocking calls
+ * send() and receive() by checking context::Context. To set a timeout on the
+ * blocking calls, context::WithTimeout is supposed to be used.
+ * Example:
+ * ...
+ */
+ template
+ class Channel {
+ public:
+ Channel() : Channel(1) {}
+ Channel(size_t capacity) : cap(capacity), _closed(false) {}
+
+ bool send(context::Context& ctx, T value) {
+ std::unique_lock lk(mx);
+ if (!cv_full.wait(lk, ctx.stop_token(), [this] { return !_closed && q.size() < cap; })) {
+ return false;
+ }
+ q.push(std::move(value));
+ cv_empty.notify_one();
+ return true;
+ }
+
+ std::optional receive(context::Context& ctx) {
+ std::unique_lock lk(mx);
+ cv_empty.wait(lk, ctx.stop_token(), [this] { return !q.empty() || _closed; });
+ if (ctx.stop_token().stop_requested()) {
+ return std::nullopt;
+ }
+ if (q.empty()) {
+ return std::nullopt;
+ }
+ T value = std::move(q.front());
+ q.pop();
+ cv_full.notify_one();
+ return value;
+ }
+
+ std::optional try_receive() {
+ std::unique_lock lk(mx);
+ if (q.empty()) {
+ return std::nullopt;
+ }
+ T value = std::move(q.front());
+ q.pop();
+ cv_full.notify_one();
+ return value;
+ }
+
+ void close() {
+ std::unique_lock lk(mx);
+ _closed = true;
+ cv_empty.notify_all();
+ cv_full.notify_all();
+ }
+
+ bool closed() {
+ std::unique_lock lk(mx);
+ return _closed;
+ }
+
+ private:
+ std::queue q;
+ std::mutex mx;
+ std::condition_variable_any cv_empty;
+ std::condition_variable_any cv_full;
+ size_t cap;
+ bool _closed;
+ };
+
+ } // namespace thread
+
+ } // namespace mesh
+
+ #endif // MESH_CONCURRENCY_H
+
\ No newline at end of file
diff --git a/sdk/include/mesh_conn.h b/sdk/include/mesh_conn.h
index 4e80585a3..0897020fe 100644
--- a/sdk/include/mesh_conn.h
+++ b/sdk/include/mesh_conn.h
@@ -26,7 +26,8 @@ struct mesh_internal_ops_t {
int (*enqueue_buf)(mcm_conn_context *pctx, mcm_buffer *buf);
void * (*grpc_create_client)();
- void * (*grpc_create_client_json)(const std::string& endpoint);
+ void * (*grpc_create_client_json)(const std::string& endpoint,
+ mesh::ClientContext *parent);
void (*grpc_destroy_client)(void *client);
void * (*grpc_create_conn)(void *client, mcm_conn_param *param);
void * (*grpc_create_conn_json)(void *client, const mesh::ConnectionConfig& cfg);
@@ -176,6 +177,8 @@ class ConnectionContext {
void *grpc_conn = nullptr;
ConnectionConfig cfg;
+
+ context::Context ctx = context::WithCancel(context::Background());
};
} // namespace mesh
diff --git a/sdk/include/mesh_dp.h b/sdk/include/mesh_dp.h
index 2ddfa8e1f..a5a403b96 100644
--- a/sdk/include/mesh_dp.h
+++ b/sdk/include/mesh_dp.h
@@ -96,15 +96,16 @@ typedef struct{
#define MESH_ERR_BAD_CONFIG_PTR 1002 ///< Bad configuration pointer
#define MESH_ERR_BAD_BUF_PTR 1003 ///< Bad buffer pointer
#define MESH_ERR_BAD_BUF_LEN 1004 ///< Bad buffer length
-#define MESH_ERR_CLIENT_CONFIG_INVAL 1005 ///< Invalid client config
-#define MESH_ERR_MAX_CONN 1006 ///< Reached max connections number
-#define MESH_ERR_FOUND_ALLOCATED 1007 ///< Found allocated resources
-#define MESH_ERR_CONN_FAILED 1008 ///< Connection creation failed
-#define MESH_ERR_CONN_CONFIG_INVAL 1009 ///< Invalid connection config
-#define MESH_ERR_CONN_CONFIG_INCOMPAT 1010 ///< Incompatible connection config
-#define MESH_ERR_CONN_CLOSED 1011 ///< Connection is closed
-#define MESH_ERR_TIMEOUT 1012 ///< Timeout occurred
-#define MESH_ERR_NOT_IMPLEMENTED 1013 ///< Feature not implemented yet
+#define MESH_ERR_CLIENT_FAILED 1005 ///< Client creation failed
+#define MESH_ERR_CLIENT_CONFIG_INVAL 1006 ///< Invalid client config
+#define MESH_ERR_MAX_CONN 1007 ///< Reached max connections number
+#define MESH_ERR_FOUND_ALLOCATED 1008 ///< Found allocated resources
+#define MESH_ERR_CONN_FAILED 1009 ///< Connection creation failed
+#define MESH_ERR_CONN_CONFIG_INVAL 1010 ///< Invalid connection config
+#define MESH_ERR_CONN_CONFIG_INCOMPAT 1011 ///< Incompatible connection config
+#define MESH_ERR_CONN_CLOSED 1012 ///< Connection is closed
+#define MESH_ERR_TIMEOUT 1013 ///< Timeout occurred
+#define MESH_ERR_NOT_IMPLEMENTED 1014 ///< Feature not implemented yet
/**
* @brief Create a new mesh client.
diff --git a/sdk/include/mesh_sdk_api.h b/sdk/include/mesh_sdk_api.h
index 2ca2a4c59..0e7cdf1ed 100644
--- a/sdk/include/mesh_sdk_api.h
+++ b/sdk/include/mesh_sdk_api.h
@@ -12,7 +12,8 @@
#include "mesh_conn.h"
void * mesh_grpc_create_client();
-void * mesh_grpc_create_client_json(const std::string& endpoint);
+void * mesh_grpc_create_client_json(const std::string& endpoint,
+ mesh::ClientContext *parent);
void mesh_grpc_destroy_client(void *client);
void * mesh_grpc_create_conn(void *client, mcm_conn_param *param);
void * mesh_grpc_create_conn_json(void *client, const mesh::ConnectionConfig& cfg);
diff --git a/sdk/src/memif_impl.c b/sdk/src/memif_impl.c
index 93a7d18c6..8f971de2e 100644
--- a/sdk/src/memif_impl.c
+++ b/sdk/src/memif_impl.c
@@ -482,6 +482,21 @@ void mcm_destroy_connection_memif(memif_conn_context* pctx)
return;
}
+void mcm_cancel_poll_event_memif(void *pctx) {
+ memif_conn_context *pmemif = (memif_conn_context *)pctx;
+ int err;
+
+ if (!pmemif)
+ return;
+
+ err = memif_cancel_poll_event(memif_get_socket_handle(pmemif->conn));
+ if (err != MEMIF_ERR_SUCCESS) {
+ log_error("Error cancelling memif poll event");
+ } else {
+ log_info("Memif poll event cancelled");
+ }
+}
+
// /* Alloc buffer from queue. */
// void* memif_alloc_buffer(void* conn_ctx, size_t size)
// {
diff --git a/sdk/src/mesh_buf.cc b/sdk/src/mesh_buf.cc
index 146b5ea4f..ed4296a3f 100644
--- a/sdk/src/mesh_buf.cc
+++ b/sdk/src/mesh_buf.cc
@@ -21,10 +21,20 @@ int BufferContext::dequeue(int timeout_ms)
if (!conn)
return -MESH_ERR_BAD_CONN_PTR;
+ if (conn->ctx.cancelled())
+ return -MESH_ERR_CONN_CLOSED;
+
int err;
buf = mesh_internal_ops.dequeue_buf(conn->handle, timeout_ms, &err);
- if (!buf)
- return err ? err : -MESH_ERR_CONN_CLOSED;
+ if (!buf) {
+ if (!err ||
+ err == MEMIF_ERR_POLL_CANCEL ||
+ err == MEMIF_ERR_DISCONNECT ||
+ err == MEMIF_ERR_DISCONNECTED)
+ return -MESH_ERR_CONN_CLOSED;
+ else
+ return err;
+ }
if (buf->len != conn->cfg.buf_parts.total_size()) {
mesh_internal_ops.enqueue_buf(conn->handle, buf);
@@ -60,6 +70,9 @@ int BufferContext::enqueue(int timeout_ms)
if (!conn)
return -MESH_ERR_BAD_CONN_PTR;
+ if (conn->ctx.cancelled())
+ return -MESH_ERR_CONN_CLOSED;
+
if (conn->cfg.kind == MESH_CONN_KIND_SENDER) {
auto base_ptr = (char *)buf->data;
auto sysdata = (BufferSysData *)(base_ptr + conn->cfg.buf_parts.sysdata.offset);
diff --git a/sdk/src/mesh_client.cc b/sdk/src/mesh_client.cc
index 5285f90f0..363f0db5e 100644
--- a/sdk/src/mesh_client.cc
+++ b/sdk/src/mesh_client.cc
@@ -5,6 +5,7 @@
*/
#include "mesh_client.h"
#include
+#include
#include "mesh_conn.h"
#include "mesh_logger.h"
#include "json.hpp"
@@ -12,6 +13,44 @@
namespace mesh {
+static volatile __sighandler_t prev_SIGINT_handler;
+static volatile __sighandler_t prev_SIGTERM_handler;
+context::Context gctx = context::WithCancel(context::Background());
+
+static void HandleSignal(int signal) {
+ log::warn("Shutdown signal received");
+ gctx.cancel();
+
+ if (signal == SIGINT && prev_SIGINT_handler)
+ prev_SIGINT_handler(signal);
+ else if (signal == SIGTERM && prev_SIGTERM_handler)
+ prev_SIGTERM_handler(signal);
+}
+
+static void RegisterSigActionsOnce() {
+ static std::atomic initialized = false;
+ if (initialized)
+ return;
+
+ struct sigaction action = { 0 };
+
+ sigfillset(&action.sa_mask);
+
+ action.sa_flags = SA_RESTART;
+ action.sa_handler = HandleSignal;
+
+ if (!prev_SIGINT_handler) {
+ prev_SIGINT_handler = signal(SIGINT, SIG_DFL);
+ sigaction(SIGINT, &action, NULL);
+ }
+ if (!prev_SIGTERM_handler) {
+ prev_SIGTERM_handler = signal(SIGTERM, SIG_DFL);
+ sigaction(SIGTERM, &action, NULL);
+ }
+
+ initialized = true;
+}
+
class KeyValueString {
public:
KeyValueString() = default;
@@ -95,6 +134,7 @@ int ClientConfig::parse_from_json(const char *str)
ClientContext::ClientContext()
{
+ RegisterSigActionsOnce();
}
int ClientContext::shutdown()
@@ -123,7 +163,9 @@ int ClientContext::init(const char *config)
return err;
std::string endpoint = cfg.proxy_ip + ":" + cfg.proxy_port;
- grpc_client = mesh_internal_ops.grpc_create_client_json(endpoint);
+ grpc_client = mesh_internal_ops.grpc_create_client_json(endpoint, this);
+ if (!grpc_client)
+ return -MESH_ERR_CLIENT_FAILED;
return 0;
}
diff --git a/sdk/src/mesh_concurrency.cc b/sdk/src/mesh_concurrency.cc
new file mode 100644
index 000000000..ff8c65549
--- /dev/null
+++ b/sdk/src/mesh_concurrency.cc
@@ -0,0 +1,150 @@
+/*
+ * SPDX-FileCopyrightText: Copyright (c) 2025 Intel Corporation
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ */
+
+ #include "mesh_concurrency.h"
+
+ namespace mesh {
+
+ namespace context {
+
+ Context::Context() : ss(std::stop_source()),
+ timeout_ms(std::chrono::milliseconds(0))
+ {
+ }
+
+ Context::Context(Context& parent) : ss(std::stop_source()),
+ parent(&parent),
+ ch(new thread::Channel(1)),
+ timeout_ms(std::chrono::milliseconds(0))
+ {
+ cb = std::make_unique>>(
+ parent.ss.get_token(), [this] { cancel(); }
+ );
+ }
+
+ Context::Context(Context& parent,
+ std::chrono::milliseconds timeout_ms) : ss(std::stop_source()),
+ parent(&parent),
+ ch(new thread::Channel(1)),
+ timeout_ms(timeout_ms)
+ {
+ cb = std::make_unique>>(
+ parent.ss.get_token(), [this] { cancel(); }
+ );
+
+ if (timeout_ms != std::chrono::milliseconds(0)) {
+ async_cb = std::async(std::launch::async, [this] {
+ std::mutex mx;
+ std::unique_lock lk(mx);
+ std::condition_variable_any cv;
+ cv.wait_for(lk, ss.get_token(), this->timeout_ms, [] { return false; });
+ cancel();
+ });
+ }
+ }
+
+ Context& Context::operator=(Context&& other) noexcept {
+ if (this != &other) {
+ ss = std::stop_source();
+ parent = other.parent;
+
+ if (parent) {
+ cb = std::make_unique>>(
+ parent->ss.get_token(), [this] { cancel(); }
+ );
+ }
+
+ timeout_ms = other.timeout_ms;
+
+ if (timeout_ms != std::chrono::milliseconds(0)) {
+ async_cb = std::async(std::launch::async, [this] {
+ std::mutex mx;
+ std::unique_lock lk(mx);
+ std::condition_variable_any cv;
+ cv.wait_for(lk, ss.get_token(), timeout_ms, [] { return false; });
+ cancel();
+ });
+ }
+
+ if (ch) {
+ delete ch;
+ }
+
+ ch = other.ch;
+ other.ch = nullptr;
+ }
+ return *this;
+ }
+
+ Context::~Context()
+ {
+ cancel();
+
+ if (async_cb.valid())
+ async_cb.wait();
+
+ if (ch)
+ delete ch;
+ }
+
+ void Context::cancel()
+ {
+ if (ch)
+ ch->close();
+
+ ss.request_stop();
+ }
+
+ bool Context::cancelled()
+ {
+ return ss.stop_requested();
+ }
+
+ std::stop_token Context::stop_token()
+ {
+ return ss.get_token();
+ }
+
+ bool Context::done()
+ {
+ if (ch)
+ ch->receive(*this);
+
+ return true;
+ }
+
+ Context& Background()
+ {
+ static Context backgroundContext;
+ return backgroundContext;
+ }
+
+ Context WithCancel(Context& parent)
+ {
+ return Context(parent);
+ }
+
+ Context WithTimeout(Context& parent, std::chrono::milliseconds timeout_ms)
+ {
+ return Context(parent, timeout_ms);
+ }
+
+ } // namespace context
+
+ namespace thread {
+
+ void Sleep(context::Context& ctx, std::chrono::milliseconds interval_ms)
+ {
+ std::mutex mx;
+ std::unique_lock lk(mx);
+ std::condition_variable_any cv;
+ cv.wait_for(lk, ctx.stop_token(), interval_ms, [] { return false; });
+ }
+
+ } // namespace thread
+
+ } // namespace mesh
+
\ No newline at end of file
diff --git a/sdk/src/mesh_dp.cc b/sdk/src/mesh_dp.cc
index 5547b159e..0aec4209f 100644
--- a/sdk/src/mesh_dp.cc
+++ b/sdk/src/mesh_dp.cc
@@ -233,6 +233,8 @@ const char * mesh_err2str(int err)
return "Bad buffer pointer";
case -MESH_ERR_BAD_BUF_LEN:
return "Bad buffer length";
+ case -MESH_ERR_CLIENT_FAILED:
+ return "Client creation failed";
case -MESH_ERR_CLIENT_CONFIG_INVAL:
return "Invalid parameters in client configuration";
case -MESH_ERR_MAX_CONN:
diff --git a/sdk/src/mesh_sdk_api.cc b/sdk/src/mesh_sdk_api.cc
index 163495856..a99cba1fa 100644
--- a/sdk/src/mesh_sdk_api.cc
+++ b/sdk/src/mesh_sdk_api.cc
@@ -17,6 +17,7 @@
#include "mesh_logger.h"
#include "mcm-version.h"
#include "mesh_dp_legacy.h"
+#include "mesh_concurrency.h"
using grpc::Channel;
using grpc::Status;
@@ -27,6 +28,8 @@ using sdk::ActivateConnectionRequest;
using sdk::ActivateConnectionResponse;
using sdk::DeleteConnectionRequest;
using sdk::DeleteConnectionResponse;
+using sdk::RegisterRequest;
+using sdk::Event;
using sdk::BufferPartition;
using sdk::BufferPartitions;
using sdk::ConnectionKind;
@@ -42,12 +45,14 @@ using sdk::AudioSampleRate;
using sdk::AudioFormat;
using sdk::AudioPacketTime;
+extern "C" void mcm_cancel_poll_event_memif(void *pctx);
+
using namespace mesh;
class SDKAPIClient {
public:
- SDKAPIClient(std::shared_ptr channel)
- : stub_(SDKAPI::NewStub(channel)) {}
+ SDKAPIClient(std::shared_ptr channel, mesh::ClientContext *parent)
+ : stub_(SDKAPI::NewStub(channel)), parent(parent) {}
int CreateConnection(std::string& conn_id, mcm_conn_param *param,
memif_conn_param *memif_param) {
@@ -69,7 +74,6 @@ class SDKAPIClient {
Status status = stub_->CreateConnection(&context, req, &resp);
if (status.ok()) {
- client_id = resp.client_id();
conn_id = resp.conn_id();
int sz = resp.memif_conn_param().size();
@@ -183,7 +187,6 @@ class SDKAPIClient {
Status status = stub_->CreateConnection(&context, req, &resp);
if (status.ok()) {
- client_id = resp.client_id();
conn_id = resp.conn_id();
int sz = resp.memif_conn_param().size();
@@ -216,7 +219,10 @@ class SDKAPIClient {
Status status = stub_->ActivateConnection(&context, req, &resp);
if (status.ok()) {
- return 0;
+ if (!resp.linked())
+ return -EAGAIN;
+ else
+ return 0;
} else {
log::error("ActivateConnection RPC failed: %s",
status.error_message().c_str());
@@ -245,10 +251,108 @@ class SDKAPIClient {
}
}
+ void RegisterAndStreamEvents() {
+ RegisterRequest req;
+
+ grpc::ClientContext context;
+
+ std::jthread shutdown_thread([&]() {
+ ctx.done();
+ context.TryCancel();
+ });
+
+ thread::Defer d([&]{ ctx.cancel(); });
+
+ std::unique_ptr> reader(
+ stub_->RegisterAndStreamEvents(&context, req));
+
+ Event event;
+ while (reader->Read(&event)) {
+ if (event.has_client_registered()) {
+ client_id = event.client_registered().client_id();
+ registered_ch.send(ctx, true);
+ } else if (event.has_conn_unlink_requested()) {
+ auto conn_id = event.conn_unlink_requested().conn_id();
+ log::debug("[EVENT] Conn unlink requested")("id", conn_id);
+
+ if (parent) {
+ std::lock_guard lk(parent->mx);
+ for (const auto conn : parent->conns) {
+ conn->ctx.cancel();
+ if (conn->grpc_conn) {
+ auto handle = *(mcm_conn_context **)conn->grpc_conn;
+ if (handle)
+ mcm_cancel_poll_event_memif(handle->priv);
+ }
+ }
+ }
+ } else if (event.has_logger_config_changed()) {
+ log::debug("[EVENT] Logger config changed");
+ } else {
+ log::info("Received unknown event type");
+ }
+ }
+
+ Status status = reader->Finish();
+ registered_ch.send(ctx, false);
+ if (!status.ok()) {
+ if (status.error_code() == grpc::StatusCode::CANCELLED)
+ return;
+
+ log::error("RegisterAndStreamEvents RPC failed: %s",
+ status.error_message().c_str());
+ }
+ }
+
+ int Run() {
+ try {
+ th = std::jthread([this]() {
+ RegisterAndStreamEvents();
+ });
+ }
+ catch (const std::system_error& e) {
+ log::error("SDK client background thread creation failed");
+ Shutdown();
+ return -ENOMEM;
+ }
+
+ auto tctx = context::WithTimeout(gctx, std::chrono::milliseconds(15000));
+ auto registered = registered_ch.receive(tctx);
+ if (!registered.has_value()) {
+ if (tctx.cancelled() && !gctx.cancelled())
+ log::error("SDK client registration timeout");
+ Shutdown();
+ return -EIO;
+ }
+ if (!registered.value()) {
+ log::error("SDK client registration failed");
+ Shutdown();
+ return -EIO;
+ }
+
+ log::info("SDK client registered successfully")("client_id", client_id);
+
+ return 0;
+ }
+
+ void Shutdown() {
+ ctx.cancel();
+ if (th.joinable())
+ th.join();
+ }
+
std::string client_id;
private:
+ // void BackgroundProcess() {
+ // thread::Sleep(ctx, std::chrono::milliseconds(1000));
+ // }
+
std::unique_ptr stub_;
+ std::jthread th;
+ thread::Channel registered_ch;
+ context::Context ctx = context::WithCancel(context::Background());
+ mesh::ClientContext *parent = nullptr;
};
// DEBUG
@@ -269,18 +373,28 @@ void * mesh_grpc_create_client()
auto client = new(std::nothrow)
SDKAPIClient(grpc::CreateChannel("localhost:" + sdk_port, // gRPC default 50051
- grpc::InsecureChannelCredentials()));
+ grpc::InsecureChannelCredentials()), nullptr);
return client;
}
-void * mesh_grpc_create_client_json(const std::string& endpoint) {
+void * mesh_grpc_create_client_json(const std::string& endpoint,
+ mesh::ClientContext *parent) {
log::info("Media Communications Mesh SDK version %s #%s", VERSION_TAG, VERSION_HASH)
("endpoint", endpoint);
auto client = new(std::nothrow)
SDKAPIClient(grpc::CreateChannel(endpoint,
- grpc::InsecureChannelCredentials()));
+ grpc::InsecureChannelCredentials()), parent);
+
+ if (client) {
+ auto err = client->Run();
+ if (err) {
+ delete client;
+ return NULL;
+ }
+ }
+
return client;
}
@@ -288,6 +402,7 @@ void mesh_grpc_destroy_client(void *client)
{
if (client) {
auto cli = static_cast(client);
+ cli->Shutdown();
delete cli;
}
}
@@ -400,7 +515,16 @@ void * mesh_grpc_create_conn_json(void *client, const mesh::ConnectionConfig& cf
return NULL;
}
- err = cli->ActivateConnection(conn->conn_id);
+ while (!gctx.cancelled()) {
+ err = cli->ActivateConnection(conn->conn_id);
+ if (err == -EAGAIN) {
+ // Repeat after a small delay
+ thread::Sleep(gctx, std::chrono::milliseconds(50));
+ continue;
+ }
+ break;
+ }
+
if (err) {
log::error("Activate gRPC connection failed (%d)", err);
mesh_grpc_destroy_conn(conn);
diff --git a/sdk/tests/mesh_dp_api_tests.cc b/sdk/tests/mesh_dp_api_tests.cc
index 85ced8d61..a5fb26e04 100644
--- a/sdk/tests/mesh_dp_api_tests.cc
+++ b/sdk/tests/mesh_dp_api_tests.cc
@@ -100,7 +100,8 @@ void * mock_grpc_create_client()
return NULL;
}
-void * mock_grpc_create_client_json(const std::string& endpoint)
+void * mock_grpc_create_client_json(const std::string& endpoint,
+ mesh::ClientContext *parent)
{
return NULL;
}
@@ -1533,15 +1534,16 @@ TEST(APITests_MeshBuffer, Test_ImportantConstants) {
EXPECT_EQ(MESH_ERR_BAD_CONFIG_PTR, 1002);
EXPECT_EQ(MESH_ERR_BAD_BUF_PTR, 1003);
EXPECT_EQ(MESH_ERR_BAD_BUF_LEN, 1004);
- EXPECT_EQ(MESH_ERR_CLIENT_CONFIG_INVAL, 1005);
- EXPECT_EQ(MESH_ERR_MAX_CONN, 1006);
- EXPECT_EQ(MESH_ERR_FOUND_ALLOCATED, 1007);
- EXPECT_EQ(MESH_ERR_CONN_FAILED, 1008);
- EXPECT_EQ(MESH_ERR_CONN_CONFIG_INVAL, 1009);
- EXPECT_EQ(MESH_ERR_CONN_CONFIG_INCOMPAT, 1010);
- EXPECT_EQ(MESH_ERR_CONN_CLOSED, 1011);
- EXPECT_EQ(MESH_ERR_TIMEOUT, 1012);
- EXPECT_EQ(MESH_ERR_NOT_IMPLEMENTED, 1013);
+ EXPECT_EQ(MESH_ERR_CLIENT_FAILED, 1005);
+ EXPECT_EQ(MESH_ERR_CLIENT_CONFIG_INVAL, 1006);
+ EXPECT_EQ(MESH_ERR_MAX_CONN, 1007);
+ EXPECT_EQ(MESH_ERR_FOUND_ALLOCATED, 1008);
+ EXPECT_EQ(MESH_ERR_CONN_FAILED, 1009);
+ EXPECT_EQ(MESH_ERR_CONN_CONFIG_INVAL, 1010);
+ EXPECT_EQ(MESH_ERR_CONN_CONFIG_INCOMPAT, 1011);
+ EXPECT_EQ(MESH_ERR_CONN_CLOSED, 1012);
+ EXPECT_EQ(MESH_ERR_TIMEOUT, 1013);
+ EXPECT_EQ(MESH_ERR_NOT_IMPLEMENTED, 1014);
EXPECT_EQ(MESH_TIMEOUT_DEFAULT, -2);
EXPECT_EQ(MESH_TIMEOUT_INFINITE, -1);