Commit ea555f3

Kubernetes: request all suitable GPUs (#3259)
Previously, KubernetesCompute used only the GPU from the first offer to set node affinity, so if that GPU type was unavailable (e.g., another job or even a non-dstack pod had already taken it), the job eventually failed with FAILED_TO_START_DUE_TO_NO_CAPACITY even when other GPUs matched the run spec requirements. Now we inspect all nodes and request all suitable GPUs (any of them). In addition, we now use the upper bounds of Ranges (CPU, memory, disk) as limits, except for GPU, whose request must equal its limit (GPU resources cannot be overcommitted). Part-of: #3126
1 parent 4ecf75d commit ea555f3
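
Note: a minimal sketch (not code from this commit; the helper name and parameters are hypothetical) of the requests/limits policy described above, using the Kubernetes Python client: lower bounds become requests, upper bounds become limits, and the GPU count is set as both request and limit because GPU resources cannot be overcommitted.

    # Illustrative sketch only, assuming hypothetical inputs.
    from typing import Optional

    from kubernetes import client


    def build_resource_requirements(
        cpu_min: Optional[int],
        cpu_max: Optional[int],
        memory_min_mib: Optional[int],
        memory_max_mib: Optional[int],
        gpu_count: int = 0,
        gpu_resource: str = "nvidia.com/gpu",  # assumed resource name; AMD uses a different one
    ) -> client.V1ResourceRequirements:
        requests: dict[str, str] = {}
        limits: dict[str, str] = {}
        # CPU and memory can be overcommitted: lower bound -> request, upper bound -> limit.
        if cpu_min is not None:
            requests["cpu"] = str(cpu_min)
        if cpu_max is not None:
            limits["cpu"] = str(cpu_max)
        if memory_min_mib is not None:
            requests["memory"] = f"{memory_min_mib}Mi"
        if memory_max_mib is not None:
            limits["memory"] = f"{memory_max_mib}Mi"
        # GPUs cannot be overcommitted: request must equal limit.
        if gpu_count > 0:
            requests[gpu_resource] = limits[gpu_resource] = str(gpu_count)
        return client.V1ResourceRequirements(requests=requests, limits=limits)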

File tree

2 files changed: +166 -99 lines

  src/dstack/_internal/cli/utils/run.py
  src/dstack/_internal/core/backends/kubernetes/compute.py


src/dstack/_internal/cli/utils/run.py

Lines changed: 49 additions & 30 deletions
@@ -5,16 +5,19 @@
 from rich.table import Table
 
 from dstack._internal.cli.utils.common import NO_OFFERS_WARNING, add_row_from_dict, console
+from dstack._internal.core.models.backends.base import BackendType
 from dstack._internal.core.models.configurations import DevEnvironmentConfiguration
-from dstack._internal.core.models.instances import InstanceAvailability, Resources
+from dstack._internal.core.models.instances import (
+    InstanceAvailability,
+    InstanceOfferWithAvailability,
+    InstanceType,
+)
 from dstack._internal.core.models.profiles import (
     DEFAULT_RUN_TERMINATION_IDLE_TIME,
     TerminationPolicy,
 )
 from dstack._internal.core.models.runs import (
     Job,
-    JobProvisioningData,
-    JobRuntimeData,
     JobStatus,
     JobSubmission,
     Probe,
@@ -294,27 +297,24 @@ def _format_price(price: float, is_spot: bool) -> str:
     return price_str
 
 
-def _format_backend(backend: Any, region: str) -> str:
-    backend_str = getattr(backend, "value", backend)
-    backend_str = backend_str.replace("remote", "ssh")
+def _format_backend(backend_type: BackendType, region: str) -> str:
+    backend_str = backend_type.value
+    if backend_type == BackendType.REMOTE:
+        backend_str = "ssh"
     return f"{backend_str} ({region})"
 
 
-def _format_instance_type(jpd: JobProvisioningData, jrd: Optional[JobRuntimeData]) -> str:
-    instance_type = jpd.instance_type.name
-    if jrd is not None and getattr(jrd, "offer", None) is not None:
-        if jrd.offer.total_blocks > 1:
-            instance_type += f" ({jrd.offer.blocks}/{jrd.offer.total_blocks})"
-    if jpd.reservation:
-        instance_type += f" ({jpd.reservation})"
-    return instance_type
-
-
-def _get_resources(jpd: JobProvisioningData, jrd: Optional[JobRuntimeData]) -> Resources:
-    resources: Resources = jpd.instance_type.resources
-    if jrd is not None and getattr(jrd, "offer", None) is not None:
-        resources = jrd.offer.instance.resources
-    return resources
+def _format_instance_type(
+    instance_type: InstanceType,
+    shared_offer: Optional[InstanceOfferWithAvailability],
+    reservation: Optional[str],
+) -> str:
+    instance_type_str = instance_type.name
+    if shared_offer is not None:
+        instance_type_str += f" ({shared_offer.blocks}/{shared_offer.total_blocks})"
+    if reservation is not None:
+        instance_type_str += f" ({reservation})"
+    return instance_type_str
 
 
 def _format_run_name(run: CoreRun, show_deployment_num: bool) -> str:
@@ -387,16 +387,35 @@ def get_runs_table(
             }
             jpd = latest_job_submission.job_provisioning_data
             if jpd is not None:
+                shared_offer: Optional[InstanceOfferWithAvailability] = None
+                instance_type = jpd.instance_type
+                price = jpd.price
                 jrd = latest_job_submission.job_runtime_data
-                resources = _get_resources(jpd, jrd)
-                update_dict: Dict[Union[str, int], Any] = {
-                    "BACKEND": _format_backend(jpd.backend, jpd.region),
-                    "RESOURCES": resources.pretty_format(include_spot=False),
-                    "GPU": resources.pretty_format(gpu_only=True, include_spot=False),
-                    "INSTANCE TYPE": _format_instance_type(jpd, jrd),
-                    "PRICE": _format_price(jpd.price, resources.spot),
-                }
-                job_row.update(update_dict)
+                if jrd is not None and jrd.offer is not None and jrd.offer.total_blocks > 1:
+                    # We only use offer data from jrd if the job is/was running on a shared
+                    # instance (the instance blocks feature). In that case, jpd contains the full
+                    # instance offer data, while jrd contains the shared offer (a fraction of
+                    # the full offer). Although jrd always contains the offer, we don't use it in
+                    # other cases, as, unlike jpd offer data, jrd offer is not updated after
+                    # Compute.update_provisioning_data() call, but some backends, namely
+                    # Kubernetes, may update offer data via that method.
+                    # As long as we don't have a backend which both supports the blocks feature
+                    # and may update offer data in update_provisioning_data(), this logic is fine.
+                    shared_offer = jrd.offer
+                    instance_type = shared_offer.instance
+                    price = shared_offer.price
+                resources = instance_type.resources
+                job_row.update(
+                    {
+                        "BACKEND": _format_backend(jpd.backend, jpd.region),
+                        "RESOURCES": resources.pretty_format(include_spot=False),
+                        "GPU": resources.pretty_format(gpu_only=True, include_spot=False),
+                        "INSTANCE TYPE": _format_instance_type(
+                            instance_type, shared_offer, jpd.reservation
+                        ),
+                        "PRICE": _format_price(price, resources.spot),
+                    }
+                )
             if merge_job_rows:
                 _status = job_row["STATUS"]
                 _resources = job_row["RESOURCES"]
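
Note: a self-contained sketch of the rendering behavior introduced by the new _format_instance_type signature (values are made up; the real function takes InstanceType and InstanceOfferWithAvailability models, not plain strings and tuples).

    from typing import Optional


    def format_instance_type(
        name: str,
        shared_blocks: Optional[tuple[int, int]],  # (blocks, total_blocks) for shared instances
        reservation: Optional[str],
    ) -> str:
        result = name
        if shared_blocks is not None:
            blocks, total_blocks = shared_blocks
            result += f" ({blocks}/{total_blocks})"
        if reservation is not None:
            result += f" ({reservation})"
        return result


    print(format_instance_type("gpu-node-1", (2, 4), None))        # gpu-node-1 (2/4)
    print(format_instance_type("gpu-node-2", None, "res-abc123"))  # gpu-node-2 (res-abc123)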

src/dstack/_internal/core/backends/kubernetes/compute.py

Lines changed: 117 additions & 69 deletions
@@ -50,7 +50,7 @@
     Resources,
     SSHConnectionParams,
 )
-from dstack._internal.core.models.resources import CPUSpec, Memory
+from dstack._internal.core.models.resources import CPUSpec, GPUSpec, Memory
 from dstack._internal.core.models.runs import Job, JobProvisioningData, Requirements, Run
 from dstack._internal.core.models.volumes import Volume
 from dstack._internal.utils.common import parse_memory
@@ -123,38 +123,10 @@ def get_offers_by_requirements(
         )
         nodes = get_value(node_list, ".items", list[client.V1Node], required=True)
         for node in nodes:
-            try:
-                name = get_value(node, ".metadata.name", str, required=True)
-                cpu_arch = normalize_arch(
-                    get_value(node, ".status.node_info.architecture", str)
-                ).to_cpu_architecture()
-                allocatable = get_value(node, ".status.allocatable", dict[str, str], required=True)
-                cpus = _parse_cpu(allocatable["cpu"])
-                memory_mib = _parse_memory(allocatable["memory"])
-                disk_size_mib = _parse_memory(allocatable["ephemeral-storage"])
-                gpus = _get_node_gpus(node)
-            except (AttributeError, KeyError, ValueError) as e:
-                logger.exception("Failed to process node: %s: %s", type(e).__name__, e)
-                continue
-            instance_offer = InstanceOfferWithAvailability(
-                backend=BackendType.KUBERNETES,
-                instance=InstanceType(
-                    name=name,
-                    resources=Resources(
-                        cpus=cpus,
-                        cpu_arch=cpu_arch,
-                        memory_mib=memory_mib,
-                        gpus=gpus,
-                        spot=False,
-                        disk=Disk(size_mib=disk_size_mib),
-                    ),
-                ),
-                price=0,
-                region=DUMMY_REGION,
-                availability=InstanceAvailability.AVAILABLE,
-                instance_runtime=InstanceRuntime.RUNNER,
-            )
-            instance_offers.extend(filter_offers_by_requirements([instance_offer], requirements))
+            if (instance_offer := _get_instance_offer_from_node(node)) is not None:
+                instance_offers.extend(
+                    filter_offers_by_requirements([instance_offer], requirements)
+                )
         return instance_offers
 
     def run_job(
@@ -216,18 +188,17 @@ def run_job(
         assert isinstance(resources_spec.cpu, CPUSpec)
        if (cpu_min := resources_spec.cpu.count.min) is not None:
             resources_requests["cpu"] = str(cpu_min)
+        if (cpu_max := resources_spec.cpu.count.max) is not None:
+            resources_limits["cpu"] = str(cpu_max)
         if (gpu_spec := resources_spec.gpu) is not None:
             gpu_min = gpu_spec.count.min
             if gpu_min is not None and gpu_min > 0:
-                if not (offer_gpus := instance_offer.instance.resources.gpus):
-                    raise ComputeError(
-                        "GPU is requested but the offer has no GPUs:"
-                        f" {gpu_spec=} {instance_offer=}",
-                    )
                 gpu_resource, node_affinity, node_taint = _get_pod_spec_parameters_for_gpu(
-                    self.api, offer_gpus[0]
+                    self.api, gpu_spec
                 )
                 logger.debug("Requesting GPU resource: %s=%d", gpu_resource, gpu_min)
+                # Limit must be set (GPU resources cannot be overcommitted)
+                # and must be equal to request.
                 resources_requests[gpu_resource] = resources_limits[gpu_resource] = str(gpu_min)
                 # It should be NoSchedule, but we also add NoExecute toleration just in case.
                 for effect in [TaintEffect.NO_SCHEDULE, TaintEffect.NO_EXECUTE]:
@@ -238,11 +209,13 @@ def run_job(
                     )
         if (memory_min := resources_spec.memory.min) is not None:
             resources_requests["memory"] = _render_memory(memory_min)
-        if (
-            resources_spec.disk is not None
-            and (disk_min := resources_spec.disk.size.min) is not None
-        ):
-            resources_requests["ephemeral-storage"] = _render_memory(disk_min)
+        if (memory_max := resources_spec.memory.max) is not None:
+            resources_limits["memory"] = _render_memory(memory_max)
+        if (disk_spec := resources_spec.disk) is not None:
+            if (disk_min := disk_spec.size.min) is not None:
+                resources_requests["ephemeral-storage"] = _render_memory(disk_min)
+            if (disk_max := disk_spec.size.max) is not None:
+                resources_limits["ephemeral-storage"] = _render_memory(disk_max)
         if (shm_size := resources_spec.shm_size) is not None:
             shm_volume_name = "dev-shm"
             volumes_.append(
@@ -328,8 +301,9 @@ def run_job(
             instance_type=instance_offer.instance,
             instance_id=instance_name,
             # Although we can already get Service's ClusterIP from the `V1Service` object returned
-            # by the `create_namespaced_service` method, we still need PodIP for multinode runs.
-            # We'll update both hostname and internal_ip once the pod is assigned to the node.
+            # by the `create_namespaced_service` method, we still need 1) updated instance offer
+            # 2) PodIP for multinode runs.
+            # We'll update all these fields once the pod is assigned to the node.
             hostname=None,
             internal_ip=None,
             region=instance_offer.region,
@@ -368,6 +342,15 @@ def update_provisioning_data(
             namespace=self.config.namespace,
         )
         provisioning_data.hostname = get_value(service, ".spec.cluster_ip", str, required=True)
+        node = call_api_method(
+            self.api.read_node,
+            client.V1Node,
+            name=get_value(pod, ".spec.node_name", str, required=True),
+        )
+        if (instance_offer := _get_instance_offer_from_node(node)) is not None:
+            provisioning_data.instance_type = instance_offer.instance
+            provisioning_data.region = instance_offer.region
+            provisioning_data.price = instance_offer.price
 
     def terminate_instance(
         self, instance_id: str, region: str, backend_data: Optional[str] = None
@@ -500,6 +483,40 @@ def terminate_gateway(
         )
 
 
+def _get_instance_offer_from_node(node: client.V1Node) -> Optional[InstanceOfferWithAvailability]:
+    try:
+        name = get_value(node, ".metadata.name", str, required=True)
+        cpu_arch = normalize_arch(
+            get_value(node, ".status.node_info.architecture", str)
+        ).to_cpu_architecture()
+        allocatable = get_value(node, ".status.allocatable", dict[str, str], required=True)
+        cpus = _parse_cpu(allocatable["cpu"])
+        memory_mib = _parse_memory(allocatable["memory"])
+        disk_size_mib = _parse_memory(allocatable["ephemeral-storage"])
+        gpus = _get_node_gpus(node)
+    except (AttributeError, KeyError, ValueError) as e:
+        logger.exception("Failed to process node: %s: %s", type(e).__name__, e)
+        return None
+    return InstanceOfferWithAvailability(
+        backend=BackendType.KUBERNETES,
+        instance=InstanceType(
+            name=name,
+            resources=Resources(
+                cpus=cpus,
+                cpu_arch=cpu_arch,
+                memory_mib=memory_mib,
+                gpus=gpus,
+                spot=False,
+                disk=Disk(size_mib=disk_size_mib),
+            ),
+        ),
+        price=0,
+        region=DUMMY_REGION,
+        availability=InstanceAvailability.AVAILABLE,
+        instance_runtime=InstanceRuntime.RUNNER,
+    )
+
+
 def _parse_cpu(cpu: str) -> int:
     if cpu.endswith("m"):
         # "m" means millicpu (1/1000 CPU), e.g., 7900m -> 7.9 -> 7
@@ -590,36 +607,39 @@ def _get_amd_gpu_from_node_labels(labels: dict[str, str]) -> Optional[Gpu]:
 
 
 def _get_pod_spec_parameters_for_gpu(
-    api: client.CoreV1Api, gpu: Gpu
+    api: client.CoreV1Api, gpu_spec: GPUSpec
 ) -> tuple[str, client.V1NodeAffinity, str]:
-    gpu_vendor = gpu.vendor
-    assert gpu_vendor is not None
-    if gpu_vendor == AcceleratorVendor.NVIDIA:
-        node_affinity = _get_nvidia_gpu_node_affinity(api, gpu)
+    node_list = call_api_method(api.list_node, client.V1NodeList)
+    nodes = get_value(node_list, ".items", list[client.V1Node], required=True)
+    gpu_vendor = gpu_spec.vendor
+    # If no vendor specified, we assume it's NVIDIA. Technically, it's possible to request either
+    # NVIDIA or AMD in the run configuration using only GPU names (e.g.,`gpu: H100,MI300X:8`),
+    # but we ignore such configurations as it's hard to translate them to K8s request.
+    if gpu_vendor is None or gpu_vendor == AcceleratorVendor.NVIDIA:
+        node_affinity = _get_nvidia_gpu_node_affinity(gpu_spec, nodes)
         return NVIDIA_GPU_RESOURCE, node_affinity, NVIDIA_GPU_NODE_TAINT
     if gpu_vendor == AcceleratorVendor.AMD:
-        node_affinity = _get_amd_gpu_node_affinity(gpu)
+        node_affinity = _get_amd_gpu_node_affinity(gpu_spec, nodes)
         return AMD_GPU_RESOURCE, node_affinity, AMD_GPU_NODE_TAINT
     raise ComputeError(f"Unsupported GPU vendor: {gpu_vendor}")
 
 
-def _get_nvidia_gpu_node_affinity(api: client.CoreV1Api, gpu: Gpu) -> client.V1NodeAffinity:
+def _get_nvidia_gpu_node_affinity(
+    gpu_spec: GPUSpec, nodes: list[client.V1Node]
+) -> client.V1NodeAffinity:
     matching_gpu_label_values: set[str] = set()
-    # We cannot generate an expected GPU label value from the Gpu model instance
-    # as the actual values may have additional components (socket, memory type, etc.)
-    # that we don't preserve in the Gpu model, e.g., "NVIDIA-H100-80GB-HBM3".
-    # Moreover, a single Gpu may match multiple label values.
-    # As a workaround, we iterate and process all node labels once again (we already
-    # processed them in `get_offers_by_requirements()`).
-    node_list = call_api_method(api.list_node, client.V1NodeList)
-    nodes = get_value(node_list, ".items", list[client.V1Node], required=True)
     for node in nodes:
         labels = get_value(node, ".metadata.labels", dict[str, str]) or {}
-        if _get_nvidia_gpu_from_node_labels(labels) == gpu:
+        gpu = _get_nvidia_gpu_from_node_labels(labels)
+        if gpu is not None and _gpu_matches_gpu_spec(gpu, gpu_spec):
             matching_gpu_label_values.add(labels[NVIDIA_GPU_PRODUCT_LABEL])
     if not matching_gpu_label_values:
-        raise ComputeError(f"NVIDIA GPU is requested but no matching GPU labels found: {gpu=}")
-    logger.debug("Selecting nodes by labels %s for NVIDIA %s", matching_gpu_label_values, gpu.name)
+        raise ComputeError(
+            f"NVIDIA GPU is requested but no matching GPU labels found: {gpu_spec=}"
+        )
+    logger.debug(
+        "Selecting nodes by labels %s for NVIDIA %s", matching_gpu_label_values, gpu_spec.name
+    )
     return client.V1NodeAffinity(
         required_during_scheduling_ignored_during_execution=client.V1NodeSelector(
             node_selector_terms=[
@@ -637,10 +657,15 @@ def _get_nvidia_gpu_node_affinity(api: client.CoreV1Api, gpu: Gpu) -> client.V1N
     )
 
 
-def _get_amd_gpu_node_affinity(gpu: Gpu) -> client.V1NodeAffinity:
-    device_ids = AMD_GPU_NAME_TO_DEVICE_IDS.get(gpu.name)
-    if device_ids is None:
-        raise ComputeError(f"AMD GPU is requested but no matching device ids found: {gpu=}")
+def _get_amd_gpu_node_affinity(
+    gpu_spec: GPUSpec, nodes: list[client.V1Node]
+) -> client.V1NodeAffinity:
+    matching_device_ids: set[int] = set()
+    for node in nodes:
+        labels = get_value(node, ".metadata.labels", dict[str, str]) or {}
+        gpu = _get_amd_gpu_from_node_labels(labels)
+        if gpu is not None and _gpu_matches_gpu_spec(gpu, gpu_spec):
+            matching_device_ids.update(AMD_GPU_NAME_TO_DEVICE_IDS[gpu.name])
     return client.V1NodeAffinity(
         required_during_scheduling_ignored_during_execution=client.V1NodeSelector(
            node_selector_terms=[
@@ -652,12 +677,35 @@ def _get_amd_gpu_node_affinity(gpu: Gpu) -> client.V1NodeAffinity:
                     ),
                 ],
             )
-            for device_id in device_ids
+            for device_id in matching_device_ids
         ],
     ),
 )
 
 
+def _gpu_matches_gpu_spec(gpu: Gpu, gpu_spec: GPUSpec) -> bool:
+    if gpu_spec.vendor is not None and gpu.vendor != gpu_spec.vendor:
+        return False
+    if gpu_spec.name is not None and gpu.name.lower() not in map(str.lower, gpu_spec.name):
+        return False
+    if gpu_spec.memory is not None:
+        min_memory_gib = gpu_spec.memory.min
+        if min_memory_gib is not None and gpu.memory_mib < min_memory_gib * 1024:
+            return False
+        max_memory_gib = gpu_spec.memory.max
+        if max_memory_gib is not None and gpu.memory_mib > max_memory_gib * 1024:
+            return False
+    if gpu_spec.compute_capability is not None:
+        if gpu.vendor != AcceleratorVendor.NVIDIA:
+            return False
+        gpu_info = NVIDIA_GPU_NAME_TO_GPU_INFO.get(gpu.name)
+        if gpu_info is None:
+            return False
+        if gpu_info.compute_capability < gpu_spec.compute_capability:
+            return False
+    return True
+
+
 def _continue_setup_jump_pod(
     api: client.CoreV1Api,
     namespace: str,
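
Note: a sketch (not from this commit) of what the "any suitable GPU" behavior boils down to: a single node-affinity "In" expression listing every GPU product label value that matched the run requirements, so the scheduler may place the pod on any node whose GPU satisfies the run spec. The label key and values below are illustrative assumptions.

    from kubernetes import client

    GPU_PRODUCT_LABEL = "nvidia.com/gpu.product"  # assumed label advertised by GPU nodes

    matching_gpu_label_values = [
        "NVIDIA-H100-80GB-HBM3",
        "NVIDIA-H100-PCIe",
    ]

    node_affinity = client.V1NodeAffinity(
        required_during_scheduling_ignored_during_execution=client.V1NodeSelector(
            node_selector_terms=[
                client.V1NodeSelectorTerm(
                    match_expressions=[
                        client.V1NodeSelectorRequirement(
                            key=GPU_PRODUCT_LABEL,
                            operator="In",
                            values=matching_gpu_label_values,
                        )
                    ]
                )
            ]
        )
    )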
