diff --git a/src/tests/ftest/pool/verify_dtx.py b/src/tests/ftest/pool/verify_dtx.py new file mode 100644 index 00000000000..fd039adec76 --- /dev/null +++ b/src/tests/ftest/pool/verify_dtx.py @@ -0,0 +1,149 @@ +""" + (C) Copyright 2025-2026 Hewlett Packard Enterprise Development LP + + SPDX-License-Identifier: BSD-2-Clause-Patent +""" +import json +import math + +from job_manager_utils import get_job_manager +from mdtest_utils import MDTEST_NAMESPACE, run_mdtest +from telemetry_test_base import TestWithTelemetry + + +class VerifyDTX(TestWithTelemetry): + """ + Ensures DTX is involved with MD on SSD phase 2 pool. + + :avocado: recursive + """ + + def test_verify_dtx(self): + """Ensure DTX is involved with MD on SSD phase 2 pool. + + 1. Create a pool with a mem ratio of 100% (for pmem or phase 1) or 25% (for phase 2) + 2. Collect a baseline for the DTX metrics + 3. Run mdtest -a DFS to write data with different object classes + 4. Collect new DTX metrics + 5. Verify DTX metrics + + :avocado: tags=all,daily_regression + :avocado: tags=hw,large + :avocado: tags=pool + :avocado: tags=VerifyDTX,test_verify_dtx + """ + write_bytes = self.params.get('write_bytes', MDTEST_NAMESPACE, None) + processes = self.params.get('processes', MDTEST_NAMESPACE, None) + ppn = self.params.get('ppn', MDTEST_NAMESPACE, None) + object_classes = self.params.get('object_classes', '/run/*') + + dtx_metrics = list(self.telemetry.ENGINE_POOL_VOS_CACHE_METRICS[:1]) + dtx_metrics += list(self.telemetry.ENGINE_IO_DTX_COMMITTED_METRICS) + + self.log_step('Creating a pool (dmg pool create)') + pool = self.get_pool(connect=False) + try: + _result = json.loads(pool.dmg.result.stdout) + tier_bytes_scm = int(_result['response']['tier_bytes'][0]) + mem_file_bytes = int(_result['response']['mem_file_bytes']) + except Exception as error: # pylint: disable=broad-except + self.fail(f'Error extracting data for dmg pool create output: {error}') + + # Calculate the mdtest files_per_process based upon the scm size and other mdtest params + _write_procs = processes + _mdtest_cmds = len(object_classes) + if ppn is not None: + _write_procs = ppn * len(self.host_info.clients.hosts) + files_per_process = math.floor(mem_file_bytes / (write_bytes * _write_procs * _mdtest_cmds)) + if tier_bytes_scm > mem_file_bytes: + # Write more (125%) files to exceed mem_file_bytes and cause eviction + num_of_files_dirs = math.ceil(files_per_process * 1.25) + else: + # Write less (75%) files to avoid out of space errors + num_of_files_dirs = math.floor(files_per_process * 0.75) + + self.log.debug("-" * 60) + self.log.debug("Pool %s create data:", pool) + self.log.debug(" tier_bytes_scm: %s", tier_bytes_scm) + self.log.debug(" mem_file_bytes: %s", mem_file_bytes) + self.log.debug(" mem_ratio.value: %s", pool.mem_ratio.value) + self.log.debug("Mdtest write parameters:") + self.log.debug(" write_bytes per mdtest: %s", write_bytes) + if ppn is not None: + self.log.debug(" ppn / nodes: %s / %s", + ppn, len(self.host_info.clients.hosts)) + else: + self.log.debug(" processes: %s", processes) + self.log.debug(" files_per_process per mtest: %s", files_per_process) + self.log.debug(" Number of mdtest commands: %s", _mdtest_cmds) + self.log.debug(" num_of_files_dirs per mdtest: %s", num_of_files_dirs) + self.log.debug(" total expected to write: %s", + _mdtest_cmds * _write_procs * write_bytes * num_of_files_dirs) + self.log.debug("-" * 60) + + self.log_step('Collect DTX metrics after creating a pool (dmg telemetry metrics query)') + expected_ranges = self.telemetry.collect_data(dtx_metrics) + for metric in sorted(expected_ranges): + for label in expected_ranges[metric]: + expected_ranges[metric][label] = [0, 0] # 0 only + if pool.mem_ratio.value is not None: + if metric.endswith('_dtx_committed_max'): + expected_ranges[metric][label] = [0] # 0 or greater (phase 2) + elif metric.endswith('_dtx_committed_mean'): + expected_ranges[metric][label] = [0] # 0 or greater (phase 2) + elif metric.endswith('_dtx_committed_samples'): + expected_ranges[metric][label] = [0] # 0 or greater (phase 2) + elif metric.endswith('_dtx_committed_stddev'): + expected_ranges[metric][label] = [0] # 0 or greater (phase 2) + elif metric.endswith('_dtx_committed_sum'): + expected_ranges[metric][label] = [0] # 0 or greater (phase 2) + elif metric.endswith('_dtx_committed_sumsquares'): + expected_ranges[metric][label] = [0] # 0 or greater (phase 2) + self.log.debug('%s expected_ranges: %s', pool, expected_ranges) + + self.log_step('Verify DTX metrics after pool creation') + if not self.telemetry.verify_data(expected_ranges): + self.fail('DTX metrics verification failed after pool creation') + + manager = get_job_manager(self, subprocess=False, timeout=None) + processes = self.params.get('processes', MDTEST_NAMESPACE, None) + ppn = self.params.get('ppn', MDTEST_NAMESPACE, None) + for oclass in object_classes: + self.log_step(f'Write data into a containers with the {oclass} object classes (mdtest)') + container = self.get_container(pool, oclass=oclass, dir_oclass=oclass) + run_mdtest( + self, self.hostlist_clients, self.workdir, None, container, processes, ppn, manager, + mdtest_params={'dfs_oclass': oclass, 'dfs_dir_oclass': oclass, + 'num_of_files_dirs': num_of_files_dirs}) + + self.log_step('Collect DTX metrics after writing data (dmg telemetry metrics query)') + expected_ranges = self.telemetry.collect_data(dtx_metrics) + for metric in sorted(expected_ranges): + for label in expected_ranges[metric]: + if metric.endswith('_dtx_committed'): + expected_ranges[metric][label] = [0] # 0 or greater + elif metric.endswith('_dtx_committed_max'): + expected_ranges[metric][label] = [100] # 100 or greater + elif metric.endswith('_dtx_committed_mean'): + expected_ranges[metric][label] = [50] # 50 or greater + elif metric.endswith('_dtx_committed_min'): + expected_ranges[metric][label] = [0] # 0 or greater + elif metric.endswith('_dtx_committed_sum'): + expected_ranges[metric][label] = [1000] # 1000 or greater + elif metric.endswith('_dtx_committed_sumsquares'): + expected_ranges[metric][label] = [100000] # 100,000 or greater + elif metric.endswith('_vos_cache_page_evict'): + if pool.mem_ratio.value is None: + expected_ranges[metric][label] = [0, 0] # 0 only (phase 1) + else: + expected_ranges[metric][label] = [1] # 1 or greater (phase 2) + else: + # e.g. *_dtx_committed_samples, *_dtx_committed_stddev + expected_ranges[metric][label] = [1] # 1 or greater + self.log.debug('%s expected_ranges: %s', pool, expected_ranges) + + self.log_step('Verify DTX metrics after writing data') + if not self.telemetry.verify_data(expected_ranges): + self.fail('DTX metrics verification failed after writing data') + + self.log_step('Test passed') diff --git a/src/tests/ftest/pool/verify_dtx.yaml b/src/tests/ftest/pool/verify_dtx.yaml new file mode 100644 index 00000000000..9ea91164f40 --- /dev/null +++ b/src/tests/ftest/pool/verify_dtx.yaml @@ -0,0 +1,64 @@ +launch: + !filter-only : /run/pool/default # yamllint disable-line rule:colons + +hosts: + test_servers: 5 + test_clients: 3 + +timeout: 16000 + +server_config: + name: daos_server + engines_per_host: 2 + engines: + 0: + pinned_numa_node: 0 + nr_xs_helpers: 0 + log_file: daos_server0.log + storage: auto + 1: + pinned_numa_node: 1 + nr_xs_helpers: 0 + log_file: daos_server1.log + storage: auto + +pool: !mux + default: + size: 100% + md_on_ssd_p2: + size: 100% + mem_ratio: 25 + +container: + type: POSIX + +mdtest: + dfs_destroy: False + manager: "MPICH" + ppn: 32 + test_dir: "/" + api: DFS + flags: "-C -F -G 27 -N 1 -Y -u -L" + branching_factor: 1 + write_bytes: 1024 + read_bytes: 1024 + +object_classes: + - "EC_2P1G1" + - "EC_2P1G8" + - "EC_2P1GX" + - "EC_2P2G1" + - "EC_2P2G8" + - "EC_2P2GX" + - "EC_4P1G1" + - "EC_4P1G8" + - "EC_4P1GX" + - "EC_4P2G1" + - "EC_4P2G8" + - "EC_4P2GX" + - "EC_4P3G1" + - "EC_4P3G8" + - "EC_4P3GX" + - "EC_8P2G1" + - "EC_8P2G8" + - "EC_8P2GX" diff --git a/src/tests/ftest/util/mdtest_utils.py b/src/tests/ftest/util/mdtest_utils.py index 97e5d75d088..f7d7d6e7513 100644 --- a/src/tests/ftest/util/mdtest_utils.py +++ b/src/tests/ftest/util/mdtest_utils.py @@ -1,5 +1,6 @@ """ (C) Copyright 2019-2024 Intel Corporation. + (C) Copyright 2025-2026 Hewlett Packard Enterprise Development LP SPDX-License-Identifier: BSD-2-Clause-Patent """ @@ -8,20 +9,115 @@ import re from command_utils import ExecutableCommand -from command_utils_base import FormattedParameter, LogParameter +from command_utils_base import BasicParameter, FormattedParameter, LogParameter +from exception_utils import CommandFailure from general_utils import get_log_file +from job_manager_utils import get_job_manager + +MDTEST_NAMESPACE = "/run/mdtest/*" + + +def get_mdtest(test, hosts, manager=None, path=None, slots=None, namespace=MDTEST_NAMESPACE, + mdtest_params=None): + """Get a Mdtest object. + + Args: + test (Test): avocado Test object + hosts (NodeSet): hosts on which to run the mdtest command + manager (JobManager, optional): command to manage the multi-host execution of mdtest. + Defaults to None, which will get a default job manager. + path (str, optional): hostfile path. Defaults to None. + slots (int, optional): hostfile number of slots per host. Defaults to None. + namespace (str, optional): path to yaml parameters. Defaults to MDTEST_NAMESPACE. + mdtest_params (dict, optional): parameters to update the mdtest command. Defaults to None. + + Returns: + Mdtest: the Mdtest object requested + """ + mdtest = Mdtest(test, hosts, manager, path, slots, namespace) + if mdtest_params: + for name, value in mdtest_params.items(): + mdtest.update(name, value) + return mdtest + + +def run_mdtest(test, hosts, path, slots, container, processes, ppn=None, manager=None, + log_file=None, intercept=None, display_space=True, namespace=MDTEST_NAMESPACE, + mdtest_params=None): + # pylint: disable=too-many-arguments + """Run Mdtest on multiple hosts. + + Args: + test (Test): avocado Test object + hosts (NodeSet): hosts on which to run the mdtest command + path (str): hostfile path. + slots (int): hostfile number of slots per host. + container (TestContainer): DAOS test container object. + processes (int): number of processes to run + ppn (int, optional): number of processes per node to run. If specified it will override + the processes input. Defaults to None. + manager (JobManager, optional): command to manage the multi-host execution of mdtest. + Defaults to None, which will get a default job manager. + log_file (str, optional): log file name. Defaults to None, which will result in a log file + name containing the test, pool, and container IDs. + intercept (str, optional): path to interception library. Defaults to None. + display_space (bool, optional): Whether to display the pool space. Defaults to True. + namespace (str, optional): path to yaml parameters. Defaults to MDTEST_NAMESPACE. + mdtest_params (dict, optional): dictionary of MdtestCommand attributes to override from + get_params(). Defaults to None. + + Raises: + CommandFailure: if there is an error running the mdtest command + + Returns: + CmdResult: result of the ior command + + """ + mdtest = get_mdtest(test, hosts, manager, path, slots, namespace, mdtest_params) + if log_file is None: + log_file = mdtest.get_unique_log(container) + mdtest.update_log_file(log_file) + return mdtest.run(container, processes, ppn, intercept, display_space) + + +def write_mdtest_data(test, container, namespace=MDTEST_NAMESPACE, **mdtest_run_params): + """Write data to the container/dfuse using mdtest. + + Simple method for test classes to use to write data with mdtest. While not required, this is + setup by default to pull in mdtest parameters from the test yaml. + + Args: + test (Test): avocado Test object + container (TestContainer): the container to populate + namespace (str, optional): path to mdtest yaml parameters. Defaults to MDTEST_NAMESPACE. + mdtest_run_params (dict): optional params for the Mdtest.run() command. + + Returns: + Mdtest: the Mdtest object used to populate the container + """ + mdtest = get_mdtest(test, test.hostlist_clients, None, test.workdir, None, namespace) + mdtest.update_log_file(mdtest.get_unique_log(container)) + + if 'processes' not in mdtest_run_params: + mdtest_run_params['processes'] = test.params.get('processes', namespace, None) + elif 'ppn' not in mdtest_run_params: + mdtest_run_params['ppn'] = test.params.get('ppn', namespace, None) + + mdtest.run(container, **mdtest_run_params) + return mdtest class MdtestCommand(ExecutableCommand): """Defines a object representing a mdtest command.""" - def __init__(self, log_dir): + def __init__(self, log_dir, namespace="/run/mdtest/*"): """Create an MdtestCommand object. Args: log_dir (str): directory in which to put log files + namespace (str, optional): path to yaml parameters. Defaults to "/run/mdtest/*". """ - super().__init__("/run/mdtest/*", "mdtest") + super().__init__(namespace, "mdtest") self._log_dir = log_dir @@ -137,6 +233,147 @@ def get_default_env(self, manager_cmd, log_file=None): return env +class Mdtest: + """Defines a class that runs the mdtest command through a job manager, e.g. mpirun.""" + + def __init__(self, test, hosts, manager=None, path=None, slots=None, + namespace=MDTEST_NAMESPACE): + """Initialize an Mdtest object. + + Args: + test (Test): avocado Test object + hosts (NodeSet): hosts on which to run the mdtest command + manager (JobManager, optional): command to manage the multi-host execution of mdtest. + Defaults to None, which will get a default job manager. + path (str, optional): hostfile path. Defaults to None. + slots (int, optional): hostfile number of slots per host. Defaults to None. + namespace (str, optional): path to yaml parameters. Defaults to MDTEST_NAMESPACE. + """ + if manager is None: + manager = get_job_manager(test, subprocess=False, timeout=60) + self.manager = manager + self.manager.assign_hosts(hosts, path, slots) + self.manager.job = MdtestCommand(test.test_env.log_dir, namespace) + self.manager.job.get_params(test) + self.manager.output_check = "both" + self.timeout = test.params.get("timeout", namespace, None) + self.label_generator = test.label_generator + self.test_id = test.test_id + self.env = self.command.get_default_env(str(self.manager)) + + @property + def command(self): + """Get the MdtestCommand object. + + Returns: + MdtestCommand: the MdtestCommand object managed by the JobManager + + """ + return self.manager.job + + def update(self, name, value): + """Update a MdtestCommand BasicParameter with a new value. + + Args: + name (str): name of the MdtestCommand BasicParameter to update + value (str): value to assign to the MdtestCommand BasicParameter + """ + param = getattr(self.command, name, None) + if param: + if isinstance(param, BasicParameter): + param.update(value, ".".join([self.command.command, name])) + + def update_log_file(self, log_file): + """Update the log file for the mdtest command. + + Args: + log_file (str): new mdtest log file + """ + self.command.env["D_LOG_FILE"] = get_log_file( + log_file or f"{self.command.command}_daos.log") + + def get_unique_log(self, container): + """Get a unique mdtest log file name. + + Args: + container (TestContainer): container involved with the command + + Returns: + str: a log file name + """ + label = self.label_generator.get_label("mdtest") + parts = [self.test_id, container.pool.identifier, container.identifier, label] + return '.'.join(['_'.join(parts), 'log']) + + def update_daos_params(self, pool, container): + """Set the mdtest parameters for the pool and container. + + Optionally also set the DAOS pool and container environment variables for mdtest. + + Args: + pool (TestPool): the pool to use with the mdtest command + container (TestContainer): the container to use with the mdtest command + """ + self.command.update_params(dfs_pool=pool.identifier, dfs_cont=container.identifier) + + if "mpirun" in str(self.manager) or "srun" in str(self.manager): + self.env["DAOS_POOL"] = self.command.dfs_pool.value + self.env["DAOS_CONT"] = self.command.dfs_cont.value + self.env["IOR_HINT__MPI__romio_daos_obj_class"] = self.command.dfs_oclass.value + + def run(self, container, processes, ppn=None, intercept=None, display_space=True): + """Run mdtest. + + Args: + container (TestContainer): DAOS test container object. + processes (int): number of processes to run + ppn (int, optional): number of processes per node to run. If specified it will override + the processes input. Defaults to None. + intercept (str, optional): path to interception library. Defaults to None. + display_space (bool, optional): Whether to display the pool space. Defaults to True. + + Raises: + CommandFailure: if there is an error running the mdtest command + + Returns: + CmdResult: result of the mdtest command + """ + result = None + error_message = None + + self.update_daos_params(container.pool, container) + + if intercept: + self.env["LD_PRELOAD"] = intercept + if "D_LOG_MASK" not in self.env: + self.env["D_LOG_MASK"] = "INFO" + + # Pass only processes or ppn to be compatible with previous behavior + if ppn is not None: + self.manager.assign_processes(ppn=ppn) + else: + self.manager.assign_processes(processes=processes) + + self.manager.assign_environment(self.env) + + try: + if display_space: + container.pool.display_space() + result = self.manager.run() + + except CommandFailure as error: + error_message = "Mdtest Failed:\n {}".format("\n ".join(str(error).split("\n"))) + + finally: + if not self.manager.run_as_subprocess and display_space: + container.pool.display_space() + + if error_message: + raise CommandFailure(error_message) + + return result + + class MdtestMetrics(): # pylint: disable=too-few-public-methods """Represents metrics from mdtest output. diff --git a/src/tests/ftest/util/telemetry_utils.py b/src/tests/ftest/util/telemetry_utils.py index 8937db87788..dd84dac2fa5 100644 --- a/src/tests/ftest/util/telemetry_utils.py +++ b/src/tests/ftest/util/telemetry_utils.py @@ -1,6 +1,6 @@ """ (C) Copyright 2021-2024 Intel Corporation. -(C) Copyright 2025 Hewlett Packard Enterprise Development LP +(C) Copyright 2025-2026 Hewlett Packard Enterprise Development LP (C) Copyright 2025 Google LLC SPDX-License-Identifier: BSD-2-Clause-Patent @@ -159,6 +159,12 @@ class TelemetryUtils(): "engine_pool_vos_wal_replay_size", "engine_pool_vos_wal_replay_time", "engine_pool_vos_wal_replay_transactions"] + ENGINE_POOL_VOS_CACHE_METRICS = [ + "engine_pool_vos_cache_page_evict", + "engine_pool_vos_cache_page_flush", + "engine_pool_vos_cache_page_hit", + "engine_pool_vos_cache_page_miss", + "engine_pool_vos_cache_page_ne"] ENGINE_POOL_SVC_METRICS = [ "engine_pool_svc_degraded_ranks", "engine_pool_svc_disabled_targets", @@ -179,6 +185,7 @@ class TelemetryUtils(): ENGINE_POOL_VOS_SPACE_METRICS + \ ENGINE_POOL_VOS_WAL_METRICS + \ ENGINE_POOL_VOS_WAL_REPLAY_METRICS +\ + ENGINE_POOL_VOS_CACHE_METRICS +\ ENGINE_POOL_SVC_METRICS ENGINE_EVENT_METRICS = [ "engine_events_dead_ranks", diff --git a/src/vos/vos_internal.h b/src/vos/vos_internal.h index 428051203ed..c3ef2867ebb 100644 --- a/src/vos/vos_internal.h +++ b/src/vos/vos_internal.h @@ -263,7 +263,7 @@ struct vos_cache_metrics { struct d_tm_node_t *vcm_obj_hit; }; -void vos_cache_metrics_init(struct vos_cache_metrics *vc_metrcis, const char *path, int tgt_id); +void vos_cache_metrics_init(struct vos_cache_metrics *vc_metrics, const char *path, int tgt_id); struct vos_pool_metrics { void *vp_vea_metrics;