diff --git a/goldens/Workload_create.txt b/goldens/Workload_create.txt index 9c8df3be0..380633b68 100644 --- a/goldens/Workload_create.txt +++ b/goldens/Workload_create.txt @@ -24,7 +24,7 @@ docker tag dry-run-runner gcr.io/golden-project/dry-run-runner:prefix-current [XPK] Task: `Upload Docker Image` is implemented by the following command not running since it is a dry run. docker push gcr.io/golden-project/dry-run-runner:prefix-current [XPK] Task: `Creating Workload` is implemented by the following command not running since it is a dry run. -kubectl apply -f abc33690f7a11b2ba50a8f949970fd3ba812f088367b7f64260729f01f41a231 +kubectl apply -f e21c8ebdc21d15a852187058c096898c486d3b1066e67dcfb67e5052a1d0a7fa [XPK] Task: `GKE Dashboard List` is implemented by the following command not running since it is a dry run. gcloud monitoring dashboards list --project=golden-project --filter="displayName:'GKE - TPU Monitoring Dashboard'" --format="value(name)" --verbosity=error [XPK] Check statistics and outlier mode of GKE metrics here: https://console.cloud.google.com/monitoring/dashboards/builder/0?project=golden-project&f.rlabel.cluster_name.ClusterName=golden-cluster. To view the metric data for your workload, select golden-workload from the JobName filter on the dashboard. diff --git a/goldens/Workload_create_pathways.txt b/goldens/Workload_create_pathways.txt index a74e74afd..fe352c2c4 100644 --- a/goldens/Workload_create_pathways.txt +++ b/goldens/Workload_create_pathways.txt @@ -26,7 +26,7 @@ docker tag dry-run-runner gcr.io/golden-project/dry-run-runner:prefix-current [XPK] Task: `Upload Docker Image` is implemented by the following command not running since it is a dry run. docker push gcr.io/golden-project/dry-run-runner:prefix-current [XPK] Task: `Creating Workload` is implemented by the following command not running since it is a dry run. -kubectl apply -f 574963d6d441695d681ff94ad241e713559f64b4ce519f4f1e0708c659f1c25d +kubectl apply -f 6fb0f350cf4e0dccc71f77392d12db3de6371d5148519657046613794358bfce [XPK] Task: `GKE Dashboard List` is implemented by the following command not running since it is a dry run. gcloud monitoring dashboards list --project=golden-project --filter="displayName:'GKE - TPU Monitoring Dashboard'" --format="value(name)" --verbosity=error [XPK] Check statistics and outlier mode of GKE metrics here: https://console.cloud.google.com/monitoring/dashboards/builder/0?project=golden-project&f.rlabel.cluster_name.ClusterName=golden-cluster. To view the metric data for your workload, select golden-workload from the JobName filter on the dashboard. diff --git a/src/xpk/commands/workload.py b/src/xpk/commands/workload.py index 9a78c48d7..bef3d7ca1 100644 --- a/src/xpk/commands/workload.py +++ b/src/xpk/commands/workload.py @@ -63,6 +63,8 @@ get_cpu_affinity, get_gpu_scheduler, create_sub_slicing_annotations, + create_placement_policy_label, + is_placement_policy_supported, ) from ..core.storage import ( GCE_PD_TYPE, @@ -143,6 +145,7 @@ nodeSelector: {accelerator_label} {machine_label} + {placement_policy_label} {autoprovisioning_args} priorityClassName: {args.priority} hostNetwork: true @@ -272,6 +275,7 @@ terminationGracePeriodSeconds: {args.termination_grace_period_seconds} priorityClassName: {args.priority} nodeSelector: + {placement_policy_label} {autoprovisioning_args} pathwaysDir: {args.pathways_gcs_location} #This bucket needs to be created in advance. controller: @@ -558,6 +562,11 @@ def workload_create(args) -> None: user_workload=get_user_workload_for_pathways(args, system), local_queue_name=LOCAL_QUEUE_NAME, autoprovisioning_args=autoprovisioning_args, + placement_policy_label=( + create_placement_policy_label(system) + if is_placement_policy_supported(system) + else '' + ), ) else: container, debugging_dashboard_id = get_user_workload_container( @@ -585,6 +594,11 @@ def workload_create(args) -> None: create_sub_slicing_annotations(args.sub_slicing_topology) ) ), + placement_policy_label=( + create_placement_policy_label(system) + if is_placement_policy_supported(system) + else '' + ), machine_label=create_machine_label(system.accelerator_type, system), local_queue_name=LOCAL_QUEUE_NAME, autoprovisioning_args=autoprovisioning_args, diff --git a/src/xpk/core/nodepool.py b/src/xpk/core/nodepool.py index 66ac9aed6..1b9cf4dbe 100644 --- a/src/xpk/core/nodepool.py +++ b/src/xpk/core/nodepool.py @@ -16,7 +16,8 @@ from typing import List from ..utils.console import ask_for_user_consent, xpk_print -from ..utils.topology import get_topology_product, is_topology_valid +from ..utils.topology import get_topology_product +from .scheduling import get_placement_policy_name, is_placement_policy_supported from .capacity import ( AUTOPROVISIONING_CONFIG_VALUE, H100_MEGA_DEVICE_TYPE, @@ -258,10 +259,8 @@ def run_gke_node_pool_create_command( return 1 placement_args = '' - if system.requires_workload_policy and is_topology_valid(system.topology): - placement_policy = ( - f'{system.device_type}-{system.topology}-placement-policy' - ) + if is_placement_policy_supported(system): + placement_policy = get_placement_policy_name(system) ensure_resource_policy_exists(placement_policy, args, system.topology) placement_args = f' --placement-policy={placement_policy}' diff --git a/src/xpk/core/nodepool_test.py b/src/xpk/core/nodepool_test.py index 1e1a1fc70..86f055eaf 100644 --- a/src/xpk/core/nodepool_test.py +++ b/src/xpk/core/nodepool_test.py @@ -146,21 +146,23 @@ def mock_nodepool_dependencies(mocker): ) mocker.patch("xpk.core.nodepool.run_commands", return_value=0) mocker.patch("xpk.core.nodepool.ask_for_user_consent", return_value=True) - mock_is_topology_valid = mocker.patch("xpk.core.nodepool.is_topology_valid") + mock_is_placement_policy_supported = mocker.patch( + "xpk.core.nodepool.is_placement_policy_supported" + ) mock_ensure_resource_policy = mocker.patch( "xpk.core.nodepool.ensure_resource_policy_exists" ) - return mock_is_topology_valid, mock_ensure_resource_policy + return mock_is_placement_policy_supported, mock_ensure_resource_policy def test_placement_policy_created_for_gpu_with_valid_topology( mocker, mock_nodepool_dependencies ): """Tests that placement policy is created for GPUs with a valid topology.""" - mock_is_topology_valid, mock_ensure_resource_policy = ( + mock_is_placement_policy_supported, mock_ensure_resource_policy = ( mock_nodepool_dependencies ) - mock_is_topology_valid.return_value = True + mock_is_placement_policy_supported.return_value = True args = mocker.Mock( tpu_type=None, device_type="h100-80gb-8", @@ -188,10 +190,10 @@ def test_placement_policy_not_created_for_gpu_with_invalid_topology( mocker, mock_nodepool_dependencies ): """Tests that placement policy is not created for GPUs with an invalid topology.""" - mock_is_topology_valid, mock_ensure_resource_policy = ( + mock_is_placement_policy_supported, mock_ensure_resource_policy = ( mock_nodepool_dependencies ) - mock_is_topology_valid.return_value = False + mock_is_placement_policy_supported.return_value = False args = mocker.Mock( tpu_type=None, device_type="h100-80gb-8", @@ -218,10 +220,10 @@ def test_placement_policy_created_for_tpu7x_with_valid_topology( mocker, mock_nodepool_dependencies ): """Tests that placement policy is created for tpu7x with a valid topology.""" - mock_is_topology_valid, mock_ensure_resource_policy = ( + mock_is_placement_policy_supported, mock_ensure_resource_policy = ( mock_nodepool_dependencies ) - mock_is_topology_valid.return_value = True + mock_is_placement_policy_supported.return_value = True args = mocker.Mock( tpu_type="tpu7x-8", device_type=None, @@ -251,10 +253,10 @@ def test_placement_policy_not_created_for_non7x_tpu( mocker, mock_nodepool_dependencies ): """Tests that placement policy is not created for non-tpu7x TPUs.""" - mock_is_topology_valid, mock_ensure_resource_policy = ( + mock_is_placement_policy_supported, mock_ensure_resource_policy = ( mock_nodepool_dependencies ) - mock_is_topology_valid.return_value = True + mock_is_placement_policy_supported.return_value = False args = mocker.Mock( tpu_type="v6e", device_type=None, diff --git a/src/xpk/core/scheduling.py b/src/xpk/core/scheduling.py index 55e83c630..f4906fcb4 100644 --- a/src/xpk/core/scheduling.py +++ b/src/xpk/core/scheduling.py @@ -15,6 +15,7 @@ """ from ..utils.console import xpk_print +from ..utils.topology import is_topology_valid from ..utils.execution_context import is_dry_run from .capacity import AUTOPROVISIONING_CONFIG_MAXIMUM_KEY, AUTOPROVISIONING_CONFIG_VALUE from .resources import CLUSTER_RESOURCES_CONFIGMAP, get_cluster_configmap @@ -303,3 +304,18 @@ def create_sub_slicing_annotations(sub_slicing_topology: str) -> list[str]: ), f'cloud.google.com/gke-tpu-slice-topology: {sub_slicing_topology}', ] + + +def create_placement_policy_label(system: SystemCharacteristics) -> str: + if system.accelerator_type != AcceleratorType.TPU: + return '' + name = get_placement_policy_name(system) + return f'cloud.google.com/placement-policy-name: {name}' + + +def get_placement_policy_name(system: SystemCharacteristics) -> str: + return f'{system.device_type}-{system.topology}-placement-policy' + + +def is_placement_policy_supported(system: SystemCharacteristics) -> bool: + return system.requires_workload_policy and is_topology_valid(system.topology)