68 changes: 68 additions & 0 deletions tests/integration-tests/clusters_factory.py
@@ -18,7 +18,9 @@

import boto3
import yaml
from assertpy import assert_that
from framework.credential_providers import run_pcluster_command
from remote_command_executor import RemoteCommandExecutor
from retrying import retry
from time_utils import minutes, seconds
from utils import (
@@ -34,6 +36,15 @@
retry_if_subprocess_error,
)

from tests.common.utils import read_remote_file

TAG_CLUSTER_NAME = "parallelcluster:cluster-name"
TAG_NODE_TYPE = "parallelcluster:node-type"
TAG_QUEUE_NAME = "parallelcluster:queue-name"
TAG_COMPUTE_RESOURCE_NAME = "parallelcluster:compute-resource-name"

LAUNCH_TEMPLATES_CONFIG_FILE = "/opt/parallelcluster/shared/launch-templates-config.json"


def suppress_and_log_exception(func):
@functools.wraps(func)
@@ -253,6 +264,63 @@ def describe_cluster_instances(self, node_type=None, queue_name=None):
logging.error("Failed when getting cluster instances with error:\n%s\nand output:\n%s", e.stderr, e.stdout)
raise

@retry(wait_fixed=seconds(5), stop_max_delay=minutes(1))
def get_compute_nodes(
self,
queue_name: str = None,
compute_resource_name: str = None,
state: list = None,
expected_num_nodes: int = None,
):
"""Return the EC2 instance details for compute nodes matching the provided criteria."""
state = ["running"] if state is None else state
ec2 = boto3.client("ec2", region_name=self.region)
filters = [
{"Name": f"tag:{TAG_CLUSTER_NAME}", "Values": [self.cfn_name]},
{"Name": f"tag:{TAG_NODE_TYPE}", "Values": ["Compute"]},
{"Name": "instance-state-name", "Values": state},
]

if queue_name:
filters.append({"Name": f"tag:{TAG_QUEUE_NAME}", "Values": [queue_name]})
if compute_resource_name:
filters.append({"Name": f"tag:{TAG_COMPUTE_RESOURCE_NAME}", "Values": [compute_resource_name]})

instances = []
for reservation in ec2.describe_instances(Filters=filters).get("Reservations", []):
instances.extend(reservation.get("Instances", []))

if expected_num_nodes:
assert_that(instances).is_length(expected_num_nodes)

return instances

def get_compute_nodes_private_ip(
self,
queue_name: str = None,
compute_resource_name: str = None,
state: list = None,
expected_num_nodes: int = None,
):
"""Return the private IP address of compute nodes matching the provided criteria."""
instances = self.get_compute_nodes(queue_name, compute_resource_name, state, expected_num_nodes)
return [i.get("PrivateIpAddress") for i in instances]

def get_compute_nodes_launch_template_logical_id(self, queue_name: str, compute_resource_name: str):
"""Return the launch template logical id of compute nodes matching the provided criteria."""
launch_templates_config = json.loads(
read_remote_file(RemoteCommandExecutor(self), LAUNCH_TEMPLATES_CONFIG_FILE)
)
logging.info(f"Read launch template config from {LAUNCH_TEMPLATES_CONFIG_FILE}: {launch_templates_config}")
return (
launch_templates_config.get("Queues", {})
.get(queue_name, {})
.get("ComputeResources", {})
.get(compute_resource_name, {})
.get("LaunchTemplate", {})
.get("LogicalId")
)

def get_cluster_instance_ids(self, node_type=None, queue_name=None):
"""Run pcluster describe-cluster-instances and collect instance ids."""
instances = self.describe_cluster_instances(node_type=node_type, queue_name=queue_name)
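Usage note: a minimal sketch of how a test could combine the new `Cluster` helpers. The `cluster` object is assumed to be built by this factory, and the queue/compute resource names ("q1", "cr1") are illustrative only.

```python
# Hypothetical usage of the new Cluster helpers; "q1" and "cr1" are
# made-up names for illustration.
def check_compute_fleet(cluster):
    # Fetch all running compute nodes of one compute resource and assert
    # (with retries, via the @retry decorator) that exactly 2 are up.
    instances = cluster.get_compute_nodes(
        queue_name="q1",
        compute_resource_name="cr1",
        state=["running"],
        expected_num_nodes=2,
    )
    # Same filter, but returning only the private IPs.
    ips = cluster.get_compute_nodes_private_ip("q1", "cr1", ["running"], 2)
    # Logical id of the launch template backing the compute resource, read
    # from launch-templates-config.json on the head node.
    logical_id = cluster.get_compute_nodes_launch_template_logical_id("q1", "cr1")
    return instances, ips, logical_id
```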
7 changes: 7 additions & 0 deletions tests/integration-tests/configs/develop.yaml
@@ -280,6 +280,13 @@ test-suites:
instances: [{{ common.instance("instance_type_1") }}]
oss: [{{ OS_X86_6 }}]
schedulers: [ "slurm" ]
ultraserver:
test_gb200.py::test_gb200:
dimensions:
- regions: [ "us-east-1" ]
instances: [ "g4dn.2xlarge" ]
oss: [ "alinux2023" ]
schedulers: [ "slurm" ]
health_checks:
test_gpu_health_checks.py::test_cluster_with_gpu_health_checks:
dimensions:
26 changes: 26 additions & 0 deletions tests/integration-tests/conftest.py
@@ -579,6 +579,32 @@ def test_datadir(request, datadir):
return datadir / "{0}/{1}".format(class_name, function_name)


@pytest.fixture()
def file_reader(test_datadir, request, vpc_stack):
"""
Define a fixture to render file templates associated with the running test.

The template for a given test is a generic file stored in the test_datadir folder and can be written using the Jinja2 template engine.

:return: a _file_renderer(**kwargs) function which takes as input the values to replace in the template
"""

def _file_renderer(input_file: str = "script.sh", output_file: str = "script_rendered.sh", **kwargs):
input_file_path = test_datadir / input_file
if not os.path.isfile(input_file_path):
raise FileNotFoundError(f"Input file not found in the expected dir {input_file_path}")
output_file_path = test_datadir / output_file if output_file else input_file_path
default_values = _get_default_template_values(vpc_stack, request)
file_loader = FileSystemLoader(str(test_datadir))
env = SandboxedEnvironment(loader=file_loader)
rendered_template = env.get_template(input_file).render(**{**default_values, **kwargs})
output_file_path.write_text(rendered_template)
return output_file_path

return _file_renderer


@pytest.fixture()
def pcluster_config_reader(test_datadir, vpc_stack, request, region, instance, architecture):
"""
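A sketch of how a test might consume the new fixture; the template file and its `vcpus` placeholder are assumptions for illustration, not part of this diff.

```python
# Hypothetical test using the file_reader fixture. It assumes the test's
# datadir contains a Jinja2 template "script.sh" with a {{ vcpus }} placeholder.
def test_render_script(file_reader):
    rendered_path = file_reader(
        input_file="script.sh",
        output_file="script_rendered.sh",
        vcpus=4,  # caller kwargs override the default template values
    )
    # The fixture returns the path of the rendered file inside test_datadir.
    assert "4" in rendered_path.read_text()
```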
24 changes: 23 additions & 1 deletion tests/integration-tests/tests/common/assertions.py
@@ -9,12 +9,14 @@
# OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions and
# limitations under the License.
import logging
import re
import time
from typing import List, Union

import boto3
import pytest
from assertpy import assert_that, soft_assertions
from clusters_factory import Cluster
from constants import NodeType
from remote_command_executor import RemoteCommandExecutor
from retrying import RetryError, retry
@@ -28,7 +30,7 @@
)

from tests.common.scaling_common import get_compute_nodes_allocation
from tests.common.utils import get_ddb_item
from tests.common.utils import get_ddb_item, read_remote_file


@retry(wait_fixed=seconds(20), stop_max_delay=minutes(6))
@@ -199,6 +201,19 @@ def wait_for_num_instances_in_queue(cluster_name, region, desired, queue):
return assert_num_instances_in_queue(cluster_name, region, desired, queue)


@retry(wait_fixed=seconds(20), stop_max_delay=minutes(10))
def wait_for_instances_in_compute_resource(
cluster: Cluster, queue: str, compute_resource: str, state: list, desired: int
):
"""Wait until the compute resource has the desired number of instances in the given states."""
instances = cluster.get_compute_nodes(queue, compute_resource, state)
assert_that(instances).is_length(desired)
logging.info(
f"Cluster {cluster.name} has {desired} compute nodes "
f"in queue {queue} and compute resource {compute_resource}: {instances}"
)
return instances


def assert_num_instances_in_queue(cluster_name, region, desired, queue):
instances = get_cluster_nodes_instance_ids(cluster_name, region, node_type="Compute", queue_name=queue)
assert_that(instances).is_length(desired)
@@ -422,3 +437,10 @@ def _assert_build_image_stack_deleted(stack_name, region, timeout_seconds=600, p
time.sleep(poll_interval)

pytest.fail(f"Timed-out waiting for stack {stack_name} deletion (last status: {last_status})")


def assert_regex_in_file(cluster: Cluster, compute_node_ip: str, file_name: str, pattern: str, negate: bool = True):
"""Assert the regex pattern does not match the remote file content (or does match, when negate is False)."""
rce = RemoteCommandExecutor(cluster, compute_node_ip)
file_content = read_remote_file(rce, file_name)
assertion = assert_that(bool(re.search(pattern, file_content, re.IGNORECASE)))
assertion.is_false() if negate else assertion.is_true()
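A sketch of how the two new assertions might be combined, assuming a `cluster` fixture; the queue/compute resource names and the log path are illustrative assumptions.

```python
# Hypothetical usage of the new assertion helpers.
def check_compute_logs_clean(cluster):
    # Block (up to 10 minutes, via the @retry decorator) until the compute
    # resource has 2 running nodes.
    instances = wait_for_instances_in_compute_resource(
        cluster, queue="q1", compute_resource="cr1", state=["running"], desired=2
    )
    for instance in instances:
        # negate=True (the default) asserts the pattern does NOT appear.
        assert_regex_in_file(
            cluster,
            compute_node_ip=instance.get("PrivateIpAddress"),
            file_name="/var/log/cloud-init-output.log",  # illustrative path
            pattern=r"error|traceback",
            negate=True,
        )
```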
18 changes: 18 additions & 0 deletions tests/integration-tests/tests/common/schedulers_common.py
@@ -440,6 +440,24 @@ def get_unique_static_nodes(self):
logging.info("All running nodes: %s", result.stdout)
return result.stdout.splitlines()

def get_nodename_from_ip(self, ip: str):
"""Get the nodename from IP address"""
command = (
f"scontrol show nodes --json | "
f'jq -r --arg ip "{ip}" \'.nodes[] | '
f"select(.address == $ip) | .hostname'"
) # noqa: W605
result = self._remote_command_executor.run_remote_command(command)
logging.info(f"Nodename for {ip} is: {result.stdout}")
return result.stdout

def get_batch_host_for_job(self, job_id: str):
"""Get the batch host, i.e. the node running the batch script, for a given job."""
command = f"scontrol show jobs {job_id} --json | jq -r '.jobs[].batch_host'" # noqa: W605
result = self._remote_command_executor.run_remote_command(command)
logging.info(f"Batch host for job {job_id} is: {result.stdout}")
return result.stdout

@retry(retry_on_result=lambda result: "drain" not in result, wait_fixed=seconds(3), stop_max_delay=minutes(5))
def wait_for_locked_node(self): # noqa: D102
return self._remote_command_executor.run_remote_command("sinfo -h -o '%t'").stdout
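A sketch tying the two new Slurm helpers together. It assumes a `SlurmCommands` instance (`slurm_commands`) and that this class's existing `submit_command`/`assert_job_submitted`/`wait_job_completed` helpers are available.

```python
# Hypothetical flow: submit a job, resolve its batch host, and map a
# compute node's private IP back to its Slurm nodename.
def check_job_placement(slurm_commands, compute_node_ip):
    # Map a private IP to the Slurm nodename via scontrol/jq.
    nodename = slurm_commands.get_nodename_from_ip(compute_node_ip)

    # Submit a trivial one-node job and wait for it to finish.
    result = slurm_commands.submit_command("hostname", nodes=1)
    job_id = slurm_commands.assert_job_submitted(result.stdout)
    slurm_commands.wait_job_completed(job_id)

    # Resolve the node that hosted the job's batch script.
    batch_host = slurm_commands.get_batch_host_for_job(job_id)
    return nodename, batch_host
```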
16 changes: 16 additions & 0 deletions tests/integration-tests/tests/common/utils.py
@@ -416,6 +416,13 @@ def run_system_analyzer(cluster, scheduler_commands_factory, request, partition=
logging.info("Compute node system information correctly retrieved.")


def is_existing_remote_file(rce: RemoteCommandExecutor, file_path: str):
"""Return true if the file exists, false otherwise"""
logging.info(f"Checking if remote file exists {file_path}")
result = rce.run_remote_command(f"cat {file_path}", raise_on_error=False)
return not result.failed


@retry(stop_max_attempt_number=5, wait_fixed=seconds(3))
def read_remote_file(remote_command_executor, file_path):
"""Reads the content of a remote file."""
@@ -536,3 +543,12 @@ def write_file(dirname, filename, content):
f.write(content)
logging.info(f"File written: {filepath}")
return filepath


def terminate_nodes_manually(instance_ids, region):
ec2_client = boto3.client("ec2", region_name=region)
for instance_id in instance_ids:
instance_states = ec2_client.terminate_instances(InstanceIds=[instance_id]).get("TerminatingInstances")[0]
assert_that(instance_states.get("InstanceId")).is_equal_to(instance_id)
assert_that(instance_states.get("CurrentState").get("Name")).is_in("shutting-down", "terminated")
logging.info("Terminated nodes: {}".format(instance_ids))
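A sketch combining the new utils; the `cluster` object and the marker file path are illustrative assumptions.

```python
# Hypothetical usage: terminate the compute fleet outside the scheduler's
# control, but only after verifying a marker file exists on the head node.
from remote_command_executor import RemoteCommandExecutor

def force_replace_compute_fleet(cluster, region):
    rce = RemoteCommandExecutor(cluster)
    # "/tmp/bootstrap.done" is a made-up marker path for illustration.
    if is_existing_remote_file(rce, "/tmp/bootstrap.done"):
        instance_ids = cluster.get_cluster_instance_ids(node_type="Compute")
        # Each termination is asserted to reach shutting-down/terminated.
        terminate_nodes_manually(instance_ids, region)
```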