From 0f8ddd2dbe14f1fdadd7298c0b7634812f884eca Mon Sep 17 00:00:00 2001 From: Dimitar Krastev Date: Sat, 20 Sep 2025 14:36:26 +0300 Subject: [PATCH 01/14] Split of the windows run tests main function into discrete functions. --- ci/run_tests/run_tests_windows.py | 152 ++++++++++++++++-------------- 1 file changed, 83 insertions(+), 69 deletions(-) diff --git a/ci/run_tests/run_tests_windows.py b/ci/run_tests/run_tests_windows.py index 532241bfeb..f59b00e0ae 100644 --- a/ci/run_tests/run_tests_windows.py +++ b/ci/run_tests/run_tests_windows.py @@ -62,6 +62,82 @@ def find_interface(): return None, None +def run_packet_tests(): + return subprocess.run( + os.path.join("Bin", "Packet++Test"), + cwd=os.path.join("Tests", "Packet++Test"), + shell=True, + check=True, # Raise exception if the worker returns in non-zero status code + ) + + +def run_packet_coverage(): + return subprocess.run( + [ + "OpenCppCoverage.exe", + "--verbose", + "--sources", + "Packet++", + "--sources", + "Pcap++", + "--sources", + "Common++", + "--excluded_sources", + "Tests", + "--export_type", + "cobertura:Packet++Coverage.xml", + "--", + os.path.join("Bin", "Packet++Test"), + ], + cwd=os.path.join("Tests", "Packet++Test"), + shell=True, + check=True, # Raise exception if the worker returns in non-zero status code + ) + + +def run_pcap_tests(ip_address: str, skip_tests: list[str]): + return subprocess.run( + [ + os.path.join("Bin", "Pcap++Test"), + "-i", + ip_address, + "-x", + ";".join(skip_tests), + ], + cwd=os.path.join("Tests", "Pcap++Test"), + shell=True, + check=True, # Raise exception if the worker returns in non-zero status code + ) + + +def run_pcap_coverage(ip_address: str, skip_tests: list[str]): + return subprocess.run( + [ + "OpenCppCoverage.exe", + "--verbose", + "--sources", + "Packet++", + "--sources", + "Pcap++", + "--sources", + "Common++", + "--excluded_sources", + "Tests", + "--export_type", + "cobertura:Pcap++Coverage.xml", + "--", + os.path.join("Bin", "Pcap++Test"), + "-i", + ip_address, + "-x", + ";".join(skip_tests), + ], + cwd=os.path.join("Tests", "Pcap++Test"), + shell=True, + check=True, # Raise exception if the worker returns in non-zero status code + ) + + def main(): parser = argparse.ArgumentParser() parser.add_argument( @@ -87,84 +163,22 @@ def main(): exit(1) print(f"Interface is {tcpreplay_interface} and IP address is {ip_address}") + if args.coverage: + run_packet_coverage() + else: + run_packet_tests() + try: tcpreplay_cmd = ( f'tcpreplay.exe -i "{tcpreplay_interface}" --mbps=10 -l 0 {PCAP_FILE_PATH}' ) tcpreplay_proc = subprocess.Popen(tcpreplay_cmd, shell=True, cwd=TCPREPLAY_PATH) - if args.coverage: - completed_process = subprocess.run( - [ - "OpenCppCoverage.exe", - "--verbose", - "--sources", - "Packet++", - "--sources", - "Pcap++", - "--sources", - "Common++", - "--excluded_sources", - "Tests", - "--export_type", - "cobertura:Packet++Coverage.xml", - "--", - os.path.join("Bin", "Packet++Test"), - ], - cwd=os.path.join("Tests", "Packet++Test"), - shell=True, - ) - else: - completed_process = subprocess.run( - os.path.join("Bin", "Packet++Test"), - cwd=os.path.join("Tests", "Packet++Test"), - shell=True, - ) - if completed_process.returncode != 0: - print("Error while executing Packet++ tests: " + str(completed_process)) - exit(completed_process.returncode) - skip_tests = ["TestRemoteCapture"] + args.skip_tests if args.coverage: - completed_process = subprocess.run( - [ - "OpenCppCoverage.exe", - "--verbose", - "--sources", - "Packet++", - "--sources", - "Pcap++", - "--sources", - "Common++", - "--excluded_sources", - "Tests", - "--export_type", - "cobertura:Pcap++Coverage.xml", - "--", - os.path.join("Bin", "Pcap++Test"), - "-i", - ip_address, - "-x", - ";".join(skip_tests), - ], - cwd=os.path.join("Tests", "Pcap++Test"), - shell=True, - ) + run_pcap_coverage(ip_address, skip_tests) else: - completed_process = subprocess.run( - [ - os.path.join("Bin", "Pcap++Test"), - "-i", - ip_address, - "-x", - ";".join(skip_tests), - ], - cwd=os.path.join("Tests", "Pcap++Test"), - shell=True, - ) - if completed_process.returncode != 0: - print("Error while executing Pcap++ tests: " + str(completed_process)) - exit(completed_process.returncode) + run_pcap_tests(ip_address, skip_tests) finally: subprocess.call(["taskkill", "/F", "/T", "/PID", str(tcpreplay_proc.pid)]) From 96100ed9d86d64d499dd22863c48bd2683b8ab0a Mon Sep 17 00:00:00 2001 From: Dimitar Krastev Date: Sat, 20 Sep 2025 15:02:43 +0300 Subject: [PATCH 02/14] Added TcpReplay class to unify interface between windows and unix test runs. --- ci/run_tests/run_tests.py | 23 ++++----------- ci/run_tests/tcp_replay.py | 59 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 65 insertions(+), 17 deletions(-) create mode 100644 ci/run_tests/tcp_replay.py diff --git a/ci/run_tests/run_tests.py b/ci/run_tests/run_tests.py index df4f3fe373..3ccc74e0e9 100644 --- a/ci/run_tests/run_tests.py +++ b/ci/run_tests/run_tests.py @@ -3,22 +3,9 @@ import os import subprocess import argparse -from contextlib import contextmanager from scapy.all import get_if_addr -PCAP_FILE_PATH = os.path.join("Tests", "Pcap++Test", "PcapExamples", "example.pcap") - - -@contextmanager -def tcp_replay_worker(interface: str, tcpreplay_dir: str): - tcpreplay_proc = subprocess.Popen( - ["tcpreplay", "-i", interface, "--mbps=10", "-l", "0", PCAP_FILE_PATH], - cwd=tcpreplay_dir, - ) - try: - yield tcpreplay_proc - finally: - tcpreplay_proc.kill() +from tcp_replay import TcpReplay def run_packet_tests(args: list[str], use_sudo: bool): @@ -31,11 +18,11 @@ def run_packet_tests(args: list[str], use_sudo: bool): raise RuntimeError(f"Error while executing Packet++ tests: {completed_process}") -def run_pcap_tests(interface: str, tcpreplay_dir: str, args: list[str], use_sudo: bool): +def run_pcap_tests(interface: str, tcpreplay: TcpReplay, args: list[str], use_sudo: bool): ip_address = get_if_addr(interface) print(f"IP address is: {ip_address}") - with tcp_replay_worker(interface, tcpreplay_dir): + with tcpreplay.replay(interface): cmd_line = ["sudo"] if use_sudo else [] cmd_line += [os.path.join("Bin", "Pcap++Test"), "-i", ip_address, *args] @@ -84,9 +71,11 @@ def main(): run_packet_tests(args.packet_test_args.split(), args.use_sudo) if "pcap" in args.test_suites: + tcp_replay = TcpReplay(args.tcpreplay_dir) + run_pcap_tests( args.interface, - args.tcpreplay_dir, + tcp_replay, args.pcap_test_args.split(), args.use_sudo, ) diff --git a/ci/run_tests/tcp_replay.py b/ci/run_tests/tcp_replay.py new file mode 100644 index 0000000000..48331434e6 --- /dev/null +++ b/ci/run_tests/tcp_replay.py @@ -0,0 +1,59 @@ +from __future__ import annotations + +import os +import subprocess +import sys +from collections.abc import Generator +from dataclasses import dataclass +from contextlib import contextmanager +from pathlib import Path + +WIN32_TCPREPLAY_PATH = "tcpreplay-4.4.1-win" +PCAP_FILE_PATH = os.path.join("Tests", "Pcap++Test", "PcapExamples", "example.pcap") + +@contextmanager +def tcp_replay_worker(interface: str, tcpreplay_dir: str): + tcpreplay_proc = subprocess.Popen( + ["tcpreplay", "-i", interface, "--mbps=10", "-l", "0", PCAP_FILE_PATH], + cwd=tcpreplay_dir, + ) + try: + yield tcpreplay_proc + finally: + tcpreplay_proc.kill() + + +@dataclass +class TcpReplayTask: + """A replay task that holds the tcpreplay instance and the subprocess procedure.""" + replay: TcpReplay + procedure: subprocess.Popen + + +class TcpReplay: + def __init__(self, tcpreplay_dir: str): + """ + A wrapper class for managing tcpreplay operations. + + :param tcpreplay_dir: Directory where tcpreplay is located. + """ + self.tcpreplay_dir = tcpreplay_dir + + @contextmanager + def replay(self, interface: str, pcap_file: Path) -> Generator[TcpReplayTask, None, None]: + """Context manager that starts tcpreplay and yields a TcpReplayTask.""" + cmd = ["tcpreplay", "-i", interface, "--mbps=10", "-l", "0", str(pcap_file)] + proc = subprocess.Popen(cmd, cwd=self.tcpreplay_dir) + try: + yield TcpReplayTask(replay=self, procedure=proc) + finally: + self._kill_process(proc) + + + @staticmethod + def _kill_process(proc: subprocess.Popen) -> None: + if sys.platform == "win32": + # Use taskkill to kill the process and its children + subprocess.call(["taskkill", "/F", "/T", "/PID", str(proc.pid)]) + else: + proc.kill() \ No newline at end of file From 798b9dfa0f0e6f3d0ba1d270a8b32c7820d9fa9d Mon Sep 17 00:00:00 2001 From: Dimitar Krastev Date: Sat, 20 Sep 2025 15:09:05 +0300 Subject: [PATCH 03/14] Updated docs --- ci/run_tests/tcp_replay.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/ci/run_tests/tcp_replay.py b/ci/run_tests/tcp_replay.py index 48331434e6..a3e844757a 100644 --- a/ci/run_tests/tcp_replay.py +++ b/ci/run_tests/tcp_replay.py @@ -41,7 +41,12 @@ def __init__(self, tcpreplay_dir: str): @contextmanager def replay(self, interface: str, pcap_file: Path) -> Generator[TcpReplayTask, None, None]: - """Context manager that starts tcpreplay and yields a TcpReplayTask.""" + """ + Context manager that starts tcpreplay and yields a TcpReplayTask. + + :param interface: Network interface to use for replaying packets. + :param pcap_file: Path to the pcap file to replay. + """ cmd = ["tcpreplay", "-i", interface, "--mbps=10", "-l", "0", str(pcap_file)] proc = subprocess.Popen(cmd, cwd=self.tcpreplay_dir) try: From f5dcba1dac00bb60c7c26b9a74a1ccab264a366d Mon Sep 17 00:00:00 2001 From: Dimitar Krastev Date: Sat, 20 Sep 2025 15:20:31 +0300 Subject: [PATCH 04/14] Renamed tcp_replay.py to tcp_replay_utils.py --- ci/run_tests/run_tests.py | 2 +- .../{tcp_replay.py => tcp_replay_utils.py} | 28 ++++++++++++++++++- 2 files changed, 28 insertions(+), 2 deletions(-) rename ci/run_tests/{tcp_replay.py => tcp_replay_utils.py} (66%) diff --git a/ci/run_tests/run_tests.py b/ci/run_tests/run_tests.py index 3ccc74e0e9..a0709adfb2 100644 --- a/ci/run_tests/run_tests.py +++ b/ci/run_tests/run_tests.py @@ -5,7 +5,7 @@ import argparse from scapy.all import get_if_addr -from tcp_replay import TcpReplay +from tcp_replay_utils import TcpReplay def run_packet_tests(args: list[str], use_sudo: bool): diff --git a/ci/run_tests/tcp_replay.py b/ci/run_tests/tcp_replay_utils.py similarity index 66% rename from ci/run_tests/tcp_replay.py rename to ci/run_tests/tcp_replay_utils.py index a3e844757a..f4c0b732f5 100644 --- a/ci/run_tests/tcp_replay.py +++ b/ci/run_tests/tcp_replay_utils.py @@ -8,7 +8,6 @@ from contextlib import contextmanager from pathlib import Path -WIN32_TCPREPLAY_PATH = "tcpreplay-4.4.1-win" PCAP_FILE_PATH = os.path.join("Tests", "Pcap++Test", "PcapExamples", "example.pcap") @contextmanager @@ -54,6 +53,33 @@ def replay(self, interface: str, pcap_file: Path) -> Generator[TcpReplayTask, No finally: self._kill_process(proc) + def get_nic_list(self): + """ + Get the list of network interfaces using tcpreplay. Only works on Windows. + + :return: List of network interface names. + """ + if sys.platform != "win32": + # We don't use it on non-Windows platforms yet. + raise RuntimeError("This method is only supported on Windows!") + + completed_process = subprocess.run( + ["tcpreplay", "--listnics"], + shell=True, + capture_output=True, + cwd=self.tcpreplay_dir, + ) + if completed_process.returncode != 0: + raise RuntimeError('Error executing "tcpreplay --listnics"!') + + raw_nics_output = completed_process.stdout.decode("utf-8") + nics = [] + for row in raw_nics_output.split("\n")[2:]: + columns = row.split("\t") + if len(columns) > 1 and columns[1].startswith("\\Device\\NPF_"): + nics.append(columns[1]) + return nics + @staticmethod def _kill_process(proc: subprocess.Popen) -> None: From 5aeb0b1b39021d40586770d13ad10e810065ecec Mon Sep 17 00:00:00 2001 From: Dimitar Krastev Date: Sat, 20 Sep 2025 15:25:23 +0300 Subject: [PATCH 05/14] Fixed replay call to use the default Pcap file. --- ci/run_tests/run_tests.py | 4 ++-- ci/run_tests/tcp_replay_utils.py | 3 +-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/ci/run_tests/run_tests.py b/ci/run_tests/run_tests.py index a0709adfb2..16e24237ef 100644 --- a/ci/run_tests/run_tests.py +++ b/ci/run_tests/run_tests.py @@ -5,7 +5,7 @@ import argparse from scapy.all import get_if_addr -from tcp_replay_utils import TcpReplay +from tcp_replay_utils import TcpReplay, PCAP_FILE_PATH def run_packet_tests(args: list[str], use_sudo: bool): @@ -22,7 +22,7 @@ def run_pcap_tests(interface: str, tcpreplay: TcpReplay, args: list[str], use_su ip_address = get_if_addr(interface) print(f"IP address is: {ip_address}") - with tcpreplay.replay(interface): + with tcpreplay.replay(interface, PCAP_FILE_PATH): cmd_line = ["sudo"] if use_sudo else [] cmd_line += [os.path.join("Bin", "Pcap++Test"), "-i", ip_address, *args] diff --git a/ci/run_tests/tcp_replay_utils.py b/ci/run_tests/tcp_replay_utils.py index f4c0b732f5..189be6be4d 100644 --- a/ci/run_tests/tcp_replay_utils.py +++ b/ci/run_tests/tcp_replay_utils.py @@ -1,6 +1,5 @@ from __future__ import annotations -import os import subprocess import sys from collections.abc import Generator @@ -8,7 +7,7 @@ from contextlib import contextmanager from pathlib import Path -PCAP_FILE_PATH = os.path.join("Tests", "Pcap++Test", "PcapExamples", "example.pcap") +PCAP_FILE_PATH = Path("Tests", "Pcap++Test", "PcapExamples", "example.pcap").absolute() @contextmanager def tcp_replay_worker(interface: str, tcpreplay_dir: str): From fa5bbcfac5c68e6b487fc69244257921a8709081 Mon Sep 17 00:00:00 2001 From: Dimitar Krastev Date: Sat, 20 Sep 2025 15:26:07 +0300 Subject: [PATCH 06/14] Updated windows script to use TcpReplay class. --- ci/run_tests/run_tests_windows.py | 75 +++++++++++-------------------- 1 file changed, 27 insertions(+), 48 deletions(-) diff --git a/ci/run_tests/run_tests_windows.py b/ci/run_tests/run_tests_windows.py index f59b00e0ae..894f1f8d4a 100644 --- a/ci/run_tests/run_tests_windows.py +++ b/ci/run_tests/run_tests_windows.py @@ -4,10 +4,9 @@ import scapy.arch.windows from ipaddress import IPv4Address +from tcp_replay_utils import TcpReplay, PCAP_FILE_PATH + TCPREPLAY_PATH = "tcpreplay-4.4.1-win" -PCAP_FILE_PATH = os.path.abspath( - os.path.join("Tests", "Pcap++Test", "PcapExamples", "example.pcap") -) def validate_ipv4_address(address): @@ -28,37 +27,22 @@ def get_ip_by_guid(guid): return None -def find_interface(): - completed_process = subprocess.run( - ["tcpreplay.exe", "--listnics"], - shell=True, - capture_output=True, - cwd=TCPREPLAY_PATH, - ) - if completed_process.returncode != 0: - print('Error executing "tcpreplay.exe --listnics"!') - exit(1) +def find_interface(tcp_replay: TcpReplay): + nic_devices = tcp_replay.get_nic_list() + + for device in nic_devices: + nic_guid = device.lstrip("\\Device\\NPF_") + ip_address = get_ip_by_guid(nic_guid) + + if ip_address and not ip_address.startswith("169.254"): + completed_process = subprocess.run( + ["curl", "--interface", ip_address, "www.google.com"], + capture_output=True, + shell=True, + ) + if completed_process.returncode == 0: + return device, ip_address - raw_nics_output = completed_process.stdout.decode("utf-8") - for row in raw_nics_output.split("\n")[2:]: - columns = row.split("\t") - if len(columns) > 1 and columns[1].startswith("\\Device\\NPF_"): - interface = columns[1] - try: - nic_guid = interface.lstrip("\\Device\\NPF_") - ip_address = get_ip_by_guid(nic_guid) - if ip_address.startswith("169.254"): - continue - completed_process = subprocess.run( - ["curl", "--interface", ip_address, "www.google.com"], - capture_output=True, - shell=True, - ) - if completed_process.returncode != 0: - continue - return interface, ip_address - except Exception: - pass return None, None @@ -157,32 +141,27 @@ def main(): ) args = parser.parse_args() - tcpreplay_interface, ip_address = find_interface() - if not tcpreplay_interface or not ip_address: - print("Cannot find an interface to run tests on!") - exit(1) - print(f"Interface is {tcpreplay_interface} and IP address is {ip_address}") - if args.coverage: run_packet_coverage() else: run_packet_tests() - try: - tcpreplay_cmd = ( - f'tcpreplay.exe -i "{tcpreplay_interface}" --mbps=10 -l 0 {PCAP_FILE_PATH}' - ) - tcpreplay_proc = subprocess.Popen(tcpreplay_cmd, shell=True, cwd=TCPREPLAY_PATH) + tcp_replay = TcpReplay(TCPREPLAY_PATH) - skip_tests = ["TestRemoteCapture"] + args.skip_tests + tcpreplay_interface, ip_address = find_interface(tcp_replay) + if not tcpreplay_interface or not ip_address: + print("Cannot find an interface to run tests on!") + exit(1) + + print(f"Interface is {tcpreplay_interface} and IP address is {ip_address}") + + skip_tests = ["TestRemoteCapture"] + args.skip_tests + with tcp_replay.replay(tcpreplay_interface, PCAP_FILE_PATH): if args.coverage: run_pcap_coverage(ip_address, skip_tests) else: run_pcap_tests(ip_address, skip_tests) - finally: - subprocess.call(["taskkill", "/F", "/T", "/PID", str(tcpreplay_proc.pid)]) - if __name__ == "__main__": main() From 55afc3d07ccb456ae4a07c08862418cfe05d3c71 Mon Sep 17 00:00:00 2001 From: Dimitar Krastev Date: Sat, 20 Sep 2025 15:28:28 +0300 Subject: [PATCH 07/14] Lint --- ci/run_tests/tcp_replay_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ci/run_tests/tcp_replay_utils.py b/ci/run_tests/tcp_replay_utils.py index 189be6be4d..b71669707d 100644 --- a/ci/run_tests/tcp_replay_utils.py +++ b/ci/run_tests/tcp_replay_utils.py @@ -86,4 +86,4 @@ def _kill_process(proc: subprocess.Popen) -> None: # Use taskkill to kill the process and its children subprocess.call(["taskkill", "/F", "/T", "/PID", str(proc.pid)]) else: - proc.kill() \ No newline at end of file + proc.kill() From 9f6751c8b10a5c06b361fbefe93eb1ca15bb8c21 Mon Sep 17 00:00:00 2001 From: Dimitar Krastev Date: Sat, 20 Sep 2025 15:29:17 +0300 Subject: [PATCH 08/14] Removed old tcp_replay_worker function. --- ci/run_tests/tcp_replay_utils.py | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/ci/run_tests/tcp_replay_utils.py b/ci/run_tests/tcp_replay_utils.py index b71669707d..66bf804066 100644 --- a/ci/run_tests/tcp_replay_utils.py +++ b/ci/run_tests/tcp_replay_utils.py @@ -9,17 +9,6 @@ PCAP_FILE_PATH = Path("Tests", "Pcap++Test", "PcapExamples", "example.pcap").absolute() -@contextmanager -def tcp_replay_worker(interface: str, tcpreplay_dir: str): - tcpreplay_proc = subprocess.Popen( - ["tcpreplay", "-i", interface, "--mbps=10", "-l", "0", PCAP_FILE_PATH], - cwd=tcpreplay_dir, - ) - try: - yield tcpreplay_proc - finally: - tcpreplay_proc.kill() - @dataclass class TcpReplayTask: From 1af46b6a3225a30b7d0c592bb9ad852d62e3b02f Mon Sep 17 00:00:00 2001 From: Dimitar Krastev Date: Sat, 20 Sep 2025 15:35:55 +0300 Subject: [PATCH 09/14] Fix param name. --- ci/run_tests/run_tests.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ci/run_tests/run_tests.py b/ci/run_tests/run_tests.py index 16e24237ef..8e53b59b9f 100644 --- a/ci/run_tests/run_tests.py +++ b/ci/run_tests/run_tests.py @@ -18,11 +18,11 @@ def run_packet_tests(args: list[str], use_sudo: bool): raise RuntimeError(f"Error while executing Packet++ tests: {completed_process}") -def run_pcap_tests(interface: str, tcpreplay: TcpReplay, args: list[str], use_sudo: bool): +def run_pcap_tests(interface: str, tcp_replay: TcpReplay, args: list[str], use_sudo: bool): ip_address = get_if_addr(interface) print(f"IP address is: {ip_address}") - with tcpreplay.replay(interface, PCAP_FILE_PATH): + with tcp_replay.replay(interface, PCAP_FILE_PATH): cmd_line = ["sudo"] if use_sudo else [] cmd_line += [os.path.join("Bin", "Pcap++Test"), "-i", ip_address, *args] From 0eed8b7f7bf41dece49691e64b6cc4d690529504 Mon Sep 17 00:00:00 2001 From: Dimitar Krastev Date: Sat, 20 Sep 2025 15:50:01 +0300 Subject: [PATCH 10/14] Use full path to the tcp replay executable. --- ci/run_tests/tcp_replay_utils.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/ci/run_tests/tcp_replay_utils.py b/ci/run_tests/tcp_replay_utils.py index 66bf804066..1d0ec41f39 100644 --- a/ci/run_tests/tcp_replay_utils.py +++ b/ci/run_tests/tcp_replay_utils.py @@ -24,7 +24,11 @@ def __init__(self, tcpreplay_dir: str): :param tcpreplay_dir: Directory where tcpreplay is located. """ - self.tcpreplay_dir = tcpreplay_dir + self.executable = Path(tcpreplay_dir) / "tcpreplay" + if sys.platform == "win32": + self.executable = self.executable.with_suffix(".exe") + if not self.executable.exists(): + raise FileNotFoundError(f"tcpreplay executable not found at {self.executable}") @contextmanager def replay(self, interface: str, pcap_file: Path) -> Generator[TcpReplayTask, None, None]: @@ -34,8 +38,8 @@ def replay(self, interface: str, pcap_file: Path) -> Generator[TcpReplayTask, No :param interface: Network interface to use for replaying packets. :param pcap_file: Path to the pcap file to replay. """ - cmd = ["tcpreplay", "-i", interface, "--mbps=10", "-l", "0", str(pcap_file)] - proc = subprocess.Popen(cmd, cwd=self.tcpreplay_dir) + cmd = [self.executable, "-i", interface, "--mbps=10", "-l", "0", str(pcap_file)] + proc = subprocess.Popen(cmd) try: yield TcpReplayTask(replay=self, procedure=proc) finally: @@ -52,10 +56,9 @@ def get_nic_list(self): raise RuntimeError("This method is only supported on Windows!") completed_process = subprocess.run( - ["tcpreplay", "--listnics"], + [self.executable, "--listnics"], shell=True, capture_output=True, - cwd=self.tcpreplay_dir, ) if completed_process.returncode != 0: raise RuntimeError('Error executing "tcpreplay --listnics"!') From 21cbb7425ee33baf95434a1daaa511e4cc0b0af0 Mon Sep 17 00:00:00 2001 From: Dimitar Krastev Date: Sat, 20 Sep 2025 15:57:57 +0300 Subject: [PATCH 11/14] Fixed case where tcpreplay is on path. --- ci/run_tests/tcp_replay_utils.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/ci/run_tests/tcp_replay_utils.py b/ci/run_tests/tcp_replay_utils.py index 1d0ec41f39..f678ff61d4 100644 --- a/ci/run_tests/tcp_replay_utils.py +++ b/ci/run_tests/tcp_replay_utils.py @@ -18,13 +18,17 @@ class TcpReplayTask: class TcpReplay: - def __init__(self, tcpreplay_dir: str): + def __init__(self, tcpreplay_dir: str | None = None): """ A wrapper class for managing tcpreplay operations. - :param tcpreplay_dir: Directory where tcpreplay is located. + :param tcpreplay_dir: Directory where tcpreplay is located. If None, assumes tcpreplay is in the system PATH. """ - self.executable = Path(tcpreplay_dir) / "tcpreplay" + if tcpreplay_dir is None: + self.executable = Path("tcpreplay") + else: + self.executable = Path(tcpreplay_dir) / "tcpreplay" + if sys.platform == "win32": self.executable = self.executable.with_suffix(".exe") if not self.executable.exists(): From 7ccbb2ed5593eb3ecce659d84442520a1a4c58fd Mon Sep 17 00:00:00 2001 From: Dimitar Krastev Date: Sat, 20 Sep 2025 16:00:14 +0300 Subject: [PATCH 12/14] Lint --- ci/run_tests/run_tests.py | 4 +++- ci/run_tests/tcp_replay_utils.py | 10 +++++++--- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/ci/run_tests/run_tests.py b/ci/run_tests/run_tests.py index 8e53b59b9f..b606708202 100644 --- a/ci/run_tests/run_tests.py +++ b/ci/run_tests/run_tests.py @@ -18,7 +18,9 @@ def run_packet_tests(args: list[str], use_sudo: bool): raise RuntimeError(f"Error while executing Packet++ tests: {completed_process}") -def run_pcap_tests(interface: str, tcp_replay: TcpReplay, args: list[str], use_sudo: bool): +def run_pcap_tests( + interface: str, tcp_replay: TcpReplay, args: list[str], use_sudo: bool +): ip_address = get_if_addr(interface) print(f"IP address is: {ip_address}") diff --git a/ci/run_tests/tcp_replay_utils.py b/ci/run_tests/tcp_replay_utils.py index f678ff61d4..c1e2a4a30c 100644 --- a/ci/run_tests/tcp_replay_utils.py +++ b/ci/run_tests/tcp_replay_utils.py @@ -13,6 +13,7 @@ @dataclass class TcpReplayTask: """A replay task that holds the tcpreplay instance and the subprocess procedure.""" + replay: TcpReplay procedure: subprocess.Popen @@ -32,10 +33,14 @@ def __init__(self, tcpreplay_dir: str | None = None): if sys.platform == "win32": self.executable = self.executable.with_suffix(".exe") if not self.executable.exists(): - raise FileNotFoundError(f"tcpreplay executable not found at {self.executable}") + raise FileNotFoundError( + f"tcpreplay executable not found at {self.executable}" + ) @contextmanager - def replay(self, interface: str, pcap_file: Path) -> Generator[TcpReplayTask, None, None]: + def replay( + self, interface: str, pcap_file: Path + ) -> Generator[TcpReplayTask, None, None]: """ Context manager that starts tcpreplay and yields a TcpReplayTask. @@ -75,7 +80,6 @@ def get_nic_list(self): nics.append(columns[1]) return nics - @staticmethod def _kill_process(proc: subprocess.Popen) -> None: if sys.platform == "win32": From 0f9a997187b53a4ceef59ed2a093d7a7afcc772d Mon Sep 17 00:00:00 2001 From: Dimitar Krastev Date: Sun, 21 Sep 2025 14:38:36 +0300 Subject: [PATCH 13/14] Fix existence check for system tcpreplay. --- ci/run_tests/tcp_replay_utils.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/ci/run_tests/tcp_replay_utils.py b/ci/run_tests/tcp_replay_utils.py index c1e2a4a30c..63ee12af26 100644 --- a/ci/run_tests/tcp_replay_utils.py +++ b/ci/run_tests/tcp_replay_utils.py @@ -32,7 +32,10 @@ def __init__(self, tcpreplay_dir: str | None = None): if sys.platform == "win32": self.executable = self.executable.with_suffix(".exe") - if not self.executable.exists(): + + # Checking for executable existence does not work if it's in PATH + version_proc = subprocess.run([self.executable, "--version"], shell=True) + if version_proc.returncode != 0: raise FileNotFoundError( f"tcpreplay executable not found at {self.executable}" ) From b25bb116df472798154311804b150ad3cae8be03 Mon Sep 17 00:00:00 2001 From: Dimitar Krastev Date: Sun, 21 Sep 2025 14:56:00 +0300 Subject: [PATCH 14/14] Run subprocess directly. --- ci/run_tests/tcp_replay_utils.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/ci/run_tests/tcp_replay_utils.py b/ci/run_tests/tcp_replay_utils.py index 63ee12af26..370fcf4c56 100644 --- a/ci/run_tests/tcp_replay_utils.py +++ b/ci/run_tests/tcp_replay_utils.py @@ -34,11 +34,7 @@ def __init__(self, tcpreplay_dir: str | None = None): self.executable = self.executable.with_suffix(".exe") # Checking for executable existence does not work if it's in PATH - version_proc = subprocess.run([self.executable, "--version"], shell=True) - if version_proc.returncode != 0: - raise FileNotFoundError( - f"tcpreplay executable not found at {self.executable}" - ) + subprocess.run([self.executable, "--version"], capture_output=True, check=True) @contextmanager def replay( @@ -69,7 +65,6 @@ def get_nic_list(self): completed_process = subprocess.run( [self.executable, "--listnics"], - shell=True, capture_output=True, ) if completed_process.returncode != 0: