diff --git a/.gitignore b/.gitignore
index 9b0eb57..ddf2ff5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -7,4 +7,5 @@ backendbench.egg-info/
 CLAUDE.md
 venv/
 ops/
+datasets/
 uv.lock
diff --git a/BackendBench/data_loaders.py b/BackendBench/data_loaders.py
new file mode 100644
index 0000000..fcf7bd5
--- /dev/null
+++ b/BackendBench/data_loaders.py
@@ -0,0 +1,229 @@
+"""
+Shared data loading utilities for reading trace and parquet files.
+"""
+
+import hashlib
+import logging
+import re
+from pathlib import Path
+from typing import Dict, List, Optional, Union
+
+import pyarrow.parquet as pq
+
+import requests
+import torch
+from BackendBench.utils import cleanup_memory_and_gpu, deserialize_args
+from tqdm import tqdm
+
+
+def _args_size(args):
+    """Calculate the size of arguments in bytes."""
+
+    size = 0
+    for arg in args:
+        if isinstance(arg, torch.Tensor):
+            size += arg.numel() * arg.element_size()
+        elif isinstance(arg, (tuple, list)):
+            size += _args_size(arg)
+    return size
+
+
+def _parse_trace_file(filename: str, filter: Optional[List[str]] = None) -> List[Dict]:
+    """
+    Parse a single trace file and return a list of operation dictionaries.
+
+    Args:
+        filename: Path to trace file
+        filter: Optional list of operation name filters
+    """
+    op_inputs = []
+    op = None
+
+    with open(filename, "r") as f:
+        lines = list(f)
+        iterator = tqdm(lines, desc=f"Parsing {Path(filename).name}")
+        for line in iterator:
+            if m := re.match("Operator: (.*)", line):
+                op = m.group(1)
+                # Workaround for a version skew between the PyTorch version used to
+                # develop BackendBench and the one used in tritonbench, where SymInt
+                # did not exist yet.
+                # @todo: see if we can remove this before releasing
+                if op == "aten.sum.SymInt":
+                    op = "aten.sum.dim_IntList"
+            if m := re.match("cnt: \\d+, (.*)", line):
+                assert op is not None
+                args_str = m.group(1)
+                cnt = int(m.group(0).split(",")[0].split(":")[1])
+
+                if filter is None or any(f in op for f in filter):
+                    args, kwargs = deserialize_args(args_str)
+                    size = _args_size(args) + _args_size(list(kwargs.values()))
+                    size = size / (1024 * 1024)  # Convert to MB
+                    is_synthetic = cnt == 0
+
+                    op_inputs.append(
+                        {
+                            "uuid": hashlib.sha256(args_str.encode() + op.encode()).hexdigest(),
+                            "op_name": op,
+                            "args": args_str,
+                            "arg_size": size,
+                            "count": cnt,
+                            "is_synthetic": is_synthetic,
+                        }
+                    )
+    return op_inputs
+
+
+def _parse_trace_stream(
+    stream, filter: Optional[List[str]] = None, desc: str = "Parsing stream"
+) -> List[Dict]:
+    """
+    Parse trace data from a text stream (e.g., from requests.Response.iter_lines()).
+
+    Args:
+        stream: Iterable of lines (strings or bytes)
+        filter: Optional list of operation name filters
+        desc: Description for progress bar
+    """
+    op_inputs = []
+    op = None
+
+    iterator = tqdm(stream, desc=desc)
+
+    for line in iterator:
+        # Handle bytes from response stream
+        if isinstance(line, bytes):
+            line = line.decode("utf-8")
+
+        if m := re.match("Operator: (.*)", line):
+            op = m.group(1)
+            if op == "aten.sum.SymInt":
+                op = "aten.sum.dim_IntList"
+        if m := re.match("cnt: \\d+, (.*)", line):
+            assert op is not None
+            args_str = m.group(1)
+            cnt = int(m.group(0).split(",")[0].split(":")[1])
+
+            if filter is None or any(f in op for f in filter):
+                args, kwargs = deserialize_args(args_str)
+                size = _args_size(args) + _args_size(list(kwargs.values()))
+                del args, kwargs
+                cleanup_memory_and_gpu()
+                size = size / (1024 * 1024)  # Convert to MB
+                is_synthetic = cnt == 0
+
+                op_inputs.append(
+                    {
+                        "uuid": hashlib.sha256(args_str.encode() + op.encode()).hexdigest(),
+                        "op_name": op,
+                        "args": args_str,
+                        "arg_size": size,
+                        "count": cnt,
+                        "is_synthetic": is_synthetic,
+                    }
+                )
+    return op_inputs
+
+
+def load_ops_from_source(
+    source: Union[str, Path],
+    format: str = "auto",
+    filter: Optional[List[str]] = None,
+) -> List[Dict]:
+    """
+    Load operation data from various sources and formats.
+
+    Args:
+        source: File path or URL
+        format: "trace", "parquet", or "auto" (detect from file extension)
+        filter: Optional list of operation name filters
+
+    Returns:
+        List of dictionaries with detailed operation info
+
+    Auto-detection behavior:
+    - https://domain.com/data.parquet → parquet format
+    - https://domain.com/data.txt → trace format
+    - https://domain.com/data → trace format (fallback)
+    - local_file.parquet → parquet format
+    - local_file.txt → trace format
+    """
+
+    # Auto-detect format if not specified
+    if format == "auto":
+        if isinstance(source, str):
+            # Check file extension first (works for both local files and URLs)
+            if source.endswith(".parquet"):
+                format = "parquet"
+            elif source.endswith(".txt"):
+                format = "trace"
+            elif source.startswith(("http://", "https://")):
+                # Remote URL without recognizable extension - default to trace
+                format = "trace"
+            else:
+                raise ValueError(f"Unsupported source: {source}")
+        else:
+            raise ValueError(f"Unsupported source: {source}")
+
+    if format == "parquet":
+        return _load_from_parquet(source, filter)
+    elif format == "trace":
+        # Always load full data - consumers can extract what they need
+        return _load_from_trace(source, filter)
+    else:
+        raise ValueError(f"Unsupported format: {format}")
+
+
+def _load_from_parquet(source: Union[str, Path], filter: Optional[List[str]]):
+    """Load operations from parquet file."""
+    table = pq.read_table(source)
+    df = table.to_pandas()
+
+    # Apply filter if provided
+    if filter:
+        mask = df["op_name"].apply(lambda op: any(f in op for f in filter))
+        df = df[mask]
+
+    return df.to_dict("records")
+
+
+def op_list_to_benchmark_dict(ops_list: List[Dict]) -> Dict[str, List[str]]:
+    """
+    Convert a list of operation dictionaries to a dictionary format which can be used for benchmarking.
+
+    Args:
+        ops_list: List of dicts with 'op_name' and 'args' keys
+
+    Returns:
+        Dictionary mapping op_name to list of args strings
+    """
+    result = {}
+    for op_data in ops_list:
+        if not op_data["included_in_benchmark"]:
+            continue
+        op_name = op_data["op_name"]
+        args = op_data["args"]
+        if op_name not in result:
+            result[op_name] = []
+        result[op_name].append(args)
+    return result
+
+
+def _load_from_trace(source: Union[str, Path], filter: Optional[List[str]]) -> List[Dict]:
+    """Load operations from trace file(s) and return list of dicts."""
+    op_inputs = []
+
+    # Handle URLs - stream directly without saving to disk
+    if isinstance(source, str) and (source.startswith("http://") or source.startswith("https://")):
+        logging.info(f"Downloading trace from {source}")
+        with requests.get(source, stream=True) as response:
+            response.raise_for_status()
+            desc = "Parsing"
+            op_inputs = _parse_trace_stream(response.iter_lines(), filter, desc)
+
+    # Handle single files
+    else:
+        op_inputs = _parse_trace_file(source, filter)
+
+    return op_inputs
diff --git a/BackendBench/eval.py b/BackendBench/eval.py
index d5b1d6f..58540b8 100644
--- a/BackendBench/eval.py
+++ b/BackendBench/eval.py
@@ -4,9 +4,7 @@
 import triton.testing
-
-from BackendBench.utils import uses_cuda_stream
-from BackendBench.utils import serialize_args
+from BackendBench.utils import serialize_args, uses_cuda_stream
 
 logger = logging.getLogger(__name__)
diff --git a/BackendBench/scripts/dataset_filters.py b/BackendBench/scripts/dataset_filters.py
new file mode 100644
index 0000000..6c6c3b1
--- /dev/null
+++ b/BackendBench/scripts/dataset_filters.py
@@ -0,0 +1,30 @@
+# Operators to skip: indexing ops that need valid indices
+SKIP_OPERATORS = [
+    "embedding",
+    "scatter",
+    "gather",
+    "index",
+    "nll_loss",
+    "im2col_backward",
+    "col2im_backward",
+    "native_layer_norm_backward",
+    "upsample_nearest2d_backward.vec",
+    "upsample_bilinear2d_backward.vec",
+    "_cudnn_rnn_backward.default",  # RuntimeError: cuDNN error: CUDNN_STATUS_BAD_PARAM
+    "_fft_c2c.default",  # cuFFT only supports dimensions whose sizes are powers of two when computing in half precision
+]
+
+
+def apply_skip_ops_filter(ops):
+    for op in ops:
+        if any(skip_op in op["op_name"] for skip_op in SKIP_OPERATORS):
+            op["included_in_benchmark"] = False
+            op["why_excluded"].append("We cannot run this op on backendbench yet")
+            op["runnable"] = False
+
+        if op["is_synthetic"]:
+            op["included_in_benchmark"] = False
+            op["why_excluded"].append(
+                "Synthetic ops are not supported in the official benchmark yet"
+            )
+    return ops
diff --git a/BackendBench/scripts/get_big_inputs.py b/BackendBench/scripts/get_big_inputs.py
index 7a3e2e3..c6ac897 100644
--- a/BackendBench/scripts/get_big_inputs.py
+++ b/BackendBench/scripts/get_big_inputs.py
@@ -1,5 +1,4 @@
 import argparse
-import gc
 import logging
 import os
 import tempfile
@@ -20,6 +19,7 @@
 )
 from main import setup_logging
 from tqdm import tqdm
+from BackendBench.utils import cleanup_memory_and_gpu
 
 # Magic numbers and constants
 MAX_ITERATIONS = 100  # Maximum binary search iterations to prevent infinite loops
@@ -44,16 +44,6 @@
 log = logging.getLogger(__name__)
 
 
-def cleanup_memory_and_gpu(*variables):
-    """Helper function to delete variables and clean up GPU memory"""
-    for var in variables:
-        if var is not None:
-            del var
-    torch.cuda.synchronize()
-    torch.cuda.empty_cache()
-    gc.collect()
-
-
 def scale_shape(shape: List[int], scale_factor: float) -> List[int]:
     """Scale tensor shape by a factor"""
     return [max(MIN_TENSOR_DIM, int(dim * scale_factor)) for dim in shape]
diff --git a/BackendBench/scripts/main.py b/BackendBench/scripts/main.py
index 64a0d41..54677a9 100644
--- a/BackendBench/scripts/main.py
+++ b/BackendBench/scripts/main.py
@@ -7,6 +7,7 @@
 import BackendBench.eval as eval
 import click
 import torch
+
 from BackendBench.facto_suite import FactoTestSuite
 from BackendBench.llm_client import ClaudeKernelGenerator, LLMKernelGenerator
 from BackendBench.opinfo_suite import OpInfoTestSuite
diff --git a/BackendBench/scripts/parquet_trace_converter.py b/BackendBench/scripts/parquet_trace_converter.py
new file mode 100644
index 0000000..011037a
--- /dev/null
+++ b/BackendBench/scripts/parquet_trace_converter.py
@@ -0,0 +1,226 @@
+# Utility functions to convert parquet and trace files back and forth
+
+import hashlib
+import logging
+import os
+from pathlib import Path
+from typing import List
+
+import click
+import pyarrow as pa
+import pyarrow.parquet as pq
+from BackendBench.data_loaders import _load_from_trace
+from BackendBench.scripts.dataset_filters import apply_skip_ops_filter
+from BackendBench.torchbench_suite import DEFAULT_HUGGINGFACE_URL
+from huggingface_hub import HfApi
+
+
+"""
+Columns for the parquet dataset:
+- uuid (string) (sha256 hash of op + args)
+- op_name (string)
+- args (string)
+- arg_size (float) (in MB)
+- count (int) (number of times this op + set of args was called in real models)
+- is_synthetic (boolean) (did we generate this op or is it from a real model)
+- included_in_benchmark (boolean)
+- why_excluded (list of strings) (empty if included)
+- runtime_ms (float) (timings on H100 gpu)
+- runnable (bool) (does this op + test work) [we may remove this column later after we solve for special ops]
+- in_models (string) (which models did we include this op in) [@TODO add this]
+"""
+
+logger = logging.getLogger(__name__)
+
+
+def _upload_to_hf(file_path: str) -> None:
+    """Upload file to GPUMODE/huggingface_op_trace."""
+    try:
+        api = HfApi()
+        api.upload_file(
+            path_or_fileobj=file_path,
+            path_in_repo=Path(file_path).name,
+            repo_id="GPUMODE/huggingface_op_trace",
+            repo_type="dataset",
+        )
+        logger.info(f"Uploaded {Path(file_path).name} to Hugging Face")
+    except Exception as e:
+        logger.warning(f"Failed to upload {Path(file_path).name}: {e}")
+
+
+def setup_logging(log_level):
+    """Configure logging with the specified level."""
+    numeric_level = getattr(logging, log_level.upper(), None)
+    if not isinstance(numeric_level, int):
+        raise ValueError(f"Invalid log level: {log_level}")
+
+    logging.basicConfig(
+        level=numeric_level,
+        format="[%(asctime)s][%(levelname)s][%(filename)s] %(message)s",
+        datefmt="%Y-%m-%d %H:%M:%S",
+    )
+
+
+def _load_trace_for_parquet_conversion(source: str) -> List[dict]:
+    """
+    Load operations from trace file(s) with detailed metadata for parquet conversion.
+    """
+    # Use the shared _load_from_trace for parquet conversion
+    return _load_from_trace(source, filter=None)
+
+
+def convert_trace_to_parquet(trace_file, parquet_file):
+    """
+    Convert a trace file to a parquet file
+    """
+
+    # Load operations using local trace parsing function
+    ops = _load_trace_for_parquet_conversion(trace_file)
+
+    # Add additional metadata fields required for the parquet format
+    for op in ops:
+        op["uuid"] = hashlib.sha256(op["args"].encode() + op["op_name"].encode()).hexdigest()
+        op["included_in_benchmark"] = True
+        op["why_excluded"] = []
+        op["runtime_ms"] = ""
+        op["runnable"] = True
+
+    # apply filters
+    ops = apply_skip_ops_filter(ops)
+
+    # Create parquet table with all metadata (formerly "dev" version)
+    table = pa.Table.from_pylist(ops)
+
+    # Write parquet file
+    pq.write_table(table, parquet_file)
+
+    logger.info(f"Wrote {len(ops)} ops and inputs to {parquet_file}")
+
+    # Log column information for verification
+    logger.debug(f"Parquet columns: {table.column_names}")
+
+
+def convert_parquet_to_trace(parquet_file, trace_file):
+    """
+    Convert a parquet file to a trace file
+    """
+    table = pq.read_table(parquet_file)
+    op_inputs = {}
+
+    for row in table.to_pylist():
+        formatted_entry = f"cnt: {row['count']}, {row['args']}"
+
+        if row["op_name"] not in op_inputs:
+            op_inputs[row["op_name"]] = []
+        op_inputs[row["op_name"]].append(formatted_entry)
+
+    # write to trace file
+    with open(trace_file, "w") as f:
+        for op, args in op_inputs.items():
+            f.write(f"Operator: {op}\n")
+            for arg in args:
+                f.write(f"{arg}\n")
+    total_args = sum(len(op_inputs[op]) for op in op_inputs)
+    logging.info(f"Wrote {total_args} ops and inputs to {trace_file}")
+
+
+def _validate_parquet_name(parquet_name: str) -> str:
+    """Validate parquet filename. URLs allowed only for inputs."""
+    # URLs are allowed only if this is an input file
+    if parquet_name.startswith(("http://", "https://")):
+        raise click.BadParameter("Output parquet file cannot be a URL")
+
+    if not parquet_name.endswith(".parquet"):
+        raise click.BadParameter("Parquet file must end with .parquet suffix")
+
+    # Ensure local files are in datasets directory
+    if not parquet_name.startswith("datasets/"):
+        parquet_name = os.path.join("datasets", parquet_name)
+
+    return parquet_name
+
+
+def _validate_trace_file(trace_file: str, is_input: bool = True) -> str:
+    """Validate trace file. URLs allowed only for inputs."""
+    # URLs are allowed only if this is an input file
+    if trace_file.startswith(("http://", "https://")):
+        if is_input:
+            return trace_file
+        else:
+            raise click.BadParameter("Output trace file cannot be a URL")
+
+    # For local files, check extension
+    if not (trace_file.endswith(".txt") or Path(trace_file).is_dir()):
+        raise click.BadParameter("Local trace file must end with .txt or be a directory")
+
+    if Path(trace_file).is_dir() and not is_input:
+        raise click.BadParameter("Output trace file cannot be a directory")
+
+    return trace_file
+
+
+@click.command()
+@click.option(
+    "--log-level",
+    default=os.getenv("LOG_LEVEL", "INFO"),
+    type=click.Choice(["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], case_sensitive=False),
+    help="Set the logging level",
+)
+@click.option(
+    "--mode",
+    default="trace-to-parquet",
+    type=click.Choice(["trace-to-parquet", "parquet-to-trace"]),
+    help="Conversion mode",
+)
+@click.option(
+    "--trace-file",
+    default=DEFAULT_HUGGINGFACE_URL,
+    type=str,
+    help="Input trace file: URL (for downloads), local .txt file, or directory. Output trace files cannot be URLs",
+)
+@click.option(
+    "--parquet-name",
+    default="backend_bench_problems.parquet",
+    type=str,
+    help="Parquet filename: URL allowed as input in parquet-to-trace mode, local files in datasets/.",
+)
+@click.option(
+    "--upload-to-hf",
+    is_flag=True,
+    default=False,
+    help="Upload generated parquet files to Hugging Face (GPUMODE/huggingface_op_trace) in trace-to-parquet mode",
+)
+def main(log_level, mode, trace_file, parquet_name, upload_to_hf):
+    """Convert trace files to parquet format or vice versa."""
+    setup_logging(log_level)
+
+    # Create datasets directory
+    os.makedirs("datasets", exist_ok=True)
+
+    if mode == "trace-to-parquet":
+        # Validate inputs/outputs
+        trace_file = _validate_trace_file(trace_file, is_input=True)  # Input: URLs allowed
+        parquet_name = _validate_parquet_name(parquet_name)  # Output: URLs not allowed
+
+        logger.info(f"Converting trace file {trace_file} to parquet file {parquet_name}")
+
+        convert_trace_to_parquet(trace_file, parquet_name)
+        logger.info("Conversion completed successfully")
+
+        if upload_to_hf:
+            # Upload to Hugging Face
+            _upload_to_hf(os.path.abspath(parquet_name))
+
+    elif mode == "parquet-to-trace":
+        # Validate parquet input (URLs allowed for input in this mode)
+        parquet_input = _validate_parquet_name(parquet_name)
+        # Validate trace output (URLs not allowed for output)
+        trace_output = _validate_trace_file(trace_file, is_input=False)  # Output: URLs not allowed
+
+        logger.info(f"Converting parquet file {parquet_input} to trace file {trace_output}")
+        convert_parquet_to_trace(parquet_input, trace_output)
+        logger.info("Conversion completed successfully")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/BackendBench/torchbench_suite.py b/BackendBench/torchbench_suite.py
index 42ccf98..01288e9 100644
--- a/BackendBench/torchbench_suite.py
+++ b/BackendBench/torchbench_suite.py
@@ -1,35 +1,19 @@
 """
-Load aten inputs from serialized txt files.
+Load aten inputs from serialized txt files and parquet files.
 """
 
-import re
-import tempfile
-from collections import defaultdict
-from pathlib import Path
-
-import requests
-import torch
+import torch  # noqa: F401
+from BackendBench.data_loaders import (
+    _args_size,
+    load_ops_from_source,
+    op_list_to_benchmark_dict,
+)
+from BackendBench.scripts.dataset_filters import SKIP_OPERATORS
 from BackendBench.utils import deserialize_args
 
-# the schema for this dataset is the one defined in tritonbench traces.
-# ie. https://github.com/pytorch-labs/tritonbench/blob/main/tritonbench/data/input_configs/hf_train/AlbertForMaskedLM_training.txt
-DEFAULT_HUGGINGFACE_URL = "https://huggingface.co/datasets/GPUMODE/huggingface_op_trace/raw/main/augmented_hf_op_traces.txt"
-
-# Operators to skip for indexing ops that need valid indices
-SKIP_OPERATORS = [
-    "embedding",
-    "scatter",
-    "gather",
-    "index",
-    "nll_loss",
-    "im2col_backward",
-    "col2im_backward",
-    "native_layer_norm_backward",
-    "upsample_nearest2d_backward.vec",
-    "upsample_bilinear2d_backward.vec",
-    "_cudnn_rnn_backward.default",  # RuntimeError: cuDNN error: CUDNN_STATUS_BAD_PARAM
-    "_fft_c2c.default",  # cuFFT only supports dimensions whose sizes are powers of two when computing in half precision
-]
+# for details on the dataset read this:
+# https://huggingface.co/datasets/GPUMODE/huggingface_op_trace
+DEFAULT_HUGGINGFACE_URL = "https://huggingface.co/datasets/GPUMODE/huggingface_op_trace/resolve/main/backend_bench_problems.parquet"
 
 
 class TorchBenchTest:
@@ -38,16 +22,6 @@ def __init__(self, *args, **kwargs):
         self.kwargs = kwargs
 
 
-def _args_size(args):
-    size = 0
-    for arg in args:
-        if isinstance(arg, torch.Tensor):
-            size += arg.numel() * arg.element_size()
-        elif isinstance(arg, (tuple, list)):
-            size += _args_size(arg)
-    return size
-
-
 class TorchBenchOpTest:
     def __init__(self, op, inputs, topn):
         self.op = eval(f"torch.ops.{op}")
@@ -76,51 +50,25 @@ def performance_tests(self):
             yield TorchBenchTest(*args, **kwargs)
 
 
-def _parse_inputs(filename, filter, op_inputs):
-    op = None
-
-    with open(filename, "r") as f:
-        for line in f:
-            if m := re.match("Operator: (.*)", line):
-                op = m.group(1)
-                if op == "aten.sum.SymInt":
-                    op = "aten.sum.dim_IntList"
-            if m := re.match("cnt: \\d+, (.*)", line):
-                assert op is not None
-                args = m.group(1)
-                if filter is None or any(f in op for f in filter):
-                    op_inputs[op].append(args)
-    return op_inputs
-
-
 class TorchBenchTestSuite:
     def __init__(self, name, filename=None, filter=None, topn=None):
         self.name = name
         self.topn = topn
-        self.optests = defaultdict(list)
 
         # Use default URL if no filename provided
        if filename is None:
            filename = DEFAULT_HUGGINGFACE_URL
 
-        # Check if filename is a URL
-        if isinstance(filename, str) and (
-            filename.startswith("http://") or filename.startswith("https://")
-        ):
-            with (
-                tempfile.NamedTemporaryFile(mode="w+", suffix=".txt", delete=False) as tmp_file,
-                requests.get(filename) as response,
-            ):
-                response.raise_for_status()
-                tmp_file.write(response.text)
-                tmp_file.flush()
-                _parse_inputs(tmp_file.name, filter, self.optests)
-            Path(tmp_file.name).unlink(missing_ok=True)
-        elif Path(filename).is_dir():
-            for file_path in Path(filename).glob("**/*.txt"):
-                _parse_inputs(str(file_path), filter, self.optests)
-        else:
-            _parse_inputs(filename, filter, self.optests)
+        # Load operations using the shared data loader
+        ops_list = load_ops_from_source(
+            source=filename,
+            format="auto",  # Auto-detect based on file extension
+            filter=filter,
+        )
+
+        # Convert to dictionary format using utility function
+        self.optests = op_list_to_benchmark_dict(ops_list)
+
         # Deduplicate the strings in self.optests
         for op in self.optests:
             self.optests[op] = list(set(self.optests[op]))
diff --git a/BackendBench/utils.py b/BackendBench/utils.py
index 600934f..f5ab709 100644
--- a/BackendBench/utils.py
+++ b/BackendBench/utils.py
@@ -1,8 +1,10 @@
 import ast
+import gc
 import inspect
+import math
 import re
 import textwrap
-import math
+
 import torch
 from torch.testing import make_tensor
 
@@ -152,4 +154,17 @@ def deserialize_args(inps):
     # f strings introduce quotations we dont want
     for key in dtype_abbrs_parsing:
         inps = inps.replace(f"'{key}'", key)
+
+    # Handle torch.device strings - replace "torch.device(...)" with torch.device(...)
+    # This regex finds patterns like "torch.device('cpu')" or 'torch.device("cuda:0")'
+    pattern = r'["\']torch\.device\((.*?)\)["\']'
+    inps = re.sub(pattern, r"torch.device(\1)", inps)
+
     return eval(inps.strip().strip("'").strip('"'), global_vals)
+
+
+def cleanup_memory_and_gpu():
+    """Helper function to clean up GPU memory"""
+    gc.collect()
+    torch.cuda.synchronize()
+    torch.cuda.empty_cache()
diff --git a/pyproject.toml b/pyproject.toml
index 457bb53..5ecfc77 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -24,6 +24,7 @@ dependencies = [
     "anthropic>=0.34.0",
     "pytest",
     "requests",
+    "huggingface_hub",
 ]
 
 [project.optional-dependencies]
@@ -43,12 +44,13 @@ packages = ["BackendBench"]
 
 [tool.uv]
dev-dependencies = [
     "pytest",
-    "pytest-cov", 
+    "pytest-cov",
     "pytest-mock",
     "pytest-timeout",
     "ruff==0.12.1",
     "torch",
     "numpy",
+    "pyarrow",
     # cupy-cuda12x is platform specific, install manually if needed
 ]
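
Usage sketch (not part of the patch): a minimal example of how the loader utilities introduced in BackendBench/data_loaders.py are expected to be combined, based only on the functions added above. The parquet path and the "aten.add" filter are illustrative assumptions.

# Minimal sketch, assuming a parquet file produced by parquet_trace_converter.py
# exists locally (filename and filter below are hypothetical).
from BackendBench.data_loaders import load_ops_from_source, op_list_to_benchmark_dict

# Returns one dict per (op, args) record: uuid, op_name, args, arg_size, count, ...
ops = load_ops_from_source(
    "datasets/backend_bench_problems.parquet",
    format="auto",        # ".parquet" suffix selects the parquet loader
    filter=["aten.add"],  # keep only ops whose name contains "aten.add"
)

# Collapse to {op_name: [args_str, ...]}; records with
# included_in_benchmark == False are dropped.
optests = op_list_to_benchmark_dict(ops)
for op_name, args_list in optests.items():
    print(op_name, len(args_list))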
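
A similarly hedged sketch of the trace/parquet round trip, calling the converter helpers from BackendBench/scripts/parquet_trace_converter.py directly instead of the click CLI; both file names are hypothetical and assume the trace file already exists.

# Round-trip sketch (hypothetical file names); the CLI in
# parquet_trace_converter.py wraps these same two functions.
from BackendBench.scripts.parquet_trace_converter import (
    convert_parquet_to_trace,
    convert_trace_to_parquet,
)

convert_trace_to_parquet("my_trace.txt", "my_trace.parquet")
convert_parquet_to_trace("my_trace.parquet", "my_trace_roundtrip.txt")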