diff --git a/.clang-format b/.clang-format index 331acefc6..88b118d2d 100644 --- a/.clang-format +++ b/.clang-format @@ -3,10 +3,15 @@ BasedOnStyle: LLVM IndentWidth: 4 UseTab: Never ColumnLimit: 100 -BreakBeforeBraces: Linux +BreakBeforeBraces: Custom +BraceWrapping: + AfterClass: false + AfterNamespace: false + AfterFunction: false AllowShortIfStatementsOnASingleLine: false AllowShortEnumsOnASingleLine: false IndentCaseLabels: false IncludeBlocks: Preserve SortIncludes: false +ReferenceAlignment: Left ... diff --git a/media-proxy/include/mesh/conn.h b/media-proxy/include/mesh/conn.h index ad5da1cd2..29f863450 100644 --- a/media-proxy/include/mesh/conn.h +++ b/media-proxy/include/mesh/conn.h @@ -66,7 +66,7 @@ enum class Result { error_bad_argument, error_out_of_memory, error_general_failure, - + error_context_cancelled, // TODO: more error codes to be added... }; diff --git a/media-proxy/include/mesh/st2110.h b/media-proxy/include/mesh/st2110.h new file mode 100644 index 000000000..74e306e88 --- /dev/null +++ b/media-proxy/include/mesh/st2110.h @@ -0,0 +1,142 @@ +#ifndef ST2110_H +#define ST2110_H + +#include +#include +#include +#include +#include + +#include "conn.h" +#include "mesh_dp.h" +#include "logger.h" + +namespace mesh::connection { + +#define ST_APP_PAYLOAD_TYPE_ST30 (111) +#define ST_APP_PAYLOAD_TYPE_ST20 (112) +#define ST_APP_PAYLOAD_TYPE_ST22 (114) + +int mesh_video_format_to_st_format(int fmt, st_frame_fmt& st_fmt); +int mesh_audio_format_to_st_format(int fmt, st30_fmt& st_fmt); +int mesh_audio_sampling_to_st_sampling(int sampling, st30_sampling& st_sampling); +int mesh_audio_ptime_to_st_ptime(int ptime, st30_ptime& st_ptime); + +void *get_frame_data_ptr(st_frame *src); +void *get_frame_data_ptr(st30_frame *src); + +void get_mtl_dev_params(mtl_init_params& st_param, const std::string& dev_port, + mtl_log_level log_level, const char local_ip_addr[MESH_IP_ADDRESS_SIZE]); +mtl_handle get_mtl_device(const std::string& dev_port, mtl_log_level log_level, + const char local_ip_addr[MESH_IP_ADDRESS_SIZE], int& session_id); + +/** + * ST2110 + * + * Base abstract class of SPMTE ST2110-xx bridge. ST2110Rx/ST2110Tx + * inherit this class. + */ +template class ST2110 : public Connection { + public: + virtual ~ST2110() { + shutdown(_ctx); + if (ops.name) + free((void *)ops.name); + }; + + protected: + mtl_handle mtl_device = nullptr; + HANDLE mtl_session = nullptr; + OPS ops = {0}; + size_t transfer_size = 0; + std::atomic frame_available; + context::Context _ctx = context::WithCancel(context::Background()); + + virtual FRAME *get_frame(HANDLE) = 0; + virtual int put_frame(HANDLE, FRAME *) = 0; + virtual HANDLE create_session(mtl_handle, OPS *) = 0; + virtual int close_session(HANDLE) = 0; + + static int frame_available_cb(void *ptr) { + auto _this = static_cast(ptr); + if (!_this) { + return -1; + } + + _this->notify_frame_available(); + + return 0; + } + + void notify_frame_available() { + frame_available.store(true, std::memory_order_release); + frame_available.notify_one(); + } + + void wait_frame_available() { + frame_available.wait(false, std::memory_order_acquire); + frame_available = false; + } + + virtual int configure_common(context::Context& ctx, const std::string& dev_port, + const MeshConfig_ST2110& cfg_st2110) { + int session_id = 0; + mtl_device = get_mtl_device(dev_port, MTL_LOG_LEVEL_CRIT, cfg_st2110.local_ip_addr, session_id); + if (!mtl_device) { + log::error("Failed to get MTL device"); + return -1; + } + + strlcpy(ops.port.port[MTL_PORT_P], dev_port.c_str(), MTL_PORT_MAX_LEN); + ops.port.num_port = 1; + + char session_name[NAME_MAX] = ""; + snprintf(session_name, NAME_MAX, "mcm_mtl_%d", session_id); + if (ops.name) + free((void *)ops.name); + ops.name = strdup(session_name); + ops.framebuff_cnt = 4; + + ops.priv = this; // app handle register to lib + ops.notify_frame_available = frame_available_cb; + + log::info("ST2110: configure") + ("port", ops.port.port[MTL_PORT_P]) + ("num_port", (int)ops.port.num_port) + ("name", ops.name) + ("framebuff_cnt", ops.framebuff_cnt); + + return 0; + } + + Result on_establish(context::Context& ctx) override { + _ctx = context::WithCancel(ctx); + frame_available = false; + + mtl_session = create_session(mtl_device, &ops); + if (!mtl_session) { + log::error("Failed to create session"); + set_state(ctx, State::closed); + return set_result(Result::error_general_failure); + } + + set_state(ctx, State::active); + return set_result(Result::success); + } + + Result on_shutdown(context::Context& ctx) override { + _ctx.cancel(); + notify_frame_available(); + + if (mtl_session) { + close_session(mtl_session); + mtl_session = nullptr; + } + set_state(ctx, State::closed); + return set_result(Result::success); + }; +}; + +} // namespace mesh::connection + +#endif // ST2110_H diff --git a/media-proxy/include/mesh/st2110rx.h b/media-proxy/include/mesh/st2110rx.h new file mode 100644 index 000000000..692537936 --- /dev/null +++ b/media-proxy/include/mesh/st2110rx.h @@ -0,0 +1,129 @@ +#ifndef ST2110RX_H +#define ST2110RX_H + +#include "st2110.h" + +namespace mesh::connection { + +/** + * ST2110Rx + * + * Base abstract class of ST2110Rx. ST2110_20Rx/ST2110_22Rx/ST2110_30Rx + * inherit this class. + */ +template +class ST2110Rx : public ST2110 { + public: + ST2110Rx() { this->_kind = Kind::receiver; } + + protected: + std::jthread frame_thread_handle; + + int configure_common(context::Context& ctx, const std::string& dev_port, + const MeshConfig_ST2110& cfg_st2110) override{ + ST2110::configure_common(ctx, dev_port, cfg_st2110); + + inet_pton(AF_INET, cfg_st2110.remote_ip_addr, this->ops.port.ip_addr[MTL_PORT_P]); + inet_pton(AF_INET, cfg_st2110.local_ip_addr, this->ops.port.mcast_sip_addr[MTL_PORT_P]); + this->ops.port.udp_port[MTL_PORT_P] = cfg_st2110.local_port; + + log::info("ST2110Rx: configure") + ("ip_addr", std::to_string(this->ops.port.ip_addr[MTL_PORT_P][0]) + " " + + std::to_string(this->ops.port.ip_addr[MTL_PORT_P][1]) + " " + + std::to_string(this->ops.port.ip_addr[MTL_PORT_P][2]) + " " + + std::to_string(this->ops.port.ip_addr[MTL_PORT_P][3])) + ("mcast_sip_addr", std::to_string(this->ops.port.mcast_sip_addr[MTL_PORT_P][0]) + " " + + std::to_string(this->ops.port.mcast_sip_addr[MTL_PORT_P][1]) + " " + + std::to_string(this->ops.port.mcast_sip_addr[MTL_PORT_P][2]) + " " + + std::to_string(this->ops.port.mcast_sip_addr[MTL_PORT_P][3])) + ("udp_port", this->ops.port.udp_port[MTL_PORT_P]); + return 0; + } + + Result on_establish(context::Context& ctx) override { + Result res = ST2110::on_establish(ctx); + if (res != Result::success) { + return res; + } + + /* Start MTL session thread. */ + try { + frame_thread_handle = std::jthread(&ST2110Rx::frame_thread, this); + } catch (const std::system_error& e) { + log::error("Failed to create thread"); + this->set_state(ctx, State::closed); + return this->set_result(Result::error_out_of_memory); + } + + this->set_state(ctx, State::active); + return this->set_result(Result::success); + } + + Result on_shutdown(context::Context& ctx) override { + Result res = ST2110::on_shutdown(ctx); + if (res != Result::success) { + return res; + } + + frame_thread_handle.join(); + + this->set_state(ctx, State::closed); + return this->set_result(Result::success); + }; + + private: + void frame_thread() { + while (!this->_ctx.cancelled()) { + // Get full buffer from MTL + FRAME *frame_ptr = this->get_frame(this->mtl_session); + if (frame_ptr) { + // Forward buffer to emulated receiver + this->transmit(this->_ctx, get_frame_data_ptr(frame_ptr), this->transfer_size); + // Return used buffer to MTL + this->put_frame(this->mtl_session, frame_ptr); + } else { + this->wait_frame_available(); + } + } + } +}; + +class ST2110_20Rx : public ST2110Rx { + public: + Result configure(context::Context& ctx, const std::string& dev_port, + const MeshConfig_ST2110& cfg_st2110, const MeshConfig_Video& cfg_video); + + protected: + st_frame *get_frame(st20p_rx_handle h) override; + int put_frame(st20p_rx_handle h, st_frame *f) override; + st20p_rx_handle create_session(mtl_handle h, st20p_rx_ops *o) override; + int close_session(st20p_rx_handle h) override; +}; + +class ST2110_22Rx : public ST2110Rx { + public: + Result configure(context::Context& ctx, const std::string& dev_port, + const MeshConfig_ST2110& cfg_st2110, const MeshConfig_Video& cfg_video); + + protected: + st_frame *get_frame(st22p_rx_handle h) override; + int put_frame(st22p_rx_handle h, st_frame *f) override; + st22p_rx_handle create_session(mtl_handle h, st22p_rx_ops *o) override; + int close_session(st22p_rx_handle h) override; +}; + +class ST2110_30Rx : public ST2110Rx { + public: + Result configure(context::Context& ctx, const std::string& dev_port, + const MeshConfig_ST2110& cfg_st2110, const MeshConfig_Audio& cfg_audio); + + protected: + st30_frame *get_frame(st30p_rx_handle h) override; + int put_frame(st30p_rx_handle h, st30_frame *f) override; + st30p_rx_handle create_session(mtl_handle h, st30p_rx_ops *o) override; + int close_session(st30p_rx_handle h) override; +}; + +} // namespace mesh::connection + +#endif // ST2110RX_H diff --git a/media-proxy/include/mesh/st2110tx.h b/media-proxy/include/mesh/st2110tx.h new file mode 100644 index 000000000..79d11a819 --- /dev/null +++ b/media-proxy/include/mesh/st2110tx.h @@ -0,0 +1,102 @@ +#ifndef ST2110TX_H +#define ST2110TX_H + +#include "st2110.h" + +namespace mesh::connection { + +/** + * ST2110Tx + * + * Base abstract class of ST2110Tx. ST2110_20Tx/ST2110_22Tx/ST2110_30Tx + * inherit this class. + */ +template +class ST2110Tx : public ST2110 { + public: + ST2110Tx() { this->_kind = Kind::transmitter; }; + + protected: + int configure_common(context::Context& ctx, const std::string& dev_port, + const MeshConfig_ST2110& cfg_st2110) override { + ST2110::configure_common(ctx, dev_port, cfg_st2110); + + inet_pton(AF_INET, cfg_st2110.remote_ip_addr, this->ops.port.dip_addr[MTL_PORT_P]); + this->ops.port.udp_port[MTL_PORT_P] = cfg_st2110.remote_port; + this->ops.port.udp_src_port[MTL_PORT_P] = cfg_st2110.local_port; + + log::info("ST2110Tx: configure") + ("dip_addr", std::to_string(this->ops.port.dip_addr[MTL_PORT_P][0]) + " " + + std::to_string(this->ops.port.dip_addr[MTL_PORT_P][1]) + " " + + std::to_string(this->ops.port.dip_addr[MTL_PORT_P][2]) + " " + + std::to_string(this->ops.port.dip_addr[MTL_PORT_P][3])) + ("udp_port", this->ops.port.udp_port[MTL_PORT_P]) + ("udp_src_port", this->ops.port.udp_src_port[MTL_PORT_P]); + return 0; + } + + Result on_receive(context::Context& ctx, void *ptr, uint32_t sz, uint32_t& sent) override { + int copy_size = this->transfer_size > sz ? sz : this->transfer_size; + + FRAME *frame; + for (;;) { + if (ctx.cancelled() || this->_ctx.cancelled()) + return this->set_result(Result::error_context_cancelled); + + // Get empty buffer from MTL + frame = this->get_frame(this->mtl_session); + if (frame) + break; + + this->wait_frame_available(); + } + + // Copy data from emulated transmitter to MTL empty buffer + mtl_memcpy(get_frame_data_ptr(frame), ptr, copy_size); + // Return full buffer to MTL + this->put_frame(this->mtl_session, frame); + + sent = this->transfer_size; + return this->set_result(Result::success); + }; +}; + +class ST2110_20Tx : public ST2110Tx { + public: + Result configure(context::Context& ctx, const std::string& dev_port, + const MeshConfig_ST2110& cfg_st2110, const MeshConfig_Video& cfg_video); + + protected: + st_frame *get_frame(st20p_tx_handle h) override; + int put_frame(st20p_tx_handle h, st_frame *f) override; + st20p_tx_handle create_session(mtl_handle h, st20p_tx_ops *o) override; + int close_session(st20p_tx_handle h) override; +}; + +class ST2110_22Tx : public ST2110Tx { + public: + Result configure(context::Context& ctx, const std::string& dev_port, + const MeshConfig_ST2110& cfg_st2110, const MeshConfig_Video& cfg_video); + + protected: + st_frame *get_frame(st22p_tx_handle h) override; + int put_frame(st22p_tx_handle h, st_frame *f) override; + st22p_tx_handle create_session(mtl_handle h, st22p_tx_ops *o) override; + int close_session(st22p_tx_handle h) override; +}; + +class ST2110_30Tx : public ST2110Tx { + public: + Result configure(context::Context& ctx, const std::string& dev_port, + const MeshConfig_ST2110& cfg_st2110, const MeshConfig_Audio& cfg_audio); + + protected: + st30_frame *get_frame(st30p_tx_handle h) override; + int put_frame(st30p_tx_handle h, st30_frame *f) override; + st30p_tx_handle create_session(mtl_handle h, st30p_tx_ops *o) override; + int close_session(st30p_tx_handle h) override; +}; + +} // namespace mesh::connection + +#endif // ST2110TX_H diff --git a/media-proxy/src/mesh/conn.cc b/media-proxy/src/mesh/conn.cc index 85a178ebf..186e4e114 100644 --- a/media-proxy/src/mesh/conn.cc +++ b/media-proxy/src/mesh/conn.cc @@ -15,6 +15,7 @@ Connection::Connection() _status = Status::initial; setting_link = false; transmitting = false; + _link = nullptr; metrics.inbound_bytes = 0; metrics.outbound_bytes = 0; @@ -273,6 +274,7 @@ const char * result2str(Result res) case Result::error_bad_argument: return "bad argument"; case Result::error_out_of_memory: return "out of memory"; case Result::error_general_failure: return "general failure"; + case Result::error_context_cancelled: return "context cancelled"; default: return str_unknown; } } diff --git a/media-proxy/src/mesh/st2110.cc b/media-proxy/src/mesh/st2110.cc new file mode 100644 index 000000000..0dcf7f026 --- /dev/null +++ b/media-proxy/src/mesh/st2110.cc @@ -0,0 +1,165 @@ +#include "st2110.h" + +namespace mesh::connection { + +int mesh_video_format_to_st_format(int mesh_fmt, st_frame_fmt& st_fmt) { + switch (mesh_fmt) { + case MESH_VIDEO_PIXEL_FORMAT_NV12: + st_fmt = ST_FRAME_FMT_YUV420CUSTOM8; + break; + case MESH_VIDEO_PIXEL_FORMAT_YUV422P: + st_fmt = ST_FRAME_FMT_YUV422PLANAR8; + break; + case MESH_VIDEO_PIXEL_FORMAT_YUV422P10LE: + st_fmt = ST_FRAME_FMT_YUV422PLANAR10LE; + break; + case MESH_VIDEO_PIXEL_FORMAT_YUV444P10LE: + st_fmt = ST_FRAME_FMT_YUV444PLANAR10LE; + break; + case MESH_VIDEO_PIXEL_FORMAT_RGB8: + st_fmt = ST_FRAME_FMT_RGB8; + break; + default: + return -1; // Error: unknown format + } + + return 0; // Success +} + +int mesh_audio_format_to_st_format(int mesh_fmt, st30_fmt& st_fmt) { + switch (mesh_fmt) { + case MESH_AUDIO_FORMAT_PCM_S8: + st_fmt = ST30_FMT_PCM8; + break; + case MESH_AUDIO_FORMAT_PCM_S16BE: + st_fmt = ST30_FMT_PCM16; + break; + case MESH_AUDIO_FORMAT_PCM_S24BE: + st_fmt = ST30_FMT_PCM24; + break; + default: + return -1; // Error: unknown format + } + + return 0; // Success +} + +int mesh_audio_sampling_to_st_sampling(int sampling, st30_sampling& st_sampling) { + switch (sampling) { + case MESH_AUDIO_SAMPLE_RATE_48000: + st_sampling = ST30_SAMPLING_48K; + break; + case MESH_AUDIO_SAMPLE_RATE_96000: + st_sampling = ST30_SAMPLING_96K; + break; + case MESH_AUDIO_SAMPLE_RATE_44100: + st_sampling = ST31_SAMPLING_44K; + break; + default: + return -1; // Error: unknown sampling rate + } + + return 0; // Success +} + +int mesh_audio_ptime_to_st_ptime(int ptime, st30_ptime& st_ptime) { + switch (ptime) { + case MESH_AUDIO_PACKET_TIME_1MS: + st_ptime = ST30_PTIME_1MS; + break; + case MESH_AUDIO_PACKET_TIME_125US: + st_ptime = ST30_PTIME_125US; + break; + case MESH_AUDIO_PACKET_TIME_250US: + st_ptime = ST30_PTIME_250US; + break; + case MESH_AUDIO_PACKET_TIME_333US: + st_ptime = ST30_PTIME_333US; + break; + case MESH_AUDIO_PACKET_TIME_4MS: + st_ptime = ST30_PTIME_4MS; + break; + case MESH_AUDIO_PACKET_TIME_80US: + st_ptime = ST31_PTIME_80US; + break; + case MESH_AUDIO_PACKET_TIME_1_09MS: + st_ptime = ST31_PTIME_1_09MS; + break; + case MESH_AUDIO_PACKET_TIME_0_14MS: + st_ptime = ST31_PTIME_0_14MS; + break; + case MESH_AUDIO_PACKET_TIME_0_09MS: + st_ptime = ST31_PTIME_0_09MS; + break; + default: + return -1; // Error: unknown packet time + } + + return 0; // Success +} + +void *get_frame_data_ptr(st_frame *src) { return src->addr[0]; } + +void *get_frame_data_ptr(st30_frame *src) { return src->addr; } + +void get_mtl_dev_params(mtl_init_params& st_param, const std::string& dev_port, + mtl_log_level log_level, const char local_ip_addr[MESH_IP_ADDRESS_SIZE]) { + if (getenv("KAHAWAI_CFG_PATH") == NULL) { + setenv("KAHAWAI_CFG_PATH", "/usr/local/etc/imtl.json", 0); + } + strlcpy(st_param.port[MTL_PORT_P], dev_port.c_str(), MTL_PORT_MAX_LEN); + inet_pton(AF_INET, local_ip_addr, st_param.sip_addr[MTL_PORT_P]); + st_param.pmd[MTL_PORT_P] = mtl_pmd_by_port_name(st_param.port[MTL_PORT_P]); + st_param.num_ports = 1; + st_param.flags = MTL_FLAG_BIND_NUMA; + st_param.flags |= MTL_FLAG_TX_VIDEO_MIGRATE; + st_param.flags |= MTL_FLAG_RX_VIDEO_MIGRATE; + st_param.flags |= MTL_FLAG_RX_UDP_PORT_ONLY; + st_param.pacing = ST21_TX_PACING_WAY_AUTO; + st_param.log_level = log_level; + st_param.priv = NULL; + st_param.ptp_get_time_fn = NULL; + // Native af_xdp have only 62 queues available + if (st_param.pmd[MTL_PORT_P] == MTL_PMD_NATIVE_AF_XDP) { + st_param.rx_queues_cnt[MTL_PORT_P] = 62; + st_param.tx_queues_cnt[MTL_PORT_P] = 62; + } else { + st_param.rx_queues_cnt[MTL_PORT_P] = 128; + st_param.tx_queues_cnt[MTL_PORT_P] = 128; + } + st_param.lcores = NULL; + st_param.memzone_max = 9000; +} + +mtl_handle get_mtl_device(const std::string& dev_port, mtl_log_level log_level, + const char local_ip_addr[MESH_IP_ADDRESS_SIZE], int& session_id) { + static mtl_handle dev_handle; + static int _session_id; + static std::mutex mtx; + std::lock_guard lock(mtx); + + session_id = _session_id++; + + if (dev_handle) { + return dev_handle; + } + + mtl_init_params st_param = {0}; + get_mtl_dev_params(st_param, dev_port, log_level, local_ip_addr); + // create device + dev_handle = mtl_init(&st_param); + if (!dev_handle) { + log::error("Failed to initialize MTL device"); + return nullptr; + } + + // start MTL device + if (mtl_start(dev_handle) != 0) { + log::error("Failed to start MTL device"); + return nullptr; + } + + return dev_handle; +} + +} // namespace mesh::connection diff --git a/media-proxy/src/mesh/st2110_20rx.cc b/media-proxy/src/mesh/st2110_20rx.cc new file mode 100644 index 000000000..992786dd0 --- /dev/null +++ b/media-proxy/src/mesh/st2110_20rx.cc @@ -0,0 +1,66 @@ +#include "st2110rx.h" + +namespace mesh::connection { + +st_frame *ST2110_20Rx::get_frame(st20p_rx_handle h) { + return st20p_rx_get_frame(h); +}; + +int ST2110_20Rx::put_frame(st20p_rx_handle h, st_frame *f) { + return st20p_rx_put_frame(h, f); +}; + +st20p_rx_handle ST2110_20Rx::create_session(mtl_handle h, st20p_rx_ops *o) { + return st20p_rx_create(h, o); +}; + +int ST2110_20Rx::close_session(st20p_rx_handle h) { + return st20p_rx_free(h); +}; + +Result ST2110_20Rx::configure(context::Context& ctx, const std::string& dev_port, + const MeshConfig_ST2110& cfg_st2110, + const MeshConfig_Video& cfg_video) { + if (cfg_st2110.transport != MESH_CONN_TRANSPORT_ST2110_20) { + set_state(ctx, State::not_configured); + return set_result(Result::error_bad_argument); + } + + if (configure_common(ctx, dev_port, cfg_st2110)) { + set_state(ctx, State::not_configured); + return set_result(Result::error_bad_argument); + } + + ops.port.payload_type = ST_APP_PAYLOAD_TYPE_ST20; + ops.width = cfg_video.width; + ops.height = cfg_video.height; + ops.fps = st_frame_rate_to_st_fps(cfg_video.fps); + ops.transport_fmt = ST20_FMT_YUV_422_PLANAR10LE; + + if (mesh_video_format_to_st_format(cfg_video.pixel_format, ops.output_fmt)) { + set_state(ctx, State::not_configured); + return set_result(Result::error_bad_argument); + } + + ops.device = ST_PLUGIN_DEVICE_AUTO; + + transfer_size = st_frame_size(ops.output_fmt, ops.width, ops.height, false); + if (transfer_size == 0) { + set_state(ctx, State::not_configured); + return set_result(Result::error_bad_argument); + } + + log::info("ST2110_20Rx: configure") + ("payload_type", (int)ops.port.payload_type) + ("width", ops.width) + ("height", ops.height) + ("fps", ops.fps) + ("transport_fmt", ops.transport_fmt) + ("output_fmt", ops.output_fmt) + ("device", ops.device); + + set_state(ctx, State::configured); + return set_result(Result::success); +} + +} // namespace mesh::connection diff --git a/media-proxy/src/mesh/st2110_20tx.cc b/media-proxy/src/mesh/st2110_20tx.cc new file mode 100644 index 000000000..44efac0c0 --- /dev/null +++ b/media-proxy/src/mesh/st2110_20tx.cc @@ -0,0 +1,66 @@ +#include "st2110tx.h" + +namespace mesh::connection { + +st_frame *ST2110_20Tx::get_frame(st20p_tx_handle h) { + return st20p_tx_get_frame(h); +}; + +int ST2110_20Tx::put_frame(st20p_tx_handle h, st_frame *f) { + return st20p_tx_put_frame(h, f); +}; + +st20p_tx_handle ST2110_20Tx::create_session(mtl_handle h, st20p_tx_ops *o) { + return st20p_tx_create(h, o); +}; + +int ST2110_20Tx::close_session(st20p_tx_handle h) { + return st20p_tx_free(h); +}; + +Result ST2110_20Tx::configure(context::Context& ctx, const std::string& dev_port, + const MeshConfig_ST2110& cfg_st2110, + const MeshConfig_Video& cfg_video) { + if (cfg_st2110.transport != MESH_CONN_TRANSPORT_ST2110_20) { + set_state(ctx, State::not_configured); + return set_result(Result::error_bad_argument); + } + + if (configure_common(ctx, dev_port, cfg_st2110)) { + set_state(ctx, State::not_configured); + return set_result(Result::error_bad_argument); + } + + ops.port.payload_type = ST_APP_PAYLOAD_TYPE_ST20; + ops.width = cfg_video.width; + ops.height = cfg_video.height; + ops.fps = st_frame_rate_to_st_fps(cfg_video.fps); + ops.transport_fmt = ST20_FMT_YUV_422_PLANAR10LE; + + if (mesh_video_format_to_st_format(cfg_video.pixel_format, ops.input_fmt)) { + set_state(ctx, State::not_configured); + return set_result(Result::error_bad_argument); + } + + ops.device = ST_PLUGIN_DEVICE_AUTO; + + log::info("ST2110_20Tx: configure") + ("payload_type", (int)ops.port.payload_type) + ("width", ops.width) + ("height", ops.height) + ("fps", ops.fps) + ("transport_fmt", ops.transport_fmt) + ("input_fmt", ops.input_fmt) + ("device", ops.device); + + transfer_size = st_frame_size(ops.input_fmt, ops.width, ops.height, false); + if (transfer_size == 0) { + set_state(ctx, State::not_configured); + return set_result(Result::error_bad_argument); + } + + set_state(ctx, State::configured); + return set_result(Result::success); +} + +} // namespace mesh::connection diff --git a/media-proxy/src/mesh/st2110_22rx.cc b/media-proxy/src/mesh/st2110_22rx.cc new file mode 100644 index 000000000..2eda00d25 --- /dev/null +++ b/media-proxy/src/mesh/st2110_22rx.cc @@ -0,0 +1,68 @@ +#include "st2110rx.h" + +namespace mesh::connection { + +st_frame *ST2110_22Rx::get_frame(st22p_rx_handle h) { + return st22p_rx_get_frame(h); +}; + +int ST2110_22Rx::put_frame(st22p_rx_handle h, st_frame *f) { + return st22p_rx_put_frame(h, f); +}; + +st22p_rx_handle ST2110_22Rx::create_session(mtl_handle h, st22p_rx_ops *o) { + return st22p_rx_create(h, o); +}; + +int ST2110_22Rx::close_session(st22p_rx_handle h) { + return st22p_rx_free(h); +}; + +Result ST2110_22Rx::configure(context::Context& ctx, const std::string& dev_port, + const MeshConfig_ST2110& cfg_st2110, + const MeshConfig_Video& cfg_video) { + if (cfg_st2110.transport != MESH_CONN_TRANSPORT_ST2110_22) { + set_state(ctx, State::not_configured); + return set_result(Result::error_bad_argument); + } + + if (configure_common(ctx, dev_port, cfg_st2110)) { + set_state(ctx, State::not_configured); + return set_result(Result::error_bad_argument); + } + + ops.port.payload_type = ST_APP_PAYLOAD_TYPE_ST22; + ops.width = cfg_video.width; + ops.height = cfg_video.height; + ops.fps = st_frame_rate_to_st_fps(cfg_video.fps); + + if (mesh_video_format_to_st_format(cfg_video.pixel_format, ops.output_fmt)) { + set_state(ctx, State::not_configured); + return set_result(Result::error_bad_argument); + } + + ops.device = ST_PLUGIN_DEVICE_AUTO; + ops.pack_type = ST22_PACK_CODESTREAM; + ops.codec = ST22_CODEC_JPEGXS; + ops.codec_thread_cnt = 0; + ops.max_codestream_size = 0; + + log::info("ST2110_22Rx: configure") + ("payload_type", (int)ops.port.payload_type) + ("width", ops.width) + ("height", ops.height) + ("fps", ops.fps) + ("output_fmt", ops.output_fmt) + ("device", ops.device); + + transfer_size = st_frame_size(ops.output_fmt, ops.width, ops.height, false); + if (transfer_size == 0) { + set_state(ctx, State::not_configured); + return set_result(Result::error_bad_argument); + } + + set_state(ctx, State::configured); + return set_result(Result::success); +} + +} // namespace mesh::connection diff --git a/media-proxy/src/mesh/st2110_22tx.cc b/media-proxy/src/mesh/st2110_22tx.cc new file mode 100644 index 000000000..1f34c8d59 --- /dev/null +++ b/media-proxy/src/mesh/st2110_22tx.cc @@ -0,0 +1,69 @@ +#include "st2110tx.h" + +namespace mesh::connection { + +st_frame *ST2110_22Tx::get_frame(st22p_tx_handle h) { + return st22p_tx_get_frame(h); +}; + +int ST2110_22Tx::put_frame(st22p_tx_handle h, st_frame *f) { + return st22p_tx_put_frame(h, f); +}; + +st22p_tx_handle ST2110_22Tx::create_session(mtl_handle h, st22p_tx_ops *o) { + return st22p_tx_create(h, o); +}; + +int ST2110_22Tx::close_session(st22p_tx_handle h) { + return st22p_tx_free(h); +}; + +Result ST2110_22Tx::configure(context::Context& ctx, const std::string& dev_port, + const MeshConfig_ST2110& cfg_st2110, + const MeshConfig_Video& cfg_video) { + if (cfg_st2110.transport != MESH_CONN_TRANSPORT_ST2110_22) { + set_state(ctx, State::not_configured); + return set_result(Result::error_bad_argument); + } + + if (configure_common(ctx, dev_port, cfg_st2110)) { + set_state(ctx, State::not_configured); + return set_result(Result::error_bad_argument); + } + + ops.port.payload_type = ST_APP_PAYLOAD_TYPE_ST22; + ops.width = cfg_video.width; + ops.height = cfg_video.height; + ops.fps = st_frame_rate_to_st_fps(cfg_video.fps); + + if (mesh_video_format_to_st_format(cfg_video.pixel_format, ops.input_fmt)) { + set_state(ctx, State::not_configured); + return set_result(Result::error_bad_argument); + } + + ops.device = ST_PLUGIN_DEVICE_AUTO; + ops.pack_type = ST22_PACK_CODESTREAM; + ops.codec = ST22_CODEC_JPEGXS; + ops.quality = ST22_QUALITY_MODE_SPEED; + ops.codec_thread_cnt = 0; + ops.codestream_size = ops.width * ops.height * 3 / 8; + + log::info("ST2110_22Tx: configure") + ("payload_type", (int)ops.port.payload_type) + ("width", ops.width) + ("height", ops.height) + ("fps", ops.fps) + ("input_fmt", ops.input_fmt) + ("device", ops.device); + + transfer_size = st_frame_size(ops.input_fmt, ops.width, ops.height, false); + if (transfer_size == 0) { + set_state(ctx, State::not_configured); + return set_result(Result::error_bad_argument); + } + + set_state(ctx, State::configured); + return set_result(Result::success); +} + +} // namespace mesh::connection diff --git a/media-proxy/src/mesh/st2110_30rx.cc b/media-proxy/src/mesh/st2110_30rx.cc new file mode 100644 index 000000000..1d8a6b591 --- /dev/null +++ b/media-proxy/src/mesh/st2110_30rx.cc @@ -0,0 +1,70 @@ +#include "st2110rx.h" + +namespace mesh::connection { + +st30_frame *ST2110_30Rx::get_frame(st30p_rx_handle h) { + return st30p_rx_get_frame(h); +}; + +int ST2110_30Rx::put_frame(st30p_rx_handle h, st30_frame *f) { + return st30p_rx_put_frame(h, f); +}; + +st30p_rx_handle ST2110_30Rx::create_session(mtl_handle h, st30p_rx_ops *o) { + return st30p_rx_create(h, o); +}; + +int ST2110_30Rx::close_session(st30p_rx_handle h) { + return st30p_rx_free(h); +}; + +Result ST2110_30Rx::configure(context::Context& ctx, const std::string& dev_port, + const MeshConfig_ST2110& cfg_st2110, + const MeshConfig_Audio& cfg_audio) { + if (cfg_st2110.transport != MESH_CONN_TRANSPORT_ST2110_30) { + set_state(ctx, State::not_configured); + return set_result(Result::error_bad_argument); + } + + if (configure_common(ctx, dev_port, cfg_st2110)) { + set_state(ctx, State::not_configured); + return set_result(Result::error_bad_argument); + } + + ops.port.payload_type = ST_APP_PAYLOAD_TYPE_ST30; + + if (mesh_audio_format_to_st_format(cfg_audio.format, ops.fmt)) { + set_state(ctx, State::not_configured); + return set_result(Result::error_bad_argument); + } + + ops.channel = cfg_audio.channels; + if (mesh_audio_sampling_to_st_sampling(cfg_audio.sample_rate, ops.sampling)) { + set_state(ctx, State::not_configured); + return set_result(Result::error_bad_argument); + } + + if (mesh_audio_ptime_to_st_ptime(cfg_audio.packet_time, ops.ptime)) { + set_state(ctx, State::not_configured); + return set_result(Result::error_bad_argument); + } + + log::info("ST2110_30Rx: configure") + ("payload_type", (int)ops.port.payload_type) + ("audio_fmt", ops.fmt) + ("audio_chan", ops.channel) + ("audio_sampl", ops.sampling) + ("audio_ptime", ops.ptime); + + ops.framebuff_size = transfer_size = + st30_get_packet_size(ops.fmt, ops.ptime, ops.sampling, ops.channel); + if (transfer_size == 0) { + set_state(ctx, State::not_configured); + return set_result(Result::error_bad_argument); + } + + set_state(ctx, State::configured); + return set_result(Result::success); +} + +} // namespace mesh::connection diff --git a/media-proxy/src/mesh/st2110_30tx.cc b/media-proxy/src/mesh/st2110_30tx.cc new file mode 100644 index 000000000..03debbb9a --- /dev/null +++ b/media-proxy/src/mesh/st2110_30tx.cc @@ -0,0 +1,70 @@ +#include "st2110tx.h" + +namespace mesh::connection { + +st30_frame *ST2110_30Tx::get_frame(st30p_tx_handle h) { + return st30p_tx_get_frame(h); +}; + +int ST2110_30Tx::put_frame(st30p_tx_handle h, st30_frame *f) { + return st30p_tx_put_frame(h, f); +}; + +st30p_tx_handle ST2110_30Tx::create_session(mtl_handle h, st30p_tx_ops *o) { + return st30p_tx_create(h, o); +}; + +int ST2110_30Tx::close_session(st30p_tx_handle h) { + return st30p_tx_free(h); +}; + +Result ST2110_30Tx::configure(context::Context& ctx, const std::string& dev_port, + const MeshConfig_ST2110& cfg_st2110, + const MeshConfig_Audio& cfg_audio) { + if (cfg_st2110.transport != MESH_CONN_TRANSPORT_ST2110_30) { + set_state(ctx, State::not_configured); + return set_result(Result::error_bad_argument); + } + + if (configure_common(ctx, dev_port, cfg_st2110)) { + set_state(ctx, State::not_configured); + return set_result(Result::error_bad_argument); + } + + ops.port.payload_type = ST_APP_PAYLOAD_TYPE_ST30; + + if (mesh_audio_format_to_st_format(cfg_audio.format, ops.fmt)) { + set_state(ctx, State::not_configured); + return set_result(Result::error_bad_argument); + } + + ops.channel = cfg_audio.channels; + if (mesh_audio_sampling_to_st_sampling(cfg_audio.sample_rate, ops.sampling)) { + set_state(ctx, State::not_configured); + return set_result(Result::error_bad_argument); + } + + if (mesh_audio_ptime_to_st_ptime(cfg_audio.packet_time, ops.ptime)) { + set_state(ctx, State::not_configured); + return set_result(Result::error_bad_argument); + } + + log::info("ST2110_30Tx: configure") + ("payload_type", (int)ops.port.payload_type) + ("audio_fmt", ops.fmt) + ("audio_chan", ops.channel) + ("audio_sampl", ops.sampling) + ("audio_ptime", ops.ptime); + + ops.framebuff_size = transfer_size = + st30_get_packet_size(ops.fmt, ops.ptime, ops.sampling, ops.channel); + if (transfer_size == 0) { + set_state(ctx, State::not_configured); + return set_result(Result::error_bad_argument); + } + + set_state(ctx, State::configured); + return set_result(Result::success); +} + +} // namespace mesh::connection diff --git a/media-proxy/src/mesh/uuid.cc b/media-proxy/src/mesh/uuid.cc index 52ca52f91..25b857d7f 100644 --- a/media-proxy/src/mesh/uuid.cc +++ b/media-proxy/src/mesh/uuid.cc @@ -8,6 +8,7 @@ #include #include #include +#include namespace mesh { diff --git a/media-proxy/tests/st2110_tests.cc b/media-proxy/tests/st2110_tests.cc new file mode 100644 index 000000000..787da0db5 --- /dev/null +++ b/media-proxy/tests/st2110_tests.cc @@ -0,0 +1,664 @@ +#include +#include "mesh/st2110rx.h" +#include "mesh/st2110tx.h" +#include "mesh/conn.h" +using namespace mesh; + +#define DUMMY_DATA1 "DUMMY_DATA1" +#define DUMMY_DATA2 "DUMMY_DATA2" + +class EmulatedTransmitter : public connection::Connection { + public: + EmulatedTransmitter(context::Context& ctx) { + _kind = connection::Kind::transmitter; + set_state(ctx, connection::State::configured); + } + + connection::Result on_establish(context::Context& ctx) { + set_state(ctx, connection::State::active); + return connection::Result::success; + } + + connection::Result on_shutdown(context::Context& ctx) { return connection::Result::success; } + + connection::Result transmit_wrapper(context::Context& ctx, void *ptr, uint32_t sz) { + return transmit(ctx, ptr, sz); + } +}; + +class EmulatedReceiver : public connection::Connection { + public: + uint32_t received_packets_lossless; + uint32_t received_packets_lossy; + EmulatedReceiver(context::Context& ctx) { + _kind = connection::Kind::receiver; + received_packets_lossless = 0; + received_packets_lossy = 0; + set_state(ctx, connection::State::configured); + } + + connection::Result on_establish(context::Context& ctx) { + set_state(ctx, connection::State::active); + return connection::Result::success; + } + + connection::Result on_shutdown(context::Context& ctx) { return connection::Result::success; } + + connection::Result on_receive(context::Context& ctx, void *ptr, uint32_t sz, uint32_t& sent) { + if (memcmp(ptr, DUMMY_DATA1, sizeof(DUMMY_DATA1)) == 0) { + received_packets_lossless++; + } else { + received_packets_lossy++; + } + return connection::Result::success; + } +}; + +class EmulatedST2110_Tx : public connection::ST2110Tx { + public: + uint32_t received_packets_dummy1; + uint32_t received_packets_dummy2; + + EmulatedST2110_Tx() { + received_packets_dummy1 = 0; + received_packets_dummy2 = 0; + transfer_size = 10000; + }; + ~EmulatedST2110_Tx() {}; + + connection::Result configure(context::Context& ctx) { + set_state(ctx, connection::State::configured); + return connection::Result::success; + } + + st_frame *get_frame(int *h) override { + st_frame *f = new st_frame; + f->addr[0] = calloc(1000, 1); + memcpy(f->addr[0], DUMMY_DATA1, sizeof(DUMMY_DATA1)); + return f; + } + + int put_frame(int *d, st_frame *f) override { + if (memcmp(f->addr[0], DUMMY_DATA1, sizeof(DUMMY_DATA1)) == 0) { + received_packets_dummy1++; + } else if (memcmp(f->addr[0], DUMMY_DATA2, sizeof(DUMMY_DATA2)) == 0) { + received_packets_dummy2++; + } + + free(f->addr[0]); + delete f; + return 0; + } + + int *create_session(mtl_handle, st20p_tx_ops *o) override { return (int *)malloc(1); } + + int close_session(int *h) override { + free(h); + return 0; + } +}; + +class EmulatedST2110_Rx : public connection::ST2110Rx { + public: + uint32_t received_packets_dummy1; + uint32_t received_packets_dummy2; + EmulatedST2110_Rx() { + received_packets_dummy1 = 0; + received_packets_dummy2 = 0; + transfer_size = 10000; + }; + ~EmulatedST2110_Rx() {}; + + connection::Result configure(context::Context& ctx) { + set_state(ctx, connection::State::configured); + return connection::Result::success; + } + + st_frame *get_frame(int *h) override { + st_frame *f = new st_frame; + f->addr[0] = calloc(1000, 1); + memcpy(f->addr[0], DUMMY_DATA1, sizeof(DUMMY_DATA1)); + return f; + } + + int put_frame(int *d, st_frame *f) override { + if (memcmp(f->addr[0], DUMMY_DATA1, sizeof(DUMMY_DATA1)) == 0) { + received_packets_dummy1++; + } else if (memcmp(f->addr[0], DUMMY_DATA2, sizeof(DUMMY_DATA2)) == 0) { + received_packets_dummy2++; + } + + free(f->addr[0]); + delete f; + return 0; + } + + int *create_session(mtl_handle, st20p_rx_ops *o) override { return (int *)malloc(1); } + + int close_session(int *h) override { + free(h); + return 0; + } +}; + +static void validate_state_change(context::Context& ctx, connection::Connection *c) { + connection::Result res; + + // Change state Configured -> Active + res = c->establish(ctx); + ASSERT_EQ(res, connection::Result::success) << connection::result2str(res); + ASSERT_EQ(c->state(), connection::State::active); + + // Change state Active -> Suspended + res = c->suspend(ctx); + ASSERT_EQ(res, connection::Result::success) << connection::result2str(res); + ASSERT_EQ(c->state(), connection::State::suspended); + + // Change state Suspended -> Active + res = c->resume(ctx); + ASSERT_EQ(res, connection::Result::success) << connection::result2str(res); + ASSERT_EQ(c->state(), connection::State::active); + + // Change state Active -> Closed + res = c->shutdown(ctx); + ASSERT_EQ(res, connection::Result::success) << connection::result2str(res); + ASSERT_EQ(c->state(), connection::State::closed); +} + +static void get_ST2110_session_config(MeshConfig_ST2110& cfg_st2110, int transport) { + memcpy(cfg_st2110.local_ip_addr, "127.0.0.1", sizeof("127.0.0.1")); + memcpy(cfg_st2110.remote_ip_addr, "127.0.0.1", sizeof("127.0.0.1")); + cfg_st2110.local_port = 9001; + cfg_st2110.remote_port = 9001; + cfg_st2110.transport = transport; +} + +static void get_ST2110_video_cfg(MeshConfig_Video& cfg_video) { + cfg_video.fps = 30; + cfg_video.width = 1920; + cfg_video.height = 1080; + cfg_video.pixel_format = MESH_VIDEO_PIXEL_FORMAT_YUV422P10LE; +} + +static void get_ST2110_audio_cfg(MeshConfig_Audio& cfg_audio) { + cfg_audio.channels = 2; + cfg_audio.format = MESH_AUDIO_FORMAT_PCM_S16BE; + cfg_audio.packet_time = MESH_AUDIO_PACKET_TIME_1MS; + cfg_audio.sample_rate = MESH_AUDIO_SAMPLE_RATE_48000; +} + +TEST(st2110_tx, state_change) { + auto ctx = context::WithCancel(context::Background()); + + auto conn_tx = new EmulatedST2110_Tx; + ASSERT_EQ(conn_tx->kind(), connection::Kind::transmitter); + ASSERT_EQ(conn_tx->state(), connection::State::not_configured); + + // Change state: Not Configured -> Configured + connection::Result res = conn_tx->configure(ctx); + ASSERT_EQ(res, connection::Result::success) << connection::result2str(res); + ASSERT_EQ(conn_tx->state(), connection::State::configured); + + validate_state_change(ctx, dynamic_cast(conn_tx)); + + delete conn_tx; +} + +TEST(st2110_rx, state_change) { + auto ctx = context::WithCancel(context::Background()); + + auto conn_rx = new EmulatedST2110_Rx; + ASSERT_EQ(conn_rx->kind(), connection::Kind::receiver); + ASSERT_EQ(conn_rx->state(), connection::State::not_configured); + + // Change state: Not Configured -> Configured + connection::Result res = conn_rx->configure(ctx); + ASSERT_EQ(res, connection::Result::success) << connection::result2str(res); + ASSERT_EQ(conn_rx->state(), connection::State::configured); + + validate_state_change(ctx, dynamic_cast(conn_rx)); + + delete conn_rx; +} + +TEST(DISABLED_st2110_20tx, state_change) { + auto ctx = context::WithCancel(context::Background()); + + auto conn_rx = new connection::ST2110_20Tx; + ASSERT_EQ(conn_rx->kind(), connection::Kind::transmitter); + ASSERT_EQ(conn_rx->state(), connection::State::not_configured); + + MeshConfig_ST2110 cfg_st2110; + get_ST2110_session_config(cfg_st2110, MESH_CONN_TRANSPORT_ST2110_20); + + MeshConfig_Video cfg_video; + get_ST2110_video_cfg(cfg_video); + + // Change state: Not Configured -> Configured + std::string dev_port("kernel:lo"); + connection::Result res = conn_rx->configure(ctx, dev_port, cfg_st2110, cfg_video); + ASSERT_EQ(res, connection::Result::success) << connection::result2str(res); + ASSERT_EQ(conn_rx->state(), connection::State::configured); + + validate_state_change(ctx, dynamic_cast(conn_rx)); + validate_state_change(ctx, dynamic_cast(conn_rx)); + + delete conn_rx; +} + +TEST(DISABLED_st2110_22tx, state_change) { + auto ctx = context::WithCancel(context::Background()); + + auto conn_rx = new connection::ST2110_22Tx; + ASSERT_EQ(conn_rx->kind(), connection::Kind::transmitter); + ASSERT_EQ(conn_rx->state(), connection::State::not_configured); + + MeshConfig_ST2110 cfg_st2110; + get_ST2110_session_config(cfg_st2110, MESH_CONN_TRANSPORT_ST2110_22); + + MeshConfig_Video cfg_video; + get_ST2110_video_cfg(cfg_video); + + // Change state: Not Configured -> Configured + std::string dev_port("kernel:lo"); + connection::Result res = conn_rx->configure(ctx, dev_port, cfg_st2110, cfg_video); + ASSERT_EQ(res, connection::Result::success) << connection::result2str(res); + ASSERT_EQ(conn_rx->state(), connection::State::configured); + + validate_state_change(ctx, dynamic_cast(conn_rx)); + validate_state_change(ctx, dynamic_cast(conn_rx)); + + delete conn_rx; +} + +TEST(DISABLED_st2110_30tx, state_change) { + auto ctx = context::WithCancel(context::Background()); + + auto conn_rx = new connection::ST2110_30Tx; + ASSERT_EQ(conn_rx->kind(), connection::Kind::transmitter); + ASSERT_EQ(conn_rx->state(), connection::State::not_configured); + + MeshConfig_ST2110 cfg_st2110; + get_ST2110_session_config(cfg_st2110, MESH_CONN_TRANSPORT_ST2110_30); + + MeshConfig_Audio cfg_audio; + get_ST2110_audio_cfg(cfg_audio); + + // Change state: Not Configured -> Configured + std::string dev_port("kernel:lo"); + connection::Result res = conn_rx->configure(ctx, dev_port, cfg_st2110, cfg_audio); + ASSERT_EQ(res, connection::Result::success) << connection::result2str(res); + ASSERT_EQ(conn_rx->state(), connection::State::configured); + + validate_state_change(ctx, dynamic_cast(conn_rx)); + + // Change state: Closed -> Configured + res = conn_rx->configure(ctx, dev_port, cfg_st2110, cfg_audio); + ASSERT_EQ(res, connection::Result::success) << connection::result2str(res); + ASSERT_EQ(conn_rx->state(), connection::State::configured); + validate_state_change(ctx, dynamic_cast(conn_rx)); + + delete conn_rx; +} + +TEST(st2110_tx, send_data) { + auto ctx = context::WithCancel(context::Background()); + connection::Result res; + + auto conn_tx = new EmulatedST2110_Tx; + auto emulated_tx = new EmulatedTransmitter(ctx); + + // Setup Tx connection + res = conn_tx->configure(ctx); + ASSERT_EQ(res, connection::Result::success) << connection::result2str(res); + ASSERT_EQ(conn_tx->state(), connection::State::configured); + + // Change state Configured -> Active + res = conn_tx->establish(ctx); + ASSERT_EQ(res, connection::Result::success) << connection::result2str(res); + ASSERT_EQ(conn_tx->state(), connection::State::active); + + // Setup Emulated Transmitter + emulated_tx->establish(ctx); + + // Connect Emulated Transmitter to Tx connection + emulated_tx->set_link(ctx, conn_tx); + + for (int i = 0; i < 5; i++) { + res = emulated_tx->transmit_wrapper(ctx, (void *)DUMMY_DATA2, sizeof(DUMMY_DATA2)); + ASSERT_EQ(res, connection::Result::success) << connection::result2str(res); + ASSERT_EQ(conn_tx->state(), connection::State::active); + ASSERT_GT(conn_tx->received_packets_dummy2, 0); + } + + // Shutdown Tx connection + res = conn_tx->shutdown(ctx); + ASSERT_EQ(res, connection::Result::success) << connection::result2str(res); + ASSERT_EQ(conn_tx->state(), connection::State::closed); + + // Destroy resources + delete emulated_tx; + delete conn_tx; +} + +TEST(st2110_rx, get_data) { + auto ctx = context::WithCancel(context::Background()); + connection::Result res; + + // Setup Emulated Receiver + auto emulated_rx = new EmulatedReceiver(ctx); + emulated_rx->establish(ctx); + + // Setup Rx connection + auto conn_rx = new EmulatedST2110_Rx; + + res = conn_rx->configure(ctx); + ASSERT_EQ(res, connection::Result::success) << connection::result2str(res); + ASSERT_EQ(conn_rx->state(), connection::State::configured); + res = conn_rx->establish(ctx); + ASSERT_EQ(res, connection::Result::success) << connection::result2str(res); + ASSERT_EQ(conn_rx->state(), connection::State::active); + + // Connect Rx connection to Emulated Receiver + conn_rx->set_link(ctx, emulated_rx); + + // Sleep some sufficient time to allow receiving the data from transmitter + mesh::thread::Sleep(ctx, std::chrono::milliseconds(100)); + + // Shutdown Rx connection + res = conn_rx->shutdown(ctx); + ASSERT_EQ(res, connection::Result::success) << connection::result2str(res); + ASSERT_EQ(conn_rx->state(), connection::State::closed); + + ASSERT_GT(conn_rx->received_packets_dummy1, 0); + // Destroy resources + delete conn_rx; + delete emulated_rx; +} + +/************************** */ +static void tx_thread(context::Context& ctx, connection::Connection *conn_tx) { + auto emulated_tx = new EmulatedTransmitter(ctx); + // Change state Configured -> Active + connection::Result res = conn_tx->establish(ctx); + ASSERT_EQ(res, connection::Result::success) << connection::result2str(res); + ASSERT_EQ(conn_tx->state(), connection::State::active); + + // Setup Emulated Transmitter + emulated_tx->establish(ctx); + + // Connect Emulated Transmitter to Tx connection + emulated_tx->set_link(ctx, conn_tx); + + for (int i = 0; i < 50; i++) { + res = emulated_tx->transmit_wrapper(ctx, (void *)DUMMY_DATA1, sizeof(DUMMY_DATA1)); + ASSERT_EQ(res, connection::Result::success) << connection::result2str(res); + ASSERT_EQ(conn_tx->state(), connection::State::active); + } + + // Shutdown Tx connection + res = conn_tx->shutdown(ctx); + ASSERT_EQ(res, connection::Result::success) << connection::result2str(res); + ASSERT_EQ(conn_tx->state(), connection::State::closed); + + // Destroy resources + delete emulated_tx; +} + +static void rx_thread(context::Context& ctx, connection::Connection *conn_rx, bool is_lossless) { + auto emulated_rx = new EmulatedReceiver(ctx); + emulated_rx->establish(ctx); + + conn_rx->set_link(ctx, emulated_rx); + // Connect Rx connection to Emulated Receiver + connection::Result res = conn_rx->establish(ctx); + ASSERT_EQ(res, connection::Result::success) << connection::result2str(res); + ASSERT_EQ(conn_rx->state(), connection::State::active); + + // Sleep some sufficient time to allow receiving the data from transmitter + mesh::thread::Sleep(ctx, std::chrono::milliseconds(500)); + + // Shutdown Rx connection + res = conn_rx->shutdown(ctx); + ASSERT_EQ(res, connection::Result::success) << connection::result2str(res); + ASSERT_EQ(conn_rx->state(), connection::State::closed); + + if (is_lossless) { + ASSERT_GT(emulated_rx->received_packets_lossless, 0); + ASSERT_EQ(emulated_rx->received_packets_lossy, 0); + } else { + ASSERT_GT(emulated_rx->received_packets_lossy, 0); + ASSERT_EQ(emulated_rx->received_packets_lossless, 0); + } + + delete emulated_rx; +} + +TEST(DISABLED_st2110_20, send_and_receive_data) { + auto ctx = context::WithCancel(context::Background()); + connection::Result res; + + // Setup Tx connection + MeshConfig_ST2110 cfg_st2110; + get_ST2110_session_config(cfg_st2110, MESH_CONN_TRANSPORT_ST2110_20); + + MeshConfig_Video cfg_video; + get_ST2110_video_cfg(cfg_video); + + // Configure Tx + std::string dev_port("kernel:lo"); + auto conn_tx = new connection::ST2110_20Tx; + res = conn_tx->configure(ctx, dev_port, cfg_st2110, cfg_video); + ASSERT_EQ(res, connection::Result::success) << connection::result2str(res); + ASSERT_EQ(conn_tx->state(), connection::State::configured); + + // Configure Rx + auto conn_rx = new connection::ST2110_20Rx; + res = conn_rx->configure(ctx, dev_port, cfg_st2110, cfg_video); + ASSERT_EQ(res, connection::Result::success) << connection::result2str(res); + ASSERT_EQ(conn_rx->state(), connection::State::configured); + + std::jthread rx_th, tx_th; + try { + rx_th = std::jthread(&rx_thread, std::ref(ctx), conn_rx, 1); + tx_th = std::jthread(&tx_thread, std::ref(ctx), conn_tx); + } catch (const std::system_error& e) { + ASSERT_TRUE(0) << e.what(); + } + + rx_th.join(); + tx_th.join(); + + delete conn_tx; + delete conn_rx; +} + +TEST(DISABLED_st2110_22, send_and_receive_data) { + auto ctx = context::WithCancel(context::Background()); + connection::Result res; + + // Setup Tx connection + MeshConfig_ST2110 cfg_st2110; + get_ST2110_session_config(cfg_st2110, MESH_CONN_TRANSPORT_ST2110_22); + + MeshConfig_Video cfg_video; + get_ST2110_video_cfg(cfg_video); + + // Configure Tx + std::string dev_port("kernel:lo"); + auto conn_tx = new connection::ST2110_22Tx; + res = conn_tx->configure(ctx, dev_port, cfg_st2110, cfg_video); + ASSERT_EQ(res, connection::Result::success) << connection::result2str(res); + ASSERT_EQ(conn_tx->state(), connection::State::configured); + + // Configure Rx + auto conn_rx = new connection::ST2110_22Rx; + res = conn_rx->configure(ctx, dev_port, cfg_st2110, cfg_video); + ASSERT_EQ(res, connection::Result::success) << connection::result2str(res); + ASSERT_EQ(conn_rx->state(), connection::State::configured); + + std::jthread rx_th, tx_th; + try { + rx_th = std::jthread(&rx_thread, std::ref(ctx), conn_rx, 0); + tx_th = std::jthread(&tx_thread, std::ref(ctx), conn_tx); + } catch (const std::system_error& e) { + ASSERT_TRUE(0) << e.what(); + } + + rx_th.join(); + tx_th.join(); + + delete conn_tx; + delete conn_rx; +} + +TEST(DISABLED_st2110_30, send_and_receive_data) { + auto ctx = context::WithCancel(context::Background()); + connection::Result res; + + // Setup Tx connection + MeshConfig_ST2110 cfg_st2110; + get_ST2110_session_config(cfg_st2110, MESH_CONN_TRANSPORT_ST2110_30); + + MeshConfig_Audio cfg_audio; + get_ST2110_audio_cfg(cfg_audio); + + // Configure Tx + std::string dev_port("kernel:lo"); + auto conn_tx = new connection::ST2110_30Tx; + res = conn_tx->configure(ctx, dev_port, cfg_st2110, cfg_audio); + ASSERT_EQ(res, connection::Result::success) << connection::result2str(res); + ASSERT_EQ(conn_tx->state(), connection::State::configured); + + // Configure Rx + auto conn_rx = new connection::ST2110_30Rx; + res = conn_rx->configure(ctx, dev_port, cfg_st2110, cfg_audio); + ASSERT_EQ(res, connection::Result::success) << connection::result2str(res); + ASSERT_EQ(conn_rx->state(), connection::State::configured); + + std::jthread rx_th, tx_th; + try { + rx_th = std::jthread(&rx_thread, std::ref(ctx), conn_rx, 1); + tx_th = std::jthread(&tx_thread, std::ref(ctx), conn_tx); + } catch (const std::system_error& e) { + ASSERT_TRUE(0) << e.what(); + } + + rx_th.join(); + tx_th.join(); + + delete conn_tx; + delete conn_rx; +} + +/********************************************CONCEPT.md scenario**************************************** */ +/** + * How to run the test: + * 1) Modify two variables port_card0 and port_card1 with the correct values of the network interfaces of the machine. + * 2) Open 2 consoles + * 2a) In the first console run the following command: + * ./media_proxy_unit_tests --gtest_also_run_disabled_tests --gtest_filter=*concept_scenarion.mtl_st20_rx + * 2b) In the second console run the following command: + * ./media_proxy_unit_tests --gtest_also_run_disabled_tests --gtest_filter=*concept_scenarion.mtl_st20_tx + * 3) Wait until test ends ~120s + * 4) Check the output of the first console, it should show the number of received packets ~2000 + * example: + * received_packets_lossless: 2000 + * received_packets_lossy: 0 + */ + +std::string port_card0("0000:4b:01.1"); +std::string port_card1("0000:4b:11.1"); + +TEST(DISABLED_concept_scenarion, mtl_st20_tx) { + auto ctx = context::WithCancel(context::Background()); + connection::Result res; + + MeshConfig_Video cfg_video; + get_ST2110_video_cfg(cfg_video); + + MeshConfig_ST2110 cfg_st2110; + get_ST2110_session_config(cfg_st2110, MESH_CONN_TRANSPORT_ST2110_20); + memcpy(cfg_st2110.local_ip_addr, "192.168.96.2", sizeof("192.168.96.2")); + memcpy(cfg_st2110.remote_ip_addr, "192.168.96.1", sizeof("192.168.96.1")); + + // Configure Tx + auto conn_tx = new connection::ST2110_20Tx; + res = conn_tx->configure(ctx, port_card0, cfg_st2110, cfg_video); + ASSERT_EQ(res, connection::Result::success) << connection::result2str(res); + ASSERT_EQ(conn_tx->state(), connection::State::configured); + + auto emulated_tx = new EmulatedTransmitter(ctx); + // Change state Configured -> Active + res = conn_tx->establish(ctx); + ASSERT_EQ(res, connection::Result::success) << connection::result2str(res); + ASSERT_EQ(conn_tx->state(), connection::State::active); + + // Setup Emulated Transmitter + emulated_tx->establish(ctx); + + // Connect Emulated Transmitter to Tx connection + emulated_tx->set_link(ctx, conn_tx); + + uint32_t data_size = cfg_video.width * cfg_video.height * 4; + void *data = malloc(data_size); + memcpy(data, DUMMY_DATA1, sizeof(DUMMY_DATA1)); + + for (int i = 0; i < 2000; i++) { + res = emulated_tx->transmit_wrapper(ctx, data, data_size); + ASSERT_EQ(res, connection::Result::success) << connection::result2str(res); + ASSERT_EQ(conn_tx->state(), connection::State::active); + } + + // Shutdown Tx connection + res = conn_tx->shutdown(ctx); + ASSERT_EQ(res, connection::Result::success) << connection::result2str(res); + ASSERT_EQ(conn_tx->state(), connection::State::closed); + + // Destroy resources + delete emulated_tx; + delete conn_tx; + free(data); +} + +TEST(DISABLED_concept_scenarion, mtl_st20_rx) { + auto ctx = context::WithCancel(context::Background()); + connection::Result res; + + MeshConfig_Video cfg_video; + get_ST2110_video_cfg(cfg_video); + + MeshConfig_ST2110 cfg_st2110; + get_ST2110_session_config(cfg_st2110, MESH_CONN_TRANSPORT_ST2110_20); + memcpy(cfg_st2110.local_ip_addr, "192.168.96.1", sizeof("192.168.96.1")); + memcpy(cfg_st2110.remote_ip_addr, "192.168.96.2", sizeof("192.168.96.2")); + + // Configure Rx + auto conn_rx = new connection::ST2110_20Rx; + res = conn_rx->configure(ctx, port_card1, cfg_st2110, cfg_video); + ASSERT_EQ(res, connection::Result::success) << connection::result2str(res); + ASSERT_EQ(conn_rx->state(), connection::State::configured); + + auto emulated_rx = new EmulatedReceiver(ctx); + emulated_rx->establish(ctx); + + conn_rx->set_link(ctx, emulated_rx); + // Connect Rx connection to Emulated Receiver + res = conn_rx->establish(ctx); + ASSERT_EQ(res, connection::Result::success) << connection::result2str(res); + ASSERT_EQ(conn_rx->state(), connection::State::active); + + // Sleep some sufficient time to allow receiving the data from transmitter + mesh::thread::Sleep(ctx, std::chrono::milliseconds(1000 * 120)); // 120s + + // Shutdown Rx connection + res = conn_rx->shutdown(ctx); + ASSERT_EQ(res, connection::Result::success) << connection::result2str(res); + ASSERT_EQ(conn_rx->state(), connection::State::closed); + + printf("received_packets_lossless: %d\n", emulated_rx->received_packets_lossless); + printf("received_packets_lossy: %d\n", emulated_rx->received_packets_lossy); + + delete emulated_rx; + delete conn_rx; +}