diff --git a/tests/validation/mtl_engine/app.py b/tests/validation/mtl_engine/app.py new file mode 100644 index 000000000..3caa696b5 --- /dev/null +++ b/tests/validation/mtl_engine/app.py @@ -0,0 +1,19 @@ +# Media Transport Library Application Classes +# Direct imports for RxTxApp, FFmpeg, and GStreamer implementations + +# Import the specific application classes +from .application_base import Application as BaseApplication +from .rxtxapp import RxTxApp +from .ffmpeg import FFmpeg +from .gstreamer import GStreamer + +# Export all classes for direct use +__all__ = [ + 'BaseApplication', # Abstract base class + 'RxTxApp', # RxTxApp implementation + 'FFmpeg', # FFmpeg implementation + 'GStreamer', # GStreamer implementation +] + +# For convenience, you can also import Application as the base class +Application = BaseApplication \ No newline at end of file diff --git a/tests/validation/mtl_engine/application_base.py b/tests/validation/mtl_engine/application_base.py new file mode 100644 index 000000000..89fad7f7f --- /dev/null +++ b/tests/validation/mtl_engine/application_base.py @@ -0,0 +1,303 @@ +# Base Application Class for Media Transport Library +# Provides common interface for all media application frameworks + +import json +import logging +import time +import os +from abc import ABC, abstractmethod + +from .config.universal_params import UNIVERSAL_PARAMS +from .config.app_mappings import ( + DEFAULT_NETWORK_CONFIG, + DEFAULT_PORT_CONFIG, + DEFAULT_PAYLOAD_TYPE_CONFIG, +) + +# Import execution utilities with fallback +try: + from .execute import log_fail, run, is_process_running + from .RxTxApp import prepare_tcpdump +except ImportError: + # Fallback for direct execution + from execute import log_fail, run, is_process_running + from RxTxApp import prepare_tcpdump + +logger = logging.getLogger(__name__) + + +class Application(ABC): + """ + Abstract base class for all media application frameworks. + Provides common functionality and interface that all child classes must implement. + """ + + def __init__(self, app_path, config_file_path=None): + """Initialize application with path to application directory and optional config file.""" + self.app_path = app_path # Path to directory containing the application + self.config_file_path = config_file_path + self.universal_params = UNIVERSAL_PARAMS.copy() + self._user_provided_params = set() + + @abstractmethod + def get_framework_name(self) -> str: + """Return the framework name (e.g., 'RxTxApp', 'FFmpeg', 'GStreamer').""" + pass + + @abstractmethod + def get_executable_name(self) -> str: + """Return the executable name for this framework.""" + pass + + @abstractmethod + def create_command(self, **kwargs) -> tuple: + """ + Create command line and config files for the application framework. 
+ + Args: + **kwargs: Universal parameter names and values + + Returns: + Tuple of (command_string, config_dict_or_none) + """ + pass + + @abstractmethod + def validate_results(self, *args, **kwargs) -> bool: + """Validate test results for the specific framework.""" + pass + + def set_universal_params(self, **kwargs): + """Set universal parameters and track which were provided by user.""" + self._user_provided_params = set(kwargs.keys()) + + for param, value in kwargs.items(): + if param in self.universal_params: + self.universal_params[param] = value + else: + raise ValueError(f"Unknown universal parameter: {param}") + + def get_executable_path(self) -> str: + """Get the full path to the executable based on framework type.""" + executable_name = self.get_executable_name() + + # For applications with specific paths, combine with directory + if self.app_path and not executable_name.startswith('/'): + if self.app_path.endswith("/"): + return f"{self.app_path}{executable_name}" + else: + return f"{self.app_path}/{executable_name}" + else: + # For system executables or full paths + return executable_name + + def was_user_provided(self, param_name: str) -> bool: + """Check if a parameter was explicitly provided by the user.""" + return param_name in self._user_provided_params + + def get_session_default_port(self, session_type: str) -> int: + """Get default port for a specific session type.""" + port_map = { + "st20p": DEFAULT_PORT_CONFIG["st20p_port"], + "st22p": DEFAULT_PORT_CONFIG["st22p_port"], + "st30p": DEFAULT_PORT_CONFIG["st30p_port"], + "video": DEFAULT_PORT_CONFIG["video_port"], + "audio": DEFAULT_PORT_CONFIG["audio_port"], + "ancillary": DEFAULT_PORT_CONFIG["ancillary_port"], + "fastmetadata": DEFAULT_PORT_CONFIG["fastmetadata_port"] + } + return port_map.get(session_type, DEFAULT_PORT_CONFIG["st20p_port"]) + + def get_session_default_payload_type(self, session_type: str) -> int: + """Get default payload type for a specific session type.""" + payload_map = { + "st20p": DEFAULT_PAYLOAD_TYPE_CONFIG["st20p_payload_type"], + "st22p": DEFAULT_PAYLOAD_TYPE_CONFIG["st22p_payload_type"], + "st30p": DEFAULT_PAYLOAD_TYPE_CONFIG["st30p_payload_type"], + "video": DEFAULT_PAYLOAD_TYPE_CONFIG["video_payload_type"], + "audio": DEFAULT_PAYLOAD_TYPE_CONFIG["audio_payload_type"], + "ancillary": DEFAULT_PAYLOAD_TYPE_CONFIG["ancillary_payload_type"], + "fastmetadata": DEFAULT_PAYLOAD_TYPE_CONFIG["fastmetadata_payload_type"] + } + return payload_map.get(session_type, DEFAULT_PAYLOAD_TYPE_CONFIG["st20p_payload_type"]) + + def get_common_session_params(self, session_type: str) -> dict: + """Get common session parameters used across all session types.""" + default_port = self.get_session_default_port(session_type) + default_payload = self.get_session_default_payload_type(session_type) + + return { + "replicas": self.universal_params.get("replicas", UNIVERSAL_PARAMS["replicas"]), + "start_port": int(self.universal_params.get("port") if self.was_user_provided("port") else default_port), + "payload_type": self.universal_params.get("payload_type") if self.was_user_provided("payload_type") else default_payload + } + + def get_common_video_params(self) -> dict: + """Get common video parameters used across video session types.""" + return { + "width": int(self.universal_params.get("width", UNIVERSAL_PARAMS["width"])), + "height": int(self.universal_params.get("height", UNIVERSAL_PARAMS["height"])), + "interlaced": self.universal_params.get("interlaced", UNIVERSAL_PARAMS["interlaced"]), + "device": 
self.universal_params.get("device", UNIVERSAL_PARAMS["device"]), + "enable_rtcp": self.universal_params.get("enable_rtcp", UNIVERSAL_PARAMS["enable_rtcp"]) + } + + def execute_test(self, + build: str, + test_time: int = 30, + host=None, + tx_host=None, + rx_host=None, + input_file: str = None, + output_file: str = None, + fail_on_error: bool = True, + virtio_user: bool = False, + rx_timing_parser: bool = False, + ptp: bool = False, + capture_cfg=None, + sleep_interval: int = 4, + tx_first: bool = True, + output_format: str = "yuv", + **kwargs) -> bool: + """ + Universal test execution method that handles all frameworks and test scenarios. + Uses the current Application instance's commands and configuration. + """ + # Determine if this is a dual host test + is_dual = tx_host is not None and rx_host is not None + framework_name = self.get_framework_name().lower() + + if is_dual: + logger.info(f"Executing dual host {framework_name} test") + tx_remote_host = tx_host + rx_remote_host = rx_host + return self._execute_dual_host_test( + build, test_time, tx_remote_host, rx_remote_host, + input_file, output_file, fail_on_error, capture_cfg, + sleep_interval, tx_first, output_format, **kwargs + ) + else: + logger.info(f"Executing single host {framework_name} test") + remote_host = host + return self._execute_single_host_test( + build, test_time, remote_host, input_file, output_file, + fail_on_error, virtio_user, rx_timing_parser, ptp, + capture_cfg, **kwargs + ) + + # ------------------------- + # Common helper utilities + # ------------------------- + def add_timeout(self, command: str, test_time: int, grace: int = None) -> str: + """Wrap command with timeout if test_time provided (adds a grace period).""" + if grace is None: + grace = self.universal_params.get("timeout_grace", 10) + if test_time: + if not command.strip().startswith("timeout "): + return f"timeout {test_time + grace} {command}" + return command + + def start_and_capture(self, command: str, build: str, test_time: int, host, process_name: str): + """Start a single process and capture its stdout safely.""" + process = self.start_process(command, build, test_time, host) + output = self.capture_stdout(process, process_name) + return process, output + + def start_dual_with_delay(self, tx_command: str, rx_command: str, build: str, test_time: int, + tx_host, rx_host, tx_first: bool, sleep_interval: int, + tx_name: str, rx_name: str): + """Start two processes with an optional delay ordering TX/RX based on tx_first flag.""" + if tx_first: + tx_process = self.start_process(tx_command, build, test_time, tx_host) + time.sleep(sleep_interval) + rx_process = self.start_process(rx_command, build, test_time, rx_host) + else: + rx_process = self.start_process(rx_command, build, test_time, rx_host) + time.sleep(sleep_interval) + tx_process = self.start_process(tx_command, build, test_time, tx_host) + tx_output = self.capture_stdout(tx_process, tx_name) + rx_output = self.capture_stdout(rx_process, rx_name) + return (tx_process, rx_process, tx_output, rx_output) + + def extract_framerate(self, framerate_str, default: int = None) -> int: + """Extract numeric framerate from various string or numeric forms (e.g. 
'p25', '60').""" + if default is None: + default = self.universal_params.get("default_framerate_numeric", 60) + if isinstance(framerate_str, (int, float)): + try: + return int(framerate_str) + except Exception: + return default + if not isinstance(framerate_str, str) or not framerate_str: + return default + if framerate_str.startswith('p') and len(framerate_str) > 1: + num = framerate_str[1:] + else: + num = framerate_str + try: + return int(float(num)) + except ValueError: + logger.warning(f"Could not parse framerate '{framerate_str}', defaulting to {default}") + return default + + @abstractmethod + def _execute_single_host_test(self, build: str, test_time: int, host, + input_file: str, output_file: str, fail_on_error: bool, + virtio_user: bool, rx_timing_parser: bool, ptp: bool, + capture_cfg, **kwargs) -> bool: + """Execute single host test - implementation specific to each framework.""" + pass + + @abstractmethod + def _execute_dual_host_test(self, build: str, test_time: int, tx_host, rx_host, + input_file: str, output_file: str, fail_on_error: bool, + capture_cfg, sleep_interval: int, tx_first: bool, + output_format: str, **kwargs) -> bool: + """Execute dual host test - implementation specific to each framework.""" + pass + + def start_process(self, command: str, build: str, test_time: int, host): + """Start a process on the specified host.""" + logger.info(f"Starting {self.get_framework_name()} process...") + buffer_val = self.universal_params.get("process_timeout_buffer", 90) + timeout = (test_time or 0) + buffer_val + return run(command, host=host, cwd=build, timeout=timeout) + + def capture_stdout(self, process, process_name: str) -> str: + """Capture stdout from a process.""" + try: + # Remote process objects (from mfd_connect) expose stdout via 'stdout_text' + if hasattr(process, 'stdout_text') and process.stdout_text: + output = process.stdout_text + logger.debug(f"{process_name} output (captured stdout_text): {output[:200]}...") + return output + # Local fallback (subprocess) may expose .stdout already consumed elsewhere + if hasattr(process, 'stdout') and process.stdout: + try: + # Attempt to read if it's a file-like object + if hasattr(process.stdout, 'read'): + output = process.stdout.read() + else: + output = str(process.stdout) + logger.debug(f"{process_name} output (captured stdout): {output[:200]}...") + return output + except Exception: + pass + logger.warning(f"No stdout available for {process_name}") + return "" + except Exception as e: + logger.error(f"Error capturing {process_name} output: {e}") + return "" + + def get_case_id(self) -> str: + """Generate a case ID for logging/debugging purposes.""" + try: + import inspect + frame = inspect.currentframe() + while frame: + if 'test_' in frame.f_code.co_name: + return frame.f_code.co_name + frame = frame.f_back + return "unknown_test" + except: + return "unknown_test" \ No newline at end of file diff --git a/tests/validation/mtl_engine/config/app_mappings.py b/tests/validation/mtl_engine/config/app_mappings.py new file mode 100644 index 000000000..bb1febb55 --- /dev/null +++ b/tests/validation/mtl_engine/config/app_mappings.py @@ -0,0 +1,99 @@ +# Application name mappings and format conversion utilities + +# Map framework names to executable names +APP_NAME_MAP = { + "rxtxapp": "RxTxApp", + "ffmpeg": "ffmpeg", + "gstreamer": "gst-launch-1.0" +} + +# Format conversion mappings +FFMPEG_FORMAT_MAP = { + "YUV422PLANAR10LE": "yuv422p10le", + "YUV422PLANAR8": "yuv422p", + "YUV420PLANAR8": "yuv420p", + "YUV420PLANAR10LE": 
"yuv420p10le", + "RGB24": "rgb24", + "RGBA": "rgba" +} + +SESSION_TYPE_MAP = { + "ffmpeg": { + "st20p": "mtl_st20p", + "st22p": "mtl_st22p", + "st30p": "mtl_st30p", + "video": "rawvideo", + "audio": "pcm_s24le" + }, + "gstreamer": { + "st20p": "mtl_st20p", + "st22p": "mtl_st22p", + "st30p": "mtl_st30p", + "video": "mtl_video", + "audio": "mtl_audio" + } +} + +FRAMERATE_TO_VIDEO_FORMAT_MAP = { + "p60": "i1080p60", + "p59": "i1080p59", + "p50": "i1080p50", + "p30": "i1080p30", + "p29": "i1080p29", + "p25": "i1080p25", + "p24": "i1080p24", + "p23": "i1080p23" +} + +# Default network configuration values +DEFAULT_NETWORK_CONFIG = { + "nic_port": "0000:31:01.0", + "unicast_tx_ip": "192.168.17.101", + "unicast_rx_ip": "192.168.17.102", + "multicast_tx_ip": "192.168.17.101", + "multicast_rx_ip": "192.168.17.102", + "multicast_destination_ip": "239.168.48.9", + "default_config_file": "config.json" +} + +# Default port configuration by session type +DEFAULT_PORT_CONFIG = { + "st20p_port": 20000, + "st22p_port": 20000, + "st30p_port": 30000, + "video_port": 20000, + "audio_port": 30000, + "ancillary_port": 40000, + "fastmetadata_port": 40000 +} + +# Default payload type configuration by session type +DEFAULT_PAYLOAD_TYPE_CONFIG = { + "st20p_payload_type": 112, + "st22p_payload_type": 114, + "st30p_payload_type": 111, + "video_payload_type": 112, + "audio_payload_type": 111, + "ancillary_payload_type": 113, + "fastmetadata_payload_type": 115 +} + +# Default ST22p-specific configuration +DEFAULT_ST22P_CONFIG = { + "framerate": "p25", + "pack_type": "codestream", + "codec": "JPEG-XS", + "quality": "speed", + "codec_threads": 2 +} + +# Default FFmpeg configuration +DEFAULT_FFMPEG_CONFIG = { + "default_pixel_format": "yuv422p10le", + "default_session_type": "mtl_st20p" +} + +# Default GStreamer configuration +DEFAULT_GSTREAMER_CONFIG = { + "default_session_type": "mtl_st20p" +} diff --git a/tests/validation/mtl_engine/config/param_mappings.py b/tests/validation/mtl_engine/config/param_mappings.py new file mode 100644 index 000000000..3239dadc0 --- /dev/null +++ b/tests/validation/mtl_engine/config/param_mappings.py @@ -0,0 +1,167 @@ +# Parameter trans "pixel_format": "input_format", # for TX sessions + # pixel_format_rx removed - now uses pixel_format for both TX and RXtion mappings for different applications +# Maps universal parameter names to application-specific names + +# RxTxApp parameter mapping +RXTXAPP_PARAM_MAP = { + # Network parameters + "source_ip": "ip", + "destination_ip": "dip", + "multicast_ip": "ip", + "port": "start_port", + "nic_port": "name", + + # Video parameters + "width": "width", + "height": "height", + "framerate": "fps", + "interlaced": "interlaced", + # "pixel_format": "format", # unified for both TX and RX sessions + "transport_format": "transport_format", + + # Audio parameters + "audio_format": "audio_format", + "audio_channels": "audio_channel", + "audio_sampling": "audio_sampling", + "audio_ptime": "audio_ptime", + + # Streaming parameters + "payload_type": "payload_type", + "replicas": "replicas", + "pacing": "pacing", + "packing": "packing", + "device": "device", + "codec": "codec", + "quality": "quality", + "codec_threads": "codec_thread_count", + + # File I/O + "input_file": "st20p_url", # for input files + "output_file": "st20p_url", # for output files (RX) + "url": "video_url", # for video files + + # Flags + "enable_rtcp": "enable_rtcp", + "measure_latency": "measure_latency", + "display": "display", + + # RxTxApp specific command-line parameters + "config_file": 
"--config_file", + "enable_ptp": "--ptp", + "lcores": "--lcores", + "test_time": "--test_time", + "dma_dev": "--dma_dev", + "log_level": "--log_level", + "log_file": "--log_file", + "arp_timeout_s": "--arp_timeout_s", + "allow_across_numa_core": "--allow_across_numa_core", + "no_multicast": "--no_multicast", + "rx_separate_lcore": "--rx_separate_lcore", + "rx_mix_lcore": "--rx_mix_lcore", + "runtime_session": "--runtime_session", + "rx_timing_parser": "--rx_timing_parser", + "pcapng_dump": "--pcapng_dump", + "rx_video_file_frames": "--rx_video_file_frames", + "framebuffer_count": "--rx_video_fb_cnt", + "promiscuous": "--promiscuous", + "cni_thread": "--cni_thread", + "sch_session_quota": "--sch_session_quota", + "p_tx_dst_mac": "--p_tx_dst_mac", + "r_tx_dst_mac": "--r_tx_dst_mac", + "nb_tx_desc": "--nb_tx_desc", + "nb_rx_desc": "--nb_rx_desc", + "tasklet_time": "--tasklet_time", + "tsc": "--tsc", + "pacing_way": "--pacing_way", + "shaping": "--shaping", + "vrx": "--vrx", + "ts_first_pkt": "--ts_first_pkt", + "ts_delta_us": "--ts_delta_us", + "mono_pool": "--mono_pool", + "tasklet_thread": "--tasklet_thread", + "tasklet_sleep": "--tasklet_sleep", + "tasklet_sleep_us": "--tasklet_sleep_us", + "app_bind_lcore": "--app_bind_lcore", + "rxtx_simd_512": "--rxtx_simd_512", + "rss_mode": "--rss_mode", + "tx_no_chain": "--tx_no_chain", + "multi_src_port": "--multi_src_port", + "audio_fifo_size": "--audio_fifo_size", + "dhcp": "--dhcp", + "virtio_user": "--virtio_user", + "phc2sys": "--phc2sys", + "ptp_sync_sys": "--ptp_sync_sys", + "rss_sch_nb": "--rss_sch_nb", + "log_time_ms": "--log_time_ms", + "rx_audio_dump_time_s": "--rx_audio_dump_time_s", + "dedicated_sys_lcore": "--dedicated_sys_lcore", + "bind_numa": "--bind_numa", # unified NUMA parameter (when False, equivalent to --not_bind_numa) + "force_numa": "--force_numa" + } + +############################### +# FFmpeg parameter mapping # +############################### +# These flags correspond to the MTL FFmpeg plugin arguments observed in existing +# test command constructions (see ffmpeg_app.py) and the refactored ffmpeg builder: +# -p_sip (source IP), -p_tx_ip (destination unicast), -p_rx_ip (multicast), +# -udp_port, -p_port (PCI / NIC identifier), -payload_type, -fps, -pix_fmt, +# -video_size, -f (format/session type) +# Width & height both map to -video_size; command builders should coalesce them +# into a single WxH token. framerate maps to -fps (distinct from the input side's +# rawvideo "-framerate" which is handled explicitly in builder code). +FFMPEG_PARAM_MAP = { + # Network parameters + "source_ip": "-p_sip", + "destination_ip": "-p_tx_ip", # TX unicast destination + "multicast_ip": "-p_rx_ip", # RX multicast group + "port": "-udp_port", + "nic_port": "-p_port", + + # Video parameters (width/height combined externally) + "width": "-video_size", + "height": "-video_size", + "framerate": "-fps", + "pixel_format": "-pix_fmt", + + # Streaming parameters + "payload_type": "-payload_type", + "session_type": "-f", # Converted via SESSION_TYPE_MAP + + # File I/O + "input_file": "-i", + "output_file": "", # Output appears last (no explicit flag) +} + +################################# +# GStreamer parameter mapping # +################################# +# Maps universal params to MTL GStreamer element properties or filesrc/filesink +# attributes. These are set as name=value pairs in the pipeline. 
+GSTREAMER_PARAM_MAP = { + # Network parameters + "source_ip": "dev-ip", # Interface IP + "destination_ip": "ip", # Destination (unicast) IP + "port": "udp-port", # UDP port + "nic_port": "dev-port", # NIC device/PCI identifier + + # Video parameters / caps + "width": "width", + "height": "height", + "framerate": "framerate", + "pixel_format": "format", + + # Audio parameters + "audio_format": "audio-format", + "audio_channels": "channel", + "audio_sampling": "sampling", + + # Streaming parameters + "payload_type": "payload-type", + "queues": "queues", # Currently legacy / advanced usage + "framebuffer_count": "framebuff-cnt", + + # File I/O (filesrc/filesink) + "input_file": "location", + "output_file": "location", +} diff --git a/tests/validation/mtl_engine/config/universal_params.py b/tests/validation/mtl_engine/config/universal_params.py new file mode 100644 index 000000000..2fafe6420 --- /dev/null +++ b/tests/validation/mtl_engine/config/universal_params.py @@ -0,0 +1,115 @@ +# Universal parameter definitions for all media applications +# This serves as the common interface for RxTxApp, FFmpeg, and GStreamer + +UNIVERSAL_PARAMS = { + # Network parameters + "source_ip": None, # Source IP address (interface IP) + "destination_ip": None, # Destination IP address (session IP) + "multicast_ip": None, # Multicast group IP + "port": 20000, # UDP port number + "nic_port": None, # Network interface/port name + "nic_port_list": None, # List of network interfaces/ports for multi-interface configs + + # Video parameters + "width": 1920, # Video width in pixels + "height": 1080, # Video height in pixels + "framerate": "p60", # Frame rate (p25, p30, p50, p60, etc.) + "interlaced": False, # Progressive (False) or Interlaced (True) + "pixel_format": "YUV422PLANAR10LE", # Pixel format for both TX (input) and RX (output) + "transport_format": "YUV_422_10bit", # Transport format for streaming + # Removed: default_video_format – legacy video format mapping now handled directly where needed. + + # Audio parameters + "audio_format": "PCM24", # Audio format + "audio_channels": ["U02"], # Audio channel configuration + "audio_sampling": "96kHz", # Audio sampling rate + "audio_ptime": "1", # Audio packet time + + # Streaming parameters + "payload_type": 112, # RTP payload type + "session_type": "st20p", # Session type (st20p, st22p, st30p, video, audio, etc.) + "direction": None, # Direction: tx (transmit), rx (receive), or None (both for RxTxApp) + "replicas": 1, # Number of session replicas + # Removed: queues – queue count not plumbed through new builders; retaining calculation left to legacy code paths. + "framebuffer_count": None, # Frame buffer count (for RX video: rx_video_fb_cnt) + + # Quality and encoding parameters + "pacing": "gap", # Pacing mode (gap, auto, etc.) + "packing": "BPM", # Packing mode + "device": "AUTO", # Device selection + "codec": "JPEG-XS", # Codec for compressed formats + "quality": "speed", # Quality setting + "codec_threads": 2, # Number of codec threads + + # File I/O parameters + "input_file": None, # Input file path + "output_file": None, # Output file path + # Removed: url – generic video_url not used in refactored path; specific st20p_url/audio_url populated directly. 
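+    # How these land per framework (see config/param_mappings.py): for example,
+    # "input_file" becomes "-i" for FFmpeg, "location" on a GStreamer filesrc,
+    # and the session's st20p_url/audio_url field in the generated RxTxApp JSON.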
+ + # Test configuration + "test_mode": "multicast", # Test mode (unicast, multicast, kernel) + "test_time": 30, # Test duration in seconds + "enable_rtcp": False, # Enable RTCP + "measure_latency": False, # Enable latency measurement + "display": False, # Enable display output + "enable_ptp": False, # Enable PTP synchronization + "virtio_user": False, # Enable virtio-user mode + + # RxTxApp specific parameters + "config_file": None, # JSON config file path + "lcores": None, # DPDK lcore list (e.g., "28,29,30,31") + "dma_dev": None, # DMA device list (e.g., "DMA1,DMA2,DMA3") + "log_level": None, # Log level (debug, info, notice, warning, error) + "log_file": None, # Log file path + "arp_timeout_s": None, # ARP timeout in seconds (default: 60) + "allow_across_numa_core": False, # Allow cores across NUMA nodes + "no_multicast": False, # Disable multicast join message + "rx_separate_lcore": False, # RX video on dedicated lcores + "rx_mix_lcore": False, # Allow TX/RX video on same core + "runtime_session": False, # Start instance before creating sessions + "rx_timing_parser": False, # Enable timing check for video RX streams + "pcapng_dump": None, # Dump n packets to pcapng files + "rx_video_file_frames": None, # Dump received video frames to yuv file + "promiscuous": False, # Enable RX promiscuous mode + "cni_thread": False, # Use dedicated thread for CNI messages + "sch_session_quota": None, # Max sessions count per lcore + "p_tx_dst_mac": None, # Destination MAC for primary port + "r_tx_dst_mac": None, # Destination MAC for redundant port + "nb_tx_desc": None, # Number of TX descriptors per queue + "nb_rx_desc": None, # Number of RX descriptors per queue + "tasklet_time": False, # Enable tasklet running time stats + "tsc": False, # Force TSC pacing + "pacing_way": None, # Pacing way (auto, rl, tsc, tsc_narrow, ptp, tsn) + "shaping": None, # ST21 shaping type (narrow, wide) + "vrx": None, # ST21 vrx value + "ts_first_pkt": False, # Set RTP timestamp at first packet egress + "ts_delta_us": None, # RTP timestamp delta in microseconds + "mono_pool": False, # Use mono pool for all queues + "tasklet_thread": False, # Run tasklet under thread + "tasklet_sleep": False, # Enable sleep if tasklets report done + "tasklet_sleep_us": None, # Sleep microseconds value + "app_bind_lcore": False, # Run app thread on pinned lcore + "rxtx_simd_512": False, # Enable DPDK SIMD 512 path + "rss_mode": None, # RSS mode (l3_l4, l3, none) + "tx_no_chain": False, # Use memcopy instead of mbuf chain + "multi_src_port": False, # Use multiple source ports for ST20 TX + "audio_fifo_size": None, # Audio FIFO size + "dhcp": False, # Enable DHCP for all ports + "phc2sys": False, # Enable built-in phc2sys function + "ptp_sync_sys": False, # Enable PTP to system time sync + "rss_sch_nb": None, # Number of schedulers for RSS dispatch + "log_time_ms": False, # Enable ms accuracy log printer + "rx_audio_dump_time_s": None, # Dump audio frames for n seconds + "dedicated_sys_lcore": False, # Run MTL system tasks on dedicated lcore + "bind_numa": False, # Bind all MTL threads to NIC NUMA (when False, threads run without NUMA awareness) + "force_numa": None, # Force NIC port NUMA ID + + # Execution control defaults (moved from hardcoded literals in engine code) + "sleep_interval": 4, # Delay between starting TX and RX in dual-host tests + "tx_first": True, # Whether to start TX side before RX + # Removed: output_format – validation infers format from pixel_format; explicit label no longer required. 
+ "timeout_grace": 10, # Extra seconds appended to process timeout wrapper + "process_timeout_buffer": 90, # Buffer added to test_time for run() timeout + "pattern_duration": 30, # Duration for generated test patterns (FFmpeg/GStreamer) + "default_framerate_numeric": 60, # Fallback numeric framerate when parsing fails +} diff --git a/tests/validation/mtl_engine/ffmpeg.py b/tests/validation/mtl_engine/ffmpeg.py new file mode 100644 index 000000000..8be6dff7a --- /dev/null +++ b/tests/validation/mtl_engine/ffmpeg.py @@ -0,0 +1,236 @@ +# FFmpeg Implementation for Media Transport Library +# Handles FFmpeg-specific command generation and execution + +import logging +import os +import time + +from .application_base import Application +from .config.universal_params import UNIVERSAL_PARAMS +from .config.app_mappings import ( + APP_NAME_MAP, + FFMPEG_FORMAT_MAP, + SESSION_TYPE_MAP, + FRAMERATE_TO_VIDEO_FORMAT_MAP, + DEFAULT_FFMPEG_CONFIG +) + +logger = logging.getLogger(__name__) + + +class FFmpeg(Application): + """FFmpeg framework implementation for MTL testing.""" + + def get_framework_name(self) -> str: + return "FFmpeg" + + def get_executable_name(self) -> str: + return APP_NAME_MAP["ffmpeg"] + + def create_command(self, **kwargs) -> tuple: + """ + Set universal parameters and create FFmpeg command line. + + Args: + **kwargs: Universal parameter names and values + + Returns: + Tuple of (command_string, None) - FFmpeg doesn't use config files + """ + # Set universal parameters + self.set_universal_params(**kwargs) + + # Create FFmpeg command + command = self._create_ffmpeg_command() + return command, None + + def _create_ffmpeg_command(self) -> str: + """ + Generate FFmpeg command line from universal parameters. + Creates appropriate TX or RX command based on direction parameter. 
+ + Returns: + Complete FFmpeg command string + """ + executable_path = self.get_executable_path() + direction = self.universal_params.get("direction", "tx") + session_type = self.universal_params.get("session_type", "st20p") + + if direction == "tx": + return self._create_ffmpeg_tx_command(executable_path, session_type) + elif direction == "rx": + return self._create_ffmpeg_rx_command(executable_path, session_type) + else: + raise ValueError(f"FFmpeg requires explicit direction (tx/rx), got: {direction}") + + def _create_ffmpeg_tx_command(self, executable_path: str, session_type: str) -> str: + """Create FFmpeg TX (transmit) command.""" + cmd_parts = [executable_path] + + # Input configuration + input_file = self.universal_params.get("input_file") + if input_file: + # Input from file + pixel_format = self._convert_to_ffmpeg_format( + self.universal_params.get("pixel_format", DEFAULT_FFMPEG_CONFIG["default_pixel_format"]) + ) + width = self.universal_params.get("width", UNIVERSAL_PARAMS["width"]) + height = self.universal_params.get("height", UNIVERSAL_PARAMS["height"]) + framerate = self._extract_framerate_numeric(self.universal_params.get("framerate", UNIVERSAL_PARAMS["framerate"])) + + cmd_parts.extend([ + "-f", "rawvideo", + "-pix_fmt", pixel_format, + "-video_size", f"{width}x{height}", + "-framerate", str(framerate), + "-i", input_file + ]) + else: + # Generate test pattern + width = self.universal_params.get("width", UNIVERSAL_PARAMS["width"]) + height = self.universal_params.get("height", UNIVERSAL_PARAMS["height"]) + framerate = self._extract_framerate_numeric(self.universal_params.get("framerate", UNIVERSAL_PARAMS["framerate"])) + pattern_duration = self.universal_params.get("pattern_duration", 30) + cmd_parts.extend(["-f", "lavfi", "-i", f"testsrc=size={width}x{height}:rate={framerate}:duration={pattern_duration}"]) + + # Output configuration for MTL + ffmpeg_session_type = self._convert_to_ffmpeg_session_type(session_type) + cmd_parts.extend(["-f", ffmpeg_session_type]) + + # Network parameters + if self.universal_params.get("source_ip"): + cmd_parts.extend(["-p_sip", self.universal_params["source_ip"]]) + if self.universal_params.get("destination_ip"): + cmd_parts.extend(["-p_tx_ip", self.universal_params["destination_ip"]]) + if self.universal_params.get("port"): + cmd_parts.extend(["-udp_port", str(self.universal_params["port"])]) + if self.universal_params.get("nic_port"): + cmd_parts.extend(["-p_port", self.universal_params["nic_port"]]) + if self.universal_params.get("payload_type"): + cmd_parts.extend(["-payload_type", str(self.universal_params["payload_type"])]) + + # Output destination (usually /dev/null for TX) + cmd_parts.append("/dev/null") + + return " ".join(cmd_parts) + + def _create_ffmpeg_rx_command(self, executable_path: str, session_type: str) -> str: + """Create FFmpeg RX (receive) command.""" + cmd_parts = [executable_path] + + # Input configuration for MTL + ffmpeg_session_type = self._convert_to_ffmpeg_session_type(session_type) + cmd_parts.extend(["-f", ffmpeg_session_type]) + + # Network parameters + if self.universal_params.get("multicast_ip"): + cmd_parts.extend(["-p_rx_ip", self.universal_params["multicast_ip"]]) + if self.universal_params.get("port"): + cmd_parts.extend(["-udp_port", str(self.universal_params["port"])]) + if self.universal_params.get("nic_port"): + cmd_parts.extend(["-p_port", self.universal_params["nic_port"]]) + if self.universal_params.get("payload_type"): + cmd_parts.extend(["-payload_type", 
str(self.universal_params["payload_type"])]) + + # Input source + cmd_parts.extend(["-i", "/dev/null"]) + + # Output configuration + output_file = self.universal_params.get("output_file") + if output_file: + # Output to file + pixel_format = self._convert_to_ffmpeg_format( + self.universal_params.get("pixel_format", DEFAULT_FFMPEG_CONFIG["default_pixel_format"]) + ) + cmd_parts.extend([ + "-f", "rawvideo", + "-pix_fmt", pixel_format, + output_file + ]) + else: + # Output to /dev/null + cmd_parts.extend(["-f", "null", "/dev/null"]) + + return " ".join(cmd_parts) + + def _convert_to_ffmpeg_format(self, universal_format: str) -> str: + """Convert universal pixel format to FFmpeg format.""" + return FFMPEG_FORMAT_MAP.get(universal_format, universal_format.lower()) + + def _convert_to_ffmpeg_session_type(self, universal_session_type: str) -> str: + """Convert universal session type to FFmpeg format specifier.""" + return SESSION_TYPE_MAP["ffmpeg"].get(universal_session_type, "mtl_st20p") + + def _extract_framerate_numeric(self, framerate_str: str) -> int: + """Extract numeric framerate from string format (e.g., 'p60' -> 60).""" + return self.extract_framerate(framerate_str, default=60) + + def validate_results(self, input_file: str, output_file: str, output_format: str, + tx_host, rx_host, build: str) -> bool: + """Validate FFmpeg test results.""" + try: + # For TX tests, check if process completed successfully + # For RX tests, verify output file was created and has expected content + + if output_file and os.path.exists(output_file): + # Check if output file has content + file_size = os.path.getsize(output_file) + if file_size > 0: + logger.info(f"FFmpeg RX output file {output_file} created successfully ({file_size} bytes)") + return True + else: + logger.error(f"FFmpeg RX output file {output_file} is empty") + return False + else: + # For TX-only tests or when no output file specified + logger.info("FFmpeg TX test completed successfully") + return True + + except Exception as e: + logger.error(f"Error validating FFmpeg results: {e}") + return False + + def _execute_single_host_test(self, build: str, test_time: int, host, + input_file: str, output_file: str, fail_on_error: bool, + virtio_user: bool, rx_timing_parser: bool, ptp: bool, + capture_cfg, **kwargs) -> bool: + """Execute single host FFmpeg test.""" + command, _ = self.create_command(input_file=input_file, output_file=output_file, **kwargs) + + # Add timeout parameter for FFmpeg + command = self.add_timeout(command, test_time) + process, output = self.start_and_capture(command, build, test_time, host, "FFmpeg") + + # Validate results + return self.validate_results(input_file, output_file, "yuv", None, host, build) + + def _execute_dual_host_test(self, build: str, test_time: int, tx_host, rx_host, + input_file: str, output_file: str, fail_on_error: bool, + capture_cfg, sleep_interval: int, tx_first: bool, + output_format: str, **kwargs) -> bool: + """Execute dual host FFmpeg test.""" + # Create TX and RX commands + tx_kwargs = kwargs.copy() + tx_kwargs["direction"] = "tx" + tx_kwargs["input_file"] = input_file + + rx_kwargs = kwargs.copy() + rx_kwargs["direction"] = "rx" + rx_kwargs["output_file"] = output_file + + tx_command, _ = self.create_command(**tx_kwargs) + rx_command, _ = self.create_command(**rx_kwargs) + + # Add timeout for both commands + tx_command = self.add_timeout(tx_command, test_time) + rx_command = self.add_timeout(rx_command, test_time) + _, _, tx_output, rx_output = self.start_dual_with_delay( + tx_command, 
rx_command, build, test_time, tx_host, rx_host, + tx_first, sleep_interval, "FFmpeg-TX", "FFmpeg-RX" + ) + + # Validate results + tx_result = True # TX validation is implicit in successful execution + rx_result = self.validate_results(input_file, output_file, output_format, tx_host, rx_host, build) + + return tx_result and rx_result \ No newline at end of file diff --git a/tests/validation/mtl_engine/gstreamer.py b/tests/validation/mtl_engine/gstreamer.py new file mode 100644 index 000000000..275b0997a --- /dev/null +++ b/tests/validation/mtl_engine/gstreamer.py @@ -0,0 +1,246 @@ +# GStreamer Implementation for Media Transport Library +# Handles GStreamer-specific command generation and execution + +import logging +import os +import time + +from .application_base import Application +from .config.universal_params import UNIVERSAL_PARAMS +from .config.app_mappings import ( + APP_NAME_MAP, + SESSION_TYPE_MAP, + DEFAULT_GSTREAMER_CONFIG +) + +logger = logging.getLogger(__name__) + + +class GStreamer(Application): + """GStreamer framework implementation for MTL testing.""" + + def get_framework_name(self) -> str: + return "GStreamer" + + def get_executable_name(self) -> str: + return APP_NAME_MAP["gstreamer"] + + def create_command(self, **kwargs) -> tuple: + """ + Set universal parameters and create GStreamer command line. + + Args: + **kwargs: Universal parameter names and values + + Returns: + Tuple of (command_string, None) - GStreamer doesn't use config files + """ + # Set universal parameters + self.set_universal_params(**kwargs) + + # Create GStreamer command + command = self._create_gstreamer_command() + return command, None + + def _create_gstreamer_command(self) -> str: + """ + Generate GStreamer command line from universal parameters. + Creates appropriate TX or RX pipeline based on direction parameter. 
+ + Returns: + Complete GStreamer command string + """ + executable_path = self.get_executable_path() + direction = self.universal_params.get("direction", "tx") + session_type = self.universal_params.get("session_type", "st20p") + + if direction == "tx": + return self._create_gstreamer_tx_command(executable_path, session_type) + elif direction == "rx": + return self._create_gstreamer_rx_command(executable_path, session_type) + else: + raise ValueError(f"GStreamer requires explicit direction (tx/rx), got: {direction}") + + def _create_gstreamer_tx_command(self, executable_path: str, session_type: str) -> str: + """Create GStreamer TX (transmit) pipeline.""" + cmd_parts = [executable_path, "-v"] + + # Source element + input_file = self.universal_params.get("input_file") + if input_file: + # File source + cmd_parts.append(f"filesrc location={input_file}") + else: + # Test pattern generator + width = self.universal_params.get("width", UNIVERSAL_PARAMS["width"]) + height = self.universal_params.get("height", UNIVERSAL_PARAMS["height"]) + framerate = self._extract_framerate_numeric(self.universal_params.get("framerate", UNIVERSAL_PARAMS["framerate"])) + + cmd_parts.append(f"videotestsrc pattern=smpte") + cmd_parts.append("!") + cmd_parts.append(f"video/x-raw,width={width},height={height},framerate={framerate}/1") + + # Format conversion if needed + pixel_format = self.universal_params.get("pixel_format", "YUV422PLANAR10LE") + gst_format = self._convert_to_gstreamer_format(pixel_format) + + if input_file: + # Raw video parsing for file input + cmd_parts.extend(["!", "rawvideoparse", f"format={gst_format}"]) + width = self.universal_params.get("width", UNIVERSAL_PARAMS["width"]) + height = self.universal_params.get("height", UNIVERSAL_PARAMS["height"]) + framerate = self._extract_framerate_numeric(self.universal_params.get("framerate", UNIVERSAL_PARAMS["framerate"])) + cmd_parts.append(f"width={width} height={height} framerate={framerate}/1") + + # MTL sink element + gst_element = self._convert_to_gstreamer_element(session_type, "tx") + cmd_parts.extend(["!", gst_element]) + + # Network parameters + if self.universal_params.get("source_ip"): + cmd_parts.append(f"dev-ip={self.universal_params['source_ip']}") + if self.universal_params.get("destination_ip"): + cmd_parts.append(f"ip={self.universal_params['destination_ip']}") + if self.universal_params.get("port"): + cmd_parts.append(f"udp-port={self.universal_params['port']}") + if self.universal_params.get("nic_port"): + cmd_parts.append(f"dev-port={self.universal_params['nic_port']}") + if self.universal_params.get("payload_type"): + cmd_parts.append(f"payload-type={self.universal_params['payload_type']}") + + return " ".join(cmd_parts) + + def _create_gstreamer_rx_command(self, executable_path: str, session_type: str) -> str: + """Create GStreamer RX (receive) pipeline.""" + cmd_parts = [executable_path, "-v"] + + # MTL source element + gst_element = self._convert_to_gstreamer_element(session_type, "rx") + cmd_parts.append(gst_element) + + # Network parameters + if self.universal_params.get("multicast_ip"): + cmd_parts.append(f"ip={self.universal_params['multicast_ip']}") + if self.universal_params.get("port"): + cmd_parts.append(f"udp-port={self.universal_params['port']}") + if self.universal_params.get("nic_port"): + cmd_parts.append(f"dev-port={self.universal_params['nic_port']}") + if self.universal_params.get("payload_type"): + cmd_parts.append(f"payload-type={self.universal_params['payload_type']}") + + # Sink element + output_file = 
self.universal_params.get("output_file") + if output_file: + # File sink with format conversion + pixel_format = self.universal_params.get("pixel_format", "YUV422PLANAR10LE") + gst_format = self._convert_to_gstreamer_format(pixel_format) + + cmd_parts.extend([ + "!", + f"video/x-raw,format={gst_format}", + "!", + "videoconvert", + "!", + f"filesink location={output_file}" + ]) + else: + # Null sink (discard output) + cmd_parts.extend(["!", "fakesink"]) + + return " ".join(cmd_parts) + + def _convert_to_gstreamer_element(self, universal_session_type: str, direction: str) -> str: + """Convert universal session type to GStreamer element name.""" + base_element = SESSION_TYPE_MAP["gstreamer"].get(universal_session_type, "mtl_st20p") + + # Add direction suffix + if direction == "tx": + return f"{base_element}sink" + else: # rx + return f"{base_element}src" + + def _convert_to_gstreamer_format(self, universal_format: str) -> str: + """Convert universal pixel format to GStreamer format.""" + format_map = { + "YUV422PLANAR10LE": "YUV422P10LE", + "YUV422PLANAR8": "YUV422P", + "YUV420PLANAR8": "YUV420P", + "YUV420PLANAR10LE": "YUV420P10LE", + "RGB24": "RGB", + "RGBA": "RGBA" + } + return format_map.get(universal_format, universal_format) + + def _extract_framerate_numeric(self, framerate_str: str) -> int: + """Extract numeric framerate from string format (e.g., 'p60' -> 60).""" + return self.extract_framerate(framerate_str, default=60) + + def validate_results(self, input_file: str, output_file: str, + tx_host, rx_host) -> bool: + """Validate GStreamer test results.""" + try: + # For TX tests, check if process completed successfully + # For RX tests, verify output file was created and has expected content + + if output_file and os.path.exists(output_file): + # Check if output file has content + file_size = os.path.getsize(output_file) + if file_size > 0: + logger.info(f"GStreamer RX output file {output_file} created successfully ({file_size} bytes)") + return True + else: + logger.error(f"GStreamer RX output file {output_file} is empty") + return False + else: + # For TX-only tests or when no output file specified + logger.info("GStreamer TX test completed successfully") + return True + + except Exception as e: + logger.error(f"Error validating GStreamer results: {e}") + return False + + def _execute_single_host_test(self, build: str, test_time: int, host, + input_file: str, output_file: str, fail_on_error: bool, + virtio_user: bool, rx_timing_parser: bool, ptp: bool, + capture_cfg, **kwargs) -> bool: + """Execute single host GStreamer test.""" + command, _ = self.create_command(input_file=input_file, output_file=output_file, **kwargs) + + # Add timeout parameter for GStreamer + command = self.add_timeout(command, test_time) + process, output = self.start_and_capture(command, build, test_time, host, "GStreamer") + + # Validate results + return self.validate_results(input_file, output_file, None, host) + + def _execute_dual_host_test(self, build: str, test_time: int, tx_host, rx_host, + input_file: str, output_file: str, fail_on_error: bool, + capture_cfg, sleep_interval: int, tx_first: bool, + output_format: str, **kwargs) -> bool: + """Execute dual host GStreamer test.""" + # Create TX and RX commands + tx_kwargs = kwargs.copy() + tx_kwargs["direction"] = "tx" + tx_kwargs["input_file"] = input_file + + rx_kwargs = kwargs.copy() + rx_kwargs["direction"] = "rx" + rx_kwargs["output_file"] = output_file + + tx_command, _ = self.create_command(**tx_kwargs) + rx_command, _ = 
self.create_command(**rx_kwargs) + + # Add timeout for both commands + tx_command = self.add_timeout(tx_command, test_time) + rx_command = self.add_timeout(rx_command, test_time) + _, _, tx_output, rx_output = self.start_dual_with_delay( + tx_command, rx_command, build, test_time, tx_host, rx_host, + tx_first, sleep_interval, "GStreamer-TX", "GStreamer-RX" + ) + + # Validate results + tx_result = True # TX validation is implicit in successful execution + rx_result = self.validate_results(input_file, output_file, tx_host, rx_host) + + return tx_result and rx_result \ No newline at end of file diff --git a/tests/validation/mtl_engine/rxtxapp.py b/tests/validation/mtl_engine/rxtxapp.py new file mode 100644 index 000000000..3b8c2c5bb --- /dev/null +++ b/tests/validation/mtl_engine/rxtxapp.py @@ -0,0 +1,527 @@ +# RxTxApp Implementation for Media Transport Library +# Handles RxTxApp-specific command generation and configuration + +import json +import logging +import os +import time + +from .application_base import Application +from .config.universal_params import UNIVERSAL_PARAMS +from .config.param_mappings import RXTXAPP_PARAM_MAP +from .config.app_mappings import ( + APP_NAME_MAP, + DEFAULT_NETWORK_CONFIG, + DEFAULT_ST22P_CONFIG, +) + +# Import execution utilities with fallback +try: + from .execute import log_fail, run, is_process_running + # Import legacy helpers so we can emit a backward-compatible JSON config + from .RxTxApp import ( + prepare_tcpdump, + check_tx_output, + check_rx_output, + create_empty_config, + add_interfaces, + ) + import copy + from . import rxtxapp_config as legacy_cfg +except ImportError: + # Fallback for direct execution (when running this module standalone) + from execute import log_fail, run, is_process_running + from RxTxApp import ( + prepare_tcpdump, + check_tx_output, + check_rx_output, + create_empty_config, + add_interfaces, + ) + import copy + import rxtxapp_config as legacy_cfg + +logger = logging.getLogger(__name__) + + +class RxTxApp(Application): + """RxTxApp framework implementation for MTL testing.""" + + def get_framework_name(self) -> str: + return "RxTxApp" + + def get_executable_name(self) -> str: + return APP_NAME_MAP["rxtxapp"] + + def create_command(self, **kwargs) -> tuple: + """ + Set universal parameters and create RxTxApp command line and config files. + + Args: + **kwargs: Universal parameter names and values + + Returns: + Tuple of (command_string, config_dict) + """ + # Set universal parameters + self.set_universal_params(**kwargs) + + # Create RxTxApp command and config + return self._create_rxtxapp_command_and_config() + + def _create_rxtxapp_command_and_config(self) -> tuple: + """ + Generate RxTxApp command line and JSON configuration from universal parameters. + Uses config file path from constructor if provided, otherwise defaults to value from DEFAULT_NETWORK_CONFIG. 
+ + Returns: + Tuple of (command_string, config_dict) + """ + # Use config file path from constructor or default (absolute path) + if self.config_file_path: + config_file_path = self.config_file_path + else: + config_file_path = os.path.abspath(DEFAULT_NETWORK_CONFIG["default_config_file"]) + + # Build command line with all command-line parameters + executable_path = self.get_executable_path() + cmd_parts = ["sudo", executable_path] + cmd_parts.extend(["--config_file", config_file_path]) + + # Add command-line parameters from RXTXAPP_PARAM_MAP + for universal_param, rxtx_param in RXTXAPP_PARAM_MAP.items(): + if rxtx_param.startswith("--"): # Command-line parameter + if universal_param in self.universal_params: + value = self.universal_params[universal_param] + if value is not None and value is not False: + if isinstance(value, bool) and value: + cmd_parts.append(rxtx_param) + elif not isinstance(value, bool): + cmd_parts.extend([rxtx_param, str(value)]) + + # Create JSON configuration + config_dict = self._create_rxtxapp_config_dict() + + return " ".join(cmd_parts), config_dict + + def _create_rxtxapp_config_dict(self) -> dict: + """ + Build complete RxTxApp JSON config structure from universal parameters. + Creates interfaces, sessions, and all session-specific configurations. + This method intentionally recreates the original ("legacy") nested JSON + structure expected by the existing RxTxApp binary and validation helpers + (see rxtxapp_config.py). The previous refactored flat structure caused + validation failures (e.g. could not determine FPS, exit code 244) because + check_tx_output() and performance detection logic rely on nested lists + like config['tx_sessions'][0]['st20p'][0]. + + Returns: + Complete RxTxApp configuration dictionary + """ + # Currently only st20p/st22p/st30p/video/audio/ancillary/fastmetadata supported via + # the refactored path. We rebuild the legacy shell for all session types but only + # populate the active one. 
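+        # Rough shape of the structure being rebuilt (field names assumed from the
+        # rxtxapp_config.py templates; values below are purely illustrative):
+        #   {
+        #     "interfaces": [{...}, {...}],
+        #     "tx_sessions": [{"st20p": [{"width": 1920, "height": 1080, "fps": "p60",
+        #                                 "start_port": 20000, "payload_type": 112, ...}],
+        #                      "video": []}],
+        #     "rx_sessions": [{"st20p": [{...}], "video": []}]
+        #   }
+        # so validators can keep indexing config["tx_sessions"][0]["st20p"][0] as before.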
+ + session_type = self.universal_params.get("session_type", UNIVERSAL_PARAMS["session_type"]) + direction = self.universal_params.get("direction") # None means loopback + test_mode = self.universal_params.get("test_mode", UNIVERSAL_PARAMS["test_mode"]) + + # Determine NIC ports list (need at least 2 entries for legacy template) + nic_port = self.universal_params.get("nic_port", DEFAULT_NETWORK_CONFIG["nic_port"]) + nic_port_list = self.universal_params.get("nic_port_list") + if not nic_port_list: + # Duplicate single port to satisfy legacy two-interface expectation + nic_port_list = [nic_port, nic_port] + elif len(nic_port_list) == 1: + nic_port_list = nic_port_list * 2 + + # Base legacy structure + config = create_empty_config() + config["tx_no_chain"] = self.universal_params.get("tx_no_chain", False) + + # Fill interface names & addressing using legacy helper + try: + add_interfaces(config, nic_port_list, test_mode) + except Exception as e: + logger.warning(f"Legacy add_interfaces failed ({e}); falling back to direct assignment") + # Minimal fallback assignment + config["interfaces"][0]["name"] = nic_port_list[0] + config["interfaces"][1]["name"] = nic_port_list[1] + + # Helper to populate a nested session list for a given type + def _populate_session(is_tx: bool): + if session_type == "st20p": + template = copy.deepcopy( + legacy_cfg.config_tx_st20p_session if is_tx else legacy_cfg.config_rx_st20p_session + ) + # Map universal params -> legacy field names + template["width"] = int(self.universal_params.get("width", template["width"])) + template["height"] = int(self.universal_params.get("height", template["height"])) + template["fps"] = self.universal_params.get("framerate", template["fps"]) + template["pacing"] = self.universal_params.get("pacing", template["pacing"]) + template["packing"] = self.universal_params.get("packing", template.get("packing", "BPM")) + # pixel_format becomes input_format or output_format + pixel_format = self.universal_params.get("pixel_format") + if is_tx: + template["input_format"] = pixel_format or template.get("input_format") + else: + template["output_format"] = pixel_format or template.get("output_format") + template["transport_format"] = self.universal_params.get("transport_format", template["transport_format"]) + if is_tx and self.universal_params.get("input_file"): + template["st20p_url"] = self.universal_params.get("input_file") + if (not is_tx) and self.universal_params.get("output_file"): + template["st20p_url"] = self.universal_params.get("output_file") + template["replicas"] = self.universal_params.get("replicas", template["replicas"]) + template["start_port"] = int(self.universal_params.get("port", template["start_port"])) + template["payload_type"] = int(self.universal_params.get("payload_type", template["payload_type"])) + template["display"] = self.universal_params.get("display", template.get("display", False)) + template["enable_rtcp"] = self.universal_params.get("enable_rtcp", template.get("enable_rtcp", False)) + return template + elif session_type == "st22p": + template = copy.deepcopy( + legacy_cfg.config_tx_st22p_session if is_tx else legacy_cfg.config_rx_st22p_session + ) + template["width"] = int(self.universal_params.get("width", template["width"])) + template["height"] = int(self.universal_params.get("height", template["height"])) + template["fps"] = self.universal_params.get("framerate", template["fps"]) + template["codec"] = self.universal_params.get("codec", template["codec"]) # JPEG-XS etc. 
+ template["quality"] = self.universal_params.get("quality", template["quality"]) + template["codec_thread_count"] = self.universal_params.get("codec_threads", template["codec_thread_count"]) + pf = self.universal_params.get("pixel_format") + if is_tx: + template["input_format"] = pf or template.get("input_format") + else: + template["output_format"] = pf or template.get("output_format") + if is_tx and self.universal_params.get("input_file"): + template["st22p_url"] = self.universal_params.get("input_file") + if (not is_tx) and self.universal_params.get("output_file"): + template["st22p_url"] = self.universal_params.get("output_file") + template["replicas"] = self.universal_params.get("replicas", template["replicas"]) + template["start_port"] = int(self.universal_params.get("port", template["start_port"])) + template["payload_type"] = int(self.universal_params.get("payload_type", template["payload_type"])) + template["enable_rtcp"] = self.universal_params.get("enable_rtcp", template.get("enable_rtcp", False)) + return template + elif session_type == "st30p": + template = copy.deepcopy( + legacy_cfg.config_tx_st30p_session if is_tx else legacy_cfg.config_rx_st30p_session + ) + template["audio_format"] = self.universal_params.get("audio_format", template["audio_format"]) + template["audio_channel"] = self.universal_params.get("audio_channels", template["audio_channel"]) + template["audio_sampling"] = self.universal_params.get("audio_sampling", template["audio_sampling"]) + template["audio_ptime"] = self.universal_params.get("audio_ptime", template["audio_ptime"]) + if is_tx and self.universal_params.get("input_file"): + template["audio_url"] = self.universal_params.get("input_file") + template["replicas"] = self.universal_params.get("replicas", template["replicas"]) + template["start_port"] = int(self.universal_params.get("port", template["start_port"])) + template["payload_type"] = int(self.universal_params.get("payload_type", template["payload_type"])) + return template + else: + # Fallback: reuse st20p layout for unknown session types (minimal support) + template = {"replicas": 1} + return template + + # Populate TX sessions + if direction in (None, "tx"): + st_entry = _populate_session(True) + if st_entry: + config["tx_sessions"][0].setdefault(session_type, []) + config["tx_sessions"][0][session_type].append(st_entry) + # Add a dummy video list so legacy performance heuristic (which checks absence of video list) + # does not misclassify this regular functional test as a performance test. + if "video" not in config["tx_sessions"][0]: + config["tx_sessions"][0]["video"] = [] + + # Populate RX sessions + if direction in (None, "rx"): + st_entry = _populate_session(False) + if st_entry: + config["rx_sessions"][0].setdefault(session_type, []) + config["rx_sessions"][0][session_type].append(st_entry) + if "video" not in config["rx_sessions"][0]: + config["rx_sessions"][0]["video"] = [] + + # If only TX or only RX requested, clear the other list to avoid confusing validators + if direction == "tx": + config["rx_sessions"] = [] + elif direction == "rx": + config["tx_sessions"] = [] + + return config + + def _create_session_data(self, session_type: str, is_tx: bool) -> dict: + """ + Factory method to create session data for different session types. + Routes to specific session data creation methods based on type. 
+ + Args: + session_type: Type of session (st20p, st22p, st30p, video, audio, ancillary, fastmetadata) + is_tx: True for TX session, False for RX session + + Returns: + Session data dictionary + """ + if session_type == "st20p": + return self._create_st20p_session_data(is_tx) + elif session_type == "st22p": + return self._create_st22p_session_data(is_tx) + elif session_type == "st30p": + return self._create_st30p_session_data(is_tx) + elif session_type == "video": + return self._create_video_session_data(is_tx) + elif session_type == "audio": + return self._create_audio_session_data(is_tx) + elif session_type == "ancillary": + return self._create_ancillary_session_data(is_tx) + elif session_type == "fastmetadata": + return self._create_fastmetadata_session_data(is_tx) + else: + raise ValueError(f"Unsupported session type: {session_type}") + + def _add_tx_rx_specific_params(self, session: dict, session_type: str, is_tx: bool): + """Add TX/RX specific parameters to session.""" + if is_tx: + session["ip"] = self.universal_params.get("destination_ip", DEFAULT_NETWORK_CONFIG["unicast_rx_ip"]) + session["type"] = "frame" + if self.universal_params.get("input_file"): + session["st20p_url"] = self.universal_params["input_file"] + else: + session["ip"] = self.universal_params.get("destination_ip", DEFAULT_NETWORK_CONFIG["unicast_rx_ip"]) + session["type"] = "frame" + if self.universal_params.get("output_file"): + session["st20p_url"] = self.universal_params["output_file"] + + def _create_st20p_session_data(self, is_tx: bool) -> dict: + """Create ST20p (uncompressed video) session data from universal parameters.""" + session = self.get_common_session_params("st20p") + session.update(self.get_common_video_params()) + session.update({ + "fps": self.universal_params.get("framerate", UNIVERSAL_PARAMS["framerate"]), + "pacing": self.universal_params.get("pacing", UNIVERSAL_PARAMS["pacing"]), + "packing": self.universal_params.get("packing", UNIVERSAL_PARAMS["packing"]), + "transport_format": self.universal_params.get("transport_format", UNIVERSAL_PARAMS["transport_format"]), + "display": self.universal_params.get("display", UNIVERSAL_PARAMS["display"]) + }) + + self._add_tx_rx_specific_params(session, "st20p", is_tx) + return session + + def _create_st22p_session_data(self, is_tx: bool) -> dict: + """Create ST22p (compressed video with JPEG-XS) session data from universal parameters.""" + session = self.get_common_session_params("st22p") + session.update(self.get_common_video_params()) + session.update({ + "fps": self.universal_params.get("framerate", DEFAULT_ST22P_CONFIG["framerate"]), + "pack_type": DEFAULT_ST22P_CONFIG["pack_type"], + "codec": self.universal_params.get("codec", DEFAULT_ST22P_CONFIG["codec"]), + "quality": self.universal_params.get("quality", DEFAULT_ST22P_CONFIG["quality"]), + "codec_thread_count": self.universal_params.get("codec_threads", DEFAULT_ST22P_CONFIG["codec_threads"]) + }) + + self._add_tx_rx_specific_params(session, "st22p", is_tx) + return session + + def _create_st30p_session_data(self, is_tx: bool) -> dict: + """Create ST30p (uncompressed audio) session data from universal parameters.""" + session = self.get_common_session_params("st30p") + session.update({ + "audio_format": self.universal_params.get("audio_format", UNIVERSAL_PARAMS["audio_format"]), + "audio_channel": self.universal_params.get("audio_channels", UNIVERSAL_PARAMS["audio_channels"]), + "audio_sampling": self.universal_params.get("audio_sampling", UNIVERSAL_PARAMS["audio_sampling"]), + "audio_ptime": 
self.universal_params.get("audio_ptime", UNIVERSAL_PARAMS["audio_ptime"]), + "audio_url": self.universal_params.get("input_file" if is_tx else "output_file", "") + }) + + return session + + def _create_video_session_data(self, is_tx: bool) -> dict: + """Create raw video session data from universal parameters.""" + session = self.get_common_session_params("video") + session.update(self.get_common_video_params()) + session.update({ + "fps": self.universal_params.get("framerate", UNIVERSAL_PARAMS["framerate"]), + "transport_format": self.universal_params.get("transport_format", UNIVERSAL_PARAMS["transport_format"]) + }) + + self._add_tx_rx_specific_params(session, "video", is_tx) + return session + + def _create_audio_session_data(self, is_tx: bool) -> dict: + """Create audio session data from universal parameters.""" + session = self.get_common_session_params("audio") + session.update({ + "audio_format": self.universal_params.get("audio_format", UNIVERSAL_PARAMS["audio_format"]), + "audio_channel": self.universal_params.get("audio_channels", UNIVERSAL_PARAMS["audio_channels"]), + "audio_sampling": self.universal_params.get("audio_sampling", UNIVERSAL_PARAMS["audio_sampling"]) + }) + + return session + + def _create_ancillary_session_data(self, is_tx: bool) -> dict: + """Create ancillary data session data from universal parameters.""" + session = self.get_common_session_params("ancillary") + session.update({ + "ancillary_format": self.universal_params.get("transport_format", "SMPTE_291M"), + "ancillary_fps": self.universal_params.get("framerate", UNIVERSAL_PARAMS["framerate"]) + }) + + return session + + def _create_fastmetadata_session_data(self, is_tx: bool) -> dict: + """Create fast metadata session data from universal parameters.""" + session = self.get_common_session_params("fastmetadata") + session.update({ + "metadata_format": "SMPTE_2110_41", + "metadata_fps": self.universal_params.get("framerate", UNIVERSAL_PARAMS["framerate"]) + }) + + return session + + def validate_results(self, config: dict, tx_output: str, rx_output: str, + fail_on_error: bool, host, build: str) -> bool: + """Validate RxTxApp test results.""" + try: + # Get session type from config for proper validation + session_type = self._get_session_type_from_config(config) + + # Validate TX results + tx_result = check_tx_output( + config=config, + output=tx_output.split('\n') if tx_output else [], + session_type=session_type, + fail_on_error=fail_on_error, + host=host, + build=build + ) + if not tx_result and fail_on_error: + log_fail(f"TX validation failed for {session_type}") + return False + + # Validate RX results + rx_result = check_rx_output( + config=config, + output=rx_output.split('\n') if rx_output else [], + session_type=session_type, + fail_on_error=fail_on_error, + host=host, + build=build + ) + if not rx_result and fail_on_error: + log_fail(f"RX validation failed for {session_type}") + return False + + return True + + except Exception as e: + logger.error(f"Error validating RxTxApp results: {e}") + return not fail_on_error + + def _execute_single_host_test(self, build: str, test_time: int, host, + input_file: str, output_file: str, fail_on_error: bool, + virtio_user: bool, rx_timing_parser: bool, ptp: bool, + capture_cfg, **kwargs) -> bool: + """Execute single host RxTxApp test.""" + # Add test time to kwargs before creating command + if test_time: + kwargs["test_time"] = test_time + + command, config = self.create_command(**kwargs) + + # Add test-specific parameters + if virtio_user: + command += " 
--virtio_user" + if rx_timing_parser: + command += " --rx_timing_parser" + if ptp: + command += " --ptp" + + # Write config file + config_path = self.config_file_path or "config.json" + with open(config_path, 'w') as f: + json.dump(config, f, indent=2) + + # Setup capture if requested + if capture_cfg: + prepare_tcpdump(capture_cfg, host) + + # Execute test + process = self.start_process(command, build, test_time, host) + output = self.capture_stdout(process, "RxTxApp") + + # Validate results + return self.validate_results(config, output, output, True, host, build) + + def _execute_dual_host_test(self, build: str, test_time: int, tx_host, rx_host, + input_file: str, output_file: str, fail_on_error: bool, + capture_cfg, sleep_interval: int, tx_first: bool, + output_format: str, **kwargs) -> bool: + """Execute dual host RxTxApp test.""" + # Create TX and RX configurations + tx_kwargs = kwargs.copy() + tx_kwargs["direction"] = "tx" + if test_time: + tx_kwargs["test_time"] = test_time + if input_file: + tx_kwargs["input_file"] = input_file + + rx_kwargs = kwargs.copy() + rx_kwargs["direction"] = "rx" + if test_time: + rx_kwargs["test_time"] = test_time + if output_file: + rx_kwargs["output_file"] = output_file + + tx_command, tx_config = self.create_command(**tx_kwargs) + rx_command, rx_config = self.create_command(**rx_kwargs) + + # Write config files + tx_config_path = "tx_config.json" + rx_config_path = "rx_config.json" + + with open(tx_config_path, 'w') as f: + json.dump(tx_config, f, indent=2) + with open(rx_config_path, 'w') as f: + json.dump(rx_config, f, indent=2) + + # Setup capture if requested + if capture_cfg: + prepare_tcpdump(capture_cfg, rx_host, build) + + # Start processes based on tx_first parameter + if tx_first: + tx_process = self.start_process(tx_command.replace("config.json", tx_config_path), build, test_time, tx_host) + time.sleep(sleep_interval) + rx_process = self.start_process(rx_command.replace("config.json", rx_config_path), build, test_time, rx_host) + else: + rx_process = self.start_process(rx_command.replace("config.json", rx_config_path), build, test_time, rx_host) + time.sleep(sleep_interval) + tx_process = self.start_process(tx_command.replace("config.json", tx_config_path), build, test_time, tx_host) + + # Capture outputs + tx_output = self.capture_stdout(tx_process, "RxTxApp-TX") + rx_output = self.capture_stdout(rx_process, "RxTxApp-RX") + + # Validate results + tx_result = self.validate_results(tx_config, tx_output, "", True, tx_host, build) + rx_result = self.validate_results(rx_config, "", rx_output, True, rx_host, build) + return tx_result and rx_result + + def _import_with_fallback(self, module_name: str, import_items: list): + """Import utilities with fallback for direct execution.""" + try: + if module_name == "RxTxApp": + globals().update({item: getattr(__import__(f".{module_name}", fromlist=import_items, level=1), item) for item in import_items}) + except ImportError: + globals().update({item: getattr(__import__(module_name, fromlist=import_items), item) for item in import_items}) + + def _get_session_type_from_config(self, config: dict) -> str: + """Extract session type from RxTxApp config.""" + if config.get("tx_sessions"): + # Check for specific session type indicators + session = config["tx_sessions"][0] + if "fps" in session and "transport_format" in session: + return "st20p" + elif "codec" in session: + return "st22p" + elif "audio_format" in session: + return "st30p" + return "st20p" # Default \ No newline at end of file diff --git 
diff --git a/tests/validation/tests/single/st20p/format/test_format_refactored.py b/tests/validation/tests/single/st20p/format/test_format_refactored.py
new file mode 100644
index 000000000..7b0d7db51
--- /dev/null
+++ b/tests/validation/tests/single/st20p/format/test_format_refactored.py
@@ -0,0 +1,443 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2024-2025 Intel Corporation
+
+import pytest
+import logging
+from mtl_engine.app_refactored import Application
+from mtl_engine.media_files import yuv_files_422p10le, yuv_files_422rfc10
+
+logger = logging.getLogger(__name__)
+
+
+@pytest.mark.nightly
+@pytest.mark.parametrize(
+    "media_file",
+    list(yuv_files_422p10le.values()),
+    indirect=["media_file"],
+    ids=list(yuv_files_422p10le.keys()),
+)
+def test_422p10le_refactored(
+    hosts,
+    build,
+    media,
+    nic_port_list,
+    test_time,
+    test_config,
+    prepare_ramdisk,
+    media_file,
+):
+    """
+    Send files in YUV422PLANAR10LE format, converting to transport format YUV_422_10bit.
+    Uses the new refactored Application class and mirrors the working test_422p10le.
+    """
+    media_file_info, media_file_path = media_file
+    host = list(hosts.values())[0]
+    # TODO: hardcoded developer path; derive the RxTxApp build dir from the build fixture instead
+    build = '/root/awilczyn/Media-Transport-Library/tests/tools/RxTxApp/build'
+    # Get capture configuration from test_config.yaml - matches working test
+    capture_cfg = dict(test_config.get("capture_cfg", {}))
+    capture_cfg["test_name"] = (
+        f"test_format_{media_file_info['filename']}"  # Match working test pattern
+    )
+
+    # Create application instance - use the build path to find RxTxApp
+    # build fixture provides the MTL build path, we need the RxTxApp within it
+    app = Application("RxTxApp", build)
+
+    # Configure test parameters to exactly match working test behavior
+    app.create_command(
+        session_type="st20p",
+        nic_port_list=host.vfs,  # Use VF list like working test
+        test_mode="multicast",
+        width=media_file_info["width"],
+        height=media_file_info["height"],
+        framerate=f"p{media_file_info['fps']}",
+        pixel_format=media_file_info["file_format"],  # Input/output format
+        transport_format=media_file_info["format"],  # Transport format
+        input_file=media_file_path,  # Input file for TX session
+    )
+
+    # Execute test using the Application's execute_test method
+    app.execute_test(
+        build=build,  # RxTxApp build path overridden above (see TODO)
+        test_time=test_time,
+        host=host,
+        capture_cfg=capture_cfg,
+    )
+
+
+# List of supported formats based on st_frame_fmt_from_transport()
+pixel_formats = dict(
+    YUV_422_10bit=("ST20_FMT_YUV_422_10BIT", "YUV422RFC4175PG2BE10"),
+    YUV_422_8bit=("ST20_FMT_YUV_422_8BIT", "UYVY"),
+    YUV_422_12bit=("ST20_FMT_YUV_422_12BIT", "YUV422RFC4175PG2BE12"),
+    YUV_444_10bit=("ST20_FMT_YUV_444_10BIT", "YUV444RFC4175PG4BE10"),
+    YUV_444_12bit=("ST20_FMT_YUV_444_12BIT", "YUV444RFC4175PG2BE12"),
+    YUV_420_8bit=("ST20_FMT_YUV_420_8BIT", "YUV420CUSTOM8"),
+    RGB_8bit=("ST20_FMT_RGB_8BIT", "RGB8"),
+    RGB_10bit=("ST20_FMT_RGB_10BIT", "RGBRFC4175PG4BE10"),
+    RGB_12bit=("ST20_FMT_RGB_12BIT", "RGBRFC4175PG2BE12"),
+    YUV_422_PLANAR10LE=("ST20_FMT_YUV_422_PLANAR10LE", "YUV422PLANAR10LE"),
+    V210=("ST20_FMT_V210", "V210"),
+)
+
+
+# List of supported one-way conversions based on st_frame_get_converter()
+convert1_formats = dict(
+    UYVY="UYVY",
+    YUV422PLANAR8="YUV422PLANAR8",
+    YUV420PLANAR8="YUV420PLANAR8",
+)
+
+
+@pytest.mark.parametrize(
+    "media_file",
+    [yuv_files_422rfc10["Penguin_1080p"]],
+    indirect=["media_file"],
+    ids=["Penguin_1080p"],
+)
+@pytest.mark.parametrize("format", convert1_formats.keys())
+def test_convert_on_rx_refactored(
+    hosts, build,
media, nic_port_list, test_time, format, media_file +): + """ + Send file in YUV_422_10bit pixel formats with supported convertion on RX side + Using the new refactored Application class - matches the working test_convert_on_rx + """ + media_file_info, media_file_path = media_file + output_format = convert1_formats[format] + host = list(hosts.values())[0] + + # Create application instance - use the build path to find RxTxApp + app = Application("RxTxApp", build) + + # Configure test parameters to exactly match working test + app.create_command( + session_type="st20p", + nic_port_list=host.vfs, + test_mode="multicast", + packing="GPM", # Match working test + width=media_file_info["width"], + height=media_file_info["height"], + framerate="p30", # Hardcoded like working test + pixel_format="YUV422RFC4175PG2BE10", # Input format for TX + transport_format="YUV_422_10bit", # Transport format + pixel_format_rx=output_format, # Output format for RX conversion + input_file=media_file_path, # Input file for TX session + ) + + # Execute test using the Application's execute_test method + app.execute_test( + build=build, # Use the build fixture directly + test_time=test_time, + host=host + ) + + +# List of supported two-way convertions based on st_frame_get_converter() +convert2_formats = dict( + V210=("ST20_FMT_YUV_422_10BIT", "YUV_422_10bit", "YUV422RFC4175PG2BE10"), + Y210=("ST20_FMT_YUV_422_10BIT", "YUV_422_10bit", "YUV422RFC4175PG2BE10"), + YUV422PLANAR12LE=( + "ST20_FMT_YUV_422_12BIT", + "YUV_422_12bit", + "YUV422RFC4175PG2BE12", + ), + YUV444PLANAR10LE=( + "ST20_FMT_YUV_444_10BIT", + "YUV_444_10bit", + "YUV444RFC4175PG4BE10", + ), + YUV444PLANAR12LE=( + "ST20_FMT_YUV_444_12BIT", + "YUV_444_12bit", + "YUV444RFC4175PG2BE12", + ), + GBRPLANAR10LE=("ST20_FMT_RGB_10BIT", "RGB_10bit", "RGBRFC4175PG4BE10"), + GBRPLANAR12LE=("ST20_FMT_RGB_12BIT", "RGB_12bit", "RGBRFC4175PG2BE12"), +) + + +@pytest.mark.parametrize( + "media_file", + [yuv_files_422rfc10["test_8K"]], # Use test_8K like working test + indirect=["media_file"], + ids=["test_8K"], +) +@pytest.mark.parametrize("format", convert2_formats.keys()) +def test_tx_rx_conversion_refactored( + hosts, + build, + media, + nic_port_list, + test_time, + format, + media_file, +): + """ + Send random file in different pixel formats with supported two-way convertion on TX and RX + Using the new refactored Application class - matches the working test_tx_rx_conversion + """ + media_file_info, media_file_path = media_file + text_format, transport_format, _ = convert2_formats[format] + host = list(hosts.values())[0] + + # Create application instance - use the build path to find RxTxApp + app = Application("RxTxApp", build) + + # Configure test parameters to exactly match working test + app.create_command( + session_type="st20p", + nic_port_list=host.vfs, + test_mode="multicast", + packing="GPM", # Match working test + width=media_file_info["width"], + height=media_file_info["height"], + framerate="p30", # Hardcoded like working test + pixel_format=format, # Input/output format (two-way conversion) + transport_format=transport_format, # Transport format + input_file=media_file_path, # Input file for TX session + ) + + # Execute test using the Application's execute_test method + app.execute_test( + build=build, # Use the build fixture directly + test_time=test_time, + host=host + ) + + +@pytest.mark.parametrize( + "media_file", + [yuv_files_422rfc10["test_8K"]], # Use test_8K like working test + indirect=["media_file"], + ids=["test_8K"], +) 
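+# Each pixel_formats entry maps a transport format name to (ST20 format enum label, frame format).
+# The test below consumes it as
+#     text_format, file_format = pixel_formats[format]
+# and passes the dict key as transport_format= and file_format as pixel_format=, so no pixel
+# conversion is requested on either side of the transport.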
+@pytest.mark.parametrize("format", pixel_formats.keys()) +def test_formats_refactored( + hosts, + build, + media, + nic_port_list, + test_time, + format, + test_config, + prepare_ramdisk, + media_file, +): + """ + Send random file in different supported pixel formats without convertion during transport + Using the new refactored Application class - matches the working test_formats + """ + media_file_info, media_file_path = media_file + text_format, file_format = pixel_formats[format] + host = list(hosts.values())[0] + + # Get capture configuration from test_config.yaml - matches working test + capture_cfg = dict(test_config.get("capture_cfg", {})) + capture_cfg["test_name"] = ( + f"test_format_formats_{format}" # Match working test pattern + ) + + # Create application instance - use the build path to find RxTxApp + app = Application("RxTxApp", build) + + # Configure test parameters to exactly match working test + app.create_command( + session_type="st20p", + nic_port_list=host.vfs, + test_mode="multicast", + packing="GPM", # Match working test + width=media_file_info["width"], + height=media_file_info["height"], + framerate="p30", # Hardcoded like working test + pixel_format=file_format, # Input/output format (pixel format specific) + transport_format=format, # Transport format + input_file=media_file_path, # Input file for TX session + ) + + # Execute test using the Application's execute_test method + app.execute_test( + build=build, # Use the build fixture directly + test_time=test_time, + host=host, + capture_cfg=capture_cfg, + ) + + +# Additional test demonstrating dual-host testing with refactored Application +@pytest.mark.dual_host +@pytest.mark.parametrize( + "media_file", + [yuv_files_422p10le["Penguin_1080p"]], + indirect=["media_file"], + ids=["Penguin_1080p"], +) +def test_dual_host_refactored( + hosts, + build, + media, + nic_port_list, + test_time, + test_config, + prepare_ramdisk, + media_file, +): + """ + Dual host test example using the new refactored Application class + """ + media_file_info, media_file_path = media_file + host_list = list(hosts.values()) + + if len(host_list) < 2: + pytest.skip("Dual host test requires at least 2 hosts") + + tx_host = host_list[0] + rx_host = host_list[1] + + # Get capture configuration + capture_cfg = dict(test_config.get("capture_cfg", {})) + capture_cfg["test_name"] = "test_format_refactored_dual_host" + + # Create application instance + # Note: build fixture points to mtl_path, but we need the RxTxApp build directory + rxtxapp_build_path = "/root/awilczyn/Media-Transport-Library/tests/tools/RxTxApp/build" + app = Application("RxTxApp", rxtxapp_build_path) + + # Configure test parameters + app.create_command( + session_type="st20p", + # Don't specify direction - let it create both TX and RX sessions like original working test + nic_port=tx_host.vfs[0] if tx_host.vfs else "0000:31:01.0", + source_ip="192.168.1.10", + destination_ip="239.1.1.1", + width=media_file_info["width"], + height=media_file_info["height"], + framerate=f"p{media_file_info['fps']}", + pixel_format=media_file_info["file_format"], + transport_format=media_file_info["format"], + input_file=media_file_path, + output_file="/tmp/received_output.yuv", + packing="BPM", + test_mode="multicast" + ) + + # Execute dual host test + app.execute_test( + build=rxtxapp_build_path, # Use the RxTxApp build path + test_time=test_time, + tx_host=tx_host, + rx_host=rx_host, + input_file=media_file_path, + output_file="/tmp/received_output.yuv", + capture_cfg=capture_cfg, + ) + + +# 
Test demonstrating FFmpeg integration with refactored Application +@pytest.mark.ffmpeg +@pytest.mark.parametrize( + "media_file", + [yuv_files_422p10le["Penguin_1080p"]], + indirect=["media_file"], + ids=["Penguin_1080p"], +) +def test_ffmpeg_format_refactored( + hosts, + build, + media, + nic_port_list, + test_time, + media_file, +): + """ + FFmpeg test example using the new refactored Application class + """ + media_file_info, media_file_path = media_file + host = list(hosts.values())[0] + + # Create FFmpeg application instance + app = Application("FFmpeg", "/usr/bin") + + # Configure test parameters for TX + app.create_command( + session_type="st20p", + # Don't specify direction - let it create both TX and RX sessions like original working test + nic_port_list=host.vfs, # Use full VF list like working test + source_ip="192.168.1.10", + destination_ip="239.1.1.1", + width=media_file_info["width"], + height=media_file_info["height"], + pixel_format=media_file_info["file_format"], + input_file=media_file_path, + port=20000, + payload_type=112 + ) + + # Execute test - FFmpeg doesn't need RxTxApp build path + app.execute_test( + build="/usr/bin", # Use the FFmpeg bin path + test_time=test_time, + host=host, + input_file=media_file_path, + output_file="/tmp/ffmpeg_output.yuv" + ) + + +# Test demonstrating GStreamer integration with refactored Application +@pytest.mark.gstreamer +@pytest.mark.parametrize( + "media_file", + [yuv_files_422p10le["Penguin_1080p"]], + indirect=["media_file"], + ids=["Penguin_1080p"], +) +def test_gstreamer_format_refactored( + hosts, + build, + media, + nic_port_list, + test_time, + media_file, +): + """ + GStreamer test example using the new refactored Application class + """ + media_file_info, media_file_path = media_file + host = list(hosts.values())[0] + + # Create a proper output directory in /tmp with write permissions + import tempfile + import os + output_dir = tempfile.mkdtemp() + output_file = os.path.join(output_dir, "gstreamer_output.yuv") + + # Create GStreamer application instance + app = Application("GStreamer", "/usr/bin") + + # Configure test parameters for TX + app.create_command( + session_type="st20p", + nic_port_list=host.vfs, # Use VF list like other tests + source_ip="192.168.1.10", + destination_ip="239.1.1.1", + width=media_file_info["width"], + height=media_file_info["height"], + framerate=f"{media_file_info['fps']}/1", + input_file=media_file_path + ) + + # Execute test with proper output file handling + try: + app.execute_test( + build="/usr/bin", # Use the GStreamer bin path + test_time=test_time, + host=host, + input_file=media_file_path, + output_file=output_file + ) + finally: + # Cleanup: Remove temporary output file and directory + if os.path.exists(output_file): + os.remove(output_file) + if os.path.exists(output_dir): + os.rmdir(output_dir) diff --git a/tests/validation/tests/single/st20p/format/test_format_refactored_new.py b/tests/validation/tests/single/st20p/format/test_format_refactored_new.py new file mode 100644 index 000000000..0b7feefbb --- /dev/null +++ b/tests/validation/tests/single/st20p/format/test_format_refactored_new.py @@ -0,0 +1,508 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2024-2025 Intel Corporation + +import pytest +from mtl_engine.app_refactored import Application +from mtl_engine.media_files import yuv_files_422p10le, yuv_files_422rfc10 + + +@pytest.mark.nightly +@pytest.mark.parametrize( + "media_file", + list(yuv_files_422p10le.values()), + indirect=["media_file"], + 
ids=list(yuv_files_422p10le.keys()), +) +def test_422p10le_refactored( + hosts, + build, + media, + nic_port_list, + test_time, + test_config, + prepare_ramdisk, + media_file, +): + """ + Send files in YUV422PLANAR10LE format converting to transport format YUV_422_10bit + Using Application class refactored interface + """ + media_file_info, media_file_path = media_file + host = list(hosts.values())[0] + + # Get capture configuration from test_config.yaml + # This controls whether tcpdump capture is enabled, where to store the pcap, etc. + capture_cfg = dict(test_config.get("capture_cfg", {})) + capture_cfg["test_name"] = ( + f"test_format_refactored_{media_file_info['filename']}" # Set a unique pcap file name + ) + + # Create Application instance for RxTxApp + app = Application("RxTxApp", f"{build}/tests/tools/RxTxApp/build") + + # Configure application using universal parameters + # For large files (1080p and above), add performance optimizations + config_params = { + "session_type": "st20p", + "nic_port": host.vfs[0] if host.vfs else "0000:31:01.0", + "nic_port_list": host.vfs, + "destination_ip": "239.168.48.9", + "port": 20000, + "width": media_file_info["width"], + "height": media_file_info["height"], + "framerate": f"p{media_file_info['fps']}", + "pixel_format": media_file_info["file_format"], + "transport_format": media_file_info["format"], + "pixel_format_rx": media_file_info["file_format"], + "input_file": media_file_path, + "test_mode": "multicast", + } + + # Add optimizations for large files (1080p and above) + if media_file_info.get("height", 0) >= 1080: + # Calculate file size to determine if it's a large file requiring special handling + import os + try: + file_size_mb = os.path.getsize(media_file_path) / (1024 * 1024) + except: + file_size_mb = 0 + + # For very large files (>500MB), implement ultra-aggressive optimizations + if file_size_mb > 500: + config_params.update({ + "framebuffer_count": 16, # Maximum frame buffers for very large content + "rx_video_fb_cnt": 8, # Maximum valid RX frame buffer count (range [2:8]) + "pacing": "gap", # Use gap pacing like working test + "rx_separate_lcore": True, # Dedicate RX cores for performance + "allow_across_numa_core": True, # Allow NUMA optimization + "sch_session_quota": 32, # Maximum session quota per core for large files + "nb_tx_desc": 4096, # Maximum TX descriptors for very large files + "nb_rx_desc": 4096, # Maximum RX descriptors for very large files + "mono_pool": True, # Use mono pool for better memory management + "tasklet_sleep": True, # Enable tasklet sleep for better resource management + "rxtx_simd_512": True, # Enable SIMD 512 for better performance + }) + else: + # Standard optimizations for regular 1080p files + config_params.update({ + "framebuffer_count": 4, # More frame buffers for large content + "rx_video_fb_cnt": 4, # Increase RX frame buffer count + "pacing": "gap", # Use standard gap pacing for reliability + "rx_separate_lcore": True, # Dedicate RX cores for performance + "allow_across_numa_core": True, # Allow NUMA optimization + "sch_session_quota": 8, # Higher session quota per core + "nb_tx_desc": 1024, # Increase TX descriptors for large files + "nb_rx_desc": 2048, # Increase RX descriptors for large files + }) + + app.create_command(**config_params) + + # Execute test using Application class + # Use optimized test time for large files to ensure accurate FPS measurement + if media_file_info.get("height", 0) >= 1080: + try: + file_size_mb = os.path.getsize(media_file_path) / (1024 * 1024) + # Very 
large files get longer test time to ensure accurate FPS measurement + actual_test_time = 15 if file_size_mb > 500 else 10 + except: + actual_test_time = 10 + else: + actual_test_time = test_time + + app.execute_test( + build=build, + test_time=actual_test_time, + host=host, + capture_cfg=capture_cfg, + ) + + +# List of supported formats based on st_frame_fmt_from_transport() +pixel_formats = dict( + YUV_422_10bit=("ST20_FMT_YUV_422_10BIT", "YUV422RFC4175PG2BE10"), + YUV_422_8bit=("ST20_FMT_YUV_422_8BIT", "UYVY"), + YUV_422_12bit=("ST20_FMT_YUV_422_12BIT", "YUV422RFC4175PG2BE12"), + YUV_444_10bit=("ST20_FMT_YUV_444_10BIT", "YUV444RFC4175PG4BE10"), + YUV_444_12bit=("ST20_FMT_YUV_444_12BIT", "YUV444RFC4175PG2BE12"), + YUV_420_8bit=("ST20_FMT_YUV_420_8BIT", "YUV420CUSTOM8"), + RGB_8bit=("ST20_FMT_RGB_8BIT", "RGB8"), + RGB_10bit=("ST20_FMT_RGB_10BIT", "RGBRFC4175PG4BE10"), + RGB_12bit=("ST20_FMT_RGB_12BIT", "RGBRFC4175PG2BE12"), + YUV_422_PLANAR10LE=("ST20_FMT_YUV_422_PLANAR10LE", "YUV422PLANAR10LE"), + V210=("ST20_FMT_V210", "V210"), +) + + +# List of supported one-way convertions based on st_frame_get_converter() +convert1_formats = dict( + UYVY="UYVY", + YUV422PLANAR8="YUV422PLANAR8", + YUV420PLANAR8="YUV420PLANAR8", +) + + +@pytest.mark.parametrize( + "media_file", + [yuv_files_422rfc10["Penguin_1080p"]], + indirect=["media_file"], + ids=["Penguin_1080p"], +) +@pytest.mark.parametrize("format", convert1_formats.keys()) +def test_convert_on_rx_refactored( + hosts, build, media, nic_port_list, test_time, format, media_file +): + """ + Send file in YUV_422_10bit pixel formats with supported convertion on RX side + Using Application class refactored interface + """ + media_file_info, media_file_path = media_file + output_format = convert1_formats[format] + host = list(hosts.values())[0] + + # Create Application instance for RxTxApp + app = Application("RxTxApp", f"{build}/tests/tools/RxTxApp/build") + + # Configure application using universal parameters + app.create_command( + session_type="st20p", + nic_port=host.vfs[0] if host.vfs else "0000:31:01.0", + nic_port_list=host.vfs, + destination_ip="239.168.48.9", + port=20000, + width=media_file_info["width"], + height=media_file_info["height"], + framerate="p30", # TODO: Hardcoded + pixel_format="YUV422RFC4175PG2BE10", + transport_format="YUV_422_10bit", + pixel_format_rx=output_format, + input_file=media_file_path, + test_mode="multicast", + packing="GPM", + ) + + # Execute test using Application class + app.execute_test( + build=build, + test_time=test_time, + host=host, + ) + + +# List of supported two-way convertions based on st_frame_get_converter() +convert2_formats = dict( + V210=("ST20_FMT_YUV_422_10BIT", "YUV_422_10bit", "YUV422RFC4175PG2BE10"), + Y210=("ST20_FMT_YUV_422_10BIT", "YUV_422_10bit", "YUV422RFC4175PG2BE10"), + YUV422PLANAR12LE=( + "ST20_FMT_YUV_422_12BIT", + "YUV_422_12bit", + "YUV422RFC4175PG2BE12", + ), + YUV444PLANAR10LE=( + "ST20_FMT_YUV_444_10BIT", + "YUV_444_10bit", + "YUV444RFC4175PG4BE10", + ), + YUV444PLANAR12LE=( + "ST20_FMT_YUV_444_12BIT", + "YUV_444_12bit", + "YUV444RFC4175PG2BE12", + ), + GBRPLANAR10LE=("ST20_FMT_RGB_10BIT", "RGB_10bit", "RGBRFC4175PG4BE10"), + GBRPLANAR12LE=("ST20_FMT_RGB_12BIT", "RGB_12bit", "RGBRFC4175PG2BE12"), +) + + +@pytest.mark.parametrize( + "media_file", + [yuv_files_422rfc10["test_8K"]], + indirect=["media_file"], + ids=["test_8K"], +) +@pytest.mark.parametrize("format", convert2_formats.keys()) +def test_tx_rx_conversion_refactored( + hosts, + build, + media, + nic_port_list, + 
test_time, + format, + media_file, +): + """ + Send random file in different pixel formats with supported two-way convertion on TX and RX + Using Application class refactored interface + """ + media_file_info, media_file_path = media_file + text_format, transport_format, _ = convert2_formats[format] + host = list(hosts.values())[0] + + # Create Application instance for RxTxApp + app = Application("RxTxApp", f"{build}/tests/tools/RxTxApp/build") + + # Configure application using universal parameters + app.create_command( + session_type="st20p", + nic_port=host.vfs[0] if host.vfs else "0000:31:01.0", + nic_port_list=host.vfs, + destination_ip="239.168.48.9", + port=20000, + width=media_file_info["width"], + height=media_file_info["height"], + framerate="p30", # TODO: Hardcoded + pixel_format=format, + transport_format=transport_format, + pixel_format_rx=format, + input_file=media_file_path, + test_mode="multicast", + packing="GPM", + ) + + # Execute test using Application class + app.execute_test( + build=build, + test_time=test_time, + host=host, + ) + + +@pytest.mark.parametrize( + "media_file", + [yuv_files_422rfc10["test_8K"]], + indirect=["media_file"], + ids=["test_8K"], +) +@pytest.mark.parametrize("format", pixel_formats.keys()) +def test_formats_refactored( + hosts, + build, + media, + nic_port_list, + test_time, + format, + test_config, + prepare_ramdisk, + media_file, +): + """ + Send random file in different supported pixel formats without convertion during transport + Using Application class refactored interface + """ + media_file_info, media_file_path = media_file + text_format, file_format = pixel_formats[format] + host = list(hosts.values())[0] + + # Get capture configuration from test_config.yaml + # This controls whether tcpdump capture is enabled, where to store the pcap, etc. 
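+    # For reference, capture_cfg typically carries the keys shown in the default_test_config
+    # fixture used by the fps test below, e.g. (actual values come from test_config.yaml):
+    #   {"enable": False, "test_name": "...", "pcap_dir": "/tmp/pcap", "capture_time": 5, "interface": None}
+    # The dict() copy below only layers a per-test "test_name" on top of those settings.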
+ capture_cfg = dict(test_config.get("capture_cfg", {})) + capture_cfg["test_name"] = ( + f"test_format_refactored_formats_{format}" # Set a unique pcap file name + ) + + # Create Application instance for RxTxApp + app = Application("RxTxApp", f"{build}/tests/tools/RxTxApp/build") + + # Configure application using universal parameters + app.create_command( + session_type="st20p", + nic_port=host.vfs[0] if host.vfs else "0000:31:01.0", + nic_port_list=host.vfs, + destination_ip="239.168.48.9", + port=20000, + width=media_file_info["width"], + height=media_file_info["height"], + framerate="p30", # TODO: Hardcoded + pixel_format=file_format, + transport_format=format, + pixel_format_rx=file_format, + input_file=media_file_path, + test_mode="multicast", + packing="GPM", + ) + + # Execute test using Application class + app.execute_test( + build=build, + test_time=test_time, + host=host, + capture_cfg=capture_cfg, + ) + + +@pytest.mark.parametrize( + "media_file", + [yuv_files_422p10le["Penguin_720p"]], + indirect=["media_file"], + ids=["Penguin_720p"], +) +def test_dual_host_refactored( + hosts, + build, + media, + nic_port_list, + test_time, + test_config, + prepare_ramdisk, + media_file, +): + """ + Test dual host configuration using Application class + TX on one host, RX on another host + """ + media_file_info, media_file_path = media_file + + # For dual host testing, we need at least 2 hosts + if len(hosts) < 2: + pytest.skip("Dual host test requires at least 2 hosts") + + host_list = list(hosts.values()) + tx_host = host_list[0] + rx_host = host_list[1] + + # Get capture configuration from test_config.yaml + capture_cfg = dict(test_config.get("capture_cfg", {})) + capture_cfg["test_name"] = ( + f"test_format_refactored_dual_host_{media_file_info['filename']}" + ) + + # Create Application instance for RxTxApp + app = Application("RxTxApp", f"{build}/tests/tools/RxTxApp/build") + + # Configure application using universal parameters + app.create_command( + session_type="st20p", + nic_port=tx_host.vfs[0] if tx_host.vfs else "0000:31:01.0", + nic_port_list=tx_host.vfs, + destination_ip="239.168.48.9", + port=20000, + width=media_file_info["width"], + height=media_file_info["height"], + framerate=f"p{media_file_info['fps']}", + pixel_format=media_file_info["file_format"], + transport_format=media_file_info["format"], + pixel_format_rx=media_file_info["file_format"], + input_file=media_file_path, + test_mode="multicast", + ) + + # Execute dual host test using Application class + app.execute_test( + build=build, + test_time=test_time, + tx_host=tx_host, + rx_host=rx_host, + capture_cfg=capture_cfg, + ) + + +@pytest.mark.parametrize( + "media_file", + [yuv_files_422p10le["Penguin_720p"]], + indirect=["media_file"], + ids=["Penguin_720p"], +) +def test_ffmpeg_format_refactored( + hosts, + build, + media, + nic_port_list, + test_time, + test_config, + prepare_ramdisk, + media_file, +): + """ + Test FFmpeg integration using Application class + """ + media_file_info, media_file_path = media_file + host = list(hosts.values())[0] + + # Get capture configuration from test_config.yaml + capture_cfg = dict(test_config.get("capture_cfg", {})) + capture_cfg["test_name"] = ( + f"test_format_refactored_ffmpeg_{media_file_info['filename']}" + ) + + # Create Application instance for FFmpeg + app = Application("FFmpeg", "/usr/bin") + + # Configure application using universal parameters + app.create_command( + session_type="st20p", + nic_port=host.vfs[0] if host.vfs else "0000:31:01.0", + 
destination_ip="239.168.48.9", + port=20000, + width=media_file_info["width"], + height=media_file_info["height"], + framerate=f"p{media_file_info['fps']}", + pixel_format=media_file_info["file_format"], + transport_format=media_file_info["format"], + input_file=media_file_path, + output_file="/tmp/ffmpeg_output.yuv", + ) + + # Execute test using Application class + app.execute_test( + build=build, + test_time=test_time, + host=host, + input_file=media_file_path, + output_file="/tmp/ffmpeg_output.yuv", + capture_cfg=capture_cfg, + ) + + +@pytest.mark.parametrize( + "media_file", + [yuv_files_422p10le["Penguin_720p"]], + indirect=["media_file"], + ids=["Penguin_720p"], +) +def test_gstreamer_format_refactored( + hosts, + build, + media, + nic_port_list, + test_time, + test_config, + prepare_ramdisk, + media_file, +): + """ + Test GStreamer integration using Application class + """ + media_file_info, media_file_path = media_file + host = list(hosts.values())[0] + + # Get capture configuration from test_config.yaml + capture_cfg = dict(test_config.get("capture_cfg", {})) + capture_cfg["test_name"] = ( + f"test_format_refactored_gstreamer_{media_file_info['filename']}" + ) + + # Create Application instance for GStreamer + app = Application("GStreamer", "/usr/bin") + + # Configure application using universal parameters + app.create_command( + session_type="st20p", + nic_port=host.vfs[0] if host.vfs else "0000:31:01.0", + destination_ip="239.168.48.9", + port=20000, + width=media_file_info["width"], + height=media_file_info["height"], + framerate=f"p{media_file_info['fps']}", + pixel_format=media_file_info["file_format"], + transport_format=media_file_info["format"], + input_file=media_file_path, + output_file="/tmp/gstreamer_output.yuv", + ) + + # Execute test using Application class + app.execute_test( + build=build, + test_time=test_time, + host=host, + input_file=media_file_path, + output_file="/tmp/gstreamer_output.yuv", + capture_cfg=capture_cfg, + ) diff --git a/tests/validation/tests/single/st20p/fps/test_fps_refactored.py b/tests/validation/tests/single/st20p/fps/test_fps_refactored.py new file mode 100644 index 000000000..d932f4a93 --- /dev/null +++ b/tests/validation/tests/single/st20p/fps/test_fps_refactored.py @@ -0,0 +1,132 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2024-2025 Intel Corporation + +import pytest +from mtl_engine.app_refactored import Application +from mtl_engine.media_files import yuv_files_422rfc10 + + +# Define a custom fixture to provide test configuration without external config file dependency +@pytest.fixture(scope="session") +def default_test_config(): + """Provide default test configuration to avoid config file dependency issues.""" + return { + "build": "/root/awilczyn/Media-Transport-Library/build", + "mtl_path": "/root/awilczyn/Media-Transport-Library/build", + "media_path": "/mnt/media", + "test_time": 5, + "delay_between_tests": 1, + "capture_cfg": { + "enable": False, + "test_name": "test_name", + "pcap_dir": "/tmp/pcap", + "capture_time": 5, + "interface": None + }, + "ramdisk": { + "media": { + "mountpoint": "/tmp/media", + "size_gib": 1 + }, + "pcap": { + "mountpoint": "/tmp/pcap", + "size_gib": 1 + } + }, + "compliance": False + } + + +@pytest.mark.nightly +@pytest.mark.parametrize( + "media_file", + [yuv_files_422rfc10["ParkJoy_1080p"]], + indirect=["media_file"], + ids=["ParkJoy_1080p"], +) +@pytest.mark.parametrize( + "fps", + [ + "p23", + "p24", + "p25", + pytest.param("p29", marks=pytest.mark.smoke), + "p30", + "p50", + "p59", 
+ "p60", + "p100", + "p119", + "p120", + ], +) +def test_fps_refactored( + hosts, + build, + media, + nic_port_list, + test_time, + fps, + prepare_ramdisk, + media_file, + default_test_config, # Use our custom fixture instead of the problematic test_config +): + """ + Test different frame rates using Application class refactored interface + """ + media_file_info, media_file_path = media_file + host = list(hosts.values())[0] + + # Use the default test configuration from our custom fixture + capture_cfg = dict(default_test_config.get("capture_cfg", {})) + capture_cfg["test_name"] = f"test_fps_refactored_{media_file_info['filename']}_{fps}" + capture_cfg["enabled"] = False # Disable tcpdump capture to avoid complexity + + # Create Application instance for RxTxApp + app = Application("RxTxApp", f"{build}/tests/tools/RxTxApp/build") + + # Configure application using universal parameters + # Match the original test configuration exactly - use multicast mode for proper TX/RX setup + config_params = { + "session_type": "st20p", + "nic_port_list": host.vfs, + "test_mode": "multicast", # Use multicast like the working original test + "destination_ip": "239.168.48.9", # Multicast destination IP + "port": 20000, + "width": media_file_info["width"], + "height": media_file_info["height"], + "framerate": fps, + "pixel_format": media_file_info["file_format"], + "transport_format": media_file_info["format"], + "pixel_format_rx": media_file_info["file_format"], + "input_file": media_file_path, + } + + # Add performance optimizations for frame rates that need more stability + if fps in ["p30", "p50", "p59", "p60"]: + config_params.update({ + "pacing": "gap", # Use gap pacing for better stability + "tx_no_chain": True, # Optimize for performance + }) + elif fps in ["p100", "p119", "p120"]: + config_params.update({ + "pacing": "linear", # Better pacing for high frame rates + "tx_no_chain": True, # Optimize for performance + }) + + app.create_command(**config_params) + + # Execute test using Application class + # Use longer test time for accurate FPS measurement and stability + actual_test_time = test_time + if fps in ["p30", "p50", "p59", "p60"]: + actual_test_time = max(test_time, 15) # Minimum 15 seconds for stability + elif fps in ["p100", "p119", "p120"]: + actual_test_time = max(test_time, 10) # Minimum 10 seconds for high FPS accuracy + + app.execute_test( + build=build, + test_time=actual_test_time, + host=host, + capture_cfg=capture_cfg, + ) diff --git a/tests/validation/tests/single/st20p/integrity/test_integrity_refactored.py b/tests/validation/tests/single/st20p/integrity/test_integrity_refactored.py new file mode 100644 index 000000000..a8b0ae7ef --- /dev/null +++ b/tests/validation/tests/single/st20p/integrity/test_integrity_refactored.py @@ -0,0 +1,115 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2024-2025 Intel Corporation + +import logging +import os + +import pytest +from mfd_common_libs.log_levels import TEST_PASS +from mtl_engine.app_refactored import Application +from mtl_engine.const import LOG_FOLDER +from mtl_engine.execute import log_fail +from mtl_engine.integrity import calculate_yuv_frame_size, check_st20p_integrity +from mtl_engine.media_files import yuv_files_422p10le, yuv_files_422rfc10 + +logger = logging.getLogger(__name__) + + +@pytest.mark.parametrize( + "media_file", + [ + yuv_files_422rfc10["Penguin_720p"], + yuv_files_422rfc10["Penguin_1080p"], + pytest.param(yuv_files_422p10le["Penguin_720p"], marks=pytest.mark.nightly), + 
yuv_files_422p10le["Penguin_1080p"], + ], + indirect=["media_file"], + ids=[ + "Penguin_720p_422rfc10", + "Penguin_1080p_422rfc10", + "Penguin_720p_422p10le", + "Penguin_1080p_422p10le", + ], +) +def test_integrity_refactored( + hosts, + build, + media, + nic_port_list, + test_time, + test_config, + prepare_ramdisk, + media_file, +): + """ + Test video integrity by comparing input and output files using Application class refactored interface + """ + media_file_info, media_file_path = media_file + + # Ensure the output directory exists for the integrity test output file. + log_dir = os.path.join(os.getcwd(), LOG_FOLDER, "latest") + os.makedirs(log_dir, exist_ok=True) + out_file_url = os.path.join(log_dir, "out.yuv") + host = list(hosts.values())[0] + + # Get capture configuration from test_config.yaml + # This controls whether tcpdump capture is enabled, where to store the pcap, etc. + capture_cfg = dict(test_config.get("capture_cfg", {})) + # Set a unique pcap file name + capture_cfg["test_name"] = f"test_integrity_refactored_{media_file_info['filename']}" + + # Create Application instance for RxTxApp + app = Application("RxTxApp", f"{build}/tests/tools/RxTxApp/build") + + # Configure application using universal parameters + config_params = { + "session_type": "st20p", + "nic_port": host.vfs[0] if host.vfs else "0000:31:01.0", + "nic_port_list": host.vfs, + "source_ip": "192.168.17.101", # TX interface IP + "destination_ip": "192.168.17.102", # RX interface IP (loopback) + "port": 20000, + "width": media_file_info["width"], + "height": media_file_info["height"], + "framerate": "p25", + "pixel_format": media_file_info["file_format"], + "transport_format": media_file_info["format"], + "pixel_format_rx": media_file_info["file_format"], + "input_file": media_file_path, + "output_file": out_file_url, # Specify output file for integrity checking + "test_mode": "unicast", + } + + # Add integrity-specific optimizations + config_params.update({ + "pacing": "linear", # Ensure consistent frame delivery + "tx_no_chain": False, # Keep chain for better integrity + }) + + app.create_command(**config_params) + + # Execute test using Application class + # Use longer test time for accurate integrity measurement + actual_test_time = max(test_time, 8) # Minimum 8 seconds for integrity check + + app.execute_test( + build=build, + test_time=actual_test_time, + host=host, + capture_cfg=capture_cfg, + ) + + # Perform integrity check after test execution + frame_size = calculate_yuv_frame_size( + media_file_info["width"], + media_file_info["height"], + media_file_info["file_format"], + ) + result = check_st20p_integrity( + src_url=media_file_path, out_url=out_file_url, frame_size=frame_size + ) + + if result: + logger.log(TEST_PASS, "INTEGRITY PASS") + else: + log_fail("INTEGRITY FAIL") diff --git a/tests/validation/tests/single/st20p/interlace/test_interlace_refactored.py b/tests/validation/tests/single/st20p/interlace/test_interlace_refactored.py new file mode 100644 index 000000000..3efe3c9b6 --- /dev/null +++ b/tests/validation/tests/single/st20p/interlace/test_interlace_refactored.py @@ -0,0 +1,78 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2024-2025 Intel Corporation + +import pytest +from mtl_engine.app_refactored import Application +from mtl_engine.media_files import yuv_files_interlace + + +@pytest.mark.parametrize( + "media_file", + list(yuv_files_interlace.values()), + indirect=["media_file"], + ids=list(yuv_files_interlace.keys()), +) +def test_interlace_refactored( + hosts, + build, + 
media, + nic_port_list, + test_time, + test_config, + prepare_ramdisk, + media_file, +): + """ + Test interlaced video transmission using Application class refactored interface + """ + media_file_info, media_file_path = media_file + host = list(hosts.values())[0] + + # Get capture configuration from test_config.yaml + # This controls whether tcpdump capture is enabled, where to store the pcap, etc. + # capture_time: 15 + capture_cfg = dict(test_config.get("capture_cfg", {})) + capture_cfg["test_name"] = ( + f"test_interlace_refactored_{media_file_info['filename']}" # Set a unique pcap file name + ) + + # Create Application instance for RxTxApp + app = Application("RxTxApp", f"{build}/tests/tools/RxTxApp/build") + + # Configure application using universal parameters + config_params = { + "session_type": "st20p", + "nic_port": host.vfs[0] if host.vfs else "0000:31:01.0", + "nic_port_list": host.vfs, + "source_ip": "192.168.17.101", # TX interface IP + "destination_ip": "192.168.17.102", # RX interface IP + "port": 20000, + "width": media_file_info["width"], + "height": media_file_info["height"], + "framerate": f"p{media_file_info['fps']}", + "pixel_format": media_file_info["file_format"], + "transport_format": media_file_info["format"], + "pixel_format_rx": media_file_info["file_format"], + "input_file": media_file_path, + "test_mode": "unicast", + "interlaced": True, # Enable interlaced mode + } + + # Add interlace-specific optimizations + config_params.update({ + "pacing": "linear", # Better for interlaced content + "tx_no_chain": False, # Keep chain for better field synchronization + }) + + app.create_command(**config_params) + + # Execute test using Application class + # Use longer test time for interlaced content to ensure proper field handling + actual_test_time = max(test_time, 10) # Minimum 10 seconds for interlaced accuracy + + app.execute_test( + build=build, + test_time=actual_test_time, + host=host, + capture_cfg=capture_cfg, + ) diff --git a/tests/validation/tests/single/st20p/pacing/test_pacing_refactored.py b/tests/validation/tests/single/st20p/pacing/test_pacing_refactored.py new file mode 100644 index 000000000..ae61144fd --- /dev/null +++ b/tests/validation/tests/single/st20p/pacing/test_pacing_refactored.py @@ -0,0 +1,92 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2024-2025 Intel Corporation + +import pytest +from mtl_engine.app_refactored import Application +from mtl_engine.media_files import yuv_files_422rfc10 + + +@pytest.mark.nightly +@pytest.mark.parametrize("pacing", ["narrow", "wide", "linear"]) +@pytest.mark.parametrize( + "media_file", + [ + yuv_files_422rfc10["Crosswalk_720p"], + yuv_files_422rfc10["ParkJoy_1080p"], + yuv_files_422rfc10["Pedestrian_4K"], + ], + indirect=["media_file"], + ids=["Crosswalk_720p", "ParkJoy_1080p", "Pedestrian_4K"], +) +def test_pacing_refactored( + hosts, + build, + media, + nic_port_list, + test_time, + pacing, + test_config, + prepare_ramdisk, + media_file, +): + """ + Test different pacing modes (narrow, wide, linear) using Application class refactored interface + """ + media_file_info, media_file_path = media_file + host = list(hosts.values())[0] + + # Get capture configuration from test_config.yaml + # This controls whether tcpdump capture is enabled, where to store the pcap, etc. 
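+    # The pacing argument is a universal parameter; in the engine diff above,
+    # _create_st20p_session_data() copies it straight into the st20p session entry,
+    # e.g. (illustrative values only):
+    #   {"width": 1280, "height": 720, "fps": "p50", "pacing": "narrow", "packing": "GPM", ...}
+    # so parametrizing pacing here exercises narrow/wide/linear pacing in RxTxApp.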
+ capture_cfg = dict(test_config.get("capture_cfg", {})) + capture_cfg["test_name"] = ( + f"test_pacing_refactored_{media_file_info['filename']}_{pacing}" # Set a unique pcap file name + ) + + # Create Application instance for RxTxApp + app = Application("RxTxApp", f"{build}/tests/tools/RxTxApp/build") + + # Configure application using universal parameters + config_params = { + "session_type": "st20p", + "nic_port": host.vfs[0] if host.vfs else "0000:31:01.0", + "nic_port_list": host.vfs, + "source_ip": "192.168.17.101", # TX interface IP + "destination_ip": "192.168.17.102", # RX interface IP for unicast loopback + "port": 20000, + "width": media_file_info["width"], + "height": media_file_info["height"], + "framerate": f"p{media_file_info['fps']}", + "pixel_format": media_file_info["file_format"], + "transport_format": media_file_info["format"], + "pixel_format_rx": media_file_info["file_format"], + "input_file": media_file_path, + "test_mode": "unicast", + "pacing": pacing, # Specify the pacing mode to test + } + + # Add pacing-specific optimizations based on resolution and pacing mode + if media_file_info.get("height", 0) >= 2160: # 4K content + if pacing == "linear": + config_params["tx_no_chain"] = True # Better for 4K linear pacing + else: + config_params["tx_no_chain"] = False # Keep chain for narrow/wide pacing + elif pacing == "narrow": + # Narrow pacing benefits from tighter timing control + config_params["tx_no_chain"] = False + + app.create_command(**config_params) + + # Execute test using Application class + # Use longer test time for 4K content and accurate pacing measurement + actual_test_time = test_time + if media_file_info.get("height", 0) >= 2160: # 4K content + actual_test_time = max(test_time, 12) # Minimum 12 seconds for 4K pacing accuracy + else: + actual_test_time = max(test_time, 8) # Minimum 8 seconds for HD pacing accuracy + + app.execute_test( + build=build, + test_time=actual_test_time, + host=host, + capture_cfg=capture_cfg, + ) diff --git a/tests/validation/tests/single/st20p/packing/test_packing_refactored.py b/tests/validation/tests/single/st20p/packing/test_packing_refactored.py new file mode 100644 index 000000000..c3febed29 --- /dev/null +++ b/tests/validation/tests/single/st20p/packing/test_packing_refactored.py @@ -0,0 +1,102 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2024-2025 Intel Corporation + +import pytest +from mtl_engine.app_refactored import Application +from mtl_engine.media_files import yuv_files_422rfc10 + + +@pytest.mark.nightly +@pytest.mark.parametrize("packing", ["GPM_SL", "GPM"]) +@pytest.mark.parametrize( + "media_file", + [ + yuv_files_422rfc10["Crosswalk_720p"], + yuv_files_422rfc10["ParkJoy_1080p"], + yuv_files_422rfc10["Pedestrian_4K"], + ], + indirect=["media_file"], + ids=["Crosswalk_720p", "ParkJoy_1080p", "Pedestrian_4K"], +) +def test_packing_refactored( + hosts, + build, + media, + nic_port_list, + test_time, + packing, + test_config, + prepare_ramdisk, + media_file, +): + """ + Test different packing modes (GPM_SL, GPM) using Application class refactored interface + """ + media_file_info, media_file_path = media_file + host = list(hosts.values())[0] + + # Get capture configuration from test_config.yaml + # This controls whether tcpdump capture is enabled, where to store the pcap, etc. 
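+    # The minimum-test-time floors used at the end of this test match the pacing test above
+    # (12 s for 4K, 8 s otherwise). A shared helper along these lines (hypothetical, not part
+    # of this change) would remove the duplication:
+    #   def min_test_time(requested, height):
+    #       return max(requested, 12 if height >= 2160 else 8)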
+ capture_cfg = dict(test_config.get("capture_cfg", {})) + capture_cfg["test_name"] = ( + f"test_packing_refactored_{media_file_info['filename']}_{packing}" # Set a unique pcap file name + ) + + # Create Application instance for RxTxApp + app = Application("RxTxApp", f"{build}/tests/tools/RxTxApp/build") + + # Configure application using universal parameters + config_params = { + "session_type": "st20p", + "nic_port": host.vfs[0] if host.vfs else "0000:31:01.0", + "nic_port_list": host.vfs, + # Explicit dual interface loopback IPs for unicast mode + "source_ip": "192.168.17.101", # TX interface IP + "destination_ip": "192.168.17.102", # RX interface IP (unicast destination) + "port": 20000, + "width": media_file_info["width"], + "height": media_file_info["height"], + "framerate": f"p{media_file_info['fps']}", + "pixel_format": media_file_info["file_format"], + "transport_format": media_file_info["format"], + "pixel_format_rx": media_file_info["file_format"], + "input_file": media_file_path, + "test_mode": "unicast", + "packing": packing, # Specify the packing mode to test + } + + # Add packing-specific optimizations based on resolution and packing mode + if media_file_info.get("height", 0) >= 2160: # 4K content + if packing == "GPM_SL": + # GPM_SL (Single Line) mode is more efficient for 4K + config_params.update({ + "tx_no_chain": True, # Better performance for GPM_SL + "pacing": "linear", # Linear pacing works well with GPM_SL + }) + else: # GPM mode + config_params.update({ + "tx_no_chain": False, # Keep chain for regular GPM + "pacing": "wide", # Wide pacing for GPM with 4K + }) + else: # HD content + if packing == "GPM_SL": + config_params["pacing"] = "linear" + else: + config_params["pacing"] = "narrow" # Narrow pacing for HD GPM + + app.create_command(**config_params) + + # Execute test using Application class + # Use longer test time for 4K content and accurate packing validation + actual_test_time = test_time + if media_file_info.get("height", 0) >= 2160: # 4K content + actual_test_time = max(test_time, 12) # Minimum 12 seconds for 4K packing accuracy + else: + actual_test_time = max(test_time, 8) # Minimum 8 seconds for HD packing accuracy + + app.execute_test( + build=build, + test_time=actual_test_time, + host=host, + capture_cfg=capture_cfg, + ) diff --git a/tests/validation/tests/single/st20p/resolutions/test_resolutions_refactored.py b/tests/validation/tests/single/st20p/resolutions/test_resolutions_refactored.py new file mode 100644 index 000000000..eb04c3890 --- /dev/null +++ b/tests/validation/tests/single/st20p/resolutions/test_resolutions_refactored.py @@ -0,0 +1,98 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2024-2025 Intel Corporation + +import pytest +from mtl_engine.app_refactored import Application +from mtl_engine.media_files import yuv_files_422rfc10 + + +@pytest.mark.parametrize( + "media_file", + list(yuv_files_422rfc10.values()), + indirect=["media_file"], + ids=list(yuv_files_422rfc10.keys()), +) +def test_resolutions_refactored( + hosts, + build, + media, + nic_port_list, + test_time, + test_config, + prepare_ramdisk, + media_file, +): + """ + Test different video resolutions using Application class refactored interface + """ + media_file_info, media_file_path = media_file + host = list(hosts.values())[0] + + # Get capture configuration from test_config.yaml + # This controls whether tcpdump capture is enabled, where to store the pcap, etc. 
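+    # The resolution-specific overrides applied later in this test reduce to a height-keyed profile:
+    #   height >= 2160: pacing="linear", packing="GPM_SL", tx_no_chain=True,  test time >= 15 s
+    #   height >= 1080: pacing="wide",   packing="GPM",    tx_no_chain=False, test time >= 10 s
+    #   otherwise:      pacing="narrow", packing="GPM",    tx_no_chain=False, test time >= 8 s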
+ capture_cfg = dict(test_config.get("capture_cfg", {})) + capture_cfg["test_name"] = ( + f"test_resolutions_refactored_{media_file_info['filename']}" # Set a unique pcap file name + ) + + # Create Application instance for RxTxApp + app = Application("RxTxApp", f"{build}/tests/tools/RxTxApp/build") + + # Configure application using universal parameters + config_params = { + "session_type": "st20p", + "nic_port": host.vfs[0] if host.vfs else "0000:31:01.0", + "nic_port_list": host.vfs, + "destination_ip": "239.168.48.9", # Multicast destination + "port": 20000, + "width": media_file_info["width"], + "height": media_file_info["height"], + "framerate": f"p{media_file_info['fps']}", + "pixel_format": media_file_info["file_format"], + "transport_format": media_file_info["format"], + "pixel_format_rx": media_file_info["file_format"], + "input_file": media_file_path, + "test_mode": "multicast", + } + + # Add resolution-specific optimizations + height = media_file_info.get("height", 0) + width = media_file_info.get("width", 0) + + if height >= 2160: # 4K and above + config_params.update({ + "pacing": "linear", # Linear pacing for 4K + "packing": "GPM_SL", # Single line packing for efficiency + "tx_no_chain": True, # Optimize for 4K performance + }) + elif height >= 1080: # Full HD + config_params.update({ + "pacing": "wide", # Wide pacing for 1080p + "packing": "GPM", # Standard GPM packing + "tx_no_chain": False, # Keep chain for 1080p + }) + else: # HD 720p and below + config_params.update({ + "pacing": "narrow", # Narrow pacing for lower resolutions + "packing": "GPM", # Standard GPM packing + "tx_no_chain": False, # Keep chain for lower resolutions + }) + + app.create_command(**config_params) + + # Execute test using Application class + # Use adaptive test time based on resolution for accurate measurement + actual_test_time = test_time + if height >= 2160: # 4K content + actual_test_time = max(test_time, 15) # Minimum 15 seconds for 4K accuracy + elif height >= 1080: # Full HD content + actual_test_time = max(test_time, 10) # Minimum 10 seconds for 1080p accuracy + else: # HD 720p and below + actual_test_time = max(test_time, 8) # Minimum 8 seconds for HD accuracy + + app.execute_test( + build=build, + test_time=actual_test_time, + host=host, + capture_cfg=capture_cfg, + ) diff --git a/tests/validation/tests/single/st20p/test_mode/test_multicast_refactored.py b/tests/validation/tests/single/st20p/test_mode/test_multicast_refactored.py new file mode 100644 index 000000000..d13fd4d0b --- /dev/null +++ b/tests/validation/tests/single/st20p/test_mode/test_multicast_refactored.py @@ -0,0 +1,101 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2024-2025 Intel Corporation + +import pytest +from mtl_engine.app_refactored import Application +from mtl_engine.media_files import yuv_files_422rfc10 + + +@pytest.mark.parametrize( + "media_file", + [ + yuv_files_422rfc10["Crosswalk_720p"], + yuv_files_422rfc10["ParkJoy_1080p"], + yuv_files_422rfc10["Pedestrian_4K"], + ], + indirect=["media_file"], + ids=["Crosswalk_720p", "ParkJoy_1080p", "Pedestrian_4K"], +) +def test_multicast_refactored( + hosts, + build, + media, + nic_port_list, + test_time, + test_config, + prepare_ramdisk, + media_file, +): + """ + Test multicast transmission mode using Application class refactored interface + """ + media_file_info, media_file_path = media_file + host = list(hosts.values())[0] + + # Get capture configuration from test_config.yaml + # This controls whether tcpdump capture is enabled, where to store the pcap, 
etc. + capture_cfg = dict(test_config.get("capture_cfg", {})) + capture_cfg["test_name"] = ( + f"test_multicast_refactored_{media_file_info['filename']}" # Set a unique pcap file name + ) + + # Create Application instance for RxTxApp + app = Application("RxTxApp", f"{build}/tests/tools/RxTxApp/build") + + # Configure application using universal parameters + config_params = { + "session_type": "st20p", + "nic_port": host.vfs[0] if host.vfs else "0000:31:01.0", + "nic_port_list": host.vfs, + "destination_ip": "239.168.48.9", # Multicast destination + "port": 20000, + "width": media_file_info["width"], + "height": media_file_info["height"], + "framerate": f"p{media_file_info['fps']}", + "pixel_format": media_file_info["file_format"], + "transport_format": media_file_info["format"], + "pixel_format_rx": media_file_info["file_format"], + "input_file": media_file_path, + "test_mode": "multicast", + } + + # Add multicast-specific optimizations based on resolution + height = media_file_info.get("height", 0) + + if height >= 2160: # 4K content + config_params.update({ + "pacing": "linear", # Linear pacing for 4K multicast + "packing": "GPM_SL", # Single line packing for efficiency + "tx_no_chain": True, # Optimize for 4K multicast performance + }) + elif height >= 1080: # Full HD content + config_params.update({ + "pacing": "wide", # Wide pacing for 1080p multicast + "packing": "GPM", # Standard GPM packing + "tx_no_chain": False, # Keep chain for 1080p multicast + }) + else: # HD 720p and below + config_params.update({ + "pacing": "narrow", # Narrow pacing for lower resolutions + "packing": "GPM", # Standard GPM packing + "tx_no_chain": False, # Keep chain for lower resolutions + }) + + app.create_command(**config_params) + + # Execute test using Application class + # Use adaptive test time based on resolution for accurate multicast measurement + actual_test_time = test_time + if height >= 2160: # 4K content + actual_test_time = max(test_time, 15) # Minimum 15 seconds for 4K multicast accuracy + elif height >= 1080: # Full HD content + actual_test_time = max(test_time, 10) # Minimum 10 seconds for 1080p multicast accuracy + else: # HD 720p and below + actual_test_time = max(test_time, 8) # Minimum 8 seconds for HD multicast accuracy + + app.execute_test( + build=build, + test_time=actual_test_time, + host=host, + capture_cfg=capture_cfg, + ) diff --git a/vtune_analysis.py b/vtune_analysis.py new file mode 100644 index 000000000..428461a61 --- /dev/null +++ b/vtune_analysis.py @@ -0,0 +1,647 @@ +#!/usr/bin/env python3 + +import subprocess +import os +import sys +import shlex +import shutil +import argparse +import time +import json +import re +from datetime import datetime +from pathlib import Path +from typing import Dict, List, Tuple + +def setup_vtune_environment(): + """Source VTune environment variables.""" + vtune_vars_script = "/opt/intel/oneapi/vtune/latest/vtune-vars.sh" + if os.path.exists(vtune_vars_script): + cmd = f"source {vtune_vars_script} && env" + result = subprocess.run(cmd, shell=True, capture_output=True, text=True, executable='/bin/bash') + if result.returncode == 0: + for line in result.stdout.split('\n'): + if '=' in line and not line.startswith('_'): + key, value = line.split('=', 1) + os.environ[key] = value + print("✓ VTune environment variables loaded successfully") + else: + print(f"Warning: Could not source VTune environment: {result.stderr}") + else: + print(f"Warning: VTune environment script not found at {vtune_vars_script}") + +setup_vtune_environment() + +def 
read_config_file(config_file: str) -> Tuple[str, str]: + """Read configuration from file.""" + try: + with open(config_file, 'r') as f: + lines = [line.strip() for line in f.readlines() if line.strip() and not line.startswith('#')] + + if len(lines) < 2: + raise ValueError("Config file must contain at least 2 lines: command and working directory") + + app_command, app_working_dir = lines[0], lines[1] + print(f"Read from config file:\n Command: {app_command}\n Working Directory: {app_working_dir}") + return validate_paths(app_command, app_working_dir) + + except FileNotFoundError: + print(f"Error: Config file '{config_file}' not found") + sys.exit(1) + except Exception as e: + print(f"Error reading config file: {e}") + sys.exit(1) + +def validate_paths(app_command: str, app_working_dir: str) -> Tuple[str, str]: + """Validate that the application and working directory exist.""" + cmd_parts = shlex.split(app_command) + if not cmd_parts: + raise ValueError("Empty command") + + executable = cmd_parts[0] + if not os.path.isabs(executable): + executable = os.path.abspath(executable) + if not os.path.isabs(app_working_dir): + app_working_dir = os.path.abspath(app_working_dir) + + if not os.path.isfile(executable): + raise ValueError(f"Executable not found: {executable}") + if not os.access(executable, os.X_OK): + raise ValueError(f"File is not executable: {executable}") + if not os.path.isdir(app_working_dir): + raise ValueError(f"Working directory not found: {app_working_dir}") + + print(f"✓ Validated executable: {executable}\n✓ Validated working directory: {app_working_dir}") + updated_cmd_parts = [executable] + cmd_parts[1:] + updated_app_command = ' '.join(shlex.quote(part) for part in updated_cmd_parts) + + return updated_app_command, app_working_dir + +class VTuneAnalyzer: + def __init__(self, app_command: str, app_working_dir: str, duration: int = 90): + self.app_command = app_command + self.app_working_dir = app_working_dir + self.duration = duration + self.results_dir = Path("vtune_results") + self.results_dir.mkdir(exist_ok=True) + + self.analyses = { + 'io': { + 'name': 'I/O Analysis', 'collect': 'io', 'description': 'Analyzes I/O operations and disk access patterns', 'timeout_multiplier': 2.5, + 'fallback_modes': [('io', 'Full I/O analysis'), ('disk-io', 'Disk I/O analysis only'), ('system-overview', 'System overview with I/O metrics')] + }, + 'memory-access': {'name': 'Memory Access Analysis', 'collect': 'memory-access', 'description': 'Analyzes memory access patterns and cache behavior', 'timeout_multiplier': 1.5}, + 'uarch-exploration': {'name': 'Microarchitecture Exploration', 'collect': 'uarch-exploration', 'description': 'Analyzes microarchitecture utilization and bottlenecks', 'timeout_multiplier': 1.3}, + 'memory-consumption': {'name': 'Memory Consumption', 'collect': 'memory-consumption', 'description': 'Analyzes memory allocation and usage patterns', 'timeout_multiplier': 1.5} + } + self.results = {} + + def get_system_info(self) -> Dict[str, str]: + """Collect system information.""" + info = {} + try: + # CPU information + with open('/proc/cpuinfo', 'r') as f: + cpuinfo = f.read() + cpu_match = re.search(r'model name\s*:\s*(.+)', cpuinfo) + if cpu_match: + info['CPU Model'] = cpu_match.group(1).strip() + info['Logical Cores'] = str(len(re.findall(r'^processor\s*:', cpuinfo, re.MULTILINE))) + + # Memory information + with open('/proc/meminfo', 'r') as f: + meminfo = f.read() + mem_match = re.search(r'MemTotal:\s*(\d+)\s*kB', meminfo) + if mem_match: + mem_gb = 
int(mem_match.group(1)) / 1024 / 1024 + info['Total Memory'] = f"{mem_gb:.1f} GB" + + # OS and kernel information + try: + with open('/etc/os-release', 'r') as f: + os_info = f.read() + pretty_name = re.search(r'PRETTY_NAME="([^"]+)"', os_info) + if pretty_name: + info['OS'] = pretty_name.group(1) + except: + info['OS'] = 'Unknown' + + try: + info['Kernel'] = subprocess.check_output(['uname', '-r'], text=True).strip() + except: + info['Kernel'] = 'Unknown' + + # VTune version + try: + result = subprocess.run(['vtune', '--version'], capture_output=True, text=True) + version_match = re.search(r'Intel.*VTune.*(\d+\.\d+\.\d+)', result.stdout) + info['VTune Version'] = version_match.group(1) if version_match else 'Unknown' + except: + info['VTune Version'] = 'Unknown' + + except Exception as e: + print(f"Warning: Could not gather complete system info: {e}") + return info + + def run_vtune_analysis(self, analysis_key: str) -> Dict[str, str]: + """Run a specific VTune analysis.""" + analysis = self.analyses[analysis_key] + result_dir = self.results_dir / f"r_{analysis_key}" + + print(f"\n{'='*60}") + print(f"Running {analysis['name']}...") + print(f"Description: {analysis['description']}") + print(f"Duration: {self.duration} seconds") + print(f"{'='*60}") + + if result_dir.exists(): + print(f"Removing existing result directory: {result_dir}") + shutil.rmtree(result_dir) + + # Get fallback modes if available, otherwise use single mode + modes = analysis.get('fallback_modes', [(analysis['collect'], analysis['name'])]) + + for i, (collect_mode, description) in enumerate(modes): + if len(modes) > 1: + print(f"\nAttempt {i+1}: {description}") + + current_analysis = analysis.copy() + current_analysis['collect'] = collect_mode + result = self._run_single_vtune_analysis(current_analysis, result_dir, analysis_key) + + if result.get('Status') == 'Success': + return result + + # If we get here, all attempts failed + if len(modes) > 1: + print(f"✗ All {analysis['name']} attempts failed") + return {'Status': 'Failed', 'Error': f'All {analysis["name"]} modes failed or timed out'} + else: + return result + + + def _run_single_vtune_analysis(self, analysis: dict, result_dir: Path, analysis_key: str) -> Dict[str, str]: + """Run a single VTune analysis attempt.""" + vtune_cmd = ['vtune', '-collect', analysis['collect'], '-result-dir', str(result_dir.absolute()), '-duration', str(self.duration), '-app-working-dir', os.path.abspath(self.app_working_dir), '--'] + shlex.split(self.app_command) + + timeout_multiplier = analysis.get('timeout_multiplier', 1.5) + analysis_timeout = int(self.duration * timeout_multiplier + 60) + + print(f"Command: {' '.join(vtune_cmd)}") + print(f"Working Directory: {os.path.abspath(self.app_working_dir)}") + print(f"Result Directory: {result_dir.absolute()}") + + result_dir.parent.mkdir(parents=True, exist_ok=True) + + start_time = time.time() + try: + result = subprocess.run(vtune_cmd, timeout=analysis_timeout, capture_output=True, text=True, cwd=os.path.abspath(self.app_working_dir)) + elapsed_time = time.time() - start_time + + if result.returncode == 0: + print(f"✓ Analysis completed successfully in {elapsed_time:.1f}s") + return self._parse_vtune_results(result_dir, analysis_key) + else: + print(f"✗ Analysis failed with return code {result.returncode}") + print(f"STDERR: {result.stderr}") + return {'Status': 'Failed', 'Error': f"VTune failed with code {result.returncode}: {result.stderr}"} + except subprocess.TimeoutExpired: + print(f"✗ Analysis timed out after {analysis_timeout}s") 
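+            # Note on the budget used above: analysis_timeout is computed as
+            # duration * timeout_multiplier + 60 s of collection/finalization
+            # overhead, so e.g. the default 90 s run of the 'io' analysis
+            # (multiplier 2.5) is allowed up to 90 * 2.5 + 60 = 285 s before
+            # the collector is killed and the attempt is reported as failed here.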
+ return {'Status': 'Failed', 'Error': f'Analysis timed out after {analysis_timeout} seconds'} + except Exception as e: + print(f"✗ Analysis failed with exception: {e}") + return {'Status': 'Failed', 'Error': str(e)} + + def _parse_vtune_results(self, result_dir: Path, analysis_key: str) -> Dict[str, str]: + """Parse VTune results from the result directory.""" + results = {'Status': 'Success'} + + try: + summary_cmd = ['vtune', '-report', 'summary', '-result-dir', str(result_dir)] + summary_result = subprocess.run(summary_cmd, capture_output=True, text=True, timeout=60) + + if summary_result.returncode == 0: + summary = summary_result.stdout + if analysis_key == 'io': + results.update(self._parse_io_summary(summary)) + elif analysis_key == 'memory-access': + results.update(self._parse_patterns(summary, self._get_memory_access_patterns())) + elif analysis_key == 'uarch-exploration': + results.update(self._parse_patterns(summary, self._get_uarch_patterns())) + elif analysis_key == 'memory-consumption': + results.update(self._parse_patterns(summary, self._get_memory_consumption_patterns())) + else: + results['Status'] = 'Failed' + results['Error'] = f"Failed to generate summary: {summary_result.stderr}" + + except Exception as e: + results['Status'] = 'Failed' + results['Error'] = f"Failed to parse results: {str(e)}" + + return results + + def _parse_patterns(self, summary: str, patterns: dict) -> Dict[str, str]: + """Generic pattern parsing for all analysis types.""" + results = {} + for key, pattern in patterns.items(): + match = re.search(pattern, summary, re.IGNORECASE) + if match: + value = match.group(1) + # Add units based on the metric type and pattern + results[key] = self._format_value_with_unit(key, value) + return results + + def _parse_io_summary(self, summary: str) -> Dict[str, str]: + """Parse I/O analysis summary with specialized patterns.""" + results = {} + + # PCIe Traffic metrics + io_patterns = { + 'Inbound PCIe Read': r'Inbound PCIe Read, MB/sec:\s*([0-9.]+)', + 'Inbound PCIe Write': r'Inbound PCIe Write, MB/sec:\s*([0-9.]+)', + 'Outbound PCIe Read': r'Outbound PCIe Read, MB/sec:\s*([0-9.]+)', + 'Outbound PCIe Write': r'Outbound PCIe Write, MB/sec:\s*([0-9.]+)', + 'Inbound PCIe Read L3 Hit': r'Inbound PCIe Read, MB/sec:.*?\n.*?L3 Hit, %:\s*([0-9.]+)', + 'Inbound PCIe Read L3 Miss': r'Inbound PCIe Read, MB/sec:.*?\n.*?L3 Miss, %:\s*([0-9.]+)', + 'Inbound PCIe Read Average Latency': r'Inbound PCIe Read, MB/sec:.*?Average Latency, ns:\s*([0-9.]+)', + 'Inbound PCIe Write L3 Hit': r'Inbound PCIe Write, MB/sec:.*?\n.*?L3 Hit, %:\s*([0-9.]+)', + 'Inbound PCIe Write L3 Miss': r'Inbound PCIe Write, MB/sec:.*?\n.*?L3 Miss, %:\s*([0-9.]+)', + 'Inbound PCIe Write Average Latency': r'Inbound PCIe Write, MB/sec:.*?Average Latency, ns:\s*([0-9.]+)', + 'Effective Physical Core Utilization': r'Effective Physical Core Utilization:\s*([0-9.]+)%\s*\(([0-9.]+)\s*out\s*of\s*([0-9]+)\)', + 'Effective Logical Core Utilization': r'Effective Logical Core Utilization:\s*([0-9.]+)%\s*\(([0-9.]+)\s*out\s*of\s*([0-9]+)\)', + } + + for key, pattern in io_patterns.items(): + match = re.search(pattern, summary, re.DOTALL) + if match: + # Add appropriate units based on the metric type + if 'PCIe Read' in key or 'PCIe Write' in key: + if 'L3 Hit' in key or 'L3 Miss' in key: + results[key] = f"{match.group(1)}%" + elif 'Average Latency' in key: + results[key] = f"{match.group(1)} ns" + else: + results[key] = f"{match.group(1)} MB/s" + else: + results[key] = match.group(1) + + # Bandwidth utilization table 
parsing - exact format matching + bandwidth_section = re.search(r'Bandwidth Utilization\n.*?\n.*?\n(.*?)(?=\n\nTop|collection|Collection|\Z)', summary, re.DOTALL | re.IGNORECASE) + if bandwidth_section: + table_content = bandwidth_section.group(1).strip() + for line in table_content.split('\n'): + line = line.strip() + if not line or line.startswith('-'): + continue + + # Parse table format using regex to handle whitespace properly + if 'DRAM, GB/sec' in line: + # Extract numbers from: "DRAM, GB/sec 350 22.600 4.863 0.0%" + match = re.search(r'DRAM, GB/sec\s+(\d+)\s+([\d.]+)\s+([\d.]+)\s+([\d.]+)%', line) + if match: + results['DRAM Platform Maximum'] = f"{match.group(1)} GB/s" + results['DRAM Observed Maximum'] = f"{match.group(2)} GB/s" + results['DRAM Average'] = f"{match.group(3)} GB/s" + results['DRAM High BW Utilization'] = f"{match.group(4)}%" + + elif 'DRAM Single-Package, GB/sec' in line: + # Extract numbers from: "DRAM Single-Package, GB/sec 175 19.900 5.734 0.0%" + match = re.search(r'DRAM Single-Package, GB/sec\s+(\d+)\s+([\d.]+)\s+([\d.]+)\s+([\d.]+)%', line) + if match: + results['DRAM Single-Package Platform Maximum'] = f"{match.group(1)} GB/s" + results['DRAM Single-Package Observed Maximum'] = f"{match.group(2)} GB/s" + results['DRAM Single-Package Average'] = f"{match.group(3)} GB/s" + results['DRAM Single-Package High BW Utilization'] = f"{match.group(4)}%" + + elif 'UPI Utilization Single-link' in line: + # Extract numbers from: "UPI Utilization Single-link, (%) 100 19.600 3.711 0.0%" + match = re.search(r'UPI Utilization Single-link.*?\s+(\d+)\s+([\d.]+)\s+([\d.]+)\s+([\d.]+)%', line) + if match: + results['UPI Platform Maximum'] = f"{match.group(1)}%" + results['UPI Observed Maximum'] = f"{match.group(2)}%" + results['UPI Average'] = f"{match.group(3)}%" + results['UPI High BW Utilization'] = f"{match.group(4)}%" + + elif 'PCIe Bandwidth, MB/sec' in line: + # Extract numbers from: "PCIe Bandwidth, MB/sec 40 689.700 430.933 91.2%" + match = re.search(r'PCIe Bandwidth, MB/sec\s+(\d+)\s+([\d.]+)\s+([\d.]+)\s+([\d.]+)%', line) + if match: + results['PCIe Platform Maximum'] = f"{match.group(1)} MB/s" + results['PCIe Observed Maximum'] = f"{match.group(2)} MB/s" + results['PCIe Average'] = f"{match.group(3)} MB/s" + results['PCIe High BW Utilization'] = f"{match.group(4)}%" + + # Parse core utilization with multiple groups + phys_core_match = re.search(r'Effective Physical Core Utilization:\s*([0-9.]+)%\s*\(([0-9.]+)\s*out\s*of\s*([0-9]+)\)', summary) + if phys_core_match: + results['Effective Physical Core Utilization'] = f"{phys_core_match.group(1)}%" + results['Effective Physical Cores Used'] = f"{phys_core_match.group(2)} cores" + results['Total Physical Cores'] = f"{phys_core_match.group(3)} cores" + + log_core_match = re.search(r'Effective Logical Core Utilization:\s*([0-9.]+)%\s*\(([0-9.]+)\s*out\s*of\s*([0-9]+)\)', summary) + if log_core_match: + results['Effective Logical Core Utilization'] = f"{log_core_match.group(1)}%" + results['Effective Logical Cores Used'] = f"{log_core_match.group(2)} cores" + results['Total Logical Cores'] = f"{log_core_match.group(3)} cores" + + return results + + def _get_memory_access_patterns(self) -> dict: + """Memory Access analysis patterns.""" + return { + 'Memory Bound': r'Memory Bound:\s*([0-9.]+)%', + 'L1 Bound': r'L1 Bound:\s*([0-9.]+)%', + 'L2 Bound': r'L2 Bound:\s*([0-9.]+)%', + 'L3 Bound': r'L3 Bound:\s*([0-9.]+)%', + 'DRAM Bound': r'DRAM Bound:\s*([0-9.]+)%', + 'Store Bound': r'Store Bound:\s*([0-9.]+)%', + 'NUMA Remote 
Accesses': r'NUMA.*Remote.*Accesses.*:\s*([0-9.]+)%', + 'Remote Accesses': r'Remote.*Accesses.*:\s*([0-9.]+)%', + 'LLC Miss Count': r'LLC.*Miss.*Count:\s*([0-9,]+)', + 'Loads': r'Loads:\s*([0-9,]+)', + 'Stores': r'Stores:\s*([0-9,]+)' + } + + def _get_uarch_patterns(self) -> dict: + """Microarchitecture Exploration patterns.""" + return { + 'CPI Rate': r'CPI Rate:\s*([0-9.]+)', + 'Retiring': r'Retiring:\s*([0-9.]+)%', + 'Bad Speculation': r'Bad Speculation:\s*([0-9.]+)%', + 'Front-End Bound': r'Front-End Bound:\s*([0-9.]+)%', + 'Back-End Bound': r'Back-End Bound:\s*([0-9.]+)%', + 'FP Arithmetic': r'FP Arithmetic:\s*([0-9.]+)%', + 'Memory Operations': r'Memory Operations:\s*([0-9.]+)%', + 'Branch Instructions': r'Branch Instructions:\s*([0-9.]+)%', + 'NOP Instructions': r'NOP Instructions:\s*([0-9.]+)%', + 'Other': r'Other:\s*([0-9.]+)%', + 'Core Bound': r'Core Bound:\s*([0-9.]+)%', + 'Divider': r'Divider:\s*([0-9.]+)%', + 'Cycles of 0 Ports Utilized': r'Cycles of 0 Ports Utilized:\s*([0-9.]+)%', + 'Cycles of 1 Port Utilized': r'Cycles of 1 Port Utilized:\s*([0-9.]+)%', + 'Cycles of 2 Ports Utilized': r'Cycles of 2 Ports Utilized:\s*([0-9.]+)%', + 'Cycles of 3+ Ports Utilized': r'Cycles of 3\+ Ports Utilized:\s*([0-9.]+)%', + 'Lock Latency': r'Lock Latency:\s*([0-9.]+)%', + 'SQ Full': r'SQ Full:\s*([0-9.]+)%' + } + + + def _get_memory_consumption_patterns(self) -> dict: + """Memory Consumption analysis patterns.""" + return { + 'Allocation Size': r'Allocation Size:\s*([0-9.]+\s*[KMGT]?B)', + 'Deallocation Size': r'Deallocation Size:\s*([0-9.]+\s*[KMGT]?B)' + } + + def _format_value_with_unit(self, key: str, value: str) -> str: + """Add appropriate units to values that might be missing them.""" + if any(unit in value.upper() for unit in ['B', 'KB', 'MB', 'GB', 'TB', '%', 'MS', 'S', '/S']): + return value + + # Common unit mappings + unit_map = { + 'Time': 's', 'Latency': 'ns', 'Hit Rate': '%', 'Bound': '%', 'Utilization': '%', + 'CPI': '', 'IPC': '', 'Operations': 'ops', 'Instructions': 'ops', 'Count': '', + 'Bandwidth': 'B/s', 'FLOPS': 'FLOPS', 'GFLOPS': 'GFLOPS', 'Frequency': 'GHz' + } + + for keyword, unit in unit_map.items(): + if keyword in key: + return f"{value} {unit}" if unit else value + + return value + + def _format_category_results(self, result: Dict[str, str], categories: dict) -> List[str]: + """Generic category-based result formatting.""" + formatted_lines = [] + + for category_name, category_info in categories.items(): + category_metrics = {key: result[key] for key in category_info['metrics'] if key in result} + + if category_metrics: + formatted_lines.append(f" {category_name}:") + formatted_lines.append(f" {category_info['explanation']}") + for key, value in category_metrics.items(): + formatted_value = self._format_value_with_unit(key, value) + formatted_lines.append(f" {key:40} {formatted_value}") + formatted_lines.append("") + + # Add remaining metrics + all_categorized_keys = set() + for category_info in categories.values(): + all_categorized_keys.update(category_info['metrics']) + + remaining_metrics = {key: value for key, value in result.items() if key != 'Status' and key not in all_categorized_keys} + if remaining_metrics: + formatted_lines.append(" Other Metrics:") + for key, value in remaining_metrics.items(): + formatted_value = self._format_value_with_unit(key, value) + formatted_lines.append(f" {key:40} {formatted_value}") + formatted_lines.append("") + + return formatted_lines + + def _format_io_results(self, result: Dict[str, str]) -> List[str]: + """Format 
I/O analysis results.""" + categories = { + 'PCIe Traffic Summary': { + 'metrics': ['Inbound PCIe Read', 'Inbound PCIe Write', 'Outbound PCIe Read', 'Outbound PCIe Write', 'PCIe Platform Maximum', 'PCIe Observed Maximum', 'PCIe Average', 'PCIe High BW Utilization'], + 'explanation': 'PCIe bus traffic analysis showing inbound/outbound data movement and bandwidth utilization' + }, + 'PCIe Cache Performance': { + 'metrics': ['Inbound PCIe Read L3 Hit', 'Inbound PCIe Read L3 Miss', 'Inbound PCIe Write L3 Hit', 'Inbound PCIe Write L3 Miss', 'Inbound PCIe Read Average Latency', 'Inbound PCIe Write Average Latency'], + 'explanation': 'PCIe cache hit/miss rates and memory access latency for PCIe operations' + }, + 'DRAM Bandwidth Utilization': { + 'metrics': ['DRAM Platform Maximum', 'DRAM Observed Maximum', 'DRAM Average', 'DRAM High BW Utilization', 'DRAM Single-Package Platform Maximum', 'DRAM Single-Package Observed Maximum', 'DRAM Single-Package Average', 'DRAM Single-Package High BW Utilization'], + 'explanation': 'DRAM memory bandwidth usage across NUMA nodes and memory packages' + }, + 'UPI Utilization': { + 'metrics': ['UPI Platform Maximum', 'UPI Observed Maximum', 'UPI Average', 'UPI High BW Utilization'], + 'explanation': 'Ultra Path Interconnect (UPI) bandwidth utilization between CPU packages' + }, + 'Core Utilization': { + 'metrics': ['Effective Physical Core Utilization', 'Effective Physical Cores Used', 'Total Physical Cores', 'Effective Logical Core Utilization', 'Effective Logical Cores Used', 'Total Logical Cores'], + 'explanation': 'CPU core utilization during I/O operations showing physical and logical core usage' + } + } + return self._format_category_results(result, categories) + + def _format_memory_access_results(self, result: Dict[str, str]) -> List[str]: + """Format Memory Access analysis results.""" + categories = { + 'Memory Hierarchy Bottlenecks': { + 'metrics': ['Memory Bound', 'L1 Bound', 'L2 Bound', 'L3 Bound', 'DRAM Bound', 'Store Bound'], + 'explanation': 'Shows where memory accesses are bottlenecked in the hierarchy' + }, + 'Cache Performance': { + 'metrics': ['LLC Miss Count'], + 'explanation': 'Cache hit rates and miss counts across memory hierarchy levels' + }, + 'NUMA Performance': { + 'metrics': ['NUMA Remote Accesses', 'Remote Accesses'], + 'explanation': 'Percentage of memory accesses that go to remote NUMA nodes' + }, + 'Memory Operations': { + 'metrics': ['Loads', 'Stores'], + 'explanation': 'Count of load and store operations performed by the application' + } + } + return self._format_category_results(result, categories) + + def _format_uarch_results(self, result: Dict[str, str]) -> List[str]: + """Format Microarchitecture Exploration results.""" + categories = { + 'CPI Performance Metrics': { + 'metrics': ['CPI Rate'], + 'explanation': 'Cycles Per Instruction (CPI) and related metrics - lower CPI/higher IPC indicates better performance' + }, + 'Top-Down Analysis': { + 'metrics': ['Retiring', 'Bad Speculation', 'Front-End Bound', 'Back-End Bound'], + 'explanation': 'Top-down microarchitecture analysis - should sum to ~100%' + }, + 'Back-End Core Bound': { + 'metrics': ['Core Bound', 'Divider', 'Lock Latency', 'Cycles of 0 Ports Utilized', 'Cycles of 1 Port Utilized', 'Cycles of 2 Ports Utilized', 'Cycles of 3+ Ports Utilized', 'SQ Full'], + 'explanation': 'CPU execution units and resource contention bottlenecks' + } + } + return self._format_category_results(result, categories) + + def _format_memory_consumption_results(self, result: Dict[str, str]) 
-> List[str]: + """Format Memory Consumption results.""" + categories = { + 'Memory Usage Statistics': { + 'metrics': ['Allocation Size', 'Deallocation Size'], + 'explanation': 'Memory usage statistics showing allocation, deallocation, and peak consumption' + } + } + return self._format_category_results(result, categories) + + def generate_report(self) -> str: + """Generate a comprehensive report of all analyses.""" + report = [] + + # Header + report.extend([ + "=" * 80, + "VTune Analysis Report", + "=" * 80, + f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", + f"Application: {self.app_command}", + f"Analysis Duration: {self.duration} seconds each", + "" + ]) + + # System Information + report.append("SYSTEM INFORMATION") + report.append("-" * 40) + system_info = self.get_system_info() + for key, value in system_info.items(): + report.append(f"{key:20} {value}") + report.append("") + + # Analysis Results + report.append("ANALYSIS RESULTS") + report.append("-" * 40) + + format_map = { + 'io': self._format_io_results, + 'memory-access': self._format_memory_access_results, + 'uarch-exploration': self._format_uarch_results, + 'memory-consumption': self._format_memory_consumption_results + } + + for analysis_key, analysis_info in self.analyses.items(): + result = self.results.get(analysis_key, {}) + report.append(f"{analysis_info['name']}:") + + if result.get('Status') == 'Success': + report.append(" Status: ✓ Success") + if analysis_key in format_map: + report.extend(format_map[analysis_key](result)) + else: + for key, value in result.items(): + if key != 'Status': + formatted_value = self._format_value_with_unit(key, value) + report.append(f" {key:40} {formatted_value}") + else: + report.append(f" Status: ✗ Failed") + if 'Error' in result: + report.append(f" Error: {result['Error']}") + report.append("") + + return "\n".join(report) + + def run_all_analyses(self): + """Run all VTune analyses.""" + print("Starting VTune Analysis Suite") + print(f"Application: {self.app_command}") + print(f"Working Directory: {self.app_working_dir}") + print(f"Duration per analysis: {self.duration} seconds") + + # Run each analysis + for i, analysis_key in enumerate(self.analyses.keys()): + print(f"\nProgress: [{i+1}/{len(self.analyses)}]") + self.results[analysis_key] = self.run_vtune_analysis(analysis_key) + + # Generate and display report + report = self.generate_report() + print("\n" + report) + + # Save report and results + timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') + report_file = self.results_dir / f"vtune_analysis_report_{timestamp}.txt" + json_file = self.results_dir / f"vtune_analysis_results_{timestamp}.json" + + with open(report_file, 'w') as f: + f.write(report) + + with open(json_file, 'w') as f: + json.dump({ + 'timestamp': timestamp, + 'app_command': self.app_command, + 'app_working_dir': self.app_working_dir, + 'duration': self.duration, + 'system_info': self.get_system_info(), + 'results': self.results + }, f, indent=2) + + print(f"\nReport saved to: {report_file}") + print(f"Results saved to: {json_file}") + +def main(): + parser = argparse.ArgumentParser( + description='Run comprehensive VTune analysis on an application', + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Example usage: + python vtune_analysis.py config.txt + python vtune_analysis.py --duration 120 config.txt + +Config file format (two lines): + ./tests/tools/RxTxApp/build/RxTxApp --config /root/awilczyn/Media-Transport-Library/config/tx_1v.json + ./tests/tools/RxTxApp/build + """ + 
) + + parser.add_argument('config_file', help='Configuration file containing command and working directory') + parser.add_argument('--duration', '-d', type=int, default=90, help='Duration in seconds for each analysis (default: 90)') + + args = parser.parse_args() + + # Read configuration from file + app_command, app_working_dir = read_config_file(args.config_file) + + # Check if VTune is available + try: + subprocess.run(['vtune', '--version'], capture_output=True, check=True) + except (subprocess.CalledProcessError, FileNotFoundError): + print("Error: VTune is not available or not in PATH") + sys.exit(1) + + # Run analysis + analyzer = VTuneAnalyzer(app_command, app_working_dir, args.duration) + analyzer.run_all_analyses() + +if __name__ == '__main__': + main() diff --git a/vtune_config.txt b/vtune_config.txt new file mode 100644 index 000000000..7542d44ea --- /dev/null +++ b/vtune_config.txt @@ -0,0 +1,5 @@ +# VTune Analysis Configuration +# Line 1: Command to test +/root/awilczyn/Media-Transport-Library/tests/tools/RxTxApp/build/RxTxApp --config /root/awilczyn/Media-Transport-Library/config/rx_1v.json +# Line 2: Working directory for VTune +/root/awilczyn/Media-Transport-Library/tests/tools/RxTxApp/build
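+#
+# Only the first two non-comment lines are read (command, then working
+# directory); the absolute paths above come from one test host and should be
+# adjusted to the local checkout. Example invocations (mirroring the script's
+# --help epilog):
+#   python vtune_analysis.py vtune_config.txt
+#   python vtune_analysis.py --duration 120 vtune_config.txt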