1- # Base Application Class for Media Transport Library
2- # Provides common interface for all media application frameworks
1+ """Base application class providing unified interface for media framework adapters."""
32
43import logging
54import re
65import time
76from abc import ABC , abstractmethod
87
9- from .config .app_mappings import DEFAULT_PAYLOAD_TYPE_CONFIG , DEFAULT_PORT_CONFIG
8+ from .config .app_mappings import (DEFAULT_PAYLOAD_TYPE_CONFIG ,
9+ DEFAULT_PORT_CONFIG )
1010from .config .universal_params import UNIVERSAL_PARAMS
1111from .execute import run
1212
@@ -98,71 +98,6 @@ def was_user_provided(self, param_name: str) -> bool:
9898 """Check if a parameter was explicitly provided by the user."""
9999 return param_name in self ._user_provided_params
100100
101- def get_session_default_port (self , session_type : str ) -> int :
102- """Get default port for a specific session type."""
103- port_map = {
104- "st20p" : DEFAULT_PORT_CONFIG ["st20p_port" ],
105- "st22p" : DEFAULT_PORT_CONFIG ["st22p_port" ],
106- "st30p" : DEFAULT_PORT_CONFIG ["st30p_port" ],
107- "video" : DEFAULT_PORT_CONFIG ["video_port" ],
108- "audio" : DEFAULT_PORT_CONFIG ["audio_port" ],
109- "ancillary" : DEFAULT_PORT_CONFIG ["ancillary_port" ],
110- "fastmetadata" : DEFAULT_PORT_CONFIG ["fastmetadata_port" ],
111- }
112- return port_map .get (session_type , DEFAULT_PORT_CONFIG ["st20p_port" ])
113-
114- def get_session_default_payload_type (self , session_type : str ) -> int :
115- """Get default payload type for a specific session type."""
116- payload_map = {
117- "st20p" : DEFAULT_PAYLOAD_TYPE_CONFIG ["st20p_payload_type" ],
118- "st22p" : DEFAULT_PAYLOAD_TYPE_CONFIG ["st22p_payload_type" ],
119- "st30p" : DEFAULT_PAYLOAD_TYPE_CONFIG ["st30p_payload_type" ],
120- "video" : DEFAULT_PAYLOAD_TYPE_CONFIG ["video_payload_type" ],
121- "audio" : DEFAULT_PAYLOAD_TYPE_CONFIG ["audio_payload_type" ],
122- "ancillary" : DEFAULT_PAYLOAD_TYPE_CONFIG ["ancillary_payload_type" ],
123- "fastmetadata" : DEFAULT_PAYLOAD_TYPE_CONFIG ["fastmetadata_payload_type" ],
124- }
125- return payload_map .get (
126- session_type , DEFAULT_PAYLOAD_TYPE_CONFIG ["st20p_payload_type" ]
127- )
128-
129- def get_common_session_params (self , session_type : str ) -> dict :
130- """Get common session parameters used across all session types."""
131- default_port = self .get_session_default_port (session_type )
132- default_payload = self .get_session_default_payload_type (session_type )
133-
134- return {
135- "replicas" : self .universal_params .get (
136- "replicas" , UNIVERSAL_PARAMS ["replicas" ]
137- ),
138- "start_port" : int (
139- self .universal_params .get ("port" )
140- if self .was_user_provided ("port" )
141- else default_port
142- ),
143- "payload_type" : (
144- self .universal_params .get ("payload_type" )
145- if self .was_user_provided ("payload_type" )
146- else default_payload
147- ),
148- }
149-
150- def get_common_video_params (self ) -> dict :
151- """Get common video parameters used across video session types."""
152- return {
153- "width" : int (self .universal_params .get ("width" , UNIVERSAL_PARAMS ["width" ])),
154- "height" : int (
155- self .universal_params .get ("height" , UNIVERSAL_PARAMS ["height" ])
156- ),
157- "interlaced" : self .universal_params .get (
158- "interlaced" , UNIVERSAL_PARAMS ["interlaced" ]
159- ),
160- "device" : self .universal_params .get ("device" , UNIVERSAL_PARAMS ["device" ]),
161- "enable_rtcp" : self .universal_params .get (
162- "enable_rtcp" , UNIVERSAL_PARAMS ["enable_rtcp" ]
163- ),
164- }
165-
166101 def prepare_execution (self , build : str , host = None , ** kwargs ):
167102 """Hook method called before execution to perform framework-specific setup.
168103
@@ -223,20 +158,7 @@ def execute_test(
223158 if not is_dual :
224159 cmd = self .add_timeout (self .command , test_time )
225160 logger .info (f"[single] Running { framework_name } command: { cmd } " )
226- # Optional tcpdump capture hook retained for RxTxApp compatibility
227- if (
228- capture_cfg
229- and capture_cfg .get ("enable" )
230- and "prepare_tcpdump" in globals ()
231- ):
232- try :
233- # prepare_tcpdump not yet implemented; left to change in the future
234- # prepare_tcpdump(capture_cfg, host)
235- pass
236- except Exception as e :
237- logger .warning (f"capture setup failed: { e } " )
238161 proc = self .start_process (cmd , build , test_time , host )
239- # Start netsniff capture if provided (RxTxApp-specific)
240162 if netsniff and hasattr (self , "_start_netsniff_capture" ):
241163 try :
242164 self ._start_netsniff_capture (netsniff )
@@ -252,7 +174,7 @@ def execute_test(
252174 f"{ framework_name } process wait timed out (continuing to capture output)"
253175 )
254176 self .last_output = self .capture_stdout (proc , framework_name )
255- self .last_return_code = getattr ( proc , "returncode" , None )
177+ self .last_return_code = proc . return_code
256178 return self .validate_results ()
257179
258180 # Dual-host execution (tx self, rx rx_app)
@@ -297,75 +219,48 @@ def execute_test(
297219 else :
298220 rx_app .last_output = rx_app .capture_stdout (first_proc , first_label )
299221 self .last_output = self .capture_stdout (second_proc , second_label )
300- self .last_return_code = getattr ( first_proc , "returncode" , None )
301- rx_app .last_return_code = getattr ( second_proc , "returncode" , None )
222+ self .last_return_code = first_proc . return_code
223+ rx_app .last_return_code = second_proc . return_code
302224 tx_ok = self .validate_results ()
303225 rx_ok = rx_app .validate_results ()
304226 return tx_ok and rx_ok
305227
306- # -------------------------
307- # Common helper utilities
308- # -------------------------
309228 def add_timeout (self , command : str , test_time : int , grace : int = None ) -> str :
310- """Wrap command with timeout if test_time provided (adds a grace period)."""
229+ """Wrap command with timeout if test_time provided.
230+
231+ Args:
232+ command: Shell command to wrap
233+ test_time: Test duration in seconds
234+ grace: Grace period to add (default from universal_params)
235+
236+ Returns:
237+ Command wrapped with timeout
238+ """
311239 if grace is None :
312240 grace = self .universal_params .get ("timeout_grace" , 10 )
313- # If the command already has an internal --test_time X argument, ensure the wrapper
314- # timeout is >= that internal value + grace to avoid premature SIGTERM (RC 124).
241+
242+ # Extract internal --test_time to prevent premature timeout termination
315243 internal_test_time = None
316244 m = re .search (r"--test_time\s+(\d+)" , command )
317245 if m :
318246 try :
319247 internal_test_time = int (m .group (1 ))
320248 except ValueError :
321249 internal_test_time = None
250+
322251 effective_test_time = test_time or internal_test_time
323252 if internal_test_time and test_time and internal_test_time != test_time :
324253 logger .debug (
325- f"Mismatch between execute_test test_time={ test_time } and command --test_time { internal_test_time } ; "
326- f"using max"
254+ f"Test time mismatch (execute={ test_time } , command={ internal_test_time } ); using max"
327255 )
328256 effective_test_time = max (internal_test_time , test_time )
329257 elif internal_test_time and not test_time :
330258 effective_test_time = internal_test_time
259+
331260 if effective_test_time and not command .strip ().startswith ("timeout " ):
332261 return f"timeout { effective_test_time + grace } { command } "
333262 return command
334263
335- def start_and_capture (
336- self , command : str , build : str , test_time : int , host , process_name : str
337- ):
338- """Start a single process and capture its stdout safely."""
339- process = self .start_process (command , build , test_time , host )
340- output = self .capture_stdout (process , process_name )
341- return process , output
342-
343- def start_dual_with_delay (
344- self ,
345- tx_command : str ,
346- rx_command : str ,
347- build : str ,
348- test_time : int ,
349- tx_host ,
350- rx_host ,
351- tx_first : bool ,
352- sleep_interval : int ,
353- tx_name : str ,
354- rx_name : str ,
355- ):
356- """Start two processes with an optional delay ordering TX/RX based on tx_first flag."""
357- if tx_first :
358- tx_process = self .start_process (tx_command , build , test_time , tx_host )
359- time .sleep (sleep_interval )
360- rx_process = self .start_process (rx_command , build , test_time , rx_host )
361- else :
362- rx_process = self .start_process (rx_command , build , test_time , rx_host )
363- time .sleep (sleep_interval )
364- tx_process = self .start_process (tx_command , build , test_time , tx_host )
365- tx_output = self .capture_stdout (tx_process , tx_name )
366- rx_output = self .capture_stdout (rx_process , rx_name )
367- return (tx_process , rx_process , tx_output , rx_output )
368-
369264 def extract_framerate (self , framerate_str , default : int = None ) -> int :
370265 """Extract numeric framerate from various string or numeric forms (e.g. 'p25', '60')."""
371266 if default is None :
@@ -389,47 +284,42 @@ def extract_framerate(self, framerate_str, default: int = None) -> int:
389284 )
390285 return default
391286
392- # Legacy execute_* abstract methods removed; unified execute_test used instead.
393-
394287 def start_process (self , command : str , build : str , test_time : int , host ):
395- """Start a process on the specified host."""
288+ """Start a process on the specified host using mfd_connect ."""
396289 logger .info (f"Starting { self .get_framework_name ()} process..." )
397290 buffer_val = self .universal_params .get ("process_timeout_buffer" , 90 )
398291 timeout = (test_time or 0 ) + buffer_val
399292 return run (command , host = host , cwd = build , timeout = timeout )
400293
401294 def capture_stdout (self , process , process_name : str ) -> str :
402- """Capture stdout from a process."""
295+ """Capture stdout from mfd_connect process.
296+
297+ Note: Must be called after process.wait() completes, as stdout_text
298+ is only available after the process finishes.
299+
300+ Args:
301+ process: mfd_connect process object with stdout_text attribute
302+ process_name: Name for logging purposes
303+
304+ Returns:
305+ Process stdout as string, or empty string if unavailable
306+ """
403307 try :
404- # Remote process objects (from mfd_connect) expose stdout via 'stdout_text'
405- if hasattr (process , "stdout_text" ) and process .stdout_text :
406- output = process .stdout_text
407- logger .debug (
408- f"{ process_name } output (captured stdout_text): { output [:200 ]} ..."
409- )
410- return output
411- # Local fallback (subprocess) may expose .stdout already consumed elsewhere
412- if hasattr (process , "stdout" ) and process .stdout :
413- try :
414- # Attempt to read if it's a file-like object
415- if hasattr (process .stdout , "read" ):
416- output = process .stdout .read ()
417- else :
418- output = str (process .stdout )
419- logger .debug (
420- f"{ process_name } output (captured stdout): { output [:200 ]} ..."
421- )
422- return output
423- except Exception :
424- pass
425- logger .warning (f"No stdout available for { process_name } " )
308+ output = process .stdout_text or ""
309+ if output :
310+ logger .debug (f"{ process_name } output: { output [:200 ]} ..." )
311+ return output
312+ except AttributeError :
313+ logger .error (
314+ f"Process object missing stdout_text attribute for { process_name } "
315+ )
426316 return ""
427317 except Exception as e :
428- logger .error (f"Error capturing { process_name } output : { e } " )
318+ logger .error (f"Error capturing output from { process_name } : { e } " )
429319 return ""
430320
431321 def get_case_id (self ) -> str :
432- """Generate a case ID for logging/debugging purposes ."""
322+ """Generate test case identifier from call stack for logging."""
433323 try :
434324 import inspect
435325
0 commit comments