Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 8 additions & 17 deletions ci/run_tests/run_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,9 @@
import os
import subprocess
import argparse
from contextlib import contextmanager
from scapy.all import get_if_addr

PCAP_FILE_PATH = os.path.join("Tests", "Pcap++Test", "PcapExamples", "example.pcap")


@contextmanager
def tcp_replay_worker(interface: str, tcpreplay_dir: str):
tcpreplay_proc = subprocess.Popen(
["tcpreplay", "-i", interface, "--mbps=10", "-l", "0", PCAP_FILE_PATH],
cwd=tcpreplay_dir,
)
try:
yield tcpreplay_proc
finally:
tcpreplay_proc.kill()
from tcp_replay_utils import TcpReplay, PCAP_FILE_PATH


def run_packet_tests(args: list[str], use_sudo: bool):
Expand All @@ -31,11 +18,13 @@ def run_packet_tests(args: list[str], use_sudo: bool):
raise RuntimeError(f"Error while executing Packet++ tests: {completed_process}")


def run_pcap_tests(interface: str, tcpreplay_dir: str, args: list[str], use_sudo: bool):
def run_pcap_tests(
interface: str, tcp_replay: TcpReplay, args: list[str], use_sudo: bool
):
ip_address = get_if_addr(interface)
print(f"IP address is: {ip_address}")

with tcp_replay_worker(interface, tcpreplay_dir):
with tcp_replay.replay(interface, PCAP_FILE_PATH):
cmd_line = ["sudo"] if use_sudo else []
cmd_line += [os.path.join("Bin", "Pcap++Test"), "-i", ip_address, *args]

Expand Down Expand Up @@ -84,9 +73,11 @@ def main():
run_packet_tests(args.packet_test_args.split(), args.use_sudo)

if "pcap" in args.test_suites:
tcp_replay = TcpReplay(args.tcpreplay_dir)

run_pcap_tests(
args.interface,
args.tcpreplay_dir,
tcp_replay,
args.pcap_test_args.split(),
args.use_sudo,
)
Expand Down
217 changes: 105 additions & 112 deletions ci/run_tests/run_tests_windows.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@
import scapy.arch.windows
from ipaddress import IPv4Address

from tcp_replay_utils import TcpReplay, PCAP_FILE_PATH

TCPREPLAY_PATH = "tcpreplay-4.4.1-win"
PCAP_FILE_PATH = os.path.abspath(
os.path.join("Tests", "Pcap++Test", "PcapExamples", "example.pcap")
)


def validate_ipv4_address(address):
Expand All @@ -28,38 +27,99 @@ def get_ip_by_guid(guid):
return None


def find_interface():
completed_process = subprocess.run(
["tcpreplay.exe", "--listnics"],
def find_interface(tcp_replay: TcpReplay):
nic_devices = tcp_replay.get_nic_list()

for device in nic_devices:
nic_guid = device.lstrip("\\Device\\NPF_")
ip_address = get_ip_by_guid(nic_guid)

if ip_address and not ip_address.startswith("169.254"):
completed_process = subprocess.run(
["curl", "--interface", ip_address, "www.google.com"],
capture_output=True,
shell=True,
)
if completed_process.returncode == 0:
return device, ip_address

return None, None


def run_packet_tests():
return subprocess.run(
os.path.join("Bin", "Packet++Test"),
cwd=os.path.join("Tests", "Packet++Test"),
shell=True,
capture_output=True,
cwd=TCPREPLAY_PATH,
check=True, # Raise exception if the worker returns in non-zero status code
)
if completed_process.returncode != 0:
print('Error executing "tcpreplay.exe --listnics"!')
exit(1)

raw_nics_output = completed_process.stdout.decode("utf-8")
for row in raw_nics_output.split("\n")[2:]:
columns = row.split("\t")
if len(columns) > 1 and columns[1].startswith("\\Device\\NPF_"):
interface = columns[1]
try:
nic_guid = interface.lstrip("\\Device\\NPF_")
ip_address = get_ip_by_guid(nic_guid)
if ip_address.startswith("169.254"):
continue
completed_process = subprocess.run(
["curl", "--interface", ip_address, "www.google.com"],
capture_output=True,
shell=True,
)
if completed_process.returncode != 0:
continue
return interface, ip_address
except Exception:
pass
return None, None

def run_packet_coverage():
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a method of just one action, why should it be its own method?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, It's easier to read at the call site. The main function is already quite complex, and it is easier to understand what run_packet_***() does at a glance than having the raw subprocess run command with all the arguments and having to understand what it does.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think the functions are that complex, moving back and forth between functions is also not very easy to read

Copy link
Collaborator Author

@Dimi1010 Dimi1010 Sep 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can revert it, but the way I see it the main function's focus is orchestration and the run_*** functions are the actual implementations of how the actual actions are achieved. The orchestration shouldn't need to worry about how the actions are done, and the actions shouldn't need to be concerned with the orchestration.

It also mirrors the way it is written in run_tests.py, and having the two scripts somewhat similar would be nice.

PS: The tcp replay worker start / stop should probably be moved under their respective run_pcap_*** operations too.

return subprocess.run(
[
"OpenCppCoverage.exe",
"--verbose",
"--sources",
"Packet++",
"--sources",
"Pcap++",
"--sources",
"Common++",
"--excluded_sources",
"Tests",
"--export_type",
"cobertura:Packet++Coverage.xml",
"--",
os.path.join("Bin", "Packet++Test"),
],
cwd=os.path.join("Tests", "Packet++Test"),
shell=True,
check=True, # Raise exception if the worker returns in non-zero status code
)


def run_pcap_tests(ip_address: str, skip_tests: list[str]):
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

return subprocess.run(
[
os.path.join("Bin", "Pcap++Test"),
"-i",
ip_address,
"-x",
";".join(skip_tests),
],
cwd=os.path.join("Tests", "Pcap++Test"),
shell=True,
check=True, # Raise exception if the worker returns in non-zero status code
)


def run_pcap_coverage(ip_address: str, skip_tests: list[str]):
Copy link
Owner

@seladb seladb Sep 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto: This is a method of just one action, why should it be its own method?

return subprocess.run(
[
"OpenCppCoverage.exe",
"--verbose",
"--sources",
"Packet++",
"--sources",
"Pcap++",
"--sources",
"Common++",
"--excluded_sources",
"Tests",
"--export_type",
"cobertura:Pcap++Coverage.xml",
"--",
os.path.join("Bin", "Pcap++Test"),
"-i",
ip_address,
"-x",
";".join(skip_tests),
],
cwd=os.path.join("Tests", "Pcap++Test"),
shell=True,
check=True, # Raise exception if the worker returns in non-zero status code
)


def main():
Expand All @@ -81,93 +141,26 @@ def main():
)
args = parser.parse_args()

tcpreplay_interface, ip_address = find_interface()
if args.coverage:
run_packet_coverage()
else:
run_packet_tests()

tcp_replay = TcpReplay(TCPREPLAY_PATH)

tcpreplay_interface, ip_address = find_interface(tcp_replay)
if not tcpreplay_interface or not ip_address:
print("Cannot find an interface to run tests on!")
exit(1)
print(f"Interface is {tcpreplay_interface} and IP address is {ip_address}")

try:
tcpreplay_cmd = (
f'tcpreplay.exe -i "{tcpreplay_interface}" --mbps=10 -l 0 {PCAP_FILE_PATH}'
)
tcpreplay_proc = subprocess.Popen(tcpreplay_cmd, shell=True, cwd=TCPREPLAY_PATH)

if args.coverage:
completed_process = subprocess.run(
[
"OpenCppCoverage.exe",
"--verbose",
"--sources",
"Packet++",
"--sources",
"Pcap++",
"--sources",
"Common++",
"--excluded_sources",
"Tests",
"--export_type",
"cobertura:Packet++Coverage.xml",
"--",
os.path.join("Bin", "Packet++Test"),
],
cwd=os.path.join("Tests", "Packet++Test"),
shell=True,
)
else:
completed_process = subprocess.run(
os.path.join("Bin", "Packet++Test"),
cwd=os.path.join("Tests", "Packet++Test"),
shell=True,
)
if completed_process.returncode != 0:
print("Error while executing Packet++ tests: " + str(completed_process))
exit(completed_process.returncode)
print(f"Interface is {tcpreplay_interface} and IP address is {ip_address}")

skip_tests = ["TestRemoteCapture"] + args.skip_tests
skip_tests = ["TestRemoteCapture"] + args.skip_tests
with tcp_replay.replay(tcpreplay_interface, PCAP_FILE_PATH):
if args.coverage:
completed_process = subprocess.run(
[
"OpenCppCoverage.exe",
"--verbose",
"--sources",
"Packet++",
"--sources",
"Pcap++",
"--sources",
"Common++",
"--excluded_sources",
"Tests",
"--export_type",
"cobertura:Pcap++Coverage.xml",
"--",
os.path.join("Bin", "Pcap++Test"),
"-i",
ip_address,
"-x",
";".join(skip_tests),
],
cwd=os.path.join("Tests", "Pcap++Test"),
shell=True,
)
run_pcap_coverage(ip_address, skip_tests)
else:
completed_process = subprocess.run(
[
os.path.join("Bin", "Pcap++Test"),
"-i",
ip_address,
"-x",
";".join(skip_tests),
],
cwd=os.path.join("Tests", "Pcap++Test"),
shell=True,
)
if completed_process.returncode != 0:
print("Error while executing Pcap++ tests: " + str(completed_process))
exit(completed_process.returncode)

finally:
subprocess.call(["taskkill", "/F", "/T", "/PID", str(tcpreplay_proc.pid)])
run_pcap_tests(ip_address, skip_tests)


if __name__ == "__main__":
Expand Down
87 changes: 87 additions & 0 deletions ci/run_tests/tcp_replay_utils.py
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that having this class instead of a simple @contextmanager and subprocess.run() to get the nics is an overkill. Usually less code is better 🙂

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not just the nics. Those were mostly an afterthought. It's so the tcp replay executable passed around as a structured object. IMO, that is better than having a path in the parameters that you need to know is the executable.

The differences between unix and windows calls are handled internally instead of having to do that all over the place or having duplicated procedures.

It also allows the places where it is used not to worry about how exactly the replay executable is found.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not just the nics. Those were mostly an afterthought. It's so the tcp replay executable passed around as a structured object. IMO, that is better than having a path in the parameters that you need to know is the executable.

The only parameter that is passed around is the tcpreplay path, but I think it's ok because it's just a string.

The differences between unix and windows calls are handled internally instead of having to do that all over the place or having duplicated procedures.

I actually think this is the main problem with the new class - methods like get_nic_list() only run on Windows and otherwise throw an exception, which is not great from OOP perspective. The constructor also has a few if..else per platform - again, not great OOP. I don't think this new class is really needed

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only parameter that is passed around is the tcpreplay path, but I think it's ok because it's just a string.

Eh, its workable, but IMO a lot less explicit. A string also won't be caught by the type system if it is passed in the wrong place.

I actually think this is the main problem with the new class - methods like get_nic_list() only run on Windows and otherwise throw an exception, which is not great from OOP perspective.

There is nothing preventing the method get_nic_list() to work on unix, as far as I know. It is just that the method was not used in the unix part so I didn't bother. I can add it if it is an issue.

The constructor also has a few if..else per platform - again, not great OOP.

Why is per platform, if else not great OOP exactly?

The only thing that is "per platform" per se, is that on windows:

  • the executable path is formatted correctly with an .exe suffix.
  • the kill process command uses the taskkill.

Those are both basic cross platform patches that don't modify anything else in the functionality of the class, and would need to be done regardless if the code is structured as tcp_replay_worker function or as the TcpReplay object since the user only submits an optional directory, if the function is to be cross platform.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eh, its workable, but IMO a lot less explicit. A string also won't be caught by the type system if it is passed in the wrong place.

A path is a string... we can convert the string to Python's Path class but I don't think it's necessary here

There is nothing preventing the method get_nic_list() to work on unix, as far as I know. It is just that the method was not used in the unix part so I didn't bother. I can add it if it is an issue.

To be honest, I didn't check if it can run on Linux, I assume it does and in that case the if sys.platform != "win32" is not needed

Why is per platform, if else not great OOP exactly?

In "clean OOP" there should be minimum if..else clauses and you'd expect to have separate classes for different platforms that share a base class with the common functionality. However, as I mentioned before, I don't think all of this is needed here

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be honest, I didn't check if it can run on Linux, I assume it does and in that case the if sys.platform != "win32" is not needed

The call to tcpreplay --listnics works but the format is different.

It returns

tcpreplay --listnics
Warning in interface.c:get_interface_list() line 80:
May need to run as root to get access to all network interfaces.
Warning in interface.c:get_interface_list() line 80:
May need to run as root to get access to all network interfaces.
Available network interfaces:
eth0
any
bluetooth-monitor
nflog
nfqueue
dbus-system
dbus-session

instead of \\Device\\NPF_*** strings, so the code after the call is win32 only. That is why the platform is restricted atm.

In "clean OOP" there should be minimum if..else clauses and you'd expect to have separate classes for different platforms that share a base class with the common functionality. However, as I mentioned before, I don't think all of this is needed here

Fair enough. I also agree that full derived classes aren't needed here.

Having said that, we can't escape the if sys.platform branches, if we want to keep the code duplication low and routines cross platform. It is either that or have full duplicated routines for a minor difference. We need to have the enforcement / compatibility checks somewhere.

I used an object so the code is mostly at one place and to maintain the class invariants. It allows the checks / compatibility layer to be centralized. Otherwise it would require every routine that is cross-platform to do the checks in the __init__() method and the checks in the _kill_process() method. Currently that would only be replay / tcp_replay_worker but future maintainability suffers if more functions need to be added.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I'm saying is that the current script isn't too complex to read or maintain, which is why I think this refactoring is not really needed. I'd think we can focus our energy in areas that can be more beneficial for the project and its users 🙂

Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
from __future__ import annotations

import subprocess
import sys
from collections.abc import Generator
from dataclasses import dataclass
from contextlib import contextmanager
from pathlib import Path

PCAP_FILE_PATH = Path("Tests", "Pcap++Test", "PcapExamples", "example.pcap").absolute()


@dataclass
class TcpReplayTask:
"""A replay task that holds the tcpreplay instance and the subprocess procedure."""

replay: TcpReplay
procedure: subprocess.Popen


class TcpReplay:
def __init__(self, tcpreplay_dir: str | None = None):
"""
A wrapper class for managing tcpreplay operations.
:param tcpreplay_dir: Directory where tcpreplay is located. If None, assumes tcpreplay is in the system PATH.
"""
if tcpreplay_dir is None:
self.executable = Path("tcpreplay")
else:
self.executable = Path(tcpreplay_dir) / "tcpreplay"

if sys.platform == "win32":
self.executable = self.executable.with_suffix(".exe")

# Checking for executable existence does not work if it's in PATH
subprocess.run([self.executable, "--version"], capture_output=True, check=True)

@contextmanager
def replay(
self, interface: str, pcap_file: Path
) -> Generator[TcpReplayTask, None, None]:
"""
Context manager that starts tcpreplay and yields a TcpReplayTask.
:param interface: Network interface to use for replaying packets.
:param pcap_file: Path to the pcap file to replay.
"""
cmd = [self.executable, "-i", interface, "--mbps=10", "-l", "0", str(pcap_file)]
proc = subprocess.Popen(cmd)
try:
yield TcpReplayTask(replay=self, procedure=proc)
finally:
self._kill_process(proc)

def get_nic_list(self):
"""
Get the list of network interfaces using tcpreplay. Only works on Windows.
:return: List of network interface names.
"""
if sys.platform != "win32":
# We don't use it on non-Windows platforms yet.
raise RuntimeError("This method is only supported on Windows!")

completed_process = subprocess.run(
[self.executable, "--listnics"],
capture_output=True,
)
if completed_process.returncode != 0:
raise RuntimeError('Error executing "tcpreplay --listnics"!')

raw_nics_output = completed_process.stdout.decode("utf-8")
nics = []
for row in raw_nics_output.split("\n")[2:]:
columns = row.split("\t")
if len(columns) > 1 and columns[1].startswith("\\Device\\NPF_"):
nics.append(columns[1])
return nics

@staticmethod
def _kill_process(proc: subprocess.Popen) -> None:
if sys.platform == "win32":
# Use taskkill to kill the process and its children
subprocess.call(["taskkill", "/F", "/T", "/PID", str(proc.pid)])
else:
proc.kill()
Loading