
Commit 160cd80

Sean1783 and Sean Archer authored
Regressive resource scaling and accelerators validation (#277)
* Added .venv to .gitignore
* Add venv/ to .gitignore
* Added .venv to .gitignore
* Default values for accelerators implemented
* Added memory validation and default accelerators values
* Setting default values for memory, vcpu, and accelerators
* Unit tests for new quota_allocation functions
* Refactoring default values
* Unit and integration tests complete
* Fix for default cpu values
* Refactoring and clean up
* Accounting for accelerators when min values provided
* Refactoring and clean up
* Increased default buffer for mem and cpu. Refactored _resolve_ functions
* Refactored and added more unit tests
* Additional function for default values created. Refactored some unit tests to account for new default values
* Refactoring and test additions
* Implemented regressive resource scaling for cpu and memory
* Refactoring of unit and integ tests
* Small change for a unit test
* Increased reserved resource amounts
* Small refactoring

---------

Co-authored-by: Sean Archer <[email protected]>
1 parent dc2096a commit 160cd80

File tree

7 files changed (+706, -37 lines)

.gitignore

Lines changed: 4 additions & 1 deletion
@@ -32,4 +32,7 @@ doc/_build/
 /result/
 /results/
 
-.idea/
+.idea/
+
+.venv*
+venv

hyperpod-pytorch-job-template/hyperpod_pytorch_job_template/v1_1/schema.json

Lines changed: 3 additions & 3 deletions
@@ -276,9 +276,9 @@
       "title": "Queue Name"
     },
     "accelerators": {
-      "type": "integer",
-      "minimum": 0,
-      "description": "Number of accelerators (GPUs/TPUs)"
+      "type": "integer",
+      "minimum": 0,
+      "description": "Number of accelerators (GPUs/TPUs)"
     },
     "vcpu": {
       "type": "float",

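For orientation, the two schema fields shown above map to plain values in a job template. A minimal, hypothetical fragment that satisfies just this part of the schema (all other template fields omitted, values chosen purely for illustration) could look like:

    # Hypothetical fragment; only the fields visible in the schema excerpt above are shown.
    quota_fields = {
        "queue_name": "example-queue",  # assumed example value
        "accelerators": 8,              # integer, minimum 0 (GPUs/TPUs)
        "vcpu": 16.0,                   # declared as a float in the schema
    }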
src/sagemaker/hyperpod/training/hyperpod_pytorch_job.py

Lines changed: 62 additions & 13 deletions
@@ -1,6 +1,7 @@
 from pydantic import ConfigDict, Field
 
-from sagemaker.hyperpod.cli.constants.command_constants import INSTANCE_TYPE_LABEL
+from sagemaker.hyperpod.cli.constants.command_constants import INSTANCE_TYPE_LABEL, NEURON_RESOURCE_LIMIT_KEY, \
+    NVIDIA_GPU_RESOURCE_LIMIT_KEY
 from sagemaker.hyperpod.training.config.hyperpod_pytorch_job_unified_config import (
     _HyperPodPytorchJob, HyperPodPytorchJobStatus
 )
@@ -20,15 +21,26 @@
 import yaml
 import logging
 
-from sagemaker.hyperpod.training.quota_allocation_util import _is_valid, _get_resources_from_compute_quotas, _get_resources_from_instance, _get_limits
+from sagemaker.hyperpod.training.quota_allocation_util import (
+    _is_valid,
+    _get_resources_from_compute_quotas,
+    _get_resources_from_instance,
+    _get_limits,
+    _resolve_default_memory_values,
+    _set_default_accelerators_val,
+    _validate_accelerators_inputs,
+    _resolve_default_cpu_values,
+    _trim_resource_requests
+)
 
 TRAINING_GROUP = "sagemaker.amazonaws.com"
 API_VERSION = "v1"
 PLURAL = "hyperpodpytorchjobs"
 KIND = "HyperPodPyTorchJob"
 TRAINING_OPERATOR_NAMESPACE = "aws-hyperpod"
 TRAINING_OPERATOR_LABEL = "hp-training-control-plane"
-
+NVIDIA_RESOURCE_KEY = NVIDIA_GPU_RESOURCE_LIMIT_KEY
+NEURON_RESOURCE_KEY = NEURON_RESOURCE_LIMIT_KEY
 
 class HyperPodPytorchJob(_HyperPodPytorchJob):
     """HyperPod PyTorch job for distributed training on Amazon SageMaker HyperPod clusters.
@@ -94,27 +106,64 @@ def _process_replica_resources(cls, data):
             requests = resources.get('requests', {})
             limits = resources.get('limits', {})
 
+            accelerators = None
+            if requests.get('accelerators'):
+                accelerators = int(requests.get('accelerators'))
+            elif requests.get(NVIDIA_RESOURCE_KEY):
+                accelerators = int(requests.get(NVIDIA_RESOURCE_KEY))
+            elif requests.get(NEURON_RESOURCE_KEY):
+                accelerators = int(requests.get(NEURON_RESOURCE_KEY))
+
             # Extract resource values
-            vcpu = float(requests.get('vcpu')) if requests.get('vcpu') else None
+            vcpu = None
+            if requests.get('cpu'):
+                vcpu = float(requests.get('cpu'))
+            elif requests.get('vcpu'):
+                vcpu = float(requests.get('vcpu'))
+
+            vcpu_limit = None
+            if limits.get('cpu'):
+                vcpu_limit = float(limits.get('cpu'))
+            elif limits.get('vcpu'):
+                vcpu_limit = float(limits.get('vcpu'))
+
             memory = cls._extract_numeric_value(requests.get('memory'))
-            accelerators = int(requests.get('accelerators')) if requests.get('accelerators') else None
             memory_limit = cls._extract_numeric_value(limits.get('memory'))
-            vcpu_limit = float(limits.get('vcpu')) if limits.get('vcpu') else None
-            accelerators_limit = int(limits.get('accelerators')) if limits.get('accelerators') else None
+
+            accelerators_limit = None
+            if limits.get('accelerators'):
+                accelerators_limit = int(limits.get('accelerators'))
+            elif limits.get(NVIDIA_RESOURCE_KEY):
+                accelerators_limit = int(limits.get(NVIDIA_RESOURCE_KEY))
+            elif limits.get(NEURON_RESOURCE_KEY):
+                accelerators_limit = int(limits.get(NEURON_RESOURCE_KEY))
+
+            acc_req, acc_lim = _set_default_accelerators_val(instance_type, accelerators, accelerators_limit)
+            _validate_accelerators_inputs(instance_type, acc_req, acc_lim)
 
             # Validate configuration
-            valid, error = _is_valid(vcpu, memory, accelerators, node_count, instance_type)
+            valid, error = _is_valid(vcpu, memory, acc_req, node_count, instance_type)
             if not valid:
                 raise ValueError(error)
 
             # Calculate resource values
-            requests_value = (_get_resources_from_compute_quotas(instance_type, vcpu, memory, accelerators)
-                              or _get_resources_from_instance(instance_type, node_count=1))
-            limits_value = _get_limits(instance_type, vcpu_limit, memory_limit, accelerators_limit)
+            requests_values = _get_resources_from_compute_quotas(instance_type, vcpu, memory, acc_req)
+            if requests_values is None:
+                requests_values = _get_resources_from_instance(instance_type, node_count=1)
+                _trim_resource_requests(instance_type, requests_values)
+            if NVIDIA_RESOURCE_KEY in requests_values:
+                acc_lim = requests_values[NVIDIA_RESOURCE_KEY]
+            elif NEURON_RESOURCE_KEY in requests_values:
+                acc_lim = requests_values[NEURON_RESOURCE_KEY]
+
+            limits_values = _get_limits(instance_type, vcpu_limit, memory_limit, acc_lim)
+            _resolve_default_memory_values(instance_type, requests_values, limits_values)
+            _resolve_default_cpu_values(instance_type, requests_values)
 
             # Update data with calculated values
-            data['template']['spec']['containers'][0]['resources']['requests'] = requests_value
-            data['template']['spec']['containers'][0]['resources']['limits'] = limits_value
+            data['template']['spec']['containers'][0]['resources']['requests'] = requests_values
+            data['template']['spec']['containers'][0]['resources']['limits'] = limits_values
+
             return data
         except KeyError as e:
             raise ValueError(f"Missing required configuration key: {str(e)}")

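To make the extraction logic above easier to follow, here is a small self-contained sketch of the precedence it applies: an explicit 'cpu' request wins over 'vcpu', and a generic 'accelerators' count wins over the vendor-specific NVIDIA/Neuron resource keys. The Neuron key string below is an assumed placeholder; the real code imports both keys as NVIDIA_GPU_RESOURCE_LIMIT_KEY and NEURON_RESOURCE_LIMIT_KEY from command_constants.

    from typing import Optional

    # Placeholder key names for illustration only; the actual values come from
    # sagemaker.hyperpod.cli.constants.command_constants in the real module.
    NVIDIA_RESOURCE_KEY = "nvidia.com/gpu"
    NEURON_RESOURCE_KEY = "aws.amazon.com/neuron"  # assumed placeholder

    def resolve_accelerators(requests: dict) -> Optional[int]:
        # Generic 'accelerators' takes precedence, then the NVIDIA key, then the Neuron key.
        for key in ("accelerators", NVIDIA_RESOURCE_KEY, NEURON_RESOURCE_KEY):
            if requests.get(key):
                return int(requests[key])
        return None

    def resolve_vcpu(requests: dict) -> Optional[float]:
        # A Kubernetes-style 'cpu' value takes precedence over 'vcpu'.
        for key in ("cpu", "vcpu"):
            if requests.get(key):
                return float(requests[key])
        return None

    print(resolve_accelerators({"nvidia.com/gpu": "8"}))  # 8
    print(resolve_vcpu({"vcpu": "4"}))                    # 4.0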
src/sagemaker/hyperpod/training/quota_allocation_util.py

Lines changed: 175 additions & 10 deletions
@@ -10,6 +10,8 @@
 # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
 # ANY KIND, either express or implied. See the License for the specific
 # language governing permissions and limitations under the License.
+import logging
+import re
 from sagemaker.hyperpod.cli.constants.command_constants import NVIDIA_GPU_RESOURCE_LIMIT_KEY, NEURON_RESOURCE_LIMIT_KEY
 from sagemaker.hyperpod.cli.utils import (
     setup_logger
@@ -187,16 +189,15 @@ def _get_resources_from_compute_quotas(instance_type: str,
 
     result["cpu"] = f"{result['cpu']}"
     result["memory"] = f"{result['memory']}Gi"
+    _trim_resource_requests(instance_type, result)
     return result
 
 
 # Gets resources from instance type.
 def _get_resources_from_instance(instance_type: str, node_count: int) -> dict:
-
     instance = INSTANCE_RESOURCES.get(instance_type, {})
     cpu = instance.get("cpu", 0)
     memory = instance.get("memory", 0)
-
     result = {
         "cpu": cpu * node_count,
         "memory": memory * node_count
@@ -210,8 +211,31 @@ def _get_resources_from_instance(instance_type: str, node_count: int) -> dict:
     result["memory"] = f"{result['memory']}Gi"
     return result
 
+
+def _trim_resource_requests(instance_type: str, requests_values: dict) -> dict:
+    instance = INSTANCE_RESOURCES.get(instance_type, {})
+    cpu_capacity = instance.get("cpu", 0)
+    max_allocatable_cpu = cpu_capacity - (_calculate_cpu_reservation(cpu_capacity))
+    memory_capacity = instance.get("memory", 0)
+    max_allocatable_memory = memory_capacity - (_calculate_memory_reservation(memory_capacity))
+
+    cpu_request_str = requests_values.get('cpu', '0')
+    cpu_request = float(''.join(filter(lambda x: x.isdigit() or x == '.', cpu_request_str)))
+
+    mem_request_str = requests_values.get('memory', '0Gi')
+    mem_request = float(mem_request_str.replace('Gi', ''))
+
+    final_cpu = min(max_allocatable_cpu, cpu_request)
+    final_memory = min(max_allocatable_memory, mem_request)
+
+    requests_values['cpu'] = str(final_cpu)
+    requests_values['memory'] = f"{final_memory}Gi"
+
+    return requests_values
+
+
 def _get_limits(instance_type: str, vcpu_limit: Optional[float], memory_in_gib_limit: Optional[float], accelerators_limit: Optional[int]) -> dict:
-
+
     result = {}
     type_of_accelerator, _max_accelerator_per_instance = _get_accelerator_type_and_count(instance_type)
 
@@ -224,32 +248,109 @@ def _get_limits(instance_type: str, vcpu_limit: Optional[float], memory_in_gib_limit: Optional[float], accelerators_limit: Optional[int]) -> dict:
     else:
         # user specified accelerator limit but the instance type wasn't found, set limit to 0 as a precaution
         result["nvidia.com/gpu"] = 0
-
     if memory_in_gib_limit is not None:
-        result["memory"] = memory_in_gib_limit
-        result["memory"] = f"{result['memory']}Gi"
+        result["memory"] = str(memory_in_gib_limit) + "Gi"
 
     return result
 
 
+def _resolve_default_cpu_values(instance_type: str, requests_values: dict) -> None:
+    instance = INSTANCE_RESOURCES.get(instance_type, {})
+    total_available_cpu = instance.get('cpu')
+
+    cpu_request = float(requests_values.get('cpu')) if requests_values.get('cpu') is not None else None
+
+    if cpu_request is not None and cpu_request > total_available_cpu:
+        raise ValueError(
+            f"Specified CPU request ({cpu_request}) exceeds instance capacity. "
+            f"Maximum available CPU for {instance_type} is {total_available_cpu}."
+        )
+
+    max_allocatable_cpu = int(total_available_cpu - _calculate_cpu_reservation(total_available_cpu))
+    cpu_request = min(cpu_request, max_allocatable_cpu)
+    requests_values["cpu"] = str(cpu_request)
+
+
+def _resolve_default_memory_values(instance_type: str, requests_values: dict, limits_values: dict) -> None:
+
+    instance = INSTANCE_RESOURCES.get(instance_type, {})
+    total_available_memory = instance.get("memory", 0)
+    mem_limit_str = limits_values.get("memory")
+    mem_request_str = requests_values.get("memory")
+
+    user_set_limit = True if mem_limit_str is not None else False
+    if mem_limit_str is None and mem_request_str is not None:
+        mem_limit_str = mem_request_str
+
+    try:
+        memory_limit = float(re.match(r'^([0-9]*\.?[0-9]+)', mem_limit_str).group(1))
+        memory_request = float(re.match(r'^([0-9]*\.?[0-9]+)', mem_request_str).group(1))
+    except (AttributeError, ValueError):
+        raise ValueError(f"Invalid memory format: {mem_limit_str or mem_request_str}")
+
+    if memory_request > total_available_memory:
+        raise ValueError(
+            f"Specified memory request ({memory_request}Gi) exceeds instance capacity. "
+            f"Maximum available memory for {instance_type} is {total_available_memory}Gi."
+        )
+
+    max_allocatable_memory = int(total_available_memory - _calculate_memory_reservation(total_available_memory))
+
+    if not user_set_limit:
+        memory_limit = min(memory_limit, max_allocatable_memory)
+
+    memory_request = min(memory_request, max_allocatable_memory)
+    limits_values["memory"] = str(memory_limit) + "Gi"
+    requests_values["memory"] = str(memory_request) + "Gi"
+
+
+def _validate_accelerators_inputs(instance_type: str, accelerators_request: int, accelerators_limit: int) -> None:
+    type_of_accelerator, _max_accelerator_per_instance = _get_accelerator_type_and_count(instance_type)
+    if type_of_accelerator is None and (accelerators_request is not None or accelerators_limit is not None):
+        raise ValueError(
+            f"Instance type {instance_type} does not support accelerators, but accelerator values were provided.")
+
+    if type_of_accelerator is not None:
+        if accelerators_request is not None and accelerators_limit is not None:
+            if accelerators_request != accelerators_limit:
+                raise ValueError('Accelerator request must equal accelerator limit')
+            if accelerators_limit > _max_accelerator_per_instance:
+                raise ValueError('Requested accelerators exceeds capacity')
+            if accelerators_request > _max_accelerator_per_instance:
+                raise ValueError('Requested accelerators exceeds capacity')
+
+
+def _set_default_accelerators_val(instance_type: Optional[str], accelerators_request: Optional[int], accelerators_limit: Optional[int]) -> Tuple[Optional[int], Optional[int]]:
+    type_of_accelerator, _max_accelerator_per_instance = _get_accelerator_type_and_count(instance_type)
+    if type_of_accelerator is not None:
+        if accelerators_request is None and accelerators_limit is None:
+            return None, None
+        elif accelerators_request is not None and accelerators_limit is None:
+            return accelerators_request, accelerators_request
+        elif accelerators_request is None and accelerators_limit is not None:
+            return accelerators_limit, accelerators_limit
+        else:
+            return accelerators_request, accelerators_limit
+    return None, None
+
+
 def _is_valid(vcpu: Optional[float], memory_in_gib: Optional[float], accelerators: Optional[int],
               node_count: Optional[int], instance_type: Optional[str]) -> tuple[bool, str]:
 
     has_gpu_quota_allocation = _has_compute_resource_quota_allocation_resources(memory_in_gib, vcpu, accelerators)
 
     if instance_type is None and has_gpu_quota_allocation:
         return False, "Instance-type must be specified when accelerators, vcpu, or memory-in-gib specified"
-
+
     node_specified = node_count is not None and node_count > 0
-
+
     # Check if instance_type is valid only when it's provided
     if instance_type is not None and (INSTANCE_RESOURCES.get(instance_type) is None):
         return False, f"Invalid instance-type {instance_type}. Please re-check the instance type and contact AWS for support."
-
     if instance_type is not None:
         #both resources and node count specified
         if (has_gpu_quota_allocation and node_specified):
-            return False, f"Either node-count or a combination of accelerators, vcpu, memory-in-gib must be specified for instance-type {instance_type}"
+            return False, f"Either node-count OR a combination of accelerators, vcpu, memory-in-gib must be specified for instance-type {instance_type}"
     return True, ""
 
 
@@ -276,3 +377,67 @@ def _get_accelerator_type_and_count(instance_type: str) -> Tuple[Optional[str],
     else:
         # valid use-case for cpu-only machines, hence return None
        return None, 0
+
+
+def _calculate_memory_reservation(memory_gb: int) -> float:
+
+    static_memory_overhead = 0.5  # 500MB
+
+    reserved_memory = static_memory_overhead
+    remaining = memory_gb
+
+    # First 4 GB (30%)
+    first_4gb = min(4, remaining)
+    reserved_memory += first_4gb * 0.3
+    remaining -= first_4gb
+
+    # Next 4 GB (25%)
+    if remaining > 0:
+        next_4gb = min(4, remaining)
+        reserved_memory += next_4gb * 0.25
+        remaining -= next_4gb
+
+    # Next 8 GB (20%)
+    if remaining > 0:
+        next_8gb = min(8, remaining)
+        reserved_memory += next_8gb * 0.2
+        remaining -= next_8gb
+
+    # Next 112 GB (17%)
+    if remaining > 0:
+        next_112gb = min(112, remaining)
+        reserved_memory += next_112gb * 0.17
+        remaining -= next_112gb
+
+    # Remaining memory (7%)
+    if remaining > 0:
+        reserved_memory += remaining * 0.07
+
+    return reserved_memory
+
+
+def _calculate_cpu_reservation(cpu_count: int) -> float:
+
+    # Static overhead for observability tools and system processes
+    static_cpu_overhead = 0.1  # 0.1 cores
+
+    reserved_cpu = static_cpu_overhead
+
+    # First core (30%)
+    if cpu_count >= 1:
+        reserved_cpu += 0.3
+
+    # Second core (15%)
+    if cpu_count >= 2:
+        reserved_cpu += 0.15
+
+    # Cores 3-4 (10% each)
+    for _ in range(min(2, max(0, cpu_count - 2))):
+        reserved_cpu += 0.1
+
+    # Remaining cores (6% each)
+    if cpu_count > 4:
+        reserved_cpu += (cpu_count - 4) * 0.06
+
+    return reserved_cpu
+

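As a worked illustration of the regressive reservation tiers added above (not part of the commit itself): for a hypothetical instance with 32 vCPU and 128 GiB of memory, the reserved amounts work out to 0.1 + 0.30 + 0.15 + 2 × 0.10 + 28 × 0.06 = 2.43 cores and 0.5 + 4 × 0.30 + 4 × 0.25 + 8 × 0.20 + 112 × 0.17 = 23.34 GiB, leaving roughly 29.57 cores and 104.66 GiB allocatable to the job. The same numbers can be reproduced directly:

    # Worked example only; the instance size (32 vCPU / 128 GiB) is assumed.
    from sagemaker.hyperpod.training.quota_allocation_util import (
        _calculate_cpu_reservation,
        _calculate_memory_reservation,
    )

    print(_calculate_cpu_reservation(32))      # 2.43 cores reserved
    print(_calculate_memory_reservation(128))  # 23.34 GiB reserved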