diff --git a/dlc_developer_config.toml b/dlc_developer_config.toml index 1962bfd69e21..9fb6f973b22d 100644 --- a/dlc_developer_config.toml +++ b/dlc_developer_config.toml @@ -37,16 +37,16 @@ deep_canary_mode = false [build] # Add in frameworks you would like to build. By default, builds are disabled unless you specify building an image. # available frameworks - ["base", "vllm", "autogluon", "huggingface_tensorflow", "huggingface_pytorch", "huggingface_tensorflow_trcomp", "huggingface_pytorch_trcomp", "pytorch_trcomp", "tensorflow", "pytorch", "stabilityai_pytorch"] -build_frameworks = [] +build_frameworks = ["vllm"] # By default we build both training and inference containers. Set true/false values to determine which to build. -build_training = true -build_inference = true +build_training = false +build_inference = false # Set do_build to "false" to skip builds and test the latest image built by this PR # Note: at least one build is required to set do_build to "false" -do_build = true +do_build = false [notify] ### Notify on test failures diff --git a/test/dlc_tests/container_tests/bin/efa/testEFA b/test/dlc_tests/container_tests/bin/efa/testEFA index 420cd711dc18..52f5664625d8 100755 --- a/test/dlc_tests/container_tests/bin/efa/testEFA +++ b/test/dlc_tests/container_tests/bin/efa/testEFA @@ -36,7 +36,7 @@ validate_all_reduce_performance_logs(){ # EFA 1.37.0 using "Using network Libfabric" instead of "Using network AWS Libfabric" grep -E "Using network (AWS )?Libfabric" ${TRAINING_LOG} || { echo "efa is not working, please check if it is installed correctly"; exit 1; } if [[ ${INSTANCE_TYPE} == p4d* || ${INSTANCE_TYPE} == p5* ]]; then - grep "Setting NCCL_TOPO_FILE environment variable to" ${TRAINING_LOG} + grep "NCCL_TOPO_FILE set by environment to" ${TRAINING_LOG} # EFA 1.37.0 change from NET/AWS Libfabric/0/GDRDMA to NET/Libfabric/0/GDRDMA grep -E "NET/(AWS )?Libfabric/0/GDRDMA" ${TRAINING_LOG} fi @@ -89,7 +89,7 @@ check_efa_nccl_all_reduce(){ RETURN_VAL=${PIPESTATUS[0]} # In case, if you would like see logs, uncomment below line - # RESULT=$(cat ${TRAINING_LOG}) + RESULT=$(cat ${TRAINING_LOG}) if [ ${RETURN_VAL} -eq 0 ]; then echo "***************************** check_efa_nccl_all_reduce passed *****************************" diff --git a/test/dlc_tests/ec2/test_efa.py b/test/dlc_tests/ec2/test_efa.py index 9543d783f21c..8398cd1a9204 100644 --- a/test/dlc_tests/ec2/test_efa.py +++ b/test/dlc_tests/ec2/test_efa.py @@ -294,10 +294,16 @@ def _setup_container(connection, docker_image, container_name): # using SSH on a pre-defined port (as decided by sshd_config on server-side). # Allow instance to share all memory with container using memlock=-1:-1. # Share all EFA devices with container using --device for all EFA devices. 
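+    # Note (descriptive comment, an assumption about the image): vLLM DLC images are assumed to
+    # ship their own serving entrypoint, so for those images the entrypoint is overridden with
+    # /bin/bash and CUDA_HOME is exported explicitly in the branch added below.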
- connection.run( - f"docker run --runtime=nvidia --gpus all -id --name {container_name} --network host --ulimit memlock=-1:-1 " - f"{docker_all_devices_arg} -v $HOME/container_tests:/test -v /dev/shm:/dev/shm {docker_image} bash" - ) + if "vllm" in docker_image: + connection.run( + f"docker run --entrypoint=/bin/bash -e CUDA_HOME=/usr/local/cuda --runtime=nvidia --gpus all -id --name {container_name} --network host --ulimit memlock=-1:-1 " + f"{docker_all_devices_arg} -v $HOME/container_tests:/test -v /dev/shm:/dev/shm {docker_image}" + ) + else: + connection.run( + f"docker run --runtime=nvidia --gpus all -id --name {container_name} --network host --ulimit memlock=-1:-1 " + f"{docker_all_devices_arg} -v $HOME/container_tests:/test -v /dev/shm:/dev/shm {docker_image} bash" + ) def _setup_master_efa_ssh_config(connection): diff --git a/test/test_utils/ec2.py b/test/test_utils/ec2.py index 0b84f1f8e399..e2cabd1ea267 100644 --- a/test/test_utils/ec2.py +++ b/test/test_utils/ec2.py @@ -1817,6 +1817,27 @@ def get_default_subnet_for_az(ec2_client, availability_zone): return az_subnet_id +def get_subnet_id_by_vpc(ec2_client, vpc_id): + + response = ec2_client.describe_subnets( + Filters=[ + { + "Name": "vpc-id", + "Values": [ + vpc_id, + ], + }, + ], + ) + + subnet_ids = [] + for subnet in response["Subnets"]: + if subnet["SubnetId"] is not None: + subnet_ids.append(subnet["SubnetId"]) + + return subnet_ids + + def get_vpc_id_by_name(ec2_client, vpc_name): """ Get VPC ID by VPC name tag diff --git a/test/testrunner.py b/test/testrunner.py index 4746740437bc..d7880f7c1ff2 100644 --- a/test/testrunner.py +++ b/test/testrunner.py @@ -409,7 +409,7 @@ def main(): pull_dlc_images(all_image_list) if specific_test_type == "bai": build_bai_docker_container() - if specific_test_type == "eks" and not is_all_images_list_eia: + if specific_test_type in ["eks", "ec2"] and not is_all_images_list_eia: frameworks_in_images = [ framework for framework in ("mxnet", "pytorch", "tensorflow", "vllm") @@ -424,13 +424,13 @@ def main(): if framework == "vllm": try: - LOGGER.info(f"Running vLLM EKS tests with image: {all_image_list[0]}") + LOGGER.info(f"Running vLLM EKS EC2 tests with image: {all_image_list[0]}") test() - LOGGER.info("vLLM EKS tests completed successfully") + LOGGER.info("vLLM EKS EC2 tests completed successfully") # Exit function after vLLM tests return except Exception as e: - LOGGER.error(f"vLLM EKS tests failed: {str(e)}") + LOGGER.error(f"vLLM EKS EC2 tests failed: {str(e)}") raise eks_cluster_name = f"dlc-{framework}-{build_context}" diff --git a/test/vllm/ec2/infra/setup_ec2.py b/test/vllm/ec2/infra/setup_ec2.py new file mode 100644 index 000000000000..c341f8cf330e --- /dev/null +++ b/test/vllm/ec2/infra/setup_ec2.py @@ -0,0 +1,631 @@ +import os +import time +import re +import logging +import sys +import uuid +import boto3 +from contextlib import contextmanager + + +from test import test_utils +import test.test_utils.ec2 as ec2_utils +from test.vllm.ec2.utils.fsx_utils import FsxSetup +from concurrent.futures import ThreadPoolExecutor + +from botocore.config import Config +from fabric import Connection +from botocore.exceptions import WaiterError + + +from test.test_utils import KEYS_TO_DESTROY_FILE + +from test.test_utils.ec2 import ( + get_default_vpc_id, + get_subnet_id_by_vpc, +) + +# Constant to represent default region for boto3 commands +DEFAULT_REGION = "us-west-2" +# Constant to represent region where p4de tests can be run +P4DE_REGION = "us-east-1" + +EC2_INSTANCE_ROLE_NAME = 
"ec2TestInstanceRole" + +VLLM_INSTANCE_TYPE = ["p4d.24xlarge", "p5.48xlarge"] + +ENABLE_IPV6_TESTING = os.getenv("ENABLE_IPV6_TESTING", "false").lower() == "true" + + +LOGGER = logging.getLogger(__name__) +LOGGER.addHandler(logging.StreamHandler(sys.stdout)) + +TEST_ID = str(uuid.uuid4()) + + +def ec2_client(region): + return boto3.client("ec2", region_name=region, config=Config(retries={"max_attempts": 10})) + + +def ec2_instance_ami(region): + return test_utils.get_dlami_id(region) + + +def availability_zone_options(ec2_client, ec2_instance_type, region): + """ + Parametrize with a reduced list of availability zones for particular instance types for which + capacity has been reserved in that AZ. For other instance types, parametrize with list of all + AZs in the region. + :param ec2_client: boto3 Client for EC2 + :param ec2_instance_type: str instance type for which AZs must be determined + :param region: str region in which instance must be created + :return: list of str AZ names + """ + allowed_availability_zones = None + if ec2_instance_type in ["p4de.24xlarge"]: + if region == "us-east-1": + allowed_availability_zones = ["us-east-1d", "us-east-1c"] + if ec2_instance_type in ["p4d.24xlarge"]: + if region == "us-west-2": + allowed_availability_zones = ["us-west-2b", "us-west-2c"] + if not allowed_availability_zones: + allowed_availability_zones = ec2_utils.get_availability_zone_ids(ec2_client) + return allowed_availability_zones + + +def check_ip_rule_exists(security_group_rules, ip_address): + """ + Check if an IP rule exists in security group rules + """ + if not security_group_rules: + return False + + for rule in security_group_rules: + if ( + rule.get("FromPort") == 80 + and rule.get("ToPort") == 80 + and rule.get("IpProtocol") == "tcp" + and "IpRanges" in rule + ): + for ip_range in rule.get("IpRanges", []): + if ip_range.get("CidrIp") == f"{ip_address}/32": + LOGGER.info(f"Found existing rule for IP {ip_address}") + return True + return False + + +def authorize_ingress(ec2_client, group_id, ip_address): + try: + response = ec2_client.describe_security_groups(GroupIds=[group_id]) + if response.get("SecurityGroups") and response["SecurityGroups"]: + existing_rules = response["SecurityGroups"][0].get("IpPermissions", []) + if check_ip_rule_exists(existing_rules, ip_address): + LOGGER.info("Ingress rule already exists, skipping creation.") + return + + ec2_client.authorize_security_group_ingress( + GroupId=group_id, + IpPermissions=[ + { + "IpProtocol": "tcp", + "FromPort": 8000, + "ToPort": 8000, + "IpRanges": [ + { + "CidrIp": f"{ip_address}/32", + "Description": "Temporary access for vLLM testing", + } + ], + } + ], + ) + LOGGER.info("Ingress rule added successfully.") + except ClientError as e: + LOGGER.error(f"Failed to authorize ingress: {str(e)}") + raise + + +def setup_test_artifacts(ec2_client, instances, key_filename, region): + ec2_connections = {} + master_connection = None + worker_connection = None + + for instance in instances: + instance_id = instance["InstanceId"] + try: + instance_details = ec2_client.describe_instances(InstanceIds=[instance_id])[ + "Reservations" + ][0]["Instances"][0] + public_ip = instance_details.get("PublicIpAddress") + + if not public_ip: + raise Exception(f"No public IP found for instance {instance_id}") + + connection = Connection( + host=public_ip, + user="ec2-user", + connect_kwargs={"key_filename": key_filename}, + ) + + # Test connection + connection.run('echo "Connection test"', hide=True) + ec2_connections[instance_id] = connection + 
+ if not master_connection: + master_connection = connection + else: + worker_connection = connection + + print(f"Successfully connected to instance {instance_id}") + + except Exception as e: + print(f"Failed to connect to instance {instance_id}: {str(e)}") + raise + + artifact_folder = f"vllm-{TEST_ID}-folder" + s3_test_artifact_location = test_utils.upload_tests_to_s3(artifact_folder) + + def delete_s3_artifact_copy(): + test_utils.delete_uploaded_tests_from_s3(s3_test_artifact_location) + + # Setup master instance + master_connection.run("rm -rf $HOME/container_tests") + master_connection.run( + f"aws s3 cp --recursive {test_utils.TEST_TRANSFER_S3_BUCKET}/{artifact_folder} $HOME/container_tests --region {test_utils.TEST_TRANSFER_S3_BUCKET_REGION}" + ) + print(f"Successfully copying {test_utils.TEST_TRANSFER_S3_BUCKET} for master") + master_connection.run( + f"mkdir -p $HOME/container_tests/logs && chmod -R +x $HOME/container_tests/*" + ) + + worker_connection.run("rm -rf $HOME/container_tests") + worker_connection.run( + f"aws s3 cp --recursive {test_utils.TEST_TRANSFER_S3_BUCKET}/{artifact_folder} $HOME/container_tests --region {test_utils.TEST_TRANSFER_S3_BUCKET_REGION}" + ) + print(f"Successfully copying {test_utils.TEST_TRANSFER_S3_BUCKET} for worker") + worker_connection.run( + f"mkdir -p $HOME/container_tests/logs && chmod -R +x $HOME/container_tests/*" + ) + + # Cleanup S3 artifacts + delete_s3_artifact_copy() + + return [master_connection, worker_connection] + + +def efa_ec2_instances( + ec2_client, + ec2_instance_type, + ec2_instance_role_name, + ec2_key_name, + ec2_instance_ami, + region, + availability_zone_options, +): + instances = None + key_filename = None + elastic_ip_allocation_ids = [] + try: + ec2_key_name = f"{ec2_key_name}-{TEST_ID}" + print(f"Creating instance: CI-CD {ec2_key_name}") + key_filename = test_utils.generate_ssh_keypair(ec2_client, ec2_key_name) + print(f"Using AMI for EFA EC2 {ec2_instance_ami}") + volume_name = "/dev/sda1" if ec2_instance_ami in test_utils.UL_AMI_LIST else "/dev/xvda" + + instance_name_prefix = f"CI-CD {ec2_key_name}" + ec2_run_instances_definition = { + "BlockDeviceMappings": [ + { + "DeviceName": volume_name, + "Ebs": { + "DeleteOnTermination": True, + "VolumeSize": 500, + "VolumeType": "gp3", + "Iops": 3000, + "Throughput": 125, + }, + }, + ], + "ImageId": ec2_instance_ami, + "InstanceType": ec2_instance_type, + "IamInstanceProfile": {"Name": ec2_instance_role_name}, + "KeyName": ec2_key_name, + "MaxCount": 2, + "MinCount": 2, + "TagSpecifications": [ + { + "ResourceType": "instance", + "Tags": [{"Key": "Name", "Value": instance_name_prefix}], + } + ], + } + instances = ec2_utils.launch_efa_instances_with_retry( + ec2_client, + ec2_instance_type, + availability_zone_options, + ec2_run_instances_definition, + ) + + master_instance_id = instances[0]["InstanceId"] + ec2_utils.check_instance_state(master_instance_id, state="running", region=region) + ec2_utils.check_system_state( + master_instance_id, system_status="ok", instance_status="ok", region=region + ) + print(f"Master instance {master_instance_id} is ready") + + if len(instances) > 1: + ec2_utils.create_name_tags_for_instance( + master_instance_id, f"{instance_name_prefix}_master", region + ) + for i in range(1, len(instances)): + worker_instance_id = instances[i]["InstanceId"] + ec2_utils.create_name_tags_for_instance( + worker_instance_id, f"{instance_name_prefix}_worker_{i}", region + ) + ec2_utils.check_instance_state(worker_instance_id, state="running", region=region) + 
ec2_utils.check_system_state( + worker_instance_id, system_status="ok", instance_status="ok", region=region + ) + print(f"Worker instance {worker_instance_id} is ready") + + num_efa_interfaces = ec2_utils.get_num_efa_interfaces_for_instance_type( + ec2_instance_type, region=region + ) + + if num_efa_interfaces > 1: + for instance in instances: + try: + instance_id = instance["InstanceId"] + + network_interface_id = ec2_utils.get_network_interface_id(instance_id, region) + elastic_ip_allocation_id = ec2_utils.attach_elastic_ip( + network_interface_id, region, ENABLE_IPV6_TESTING + ) + elastic_ip_allocation_ids.append(elastic_ip_allocation_id) + except Exception as e: + if elastic_ip_allocation_ids: + ec2_utils.delete_elastic_ips(elastic_ip_allocation_ids, ec2_client) + raise Exception(f"Error allocating elastic IP: {str(e)}") + + connections = setup_test_artifacts(ec2_client, instances, key_filename, region) + return_val = { + "instances": [ + (instance_info["InstanceId"], key_filename) for instance_info in instances + ], + "elastic_ips": elastic_ip_allocation_ids, + "connections": connections, + } + print("Launched EFA Test instances") + return return_val + + except Exception as e: + print(f"Error in efa_ec2_instances: {str(e)}") + # Clean up elastic IPs + if elastic_ip_allocation_ids: + try: + ec2_utils.delete_elastic_ips(elastic_ip_allocation_ids, ec2_client) + except Exception as cleanup_error: + print(f"Error cleaning up elastic IPs: {str(cleanup_error)}") + + # Clean up instances + if instances: + try: + instance_ids = [instance["InstanceId"] for instance in instances] + ec2_client.terminate_instances(InstanceIds=instance_ids) + # Wait for instances to terminate + waiter = ec2_client.get_waiter("instance_terminated") + waiter.wait(InstanceIds=instance_ids) + except Exception as cleanup_error: + print(f"Error terminating instances: {str(cleanup_error)}") + + # Clean up key pair + if key_filename: + try: + if os.path.exists(key_filename): + os.remove(key_filename) + if os.path.exists(f"{key_filename}.pub"): + os.remove(f"{key_filename}.pub") + except Exception as cleanup_error: + print(f"Error cleaning up key files: {str(cleanup_error)}") + + raise + + +@contextmanager +def ec2_test_environment(): + cleanup_functions = [] + try: + # Setup code here + region = DEFAULT_REGION + ec2_cli = ec2_client(region) + instance_type = VLLM_INSTANCE_TYPE[0] + ami_id = ec2_instance_ami(region) + az_options = availability_zone_options(ec2_cli, instance_type, region) + + instances_info = efa_ec2_instances( + ec2_client=ec2_cli, + ec2_instance_type=instance_type, + ec2_instance_role_name=EC2_INSTANCE_ROLE_NAME, + ec2_key_name="vllm-ec2-test", + ec2_instance_ami=ami_id, + region=region, + availability_zone_options=az_options, + ) + # Register cleanup functions + cleanup_functions.extend( + [ + lambda: ec2_cli.terminate_instances( + InstanceIds=[instance_id for instance_id, _ in instances_info] + ), + lambda: test_utils.destroy_ssh_keypair(ec2_cli, instances_info[0][1]), + ] + ) + + yield instances_info + + finally: + print("Running cleanup operations...") + for cleanup_func in cleanup_functions: + try: + if cleanup_func is not None: + cleanup_func() + except Exception as cleanup_error: + LOGGER.error(f"Error during cleanup: {str(cleanup_error)}") + + +def _setup_instance(connection, fsx_dns_name, mount_name): + """ + Setup FSx mount and VLLM environment on an instance synchronously + """ + os.chdir("..") + # Copy script to instance + connection.put("vllm/ec2/utils/setup_fsx_vllm.sh", 
"/home/ec2-user/setup_fsx_vllm.sh") + + # Make script executable and run it + commands = [ + "chmod +x /home/ec2-user/setup_fsx_vllm.sh", + f"/home/ec2-user/setup_fsx_vllm.sh {fsx_dns_name} {mount_name}", + ] + + # Execute commands synchronously + result = connection.run("; ".join(commands)) + return result + + +def cleanup_resources(ec2_cli, resources, fsx): + """Cleanup all resources in reverse order of creation""" + cleanup_errors = [] + + def wait_for_instances(instance_ids): + waiter = ec2_cli.get_waiter("instance_terminated") + try: + waiter.wait(InstanceIds=instance_ids, WaiterConfig={"Delay": 60, "MaxAttempts": 100}) + return True + except WaiterError as e: + print(f"Warning: Instance termination waiter timed out: {str(e)}") + return False + + if resources.get("elastic_ips"): + try: + ec2_utils.delete_elastic_ips(resources["elastic_ips"], ec2_cli) + print(f"Deleted elastic IPs: {resources['elastic_ips']}") + except Exception as e: + cleanup_errors.append(f"Failed to cleanup Elastic IPs: {str(e)}") + + if resources.get("instances_info"): + try: + instance_ids = [instance_id for instance_id, _ in resources["instances_info"]] + ec2_cli.terminate_instances(InstanceIds=instance_ids) + print(f"Terminating instances: {instance_ids}") + + if not wait_for_instances(instance_ids): + cleanup_errors.append("Instances did not terminate within expected timeframe") + + for _, key_filename in resources["instances_info"]: + if key_filename: + try: + ec2_cli.delete_key_pair(KeyName=key_filename) + for ext in ["", ".pub"]: + file_path = f"{key_filename}{ext}" + if os.path.exists(file_path): + os.remove(file_path) + except Exception as e: + cleanup_errors.append(f"Failed to delete key file: {str(e)}") + except Exception as e: + cleanup_errors.append(f"Failed to cleanup EC2 resources: {str(e)}") + + if resources.get("fsx_config"): + try: + fsx.delete_fsx_filesystem(resources["fsx_config"]["filesystem_id"]) + print(f"Deleted FSx filesystem: {resources['fsx_config']['filesystem_id']}") + except Exception as e: + cleanup_errors.append(f"Failed to delete FSx filesystem: {str(e)}") + + time.sleep(30) + + if resources.get("sg_fsx"): + max_attempts = 5 + for attempt in range(max_attempts): + try: + ec2_cli.delete_security_group(GroupId=resources["sg_fsx"]) + print(f"Deleted security group: {resources['sg_fsx']}") + break + except Exception as e: + if attempt == max_attempts - 1: + cleanup_errors.append( + f"Failed to delete security group after {max_attempts} attempts: {str(e)}" + ) + else: + print(f"Retry {attempt + 1}/{max_attempts} to delete security group") + time.sleep(30) + + if cleanup_errors: + raise Exception("Cleanup errors occurred:\n" + "\n".join(cleanup_errors)) + + +def launch_ec2_instances(ec2_cli): + """Launch EC2 instances with EFA support""" + instance_type = VLLM_INSTANCE_TYPE[0] + ami_id = ec2_instance_ami(DEFAULT_REGION) + az_options = availability_zone_options(ec2_cli, instance_type, DEFAULT_REGION) + + instances_info = efa_ec2_instances( + ec2_client=ec2_cli, + ec2_instance_type=instance_type, + ec2_instance_role_name=EC2_INSTANCE_ROLE_NAME, + ec2_key_name="vllm-ec2-test", + ec2_instance_ami=ami_id, + region=DEFAULT_REGION, + availability_zone_options=az_options, + ) + print(f"Launched instances: {instances_info}") + return instances_info + + +def configure_security_groups(instance_id, ec2_cli, fsx, vpc_id, instances_info): + """ + Configure security groups for FSx and EC2 instances + + Args: + ec2_cli: boto3 EC2 client + fsx: FsxSetup instance + vpc_id: VPC ID where security group will 
be created + instances_info: List of tuples containing (instance_id, key_filename) + + Returns: + str: FSx security group ID + """ + try: + fsx_name = f"fsx-lustre-vllm-ec2-test-sg-{instance_id}-{TEST_ID}" + # Create FSx security group + sg_fsx = fsx.create_fsx_security_group( + ec2_cli, + vpc_id, + fsx_name, + "Security group for FSx Lustre VLLM EC2 Tests", + ) + print(f"Created FSx security group: {sg_fsx}") + + # Get instance IDs from instances_info + instance_ids = [instance_id for instance_id, _ in instances_info] + + # Add security group rules + fsx.add_ingress_rules_sg(ec2_cli, sg_fsx, instance_ids) + + return sg_fsx + + except Exception as e: + print(f"Error configuring security groups: {str(e)}") + raise + + +def setup_instance(instance_id, key_filename, ec2_cli, fsx_dns_name, mount_name): + """Setup FSx mount on a single instance""" + instance_details = ec2_cli.describe_instances(InstanceIds=[instance_id])["Reservations"][0][ + "Instances" + ][0] + public_ip = instance_details.get("PublicIpAddress") + + if not public_ip: + raise Exception(f"No public IP found for instance {instance_id}") + + connection = Connection( + host=public_ip, + user="ec2-user", + connect_kwargs={"key_filename": key_filename}, + ) + + return _setup_instance(connection, fsx_dns_name, mount_name) + + +def mount_fsx_on_worker(instance_id, key_filename, ec2_cli, fsx_dns_name, mount_name): + """Mount FSx on worker instance without running setup script""" + instance_details = ec2_cli.describe_instances(InstanceIds=[instance_id])["Reservations"][0][ + "Instances" + ][0] + public_ip = instance_details.get("PublicIpAddress") + + if not public_ip: + raise Exception(f"No public IP found for instance {instance_id}") + + connection = Connection( + host=public_ip, + user="ec2-user", + connect_kwargs={"key_filename": key_filename}, + ) + + commands = [ + "sudo yum install -y lustre-client", + "sudo mkdir -p /fsx", + f"sudo mount -t lustre -o relatime,flock {fsx_dns_name}@tcp:/{mount_name} /fsx", + ] + + for cmd in commands: + connection.run(cmd) + + +def setup(): + """Main setup function for VLLM on EC2 with FSx""" + print("Testing vllm on ec2........") + fsx = FsxSetup(DEFAULT_REGION) + ec2_cli = ec2_client(DEFAULT_REGION) + resources = {"instances_info": None, "fsx_config": None, "sg_fsx": None} + + try: + vpc_id = get_default_vpc_id(ec2_cli) + subnet_ids = get_subnet_id_by_vpc(ec2_cli, vpc_id) + + instance_result = launch_ec2_instances(ec2_cli) + resources["instances_info"] = instance_result["instances"] + resources["elastic_ips"] = instance_result["elastic_ips"] + resources["connections"] = instance_result["connections"] + print("Waiting 60 seconds for instances to initialize...") + time.sleep(60) + + # Configure single security group for both instances + instance_ids = [instance_id for instance_id, _ in resources["instances_info"]] + resources["sg_fsx"] = configure_security_groups( + instance_ids[0], ec2_cli, fsx, vpc_id, resources["instances_info"] + ) + + # Create FSx filesystem + resources["fsx_config"] = fsx.create_fsx_filesystem( + subnet_ids[0], + [resources["sg_fsx"]], + 1200, + "SCRATCH_2", + {"Name": f"fsx-lustre-vllm-ec2-test-{instance_ids[0]}-{TEST_ID}"}, + ) + print("Created FSx filesystem") + + master_instance_id, master_key_filename = resources["instances_info"][0] + setup_instance( + master_instance_id, + master_key_filename, + ec2_cli, + resources["fsx_config"]["dns_name"], + resources["fsx_config"]["mount_name"], + ) + print(f"Setup completed for master instance {master_instance_id}") + + # Mount 
FSx on worker node + worker_instance_id, worker_key_filename = resources["instances_info"][1] + mount_fsx_on_worker( + worker_instance_id, + worker_key_filename, + ec2_cli, + resources["fsx_config"]["dns_name"], + resources["fsx_config"]["mount_name"], + ) + print(f"FSx mounted on worker instance {worker_instance_id}") + + return resources + + except Exception as e: + print(f"Error during setup: {str(e)}") + cleanup_resources(ec2_cli, resources, fsx) + raise + + +if __name__ == "__main__": + setup() diff --git a/test/vllm/ec2/test_artifacts/test_ec2.py b/test/vllm/ec2/test_artifacts/test_ec2.py new file mode 100644 index 000000000000..f2764df58681 --- /dev/null +++ b/test/vllm/ec2/test_artifacts/test_ec2.py @@ -0,0 +1,418 @@ +import threading +import boto3 +import time, os, json +import logging +from botocore.exceptions import ClientError +from botocore.config import Config +from fabric import Connection +from contextlib import contextmanager +from typing import Optional, Tuple + +from test.test_utils.ec2 import ( + get_account_id_from_image_uri, + login_to_ecr_registry, + get_ec2_client, +) +from test.vllm.ec2.utils.fsx_utils import FsxSetup +from test.vllm.ec2.infra.setup_ec2 import cleanup_resources, TEST_ID +from test.dlc_tests.ec2.test_efa import ( + _setup_multinode_efa_instances, + EFA_SANITY_TEST_CMD, + MASTER_CONTAINER_NAME, + HOSTS_FILE_LOCATION, + EFA_INTEGRATION_TEST_CMD, + DEFAULT_EFA_TIMEOUT, +) +from test.test_utils import run_cmd_on_container + +MODEL_NAME = "deepseek-ai/DeepSeek-R1-Distill-Qwen-32B" +DEFAULT_REGION = "us-west-2" +logger = logging.getLogger(__name__) + + +def setup_env(connection): + """Setup Python environment on a node""" + setup_command = """ + python3 -m venv vllm_env && \ + source vllm_env/bin/activate && \ + pip install --upgrade pip setuptools wheel && \ + pip install numpy torch tqdm aiohttp pandas datasets pillow ray vllm==0.10.0 && \ + pip install "transformers<4.54.0" + """ + connection.run(setup_command, shell=True) + + +def create_benchmark_command() -> str: + """Create command for running benchmark""" + return f""" + python3 /fsx/vllm-dlc/vllm/benchmarks/benchmark_serving.py \ + --backend vllm \ + --model {MODEL_NAME} \ + --endpoint /v1/chat/completions \ + --dataset-name sharegpt \ + --dataset-path /fsx/vllm-dlc/ShareGPT_V3_unfiltered_cleaned_split.json \ + --num-prompts 1000 + """ + + +def get_secret_hf_token(): + secret_name = "test/hf_token" + region_name = "us-west-2" + + session = boto3.session.Session() + client = session.client(service_name="secretsmanager", region_name=region_name) + try: + get_secret_value_response = client.get_secret_value(SecretId=secret_name) + except ClientError as e: + raise e + + response = json.loads(get_secret_value_response["SecretString"]) + return response + + +def wait_for_container_ready(connection, timeout: int = 1000) -> bool: + """ + Wait for container and model to be ready by checking logs and endpoint + Returns True if container and model are ready, False if timeout + """ + start_time = time.time() + model_ready = False + + while time.time() - start_time < timeout: + if not model_ready: + try: + curl_cmd = """ + curl -s http://localhost:8000/v1/completions \ + -H "Content-Type: application/json" \ + -d '{ + "model": "deepseek-ai/DeepSeek-R1-Distill-Qwen-32B", + "prompt": "Hello", + "max_tokens": 10 + }' + """ + result = connection.run(curl_cmd, hide=False) + if result.ok: + print("Model endpoint is responding") + model_ready = True + return True + except Exception: + pass + return False + + +def 
setup_docker_image(conn, image_uri): + account_id = get_account_id_from_image_uri(image_uri) + login_to_ecr_registry(conn, account_id, DEFAULT_REGION) + print(f"Pulling image: {image_uri}") + conn.run(f"docker pull {image_uri}", hide="out") + + +def test_vllm_benchmark_on_multi_node(head_connection, worker_connection, image_uri): + try: + # Get HF token + response = get_secret_hf_token() + hf_token = response.get("HF_TOKEN") + if not hf_token: + raise Exception("Failed to get HF token") + + for conn in [head_connection, worker_connection]: + setup_docker_image(conn, image_uri) + setup_env(conn) + + head_connection.put( + "vllm/ec2/utils/head_node_setup.sh", "/home/ec2-user/head_node_setup.sh" + ) + worker_connection.put( + "vllm/ec2/utils/worker_node_setup.sh", "/home/ec2-user/worker_node_setup.sh" + ) + + head_connection.run("chmod +x head_node_setup.sh") + worker_connection.run("chmod +x worker_node_setup.sh") + + head_ip = head_connection.run("hostname -i").stdout.strip() + worker_ip = worker_connection.run("hostname -i").stdout.strip() + + container_name = "ray_head-" + TEST_ID + + print("Starting head node...") + head_connection.run( + f"./head_node_setup.sh {image_uri} {hf_token} {head_ip} {container_name}" + ) + + worker_connection.run(f"./worker_node_setup.sh {image_uri} {head_ip} {worker_ip}") + + serve_command = f"vllm serve {MODEL_NAME} --tensor-parallel-size 8 --pipeline-parallel-size 2 --max-num-batched-tokens 16384" + head_connection.run(f"docker exec -it {container_name} /bin/bash -c '{serve_command}'") + + print("Waiting for model to be ready, approx estimated time to complete is 15 mins...") + if not wait_for_container_ready(head_connection, timeout=2000): + raise Exception("Container failed to become ready within timeout period") + print("Model serving started successfully") + + # Run benchmark + print("Running benchmark...") + benchmark_cmd = "source vllm_env/bin/activate" + create_benchmark_command() + benchmark_result = head_connection.run(benchmark_cmd, timeout=7200) + print(f"Benchmark completed: {benchmark_result.stdout}") + + return benchmark_result + + except Exception as e: + raise Exception(f"Multi-node test execution failed: {str(e)}") + finally: + head_connection.run("tmux kill-server || true", warn=True) + worker_connection.run("tmux kill-server || true", warn=True) + + +def test_vllm_benchmark_on_single_node(connection, image_uri): + """ + Run VLLM benchmark test on a single node EC2 instance using the shell script + Args: + connection: Fabric connection object to EC2 instance + image_uri: ECR image URI for VLLM container + Returns: + ec2_res: Result object from test execution + """ + try: + setup_env(connection) + response = get_secret_hf_token() + hf_token = response.get("HF_TOKEN") + + setup_docker_image(connection, image_uri) + connection.put( + "vllm/ec2/utils/run_vllm_benchmark_single_node.sh", + "/home/ec2-user/run_vllm_benchmark_single_node.sh", + ) + commands = [ + "chmod +x /home/ec2-user/run_vllm_benchmark_single_node.sh", + f"/home/ec2-user/run_vllm_benchmark_single_node.sh {image_uri} {hf_token} {MODEL_NAME}", + ] + result = connection.run( + "; ".join(commands), + hide=False, + timeout=3600, + ) + + return result + except Exception as e: + print(f"Test execution failed: {str(e)}") + raise + + +def verify_gpu_setup(connection): + """ + Verify GPU setup on the instance before running the test + + Args: + connection: Fabric connection object to EC2 instance + + Returns: + bool: True if GPU setup is valid, False otherwise + """ + try: + # Check 
nvidia-smi + result = connection.run("nvidia-smi", hide=True) + if result.failed: + print("nvidia-smi check failed") + return False + + # Check CUDA availability + cuda_check = connection.run("nvidia-smi -L", hide=True) + if cuda_check.failed or "GPU" not in cuda_check.stdout: + print("No GPUs found") + return False + + return True + + except Exception as e: + print(f"GPU verification failed: {str(e)}") + return False + + +def cleanup_containers(connection): + """ + Cleanup docker containers and images on the instance + + Args: + connection: Fabric connection object + """ + try: + print("Cleaning up containers and images...") + commands = [ + "docker ps -aq | xargs -r docker stop", + "docker ps -aq | xargs -r docker rm", + ] + for cmd in commands: + connection.run(cmd, hide=True, warn=True) + except Exception as e: + print(f"Cleanup warning: {str(e)}") + + +def run_single_node_test(connection, image_uri): + """ + Run single node VLLM benchmark test + + Args: + connection: Fabric connection object + image_uri: ECR image URI + """ + try: + print("\n=== Starting Single-Node Test ===") + if not verify_gpu_setup(connection): + raise Exception("GPU setup verification failed") + + result = test_vllm_benchmark_on_single_node(connection, image_uri) + if result.ok: + print("Single-node test completed successfully") + return True + return False + + finally: + cleanup_containers(connection) + + +def run_multi_node_test(head_conn, worker_conn, image_uri): + """ + Run multi-node VLLM benchmark test + + Args: + head_conn: Fabric connection object for head node + worker_conn: Fabric connection object for worker node + image_uri: ECR image URI + """ + + print("\n=== Starting Multi-Node Test ===") + verification_tasks = [(head_conn, "head"), (worker_conn, "worker")] + for conn, node_type in verification_tasks: + if not verify_gpu_setup(conn): + raise Exception(f"GPU setup verification failed for {node_type} node") + + result = test_vllm_benchmark_on_multi_node(head_conn, worker_conn, image_uri) + print(result.stdout) + if result.ok: + print("Multi-node test completed successfully") + return True + return False + + +def test_vllm_on_ec2(resources, image_uri): + """ + Test VLLM on EC2 instances in the following order: + 1. EFA testing + 2. Single node test + 3. 
Multi-node test + + Args: + resources: Dictionary containing instance information and FSx config + image_uri: Docker image URI to test + """ + ec2_cli = None + fsx = None + ec2_connections = {} + test_results = {"efa": False, "single_node": False, "multi_node": False} + + try: + ec2_cli = get_ec2_client(DEFAULT_REGION) + fsx = FsxSetup(DEFAULT_REGION) + for instance_id, key_filename in resources["instances_info"]: + try: + instance_details = ec2_cli.describe_instances(InstanceIds=[instance_id])[ + "Reservations" + ][0]["Instances"][0] + public_ip = instance_details.get("PublicIpAddress") + + if not public_ip: + raise Exception(f"No public IP found for instance {instance_id}") + + connection = Connection( + host=public_ip, + user="ec2-user", + connect_kwargs={"key_filename": key_filename}, + ) + + connection.run('echo "Connection test"', hide=True) + ec2_connections[instance_id] = connection + print(f"Successfully connected to instance {instance_id}") + + except Exception as e: + print(f"Failed to connect to instance {instance_id}: {str(e)}") + raise + + if len(ec2_connections) >= 2: + print("\n=== Starting EFA Tests ===") + instance_ids = list(ec2_connections.keys()) + number_of_nodes = 2 + head_conn = ec2_connections[instance_ids[0]] + worker_conn = ec2_connections[instance_ids[1]] + + _setup_multinode_efa_instances( + image_uri, + resources["instances_info"][:2], + [ec2_connections[instance_ids[0]], ec2_connections[instance_ids[1]]], + "p4d.24xlarge", + DEFAULT_REGION, + ) + + master_connection = ec2_connections[instance_ids[0]] + + # Run EFA sanity test + run_cmd_on_container( + MASTER_CONTAINER_NAME, master_connection, EFA_SANITY_TEST_CMD, hide=False + ) + + run_cmd_on_container( + MASTER_CONTAINER_NAME, + master_connection, + f"{EFA_INTEGRATION_TEST_CMD} {HOSTS_FILE_LOCATION} {number_of_nodes}", + hide=False, + timeout=DEFAULT_EFA_TIMEOUT, + ) + + test_results["efa"] = True + for conn in [head_conn, worker_conn]: + cleanup_containers(conn) + + print("EFA tests completed successfully") + + instance_id = list(ec2_connections.keys())[0] + print(f"\n=== Running Single-Node Test on instance: {instance_id} ===") + test_results["single_node"] = run_single_node_test( + ec2_connections[instance_id], image_uri + ) + + test_results["multi_node"] = run_multi_node_test(head_conn, worker_conn, image_uri) + + else: + print("\nSkipping multi-node test: insufficient instances") + + print("\n=== Test Summary ===") + print(f"EFA tests: {'Passed' if test_results['efa'] else 'Not Run/Failed'}") + print(f"Single-node test: {'Passed' if test_results['single_node'] else 'Failed'}") + print(f"Multi-node test: {'Passed' if test_results['multi_node'] else 'Failed'}") + + if not any(test_results.values()): + raise Exception("All tests failed") + + except Exception as e: + print(f"Test execution failed: {str(e)}") + raise + + finally: + if ec2_cli and fsx: + cleanup_timer = threading.Timer( + 1000, lambda: print("Cleanup timed out, some resources might need manual cleanup") + ) + cleanup_timer.start() + + try: + cleanup_resources( + ec2_cli, + resources, + fsx, + ) + cleanup_timer.cancel() + print("Resources cleaned up successfully") + except Exception as e: + print(f"Cleanup failed: {str(e)}") + finally: + cleanup_timer.cancel() diff --git a/test/vllm/ec2/utils/fsx_utils.py b/test/vllm/ec2/utils/fsx_utils.py new file mode 100644 index 000000000000..ad71dc284844 --- /dev/null +++ b/test/vllm/ec2/utils/fsx_utils.py @@ -0,0 +1,303 @@ +import logging +import time +from invoke import run +from typing import Dict, 
List, Any +import boto3 + + +from botocore.exceptions import ClientError + +logger = logging.getLogger(__name__) + + +class FsxSetup: + """ + A utility class for setting up and managing FSx for Lustre filesystems + and related AWS and Kubernetes resources. + + : param region: AWS region where resources will be created (default: "us-west-2") + """ + + def __init__(self, region: str = "us-west-2"): + self.region = region + + def create_fsx_filesystem( + self, + subnet_id: str, + security_group_ids: List[str], + storage_capacity: int, + deployment_type: str, + tags: Dict[str, str], + ): + """ + Create FSx filesystem with given configuration + : param subnet_id: subnet ID where FSx will be created + : param security_group_ids: list of security group IDs + : param storage_capacity: storage capacity in GiB + : param deployment_type: FSx deployment type + : param tags: dictionary of tags to apply to the FSx filesystem + : return: dictionary containing filesystem details + """ + tags_param = " ".join([f"Key={k},Value={v}" for k, v in tags.items()]) + + try: + fsx_id = run( + f"aws fsx create-file-system" + f" --file-system-type LUSTRE" + f" --storage-capacity {storage_capacity}" + f" --subnet-ids {subnet_id}" + f' --security-group-ids {" ".join(security_group_ids)}' + f" --lustre-configuration DeploymentType={deployment_type}" + f" --tags {tags_param}" + f" --file-system-type-version 2.15" + f' --query "FileSystem.FileSystemId"' + f" --output text" + ).stdout.strip() + + logger.info(f"Created FSx filesystem: {fsx_id}") + + filesystem_info = self.wait_for_filesystem(fsx_id) + return filesystem_info + + except Exception as e: + logger.error(f"Failed to create FSx filesystem: {e}") + raise + + def delete_fsx_filesystem(self, fsx_id: str): + + try: + fsx_id = run( + f"aws fsx delete-file-system" + f" --file-system-id {fsx_id}" + f' --query "FileSystem.FileSystemId"' + f" --output text" + ).stdout.strip() + + print(f"Deleted FSx filesystem: {fsx_id}") + + except Exception as e: + logger.error(f"Failed to create FSx filesystem: {e}") + raise + + def wait_for_filesystem(self, filesystem_id: str): + """ + Wait for FSx filesystem to become available and return its details + : param filesystem_id: FSx filesystem ID + : return: dictionary containing filesystem details (filesystem_id, dns_name, mount_name) + : raises: Exception if filesystem enters FAILED, DELETING, or DELETED state + """ + print(f"Waiting for FSx filesystem {filesystem_id} to be available...") + while True: + status = run( + f"aws fsx describe-file-systems --file-system-id {filesystem_id} " + f"--query 'FileSystems[0].Lifecycle' --output text" + ).stdout.strip() + + if status == "AVAILABLE": + break + elif status in ["FAILED", "DELETING", "DELETED"]: + raise Exception(f"FSx filesystem entered {status} state") + + print(f"FSx status: {status}, waiting...") + time.sleep(30) + + # get fs DNS and mount name + fsx_dns = run( + f"aws fsx describe-file-systems --file-system-id {filesystem_id} " + f"--query 'FileSystems[0].DNSName' --output text" + ).stdout.strip() + + fsx_mount = run( + f"aws fsx describe-file-systems --file-system-id {filesystem_id} " + f"--query 'FileSystems[0].LustreConfiguration.MountName' --output text" + ).stdout.strip() + + return {"filesystem_id": filesystem_id, "dns_name": fsx_dns, "mount_name": fsx_mount} + + def create_fsx_security_group(self, ec2_cli, vpc_id, group_name, description): + """ + Create a security group for FSx Lustre and add inbound rules. 
+ + :param vpc_id: The ID of the VPC where the security group will be created + :param instance_id: The ID of the newly created EC2 instance + :param region_name: The AWS region name + :return: The ID of the created security group + """ + try: + # Create the security group + response = ec2_cli.create_security_group( + GroupName=group_name, + Description=description, + VpcId=vpc_id, + ) + sg_id = response["GroupId"] + print(f"Created security group: {sg_id}") + + return sg_id + + except ClientError as e: + print(f"An error occurred: {e}") + return None + + def add_ingress_rules_sg(self, ec2_cli, sg_id, instance_ids): + """ + Add ingress rules to FSx security group for multiple instances + + Args: + ec2_cli: boto3 EC2 client + sg_id: ID of the FSx security group + instance_ids: List of EC2 instance IDs + """ + try: + # Get security group IDs for all instances + instance_sg_ids = set() + for instance_id in instance_ids: + response = ec2_cli.describe_instances(InstanceIds=[instance_id]) + sg_id_instance = response["Reservations"][0]["Instances"][0]["SecurityGroups"][0][ + "GroupId" + ] + instance_sg_ids.add(sg_id_instance) + + instance_group_pairs = [{"GroupId": sg} for sg in instance_sg_ids] + + all_group_pairs = instance_group_pairs + [{"GroupId": sg_id}] + + # Add inbound rules + ec2_cli.authorize_security_group_ingress( + GroupId=sg_id, + IpPermissions=[ + { + "IpProtocol": "tcp", + "FromPort": 988, + "ToPort": 1023, + "UserIdGroupPairs": all_group_pairs, + } + ], + ) + print( + f"Added inbound rules to FSx security group {sg_id} for instance security groups: {instance_sg_ids}" + ) + + except Exception as e: + print(f"Error adding ingress rules: {str(e)}") + raise + + def delete_security_group(self, ec2_cli, group_id: str): + """ + Create a security group in the specified VPC + : param vpc_id: VPC ID where the security group will be created + : param name: name of the security group + : param description: description of the security group + : return: created security group ID + : raises: Exception if security group creation fails + """ + try: + response = ec2_cli.delete_security_group( + GroupId=group_id, + ) + sg_id = response["GroupId"] + print(f"Deleted security group: {sg_id}") + + except Exception as e: + logger.error(f"Failed to delete security group: {e}") + raise + + def setup_csi_driver(self): + """ + Install and configure the AWS FSx CSI Driver in the Kubernetes cluster + : return: None + : raises: Exception if driver installation or verification fails + """ + try: + logger.info("Installing AWS FSx CSI Driver...") + run( + "helm repo add aws-fsx-csi-driver https://kubernetes-sigs.github.io/aws-fsx-csi-driver/" + ) + run("helm repo update") + run( + "helm install aws-fsx-csi-driver aws-fsx-csi-driver/aws-fsx-csi-driver --namespace kube-system" + ) + run( + "kubectl wait --for=condition=ready pod -l app=fsx-csi-controller -n kube-system --timeout=300s" + ) + + self._verify_csi_driver() + logger.info("FSx CSI Driver installed successfully") + except Exception as e: + logger.error(f"Failed to setup FSx CSI driver: {e}") + raise + + def _verify_csi_driver(self): + """ + Verify that FSx CSI driver pods are running correctly in the cluster + : return: None + : raises: Exception if driver pods are not found or not running + """ + result = run("kubectl get pods -n kube-system | grep fsx") + + if "fsx-csi-controller" not in result.stdout or "fsx-csi-node" not in result.stdout: + raise Exception("FSx CSI driver pods not found") + + fsx_pods = [ + line + for line in result.stdout.split("\n") 
+ if ("fsx-csi-controller" in line or "fsx-csi-node" in line) and "Running" in line + ] + + if not fsx_pods: + raise Exception("No running FSx CSI driver pods found") + + logger.info(f"Found {len(fsx_pods)} running FSx CSI driver pods") + + def setup_kubernetes_resources( + self, storage_class_file: str, pv_file: str, pvc_file: str, replacements: Dict[str, str] + ): + """ + Setup Kubernetes FSx resources using provided yaml files and replacements + : param storage_class_file: path to the storage class yaml file + : param pv_file: path to the persistent volume yaml file + : param pvc_file: path to the persistent volume claim yaml file + : param replacements: dictionary of placeholder replacements + Example: {"": "subnet-xxx", "": "sg-xxx"} + : return: None + : raises: Exception if resource creation fails + """ + try: + for file_path in [storage_class_file, pv_file, pvc_file]: + for key, value in replacements.items(): + run(f"sed -i 's|{key}|{value}|g' {file_path}") + + for file_path in [storage_class_file, pv_file, pvc_file]: + run(f"kubectl apply -f {file_path}") + + self.validate_kubernetes_resources() + + except Exception as e: + logger.error(f"Failed to setup Kubernetes FSx resources: {e}") + raise + + def validate_kubernetes_resources(self): + """ + Validate that FSx Kubernetes resources are properly created and bound + : return: True if all resources are validated successfully + : raises: Exception if any resource validation fails + """ + try: + sc_result = run("kubectl get sc fsx-sc") + if "fsx-sc" not in sc_result.stdout or "fsx.csi.aws.com" not in sc_result.stdout: + raise Exception("FSx storage class not created correctly") + + pv_result = run("kubectl get pv fsx-lustre-pv") + if "fsx-lustre-pv" not in pv_result.stdout or "Bound" not in pv_result.stdout: + raise Exception("FSx persistent volume not created correctly") + + pvc_result = run("kubectl get pvc fsx-lustre-pvc") + if "fsx-lustre-pvc" not in pvc_result.stdout or "Bound" not in pvc_result.stdout: + raise Exception("FSx persistent volume claim not created correctly") + + logger.info("FSx Kubernetes resources validated successfully") + return True + + except Exception as e: + logger.error(f"FSx resource validation failed: {e}") + raise diff --git a/test/vllm/ec2/utils/head_node_setup.sh b/test/vllm/ec2/utils/head_node_setup.sh new file mode 100644 index 000000000000..4c1b62f1affa --- /dev/null +++ b/test/vllm/ec2/utils/head_node_setup.sh @@ -0,0 +1,37 @@ +#!/bin/bash + +# Usage: ./head_node_setup.sh +set -e + +log() { + echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" +} + +IMAGE_URI=$1 +HF_TOKEN=$2 +HEAD_IP=$3 +CONTAINER_NAME=$4 + +log "Starting head node setup..." 
+log "Image URI: $IMAGE_URI" +log "Head IP: $HEAD_IP" + +# Start head node in tmux session and capture container ID +tmux new-session -d -s ray_head "docker run \ + --name $CONTAINER_NAME \ + --network host \ + --shm-size 10.24g \ + --gpus all \ + -v /fsx/.cache/huggingface:/root/.cache/huggingface \ + -e VLLM_HOST_IP=$HEAD_IP \ + -e HUGGING_FACE_HUB_TOKEN=$HF_TOKEN \ + -e FI_PROVIDER=efa \ + -e FI_EFA_USE_DEVICE_RDMA=1 \ + --device=/dev/infiniband/ \ + --ulimit memlock=-1:-1 \ + -p 8000:8000 \ + $IMAGE_URI \ + /bin/bash -c 'ray start --head --block --port=6379'" + +log "Head node started in container: ${CONTAINER_NAME}" + diff --git a/test/vllm/ec2/utils/run_vllm_benchmark_single_node.sh b/test/vllm/ec2/utils/run_vllm_benchmark_single_node.sh new file mode 100644 index 000000000000..1d6c7c0ef999 --- /dev/null +++ b/test/vllm/ec2/utils/run_vllm_benchmark_single_node.sh @@ -0,0 +1,52 @@ +#!/bin/bash + +DLC_IMAGE=$1 +HF_TOKEN=$2 +MODEL_NAME=$3 + +# Run vLLM using Official Docker image from https://docs.vllm.ai/en/latest/deployment/docker.html +# Here is the https://github.com/vllm-project/vllm/blob/main/docker/Dockerfile +tmux new-session -d -s single_node "docker run --runtime nvidia --gpus all \ + -v /fsx/.cache/huggingface:/root/.cache/huggingface \ + -e "HUGGING_FACE_HUB_TOKEN=$HF_TOKEN" \ + -e "NCCL_DEBUG=TRACE" \ + -p 8000:8000 \ + --ipc=host \ + $DLC_IMAGE \ + --model $MODEL_NAME \ + --tensor-parallel-size 8" + +sleep 1500 + +source vllm_env/bin/activate + +# Example - Online Benchmark: https://github.com/vllm-project/vllm/tree/main/benchmarks#example---online-benchmark +python3 /fsx/vllm-dlc/vllm/benchmarks/benchmark_serving.py \ + --backend vllm \ + --model $MODEL_NAME \ + --endpoint /v1/completions \ + --dataset-name sharegpt \ + --dataset-path /fsx/vllm-dlc/ShareGPT_V3_unfiltered_cleaned_split.json \ + --num-prompts 1000 + +# ============ Serving Benchmark Result ============ +# Successful requests: 1000 +# Benchmark duration (s): 82.67 +# Total input tokens: 215196 +# Total generated tokens: 185671 +# Request throughput (req/s): 12.10 +# Output token throughput (tok/s): 2245.92 +# Total Token throughput (tok/s): 4848.99 +# ---------------Time to First Token---------------- +# Mean TTFT (ms): 25037.89 +# Median TTFT (ms): 22099.12 +# P99 TTFT (ms): 58100.87 +# -----Time per Output Token (excl. 1st token)------ +# Mean TPOT (ms): 98.10 +# Median TPOT (ms): 92.09 +# P99 TPOT (ms): 256.34 +# ---------------Inter-token Latency---------------- +# Mean ITL (ms): 84.56 +# Median ITL (ms): 63.78 +# P99 ITL (ms): 253.97 +# ================================================== diff --git a/test/vllm/ec2/utils/setup_fsx_vllm.sh b/test/vllm/ec2/utils/setup_fsx_vllm.sh new file mode 100644 index 000000000000..7b97cd16aeb0 --- /dev/null +++ b/test/vllm/ec2/utils/setup_fsx_vllm.sh @@ -0,0 +1,75 @@ +#!/bin/bash + +# setup_fsx_vllm.sh +# Script to mount FSx and setup VLLM environment + + +# Get FSx DNS name from argument +FSX_DNS_NAME=$1 +MOUNT_NAME=$2 + +# Function to log messages with hostname +log() { + local HOSTNAME=$(hostname) + echo "[Host ${HOSTNAME}] $1" +} + + +# Function to check if command was successful +check_error() { + if [ $? -ne 0 ]; then + echo "Error: $1" + exit 1 + fi +} + +if [ -z "$FSX_DNS_NAME" ] || [ -z "$MOUNT_NAME" ]; then + echo "Usage: $0 " + exit 1 +fi + +# Install required packages +log "Installing required packages..." 
+sudo yum install -y nfs-utils git +check_error "Failed to install base packages" + + +# Install the latest Lustre client +log "Installing latest Lustre client..." +sudo yum install -y lustre-client +check_error "Failed to install Lustre client" + + +# Create FSx mount directory +log "Creating FSx mount directory..." +sudo mkdir -p /fsx +check_error "Failed to create /fsx directory" + + +# Modify mount command to include verbose output +sudo mount -t lustre -o relatime,flock ${FSX_DNS_NAME}@tcp:/${MOUNT_NAME} /fsx + +# Create VLLM directory in FSx +log "Creating VLLM directory..." +sudo mkdir -p /fsx/vllm-dlc + +check_error "Failed to create /fsx/vllm-dlc directory" + +# Set proper permissions +log "Setting proper permissions..." +sudo chown -R ec2-user:ec2-user /fsx/vllm-dlc +check_error "Failed to set permissions" + +cd /fsx/vllm-dlc +git clone https://github.com/vllm-project/vllm.git +cd vllm +git checkout tags/v0.10.0 + +# Download ShareGPT dataset +log "Downloading ShareGPT dataset..." +cd /fsx/vllm-dlc && wget -q https://huggingface.co/datasets/anon8231489123/ShareGPT_Vicuna_unfiltered/resolve/main/ShareGPT_V3_unfiltered_cleaned_split.json +check_error "Failed to download ShareGPT dataset" + +log "Setup completed successfully!" + + \ No newline at end of file diff --git a/test/vllm/ec2/utils/worker_node_setup.sh b/test/vllm/ec2/utils/worker_node_setup.sh new file mode 100644 index 000000000000..c6670e882fa8 --- /dev/null +++ b/test/vllm/ec2/utils/worker_node_setup.sh @@ -0,0 +1,25 @@ +#!/bin/bash + +# Usage: ./worker_node_setup.sh +set -e + +log() { + echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" +} + +IMAGE_URI=$1 +HEAD_IP=$2 +WORKER_IP=$3 + +tmux new-session -d -s ray_worker "bash /fsx/vllm-dlc/vllm/examples/online_serving/run_cluster.sh \ + $IMAGE_URI \ + $HEAD_IP \ + --worker \ + /fsx/.cache/huggingface \ + -e VLLM_HOST_IP=$WORKER_IP \ + -e FI_PROVIDER=efa \ + -e FI_EFA_USE_DEVICE_RDMA=1 \ + --device=/dev/infiniband/ \ + --ulimit memlock=-1:-1" + +log "Worker node setup complete." 
\ No newline at end of file
diff --git a/test/vllm/trigger_test.py b/test/vllm/trigger_test.py
index fda05b1001f8..f73734ec5623 100644
--- a/test/vllm/trigger_test.py
+++ b/test/vllm/trigger_test.py
@@ -4,6 +4,8 @@
 from test.test_utils import get_dlc_images
 from test.vllm.eks.eks_test import test_vllm_on_eks
+from test.vllm.ec2.infra.setup_ec2 import setup
+from test.vllm.ec2.test_artifacts.test_ec2 import test_vllm_on_ec2
 
 LOGGER = logging.getLogger(__name__)
 LOGGER.setLevel(logging.DEBUG)
 
@@ -16,8 +18,14 @@ def run_platform_tests(platform: str, images: List[str], commit_id: str, ipv6_en
     """
     LOGGER.info(f"Running {platform} tests")
     if platform == "ec2":
-        # Placeholder for EC2 tests
-        pass
+        try:
+            ec2_resources = setup()
+            print("Finished gathering resources required for VLLM EC2 Tests")
+            test_vllm_on_ec2(ec2_resources, images[0])
+            LOGGER.info("EC2 vLLM tests completed successfully")
+        except Exception as e:
+            LOGGER.error(f"EC2 vLLM tests failed: {str(e)}")
+            raise
     elif platform == "eks":
         LOGGER.info("Running EKS tests")
         try:
@@ -37,7 +45,8 @@ def test():
     executor_mode = os.getenv("EXECUTOR_MODE", "False").lower() == "true"
     dlc_images = os.getenv("DLC_IMAGE") if executor_mode else get_dlc_images()
 
-    ipv6_enabled = os.getenv("ENABLE_IPV6_TESTING", "false").lower() == "true"
+    # ipv6_enabled = os.getenv("ENABLE_IPV6_TESTING", "false").lower() == "true"
+    ipv6_enabled = True
     os.environ["ENABLE_IPV6_TESTING"] = "true" if ipv6_enabled else "false"
 
     commit_id = os.getenv("CODEBUILD_RESOLVED_SOURCE_VERSION", default="unrecognised_commit_id")
diff --git a/vllm/buildspec.yml b/vllm/buildspec.yml
index 5c383faa329a..f1483ca331e0 100644
--- a/vllm/buildspec.yml
+++ b/vllm/buildspec.yml
@@ -49,4 +49,4 @@ images:
     test_platforms:
       - sanity
       - security
-      - eks
\ No newline at end of file
+      - ec2