35 changes: 19 additions & 16 deletions .buildkite/pipeline_perf.py
@@ -16,43 +16,48 @@
# the operating system sometimes uses it for book-keeping tasks. The memory node (-m parameter)
# has to be the node associated with the NUMA node from which we picked CPUs.
perf_test = {
"virtio-block": {
"label": "💿 Virtio Block Performance",
"test_path": "integration_tests/performance/test_block_ab.py::test_block_performance",
"virtio-block-sync": {
"label": "💿 Virtio Sync Block Performance",
"tests": "integration_tests/performance/test_block_ab.py::test_block_performance -k 'not Async'",
"devtool_opts": "-c 1-10 -m 0",
},
"virtio-block-async": {
"label": "💿 Virtio Async Block Performance",
"tests": "integration_tests/performance/test_block_ab.py::test_block_performance -k Async",
"devtool_opts": "-c 1-10 -m 0",
},
"vhost-user-block": {
"label": "💿 vhost-user Block Performance",
"test_path": "integration_tests/performance/test_block_ab.py::test_block_vhost_user_performance",
"tests": "integration_tests/performance/test_block_ab.py::test_block_vhost_user_performance",
"devtool_opts": "-c 1-10 -m 0",
"ab_opts": "--noise-threshold 0.1",
},
"network": {
"label": "📠 Network Latency and Throughput",
"test_path": "integration_tests/performance/test_network_ab.py",
"tests": "integration_tests/performance/test_network_ab.py",
"devtool_opts": "-c 1-10 -m 0",
# Triggers if the delta is > 0.01ms (10µs) or exceeds the default relative threshold (5%).
# Only relevant for the latency test; throughput results are always orders of magnitude above this anyway.
"ab_opts": "--absolute-strength 0.010",
},
"snapshot-latency": {
"label": "📸 Snapshot Latency",
"test_path": "integration_tests/performance/test_snapshot_ab.py::test_restore_latency integration_tests/performance/test_snapshot_ab.py::test_post_restore_latency",
"tests": "integration_tests/performance/test_snapshot_ab.py::test_restore_latency integration_tests/performance/test_snapshot_ab.py::test_post_restore_latency",
"devtool_opts": "-c 1-12 -m 0",
},
"population-latency": {
"label": "📸 Memory Population Latency",
"test_path": "integration_tests/performance/test_snapshot_ab.py::test_population_latency",
"tests": "integration_tests/performance/test_snapshot_ab.py::test_population_latency",
"devtool_opts": "-c 1-12 -m 0",
},
"vsock-throughput": {
"label": "🧦 Vsock Throughput",
"test_path": "integration_tests/performance/test_vsock_ab.py",
"tests": "integration_tests/performance/test_vsock_ab.py",
"devtool_opts": "-c 1-10 -m 0",
},
"memory-overhead": {
"label": "💾 Memory Overhead and 👢 Boottime",
"test_path": "integration_tests/performance/test_memory_overhead.py integration_tests/performance/test_boottime.py::test_boottime",
"tests": "integration_tests/performance/test_memory_overhead.py integration_tests/performance/test_boottime.py::test_boottime",
"devtool_opts": "-c 1-10 -m 0",
},
}
@@ -93,23 +98,21 @@
tests = [perf_test[test] for test in pipeline.args.test or perf_test.keys()]
for test in tests:
devtool_opts = test.pop("devtool_opts")
test_path = test.pop("test_path")
test_selector = test.pop("tests")
ab_opts = test.pop("ab_opts", "")
devtool_opts += " --performance"
pytest_opts = ""
test_script_opts = ""
if REVISION_A:
devtool_opts += " --ab"
pytest_opts = (
f"{ab_opts} run build/{REVISION_A}/ build/{REVISION_B} --test {test_path}"
)
test_script_opts = f'{ab_opts} run build/{REVISION_A}/ build/{REVISION_B} --pytest-opts "{test_selector}"'
else:
# Passing `-m ''` below instructs pytest to collect tests regardless of
# their markers (e.g. it will collect both tests marked as nonci, and
# tests without any markers).
pytest_opts += f" -m '' {test_path}"
test_script_opts += f" -m '' {test_selector}"

pipeline.build_group(
command=pipeline.devtool_test(devtool_opts, pytest_opts),
command=pipeline.devtool_test(devtool_opts, test_script_opts),
# and the rest can be command arguments
**test,
)
2 changes: 1 addition & 1 deletion tests/framework/microvm.py
@@ -330,7 +330,7 @@ def kill(self):

# filter ps results for the jailer's unique id
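# (-o cmd limits the output to the command line and -ww stops ps from
# truncating it, so the grep below reliably matches the jailer id)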
_, stdout, stderr = utils.check_output(
f"ps aux | grep {self.jailer.jailer_id}"
f"ps ax -o cmd -ww | grep {self.jailer.jailer_id}"
)
# make sure firecracker was killed
assert (
59 changes: 38 additions & 21 deletions tests/integration_tests/performance/test_block_ab.py
@@ -3,6 +3,7 @@
"""Performance benchmark for block device emulation."""

import concurrent
import glob
import os
import shutil
from pathlib import Path
@@ -44,7 +45,7 @@ def prepare_microvm_for_test(microvm):
check_output("echo 3 > /proc/sys/vm/drop_caches")


def run_fio(microvm, mode, block_size):
def run_fio(microvm, mode, block_size, fio_engine="libaio"):
"""Run a fio test in the specified mode with block size bs."""
cmd = (
CmdBuilder("fio")
@@ -59,7 +60,7 @@ def run_fio(microvm, mode, block_size):
.with_arg("--randrepeat=0")
.with_arg(f"--bs={block_size}")
.with_arg(f"--size={BLOCK_DEVICE_SIZE_MB}M")
.with_arg("--ioengine=libaio")
.with_arg(f"--ioengine={fio_engine}")
.with_arg("--iodepth=32")
# Set affinity of the entire fio process to a set of vCPUs equal in size to number of workers
.with_arg(
@@ -68,8 +69,8 @@
# Instruct fio to pin one worker per vcpu
.with_arg("--cpus_allowed_policy=split")
.with_arg(f"--write_bw_log={mode}")
.with_arg(f"--write_lat_log={mode}")
.with_arg("--log_avg_msec=1000")
.with_arg("--output-format=json+")
.build()
)

@@ -102,51 +103,65 @@ def run_fio(microvm, mode, block_size):
return logs_path, cpu_load_future.result()


def process_fio_logs(vm, fio_mode, logs_dir, metrics):
"""Parses the fio logs in `{logs_dir}/{fio_mode}_bw.*.log and emits their contents as CloudWatch metrics"""

def process_fio_log_files(logs_glob):
"""Parses all fio log files matching the given glob and yields tuples of same-timestamp read and write metrics"""
data = [
Path(f"{logs_dir}/{fio_mode}_bw.{job_id + 1}.log")
.read_text("UTF-8")
.splitlines()
for job_id in range(vm.vcpus_count)
Path(pathname).read_text("UTF-8").splitlines()
for pathname in glob.glob(logs_glob)
]

assert data, "no log files found!"

for tup in zip(*data):
bw_read = 0
bw_write = 0
read_values = []
write_values = []

for line in tup:
# See https://fio.readthedocs.io/en/latest/fio_doc.html#log-file-formats
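# Each line is roughly "msec, value, direction, blocksize[, ...]": direction 0
# is a read and 1 a write; the value is bandwidth in KiB/s for *_bw logs and
# latency in nanoseconds for *_clat logs.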
_, value, direction, _ = line.split(",", maxsplit=3)
value = int(value.strip())

# See https://fio.readthedocs.io/en/latest/fio_doc.html#log-file-formats
match direction.strip():
case "0":
bw_read += value
read_values.append(value)
case "1":
bw_write += value
write_values.append(value)
case _:
assert False

yield read_values, write_values


def emit_fio_metrics(logs_dir, metrics):
"""Parses the fio logs in `{logs_dir}/*_[clat|bw].*.log and emits their contents as CloudWatch metrics"""
for bw_read, bw_write in process_fio_log_files(f"{logs_dir}/*_bw.*.log"):
if bw_read:
metrics.put_metric("bw_read", bw_read, "Kilobytes/Second")
metrics.put_metric("bw_read", sum(bw_read), "Kilobytes/Second")
if bw_write:
metrics.put_metric("bw_write", bw_write, "Kilobytes/Second")
metrics.put_metric("bw_write", sum(bw_write), "Kilobytes/Second")

for lat_read, lat_write in process_fio_log_files(f"{logs_dir}/*_clat.*.log"):
# Latency values in fio logs are in nanoseconds, but CloudWatch only supports
# microseconds as its most granular unit, so we need to divide by 1000.
for value in lat_read:
metrics.put_metric("clat_read", value / 1000, "Microseconds")
for value in lat_write:
metrics.put_metric("clat_write", value / 1000, "Microseconds")


@pytest.mark.timeout(120)
@pytest.mark.nonci
@pytest.mark.parametrize("vcpus", [1, 2], ids=["1vcpu", "2vcpu"])
@pytest.mark.parametrize("fio_mode", ["randread", "randwrite"])
@pytest.mark.parametrize("fio_block_size", [4096], ids=["bs4096"])
@pytest.mark.parametrize("fio_engine", ["libaio", "psync"])
def test_block_performance(
microvm_factory,
guest_kernel_acpi,
rootfs,
vcpus,
fio_mode,
fio_block_size,
fio_engine,
io_engine,
metrics,
):
@@ -170,15 +185,16 @@ def test_block_performance(
"io_engine": io_engine,
"fio_mode": fio_mode,
"fio_block_size": str(fio_block_size),
"fio_engine": fio_engine,
**vm.dimensions,
}
)

vm.pin_threads(0)

logs_dir, cpu_util = run_fio(vm, fio_mode, fio_block_size)
logs_dir, cpu_util = run_fio(vm, fio_mode, fio_block_size, fio_engine)

process_fio_logs(vm, fio_mode, logs_dir, metrics)
emit_fio_metrics(logs_dir, metrics)

for thread_name, values in cpu_util.items():
for value in values:
@@ -218,6 +234,7 @@ def test_block_vhost_user_performance(
"io_engine": "vhost-user",
"fio_mode": fio_mode,
"fio_block_size": str(fio_block_size),
"fio_engine": "libaio",
**vm.dimensions,
}
)
@@ -227,7 +244,7 @@

logs_dir, cpu_util = run_fio(vm, fio_mode, fio_block_size)

process_fio_logs(vm, fio_mode, logs_dir, metrics)
emit_fio_metrics(logs_dir, metrics)

for thread_name, values in cpu_util.items():
for value in values:
tests/integration_tests/performance/test_memory_overhead.py
@@ -38,7 +38,7 @@ def test_memory_overhead(
"""

for _ in range(5):
microvm = microvm_factory.build(guest_kernel_acpi, rootfs)
microvm = microvm_factory.build(guest_kernel_acpi, rootfs, monitor_memory=False)
microvm.spawn(emit_metrics=True)
microvm.basic_config(vcpu_count=vcpu_count, mem_size_mib=mem_size_mib)
microvm.add_net_iface()
48 changes: 39 additions & 9 deletions tools/ab_test.py
@@ -46,11 +46,14 @@
{"instance": "m6a.metal", "performance_test": "test_network_tcp_throughput"},
# Network throughput on m7a.metal
{"instance": "m7a.metal-48xl", "performance_test": "test_network_tcp_throughput"},
# Block latencies when the guest uses async (libaio) request submission
{"fio_engine": "libaio", "metric": "clat_read"},
{"fio_engine": "libaio", "metric": "clat_write"},
]


def is_ignored(dimensions) -> bool:
"""Checks whether the given dimensions match a entry in the IGNORED dictionary above"""
"""Checks whether the given dimensions match an entry in the IGNORED dictionary above"""
for high_variance in IGNORED:
matching = {key: dimensions[key] for key in high_variance if key in dimensions}

@@ -114,6 +117,8 @@ def load_data_series(report_path: Path, tag=None, *, reemit: bool = False):
# Dictionary mapping EMF dimensions to A/B-testable metrics/properties
processed_emf = {}

distinct_values_per_dimension = defaultdict(set)

report = json.loads(report_path.read_text("UTF-8"))
for test in report["tests"]:
for line in test["teardown"]["stdout"].splitlines():
@@ -133,6 +138,9 @@
if not dimensions:
continue

for dimension, value in dimensions.items():
distinct_values_per_dimension[dimension].add(value)

dimension_set = frozenset(dimensions.items())

if dimension_set not in processed_emf:
@@ -149,22 +157,40 @@

values.extend(result[metric][0])

return processed_emf
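# A dimension that only ever takes a single value across the report cannot
# tell two data series apart, so strip such dimensions from the keys below.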
irrelevant_dimensions = set()

for dimension, distinct_values in distinct_values_per_dimension.items():
if len(distinct_values) == 1:
irrelevant_dimensions.add(dimension)

post_processed_emf = {}

for dimension_set, metrics in processed_emf.items():
processed_key = frozenset(
(dim, value)
for (dim, value) in dimension_set
if dim not in irrelevant_dimensions
)

post_processed_emf[processed_key] = metrics

return post_processed_emf


def collect_data(binary_dir: Path, tests: list[str]):
def collect_data(binary_dir: Path, pytest_opts: str):
"""Executes the specified test using the provided firecracker binaries"""
binary_dir = binary_dir.resolve()

print(f"Collecting samples with {binary_dir}")
subprocess.run(
["./tools/test.sh", f"--binary-dir={binary_dir}", *tests, "-m", ""],
f"./tools/test.sh --binary-dir={binary_dir} {pytest_opts} -m ''",
env=os.environ
| {
"AWS_EMF_ENVIRONMENT": "local",
"AWS_EMF_NAMESPACE": "local",
},
check=True,
shell=True,
)
return load_data_series(
Path("test_results/test-report.json"), binary_dir, reemit=True
@@ -258,7 +284,7 @@ def analyze_data(

failures = []
for (dimension_set, metric), (result, unit) in results.items():
if is_ignored(dict(dimension_set)):
if is_ignored(dict(dimension_set) | {"metric": metric}):
continue

print(f"Doing A/B-test for dimensions {dimension_set} and property {metric}")
@@ -308,15 +334,15 @@ def analyze_data(
def ab_performance_test(
a_revision: Path,
b_revision: Path,
tests,
pytest_opts,
p_thresh,
strength_abs_thresh,
noise_threshold,
):
"""Does an A/B-test of the specified test with the given firecracker/jailer binaries"""

return binary_ab_test(
lambda bin_dir, _: collect_data(bin_dir, tests),
lambda bin_dir, _: collect_data(bin_dir, pytest_opts),
lambda ah, be: analyze_data(
ah,
be,
@@ -349,7 +375,11 @@ def ab_performance_test(
help="Directory containing firecracker and jailer binaries whose performance we want to compare against the results from a_revision",
type=Path,
)
run_parser.add_argument("--test", help="The test to run", nargs="+", required=True)
run_parser.add_argument(
"--pytest-opts",
help="Parameters to pass through to pytest, for example for test selection",
required=True,
)
analyze_parser = subparsers.add_parser(
"analyze",
help="Analyze the results of two manually ran tests based on their test-report.json files",
@@ -388,7 +418,7 @@ def ab_performance_test(
ab_performance_test(
args.a_revision,
args.b_revision,
args.test,
args.pytest_opts,
args.significance,
args.absolute_strength,
args.noise_threshold,