diff --git a/tests/framework/ab_test.py b/tests/framework/ab_test.py
index 66690f910fa..2a5483c617b 100644
--- a/tests/framework/ab_test.py
+++ b/tests/framework/ab_test.py
@@ -21,19 +21,14 @@ of both invocations is the same, the test passes (with us being alerted to this
 situtation via a special pipeline that does not block PRs). If not, it fails,
 preventing PRs from introducing new vulnerable dependencies.
 """
-import statistics
 from pathlib import Path
 from tempfile import TemporaryDirectory
-from typing import Callable, List, Optional, TypeVar
-
-import scipy
+from typing import Callable, Optional, TypeVar
 
 from framework import utils
-from framework.defs import FC_WORKSPACE_DIR
 from framework.properties import global_props
 from framework.utils import CommandReturn
 from framework.with_filelock import with_filelock
-from host_tools.cargo_build import DEFAULT_TARGET_DIR
 
 # Locally, this will always compare against main, even if we try to merge into, say, a feature branch.
 # We might want to do a more sophisticated way to determine a "parent" branch here.
@@ -98,27 +93,6 @@ def git_ab_test(
     return result_a, result_b, comparison
 
 
-DEFAULT_A_DIRECTORY = FC_WORKSPACE_DIR / "build" / "main"
-DEFAULT_B_DIRECTORY = FC_WORKSPACE_DIR / "build" / "cargo_target" / DEFAULT_TARGET_DIR
-
-
-def binary_ab_test(
-    test_runner: Callable[[Path, bool], T],
-    comparator: Callable[[T, T], U] = default_comparator,
-    *,
-    a_directory: Path = DEFAULT_A_DIRECTORY,
-    b_directory: Path = DEFAULT_B_DIRECTORY,
-):
-    """
-    Similar to `git_ab_test`, but instead of locally checking out different revisions, it operates on
-    directories containing firecracker/jailer binaries
-    """
-    result_a = test_runner(a_directory, True)
-    result_b = test_runner(b_directory, False)
-
-    return result_a, result_b, comparator(result_a, result_b)
-
-
 def git_ab_test_host_command_if_pr(
     command: str,
     *,
@@ -170,32 +144,6 @@ def set_did_not_grow_comparator(
     )
 
 
-def check_regression(
-    a_samples: List[float], b_samples: List[float], *, n_resamples: int = 9999
-):
-    """Checks for a regression by performing a permutation test. A permutation test is a non-parametric test that takes
-    three parameters: Two populations (sets of samples) and a function computing a "statistic" based on two populations.
-    First, the test computes the statistic for the initial populations. It then randomly
-    permutes the two populations (e.g. merges them and then randomly splits them again). For each such permuted
-    population, the statistic is computed. Then, all the statistics are sorted, and the percentile of the statistic for the
-    initial populations is computed. We then look at the fraction of statistics that are larger/smaller than that of the
-    initial populations. The minimum of these two fractions will then become the p-value.
-
-    The idea is that if the two populations are indeed drawn from the same distribution (e.g. if performance did not
-    change), then permuting will not affect the statistic (indeed, it should be approximately normal-distributed, and
-    the statistic for the initial populations will be somewhere "in the middle").
-
-    Useful for performance tests.
-    """
-    return scipy.stats.permutation_test(
-        (a_samples, b_samples),
-        # Compute the difference of means, such that a positive different indicates potential for regression.
-        lambda x, y: statistics.mean(y) - statistics.mean(x),
-        vectorized=False,
-        n_resamples=n_resamples,
-    )
-
-
 @with_filelock
 def git_clone(clone_path, commitish):
     """Clone the repository at `commit`.
diff --git a/tests/host_tools/metrics.py b/tests/host_tools/metrics.py
index 6a2698d8fa2..41c58992a71 100644
--- a/tests/host_tools/metrics.py
+++ b/tests/host_tools/metrics.py
@@ -45,11 +45,8 @@
 import asyncio
 import json
 import os
-import socket
 from pathlib import Path
-from urllib.parse import urlparse
 
-from aws_embedded_metrics.constants import DEFAULT_NAMESPACE
 from aws_embedded_metrics.logger.metrics_logger_factory import create_metrics_logger
 
 
@@ -112,110 +109,3 @@ def get_metrics_logger():
     else:
         logger = None
     return MetricsWrapper(logger)
-
-
-def emit_raw_emf(emf_msg: dict):
-    """Emites a raw EMF log message to the local cloudwatch agent"""
-    if "AWS_EMF_AGENT_ENDPOINT" not in os.environ:
-        return
-
-    namespace = os.environ.get("AWS_EMF_NAMESPACE", DEFAULT_NAMESPACE)
-    emf_msg["_aws"]["LogGroupName"] = os.environ.get(
-        "AWS_EMF_LOG_GROUP_NAME", f"{namespace}-metrics"
-    )
-    emf_msg["_aws"]["LogStreamName"] = os.environ.get("AWS_EMF_LOG_STREAM_NAME", "")
-    for metrics in emf_msg["_aws"]["CloudWatchMetrics"]:
-        metrics["Namespace"] = namespace
-
-    emf_endpoint = urlparse(os.environ["AWS_EMF_AGENT_ENDPOINT"])
-    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
-        sock.sendto(
-            (json.dumps(emf_msg) + "\n").encode("utf-8"),
-            (emf_endpoint.hostname, emf_endpoint.port),
-        )
-
-
-UNIT_REDUCTIONS = {
-    "Microseconds": "Milliseconds",
-    "Milliseconds": "Seconds",
-    "Bytes": "Kilobytes",
-    "Kilobytes": "Megabytes",
-    "Megabytes": "Gigabytes",
-    "Gigabytes": "Terabytes",
-    "Bits": "Kilobits",
-    "Kilobits": "Megabits",
-    "Megabits": "Gigabits",
-    "Gigabits": "Terabit",
-    "Bytes/Second": "Kilobytes/Second",
-    "Kilobytes/Second": "Megabytes/Second",
-    "Megabytes/Second": "Gigabytes/Second",
-    "Gigabytes/Second": "Terabytes/Second",
-    "Bits/Second": "Kilobits/Second",
-    "Kilobits/Second": "Megabits/Second",
-    "Megabits/Second": "Gigabits/Second",
-    "Gigabits/Second": "Terabits/Second",
-}
-INV_UNIT_REDUCTIONS = {v: k for k, v in UNIT_REDUCTIONS.items()}
-
-
-UNIT_SHORTHANDS = {
-    "Seconds": "s",
-    "Microseconds": "μs",
-    "Milliseconds": "ms",
-    "Bytes": "B",
-    "Kilobytes": "KB",
-    "Megabytes": "MB",
-    "Gigabytes": "GB",
-    "Terabytes": "TB",
-    "Bits": "Bit",
-    "Kilobits": "KBit",
-    "Megabits": "MBit",
-    "Gigabits": "GBit",
-    "Terabits": "TBit",
-    "Percent": "%",
-    "Count": "",
-    "Bytes/Second": "B/s",
-    "Kilobytes/Second": "KB/s",
-    "Megabytes/Second": "MB/s",
-    "Gigabytes/Second": "GB/s",
-    "Terabytes/Second": "TB/s",
-    "Bits/Second": "Bit/s",
-    "Kilobits/Second": "KBit/s",
-    "Megabits/Second": "MBit/s",
-    "Gigabits/Second": "GBit/s",
-    "Terabits/Second": "TBit/s",
-    "Count/Second": "Hz",
-    "None": "",
-}
-
-
-def reduce_value(value, unit):
-    """
-    Utility function for expressing a value in the largest possible unit in which it would still be >= 1
-
-    For example, `reduce_value(1_000_000, Bytes)` would return (1, Megabytes)
-    """
-    # Could do this recursively, but I am worried about infinite recursion
-    # due to precision problems (e.g. infinite loop of dividing/multiplying by 1000, alternating
-    # between values < 1 and >= 1000).
-    while abs(value) < 1 and unit in INV_UNIT_REDUCTIONS:
-        value *= 1000
-        unit = INV_UNIT_REDUCTIONS[unit]
-    while abs(value) >= 1000 and unit in UNIT_REDUCTIONS:
-        value /= 1000
-        unit = UNIT_REDUCTIONS[unit]
-
-    return value, unit
-
-
-def format_with_reduced_unit(value, unit):
-    """
-    Utility function for pretty printing a given value by choosing a unit as large as possible,
-    and then outputting its shorthand.
-
-    For example, `format_with_reduced_unit(1_000_000, Bytes)` would return "1MB".
-    """
-    reduced_value, reduced_unit = reduce_value(value, unit)
-    formatted_unit = UNIT_SHORTHANDS.get(reduced_unit, reduced_unit)
-
-    return f"{reduced_value:.2f}{formatted_unit}"
diff --git a/tools/ab_test.py b/tools/ab_test.py
index 7f44064ffde..54277eb1ec4 100755
--- a/tools/ab_test.py
+++ b/tools/ab_test.py
@@ -6,40 +6,113 @@
 The script takes two git revisions and a pytest integration test. It utilizes
 our integration test frameworks --binary-dir parameter to execute the given
-test using binaries compiled from each revision, and captures the EMF logs
-output. It the searches for list-valued properties/metrics in the EMF, and runs a
-regression test comparing these lists for the two runs.
+test using binaries compiled from each revision, and runs a regression test
+comparing the metrics that result from the two runs.
 
 It performs the A/B-test as follows:
 
-For each EMF log message output, look at the dimensions. The script assumes that
-dimensions are unique across all log messages output from a single test run. In
-each log message, then look for all properties that have lists assigned to them,
-and collect them. For both runs of the test, the set of distinct dimensions
-collected this way must be the same. Then, we match corresponding dimensions
-between the two runs, performing statistical regression test across all the list-
-valued properties collected.
+For both the A and B runs, collect all `metrics.json` files and read the
+dimensions from them. The script assumes that dimensions are unique within a
+single run and that both runs produce the same set of dimensions. It then runs
+a statistical regression test across all the list-valued properties collected.
 """
 
 import argparse
+import glob
 import json
 import os
 import statistics
 import subprocess
-import sys
 from collections import defaultdict
 from pathlib import Path
+from typing import Callable, List, TypeVar
+
+import scipy
+
+UNIT_REDUCTIONS = {
+    "Microseconds": "Milliseconds",
+    "Milliseconds": "Seconds",
+    "Bytes": "Kilobytes",
+    "Kilobytes": "Megabytes",
+    "Megabytes": "Gigabytes",
+    "Gigabytes": "Terabytes",
+    "Bits": "Kilobits",
+    "Kilobits": "Megabits",
+    "Megabits": "Gigabits",
+    "Gigabits": "Terabit",
+    "Bytes/Second": "Kilobytes/Second",
+    "Kilobytes/Second": "Megabytes/Second",
+    "Megabytes/Second": "Gigabytes/Second",
+    "Gigabytes/Second": "Terabytes/Second",
+    "Bits/Second": "Kilobits/Second",
+    "Kilobits/Second": "Megabits/Second",
+    "Megabits/Second": "Gigabits/Second",
+    "Gigabits/Second": "Terabits/Second",
+}
+INV_UNIT_REDUCTIONS = {v: k for k, v in UNIT_REDUCTIONS.items()}
+
+
+UNIT_SHORTHANDS = {
+    "Seconds": "s",
+    "Microseconds": "μs",
+    "Milliseconds": "ms",
+    "Bytes": "B",
+    "Kilobytes": "KB",
+    "Megabytes": "MB",
+    "Gigabytes": "GB",
+    "Terabytes": "TB",
+    "Bits": "Bit",
+    "Kilobits": "KBit",
+    "Megabits": "MBit",
+    "Gigabits": "GBit",
+    "Terabits": "TBit",
+    "Percent": "%",
+    "Count": "",
+    "Bytes/Second": "B/s",
+    "Kilobytes/Second": "KB/s",
+    "Megabytes/Second": "MB/s",
+    "Gigabytes/Second": "GB/s",
+    "Terabytes/Second": "TB/s",
+    "Bits/Second": "Bit/s",
+    "Kilobits/Second": "KBit/s",
+    "Megabits/Second": "MBit/s",
+    "Gigabits/Second": "GBit/s",
+    "Terabits/Second": "TBit/s",
+    "Count/Second": "Hz",
+    "None": "",
+}
+
+
+def reduce_value(value, unit):
+    """
+    Utility function for expressing a value in the largest possible unit in which it would still be >= 1
+
+    For example, `reduce_value(1_000_000, Bytes)` would return (1, Megabytes)
+    """
+    # Could do this recursively, but I am worried about infinite recursion
+    # due to precision problems (e.g. infinite loop of dividing/multiplying by 1000, alternating
+    # between values < 1 and >= 1000).
+    while abs(value) < 1 and unit in INV_UNIT_REDUCTIONS:
+        value *= 1000
+        unit = INV_UNIT_REDUCTIONS[unit]
+    while abs(value) >= 1000 and unit in UNIT_REDUCTIONS:
+        value /= 1000
+        unit = UNIT_REDUCTIONS[unit]
+
+    return value, unit
+
+
+def format_with_reduced_unit(value, unit):
+    """
+    Utility function for pretty printing a given value by choosing a unit as large as possible,
+    and then outputting its shorthand.
+
+    For example, `format_with_reduced_unit(1_000_000, Bytes)` would return "1MB".
+    """
+    reduced_value, reduced_unit = reduce_value(value, unit)
+    formatted_unit = UNIT_SHORTHANDS.get(reduced_unit, reduced_unit)
 
-# Hack to be able to use our test framework code
-sys.path.append(str(Path(__file__).parent.parent / "tests"))
+    return f"{reduced_value:.2f}{formatted_unit}"
 
-# pylint:disable=wrong-import-position
-from framework.ab_test import binary_ab_test, check_regression
-from framework.properties import global_props
-from host_tools.metrics import (
-    emit_raw_emf,
-    format_with_reduced_unit,
-    get_metrics_logger,
-)
 
 # Performance tests that are known to be unstable and exhibit variances of up to 60% of the mean
 IGNORED = [
@@ -69,106 +142,37 @@ def is_ignored(dimensions) -> bool:
     return False
 
 
-def extract_dimensions(emf):
-    """Extracts the cloudwatch dimensions from an EMF log message"""
-    if not emf["_aws"]["CloudWatchMetrics"][0]["Dimensions"]:
-        # Skipped tests emit a duration metric, but have no dimensions set
-        return {}
-
-    dimension_list = [
-        dim
-        for dimensions in emf["_aws"]["CloudWatchMetrics"][0]["Dimensions"]
-        for dim in dimensions
-    ]
-    return {key: emf[key] for key in emf if key in dimension_list}
-
-
-def process_log_entry(emf: dict):
-    """Parses the given EMF log entry
-
-    Returns the entries dimensions and its list-valued properties/metrics, together with their units
-    """
-    result = {
-        key: (value, find_unit(emf, key))
-        for key, value in emf.items()
-        if (
-            "fc_metrics" not in key
-            and "cpu_utilization" not in key
-            and isinstance(value, list)
-        )
-    }
-    # Since we don't consider metrics having fc_metrics in key
-    # result could be empty so, return empty dimensions as well
-    if not result:
-        return {}, {}
-
-    return extract_dimensions(emf), result
-
-
-def find_unit(emf: dict, metric: str):
-    """Determines the unit of the given metric"""
-    metrics = {
-        y["Name"]: y["Unit"] for y in emf["_aws"]["CloudWatchMetrics"][0]["Metrics"]
-    }
-    return metrics.get(metric, "None")
-
-
-def load_data_series(report_path: Path, tag=None, *, reemit: bool = False):
-    """Loads the data series relevant for A/B-testing from test_results/test-report.json
-    into a dictionary mapping each message's cloudwatch dimensions to a dictionary of
-    its list-valued properties/metrics.
-
-    If `reemit` is True, it also reemits all EMF logs to a local EMF agent,
-    overwriting the attached "git_commit_id" field with the given revision."""
-    # Dictionary mapping EMF dimensions to A/B-testable metrics/properties
-    processed_emf = {}
-
-    report = json.loads(report_path.read_text("UTF-8"))
-    for test in report["tests"]:
-        for line in test["teardown"]["stdout"].splitlines():
-            # Only look at EMF log messages. If we ever have other stdout that starts with braces,
-            # we will need to rethink this heuristic.
-            if line.startswith("{"):
-                emf = json.loads(line)
-
-                if reemit:
-                    assert tag is not None
+def load_data_series(data_path: Path):
+    """Recursively collects `metrics.json` files under the provided path"""
+    data = {}
+    for name in glob.glob(f"{data_path}/**/metrics.json", recursive=True):
+        with open(name, encoding="utf-8") as f:
+            j = json.load(f)
-                    emf["git_commit_id"] = str(tag)
-                    emit_raw_emf(emf)
+        metrics = j["metrics"]
+        dimensions = frozenset(j["dimensions"].items())
-                dimensions, result = process_log_entry(emf)
+        data[dimensions] = {}
+        for m in metrics:
+            # Ignore certain metrics as we know them to be volatile
+            if "cpu_utilization" in m:
+                continue
+            mm = metrics[m]
+            unit = mm["unit"]
+            values = mm["values"]
+            data[dimensions][m] = (values, unit)
-                if not dimensions:
-                    continue
+    return data
-
-                dimension_set = frozenset(dimensions.items())
-                if dimension_set not in processed_emf:
-                    processed_emf[dimension_set] = result
-                else:
-                    # If there are many data points for a metric, they will be split across
-                    # multiple EMF log messages. We need to reassemble :(
-                    assert (
-                        processed_emf[dimension_set].keys() == result.keys()
-                    ), f"Found incompatible metrics associated with dimension set {dimension_set}: {processed_emf[dimension_set].keys()} in one EMF message, but {result.keys()} in another."
-
-                    for metric, (values, unit) in processed_emf[dimension_set].items():
-                        assert result[metric][1] == unit
-
-                        values.extend(result[metric][0])
-
-    return processed_emf
-
-
-def uninteresting_dimensions(processed_emf):
+def uninteresting_dimensions(data):
     """
-    Computes the set of cloudwatch dimensions that only ever take on a
+    Computes the set of dimensions that only ever take on a
     single value across the entire dataset.
     """
     values_per_dimension = defaultdict(set)
-    for dimension_set in processed_emf:
+    for dimension_set in data:
         for dimension, value in dimension_set:
             values_per_dimension[dimension].add(value)
@@ -189,7 +193,8 @@ def collect_data(tag: str, binary_dir: Path, pytest_opts: str):
     binary_dir = binary_dir.resolve()
 
     print(f"Collecting samples with {binary_dir}")
-    test_report_path = f"test_results/{tag}/test-report.json"
+    test_path = f"test_results/{tag}"
+    test_report_path = f"{test_path}/test-report.json"
     subprocess.run(
         f"./tools/test.sh --binary-dir={binary_dir} {pytest_opts} -m '' --json-report-file=../{test_report_path}",
         env=os.environ
@@ -201,12 +206,38 @@ def collect_data(tag: str, binary_dir: Path, pytest_opts: str):
         shell=True,
     )
 
-    return load_data_series(Path(test_report_path), binary_dir, reemit=True)
+    return load_data_series(Path(test_path))
+
+
+def check_regression(
+    a_samples: List[float], b_samples: List[float], *, n_resamples: int = 9999
+):
+    """Checks for a regression by performing a permutation test. A permutation test is a non-parametric test that takes
+    three parameters: Two populations (sets of samples) and a function computing a "statistic" based on two populations.
+    First, the test computes the statistic for the initial populations. It then randomly
+    permutes the two populations (e.g. merges them and then randomly splits them again). For each such permuted
+    population, the statistic is computed. Then, all the statistics are sorted, and the percentile of the statistic for the
+    initial populations is computed. We then look at the fraction of statistics that are larger/smaller than that of the
+    initial populations. The minimum of these two fractions will then become the p-value.
+
+    The idea is that if the two populations are indeed drawn from the same distribution (e.g. if performance did not
+    change), then permuting will not affect the statistic (indeed, it should be approximately normal-distributed, and
+    the statistic for the initial populations will be somewhere "in the middle").
+
+    Useful for performance tests.
+    """
+    return scipy.stats.permutation_test(
+        (a_samples, b_samples),
+        # Compute the difference of means, such that a positive difference indicates potential for regression.
+        lambda x, y: statistics.mean(y) - statistics.mean(x),
+        vectorized=False,
+        n_resamples=n_resamples,
+    )
 
 
 def analyze_data(
-    processed_emf_a,
-    processed_emf_b,
+    data_a,
+    data_b,
     p_thresh,
     strength_abs_thresh,
     noise_threshold,
@@ -219,20 +250,15 @@ def analyze_data(
 
     Returns a mapping of dimensions and properties/metrics to the result of their regression test.
     """
-    assert set(processed_emf_a.keys()) == set(
-        processed_emf_b.keys()
+    assert set(data_a.keys()) == set(
+        data_b.keys()
     ), "A and B run produced incomparable data. This is a bug in the test!"
 
     results = {}
 
-    metrics_logger = get_metrics_logger()
-
-    for prop_name, prop_val in global_props.__dict__.items():
-        metrics_logger.set_property(prop_name, prop_val)
-
-    for dimension_set in processed_emf_a:
-        metrics_a = processed_emf_a[dimension_set]
-        metrics_b = processed_emf_b[dimension_set]
+    for dimension_set in data_a:
+        metrics_a = data_a[dimension_set]
+        metrics_b = data_b[dimension_set]
 
         assert set(metrics_a.keys()) == set(
             metrics_b.keys()
@@ -242,14 +268,6 @@ def analyze_data(
             result = check_regression(
                 values_a, metrics_b[metric][0], n_resamples=n_resamples
             )
-
-            metrics_logger.set_dimensions({"metric": metric, **dict(dimension_set)})
-            metrics_logger.put_metric("p_value", float(result.pvalue), "None")
-            metrics_logger.put_metric("mean_difference", float(result.statistic), unit)
-            metrics_logger.set_property("data_a", values_a)
-            metrics_logger.set_property("data_b", metrics_b[metric][0])
-            metrics_logger.flush()
-
             results[dimension_set, metric] = (result, unit)
 
     # We sort our A/B-Testing results keyed by metric here. The resulting lists of values
@@ -296,7 +314,7 @@ def analyze_data(
 
             print(f"Doing A/B-test for dimensions {dimension_set} and property {metric}")
 
-            values_a = processed_emf_a[dimension_set][metric][0]
+            values_a = data_a[dimension_set][metric][0]
             baseline_mean = statistics.mean(values_a)
 
             relative_changes_by_metric[metric].append(result.statistic / baseline_mean)
@@ -309,7 +327,7 @@ def analyze_data(
     )
 
     messages = []
-    do_not_print_list = uninteresting_dimensions(processed_emf_a)
+    do_not_print_list = uninteresting_dimensions(data_a)
     for dimension_set, metric, result, unit in failures:
         # Sanity check as described above
         if abs(statistics.mean(relative_changes_by_metric[metric])) <= noise_threshold:
@@ -321,8 +339,8 @@ def analyze_data(
 
         # The significant data points themselves are above the noise threshold
         if abs(statistics.mean(relative_changes_significant[metric])) > noise_threshold:
-            old_mean = statistics.mean(processed_emf_a[dimension_set][metric][0])
-            new_mean = statistics.mean(processed_emf_b[dimension_set][metric][0])
+            old_mean = statistics.mean(data_a[dimension_set][metric][0])
+            new_mean = statistics.mean(data_b[dimension_set][metric][0])
 
             msg = (
                 f"\033[0;32m[Firecracker A/B-Test Runner]\033[0m A/B-testing shows a change of "
@@ -339,6 +357,27 @@ def analyze_data(
     print("No regressions detected!")
 
 
+T = TypeVar("T")
+U = TypeVar("U")
+
+
+def binary_ab_test(
+    test_runner: Callable[[Path, bool], T],
+    comparator: Callable[[T, T], U],
+    *,
+    a_directory: Path,
+    b_directory: Path,
+):
+    """
+    Similar to `git_ab_test`, but instead of locally checking out different revisions, it operates on
+    directories containing firecracker/jailer binaries
+    """
+    result_a = test_runner(a_directory, True)
+    result_b = test_runner(b_directory, False)
+
+    return result_a, result_b, comparator(result_a, result_b)
+
+
 def ab_performance_test(
     a_revision: Path,
     b_revision: Path,
@@ -393,13 +432,13 @@ def ab_performance_test(
         help="Analyze the results of two manually ran tests based on their test-report.json files",
     )
     analyze_parser.add_argument(
-        "report_a",
-        help="The path to the test-report.json file of the baseline run",
+        "path_a",
+        help="The path to the directory with the results of the A run",
         type=Path,
     )
     analyze_parser.add_argument(
-        "report_b",
-        help="The path to the test-report.json file of the run whose performance we want to compare against report_a",
+        "path_b",
+        help="The path to the directory with the results of the B run",
         type=Path,
     )
     parser.add_argument(
@@ -432,8 +471,8 @@ def ab_performance_test(
             args.noise_threshold,
         )
     else:
-        data_a = load_data_series(args.report_a)
-        data_b = load_data_series(args.report_b)
+        data_a = load_data_series(args.path_a)
+        data_b = load_data_series(args.path_b)
 
         analyze_data(
             data_a,