Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion goldens/Workload_create.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ docker tag dry-run-runner gcr.io/golden-project/dry-run-runner:prefix-current
[XPK] Task: `Upload Docker Image` is implemented by the following command not running since it is a dry run.
docker push gcr.io/golden-project/dry-run-runner:prefix-current
[XPK] Task: `Creating Workload` is implemented by the following command not running since it is a dry run.
kubectl apply -f abc33690f7a11b2ba50a8f949970fd3ba812f088367b7f64260729f01f41a231
kubectl apply -f e21c8ebdc21d15a852187058c096898c486d3b1066e67dcfb67e5052a1d0a7fa
[XPK] Task: `GKE Dashboard List` is implemented by the following command not running since it is a dry run.
gcloud monitoring dashboards list --project=golden-project --filter="displayName:'GKE - TPU Monitoring Dashboard'" --format="value(name)" --verbosity=error
[XPK] Check statistics and outlier mode of GKE metrics here: https://console.cloud.google.com/monitoring/dashboards/builder/0?project=golden-project&f.rlabel.cluster_name.ClusterName=golden-cluster. To view the metric data for your workload, select golden-workload from the JobName filter on the dashboard.
Expand Down
2 changes: 1 addition & 1 deletion goldens/Workload_create_pathways.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ docker tag dry-run-runner gcr.io/golden-project/dry-run-runner:prefix-current
[XPK] Task: `Upload Docker Image` is implemented by the following command not running since it is a dry run.
docker push gcr.io/golden-project/dry-run-runner:prefix-current
[XPK] Task: `Creating Workload` is implemented by the following command not running since it is a dry run.
kubectl apply -f 574963d6d441695d681ff94ad241e713559f64b4ce519f4f1e0708c659f1c25d
kubectl apply -f 6fb0f350cf4e0dccc71f77392d12db3de6371d5148519657046613794358bfce
[XPK] Task: `GKE Dashboard List` is implemented by the following command not running since it is a dry run.
gcloud monitoring dashboards list --project=golden-project --filter="displayName:'GKE - TPU Monitoring Dashboard'" --format="value(name)" --verbosity=error
[XPK] Check statistics and outlier mode of GKE metrics here: https://console.cloud.google.com/monitoring/dashboards/builder/0?project=golden-project&f.rlabel.cluster_name.ClusterName=golden-cluster. To view the metric data for your workload, select golden-workload from the JobName filter on the dashboard.
Expand Down
14 changes: 14 additions & 0 deletions src/xpk/commands/workload.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@
get_cpu_affinity,
get_gpu_scheduler,
create_sub_slicing_annotations,
create_placement_policy_label,
is_placement_policy_supported,
)
from ..core.storage import (
GCE_PD_TYPE,
Expand Down Expand Up @@ -143,6 +145,7 @@
nodeSelector:
{accelerator_label}
{machine_label}
{placement_policy_label}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's too bad we have no test coverage for this, no unit tests cover the actual yaml content, and goldens only capture that some change happened in the template.

{autoprovisioning_args}
priorityClassName: {args.priority}
hostNetwork: true
Expand Down Expand Up @@ -272,6 +275,7 @@
terminationGracePeriodSeconds: {args.termination_grace_period_seconds}
priorityClassName: {args.priority}
nodeSelector:
{placement_policy_label}
{autoprovisioning_args}
pathwaysDir: {args.pathways_gcs_location} #This bucket needs to be created in advance.
controller:
Expand Down Expand Up @@ -558,6 +562,11 @@ def workload_create(args) -> None:
user_workload=get_user_workload_for_pathways(args, system),
local_queue_name=LOCAL_QUEUE_NAME,
autoprovisioning_args=autoprovisioning_args,
placement_policy_label=(
create_placement_policy_label(system)
if is_placement_policy_supported(system)
else ''
),
)
else:
container, debugging_dashboard_id = get_user_workload_container(
Expand Down Expand Up @@ -585,6 +594,11 @@ def workload_create(args) -> None:
create_sub_slicing_annotations(args.sub_slicing_topology)
)
),
placement_policy_label=(
create_placement_policy_label(system)
if is_placement_policy_supported(system)
else ''
),
machine_label=create_machine_label(system.accelerator_type, system),
local_queue_name=LOCAL_QUEUE_NAME,
autoprovisioning_args=autoprovisioning_args,
Expand Down
9 changes: 4 additions & 5 deletions src/xpk/core/nodepool.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@

from typing import List
from ..utils.console import ask_for_user_consent, xpk_print
from ..utils.topology import get_topology_product, is_topology_valid
from ..utils.topology import get_topology_product
from .scheduling import get_placement_policy_name, is_placement_policy_supported
from .capacity import (
AUTOPROVISIONING_CONFIG_VALUE,
H100_MEGA_DEVICE_TYPE,
Expand Down Expand Up @@ -258,10 +259,8 @@ def run_gke_node_pool_create_command(
return 1

placement_args = ''
if system.requires_workload_policy and is_topology_valid(system.topology):
placement_policy = (
f'{system.device_type}-{system.topology}-placement-policy'
)
if is_placement_policy_supported(system):
placement_policy = get_placement_policy_name(system)
ensure_resource_policy_exists(placement_policy, args, system.topology)
placement_args = f' --placement-policy={placement_policy}'

Expand Down
22 changes: 12 additions & 10 deletions src/xpk/core/nodepool_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,21 +146,23 @@ def mock_nodepool_dependencies(mocker):
)
mocker.patch("xpk.core.nodepool.run_commands", return_value=0)
mocker.patch("xpk.core.nodepool.ask_for_user_consent", return_value=True)
mock_is_topology_valid = mocker.patch("xpk.core.nodepool.is_topology_valid")
mock_is_placement_policy_supported = mocker.patch(
"xpk.core.nodepool.is_placement_policy_supported"
)
mock_ensure_resource_policy = mocker.patch(
"xpk.core.nodepool.ensure_resource_policy_exists"
)
return mock_is_topology_valid, mock_ensure_resource_policy
return mock_is_placement_policy_supported, mock_ensure_resource_policy


def test_placement_policy_created_for_gpu_with_valid_topology(
mocker, mock_nodepool_dependencies
):
"""Tests that placement policy is created for GPUs with a valid topology."""
mock_is_topology_valid, mock_ensure_resource_policy = (
mock_is_placement_policy_supported, mock_ensure_resource_policy = (
mock_nodepool_dependencies
)
mock_is_topology_valid.return_value = True
mock_is_placement_policy_supported.return_value = True
args = mocker.Mock(
tpu_type=None,
device_type="h100-80gb-8",
Expand Down Expand Up @@ -188,10 +190,10 @@ def test_placement_policy_not_created_for_gpu_with_invalid_topology(
mocker, mock_nodepool_dependencies
):
"""Tests that placement policy is not created for GPUs with an invalid topology."""
mock_is_topology_valid, mock_ensure_resource_policy = (
mock_is_placement_policy_supported, mock_ensure_resource_policy = (
mock_nodepool_dependencies
)
mock_is_topology_valid.return_value = False
mock_is_placement_policy_supported.return_value = False
args = mocker.Mock(
tpu_type=None,
device_type="h100-80gb-8",
Expand All @@ -218,10 +220,10 @@ def test_placement_policy_created_for_tpu7x_with_valid_topology(
mocker, mock_nodepool_dependencies
):
"""Tests that placement policy is created for tpu7x with a valid topology."""
mock_is_topology_valid, mock_ensure_resource_policy = (
mock_is_placement_policy_supported, mock_ensure_resource_policy = (
mock_nodepool_dependencies
)
mock_is_topology_valid.return_value = True
mock_is_placement_policy_supported.return_value = True
args = mocker.Mock(
tpu_type="tpu7x-8",
device_type=None,
Expand Down Expand Up @@ -251,10 +253,10 @@ def test_placement_policy_not_created_for_non7x_tpu(
mocker, mock_nodepool_dependencies
):
"""Tests that placement policy is not created for non-tpu7x TPUs."""
mock_is_topology_valid, mock_ensure_resource_policy = (
mock_is_placement_policy_supported, mock_ensure_resource_policy = (
mock_nodepool_dependencies
)
mock_is_topology_valid.return_value = True
mock_is_placement_policy_supported.return_value = False
args = mocker.Mock(
tpu_type="v6e",
device_type=None,
Expand Down
16 changes: 16 additions & 0 deletions src/xpk/core/scheduling.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"""

from ..utils.console import xpk_print
from ..utils.topology import is_topology_valid
from ..utils.execution_context import is_dry_run
from .capacity import AUTOPROVISIONING_CONFIG_MAXIMUM_KEY, AUTOPROVISIONING_CONFIG_VALUE
from .resources import CLUSTER_RESOURCES_CONFIGMAP, get_cluster_configmap
Expand Down Expand Up @@ -303,3 +304,18 @@ def create_sub_slicing_annotations(sub_slicing_topology: str) -> list[str]:
),
f'cloud.google.com/gke-tpu-slice-topology: {sub_slicing_topology}',
]


def create_placement_policy_label(system: SystemCharacteristics) -> str:
  """Return the nodeSelector label line naming the placement policy.

  Args:
    system: SystemCharacteristics of the accelerator the workload targets.

  Returns:
    A 'cloud.google.com/placement-policy-name: <name>' YAML label line for
    TPU systems, or an empty string for any non-TPU accelerator.

  NOTE(review): callers gate this call on is_placement_policy_supported(),
  which can also be true for GPUs (requires_workload_policy with a valid
  topology), yet this extra TPU-only guard renders an empty label for them.
  Confirm whether GPU workloads should also carry the placement-policy
  label, or whether the guard here is intentional.
  """
  if system.accelerator_type != AcceleratorType.TPU:
    return ''
  name = get_placement_policy_name(system)
  return f'cloud.google.com/placement-policy-name: {name}'


def get_placement_policy_name(system: SystemCharacteristics) -> str:
  """Build the canonical resource-policy name for a device type + topology."""
  return '{}-{}-placement-policy'.format(system.device_type, system.topology)


def is_placement_policy_supported(system: SystemCharacteristics) -> bool:
  """A placement policy applies only when the system requires a workload
  policy and its topology string is valid."""
  if system.requires_workload_policy:
    return is_topology_valid(system.topology)
  return system.requires_workload_policy
Loading