diff --git a/.github/workflows/auto-update-dev.yml b/.github/workflows/auto-update-dev.yml index 58298ec86a..697fdf8080 100644 --- a/.github/workflows/auto-update-dev.yml +++ b/.github/workflows/auto-update-dev.yml @@ -3,7 +3,7 @@ name: Auto-Update Dev Branches from Master on: push: branches: - - master # Trigger workflow on commits to 'dev' branch + - master # Trigger workflow on commits to 'master' branch jobs: update-dev: diff --git a/.github/workflows/build_wheels.yml b/.github/workflows/build_wheels.yml index 3004ecc2d5..f7c0b1d5ce 100644 --- a/.github/workflows/build_wheels.yml +++ b/.github/workflows/build_wheels.yml @@ -1,5 +1,4 @@ name: Build loadgen wheels and release them into PYPI - on: release: types: [published] diff --git a/.github/workflows/test-bert.yml b/.github/workflows/test-bert.yml index 27fcee0d3f..2a8b609c4c 100755 --- a/.github/workflows/test-bert.yml +++ b/.github/workflows/test-bert.yml @@ -5,7 +5,7 @@ name: Test for MLPerf inference bert submission generation using MLC script auto on: pull_request: - branches: [ "master", "dev" ] + branches: [ "master_off", "dev_off" ] paths: - language/bert/** - tools/submission/** diff --git a/automotive/3d-object-detection/README.md b/automotive/3d-object-detection/README.md index e1190e8132..d0430d444c 100644 --- a/automotive/3d-object-detection/README.md +++ b/automotive/3d-object-detection/README.md @@ -101,3 +101,7 @@ Please click [here](https://github.com/mlcommons/inference/blob/master/automotiv ``` python accuracy_waymo.py --mlperf-accuracy-file /mlperf_log_accuracy.json --waymo-dir /waymo/kitti_format/ ``` + +## Automated command for submission generation via MLCFlow + +Please see the [new docs site](https://docs.mlcommons.org/inference/submission/) for an automated way to generate submission through MLCFlow. \ No newline at end of file diff --git a/graph/R-GAT/README.md b/graph/R-GAT/README.md index 1380cb047d..d9f5fafa44 100644 --- a/graph/R-GAT/README.md +++ b/graph/R-GAT/README.md @@ -181,6 +181,10 @@ mlcr process,mlperf,accuracy,_igbh --result_dir= -j +``` + +**Onnx Framework** + +``` +mlcr get,ml-model,bert-large,_onnx --outdirname= -j +``` + +**TensorFlow Framework** + +``` +mlcr get,ml-model,bert-large,_tensorflow --outdirname= -j +``` + +### Download dataset through MLCFlow Automation + +**Validation** +``` +mlcr get,dataset,squad,validation --outdirname= -j +``` + +**Calibration** +``` +mlcr get,dataset,squad,_calib1 --outdirname= -j +``` + ## Commands Please run the following commands: @@ -45,6 +83,17 @@ Please run the following commands: - The script [tf_freeze_bert.py] freezes the TensorFlow model into pb file. - The script [bert_tf_to_pytorch.py] converts the TensorFlow model into the PyTorch `BertForQuestionAnswering` module in [HuggingFace Transformers](https://github.com/huggingface/transformers) and also exports the model to [ONNX](https://github.com/onnx/onnx) format. +### Evaluate the accuracy through MLCFlow Automation +```bash +mlcr process,mlperf,accuracy,_squad --result_dir= +``` + +Please click [here](https://github.com/mlcommons/inference/blob/master/language/bert/accuracy-squad.py) to view the Python script for evaluating accuracy for the squad dataset. + +## Automated command for submission generation via MLCFlow + +Please see the [new docs site](https://docs.mlcommons.org/inference/submission/) for an automated way to generate submission through MLCFlow. 
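A quick way to confirm that a downloaded model variant is usable is to load it and inspect its input signature. The sketch below does this for the ONNX variant; the `model.onnx` filename and its location are placeholders for whatever `--outdirname` was set to, and it assumes `onnxruntime` is installed — it is an illustration, not a step required by the benchmark.

```python
import onnxruntime as ort

# Placeholder path: point this at the file fetched with the ONNX command above.
session = ort.InferenceSession("model.onnx", providers=["CPUExecutionProvider"])

# BERT QA exports usually take token ids, an attention/input mask and segment
# ids, but the exact input names depend on the export, so list them rather
# than assuming them.
for inp in session.get_inputs():
    print(inp.name, inp.shape, inp.type)
```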
+ ## Loadgen over the Network ``` diff --git a/language/deepseek-r1/backends/__init__.py b/language/deepseek-r1/backends/__init__.py index 61ad96a3f2..865ed3bd53 100644 --- a/language/deepseek-r1/backends/__init__.py +++ b/language/deepseek-r1/backends/__init__.py @@ -11,4 +11,4 @@ # to avoid dependency issues when only using certain backends __all__ = [ 'BaseBackend', -] \ No newline at end of file +] diff --git a/language/deepseek-r1/backends/pytorch_backend.py b/language/deepseek-r1/backends/pytorch_backend.py index c1e426185d..0742882bca 100644 --- a/language/deepseek-r1/backends/pytorch_backend.py +++ b/language/deepseek-r1/backends/pytorch_backend.py @@ -1,3 +1,17 @@ +from utils.validation import require_initialized, BackendNotInitializedError +from utils.backend_registry import get_backend_config +from .utils import get_cache_directory +from .base_backend import BaseBackend +from transformers import AutoTokenizer +import torch.distributed as dist +import torch +from pathlib import Path +import asyncio +from typing import Any, Dict, List, Optional +import logging +import json +from ref_dsinfer.inference.model import Transformer, ModelArgs +from safetensors.torch import load_model import os import sys @@ -6,23 +20,6 @@ 'REF_DSINFER_PATH', '/opt/ref_dsinfer/inference') sys.path.append(ref_dsinfer_path) -from safetensors.torch import load_model -from ref_dsinfer.inference.model import Transformer, ModelArgs -import json -import logging -from typing import Any, Dict, List, Optional -import asyncio -from pathlib import Path - -import torch -import torch.distributed as dist -from transformers import AutoTokenizer - -from .base_backend import BaseBackend -from .utils import get_cache_directory -from utils.backend_registry import get_backend_config -from utils.validation import require_initialized, BackendNotInitializedError - logger = logging.getLogger(__name__) @@ -115,8 +112,10 @@ def initialize(self) -> None: with torch.device(self.config['device']): self.model = Transformer(self.model_args) - # Load tokenizer (only rank 0 needs it for MLPerf, but all ranks need it for run_eval_mpi) - self.tokenizer = AutoTokenizer.from_pretrained(str(self.model_path), revision=self.config['model_revision']) + # Load tokenizer (only rank 0 needs it for MLPerf, but all ranks need + # it for run_eval_mpi) + self.tokenizer = AutoTokenizer.from_pretrained( + str(self.model_path), revision=self.config['model_revision']) # Load model weights checkpoint_file = self.model_path / \ @@ -133,7 +132,8 @@ def sample(self, logits: torch.Tensor, temperature: float) -> torch.Tensor: """Sample from logits with temperature.""" logits = logits / max(temperature, 1e-5) probs = torch.softmax(logits, dim=-1) - return probs.div_(torch.empty_like(probs).exponential_(1)).argmax(dim=-1) + return probs.div_(torch.empty_like( + probs).exponential_(1)).argmax(dim=-1) @torch.inference_mode() def _generate_batch( @@ -222,7 +222,8 @@ def _generate_batch( return completion_tokens @require_initialized - def generate(self, tokenized_prompts: List[List[int]], **kwargs) -> List[Dict[str, Any]]: + def generate( + self, tokenized_prompts: List[List[int]], **kwargs) -> List[Dict[str, Any]]: """ Generate responses for a list of pre-tokenized prompts. 
@@ -265,7 +266,8 @@ def generate(self, tokenized_prompts: List[List[int]], **kwargs) -> List[Dict[st return results @require_initialized - def generate_batch_distributed(self, batch_tokens: List[List[int]]) -> List[List[int]]: + def generate_batch_distributed( + self, batch_tokens: List[List[int]]) -> List[List[int]]: """ Generate tokens for a batch in distributed mode. @@ -296,7 +298,8 @@ def generate_batch_distributed(self, batch_tokens: List[List[int]]) -> List[List return [] @require_initialized - def generate_async(self, tokenized_prompts: List[List[int]], **kwargs) -> List[asyncio.Future]: + def generate_async( + self, tokenized_prompts: List[List[int]], **kwargs) -> List[asyncio.Future]: """ Generate responses asynchronously. @@ -331,7 +334,8 @@ async def extract_result(idx): return futures @require_initialized - def generate_batch_distributed_async(self, batch_tokens: List[List[int]]) -> asyncio.Future: + def generate_batch_distributed_async( + self, batch_tokens: List[List[int]]) -> asyncio.Future: """ Generate tokens for a batch in distributed mode asynchronously. diff --git a/language/deepseek-r1/backends/sglang_backend.py b/language/deepseek-r1/backends/sglang_backend.py index 06cf074a96..10be6e1dcd 100644 --- a/language/deepseek-r1/backends/sglang_backend.py +++ b/language/deepseek-r1/backends/sglang_backend.py @@ -66,11 +66,12 @@ def __init__(self, config: Dict[str, Any] = None): # Log monitoring self._log_monitor = None - + # Shared semaphore for async concurrency control self._async_semaphore = None - # Configure logging to suppress httpx INFO logs (only show warnings/errors) + # Configure logging to suppress httpx INFO logs (only show + # warnings/errors) import logging logging.getLogger("httpx").setLevel(logging.WARNING) logging.getLogger("openai").setLevel(logging.WARNING) @@ -128,7 +129,8 @@ def _build_server_command(self) -> List[str]: cmd.append('flashinfer') if self.config['enable_dp_attention']: - cmd.extend(['--enable-dp-attention', '--dp', str(self.config['dp'])]) + cmd.extend(['--enable-dp-attention', + '--dp', str(self.config['dp'])]) # Add performance settings cmd.extend([ @@ -175,7 +177,8 @@ def _wait_for_server_ready(self, timeout: int = None) -> bool: # Update progress indicator every 0.5 seconds if time.time() - last_progress_update >= 0.5: last_progress_update = time.time() - progress_idx = (progress_idx + 1) % len(TerminalDisplay.PROGRESS_CHARS) + progress_idx = ( + progress_idx + 1) % len(TerminalDisplay.PROGRESS_CHARS) minutes = elapsed // 60 seconds = elapsed % 60 # Use carriage return to stay on the same line @@ -192,7 +195,8 @@ def _wait_for_server_ready(self, timeout: int = None) -> bool: if response.status_code == 200: # Health check passed, now try a warmup query print(f"\r{' '*80}\r", end='', flush=True) - print(f"\n[SGLANG] Health check passed, running warmup query...") + print( + f"\n[SGLANG] Health check passed, running warmup query...") # Try to send a simple warmup query using OpenAI client try: @@ -210,7 +214,8 @@ def _wait_for_server_ready(self, timeout: int = None) -> bool: # Send a simple warmup request warmup_response = warmup_client.chat.completions.create( model=self.config['served_model_name'], - messages=[{"role": "user", "content": "Hello"}], + messages=[ + {"role": "user", "content": "Hello"}], temperature=0.0, max_tokens=10, seed=self.config['seed'] @@ -218,23 +223,28 @@ def _wait_for_server_ready(self, timeout: int = None) -> bool: # Check if we got a valid response if warmup_response.choices[0].message.content: - 
print(f"[SGLANG] ✓ Warmup query successful! Response: {warmup_response.choices[0].message.content[:50]}...") + print( + f"[SGLANG] ✓ Warmup query successful! Response: {warmup_response.choices[0].message.content[:50]}...") # Stop log monitoring if self._log_monitor: self._log_monitor.stop() self._log_monitor = None - print(f"\n[SGLANG] " + "="*60) - print(f"[SGLANG] ✓ SERVER READY! (startup took {elapsed}s)") - print(f"[SGLANG] " + "="*60) + print(f"\n[SGLANG] " + "=" * 60) + print( + f"[SGLANG] ✓ SERVER READY! (startup took {elapsed}s)") + print(f"[SGLANG] " + "=" * 60) return True else: - print(f"[SGLANG] Warmup query returned empty response, retrying...") + print( + f"[SGLANG] Warmup query returned empty response, retrying...") except Exception as warmup_error: - print(f"[SGLANG] Warmup query failed: {warmup_error}, retrying...") - # Continue waiting, the server might not be fully ready yet + print( + f"[SGLANG] Warmup query failed: {warmup_error}, retrying...") + # Continue waiting, the server might not be fully + # ready yet except requests.exceptions.RequestException: pass @@ -246,9 +256,11 @@ def _wait_for_server_ready(self, timeout: int = None) -> bool: self._log_monitor = None # Clear progress line print(f"\r{' '*80}\r", end='', flush=True) - print(f"\n[SGLANG] ✗ Server process died with exit code: {self.server_process.returncode}") + print( + f"\n[SGLANG] ✗ Server process died with exit code: {self.server_process.returncode}") if self.server_log_file: - print(f"[SGLANG] Check server logs at: {self.server_log_file}") + print( + f"[SGLANG] Check server logs at: {self.server_log_file}") return False time.sleep(0.1) # Check every 100ms for smoother progress @@ -264,17 +276,21 @@ def _wait_for_server_ready(self, timeout: int = None) -> bool: def _start_server(self) -> None: """Start the SGLang server as a subprocess.""" - print(f"\n[SGLANG] Starting SGLang server for {self.config['model']}...") + print( + f"\n[SGLANG] Starting SGLang server for {self.config['model']}...") print(f"[SGLANG] Configuration:") print(f"[SGLANG] - Port: {self.port}") - print(f"[SGLANG] - Tensor Parallel: {self.config['tensor_parallel_size']}") - print(f"[SGLANG] - Context Length: {self.config['context_length']:,} tokens") + print( + f"[SGLANG] - Tensor Parallel: {self.config['tensor_parallel_size']}") + print( + f"[SGLANG] - Context Length: {self.config['context_length']:,} tokens") print(f"[SGLANG] - dtype: {self.config['dtype']}") # Create log file for server output log_dir = Path("/work/logs") log_dir.mkdir(exist_ok=True) - self.server_log_file = log_dir / f"sglang_server_{self.port}_{int(time.time())}.log" + self.server_log_file = log_dir / \ + f"sglang_server_{self.port}_{int(time.time())}.log" cmd = self._build_server_command() print(f"\n[SGLANG] Command: {' '.join(cmd)}") @@ -315,7 +331,10 @@ def _stop_server(self) -> None: except subprocess.TimeoutExpired: # Force kill if not stopped print("[SGLANG] Server didn't stop gracefully, forcing...") - os.killpg(os.getpgid(self.server_process.pid), signal.SIGKILL) + os.killpg( + os.getpgid( + self.server_process.pid), + signal.SIGKILL) self.server_process.wait() print("[SGLANG] Server force stopped") except ProcessLookupError: @@ -332,7 +351,8 @@ def initialize(self) -> None: try: # Load tokenizer for string conversion print(f"[SGLANG] Loading tokenizer: {self.config['tokenizer']}...") - self.tokenizer = AutoTokenizer.from_pretrained(self.config['tokenizer'], revision=self.config['model_revision']) + self.tokenizer = AutoTokenizer.from_pretrained( + 
self.config['tokenizer'], revision=self.config['model_revision']) # Start SGLang server (with log monitoring) self._start_server() @@ -341,7 +361,8 @@ def initialize(self) -> None: base_url = f"http://localhost:{self.port}/v1" api_key = self.config['api_key'] or "dummy-key" - print(f"[SGLANG] Creating OpenAI clients with base URL: {base_url}") + print( + f"[SGLANG] Creating OpenAI clients with base URL: {base_url}") # Configure timeout settings timeout_config = httpx.Timeout( @@ -371,10 +392,12 @@ def initialize(self) -> None: ) print(f"[SGLANG] Created asynchronous OpenAI client") - + # Create shared semaphore for async concurrency control - self._async_semaphore = asyncio.Semaphore(self.config['max_running_requests']) - print(f"[SGLANG] Created async semaphore with limit: {self.config['max_running_requests']}") + self._async_semaphore = asyncio.Semaphore( + self.config['max_running_requests']) + print( + f"[SGLANG] Created async semaphore with limit: {self.config['max_running_requests']}") # Server readiness was already verified by health endpoint in _wait_for_server_ready() # No need to check models endpoint @@ -403,17 +426,18 @@ def initialize(self) -> None: raise @require_initialized - def generate(self, - tokenized_prompts: Optional[List[List[int]]] = None, - text_prompts: Optional[List[str]] = None, - **kwargs) -> List[Dict[str, Any]]: + def generate(self, + tokenized_prompts: Optional[List[List[int]]] = None, + text_prompts: Optional[List[str]] = None, + **kwargs) -> List[Dict[str, Any]]: """Generate responses synchronously.""" # Check if server process is still alive self._check_server_alive() # Check if client is properly initialized if self.client is None: - raise RuntimeError("SGLang client is not initialized. Server may have failed to start.") + raise RuntimeError( + "SGLang client is not initialized. 
Server may have failed to start.") # Validate prompts using centralized validation validate_prompts_input( @@ -436,7 +460,8 @@ def generate(self, results = [] # Process prompts with progress bar - for prompt in tqdm(prompt_strings, desc="SGLang sync inference", unit="prompt"): + for prompt in tqdm( + prompt_strings, desc="SGLang sync inference", unit="prompt"): try: completion = self.client.chat.completions.create( model=self.config['served_model_name'], @@ -452,7 +477,8 @@ def generate(self, # Validate response is not empty if not generated_text: - raise RuntimeError(f"Empty response received from SGLang server for prompt: {prompt[:100]}...") + raise RuntimeError( + f"Empty response received from SGLang server for prompt: {prompt[:100]}...") # Tokenize the output to get token IDs tokens = self.tokenizer.encode(generated_text) @@ -464,15 +490,18 @@ def generate(self, except Exception as e: print(f"\nError generating completion: {e}") - raise RuntimeError(f"SGLang backend failed to generate tokens for prompt: {prompt[:100]}...") + raise RuntimeError( + f"SGLang backend failed to generate tokens for prompt: {prompt[:100]}...") return results - async def _async_generate_single(self, prompt: str, idx: int, semaphore: asyncio.Semaphore) -> Tuple[int, Dict[str, Any]]: + async def _async_generate_single( + self, prompt: str, idx: int, semaphore: asyncio.Semaphore) -> Tuple[int, Dict[str, Any]]: """Generate a single response asynchronously with semaphore control.""" # Check if async client is properly initialized if self.async_client is None: - raise RuntimeError(f"SGLang async client is not initialized for prompt {idx}") + raise RuntimeError( + f"SGLang async client is not initialized for prompt {idx}") async with semaphore: try: @@ -490,7 +519,8 @@ async def _async_generate_single(self, prompt: str, idx: int, semaphore: asyncio # Validate response is not empty if not generated_text: - raise RuntimeError(f"Empty response received from SGLang server for prompt: {prompt[:100]}...") + raise RuntimeError( + f"Empty response received from SGLang server for prompt: {prompt[:100]}...") # Tokenize the output to get token IDs tokens = self.tokenizer.encode(generated_text) @@ -499,20 +529,22 @@ async def _async_generate_single(self, prompt: str, idx: int, semaphore: asyncio except Exception as e: print(f"\nError generating completion for prompt {idx}: {e}") - raise RuntimeError(f"SGLang backend failed to generate tokens for prompt {idx}: {e}") + raise RuntimeError( + f"SGLang backend failed to generate tokens for prompt {idx}: {e}") @require_initialized - def generate_async(self, - tokenized_prompts: Optional[List[List[int]]] = None, - text_prompts: Optional[List[str]] = None, - **kwargs) -> List[asyncio.Future]: + def generate_async(self, + tokenized_prompts: Optional[List[List[int]]] = None, + text_prompts: Optional[List[str]] = None, + **kwargs) -> List[asyncio.Future]: """Generate responses asynchronously using shared semaphore.""" # Check if server process is still alive self._check_server_alive() # Check if client is properly initialized if self.async_client is None: - raise RuntimeError("SGLang async client is not initialized. Server may have failed to start.") + raise RuntimeError( + "SGLang async client is not initialized. 
Server may have failed to start.") # Validate prompts using centralized validation validate_prompts_input( @@ -542,44 +574,49 @@ def generate_async(self, futures = [] for idx, prompt in enumerate(prompt_strings): # Create a task for each prompt using the shared semaphore - task = asyncio.create_task(self._async_generate_single(prompt, idx, self._async_semaphore)) - + task = asyncio.create_task( + self._async_generate_single( + prompt, idx, self._async_semaphore)) + # Create a future that will hold the result future = asyncio.Future() - + # Setup callback to extract just the result (not the index) def make_callback(future_obj, expected_idx): def callback(task_obj): try: idx, result = task_obj.result() if idx != expected_idx: - future_obj.set_exception(Exception(f"Index mismatch: expected {expected_idx}, got {idx}")) + future_obj.set_exception( + Exception(f"Index mismatch: expected {expected_idx}, got {idx}")) else: future_obj.set_result(result) except Exception as e: future_obj.set_exception(e) return callback - + task.add_done_callback(make_callback(future, idx)) futures.append(future) return futures - async def generate_stream(self, - tokenized_prompts: Optional[List[List[int]]] = None, - text_prompts: Optional[List[str]] = None, - **kwargs) -> List[AsyncIterator[StreamingChunk]]: + async def generate_stream(self, + tokenized_prompts: Optional[List[List[int]]] = None, + text_prompts: Optional[List[str]] = None, + **kwargs) -> List[AsyncIterator[StreamingChunk]]: """Generate responses for a list of prompts with streaming.""" if not self.is_initialized: - raise RuntimeError("Backend not initialized. Call initialize() first.") - + raise RuntimeError( + "Backend not initialized. Call initialize() first.") + # Check if server process is still alive self._check_server_alive() - + # Check if async client is properly initialized if self.async_client is None: - raise RuntimeError("SGLang async client is not initialized. Server may have failed to start.") - + raise RuntimeError( + "SGLang async client is not initialized. 
Server may have failed to start.") + # Validate prompts validate_prompts_input( backend_name='sglang', @@ -587,7 +624,7 @@ async def generate_stream(self, text_prompts=text_prompts, input_type='text' ) - + # SGLang prefers text prompts if text_prompts is None: # Convert tokenized prompts to strings @@ -597,8 +634,9 @@ async def generate_stream(self, ] else: prompt_strings = text_prompts - - async def stream_single_prompt(prompt: str) -> AsyncIterator[StreamingChunk]: + + async def stream_single_prompt( + prompt: str) -> AsyncIterator[StreamingChunk]: try: stream = await self.async_client.chat.completions.create( model=self.config['served_model_name'], @@ -609,14 +647,14 @@ async def stream_single_prompt(prompt: str) -> AsyncIterator[StreamingChunk]: seed=self.config.get('seed'), stream=True ) - + async for chunk in stream: if not chunk.choices: continue - + delta = chunk.choices[0].delta finish_reason = chunk.choices[0].finish_reason - + if delta.content: yield StreamingChunk( token=delta.content, @@ -635,7 +673,7 @@ async def stream_single_prompt(prompt: str) -> AsyncIterator[StreamingChunk]: except Exception as e: print(f"[SGLANG] Streaming error for prompt: {e}") raise - + return [stream_single_prompt(prompt) for prompt in prompt_strings] def shutdown(self) -> None: @@ -650,7 +688,7 @@ def shutdown(self) -> None: # Close clients self.client = None self.async_client = None - + # Clear async semaphore self._async_semaphore = None @@ -665,4 +703,4 @@ def shutdown(self) -> None: torch.cuda.empty_cache() self.is_initialized = False - print("[SGLANG] Backend shutdown complete") \ No newline at end of file + print("[SGLANG] Backend shutdown complete") diff --git a/language/deepseek-r1/backends/utils.py b/language/deepseek-r1/backends/utils.py index 0e4c7732da..ebd6ce3719 100644 --- a/language/deepseek-r1/backends/utils.py +++ b/language/deepseek-r1/backends/utils.py @@ -19,50 +19,50 @@ def get_cache_directory() -> Path: """ Get the cache directory at /raid/data/$USER/.cache - + Returns: Path: The cache directory path """ # Get the current user user = os.environ.get('USER', os.environ.get('USERNAME', 'unknown')) - + # Use /raid/data/$USER/.cache cache_dir = Path(f'/raid/data/{user}/.cache') - + # Create the cache directory if it doesn't exist cache_dir.mkdir(parents=True, exist_ok=True) - + return cache_dir def setup_huggingface_cache() -> Path: """ Set up HuggingFace cache environment variables using the preferred cache directory. - + Returns: Path: The cache directory being used """ cache_dir = get_cache_directory() - + # Set HuggingFace cache environment variables os.environ['HF_HOME'] = str(cache_dir) os.environ['HF_HUB_CACHE'] = str(cache_dir) os.environ['HUGGINGFACE_HUB_CACHE'] = str(cache_dir) - + return cache_dir def find_free_port(start_port: int = 30000, max_attempts: int = 100) -> int: """ Find a free port starting from start_port. 
- + Args: start_port: The port number to start searching from max_attempts: Maximum number of ports to try - + Returns: int: A free port number - + Raises: RuntimeError: If no free port is found after max_attempts """ @@ -75,13 +75,14 @@ def find_free_port(start_port: int = 30000, max_attempts: int = 100) -> int: return port except OSError: continue - raise RuntimeError(f"Could not find free port after {max_attempts} attempts starting from {start_port}") + raise RuntimeError( + f"Could not find free port after {max_attempts} attempts starting from {start_port}") def set_all_seeds(seed: int = 42) -> None: """ Set seeds for all random number generators for reproducibility. - + Args: seed: The seed value to use """ @@ -96,73 +97,76 @@ def set_all_seeds(seed: int = 42) -> None: set_seed(seed) -def validate_prompts(tokenized_prompts: Optional[list] = None, - text_prompts: Optional[list] = None, - backend_type: str = "") -> None: +def validate_prompts(tokenized_prompts: Optional[list] = None, + text_prompts: Optional[list] = None, + backend_type: str = "") -> None: """ Validate that at least one type of prompts is provided. - + Args: tokenized_prompts: List of tokenized prompts text_prompts: List of text prompts backend_type: Name of the backend for error messages - + Raises: ValueError: If neither prompt type is provided """ if tokenized_prompts is None and text_prompts is None: - raise ValueError(f"{backend_type + ' backend' if backend_type else 'Backend'} requires either text_prompts or tokenized_prompts") + raise ValueError( + f"{backend_type + ' backend' if backend_type else 'Backend'} requires either text_prompts or tokenized_prompts") # Terminal display utilities class TerminalDisplay: """ANSI escape codes and utilities for terminal display formatting.""" - + # ANSI escape codes for cursor control CLEAR_SCREEN = "\033[2J" MOVE_CURSOR_UP = "\033[{}A" CLEAR_LINE = "\033[K" SAVE_CURSOR = "\033[s" RESTORE_CURSOR = "\033[u" - + # Progress spinner characters PROGRESS_CHARS = ['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏'] - + @staticmethod def clear_lines(num_lines: int) -> None: """Clear the specified number of lines above the cursor.""" - print(TerminalDisplay.MOVE_CURSOR_UP.format(num_lines), end='', flush=True) + print(TerminalDisplay.MOVE_CURSOR_UP.format( + num_lines), end='', flush=True) for _ in range(num_lines): print(TerminalDisplay.CLEAR_LINE) - print(TerminalDisplay.MOVE_CURSOR_UP.format(num_lines), end='', flush=True) - + print(TerminalDisplay.MOVE_CURSOR_UP.format( + num_lines), end='', flush=True) + @staticmethod def save_cursor_position() -> None: """Save the current cursor position.""" print(TerminalDisplay.SAVE_CURSOR, end='', flush=True) - + @staticmethod def restore_cursor_position() -> None: """Restore the previously saved cursor position.""" print(TerminalDisplay.RESTORE_CURSOR, end='', flush=True) - + @staticmethod def clear_current_line() -> None: """Clear the current line.""" print("\r" + " " * 80 + "\r", end='', flush=True) - + @staticmethod def truncate_line(line: str, max_length: int = 110) -> str: """Truncate a line to fit within the specified length.""" if len(line) <= max_length: return line - return line[:max_length - 3] + "..." + return line[:max_length - 3] + "..." 
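# A standalone sketch of the in-place redraw pattern that TerminalDisplay and
# LogMonitor implement above: reserve a block of lines once, then on every
# update move the cursor back up and rewrite each line, clearing leftovers.
# The names and the 5-line height here are illustrative, not values taken
# from this module.
import time

MOVE_UP = "\033[{}A"   # move the cursor up N lines
CLEAR_LINE = "\033[K"  # clear from the cursor to the end of the line


def redraw(lines, height=5):
    print(MOVE_UP.format(height), end="", flush=True)
    for i in range(height):
        text = lines[i] if i < len(lines) else ""
        # \r returns to column 0; CLEAR_LINE erases anything left over from
        # a longer line written on a previous pass.
        print("\r" + text + CLEAR_LINE, flush=True)


# Usage: reserve the display area once, then update it repeatedly.
print("\n" * 5, end="")
for step in range(3):
    redraw([f"log line {step}.{i}" for i in range(5)])
    time.sleep(0.2)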
class LogMonitor: """Real-time log file monitor with terminal display.""" - - def __init__(self, + + def __init__(self, log_file_path: Union[str, Path], prefix: str = "LOG", max_lines: int = 5, @@ -170,7 +174,7 @@ def __init__(self, header_text: Optional[str] = None): """ Initialize the log monitor. - + Args: log_file_path: Path to the log file to monitor prefix: Prefix for display lines (e.g., "[SGLANG]") @@ -183,42 +187,43 @@ def __init__(self, self.max_lines = max_lines self.display_interval = display_interval self.header_text = header_text or f"Server startup logs (last {max_lines} lines):" - + # Threading control self._monitor_thread = None self._stop_event = None self._ready_event = None - + # Display dimensions self.total_lines = max_lines + 3 # 2 header lines + 1 blank separator - - def start(self, wait_for_file: bool = True, file_wait_timeout: float = 30.0) -> bool: + + def start(self, wait_for_file: bool = True, + file_wait_timeout: float = 30.0) -> bool: """ Start the log monitor in a background thread. - + Args: wait_for_file: Whether to wait for the log file to exist file_wait_timeout: How long to wait for the file (seconds) - + Returns: bool: True if monitor started successfully """ if self._monitor_thread is not None: return True # Already running - + self._stop_event = threading.Event() self._ready_event = threading.Event() - + self._monitor_thread = threading.Thread( target=self._monitor_loop, args=(wait_for_file, file_wait_timeout), daemon=True ) self._monitor_thread.start() - + # Wait for the monitor to set up its display area return self._ready_event.wait(timeout=2.0) - + def stop(self) -> None: """Stop the log monitor and clean up display.""" if self._stop_event and self._monitor_thread: @@ -227,36 +232,39 @@ def stop(self) -> None: self._monitor_thread = None self._stop_event = None self._ready_event = None - - def _monitor_loop(self, wait_for_file: bool, file_wait_timeout: float) -> None: + + def _monitor_loop(self, wait_for_file: bool, + file_wait_timeout: float) -> None: """Main monitoring loop that runs in a separate thread.""" # Wait for log file if requested if wait_for_file: start_time = time.time() while not self.log_file_path.exists(): if time.time() - start_time > file_wait_timeout: - print(f"[{self.prefix}] Warning: Log file not found after {file_wait_timeout}s: {self.log_file_path}") + print( + f"[{self.prefix}] Warning: Log file not found after {file_wait_timeout}s: {self.log_file_path}") self._ready_event.set() return time.sleep(0.5) elif not self.log_file_path.exists(): - print(f"[{self.prefix}] Warning: Log file not found: {self.log_file_path}") + print( + f"[{self.prefix}] Warning: Log file not found: {self.log_file_path}") self._ready_event.set() return - + print(f"\n[{self.prefix}] Monitoring logs: {self.log_file_path.name}") - print(f"[{self.prefix}] " + "="*60) - + print(f"[{self.prefix}] " + "=" * 60) + # Initialize display area self._setup_display_area() - + # Signal that we're ready self._ready_event.set() - + # Buffer for log lines line_buffer = [] last_display_time = 0 - + try: # Use tail -f to follow the log file process = subprocess.Popen( @@ -267,11 +275,11 @@ def _monitor_loop(self, wait_for_file: bool, file_wait_timeout: float) -> None: bufsize=1, universal_newlines=True ) - + while not self._stop_event.is_set(): if process.poll() is not None: break - + # Read available lines without blocking line_added = False try: @@ -285,7 +293,7 @@ def _monitor_loop(self, wait_for_file: bool, file_wait_timeout: float) -> None: line_added = True else: 
break - except: + except BaseException: # Fallback for systems without select line = process.stdout.readline() if line: @@ -293,65 +301,69 @@ def _monitor_loop(self, wait_for_file: bool, file_wait_timeout: float) -> None: if len(line_buffer) > self.max_lines: line_buffer.pop(0) line_added = True - + # Update display if needed current_time = time.time() - if line_added or (current_time - last_display_time >= self.display_interval): + if line_added or ( + current_time - last_display_time >= self.display_interval): last_display_time = current_time self._update_display(line_buffer) - + time.sleep(0.1) - + # Clean up process.terminate() try: process.wait(timeout=2) except subprocess.TimeoutExpired: process.kill() - + except Exception as e: print(f"\n[{self.prefix}] Log monitor error: {e}") finally: self._cleanup_display() - + def _setup_display_area(self) -> None: """Reserve and initialize the display area.""" # Reserve space for _ in range(self.total_lines): print() - + # Move back up to start of reserved area - print(TerminalDisplay.MOVE_CURSOR_UP.format(self.total_lines), end='', flush=True) - + print(TerminalDisplay.MOVE_CURSOR_UP.format( + self.total_lines), end='', flush=True) + # Print initial display print(f"\r[{self.prefix}] {self.header_text}", end='') print(TerminalDisplay.CLEAR_LINE, flush=True) - print(f"\r[{self.prefix}] " + "-"*60, end='') + print(f"\r[{self.prefix}] " + "-" * 60, end='') print(TerminalDisplay.CLEAR_LINE, flush=True) - + # Print empty lines for _ in range(self.max_lines): print(f"\r[{self.prefix}] ", end='') print(TerminalDisplay.CLEAR_LINE, flush=True) - + # Print separator print(f"\r", end='') print(TerminalDisplay.CLEAR_LINE, flush=True) - + def _update_display(self, line_buffer: list) -> None: """Update the display with current log lines.""" # Save cursor position print(TerminalDisplay.SAVE_CURSOR, end='', flush=True) - - # Move to start of reserved area (cursor is on progress line, 1 below our area) - print(TerminalDisplay.MOVE_CURSOR_UP.format(self.total_lines + 1), end='', flush=True) - + + # Move to start of reserved area (cursor is on progress line, 1 below + # our area) + print(TerminalDisplay.MOVE_CURSOR_UP.format( + self.total_lines + 1), end='', flush=True) + # Print header print(f"\r[{self.prefix}] {self.header_text}", end='') print(TerminalDisplay.CLEAR_LINE, flush=True) - print(f"\r[{self.prefix}] " + "-"*60, end='') + print(f"\r[{self.prefix}] " + "-" * 60, end='') print(TerminalDisplay.CLEAR_LINE, flush=True) - + # Print log lines for i in range(self.max_lines): if i < len(line_buffer): @@ -360,22 +372,23 @@ def _update_display(self, line_buffer: list) -> None: else: print(f"\r[{self.prefix}] ", end='') print(TerminalDisplay.CLEAR_LINE, flush=True) - + # Print separator print(f"\r", end='') print(TerminalDisplay.CLEAR_LINE, flush=True) - + # Restore cursor position print(TerminalDisplay.RESTORE_CURSOR, end='', flush=True) - + def _cleanup_display(self) -> None: """Clean up the display area on exit.""" print(TerminalDisplay.SAVE_CURSOR, end='', flush=True) - print(TerminalDisplay.MOVE_CURSOR_UP.format(self.total_lines + 1), end='', flush=True) - + print(TerminalDisplay.MOVE_CURSOR_UP.format( + self.total_lines + 1), end='', flush=True) + # Clear all reserved lines for _ in range(self.total_lines): print(f"\r", end='') print(TerminalDisplay.CLEAR_LINE, flush=True) - - print(TerminalDisplay.RESTORE_CURSOR, end='', flush=True) \ No newline at end of file + + print(TerminalDisplay.RESTORE_CURSOR, end='', flush=True) diff --git 
a/language/deepseek-r1/backends/vllm_backend.py b/language/deepseek-r1/backends/vllm_backend.py index 4ac408042f..ec49227f41 100644 --- a/language/deepseek-r1/backends/vllm_backend.py +++ b/language/deepseek-r1/backends/vllm_backend.py @@ -148,9 +148,9 @@ def initialize(self) -> None: @require_initialized def generate(self, - tokenized_prompts: Optional[List[List[int]]] = None, - text_prompts: Optional[List[str]] = None, - **kwargs) -> List[Dict[str, Any]]: + tokenized_prompts: Optional[List[List[int]]] = None, + text_prompts: Optional[List[str]] = None, + **kwargs) -> List[Dict[str, Any]]: """Generate responses synchronously using LLM.generate(). Note: vLLM backend only accepts text_prompts parameter. @@ -177,11 +177,14 @@ def generate(self, if not completion.text: # Get the corresponding prompt for context prompt_idx = outputs.index(output) - prompt_preview = text_prompts[prompt_idx][:100] if len(text_prompts[prompt_idx]) > 100 else text_prompts[prompt_idx] - raise RuntimeError(f"Empty response received from vLLM for prompt: {prompt_preview}...") + prompt_preview = text_prompts[prompt_idx][:100] if len( + text_prompts[prompt_idx]) > 100 else text_prompts[prompt_idx] + raise RuntimeError( + f"Empty response received from vLLM for prompt: {prompt_preview}...") results.append({ - 'tokens': list(completion.token_ids), # Convert tuple to list for .copy() compatibility + # Convert tuple to list for .copy() compatibility + 'tokens': list(completion.token_ids), 'text': completion.text, 'finish_reason': completion.finish_reason }) @@ -190,9 +193,9 @@ def generate(self, @require_initialized def generate_async(self, - tokenized_prompts: Optional[List[List[int]]] = None, - text_prompts: Optional[List[str]] = None, - **kwargs) -> List[asyncio.Future]: + tokenized_prompts: Optional[List[List[int]]] = None, + text_prompts: Optional[List[str]] = None, + **kwargs) -> List[asyncio.Future]: """Generate responses asynchronously, returning futures immediately. Note: vLLM backend only accepts text_prompts parameter. 
@@ -245,11 +248,14 @@ def _generate_batch(self, text_prompts: List[str]) -> List[Dict[str, Any]]: if not completion.text: # Get the corresponding prompt for context prompt_idx = outputs.index(output) - prompt_preview = text_prompts[prompt_idx][:100] if len(text_prompts[prompt_idx]) > 100 else text_prompts[prompt_idx] - raise RuntimeError(f"Empty response received from vLLM for prompt: {prompt_preview}...") + prompt_preview = text_prompts[prompt_idx][:100] if len( + text_prompts[prompt_idx]) > 100 else text_prompts[prompt_idx] + raise RuntimeError( + f"Empty response received from vLLM for prompt: {prompt_preview}...") results.append({ - 'tokens': list(completion.token_ids), # Convert tuple to list for .copy() compatibility + # Convert tuple to list for .copy() compatibility + 'tokens': list(completion.token_ids), 'text': completion.text, 'finish_reason': completion.finish_reason }) @@ -265,7 +271,8 @@ def shutdown(self) -> None: # Access internal executor to ensure proper cleanup if self.llm.llm_engine is not None: try: - # This helps cleanup vLLM's internal Ray/multiprocessing resources + # This helps cleanup vLLM's internal Ray/multiprocessing + # resources del self.llm.llm_engine.model_executor except Exception as e: print(f"Warning: Failed to cleanup model executor: {e}") diff --git a/language/deepseek-r1/eval_accuracy.py b/language/deepseek-r1/eval_accuracy.py index 55647b7ca5..a0b546b600 100644 --- a/language/deepseek-r1/eval_accuracy.py +++ b/language/deepseek-r1/eval_accuracy.py @@ -45,11 +45,11 @@ # ============================================================================= def process_mlperf_log_accuracy(mlperf_log_file: Union[str, Path], - dataset_file: Union[str, Path], - checkpoint_path: str, - dtype: str = "int32", - output_dir: Optional[Union[str, Path]] = None, - base_filename: Optional[str] = None) -> Tuple[pd.DataFrame, str]: + dataset_file: Union[str, Path], + checkpoint_path: str, + dtype: str = "int32", + output_dir: Optional[Union[str, Path]] = None, + base_filename: Optional[str] = None) -> Tuple[pd.DataFrame, str]: """Process MLPerf log accuracy file and evaluate results. 
Args: @@ -68,7 +68,8 @@ def process_mlperf_log_accuracy(mlperf_log_file: Union[str, Path], dataset_file = Path(dataset_file) if not mlperf_log_file.exists(): - raise FileNotFoundError(f"MLPerf log file not found: {mlperf_log_file}") + raise FileNotFoundError( + f"MLPerf log file not found: {mlperf_log_file}") if not dataset_file.exists(): raise FileNotFoundError(f"Dataset file not found: {dataset_file}") @@ -86,7 +87,8 @@ def process_mlperf_log_accuracy(mlperf_log_file: Union[str, Path], ) logger.info("Tokenizer loaded successfully") except Exception as e: - raise RuntimeError(f"Failed to load tokenizer from {checkpoint_path}: {e}") + raise RuntimeError( + f"Failed to load tokenizer from {checkpoint_path}: {e}") # Load ground truth dataset try: @@ -99,14 +101,20 @@ def process_mlperf_log_accuracy(mlperf_log_file: Union[str, Path], elif 'ground_truth' in dataset_df.columns: ground_truths = dataset_df['ground_truth'].tolist() else: - raise ValueError("Dataset must contain 'gt_output' or 'ground_truth' column") + raise ValueError( + "Dataset must contain 'gt_output' or 'ground_truth' column") # Get other required columns with fallbacks if 'dataset' in dataset_df.columns: datasets = dataset_df['dataset'].tolist() elif 'metric' in dataset_df.columns: # Infer dataset from metric names - datasets = [metric.replace('_em', '').replace('_', '') for metric in dataset_df['metric'].tolist()] + datasets = [ + metric.replace( + '_em', + '').replace( + '_', + '') for metric in dataset_df['metric'].tolist()] else: datasets = ['unknown'] * len(ground_truths) @@ -138,7 +146,7 @@ def process_mlperf_log_accuracy(mlperf_log_file: Union[str, Path], # First, check if this is a JSON array format or newline-delimited JSON with open(mlperf_log_file, 'r') as f: first_line = f.readline().strip() - + if first_line == '[': # JSON array format - load the entire file logger.info("Detected JSON array format") @@ -146,8 +154,10 @@ def process_mlperf_log_accuracy(mlperf_log_file: Union[str, Path], try: mlperf_results = json.load(f) except json.JSONDecodeError as e: - # If full file parsing fails, try to parse line by line, skipping brackets - logger.warning(f"Failed to parse as complete JSON array: {e}") + # If full file parsing fails, try to parse line by line, + # skipping brackets + logger.warning( + f"Failed to parse as complete JSON array: {e}") logger.info("Attempting line-by-line parsing") mlperf_results = [] with open(mlperf_log_file, 'r') as f2: @@ -162,7 +172,8 @@ def process_mlperf_log_accuracy(mlperf_log_file: Union[str, Path], try: mlperf_results.append(json.loads(line)) except json.JSONDecodeError as e: - logger.warning(f"Failed to parse line {line_num}: {e}") + logger.warning( + f"Failed to parse line {line_num}: {e}") continue else: # Newline-delimited JSON format @@ -180,7 +191,7 @@ def process_mlperf_log_accuracy(mlperf_log_file: Union[str, Path], except json.JSONDecodeError as e: logger.warning(f"Failed to parse line {line_num}: {e}") continue - + logger.info(f"Loaded {len(mlperf_results)} MLPerf results") except Exception as e: raise RuntimeError(f"Failed to load MLPerf log file: {e}") @@ -220,7 +231,8 @@ def process_mlperf_log_accuracy(mlperf_log_file: Union[str, Path], questions_required.append(questions[qsl_idx]) except Exception as e: - logger.warning(f"Error processing entry with qsl_idx {qsl_idx}: {e}") + logger.warning( + f"Error processing entry with qsl_idx {qsl_idx}: {e}") continue if not preds_token_ids: @@ -271,7 +283,11 @@ def validate_dataframe(df: pd.DataFrame) -> None: if not 
isinstance(df, pd.DataFrame): raise ValueError("Input must be a pandas DataFrame") - required_cols = ['model_output', 'dataset', 'ground_truth', 'tok_model_output_len'] + required_cols = [ + 'model_output', + 'dataset', + 'ground_truth', + 'tok_model_output_len'] missing_cols = [col for col in required_cols if col not in df.columns] if missing_cols: raise ValueError(f"Missing required columns: {missing_cols}") @@ -390,7 +406,8 @@ def parse_code(text: str) -> Optional[str]: # Answer Evaluation Functions # ============================================================================= -def evaluate_multiple_choice(parsed: Optional[str], ground_truth: str, valid_options: str) -> bool: +def evaluate_multiple_choice( + parsed: Optional[str], ground_truth: str, valid_options: str) -> bool: """Evaluate multiple choice answer.""" if not parsed or not ground_truth: return False @@ -414,10 +431,12 @@ def evaluate_math500(parsed: Optional[str], ground_truth: str) -> bool: # Use sys.path approach for proper module importing workspace_path = os.path.dirname(os.path.abspath(__file__)) - prm800k_module_path = os.path.join(workspace_path, "submodules", "prm800k", "prm800k") + prm800k_module_path = os.path.join( + workspace_path, "submodules", "prm800k", "prm800k") if not os.path.exists(prm800k_module_path): - raise FileNotFoundError(f"PRM800K module not found at: {prm800k_module_path}") + raise FileNotFoundError( + f"PRM800K module not found at: {prm800k_module_path}") # Save current directory and sys.path original_cwd = os.getcwd() @@ -427,10 +446,10 @@ def evaluate_math500(parsed: Optional[str], ground_truth: str) -> bool: # Add prm800k module path to sys.path if prm800k_module_path not in sys.path: sys.path.insert(0, prm800k_module_path) - + # Change directory as some imports might use relative paths os.chdir(prm800k_module_path) - + # Now import should work from grading.grader import grade_answer result = grade_answer(given_answer=parsed, ground_truth=ground_truth) @@ -622,7 +641,8 @@ def process_row(row: pd.Series) -> Dict[str, Any]: } -def process_livecodebench_parallel(df: pd.DataFrame, group_indices: pd.Index) -> Tuple[int, int]: +def process_livecodebench_parallel( + df: pd.DataFrame, group_indices: pd.Index) -> Tuple[int, int]: """Process LiveCodeBench items in parallel.""" # Prepare work items work_items = [] @@ -726,7 +746,8 @@ def process_dataframe(df: pd.DataFrame) -> pd.DataFrame: # Unified Evaluation Utilities # ============================================================================= -def print_evaluation_results(df_evaluated: pd.DataFrame, logger: Optional[logging.Logger] = None) -> Dict[str, Any]: +def print_evaluation_results(df_evaluated: pd.DataFrame, + logger: Optional[logging.Logger] = None) -> Dict[str, Any]: """Print evaluation results in a unified format. Args: @@ -762,8 +783,8 @@ def print_evaluation_results(df_evaluated: pd.DataFrame, logger: Optional[loggin def process_and_save_dataframe(df: pd.DataFrame, - output_dir: Optional[Union[str, Path]] = None, - base_filename: Optional[str] = None) -> Tuple[pd.DataFrame, str]: + output_dir: Optional[Union[str, Path]] = None, + base_filename: Optional[str] = None) -> Tuple[pd.DataFrame, str]: """Process dataframe for evaluation and save the results. 
Args: @@ -779,7 +800,8 @@ def process_and_save_dataframe(df: pd.DataFrame, # Determine output path if output_dir is None: - # Try to infer from existing path info in the dataframe or use current directory + # Try to infer from existing path info in the dataframe or use current + # directory output_dir = Path.cwd() else: output_dir = Path(output_dir) diff --git a/language/deepseek-r1/mlperf/__init__.py b/language/deepseek-r1/mlperf/__init__.py index 33b3154f6b..bfe95c35e3 100644 --- a/language/deepseek-r1/mlperf/__init__.py +++ b/language/deepseek-r1/mlperf/__init__.py @@ -16,8 +16,8 @@ __all__ = [ # SUTs - 'BaseSUT', - 'OfflineSUT', + 'BaseSUT', + 'OfflineSUT', 'ServerSUT', # QSL 'QuerySampleLibrary', @@ -26,4 +26,4 @@ 'prepare_mlperf_dataset', 'process_mlperf_results', 'create_mlperf_output_dataframe' -] \ No newline at end of file +] diff --git a/language/deepseek-r1/mlperf/base_sut.py b/language/deepseek-r1/mlperf/base_sut.py index 7249207aab..f1d32eb869 100644 --- a/language/deepseek-r1/mlperf/base_sut.py +++ b/language/deepseek-r1/mlperf/base_sut.py @@ -12,65 +12,65 @@ class BaseSUT(abc.ABC): """Base class for MLPerf inference System Under Test (SUT). - + This class defines the interface that all SUTs must implement for MLPerf inference benchmarks. It provides two main methods: - issue_queries: to enqueue prompt tokens - flush_queries: to await completion of all issued queries """ - + def __init__(self, name: str = "BaseSUT"): """Initialize the base SUT. - + Args: name: Name of the SUT for logging purposes """ self.name = name self.sut = None logger.info(f"Initializing {self.name}") - + @abc.abstractmethod def issue_queries(self, query_samples: List[lg.QuerySample]) -> None: """Issue queries to the SUT. - + This method should enqueue the provided query samples for processing. It should return immediately without waiting for completion. - + Args: query_samples: List of MLPerf LoadGen query samples to process """ raise NotImplementedError("Subclasses must implement issue_queries") - + @abc.abstractmethod def flush_queries(self) -> None: """Flush all pending queries. - + This method should wait for all previously issued queries to complete before returning. It's called by LoadGen to ensure all work is done. """ raise NotImplementedError("Subclasses must implement flush_queries") - + def start(self) -> lg.ConstructSUT: """Start the SUT and return the LoadGen SUT handle. 
- + Returns: LoadGen SUT handle for use with LoadGen """ self.sut = lg.ConstructSUT(self.issue_queries, self.flush_queries) logger.info(f"{self.name} started") return self.sut - + def stop(self) -> None: """Stop the SUT and clean up resources.""" if self.sut: lg.DestroySUT(self.sut) self.sut = None logger.info(f"{self.name} stopped") - + def __enter__(self): """Context manager entry.""" return self.start() - + def __exit__(self, exc_type, exc_val, exc_tb): """Context manager exit.""" - self.stop() \ No newline at end of file + self.stop() diff --git a/language/deepseek-r1/mlperf/offline_sut.py b/language/deepseek-r1/mlperf/offline_sut.py index db1c4feea1..00382f4660 100644 --- a/language/deepseek-r1/mlperf/offline_sut.py +++ b/language/deepseek-r1/mlperf/offline_sut.py @@ -44,11 +44,15 @@ def __init__(self, self.dataset_strings = dataset_strings # Determine backend type using registry - self.backend_name = getattr(backend, 'backend_name', type(backend).__name__.lower()) + self.backend_name = getattr( + backend, + 'backend_name', + type(backend).__name__.lower()) self.uses_text_prompts = uses_text_input(self.backend_name) if self.uses_text_prompts and dataset_strings is None: - raise ValueError(f"Backend {self.backend_name} requires text prompts but dataset_strings was not provided") + raise ValueError( + f"Backend {self.backend_name} requires text prompts but dataset_strings was not provided") # Async event loop and thread self.loop = None @@ -122,12 +126,15 @@ async def _process_all_queries_async(self): # Prepare prompts for batch processing (like run_eval.py) if self.uses_text_prompts: # Use text prompts for vLLM and SGLang - prompts = [self.dataset_strings[sample.index] for sample in query_samples] + prompts = [self.dataset_strings[sample.index] + for sample in query_samples] futures = self.backend.generate_async(text_prompts=prompts) else: # Use tokenized prompts for other backends - prompts = [self.dataset[sample.index] for sample in query_samples] - futures = self.backend.generate_async(tokenized_prompts=prompts) + prompts = [self.dataset[sample.index] + for sample in query_samples] + futures = self.backend.generate_async( + tokenized_prompts=prompts) logger.info(f"Got {len(futures)} futures from backend") @@ -136,7 +143,8 @@ async def _process_all_queries_async(self): indexed_futures = [(i, future) for i, future in enumerate(futures)] completed_indices = set() - # Use asyncio.wait with FIRST_COMPLETED to handle out-of-order completion + # Use asyncio.wait with FIRST_COMPLETED to handle out-of-order + # completion pending = {future for _, future in indexed_futures} while pending: @@ -153,12 +161,14 @@ async def _process_all_queries_async(self): break if original_idx is None: - logger.error("Could not find original index for completed future") + logger.error( + "Could not find original index for completed future") continue # Check for duplicate completion if original_idx in completed_indices: - logger.warning(f"Prompt {original_idx} completed multiple times!") + logger.warning( + f"Prompt {original_idx} completed multiple times!") continue try: @@ -174,36 +184,44 @@ async def _process_all_queries_async(self): await self._send_result_to_loadgen(sample, result) except Exception as e: - logger.error(f"Error processing prompt {original_idx}: {type(e).__name__}: {e}") + logger.error( + f"Error processing prompt {original_idx}: {type(e).__name__}: {e}") # Raise the error instead of handling empty responses - raise RuntimeError(f"Backend failed to generate tokens for prompt {original_idx}: 
{e}") + raise RuntimeError( + f"Backend failed to generate tokens for prompt {original_idx}: {e}") # Verify all results are populated if len(completed_indices) != len(futures): missing_count = len(futures) - len(completed_indices) - raise RuntimeError(f"Missing results: completed {len(completed_indices)} != {len(futures)} total ({missing_count} missing)") + raise RuntimeError( + f"Missing results: completed {len(completed_indices)} != {len(futures)} total ({missing_count} missing)") for i, result in enumerate(results): if result is None: raise RuntimeError(f"Missing result for prompt {i}") - logger.info(f"Completed all {len(completed_indices)} prompts successfully") + logger.info( + f"Completed all {len(completed_indices)} prompts successfully") except Exception as e: - logger.error(f"Error during batch processing: {type(e).__name__}: {e}") + logger.error( + f"Error during batch processing: {type(e).__name__}: {e}") import traceback traceback.print_exc() raise # Re-raise instead of sending empty responses - async def _send_result_to_loadgen(self, sample: lg.QuerySample, result: Dict[str, Any]): + async def _send_result_to_loadgen( + self, sample: lg.QuerySample, result: Dict[str, Any]): """Send a single result to LoadGen.""" try: # Validate that tokens exist - raise error if missing tokens = result.get('tokens') if tokens is None: - raise ValueError(f"Backend result missing 'tokens' key for query {sample.id}") + raise ValueError( + f"Backend result missing 'tokens' key for query {sample.id}") if not isinstance(tokens, (list, tuple)) or len(tokens) == 0: - raise ValueError(f"Backend returned empty or invalid tokens for query {sample.id}: {tokens}") + raise ValueError( + f"Backend returned empty or invalid tokens for query {sample.id}: {tokens}") # Create a copy of tokens before numpy conversion tokens_copy = tokens.copy() @@ -229,12 +247,15 @@ async def _send_result_to_loadgen(self, sample: lg.QuerySample, result: Dict[str # Send response to LoadGen lg.QuerySamplesComplete([response]) - logger.debug(f"Sent {n_tokens} tokens to LoadGen for query {sample.id}") + logger.debug( + f"Sent {n_tokens} tokens to LoadGen for query {sample.id}") except Exception as e: - logger.error(f"Error sending result to LoadGen for query {sample.id}: {e}") + logger.error( + f"Error sending result to LoadGen for query {sample.id}: {e}") # Raise the error instead of sending empty response - raise RuntimeError(f"Failed to send result to LoadGen for query {sample.id}: {e}") + raise RuntimeError( + f"Failed to send result to LoadGen for query {sample.id}: {e}") def _run_event_loop(self): """Run the async event loop in a separate thread.""" @@ -282,7 +303,8 @@ def get_results(self) -> List[Dict[str, Any]]: # Sort by index to maintain dataset order queried_indices = sorted(self.index_to_id.keys()) - logger.info(f"Retrieving results for {len(queried_indices)} queried samples") + logger.info( + f"Retrieving results for {len(queried_indices)} queried samples") # Process results in order of dataset indices using stored results for i in queried_indices: @@ -296,7 +318,8 @@ def get_results(self) -> List[Dict[str, Any]]: tokens = result['tokens'] output_text = result.get('text', '') if not output_text and self.backend.tokenizer: - output_text = self.backend.tokenizer.decode(result['tokens'], skip_special_tokens=True) + output_text = self.backend.tokenizer.decode( + result['tokens'], skip_special_tokens=True) ordered_results.append({ 'model_output': output_text, @@ -305,6 +328,7 @@ def get_results(self) -> List[Dict[str, Any]]: 
}) else: # No backend result for this sample - raise RuntimeError(f"No backend result stored for dataset index {i}, sample_id {sample_id}") + raise RuntimeError( + f"No backend result stored for dataset index {i}, sample_id {sample_id}") - return ordered_results \ No newline at end of file + return ordered_results diff --git a/language/deepseek-r1/mlperf/qsl.py b/language/deepseek-r1/mlperf/qsl.py index 59bc5e36a8..d4c9405a4e 100644 --- a/language/deepseek-r1/mlperf/qsl.py +++ b/language/deepseek-r1/mlperf/qsl.py @@ -7,12 +7,12 @@ class QuerySampleLibrary: """MLPerf QuerySampleLibrary implementation for single-process execution.""" - - def __init__(self, dataset: List[List[int]], dataset_strings: List[str], + + def __init__(self, dataset: List[List[int]], dataset_strings: List[str], name: str = "QSL"): """ Initialize QSL with dataset. - + Args: dataset: List of tokenized prompts dataset_strings: List of original prompt strings @@ -24,7 +24,7 @@ def __init__(self, dataset: List[List[int]], dataset_strings: List[str], self.perf_count = self.count self.name = name self.logger = logging.getLogger(__name__) - + # Create LoadGen QSL self.qsl = lg.ConstructQSL( self.count, @@ -33,7 +33,7 @@ def __init__(self, dataset: List[List[int]], dataset_strings: List[str], lambda x: None # UnloadSamplesFromRam ) self.logger.info(f"Created {self.name} with {self.count} samples") - + def __del__(self): """Cleanup QSL.""" if self.qsl is not None: @@ -43,12 +43,12 @@ def __del__(self): class DistributedQuerySampleLibrary: """QuerySampleLibrary for distributed execution (MPI/torchrun).""" - + def __init__(self, dataset: List[List[int]], dataset_strings: List[str], rank: int, world_size: int, name: str = "DistributedQSL"): """ Initialize distributed QSL. - + Args: dataset: List of tokenized prompts dataset_strings: List of original prompt strings @@ -64,10 +64,10 @@ def __init__(self, dataset: List[List[int]], dataset_strings: List[str], self.world_size = world_size self.name = name self.logger = logging.getLogger(__name__) - + # Track if this is rank zero explicitly self.is_rank_zero = (self.rank == 0) - + # Only rank 0 creates the actual QSL if self.is_rank_zero: self.qsl = lg.ConstructQSL( @@ -76,12 +76,13 @@ def __init__(self, dataset: List[List[int]], dataset_strings: List[str], lambda x: None, lambda x: None ) - self.logger.info(f"Created {self.name} with {self.count} samples on rank 0") + self.logger.info( + f"Created {self.name} with {self.count} samples on rank 0") else: self.qsl = None - + def __del__(self): """Cleanup QSL on rank 0.""" if self.is_rank_zero and self.qsl is not None: lg.DestroyQSL(self.qsl) - self.logger.info(f"{self.name} destroyed on rank 0") \ No newline at end of file + self.logger.info(f"{self.name} destroyed on rank 0") diff --git a/language/deepseek-r1/mlperf/server_sut.py b/language/deepseek-r1/mlperf/server_sut.py index 75699e208f..e3acb2bde8 100644 --- a/language/deepseek-r1/mlperf/server_sut.py +++ b/language/deepseek-r1/mlperf/server_sut.py @@ -69,11 +69,15 @@ def __init__(self, self.dataset_strings = dataset_strings # Determine backend type using registry - self.backend_name = getattr(backend, 'backend_name', type(backend).__name__.lower()) + self.backend_name = getattr( + backend, + 'backend_name', + type(backend).__name__.lower()) self.uses_text_prompts = uses_text_input(self.backend_name) if self.uses_text_prompts and dataset_strings is None: - raise ValueError(f"Backend {self.backend_name} requires text prompts but dataset_strings was not provided") + raise ValueError( + 
f"Backend {self.backend_name} requires text prompts but dataset_strings was not provided") # Async event loop and thread self.loop = None @@ -91,8 +95,6 @@ def __init__(self, self.all_results: Dict[int, Dict[str, Any]] = {} self.results_lock = asyncio.Lock() - - def issue_queries(self, query_samples: List[lg.QuerySample]) -> None: """Issue queries in streaming mode with batching.""" if not supports_streaming(): @@ -123,7 +125,8 @@ async def _start_streaming_query(self, query_info: QueryInfo) -> None: try: # Verify streaming support if not supports_streaming(): - raise RuntimeError(f"Backend {self.backend_name} does not support streaming required for server mode") + raise RuntimeError( + f"Backend {self.backend_name} does not support streaming required for server mode") # Prepare prompt based on backend type if self.uses_text_prompts: @@ -155,8 +158,10 @@ async def _start_streaming_query(self, query_info: QueryInfo) -> None: task.add_done_callback(self._remove_task_from_active) except Exception as e: - logger.error(f"Error starting stream for query {query_info.query_id}: {e}") - raise RuntimeError(f"Failed to start streaming for query {query_info.query_id}: {e}") + logger.error( + f"Error starting stream for query {query_info.query_id}: {e}") + raise RuntimeError( + f"Failed to start streaming for query {query_info.query_id}: {e}") def _remove_task_from_active(self, task: asyncio.Task) -> None: """Remove a completed task from the active set.""" @@ -181,7 +186,8 @@ async def _process_stream(self, state: StreamingQueryState) -> None: state.accumulated_tokens.extend(chunk.token_ids) # Report first token immediately for TTFT measurement - if not state.first_token_sent and (chunk.token or chunk.token_ids): + if not state.first_token_sent and ( + chunk.token or chunk.token_ids): state.first_token_time = current_time - state.start_time state.first_token_sent = True @@ -197,35 +203,43 @@ async def _process_stream(self, state: StreamingQueryState) -> None: except asyncio.CancelledError: # Task was cancelled, clean up gracefully - logger.debug(f"Stream processing cancelled for query {state.query_info.query_id}") - # Close the async generator properly (assume aclose exists in our containerized environment) + logger.debug( + f"Stream processing cancelled for query {state.query_info.query_id}") + # Close the async generator properly (assume aclose exists in our + # containerized environment) try: await state.stream_gen.aclose() except Exception: pass raise except Exception as e: - logger.error(f"Error processing stream for query {state.query_info.query_id}: {e}") - raise RuntimeError(f"Stream processing failed for query {state.query_info.query_id}: {e}") + logger.error( + f"Error processing stream for query {state.query_info.query_id}: {e}") + raise RuntimeError( + f"Stream processing failed for query {state.query_info.query_id}: {e}") finally: # Clean up active stream async with self.active_streams_lock: self.active_streams.pop(state.query_info.query_id, None) - async def _send_first_token_response(self, state: StreamingQueryState) -> None: + async def _send_first_token_response( + self, state: StreamingQueryState) -> None: """Send first token notification to LoadGen for TTFT measurement.""" - logger.debug(f"First token received for query {state.query_info.query_id} at {state.first_token_time:.3f}s") + logger.debug( + f"First token received for query {state.query_info.query_id} at {state.first_token_time:.3f}s") # Convert first tokens to proper format for LoadGen if state.accumulated_tokens: - 
output_tokens = np.ascontiguousarray(state.accumulated_tokens, dtype=np.int32) + output_tokens = np.ascontiguousarray( + state.accumulated_tokens, dtype=np.int32) else: # If no token IDs available, encode the text if hasattr(self.backend, 'tokenizer') and state.accumulated_text: tokens = self.backend.tokenizer.encode(state.accumulated_text) output_tokens = np.ascontiguousarray(tokens, dtype=np.int32) else: - raise RuntimeError(f"No token IDs available for first token response for query {state.query_info.query_id}") + raise RuntimeError( + f"No token IDs available for first token response for query {state.query_info.query_id}") output_seq_len = len(output_tokens) output_toks_ptr = output_tokens.ctypes.data if output_seq_len > 0 else 0 @@ -248,22 +262,25 @@ async def _send_final_response(self, state: StreamingQueryState) -> None: if state.accumulated_tokens: # Create a copy of tokens before numpy conversion tokens_to_send = state.accumulated_tokens.copy() - token_array = np.array(state.accumulated_tokens, dtype=np.int32) + token_array = np.array( + state.accumulated_tokens, dtype=np.int32) else: # If no tokens, encode the text - if hasattr(self.backend, 'tokenizer') and state.accumulated_text: - tokens = self.backend.tokenizer.encode(state.accumulated_text) + if hasattr(self.backend, + 'tokenizer') and state.accumulated_text: + tokens = self.backend.tokenizer.encode( + state.accumulated_text) # Create a copy of tokens before numpy conversion tokens_to_send = tokens.copy() token_array = np.array(tokens, dtype=np.int32) else: - raise RuntimeError(f"No tokens or tokenizer available for query {state.query_info.query_id}") + raise RuntimeError( + f"No tokens or tokenizer available for query {state.query_info.query_id}") # Validate we have tokens if len(token_array) == 0: - raise RuntimeError(f"No tokens generated for query {state.query_info.query_id}") - - + raise RuntimeError( + f"No tokens generated for query {state.query_info.query_id}") # Create LoadGen response response = lg.QuerySampleResponse( @@ -287,11 +304,14 @@ async def _send_final_response(self, state: StreamingQueryState) -> None: } self.all_results[state.query_info.query_id] = state.query_info.result - logger.debug(f"Sent {len(token_array)} tokens to LoadGen for query {state.query_info.query_id}") + logger.debug( + f"Sent {len(token_array)} tokens to LoadGen for query {state.query_info.query_id}") except Exception as e: - logger.error(f"Error sending final response for query {state.query_info.query_id}: {e}") - raise RuntimeError(f"Failed to send final response for query {state.query_info.query_id}: {e}") + logger.error( + f"Error sending final response for query {state.query_info.query_id}: {e}") + raise RuntimeError( + f"Failed to send final response for query {state.query_info.query_id}: {e}") def flush_queries(self) -> None: """Wait for all active streams to complete.""" @@ -313,13 +333,16 @@ async def wait_for_streams(): async with self.active_streams_lock: if self.active_streams: - logger.warning(f"Timeout: {len(self.active_streams)} streams still active") + logger.warning( + f"Timeout: {len(self.active_streams)} streams still active") # Run the wait task in the event loop if self.loop and not self.loop.is_closed(): - future = asyncio.run_coroutine_threadsafe(wait_for_streams(), self.loop) + future = asyncio.run_coroutine_threadsafe( + wait_for_streams(), self.loop) try: - future.result(timeout=310) # Slightly longer than internal timeout + # Slightly longer than internal timeout + future.result(timeout=310) except Exception 
as e: logger.error(f"Error waiting for streams to complete: {e}") @@ -352,7 +375,8 @@ async def cancel_all_tasks(): tasks_to_cancel = list(self.active_tasks) if tasks_to_cancel: - logger.info(f"Cancelling {len(tasks_to_cancel)} active streaming tasks...") + logger.info( + f"Cancelling {len(tasks_to_cancel)} active streaming tasks...") for task in tasks_to_cancel: task.cancel() @@ -365,7 +389,8 @@ async def cancel_all_tasks(): self.active_tasks.clear() # Run the cancellation in the event loop - future = asyncio.run_coroutine_threadsafe(cancel_all_tasks(), self.loop) + future = asyncio.run_coroutine_threadsafe( + cancel_all_tasks(), self.loop) try: future.result(timeout=10.0) # Give tasks time to cancel except Exception as e: @@ -405,10 +430,12 @@ def get_results(self) -> List[Dict[str, Any]]: # Only process results for samples that were actually queried # Sort by index to maintain dataset order queried_indices = sorted(index_to_result.keys()) - - logger.info(f"Retrieving results for {len(queried_indices)} queried samples") - # Process results in order of dataset indices using stored backend results + logger.info( + f"Retrieving results for {len(queried_indices)} queried samples") + + # Process results in order of dataset indices using stored backend + # results for i in queried_indices: result = index_to_result[i] @@ -416,7 +443,8 @@ def get_results(self) -> List[Dict[str, Any]]: tokens = result['tokens'] output_text = result.get('text', '') if not output_text and self.backend.tokenizer: - output_text = self.backend.tokenizer.decode(result['tokens'], skip_special_tokens=True) + output_text = self.backend.tokenizer.decode( + result['tokens'], skip_special_tokens=True) ordered_results.append({ 'model_output': output_text, @@ -424,4 +452,4 @@ def get_results(self) -> List[Dict[str, Any]]: 'tok_model_output_len': len(tokens) }) - return ordered_results \ No newline at end of file + return ordered_results diff --git a/language/deepseek-r1/mlperf/utils.py b/language/deepseek-r1/mlperf/utils.py index 973e46c201..f4fbedda41 100644 --- a/language/deepseek-r1/mlperf/utils.py +++ b/language/deepseek-r1/mlperf/utils.py @@ -7,15 +7,15 @@ from utils.tokenization import StandardTokenizer -def prepare_mlperf_dataset(input_file: str, - backend_name: Optional[str] = None, - tokenizer: StandardTokenizer = None, - num_samples: Optional[int] = None, - skip_samples: int = 0, - use_chat_template: Optional[bool] = None) -> Dict[str, Any]: +def prepare_mlperf_dataset(input_file: str, + backend_name: Optional[str] = None, + tokenizer: StandardTokenizer = None, + num_samples: Optional[int] = None, + skip_samples: int = 0, + use_chat_template: Optional[bool] = None) -> Dict[str, Any]: """ Prepare dataset for MLPerf inference. - + Args: input_file: Path to input pickle file backend_name: Optional backend name override. If None, uses MLPERF_BACKEND env var. 
@@ -24,29 +24,30 @@ def prepare_mlperf_dataset(input_file: str, num_samples: Number of samples to use skip_samples: Number of samples to skip use_chat_template: Whether to use chat template (if None, determined by registry) - + Returns: Dictionary with prepared dataset components """ if backend_name is None: from utils.backend_registry import detect_backend backend_name = detect_backend() - + # Load and validate dataset df = load_dataset(input_file, num_samples, skip_samples) validate_dataset(df) - + prompts = df['text_input'].tolist() print(f"[MLPerf] Loaded {len(prompts)} prompts from dataset") - + # Check if backend uses text prompts from registry uses_text_prompts = uses_text_input() - + # Determine chat template usage from registry if not specified if use_chat_template is None: use_chat_template = uses_chat_template() - print(f"[MLPerf] Using chat template from registry: {use_chat_template}") - + print( + f"[MLPerf] Using chat template from registry: {use_chat_template}") + if uses_text_prompts: print(f"[MLPerf] Backend {backend_name} uses text prompts directly") return { @@ -62,7 +63,7 @@ def prepare_mlperf_dataset(input_file: str, prompts, use_chat_template ) print(f"[MLPerf] Tokenized {len(tokenized_prompts)} prompts") - + return { 'dataframe': df, 'prompts': prompts, @@ -73,61 +74,63 @@ def prepare_mlperf_dataset(input_file: str, def process_mlperf_results(sut_results: List[Dict[str, Any]], - tokenizer: Optional[StandardTokenizer] = None, - backend_name: Optional[str] = None, - uses_text_prompts: Optional[bool] = None) -> List[Dict[str, Any]]: + tokenizer: Optional[StandardTokenizer] = None, + backend_name: Optional[str] = None, + uses_text_prompts: Optional[bool] = None) -> List[Dict[str, Any]]: """ Process MLPerf SUT results into standardized format. - + Args: sut_results: Raw results from MLPerf SUT tokenizer: StandardTokenizer for decoding backend_name: Optional backend name override. If None, uses MLPERF_BACKEND env var. (Kept for backward compatibility but not used in our codebase) uses_text_prompts: Whether backend uses text prompts (if None, determined by registry) - + Returns: List of processed result dictionaries """ from utils.tokenization import process_inference_results - + if backend_name is None: from utils.backend_registry import detect_backend backend_name = detect_backend() - + # Determine text prompt usage from registry if not specified if uses_text_prompts is None: uses_text_prompts = uses_text_input() - + # Reuse the general inference result processing - return process_inference_results(sut_results, tokenizer, uses_text_prompts=uses_text_prompts) + return process_inference_results( + sut_results, tokenizer, uses_text_prompts=uses_text_prompts) def create_mlperf_output_dataframe(input_df: pd.DataFrame, - results: List[Dict[str, Any]], - backend_name: Optional[str] = None) -> pd.DataFrame: + results: List[Dict[str, Any]], + backend_name: Optional[str] = None) -> pd.DataFrame: """ Create output dataframe with MLPerf results. - + Args: input_df: Input dataframe results: Processed MLPerf results backend_name: Optional backend name override. If None, uses MLPERF_BACKEND env var. 
(Kept for backward compatibility but not used in our codebase) - + Returns: Output dataframe with results """ if backend_name is None: from utils.backend_registry import detect_backend backend_name = detect_backend() - + df_output = input_df.copy() - + # Add result columns df_output['model_output'] = [r['model_output'] for r in results] df_output['tok_model_output'] = [r['tok_model_output'] for r in results] - df_output['tok_model_output_len'] = [r['tok_model_output_len'] for r in results] + df_output['tok_model_output_len'] = [ + r['tok_model_output_len'] for r in results] df_output['model_backend'] = backend_name - - return df_output \ No newline at end of file + + return df_output diff --git a/language/deepseek-r1/run_eval.py b/language/deepseek-r1/run_eval.py index 169b3473e4..8965101bd4 100755 --- a/language/deepseek-r1/run_eval.py +++ b/language/deepseek-r1/run_eval.py @@ -1,4 +1,13 @@ #!/usr/bin/env python3 +from utils import ( + load_dataset, save_results, validate_dataset, generate_timestamped_filename, + validate_runner_for_backend, uses_text_input, uses_chat_template, + StandardTokenizer, process_inference_results, + get_backend_instance, create_base_argument_parser, print_runner_header, + setup_output_paths, validate_runner_args, handle_runner_error, + validate_dataset_extended, supports_async +) +from backends import BaseBackend import argparse import asyncio import os @@ -11,59 +20,51 @@ # Disable tokenizers parallelism to avoid forking issues os.environ["TOKENIZERS_PARALLELISM"] = "false" -from backends import BaseBackend -from utils import ( - load_dataset, save_results, validate_dataset, generate_timestamped_filename, - validate_runner_for_backend, uses_text_input, uses_chat_template, - StandardTokenizer, process_inference_results, - get_backend_instance, create_base_argument_parser, print_runner_header, - setup_output_paths, validate_runner_args, handle_runner_error, - validate_dataset_extended, supports_async -) - def create_argument_parser() -> argparse.ArgumentParser: """Create argument parser with shared arguments only.""" parser = create_base_argument_parser( "Modular backend evaluation system for MLPerf DeepSeek reference implementation" ) - + # Add runner-specific arguments parser.add_argument("--async", action="store_true", - help="Use async generation instead of synchronous") - + help="Use async generation instead of synchronous") + return parser -async def run_async_inference(backend: BaseBackend, - tokenized_prompts: List[List[int]], - text_prompts: Optional[List[str]] = None) -> List[Dict[str, Any]]: +async def run_async_inference(backend: BaseBackend, + tokenized_prompts: List[List[int]], + text_prompts: Optional[List[str]] = None) -> List[Dict[str, Any]]: """Run async inference with proper error handling and progress bar that updates as tasks complete.""" try: # Get futures from backend if uses_text_input(): futures = backend.generate_async(text_prompts=text_prompts) else: - futures = backend.generate_async(tokenized_prompts=tokenized_prompts) - + futures = backend.generate_async( + tokenized_prompts=tokenized_prompts) + # Create a list to store results in order results = [None] * len(futures) - + # Create enumerated futures with their original indices for tracking indexed_futures = [(i, future) for i, future in enumerate(futures)] - + # Track completion for debugging completed_indices = set() - + # Process tasks with progress bar that updates as tasks complete with async_tqdm(total=len(futures), desc="Async inference", unit="prompt") as pbar: - # Use 
asyncio.wait with FIRST_COMPLETED to handle out-of-order completion + # Use asyncio.wait with FIRST_COMPLETED to handle out-of-order + # completion pending = {future for _, future in indexed_futures} - + while pending: # Wait for at least one future to complete done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED) - + # Process all completed futures in this batch for completed_future in done: # Find the original index for this completed future @@ -72,46 +73,51 @@ async def run_async_inference(backend: BaseBackend, if future is completed_future: original_idx = idx break - + if original_idx is None: - print(f"\nWarning: Could not find original index for completed future") + print( + f"\nWarning: Could not find original index for completed future") continue - + # Check for duplicate completion if original_idx in completed_indices: - print(f"\nWarning: Prompt {original_idx} completed multiple times!") + print( + f"\nWarning: Prompt {original_idx} completed multiple times!") continue - + try: # Get the result from the completed future result = await completed_future - + # Store the result in the correct position results[original_idx] = result completed_indices.add(original_idx) - + except Exception as e: - print(f"\nError processing prompt {original_idx}: {type(e).__name__}: {e}") + print( + f"\nError processing prompt {original_idx}: {type(e).__name__}: {e}") import traceback traceback.print_exception(type(e), e, e.__traceback__) - + # Raise the error instead of using empty tokens - raise RuntimeError(f"Backend failed to generate tokens for prompt {original_idx}: {e}") - + raise RuntimeError( + f"Backend failed to generate tokens for prompt {original_idx}: {e}") + # Update progress bar after each completion pbar.update(1) - + # Verify all results are populated if len(completed_indices) != len(futures): missing_count = len(futures) - len(completed_indices) - raise RuntimeError(f"Missing results: completed {len(completed_indices)} != {len(futures)} total ({missing_count} missing)") - + raise RuntimeError( + f"Missing results: completed {len(completed_indices)} != {len(futures)} total ({missing_count} missing)") + for i, result in enumerate(results): if result is None: raise RuntimeError(f"Missing result for prompt {i}") - + print(f"\nCompleted all {len(completed_indices)} prompts successfully") - + return results except Exception as e: print(f"Error during async inference: {type(e).__name__}: {e}") @@ -120,9 +126,9 @@ async def run_async_inference(backend: BaseBackend, raise -def run_sync_inference(backend: BaseBackend, - tokenized_prompts: List[List[int]], - text_prompts: Optional[List[str]] = None) -> List[Dict[str, Any]]: +def run_sync_inference(backend: BaseBackend, + tokenized_prompts: List[List[int]], + text_prompts: Optional[List[str]] = None) -> List[Dict[str, Any]]: """Run sync inference with proper error handling.""" try: if uses_text_input(): @@ -140,46 +146,52 @@ def main(): # Parse arguments parser = create_argument_parser() args = parser.parse_args() - + try: # Validate arguments validate_runner_args(args, 'eval') - + # Detect backend early backend_name = validate_runner_for_backend('eval') - + # Set up output paths output_dir, output_file = setup_output_paths(args) if args.output_file is None: args.output_file = output_file - - # Generate the actual filename with timestamp that will be used for saving - actual_output_file = generate_timestamped_filename(args.output_file, add_timestamp=True) - + + # Generate the actual filename with timestamp that will be 
used for + # saving + actual_output_file = generate_timestamped_filename( + args.output_file, add_timestamp=True) + # Get async flag using getattr since 'async' is a reserved keyword use_async = getattr(args, 'async', False) - + # Check if backend supports async if use_async and not supports_async(): - raise RuntimeError(f"Backend {backend_name} does not support async generation") - + raise RuntimeError( + f"Backend {backend_name} does not support async generation") + # Print header - print_runner_header("Modular Backend Evaluation System", backend_name, args) + print_runner_header( + "Modular Backend Evaluation System", + backend_name, + args) print(f"Mode: {'Async' if use_async else 'Sync'}") print("=" * 80) - + # Load and validate dataset df = load_dataset(args.input_file, args.num_samples, args.skip_samples) validate_dataset_extended(df) - + prompts = df['text_input'].tolist() - + # Initialize tokenizer tokenizer = StandardTokenizer() - + # Determine whether to use chat template based on registry use_chat_template = uses_chat_template() - + # For text-prompt backends, we'll pass the prompts directly # For tokenized-prompt backends, we need to tokenize first if uses_text_input(): @@ -195,19 +207,19 @@ def main(): ) print(f"Tokenized {len(tokenized_prompts)} prompts") print(f"Tokenizer Max length: {tokenizer.max_length}") - + # Initialize backend using registry print(f"\nInitializing {backend_name.upper()} backend...") backend = get_backend_instance(backend_name) - + with backend: # Create new output dataframe with only required columns df_output = pd.DataFrame() - + # Copy all columns from input dataframe first for col in df.columns: df_output[col] = df[col] - + # Run inference with appropriate prompt format if use_async: print("Running async inference...") @@ -217,26 +229,31 @@ def main(): print("Running sync inference...") raw_results = run_sync_inference( backend, tokenized_prompts, text_prompts=prompts) - + # Process raw results into standardized format using shared utility print("Processing results...") standardized_results = process_inference_results( raw_results, tokenizer ) - + # Add generated columns - df_output['model_output'] = [r['model_output'] for r in standardized_results] - df_output['tok_model_output'] = [r['tok_model_output'] for r in standardized_results] - df_output['tok_model_output_len'] = [r['tok_model_output_len'] for r in standardized_results] - df_output['model_backend'] = [r['model_backend'] for r in standardized_results] - + df_output['model_output'] = [r['model_output'] + for r in standardized_results] + df_output['tok_model_output'] = [r['tok_model_output'] + for r in standardized_results] + df_output['tok_model_output_len'] = [ + r['tok_model_output_len'] for r in standardized_results] + df_output['model_backend'] = [r['model_backend'] + for r in standardized_results] + # Save results - output_file = save_results(df_output, args.output_file, add_timestamp=True) - + output_file = save_results( + df_output, args.output_file, add_timestamp=True) + print(f"\nEvaluation completed successfully!") print(f"Results saved to: {output_file}") print(f"Output columns: {list(df_output.columns)}") - + except KeyboardInterrupt: print("\nEvaluation interrupted by user") sys.exit(1) @@ -245,4 +262,4 @@ def main(): if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/language/deepseek-r1/run_eval_mpi.py b/language/deepseek-r1/run_eval_mpi.py index 37425526e1..4edeae5f8f 100644 --- a/language/deepseek-r1/run_eval_mpi.py +++ 
b/language/deepseek-r1/run_eval_mpi.py @@ -1,4 +1,11 @@ #!/usr/bin/env python3 +from backends import BaseBackend +from utils.data_utils import load_dataset +from utils.validation import validate_runner_args, ValidationError +from utils.runner_utils import create_base_argument_parser, print_runner_header +from utils.backend_registry import uses_chat_template, get_backend_instance, detect_backend, validate_runner_for_backend +from utils import save_results, generate_timestamped_filename, StandardTokenizer +from backends.pytorch_backend import PyTorchBackend import os import sys import argparse @@ -11,13 +18,6 @@ # Import utilities and backend registry sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) -from backends.pytorch_backend import PyTorchBackend -from utils import save_results, generate_timestamped_filename, StandardTokenizer -from utils.backend_registry import uses_chat_template, get_backend_instance, detect_backend, validate_runner_for_backend -from utils.runner_utils import create_base_argument_parser, print_runner_header -from utils.validation import validate_runner_args, ValidationError -from utils.data_utils import load_dataset -from backends import BaseBackend def main( @@ -41,7 +41,7 @@ def main( # Detect backend from environment backend_name = detect_backend() - + # Validate backend validate_runner_for_backend('eval_mpi') @@ -49,7 +49,8 @@ def main( use_chat_template = uses_chat_template() # Generate the actual filename with timestamp that will be used for saving - actual_output_file = generate_timestamped_filename(output_pickle_path, add_timestamp=True) + actual_output_file = generate_timestamped_filename( + output_pickle_path, add_timestamp=True) if rank == 0: _print("=" * 80) @@ -63,13 +64,14 @@ def main( _print(f"Sample limit: {num_samples}") if skip_samples: _print(f"Skip samples: {skip_samples}") - _print(f"Chat template: {'enabled' if use_chat_template else 'disabled'} (from registry)") + _print( + f"Chat template: {'enabled' if use_chat_template else 'disabled'} (from registry)") _print("=" * 80) # Initialize PyTorch backend backend = PyTorchBackend() backend.initialize() - + # Initialize StandardTokenizer tokenizer = StandardTokenizer() @@ -82,12 +84,14 @@ def main( _print(f"Loading input DataFrame from {input_pickle_path}...") try: df_for_results = pd.read_pickle(input_pickle_path) - _print(f"Loaded DataFrame with {len(df_for_results)} rows and columns: {df_for_results.columns.tolist()}") - + _print( + f"Loaded DataFrame with {len(df_for_results)} rows and columns: {df_for_results.columns.tolist()}") + # Apply skip_samples if specified if skip_samples > 0: if skip_samples >= len(df_for_results): - _print(f"Error: skip_samples ({skip_samples}) is greater than or equal to total samples ({len(df_for_results)})") + _print( + f"Error: skip_samples ({skip_samples}) is greater than or equal to total samples ({len(df_for_results)})") backend.shutdown() if world_size > 1: dist.destroy_process_group() @@ -96,14 +100,15 @@ def main( df_for_results = df_for_results.iloc[skip_samples:].copy() # Reset index to ensure sequential indices starting from 0 df_for_results = df_for_results.reset_index(drop=True) - + # Apply num_samples limit if specified if num_samples is not None and num_samples < len(df_for_results): - _print(f"Limiting to first {num_samples} samples (out of {len(df_for_results)} total after skipping)") + _print( + f"Limiting to first {num_samples} samples (out of {len(df_for_results)} total after skipping)") df_for_results = 
df_for_results.head(num_samples).copy() # Reset index to ensure sequential indices starting from 0 df_for_results = df_for_results.reset_index(drop=True) - + except Exception as e: _print(f"Error loading input pickle file: {e}") backend.shutdown() @@ -119,21 +124,25 @@ def main( return prompts_text_list = df_for_results['text_input'].tolist() - _print(f"Extracted {len(prompts_text_list)} prompts from 'text_input' column.") + _print( + f"Extracted {len(prompts_text_list)} prompts from 'text_input' column.") # Pre-initialize output columns df_for_results['model_output'] = "" df_for_results['tok_model_output'] = None - df_for_results['tok_model_output'] = df_for_results['tok_model_output'].astype('object') + df_for_results['tok_model_output'] = df_for_results['tok_model_output'].astype( + 'object') df_for_results['tok_model_output_len'] = 0 df_for_results['model_backend'] = backend_name # Broadcast the number of prompts to all ranks if world_size > 1: if rank == 0: - num_prompts_tensor = torch.tensor(len(prompts_text_list), dtype=torch.long, device="cuda") + num_prompts_tensor = torch.tensor( + len(prompts_text_list), dtype=torch.long, device="cuda") else: - num_prompts_tensor = torch.empty(1, dtype=torch.long, device="cuda") + num_prompts_tensor = torch.empty( + 1, dtype=torch.long, device="cuda") dist.broadcast(num_prompts_tensor, src=0) num_total_prompts = num_prompts_tensor.item() else: @@ -148,13 +157,14 @@ def main( current_batch_prompt_tokens = None if rank == 0: - current_batch_prompt_texts = prompts_text_list[i:i+batch_size] + current_batch_prompt_texts = prompts_text_list[i:i + batch_size] # Tokenize on rank 0 using StandardTokenizer current_batch_prompt_tokens, _ = tokenizer.tokenize_prompts( current_batch_prompt_texts, use_chat_template ) - - _print(f"Processing batch {current_batch_num}, size {len(current_batch_prompt_tokens)}") + + _print( + f"Processing batch {current_batch_num}, size {len(current_batch_prompt_tokens)}") # All ranks call generate_batch_distributed generated_tokens_for_batch = backend.generate_batch_distributed( @@ -164,12 +174,14 @@ def main( if rank == 0: # Validate that we received valid tokens if not generated_tokens_for_batch: - raise RuntimeError(f"Backend returned empty tokens for batch {current_batch_num}") - + raise RuntimeError( + f"Backend returned empty tokens for batch {current_batch_num}") + for batch_idx, tokens in enumerate(generated_tokens_for_batch): if not isinstance(tokens, (list, tuple)) or len(tokens) == 0: - raise RuntimeError(f"Backend returned empty or invalid tokens for batch {current_batch_num}, item {batch_idx}: {tokens}") - + raise RuntimeError( + f"Backend returned empty or invalid tokens for batch {current_batch_num}, item {batch_idx}: {tokens}") + # Decode tokens to text using StandardTokenizer decoded_texts_for_batch = tokenizer.batch_decode( generated_tokens_for_batch @@ -183,23 +195,36 @@ def main( original_df_idx = start_index_in_df + batch_idx if original_df_idx < len(df_for_results): # Use at for assignments with list values - df_for_results.at[original_df_idx, 'model_output'] = decoded_texts_for_batch[batch_idx] - df_for_results.at[original_df_idx, 'tok_model_output'] = generated_tokens_for_batch[batch_idx] - df_for_results.at[original_df_idx, 'tok_model_output_len'] = len(generated_tokens_for_batch[batch_idx]) + df_for_results.at[original_df_idx, + 'model_output'] = decoded_texts_for_batch[batch_idx] + df_for_results.at[original_df_idx, + 'tok_model_output'] = generated_tokens_for_batch[batch_idx] + 
df_for_results.at[original_df_idx, 'tok_model_output_len'] = len( + generated_tokens_for_batch[batch_idx]) _print(f"Batch {current_batch_num} completed.") if rank == 0 and df_for_results is not None: _print(f"All batches processed. Saving results...") - + # Keep only required columns in the same order as run_eval.py - output_columns = ['text_input', 'ground_truth', 'question', 'dataset', 'model_output', 'tok_model_output', 'tok_model_output_len', 'model_backend'] + output_columns = [ + 'text_input', + 'ground_truth', + 'question', + 'dataset', + 'model_output', + 'tok_model_output', + 'tok_model_output_len', + 'model_backend'] # Filter to only columns that exist - output_columns = [col for col in output_columns if col in df_for_results.columns] + output_columns = [ + col for col in output_columns if col in df_for_results.columns] df_output = df_for_results[output_columns] - + try: - saved_file = save_results(df_output, output_pickle_path, add_timestamp=True) + saved_file = save_results( + df_output, output_pickle_path, add_timestamp=True) _print(f"Successfully saved results to {saved_file}") except Exception as e: _print(f"Error saving output pickle file: {e}") @@ -234,4 +259,4 @@ def main( args.output_file, args.num_samples, args.skip_samples, - ) \ No newline at end of file + ) diff --git a/language/deepseek-r1/run_mlperf.py b/language/deepseek-r1/run_mlperf.py index 7f484e725e..2345cf5b9b 100755 --- a/language/deepseek-r1/run_mlperf.py +++ b/language/deepseek-r1/run_mlperf.py @@ -1,4 +1,23 @@ #!/usr/bin/env python3 +from eval_accuracy import process_dataframe, print_evaluation_results, process_and_save_dataframe, process_mlperf_log_accuracy +from utils import ( + validate_runner_for_backend, uses_text_input, uses_chat_template, + load_dataset, save_results, print_runner_header, StandardTokenizer, + get_backend_instance, create_base_argument_parser, + setup_output_paths, validate_runner_args, handle_runner_error, + validate_dataset_extended, generate_timestamped_filename +) +from mlperf import ( + OfflineSUT, ServerSUT, BaseSUT, + QuerySampleLibrary, + prepare_mlperf_dataset, + process_mlperf_results, + create_mlperf_output_dataframe +) +from backends import BaseBackend +import pandas as pd +import numpy as np +import mlperf_loadgen as lg import argparse import json import logging @@ -10,26 +29,6 @@ # Disable tokenizers parallelism to avoid forking issues os.environ["TOKENIZERS_PARALLELISM"] = "false" -import mlperf_loadgen as lg -import numpy as np -import pandas as pd - -from backends import BaseBackend -from mlperf import ( - OfflineSUT, ServerSUT, BaseSUT, - QuerySampleLibrary, - prepare_mlperf_dataset, - process_mlperf_results, - create_mlperf_output_dataframe -) -from utils import ( - validate_runner_for_backend, uses_text_input, uses_chat_template, - load_dataset, save_results, print_runner_header, StandardTokenizer, - get_backend_instance, create_base_argument_parser, - setup_output_paths, validate_runner_args, handle_runner_error, - validate_dataset_extended, generate_timestamped_filename -) -from eval_accuracy import process_dataframe, print_evaluation_results, process_and_save_dataframe, process_mlperf_log_accuracy # Configure logging logging.basicConfig( @@ -47,39 +46,39 @@ def create_argument_parser() -> argparse.ArgumentParser: # Scenario selection (no backend argument, auto-detected) parser.add_argument("--mode", type=str, default="offline", - choices=["offline", "server"], - help="MLPerf scenario mode") + choices=["offline", "server"], + help="MLPerf scenario mode") # 
MLPerf configuration parser.add_argument("--mlperf-conf", type=str, default="/inference/mlperf.conf", - help="Path to MLPerf configuration file") + help="Path to MLPerf configuration file") parser.add_argument("--user-conf", type=str, default="mlperf/user.conf", - help="Path to user configuration file") + help="Path to user configuration file") parser.add_argument("--scenario", type=str, default=None, - choices=["Offline", "Server"], - help="MLPerf scenario (overrides --mode)") + choices=["Offline", "Server"], + help="MLPerf scenario (overrides --mode)") parser.add_argument("--accuracy", action="store_true", - help="Run accuracy mode instead of performance") + help="Run accuracy mode instead of performance") # Output configuration parser.add_argument("--output-dir", type=str, default="mlperf_results", - help="Directory for MLPerf output logs") + help="Directory for MLPerf output logs") parser.add_argument("--log-dir", type=str, default=None, - help="Directory for detailed logs") + help="Directory for detailed logs") return parser def configure_loadgen(scenario: str, - accuracy_mode: bool, - mlperf_conf: Optional[str] = None, - user_conf: Optional[str] = None, - log_dir: Optional[str] = None, - model_name: str = "deepseek-r1") -> lg.TestSettings: + accuracy_mode: bool, + mlperf_conf: Optional[str] = None, + user_conf: Optional[str] = None, + log_dir: Optional[str] = None, + model_name: str = "deepseek-r1") -> lg.TestSettings: """Configure LoadGen test settings. Args: @@ -119,9 +118,9 @@ def configure_loadgen(scenario: str, def run_loadgen_test(sut: Union[OfflineSUT, ServerSUT], - qsl: QuerySampleLibrary, - settings: lg.TestSettings, - log_settings: lg.LogSettings) -> None: + qsl: QuerySampleLibrary, + settings: lg.TestSettings, + log_settings: lg.LogSettings) -> None: """Run LoadGen test. 
Args: @@ -162,7 +161,8 @@ def main(): if args.log_dir: log_dir = Path(args.log_dir) else: - log_dir = output_dir / args.mode / ("accuracy" if args.accuracy else "performance") + log_dir = output_dir / args.mode / \ + ("accuracy" if args.accuracy else "performance") log_dir.mkdir(parents=True, exist_ok=True) # Set up output paths with mode information @@ -170,17 +170,21 @@ def main(): if args.output_file is None: # Create output file path in the log directory mode_str = "accuracy" if args.accuracy else "performance" - output_file_base = str(log_dir / f"{backend_name}_mlperf_{args.mode}_{mode_str}_output.pkl") + output_file_base = str( + log_dir / f"{backend_name}_mlperf_{args.mode}_{mode_str}_output.pkl") else: output_file_base = args.output_file - # Generate the actual filename with timestamp that will be used for saving - actual_output_file = generate_timestamped_filename(output_file_base, add_timestamp=True) + # Generate the actual filename with timestamp that will be used for + # saving + actual_output_file = generate_timestamped_filename( + output_file_base, add_timestamp=True) # Ensure the parent directory of the output file exists output_file_parent = Path(actual_output_file).parent output_file_parent.mkdir(parents=True, exist_ok=True) - logger.info(f"Ensured output file directory exists: {output_file_parent}") + logger.info( + f"Ensured output file directory exists: {output_file_parent}") logger.info("=" * 80) logger.info("MLPerf Inference Benchmark Runner (Async Pattern)") @@ -220,13 +224,14 @@ def main(): # For backends that use text prompts, we pass the processed strings # For tokenized backends, we pass the tokenized prompts if uses_text_prompts: - logger.info(f"Backend {backend_name} will use text prompts directly") + logger.info( + f"Backend {backend_name} will use text prompts directly") dataset_for_sut = tokenized_prompts strings_for_sut = processed_strings else: logger.info(f"Backend {backend_name} will use tokenized prompts") dataset_for_sut = tokenized_prompts - strings_for_sut = processed_strings # This is what gets used for generation now + strings_for_sut = processed_strings # This is what gets used for generation now # Create backend using registry logger.info(f"Initializing {backend_name} backend...") @@ -315,7 +320,8 @@ def main(): try: # Get results from SUT - must have valid results if not sut_results: - raise RuntimeError("No results available from SUT - backend failed to generate tokens") + raise RuntimeError( + "No results available from SUT - backend failed to generate tokens") # Process results using new utility processed_results = process_mlperf_results( @@ -347,16 +353,19 @@ def main(): mlperf_log_file = log_dir / "mlperf_log_accuracy.json" if mlperf_log_file.exists(): - logger.info(f"Found MLPerf log accuracy file: {mlperf_log_file}") + logger.info( + f"Found MLPerf log accuracy file: {mlperf_log_file}") logger.info("Using MLPerf log for accuracy evaluation...") # Get checkpoint path from backend configuration backend_config = get_backend_instance(backend_name).config # Determine checkpoint path based on backend type - if hasattr(get_backend_instance(backend_name), 'model_path'): + if hasattr(get_backend_instance( + backend_name), 'model_path'): # PyTorch backend has model_path - checkpoint_path = str(get_backend_instance(backend_name).model_path) + checkpoint_path = str( + get_backend_instance(backend_name).model_path) elif 'model' in backend_config: # Other backends use model name directly checkpoint_path = backend_config['model'] @@ -376,10 +385,13 @@ 
def main(): base_filename="mlperf_accuracy_evaluated.pkl" ) - logger.info(f"MLPerf accuracy evaluation saved to: {evaluated_file}") + logger.info( + f"MLPerf accuracy evaluation saved to: {evaluated_file}") else: - logger.info("No MLPerf log accuracy file found, using standard DataFrame evaluation...") - raise RuntimeError("No MLPerf log accuracy file found, using standard DataFrame evaluation...") + logger.info( + "No MLPerf log accuracy file found, using standard DataFrame evaluation...") + raise RuntimeError( + "No MLPerf log accuracy file found, using standard DataFrame evaluation...") # Ensure clean exit gc.collect() @@ -397,4 +409,4 @@ def main(): if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/language/deepseek-r1/run_mlperf_mpi.py b/language/deepseek-r1/run_mlperf_mpi.py index 66196645ed..176be41710 100755 --- a/language/deepseek-r1/run_mlperf_mpi.py +++ b/language/deepseek-r1/run_mlperf_mpi.py @@ -1,4 +1,37 @@ #!/usr/bin/env python3 +from eval_accuracy import process_dataframe, print_evaluation_results, process_and_save_dataframe, process_mlperf_log_accuracy +from utils.data_utils import ( + load_dataset, save_results, + generate_timestamped_filename +) +from utils.validation import ( + validate_runner_args, ValidationError, + validate_dataset_extended +) +from utils.backend_registry import ( + uses_chat_template, get_backend_instance, detect_backend, + validate_runner_for_backend +) +from utils.runner_utils import create_base_argument_parser, print_runner_header +from utils import ( + StandardTokenizer, + validate_dataset, + process_inference_results +) +from mlperf import ( + OfflineSUT, ServerSUT, BaseSUT, + DistributedQuerySampleLibrary, + prepare_mlperf_dataset, + process_mlperf_results, + create_mlperf_output_dataframe +) +from backends.pytorch_backend import PyTorchBackend +from transformers import AutoTokenizer +import torch.distributed as dist +import torch +import pandas as pd +import numpy as np +import mlperf_loadgen as lg import argparse import json import logging @@ -12,41 +45,6 @@ # Disable tokenizers parallelism to avoid forking issues os.environ["TOKENIZERS_PARALLELISM"] = "false" -import mlperf_loadgen as lg -import numpy as np -import pandas as pd -import torch -import torch.distributed as dist -from transformers import AutoTokenizer - -from backends.pytorch_backend import PyTorchBackend -from mlperf import ( - OfflineSUT, ServerSUT, BaseSUT, - DistributedQuerySampleLibrary, - prepare_mlperf_dataset, - process_mlperf_results, - create_mlperf_output_dataframe -) -from utils import ( - StandardTokenizer, - validate_dataset, - process_inference_results -) -from utils.runner_utils import create_base_argument_parser, print_runner_header -from utils.backend_registry import ( - uses_chat_template, get_backend_instance, detect_backend, - validate_runner_for_backend -) -from utils.validation import ( - validate_runner_args, ValidationError, - validate_dataset_extended -) -from utils.data_utils import ( - load_dataset, save_results, - generate_timestamped_filename -) -from eval_accuracy import process_dataframe, print_evaluation_results, process_and_save_dataframe, process_mlperf_log_accuracy - # Configure logging - only for rank 0 def setup_logging(rank: int): @@ -119,7 +117,7 @@ def issue_queries(self, query_samples: List[lg.QuerySample]) -> None: batch_size = self.backend.config['batch_size'] for i in range(0, len(query_samples), batch_size): - batch_samples = query_samples[i:i+batch_size] + batch_samples = query_samples[i:i + 
batch_size] # Prepare batch tokens batch_tokens = [] @@ -141,10 +139,12 @@ def issue_queries(self, query_samples: List[lg.QuerySample]) -> None: # Generate using distributed backend # This will broadcast to all ranks internally - generated_tokens = self.backend.generate_batch_distributed(batch_tokens) + generated_tokens = self.backend.generate_batch_distributed( + batch_tokens) # Process results and send to LoadGen - for j, (sample_id, tokens) in enumerate(zip(batch_ids, generated_tokens)): + for j, (sample_id, tokens) in enumerate( + zip(batch_ids, generated_tokens)): # Create a copy of tokens before numpy conversion tokens_copy = tokens.copy() @@ -219,7 +219,8 @@ def get_results(self) -> List[Dict[str, Any]]: # Decode tokens to get text output output_text = '' if self.backend.tokenizer: - output_text = self.backend.tokenizer.decode(tokens, skip_special_tokens=True) + output_text = self.backend.tokenizer.decode( + tokens, skip_special_tokens=True) ordered_results.append({ 'model_output': output_text, @@ -228,16 +229,16 @@ def get_results(self) -> List[Dict[str, Any]]: }) else: # Result exists but no tokens - this is an error - raise RuntimeError(f"No tokens in result for dataset index {i}, sample_id {sample_id}") + raise RuntimeError( + f"No tokens in result for dataset index {i}, sample_id {sample_id}") else: # No result for this index - this is an error - raise RuntimeError(f"No result for dataset index {i}, sample_id {sample_id}") + raise RuntimeError( + f"No result for dataset index {i}, sample_id {sample_id}") return ordered_results - - def create_argument_parser() -> argparse.ArgumentParser: """Create argument parser for distributed MLPerf runner.""" parser = argparse.ArgumentParser( @@ -247,44 +248,45 @@ def create_argument_parser() -> argparse.ArgumentParser: # Dataset arguments parser.add_argument("--input-file", type=str, - default="data/final_output.pkl", - help="Input pickle file with prompts") + default="data/final_output.pkl", + help="Input pickle file with prompts") # MLPerf configuration parser.add_argument("--mlperf-conf", type=str, default="/inference/mlperf.conf", - help="Path to MLPerf configuration file") + help="Path to MLPerf configuration file") parser.add_argument("--user-conf", type=str, default="mlperf/user.conf", - help="Path to user configuration file") + help="Path to user configuration file") parser.add_argument("--mode", type=str, default="offline", - choices=["offline", "server"], - help="MLPerf scenario mode (only offline supported for distributed)") + choices=["offline", "server"], + help="MLPerf scenario mode (only offline supported for distributed)") parser.add_argument("--accuracy", action="store_true", - help="Run accuracy mode instead of performance") + help="Run accuracy mode instead of performance") # Output configuration parser.add_argument("--output-dir", type=str, default="mlperf_results", - help="Directory for MLPerf output logs") + help="Directory for MLPerf output logs") parser.add_argument("--log-dir", type=str, default=None, - help="Directory for detailed logs") + help="Directory for detailed logs") parser.add_argument("--output-file", type=str, default=None, - help="Output pickle file path (auto-generated if not specified)") + help="Output pickle file path (auto-generated if not specified)") - # Note: --no-chat-template is removed (chat template usage determined by backend registry) + # Note: --no-chat-template is removed (chat template usage determined by + # backend registry) return parser def configure_loadgen(scenario: str, - 
accuracy_mode: bool, - mlperf_conf: Optional[str] = None, - user_conf: Optional[str] = None, - log_dir: Optional[str] = None, - model_name: str = "deepseek-r1") -> lg.TestSettings: + accuracy_mode: bool, + mlperf_conf: Optional[str] = None, + user_conf: Optional[str] = None, + log_dir: Optional[str] = None, + model_name: str = "deepseek-r1") -> lg.TestSettings: """Configure LoadGen test settings. Args: @@ -324,11 +326,11 @@ def configure_loadgen(scenario: str, def run_loadgen_test(sut: DistributedOfflineSUT, - qsl: DistributedQuerySampleLibrary, - settings: lg.TestSettings, - log_settings: lg.LogSettings, - rank: int, - logger) -> None: + qsl: DistributedQuerySampleLibrary, + settings: lg.TestSettings, + log_settings: lg.LogSettings, + rank: int, + logger) -> None: """Run LoadGen test (only on rank 0). Args: @@ -386,7 +388,8 @@ def main(): # Validate mode for distributed if args.mode != "offline": if rank == 0: - logger.error("Only offline mode is supported for distributed execution") + logger.error( + "Only offline mode is supported for distributed execution") sys.exit(1) # Create output directories (only rank 0) @@ -397,7 +400,8 @@ def main(): if args.log_dir: log_dir = Path(args.log_dir) else: - log_dir = output_dir / args.mode / ("accuracy" if args.accuracy else "performance") + log_dir = output_dir / args.mode / \ + ("accuracy" if args.accuracy else "performance") log_dir.mkdir(parents=True, exist_ok=True) # Determine output file path @@ -405,15 +409,18 @@ def main(): output_file_base = args.output_file else: mode_str = "accuracy" if args.accuracy else "performance" - output_file_base = str(log_dir / f"{backend_name}_mlperf_{args.mode}_{mode_str}_output.pkl") + output_file_base = str( + log_dir / f"{backend_name}_mlperf_{args.mode}_{mode_str}_output.pkl") # Generate the actual filename with timestamp - actual_output_file = generate_timestamped_filename(output_file_base, add_timestamp=True) + actual_output_file = generate_timestamped_filename( + output_file_base, add_timestamp=True) # Ensure the parent directory of the output file exists output_file_parent = Path(actual_output_file).parent output_file_parent.mkdir(parents=True, exist_ok=True) - logger.info(f"Ensured output file directory exists: {output_file_parent}") + logger.info( + f"Ensured output file directory exists: {output_file_parent}") logger.info("=" * 80) logger.info("MLPerf Inference Benchmark Runner (Distributed PyTorch)") @@ -425,7 +432,8 @@ def main(): logger.info(f"Input file: {args.input_file}") logger.info(f"Output directory: {output_dir}") logger.info(f"Output file: {actual_output_file}") - logger.info(f"Chat template: {'enabled' if use_chat_template else 'disabled'} (from registry)") + logger.info( + f"Chat template: {'enabled' if use_chat_template else 'disabled'} (from registry)") logger.info("=" * 80) else: log_dir = None @@ -460,7 +468,8 @@ def main(): tokenized_prompts = dataset_info['tokenized_prompts'] processed_strings = dataset_info['processed_strings'] - logger.info(f"Loaded {len(tokenized_prompts)} prompts from dataset") + logger.info( + f"Loaded {len(tokenized_prompts)} prompts from dataset") # Create SUT sut = DistributedOfflineSUT( @@ -511,7 +520,8 @@ def main(): if rank == 0: # Run test (only rank 0) logger.info("Running test...") - run_loadgen_test(sut, qsl, settings, log_settings, rank, logger) + run_loadgen_test( + sut, qsl, settings, log_settings, rank, logger) logger.info("Completed test...") # Ensure all queries are flushed and async operations complete @@ -524,7 +534,8 @@ def main(): 
dist.broadcast_object_list(exit_signal, src=0) else: # Non-rank 0 processes participate in distributed generation - # They wait for signals from rank 0 and participate in generate_batch_distributed + # They wait for signals from rank 0 and participate in + # generate_batch_distributed while True: # First, check if we should exit # We use a separate broadcast to signal exit @@ -536,7 +547,8 @@ def main(): break elif exit_check[0] == "generate": # Signal to participate in generation - # The actual batch tokens will be broadcast inside generate_batch_distributed + # The actual batch tokens will be broadcast inside + # generate_batch_distributed backend.generate_batch_distributed(None) # If exit_check[0] is None, continue waiting finally: @@ -563,9 +575,11 @@ def main(): try: # Get results from SUT (if available) - logger.info("Retrieving results from distributed SUT...") + logger.info( + "Retrieving results from distributed SUT...") sut_results = sut.get_results() - logger.info(f"Retrieved {len(sut_results)} results from distributed SUT") + logger.info( + f"Retrieved {len(sut_results)} results from distributed SUT") # Process results using new utility processed_results = process_mlperf_results( @@ -597,11 +611,19 @@ def main(): mlperf_log_file = log_dir / "mlperf_log_accuracy.json" if mlperf_log_file.exists(): - logger.info(f"Found MLPerf log accuracy file: {mlperf_log_file}") - logger.info("Using MLPerf log for accuracy evaluation...") - - # For PyTorch backend (only one supported in MPI), get model path - checkpoint_path = str(backend.model_path) if hasattr(backend, 'model_path') else backend.config.get('model_name', 'deepseek-ai/DeepSeek-R1') + logger.info( + f"Found MLPerf log accuracy file: {mlperf_log_file}") + logger.info( + "Using MLPerf log for accuracy evaluation...") + + # For PyTorch backend (only one supported in MPI), + # get model path + checkpoint_path = str( + backend.model_path) if hasattr( + backend, + 'model_path') else backend.config.get( + 'model_name', + 'deepseek-ai/DeepSeek-R1') # Process MLPerf log accuracy df_evaluated, evaluated_file = process_mlperf_log_accuracy( @@ -612,10 +634,13 @@ def main(): base_filename="mlperf_accuracy_evaluated.pkl" ) - logger.info(f"MLPerf accuracy evaluation saved to: {evaluated_file}") + logger.info( + f"MLPerf accuracy evaluation saved to: {evaluated_file}") else: - logger.info("No MLPerf log accuracy file found, using standard DataFrame evaluation...") - raise RuntimeError("No MLPerf log accuracy file found, using standard DataFrame evaluation...") + logger.info( + "No MLPerf log accuracy file found, using standard DataFrame evaluation...") + raise RuntimeError( + "No MLPerf log accuracy file found, using standard DataFrame evaluation...") except KeyboardInterrupt: if rank == 0: @@ -639,4 +664,4 @@ def main(): if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/language/deepseek-r1/utils/__init__.py b/language/deepseek-r1/utils/__init__.py index ce8d10e9fd..65f575da29 100644 --- a/language/deepseek-r1/utils/__init__.py +++ b/language/deepseek-r1/utils/__init__.py @@ -101,4 +101,4 @@ # Error handling 'handle_backend_error', 'handle_runner_error' -] \ No newline at end of file +] diff --git a/language/deepseek-r1/utils/backend_registry.py b/language/deepseek-r1/utils/backend_registry.py index 73aee587e6..aa8d62a18e 100644 --- a/language/deepseek-r1/utils/backend_registry.py +++ b/language/deepseek-r1/utils/backend_registry.py @@ -173,7 +173,8 @@ def validate_backend(backend: str) -> None: f"Unknown 
backend '{backend}'. Supported backends: {', '.join(supported_backends)}") -def _get_compatibility_error_message(backend: str, runner_type: str, compatible: List[str]) -> str: +def _get_compatibility_error_message( + backend: str, runner_type: str, compatible: List[str]) -> str: """ Generate error message for incompatible backend/runner combinations. @@ -401,7 +402,8 @@ def get_backend_instance(backend_name: Optional[str] = None): return backend_class() -def is_backend_compatible_with_runner(backend_name: Optional[str] = None, runner_type: str = None) -> bool: +def is_backend_compatible_with_runner( + backend_name: Optional[str] = None, runner_type: str = None) -> bool: """Check if a backend is compatible with a specific runner type. Args: @@ -441,7 +443,8 @@ def get_backend_env_vars(backend_name: Optional[str] = None) -> Dict[str, str]: # Get static env vars env_vars = BACKEND_REGISTRY[backend_name]['env_vars'].copy() - # Handle dynamic env vars (e.g., OMP_NUM_THREADS based on tensor_parallel_size) + # Handle dynamic env vars (e.g., OMP_NUM_THREADS based on + # tensor_parallel_size) if backend_name == 'vllm': config = get_backend_config(backend_name) env_vars['OMP_NUM_THREADS'] = str( @@ -461,4 +464,4 @@ def apply_backend_env_vars(backend_name: Optional[str] = None) -> None: env_vars = get_backend_env_vars(backend_name) for key, value in env_vars.items(): - os.environ[key] = value \ No newline at end of file + os.environ[key] = value diff --git a/language/deepseek-r1/utils/data_utils.py b/language/deepseek-r1/utils/data_utils.py index 80acb5c8ce..0eb4cd3dcd 100644 --- a/language/deepseek-r1/utils/data_utils.py +++ b/language/deepseek-r1/utils/data_utils.py @@ -15,54 +15,56 @@ from utils.validation import ValidationError, validate_dataset_extended -def generate_timestamped_filename(output_file: str, add_timestamp: bool = True) -> str: +def generate_timestamped_filename( + output_file: str, add_timestamp: bool = True) -> str: """ Generate the actual filename that will be used when saving, with timestamp if requested. - + Args: output_file: Base output file path add_timestamp: Whether to add timestamp to filename - + Returns: Actual filename that will be used for saving """ if not add_timestamp: return output_file - + timestamp_suffix = time.strftime("%Y%m%d_%H%M%S") base_name, ext = os.path.splitext(output_file) return f"{base_name}_{timestamp_suffix}{ext}" -def load_dataset(file_path: str, num_samples: Optional[int] = None, skip_samples: int = 0) -> pd.DataFrame: +def load_dataset( + file_path: str, num_samples: Optional[int] = None, skip_samples: int = 0) -> pd.DataFrame: """ Load dataset from pickle file. 
- + Args: file_path: Path to the pickle file num_samples: Optional limit on number of samples to load skip_samples: Number of samples to skip from the beginning - + Returns: Loaded DataFrame - + Raises: ValidationError: If file doesn't exist or validation fails Exception: If file can't be loaded """ if not os.path.exists(file_path): raise ValidationError(f"Input file not found: {file_path}") - + print(f"Loading dataset from {file_path}...") - + try: with open(file_path, "rb") as f: df = pd.read_pickle(f) except Exception as e: raise ValidationError(f"Failed to load dataset: {str(e)}") - + print(f"Loaded {len(df)} samples") - + # Skip samples if specified if skip_samples > 0: if skip_samples >= len(df): @@ -71,31 +73,33 @@ def load_dataset(file_path: str, num_samples: Optional[int] = None, skip_samples ) original_length = len(df) df = df.iloc[skip_samples:].reset_index(drop=True) - print(f"Skipped first {skip_samples} samples (from {original_length} total)") - + print( + f"Skipped first {skip_samples} samples (from {original_length} total)") + # Limit number of samples if specified if num_samples is not None: original_length = len(df) df = df.head(num_samples) - print(f"Limited to {len(df)} samples (from {original_length} total after skipping)") - + print( + f"Limited to {len(df)} samples (from {original_length} total after skipping)") + return df -def save_results(df: pd.DataFrame, - output_file: str, - add_timestamp: bool = True) -> str: +def save_results(df: pd.DataFrame, + output_file: str, + add_timestamp: bool = True) -> str: """ Save results DataFrame to pickle file. - + Args: df: DataFrame to save output_file: Output file path add_timestamp: Whether to add timestamp to filename - + Returns: Actual output file path used - + Raises: ValidationError: If save operation fails """ @@ -104,93 +108,99 @@ def save_results(df: pd.DataFrame, timestamp_suffix = time.strftime("%Y%m%d_%H%M%S") base_name, ext = os.path.splitext(output_file) output_file = f"{base_name}_{timestamp_suffix}{ext}" - + # Ensure output directory exists os.makedirs(os.path.dirname(output_file), exist_ok=True) - + print(f"Saving results to {output_file}...") - + # Reset index before saving df_to_save = df.reset_index(drop=True) - + try: with open(output_file, "wb") as f: pickle.dump(df_to_save, f) - print(f"Save completed: {len(df_to_save)} samples saved to {output_file}") + print( + f"Save completed: {len(df_to_save)} samples saved to {output_file}") except Exception as e: raise ValidationError(f"Failed to save results: {str(e)}") - + return output_file -def prepare_output_dataframe(input_df: pd.DataFrame, - backend_name: Optional[str] = None) -> pd.DataFrame: +def prepare_output_dataframe(input_df: pd.DataFrame, + backend_name: Optional[str] = None) -> pd.DataFrame: """ Prepare output DataFrame by cleaning up old columns. - + Args: input_df: Input DataFrame backend_name: Optional backend name override. If None, uses MLPERF_BACKEND env var. 
- + Returns: Cleaned DataFrame ready for new results """ if backend_name is None: from utils.backend_registry import detect_backend backend_name = detect_backend() - + df_output = input_df.copy() - + # Define columns to drop (old model outputs and unwanted columns) columns_to_drop = [ # specify columns to drop here ] - + # Also drop any existing backend-specific columns - backend_columns = [col for col in df_output.columns if col.startswith(f'{backend_name}_')] + backend_columns = [ + col for col in df_output.columns if col.startswith(f'{backend_name}_')] columns_to_drop.extend(backend_columns) - + # Drop columns that exist df_output = df_output.drop( columns=[col for col in columns_to_drop if col in df_output.columns] ) - + return df_output -def add_standardized_columns(df: pd.DataFrame, - results: List[Dict[str, Any]], - tokenized_prompts: List[List[int]] = None) -> pd.DataFrame: +def add_standardized_columns(df: pd.DataFrame, + results: List[Dict[str, Any]], + tokenized_prompts: List[List[int]] = None) -> pd.DataFrame: """ Add standardized output columns to DataFrame. - + Args: df: Input DataFrame results: List of result dictionaries from backend tokenized_prompts: List of tokenized input prompts (deprecated, not used) - + Returns: DataFrame with added standardized columns """ # Add results columns with new naming convention df['model_output'] = [r.get('model_output', '') for r in results] df['tok_model_output'] = [r.get('tok_model_output', []) for r in results] - df['tok_model_output_len'] = [r.get('tok_model_output_len', 0) for r in results] + df['tok_model_output_len'] = [ + r.get( + 'tok_model_output_len', + 0) for r in results] df['model_backend'] = [r.get('model_backend', '') for r in results] - + return df -def validate_dataset(df: pd.DataFrame, backend_name: Optional[str] = None) -> None: +def validate_dataset(df: pd.DataFrame, + backend_name: Optional[str] = None) -> None: """ Validate that the dataset has required columns. - + Args: df: DataFrame to validate backend_name: Optional backend name override. If None, uses MLPERF_BACKEND env var. - + Raises: ValidationError: If required columns are missing or validation fails """ # Use centralized validation function - validate_dataset_extended(df, backend_name) \ No newline at end of file + validate_dataset_extended(df, backend_name) diff --git a/language/deepseek-r1/utils/error_handling.py b/language/deepseek-r1/utils/error_handling.py index 54ca580135..6b588b9c20 100644 --- a/language/deepseek-r1/utils/error_handling.py +++ b/language/deepseek-r1/utils/error_handling.py @@ -5,17 +5,18 @@ from .validation import BackendError, ValidationError -def handle_backend_error(e: Exception, backend_name: str, operation: str) -> None: +def handle_backend_error(e: Exception, backend_name: str, + operation: str) -> None: """ Standardized error handling for backend operations. - + Args: e: The exception that occurred backend_name: Name of the backend operation: Description of the operation that failed """ error_msg = f"\n[{backend_name.upper()}] Error during {operation}: {type(e).__name__}: {str(e)}" - + if isinstance(e, (RuntimeError, ValueError)): # Known errors - just print the message print(error_msg) @@ -28,7 +29,7 @@ def handle_backend_error(e: Exception, backend_name: str, operation: str) -> Non def handle_runner_error(e: Exception, runner_name: str) -> None: """ Standardized error handling for runners. 
- + Args: e: The exception that occurred runner_name: Name of the runner @@ -45,4 +46,4 @@ def handle_runner_error(e: Exception, runner_name: str) -> None: else: print(f"\n{runner_name} failed: {e}") traceback.print_exc() - sys.exit(1) \ No newline at end of file + sys.exit(1) diff --git a/language/deepseek-r1/utils/runner_utils.py b/language/deepseek-r1/utils/runner_utils.py index 8c90deb515..fc2d4ad7f2 100644 --- a/language/deepseek-r1/utils/runner_utils.py +++ b/language/deepseek-r1/utils/runner_utils.py @@ -12,29 +12,31 @@ def create_base_argument_parser(description: str) -> argparse.ArgumentParser: description=description, formatter_class=argparse.ArgumentDefaultsHelpFormatter ) - + # Common dataset arguments parser.add_argument("--input-file", type=str, - default="data/final_output.pkl", - help="Input pickle file with prompts") - + default="data/final_output.pkl", + help="Input pickle file with prompts") + parser.add_argument("--output-file", type=str, default=None, - help="Output pickle file path (auto-generated if not specified)") - + help="Output pickle file path (auto-generated if not specified)") + parser.add_argument("--num-samples", type=int, default=None, - help="Number of samples to process from dataset") - + help="Number of samples to process from dataset") + parser.add_argument("--skip-samples", type=int, default=0, - help="Number of samples to skip from the beginning") - - # NOTE: --no-chat-template flag is NOT included (chat template usage determined by backend registry) - + help="Number of samples to skip from the beginning") + + # NOTE: --no-chat-template flag is NOT included (chat template usage + # determined by backend registry) + return parser -def print_runner_header(runner_name: str, backend_name: Optional[str] = None, args: argparse.Namespace = None) -> None: +def print_runner_header( + runner_name: str, backend_name: Optional[str] = None, args: argparse.Namespace = None) -> None: """Print standardized header for runners. - + Args: runner_name: Name of the runner backend_name: Optional backend name override. If None, uses MLPERF_BACKEND env var. @@ -43,7 +45,7 @@ def print_runner_header(runner_name: str, backend_name: Optional[str] = None, ar if backend_name is None: from .backend_registry import detect_backend backend_name = detect_backend() - + print("=" * 80) print(f"{runner_name}") print("=" * 80) @@ -59,22 +61,23 @@ def print_runner_header(runner_name: str, backend_name: Optional[str] = None, ar print("=" * 80) -def setup_output_paths(args: argparse.Namespace, backend_name: Optional[str] = None, mode: Optional[str] = None) -> Tuple[Path, str]: +def setup_output_paths(args: argparse.Namespace, + backend_name: Optional[str] = None, mode: Optional[str] = None) -> Tuple[Path, str]: """ Set up output directories and file paths. - + Args: args: Parsed command line arguments backend_name: Optional backend name override. If None, uses MLPERF_BACKEND env var. 
mode: Optional mode (e.g., 'offline', 'server' for MLPerf) - + Returns: Tuple of (output_dir, output_file_path) """ if backend_name is None: from .backend_registry import detect_backend backend_name = detect_backend() - + # Determine output directory if hasattr(args, 'output_dir') and args.output_dir: output_dir = Path(args.output_dir) @@ -84,9 +87,9 @@ def setup_output_paths(args: argparse.Namespace, backend_name: Optional[str] = N output_dir = Path(f"outputs/{backend_name}/{mode}") else: output_dir = Path(f"outputs/{backend_name}") - + output_dir.mkdir(parents=True, exist_ok=True) - + # Determine output file path if args.output_file: output_file = args.output_file @@ -97,10 +100,13 @@ def setup_output_paths(args: argparse.Namespace, backend_name: Optional[str] = N suffix = f"_{args.num_samples}samples" else: suffix = "_full" - + if mode: - output_file = str(output_dir / f"{backend_name}_{mode}_output_{timestamp}{suffix}.pkl") + output_file = str( + output_dir / + f"{backend_name}_{mode}_output_{timestamp}{suffix}.pkl") else: - output_file = str(output_dir / f"{backend_name}_output_{timestamp}{suffix}.pkl") - - return output_dir, output_file \ No newline at end of file + output_file = str(output_dir / + f"{backend_name}_output_{timestamp}{suffix}.pkl") + + return output_dir, output_file diff --git a/language/deepseek-r1/utils/tokenization.py b/language/deepseek-r1/utils/tokenization.py index c5fa77d69d..ec67e1e2eb 100644 --- a/language/deepseek-r1/utils/tokenization.py +++ b/language/deepseek-r1/utils/tokenization.py @@ -7,15 +7,15 @@ class StandardTokenizer: """Standard tokenizer for DeepSeek models.""" - + # Standard configuration used across all runners DEFAULT_MODEL = "deepseek-ai/DeepSeek-R1" DEFAULT_MAX_LENGTH = 32 * 1024 - + def __init__(self, model_name: str = None, max_length: int = None): """ Initialize tokenizer. - + Args: model_name: HuggingFace model name max_length: Maximum sequence length @@ -23,50 +23,54 @@ def __init__(self, model_name: str = None, max_length: int = None): self.model_name = model_name or self.DEFAULT_MODEL self.max_length = max_length or self.DEFAULT_MAX_LENGTH self._tokenizer = None - + @property def tokenizer(self): """Lazy load tokenizer.""" if self._tokenizer is None: print(f"Loading tokenizer: {self.model_name}") - self._tokenizer = AutoTokenizer.from_pretrained(self.model_name, revision="56d4cbbb4d29f4355bab4b9a39ccb717a14ad5ad") + self._tokenizer = AutoTokenizer.from_pretrained( + self.model_name, revision="56d4cbbb4d29f4355bab4b9a39ccb717a14ad5ad") return self._tokenizer - - def tokenize_prompts(self, prompts: List[str], - use_chat_template: Optional[bool] = None, - backend_name: Optional[str] = None) -> Tuple[List[List[int]], List[str]]: + + def tokenize_prompts(self, prompts: List[str], + use_chat_template: Optional[bool] = None, + backend_name: Optional[str] = None) -> Tuple[List[List[int]], List[str]]: """ Tokenize prompts with backend-specific handling. - + Args: prompts: List of text prompts use_chat_template: Whether to use chat template (if None and backend_name provided, uses registry) backend_name: Optional backend name override. If None, uses MLPERF_BACKEND env var. 
- + Returns: Tuple of (tokenized_prompts, processed_strings) """ # Auto-detect backend if not provided if backend_name is None: backend_name = detect_backend() - + # Determine chat template usage from registry if backend_name provided if use_chat_template is None: use_chat_template = uses_chat_template(backend_name) - print(f"[{backend_name}] Using chat template from registry: {use_chat_template}") - + print( + f"[{backend_name}] Using chat template from registry: {use_chat_template}") + tokenized = [] processed_strings = [] - + for prompt in prompts: - if use_chat_template and hasattr(self.tokenizer, 'apply_chat_template'): + if use_chat_template and hasattr( + self.tokenizer, 'apply_chat_template'): tokens = self.tokenizer.apply_chat_template( [{"role": "user", "content": prompt}], add_generation_prompt=True, max_length=self.max_length, truncation=True ) - processed_string = self.tokenizer.decode(tokens, skip_special_tokens=False) + processed_string = self.tokenizer.decode( + tokens, skip_special_tokens=False) else: tokens = self.tokenizer.encode( prompt, @@ -74,49 +78,52 @@ def tokenize_prompts(self, prompts: List[str], max_length=self.max_length ) processed_string = prompt - + tokenized.append(tokens) processed_strings.append(processed_string) - + return tokenized, processed_strings - - def decode_tokens(self, tokens: List[int], skip_special_tokens: bool = True) -> str: + + def decode_tokens(self, tokens: List[int], + skip_special_tokens: bool = True) -> str: """Decode tokens to text.""" - return self.tokenizer.decode(tokens, skip_special_tokens=skip_special_tokens) - - def batch_decode(self, token_lists: List[List[int]], - skip_special_tokens: bool = True) -> List[str]: + return self.tokenizer.decode( + tokens, skip_special_tokens=skip_special_tokens) + + def batch_decode(self, token_lists: List[List[int]], + skip_special_tokens: bool = True) -> List[str]: """Batch decode multiple token lists.""" - return self.tokenizer.batch_decode(token_lists, skip_special_tokens=skip_special_tokens) + return self.tokenizer.batch_decode( + token_lists, skip_special_tokens=skip_special_tokens) -def process_inference_results(raw_results: List[dict], - tokenizer: Optional[StandardTokenizer] = None, - backend_name: Optional[str] = None, - uses_text_prompts: bool = False) -> List[dict]: +def process_inference_results(raw_results: List[dict], + tokenizer: Optional[StandardTokenizer] = None, + backend_name: Optional[str] = None, + uses_text_prompts: bool = False) -> List[dict]: """ Process raw inference results into standardized format. - + Args: raw_results: Raw results from backend tokenizer: Tokenizer for decoding backend_name: Optional backend name override. If None, uses MLPERF_BACKEND env var. 
uses_text_prompts: Whether backend uses text prompts - + Returns: List of standardized result dictionaries """ # Auto-detect backend if not provided if backend_name is None: backend_name = detect_backend() - + if backend_name not in get_supported_backends(): raise ValueError(f"Backend {backend_name} is not supported") - + backend_config = get_backend_config(backend_name) - + standardized_results = [] - + for raw_result in raw_results: # Handle text-prompt backends if uses_text_prompts and 'text' in raw_result: @@ -129,9 +136,9 @@ def process_inference_results(raw_results: List[dict], if tokenizer and tokens: try: text = tokenizer.decode_tokens(tokens) - except: + except BaseException: pass - + standardized = { 'model_output': text, 'tok_model_output': tokens, @@ -139,5 +146,5 @@ def process_inference_results(raw_results: List[dict], 'model_backend': backend_name, } standardized_results.append(standardized) - - return standardized_results \ No newline at end of file + + return standardized_results diff --git a/language/deepseek-r1/utils/validation.py b/language/deepseek-r1/utils/validation.py index 29bebef4f1..768427ada1 100644 --- a/language/deepseek-r1/utils/validation.py +++ b/language/deepseek-r1/utils/validation.py @@ -12,8 +12,10 @@ class BackendError(RuntimeError): class BackendNotInitializedError(BackendError): """Raised when backend operation is called before initialization.""" + def __init__(self, backend_name: str = "Backend"): - super().__init__(f"{backend_name} not initialized. Call initialize() first.") + super().__init__( + f"{backend_name} not initialized. Call initialize() first.") class ValidationError(ValueError): @@ -33,9 +35,9 @@ def wrapper(self, *args, **kwargs): def validate_prompts_input(backend_name: Optional[str] = None, - tokenized_prompts: Optional[List[List[int]]] = None, - text_prompts: Optional[List[str]] = None, - input_type: str = None) -> None: + tokenized_prompts: Optional[List[List[int]]] = None, + text_prompts: Optional[List[str]] = None, + input_type: str = None) -> None: """ Centralized prompt validation with backend-specific requirements. 
@@ -53,13 +55,16 @@ def validate_prompts_input(backend_name: Optional[str] = None, backend_name = detect_backend() if tokenized_prompts is None and text_prompts is None: - raise ValidationError(f"{backend_name} backend requires either text_prompts or tokenized_prompts") + raise ValidationError( + f"{backend_name} backend requires either text_prompts or tokenized_prompts") if input_type == 'text' and tokenized_prompts is not None and text_prompts is None: - raise ValidationError(f"{backend_name} backend requires text_prompts, not tokenized_prompts") + raise ValidationError( + f"{backend_name} backend requires text_prompts, not tokenized_prompts") if input_type == 'tokenized' and text_prompts is not None and tokenized_prompts is None: - raise ValidationError(f"{backend_name} backend requires tokenized_prompts, not text_prompts") + raise ValidationError( + f"{backend_name} backend requires tokenized_prompts, not text_prompts") # Additional validation for tokenized prompts if tokenized_prompts is not None: @@ -67,9 +72,11 @@ def validate_prompts_input(backend_name: Optional[str] = None, raise ValidationError("tokenized_prompts cannot be empty") for i, prompt in enumerate(tokenized_prompts): if not isinstance(prompt, list): - raise ValidationError(f"tokenized_prompts[{i}] must be a list of integers") + raise ValidationError( + f"tokenized_prompts[{i}] must be a list of integers") if not prompt: - raise ValidationError(f"tokenized_prompts[{i}] cannot be empty") + raise ValidationError( + f"tokenized_prompts[{i}] cannot be empty") # Additional validation for text prompts if text_prompts is not None: @@ -81,8 +88,8 @@ def validate_prompts_input(backend_name: Optional[str] = None, def validate_dataset_extended(df: pd.DataFrame, - backend_name: Optional[str] = None, - required_columns: Optional[List[str]] = None) -> None: + backend_name: Optional[str] = None, + required_columns: Optional[List[str]] = None) -> None: """ Extended dataset validation with backend-specific requirements. 
@@ -101,9 +108,11 @@ def validate_dataset_extended(df: pd.DataFrame, if required_columns is None: required_columns = ['text_input'] - missing_columns = [col for col in required_columns if col not in df.columns] + missing_columns = [ + col for col in required_columns if col not in df.columns] if missing_columns: - raise ValidationError(f"Dataset missing required columns: {missing_columns}") + raise ValidationError( + f"Dataset missing required columns: {missing_columns}") # Check for empty prompts empty_prompts = df['text_input'].isna().sum() @@ -118,7 +127,8 @@ def validate_dataset_extended(df: pd.DataFrame, config = get_backend_config(backend_name) # Add backend-specific validation based on config if needed - print(f"Dataset validation passed: {len(df)} samples with required columns") + print( + f"Dataset validation passed: {len(df)} samples with required columns") def validate_runner_args(args: argparse.Namespace, runner_type: str) -> None: @@ -133,7 +143,8 @@ def validate_runner_args(args: argparse.Namespace, runner_type: str) -> None: ValidationError: If validation fails """ # Common validations - if hasattr(args, 'num_samples') and args.num_samples is not None and args.num_samples <= 0: + if hasattr( + args, 'num_samples') and args.num_samples is not None and args.num_samples <= 0: raise ValidationError("--num-samples must be positive") if hasattr(args, 'skip_samples') and args.skip_samples < 0: @@ -142,4 +153,5 @@ def validate_runner_args(args: argparse.Namespace, runner_type: str) -> None: # Runner-specific validations if runner_type in ['mlperf', 'mlperf_mpi']: if hasattr(args, 'mode') and args.mode not in ['offline', 'server']: - raise ValidationError(f"Invalid mode: {args.mode}. Must be 'offline' or 'server'") \ No newline at end of file + raise ValidationError( + f"Invalid mode: {args.mode}. Must be 'offline' or 'server'") diff --git a/language/gpt-j/README.md b/language/gpt-j/README.md index cfcf068791..9c952b65db 100644 --- a/language/gpt-j/README.md +++ b/language/gpt-j/README.md @@ -1,9 +1,28 @@ # GPT-J Reference Implementation -Please see the [new docs site](https://docs.mlcommons.org/inference/benchmarks/language/gpt-j) for an automated way to run this benchmark across different available implementations and do an end-to-end submission with or without docker. +## Automated command to run the benchmark via MLCFlow Please see the [new docs site](https://docs.mlcommons.org/inference/benchmarks/language/gpt-j/) for an automated way to run this benchmark across different available implementations and do an end-to-end submission with or without docker. +You can also do `pip install mlc-scripts` and then use `mlcr` commands for downloading the model and datasets using the commands given in the later sections. + +### Download model through MLCFlow Automation + +``` +mlcr get,ml-model,gptj,_pytorch --outdirname= -j +``` + +### Download dataset through MLCFlow Automation + +**Validation Dataset** +``` +mlcr get,dataset,cnndm,_validation --outdirname= -j +``` + +**Calibration Dataset** +``` +mlcr get,dataset,cnndm,_calibration --outdirname= -j +``` ### Setup Instructions @@ -113,6 +132,13 @@ Evaluates the ROGUE scores from the accuracy logs. 
Only applicable when specifyi python evaluation.py --mlperf-accuracy-file ./build/logs/mlperf_log_accuracy.json --dataset-file ./data/cnn_eval.json ``` +### Evaluate the accuracy through MLCFlow Automation +```bash +mlcr process,mlperf,accuracy,_cnndm --result_dir= +``` + +Please click [here](https://github.com/mlcommons/inference/blob/master/language/gpt-j/evaluation.py) to view the Python script for evaluating accuracy for the cnndm dataset. + ### Reference Model - ROUGE scores The following are the rouge scores obtained when evaluating the GPT-J fp32 model on the entire validation set (13368 samples) using beam search, beam_size=4 @@ -122,6 +148,10 @@ ROUGE 2 - 20.1235 ROUGE L - 29.9881 +## Automated command for submission generation via MLCFlow + +Please see the [new docs site](https://docs.mlcommons.org/inference/submission/) for an automated way to generate submission through MLCFlow. + ### License: Apache License Version 2.0. diff --git a/language/llama2-70b/README.md b/language/llama2-70b/README.md index 506423cc2c..bbd9889564 100644 --- a/language/llama2-70b/README.md +++ b/language/llama2-70b/README.md @@ -7,6 +7,9 @@ - For server scenario, it is necessary to call `lg.FirstTokenComplete(response)` for each query. This way the first token will be reported and it's latency will be measured. - For all scenarios, when calling `lg.QuerySamplesComplete(response)`, it is necessary that each of the elements in response is a `lg.QuerySampleResponse` that contains the number of tokens (can be create this way: `lg.QuerySampleResponse(qitem.id, bi[0], bi[1], n_tokens)`). The number of tokens reported should match with the number of tokens on your answer and this will be checked in [TEST06](../../compliance/nvidia/TEST06/) + +## Automated command to run the benchmark via MLCFlow + Please see the [new docs site](https://docs.mlcommons.org/inference/benchmarks/language/llama2-70b) for an automated way to run this benchmark across different available implementations and do an end-to-end submission with or without docker. You can also do `pip install mlc-scripts` and then use `mlcr` commands for downloading the model and datasets using the commands given in the later sections. @@ -65,9 +68,11 @@ CPU-only setup, as well as any GPU versions for applicable libraries like PyTorc ### MLCommons Members Download MLCommons hosts the model and preprocessed dataset for download **exclusively by MLCommons Members**. You must first agree to the [confidentiality notice](https://llama2.mlcommons.org) using your organizational email address, then you will receive a link to a directory containing Rclone download instructions. 
 _If you cannot access the form but you are part of a MLCommons Member organization, submit the [MLCommons subscription form](https://mlcommons.org/community/subscribe/) with your organizational email address and [associate a Google account](https://accounts.google.com/SignUpWithoutGmail) with your organizational email address._
-Once you have the access, you can download the model automatically via the below command
+
+### Download model through MLCFlow Automation
+
 ```
-mlcr get,ml-model,llama2 --outdirname=${CHECKPOINT_PATH} -j
+mlcr get,ml-model,llama2-70b,_pytorch --outdirname= -j
 ```
 
 ### External Download (Not recommended for official submission)
@@ -82,6 +87,34 @@ git clone https://huggingface.co/meta-llama/Llama-2-70b-chat-hf ${CHECKPOINT_PAT
 
 ## Get Dataset
 
+### Download Preprocessed dataset through MLCFlow Automation
+
+**Validation**
+
+```
+mlcr get,dataset,preprocessed,openorca,_validation --outdirname= -j
+```
+
+**Calibration**
+
+```
+mlcr get,dataset,preprocessed,openorca,_calibration --outdirname= -j
+```
+
+### Download Unprocessed dataset through MLCFlow Automation
+
+**Validation**
+
+```
+mlcr get,dataset,openorca,_validation --outdirname= -j
+```
+
+**Calibration**
+
+```
+mlcr get,dataset,openorca,_calibration --outdirname= -j
+```
+
 ### Preprocessed
 
 You can use Rclone to download the preprocessed dataset from a Cloudflare R2 bucket.
@@ -244,6 +277,18 @@ scale from a 0.0-1.0 scale):
 
 This was run on a DGX-H100 node. Total runtime was ~4.5 days.
 
+### Evaluate the accuracy through MLCFlow Automation
+```bash
+mlcr process,mlperf,accuracy,_openorca --result_dir=
+```
+
+Please click [here](https://github.com/mlcommons/inference/blob/master/language/llama2-70b/evaluate-accuracy.py) to view the Python script for evaluating accuracy for the OpenOrca dataset.
+
+## Automated command for submission generation via MLCFlow
+
+Please see the [new docs site](https://docs.mlcommons.org/inference/submission/) for an automated way to generate submission through MLCFlow.
+
+
 # Run llama2-70b-interactive benchmark
 
 For official, Llama2-70b submissions it is also possible to submit in the interactive category. This sets a more strict latency requirements for Time to First Token (ttft) and Time per Output Token (tpot). Specifically, the interactive category requires loadgen to enforce `ttft <= 450ms` and `ttft <= 40ms`
diff --git a/language/llama3.1-405b/README.md b/language/llama3.1-405b/README.md
index 50668263c4..65cf226d03 100644
--- a/language/llama3.1-405b/README.md
+++ b/language/llama3.1-405b/README.md
@@ -7,11 +7,12 @@
 - For server scenario, it is necessary to call `lg.FirstTokenComplete(response)` for each query. This way the first token will be reported and it's latency will be measured.
 - For all scenarios, when calling `lg.QuerySamplesComplete(response)`, it is necessary that each of the elements in response is a `lg.QuerySampleResponse` that contains the number of tokens (can be create this way: `lg.QuerySampleResponse(qitem.id, bi[0], bi[1], n_tokens)`). The number of tokens reported should match with the number of tokens on your answer and this will be checked in [TEST06](../../compliance/nvidia/TEST06/)
 
-## Automated command to run the benchmark via MLFlow
+## Automated command to run the benchmark via MLCFlow
 
 Please see the [new docs site](https://docs.mlcommons.org/inference/benchmarks/language/llama3_1-405b/) for an automated way to run this benchmark across different available implementations and do an end-to-end submission with or without docker.
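The `mlcr` commands added across these READMEs follow the same download-then-postprocess pattern. As a minimal illustrative sketch only (using the llama2-70b commands shown earlier; the directory paths below are example placeholders, not values from this patch):

```bash
# Hypothetical end-to-end sketch of the mlcr pattern used in these READMEs.
# CHECKPOINT_PATH, DATASET_PATH and OUTPUT_LOG_DIR are example placeholders.
export CHECKPOINT_PATH=/data/models/llama2-70b
export DATASET_PATH=/data/datasets/openorca
export OUTPUT_LOG_DIR=/data/results/llama2-70b/offline/accuracy

pip install mlc-scripts

# Fetch the model and the preprocessed validation split
mlcr get,ml-model,llama2-70b,_pytorch --outdirname=${CHECKPOINT_PATH} -j
mlcr get,dataset,preprocessed,openorca,_validation --outdirname=${DATASET_PATH} -j

# After an accuracy run, post-process the LoadGen accuracy log
mlcr process,mlperf,accuracy,_openorca --result_dir=${OUTPUT_LOG_DIR}
```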
-You can also do pip install mlc-scripts and then use `mlcr` commands for downloading the model and datasets using the commands given in the later sections. +You can also do `pip install mlc-scripts` and then use `mlcr` commands for downloading the model and datasets using the commands given in the later sections. + ## Prepare environment @@ -99,11 +100,24 @@ pip install -e ../../loadgen ## Get Model ### MLCommons Members Download (Recommended for official submission) -You need to request for access to [MLcommons](http://llama3-1.mlcommons.org/) and you'll receive an email with the download instructions. You can download the model automatically via the below command +You need to request for access to [MLcommons](http://llama3-1.mlcommons.org/) and you'll receive an email with the download instructions. + +### Download model through MLCFlow Automation + +**From MLCOMMONS Google Drive** + ``` mlcr get,ml-model,llama3 --outdirname=${CHECKPOINT_PATH} -j ``` +**From HuggingFace** + +``` +mlcr get,ml-model,llama3,_hf --outdirname=${CHECKPOINT_PATH} --hf_token= -j +``` + +**Note:** +Downloading llama3.1-405B model from Hugging Face will require an [**access token**](https://huggingface.co/settings/tokens) which could be generated for your account. Additionally, ensure that your account has access to the [llama3.1-405B](https://huggingface.co/meta-llama/Llama-3.1-405B-Instruct) model. ### External Download (Not recommended for official submission) + First go to [llama3.1-request-link](https://ai.meta.com/resources/models-and-libraries/llama-downloads/) and make a request, sign in to HuggingFace (if you don't have account, you'll need to create one). **Please note your authentication credentials** as you may be required to provide them when cloning below. @@ -115,16 +129,22 @@ git clone https://huggingface.co/meta-llama/Llama-3.1-405B-Instruct ${CHECKPOINT cd ${CHECKPOINT_PATH} && git checkout be673f326cab4cd22ccfef76109faf68e41aa5f1 ``` -### Download huggingface model through MLC + +## Get Dataset + +### Download dataset through MLCFlow Automation + +**Validation** ``` -mlcr get,ml-model,llama3,_hf --outdirname=${CHECKPOINT_PATH} --hf_token= -j +mlcr get,dataset,mlperf,inference,llama3,_validation --outdirname= -j ``` -**Note:** -Downloading llama3.1-405B model from Hugging Face will require an [**access token**](https://huggingface.co/settings/tokens) which could be generated for your account. Additionally, ensure that your account has access to the [llama3.1-405B](https://huggingface.co/meta-llama/Llama-3.1-405B-Instruct) model. 
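For the Hugging Face download path noted above, a minimal sketch of supplying the token is shown below; `HF_TOKEN` and `CHECKPOINT_PATH` here are example placeholders, not values defined by this patch:

```bash
# Hypothetical invocation of the documented Hugging Face download command.
# HF_TOKEN and CHECKPOINT_PATH are example placeholders.
export HF_TOKEN=hf_xxxxxxxxxxxxxxxx   # generated at https://huggingface.co/settings/tokens
export CHECKPOINT_PATH=/data/models/llama3.1-405b

mlcr get,ml-model,llama3,_hf --outdirname=${CHECKPOINT_PATH} --hf_token=${HF_TOKEN} -j
```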
+**Calibration** -## Get Dataset +``` +mlcr get,dataset,mlperf,inference,llama3,_calibration --outdirname= -j +``` ### Preprocessed @@ -144,11 +164,6 @@ You can then navigate in the terminal to your desired download directory and run ``` rclone copy mlc-inference:mlcommons-inference-wg-public/llama3.1_405b/mlperf_llama3.1_405b_dataset_8313_processed_fp16_eval.pkl ./ -P ``` -**MLC Command** - -``` -mlcr get,dataset,mlperf,inference,llama3,_validation --outdirname= -j -``` You can also download the calibration dataset from the Cloudflare R2 bucket by running the following command: @@ -156,11 +171,6 @@ You can also download the calibration dataset from the Cloudflare R2 bucket by r rclone copy mlc-inference:mlcommons-inference-wg-public/llama3.1_405b/mlperf_llama3.1_405b_calibration_dataset_512_processed_fp16_eval.pkl ./ -P ``` -**MLC Command** -``` -mlcr get,dataset,mlperf,inference,llama3,_calibration --outdirname= -j -``` - ## Run Performance Benchmarks @@ -267,3 +277,7 @@ Running the GPU implementation in FP16 precision resulted in the following FP16 } ``` The accuracy target is 99% for rougeL and exact_match, and 90% for tokens_per_sample + +## Automated command for submission generation via MLCFlow + +Please see the [new docs site](https://docs.mlcommons.org/inference/submission/) for an automated way to generate submission through MLCFlow. \ No newline at end of file diff --git a/language/mixtral-8x7b/README.md b/language/mixtral-8x7b/README.md index 74935a4dc2..2bdcceb12c 100644 --- a/language/mixtral-8x7b/README.md +++ b/language/mixtral-8x7b/README.md @@ -9,7 +9,11 @@ - For all scenarios, when calling `lg.QuerySamplesComplete(response)`, it is necessary that each of the elements in response is a `lg.QuerySampleResponse` that contains the number of tokens (can be create this way: `lg.QuerySampleResponse(qitem.id, bi[0], bi[1], n_tokens)`). The number of tokens reported should match with the number of tokens on your answer and this will be checked in [TEST06](../../compliance/nvidia/TEST06/) -Please see the [new docs site](https://docs.mlcommons.org/inference/benchmarks/language/mixtral-8x7b) for an automated way to run this benchmark across different available implementations and do an end-to-end submission with or without docker. +## Automated command to run the benchmark via MLCFlow + +Please see the [new docs site](https://docs.mlcommons.org/inference/benchmarks/language/mixtral-8x7b/) for an automated way to run this benchmark across different available implementations and do an end-to-end submission with or without docker. + +You can also do `pip install mlc-scripts` and then use `mlcr` commands for downloading the model and datasets using the commands given in the later sections. ## Prepare environment @@ -66,6 +70,12 @@ CPU-only setup, as well as any GPU versions for applicable libraries like PyTorc **Important Note:** Files and configurations of the model have changed, and might change in the future. If you are going to get the model from Hugging Face or any external source, use a version of the model that exactly matches the one in this [commit](https://huggingface.co/mistralai/Mixtral-8x7B-Instruct-v0.1/commit/a60832cb6c88d5cb6e507680d0e9996fbad77050). 
 We strongly recommend to get the model following the steps in the next section:
 
+### Download model through MLCFlow Automation
+
+```
+mlcr get,ml-model,mixtral --outdirname= -j
+```
+
 ### Get Checkpoint
 
 #### Using Rclone
@@ -87,6 +97,22 @@ rclone copy mlc-inference:mlcommons-inference-wg-public/mixtral_8x7b/mixtral-8x7
 
 ## Get Dataset
 
+### Download Preprocessed dataset through MLCFlow Automation
+
+**Validation**
+
+```
+mlcr get,dataset-mixtral,openorca-mbxp-gsm8k-combined,_validation --outdirname= -j
+```
+
+**Calibration**
+
+```
+mlcr get,dataset-mixtral,openorca-mbxp-gsm8k-combined,_calibration --outdirname= -j
+```
+
+- Adding `_wget` tag to the run command will change the download tool from `rclone` to `wget`.
+
 ### Preprocessed
 
 #### Using Rclone
@@ -228,6 +254,15 @@ fi
 
 The ServerSUT was not tested for GPU runs.
 
+## Accuracy Evaluation
+
+### Evaluate the accuracy through MLCFlow Automation
+```bash
+mlcr process,mlperf,accuracy,_openorca-gsm8k-mbxp-combined --result_dir=
+```
+
+Please click [here](https://github.com/mlcommons/inference/blob/master/language/mixtral-8x7b/evaluate-accuracy.py) to view the Python script for evaluating accuracy for the combined OpenOrca, GSM8K and MBXP dataset.
+
 ### Evaluation
 Recreating the enviroment for evaluating the quality metrics can be quite tedious. Therefore we provide a dockerfile and recommend using docker for this task.
 1. Build the evaluation container
@@ -269,3 +304,7 @@ For official submissions, 99% of each reference score is enforced. Additionally,
 ```json
 {'tokens_per_sample': 144.84}
 ```
+
+## Automated command for submission generation via MLCFlow
+
+Please see the [new docs site](https://docs.mlcommons.org/inference/submission/) for an automated way to generate submission through MLCFlow.
\ No newline at end of file
diff --git a/recommendation/dlrm_v2/pytorch/README.md b/recommendation/dlrm_v2/pytorch/README.md
index 6f09e26ded..9a0c523334 100755
--- a/recommendation/dlrm_v2/pytorch/README.md
+++ b/recommendation/dlrm_v2/pytorch/README.md
@@ -2,8 +2,12 @@
 
 This is the reference implementation for MLCommons Inference benchmarks.
 
+## Automated command to run the benchmark via MLCFlow
+
 Please see the [new docs site](https://docs.mlcommons.org/inference/benchmarks/recommendation/dlrm-v2/) for an automated way to run this benchmark across different available implementations and do an end-to-end submission with or without docker.
 
+You can also do `pip install mlc-scripts` and then use `mlcr` commands for downloading the model and datasets using the commands given in the later sections.
+
 ### Supported Models
 
 **TODO: Decide benchmark name**
@@ -71,7 +75,13 @@ CFLAGS="-std=c++14" python setup.py develop --user
 
 ### Download preprocessed Dataset
 
-Download the preprocessed dataset using Rclone.
+#### Download dataset through MLCFlow Automation
+
+```
+mlcr get,preprocessed,dataset,criteo,_validation --outdirname= -j
+```
+
+#### Download the preprocessed dataset using Rclone
 
 To run Rclone on Windows, you can download the executable [here](https://rclone.org/install/#windows).
 To install Rclone on Linux/macOS/BSD systems, run:
@@ -102,13 +112,10 @@ framework | Size in bytes (`du *`) | MD5 hash (`md5sum *`)
 N/A | pytorch | <2GB | -
 pytorch | 97.31GB | -
 
-#### MLC method
-
-The following MLCommons MLC commands can be used to programmatically download the model checkpoint.
+#### Download model through MLCFlow Automation
 
 ```
-pip install mlc-scripts
-mlcr get,ml-model,dlrm,_pytorch,_weight_sharded,_rclone -j
+mlcr get,ml-model,dlrm,_pytorch,_weight_sharded,_rclone --outdirname= -j
 ```
 
 #### Manual method
@@ -312,6 +319,15 @@ In the reference implementation, each sample is mapped to 100-700 user-item pair
 
 ### Running accuracy script
 
+#### Evaluate the accuracy through MLCFlow Automation
+
+```bash
+mlcr process,mlperf,accuracy,_terabyte --result_dir=
+```
+
+Please click [here](https://github.com/mlcommons/inference/blob/master/recommendation/dlrm_v2/pytorch/tools/accuracy-dlrm.py) to view the Python script for evaluating accuracy for the Criteo Terabyte dataset.
+
+
 To get the accuracy from a LoadGen accuracy json log file,
 
 1. If your SUT outputs the predictions and the ground truth labels in a packed format like the reference implementation then run
@@ -414,6 +430,10 @@ usage: main.py [-h]
 
 `--find-peak-performance` determine the maximum QPS for the Server, while not applicable to other scenarios.
 
+## Automated command for submission generation via MLCFlow
+
+Please see the [new docs site](https://docs.mlcommons.org/inference/submission/) for an automated way to generate submission through MLCFlow.
+
 ## License
 
 [Apache License 2.0](LICENSE)
diff --git a/text_to_image/README.md b/text_to_image/README.md
index b00595785b..b11873049e 100644
--- a/text_to_image/README.md
+++ b/text_to_image/README.md
@@ -164,3 +164,15 @@ Add the `--accuracy` to the command to run the benchmark
 ```bash
 python3 main.py --dataset "coco-1024" --dataset-path coco2014 --profile stable-diffusion-xl-pytorch --accuracy --model-path model/ [--dtype ] [--device ] [--time