diff --git a/aiu_fms_testing_utils/scripts/README.md b/aiu_fms_testing_utils/scripts/README.md index e2432bf1..c196aae5 100644 --- a/aiu_fms_testing_utils/scripts/README.md +++ b/aiu_fms_testing_utils/scripts/README.md @@ -1,6 +1,6 @@ # Scripts for using Foundation Model Stack (FMS) on AIU hardware -The scripts provided here allow you to run FMS on AIU device for a variety of models. +The scripts provided here allow you to run FMS on AIU device for a variety of models. Let's look at some of the example usage below. @@ -76,3 +76,35 @@ python3 scripts/validation.py --architecture=hf_configured --model_path=/home/de To run a logits-based validation, pass `--validation_level=1` to the validation script. This will check for the logits output to match at every step of the model through cross-entropy loss. You can control the acceptable threshold with `--logits_loss_threshold`. +## Setup the environment for reporting resource usage + +When running `drive_paged_programs.py` you may want to see how much CPU and memory usage is +happening. This is done using Prometheus, thus if you are running in a container environment (non-OpenShift), you want to set up a simple Prometheus server to start collecting these metrics. To do this, do the following: + +1. Run `podman network create promnet` +2. Run `podman run -d --name node-exporter --network promnet quay.io/prometheus/node-exporter:latest` +3. Create a file called `prometheus.yml` that has the following contents: + +```yaml +global: +  scrape_interval: 5s + +scrape_configs: +  - job_name: "node" +    static_configs: +      - targets: ["node-exporter:9100"] +``` + +4. Run `podman run -d --name prometheus --network promnet -p 9091:9090   -v "$PWD/prometheus.yml:/etc/prometheus/prometheus.yml:Z"   quay.io/prometheus/prometheus:latest   --config.file=/etc/prometheus/prometheus.yml` +5. 
Check the status of the server by running `curl -s "http://localhost:9091/api/v1/targets" | python3 -m json.tool | grep health` and ensuring that "health" says "up". +6. When you are about to run DPP, run `export PROMETHEUS_URL="http://localhost:9091"` + +If you are running in OpenShift, the aforementioned instructions are not necessary; instead, you are going to want to set `PROMETHEUS_URL` to an OpenShift route that already has Prometheus set up. Additionally, you are going to want to set `PROMETHEUS_API_KEY` to your OpenShift OAuth token if the Prometheus instance on the cluster is protected. You can get this token by running `oc whoami -t`. + +When actually running a DPP test, you are going to want to set the `--report_resource_utilization` flag to see outputs. Regardless of whether you have this flag set, or if you do not have Prometheus installed or any of the environment variables set, DPP should always run. These instructions simply enable you to see resource utilization outputs. + +Sample test to run with resource utilization outputs: + +```bash +torchrun --nproc-per-node=4 aiu-fms-testing-utils/scripts/drive_paged_programs.py --model_variant=/ibm-granite/granite-3.3-8b-instruct --program_criteria_json_path=path/to/program_criterion.json --dataset_type=sharegpt --skip_validation --programs "*:0,<8192" --prioritize_large_batch_sizes --enforce_homogeneous_prompt_programs --prefill_chunk_size=1024 --dataset_path=ShareGPT_V3_unfiltered_cleaned_split.json --report_resource_utilization +``` diff --git a/aiu_fms_testing_utils/scripts/drive_paged_programs.py b/aiu_fms_testing_utils/scripts/drive_paged_programs.py index 0e343aeb..eade65b0 100644 --- a/aiu_fms_testing_utils/scripts/drive_paged_programs.py +++ b/aiu_fms_testing_utils/scripts/drive_paged_programs.py @@ -1,6 +1,6 @@ import argparse from dataclasses import dataclass -import datetime +from datetime import datetime import itertools import json import os @@ -44,6 +44,10 @@ get_programs_prompts, ) from 
aiu_fms_testing_utils.testing.utils import format_kwargs_to_string +from aiu_fms_testing_utils.utils.resource_collection import ( + instantiate_prometheus, + print_step, +) # Constants PAD_MULTIPLE = 64 @@ -276,6 +280,11 @@ def parse_cli_args() -> argparse.Namespace: action="store_true", help="set to true ensure that all prompts hit the same prompt program for a given test", ) + parser.add_argument( + "--report_resource_utilization", + action="store_true", + help="set to true to report CPU/memory utilization during compilation and inference stages", + ) return parser.parse_args() @@ -1251,6 +1260,8 @@ def generate_validation_info_and_test( timing: str, prefill_chunk_size: int, model_variant: str, + print_utilization: bool = False, + profile: Optional[Any] = None, ) -> list[Any]: """Generates tokens using AIU and CPU models and validates the results. @@ -1271,8 +1282,12 @@ def generate_validation_info_and_test( f"program id: {valid_prompt.program_id}, valid prompt: {valid_prompt.shape}, input shape: {valid_prompt.input_ids.shape}" ) + # Start inference if not skip_validation: # Generate or load CPU validation info + cpu_metric_start = print_step( + profile, print_utilization, "started", "CPU Inference" + ) cpu_validation_info = generate_cpu_validation( model_variant=model_variant, max_new_tokens=max_new_tokens, @@ -1287,7 +1302,18 @@ def generate_validation_info_and_test( cpu_dtype=env_config.cpu_dtype, tokenizer=tokenizer, ) + print_step( + profile, + print_utilization, + "completed", + "CPU Inference", + cpu_metric_start, + ) + # Generate AIU validation info + aiu_metric_start = print_step( + profile, print_utilization, "started", "AIU Inference" + ) aiu_validation_info = generate_aiu_validation( test_type=test_type, max_new_tokens=max_new_tokens, @@ -1298,6 +1324,13 @@ def generate_validation_info_and_test( cpu_validation_info=cpu_validation_info, extra_kwargs=valid_prompt.extra_kwargs, ) + print_step( + profile, + print_utilization, + "completed", + "AIU 
Inference", + aiu_metric_start, + ) if test_type == "metrics": failure_rate = evaluate_cross_entropy_metrics( @@ -1325,6 +1358,10 @@ def generate_validation_info_and_test( else: raise ValueError("test type must be one of metrics or tokens") else: + # Generate AIU validation info + aiu_metric_start = print_step( + profile, print_utilization, "started", "AIU Inference" + ) aiu_validation_info = generate_aiu_validation( test_type=test_type, max_new_tokens=max_new_tokens, @@ -1335,6 +1372,13 @@ def generate_validation_info_and_test( cpu_validation_info=None, extra_kwargs=valid_prompt.extra_kwargs, ) + print_step( + profile, + print_utilization, + "completed", + "AIU Inference", + aiu_metric_start, + ) if local_rank == 0: for sentence_idx, test_sentence in enumerate( @@ -1392,6 +1436,9 @@ def main() -> None: tokenizer=tokenizer, ) + # Instantiate the Prometheus client for resource metric collection + p = instantiate_prometheus(args.report_resource_utilization) + # Model Loading model_kwargs: Dict[str, Any] = _get_model_kwargs(model_variant=args.model_variant) distributed_kwargs: Dict[str, Any] = _get_distributed_kwargs( @@ -1448,6 +1495,8 @@ def main() -> None: compile_dynamic_sendnn=True, stagger_update_lazyhandle=args.stagger_update_lazyhandle, prefill_chunk_size=args.prefill_chunk_size, + print_utilization=args.report_resource_utilization, + profile=p, **extra_kwargs, ) if args.distributed: @@ -1490,6 +1539,8 @@ def main() -> None: timing=args.timing, prefill_chunk_size=args.prefill_chunk_size, model_variant=args.model_variant, + print_utilization=args.report_resource_utilization, + profile=p, ) if not args.skip_validation and local_rank == 0: diff --git a/aiu_fms_testing_utils/utils/__init__.py b/aiu_fms_testing_utils/utils/__init__.py index 1bbb82ec..23e1f95e 100644 --- a/aiu_fms_testing_utils/utils/__init__.py +++ b/aiu_fms_testing_utils/utils/__init__.py @@ -1,5 +1,5 @@ # Standard -from typing import Optional, List, Tuple +from typing import Optional, List, 
Tuple, Any import json import os import random @@ -12,7 +12,7 @@ from aiu_fms_testing_utils.utils.aiu_setup import dprint, rank, world_size from transformers.tokenization_utils_base import PreTrainedTokenizerBase from aiu_fms_testing_utils.testing.utils import format_kwargs_to_string - +from aiu_fms_testing_utils.utils.resource_collection import print_step from fms.utils.generation import pad_input_ids import torch import torch.nn as nn @@ -55,6 +55,8 @@ def warmup_model( use_cache: bool = True, stagger_update_lazyhandle: int = 0, prefill_chunk_size: int = 0, + print_utilization: bool = False, + profile: Optional[Any] = None, **extra_kwargs, ): import torch_sendnn @@ -72,9 +74,13 @@ def warmup_model( attention_specific_kwargs["contiguous_cache"] = True attention_specific_kwargs["max_seq_len"] = input_ids.shape[1] + max_new_tokens + # Start the warmup dprint("AIU warmup") pt_compile_model_time = time.time() + ## Report on initial resource usage + metric_start = print_step(profile, print_utilization, "started", "Compilation") + # adjust inputs depending on attn_type and dynamic shapes _warmup_input_ids = input_ids _extra_kwargs = extra_kwargs @@ -103,6 +109,9 @@ def warmup_model( **attention_specific_kwargs, ) pt_compile_model_time = time.time() - pt_compile_model_time + + # Get completed metric read + print_step(profile, print_utilization, "completed", "Compilation", metric_start) dprint(f"PT compile complete, took {pt_compile_model_time:.3f}s") diff --git a/aiu_fms_testing_utils/utils/resource_collection.py b/aiu_fms_testing_utils/utils/resource_collection.py new file mode 100644 index 00000000..74dfc4e5 --- /dev/null +++ b/aiu_fms_testing_utils/utils/resource_collection.py @@ -0,0 +1,258 @@ +# Imports +import os +from datetime import datetime, timezone +import subprocess + +from aiu_fms_testing_utils.utils.aiu_setup import dprint + + +def install_prometheus(): + """ + Top-level method that will install Prometheus if needed + """ + + # See if it is installed + run 
= subprocess.run( + ["pip", "show", "prometheus_api_client"], capture_output=True, check=False + ) + + # Install if needed + if run.returncode != 0: + print("prometheus_api_client not found, installing...") + subprocess.run(["pip", "install", "prometheus_api_client"], check=True) + + +def instantiate_prometheus(report_utilization): + """ + Top-level method that will instantiate the Prometheus Client to collect + resource usage metrics. + + Returns: + - client: the instantiated Prometheus client. + """ + + client = None + if report_utilization: + # Install and import Prometheus if needed + install_prometheus() + from prometheus_api_client import PrometheusConnect + + try: + # Get required env variables + connection_url = os.environ["PROMETHEUS_URL"] + api_token = os.environ.get("PROMETHEUS_API_KEY") + + # Define necessary headers + request_headers = ( + {"Authorization": f"Bearer {api_token}"} if api_token else None + ) + + client = PrometheusConnect( + url=connection_url, headers=request_headers, disable_ssl=True + ) + + except Exception as e: + print( + f"WARNING: Cannot instantiate Prometheus. Make sure PROMETHEUS_URL and PROMETHEUS_API_KEY are set in your environment if you are trying to collect resource metrics. Error: {e}" + ) + + return client + + +def get_value(given_res, query_type="static"): + """ + Helper method to get the given value from a Prometheus response + + Args: + - given_res: The response object obtained from the Prometheus client that has our value. + - query_type: The type of query we are processing, "static" or "range" + + Returns: + - value: the value for the given resource metric we want to report that was obtained from + the response, represented as a float if present, otherwise None. 
+ """ + + # Iterate through to save our output to a list + values = [] + value = None + if query_type == "static": ## For start/end reads + for series in given_res or []: + try: + values.append(float(series["value"][1])) + except Exception: + pass + value = values[0] if values else None + + else: ## For peak reads + for series in given_res or []: + for timestamp, val in series.get("values", []): + try: + values.append(float(val)) + except Exception: + pass + value = max(values) if values else None + + return value + + +def get_static_read(client, recorded_time): + """ + Top-level method that will get a read on CPU and Memory usage give a single + moment in time. + + Args: + - client: the Prometheus client to use to get our metrics. + - recorded_time: the time that we want to get the metric read at. + + Returns: + - cpu_value: this is the reported value for percentage of CPU usage at the given + recorded time. + - mem_value: this is the reported value for memory usage at the given + recorded time in gigabytes. + """ + + cpu_value = None + mem_value = None + if client is not None: + try: + # Make the request for CPU and Mem + cpu_query = '100 * (1 - avg(rate(node_cpu_seconds_total{mode="idle"}[2m])))' + mem_query = "(node_memory_MemTotal_bytes - node_memory_MemAvailable_bytes) / 1024 / 1024 / 1024" + cpu_response = client.custom_query( + query=cpu_query, params={"time": recorded_time.timestamp()} + ) + mem_response = client.custom_query( + query=mem_query, params={"time": recorded_time.timestamp()} + ) + + ## Get the CPU & Mem metrics out of the response + cpu_value = get_value(cpu_response) + mem_value = get_value(mem_response) + + except Exception as e: + print( + f"WARNING: Failed to retrieve utilization values. Ensure PROMETHEUS_API_KEY is set. Error: {e}" + ) + + return cpu_value, mem_value + + +def get_peak_read(client, start, end): + """ + Top-level method that will get the peak resource usage during a given interval. 
+ + Args: + - client: the Prometheus client to use to get our metrics. + - start: the recorded start time for the interval. + - end: the recorded end time for the interval. + + Returns: + - peak_cpu_value: this is the peak reported value for percentage of CPU usage over the + given interval. + - peak_mem_value: this is the peak reported value for memory usage over the given interval + in gigabytes. + + """ + + peak_cpu_value = None + peak_mem_value = None + if client is not None: + try: + # Make the request for CPU and Mem + cpu_query = '100 * (1 - avg(rate(node_cpu_seconds_total{mode="idle"}[2m])))' + mem_query = "(node_memory_MemTotal_bytes - node_memory_MemAvailable_bytes) / 1024 / 1024 / 1024" + cpu_response = client.custom_query_range( + query=cpu_query, start_time=start, end_time=end, step="3s" + ) + mem_response = client.custom_query_range( + query=mem_query, start_time=start, end_time=end, step="3s" + ) + + ## Get the CPU & Mem metrics out of the response + peak_cpu_value = get_value(cpu_response, "range") + peak_mem_value = get_value(mem_response, "range") + + except Exception as e: + print( + f"WARNING: Failed to retrieve utilization values. Ensure PROMETHEUS_API_KEY is set. Error: {e}" + ) + + return peak_cpu_value, peak_mem_value + + +def timestamp_print(given_string): + """ + Helper method that will add a timestamp before the given string that needs to be + printed. + + Args: + - given_string: the string that is to be printed with the timestamp. + """ + + timestamp = datetime.now().strftime("%Y-%m-%d:%H:%M:%S") + print(f"[{timestamp}] {given_string}") + + +def print_comp_resource_metrics(cpu_val, mem_val, stage, step, print_utilization): + """ + Helper method that will do a timestamp print for a specific step to report resource + usage. + + Args: + - cpu_val: the value for CPU usage as a percentage that we want to print. + - mem_val: the value for memory usage in gigabytes we want to print. 
+ - stage: The stage of the step we are in, either "peak" or "started". + - step: The step that we are performing in the script, either "compilation" or "inference". + - print_utilization: a boolean denoting if we want to print resource utilization metrics. + """ + + if stage != "peak": + if not print_utilization or (cpu_val is None or mem_val is None): + timestamp_print(f"{step} {stage}") + else: + timestamp_print( + f"{step} {stage} - CPU: {cpu_val:.2f}%, Memory: {mem_val:.2f} GB" + ) + + elif print_utilization and (cpu_val is not None and mem_val is not None): + dprint( + f"Peak Resource Utilization - CPU: {cpu_val:.2f}%, Memory: {mem_val:.2f} GB" + ) + + +def print_step(p, report_utilization, step, stage, start_time=None): + """ + Print function to print out when a specific stage starts and ends, + as well as reporting resource usage if enabled. + + Args: + - p: the Prometheus profile client used for resource utilization collection. + - report_utilization: a boolean denoting if we want to print resource utilization metrics. + - step: string denoting what stage of the step we are at ("started" or "completed"). + - stage: string denoting what step we are at (e.g. "Compilation" or "AIU Inference"). + - start_time: datetime object that denotes when the step started (optional). + + Returns: + - recorded_time: the time that was recorded when getting a metric read. Returned for + scenarios where we need to use the recorded time in a later step (i.e. completed stages). 
+ """ + + ## Get metric read + recorded_time = datetime.now(timezone.utc) + cpu_usage, mem_usage = get_static_read(p, recorded_time) + print_comp_resource_metrics(cpu_usage, mem_usage, step, stage, report_utilization) + + ## Get and print the peak usage + if start_time is not None: + peak_cpu_inference_cpu, peak_mem_inference_cpu = get_peak_read( + p, start_time, recorded_time + ) + print_comp_resource_metrics( + peak_cpu_inference_cpu, + peak_mem_inference_cpu, + "peak", + stage, + report_utilization, + ) + + return recorded_time