Skip to content

Commit 5257dd8

Browse files
committed
Add notification flow and shutdown handling in SDK (#393)
* Add notification flow and shutdown handling in SDK * Improve closed connection handling in the FFmpeg MCM plugin. * Add SDK client registry in Media Proxy. * Add event broker in Media Proxy. * Add SDK client shutdown notification flow. * Improve local connection manager shutdown handling in Media Proxy. * Modify connection activation flow to return the link status and return only when the link to multipoint group is confirmed. * Add SDK client registration flow in Media Proxy. * Add global context and shutdown flow in SDK. Signed-off-by: Konstantin Ilichev <konstantin.ilichev@intel.com>
1 parent f5f0c29 commit 5257dd8

32 files changed

+1133
-153
lines changed

docs/SDK_API_Definition.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -441,9 +441,10 @@ NOTE: The codes are negative integer values.
441441
| `-MESH_ERR_BAD_CONFIG_PTR` | Bad configuration pointer | The configuration pointer is NULL. |
442442
| `-MESH_ERR_BAD_BUF_PTR` | Bad buffer pointer | The buffer pointer is NULL. |
443443
| `-MESH_ERR_BAD_BUF_LEN` | Bad buffer length | **Rx connection**: The buffer length is corrupted.<br>**Tx connection**: The buffer length is bigger than maximum. |
444+
| `-MESH_ERR_CLIENT_FAILED` | Client creation failed | An error occurred while creating an SDK client. |
444445
| `-MESH_ERR_CLIENT_CONFIG_INVAL` | Invalid client config | JSON client configuration string is malformed. |
445446
| `-MESH_ERR_MAX_CONN` | Reached max connections number | An attempt to create a connection failed due to reaching the maximum number of connections defined in `"maxMediaConnections"`. |
446-
| `-MESH_ERR_FOUND_ALLOCATED` | Found allocated resources | When deleting a client, some connections were found not closed. Delete all connections explicitly before deleting the client. |
447+
| `-MESH_ERR_FOUND_ALLOCATED` | Found allocated resources | When deleting an SDK client, some connections were found not closed. Delete all connections explicitly before deleting the client. |
447448
| `-MESH_ERR_CONN_FAILED` | Connection creation failed | An error occurred while creating a connection. |
448449
| `-MESH_ERR_CONN_CONFIG_INVAL` | Invalid connection config | JSON connection configuration string is malformed or one of parameters has an incorrect value. |
449450
| `-MESH_ERR_CONN_CONFIG_INCOMPAT` | Incompatible connection config | Incompatible parameters found in the JSON connection configuration string. |

ffmpeg-plugin/mcm_audio_rx.c

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -133,19 +133,17 @@ static int mcm_audio_read_packet(AVFormatContext* avctx, AVPacket* pkt)
133133
s->first_frame = false;
134134

135135
err = mesh_get_buffer_timeout(s->conn, &buf, timeout);
136-
if (err == -MESH_ERR_CONN_CLOSED) {
137-
ret = AVERROR_EOF;
138-
goto error_close_conn;
139-
}
136+
if (err == -MESH_ERR_CONN_CLOSED)
137+
return AVERROR_EOF;
138+
140139
if (err) {
141140
if (mcm_shutdown_requested()) {
142-
ret = AVERROR_EXIT;
141+
return AVERROR_EXIT;
143142
} else {
144143
av_log(avctx, AV_LOG_ERROR, "Get buffer error: %s (%d)\n",
145144
mesh_err2str(err), err);
146-
ret = AVERROR(EIO);
145+
return AVERROR(EIO);
147146
}
148-
goto error_close_conn;
149147
}
150148

151149
if (mcm_shutdown_requested()) {
@@ -166,21 +164,14 @@ static int mcm_audio_read_packet(AVFormatContext* avctx, AVPacket* pkt)
166164
if (err) {
167165
av_log(avctx, AV_LOG_ERROR, "Put buffer error: %s (%d)\n",
168166
mesh_err2str(err), err);
169-
ret = AVERROR(EIO);
170-
goto error_close_conn;
167+
return AVERROR(EIO);
171168
}
172169

173170
return len;
174171

175172
error_put_buf:
176173
mesh_put_buffer(&buf);
177174

178-
error_close_conn:
179-
err = mesh_delete_connection(&s->conn);
180-
if (err)
181-
av_log(avctx, AV_LOG_ERROR, "Delete mesh connection failed: %s (%d)\n",
182-
mesh_err2str(err), err);
183-
184175
return ret;
185176
}
186177

ffmpeg-plugin/mcm_video_rx.c

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -128,19 +128,17 @@ static int mcm_video_read_packet(AVFormatContext* avctx, AVPacket* pkt)
128128
s->first_frame = false;
129129

130130
err = mesh_get_buffer_timeout(s->conn, &buf, timeout);
131-
if (err == -MESH_ERR_CONN_CLOSED) {
132-
ret = AVERROR_EOF;
133-
goto error_close_conn;
134-
}
131+
if (err == -MESH_ERR_CONN_CLOSED)
132+
return AVERROR_EOF;
133+
135134
if (err) {
136135
if (mcm_shutdown_requested()) {
137-
ret = AVERROR_EXIT;
136+
return AVERROR_EXIT;
138137
} else {
139138
av_log(avctx, AV_LOG_ERROR, "Get buffer error: %s (%d)\n",
140139
mesh_err2str(err), err);
141-
ret = AVERROR(EIO);
140+
return AVERROR(EIO);
142141
}
143-
goto error_close_conn;
144142
}
145143

146144
if (mcm_shutdown_requested()) {
@@ -161,18 +159,14 @@ static int mcm_video_read_packet(AVFormatContext* avctx, AVPacket* pkt)
161159
if (err) {
162160
av_log(avctx, AV_LOG_ERROR, "Put buffer error: %s (%d)\n",
163161
mesh_err2str(err), err);
164-
ret = AVERROR(EIO);
165-
goto error_close_conn;
162+
return AVERROR(EIO);
166163
}
167164

168165
return len;
169166

170167
error_put_buf:
171168
mesh_put_buffer(&buf);
172169

173-
error_close_conn:
174-
mesh_delete_connection(&s->conn);
175-
176170
return ret;
177171
}
178172

media-proxy/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ endif()
6666

6767
# Define the .proto files
6868
set(PROTO_FILES
69+
${PROTO_DIR}/type.proto
6970
${PROTO_DIR}/conn-config.proto
7071
${PROTO_DIR}/sdk.proto
7172
${PROTO_DIR}/mediaproxy.proto

media-proxy/include/mesh/client.h

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* SPDX-FileCopyrightText: Copyright (c) 2025 Intel Corporation
3+
*
4+
* SPDX-License-Identifier: BSD-3-Clause
5+
*/
6+
7+
#ifndef CLIENT_H
8+
#define CLIENT_H
9+
10+
#include <string>
11+
12+
namespace mesh::client {
13+
14+
/**
15+
* Client
16+
*
17+
* SDK client implementation.
18+
*/
19+
class Client {
20+
21+
public:
22+
Client() {}
23+
24+
std::string id;
25+
};
26+
27+
} // namespace mesh::client
28+
29+
#endif // CLIENT_H
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* SPDX-FileCopyrightText: Copyright (c) 2025 Intel Corporation
3+
*
4+
* SPDX-License-Identifier: BSD-3-Clause
5+
*/
6+
7+
#ifndef CLIENT_REGISTRY_H
8+
#define CLIENT_REGISTRY_H
9+
10+
#include "client.h"
11+
#include <unordered_map>
12+
#include <mutex>
13+
#include <shared_mutex>
14+
#include "uuid.h"
15+
16+
namespace mesh::client {
17+
18+
/**
19+
* Registry
20+
*
21+
* Thread-safe registry to store pointers to SDK client instances.
22+
*/
23+
class Registry {
24+
public:
25+
int add(const std::string& id, Client *client) {
26+
std::unique_lock lk(mx);
27+
if (clients.contains(id))
28+
return -1;
29+
clients[id] = client;
30+
return 0;
31+
}
32+
33+
bool remove(const std::string& id) {
34+
std::unique_lock lk(mx);
35+
return clients.erase(id) > 0;
36+
}
37+
38+
Client * get(const std::string& id) {
39+
std::shared_lock lk(mx);
40+
auto it = clients.find(id);
41+
if (it != clients.end()) {
42+
return it->second;
43+
}
44+
return nullptr;
45+
}
46+
47+
private:
48+
std::unordered_map<std::string, Client *> clients;
49+
std::shared_mutex mx;
50+
};
51+
52+
extern Registry registry;
53+
54+
} // namespace mesh::client
55+
56+
#endif // CLIENT_REGISTRY_H

media-proxy/include/mesh/conn.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,9 @@ class Connection : public telemetry::MetricsProvider {
206206
Connection * link();
207207

208208
void set_config(const Config& cfg);
209+
void set_parent(const std::string& parent_id);
210+
211+
void notify_parent_conn_unlink_requested(context::Context& ctx);
209212

210213
Result establish(context::Context& ctx);
211214
Result establish_async(context::Context& ctx);
@@ -227,6 +230,7 @@ class Connection : public telemetry::MetricsProvider {
227230
} info;
228231

229232
Config config;
233+
std::string legacy_sdk_id;
230234

231235
protected:
232236
void set_state(context::Context& ctx, State new_state);
@@ -268,6 +272,7 @@ class Connection : public telemetry::MetricsProvider {
268272
context::Context establish_ctx = context::WithCancel(context::Background());
269273
std::jthread establish_th;
270274
std::jthread shutdown_th;
275+
std::string parent_id;
271276
};
272277

273278
const char * kind2str(Kind kind, bool brief = false);

media-proxy/include/mesh/conn_registry.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,11 @@ class Registry {
5757
return ids;
5858
}
5959

60+
int size() {
61+
std::shared_lock lk(mx);
62+
return conns.size();
63+
}
64+
6065
private:
6166
std::unordered_map<std::string, Connection *> conns;
6267

@@ -68,8 +73,6 @@ class Registry {
6873
std::shared_mutex mx;
6974
};
7075

71-
extern Registry registry;
72-
7376
} // namespace mesh::connection
7477

7578
#endif // CONN_REGISTRY_H

media-proxy/include/mesh/event.h

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* SPDX-FileCopyrightText: Copyright (c) 2025 Intel Corporation
3+
*
4+
* SPDX-License-Identifier: BSD-3-Clause
5+
*/
6+
7+
#ifndef EVENT_H
8+
#define EVENT_H
9+
10+
#include <string>
11+
#include <unordered_map>
12+
#include <mutex>
13+
#include "concurrency.h"
14+
#include "logger.h"
15+
#include <unordered_map>
16+
#include <any>
17+
18+
namespace mesh::event {
19+
20+
enum class Type {
21+
empty_event,
22+
conn_unlink_requested,
23+
};
24+
25+
typedef void (* Handler)(const Type& type);
26+
27+
class Event {
28+
public:
29+
std::string consumer_id;
30+
Type type;
31+
std::unordered_map<std::string, std::any> params;
32+
};
33+
34+
/**
35+
* EventBroker
36+
*
37+
* Subsystem for passing events from producers to consumers.
38+
* Enables software component decoupling.
39+
*/
40+
class EventBroker {
41+
public:
42+
EventBroker() {}
43+
44+
thread::Channel<Event> * subscribe(const std::string& consumer_id, int queue_sz = 1) {
45+
auto ch = new(std::nothrow) thread::Channel<Event>(queue_sz);
46+
if (ch) {
47+
std::unique_lock lk(mx);
48+
channels[ch] = consumer_id;
49+
}
50+
return ch;
51+
}
52+
53+
bool unsubscribe(thread::Channel<Event> *ch) {
54+
std::unique_lock lk(mx);
55+
auto ret = channels.erase(ch) > 0;
56+
delete ch;
57+
return ret;
58+
}
59+
60+
bool send(context::Context& ctx, const std::string& consumer_id, const Type type,
61+
const std::unordered_map<std::string, std::any>& params = {}) {
62+
Event evt = {
63+
.consumer_id = consumer_id,
64+
.type = type,
65+
.params = params,
66+
};
67+
return events.send(ctx, evt);
68+
}
69+
70+
void run(context::Context& ctx) {
71+
for (;;) {
72+
auto v = events.receive(ctx);
73+
if (ctx.cancelled())
74+
return;
75+
76+
if (!v.has_value())
77+
continue;
78+
79+
auto evt = v.value();
80+
for (const auto& kv : channels) {
81+
if (kv.second == evt.consumer_id) {
82+
auto tctx = context::WithTimeout(ctx, std::chrono::milliseconds(3000));
83+
if (ctx.cancelled())
84+
return;
85+
86+
if (tctx.cancelled()) {
87+
log::error("Event sending timeout")("type", (int)evt.type)
88+
("consumer_id", evt.consumer_id);
89+
} else if (!kv.first->send(tctx, evt)) {
90+
log::error("Event sending failed")("type", (int)evt.type)
91+
("consumer_id", evt.consumer_id);
92+
}
93+
}
94+
}
95+
}
96+
}
97+
98+
private:
99+
std::unordered_map<thread::Channel<Event> *, std::string> channels;
100+
std::mutex mx;
101+
thread::Channel<Event> events = thread::Channel<Event>(100);
102+
};
103+
104+
extern EventBroker broker;
105+
106+
} // namespace mesh::event
107+
108+
#endif // EVENT_H

media-proxy/include/mesh/manager_local.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,11 @@ namespace mesh::connection {
1717
class LocalManager {
1818
public:
1919
int create_connection_sdk(context::Context& ctx, std::string& id,
20-
mcm_conn_param *param, memif_conn_param *memif_param,
20+
const std::string& client_id, mcm_conn_param *param,
21+
memif_conn_param *memif_param,
2122
const Config& conn_config, std::string& err_str);
2223

23-
int activate_connection_sdk(context::Context& ctx, const std::string& id);
24+
Result activate_connection_sdk(context::Context& ctx, const std::string& id);
2425

2526
int delete_connection_sdk(context::Context& ctx, const std::string& id,
2627
bool do_unregister = true);
@@ -29,6 +30,8 @@ class LocalManager {
2930

3031
int reregister_all_connections(context::Context& ctx);
3132

33+
int notify_all_shutdown_wait(context::Context& ctx);
34+
3235
void shutdown(context::Context& ctx);
3336

3437
void lock();

0 commit comments

Comments
 (0)