diff --git a/media-proxy/include/api_server_tcp.h b/media-proxy/include/api_server_tcp.h deleted file mode 100644 index bff0d1fd3..000000000 --- a/media-proxy/include/api_server_tcp.h +++ /dev/null @@ -1,9 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2024 Intel Corporation - * SPDX-License-Identifier: BSD-3-Clause -*/ - -#pragma once -#include "proxy_context.h" - -void RunTCPServer(ProxyContext* ctx); diff --git a/media-proxy/include/proxy_context.h b/media-proxy/include/proxy_context.h deleted file mode 100644 index 88526217c..000000000 --- a/media-proxy/include/proxy_context.h +++ /dev/null @@ -1,86 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2024 Intel Corporation - * SPDX-License-Identifier: BSD-3-Clause -*/ - -#ifndef __PROXY_CONTEXT_H -#define __PROXY_CONTEXT_H - -#include -#include -#include -#include -#include - -#include "libfabric_dev.h" -#include - -#include "session-mtl.h" -#include "session-rdma.h" -#include - -#pragma once - -class ProxyContext { -public: - std::string mRpcCtrlAddr; - int mRpcCtrlPort; - - std::string mTcpCtrlAddr; - int mTcpCtrlPort; - - std::string mVideoFormat; - - // direction mDir; - std::vector mDpCtx; - mtl_handle mDevHandle = NULL; - libfabric_ctx *mDevHandle_rdma = NULL; - std::mutex ctx_mtx; - - bool imtl_init_preparing; - pthread_mutex_t mutex_lock; - - std::string mDevPort; - std::string mDpAddress; - std::string mDpPort; - - ProxyContext(void); - ProxyContext(std::string_view rpc_addr, std::string_view tcp_addr); - - void setRPCListenAddress(std::string_view addr); - void setTCPListenAddress(std::string_view addr); - void setDevicePort(std::string_view dev); - void setDataPlaneAddress(std::string_view ip); - void setDataPlanePort(std::string_view port); - - std::string getDevicePort(void); - std::string getDataPlaneAddress(void); - std::string getDataPlanePort(void); - - std::string getRPCListenAddress(void); - std::string getTCPListenAddress(void); - int getTCPListenPort(void); - - void ParseStInitParam(const mcm_conn_param* request, struct mtl_init_params* init_param); - - void ParseMemIFParam(const mcm_conn_param* request, memif_ops_t& memif_ops); - - int TxStart(const mcm_conn_param* request); - int RxStart(const mcm_conn_param* request); - int Stop(const int32_t session_id); - -private: - std::atomic mSessionCount; - pthread_mutex_t sessions_count_mutex_lock; - - ProxyContext(const ProxyContext&) = delete; - ProxyContext& operator=(const ProxyContext&) = delete; - uint32_t incrementMSessionCount(bool postIncrement); - - int TxStart_mtl(const mcm_conn_param *request); - int RxStart_mtl(const mcm_conn_param *request); - int TxStart_rdma(const mcm_conn_param *request); - int RxStart_rdma(const mcm_conn_param *request); -}; - -#endif // __PROXY_CONTEXT_H diff --git a/media-proxy/include/session-base.h b/media-proxy/include/session-base.h deleted file mode 100644 index 15673700c..000000000 --- a/media-proxy/include/session-base.h +++ /dev/null @@ -1,58 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2024 Intel Corporation - * - * SPDX-License-Identifier: BSD-3-Clause - */ - -#ifndef __SESSION_BASE_H -#define __SESSION_BASE_H - -#include -#include -#include -#include -#include -#include - -#include - -#include "shm_memif.h" /* shared memory */ -#include "utils.h" - -class Session -{ - uint32_t id; - enum direction type; - mcm_payload_type payload_type; - - /* shared memory arguments */ - memif_socket_handle_t memif_socket; - memif_socket_args_t memif_socket_args; - memif_conn_args_t memif_conn_args; - std::thread *memif_event_thread; - - volatile bool memif_stop; - void memif_event_loop(); - int shm_deinit(); - - protected: - memif_conn_handle_t memif_conn; - std::atomic_bool shm_ready; - - int shm_init(uint32_t buffer_size, uint32_t log2_ring_size); - Session(memif_ops_t &memif_ops, mcm_payload_type payload, direction dir_type); - - public: - uint32_t get_id() { return id; }; - memif_socket_args_t get_socket_args() { return memif_socket_args; }; - memif_conn_args_t get_conn_args() { return memif_conn_args; }; - - virtual ~Session(); - - virtual int init() = 0; - virtual int on_connect_cb(memif_conn_handle_t conn); - virtual int on_disconnect_cb(memif_conn_handle_t conn); - virtual int on_receive_cb(memif_conn_handle_t conn, uint16_t qid); -}; - -#endif /* __SESSION_BASE_H */ diff --git a/media-proxy/include/session-mtl.h b/media-proxy/include/session-mtl.h deleted file mode 100644 index 186db9c3a..000000000 --- a/media-proxy/include/session-mtl.h +++ /dev/null @@ -1,228 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2024 Intel Corporation - * - * SPDX-License-Identifier: BSD-3-Clause - */ - -#ifndef __SESSION_MTL_H -#define __SESSION_MTL_H - -#include -#include /* st2110 pipeline */ -#include - -#include "session-base.h" - -#include -#include - -#define MTL_ZERO_COPY - -// Based on mtl/app/src/fmt.h -#ifndef ST_APP_PAYLOAD_TYPE_VIDEO -#define ST_APP_PAYLOAD_TYPE_ST30 (111) -#define ST_APP_PAYLOAD_TYPE_VIDEO (112) -#define ST_APP_PAYLOAD_TYPE_ST22 (114) -#endif - -class MtlSession : public Session -{ - protected: - mtl_handle st; - - pthread_cond_t wake_cond; - pthread_mutex_t wake_mutex; - - volatile bool stop; - - MtlSession(memif_ops_t &memif_ops, mcm_payload_type payload, direction dir_type, mtl_handle st); - virtual ~MtlSession(); - - public: - int frame_available_cb(); -}; - -class RxSt20MtlSession : public MtlSession -{ - st20p_rx_ops ops; - st20p_rx_handle handle; - - int fb_recv; - size_t frame_size; - - std::thread *frame_thread_handle; - -#if defined(MTL_ZERO_COPY) - std::queue fifo; - std::mutex fifo_mtx; - uint8_t *source_begin; - mtl_iova_t source_begin_iova; - size_t source_begin_iova_map_sz; -#endif - - void copy_connection_params(const mcm_conn_param &request, std::string &dev_port); - void consume_frame(struct st_frame *frame); - void frame_thread(); - -#if defined(MTL_ZERO_COPY) - int on_connect_cb(memif_conn_handle_t conn); - int on_disconnect_cb(memif_conn_handle_t conn); -#endif - - public: - int init(); - RxSt20MtlSession(mtl_handle dev_handle, const mcm_conn_param &request, std::string dev_port, - memif_ops_t &memif_ops); - ~RxSt20MtlSession(); - -#if defined(MTL_ZERO_COPY) - int query_ext_frame_cb(struct st_ext_frame *ext_frame, struct st20_rx_frame_meta *meta); -#endif -}; - -class TxSt20MtlSession : public MtlSession -{ - st20p_tx_ops ops; - st20p_tx_handle handle; - - int fb_send; - size_t frame_size; - -#if defined(MTL_ZERO_COPY) - uint8_t *source_begin; - mtl_iova_t source_begin_iova; - size_t source_begin_iova_map_sz; -#endif - - void copy_connection_params(const mcm_conn_param &request, std::string &dev_port); - int on_receive_cb(memif_conn_handle_t conn, uint16_t qid); - -#if defined(MTL_ZERO_COPY) - int on_connect_cb(memif_conn_handle_t conn); - int on_disconnect_cb(memif_conn_handle_t conn); -#endif - - public: - int frame_done_cb(struct st_frame *frame); - int init(); - TxSt20MtlSession(mtl_handle dev_handle, const mcm_conn_param &request, std::string dev_port, - memif_ops_t &memif_ops); - ~TxSt20MtlSession(); -}; - -class RxSt22MtlSession : public MtlSession -{ - st22p_rx_ops ops; - st22p_rx_handle handle; - - int fb_recv; - size_t frame_size; - - std::thread *frame_thread_handle; - -#if defined(MTL_ZERO_COPY) - std::queue fifo; - std::mutex fifo_mtx; - uint8_t *source_begin; - mtl_iova_t source_begin_iova; - size_t source_begin_iova_map_sz; -#endif - - void copy_connection_params(const mcm_conn_param &request, std::string &dev_port); - void consume_frame(struct st_frame *frame); - void frame_thread(); - -#if defined(MTL_ZERO_COPY) - int on_connect_cb(memif_conn_handle_t conn); - int on_disconnect_cb(memif_conn_handle_t conn); -#endif - - public: - int init(); - RxSt22MtlSession(mtl_handle dev_handle, const mcm_conn_param &request, std::string dev_port, - memif_ops_t &memif_ops); - ~RxSt22MtlSession(); - -#if defined(MTL_ZERO_COPY) - int query_ext_frame_cb(struct st_ext_frame *ext_frame, struct st22_rx_frame_meta *meta); -#endif -}; - -class TxSt22MtlSession : public MtlSession -{ - st22p_tx_ops ops; - st22p_tx_handle handle; - - int fb_send; - size_t frame_size; - -#if defined(MTL_ZERO_COPY) - uint8_t *source_begin; - mtl_iova_t source_begin_iova; - size_t source_begin_iova_map_sz; -#endif - - void copy_connection_params(const mcm_conn_param &request, std::string &dev_port); - int on_receive_cb(memif_conn_handle_t conn, uint16_t qid); - -#if defined(MTL_ZERO_COPY) - int on_connect_cb(memif_conn_handle_t conn); - int on_disconnect_cb(memif_conn_handle_t conn); -#endif - - public: - int frame_done_cb(struct st_frame *frame); - int init(); - TxSt22MtlSession(mtl_handle dev_handle, const mcm_conn_param &request, std::string dev_port, - memif_ops_t &memif_ops); - ~TxSt22MtlSession(); -}; - -class RxSt30MtlSession : public MtlSession -{ - st30p_rx_ops ops; - st30p_rx_handle handle; - - int fb_recv; - - std::thread *frame_thread_handle; - - void consume_frame(struct st30_frame *frame); - void copy_connection_params(const mcm_conn_param &request, std::string &dev_port); - void frame_thread(); - - public: - int init(); - RxSt30MtlSession(mtl_handle dev_handle, const mcm_conn_param &request, std::string dev_port, - memif_ops_t &memif_ops); - ~RxSt30MtlSession(); -}; - -class TxSt30MtlSession : public MtlSession -{ - st30p_tx_ops ops; - st30p_tx_handle handle; - - int fb_send; - - void copy_connection_params(const mcm_conn_param &request, std::string &dev_port); - int on_receive_cb(memif_conn_handle_t conn, uint16_t qid); - - public: - int init(); - TxSt30MtlSession(mtl_handle dev_handle, const mcm_conn_param &request, std::string dev_port, - memif_ops_t &memif_ops); - ~TxSt30MtlSession(); -}; - -/* Initialize MTL library */ -mtl_handle inst_init(struct mtl_init_params *st_param); - -/* Deinitialize MTL */ -void mtl_deinit(mtl_handle dev_handle); - -st_frame_fmt get_st_frame_fmt(video_pixel_format mcm_frame_fmt); - -int frame_available_callback_wrapper(void *priv_data); - -#endif /* __SESSION_MTL_H */ diff --git a/media-proxy/include/session-rdma.h b/media-proxy/include/session-rdma.h deleted file mode 100644 index 5bac0d593..000000000 --- a/media-proxy/include/session-rdma.h +++ /dev/null @@ -1,81 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2024 Intel Corporation - * - * SPDX-License-Identifier: BSD-3-Clause - */ - -#ifndef __SESSION_RDMA_H -#define __SESSION_RDMA_H - -#include -#include - -#include "libfabric_dev.h" -#include "libfabric_ep.h" -#include "session-base.h" - -typedef struct { - memif_buffer_t shm_buf; - bool used; -} shm_buf_info_t; - -typedef struct { - size_t transfer_size; - rdma_addr remote_addr; - rdma_addr local_addr; - enum direction dir; -} rdma_s_ops_t; - -class TxRdmaSession : public Session -{ - ep_cfg_t ep_cfg; - ep_ctx_t *ep_ctx; - - volatile bool stop; - std::thread *frame_thread_handle; - - int fb_send; - size_t transfer_size; - - int on_connect_cb(memif_conn_handle_t conn); - int on_receive_cb(memif_conn_handle_t conn, uint16_t qid); - int on_disconnect_cb(memif_conn_handle_t conn); - - void frame_thread(); - void handle_sent_buffers(); - - public: - int init(); - TxRdmaSession(libfabric_ctx *dev_handle, const mcm_conn_param &request, memif_ops_t &memif_ops); - ~TxRdmaSession(); -}; - -class RxRdmaSession : public Session -{ - ep_cfg_t ep_cfg; - ep_ctx_t *ep_ctx; - - volatile bool stop; - std::thread *frame_thread_handle; - - int fb_recv; - size_t transfer_size; - - shm_buf_info_t *shm_bufs; - uint16_t shm_buf_num; - - int on_connect_cb(memif_conn_handle_t conn); - int on_disconnect_cb(memif_conn_handle_t conn); - - void frame_thread(); - void handle_received_buffers(); - shm_buf_info_t *get_free_shm_buf(); - int pass_empty_buf_to_libfabric(); - - public: - int init(); - RxRdmaSession(libfabric_ctx *dev_handle, const mcm_conn_param &request, memif_ops_t &memif_ops); - ~RxRdmaSession(); -}; - -#endif /* __SESSION_RDMA_H */ diff --git a/media-proxy/include/shm_memif.h b/media-proxy/include/shm_memif.h index 632328a8b..ce5b26b11 100644 --- a/media-proxy/include/shm_memif.h +++ b/media-proxy/include/shm_memif.h @@ -56,7 +56,6 @@ void print_memif_details(memif_conn_handle_t conn); int memif_buffer_alloc_timeout(memif_conn_handle_t conn, uint16_t qid, memif_buffer_t * bufs, uint16_t count, uint16_t * count_out, uint32_t size, uint32_t timeout_ms); -int memif_get_buffs_region(memif_conn_handle_t conn, memif_region_details_t *region); #ifdef __cplusplus } diff --git a/media-proxy/src/api_server_tcp.cc b/media-proxy/src/api_server_tcp.cc deleted file mode 100644 index 2132f9da9..000000000 --- a/media-proxy/src/api_server_tcp.cc +++ /dev/null @@ -1,293 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2024 Intel Corporation - * SPDX-License-Identifier: BSD-3-Clause -*/ - -#include -#include "api_server_tcp.h" -#include -#include -#include -#include "logger.h" - -static volatile bool keepRunning = true; - -typedef struct { - int sock; - struct sockaddr address; - int addr_len; -} connection_t; - -typedef struct _control_context { - ProxyContext* proxy_ctx; - connection_t* conn; -} control_context; - -void* msg_loop(void* ptr) -{ - int ret = 0; - int len = 0; - char* buffer = NULL; - bool sessionKeepRunning = true; - control_context* ctl_ctx = NULL; - connection_t* conn = NULL; - ProxyContext* proxy_ctx = NULL; - long addr = 0; - mcm_proxy_ctl_msg msg = {}; - uint32_t session_id = 0; - uint32_t intf_id = 0; - - if (!ptr) { - ERROR("msg_loop(void* ptr): Illegal Parameter, ptr==NULL."); - pthread_exit(0); - } - - ctl_ctx = (control_context*)ptr; - conn = ctl_ctx->conn; - proxy_ctx = ctl_ctx->proxy_ctx; - - /* infinite loop */ - do { - /* read control message header */ - ret = read(conn->sock, &msg.header, sizeof(msg.header)); - if (ret <= 0) { - break; - } - - if (msg.header.magic_word != *(uint32_t*)HEADER_MAGIC_WORD) { - ERROR("Header Data Mismatch: Incorrect magic word."); - continue; - } - if (msg.header.version != HEADER_VERSION) { - ERROR("Header Data Mismatch: Incorrect version of client."); - continue; - } - - /* control command */ - ret = read(conn->sock, &msg.command, sizeof(msg.command)); - if (ret <= 0) { - INFO("Failed to read control command."); - break; - } - - if (msg.command.data_len > 0) { - /* read parameters */ - buffer = (char*)malloc(msg.command.data_len); - if (buffer == NULL) { - ERROR("(char*)malloc(msg.command.data_len) in msg_loop() failed. Out of Memory."); - continue; - } else { - ret = 0; - int bytesRead = 0; - while (bytesRead < msg.command.data_len && ret >= 0) { - ret = read(conn->sock, buffer + bytesRead, msg.command.data_len - bytesRead); - if (ret >= 0) bytesRead += ret; - } - if (bytesRead < msg.command.data_len) { - ERROR("Read socket failed: Failed to read all command parameters."); - free(buffer); - buffer = NULL; - continue; - } - } - } - - mcm_conn_param param = { }; - - /* operation */ - switch (msg.command.inst) { - case MCM_CREATE_SESSION: - DEBUG("MCM_CREATE_SESSION: Case entry."); - if (buffer == NULL) { - INFO("MCM_CREATE_SESSION: Invalid parameters, buffer is NULL."); - break; - } - memcpy(¶m, buffer, sizeof(mcm_conn_param)); - if (param.type == is_tx) { - ret = proxy_ctx->TxStart(¶m); - } else { - ret = proxy_ctx->RxStart(¶m); - } - - if (ret >= 0) { - session_id = (uint32_t)ret; - if (write(conn->sock, &session_id, sizeof(session_id)) <= 0) { - ERROR("MCM_CREATE_SESSION: Return session id error, failed to write socket."); - } - } else { - ERROR("MCM_CREATE_SESSION: Failed to start a session."); - } - break; - case MCM_QUERY_MEMIF_PATH: - DEBUG("MCM_QUERY_MEMIF_PATH: Case entry."); - /* TODO: return memif socket path */ - break; - case MCM_QUERY_MEMIF_ID: - DEBUG("MCM_QUERY_MEMIF_ID: Case entry."); - /* TODO: return memdif ID */ - break; - case MCM_QUERY_MEMIF_PARAM: { - DEBUG("MCM_QUERY_MEMIF_PARAM: Case entry."); - if (buffer == NULL || msg.command.data_len < 4) { - INFO("Invalid parameters."); - break; - } - std::lock_guard lock(proxy_ctx->ctx_mtx); - session_id = *(uint32_t*)buffer; - for (auto it : proxy_ctx->mDpCtx) { - if (it->get_id() == session_id) { - /* return memif parameters. */ - memif_conn_param param = { - .socket_args = it->get_socket_args(), - .conn_args = it->get_conn_args(), - }; - - if (param.conn_args.is_master) { - param.conn_args.is_master = 0; - } else { - param.conn_args.is_master = 1; - } - - if (write(conn->sock, ¶m, sizeof(memif_conn_param)) <= 0) { - INFO("Fail to return path length."); - } - break; - } - } - break; - } - case MCM_DESTROY_SESSION: - DEBUG("MCM_DESTROY_SESSION: Case entry."); - if (buffer == NULL || msg.command.data_len < 4) { - INFO("Invalid parameters."); - break; - } - session_id = *(uint32_t*)buffer; - if (!proxy_ctx->Stop(session_id)) { - sessionKeepRunning = false; - } - break; - default: - DEBUG("UNKNOWN_CASE: Default case entry."); - break; - } - - if (buffer != NULL) { - free(buffer); - buffer = NULL; - } - } while (keepRunning && sessionKeepRunning); - - addr = (long)((struct sockaddr_in*)&conn->address)->sin_addr.s_addr; - INFO("Disconnect with %d.%d.%d.%d", - (int)((addr)&0xff), (int)((addr >> 8) & 0xff), - (int)((addr >> 16) & 0xff), (int)((addr >> 24) & 0xff)); - - if (session_id > 0) { - proxy_ctx->Stop(session_id); - } - - /* close socket and clean up */ - close(conn->sock); - free(conn); - free(ctl_ctx); - - pthread_exit(0); -} - -void handleSignals(int sig_num) -{ - keepRunning = false; - // exit(0); -} - -void registerSignals() -{ - signal(SIGINT, handleSignals); - signal(SIGTERM, handleSignals); - signal(SIGKILL, handleSignals); -} - -void RunTCPServer(ProxyContext* ctx) -{ - int sock = -1; - struct sockaddr_in address; - int port = 0; - connection_t* connection = NULL; - control_context* ctl_ctx = NULL; - pthread_t thread; - const int enable = 1; - - port = ctx->getTCPListenPort(); - if (port <= 0) { - INFO("Illegal TCP listen address"); - return; - } - - /* create socket */ - sock = socket(AF_INET, SOCK_STREAM, 0); - if (sock < 0) { - fprintf(stderr, "error: cannot create socket\n"); - return; - } - - /* Workaround to allow media_proxy to listen on the same port after termination and starting again */ - if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int)) < 0) { - fprintf(stderr, "error: cannot set SO_REUSEADDR"); - close(sock); - return; - } - - /* bind socket to port */ - address.sin_family = AF_INET; - address.sin_addr.s_addr = htonl(INADDR_ANY); - address.sin_port = htons(port); - if (bind(sock, (struct sockaddr*)&address, sizeof(address)) < 0) { - fprintf(stderr, "error: cannot bind socket to port %d: %s\n", port, strerror(errno)); - close(sock); - return; - } - - /* listen on port and set 500 for 1K IPC sessions burst */ - if (listen(sock, 500) < 0) { - fprintf(stderr, "error: cannot listen on port\n"); - return; - } - - mesh::log::info("TCP Server listening on %s", ctx->getTCPListenAddress().c_str()); - // registerSignals(); - - do { - /* accept incoming connections */ - connection = (connection_t*)malloc(sizeof(connection_t)); - if (connection) { - memset(connection, 0x0, sizeof(connection_t)); - connection->sock = accept(sock, &connection->address, (socklen_t*)&connection->addr_len); - if (connection->sock > 0) { - /* start a new thread but do not wait for it */ - ctl_ctx = (control_context*)malloc(sizeof(control_context)); - if (ctl_ctx) { - memset(ctl_ctx, 0x0, sizeof(control_context)); - ctl_ctx->proxy_ctx = ctx; - ctl_ctx->conn = connection; - if (pthread_create(&thread, 0, msg_loop, (void*)ctl_ctx) == 0) { - pthread_detach(thread); - } else { - free(connection); - free(ctl_ctx); - } - } else { - free(connection); - } - } else { - free(connection); - if (errno == EINTR) - break; - } - } - } while (keepRunning); - - INFO("TCP Server Quit: %s", ctx->getTCPListenAddress().c_str()); - - return; -} diff --git a/media-proxy/src/media_proxy.cc b/media-proxy/src/media_proxy.cc index f299e72e8..98af071c6 100644 --- a/media-proxy/src/media_proxy.cc +++ b/media-proxy/src/media_proxy.cc @@ -7,8 +7,6 @@ #include #include -#include "api_server_tcp.h" - #include #include "concurrency.h" #include "sdk_api.h" @@ -192,13 +190,6 @@ int main(int argc, char* argv[]) log::info("RDMA dataplane local port ranges: %s", config::proxy.rdma.dataplane_local_ports.c_str()); - std::string legacy_grpc_port = DEFAULT_GRPC_PORT; - auto legacy_tcp_port = std::to_string(config::proxy.sdk_api_port + 10000); // DEBUG - ProxyContext* proxy_ctx = new ProxyContext("0.0.0.0:" + legacy_grpc_port, - "0.0.0.0:" + legacy_tcp_port); - proxy_ctx->setDevicePort(config::proxy.st2110.dev_port_bdf); - proxy_ctx->setDataPlaneAddress(config::proxy.st2110.dataplane_ip_addr); - // Intercept shutdown signals to cancel the main context auto signal_handler = [](int sig) { if (sig == SIGINT || sig == SIGTERM) { @@ -220,9 +211,6 @@ int main(int argc, char* argv[]) collector.run(ctx); }); - /* start TCP server */ - std::thread tcpThread(RunTCPServer, proxy_ctx); - // Start SDK API server auto sdk_ctx = context::WithCancel(context::Background()); std::thread sdkApiThread([&]() { RunSDKAPIServer(sdk_ctx); }); @@ -246,11 +234,6 @@ int main(int argc, char* argv[]) sdkApiThread.join(); log::info("Media Proxy exited"); - exit(0); - - tcpThread.join(); - - delete (proxy_ctx); return 0; } diff --git a/media-proxy/src/proxy_context.cc b/media-proxy/src/proxy_context.cc deleted file mode 100644 index 7951ace92..000000000 --- a/media-proxy/src/proxy_context.cc +++ /dev/null @@ -1,389 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2024 Intel Corporation - * - * SPDX-License-Identifier: BSD-3-Clause - */ - -#include -#include -#include -#include - -#include "proxy_context.h" - -ProxyContext::ProxyContext(void) - : mRpcCtrlAddr("0.0.0.0:8001") - , mTcpCtrlAddr("0.0.0.0:8002") - , imtl_init_preparing(false), mSessionCount(0) -{ - mRpcCtrlPort = 8001; - mTcpCtrlPort = 8002; -} - -ProxyContext::ProxyContext(std::string_view rpc_addr, std::string_view tcp_addr) - : mRpcCtrlAddr(rpc_addr) - , mTcpCtrlAddr(tcp_addr) - , imtl_init_preparing(false), mSessionCount(0) -{ - auto colon = tcp_addr.find_first_of(":"); - if (colon >= tcp_addr.size() || - std::from_chars(tcp_addr.data() + colon + 1, tcp_addr.data() + tcp_addr.size(), mTcpCtrlPort).ec != std::errc()) - { - ERROR("ProxyContext::ProxyContext(): Illegal TCP listen address."); - throw; - } - - pthread_mutex_init(&sessions_count_mutex_lock, NULL); -} - -void ProxyContext::setRPCListenAddress(std::string_view addr) -{ - mRpcCtrlAddr = addr; -} - -void ProxyContext::setTCPListenAddress(std::string_view addr) -{ - mTcpCtrlAddr = addr; -} - -void ProxyContext::setDevicePort(std::string_view dev) -{ - mDevPort = dev; -} - -void ProxyContext::setDataPlaneAddress(std::string_view ip) -{ - mDpAddress = ip; -} - -void ProxyContext::setDataPlanePort(std::string_view port) -{ - mDpPort = port; -} - -std::string ProxyContext::getRPCListenAddress(void) -{ - return mRpcCtrlAddr; -} - -std::string ProxyContext::getTCPListenAddress(void) -{ - return mTcpCtrlAddr; -} - -int ProxyContext::getTCPListenPort(void) -{ - return mTcpCtrlPort; -} - -std::string ProxyContext::getDevicePort(void) -{ - return mDevPort; -} - -std::string ProxyContext::getDataPlaneAddress(void) -{ - return mDpAddress; -} - -std::string ProxyContext::getDataPlanePort(void) -{ - return mDpPort; -} - -uint32_t ProxyContext::incrementMSessionCount(bool postIncrement=true) -{ - uint32_t retValue; - pthread_mutex_lock(&this->sessions_count_mutex_lock); /* lock to protect mSessionCount from change by multi-session simultaneously */ - if(postIncrement) - retValue = (this->mSessionCount)++; - else - retValue = ++(this->mSessionCount); - pthread_mutex_unlock(&this->sessions_count_mutex_lock); - return retValue; -} - -void ProxyContext::ParseStInitParam(const mcm_conn_param* request, struct mtl_init_params* st_param) -{ - strlcpy(st_param->port[MTL_PORT_P], getDevicePort().c_str(), MTL_PORT_MAX_LEN); - inet_pton(AF_INET, getDataPlaneAddress().c_str(), st_param->sip_addr[MTL_PORT_P]); - st_param->pmd[MTL_PORT_P] = mtl_pmd_by_port_name(st_param->port[MTL_PORT_P]); - st_param->num_ports = 1; - st_param->flags = MTL_FLAG_BIND_NUMA; - st_param->flags |= MTL_FLAG_TX_VIDEO_MIGRATE; - st_param->flags |= MTL_FLAG_RX_VIDEO_MIGRATE; - st_param->flags |= request->payload_mtl_flags_mask; - st_param->pacing = (st21_tx_pacing_way) request->payload_mtl_pacing; - st_param->log_level = MTL_LOG_LEVEL_DEBUG; - st_param->priv = NULL; - st_param->ptp_get_time_fn = NULL; - // Native af_xdp have only 62 queues available - if(st_param->pmd[MTL_PORT_P] == MTL_PMD_NATIVE_AF_XDP) { - st_param->rx_queues_cnt[MTL_PORT_P] = 62; - st_param->tx_queues_cnt[MTL_PORT_P] = 62; - } else { - st_param->rx_queues_cnt[MTL_PORT_P] = 128; - st_param->tx_queues_cnt[MTL_PORT_P] = 128; - } - st_param->lcores = NULL; - st_param->memzone_max = 9000; - - INFO("ProxyContext: ParseStInitParam(const mcm_conn_param* request, struct mtl_init_params* st_param)"); - INFO("num_ports : '%d'", st_param->num_ports); - INFO("port : '%s'", st_param->port[MTL_PORT_P]); - INFO("port pmd : '%d'", int(st_param->pmd[MTL_PORT_P])); - INFO("sip_addr : '%s'", getDataPlaneAddress().c_str()); - INFO("flags: : '%ld'", st_param->flags); - INFO("log_level : %d", st_param->log_level); - if (st_param->lcores) { - INFO("lcores : %s", st_param->lcores); - } else { - INFO("lcores : NULL"); - } - INFO("rx_sessions_cnt_max : %d", st_param->rx_queues_cnt[MTL_PORT_P]); - INFO("tx_sessions_cnt_max : %d", st_param->tx_queues_cnt[MTL_PORT_P]); -} - -void ProxyContext::ParseMemIFParam(const mcm_conn_param* request, memif_ops_t& memif_ops) -{ - uint32_t sessionCount = incrementMSessionCount(); - std::string type_str = ""; - - if (request->type == is_tx) { - type_str = "tx"; - } else { - type_str = "rx"; - } - - memif_ops.is_master = 1; - memif_ops.interface_id = 0; - snprintf(memif_ops.app_name, sizeof(memif_ops.app_name), "memif_%s_%d", type_str.c_str(), int(sessionCount)); - snprintf(memif_ops.interface_name, sizeof(memif_ops.interface_name), "memif_%s_%d", type_str.c_str(), int(sessionCount)); - snprintf(memif_ops.socket_path, sizeof(memif_ops.socket_path), "/run/mcm/media_proxy_%s_%d.sock", type_str.c_str(), int(sessionCount)); - memif_ops.m_session_count = ++sessionCount; -} - -int ProxyContext::RxStart_rdma(const mcm_conn_param *request) -{ - memif_ops_t memif_ops = {0}; - int ret; - - if (!mDevHandle_rdma) { - ret = libfabric_dev_ops.rdma_init(&mDevHandle_rdma); - if (ret) { - INFO("%s, Failed to initialize libfabric.", __func__); - return -EINVAL; - } - } - - ParseMemIFParam(request, memif_ops); - - RxRdmaSession *session_ptr = new RxRdmaSession(mDevHandle_rdma, *request, memif_ops); - - if (session_ptr->init()) { - ERROR("%s, Failed to initialize session.", __func__); - delete session_ptr; - return -1; - } - - INFO("%s, session id: %d", __func__, session_ptr->get_id()); - std::lock_guard lock(ctx_mtx); - mDpCtx.push_back(session_ptr); - return session_ptr->get_id(); -} - -int ProxyContext::RxStart_mtl(const mcm_conn_param *request) -{ - INFO("ProxyContext: RxStart(const mcm_conn_param* request)"); - struct st20p_rx_ops opts = {0}; - memif_ops_t memif_ops = {0}; - int ret; - - /*add lock to protect MTL library initialization to aviod being called by multi-session - * simultaneously*/ - if (!mDevHandle && imtl_init_preparing == false) { - - imtl_init_preparing = true; - struct mtl_init_params st_param = {0}; - - /* set default parameters */ - ParseStInitParam(request, &st_param); - - mDevHandle = inst_init(&st_param); - if (mDevHandle == NULL) { - ERROR("%s, Failed to initialize MTL.", __func__); - return -1; - } else { - imtl_init_preparing = false; - } - } - - if (mDevHandle == NULL) { - ERROR("%s, Failed to initialize MTL for RxStart function.", __func__); - return -1; - } - - ParseMemIFParam(request, memif_ops); - - Session *session_ptr = NULL; - switch (request->payload_type) { - case PAYLOAD_TYPE_ST20_VIDEO: { - session_ptr = new RxSt20MtlSession(mDevHandle, *request, getDevicePort(), memif_ops); - break; - } - case PAYLOAD_TYPE_ST22_VIDEO: { - session_ptr = new RxSt22MtlSession(mDevHandle, *request, getDevicePort(), memif_ops); - break; - } - case PAYLOAD_TYPE_ST30_AUDIO: { - session_ptr = new RxSt30MtlSession(mDevHandle, *request, getDevicePort(), memif_ops); - break; - } - case PAYLOAD_TYPE_ST40_ANCILLARY: - default: { - ERROR("Unsupported payload\n"); - return -1; - } - } - - if (session_ptr->init()) { - ERROR("%s, Failed to initialize session.", __func__); - delete session_ptr; - return -1; - } - - INFO("%s, session id: %d", __func__, session_ptr->get_id()); - std::lock_guard lock(ctx_mtx); - mDpCtx.push_back(session_ptr); - return session_ptr->get_id(); -} - -int ProxyContext::RxStart(const mcm_conn_param *request) -{ - if (request->payload_type == PAYLOAD_TYPE_RDMA_VIDEO) - return RxStart_rdma(request); - - return RxStart_mtl(request); -} - -int ProxyContext::TxStart_rdma(const mcm_conn_param *request) -{ - memif_ops_t memif_ops = {0}; - int ret; - - if (!mDevHandle_rdma) { - ret = libfabric_dev_ops.rdma_init(&mDevHandle_rdma); - if (ret) { - INFO("%s, Failed to initialize libfabric.", __func__); - return -EINVAL; - } - } - - ParseMemIFParam(request, memif_ops); - - TxRdmaSession *session_ptr = new TxRdmaSession(mDevHandle_rdma, *request, memif_ops); - - if (session_ptr->init()) { - ERROR("%s, Failed to initialize session.", __func__); - delete session_ptr; - return -1; - } - - INFO("%s, session id: %d", __func__, session_ptr->get_id()); - std::lock_guard lock(ctx_mtx); - mDpCtx.push_back(session_ptr); - return session_ptr->get_id(); -} - -int ProxyContext::TxStart_mtl(const mcm_conn_param *request) -{ - INFO("ProxyContext: TxStart(const mcm_conn_param* request)"); - memif_ops_t memif_ops = {0}; - - /* add lock to protect MTL library initialization to avoid being called by multi-session - * simultaneously */ - if (mDevHandle == NULL && imtl_init_preparing == false) { - - imtl_init_preparing = true; - - struct mtl_init_params st_param = {0}; - - /* set default parameters */ - ParseStInitParam(request, &st_param); - - mDevHandle = inst_init(&st_param); - if (mDevHandle == NULL) { - ERROR("%s, Failed to initialize MTL.", __func__); - return -1; - } else { - imtl_init_preparing = false; - } - } - - if (mDevHandle == NULL) { - ERROR("%s, Failed to initialize MTL for TxStart function.", __func__); - return -1; - } - - ParseMemIFParam(request, memif_ops); - - Session *session_ptr = NULL; - switch (request->payload_type) { - case PAYLOAD_TYPE_ST20_VIDEO: { - session_ptr = new TxSt20MtlSession(mDevHandle, *request, getDevicePort(), memif_ops); - break; - } - case PAYLOAD_TYPE_ST22_VIDEO: { - session_ptr = new TxSt22MtlSession(mDevHandle, *request, getDevicePort(), memif_ops); - break; - } - case PAYLOAD_TYPE_ST30_AUDIO: { - session_ptr = new TxSt30MtlSession(mDevHandle, *request, getDevicePort(), memif_ops); - break; - } - case PAYLOAD_TYPE_ST40_ANCILLARY: - default: { - ERROR("Unsupported payload\n"); - return -1; - } - } - - if (session_ptr->init()) { - ERROR("%s, Failed to initialize session.", __func__); - delete session_ptr; - return -1; - } - - INFO("%s, session id: %d", __func__, session_ptr->get_id()); - std::lock_guard lock(ctx_mtx); - mDpCtx.push_back(session_ptr); - return session_ptr->get_id(); -} - -int ProxyContext::TxStart(const mcm_conn_param *request) -{ - if (request->payload_type == PAYLOAD_TYPE_RDMA_VIDEO) - return TxStart_rdma(request); - - return TxStart_mtl(request); -} - -int ProxyContext::Stop(const int32_t session_id) -{ - int ret = 0; - std::lock_guard lock(ctx_mtx); - auto ctx = std::find_if(mDpCtx.begin(), mDpCtx.end(), - [session_id](auto it) { return it->get_id() == session_id; }); - if (ctx != mDpCtx.end()) { - INFO("%s, Stop session ID: %d", __func__, session_id); - - Session *session_ptr = *ctx; - delete session_ptr; - mDpCtx.erase(ctx); - } else { - ERROR("%s, Illegal session ID: %d", __func__, session_id); - ret = -1; - } - - return ret; -} diff --git a/media-proxy/src/session-base.cc b/media-proxy/src/session-base.cc deleted file mode 100644 index 24a2cc5bf..000000000 --- a/media-proxy/src/session-base.cc +++ /dev/null @@ -1,209 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2024 Intel Corporation - * - * SPDX-License-Identifier: BSD-3-Clause - */ - -#include -#include - -#include "session-base.h" - -static int on_connect_callback_wrapper(memif_conn_handle_t conn, void *priv) -{ - if (!priv) { - return -1; - } - Session *connection = (Session *)priv; - return connection->on_connect_cb(conn); -} - -static int on_disconnect_callback_wrapper(memif_conn_handle_t conn, void *priv) -{ - if (!priv) { - return -1; - } - Session *connection = (Session *)priv; - return connection->on_disconnect_cb(conn); -} - -static int on_receive_callback_wrapper(memif_conn_handle_t conn, void *priv, uint16_t qid) -{ - if (!priv) { - return -1; - } - Session *connection = (Session *)priv; - return connection->on_receive_cb(conn, qid); -} - -void Session::memif_event_loop() -{ - int err; - memif_socket_handle_t memif_socket = memif_conn_args.socket; - - do { - // INFO("media-proxy waiting event."); - err = memif_poll_event(memif_socket, 1); - // INFO("media-proxy received event."); - } while (!memif_stop && (err == MEMIF_ERR_SUCCESS)); - - INFO("MEMIF DISCONNECTED."); -} - -int Session::shm_init(uint32_t buffer_size, uint32_t log2_ring_size) -{ - int ret = 0; - - /* unlink socket file */ - if (memif_conn_args.is_master && memif_socket_args.path[0] != '@') { - ret = mkdir("/run/mcm", 0666); - if (ret && errno != EEXIST) { - perror("Create directory for MemIF socket."); - return -1; - } - unlink(memif_socket_args.path); - } - - INFO("Create memif socket."); - ret = memif_create_socket(&memif_socket, &memif_socket_args, NULL); - if (ret != MEMIF_ERR_SUCCESS) { - INFO("memif_create_socket: %s", memif_strerror(ret)); - return -1; - } - - memif_conn_args.socket = memif_socket; - memif_conn_args.buffer_size = buffer_size; - memif_conn_args.log2_ring_size = log2_ring_size; - - INFO("create memif interface."); - ret = memif_create(&memif_conn, &memif_conn_args, on_connect_callback_wrapper, - on_disconnect_callback_wrapper, on_receive_callback_wrapper, this); - if (ret != MEMIF_ERR_SUCCESS) { - INFO("memif_create: %s", memif_strerror(ret)); - return -1; - } - - /* Start the MemIF event loop. */ - memif_event_thread = new std::thread(&Session::memif_event_loop, this); - if (!memif_event_thread) { - ERROR("%s(%d), thread create fail\n", __func__, ret); - return -1; - } - - return 0; -} - -int Session::shm_deinit() -{ - int err = 0; - - memif_stop = true; - if (memif_event_thread) { - memif_event_thread->join(); - delete memif_event_thread; - memif_event_thread = 0; - } - - /* free-up resources */ - memif_delete(&memif_conn); - memif_delete_socket(&memif_socket); - - /* unlink socket file */ - if (memif_conn_args.is_master && memif_socket_args.path[0] != '@') { - unlink(memif_socket_args.path); - } - - return 0; -} - -int Session::on_connect_cb(memif_conn_handle_t conn) -{ - INFO("RX RDMA memif connected!"); - - int err = memif_refill_queue(conn, 0, -1, 0); - if (err != MEMIF_ERR_SUCCESS) { - INFO("memif_refill_queue: %s", memif_strerror(err)); - return err; - } - - print_memif_details(conn); - - atomic_store_explicit(&shm_ready, true, std::memory_order_release); - - return 0; -} - -int Session::on_disconnect_cb(memif_conn_handle_t conn) -{ - if (!conn) { - INFO("Invalid Parameters."); - return -EINVAL; - } - - /* release session */ - if (!atomic_load_explicit(&shm_ready, std::memory_order_relaxed)) { - return 0; - } - atomic_store_explicit(&shm_ready, false, std::memory_order_relaxed); - - /* stop event polling thread */ - INFO("Stop poll event\n"); - memif_socket_handle_t socket = memif_get_socket_handle(conn); - if (!socket) { - INFO("Invalide socket handle."); - return -1; - } - - int err = memif_cancel_poll_event(socket); - if (err != MEMIF_ERR_SUCCESS) { - INFO("We are doomed..."); - } - - return 0; -} - -int Session::on_receive_cb(memif_conn_handle_t conn, uint16_t qid) -{ - memif_buffer_t shm_bufs; - uint16_t buf_num = 0; - - /* receive packets from the shared memory */ - int err = memif_rx_burst(conn, qid, &shm_bufs, 1, &buf_num); - if (err != MEMIF_ERR_SUCCESS && err != MEMIF_ERR_NOBUF) { - INFO("memif_rx_burst: %s", memif_strerror(err)); - return err; - } - - /* Process on the received buffer. */ - /* Supposed this function will never be called. */ - - err = memif_refill_queue(conn, qid, buf_num, 0); - if (err != MEMIF_ERR_SUCCESS) - INFO("memif_refill_queue: %s", memif_strerror(err)); - - return 0; -} - -Session::Session(memif_ops_t &memif_ops, mcm_payload_type payload, direction dir_type) - : id(0), memif_socket(0), memif_conn(0), memif_event_thread(0), type(dir_type), - payload_type(payload), memif_socket_args{0}, memif_conn_args{0}, memif_stop(false) -{ - id = memif_ops.m_session_count; - - /* Set application name */ - strlcpy(memif_socket_args.app_name, memif_ops.app_name, sizeof(memif_socket_args.app_name)); - - /* Create memif socket - * Interfaces are internally stored in a database referenced by memif socket. */ - strlcpy(memif_socket_args.path, memif_ops.socket_path, sizeof(memif_socket_args.path)); - - /* Create memif interfaces - * Both interaces are assigned the same socket and same id to create a loopback. */ - shm_ready = ATOMIC_VAR_INIT(false); - memif_conn_args.interface_id = memif_ops.interface_id; - memcpy((char *)memif_conn_args.interface_name, memif_ops.interface_name, - sizeof(memif_conn_args.interface_name)); - memif_conn_args.is_master = memif_ops.is_master; -} - -Session::~Session() { shm_deinit(); } diff --git a/media-proxy/src/session-mtl-st2110-20-rx.cc b/media-proxy/src/session-mtl-st2110-20-rx.cc deleted file mode 100644 index b59536a68..000000000 --- a/media-proxy/src/session-mtl-st2110-20-rx.cc +++ /dev/null @@ -1,276 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2024 Intel Corporation - * - * SPDX-License-Identifier: BSD-3-Clause - */ - -#include "session-mtl.h" - -#if defined(MTL_ZERO_COPY) -static int query_ext_frame_callback_wrapper(void *priv, struct st_ext_frame *ext_frame, - struct st20_rx_frame_meta *meta) -{ - if (!priv) { - return -1; - } - RxSt20MtlSession *s = (RxSt20MtlSession *)priv; - return s->query_ext_frame_cb(ext_frame, meta); -} - -int RxSt20MtlSession::query_ext_frame_cb(struct st_ext_frame *ext_frame, - struct st20_rx_frame_meta *meta) -{ - /* allocate memory */ - uint16_t qid = 0; - uint16_t rx_buf_num = 0; - - if (!atomic_load_explicit(&shm_ready, std::memory_order_relaxed)) { - ERROR("rx_st20p_query_ext_frame: MemIF connection not ready."); - return -1; - } - - memif_buffer_t shm_buf = {0}; - int err = memif_buffer_alloc(memif_conn, qid, &shm_buf, 1, &rx_buf_num, frame_size); - if (err != MEMIF_ERR_SUCCESS) { - INFO("rx_st20p_query_ext_frame: Failed to alloc memif buffer: %s", memif_strerror(err)); - return -1; - } - - mtl_iova_t ext_fb_iova = source_begin_iova + ((uint8_t *)shm_buf.data - source_begin); - - uint8_t planes = st_frame_fmt_planes(ops.output_fmt); - for (uint8_t plane = 0; plane < planes; plane++) { /* assume planes continuous */ - ext_frame->linesize[plane] = st_frame_least_linesize(ops.output_fmt, meta->width, plane); - if (plane == 0) { - ext_frame->addr[plane] = shm_buf.data; - ext_frame->iova[plane] = ext_fb_iova; - } else { - ext_frame->addr[plane] = (uint8_t *)ext_frame->addr[plane - 1] + - ext_frame->linesize[plane - 1] * meta->height; - ext_frame->iova[plane] = - ext_frame->iova[plane - 1] + ext_frame->linesize[plane - 1] * meta->height; - } - } - ext_frame->size = frame_size; - - fifo_mtx.lock(); - fifo.push(shm_buf); - fifo_mtx.unlock(); - - return 0; -} - -int RxSt20MtlSession::on_connect_cb(memif_conn_handle_t conn) -{ - memif_region_details_t region; - - int err = memif_get_buffs_region(conn, ®ion); - if (err) { - ERROR("%s, Getting memory buffers from memif failed. \n", __func__); - return err; - } - - source_begin = (uint8_t *)region.addr; - source_begin_iova_map_sz = region.size; - source_begin_iova = mtl_dma_map(st, source_begin, region.size); - if (source_begin_iova == MTL_BAD_IOVA) { - ERROR("Fail to map DMA memory address."); - return -1; - } - - return Session::on_connect_cb(conn); -} - -int RxSt20MtlSession::on_disconnect_cb(memif_conn_handle_t conn) -{ - if (atomic_load_explicit(&shm_ready, std::memory_order_relaxed) && - mtl_dma_unmap(st, source_begin, source_begin_iova, source_begin_iova_map_sz) < 0) { - ERROR("Fail to unmap DMA memory address."); - } - - return Session::on_disconnect_cb(conn); -} -#endif - -void RxSt20MtlSession::frame_thread() -{ - INFO("%s, start\n", __func__); - while (!stop) { - struct st_frame *frame = st20p_rx_get_frame(handle); - if (!frame) { /* no frame */ - pthread_mutex_lock(&wake_mutex); - if (!stop) - pthread_cond_wait(&wake_cond, &wake_mutex); - pthread_mutex_unlock(&wake_mutex); - continue; - } - /* Verify frame data status. */ - if (frame->status == ST_FRAME_STATUS_CORRUPTED) { - ERROR("[DBG] Received corrupted frame.\n"); - } else { - consume_frame(frame); - } - st20p_rx_put_frame(handle, frame); - } -} - -void RxSt20MtlSession::copy_connection_params(const mcm_conn_param &request, std::string &dev_port) -{ - char session_name[NAME_MAX] = ""; - snprintf(session_name, NAME_MAX, "mcm_rx_st20_%d", get_id()); - - inet_pton(AF_INET, request.remote_addr.ip, ops.port.ip_addr[MTL_PORT_P]); - inet_pton(AF_INET, request.local_addr.ip, ops.port.mcast_sip_addr[MTL_PORT_P]); - - ops.port.udp_port[MTL_PORT_P] = atoi(request.local_addr.port); - strlcpy(ops.port.port[MTL_PORT_P], dev_port.c_str(), MTL_PORT_MAX_LEN); - ops.port.num_port = 1; - if (request.payload_type_nr == 0) { - ops.port.payload_type = ST_APP_PAYLOAD_TYPE_VIDEO; - } else { - ops.port.payload_type = request.payload_type_nr; - } - ops.name = strdup(session_name); - ops.width = request.width; - ops.height = request.height; - ops.fps = st_frame_rate_to_st_fps((double)request.fps); - ops.transport_fmt = ST20_FMT_YUV_422_PLANAR10LE; - ops.output_fmt = get_st_frame_fmt(request.pix_fmt); - ops.device = ST_PLUGIN_DEVICE_AUTO; - ops.framebuff_cnt = 4; - - INFO("ProxyContext: %s...", __func__); - INFO("port : %s", ops.port.port[MTL_PORT_P]); - printf("INFO: ip_addr :"); - for (int i = 0; i < MTL_IP_ADDR_LEN; ++i) { - printf(" %d ", ops.port.ip_addr[MTL_PORT_P][i]); - } - printf("\n"); - printf("INFO: mcast_sip_addr :"); - for (int i = 0; i < MTL_IP_ADDR_LEN; ++i) { - printf(" %d ", ops.port.mcast_sip_addr[MTL_PORT_P][i]); - } - printf("\n"); - INFO("num_port : %d", ops.port.num_port); - INFO("udp_port : %d", ops.port.udp_port[MTL_PORT_P]); - INFO("payload_type : %d", ops.port.payload_type); - INFO("name : %s", ops.name); - INFO("width : %d", ops.width); - INFO("height : %d", ops.height); - INFO("fps : %d", ops.fps); - INFO("transport_fmt : %d", ops.transport_fmt); - INFO("output_fmt : %d", ops.output_fmt); - INFO("device : %d", ops.device); - INFO("framebuff_cnt : %d", ops.framebuff_cnt); -} - -RxSt20MtlSession::RxSt20MtlSession(mtl_handle dev_handle, const mcm_conn_param &request, - std::string dev_port, memif_ops_t &memif_ops) - : MtlSession(memif_ops, request.payload_type, RX, dev_handle), handle(0), - frame_thread_handle(0), fb_recv(0), ops{0} -{ - copy_connection_params(request, dev_port); - - frame_size = st_frame_size(ops.output_fmt, ops.width, ops.height, false); - - ops.priv = this; // app handle register to lib - ops.notify_frame_available = frame_available_callback_wrapper; - -#if defined(MTL_ZERO_COPY) - ops.flags |= ST20P_RX_FLAG_EXT_FRAME; - ops.flags |= ST20P_RX_FLAG_RECEIVE_INCOMPLETE_FRAME; - ops.query_ext_frame = query_ext_frame_callback_wrapper; - source_begin_iova_map_sz = 0; - source_begin_iova = {0}; - source_begin = nullptr; -#endif -} - -int RxSt20MtlSession::init() -{ -#if defined(MTL_ZERO_COPY) - if (!st_frame_fmt_equal_transport(ops.output_fmt, ops.transport_fmt)) { - ERROR("output_fmt and transport_fmt differ"); - return -1; - } -#endif - - int ret = shm_init(frame_size, 2); - if (ret < 0) { - ERROR("Failed to initialize shared memory"); - return -1; - } - - handle = st20p_rx_create(st, &ops); - if (!handle) { - ERROR("Failed to create MTL RX ST20 session"); - return -1; - } - - /* Start MTL session thread. */ - frame_thread_handle = new std::thread(&RxSt20MtlSession::frame_thread, this); - if (!frame_thread_handle) { - ERROR("Failed to create thread"); - return -1; - } - return 0; -} - -RxSt20MtlSession::~RxSt20MtlSession() -{ - INFO("%s, fb_recv %d\n", __func__, fb_recv); - stop = true; - if (frame_thread_handle) { - frame_thread_handle->join(); - delete frame_thread_handle; - frame_thread_handle = 0; - } - - if (handle) { - st20p_rx_free(handle); - handle = 0; - } -} - -void RxSt20MtlSession::consume_frame(struct st_frame *frame) -{ - int err = 0; - uint16_t qid = 0; - memif_buffer_t rx_buf = {0}; - uint16_t rx_buf_num = 0, rx = 0; - - if (!atomic_load_explicit(&shm_ready, std::memory_order_relaxed)) { - INFO("%s memif not ready\n", __func__); - return; - } - -#if defined(MTL_ZERO_COPY) - fifo_mtx.lock(); - if (fifo.empty()) { - fifo_mtx.unlock(); - ERROR("%s FIFO empty \n", __func__); - return; - } - - rx_buf = fifo.front(); - fifo.pop(); - fifo_mtx.unlock(); - rx_buf_num = 1; -#else - err = memif_buffer_alloc_timeout(memif_conn, qid, &rx_buf, 1, &rx_buf_num, frame_size, 10); - if (err != MEMIF_ERR_SUCCESS) { - INFO("rx_st20p consume_frame: Failed to alloc memif buffer: %s", memif_strerror(err)); - return; - } - - mtl_memcpy(rx_buf.data, frame->addr[0], frame_size); -#endif - - /* Send to microservice application. */ - err = memif_tx_burst(memif_conn, qid, &rx_buf, rx_buf_num, &rx); - if (err != MEMIF_ERR_SUCCESS) { - INFO("rx_st20p consume_frame memif_tx_burst: %s", memif_strerror(err)); - } - - fb_recv++; -} diff --git a/media-proxy/src/session-mtl-st2110-20-tx.cc b/media-proxy/src/session-mtl-st2110-20-tx.cc deleted file mode 100644 index d6ee3290c..000000000 --- a/media-proxy/src/session-mtl-st2110-20-tx.cc +++ /dev/null @@ -1,226 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2024 Intel Corporation - * - * SPDX-License-Identifier: BSD-3-Clause - */ - -#include "session-mtl.h" - -#if defined(MTL_ZERO_COPY) -static int tx_frame_done_callback_wrapper(void *priv, struct st_frame *frame) -{ - if (!priv) { - return -1; - } - TxSt20MtlSession *s = (TxSt20MtlSession *)priv; - return s->frame_done_cb(frame); -} - -int TxSt20MtlSession::frame_done_cb(struct st_frame *frame) -{ - uint16_t qid = 0; - uint16_t buf_num = 1; - - if (!frame) { - ERROR("%s, frame ptr is NULL\n", __func__); - return -1; - } - - memif_conn_handle_t conn = (memif_conn_handle_t)frame->opaque; - if (!conn) { - return -1; - } - - /* return back frame buffer to memif. */ - int err = memif_refill_queue(conn, qid, buf_num, 0); - if (err != MEMIF_ERR_SUCCESS) - INFO("memif_refill_queue: %s", memif_strerror(err)); - - return err; -} - -int TxSt20MtlSession::on_connect_cb(memif_conn_handle_t conn) -{ - memif_region_details_t region; - - int err = memif_get_buffs_region(conn, ®ion); - if (err) { - ERROR("%s, Getting memory buffers from memif failed. \n", __func__); - return err; - } - - source_begin = (uint8_t *)region.addr; - source_begin_iova_map_sz = region.size; - source_begin_iova = mtl_dma_map(st, source_begin, region.size); - if (source_begin_iova == MTL_BAD_IOVA) { - ERROR("Fail to map DMA memory address."); - return -1; - } - - return Session::on_connect_cb(conn); -} - -int TxSt20MtlSession::on_disconnect_cb(memif_conn_handle_t conn) -{ - if (atomic_load_explicit(&shm_ready, std::memory_order_relaxed) && - mtl_dma_unmap(st, source_begin, source_begin_iova, source_begin_iova_map_sz) < 0) { - ERROR("Fail to unmap DMA memory address."); - } - - return Session::on_disconnect_cb(conn); -} -#endif - -void TxSt20MtlSession::copy_connection_params(const mcm_conn_param &request, std::string &dev_port) -{ - char session_name[NAME_MAX] = ""; - - snprintf(session_name, NAME_MAX, "mcm_tx_st20_%d", get_id()); - - inet_pton(AF_INET, request.remote_addr.ip, ops.port.dip_addr[MTL_PORT_P]); - ops.port.udp_port[MTL_PORT_P] = atoi(request.remote_addr.port); - strlcpy(ops.port.port[MTL_PORT_P], dev_port.c_str(), MTL_PORT_MAX_LEN); - ops.port.udp_src_port[MTL_PORT_P] = atoi(request.local_addr.port); - ops.port.num_port = 1; - if (request.payload_type_nr == 0) { - ops.port.payload_type = ST_APP_PAYLOAD_TYPE_VIDEO; - } else { - ops.port.payload_type = request.payload_type_nr; - } - ops.name = strdup(session_name); - ops.width = request.width; - ops.height = request.height; - ops.fps = st_frame_rate_to_st_fps((double)request.fps); - ops.input_fmt = get_st_frame_fmt(request.pix_fmt); - ops.transport_fmt = ST20_FMT_YUV_422_PLANAR10LE; - ops.device = ST_PLUGIN_DEVICE_AUTO; - ops.framebuff_cnt = 4; - - INFO("ProxyContext: %s...", __func__); - INFO("port : %s", ops.port.port[MTL_PORT_P]); - printf("dip_addr :"); - for (int i = 0; i < MTL_IP_ADDR_LEN; ++i) { - printf(" %u", ops.port.dip_addr[MTL_PORT_P][i]); - } - printf("\n"); - INFO("num_port : %d", ops.port.num_port); - INFO("udp_port : %d", ops.port.udp_port[MTL_PORT_P]); - INFO("udp_src_port : %d", ops.port.udp_src_port[MTL_PORT_P]); - INFO("payload_type : %d", ops.port.payload_type); - INFO("name : %s", ops.name); - INFO("width : %d", ops.width); - INFO("height : %d", ops.height); - INFO("fps : %d", ops.fps); - INFO("transport_fmt : %d", ops.transport_fmt); - INFO("input_fmt : %d", ops.input_fmt); - INFO("device : %d", ops.device); - INFO("framebuff_cnt : %d", ops.framebuff_cnt); -} - -TxSt20MtlSession::TxSt20MtlSession(mtl_handle dev_handle, const mcm_conn_param &request, - std::string dev_port, memif_ops_t &memif_ops) - : MtlSession(memif_ops, request.payload_type, TX, dev_handle), handle(0), fb_send(0), ops{0} -{ - copy_connection_params(request, dev_port); - - frame_size = st_frame_size(ops.input_fmt, ops.width, ops.height, false); - - ops.priv = this; // app handle register to lib - ops.notify_frame_available = frame_available_callback_wrapper; - -#if defined(MTL_ZERO_COPY) - ops.notify_frame_done = tx_frame_done_callback_wrapper; - ops.flags |= ST20P_TX_FLAG_EXT_FRAME; - source_begin_iova_map_sz = 0; - source_begin_iova = {0}; - source_begin = nullptr; -#endif -} - -int TxSt20MtlSession::init() -{ - handle = st20p_tx_create(st, &ops); - if (!handle) { - ERROR("Failed to create MTL TX ST20 session"); - return -1; - } - - int ret = shm_init(frame_size, 2); - if (ret < 0) { - ERROR("Failed to initialize shared memory"); - return -1; - } - return 0; -} - -TxSt20MtlSession::~TxSt20MtlSession() -{ - INFO("%s, fb_send %d\n", __func__, fb_send); - stop = true; - if (handle) { - st20p_tx_free(handle); - handle = 0; - } -} - -int TxSt20MtlSession::on_receive_cb(memif_conn_handle_t conn, uint16_t qid) -{ - memif_buffer_t shm_bufs = {0}; - uint16_t buf_num = 0; - - if (stop) { - INFO("TX session already stopped."); - return -1; - } - - /* receive packets from the shared memory */ - int err = memif_rx_burst(conn, qid, &shm_bufs, 1, &buf_num); - if (err != MEMIF_ERR_SUCCESS && err != MEMIF_ERR_NOBUF) { - INFO("memif_rx_burst: %s", memif_strerror(err)); - return err; - } - - struct st_frame *frame = NULL; - do { - frame = st20p_tx_get_frame(handle); - if (!frame) { - pthread_mutex_lock(&wake_mutex); - if (!stop) - pthread_cond_wait(&wake_cond, &wake_mutex); - pthread_mutex_unlock(&wake_mutex); - } - } while (!frame); - - /* Send out frame. */ -#if defined(MTL_ZERO_COPY) - struct st_ext_frame ext_frame = {0}; - ext_frame.addr[0] = shm_bufs.data; - ext_frame.iova[0] = source_begin_iova + ((uint8_t *)shm_bufs.data - source_begin); - ext_frame.linesize[0] = st_frame_least_linesize(frame->fmt, frame->width, 0); - uint8_t planes = st_frame_fmt_planes(frame->fmt); - for (uint8_t plane = 1; plane < planes; plane++) { /* assume planes continous */ - ext_frame.linesize[plane] = st_frame_least_linesize(frame->fmt, frame->width, plane); - ext_frame.addr[plane] = - (uint8_t *)ext_frame.addr[plane - 1] + ext_frame.linesize[plane - 1] * frame->height; - ext_frame.iova[plane] = - ext_frame.iova[plane - 1] + ext_frame.linesize[plane - 1] * frame->height; - } - ext_frame.size = shm_bufs.len; - ext_frame.opaque = conn; - - st20p_tx_put_ext_frame(handle, frame, &ext_frame); -#else - /* fill frame data. */ - mtl_memcpy(frame->addr[0], shm_bufs.data, shm_bufs.len); - /* Send out frame. */ - st20p_tx_put_frame(handle, frame); - - err = memif_refill_queue(conn, qid, buf_num, 0); - if (err != MEMIF_ERR_SUCCESS) - INFO("memif_refill_queue: %s", memif_strerror(err)); -#endif - - fb_send++; - - return 0; -} diff --git a/media-proxy/src/session-mtl-st2110-22 rx.cc b/media-proxy/src/session-mtl-st2110-22 rx.cc deleted file mode 100644 index c9a2d65c1..000000000 --- a/media-proxy/src/session-mtl-st2110-22 rx.cc +++ /dev/null @@ -1,277 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2024 Intel Corporation - * - * SPDX-License-Identifier: BSD-3-Clause - */ - -#include "session-mtl.h" - -#if defined(MTL_ZERO_COPY) -static int query_ext_frame_callback_wrapper(void *priv, struct st_ext_frame *ext_frame, - struct st22_rx_frame_meta *meta) -{ - if (!priv) { - return -1; - } - RxSt22MtlSession *s = (RxSt22MtlSession *)priv; - return s->query_ext_frame_cb(ext_frame, meta); -} - -int RxSt22MtlSession::query_ext_frame_cb(struct st_ext_frame *ext_frame, - struct st22_rx_frame_meta *meta) -{ - /* allocate memory */ - uint16_t qid = 0; - uint16_t rx_buf_num = 0; - - if (!atomic_load_explicit(&shm_ready, std::memory_order_relaxed)) { - ERROR("rx_st22p_query_ext_frame: MemIF connection not ready."); - return -1; - } - - memif_buffer_t shm_buf = {0}; - int err = memif_buffer_alloc(memif_conn, qid, &shm_buf, 1, &rx_buf_num, frame_size); - if (err != MEMIF_ERR_SUCCESS) { - INFO("rx_st22p_query_ext_frame: Failed to alloc memif buffer: %s", memif_strerror(err)); - return -1; - } - - uint8_t planes = st_frame_fmt_planes(ops.output_fmt); - mtl_iova_t ext_fb_iova = source_begin_iova + ((uint8_t *)shm_buf.data - source_begin); - - for (uint8_t plane = 0; plane < planes; plane++) { /* assume planes continuous */ - ext_frame->linesize[plane] = st_frame_least_linesize(ops.output_fmt, ops.width, plane); - if (plane == 0) { - ext_frame->addr[plane] = shm_buf.data; - ext_frame->iova[plane] = ext_fb_iova; - } else { - ext_frame->addr[plane] = - (uint8_t *)ext_frame->addr[plane - 1] + ext_frame->linesize[plane - 1] * ops.height; - ext_frame->iova[plane] = - ext_frame->iova[plane - 1] + ext_frame->linesize[plane - 1] * ops.height; - } - } - ext_frame->size = frame_size; - - fifo_mtx.lock(); - fifo.push(shm_buf); - fifo_mtx.unlock(); - - return 0; -} - -int RxSt22MtlSession::on_connect_cb(memif_conn_handle_t conn) -{ - memif_region_details_t region; - - int err = memif_get_buffs_region(conn, ®ion); - if (err) { - ERROR("%s, Getting memory buffers from memif failed. \n", __func__); - return err; - } - - source_begin = (uint8_t *)region.addr; - source_begin_iova_map_sz = region.size; - source_begin_iova = mtl_dma_map(st, source_begin, region.size); - if (source_begin_iova == MTL_BAD_IOVA) { - ERROR("Fail to map DMA memory address."); - return -1; - } - - return Session::on_connect_cb(conn); -} - -int RxSt22MtlSession::on_disconnect_cb(memif_conn_handle_t conn) -{ - if (atomic_load_explicit(&shm_ready, std::memory_order_relaxed) && - mtl_dma_unmap(st, source_begin, source_begin_iova, source_begin_iova_map_sz) < 0) { - ERROR("Fail to unmap DMA memory address."); - } - - return Session::on_disconnect_cb(conn); -} -#endif - -void RxSt22MtlSession::frame_thread() -{ - INFO("%s, start\n", __func__); - while (!stop) { - struct st_frame *frame = st22p_rx_get_frame(handle); - if (!frame) { /* no frame */ - pthread_mutex_lock(&wake_mutex); - if (!stop) - pthread_cond_wait(&wake_cond, &wake_mutex); - pthread_mutex_unlock(&wake_mutex); - continue; - } - /* Verify frame data status. */ - if (frame->status == ST_FRAME_STATUS_CORRUPTED) { - ERROR("[DBG] Received corrupted frame.\n"); - } else { - consume_frame(frame); - } - st22p_rx_put_frame(handle, frame); - } -} - -void RxSt22MtlSession::copy_connection_params(const mcm_conn_param &request, std::string &dev_port) -{ - char session_name[NAME_MAX] = ""; - snprintf(session_name, NAME_MAX, "mcm_rx_st22_%d", get_id()); - - inet_pton(AF_INET, request.remote_addr.ip, ops.port.ip_addr[MTL_PORT_P]); - inet_pton(AF_INET, request.local_addr.ip, ops.port.mcast_sip_addr[MTL_PORT_P]); - ops.port.udp_port[MTL_PORT_P] = atoi(request.local_addr.port); - - // ops->port.udp_port[MTL_PORT_P] = RX_ST20_UDP_PORT; - strlcpy(ops.port.port[MTL_PORT_P], dev_port.c_str(), MTL_PORT_MAX_LEN); - ops.port.num_port = 1; - if (request.payload_type_nr == 0) { - ops.port.payload_type = ST_APP_PAYLOAD_TYPE_ST22; - } else { - ops.port.payload_type = request.payload_type_nr; - } - ops.name = strdup(session_name); - ops.width = request.width; - ops.height = request.height; - ops.fps = st_frame_rate_to_st_fps((double)request.fps); - ops.output_fmt = get_st_frame_fmt(request.pix_fmt); - ops.device = ST_PLUGIN_DEVICE_AUTO; - ops.framebuff_cnt = 4; - ops.pack_type = ST22_PACK_CODESTREAM; - ops.codec = ST22_CODEC_JPEGXS; - ops.codec_thread_cnt = 0; - ops.max_codestream_size = 0; - - INFO("ProxyContext: %s...", __func__); - INFO("port : %s", ops.port.port[MTL_PORT_P]); - printf("INFO: ip_addr :"); - for (int i = 0; i < MTL_IP_ADDR_LEN; ++i) { - printf(" %d ", ops.port.ip_addr[MTL_PORT_P][i]); - } - printf("\n"); - printf("INFO: mcast_sip_addr :"); - for (int i = 0; i < MTL_IP_ADDR_LEN; ++i) { - printf(" %d ", ops.port.mcast_sip_addr[MTL_PORT_P][i]); - } - printf("\n"); - INFO("num_port : %d", ops.port.num_port); - INFO("udp_port : %d", ops.port.udp_port[MTL_PORT_P]); - INFO("payload_type : %d", ops.port.payload_type); - INFO("name : %s", ops.name); - INFO("width : %d", ops.width); - INFO("height : %d", ops.height); - INFO("fps : %d", ops.fps); - INFO("output_fmt : %d", ops.output_fmt); - INFO("device : %d", ops.device); - INFO("framebuff_cnt : %d", ops.framebuff_cnt); -} - -RxSt22MtlSession::RxSt22MtlSession(mtl_handle dev_handle, const mcm_conn_param &request, - std::string dev_port, memif_ops_t &memif_ops) - : MtlSession(memif_ops, request.payload_type, RX, dev_handle), handle(0), - frame_thread_handle(0), fb_recv(0), ops{0} -{ - copy_connection_params(request, dev_port); - - frame_size = st_frame_size(ops.output_fmt, ops.width, ops.height, false); - - ops.priv = this; // app handle register to lib - ops.notify_frame_available = frame_available_callback_wrapper; - -#if defined(MTL_ZERO_COPY) - ops.flags |= ST22P_RX_FLAG_EXT_FRAME; - ops.flags |= ST22P_RX_FLAG_RECEIVE_INCOMPLETE_FRAME; - ops.query_ext_frame = query_ext_frame_callback_wrapper; - source_begin_iova_map_sz = 0; - source_begin_iova = {0}; - source_begin = nullptr; -#endif -} - -int RxSt22MtlSession::init() -{ - int ret = shm_init(frame_size, 2); - if (ret < 0) { - ERROR("Failed to initialize shared memory"); - return -1; - } - - handle = st22p_rx_create(st, &ops); - if (!handle) { - ERROR("Failed to create MTL RX ST22 session"); - return -1; - } - - if (frame_size != st22p_rx_frame_size(handle)) { - ERROR("Frame size differ!!!"); - return -1; - } - - /* Start MTL session thread. */ - frame_thread_handle = new std::thread(&RxSt22MtlSession::frame_thread, this); - if (!frame_thread_handle) { - ERROR("Failed to create thread"); - return -1; - } - return 0; -} - -RxSt22MtlSession::~RxSt22MtlSession() -{ - INFO("%s, fb_recv %d\n", __func__, fb_recv); - stop = true; - if (frame_thread_handle) { - frame_thread_handle->join(); - delete frame_thread_handle; - frame_thread_handle = 0; - } - - if (handle) { - st22p_rx_free(handle); - handle = 0; - } -} - -void RxSt22MtlSession::consume_frame(struct st_frame *frame) -{ - int err = 0; - uint16_t qid = 0; - memif_buffer_t rx_buf = {0}; - uint16_t rx_buf_num = 0, rx = 0; - - if (!atomic_load_explicit(&shm_ready, std::memory_order_relaxed)) { - INFO("%s memif not ready\n", __func__); - return; - } - -#if defined(MTL_ZERO_COPY) - fifo_mtx.lock(); - if (fifo.empty()) { - fifo_mtx.unlock(); - ERROR("%s FIFO empty \n", __func__); - return; - } - - rx_buf = fifo.front(); - fifo.pop(); - fifo_mtx.unlock(); - rx_buf_num = 1; -#else - err = memif_buffer_alloc_timeout(memif_conn, qid, &rx_buf, 1, &rx_buf_num, frame_size, 10); - if (err != MEMIF_ERR_SUCCESS) { - INFO("rx_st22p consume_frame Failed to alloc memif buffer: %s", memif_strerror(err)); - return; - } - - mtl_memcpy(rx_buf.data, frame->addr[0], frame_size); -#endif - - /* Send to microservice application. */ - err = memif_tx_burst(memif_conn, qid, &rx_buf, rx_buf_num, &rx); - if (err != MEMIF_ERR_SUCCESS) { - INFO("rx_st22p consume_frame memif_tx_burst: %s", memif_strerror(err)); - } - - fb_recv++; -} diff --git a/media-proxy/src/session-mtl-st2110-22-tx.cc b/media-proxy/src/session-mtl-st2110-22-tx.cc deleted file mode 100644 index c5d3b65b6..000000000 --- a/media-proxy/src/session-mtl-st2110-22-tx.cc +++ /dev/null @@ -1,230 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2024 Intel Corporation - * - * SPDX-License-Identifier: BSD-3-Clause - */ - -#include "session-mtl.h" - -#if defined(MTL_ZERO_COPY) -static int tx_frame_done_callback_wrapper(void *priv, struct st_frame *frame) -{ - if (!priv) { - return -1; - } - TxSt22MtlSession *s = (TxSt22MtlSession *)priv; - return s->frame_done_cb(frame); -} - -int TxSt22MtlSession::frame_done_cb(struct st_frame *frame) -{ - uint16_t qid = 0; - uint16_t buf_num = 1; - - if (!frame) { - ERROR("%s, frame ptr is NULL\n", __func__); - return -1; - } - - memif_conn_handle_t conn = (memif_conn_handle_t)frame->opaque; - if (!conn) { - return -1; - } - - /* return back frame buffer to memif. */ - int err = memif_refill_queue(conn, qid, buf_num, 0); - if (err != MEMIF_ERR_SUCCESS) - INFO("memif_refill_queue: %s", memif_strerror(err)); - - return err; -} - -int TxSt22MtlSession::on_connect_cb(memif_conn_handle_t conn) -{ - memif_region_details_t region; - - int err = memif_get_buffs_region(conn, ®ion); - if (err) { - ERROR("%s, Getting memory buffers from memif failed. \n", __func__); - return err; - } - - source_begin = (uint8_t *)region.addr; - source_begin_iova_map_sz = region.size; - source_begin_iova = mtl_dma_map(st, source_begin, region.size); - if (source_begin_iova == MTL_BAD_IOVA) { - ERROR("Fail to map DMA memory address."); - return -1; - } - - return Session::on_connect_cb(conn); -} - -int TxSt22MtlSession::on_disconnect_cb(memif_conn_handle_t conn) -{ - if (atomic_load_explicit(&shm_ready, std::memory_order_relaxed) && - mtl_dma_unmap(st, source_begin, source_begin_iova, source_begin_iova_map_sz) < 0) { - ERROR("Fail to unmap DMA memory address."); - } - - return Session::on_disconnect_cb(conn); -} -#endif - -void TxSt22MtlSession::copy_connection_params(const mcm_conn_param &request, std::string &dev_port) -{ - char session_name[NAME_MAX] = ""; - - snprintf(session_name, NAME_MAX, "mcm_tx_st22_%d", get_id()); - - inet_pton(AF_INET, request.remote_addr.ip, ops.port.dip_addr[MTL_PORT_P]); - ops.port.udp_port[MTL_PORT_P] = atoi(request.remote_addr.port); - strlcpy(ops.port.port[MTL_PORT_P], dev_port.c_str(), MTL_PORT_MAX_LEN); - ops.port.udp_src_port[MTL_PORT_P] = atoi(request.local_addr.port); - ops.port.num_port = 1; - if (request.payload_type_nr == 0) { - ops.port.payload_type = ST_APP_PAYLOAD_TYPE_ST22; - } else { - ops.port.payload_type = request.payload_type_nr; - } - ops.name = strdup(session_name); - ops.width = request.width; - ops.height = request.height; - ops.fps = st_frame_rate_to_st_fps((double)request.fps); - ops.input_fmt = get_st_frame_fmt(request.pix_fmt); - ops.device = ST_PLUGIN_DEVICE_AUTO; - ops.framebuff_cnt = 4; - ops.pack_type = ST22_PACK_CODESTREAM; - ops.codec = ST22_CODEC_JPEGXS; - ops.quality = ST22_QUALITY_MODE_SPEED; - ops.codec_thread_cnt = 0; - ops.codestream_size = ops.width * ops.height * 3 / 8; - - INFO("ProxyContext: %s...", __func__); - INFO("port : %s", ops.port.port[MTL_PORT_P]); - printf("dip_addr :"); - for (int i = 0; i < MTL_IP_ADDR_LEN; ++i) { - printf(" %u", ops.port.dip_addr[MTL_PORT_P][i]); - } - printf("\n"); - INFO("num_port : %d", ops.port.num_port); - INFO("udp_port : %d", ops.port.udp_port[MTL_PORT_P]); - INFO("udp_src_port : %d", ops.port.udp_src_port[MTL_PORT_P]); - INFO("payload_type : %d", ops.port.payload_type); - INFO("name : %s", ops.name); - INFO("width : %d", ops.width); - INFO("height : %d", ops.height); - INFO("fps : %d", ops.fps); - // INFO("transport_fmt : %d", ops.transport_fmt); - INFO("input_fmt : %d", ops.input_fmt); - INFO("device : %d", ops.device); - INFO("framebuff_cnt : %d", ops.framebuff_cnt); -} - -TxSt22MtlSession::TxSt22MtlSession(mtl_handle dev_handle, const mcm_conn_param &request, - std::string dev_port, memif_ops_t &memif_ops) - : MtlSession(memif_ops, request.payload_type, TX, dev_handle), handle(0), fb_send(0), ops{0} -{ - copy_connection_params(request, dev_port); - - frame_size = st_frame_size(ops.input_fmt, ops.width, ops.height, false); - - ops.priv = this; // app handle register to lib - ops.notify_frame_available = frame_available_callback_wrapper; - -#if defined(MTL_ZERO_COPY) - ops.notify_frame_done = tx_frame_done_callback_wrapper; - ops.flags |= ST22P_TX_FLAG_EXT_FRAME; - source_begin_iova_map_sz = 0; - source_begin_iova = {0}; - source_begin = nullptr; -#endif -} - -int TxSt22MtlSession::init() -{ - handle = st22p_tx_create(st, &ops); - if (!handle) { - ERROR("Failed to create MTL TX ST22 session"); - return -1; - } - - int ret = shm_init(frame_size, 2); - if (ret < 0) { - ERROR("Failed to initialize shared memory"); - return -1; - } - return 0; -} - -TxSt22MtlSession::~TxSt22MtlSession() -{ - INFO("%s, fb_send %d\n", __func__, fb_send); - stop = true; - if (handle) { - st22p_tx_free(handle); - handle = 0; - } -} - -int TxSt22MtlSession::on_receive_cb(memif_conn_handle_t conn, uint16_t qid) -{ - memif_buffer_t shm_bufs = {0}; - uint16_t buf_num = 0; - - if (stop) { - INFO("TX session already stopped."); - return -1; - } - - /* receive packets from the shared memory */ - int err = memif_rx_burst(conn, qid, &shm_bufs, 1, &buf_num); - if (err != MEMIF_ERR_SUCCESS && err != MEMIF_ERR_NOBUF) { - INFO("memif_rx_burst: %s", memif_strerror(err)); - return err; - } - - struct st_frame *frame = NULL; - do { - frame = st22p_tx_get_frame(handle); - if (!frame) { - pthread_mutex_lock(&wake_mutex); - if (!stop) - pthread_cond_wait(&wake_cond, &wake_mutex); - pthread_mutex_unlock(&wake_mutex); - } - } while (!frame); - - /* Send out frame. */ -#if defined(MTL_ZERO_COPY) - struct st_ext_frame ext_frame = {0}; - ext_frame.addr[0] = shm_bufs.data; - ext_frame.iova[0] = source_begin_iova + ((uint8_t *)shm_bufs.data - source_begin); - ext_frame.linesize[0] = st_frame_least_linesize(frame->fmt, frame->width, 0); - uint8_t planes = st_frame_fmt_planes(frame->fmt); - for (uint8_t plane = 1; plane < planes; plane++) { /* assume planes continous */ - ext_frame.linesize[plane] = st_frame_least_linesize(frame->fmt, frame->width, plane); - ext_frame.addr[plane] = - (uint8_t *)ext_frame.addr[plane - 1] + ext_frame.linesize[plane - 1] * frame->height; - ext_frame.iova[plane] = - ext_frame.iova[plane - 1] + ext_frame.linesize[plane - 1] * frame->height; - } - ext_frame.size = shm_bufs.len; - ext_frame.opaque = conn; - - st22p_tx_put_ext_frame(handle, frame, &ext_frame); -#else - /* fill frame data. */ - mtl_memcpy(frame->addr[0], shm_bufs.data, shm_bufs.len); - /* Send out frame. */ - st22p_tx_put_frame(handle, frame); - - err = memif_refill_queue(conn, qid, buf_num, 0); - if (err != MEMIF_ERR_SUCCESS) - INFO("memif_refill_queue: %s", memif_strerror(err)); -#endif - - fb_send++; - - return 0; -} diff --git a/media-proxy/src/session-mtl-st2110-30-rx.cc b/media-proxy/src/session-mtl-st2110-30-rx.cc deleted file mode 100644 index 2201dd942..000000000 --- a/media-proxy/src/session-mtl-st2110-30-rx.cc +++ /dev/null @@ -1,138 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2024 Intel Corporation - * - * SPDX-License-Identifier: BSD-3-Clause - */ - -#include "session-mtl.h" - -void RxSt30MtlSession::frame_thread() -{ - INFO("%s, start\n", __func__); - while (!stop) { - struct st30_frame *frame = st30p_rx_get_frame(handle); - if (!frame) { /* no frame */ - pthread_mutex_lock(&wake_mutex); - if (!stop) - pthread_cond_wait(&wake_cond, &wake_mutex); - pthread_mutex_unlock(&wake_mutex); - continue; - } - consume_frame(frame); - st30p_rx_put_frame(handle, frame); - } -} - -void RxSt30MtlSession::copy_connection_params(const mcm_conn_param &request, std::string &dev_port) -{ - char session_name[NAME_MAX] = ""; - snprintf(session_name, NAME_MAX, "mcm_rx_st30_%d", get_id()); - - inet_pton(AF_INET, request.remote_addr.ip, ops.port.ip_addr[MTL_PORT_P]); - ops.port.udp_port[MTL_PORT_P] = atoi(request.local_addr.port); - - strlcpy(ops.port.port[MTL_PORT_P], dev_port.c_str(), MTL_PORT_MAX_LEN); - ops.port.num_port = 1; - ops.port.payload_type = ST_APP_PAYLOAD_TYPE_ST30; - ops.name = strdup(session_name); - ops.framebuff_cnt = 4; - - ops.fmt = (st30_fmt)request.payload_args.audio_args.format; - ops.channel = request.payload_args.audio_args.channel; - ops.sampling = (st30_sampling)request.payload_args.audio_args.sampling; - ops.ptime = (st30_ptime)request.payload_args.audio_args.ptime; - - INFO("ProxyContext: %s...", __func__); - INFO("port : %s", ops.port.port[MTL_PORT_P]); - printf("INFO: ip_addr :"); - for (int i = 0; i < MTL_IP_ADDR_LEN; ++i) { - printf(" %d ", ops.port.ip_addr[MTL_PORT_P][i]); - } - printf("\n"); - INFO("num_port : %d", ops.port.num_port); - INFO("udp_port : %d", ops.port.udp_port[MTL_PORT_P]); - INFO("payload_type : %d", ops.port.payload_type); - INFO("name : %s", ops.name); - INFO("framebuff_cnt : %d", ops.framebuff_cnt); -} - -RxSt30MtlSession::RxSt30MtlSession(mtl_handle dev_handle, const mcm_conn_param &request, - std::string dev_port, memif_ops_t &memif_ops) - : MtlSession(memif_ops, request.payload_type, RX, dev_handle), handle(0), fb_recv(0), ops{0}, - frame_thread_handle(0) -{ - copy_connection_params(request, dev_port); - - ops.priv = this; // app handle register to lib - ops.notify_frame_available = frame_available_callback_wrapper; - ops.framebuff_size = st30_get_packet_size(ops.fmt, ops.ptime, ops.sampling, ops.channel); -} - -int RxSt30MtlSession::init() -{ - int ret = shm_init(ops.framebuff_size, 4); - if (ret < 0) { - ERROR("Failed to initialize shared memory"); - return -1; - } - - handle = st30p_rx_create(st, &ops); - if (!handle) { - ERROR("Failed to initialize MTL RX ST30 session\n"); - return -1; - } - - /* Start MTL session thread. */ - frame_thread_handle = new std::thread(&RxSt30MtlSession::frame_thread, this); - if (!frame_thread_handle) { - ERROR("Failed to create thread"); - return -1; - } - return 0; -} - -RxSt30MtlSession::~RxSt30MtlSession() -{ - INFO("%s, fb_recv %d\n", __func__, fb_recv); - stop = true; - if (frame_thread_handle) { - frame_thread_handle->join(); - delete frame_thread_handle; - frame_thread_handle = 0; - } - - if (handle) { - st30p_rx_free(handle); - handle = 0; - } -} - -void RxSt30MtlSession::consume_frame(struct st30_frame *frame) -{ - int err = 0; - uint16_t qid = 0; - memif_buffer_t rx_buf; - uint16_t rx_buf_num = 0, rx = 0; - - if (!atomic_load_explicit(&shm_ready, std::memory_order_relaxed)) { - INFO("%s memif not ready\n", __func__); - return; - } - - err = memif_buffer_alloc_timeout(memif_conn, qid, &rx_buf, 1, &rx_buf_num, ops.framebuff_size, - 10); - if (err != MEMIF_ERR_SUCCESS) { - INFO("rx_st30p consume_frame: Failed to alloc memif buffer: %s", memif_strerror(err)); - return; - } - - mtl_memcpy(rx_buf.data, frame->addr, ops.framebuff_size); - - /* Send to microservice application. */ - err = memif_tx_burst(memif_conn, qid, &rx_buf, rx_buf_num, &rx); - if (err != MEMIF_ERR_SUCCESS) { - INFO("rx_st20p consume_frame memif_tx_burst: %s", memif_strerror(err)); - } - - fb_recv++; -} diff --git a/media-proxy/src/session-mtl-st2110-30-tx.cc b/media-proxy/src/session-mtl-st2110-30-tx.cc deleted file mode 100644 index 3912949a9..000000000 --- a/media-proxy/src/session-mtl-st2110-30-tx.cc +++ /dev/null @@ -1,117 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2024 Intel Corporation - * - * SPDX-License-Identifier: BSD-3-Clause - */ - -#include "session-mtl.h" - -void TxSt30MtlSession::copy_connection_params(const mcm_conn_param &request, std::string &dev_port) -{ - char session_name[NAME_MAX] = ""; - - snprintf(session_name, NAME_MAX, "mcm_tx_st30_%d", get_id()); - - inet_pton(AF_INET, request.remote_addr.ip, ops.port.dip_addr[MTL_PORT_P]); - ops.port.udp_port[MTL_PORT_P] = atoi(request.remote_addr.port); - strlcpy(ops.port.port[MTL_PORT_P], dev_port.c_str(), MTL_PORT_MAX_LEN); - ops.port.num_port = 1; - ops.port.payload_type = ST_APP_PAYLOAD_TYPE_ST30; - ops.name = strdup(session_name); - ops.framebuff_cnt = 4; - - ops.fmt = (st30_fmt)request.payload_args.audio_args.format; - ops.channel = request.payload_args.audio_args.channel; - ops.sampling = (st30_sampling)request.payload_args.audio_args.sampling; - ops.ptime = (st30_ptime)request.payload_args.audio_args.ptime; - - INFO("ProxyContext: %s...", __func__); - INFO("port : %s", ops.port.port[MTL_PORT_P]); - printf("dip_addr :"); - for (int i = 0; i < MTL_IP_ADDR_LEN; ++i) { - printf(" %u", ops.port.dip_addr[MTL_PORT_P][i]); - } - printf("\n"); - INFO("num_port : %d", ops.port.num_port); - INFO("udp_port : %d", ops.port.udp_port[MTL_PORT_P]); - INFO("payload_type : %d", ops.port.payload_type); - INFO("name : %s", ops.name); - INFO("framebuff_cnt : %d", ops.framebuff_cnt); -} - -TxSt30MtlSession::TxSt30MtlSession(mtl_handle dev_handle, const mcm_conn_param &request, - std::string dev_port, memif_ops_t &memif_ops) - : MtlSession(memif_ops, request.payload_type, TX, dev_handle), handle(0), fb_send(0), ops{0} -{ - copy_connection_params(request, dev_port); - - ops.priv = this; // app handle register to lib - ops.notify_frame_available = frame_available_callback_wrapper; - ops.framebuff_size = st30_get_packet_size(ops.fmt, ops.ptime, ops.sampling, ops.channel); -} - -int TxSt30MtlSession::init() -{ - handle = st30p_tx_create(st, &ops); - if (!handle) { - ERROR("Failed to create MTL TX ST30 session."); - return -1; - } - - int ret = shm_init(ops.framebuff_size, 4); - if (ret < 0) { - ERROR("Failed to initialize shared memory"); - return -1; - } - return 0; -} - -TxSt30MtlSession::~TxSt30MtlSession() -{ - INFO("%s, fb_send %d\n", __func__, fb_send); - stop = true; - if (handle) { - st30p_tx_free(handle); - handle = 0; - } -} - -int TxSt30MtlSession::on_receive_cb(memif_conn_handle_t conn, uint16_t qid) -{ - memif_buffer_t shm_bufs = {0}; - uint16_t buf_num = 0; - - if (stop) { - INFO("TX session already stopped."); - return -1; - } - - /* receive packets from the shared memory */ - int err = memif_rx_burst(conn, qid, &shm_bufs, 1, &buf_num); - if (err != MEMIF_ERR_SUCCESS && err != MEMIF_ERR_NOBUF) { - INFO("memif_rx_burst: %s", memif_strerror(err)); - return err; - } - - struct st30_frame *frame = NULL; - do { - frame = st30p_tx_get_frame(handle); - if (!frame) { - pthread_mutex_lock(&wake_mutex); - if (!stop) - pthread_cond_wait(&wake_cond, &wake_mutex); - pthread_mutex_unlock(&wake_mutex); - } - } while (!frame); - - /* fill frame data. */ - mtl_memcpy(frame->addr, shm_bufs.data, shm_bufs.len); - /* Send out frame. */ - st30p_tx_put_frame(handle, frame); - - err = memif_refill_queue(conn, qid, buf_num, 0); - if (err != MEMIF_ERR_SUCCESS) - INFO("memif_refill_queue: %s", memif_strerror(err)); - - return 0; -} diff --git a/media-proxy/src/session-mtl.cc b/media-proxy/src/session-mtl.cc deleted file mode 100644 index f9f3180e8..000000000 --- a/media-proxy/src/session-mtl.cc +++ /dev/null @@ -1,97 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2024 Intel Corporation - * - * SPDX-License-Identifier: BSD-3-Clause - */ - -#include "session-mtl.h" - -int frame_available_callback_wrapper(void *priv) -{ - if (!priv) { - return -1; - } - MtlSession *connection = (MtlSession *)priv; - return connection->frame_available_cb(); -} - -int MtlSession::frame_available_cb() -{ - pthread_mutex_lock(&wake_mutex); - pthread_cond_signal(&wake_cond); - pthread_mutex_unlock(&wake_mutex); - return 0; -} - -MtlSession::MtlSession(memif_ops_t &memif_ops, mcm_payload_type payload, direction dir_type, - mtl_handle st) - : Session(memif_ops, payload, dir_type), st(st), stop(false) -{ - pthread_mutex_init(&wake_mutex, NULL); - pthread_cond_init(&wake_cond, NULL); -} - -MtlSession::~MtlSession() -{ - stop = true; - - pthread_mutex_lock(&wake_mutex); - pthread_cond_signal(&wake_cond); - pthread_mutex_unlock(&wake_mutex); - - pthread_mutex_destroy(&wake_mutex); - pthread_cond_destroy(&wake_cond); -} - -/* Initiliaze MTL library */ -mtl_handle inst_init(struct mtl_init_params *st_param) -{ - if (!st_param) { - return NULL; - } - - st_param->flags |= MTL_FLAG_RX_UDP_PORT_ONLY; - - // create device - mtl_handle dev_handle = mtl_init(st_param); - if (!dev_handle) { - ERROR("%s, st_init fail\n", __func__); - return NULL; - } - - // start MTL device - if (mtl_start(dev_handle) != 0) { - INFO("%s, Fail to start MTL device.", __func__); - return NULL; - } - - return dev_handle; -} - -/* Deinitialize MTL */ -void mtl_deinit(mtl_handle dev_handle) -{ - if (dev_handle) { - // stop tx - mtl_stop(dev_handle); - - mtl_uninit(dev_handle); - } -} - -st_frame_fmt get_st_frame_fmt(video_pixel_format mcm_frame_fmt) -{ - st_frame_fmt mtl_frame_fmt; - switch (mcm_frame_fmt) { - case PIX_FMT_V210: - mtl_frame_fmt = ST_FRAME_FMT_V210; - break; - case PIX_FMT_YUV422RFC4175BE10: - mtl_frame_fmt = ST_FRAME_FMT_YUV422RFC4175PG2BE10; - break; - case PIX_FMT_YUV422PLANAR10LE: - default: - mtl_frame_fmt = ST_FRAME_FMT_YUV422PLANAR10LE; - } - return mtl_frame_fmt; -} diff --git a/media-proxy/src/session-rdma-rx.cc b/media-proxy/src/session-rdma-rx.cc deleted file mode 100644 index 4b96a34fb..000000000 --- a/media-proxy/src/session-rdma-rx.cc +++ /dev/null @@ -1,174 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2024 Intel Corporation - * - * SPDX-License-Identifier: BSD-3-Clause - */ - -#include -#include -#include - -#include "libfabric_dev.h" -#include "session-rdma.h" - -shm_buf_info_t *RxRdmaSession::get_free_shm_buf() -{ - for (uint32_t i = 0; i < shm_buf_num; i++) { - if (!shm_bufs[i].used) { - return &shm_bufs[i]; - } - } - return NULL; -} - -int RxRdmaSession::pass_empty_buf_to_libfabric() -{ - int err; - shm_buf_info_t *buf_info = NULL; - uint16_t rx_buf_num = 0; - - buf_info = get_free_shm_buf(); - if (!buf_info) - return -ENOMEM; - - err = memif_buffer_alloc(memif_conn, 0, &buf_info->shm_buf, 1, &rx_buf_num, transfer_size); - if (err != MEMIF_ERR_SUCCESS) - return -ENOMEM; - - buf_info->used = true; - - err = libfabric_ep_ops.ep_recv_buf(ep_ctx, buf_info->shm_buf.data, transfer_size, buf_info); - if (err) { - ERROR("%s ep_recv_buf failed with errno: %s", __func__, fi_strerror(-err)); - return err; - } - return 0; -} - -void RxRdmaSession::handle_received_buffers() -{ - shm_buf_info_t *buf_info; - int err; - uint16_t bursted_buf_num; - - err = libfabric_ep_ops.ep_cq_read(ep_ctx, (void **)&buf_info, 1); - if (err) { - if (err != -EAGAIN) - INFO("%s ep_rxcq_read: %s", __func__, strerror(-err)); - return; - } - fb_recv++; - - err = memif_tx_burst(memif_conn, 0, &buf_info->shm_buf, 1, &bursted_buf_num); - if (err != MEMIF_ERR_SUCCESS && bursted_buf_num != 1) { - INFO("%s memif_tx_burst: %s", __func__, memif_strerror(err)); - return; - } - buf_info->used = false; -} - -void RxRdmaSession::frame_thread() -{ - while (!atomic_load_explicit(&shm_ready, std::memory_order_acquire) && !stop) - usleep(1000); - - INFO("%s, RX RDMA thread started\n", __func__); - while (!stop) { - if (!atomic_load_explicit(&shm_ready, std::memory_order_acquire)) - continue; - while (!pass_empty_buf_to_libfabric()) { - } - handle_received_buffers(); - } -} - -RxRdmaSession::RxRdmaSession(libfabric_ctx *dev_handle, const mcm_conn_param &request, - memif_ops_t &memif_ops) - : Session(memif_ops, request.payload_type, RX), ep_cfg{0}, ep_ctx(0), stop(false), - frame_thread_handle(0), fb_recv(0), shm_bufs(0), shm_buf_num(0) -{ - transfer_size = request.payload_args.rdma_args.transfer_size; - - ep_cfg.rdma_ctx = dev_handle; - memcpy(&ep_cfg.remote_addr, &request.remote_addr, sizeof(request.remote_addr)); - memcpy(&ep_cfg.local_addr, &request.local_addr, sizeof(request.local_addr)); - ep_cfg.dir = direction::RX; -} - -int RxRdmaSession::init() -{ - int err = libfabric_ep_ops.ep_init(&ep_ctx, &ep_cfg); - if (err) { - ERROR("Failed to initialize libfabric's end point"); - return -1; - } - - shm_buf_num = 1 << 4; - shm_bufs = (shm_buf_info_t *)calloc(shm_buf_num, sizeof(shm_buf_info_t)); - if (!shm_bufs) { - ERROR("Failed to allocate memory"); - return -1; - } - - err = shm_init(transfer_size, 4); - if (err < 0) { - ERROR("Failed to initialize shared memory"); - return -1; - } - - frame_thread_handle = new std::thread(&RxRdmaSession::frame_thread, this); - if (!frame_thread_handle) { - ERROR("Failed to create thread"); - return -1; - } - return 0; -} - -RxRdmaSession::~RxRdmaSession() -{ - INFO("%s, fb_recv %d\n", __func__, fb_recv); - stop = true; - if (ep_ctx) { - if (libfabric_ep_ops.ep_destroy(&ep_ctx)) { - ERROR("Failed to destroy RDMA context"); - } - ep_ctx = 0; - } - - if (shm_bufs) { - free(shm_bufs); - shm_bufs = 0; - } - - if (frame_thread_handle) { - frame_thread_handle->join(); - delete frame_thread_handle; - frame_thread_handle = 0; - } -} - -int RxRdmaSession::on_connect_cb(memif_conn_handle_t conn) -{ - memif_region_details_t region; - - int err = memif_get_buffs_region(conn, ®ion); - if (err) { - ERROR("%s, Getting memory buffers from memif failed. \n", __func__); - return err; - } - - err = libfabric_ep_ops.ep_reg_mr(ep_ctx, region.addr, region.size); - if (err) { - ERROR("%s, ep_reg_mr failed: %s\n", __func__, fi_strerror(-err)); - return err; - } - - return Session::on_connect_cb(conn); -} - -int RxRdmaSession::on_disconnect_cb(memif_conn_handle_t conn) -{ - /* TODO: unregister in libfabric memory regions allocated by memif */ - - return Session::on_disconnect_cb(conn); -} diff --git a/media-proxy/src/session-rdma-tx.cc b/media-proxy/src/session-rdma-tx.cc deleted file mode 100644 index 7c2d7df49..000000000 --- a/media-proxy/src/session-rdma-tx.cc +++ /dev/null @@ -1,146 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2024 Intel Corporation - * - * SPDX-License-Identifier: BSD-3-Clause - */ - -#include -#include -#include - -#include "libfabric_dev.h" -#include "session-rdma.h" - -void TxRdmaSession::handle_sent_buffers() -{ - int err = libfabric_ep_ops.ep_cq_read(ep_ctx, NULL, 1); - if (err) { - if (err != -EAGAIN) - INFO("%s ep_txcq_read: %s", __func__, strerror(-err)); - return; - } - fb_send++; - - err = memif_refill_queue(memif_conn, 0, 1, 0); - if (err != MEMIF_ERR_SUCCESS) - INFO("memif_refill_queue: %s", memif_strerror(err)); -} - -void TxRdmaSession::frame_thread() -{ - while (!atomic_load_explicit(&shm_ready, std::memory_order_acquire) && !stop) - usleep(1000); - - INFO("%s, TX RDMA thread started\n", __func__); - while (!stop) { - if (!atomic_load_explicit(&shm_ready, std::memory_order_acquire)) - continue; - handle_sent_buffers(); - } -} - -TxRdmaSession::TxRdmaSession(libfabric_ctx *dev_handle, const mcm_conn_param &request, - memif_ops_t &memif_ops) - : Session(memif_ops, request.payload_type, TX), ep_cfg{0}, ep_ctx(0), stop(false), - frame_thread_handle(0), fb_send(0) -{ - transfer_size = request.payload_args.rdma_args.transfer_size; - - ep_cfg.rdma_ctx = dev_handle; - memcpy(&ep_cfg.remote_addr, &request.remote_addr, sizeof(request.remote_addr)); - memcpy(&ep_cfg.local_addr, &request.local_addr, sizeof(request.local_addr)); - ep_cfg.dir = direction::TX; -} - -int TxRdmaSession::init() -{ - int err = libfabric_ep_ops.ep_init(&ep_ctx, &ep_cfg); - if (err) { - ERROR("Failed to initialize libfabric's end point"); - return -1; - } - - err = shm_init(transfer_size, 4); - if (err < 0) { - ERROR("Failed to to initialize shared memory"); - return -1; - } - - frame_thread_handle = new std::thread(&TxRdmaSession::frame_thread, this); - if (!frame_thread_handle) { - ERROR("Failed to create thread"); - return -1; - } - return 0; -} - -TxRdmaSession::~TxRdmaSession() -{ - INFO("%s, fb_send %d\n", __func__, fb_send); - stop = true; - if (ep_ctx) { - if (libfabric_ep_ops.ep_destroy(&ep_ctx)) { - ERROR("Failed to destroy RDMA context"); - } - ep_ctx = 0; - } - - if (frame_thread_handle) { - frame_thread_handle->join(); - delete frame_thread_handle; - frame_thread_handle = 0; - } -} - -int TxRdmaSession::on_receive_cb(memif_conn_handle_t conn, uint16_t qid) -{ - memif_buffer_t shm_bufs = {0}; - uint16_t buf_num = 0; - int err = 0; - - if (stop) { - INFO("TX session already stopped."); - return -EINVAL; - } - - /* receive packets from the shared memory */ - err = memif_rx_burst(conn, qid, &shm_bufs, 1, &buf_num); - if (err != MEMIF_ERR_SUCCESS && err != MEMIF_ERR_NOBUF) { - INFO("memif_rx_burst: %s", memif_strerror(err)); - return err; - } - - err = libfabric_ep_ops.ep_send_buf(ep_ctx, shm_bufs.data, shm_bufs.len); - if (err) { - ERROR("ep_send_buf failed with: %s", fi_strerror(-err)); - return err; - } - - return 0; -} - -int TxRdmaSession::on_connect_cb(memif_conn_handle_t conn) -{ - memif_region_details_t region; - - int err = memif_get_buffs_region(conn, ®ion); - if (err) { - ERROR("%s, Getting memory buffers from memif failed. \n", __func__); - return err; - } - - err = libfabric_ep_ops.ep_reg_mr(ep_ctx, region.addr, region.size); - if (err) { - ERROR("%s, ep_reg_mr failed: %s\n", __func__, fi_strerror(-err)); - return err; - } - - return Session::on_connect_cb(conn); -} - -int TxRdmaSession::on_disconnect_cb(memif_conn_handle_t conn) -{ - /* TODO: unregister in libfabric memory regions allocated by memif */ - - return Session::on_disconnect_cb(conn); -} diff --git a/media-proxy/src/shm_memif_common.c b/media-proxy/src/shm_memif_common.c index 568ae8058..16429140e 100644 --- a/media-proxy/src/shm_memif_common.c +++ b/media-proxy/src/shm_memif_common.c @@ -110,38 +110,3 @@ int memif_buffer_alloc_timeout(memif_conn_handle_t conn, uint16_t qid, } return MEMIF_ERR_NOBUF_RING; } - -int memif_get_buffs_region(memif_conn_handle_t conn, memif_region_details_t *region) -{ - memif_details_t md = { 0 }; - ssize_t buflen = 2000; - char *buf = NULL; - int err = 0; - - if (!region || !conn) - return -EINVAL; - - buf = (char *)calloc(buflen, 1); - if (!buf) { - ERROR("Not Enough Memory."); - return -ENOMEM; - } - - err = memif_get_details(conn, &md, buf, buflen); - if (err != MEMIF_ERR_SUCCESS) { - ERROR("%s", memif_strerror(err)); - free(buf); - return -EINVAL; - } - /* Region number 1 holds data buffers */ - if (md.regions_num < 1) { - ERROR("Data buffers not found in memif regions"); - free(buf); - return -EINVAL; - } - - memcpy(region, &md.regions[1], sizeof(md.regions[1])); - free(buf); - - return 0; -} diff --git a/media-proxy/tests/proxy_context_tests.cc b/media-proxy/tests/proxy_context_tests.cc deleted file mode 100644 index 8e9089a2a..000000000 --- a/media-proxy/tests/proxy_context_tests.cc +++ /dev/null @@ -1,10 +0,0 @@ -#include -#include "proxy_context.h" - -// TODO: Create real tests -TEST(ProxyContextTests, ProxyContextConstructor) { - ProxyContext ctx; - EXPECT_EQ(ctx.getTCPListenPort(), 8002); - EXPECT_EQ(ctx.getRPCListenAddress(), "0.0.0.0:8001"); - EXPECT_EQ(ctx.getTCPListenAddress(), "0.0.0.0:8002"); -} diff --git a/tests/unit/media_proxy-session-tests.cc b/tests/unit/media_proxy-session-tests.cc deleted file mode 100644 index 47dd08d02..000000000 --- a/tests/unit/media_proxy-session-tests.cc +++ /dev/null @@ -1,74 +0,0 @@ -#include -#include - -#include "session-mtl.h" - -#define PCI_ADDR "0000:4b:01.0" -#define NET_ADDR "192.168.96.1" - -// template session *test_null() -// { -// session *s = nullptr; - -// int dummy = 1000; -// DEV_HANDLE *d = (DEV_HANDLE *)&dummy; - -// OPTS o{0}; -// memif_ops_t m{0}; - -// try { -// s = new CLASS_NAME(d, &o, nullptr); -// } catch (const exception &e) { -// } -// EXPECT_TRUE(s == nullptr); - -// try { -// s = new CLASS_NAME(nullptr, &o, &m); -// } catch (const exception &e) { -// } -// EXPECT_TRUE(s == nullptr); - -// try { -// s = new CLASS_NAME(d, nullptr, &m); -// } catch (const exception &e) { -// } -// EXPECT_TRUE(s == nullptr); - -// return s; -// } - -// TEST(media_proxy, session_rx_st20p_constructor_null) -// { -// session *s = test_null(); -// EXPECT_TRUE(s == nullptr); -// } - -// TEST(media_proxy, session_tx_st20p_constructor_null) -// { -// session *s = test_null(); -// EXPECT_TRUE(s == nullptr); -// } - -// TEST(media_proxy, session_rx_st22p_constructor_null) -// { -// session *s = test_null(); -// EXPECT_TRUE(s == nullptr); -// } - -// TEST(media_proxy, session_tx_st22p_constructor_null) -// { -// session *s = test_null(); -// EXPECT_TRUE(s == nullptr); -// } - -// TEST(media_proxy, session_rx_st30_constructor_null) -// { -// session *s = test_null(); -// EXPECT_TRUE(s == nullptr); -// } - -// TEST(media_proxy, session_tx_st30_constructor_null) -// { -// session *s = test_null(); -// EXPECT_TRUE(s == nullptr); -// }