diff --git a/tests/validation/mtl_engine/application_base.py b/tests/validation/mtl_engine/application_base.py new file mode 100644 index 000000000..81722f4ad --- /dev/null +++ b/tests/validation/mtl_engine/application_base.py @@ -0,0 +1,415 @@ +# Base Application Class for Media Transport Library +# Provides common interface for all media application frameworks + +import logging +import re +import time +from abc import ABC, abstractmethod + +from .config.app_mappings import DEFAULT_PAYLOAD_TYPE_CONFIG, DEFAULT_PORT_CONFIG +from .config.universal_params import UNIVERSAL_PARAMS +from .execute import run + +logger = logging.getLogger(__name__) + + +class Application(ABC): + """Abstract base class shared by all framework adapters (RxTxApp / FFmpeg / GStreamer). + + Unified model: + 1. create_command(...) MUST be called first. It populates: + - self.command: full shell command string ready to run + - self.config: optional dict (RxTxApp) written immediately if a config_file_path is supplied + 2. execute_test(...) ONLY executes already prepared command(s); it NEVER builds commands. + - Single-host: call execute_test with host=... + - Dual-host: create TWO application objects (tx_app, rx_app) each with its own create_command(); + then call tx_app.execute_test(tx_host=..., rx_host=..., rx_app=rx_app). + 3. validate_results() now has a uniform no-argument signature and consumes internal state + (self.universal_params, self.config, self.last_output, and any produced files). + """ + + def __init__(self, app_path, config_file_path=None): + self.app_path = app_path + self.config_file_path = config_file_path + self.universal_params = UNIVERSAL_PARAMS.copy() + self._user_provided_params = set() + self.command: str | None = None + self.config: dict | None = None + self.last_output: str | None = None + self.last_return_code: int | None = None + + @abstractmethod + def get_framework_name(self) -> str: + """Return the framework name (e.g., 'RxTxApp', 'FFmpeg', 'GStreamer').""" + pass + + @abstractmethod + def get_executable_name(self) -> str: + """Return the executable name for this framework.""" + pass + + @abstractmethod + def create_command(self, **kwargs): + """Populate self.command (+ self.config for frameworks that need it). + + Implementations MUST: + - call self.set_universal_params(**kwargs) + - set self.command (string) + - optionally set self.config + - write config file immediately if applicable + They MAY return (self.command, self.config) for backward compatibility with existing tests. + """ + raise NotImplementedError + + @abstractmethod + def validate_results(self) -> bool: # type: ignore[override] + """Framework-specific validation implemented by subclasses. + + Subclasses should read: self.universal_params, self.config, self.last_output, etc. + Must return True/False. + """ + raise NotImplementedError + + def set_universal_params(self, **kwargs): + """Set universal parameters and track which were provided by user.""" + self._user_provided_params = set(kwargs.keys()) + + for param, value in kwargs.items(): + if param in self.universal_params: + self.universal_params[param] = value + else: + raise ValueError(f"Unknown universal parameter: {param}") + + def get_executable_path(self) -> str: + """Get the full path to the executable based on framework type.""" + executable_name = self.get_executable_name() + + # For applications with specific paths, combine with directory + if self.app_path and not executable_name.startswith("/"): + if self.app_path.endswith("/"): + return f"{self.app_path}{executable_name}" + else: + return f"{self.app_path}/{executable_name}" + else: + # For system executables or full paths + return executable_name + + def was_user_provided(self, param_name: str) -> bool: + """Check if a parameter was explicitly provided by the user.""" + return param_name in self._user_provided_params + + def get_session_default_port(self, session_type: str) -> int: + """Get default port for a specific session type.""" + port_map = { + "st20p": DEFAULT_PORT_CONFIG["st20p_port"], + "st22p": DEFAULT_PORT_CONFIG["st22p_port"], + "st30p": DEFAULT_PORT_CONFIG["st30p_port"], + "video": DEFAULT_PORT_CONFIG["video_port"], + "audio": DEFAULT_PORT_CONFIG["audio_port"], + "ancillary": DEFAULT_PORT_CONFIG["ancillary_port"], + "fastmetadata": DEFAULT_PORT_CONFIG["fastmetadata_port"], + } + return port_map.get(session_type, DEFAULT_PORT_CONFIG["st20p_port"]) + + def get_session_default_payload_type(self, session_type: str) -> int: + """Get default payload type for a specific session type.""" + payload_map = { + "st20p": DEFAULT_PAYLOAD_TYPE_CONFIG["st20p_payload_type"], + "st22p": DEFAULT_PAYLOAD_TYPE_CONFIG["st22p_payload_type"], + "st30p": DEFAULT_PAYLOAD_TYPE_CONFIG["st30p_payload_type"], + "video": DEFAULT_PAYLOAD_TYPE_CONFIG["video_payload_type"], + "audio": DEFAULT_PAYLOAD_TYPE_CONFIG["audio_payload_type"], + "ancillary": DEFAULT_PAYLOAD_TYPE_CONFIG["ancillary_payload_type"], + "fastmetadata": DEFAULT_PAYLOAD_TYPE_CONFIG["fastmetadata_payload_type"], + } + return payload_map.get( + session_type, DEFAULT_PAYLOAD_TYPE_CONFIG["st20p_payload_type"] + ) + + def get_common_session_params(self, session_type: str) -> dict: + """Get common session parameters used across all session types.""" + default_port = self.get_session_default_port(session_type) + default_payload = self.get_session_default_payload_type(session_type) + + return { + "replicas": self.universal_params.get( + "replicas", UNIVERSAL_PARAMS["replicas"] + ), + "start_port": int( + self.universal_params.get("port") + if self.was_user_provided("port") + else default_port + ), + "payload_type": ( + self.universal_params.get("payload_type") + if self.was_user_provided("payload_type") + else default_payload + ), + } + + def get_common_video_params(self) -> dict: + """Get common video parameters used across video session types.""" + return { + "width": int(self.universal_params.get("width", UNIVERSAL_PARAMS["width"])), + "height": int( + self.universal_params.get("height", UNIVERSAL_PARAMS["height"]) + ), + "interlaced": self.universal_params.get( + "interlaced", UNIVERSAL_PARAMS["interlaced"] + ), + "device": self.universal_params.get("device", UNIVERSAL_PARAMS["device"]), + "enable_rtcp": self.universal_params.get( + "enable_rtcp", UNIVERSAL_PARAMS["enable_rtcp"] + ), + } + + def execute_test( + self, + build: str, + test_time: int = 30, + host=None, + tx_host=None, + rx_host=None, + rx_app=None, + sleep_interval: int = 4, + tx_first: bool = True, + capture_cfg=None, + ) -> bool: + """Execute a prepared command (or two for dual host). + + Usage patterns: + # Single host + app.create_command(...) + app.execute_test(build=..., host=my_host, test_time=10) + + # Dual host + tx_app.create_command(direction='tx', ...) + rx_app.create_command(direction='rx', ...) + tx_app.execute_test(build=..., tx_host=hostA, rx_host=hostB, rx_app=rx_app) + """ + is_dual = tx_host is not None and rx_host is not None + if is_dual and not rx_app: + raise ValueError("rx_app instance required for dual-host execution") + if not is_dual and not host: + raise ValueError("host required for single-host execution") + + if not self.command: + raise RuntimeError("create_command() must be called before execute_test()") + framework_name = self.get_framework_name() + + # Single-host execution + if not is_dual: + cmd = self.add_timeout(self.command, test_time) + logger.info(f"[single] Running {framework_name} command: {cmd}") + # Optional tcpdump capture hook retained for RxTxApp compatibility + if ( + capture_cfg + and capture_cfg.get("enable") + and "prepare_tcpdump" in globals() + ): + try: + # prepare_tcpdump not yet implemented; left to change in the future + # prepare_tcpdump(capture_cfg, host) + pass + except Exception as e: + logger.warning(f"capture setup failed: {e}") + proc = self.start_process(cmd, build, test_time, host) + try: + proc.wait( + timeout=(test_time or 0) + + self.universal_params.get("process_timeout_buffer", 90) + ) + except Exception: + logger.warning( + f"{framework_name} process wait timed out (continuing to capture output)" + ) + self.last_output = self.capture_stdout(proc, framework_name) + self.last_return_code = getattr(proc, "returncode", None) + return self.validate_results() + + # Dual-host execution (tx self, rx rx_app) + assert rx_app is not None + if not rx_app.command: + raise RuntimeError( + "rx_app has no prepared command (call create_command first)" + ) + tx_cmd = self.add_timeout(self.command, test_time) + rx_cmd = rx_app.add_timeout(rx_app.command, test_time) + primary_first = tx_first + first_cmd, first_host, first_label = ( + (tx_cmd, tx_host, f"{framework_name}-TX") + if primary_first + else (rx_cmd, rx_host, f"{rx_app.get_framework_name()}-RX") + ) + second_cmd, second_host, second_label = ( + (rx_cmd, rx_host, f"{rx_app.get_framework_name()}-RX") + if primary_first + else (tx_cmd, tx_host, f"{framework_name}-TX") + ) + logger.info(f"[dual] Starting first: {first_label} -> {first_cmd}") + first_proc = self.start_process(first_cmd, build, test_time, first_host) + time.sleep(sleep_interval) + logger.info(f"[dual] Starting second: {second_label} -> {second_cmd}") + second_proc = self.start_process(second_cmd, build, test_time, second_host) + # Wait processes + total_timeout = (test_time or 0) + self.universal_params.get( + "process_timeout_buffer", 90 + ) + for p, label in [(first_proc, first_label), (second_proc, second_label)]: + try: + p.wait(timeout=total_timeout) + except Exception: + logger.warning( + f"Process {label} wait timeout; capturing partial output" + ) + # Capture outputs + if primary_first: + self.last_output = self.capture_stdout(first_proc, first_label) + rx_app.last_output = rx_app.capture_stdout(second_proc, second_label) + else: + rx_app.last_output = rx_app.capture_stdout(first_proc, first_label) + self.last_output = self.capture_stdout(second_proc, second_label) + self.last_return_code = getattr(first_proc, "returncode", None) + rx_app.last_return_code = getattr(second_proc, "returncode", None) + tx_ok = self.validate_results() + rx_ok = rx_app.validate_results() + return tx_ok and rx_ok + + # ------------------------- + # Common helper utilities + # ------------------------- + def add_timeout(self, command: str, test_time: int, grace: int = None) -> str: + """Wrap command with timeout if test_time provided (adds a grace period).""" + if grace is None: + grace = self.universal_params.get("timeout_grace", 10) + # If the command already has an internal --test_time X argument, ensure the wrapper + # timeout is >= that internal value + grace to avoid premature SIGTERM (RC 124). + internal_test_time = None + m = re.search(r"--test_time\s+(\d+)", command) + if m: + try: + internal_test_time = int(m.group(1)) + except ValueError: + internal_test_time = None + effective_test_time = test_time or internal_test_time + if internal_test_time and test_time and internal_test_time != test_time: + logger.debug( + f"Mismatch between execute_test test_time={test_time} and command --test_time {internal_test_time}; " + f"using max" + ) + effective_test_time = max(internal_test_time, test_time) + elif internal_test_time and not test_time: + effective_test_time = internal_test_time + if effective_test_time and not command.strip().startswith("timeout "): + return f"timeout {effective_test_time + grace} {command}" + return command + + def start_and_capture( + self, command: str, build: str, test_time: int, host, process_name: str + ): + """Start a single process and capture its stdout safely.""" + process = self.start_process(command, build, test_time, host) + output = self.capture_stdout(process, process_name) + return process, output + + def start_dual_with_delay( + self, + tx_command: str, + rx_command: str, + build: str, + test_time: int, + tx_host, + rx_host, + tx_first: bool, + sleep_interval: int, + tx_name: str, + rx_name: str, + ): + """Start two processes with an optional delay ordering TX/RX based on tx_first flag.""" + if tx_first: + tx_process = self.start_process(tx_command, build, test_time, tx_host) + time.sleep(sleep_interval) + rx_process = self.start_process(rx_command, build, test_time, rx_host) + else: + rx_process = self.start_process(rx_command, build, test_time, rx_host) + time.sleep(sleep_interval) + tx_process = self.start_process(tx_command, build, test_time, tx_host) + tx_output = self.capture_stdout(tx_process, tx_name) + rx_output = self.capture_stdout(rx_process, rx_name) + return (tx_process, rx_process, tx_output, rx_output) + + def extract_framerate(self, framerate_str, default: int = None) -> int: + """Extract numeric framerate from various string or numeric forms (e.g. 'p25', '60').""" + if default is None: + default = self.universal_params.get("default_framerate_numeric", 60) + if isinstance(framerate_str, (int, float)): + try: + return int(framerate_str) + except Exception: + return default + if not isinstance(framerate_str, str) or not framerate_str: + return default + if framerate_str.startswith("p") and len(framerate_str) > 1: + num = framerate_str[1:] + else: + num = framerate_str + try: + return int(float(num)) + except ValueError: + logger.warning( + f"Could not parse framerate '{framerate_str}', defaulting to {default}" + ) + return default + + # Legacy execute_* abstract methods removed; unified execute_test used instead. + + def start_process(self, command: str, build: str, test_time: int, host): + """Start a process on the specified host.""" + logger.info(f"Starting {self.get_framework_name()} process...") + buffer_val = self.universal_params.get("process_timeout_buffer", 90) + timeout = (test_time or 0) + buffer_val + return run(command, host=host, cwd=build, timeout=timeout) + + def capture_stdout(self, process, process_name: str) -> str: + """Capture stdout from a process.""" + try: + # Remote process objects (from mfd_connect) expose stdout via 'stdout_text' + if hasattr(process, "stdout_text") and process.stdout_text: + output = process.stdout_text + logger.debug( + f"{process_name} output (captured stdout_text): {output[:200]}..." + ) + return output + # Local fallback (subprocess) may expose .stdout already consumed elsewhere + if hasattr(process, "stdout") and process.stdout: + try: + # Attempt to read if it's a file-like object + if hasattr(process.stdout, "read"): + output = process.stdout.read() + else: + output = str(process.stdout) + logger.debug( + f"{process_name} output (captured stdout): {output[:200]}..." + ) + return output + except Exception: + pass + logger.warning(f"No stdout available for {process_name}") + return "" + except Exception as e: + logger.error(f"Error capturing {process_name} output: {e}") + return "" + + def get_case_id(self) -> str: + """Generate a case ID for logging/debugging purposes.""" + try: + import inspect + + frame = inspect.currentframe() + while frame: + if "test_" in frame.f_code.co_name: + return frame.f_code.co_name + frame = frame.f_back + return "unknown_test" + except Exception: + return "unknown_test" diff --git a/tests/validation/mtl_engine/config/app_mappings.py b/tests/validation/mtl_engine/config/app_mappings.py new file mode 100644 index 000000000..25239962b --- /dev/null +++ b/tests/validation/mtl_engine/config/app_mappings.py @@ -0,0 +1,101 @@ +# Application name mappings and format conversion utilities + +# Map framework names to executable names +APP_NAME_MAP = {"rxtxapp": "RxTxApp", "ffmpeg": "ffmpeg", "gstreamer": "gst-launch-1.0"} + +# Format conversion mappings +FFMPEG_FORMAT_MAP = { + "YUV422PLANAR10LE": "yuv422p10le", + "YUV422PLANAR8": "yuv422p", + "YUV420PLANAR8": "yuv420p", + "YUV420PLANAR10LE": "yuv420p10le", + "RGB24": "rgb24", + "RGBA": "rgba", + "YUV422RFC4175PG2BE10": "yuv422p10le", # RFC4175 to planar 10-bit LE +} + +SESSION_TYPE_MAP = { + "ffmpeg": { + "st20p": "mtl_st20p", + "st22p": "mtl_st22p", + "st30p": "mtl_st30p", + "video": "rawvideo", + "audio": "pcm_s24le", + }, + "gstreamer": { + "st20p": "mtl_st20p", + "st22p": "mtl_st22p", + "st30p": "mtl_st30p", + "video": "mtl_video", + "audio": "mtl_audio", + }, +} + +FRAMERATE_TO_VIDEO_FORMAT_MAP = { + "p60": "i1080p60", + "p59": "i1080p59", + "p50": "i1080p50", + "p30": "i1080p30", + "p29": "i1080p29", + "p25": "i1080p25", + "p24": "i1080p24", + "p23": "i1080p23", +} + +# Default network configuration values +DEFAULT_NETWORK_CONFIG = { + "nic_port": "0000:31:01.0", + "unicast_tx_ip": "192.168.17.101", + "unicast_rx_ip": "192.168.17.102", + "multicast_tx_ip": "192.168.17.101", + "multicast_rx_ip": "192.168.17.102", + "multicast_destination_ip": "239.168.48.9", + "default_config_file": "config.json", +} + +# Default port configuration by session type +DEFAULT_PORT_CONFIG = { + "st20p_port": 20000, + "st22p_port": 20000, + "st30p_port": 30000, + "video_port": 20000, + "audio_port": 30000, + "ancillary_port": 40000, + "fastmetadata_port": 40000, +} + +# Default payload type configuration by session type +DEFAULT_PAYLOAD_TYPE_CONFIG = { + "st20p_payload_type": 112, + "st22p_payload_type": 114, + "st30p_payload_type": 111, + "video_payload_type": 112, + "audio_payload_type": 111, + "ancillary_payload_type": 113, + "fastmetadata_payload_type": 115, +} + +# Default ST22p-specific configuration +DEFAULT_ST22P_CONFIG = { + "framerate": "p25", + "pack_type": "codestream", + "codec": "JPEG-XS", + "quality": "speed", + "codec_threads": 2, +} + +# Default FFmpeg configuration +DEFAULT_FFMPEG_CONFIG = { + "default_pixel_format": "yuv422p10le", + "default_session_type": "mtl_st20p", +} + +# Default GStreamer configuration +DEFAULT_GSTREAMER_CONFIG = {"default_session_type": "mtl_st20p"} + +# DSA (Data Streaming Accelerator) device configuration +# Maps NUMA node to DSA device PCI addresses +DSA_DEVICES = { + "numa0": "0000:6a:01.0", + "numa1": "0000:e7:01.0", +} diff --git a/tests/validation/mtl_engine/config/param_mappings.py b/tests/validation/mtl_engine/config/param_mappings.py new file mode 100644 index 000000000..e730f2eb3 --- /dev/null +++ b/tests/validation/mtl_engine/config/param_mappings.py @@ -0,0 +1,143 @@ +# Parameter mappings for different applications +# Maps universal parameter names to application-specific names + +# RxTxApp parameter mapping +RXTXAPP_PARAM_MAP = { + # Network parameters + "source_ip": "ip", + "destination_ip": "dip", + "multicast_ip": "ip", + "port": "start_port", + "nic_port": "name", + # Video parameters + "width": "width", + "height": "height", + "framerate": "fps", + "interlaced": "interlaced", + "transport_format": "transport_format", + # Audio parameters + "audio_format": "audio_format", + "audio_channels": "audio_channel", + "audio_sampling": "audio_sampling", + "audio_ptime": "audio_ptime", + # Streaming parameters + "payload_type": "payload_type", + "replicas": "replicas", + "pacing": "pacing", + "packing": "packing", + "device": "device", + "codec": "codec", + "quality": "quality", + "codec_threads": "codec_thread_count", + # File I/O + "input_file": "st20p_url", # for input files + "output_file": "st20p_url", # for output files (RX) + "url": "video_url", # for video files + # Flags + "enable_rtcp": "enable_rtcp", + "measure_latency": "measure_latency", + "display": "display", + # RxTxApp specific command-line parameters + "config_file": "--config_file", + "enable_ptp": "--ptp", + "lcores": "--lcores", + "test_time": "--test_time", + "dma_dev": "--dma_dev", + "log_level": "--log_level", + "log_file": "--log_file", + "arp_timeout_s": "--arp_timeout_s", + "allow_across_numa_core": "--allow_across_numa_core", + "no_multicast": "--no_multicast", + "rx_separate_lcore": "--rx_separate_lcore", + "rx_mix_lcore": "--rx_mix_lcore", + "runtime_session": "--runtime_session", + "rx_timing_parser": "--rx_timing_parser", + "pcapng_dump": "--pcapng_dump", + "rx_video_file_frames": "--rx_video_file_frames", + "framebuffer_count": "--rx_video_fb_cnt", + "promiscuous": "--promiscuous", + "cni_thread": "--cni_thread", + "sch_session_quota": "--sch_session_quota", + "p_tx_dst_mac": "--p_tx_dst_mac", + "r_tx_dst_mac": "--r_tx_dst_mac", + "nb_tx_desc": "--nb_tx_desc", + "nb_rx_desc": "--nb_rx_desc", + "tasklet_time": "--tasklet_time", + "tsc": "--tsc", + "pacing_way": "--pacing_way", + "shaping": "--shaping", + "vrx": "--vrx", + "ts_first_pkt": "--ts_first_pkt", + "ts_delta_us": "--ts_delta_us", + "mono_pool": "--mono_pool", + "tasklet_thread": "--tasklet_thread", + "tasklet_sleep": "--tasklet_sleep", + "tasklet_sleep_us": "--tasklet_sleep_us", + "app_bind_lcore": "--app_bind_lcore", + "rxtx_simd_512": "--rxtx_simd_512", + "rss_mode": "--rss_mode", + "tx_no_chain": "--tx_no_chain", + "multi_src_port": "--multi_src_port", + "audio_fifo_size": "--audio_fifo_size", + "dhcp": "--dhcp", + "virtio_user": "--virtio_user", + "phc2sys": "--phc2sys", + "ptp_sync_sys": "--ptp_sync_sys", + "rss_sch_nb": "--rss_sch_nb", + "log_time_ms": "--log_time_ms", + "rx_audio_dump_time_s": "--rx_audio_dump_time_s", + "dedicated_sys_lcore": "--dedicated_sys_lcore", + "bind_numa": "--bind_numa", + "force_numa": "--force_numa", +} + +# FFmpeg parameter mapping +# Maps universal params to FFmpeg MTL plugin flags. +# Width & height both map to -video_size; command builders coalesce them into WxH format. +# Framerate maps to -fps (distinct from input rawvideo's -framerate). +FFMPEG_PARAM_MAP = { + # Network parameters + "source_ip": "-p_sip", + "destination_ip": "-p_tx_ip", # TX unicast destination + "multicast_ip": "-p_rx_ip", # RX multicast group + "port": "-udp_port", + "nic_port": "-p_port", + # Video parameters (width/height combined externally) + "width": "-video_size", + "height": "-video_size", + "framerate": "-fps", + "pixel_format": "-pix_fmt", + # Streaming parameters + "payload_type": "-payload_type", + "session_type": "-f", # Converted via SESSION_TYPE_MAP + # File I/O + "input_file": "-i", + "output_file": "", # Output appears last (no explicit flag) +} + +# GStreamer parameter mapping +# Maps universal params to MTL GStreamer element properties. +# Set as name=value pairs in the pipeline. +GSTREAMER_PARAM_MAP = { + # Network parameters + "source_ip": "dev-ip", # Interface IP + "destination_ip": "ip", # Destination (unicast) IP + "port": "udp-port", # UDP port + "nic_port": "dev-port", # NIC device/PCI identifier + # Video parameters / caps + "width": "width", + "height": "height", + "framerate": "framerate", + "pixel_format": "format", + # Audio parameters + "audio_format": "audio-format", + "audio_channels": "channel", + "audio_sampling": "sampling", + # Streaming parameters + "payload_type": "payload-type", + "queues": "queues", # Currently legacy / advanced usage + "framebuffer_count": "framebuff-cnt", + # File I/O (filesrc/filesink) + "input_file": "location", + "output_file": "location", +} diff --git a/tests/validation/mtl_engine/config/universal_params.py b/tests/validation/mtl_engine/config/universal_params.py new file mode 100644 index 000000000..d5a1ff46c --- /dev/null +++ b/tests/validation/mtl_engine/config/universal_params.py @@ -0,0 +1,110 @@ +# Universal parameter definitions for all media applications +# This serves as the common interface for RxTxApp, FFmpeg, and GStreamer + +UNIVERSAL_PARAMS = { + # Network parameters + "source_ip": None, # Source IP address (interface IP) + "destination_ip": None, # Destination IP address (session IP) + "multicast_ip": None, # Multicast group IP + "port": 20000, # UDP port number + "nic_port": None, # Network interface/port name + "nic_port_list": None, # List of network interfaces/ports + "tx_nic_port": None, # Override NIC port for TX direction + "rx_nic_port": None, # Override NIC port for RX direction + # Video parameters + "width": 1920, # Video width in pixels + "height": 1080, # Video height in pixels + "framerate": "p60", # Frame rate (p25, p30, p50, p60, etc.) + "interlaced": False, # Progressive (False) or Interlaced (True) + "pixel_format": "YUV422PLANAR10LE", # Pixel format for TX input and RX output + "transport_format": "YUV_422_10bit", # Transport format for streaming + # Audio parameters + "audio_format": "PCM24", # Audio format + "audio_channels": ["U02"], # Audio channel configuration + "audio_sampling": "96kHz", # Audio sampling rate + "audio_ptime": "1", # Audio packet time + # ST41 (Fast Metadata) parameters + "fastmetadata_data_item_type": 1234567, # Data Item Type for ST41 + "fastmetadata_k_bit": 0, # K-bit value for ST41 + "fastmetadata_fps": "p59", # Frame rate for ST41 + "type_mode": "frame", # Type mode for ST41: "rtp" or "frame" + # Streaming parameters + "payload_type": 112, # RTP payload type + "session_type": "st20p", # Session type (st20p, st22p, st30p, video, audio, etc.) + "direction": None, # Direction: tx, rx, or None (both for RxTxApp) + "replicas": 1, # Number of session replicas + "framebuffer_count": None, # Frame buffer count (RX video: rx_video_fb_cnt) + # Quality and encoding parameters + "pacing": "gap", # Pacing mode (gap, auto, etc.) + "packing": "BPM", # Packing mode + "device": "AUTO", # Device selection + "codec": "JPEG-XS", # Codec for compressed formats + "quality": "speed", # Quality setting + "codec_threads": 2, # Number of codec threads + # File I/O parameters + "input_file": None, # Input file path + "output_file": None, # Output file path + # Test configuration + "test_mode": "multicast", # Test mode (unicast, multicast, kernel) + "test_time": 30, # Test duration in seconds + "enable_rtcp": False, # Enable RTCP + "measure_latency": False, # Enable latency measurement + "display": False, # Enable display output + "enable_ptp": False, # Enable PTP synchronization + "virtio_user": False, # Enable virtio-user mode + # RxTxApp specific parameters + "config_file": None, # JSON config file path + "lcores": None, # DPDK lcore list (e.g., "28,29,30,31") + "dma_dev": None, # DMA device list (e.g., "DMA1,DMA2,DMA3") + "log_level": None, # Log level (debug, info, notice, warning, error) + "log_file": None, # Log file path + "arp_timeout_s": None, # ARP timeout in seconds (default: 60) + "allow_across_numa_core": False, # Allow cores across NUMA nodes + "no_multicast": False, # Disable multicast join message + "rx_separate_lcore": False, # RX video on dedicated lcores + "rx_mix_lcore": False, # Allow TX/RX video on same core + "runtime_session": False, # Start instance before creating sessions + "rx_timing_parser": False, # Enable timing check for video RX streams + "pcapng_dump": None, # Dump n packets to pcapng files + "rx_video_file_frames": None, # Dump received video frames to yuv file + "promiscuous": False, # Enable RX promiscuous mode + "cni_thread": False, # Use dedicated thread for CNI messages + "sch_session_quota": None, # Max sessions count per lcore + "p_tx_dst_mac": None, # Destination MAC for primary port + "r_tx_dst_mac": None, # Destination MAC for redundant port + "nb_tx_desc": None, # Number of TX descriptors per queue + "nb_rx_desc": None, # Number of RX descriptors per queue + "tasklet_time": False, # Enable tasklet running time stats + "tsc": False, # Force TSC pacing + "pacing_way": None, # Pacing way (auto, rl, tsc, tsc_narrow, ptp, tsn) + "shaping": None, # ST21 shaping type (narrow, wide) + "vrx": None, # ST21 vrx value + "ts_first_pkt": False, # Set RTP timestamp at first packet egress + "ts_delta_us": None, # RTP timestamp delta in microseconds + "mono_pool": False, # Use mono pool for all queues + "tasklet_thread": False, # Run tasklet under thread + "tasklet_sleep": False, # Enable sleep if tasklets report done + "tasklet_sleep_us": None, # Sleep microseconds value + "app_bind_lcore": False, # Run app thread on pinned lcore + "rxtx_simd_512": False, # Enable DPDK SIMD 512 path + "rss_mode": None, # RSS mode (l3_l4, l3, none) + "tx_no_chain": False, # Use memcopy instead of mbuf chain + "multi_src_port": False, # Use multiple source ports for ST20 TX + "audio_fifo_size": None, # Audio FIFO size + "dhcp": False, # Enable DHCP for all ports + "phc2sys": False, # Enable built-in phc2sys function + "ptp_sync_sys": False, # Enable PTP to system time sync + "rss_sch_nb": None, # Number of schedulers for RSS dispatch + "log_time_ms": False, # Enable ms accuracy log printer + "rx_audio_dump_time_s": None, # Dump audio frames for n seconds + "dedicated_sys_lcore": False, # Run MTL system tasks on dedicated lcore + "bind_numa": False, # Bind all MTL threads to NIC NUMA + "force_numa": None, # Force NIC port NUMA ID + # Execution control defaults + "sleep_interval": 4, # Delay between starting TX and RX + "tx_first": True, # Whether to start TX side before RX + "timeout_grace": 10, # Extra seconds for process timeout + "process_timeout_buffer": 90, # Buffer added to test_time for run() timeout + "pattern_duration": 30, # Duration for generated test patterns + "default_framerate_numeric": 60, # Fallback numeric framerate +} diff --git a/tests/validation/mtl_engine/rxtxapp.py b/tests/validation/mtl_engine/rxtxapp.py new file mode 100644 index 000000000..6b94fda80 --- /dev/null +++ b/tests/validation/mtl_engine/rxtxapp.py @@ -0,0 +1,655 @@ +# RxTxApp Implementation for Media Transport Library +# Handles RxTxApp-specific command generation and configuration + +import json +import logging +import os + +from .application_base import Application +from .config.app_mappings import APP_NAME_MAP, DEFAULT_NETWORK_CONFIG +from .config.param_mappings import RXTXAPP_PARAM_MAP +from .config.universal_params import UNIVERSAL_PARAMS + +# Import execution utilities with fallback +try: + import copy + + from . import rxtxapp_config as legacy_cfg + from .execute import log_fail + + # Import legacy helpers so we can emit a backward-compatible JSON config + from .RxTxApp import ( + add_interfaces, + check_rx_output, + check_tx_output, + create_empty_config, + kernel_ip_dict, + multicast_ip_dict, + unicast_ip_dict, + ) +except ImportError: + # Fallback for direct execution (when running this module standalone) + import copy + + import rxtxapp_config as legacy_cfg + from execute import log_fail + from RxTxApp import ( + add_interfaces, + check_rx_output, + check_tx_output, + create_empty_config, + kernel_ip_dict, + multicast_ip_dict, + unicast_ip_dict, + ) + +logger = logging.getLogger(__name__) + + +class RxTxApp(Application): + """RxTxApp framework implementation (unified model).""" + + def get_framework_name(self) -> str: + return "RxTxApp" + + def get_executable_name(self) -> str: + return APP_NAME_MAP["rxtxapp"] + + def create_command(self, **kwargs): # type: ignore[override] + self.set_universal_params(**kwargs) + cmd, cfg = self._create_rxtxapp_command_and_config() + self.command = cmd + self.config = cfg + # Write config immediately if path known + config_path = ( + self.config_file_path + or self.universal_params.get("config_file") + or "config.json" + ) + try: + with open(config_path, "w") as f: + json.dump(cfg, f, indent=2) + except Exception as e: + logger.warning(f"Failed to write RxTxApp config file {config_path}: {e}") + return self.command, self.config + + def _create_rxtxapp_command_and_config(self) -> tuple: + """ + Generate RxTxApp command line and JSON configuration from universal parameters. + Uses config file path from constructor if provided, otherwise defaults to value from DEFAULT_NETWORK_CONFIG. + + Returns: + Tuple of (command_string, config_dict) + """ + # Use config file path from constructor or default (absolute path) + if self.config_file_path: + config_file_path = self.config_file_path + else: + config_file_path = os.path.abspath( + DEFAULT_NETWORK_CONFIG["default_config_file"] + ) + + # Build command line with all command-line parameters + executable_path = self.get_executable_path() + cmd_parts = ["sudo", executable_path] + cmd_parts.extend(["--config_file", config_file_path]) + + # Add command-line parameters from RXTXAPP_PARAM_MAP + session_type = self.universal_params.get( + "session_type", UNIVERSAL_PARAMS["session_type"] + ) + for universal_param, rxtx_param in RXTXAPP_PARAM_MAP.items(): + # Skip file I/O generic mapping for st22p; we set st22p_url explicitly in JSON + if session_type == "st22p" and universal_param in ( + "input_file", + "output_file", + ): + continue + # Skip test_time unless explicitly provided (for VTune tests, duration is controlled by VTune) + if universal_param == "test_time" and not self.was_user_provided( + "test_time" + ): + continue + if rxtx_param.startswith("--"): # Command-line parameter only + if universal_param in self.universal_params: + value = self.universal_params[universal_param] + if value is not None and value is not False: + if isinstance(value, bool) and value: + cmd_parts.append(rxtx_param) + elif not isinstance(value, bool): + cmd_parts.extend([rxtx_param, str(value)]) + + # Create JSON configuration + config_dict = self._create_rxtxapp_config_dict() + + return " ".join(cmd_parts), config_dict + + def _create_rxtxapp_config_dict(self) -> dict: + """ + Build complete RxTxApp JSON config structure from universal parameters. + Creates interfaces, sessions, and all session-specific configurations. + This method intentionally recreates the original ("legacy") nested JSON + structure expected by the existing RxTxApp binary and validation helpers. + The previous refactored flat structure caused validation failures because + check_tx_output() and performance detection logic rely on nested lists + like config['tx_sessions'][0]['st20p'][0]. + + Returns: + Complete RxTxApp configuration dictionary + """ + # Currently only st20p/st22p/st30p/video/audio/ancillary/fastmetadata supported + # We rebuild the legacy shell for all session types but only populate the active one. + + session_type = self.universal_params.get( + "session_type", UNIVERSAL_PARAMS["session_type"] + ) + direction = self.universal_params.get("direction") # None means loopback + test_mode = self.universal_params.get( + "test_mode", UNIVERSAL_PARAMS["test_mode"] + ) + + # Determine NIC ports list (need at least 2 entries for legacy loopback template) + nic_port = self.universal_params.get( + "nic_port", DEFAULT_NETWORK_CONFIG["nic_port"] + ) + nic_port_list = self.universal_params.get("nic_port_list") + replicas = self.universal_params.get("replicas", 1) + + if not nic_port_list: + # For single-direction (tx-only or rx-only) with replicas on same port, + # only use one interface to avoid MTL duplicate port error + # For loopback (direction=None), need two interfaces + if direction in ("tx", "rx") and replicas >= 1: + nic_port_list = [nic_port] # Single interface for single-direction + else: + nic_port_list = [nic_port, nic_port] # Duplicate for loopback + elif len(nic_port_list) == 1: + # Same logic: single interface for single-direction, duplicate for loopback + if direction in ("tx", "rx") and replicas >= 1: + pass # Keep single element + else: + nic_port_list = nic_port_list * 2 + + # Base legacy structure + config = create_empty_config() + config["tx_no_chain"] = self.universal_params.get("tx_no_chain", False) + + # Fill interface names & addressing using legacy helper + try: + add_interfaces(config, nic_port_list, test_mode) + except Exception as e: + logger.warning( + f"Legacy add_interfaces failed ({e}); falling back to direct assignment" + ) + # Minimal fallback assignment - handle single or dual interface configs + config["interfaces"][0]["name"] = nic_port_list[0] + # Set IP addresses based on test mode + if test_mode == "unicast": + config["interfaces"][0]["ip"] = unicast_ip_dict["tx_interfaces"] + config["tx_sessions"][0]["dip"][0] = unicast_ip_dict["tx_sessions"] + config["rx_sessions"][0]["ip"][0] = unicast_ip_dict["rx_sessions"] + elif test_mode == "multicast": + config["interfaces"][0]["ip"] = multicast_ip_dict["tx_interfaces"] + config["tx_sessions"][0]["dip"][0] = multicast_ip_dict["tx_sessions"] + config["rx_sessions"][0]["ip"][0] = multicast_ip_dict["rx_sessions"] + elif test_mode == "kernel": + config["tx_sessions"][0]["dip"][0] = kernel_ip_dict["tx_sessions"] + config["rx_sessions"][0]["ip"][0] = kernel_ip_dict["rx_sessions"] + + if len(nic_port_list) > 1: + config["interfaces"][1]["name"] = nic_port_list[1] + if test_mode == "unicast": + config["interfaces"][1]["ip"] = unicast_ip_dict["rx_interfaces"] + elif test_mode == "multicast": + config["interfaces"][1]["ip"] = multicast_ip_dict["rx_interfaces"] + elif direction in ("tx", "rx"): + # For single-direction single-interface, remove second interface + if len(config["interfaces"]) > 1: + config["interfaces"] = [config["interfaces"][0]] + + # Fix session interface indices when using single interface + # Template has TX on interface[0] and RX on interface[1], but with single interface both should use [0] + if len(config["interfaces"]) == 1: + if config["tx_sessions"] and len(config["tx_sessions"]) > 0: + config["tx_sessions"][0]["interface"] = [0] + if config["rx_sessions"] and len(config["rx_sessions"]) > 0: + config["rx_sessions"][0]["interface"] = [0] + + # Override interface IPs and session IPs with user-provided source_ip/destination_ip if specified + # This allows tests to use custom IP addressing instead of hardcoded unicast_ip_dict values + if test_mode == "unicast": + user_source_ip = self.universal_params.get("source_ip") + user_dest_ip = self.universal_params.get("destination_ip") + + if direction == "tx" and len(config["interfaces"]) >= 1: + # TX: interface IP = source_ip (local), session dip = destination_ip (remote RX) + if user_source_ip: + config["interfaces"][0]["ip"] = user_source_ip + if ( + user_dest_ip + and config["tx_sessions"] + and len(config["tx_sessions"]) > 0 + ): + config["tx_sessions"][0]["dip"][0] = user_dest_ip + elif direction == "rx" and len(config["interfaces"]) >= 1: + # RX: interface IP = destination_ip (local bind), session ip = source_ip (filter for TX) + if user_dest_ip: + config["interfaces"][0]["ip"] = user_dest_ip + if ( + user_source_ip + and config["rx_sessions"] + and len(config["rx_sessions"]) > 0 + ): + config["rx_sessions"][0]["ip"][0] = user_source_ip + elif direction is None and len(config["interfaces"]) >= 2: + # Loopback: TX interface uses source_ip, RX interface uses destination_ip + if user_source_ip: + config["interfaces"][0]["ip"] = user_source_ip + if user_dest_ip: + config["interfaces"][1]["ip"] = user_dest_ip + if ( + user_dest_ip + and config["tx_sessions"] + and len(config["tx_sessions"]) > 0 + ): + config["tx_sessions"][0]["dip"][0] = user_dest_ip + if ( + user_source_ip + and config["rx_sessions"] + and len(config["rx_sessions"]) > 0 + ): + config["rx_sessions"][0]["ip"][0] = user_source_ip + + # Helper to populate a nested session list for a given type + def _populate_session(is_tx: bool): + if session_type == "st20p": + template = copy.deepcopy( + legacy_cfg.config_tx_st20p_session + if is_tx + else legacy_cfg.config_rx_st20p_session + ) + # Map universal params -> legacy field names + template["width"] = int( + self.universal_params.get("width", template["width"]) + ) + template["height"] = int( + self.universal_params.get("height", template["height"]) + ) + template["fps"] = self.universal_params.get( + "framerate", template["fps"] + ) + template["pacing"] = self.universal_params.get( + "pacing", template["pacing"] + ) + template["packing"] = self.universal_params.get( + "packing", template.get("packing", "BPM") + ) + # pixel_format becomes input_format or output_format + pixel_format = self.universal_params.get("pixel_format") + if is_tx: + template["input_format"] = pixel_format or template.get( + "input_format" + ) + else: + template["output_format"] = pixel_format or template.get( + "output_format" + ) + template["transport_format"] = self.universal_params.get( + "transport_format", template["transport_format"] + ) + if is_tx and self.universal_params.get("input_file"): + template["st20p_url"] = self.universal_params.get("input_file") + if (not is_tx) and self.universal_params.get("output_file"): + template["st20p_url"] = self.universal_params.get("output_file") + template["replicas"] = self.universal_params.get( + "replicas", template["replicas"] + ) + template["start_port"] = int( + self.universal_params.get("port", template["start_port"]) + ) + template["payload_type"] = int( + self.universal_params.get("payload_type", template["payload_type"]) + ) + template["display"] = self.universal_params.get( + "display", template.get("display", False) + ) + template["enable_rtcp"] = self.universal_params.get( + "enable_rtcp", template.get("enable_rtcp", False) + ) + return template + elif session_type == "st22p": + template = copy.deepcopy( + legacy_cfg.config_tx_st22p_session + if is_tx + else legacy_cfg.config_rx_st22p_session + ) + template["width"] = int( + self.universal_params.get("width", template["width"]) + ) + template["height"] = int( + self.universal_params.get("height", template["height"]) + ) + template["fps"] = self.universal_params.get( + "framerate", template["fps"] + ) + template["codec"] = self.universal_params.get( + "codec", template["codec"] + ) # JPEG-XS etc. + template["quality"] = self.universal_params.get( + "quality", template["quality"] + ) + template["codec_thread_count"] = self.universal_params.get( + "codec_threads", template["codec_thread_count"] + ) + pf = self.universal_params.get("pixel_format") + if is_tx: + template["input_format"] = pf or template.get("input_format") + else: + template["output_format"] = pf or template.get("output_format") + if is_tx and self.universal_params.get("input_file"): + template["st22p_url"] = self.universal_params.get("input_file") + if (not is_tx) and self.universal_params.get("output_file"): + template["st22p_url"] = self.universal_params.get("output_file") + template["replicas"] = self.universal_params.get( + "replicas", template["replicas"] + ) + template["start_port"] = int( + self.universal_params.get("port", template["start_port"]) + ) + template["payload_type"] = int( + self.universal_params.get("payload_type", template["payload_type"]) + ) + template["enable_rtcp"] = self.universal_params.get( + "enable_rtcp", template.get("enable_rtcp", False) + ) + return template + elif session_type == "st30p": + template = copy.deepcopy( + legacy_cfg.config_tx_st30p_session + if is_tx + else legacy_cfg.config_rx_st30p_session + ) + template["audio_format"] = self.universal_params.get( + "audio_format", template["audio_format"] + ) + template["audio_channel"] = self.universal_params.get( + "audio_channels", template["audio_channel"] + ) + template["audio_sampling"] = self.universal_params.get( + "audio_sampling", template["audio_sampling"] + ) + template["audio_ptime"] = self.universal_params.get( + "audio_ptime", template["audio_ptime"] + ) + if is_tx and self.universal_params.get("input_file"): + template["audio_url"] = self.universal_params.get("input_file") + if (not is_tx) and self.universal_params.get("output_file"): + template["audio_url"] = self.universal_params.get("output_file") + template["replicas"] = self.universal_params.get( + "replicas", template["replicas"] + ) + template["start_port"] = int( + self.universal_params.get("port", template["start_port"]) + ) + template["payload_type"] = int( + self.universal_params.get("payload_type", template["payload_type"]) + ) + return template + + elif session_type == "fastmetadata": + template = copy.deepcopy( + legacy_cfg.config_tx_st41_session + if is_tx + else legacy_cfg.config_rx_st41_session + ) + template["payload_type"] = int( + self.universal_params.get("payload_type", template["payload_type"]) + ) + template["fastmetadata_data_item_type"] = int( + self.universal_params.get( + "fastmetadata_data_item_type", + template["fastmetadata_data_item_type"], + ) + ) + template["fastmetadata_k_bit"] = int( + self.universal_params.get( + "fastmetadata_k_bit", template["fastmetadata_k_bit"] + ) + ) + if is_tx: + template["type"] = self.universal_params.get( + "type_mode", template["type"] + ) + template["fastmetadata_fps"] = self.universal_params.get( + "fastmetadata_fps", template["fastmetadata_fps"] + ) + template["fastmetadata_url"] = self.universal_params.get( + "input_file", template["fastmetadata_url"] + ) + else: + template["fastmetadata_url"] = self.universal_params.get( + "output_file", template.get("fastmetadata_url", "") + ) + template["replicas"] = self.universal_params.get( + "replicas", template["replicas"] + ) + template["start_port"] = int( + self.universal_params.get("port", template["start_port"]) + ) + return template + + else: + # Fallback: reuse st20p layout for unknown session types (minimal support) + template = {"replicas": 1} + return template + + # Populate TX sessions + if direction in (None, "tx"): + st_entry = _populate_session(True) + if st_entry: + config["tx_sessions"][0].setdefault(session_type, []) + config["tx_sessions"][0][session_type].append(st_entry) + # Ensure non-empty video list to force functional validation instead of FPS performance path + placeholder_video = { + "type": "placeholder", + "video_format": "", + "pg_format": "", + } + current_video_list = config["tx_sessions"][0].get("video") + if not current_video_list: + config["tx_sessions"][0]["video"] = [placeholder_video] + elif len(current_video_list) == 0: + current_video_list.append(placeholder_video) + + # Populate RX sessions + if direction in (None, "rx"): + st_entry = _populate_session(False) + if st_entry: + config["rx_sessions"][0].setdefault(session_type, []) + config["rx_sessions"][0][session_type].append(st_entry) + placeholder_video = { + "type": "placeholder", + "video_format": "", + "pg_format": "", + } + current_video_list = config["rx_sessions"][0].get("video") + if not current_video_list: + config["rx_sessions"][0]["video"] = [placeholder_video] + elif len(current_video_list) == 0: + current_video_list.append(placeholder_video) + + # If only TX or only RX requested, clear the other list + if direction == "tx": + config["rx_sessions"] = [] + elif direction == "rx": + config["tx_sessions"] = [] + + return config + + def validate_results(self) -> bool: # type: ignore[override] + """ + Validate execution results exactly like original RxTxApp.execute_test(). + + Matches the validation pattern from mtl_engine/RxTxApp.py: + - For st20p: Check RX output + TX/RX converter creation (NOT tx result lines) + - For st22p: Check RX output only + - For video/audio/etc: Check both TX and RX outputs + + Returns True if validation passes. Raises AssertionError on failure. + """ + + def _fail(msg: str): + try: + log_fail(msg) + except Exception: + logger.error(msg) + raise AssertionError(msg) + + try: + if not self.config: + _fail("RxTxApp validate_results called without config") + + session_type = self._get_session_type_from_config(self.config) + output_lines = self.last_output.split("\n") if self.last_output else [] + rc = getattr(self, "last_return_code", None) + + # 1. Check return code (must be 0 or None for dual-host secondary) + if rc not in (0, None): + _fail(f"Process return code {rc} indicates failure") + + # 2. Validate based on session type - match original RxTxApp.execute_test() logic + passed = True + + if session_type == "st20p": + # Original validation: check_rx_output + check_tx_converter_output + check_rx_converter_output + # Note: Original does NOT check check_tx_output for st20p! + passed = passed and check_rx_output( + config=self.config, + output=output_lines, + session_type="st20p", + fail_on_error=False, + host=None, + build=None, + ) + + # Import converter check functions if available + try: + from .RxTxApp import ( + check_rx_converter_output, + check_tx_converter_output, + ) + + passed = passed and check_tx_converter_output( + config=self.config, + output=output_lines, + session_type="st20p", + fail_on_error=False, + host=None, + build="", + ) + + passed = passed and check_rx_converter_output( + config=self.config, + output=output_lines, + session_type="st20p", + fail_on_error=False, + host=None, + build="", + ) + except ImportError: + # Fallback: if converter checks not available, just check RX + logger.warning( + "Converter check functions not available, using RX check only" + ) + + if not passed: + _fail("st20p validation failed (RX output or converter checks)") + + elif session_type in ("st22p", "st30p", "fastmetadata"): + # Original validation: check_rx_output only (no TX result line for st22p/st30p/fastmetadata) + passed = check_rx_output( + config=self.config, + output=output_lines, + session_type=session_type, + fail_on_error=False, + host=None, + build=None, + ) + + if not passed: + _fail(f"{session_type} validation failed (RX output check)") + + elif session_type in ("video", "audio", "ancillary"): + # Original validation: check both TX and RX outputs + _tx_ok = check_tx_output( + config=self.config, + output=output_lines, + session_type=session_type, + fail_on_error=False, + host=None, + build=None, + ) + _rx_ok = check_rx_output( + config=self.config, + output=output_lines, + session_type=session_type, + fail_on_error=False, + host=None, + build=None, + ) + + if not (_tx_ok and _rx_ok): + _fail(f"{session_type} validation failed (TX or RX output check)") + + else: + # Unknown session type - default to checking both + logger.warning( + f"Unknown session type {session_type}, using default validation" + ) + _tx_ok = check_tx_output( + config=self.config, + output=output_lines, + session_type=session_type, + fail_on_error=False, + host=None, + build=None, + ) + _rx_ok = check_rx_output( + config=self.config, + output=output_lines, + session_type=session_type, + fail_on_error=False, + host=None, + build=None, + ) + + if not (_tx_ok and _rx_ok): + _fail(f"{session_type} validation failed") + + logger.info(f"RxTxApp validation passed for {session_type}") + return True + + except AssertionError: + # Already handled/logged + raise + except Exception as e: + _fail(f"RxTxApp validation unexpected error: {e}") + + def _get_session_type_from_config(self, config: dict) -> str: + """Extract session type from RxTxApp config.""" + # Inspect nested lists to identify actual session type; legacy layout nests under tx_sessions[i][type] + if not config.get("tx_sessions"): + return "st20p" + for tx_entry in config["tx_sessions"]: + for possible in ( + "st22p", + "st20p", + "st30p", + "fastmetadata", + "video", + "audio", + "ancillary", + ): + if possible in tx_entry and tx_entry[possible]: + return possible + return "st20p" diff --git a/tests/validation/tests/single/performance/test_vf_multisession_fps_variants.py b/tests/validation/tests/single/performance/test_vf_multisession_fps_variants.py new file mode 100644 index 000000000..8ea8467ac --- /dev/null +++ b/tests/validation/tests/single/performance/test_vf_multisession_fps_variants.py @@ -0,0 +1,711 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2024-2025 Intel Corporation + +""" +VF Multi-Session FPS Variants Performance Tests + +Tests TX/RX performance using Virtual Functions (VFs) with different FPS values. +- Tests FPS: 25, 30, 50, 59 +- Tests multiple sessions (1, 2, 4, 8, 14, 15, 16, 17, 20, 24, 32) to validate performance scaling +- Validates both TX FPS achievement and RX frame reception +- Supports optional DSA (Data Streaming Accelerator) for DMA offload +- Uses same physical NIC with VF0 for profiled side and VF1 for companion +- Skips first 30 seconds of measurements to exclude unstable initialization period +""" + +import logging +import re +import time + +import pytest +from mfd_common_libs.log_levels import TEST_PASS +from mtl_engine.config.app_mappings import DSA_DEVICES +from mtl_engine.execute import log_fail, run +from mtl_engine.rxtxapp import RxTxApp + +logger = logging.getLogger(__name__) + +# Configuration: FPS measurement warmup period (seconds) +# This warmup period is excluded from FPS measurements to avoid unstable initialization +FPS_WARMUP_SECONDS = 30 +FPS_TOLERANCE_PCT = 0.99 # 99% of requested FPS required for pass + + +def get_companion_log_summary(host, log_path: str, max_lines: int = 30) -> None: + """ + Retrieve and display summary of companion process log from remote host. + + Args: + host: Host connection object + log_path: Path to log file on remote host + max_lines: Maximum number of lines to display + """ + try: + # Try to read last N lines of the log file + result = host.connection.execute_command( + f"tail -n {max_lines} {log_path} 2>/dev/null || echo 'Log file not found'", + shell=True + ) + if result.stdout and "Log file not found" not in result.stdout: + logger.debug(f"Companion log ({log_path}, last {max_lines} lines):") + for line in result.stdout.splitlines(): + if line.strip(): + logger.debug(f" {line}") + except Exception as e: + logger.warning(f"Could not retrieve companion log from {log_path}: {e}") + + +def _monitor_fps_generic(log_lines: list, expected_fps: float, num_sessions: int, + session_pattern: str, fps_tolerance_pct: float = FPS_TOLERANCE_PCT, + warmup_seconds: int = FPS_WARMUP_SECONDS) -> tuple: + """ + Generic FPS monitoring from RxTxApp log output (used by both TX and RX). + + Args: + log_lines: List of log output lines + expected_fps: Target FPS value + num_sessions: Number of sessions to validate + session_pattern: Regex pattern for matching session FPS logs (TX_VIDEO_SESSION or RX_VIDEO_SESSION) + fps_tolerance_pct: Required percentage of target FPS + warmup_seconds: Number of seconds to skip at start for initialization + + Returns: + Tuple of (all_successful: bool, successful_count: int, details: dict) + """ + from datetime import datetime + + fps_pattern = re.compile(session_pattern) + timestamp_pattern = re.compile(r"MTL:\s+(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})") + + successful_sessions = set() + session_fps_history = {} + start_timestamp = None + min_required_fps = expected_fps * fps_tolerance_pct + + for line in log_lines: + # Track timestamps to identify warmup period + ts_match = timestamp_pattern.search(line) + if ts_match and start_timestamp is None: + start_timestamp = datetime.strptime(ts_match.group(1), "%Y-%m-%d %H:%M:%S") + + match = fps_pattern.search(line) + if match: + session_id = int(match.group(2)) + actual_fps = float(match.group(3)) + + # Calculate elapsed time from start if timestamp exists on this line + if ts_match and start_timestamp: + current_timestamp = datetime.strptime(ts_match.group(1), "%Y-%m-%d %H:%M:%S") + elapsed_seconds = (current_timestamp - start_timestamp).total_seconds() + + # Skip measurements during warmup period + if elapsed_seconds < warmup_seconds: + continue + elif start_timestamp is None: + # No timestamp found yet, skip this measurement (still in early startup) + continue + + if session_id not in session_fps_history: + session_fps_history[session_id] = [] + session_fps_history[session_id].append(actual_fps) + + # Determine success based on average FPS >= tolerance of requested + for session_id, fps_history in session_fps_history.items(): + if fps_history: + avg_fps = sum(fps_history) / len(fps_history) + if avg_fps >= min_required_fps: + successful_sessions.add(session_id) + + details = { + "successful_count": len(successful_sessions), + "successful_sessions": sorted(list(successful_sessions)), + "session_fps_history": session_fps_history, + "min_required_fps": min_required_fps, + } + + return len(successful_sessions) == num_sessions, len(successful_sessions), details + + +def monitor_tx_fps(log_lines: list, expected_fps: float, num_sessions: int, + fps_tolerance_pct: float = FPS_TOLERANCE_PCT, + warmup_seconds: int = FPS_WARMUP_SECONDS) -> tuple: + """ + Monitor TX FPS from RxTxApp log output. + + Args: + log_lines: List of log output lines from RxTxApp + expected_fps: Target FPS value for validation + num_sessions: Number of sessions to validate + fps_tolerance_pct: Required percentage of target FPS (default: 99%) + warmup_seconds: Warmup period to skip (default: 30s) + + Returns: + Tuple of (all_successful: bool, successful_count: int, details: dict) + """ + return _monitor_fps_generic( + log_lines, expected_fps, num_sessions, + r"TX_VIDEO_SESSION\(\d+,(\d+):app_tx_st20p_(\d+)\):\s+fps\s+([\d.]+)", + fps_tolerance_pct, warmup_seconds + ) + + +def monitor_rx_fps(log_lines: list, expected_fps: float, num_sessions: int, + fps_tolerance_pct: float = FPS_TOLERANCE_PCT, + warmup_seconds: int = FPS_WARMUP_SECONDS) -> tuple: + """ + Monitor RX FPS from RxTxApp log output. + + Args: + log_lines: List of log output lines from RxTxApp + expected_fps: Target FPS value for validation + num_sessions: Number of sessions to validate + fps_tolerance_pct: Required percentage of target FPS (default: 99%) + warmup_seconds: Warmup period to skip (default: 30s) + + Returns: + Tuple of (all_successful: bool, successful_count: int, details: dict) + """ + return _monitor_fps_generic( + log_lines, expected_fps, num_sessions, + r"RX_VIDEO_SESSION\(\d+,(\d+):app_rx_st20p_(\d+)\):\s+fps\s+([\d.]+)", + fps_tolerance_pct, warmup_seconds + ) + + +def monitor_tx_frames(log_lines: list, num_sessions: int) -> dict: + """ + Extract TX frame counts from RxTxApp log output. + + Args: + log_lines: List of log output lines + num_sessions: Number of sessions + + Returns: + Dictionary mapping session_id -> transmitted frames (raw count from logs) + """ + tx_frames_pattern = re.compile( + r"TX_VIDEO_SESSION\(\d+,\d+:app_tx_st20p_(\d+)\):\s+fps\s+[\d.]+\s+frames\s+(\d+)" + ) + + session_tx_frames = {} + for line in log_lines: + match = tx_frames_pattern.search(line) + if match: + session_id = int(match.group(1)) + frames_transmitted = int(match.group(2)) + + # Keep the highest frame count seen (last report) + if session_id not in session_tx_frames or frames_transmitted > session_tx_frames[session_id]: + session_tx_frames[session_id] = frames_transmitted + + return session_tx_frames + + +def monitor_rx_frames_simple(log_lines: list, num_sessions: int) -> dict: + """ + Extract RX frame counts from RxTxApp log output. + + Args: + log_lines: List of log output lines + num_sessions: Number of sessions + + Returns: + Dictionary mapping session_id -> received frames (raw count from logs) + """ + # Pattern for periodic RX logs: RX_VIDEO_SESSION(0,0:app_rx_st20p_0): fps 59.899856 frames 599 + rx_frames_pattern_periodic = re.compile( + r"RX_VIDEO_SESSION\(\d+,\d+:app_rx_st20p_(\d+)\):\s+fps\s+[\d.]+\s+frames\s+(\d+)" + ) + # Pattern for final result: app_rx_st20p_result(0), OK, fps 59.12, 1180 frame received + rx_frames_pattern_result = re.compile( + r"app_rx_st20p_result\((\d+)\),\s+OK,\s+fps\s+[\d.]+,\s+(\d+)\s+frame received" + ) + + session_rx_frames = {} + for line in log_lines: + # Try periodic pattern first + match = rx_frames_pattern_periodic.search(line) + if match: + session_id = int(match.group(1)) + frames_received = int(match.group(2)) + + # Keep the highest frame count seen (last report) + if session_id not in session_rx_frames or frames_received > session_rx_frames[session_id]: + session_rx_frames[session_id] = frames_received + else: + # Try result pattern + match = rx_frames_pattern_result.search(line) + if match: + session_id = int(match.group(1)) + frames_received = int(match.group(2)) + + # Keep the highest frame count seen (last report) + if session_id not in session_rx_frames or frames_received > session_rx_frames[session_id]: + session_rx_frames[session_id] = frames_received + + return session_rx_frames + + +# Media file configurations for different FPS values +MEDIA_CONFIGS = { + 25: { + "filename": "HDR_BBC_v4_008_Penguin1_1920x1080_10bit_25Hz_180frames_yuv422p10be_To_yuv422rfc4175be10.yuv", + "file_format": "YUV422RFC4175PG2BE10", + "format": "YUV_422_10bit", + "width": 1920, + "height": 1080, + "fps": 25, + }, + 30: { + "filename": "HDR_BBC_v4_008_Penguin1_1920x1080_10bit_25Hz_180frames_yuv422p10be_To_yuv422rfc4175be10.yuv", + "file_format": "YUV422RFC4175PG2BE10", + "format": "YUV_422_10bit", + "width": 1920, + "height": 1080, + "fps": 30, + }, + 50: { + "filename": "CrowdRun_3840x2160_50fps_10frames_yuv422rfc4175be10.yuv", + "file_format": "YUV422RFC4175PG2BE10", + "format": "YUV_422_10bit", + "width": 3840, + "height": 2160, + "fps": 50, + }, + 59: { + "filename": "CSGObuymenu_1080p_60fps_120frames_yuv422rfc4175be10.yuv", + "file_format": "YUV422RFC4175PG2BE10", + "format": "YUV_422_10bit", + "width": 1920, + "height": 1080, + "fps": 59, + }, +} + + +def get_dsa_device_for_nic(nic_pci_address: str) -> str: + """ + Get the appropriate DSA device for a given NIC based on NUMA node. + + Args: + nic_pci_address: PCI address of the NIC (e.g., "0000:18:01.0" or "0000:af:01.0") + + Returns: + DSA device PCI address for the appropriate NUMA node + """ + try: + bus = int(nic_pci_address.split(":")[1], 16) + return DSA_DEVICES["numa0"] if bus < 0x80 else DSA_DEVICES["numa1"] + except (ValueError, IndexError): + logger.warning(f"Could not determine NUMA node for {nic_pci_address}, defaulting to NUMA 0") + return DSA_DEVICES["numa0"] + + +def _display_session_results(direction: str, dsa_label: str, num_sessions: int, fps: float, + fps_details: dict, tx_frame_counts: dict, rx_frame_counts: dict) -> None: + """ + Display FPS and frame count results for all sessions. + + Args: + direction: "TX" or "RX" (which side is being tested) + dsa_label: Label with DSA information (e.g., " with DSA (0000:6d:01.0)") + num_sessions: Total number of sessions + fps: Requested frame rate + fps_details: FPS monitoring results containing: + - successful_sessions: List of session IDs that met FPS requirements + - session_fps_history: Dict mapping session_id -> list of FPS values + - min_required_fps: Minimum FPS threshold for pass + tx_frame_counts: TX frame counts per session (dict: session_id -> count) + rx_frame_counts: RX frame counts per session (dict: session_id -> count) + """ + successful_count = len(fps_details.get("successful_sessions", [])) + min_required = fps_details.get("min_required_fps", fps * FPS_TOLERANCE_PCT) + + logger.info("=" * 80) + logger.info( + f"{direction} {'FPS Results' if direction == 'TX' else 'Results'}{dsa_label}: " + f"{successful_count}/{num_sessions} sessions at target {fps} fps " + f"(min required: {min_required:.1f} fps, after {FPS_WARMUP_SECONDS}s warmup)" + ) + logger.info("=" * 80) + + for session_id in range(num_sessions): + fps_history = fps_details.get("session_fps_history", {}).get(session_id, []) + tx_frames = tx_frame_counts.get(session_id, 0) + rx_frames = rx_frame_counts.get(session_id, 0) + + # Calculate success percentage with defensive checks + if tx_frames > 0: + success_pct = (rx_frames / tx_frames * 100) + if success_pct > 100.0: + logger.warning( + f"Session {session_id}: RX frames ({rx_frames}) > TX frames ({tx_frames}) " + "- companion may have terminated early" + ) + success_pct_display = f"{success_pct:.1f}% (>100%!)" + else: + success_pct_display = f"{success_pct:.1f}%" + else: + success_pct_display = "N/A (no TX data)" + + successful_session_ids = fps_details.get("successful_sessions", []) + + if fps_history: + avg_fps = sum(fps_history) / len(fps_history) + min_fps = min(fps_history) + max_fps = max(fps_history) + fps_status = "✓" if session_id in successful_session_ids else "✗" + logger.info( + f" Session {session_id}: FPS: requested={fps}, avg={avg_fps:.1f}, " + f"min={min_fps:.1f}, max={max_fps:.1f} {fps_status} | " + f"Frames: TX={tx_frames}, RX={rx_frames}, Success={success_pct_display}" + ) + else: + logger.info( + f" Session {session_id}: FPS: requested={fps}, No data ✗ | " + f"Frames: TX={tx_frames}, RX={rx_frames}, Success={success_pct_display}" + ) + logger.info("=" * 80) + + +@pytest.mark.nightly +@pytest.mark.performance +@pytest.mark.parametrize("use_dsa", [False, True], ids=["no-dsa", "dsa"]) +@pytest.mark.parametrize("fps", [25, 30, 50, 59], ids=["25fps", "30fps", "50fps", "59fps"]) +@pytest.mark.parametrize("num_sessions", [1, 2, 4, 8, 14, 15, 16, 17, 20, 24, 32], ids=["1sess", "2sess", "4sess", "8sess", "14sess", "15sess", "16sess", "17sess", "20sess", "24sess", "32sess"]) +def test_vf_tx_fps_variants(hosts, build, media, test_time, nic_port_list, fps, num_sessions, use_dsa) -> None: + """ + Test TX multi-session performance with different FPS values. + + Args: + hosts: Host fixtures + build: Build directory path + media: Media directory path + test_time: Test duration in seconds + nic_port_list: List of available NIC ports (triggers VF creation) + fps: Frame rate to test (25, 30, 50, or 59) + num_sessions: Number of concurrent sessions (1, 2, 4, 8, 14, 15, 16, 17, 20, 24, or 32) + use_dsa: Whether to use DSA for DMA operations + """ + host = list(hosts.values())[0] + + if not hasattr(host, "vfs") or len(host.vfs) < 2: + pytest.skip("Test requires at least 2 VFs on host") + + media_config = MEDIA_CONFIGS[fps] + media_file_path = f"{media}/{media_config['filename']}" + + tx_vf = host.vfs[0] + rx_vf = host.vfs[1] + + # Configure DSA if enabled + dsa_device = get_dsa_device_for_nic(tx_vf) if use_dsa else None + dsa_suffix = "_dsa" if use_dsa else "" + dsa_label = f" with DSA ({dsa_device})" if use_dsa else "" + + logger.info("=" * 80) + logger.info(f"VF TX FPS Variant{dsa_label}: {num_sessions} sessions @ {fps} fps") + logger.info(f"TX={tx_vf}, RX={rx_vf}, Media: {media_config['filename']}") + logger.info("=" * 80) + + # Start companion RX + rx_config_path = f"{build}/tests/vf_tx_fps{fps}_{num_sessions}s{dsa_suffix}_rx_config.json" + rx_app = RxTxApp(app_path=f"{build}/tests/tools/RxTxApp/build", config_file_path=rx_config_path) + + rx_app.create_command( + session_type="st20p", + direction="rx", + test_mode="unicast", + transport_format=media_config["format"], + width=media_config["width"], + height=media_config["height"], + framerate=f"p{fps}", + output_file="/dev/null", + nic_port=rx_vf, + source_ip="192.168.30.101", + destination_ip="192.168.30.102", + port=20000, + replicas=num_sessions, + sch_session_quota=60, + test_time=test_time + 10, + ) + + companion_log = f"{build}/tests/vf_tx_fps{fps}_{num_sessions}s{dsa_suffix}_rx_companion.log" + logger.info(f"Starting companion RX, logs will be saved to: {companion_log}") + logger.info(f"Companion RX will run for {test_time + 10} seconds (test_time={test_time} + 10s buffer)") + rx_process = host.connection.start_process( + f"{rx_app.command} > {companion_log} 2>&1", cwd=build, shell=True + ) + time.sleep(10) + + try: + # Configure TX with optional DSA + tx_config_path = f"{build}/tests/vf_tx_fps{fps}_{num_sessions}s{dsa_suffix}_config.json" + tx_app = RxTxApp(app_path=f"{build}/tests/tools/RxTxApp/build", config_file_path=tx_config_path) + + tx_kwargs = { + "session_type": "st20p", + "direction": "tx", + "test_mode": "unicast", + "transport_format": media_config["format"], + "width": media_config["width"], + "height": media_config["height"], + "framerate": f"p{fps}", + "input_file": media_file_path, + "nic_port": tx_vf, + "source_ip": "192.168.30.101", + "destination_ip": "192.168.30.102", + "port": 20000, + "replicas": num_sessions, + "sch_session_quota": 60, + "test_time": test_time, + } + + if use_dsa: + tx_kwargs["dma_dev"] = dsa_device + + tx_app.create_command(**tx_kwargs) + + logger.info(f"Running TX{dsa_label}") + + result = run(tx_app.command, cwd=build, timeout=test_time + 60, testcmd=True, host=host) + + # Log TX output to the main test log + logger.info("=" * 80) + logger.info(f"TX Process Output") + logger.info("=" * 80) + if result.stdout_text: + for line in result.stdout_text.splitlines(): + if any(keyword in line.lower() for keyword in ["fps", "session", "error", "fail", "warn", "mismatch"]): + logger.info(f"TX: {line}") + else: + logger.warning("No TX output captured!") + logger.info("=" * 80) + + # Retrieve companion RX logs before validation + logger.info("=" * 80) + logger.info(f"Companion RX Process Logs") + logger.info("=" * 80) + get_companion_log_summary(host, companion_log, max_lines=50) + logger.info("=" * 80) + + if result.return_code != 0: + log_fail(f"TX process{dsa_label} failed with exit code {result.return_code}") + pytest.fail(f"TX process{dsa_label} failed with exit code {result.return_code}") + + # Validate TX FPS achievement from TX logs (skip first warmup period) + success, successful_count, fps_details = monitor_tx_fps( + result.stdout_text.splitlines(), fps, num_sessions + ) + + # Get TX frame counts from TX logs (measure from beginning) + tx_frame_counts = monitor_tx_frames( + result.stdout_text.splitlines(), num_sessions + ) + + # Wait for companion to finish writing (it runs test_time+10, so should be done) + time.sleep(5) + + # Get RX frame counts from companion RX log (measure from beginning) + try: + companion_result = host.connection.execute_command(f"cat {companion_log}", shell=True) + rx_frame_counts = monitor_rx_frames_simple( + companion_result.stdout.splitlines() if companion_result.stdout else [], + num_sessions + ) + except Exception as e: + logger.warning(f"Could not read companion log for frame counts: {e}") + rx_frame_counts = {} + + # Display session results + _display_session_results("TX", dsa_label, num_sessions, fps, fps_details, + tx_frame_counts, rx_frame_counts) + + if not success: + failure_msg = f"Only {successful_count}/{num_sessions} sessions reached target {fps} fps (min {fps_details['min_required_fps']:.1f} fps){dsa_label}" + log_fail(failure_msg) + pytest.fail(failure_msg) + + logger.log(TEST_PASS, f"TX FPS variant test{dsa_label} passed: {num_sessions} sessions @ {fps} fps") + + finally: + try: + rx_process.stop() + if rx_process.running: + time.sleep(2) + rx_process.kill() + except Exception as e: + logger.warning(f"Error stopping companion RX: {e}") + + +@pytest.mark.nightly +@pytest.mark.performance +@pytest.mark.parametrize("use_dsa", [False, True], ids=["no-dsa", "dsa"]) +@pytest.mark.parametrize("fps", [25, 30, 50, 59], ids=["25fps", "30fps", "50fps", "59fps"]) +@pytest.mark.parametrize("num_sessions", [1, 2, 4, 8, 14, 15, 16, 17, 20, 24, 32], ids=["1sess", "2sess", "4sess", "8sess", "14sess", "15sess", "16sess", "17sess", "20sess", "24sess", "32sess"]) +def test_vf_rx_fps_variants(hosts, build, media, test_time, nic_port_list, fps, num_sessions, use_dsa) -> None: + """ + Test RX multi-session performance with different FPS values. + + Args: + hosts: Host fixtures + build: Build directory path + media: Media directory path + test_time: Test duration in seconds + nic_port_list: List of available NIC ports (triggers VF creation) + fps: Frame rate to test (25, 30, 50, or 59) + num_sessions: Number of concurrent sessions (1, 2, 4, 8, 14, 15, 16, 17, 20, 24, or 32) + use_dsa: Whether to use DSA for DMA operations + """ + host = list(hosts.values())[0] + + if not hasattr(host, "vfs") or len(host.vfs) < 2: + pytest.skip("Test requires at least 2 VFs on host") + + media_config = MEDIA_CONFIGS[fps] + media_file_path = f"{media}/{media_config['filename']}" + + rx_vf = host.vfs[0] + tx_vf = host.vfs[1] + + # Configure DSA if enabled + dsa_device = get_dsa_device_for_nic(rx_vf) if use_dsa else None + dsa_suffix = "_dsa" if use_dsa else "" + dsa_label = f" with DSA ({dsa_device})" if use_dsa else "" + + logger.info("=" * 80) + logger.info(f"VF RX FPS Variant{dsa_label}: {num_sessions} sessions @ {fps} fps") + logger.info(f"RX={rx_vf}, TX={tx_vf}, Media: {media_config['filename']}") + logger.info("=" * 80) + + # Start companion TX + tx_config_path = f"{build}/tests/vf_rx_fps{fps}_{num_sessions}s{dsa_suffix}_tx_config.json" + tx_app = RxTxApp(app_path=f"{build}/tests/tools/RxTxApp/build", config_file_path=tx_config_path) + + tx_app.create_command( + session_type="st20p", + direction="tx", + test_mode="unicast", + transport_format=media_config["format"], + width=media_config["width"], + height=media_config["height"], + framerate=f"p{fps}", + input_file=media_file_path, + nic_port=tx_vf, + source_ip="192.168.31.101", + destination_ip="192.168.31.102", + port=20000, + replicas=num_sessions, + sch_session_quota=60, + test_time=test_time + 10, + ) + + companion_log = f"{build}/tests/vf_rx_fps{fps}_{num_sessions}s{dsa_suffix}_tx_companion.log" + logger.info(f"Starting companion TX, logs will be saved to: {companion_log}") + logger.info(f"Companion TX will run for {test_time + 10} seconds (test_time={test_time} + 10s buffer)") + tx_process = host.connection.start_process( + f"{tx_app.command} > {companion_log} 2>&1", cwd=build, shell=True + ) + time.sleep(10) + + try: + # Configure RX with optional DSA + rx_config_path = f"{build}/tests/vf_rx_fps{fps}_{num_sessions}s{dsa_suffix}_config.json" + rx_app = RxTxApp(app_path=f"{build}/tests/tools/RxTxApp/build", config_file_path=rx_config_path) + + rx_kwargs = { + "session_type": "st20p", + "direction": "rx", + "test_mode": "unicast", + "transport_format": media_config["format"], + "width": media_config["width"], + "height": media_config["height"], + "framerate": f"p{fps}", + "output_file": "/dev/null", + "nic_port": rx_vf, + "source_ip": "192.168.31.101", + "destination_ip": "192.168.31.102", + "port": 20000, + "replicas": num_sessions, + "sch_session_quota": 60, + "test_time": test_time, + } + + if use_dsa: + rx_kwargs["dma_dev"] = dsa_device + + rx_app.create_command(**rx_kwargs) + + logger.info(f"Running RX{dsa_label}") + + result = run(rx_app.command, cwd=build, timeout=test_time + 60, testcmd=True, host=host) + + # Log RX output to the main test log + logger.info("=" * 80) + logger.info(f"RX Process Output") + logger.info("=" * 80) + if result.stdout_text: + for line in result.stdout_text.splitlines(): + if any(keyword in line.lower() for keyword in ["frame", "session", "error", "fail", "warn", "dma", "mismatch"]): + logger.info(f"RX: {line}") + else: + logger.warning("No RX output captured!") + logger.info("=" * 80) + + # Retrieve companion TX logs before validation + logger.info("=" * 80) + logger.info(f"Companion TX Process Logs") + logger.info("=" * 80) + get_companion_log_summary(host, companion_log, max_lines=50) + logger.info("=" * 80) + + if result.return_code != 0: + log_fail(f"RX process{dsa_label} failed with exit code {result.return_code}") + pytest.fail(f"RX process{dsa_label} failed with exit code {result.return_code}") + + # Validate RX FPS achievement from RX logs (skip first warmup period) + rx_fps_success, rx_fps_successful_count, rx_fps_details = monitor_rx_fps( + result.stdout_text.splitlines(), fps, num_sessions + ) + + # Get RX frame counts from RX logs (measure from beginning) + rx_frame_counts = monitor_rx_frames_simple( + result.stdout_text.splitlines(), num_sessions + ) + + # Wait for companion to finish writing (it runs test_time+10, so should be done) + time.sleep(5) + + # Get TX frame counts from companion TX log (measure from beginning) + tx_frame_counts = {} + try: + companion_result = host.connection.execute_command(f"cat {companion_log}", shell=True) + companion_lines = companion_result.stdout.splitlines() if companion_result.stdout else [] + + # Get TX frame counts (from beginning) + tx_frame_counts = monitor_tx_frames( + companion_lines, num_sessions + ) + except Exception as e: + logger.warning(f"Could not read companion log for frame counts: {e}") + + # Display session results + _display_session_results("RX", dsa_label, num_sessions, fps, rx_fps_details, + tx_frame_counts, rx_frame_counts) + + # Test passes/fails based on RX FPS achievement + if not rx_fps_success: + failure_msg = f"Only {rx_fps_successful_count}/{num_sessions} sessions reached target {fps} fps (min {rx_fps_details['min_required_fps']:.1f} fps){dsa_label}" + log_fail(failure_msg) + pytest.fail(failure_msg) + + logger.log(TEST_PASS, f"RX FPS variant test{dsa_label} passed: {num_sessions} sessions @ {fps} fps") + + finally: + try: + tx_process.stop() + if tx_process.running: + time.sleep(2) + tx_process.kill() + except Exception as e: + logger.warning(f"Error stopping companion TX: {e}") diff --git a/tests/validation/tests/single/performance/test_vf_multisession_rx_fps.py b/tests/validation/tests/single/performance/test_vf_multisession_rx_fps.py new file mode 100644 index 000000000..8b211657c --- /dev/null +++ b/tests/validation/tests/single/performance/test_vf_multisession_rx_fps.py @@ -0,0 +1,222 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2024-2025 Intel Corporation + +""" +VF Multi-Session RX FPS Performance Tests + +Tests RX performance using Virtual Functions (VFs) on a single NIC. +- RX sessions run on VF0 with companion TX on VF1 (same physical NIC) +- Tests multiple sessions (1, 2, 4, 8, 16) with sch_session_quota=60 +- Performance validated by checking received frames and frame rate +- Test fails if RX sessions don't receive expected traffic +""" + +import logging +import os +import re +import time + +import pytest +from mfd_common_libs.log_levels import TEST_FAIL, TEST_PASS +from mtl_engine.execute import log_fail +from mtl_engine.media_files import yuv_files_422rfc10 +from mtl_engine.rxtxapp import RxTxApp + +logger = logging.getLogger(__name__) + + +def monitor_rx_performance( + log_lines: list, num_sessions: int, expected_frames_min: int = 100 +) -> tuple: + """Monitor RX performance from RxTxApp log output.""" + rx_frames_pattern = re.compile( + r"app_rx_st20p_result\((\d+)\),\s+OK,\s+fps\s+[\d.]+,\s+(\d+)\s+frame received" + ) + + session_frames = {} + for line in log_lines: + match = rx_frames_pattern.search(line) + if match: + session_id = int(match.group(1)) + frames_received = int(match.group(2)) + if ( + session_id not in session_frames + or frames_received > session_frames[session_id] + ): + session_frames[session_id] = frames_received + + successful_sessions = { + sid for sid, frames in session_frames.items() if frames >= expected_frames_min + } + + details = { + "successful_count": len(successful_sessions), + "total_sessions": num_sessions, + "successful_sessions": sorted(list(successful_sessions)), + "session_frames": session_frames, + "expected_frames_min": expected_frames_min, + } + + return len(successful_sessions) == num_sessions, len(successful_sessions), details + + +@pytest.mark.smoke +@pytest.mark.nightly +@pytest.mark.performance +@pytest.mark.parametrize( + "num_sessions", + [1, 2, 4, 8, 16], + ids=["1sess", "2sess", "4sess", "8sess", "16sess"], +) +@pytest.mark.parametrize( + "media_file", + [yuv_files_422rfc10["Penguin_1080p"]], + indirect=["media_file"], + ids=["1080p"], +) +def test_vf_multisession_rx_fps( + hosts, + build, + nic_port_list, + test_time, + media_file, + num_sessions, +): + """Test RX multi-session performance using VFs.""" + host = list(hosts.values())[0] + media_file_info, media_file_path = media_file + expected_fps = int(media_file_info["fps"]) + + if num_sessions <= 2: + tolerance = 0.85 + elif num_sessions <= 4: + tolerance = 0.80 + elif num_sessions <= 8: + tolerance = 0.70 + else: + tolerance = 0.50 + + expected_frames_min = int(test_time * expected_fps * tolerance) + + if not hasattr(host, "vfs") or len(host.vfs) < 2: + pytest.skip("Test requires at least 2 VFs on host") + + rx_vf = host.vfs[0] + tx_vf = host.vfs[1] + + logger.info("=" * 80) + logger.info(f"VF Multi-Session RX: {num_sessions} sessions, RX={rx_vf}, TX={tx_vf}") + logger.info(f"Target: {expected_fps} fps, Min frames: {expected_frames_min}") + logger.info("=" * 80) + + tx_config_path = f"{build}/tests/vf_rx_companion_tx_{num_sessions}s_config.json" + tx_app = RxTxApp( + app_path=f"{build}/tests/tools/RxTxApp/build", config_file_path=tx_config_path + ) + + tx_app.create_command( + session_type="st20p", + direction="tx", + test_mode="unicast", + transport_format=media_file_info["format"], + width=media_file_info["width"], + height=media_file_info["height"], + framerate=f"p{expected_fps}", + input_file=media_file_path, + nic_port=tx_vf, + source_ip="192.168.31.101", + destination_ip="192.168.31.102", + port=20000, + replicas=num_sessions, + sch_session_quota=60, + test_time=test_time + 10, + ) + + companion_log_path = f"{build}/tests/vf_rx_companion_tx_{num_sessions}s.log" + logger.info(f"Starting companion TX: {companion_log_path}") + tx_process = host.connection.start_process( + f"{tx_app.command} > {companion_log_path} 2>&1", cwd=build, shell=True + ) + time.sleep(10) + logger.info("Companion TX started") + + try: + rx_config_path = f"{build}/tests/vf_rx_{num_sessions}s_config.json" + rx_app = RxTxApp( + app_path=f"{build}/tests/tools/RxTxApp/build", + config_file_path=rx_config_path, + ) + + rx_app.create_command( + session_type="st20p", + direction="rx", + test_mode="unicast", + transport_format=media_file_info["format"], + width=media_file_info["width"], + height=media_file_info["height"], + framerate=f"p{expected_fps}", + output_file="/dev/null", + nic_port=rx_vf, + source_ip="192.168.31.101", + destination_ip="192.168.31.102", + port=20000, + replicas=num_sessions, + sch_session_quota=60, + test_time=test_time, + ) + + rx_log_path = f"{build}/tests/vf_rx_{num_sessions}s.log" + logger.info(f"Running RX: {rx_log_path}") + + from mtl_engine.execute import run + + result = run( + rx_app.command, + cwd=build, + timeout=test_time + 60, + testcmd=True, + host=host, + ) + + with open(rx_log_path, "w") as f: + f.write(result.stdout_text) + + for line in result.stdout_text.splitlines(): + logger.debug(f"[RX] {line}") + + if result.return_code != 0: + log_fail(f"RX process failed with exit code {result.return_code}") + pytest.fail(f"RX process failed with exit code {result.return_code}") + + success, successful_count, rx_details = monitor_rx_performance( + result.stdout_text.splitlines(), num_sessions, expected_frames_min + ) + + logger.info("=" * 80) + logger.info( + f"RX Results: {successful_count}/{num_sessions} sessions successful" + ) + for session_id in range(num_sessions): + frames = rx_details["session_frames"].get(session_id, 0) + status = "✓" if session_id in rx_details["successful_sessions"] else "✗" + logger.info(f" Session {session_id}: {frames} frames {status}") + logger.info("=" * 80) + + if not success: + failure_msg = ( + f"Only {successful_count}/{num_sessions} sessions " + f"received sufficient frames (min {expected_frames_min})" + ) + log_fail(failure_msg) + pytest.fail(failure_msg) + + logger.log(TEST_PASS, f"RX test passed: {num_sessions} sessions") + + finally: + try: + tx_process.stop() + if tx_process.running: + time.sleep(2) + tx_process.kill() + except Exception as e: + logger.warning(f"Error stopping companion TX: {e}") diff --git a/tests/validation/tests/single/performance/test_vf_multisession_tx_fps.py b/tests/validation/tests/single/performance/test_vf_multisession_tx_fps.py new file mode 100644 index 000000000..f8388696e --- /dev/null +++ b/tests/validation/tests/single/performance/test_vf_multisession_tx_fps.py @@ -0,0 +1,221 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2024-2025 Intel Corporation + +""" +VF Multi-Session TX FPS Performance Tests + +Tests TX performance using Virtual Functions (VFs) on a single NIC. +- TX sessions run on VF0 with companion RX on VF1 (same physical NIC) +- Tests multiple sessions (1, 2, 4, 8, 16) with sch_session_quota=60 +- Performance validated by checking FPS in live logs +- Test fails if FPS drops below target (with 2 fps tolerance) +""" + +import logging +import os +import re +import threading +import time + +import pytest +from mfd_common_libs.log_levels import TEST_FAIL, TEST_PASS +from mtl_engine.execute import log_fail +from mtl_engine.media_files import yuv_files_422rfc10 +from mtl_engine.rxtxapp import RxTxApp + +logger = logging.getLogger(__name__) + + +def monitor_fps_in_logs( + log_lines: list, expected_fps: float, num_sessions: int, fps_tolerance: float = 2.0 +) -> tuple: + """Monitor FPS from RxTxApp log output.""" + fps_pattern = re.compile( + r"TX_VIDEO_SESSION\(\d+,(\d+):app_tx_st20p_(\d+)\):\s+fps\s+([\d.]+)" + ) + + successful_sessions = set() + session_fps_history = {} + + for line in log_lines: + match = fps_pattern.search(line) + if match: + session_id = int(match.group(2)) + actual_fps = float(match.group(3)) + + if session_id not in session_fps_history: + session_fps_history[session_id] = [] + session_fps_history[session_id].append(actual_fps) + + if abs(actual_fps - expected_fps) <= fps_tolerance: + successful_sessions.add(session_id) + + details = { + "successful_count": len(successful_sessions), + "total_sessions": num_sessions, + "successful_sessions": sorted(list(successful_sessions)), + "session_fps_history": session_fps_history, + } + + return len(successful_sessions) == num_sessions, len(successful_sessions), details + + +@pytest.mark.smoke +@pytest.mark.nightly +@pytest.mark.performance +@pytest.mark.parametrize( + "num_sessions", + [1, 2, 4, 8, 16], + ids=["1sess", "2sess", "4sess", "8sess", "16sess"], +) +@pytest.mark.parametrize( + "media_file", + [yuv_files_422rfc10["Penguin_1080p"]], + indirect=["media_file"], + ids=["1080p"], +) +def test_vf_multisession_tx_fps( + hosts, + build, + nic_port_list, + test_time, + media_file, + num_sessions, +): + """Test TX multi-session performance using VFs with FPS validation.""" + host = list(hosts.values())[0] + media_file_info, media_file_path = media_file + expected_fps = int(media_file_info["fps"]) + + if not hasattr(host, "vfs") or len(host.vfs) < 2: + pytest.skip("Test requires at least 2 VFs on host") + + tx_vf = host.vfs[0] + rx_vf = host.vfs[1] + + logger.info("=" * 80) + logger.info(f"VF Multi-Session TX: {num_sessions} sessions, TX={tx_vf}, RX={rx_vf}") + logger.info(f"Target: {expected_fps} fps") + logger.info("=" * 80) + + rx_config_path = f"{build}/tests/vf_tx_companion_rx_{num_sessions}s_config.json" + rx_app = RxTxApp( + app_path=f"{build}/tests/tools/RxTxApp/build", config_file_path=rx_config_path + ) + + rx_app.create_command( + session_type="st20p", + direction="rx", + test_mode="unicast", + transport_format=media_file_info["format"], + width=media_file_info["width"], + height=media_file_info["height"], + framerate=f"p{expected_fps}", + output_file="/dev/null", + nic_port=rx_vf, + source_ip="192.168.30.101", + destination_ip="192.168.30.102", + port=20000, + replicas=num_sessions, + sch_session_quota=60, + test_time=test_time + 10, + ) + + companion_log_path = f"{build}/tests/vf_tx_companion_rx_{num_sessions}s.log" + logger.info(f"Starting companion RX: {companion_log_path}") + rx_process = host.connection.start_process( + f"{rx_app.command} > {companion_log_path} 2>&1", cwd=build, shell=True + ) + time.sleep(10) + logger.info("Companion RX started") + + try: + tx_config_path = f"{build}/tests/vf_tx_{num_sessions}s_config.json" + tx_app = RxTxApp( + app_path=f"{build}/tests/tools/RxTxApp/build", + config_file_path=tx_config_path, + ) + + tx_app.create_command( + session_type="st20p", + direction="tx", + test_mode="unicast", + transport_format=media_file_info["format"], + width=media_file_info["width"], + height=media_file_info["height"], + framerate=f"p{expected_fps}", + input_file=media_file_path, + nic_port=tx_vf, + source_ip="192.168.30.101", + destination_ip="192.168.30.102", + port=20000, + replicas=num_sessions, + sch_session_quota=60, + test_time=test_time, + ) + + tx_log_path = f"{build}/tests/vf_tx_{num_sessions}s.log" + logger.info(f"Running TX: {tx_log_path}") + + from mtl_engine.execute import run + + result = run( + tx_app.command, + cwd=build, + timeout=test_time + 60, + testcmd=True, + host=host, + ) + + with open(tx_log_path, "w") as f: + f.write(result.stdout_text) + + for line in result.stdout_text.splitlines(): + logger.debug(f"[TX] {line}") + + if result.return_code != 0: + log_fail(f"TX process failed with exit code {result.return_code}") + pytest.fail(f"TX process failed with exit code {result.return_code}") + + success, successful_count, fps_details = monitor_fps_in_logs( + result.stdout_text.splitlines(), expected_fps, num_sessions + ) + + logger.info("=" * 80) + logger.info( + f"TX FPS Results: {successful_count}/{num_sessions} sessions at target" + ) + for session_id, fps_history in fps_details["session_fps_history"].items(): + if fps_history: + avg_fps = sum(fps_history) / len(fps_history) + min_fps = min(fps_history) + max_fps = max(fps_history) + status = ( + "✓" if session_id in fps_details["successful_sessions"] else "✗" + ) + logger.info( + f" Session {session_id}: avg={avg_fps:.1f}, " + f"min={min_fps:.1f}, max={max_fps:.1f} fps {status}" + ) + logger.info("=" * 80) + + if not success: + failure_msg = ( + f"Only {successful_count}/{num_sessions} sessions " + f"reached target {expected_fps} fps (±2 tolerance)" + ) + log_fail(failure_msg) + pytest.fail(failure_msg) + + logger.log( + TEST_PASS, f"TX test passed: {num_sessions} sessions @ {expected_fps} fps" + ) + + finally: + try: + rx_process.stop() + if rx_process.running: + time.sleep(2) + rx_process.kill() + except Exception as e: + logger.warning(f"Error stopping companion RX: {e}") diff --git a/tests/validation/tests/single/st20p/format/test_format_refactored.py b/tests/validation/tests/single/st20p/format/test_format_refactored.py new file mode 100644 index 000000000..4071e7e30 --- /dev/null +++ b/tests/validation/tests/single/st20p/format/test_format_refactored.py @@ -0,0 +1,234 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2024-2025 Intel Corporation + +import pytest +from mtl_engine.media_files import yuv_files_422p10le, yuv_files_422rfc10 +from mtl_engine.rxtxapp import RxTxApp + + +@pytest.mark.smoke +@pytest.mark.nightly +@pytest.mark.parametrize( + "media_file", + list(yuv_files_422p10le.values()), + indirect=["media_file"], + ids=list(yuv_files_422p10le.keys()), +) +def test_422p10le_refactored( + hosts, + build, + media, + nic_port_list, + test_time, + test_config, + prepare_ramdisk, + media_file, +): + """Send files in YUV422PLANAR10LE format converting to transport format YUV_422_10bit""" + media_file_info, media_file_path = media_file + host = list(hosts.values())[0] + + capture_cfg = dict(test_config.get("capture_cfg", {})) + capture_cfg["test_name"] = f"test_format_refactored_{media_file_info['filename']}" + + app = RxTxApp(f"{build}/tests/tools/RxTxApp/build") + + app.create_command( + session_type="st20p", + nic_port_list=host.vfs, + test_mode="multicast", + width=media_file_info["width"], + height=media_file_info["height"], + framerate=f"p{media_file_info['fps']}", + pixel_format=media_file_info["file_format"], + transport_format=media_file_info["format"], + input_file=media_file_path, + test_time=test_time, + ) + + app.execute_test( + build=build, + test_time=test_time, + host=host, + capture_cfg=capture_cfg, + ) + + +# List of supported formats based on st_frame_fmt_from_transport() +pixel_formats = dict( + YUV_422_10bit=("ST20_FMT_YUV_422_10BIT", "YUV422RFC4175PG2BE10"), + YUV_422_8bit=("ST20_FMT_YUV_422_8BIT", "UYVY"), + YUV_422_12bit=("ST20_FMT_YUV_422_12BIT", "YUV422RFC4175PG2BE12"), + YUV_444_10bit=("ST20_FMT_YUV_444_10BIT", "YUV444RFC4175PG4BE10"), + YUV_444_12bit=("ST20_FMT_YUV_444_12BIT", "YUV444RFC4175PG2BE12"), + YUV_420_8bit=("ST20_FMT_YUV_420_8BIT", "YUV420CUSTOM8"), + RGB_8bit=("ST20_FMT_RGB_8BIT", "RGB8"), + RGB_10bit=("ST20_FMT_RGB_10BIT", "RGBRFC4175PG4BE10"), + RGB_12bit=("ST20_FMT_RGB_12BIT", "RGBRFC4175PG2BE12"), + YUV_422_PLANAR10LE=("ST20_FMT_YUV_422_PLANAR10LE", "YUV422PLANAR10LE"), + V210=("ST20_FMT_V210", "V210"), +) + + +# List of supported one-way convertions based on st_frame_get_converter() +convert1_formats = dict( + UYVY="UYVY", + YUV422PLANAR8="YUV422PLANAR8", + YUV420PLANAR8="YUV420PLANAR8", +) + + +@pytest.mark.nightly +@pytest.mark.parametrize( + "media_file", + [yuv_files_422rfc10["Penguin_1080p"]], + indirect=["media_file"], + ids=["Penguin_1080p"], +) +@pytest.mark.parametrize("format", convert1_formats.keys()) +def test_convert_on_rx_refactored( + hosts, build, media, nic_port_list, test_time, format, media_file +): + """Send file in YUV_422_10bit pixel formats with supported conversion on RX side""" + media_file_info, media_file_path = media_file + host = list(hosts.values())[0] + + app = RxTxApp(f"{build}/tests/tools/RxTxApp/build") + + app.create_command( + session_type="st20p", + nic_port_list=host.vfs, + test_mode="multicast", + packing="GPM", + width=media_file_info["width"], + height=media_file_info["height"], + framerate="p30", + pixel_format="YUV422RFC4175PG2BE10", + transport_format="YUV_422_10bit", + input_file=media_file_path, + test_time=test_time, + ) + + app.execute_test( + build=build, + test_time=test_time, + host=host, + ) + + +# List of supported two-way convertions based on st_frame_get_converter() +convert2_formats = dict( + V210=("ST20_FMT_YUV_422_10BIT", "YUV_422_10bit", "YUV422RFC4175PG2BE10"), + Y210=("ST20_FMT_YUV_422_10BIT", "YUV_422_10bit", "YUV422RFC4175PG2BE10"), + YUV422PLANAR12LE=( + "ST20_FMT_YUV_422_12BIT", + "YUV_422_12bit", + "YUV422RFC4175PG2BE12", + ), + YUV444PLANAR10LE=( + "ST20_FMT_YUV_444_10BIT", + "YUV_444_10bit", + "YUV444RFC4175PG4BE10", + ), + YUV444PLANAR12LE=( + "ST20_FMT_YUV_444_12BIT", + "YUV_444_12bit", + "YUV444RFC4175PG2BE12", + ), + GBRPLANAR10LE=("ST20_FMT_RGB_10BIT", "RGB_10bit", "RGBRFC4175PG4BE10"), + GBRPLANAR12LE=("ST20_FMT_RGB_12BIT", "RGB_12bit", "RGBRFC4175PG2BE12"), +) + + +@pytest.mark.parametrize( + "media_file", + [yuv_files_422rfc10["test_8K"]], + indirect=["media_file"], + ids=["test_8K"], +) +@pytest.mark.parametrize("format", convert2_formats.keys()) +def test_tx_rx_conversion_refactored( + hosts, + build, + media, + nic_port_list, + test_time, + format, + media_file, +): + """Send file in different pixel formats with supported two-way conversion on TX and RX""" + media_file_info, media_file_path = media_file + text_format, transport_format, _ = convert2_formats[format] + host = list(hosts.values())[0] + + app = RxTxApp(f"{build}/tests/tools/RxTxApp/build") + + app.create_command( + session_type="st20p", + nic_port_list=host.vfs, + test_mode="multicast", + packing="GPM", + width=media_file_info["width"], + height=media_file_info["height"], + framerate="p30", + pixel_format=format, + transport_format=transport_format, + input_file=media_file_path, + test_time=test_time, + ) + + app.execute_test( + build=build, + test_time=test_time, + host=host, + ) + + +@pytest.mark.parametrize( + "media_file", + [yuv_files_422rfc10["test_8K"]], + indirect=["media_file"], + ids=["test_8K"], +) +@pytest.mark.parametrize("format", pixel_formats.keys()) +def test_formats_refactored( + hosts, + build, + media, + nic_port_list, + test_time, + format, + test_config, + prepare_ramdisk, + media_file, +): + """Send file in different supported pixel formats without conversion during transport""" + media_file_info, media_file_path = media_file + text_format, file_format = pixel_formats[format] + host = list(hosts.values())[0] + + capture_cfg = dict(test_config.get("capture_cfg", {})) + capture_cfg["test_name"] = f"test_format_refactored_formats_{format}" + + app = RxTxApp(f"{build}/tests/tools/RxTxApp/build") + + app.create_command( + session_type="st20p", + nic_port_list=host.vfs, + test_mode="multicast", + packing="GPM", + width=media_file_info["width"], + height=media_file_info["height"], + framerate="p30", + pixel_format=file_format, + transport_format=format, + input_file=media_file_path, + test_time=test_time, + ) + + app.execute_test( + build=build, + test_time=test_time, + host=host, + capture_cfg=capture_cfg, + ) diff --git a/tests/validation/tests/single/st20p/fps/test_fps_refactored.py b/tests/validation/tests/single/st20p/fps/test_fps_refactored.py new file mode 100644 index 000000000..072e60e9f --- /dev/null +++ b/tests/validation/tests/single/st20p/fps/test_fps_refactored.py @@ -0,0 +1,76 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2024-2025 Intel Corporation + +import pytest +from mtl_engine.media_files import yuv_files_422rfc10 +from mtl_engine.rxtxapp import RxTxApp + + +@pytest.mark.nightly +@pytest.mark.parametrize( + "media_file", + [yuv_files_422rfc10["ParkJoy_1080p"]], + indirect=["media_file"], + ids=["ParkJoy_1080p"], +) +@pytest.mark.parametrize( + "fps", + [ + "p23", + "p24", + "p25", + pytest.param("p29", marks=pytest.mark.smoke), + "p30", + "p50", + "p59", + "p60", + "p100", + "p119", + "p120", + ], +) +def test_fps_refactored( + hosts, + build, + media, + nic_port_list, + test_time, + fps, + prepare_ramdisk, + media_file, +): + """Test different frame rates""" + media_file_info, media_file_path = media_file + host = list(hosts.values())[0] + + app = RxTxApp(f"{build}/tests/tools/RxTxApp/build") + + config_params = { + "session_type": "st20p", + "nic_port_list": host.vfs, + "test_mode": "multicast", + "destination_ip": "239.168.48.9", + "port": 20000, + "width": media_file_info["width"], + "height": media_file_info["height"], + "framerate": fps, + "pixel_format": media_file_info["file_format"], + "transport_format": media_file_info["format"], + "input_file": media_file_path, + "test_time": test_time, + } + + if fps in ["p30", "p50", "p59", "p60"]: + config_params.update({"pacing": "gap", "tx_no_chain": True}) + elif fps in ["p100", "p119", "p120"]: + config_params.update({"pacing": "linear", "tx_no_chain": True}) + + app.create_command(**config_params) + + actual_test_time = test_time + if fps in ["p30", "p50", "p59", "p60"]: + actual_test_time = max(test_time, 15) + elif fps in ["p100", "p119", "p120"]: + actual_test_time = max(test_time, 10) + + app.execute_test(build=build, test_time=actual_test_time, host=host) diff --git a/tests/validation/tests/single/st20p/integrity/test_integrity_refactored.py b/tests/validation/tests/single/st20p/integrity/test_integrity_refactored.py new file mode 100644 index 000000000..0df6bbb7f --- /dev/null +++ b/tests/validation/tests/single/st20p/integrity/test_integrity_refactored.py @@ -0,0 +1,91 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2024-2025 Intel Corporation + +import logging +import os + +import pytest +from mfd_common_libs.log_levels import TEST_PASS +from mtl_engine.const import LOG_FOLDER +from mtl_engine.execute import log_fail +from mtl_engine.integrity import calculate_yuv_frame_size, check_st20p_integrity +from mtl_engine.media_files import yuv_files_422p10le, yuv_files_422rfc10 +from mtl_engine.rxtxapp import RxTxApp + +logger = logging.getLogger(__name__) + + +@pytest.mark.nightly +@pytest.mark.parametrize( + "media_file", + [ + yuv_files_422rfc10["Penguin_720p"], + yuv_files_422rfc10["Penguin_1080p"], + pytest.param(yuv_files_422p10le["Penguin_720p"], marks=pytest.mark.nightly), + yuv_files_422p10le["Penguin_1080p"], + ], + indirect=["media_file"], + ids=[ + "Penguin_720p_422rfc10", + "Penguin_1080p_422rfc10", + "Penguin_720p_422p10le", + "Penguin_1080p_422p10le", + ], +) +def test_integrity_refactored( + hosts, + build, + media, + nic_port_list, + test_time, + prepare_ramdisk, + media_file, +): + """Test video integrity by comparing input and output files""" + media_file_info, media_file_path = media_file + + log_dir = os.path.join(os.getcwd(), LOG_FOLDER, "latest") + os.makedirs(log_dir, exist_ok=True) + out_file_url = os.path.join(log_dir, "out.yuv") + host = list(hosts.values())[0] + + app = RxTxApp(f"{build}/tests/tools/RxTxApp/build") + + app.create_command( + session_type="st20p", + nic_port=host.vfs[0] if host.vfs else "0000:31:01.0", + nic_port_list=host.vfs, + source_ip="192.168.17.101", + destination_ip="192.168.17.102", + port=20000, + width=media_file_info["width"], + height=media_file_info["height"], + framerate="p25", + pixel_format=media_file_info["file_format"], + transport_format=media_file_info["format"], + input_file=media_file_path, + output_file=out_file_url, + test_mode="unicast", + pacing="linear", + test_time=test_time, + ) + + actual_test_time = max(test_time, 8) + app.execute_test(build=build, test_time=actual_test_time, host=host) + + frame_size = calculate_yuv_frame_size( + media_file_info["width"], + media_file_info["height"], + media_file_info["file_format"], + ) + result = check_st20p_integrity( + src_url=media_file_path, out_url=out_file_url, frame_size=frame_size + ) + + if result: + logger.log(TEST_PASS, "INTEGRITY PASS") + else: + log_fail("INTEGRITY FAIL") + raise AssertionError( + "st20p integrity test failed content integrity comparison." + ) diff --git a/tests/validation/tests/single/st20p/interlace/test_interlace_refactored.py b/tests/validation/tests/single/st20p/interlace/test_interlace_refactored.py new file mode 100644 index 000000000..7ecbaaf95 --- /dev/null +++ b/tests/validation/tests/single/st20p/interlace/test_interlace_refactored.py @@ -0,0 +1,53 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2024-2025 Intel Corporation + +import pytest +from mtl_engine.media_files import yuv_files_interlace +from mtl_engine.rxtxapp import RxTxApp + + +@pytest.mark.nightly +@pytest.mark.parametrize( + "media_file", + list(yuv_files_interlace.values()), + indirect=["media_file"], + ids=list(yuv_files_interlace.keys()), +) +def test_interlace_refactored( + hosts, + build, + media, + nic_port_list, + test_time, + prepare_ramdisk, + media_file, +): + """Test interlaced video transmission""" + media_file_info, media_file_path = media_file + host = list(hosts.values())[0] + + app = RxTxApp(f"{build}/tests/tools/RxTxApp/build") + + config_params = { + "session_type": "st20p", + "nic_port": host.vfs[0] if host.vfs else "0000:31:01.0", + "nic_port_list": host.vfs, + "source_ip": "192.168.17.101", + "destination_ip": "192.168.17.102", + "port": 20000, + "width": media_file_info["width"], + "height": media_file_info["height"], + "framerate": f"p{media_file_info['fps']}", + "pixel_format": media_file_info["file_format"], + "transport_format": media_file_info["format"], + "input_file": media_file_path, + "test_mode": "unicast", + "interlaced": True, + "pacing": "linear", + "tx_no_chain": False, + "test_time": test_time, + } + + app.create_command(**config_params) + actual_test_time = max(test_time, 10) + app.execute_test(build=build, test_time=actual_test_time, host=host) diff --git a/tests/validation/tests/single/st20p/pacing/test_pacing_refactored.py b/tests/validation/tests/single/st20p/pacing/test_pacing_refactored.py new file mode 100644 index 000000000..c585c1696 --- /dev/null +++ b/tests/validation/tests/single/st20p/pacing/test_pacing_refactored.py @@ -0,0 +1,66 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2024-2025 Intel Corporation + +import pytest +from mtl_engine.media_files import yuv_files_422rfc10 +from mtl_engine.rxtxapp import RxTxApp + + +@pytest.mark.nightly +@pytest.mark.parametrize("pacing", ["narrow", "wide", "linear"]) +@pytest.mark.parametrize( + "media_file", + [ + yuv_files_422rfc10["Crosswalk_720p"], + yuv_files_422rfc10["ParkJoy_1080p"], + yuv_files_422rfc10["Pedestrian_4K"], + ], + indirect=["media_file"], + ids=["Crosswalk_720p", "ParkJoy_1080p", "Pedestrian_4K"], +) +def test_pacing_refactored( + hosts, + build, + media, + nic_port_list, + test_time, + pacing, + prepare_ramdisk, + media_file, +): + """Test different pacing modes (narrow, wide, linear)""" + media_file_info, media_file_path = media_file + host = list(hosts.values())[0] + + app = RxTxApp(f"{build}/tests/tools/RxTxApp/build") + + config_params = { + "session_type": "st20p", + "nic_port": host.vfs[0] if host.vfs else "0000:31:01.0", + "nic_port_list": host.vfs, + "source_ip": "192.168.17.101", + "destination_ip": "192.168.17.102", + "port": 20000, + "width": media_file_info["width"], + "height": media_file_info["height"], + "framerate": f"p{media_file_info['fps']}", + "pixel_format": media_file_info["file_format"], + "transport_format": media_file_info["format"], + "input_file": media_file_path, + "test_mode": "unicast", + "pacing": pacing, + "test_time": test_time, + } + + height = media_file_info.get("height", 0) + if height >= 2160: + config_params["tx_no_chain"] = True if pacing == "linear" else False + actual_test_time = max(test_time, 12) + elif pacing == "narrow": + config_params["tx_no_chain"] = False + actual_test_time = max(test_time, 8) + else: + actual_test_time = max(test_time, 8) + + app.create_command(**config_params) + app.execute_test(build=build, test_time=actual_test_time, host=host) diff --git a/tests/validation/tests/single/st20p/packing/test_packing_refactored.py b/tests/validation/tests/single/st20p/packing/test_packing_refactored.py new file mode 100644 index 000000000..63a3ff5fe --- /dev/null +++ b/tests/validation/tests/single/st20p/packing/test_packing_refactored.py @@ -0,0 +1,67 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2024-2025 Intel Corporation + +import pytest +from mtl_engine.media_files import yuv_files_422rfc10 +from mtl_engine.rxtxapp import RxTxApp + + +@pytest.mark.nightly +@pytest.mark.parametrize("packing", ["GPM_SL", "GPM"]) +@pytest.mark.parametrize( + "media_file", + [ + yuv_files_422rfc10["Crosswalk_720p"], + yuv_files_422rfc10["ParkJoy_1080p"], + yuv_files_422rfc10["Pedestrian_4K"], + ], + indirect=["media_file"], + ids=["Crosswalk_720p", "ParkJoy_1080p", "Pedestrian_4K"], +) +def test_packing_refactored( + hosts, + build, + media, + nic_port_list, + test_time, + packing, + prepare_ramdisk, + media_file, +): + """Test different packing modes (GPM_SL, GPM)""" + media_file_info, media_file_path = media_file + host = list(hosts.values())[0] + + app = RxTxApp(f"{build}/tests/tools/RxTxApp/build") + + config_params = { + "session_type": "st20p", + "nic_port": host.vfs[0] if host.vfs else "0000:31:01.0", + "nic_port_list": host.vfs, + "source_ip": "192.168.17.101", + "destination_ip": "192.168.17.102", + "port": 20000, + "width": media_file_info["width"], + "height": media_file_info["height"], + "framerate": f"p{media_file_info['fps']}", + "pixel_format": media_file_info["file_format"], + "transport_format": media_file_info["format"], + "input_file": media_file_path, + "test_mode": "unicast", + "packing": packing, + "test_time": test_time, + } + + height = media_file_info.get("height", 0) + if height >= 2160: + if packing == "GPM_SL": + config_params.update({"tx_no_chain": True, "pacing": "linear"}) + else: + config_params.update({"tx_no_chain": False, "pacing": "wide"}) + actual_test_time = max(test_time, 12) + else: + config_params["pacing"] = "linear" if packing == "GPM_SL" else "narrow" + actual_test_time = max(test_time, 8) + + app.create_command(**config_params) + app.execute_test(build=build, test_time=actual_test_time, host=host) diff --git a/tests/validation/tests/single/st20p/resolutions/test_resolutions_refactored.py b/tests/validation/tests/single/st20p/resolutions/test_resolutions_refactored.py new file mode 100644 index 000000000..6e7dcb7cd --- /dev/null +++ b/tests/validation/tests/single/st20p/resolutions/test_resolutions_refactored.py @@ -0,0 +1,70 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2024-2025 Intel Corporation + +import pytest +from mtl_engine.media_files import yuv_files_422rfc10 +from mtl_engine.rxtxapp import RxTxApp + + +@pytest.mark.nightly +@pytest.mark.parametrize( + "media_file", + list(yuv_files_422rfc10.values()), + indirect=["media_file"], + ids=list(yuv_files_422rfc10.keys()), +) +def test_resolutions_refactored( + hosts, + build, + media, + nic_port_list, + test_time, + prepare_ramdisk, + media_file, +): + """Test different video resolutions""" + media_file_info, media_file_path = media_file + host = list(hosts.values())[0] + + app = RxTxApp(f"{build}/tests/tools/RxTxApp/build") + + config_params = { + "session_type": "st20p", + "nic_port": host.vfs[0] if host.vfs else "0000:31:01.0", + "nic_port_list": host.vfs, + "destination_ip": "239.168.48.9", + "port": 20000, + "width": media_file_info["width"], + "height": media_file_info["height"], + "framerate": f"p{media_file_info['fps']}", + "pixel_format": media_file_info["file_format"], + "transport_format": media_file_info["format"], + "input_file": media_file_path, + "test_mode": "multicast", + "test_time": test_time, + } + + height = media_file_info.get("height", 0) + + if height >= 2160: + config_params.update( + {"pacing": "linear", "packing": "GPM_SL", "tx_no_chain": True} + ) + elif height >= 1080: + config_params.update({"pacing": "wide", "packing": "GPM", "tx_no_chain": False}) + else: + config_params.update( + {"pacing": "narrow", "packing": "GPM", "tx_no_chain": False} + ) + + app.create_command(**config_params) + + actual_test_time = test_time + if height >= 2160: + actual_test_time = max(test_time, 15) + elif height >= 1080: + actual_test_time = max(test_time, 10) + else: + actual_test_time = max(test_time, 8) + + app.execute_test(build=build, test_time=actual_test_time, host=host) diff --git a/tests/validation/tests/single/st20p/test_mode/test_multicast_refactored.py b/tests/validation/tests/single/st20p/test_mode/test_multicast_refactored.py new file mode 100644 index 000000000..a84defd5f --- /dev/null +++ b/tests/validation/tests/single/st20p/test_mode/test_multicast_refactored.py @@ -0,0 +1,66 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2024-2025 Intel Corporation + +import pytest +from mtl_engine.media_files import yuv_files_422rfc10 +from mtl_engine.rxtxapp import RxTxApp + + +@pytest.mark.nightly +@pytest.mark.parametrize( + "media_file", + [ + yuv_files_422rfc10["Crosswalk_720p"], + yuv_files_422rfc10["ParkJoy_1080p"], + yuv_files_422rfc10["Pedestrian_4K"], + ], + indirect=["media_file"], + ids=["Crosswalk_720p", "ParkJoy_1080p", "Pedestrian_4K"], +) +def test_multicast_refactored( + hosts, + build, + media, + nic_port_list, + test_time, + prepare_ramdisk, + media_file, +): + """Test multicast transmission mode""" + media_file_info, media_file_path = media_file + host = list(hosts.values())[0] + + app = RxTxApp(f"{build}/tests/tools/RxTxApp/build") + + config_params = { + "session_type": "st20p", + "nic_port_list": host.vfs, + "destination_ip": "239.168.48.9", + "port": 20000, + "width": media_file_info["width"], + "height": media_file_info["height"], + "framerate": f"p{media_file_info['fps']}", + "pixel_format": media_file_info["file_format"], + "transport_format": media_file_info["format"], + "input_file": media_file_path, + "test_mode": "multicast", + "test_time": test_time, + } + + height = media_file_info.get("height", 0) + if height >= 2160: + config_params.update( + {"pacing": "linear", "packing": "GPM_SL", "tx_no_chain": True} + ) + actual_test_time = max(test_time, 15) + elif height >= 1080: + config_params.update({"pacing": "wide", "packing": "GPM", "tx_no_chain": False}) + actual_test_time = max(test_time, 10) + else: + config_params.update( + {"pacing": "narrow", "packing": "GPM", "tx_no_chain": False} + ) + actual_test_time = max(test_time, 8) + + app.create_command(**config_params) + app.execute_test(build=build, test_time=actual_test_time, host=host) diff --git a/tests/validation/tests/single/st22p/codec/test_codec_refactored.py b/tests/validation/tests/single/st22p/codec/test_codec_refactored.py new file mode 100644 index 000000000..be49876db --- /dev/null +++ b/tests/validation/tests/single/st22p/codec/test_codec_refactored.py @@ -0,0 +1,52 @@ +# SPDX-License-Identifier: BSD-3-Clause + +import os + +import pytest +from mtl_engine.media_files import yuv_files_422p10le +from mtl_engine.rxtxapp import RxTxApp + + +@pytest.mark.nightly +@pytest.mark.refactored +@pytest.mark.parametrize( + "media_file", + [yuv_files_422p10le["Penguin_1080p"]], + ids=["Penguin_1080p"], +) +@pytest.mark.parametrize("codec", ["JPEG-XS", "H264_CBR"]) +def test_codec_refactored( + hosts, + build, + media, + nic_port_list, + test_time, + codec, + test_config, + prepare_ramdisk, + media_file, +): + media_file_info = media_file + host = list(hosts.values())[0] + + app = RxTxApp(app_path="./tests/tools/RxTxApp/build") + input_path = ( + os.path.join(media, media_file_info["filename"]) + if media + else media_file_info["filename"] + ) + app.create_command( + session_type="st22p", + test_mode="multicast", + nic_port_list=host.vfs, + width=media_file_info["width"], + height=media_file_info["height"], + framerate=f"p{media_file_info['fps']}", + codec=codec, + quality="speed", + pixel_format=media_file_info["file_format"], + input_file=input_path, + codec_threads=2, + test_time=test_time, + ) + app.execute_test(build=build, test_time=test_time, host=host) diff --git a/tests/validation/tests/single/st22p/format/test_format_refactored.py b/tests/validation/tests/single/st22p/format/test_format_refactored.py new file mode 100644 index 000000000..cb236be29 --- /dev/null +++ b/tests/validation/tests/single/st22p/format/test_format_refactored.py @@ -0,0 +1,51 @@ +# SPDX-License-Identifier: BSD-3-Clause + +import os + +import pytest +from mtl_engine.media_files import yuv_files_422p10le +from mtl_engine.rxtxapp import RxTxApp + + +@pytest.mark.nightly +@pytest.mark.refactored +@pytest.mark.parametrize( + "media_file", + list(yuv_files_422p10le.values()), + ids=list(yuv_files_422p10le.keys()), +) +def test_format_refactored( + hosts, + build, + media, + nic_port_list, + test_time, + test_config, + prepare_ramdisk, + media_file, +): + media_file_info = media_file + host = list(hosts.values())[0] + + app = RxTxApp(app_path="./tests/tools/RxTxApp/build") + input_path = ( + os.path.join(media, media_file_info["filename"]) + if media + else media_file_info["filename"] + ) + app.create_command( + session_type="st22p", + test_mode="multicast", + nic_port_list=host.vfs, + width=media_file_info["width"], + height=media_file_info["height"], + framerate=f"p{media_file_info['fps']}", + codec="JPEG-XS", + quality="speed", + pixel_format=media_file_info["file_format"], + input_file=input_path, + codec_threads=2, + test_time=test_time, + ) + + app.execute_test(build=build, test_time=test_time, host=host) diff --git a/tests/validation/tests/single/st22p/fps/test_fps_refactored.py b/tests/validation/tests/single/st22p/fps/test_fps_refactored.py new file mode 100644 index 000000000..3e39e63d9 --- /dev/null +++ b/tests/validation/tests/single/st22p/fps/test_fps_refactored.py @@ -0,0 +1,68 @@ +# SPDX-License-Identifier: BSD-3-Clause + +import os + +import pytest +from mtl_engine.media_files import yuv_files_422p10le +from mtl_engine.rxtxapp import RxTxApp + + +@pytest.mark.refactored +@pytest.mark.parametrize( + "media_file", + [yuv_files_422p10le["Penguin_1080p"]], + ids=["Penguin_1080p"], +) +@pytest.mark.parametrize( + "fps", + [ + "p23", + "p24", + pytest.param("p25", marks=pytest.mark.nightly), + "p29", + "p30", + "p50", + "p59", + "p60", + "p100", + "p119", + "p120", + ], +) +@pytest.mark.parametrize("codec", ["JPEG-XS", "H264_CBR"]) +def test_fps_refactored( + hosts, + build, + media, + nic_port_list, + test_time, + fps, + codec, + test_config, + prepare_ramdisk, + media_file, +): + media_file_info = media_file + host = list(hosts.values())[0] + + app = RxTxApp(app_path="./tests/tools/RxTxApp/build") + input_path = ( + os.path.join(media, media_file_info["filename"]) + if media + else media_file_info["filename"] + ) + app.create_command( + session_type="st22p", + test_mode="multicast", + nic_port_list=host.vfs, + width=media_file_info["width"], + height=media_file_info["height"], + framerate=fps, + codec=codec, + quality="speed", + pixel_format=media_file_info["file_format"], + input_file=input_path, + codec_threads=16, + test_time=test_time, + ) + app.execute_test(build=build, test_time=test_time, host=host) diff --git a/tests/validation/tests/single/st22p/interlace/test_interlace_refactored.py b/tests/validation/tests/single/st22p/interlace/test_interlace_refactored.py new file mode 100644 index 000000000..7c5473a0e --- /dev/null +++ b/tests/validation/tests/single/st22p/interlace/test_interlace_refactored.py @@ -0,0 +1,50 @@ +# SPDX-License-Identifier: BSD-3-Clause + +import os + +import pytest +from mtl_engine.media_files import yuv_files_interlace +from mtl_engine.rxtxapp import RxTxApp + + +@pytest.mark.refactored +@pytest.mark.parametrize( + "media_file", + list(yuv_files_interlace.values()), + ids=list(yuv_files_interlace.keys()), +) +def test_interlace_refactored( + hosts, + build, + media, + nic_port_list, + test_time, + test_config, + prepare_ramdisk, + media_file, +): + media_file_info = media_file + host = list(hosts.values())[0] + + app = RxTxApp(app_path="./tests/tools/RxTxApp/build") + input_path = ( + os.path.join(media, media_file_info["filename"]) + if media + else media_file_info["filename"] + ) + app.create_command( + session_type="st22p", + test_mode="multicast", + nic_port_list=host.vfs, + width=media_file_info["width"], + height=media_file_info["height"], + framerate=f"p{media_file_info['fps']}", + codec="JPEG-XS", + quality="speed", + pixel_format=media_file_info["file_format"], + input_file=input_path, + codec_threads=2, + interlaced=True, + test_time=test_time, + ) + app.execute_test(build=build, test_time=test_time, host=host) diff --git a/tests/validation/tests/single/st22p/quality/test_quality_refactored.py b/tests/validation/tests/single/st22p/quality/test_quality_refactored.py new file mode 100644 index 000000000..4ebfcdf0e --- /dev/null +++ b/tests/validation/tests/single/st22p/quality/test_quality_refactored.py @@ -0,0 +1,67 @@ +# SPDX-License-Identifier: BSD-3-Clause + +import os +import shutil + +import pytest +from mtl_engine.media_files import yuv_files_422p10le +from mtl_engine.rxtxapp import RxTxApp + + +@pytest.mark.refactored +@pytest.mark.parametrize( + "media_file", + [yuv_files_422p10le["Penguin_1080p"]], + ids=["Penguin_1080p"], +) +@pytest.mark.parametrize("quality", ["quality", "speed"]) +@pytest.mark.nightly +def test_quality_refactored( + hosts, + build, + media, + nic_port_list, + test_time, + quality, + test_config, + prepare_ramdisk, + media_file, +): + media_file_info = media_file + host = list(hosts.values())[0] + + app = RxTxApp(app_path="./tests/tools/RxTxApp/build") + input_path = ( + os.path.join(media, media_file_info["filename"]) + if media + else media_file_info["filename"] + ) + # Ensure kahawai.json (plugin configuration) is available in build cwd so st22 encoder can load plugins + kahawai_src = os.path.abspath( + os.path.join(os.path.dirname(__file__), "../../../../../..", "kahawai.json") + ) + kahawai_dst = os.path.join(build, "kahawai.json") + try: + if os.path.exists(kahawai_src) and not os.path.exists(kahawai_dst): + shutil.copy2(kahawai_src, kahawai_dst) + except Exception as e: + print(f"Warning: failed to stage kahawai.json into build dir: {e}") + app.create_command( + session_type="st22p", + test_mode="multicast", + nic_port_list=host.vfs, + width=media_file_info["width"], + height=media_file_info["height"], + framerate=f"p{media_file_info['fps']}", + codec="JPEG-XS", + quality=quality, + pixel_format=media_file_info["file_format"], + input_file=input_path, + codec_threads=2, + test_time=test_time, + ) + result = app.execute_test(build=build, test_time=test_time, host=host) + # Enforce result to avoid silent pass when validation fails + assert ( + result + ), "Refactored st22p quality test failed validation (TX/RX outputs or return code)." diff --git a/tests/validation/tests/single/st22p/test_mode/test_unicast_refactored.py b/tests/validation/tests/single/st22p/test_mode/test_unicast_refactored.py new file mode 100644 index 000000000..e26d39609 --- /dev/null +++ b/tests/validation/tests/single/st22p/test_mode/test_unicast_refactored.py @@ -0,0 +1,49 @@ +# SPDX-License-Identifier: BSD-3-Clause + +import os + +import pytest +from mtl_engine.media_files import yuv_files_422p10le +from mtl_engine.rxtxapp import RxTxApp + + +@pytest.mark.refactored +@pytest.mark.parametrize( + "media_file", + [yuv_files_422p10le["Penguin_1080p"]], + ids=["Penguin_1080p"], +) +def test_unicast_refactored( + hosts, + build, + media, + nic_port_list, + test_time, + test_config, + prepare_ramdisk, + media_file, +): + media_file_info = media_file + host = list(hosts.values())[0] + + app = RxTxApp(app_path="./tests/tools/RxTxApp/build") + input_path = ( + os.path.join(media, media_file_info["filename"]) + if media + else media_file_info["filename"] + ) + app.create_command( + session_type="st22p", + test_mode="unicast", + nic_port_list=host.vfs, + width=media_file_info["width"], + height=media_file_info["height"], + framerate=f"p{media_file_info['fps']}", + codec="JPEG-XS", + quality="speed", + pixel_format=media_file_info["file_format"], + input_file=input_path, + codec_threads=2, + test_time=test_time, + ) + app.execute_test(build=build, test_time=test_time, host=host)