Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
01373b0
Add error code "error_shutdown",
tszumski Nov 20, 2024
ba5c36a
Add ST2110 Rx/Tx 20/22/30 connections with basic UT
tszumski Nov 21, 2024
030c4ef
Adjust clang-format and apply it to st2110 files
tszumski Nov 21, 2024
8f3aa39
Call shutdown() in destructor
tszumski Nov 21, 2024
ae8e31d
CI/CD quick fix. Disable MTL tests as hugepages are not set
tszumski Nov 22, 2024
2954106
Replace std::function, with virtual fn in ST2110 derrived classes
tszumski Nov 22, 2024
974051f
Fix mutex
tszumski Nov 22, 2024
2db96fc
1. Free memory allocated by "strdup()"
tszumski Nov 26, 2024
cfdbb20
1. Remove prefix "_*" from class members name
tszumski Nov 26, 2024
0afaf08
1. Move empty constructor to header
tszumski Nov 26, 2024
e25952d
Rework functions to return an error if the format is unknown.
tszumski Nov 26, 2024
d194ef1
1. Initialze _ctx with context::WithCancel(context::Background());
tszumski Nov 26, 2024
1f0e0da
Set sent variable only when data is actually sent.
tszumski Nov 26, 2024
3bf6fce
Set state Result::configured only if Result::success
tszumski Nov 26, 2024
2bfc5b4
Rename variable "_sent" to "to_be_sent"
tszumski Nov 26, 2024
b29cdf8
Format-coding applied based on updated clang-format
tszumski Nov 26, 2024
db06c5a
Fix log message
tszumski Nov 26, 2024
2dd4cd3
Use only atomic for synchronization
tszumski Nov 26, 2024
8d01718
Rename error message
tszumski Nov 27, 2024
80abcb0
Move varaibles to default initializers
tszumski Nov 27, 2024
565e3fe
Adjust code formatting
tszumski Nov 27, 2024
e8a44f5
Fix synchronization based on std::atomic
tszumski Nov 27, 2024
c628077
Fix setting state prematurely
tszumski Nov 27, 2024
fa712dc
Reorder loop
tszumski Nov 27, 2024
ee85085
Remove redundant mesh:: namespace
tszumski Nov 29, 2024
b154f4b
Add test scenarios for ST2110 transmit and receive following Concept.…
tszumski Nov 29, 2024
2e7b582
Move common implementation from ST2110Tx/ST2110Rx to ST2110
tszumski Dec 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion .clang-format
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,15 @@ BasedOnStyle: LLVM
IndentWidth: 4
UseTab: Never
ColumnLimit: 100
BreakBeforeBraces: Linux
BreakBeforeBraces: Custom
BraceWrapping:
AfterClass: false
AfterNamespace: false
AfterFunction: false
AllowShortIfStatementsOnASingleLine: false
AllowShortEnumsOnASingleLine: false
IndentCaseLabels: false
IncludeBlocks: Preserve
SortIncludes: false
ReferenceAlignment: Left
...
2 changes: 1 addition & 1 deletion media-proxy/include/mesh/conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ enum class Result {
error_bad_argument,
error_out_of_memory,
error_general_failure,

error_context_cancelled,
// TODO: more error codes to be added...
};

Expand Down
142 changes: 142 additions & 0 deletions media-proxy/include/mesh/st2110.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
#ifndef ST2110_H
#define ST2110_H

#include <thread>
#include <bsd/string.h>
#include <arpa/inet.h>
#include <mtl/st_pipeline_api.h>
#include <mtl/st30_pipeline_api.h>

#include "conn.h"
#include "mesh_dp.h"
#include "logger.h"

namespace mesh::connection {

#define ST_APP_PAYLOAD_TYPE_ST30 (111)
#define ST_APP_PAYLOAD_TYPE_ST20 (112)
#define ST_APP_PAYLOAD_TYPE_ST22 (114)

int mesh_video_format_to_st_format(int fmt, st_frame_fmt& st_fmt);
int mesh_audio_format_to_st_format(int fmt, st30_fmt& st_fmt);
int mesh_audio_sampling_to_st_sampling(int sampling, st30_sampling& st_sampling);
int mesh_audio_ptime_to_st_ptime(int ptime, st30_ptime& st_ptime);

void *get_frame_data_ptr(st_frame *src);
void *get_frame_data_ptr(st30_frame *src);

void get_mtl_dev_params(mtl_init_params& st_param, const std::string& dev_port,
mtl_log_level log_level, const char local_ip_addr[MESH_IP_ADDRESS_SIZE]);
mtl_handle get_mtl_device(const std::string& dev_port, mtl_log_level log_level,
const char local_ip_addr[MESH_IP_ADDRESS_SIZE], int& session_id);

/**
* ST2110
*
* Base abstract class of SPMTE ST2110-xx bridge. ST2110Rx/ST2110Tx
* inherit this class.
*/
template <typename FRAME, typename HANDLE, typename OPS> class ST2110 : public Connection {
public:
virtual ~ST2110() {
shutdown(_ctx);
if (ops.name)
free((void *)ops.name);
};

protected:
mtl_handle mtl_device = nullptr;
HANDLE mtl_session = nullptr;
OPS ops = {0};
size_t transfer_size = 0;
std::atomic<bool> frame_available;
context::Context _ctx = context::WithCancel(context::Background());

virtual FRAME *get_frame(HANDLE) = 0;
virtual int put_frame(HANDLE, FRAME *) = 0;
virtual HANDLE create_session(mtl_handle, OPS *) = 0;
virtual int close_session(HANDLE) = 0;

static int frame_available_cb(void *ptr) {
auto _this = static_cast<ST2110 *>(ptr);
if (!_this) {
return -1;
}

_this->notify_frame_available();

return 0;
}

void notify_frame_available() {
frame_available.store(true, std::memory_order_release);
frame_available.notify_one();
}

void wait_frame_available() {
frame_available.wait(false, std::memory_order_acquire);
frame_available = false;
}

virtual int configure_common(context::Context& ctx, const std::string& dev_port,
const MeshConfig_ST2110& cfg_st2110) {
int session_id = 0;
mtl_device = get_mtl_device(dev_port, MTL_LOG_LEVEL_CRIT, cfg_st2110.local_ip_addr, session_id);
if (!mtl_device) {
log::error("Failed to get MTL device");
return -1;
}

strlcpy(ops.port.port[MTL_PORT_P], dev_port.c_str(), MTL_PORT_MAX_LEN);
ops.port.num_port = 1;

char session_name[NAME_MAX] = "";
snprintf(session_name, NAME_MAX, "mcm_mtl_%d", session_id);
if (ops.name)
free((void *)ops.name);
ops.name = strdup(session_name);
ops.framebuff_cnt = 4;

ops.priv = this; // app handle register to lib
ops.notify_frame_available = frame_available_cb;

log::info("ST2110: configure")
("port", ops.port.port[MTL_PORT_P])
("num_port", (int)ops.port.num_port)
("name", ops.name)
("framebuff_cnt", ops.framebuff_cnt);

return 0;
}

Result on_establish(context::Context& ctx) override {
_ctx = context::WithCancel(ctx);
frame_available = false;

mtl_session = create_session(mtl_device, &ops);
if (!mtl_session) {
log::error("Failed to create session");
set_state(ctx, State::closed);
return set_result(Result::error_general_failure);
}

set_state(ctx, State::active);
return set_result(Result::success);
}

Result on_shutdown(context::Context& ctx) override {
_ctx.cancel();
notify_frame_available();

if (mtl_session) {
close_session(mtl_session);
mtl_session = nullptr;
}
set_state(ctx, State::closed);
return set_result(Result::success);
};
};

} // namespace mesh::connection

#endif // ST2110_H
129 changes: 129 additions & 0 deletions media-proxy/include/mesh/st2110rx.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
#ifndef ST2110RX_H
#define ST2110RX_H

#include "st2110.h"

namespace mesh::connection {

/**
* ST2110Rx
*
* Base abstract class of ST2110Rx. ST2110_20Rx/ST2110_22Rx/ST2110_30Rx
* inherit this class.
*/
template <typename FRAME, typename HANDLE, typename OPS>
class ST2110Rx : public ST2110<FRAME, HANDLE, OPS> {
public:
ST2110Rx() { this->_kind = Kind::receiver; }

protected:
std::jthread frame_thread_handle;

int configure_common(context::Context& ctx, const std::string& dev_port,
const MeshConfig_ST2110& cfg_st2110) override{
ST2110<FRAME, HANDLE, OPS>::configure_common(ctx, dev_port, cfg_st2110);

inet_pton(AF_INET, cfg_st2110.remote_ip_addr, this->ops.port.ip_addr[MTL_PORT_P]);
inet_pton(AF_INET, cfg_st2110.local_ip_addr, this->ops.port.mcast_sip_addr[MTL_PORT_P]);
this->ops.port.udp_port[MTL_PORT_P] = cfg_st2110.local_port;

log::info("ST2110Rx: configure")
("ip_addr", std::to_string(this->ops.port.ip_addr[MTL_PORT_P][0]) + " " +
std::to_string(this->ops.port.ip_addr[MTL_PORT_P][1]) + " " +
std::to_string(this->ops.port.ip_addr[MTL_PORT_P][2]) + " " +
std::to_string(this->ops.port.ip_addr[MTL_PORT_P][3]))
("mcast_sip_addr", std::to_string(this->ops.port.mcast_sip_addr[MTL_PORT_P][0]) + " " +
std::to_string(this->ops.port.mcast_sip_addr[MTL_PORT_P][1]) + " " +
std::to_string(this->ops.port.mcast_sip_addr[MTL_PORT_P][2]) + " " +
std::to_string(this->ops.port.mcast_sip_addr[MTL_PORT_P][3]))
("udp_port", this->ops.port.udp_port[MTL_PORT_P]);
return 0;
}

Result on_establish(context::Context& ctx) override {
Result res = ST2110<FRAME, HANDLE, OPS>::on_establish(ctx);
if (res != Result::success) {
return res;
}

/* Start MTL session thread. */
try {
frame_thread_handle = std::jthread(&ST2110Rx::frame_thread, this);
} catch (const std::system_error& e) {
log::error("Failed to create thread");
this->set_state(ctx, State::closed);
return this->set_result(Result::error_out_of_memory);
}

this->set_state(ctx, State::active);
return this->set_result(Result::success);
}

Result on_shutdown(context::Context& ctx) override {
Result res = ST2110<FRAME, HANDLE, OPS>::on_shutdown(ctx);
if (res != Result::success) {
return res;
}

frame_thread_handle.join();

this->set_state(ctx, State::closed);
return this->set_result(Result::success);
};

private:
void frame_thread() {
while (!this->_ctx.cancelled()) {
// Get full buffer from MTL
FRAME *frame_ptr = this->get_frame(this->mtl_session);
if (frame_ptr) {
// Forward buffer to emulated receiver
this->transmit(this->_ctx, get_frame_data_ptr(frame_ptr), this->transfer_size);
// Return used buffer to MTL
this->put_frame(this->mtl_session, frame_ptr);
} else {
this->wait_frame_available();
}
}
}
};

class ST2110_20Rx : public ST2110Rx<st_frame, st20p_rx_handle, st20p_rx_ops> {
public:
Result configure(context::Context& ctx, const std::string& dev_port,
const MeshConfig_ST2110& cfg_st2110, const MeshConfig_Video& cfg_video);

protected:
st_frame *get_frame(st20p_rx_handle h) override;
int put_frame(st20p_rx_handle h, st_frame *f) override;
st20p_rx_handle create_session(mtl_handle h, st20p_rx_ops *o) override;
int close_session(st20p_rx_handle h) override;
};

class ST2110_22Rx : public ST2110Rx<st_frame, st22p_rx_handle, st22p_rx_ops> {
public:
Result configure(context::Context& ctx, const std::string& dev_port,
const MeshConfig_ST2110& cfg_st2110, const MeshConfig_Video& cfg_video);

protected:
st_frame *get_frame(st22p_rx_handle h) override;
int put_frame(st22p_rx_handle h, st_frame *f) override;
st22p_rx_handle create_session(mtl_handle h, st22p_rx_ops *o) override;
int close_session(st22p_rx_handle h) override;
};

class ST2110_30Rx : public ST2110Rx<st30_frame, st30p_rx_handle, st30p_rx_ops> {
public:
Result configure(context::Context& ctx, const std::string& dev_port,
const MeshConfig_ST2110& cfg_st2110, const MeshConfig_Audio& cfg_audio);

protected:
st30_frame *get_frame(st30p_rx_handle h) override;
int put_frame(st30p_rx_handle h, st30_frame *f) override;
st30p_rx_handle create_session(mtl_handle h, st30p_rx_ops *o) override;
int close_session(st30p_rx_handle h) override;
};

} // namespace mesh::connection

#endif // ST2110RX_H
Loading