import re
import datetime as dt
import typing as t

from pydantic import Field, constr, conint

from kube_custom_resource import schema

from ...config import settings
from ...errors import PodLogFormatError, PodResultsIncompleteError
from ...utils import GnuTimeResult

from . import base

# If results output format changes in future pytorch-benchmark versions
# check https://github.com/pytorch/benchmark/blob/main/run.py for changes
PYTORCH_CPU_TIME_REGEX = re.compile(r"CPU Wall Time per batch:\s+(?P<cpu_time>\d+\.\d+)\s*(?P<cpu_time_units>\w+)")
PYTORCH_CPU_MEMORY_REGEX = re.compile(r"CPU Peak Memory:\s+(?P<cpu_memory>\d+\.\d+)\s*(?P<cpu_mem_units>\w+)")
PYTORCH_GPU_TIME_REGEX = re.compile(r"GPU Time per batch:\s+(?P<gpu_time>\d+\.\d+)\s*(?P<gpu_time_units>\w+)")
PYTORCH_GPU_MEMORY_REGEX = re.compile(r"GPU \d+ Peak Memory:\s+(?P<gpu_memory>\d+\.\d+)\s*(?P<gpu_mem_units>\w+)")
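# Example log lines these patterns are intended to match (illustrative only):
#   CPU Wall Time per batch:    123.456 milliseconds
#   CPU Peak Memory:            1.25 GB
#   GPU Time per batch:         98.765 milliseconds
#   GPU 0 Peak Memory:          2.50 GB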


class Device(str, schema.Enum):
    """
    Enumeration of supported computation devices.
    """
    CPU = "cpu"
    CUDA = "cuda"

# List of models here should match list in images/pytorch-benchmark/Dockerfile
class PyTorchModel(str, schema.Enum):
    """
    Enumeration of available models for benchmarking.
    """
    ALEXNET = "alexnet"
    RESNET50 = "resnet50"
    LLAMA = "llama"

class PyTorchBenchmarkType(str, schema.Enum):
    """
    Enumeration of model processes available to benchmark.
    """
    TRAIN = "train"
    EVAL = "eval"


class PyTorchSpec(base.BenchmarkSpec):
    """
    Defines the parameters for the PyTorch benchmark.
    """
    image: constr(min_length = 1) = Field(
        f"{settings.default_image_prefix}pytorch-benchmarks:{settings.default_image_tag}",
        description = "The image to use for the benchmark."
    )
    image_pull_policy: base.ImagePullPolicy = Field(
        base.ImagePullPolicy.IF_NOT_PRESENT,
        description = "The pull policy for the image."
    )
    # PyTorch benchmark config options
    device: Device = Field(
        Device.CPU,
        description = (
            "The device on which to run the ML workload. "
            "If the device is 'cuda' then you must also request GPU resources by "
            "adding an 'nvidia.com/gpu: <gpu-count>' field to benchmark.spec.resources.limits."
        )
    )
    model: PyTorchModel = Field(
        description = "The ML model to benchmark."
    )
    benchmark_type: PyTorchBenchmarkType = Field(
        PyTorchBenchmarkType.EVAL,
        description = "Whether to benchmark the training or inference (eval) process."
    )
    input_batch_size: conint(multiple_of=2, ge=2) = Field(
        64,
        description = "The batch size for the generated model input data.",
    )


class PyTorchResult(schema.BaseModel):
    """
    Represents an individual PyTorch benchmark result.

    Some notes on the inner workings of the pytorch benchmark script:
    - It currently runs only one batch per benchmark, so 'time per batch' in the pytorch output
      is equal to the total time. (This may change in future, since the 'per batch' suffix was
      added to the output text very recently.)
      https://github.com/pytorch/benchmark/blob/6fef32ddaf93a63088b97eb27620fb57ef247521/run.py#L468
    - The CPU 'wall time' reported by pytorch is significantly shorter than that reported by the
      GNU `time` command. It's not clear what takes up this extra time outside the actual model
      invocation (downloading model weights and generating random in-memory input data shouldn't
      take long at all).
    """
    pytorch_time: schema.confloat(ge = 0) = Field(
        ...,
        description = "The CPU wall time (in seconds) as reported by the pytorch benchmark script."
    )
    peak_cpu_memory: schema.confloat(ge = 0) = Field(
        ...,
        description = "The peak CPU memory usage (in GB) reported by the pytorch benchmark script."
    )
    gpu_time: t.Optional[schema.confloat(ge = 0)] = Field(
        None, # Defaults to None on CPU-only runs
        description = "The GPU wall time (in seconds) reported by the pytorch benchmark script."
    )
    peak_gpu_memory: t.Optional[schema.confloat(ge = 0)] = Field(
        None, # Defaults to None on CPU-only runs
        description = "The peak GPU memory usage (in GB) reported by the pytorch benchmark script."
    )
    gnu_time: GnuTimeResult = Field(
        description = "A container for the output of the `time` command which wraps the benchmark execution script."
    )


class PyTorchStatus(base.BenchmarkStatus):
    """
    Represents the status of the PyTorch benchmark.
    """
    gpu_count: conint(ge=0) = Field(
        None,
        description = "The number of GPUs used in this benchmark."
    )
    result: t.Optional[PyTorchResult] = Field(
        None,
        description = "The result of the benchmark."
    )
    wall_time_result: schema.confloat(ge = 0) = Field(
        None,
        description = (
            "The wall time (in seconds) reported by the GNU time wrapper. "
            "Used as a headline result."
        )
    )
    gpu_time_result: schema.confloat(ge = 0) = Field(
        None,
        description = (
            "The GPU wall time (in seconds) reported by the pytorch benchmark script. "
            "Used as a headline result."
        )
    )
    worker_pod: t.Optional[base.PodInfo] = Field(
        None,
        description = "Pod information for the pod running the benchmark."
    )
    client_log: t.Optional[constr(min_length = 1)] = Field(
        None,
        description = "The raw pod log of the client pod."
    )


class PyTorch(
    base.Benchmark,
    subresources = {"status": {}},
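    # Extra columns to show for this resource in 'kubectl get' output
    # (presumably surfaced as additionalPrinterColumns on the generated CRD)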
    printer_columns = [
        {
            "name": "Model",
            "type": "string",
            "jsonPath": ".spec.model",
        },
        {
            "name": "Benchmark Type",
            "type": "string",
            "jsonPath": ".spec.benchmarkType",
        },
        {
            "name": "Device",
            "type": "string",
            "jsonPath": ".spec.device",
        },
        {
            "name": "GPUs",
            "type": "integer",
            "jsonPath": ".status.gpuCount",
        },
        {
            "name": "Batch Size",
            "type": "integer",
            "jsonPath": ".spec.inputBatchSize",
        },
        {
            "name": "Status",
            "type": "string",
            "jsonPath": ".status.phase",
        },
        {
            "name": "Started",
            "type": "date",
            "jsonPath": ".status.startedAt",
        },
        {
            "name": "Finished",
            "type": "date",
            "jsonPath": ".status.finishedAt",
        },
        {
            "name": "Wall Time (s)",
            "type": "number",
            "jsonPath": ".status.wallTimeResult",
        },
        {
            "name": "GPU Time (s)",
            "type": "number",
            "jsonPath": ".status.gpuTimeResult",
        },
    ]
):
    """
    Custom resource for running a PyTorch benchmark.
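
    A minimal example manifest (illustrative only - the apiVersion, kind registration and
    defaulting depend on how the operator deploys this custom resource):

        apiVersion: <api-group>/<version>
        kind: PyTorch
        metadata:
          name: pytorch-example
        spec:
          model: resnet50
          benchmarkType: eval
          device: cuda
          inputBatchSize: 64
          resources:
            limits:
              nvidia.com/gpu: 1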
    """
    spec: PyTorchSpec = Field(
        ...,
        description = "The parameters for the benchmark."
    )
    status: PyTorchStatus = Field(
        default_factory = PyTorchStatus,
        description = "The status of the benchmark."
    )

    async def pod_modified(
        self,
        pod: t.Dict[str, t.Any],
        fetch_pod_log: t.Callable[[], t.Awaitable[str]]
    ):
        # Parse the GPU count from the resource limits to display in the status
        limits = self.spec.resources.limits if self.spec.resources else None
        if limits and 'nvidia.com/gpu' in limits.keys():
            self.status.gpu_count = limits['nvidia.com/gpu']
        else:
            self.status.gpu_count = 0

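        # Record the worker pod while it is running and capture its log once it has succeeded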
        pod_phase = pod.get("status", {}).get("phase", "Unknown")
        if pod_phase == "Running":
            self.status.worker_pod = base.PodInfo.from_pod(pod)
        elif pod_phase == "Succeeded":
            self.status.client_log = await fetch_pod_log()

    def summarise(self):
        # If the client log has not yet been recorded, bail
        if not self.status.client_log:
            raise PodResultsIncompleteError("Pod has not recorded a result yet")

        # Parse the CPU results from the pod log
        cpu_time_match = PYTORCH_CPU_TIME_REGEX.search(self.status.client_log)
        cpu_time = cpu_time_match.group('cpu_time')
        cpu_time_units = cpu_time_match.group('cpu_time_units')
        cpu_memory_match = PYTORCH_CPU_MEMORY_REGEX.search(self.status.client_log)
        cpu_peak_memory = cpu_memory_match.group('cpu_memory')
        cpu_peak_memory_units = cpu_memory_match.group('cpu_mem_units')

        if cpu_time_units != "milliseconds" or cpu_peak_memory_units != "GB":
            raise PodLogFormatError(
                "results output in unexpected units - expected 'milliseconds' and 'GB' "
                "(it's possible that results formatting has changed in upstream pytorch-benchmarks)"
            )

        if self.spec.device != Device.CPU:
            # Parse the GPU results
            gpu_time_match = PYTORCH_GPU_TIME_REGEX.search(self.status.client_log)
            gpu_time = gpu_time_match.group('gpu_time')
            gpu_time_units = gpu_time_match.group('gpu_time_units')
            gpu_memory_match = PYTORCH_GPU_MEMORY_REGEX.search(self.status.client_log)
            gpu_peak_memory = gpu_memory_match.group('gpu_memory')
            gpu_peak_memory_units = gpu_memory_match.group('gpu_mem_units')
            if gpu_time_units != "milliseconds" or gpu_peak_memory_units != "GB":
                raise PodLogFormatError(
                    "results output in unexpected units - expected 'milliseconds' and 'GB' "
                    "(it's possible that results formatting has changed in upstream pytorch-benchmarks)"
                )
            # Convert the GPU time to seconds to match the GNU time output
            gpu_time = float(gpu_time) / 1000
        else:
            gpu_time, gpu_peak_memory = None, None

        # Parse the GNU time wrapper output
        gnu_time_result = GnuTimeResult.parse(self.status.client_log)

        # Convert the CPU time to seconds to match the GNU time output
        self.status.result = PyTorchResult(
            pytorch_time = float(cpu_time) / 1000,
            peak_cpu_memory = cpu_peak_memory,
            gpu_time = gpu_time,
            peak_gpu_memory = gpu_peak_memory,
            gnu_time = gnu_time_result,
        )

        # Round the headline results to 3 significant figures for display
        self.status.wall_time_result = float(f"{self.status.result.gnu_time.wall_time_secs:.3g}")
        if self.status.result.gpu_time:
            self.status.gpu_time_result = float(f"{self.status.result.gpu_time:.3g}")