Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/SDK_API_Definition.md
Original file line number Diff line number Diff line change
Expand Up @@ -441,9 +441,10 @@ NOTE: The codes are negative integer values.
| `-MESH_ERR_BAD_CONFIG_PTR` | Bad configuration pointer | The configuration pointer is NULL. |
| `-MESH_ERR_BAD_BUF_PTR` | Bad buffer pointer | The buffer pointer is NULL. |
| `-MESH_ERR_BAD_BUF_LEN` | Bad buffer length | **Rx connection**: The buffer length is corrupted.<br>**Tx connection**: The buffer length is bigger than maximum. |
| `-MESH_ERR_CLIENT_FAILED` | Client creation failed | An error occurred while creating an SDK client. |
| `-MESH_ERR_CLIENT_CONFIG_INVAL` | Invalid client config | JSON client configuration string is malformed. |
| `-MESH_ERR_MAX_CONN` | Reached max connections number | An attempt to create a connection failed due to reaching the maximum number of connections defined in `"maxMediaConnections"`. |
| `-MESH_ERR_FOUND_ALLOCATED` | Found allocated resources | When deleting a client, some connections were found not closed. Delete all connections explicitly before deleting the client. |
| `-MESH_ERR_FOUND_ALLOCATED` | Found allocated resources | When deleting an SDK client, some connections were found not closed. Delete all connections explicitly before deleting the client. |
| `-MESH_ERR_CONN_FAILED` | Connection creation failed | An error occurred while creating a connection. |
| `-MESH_ERR_CONN_CONFIG_INVAL` | Invalid connection config | JSON connection configuration string is malformed or one of parameters has an incorrect value. |
| `-MESH_ERR_CONN_CONFIG_INCOMPAT` | Incompatible connection config | Incompatible parameters found in the JSON connection configuration string. |
Expand Down
21 changes: 6 additions & 15 deletions ffmpeg-plugin/mcm_audio_rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -133,19 +133,17 @@ static int mcm_audio_read_packet(AVFormatContext* avctx, AVPacket* pkt)
s->first_frame = false;

err = mesh_get_buffer_timeout(s->conn, &buf, timeout);
if (err == -MESH_ERR_CONN_CLOSED) {
ret = AVERROR_EOF;
goto error_close_conn;
}
if (err == -MESH_ERR_CONN_CLOSED)
return AVERROR_EOF;

if (err) {
if (mcm_shutdown_requested()) {
ret = AVERROR_EXIT;
return AVERROR_EXIT;
} else {
av_log(avctx, AV_LOG_ERROR, "Get buffer error: %s (%d)\n",
mesh_err2str(err), err);
ret = AVERROR(EIO);
return AVERROR(EIO);
}
goto error_close_conn;
}

if (mcm_shutdown_requested()) {
Expand All @@ -166,21 +164,14 @@ static int mcm_audio_read_packet(AVFormatContext* avctx, AVPacket* pkt)
if (err) {
av_log(avctx, AV_LOG_ERROR, "Put buffer error: %s (%d)\n",
mesh_err2str(err), err);
ret = AVERROR(EIO);
goto error_close_conn;
return AVERROR(EIO);
}

return len;

error_put_buf:
mesh_put_buffer(&buf);

error_close_conn:
err = mesh_delete_connection(&s->conn);
if (err)
av_log(avctx, AV_LOG_ERROR, "Delete mesh connection failed: %s (%d)\n",
mesh_err2str(err), err);

return ret;
}

Expand Down
18 changes: 6 additions & 12 deletions ffmpeg-plugin/mcm_video_rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -128,19 +128,17 @@ static int mcm_video_read_packet(AVFormatContext* avctx, AVPacket* pkt)
s->first_frame = false;

err = mesh_get_buffer_timeout(s->conn, &buf, timeout);
if (err == -MESH_ERR_CONN_CLOSED) {
ret = AVERROR_EOF;
goto error_close_conn;
}
if (err == -MESH_ERR_CONN_CLOSED)
return AVERROR_EOF;

if (err) {
if (mcm_shutdown_requested()) {
ret = AVERROR_EXIT;
return AVERROR_EXIT;
} else {
av_log(avctx, AV_LOG_ERROR, "Get buffer error: %s (%d)\n",
mesh_err2str(err), err);
ret = AVERROR(EIO);
return AVERROR(EIO);
}
goto error_close_conn;
}

if (mcm_shutdown_requested()) {
Expand All @@ -161,18 +159,14 @@ static int mcm_video_read_packet(AVFormatContext* avctx, AVPacket* pkt)
if (err) {
av_log(avctx, AV_LOG_ERROR, "Put buffer error: %s (%d)\n",
mesh_err2str(err), err);
ret = AVERROR(EIO);
goto error_close_conn;
return AVERROR(EIO);
}

return len;

error_put_buf:
mesh_put_buffer(&buf);

error_close_conn:
mesh_delete_connection(&s->conn);

return ret;
}

Expand Down
1 change: 1 addition & 0 deletions media-proxy/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ endif()

# Define the .proto files
set(PROTO_FILES
${PROTO_DIR}/type.proto
${PROTO_DIR}/conn-config.proto
${PROTO_DIR}/sdk.proto
${PROTO_DIR}/mediaproxy.proto
Expand Down
29 changes: 29 additions & 0 deletions media-proxy/include/mesh/client.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 Intel Corporation
*
* SPDX-License-Identifier: BSD-3-Clause
*/

#ifndef CLIENT_H
#define CLIENT_H

#include <string>

namespace mesh::client {

/**
* Client
*
* SDK client implementation.
*/
class Client {

public:
Client() {}

std::string id;
};

} // namespace mesh::client

#endif // CLIENT_H
56 changes: 56 additions & 0 deletions media-proxy/include/mesh/client_registry.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 Intel Corporation
*
* SPDX-License-Identifier: BSD-3-Clause
*/

#ifndef CLIENT_REGISTRY_H
#define CLIENT_REGISTRY_H

#include "client.h"
#include <unordered_map>
#include <mutex>
#include <shared_mutex>
#include "uuid.h"

namespace mesh::client {

/**
* Registry
*
* Thread-safe registry to store pointers to SDK client instances.
*/
class Registry {
public:
int add(const std::string& id, Client *client) {
std::unique_lock lk(mx);
if (clients.contains(id))
return -1;
clients[id] = client;
return 0;
}

bool remove(const std::string& id) {
std::unique_lock lk(mx);
return clients.erase(id) > 0;
}

Client * get(const std::string& id) {
std::shared_lock lk(mx);
auto it = clients.find(id);
if (it != clients.end()) {
return it->second;
}
return nullptr;
}

private:
std::unordered_map<std::string, Client *> clients;
std::shared_mutex mx;
};

extern Registry registry;

} // namespace mesh::client

#endif // CLIENT_REGISTRY_H
5 changes: 5 additions & 0 deletions media-proxy/include/mesh/conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,9 @@ class Connection : public telemetry::MetricsProvider {
Connection * link();

void set_config(const Config& cfg);
void set_parent(const std::string& parent_id);

void notify_parent_conn_unlink_requested(context::Context& ctx);

Result establish(context::Context& ctx);
Result establish_async(context::Context& ctx);
Expand All @@ -227,6 +230,7 @@ class Connection : public telemetry::MetricsProvider {
} info;

Config config;
std::string legacy_sdk_id;

protected:
void set_state(context::Context& ctx, State new_state);
Expand Down Expand Up @@ -268,6 +272,7 @@ class Connection : public telemetry::MetricsProvider {
context::Context establish_ctx = context::WithCancel(context::Background());
std::jthread establish_th;
std::jthread shutdown_th;
std::string parent_id;
};

const char * kind2str(Kind kind, bool brief = false);
Expand Down
7 changes: 5 additions & 2 deletions media-proxy/include/mesh/conn_registry.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ class Registry {
return ids;
}

int size() {
std::shared_lock lk(mx);
return conns.size();
}

private:
std::unordered_map<std::string, Connection *> conns;

Expand All @@ -68,8 +73,6 @@ class Registry {
std::shared_mutex mx;
};

extern Registry registry;

} // namespace mesh::connection

#endif // CONN_REGISTRY_H
106 changes: 106 additions & 0 deletions media-proxy/include/mesh/event.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 Intel Corporation
*
* SPDX-License-Identifier: BSD-3-Clause
*/

#ifndef EVENT_H
#define EVENT_H

#include <string>
#include <unordered_map>
#include <mutex>
#include "concurrency.h"
#include "logger.h"
#include <unordered_map>
#include <any>

namespace mesh::event {

enum class Type {
empty_event,
conn_unlink_requested,
};

typedef void (* Handler)(const Type& type);

class Event {
public:
std::string consumer_id;
Type type;
std::unordered_map<std::string, std::any> params;
};

/**
* EventBroker
*
* Subsystem for passing events from producers to consumers.
* Enables software component decoupling.
*/
class EventBroker {
public:
EventBroker() {}

thread::Channel<Event> * subscribe(const std::string& consumer_id, int queue_sz = 1) {
auto ch = new(std::nothrow) thread::Channel<Event>(queue_sz);
if (ch) {
std::unique_lock lk(mx);
channels[ch] = consumer_id;
}
return ch;
}

bool unsubscribe(thread::Channel<Event> *ch) {
std::unique_lock lk(mx);
return channels.erase(ch) > 0;
}

bool send(context::Context& ctx, const std::string& consumer_id, const Type type,
const std::unordered_map<std::string, std::any>& params = {}) {
Event evt = {
.consumer_id = consumer_id,
.type = type,
.params = params,
};
return events.send(ctx, evt);
}

void run(context::Context& ctx) {
for (;;) {
auto v = events.receive(ctx);
if (ctx.cancelled())
return;

if (!v.has_value())
continue;

auto evt = v.value();
for (const auto& kv : channels) {
if (kv.second == evt.consumer_id) {
auto tctx = context::WithTimeout(ctx, std::chrono::milliseconds(3000));
if (ctx.cancelled())
return;

if (tctx.cancelled()) {
log::error("Event sending timeout")("type", (int)evt.type)
("consumer_id", evt.consumer_id);
} else if (!kv.first->send(tctx, evt)) {
log::error("Event sending failed")("type", (int)evt.type)
("consumer_id", evt.consumer_id);
}
}
}
}
}

private:
std::unordered_map<thread::Channel<Event> *, std::string> channels;
std::mutex mx;
thread::Channel<Event> events = thread::Channel<Event>(100);
};

extern EventBroker broker;

} // namespace mesh::event

#endif // EVENT_H
7 changes: 5 additions & 2 deletions media-proxy/include/mesh/manager_local.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ namespace mesh::connection {
class LocalManager {
public:
int create_connection_sdk(context::Context& ctx, std::string& id,
mcm_conn_param *param, memif_conn_param *memif_param,
const std::string& client_id, mcm_conn_param *param,
memif_conn_param *memif_param,
const Config& conn_config, std::string& err_str);

int activate_connection_sdk(context::Context& ctx, const std::string& id);
Result activate_connection_sdk(context::Context& ctx, const std::string& id);

int delete_connection_sdk(context::Context& ctx, const std::string& id,
bool do_unregister = true);
Expand All @@ -29,6 +30,8 @@ class LocalManager {

int reregister_all_connections(context::Context& ctx);

int notify_all_shutdown_wait(context::Context& ctx);

void shutdown(context::Context& ctx);

void lock();
Expand Down
2 changes: 1 addition & 1 deletion media-proxy/include/mesh/st2110.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ template <typename FRAME, typename HANDLE, typename OPS> class ST2110 : public C

mtl_session = create_session(mtl_device, &ops);
if (!mtl_session) {
log::error("Failed to create session");
log::error("Failed to create MTL session");
set_state(ctx, State::closed);
return set_result(Result::error_general_failure);
}
Expand Down
Loading
Loading