Skip to content

Commit 3c29b7b

Browse files
committed
Divied for parent Application and child RxTxApp, FFmpeg and Gstreamer classess approach
Signed-off-by: Wilczynski, Andrzej <[email protected]>
1 parent 8adf99b commit 3c29b7b

File tree

7 files changed

+1394
-1096
lines changed

7 files changed

+1394
-1096
lines changed

tests/validation/mtl_engine/app.py

Lines changed: 19 additions & 1050 deletions
Large diffs are not rendered by default.
Lines changed: 303 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,303 @@
1+
# Base Application Class for Media Transport Library
2+
# Provides common interface for all media application frameworks
3+
4+
import json
5+
import logging
6+
import time
7+
import os
8+
from abc import ABC, abstractmethod
9+
10+
from .config.universal_params import UNIVERSAL_PARAMS
11+
from .config.app_mappings import (
12+
DEFAULT_NETWORK_CONFIG,
13+
DEFAULT_PORT_CONFIG,
14+
DEFAULT_PAYLOAD_TYPE_CONFIG,
15+
)
16+
17+
# Import execution utilities with fallback
18+
try:
19+
from .execute import log_fail, run, is_process_running
20+
from .RxTxApp import prepare_tcpdump
21+
except ImportError:
22+
# Fallback for direct execution
23+
from execute import log_fail, run, is_process_running
24+
from RxTxApp import prepare_tcpdump
25+
26+
logger = logging.getLogger(__name__)
27+
28+
29+
class Application(ABC):
30+
"""
31+
Abstract base class for all media application frameworks.
32+
Provides common functionality and interface that all child classes must implement.
33+
"""
34+
35+
def __init__(self, app_path, config_file_path=None):
36+
"""Initialize application with path to application directory and optional config file."""
37+
self.app_path = app_path # Path to directory containing the application
38+
self.config_file_path = config_file_path
39+
self.universal_params = UNIVERSAL_PARAMS.copy()
40+
self._user_provided_params = set()
41+
42+
@abstractmethod
43+
def get_framework_name(self) -> str:
44+
"""Return the framework name (e.g., 'RxTxApp', 'FFmpeg', 'GStreamer')."""
45+
pass
46+
47+
@abstractmethod
48+
def get_executable_name(self) -> str:
49+
"""Return the executable name for this framework."""
50+
pass
51+
52+
@abstractmethod
53+
def create_command(self, **kwargs) -> tuple:
54+
"""
55+
Create command line and config files for the application framework.
56+
57+
Args:
58+
**kwargs: Universal parameter names and values
59+
60+
Returns:
61+
Tuple of (command_string, config_dict_or_none)
62+
"""
63+
pass
64+
65+
@abstractmethod
66+
def validate_results(self, *args, **kwargs) -> bool:
67+
"""Validate test results for the specific framework."""
68+
pass
69+
70+
def set_universal_params(self, **kwargs):
71+
"""Set universal parameters and track which were provided by user."""
72+
self._user_provided_params = set(kwargs.keys())
73+
74+
for param, value in kwargs.items():
75+
if param in self.universal_params:
76+
self.universal_params[param] = value
77+
else:
78+
raise ValueError(f"Unknown universal parameter: {param}")
79+
80+
def get_executable_path(self) -> str:
81+
"""Get the full path to the executable based on framework type."""
82+
executable_name = self.get_executable_name()
83+
84+
# For applications with specific paths, combine with directory
85+
if self.app_path and not executable_name.startswith('/'):
86+
if self.app_path.endswith("/"):
87+
return f"{self.app_path}{executable_name}"
88+
else:
89+
return f"{self.app_path}/{executable_name}"
90+
else:
91+
# For system executables or full paths
92+
return executable_name
93+
94+
def was_user_provided(self, param_name: str) -> bool:
95+
"""Check if a parameter was explicitly provided by the user."""
96+
return param_name in self._user_provided_params
97+
98+
def get_session_default_port(self, session_type: str) -> int:
99+
"""Get default port for a specific session type."""
100+
port_map = {
101+
"st20p": DEFAULT_PORT_CONFIG["st20p_port"],
102+
"st22p": DEFAULT_PORT_CONFIG["st22p_port"],
103+
"st30p": DEFAULT_PORT_CONFIG["st30p_port"],
104+
"video": DEFAULT_PORT_CONFIG["video_port"],
105+
"audio": DEFAULT_PORT_CONFIG["audio_port"],
106+
"ancillary": DEFAULT_PORT_CONFIG["ancillary_port"],
107+
"fastmetadata": DEFAULT_PORT_CONFIG["fastmetadata_port"]
108+
}
109+
return port_map.get(session_type, DEFAULT_PORT_CONFIG["st20p_port"])
110+
111+
def get_session_default_payload_type(self, session_type: str) -> int:
112+
"""Get default payload type for a specific session type."""
113+
payload_map = {
114+
"st20p": DEFAULT_PAYLOAD_TYPE_CONFIG["st20p_payload_type"],
115+
"st22p": DEFAULT_PAYLOAD_TYPE_CONFIG["st22p_payload_type"],
116+
"st30p": DEFAULT_PAYLOAD_TYPE_CONFIG["st30p_payload_type"],
117+
"video": DEFAULT_PAYLOAD_TYPE_CONFIG["video_payload_type"],
118+
"audio": DEFAULT_PAYLOAD_TYPE_CONFIG["audio_payload_type"],
119+
"ancillary": DEFAULT_PAYLOAD_TYPE_CONFIG["ancillary_payload_type"],
120+
"fastmetadata": DEFAULT_PAYLOAD_TYPE_CONFIG["fastmetadata_payload_type"]
121+
}
122+
return payload_map.get(session_type, DEFAULT_PAYLOAD_TYPE_CONFIG["st20p_payload_type"])
123+
124+
def get_common_session_params(self, session_type: str) -> dict:
125+
"""Get common session parameters used across all session types."""
126+
default_port = self.get_session_default_port(session_type)
127+
default_payload = self.get_session_default_payload_type(session_type)
128+
129+
return {
130+
"replicas": self.universal_params.get("replicas", UNIVERSAL_PARAMS["replicas"]),
131+
"start_port": int(self.universal_params.get("port") if self.was_user_provided("port") else default_port),
132+
"payload_type": self.universal_params.get("payload_type") if self.was_user_provided("payload_type") else default_payload
133+
}
134+
135+
def get_common_video_params(self) -> dict:
136+
"""Get common video parameters used across video session types."""
137+
return {
138+
"width": int(self.universal_params.get("width", UNIVERSAL_PARAMS["width"])),
139+
"height": int(self.universal_params.get("height", UNIVERSAL_PARAMS["height"])),
140+
"interlaced": self.universal_params.get("interlaced", UNIVERSAL_PARAMS["interlaced"]),
141+
"device": self.universal_params.get("device", UNIVERSAL_PARAMS["device"]),
142+
"enable_rtcp": self.universal_params.get("enable_rtcp", UNIVERSAL_PARAMS["enable_rtcp"])
143+
}
144+
145+
def execute_test(self,
146+
build: str,
147+
test_time: int = 30,
148+
host=None,
149+
tx_host=None,
150+
rx_host=None,
151+
input_file: str = None,
152+
output_file: str = None,
153+
fail_on_error: bool = True,
154+
virtio_user: bool = False,
155+
rx_timing_parser: bool = False,
156+
ptp: bool = False,
157+
capture_cfg=None,
158+
sleep_interval: int = 4,
159+
tx_first: bool = True,
160+
output_format: str = "yuv",
161+
**kwargs) -> bool:
162+
"""
163+
Universal test execution method that handles all frameworks and test scenarios.
164+
Uses the current Application instance's commands and configuration.
165+
"""
166+
# Determine if this is a dual host test
167+
is_dual = tx_host is not None and rx_host is not None
168+
framework_name = self.get_framework_name().lower()
169+
170+
if is_dual:
171+
logger.info(f"Executing dual host {framework_name} test")
172+
tx_remote_host = tx_host
173+
rx_remote_host = rx_host
174+
return self._execute_dual_host_test(
175+
build, test_time, tx_remote_host, rx_remote_host,
176+
input_file, output_file, fail_on_error, capture_cfg,
177+
sleep_interval, tx_first, output_format, **kwargs
178+
)
179+
else:
180+
logger.info(f"Executing single host {framework_name} test")
181+
remote_host = host
182+
return self._execute_single_host_test(
183+
build, test_time, remote_host, input_file, output_file,
184+
fail_on_error, virtio_user, rx_timing_parser, ptp,
185+
capture_cfg, **kwargs
186+
)
187+
188+
# -------------------------
189+
# Common helper utilities
190+
# -------------------------
191+
def add_timeout(self, command: str, test_time: int, grace: int = None) -> str:
192+
"""Wrap command with timeout if test_time provided (adds a grace period)."""
193+
if grace is None:
194+
grace = self.universal_params.get("timeout_grace", 10)
195+
if test_time:
196+
if not command.strip().startswith("timeout "):
197+
return f"timeout {test_time + grace} {command}"
198+
return command
199+
200+
def start_and_capture(self, command: str, build: str, test_time: int, host, process_name: str):
201+
"""Start a single process and capture its stdout safely."""
202+
process = self.start_process(command, build, test_time, host)
203+
output = self.capture_stdout(process, process_name)
204+
return process, output
205+
206+
def start_dual_with_delay(self, tx_command: str, rx_command: str, build: str, test_time: int,
207+
tx_host, rx_host, tx_first: bool, sleep_interval: int,
208+
tx_name: str, rx_name: str):
209+
"""Start two processes with an optional delay ordering TX/RX based on tx_first flag."""
210+
if tx_first:
211+
tx_process = self.start_process(tx_command, build, test_time, tx_host)
212+
time.sleep(sleep_interval)
213+
rx_process = self.start_process(rx_command, build, test_time, rx_host)
214+
else:
215+
rx_process = self.start_process(rx_command, build, test_time, rx_host)
216+
time.sleep(sleep_interval)
217+
tx_process = self.start_process(tx_command, build, test_time, tx_host)
218+
tx_output = self.capture_stdout(tx_process, tx_name)
219+
rx_output = self.capture_stdout(rx_process, rx_name)
220+
return (tx_process, rx_process, tx_output, rx_output)
221+
222+
def extract_framerate(self, framerate_str, default: int = None) -> int:
223+
"""Extract numeric framerate from various string or numeric forms (e.g. 'p25', '60')."""
224+
if default is None:
225+
default = self.universal_params.get("default_framerate_numeric", 60)
226+
if isinstance(framerate_str, (int, float)):
227+
try:
228+
return int(framerate_str)
229+
except Exception:
230+
return default
231+
if not isinstance(framerate_str, str) or not framerate_str:
232+
return default
233+
if framerate_str.startswith('p') and len(framerate_str) > 1:
234+
num = framerate_str[1:]
235+
else:
236+
num = framerate_str
237+
try:
238+
return int(float(num))
239+
except ValueError:
240+
logger.warning(f"Could not parse framerate '{framerate_str}', defaulting to {default}")
241+
return default
242+
243+
@abstractmethod
244+
def _execute_single_host_test(self, build: str, test_time: int, host,
245+
input_file: str, output_file: str, fail_on_error: bool,
246+
virtio_user: bool, rx_timing_parser: bool, ptp: bool,
247+
capture_cfg, **kwargs) -> bool:
248+
"""Execute single host test - implementation specific to each framework."""
249+
pass
250+
251+
@abstractmethod
252+
def _execute_dual_host_test(self, build: str, test_time: int, tx_host, rx_host,
253+
input_file: str, output_file: str, fail_on_error: bool,
254+
capture_cfg, sleep_interval: int, tx_first: bool,
255+
output_format: str, **kwargs) -> bool:
256+
"""Execute dual host test - implementation specific to each framework."""
257+
pass
258+
259+
def start_process(self, command: str, build: str, test_time: int, host):
260+
"""Start a process on the specified host."""
261+
logger.info(f"Starting {self.get_framework_name()} process...")
262+
buffer_val = self.universal_params.get("process_timeout_buffer", 90)
263+
timeout = (test_time or 0) + buffer_val
264+
return run(command, host=host, cwd=build, timeout=timeout)
265+
266+
def capture_stdout(self, process, process_name: str) -> str:
267+
"""Capture stdout from a process."""
268+
try:
269+
# Remote process objects (from mfd_connect) expose stdout via 'stdout_text'
270+
if hasattr(process, 'stdout_text') and process.stdout_text:
271+
output = process.stdout_text
272+
logger.debug(f"{process_name} output (captured stdout_text): {output[:200]}...")
273+
return output
274+
# Local fallback (subprocess) may expose .stdout already consumed elsewhere
275+
if hasattr(process, 'stdout') and process.stdout:
276+
try:
277+
# Attempt to read if it's a file-like object
278+
if hasattr(process.stdout, 'read'):
279+
output = process.stdout.read()
280+
else:
281+
output = str(process.stdout)
282+
logger.debug(f"{process_name} output (captured stdout): {output[:200]}...")
283+
return output
284+
except Exception:
285+
pass
286+
logger.warning(f"No stdout available for {process_name}")
287+
return ""
288+
except Exception as e:
289+
logger.error(f"Error capturing {process_name} output: {e}")
290+
return ""
291+
292+
def get_case_id(self) -> str:
293+
"""Generate a case ID for logging/debugging purposes."""
294+
try:
295+
import inspect
296+
frame = inspect.currentframe()
297+
while frame:
298+
if 'test_' in frame.f_code.co_name:
299+
return frame.f_code.co_name
300+
frame = frame.f_back
301+
return "unknown_test"
302+
except:
303+
return "unknown_test"

0 commit comments

Comments
 (0)