Skip to content

Commit bf3e0d9

Browse files
authored
Implement info command (#200)
* add new command implementation Signed-off-by: Piotr Pawłowski <[email protected]> --------- Signed-off-by: Piotr Pawłowski <[email protected]>
1 parent 1734658 commit bf3e0d9

File tree

8 files changed

+330
-4
lines changed

8 files changed

+330
-4
lines changed

.github/workflows/build_tests.yaml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ jobs:
4949
run: |
5050
gcloud config set compute/zone us-east4-a
5151
gcloud config get compute/zone
52+
- name: Install kueuectl
53+
run: curl -Lo ./kubectl-kueue https://github.com/kubernetes-sigs/kueue/releases/download/v0.8.1/kubectl-kueue-linux-amd64 && chmod +x ./kubectl-kueue && mv ./kubectl-kueue /usr/local/bin/kubectl-kueue
5254
- name: Install xpk with pip and verify it executes corretly
5355
run: |
5456
pip install .
@@ -71,6 +73,8 @@ jobs:
7173
run: python3 xpk.py workload list --cluster $TPU_CLUSTER_NAME --zone=us-central2-b --wait-for-job-completion $PATHWAYS_WORKLOAD_NAME --timeout 300
7274
- name: List out the workloads on the cluster
7375
run: python3 xpk.py workload list --cluster $TPU_CLUSTER_NAME --zone=us-central2-b
76+
- name: Run xpk info
77+
run: python3 xpk.py info --cluster $TPU_CLUSTER_NAME --zone=us-central2-b | tee output.txt | grep -P '^(?=.*QUEUE)(?=.*PENDING_WORKLOADS)(?=.*ADMITTED_WORKLOADS)(?=.*2xv4-8:google.com/tpu)(?=.*cpu-rm:cpu)(?=.*cpu-rm:memory)(?=.*cpu-proxy:cpu)(?=.*cpu-proxy:memory)(?=.*cpu-user:cpu)(?=.*cpu-user:memory)' || (echo 'Invalid command output' && cat output.txt && exit 1)
7478
- name: Delete the workload on the cluster
7579
run: python3 xpk.py workload delete --workload $WORKLOAD_NAME --cluster $TPU_CLUSTER_NAME --zone=us-central2-b
7680
- name: Delete the Pathways workload on the cluster
@@ -83,4 +87,3 @@ jobs:
8387

8488

8589

86-

.github/workflows/nightly_tests.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ jobs:
5151
run: |
5252
gcloud config set compute/zone us-east4-a
5353
gcloud config get compute/zone
54+
- name: Install kueuectl
55+
run: curl -Lo ./kubectl-kueue https://github.com/kubernetes-sigs/kueue/releases/download/v0.8.1/kubectl-kueue-linux-amd64 && chmod +x ./kubectl-kueue && mv ./kubectl-kueue /usr/local/bin/kubectl-kueue
5456
- name: Create an XPK Cluster with zero node pools
5557
run: python xpk.py cluster create --cluster $EMPTY_CLUSTER_NAME --tpu-type=v4-8 --num-slices=0 --zone=us-central2-b --default-pool-cpu-machine-type=n1-standard-16 --reservation='${{ secrets.GCP_TPU_V4_RESERVATION }}' --custom-cluster-arguments='${{ secrets.CLUSTER_ARGUMENTS }}'
5658
- name: Delete the cluster created
@@ -74,6 +76,8 @@ jobs:
7476
run: python3 xpk.py inspector --cluster $TPU_CLUSTER_NAME --zone=us-central2-b --workload $WORKLOAD_NAME
7577
- name: Wait for workload completion and confirm it succeeded
7678
run: python3 xpk.py workload list --cluster $TPU_CLUSTER_NAME --zone=us-central2-b --wait-for-job-completion $WORKLOAD_NAME --timeout 300
79+
- name: Run xpk info command
80+
run : python3 xpk.py info --cluster $TPU_CLUSTER_NAME --zone=us-central2-b
7781
- name: Delete the workload on the cluster
7882
run: python3 xpk.py workload delete --workload $WORKLOAD_NAME --cluster $TPU_CLUSTER_NAME --zone=us-central2-b
7983
- name: Delete the cluster created

README.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1108,5 +1108,14 @@ To explore the stack traces collected in a temporary directory in Kubernetes Pod
11081108
xpk-test --tpu-type=v5litepod-16 --deploy-stacktrace-sidecar
11091109
```
11101110
1111+
### Get information about jobs, queues and resources.
1112+
1113+
To list available resources and queues, use the `xpk info` command. It shows localqueues and clusterqueues and lets you check which resources are available.
1114+
1115+
To see queues with usage and workload info use:
1116+
```shell
1117+
python3 xpk.py info --cluster my-cluster
1118+
```
1119+
11111120
# Other advanced usage
11121121
[Use a Jupyter notebook to interact with a Cloud TPU cluster](xpk-notebooks.md)

pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ keywords = []
3030

3131
# pip dependencies installed with `pip install -e .`
3232
dependencies = [
33-
"cloud-accelerator-diagnostics"
33+
"cloud-accelerator-diagnostics",
34+
"tabulate"
3435
]
3536

3637
[project.urls]

src/xpk/commands/info.py

Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
"""
2+
Copyright 2024 Google LLC
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
https://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
"""
16+
17+
from ..utils import xpk_exit, xpk_print
18+
from ..core.kueue import verify_kueuectl
19+
from .cluster import set_cluster_command
20+
from ..core.commands import (
21+
run_command_for_value,
22+
)
23+
from ..core.core import (
24+
add_zone_and_project,
25+
)
26+
import json
27+
from tabulate import tabulate
28+
from argparse import Namespace
29+
30+
# Table format handed to tabulate for the tables printed by this module.
table_fmt = 'plain'
31+
32+
33+
def info(args: Namespace) -> None:
  """Print usage information for Kueue localqueues and clusterqueues.

  Calls set_cluster_command (exiting with its return code when it is
  non-zero), checks that kueuectl is available, lists both queue kinds as
  JSON and passes the two listings to aggregate_results for printing.

  Args:
    args: user provided arguments for running the command.
  Returns:
    None
  """
  add_zone_and_project(args)

  cluster_command_status = set_cluster_command(args)
  if cluster_command_status != 0:
    xpk_exit(cluster_command_status)

  verify_kueuectl(args)

  # Localqueues are listed first, then clusterqueues; aggregate_results takes
  # them in the opposite order (clusterqueues, localqueues).
  local_queues_json = run_kueuectl_list_localqueue(args)
  cluster_queues_json = run_kueuectl_list_clusterqueue(args)
  aggregate_results(cluster_queues_json, local_queues_json)
51+
52+
53+
def _load_queue_items(raw: str, queue_kind: str) -> list[dict]:
  """Return the 'items' list of a kueuectl JSON listing, exiting on bad input."""
  try:
    return json.loads(raw)['items']
  except (ValueError, KeyError, TypeError):
    # ValueError: not valid JSON; KeyError: no 'items' field;
    # TypeError: input is not a string or the top-level value is not an object.
    xpk_print(f'Incorrect response from list {queue_kind}')
    xpk_print(raw)
    # NOTE(review): xpk_exit is expected to terminate the process - confirm.
    xpk_exit(1)


def aggregate_results(cqs: str, lqs: str) -> None:
  """Aggregate listed clusterqueues and localqueues with resource usage and print them as tables.

  Args:
    cqs: JSON string listing clusterqueues (must contain an 'items' list).
    lqs: JSON string listing localqueues (must contain an 'items' list).
  Returns:
    None
  """
  cq_list = _load_queue_items(cqs, 'clusterqueue')
  lq_list = _load_queue_items(lqs, 'localqueue')

  # Quotas come from the clusterqueues only; localqueue rows reuse them.
  nominal_quotas = get_nominal_quotas(cq_list)
  cq_usages = parse_queue_lists(cq_list, nominal_quotas)
  lq_usages = parse_queue_lists(lq_list, nominal_quotas)

  xpk_print(
      'Cluster Queues usage \n',
      tabulate(cq_usages, headers='keys', tablefmt=table_fmt),
  )
  xpk_print(
      'Local Queues usage \n',
      tabulate(lq_usages, headers='keys', tablefmt=table_fmt),
  )
87+
88+
89+
def get_nominal_quotas(cqs: list[dict]) -> dict[str, dict[str, str]]:
  """Get nominal quotas from clusterqueues.

  Walks the resource groups and flavors of every passed clusterqueue and
  records how much of each resource is assigned per flavor.

  Args:
    cqs: list of clusterqueues (parsed JSON objects).
  Returns:
    Dictionary of clusterqueue resource quotas in the format
    {cq_name: {'flavorName:resourceName': quota}}. A clusterqueue without
    resource groups maps to an empty dictionary.
  """
  quotas = {}
  for cq in cqs:
    spec = cq['spec']
    cq_quotas = {}
    # resourceGroups is optional in the ClusterQueue spec, so tolerate its
    # absence instead of raising KeyError.
    for rg in spec.get('resourceGroups') or []:
      for flavor in rg['flavors']:
        flavor_name = flavor['name']
        for resource in flavor['resources']:
          key = f'{flavor_name}:{resource["name"]}'
          cq_quotas[key] = resource['nominalQuota']
    quotas[cq['metadata']['name']] = cq_quotas
  return quotas
111+
112+
113+
def parse_queue_lists(
    qs: list[dict],
    flavor_resource_quotas: dict,
    reservation_key: str = 'flavorsReservation',
) -> list[dict]:
  """Build one table row per queue.

  Args:
    qs: list of queue entries (parsed JSON objects).
    flavor_resource_quotas: quotas forwarded to get_flavors_usage.
    reservation_key: name of the status field forwarded to get_flavors_usage.
  Returns:
    List of dicts, one per queue, holding the queue name, its admitted and
    pending workload counts, followed by the entries returned by
    get_flavors_usage for that queue.
  """

  def build_row(queue: dict) -> dict:
    name = queue['metadata']['name']
    pending = queue['status']['pendingWorkloads']
    admitted = queue['status']['admittedWorkloads']
    return {
        'QUEUE': name,
        'ADMITTED_WORKLOADS': admitted,
        'PENDING_WORKLOADS': pending,
        **get_flavors_usage(queue, reservation_key, flavor_resource_quotas),
    }

  return [build_row(queue) for queue in qs]
133+
134+
135+
def get_flavors_resources_reservations(
    cq_name: str, flavors_res: dict[str, list[dict]]
) -> dict[str, dict[str, str]]:
  """Get usage of flavors resources.

  Flattens a mapping of flavor name to its resource reservation entries into
  'flavor:resource' keys.

  Args:
    cq_name: name of the ClusterQueue to which the flavors belong.
    flavors_res: mapping of flavor name to the list of its resource
      reservations; each reservation is read through its 'name' and 'total'
      fields.
  Returns:
    Dict containing the reservation of each resource in each flavor.
    Dict format: {cq_name: {'flavor:resource': reservation}}
  """
  return {
      cq_name: {
          f'{flavor_name}:{resource["name"]}': resource['total']
          for flavor_name, resources in flavors_res.items()
          for resource in resources
      }
  }
156+
157+
158+
def get_flavors_usage(
    q_entry: dict, res_field: str, flavor_resource_quotas: dict
) -> dict[str, str]:
  """Parse q_entry to retrieve the usage of each resource in each flavor.

  Args:
    q_entry: single LocalQueue or ClusterQueue entry structured as json.
    res_field: name of the status field holding the flavor reservations.
    flavor_resource_quotas: nominalQuota of flavor resources for each
      clusterqueue, in the format {cq_name: {'flavor:resource': quota}}.
  Returns:
    Dict where:
      - key is flavorName:resourceName
      - value is the string flavorResourceReservation/flavorResourceQuota
    The dict is empty when the clusterqueue has no entry in
    flavor_resource_quotas.
  """
  # A missing or null reservation field is treated as "nothing reserved".
  flavors_res = {
      flavor['name']: flavor['resources']
      for flavor in q_entry['status'].get(res_field) or []
  }

  # A ClusterQueue is looked up under its own name, any other kind under the
  # ClusterQueue it points to.
  if q_entry['kind'] == 'ClusterQueue':
    cq_name = q_entry['metadata']['name']
  else:
    cq_name = q_entry['spec']['clusterQueue']

  cq_reservations = get_flavors_resources_reservations(cq_name, flavors_res)[
      cq_name
  ]
  # A queue may reference a clusterqueue that was not listed; report no
  # resources for it instead of raising KeyError.
  cq_nominal_quotas = flavor_resource_quotas.get(cq_name, {})

  # A resource with a quota but no reservation entry counts as 0 reserved.
  return {
      flavor_resource: f'{cq_reservations.get(flavor_resource, 0)}/{quota}'
      for flavor_resource, quota in cq_nominal_quotas.items()
  }
193+
194+
195+
def run_kueuectl_list_localqueue(args: Namespace) -> str:
  """Run the kueuectl list localqueue command.

  Args:
    args: user provided arguments for running the command. When
      args.namespace is set to a non-empty value it is passed on as
      --namespace.

  Returns:
    kueuectl list localqueue formatted as json string.
  """
  command = 'kubectl kueue list localqueue -o json'
  # Truthiness check so that both '' and None leave the flag off; a None
  # namespace would otherwise be rendered as "--namespace None".
  if args.namespace:
    command += f' --namespace {args.namespace}'
  return_code, val = run_command_for_value(command, 'list localqueue', args)

  if return_code != 0:
    xpk_print(f'Cluster info request returned ERROR {return_code}')
    xpk_exit(return_code)
  return val
213+
214+
215+
def run_kueuectl_list_clusterqueue(args: Namespace) -> str:
  """List Kueue clusterqueues through kueuectl.

  Args:
    args: user provided arguments for running the command.

  Returns:
    Output of `kubectl kueue list clusterqueue -o json` as a string. On a
    non-zero return code an error is printed and xpk_exit is called with
    that code.
  """
  exit_status, output = run_command_for_value(
      'kubectl kueue list clusterqueue -o json', 'list clusterqueue', args
  )
  if exit_status == 0:
    return output

  xpk_print(f'Cluster info request returned ERROR {exit_status}')
  xpk_exit(exit_status)
  return output

src/xpk/core/kueue.py

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,9 @@
1414
limitations under the License.
1515
"""
1616

17-
from ..utils import write_tmp_file, xpk_print
18-
from .commands import run_command_with_updates_retry
17+
from argparse import Namespace
18+
from ..utils import write_tmp_file, xpk_print, xpk_exit
19+
from .commands import run_command_with_updates_retry, run_command_for_value
1920
from .core import (
2021
AutoprovisioningConfig,
2122
create_accelerator_label,
@@ -140,6 +141,31 @@
140141
"""
141142

142143

144+
def verify_kueuectl(args: Namespace) -> None:
  """Verify if kueuectl is installed.

  Runs `kubectl kueue version`. On a non-zero return code, prints a pointer
  to the installation instructions and calls xpk_exit with that code.

  Args:
    args: user provided arguments.
  Returns:
    None
  """
  xpk_print('Verifying kueuectl installation')

  command = 'kubectl kueue version'
  task = 'Verify kueuectl installation on cluster'
  return_code, _ = run_command_for_value(command, task, args)

  if return_code == 0:
    xpk_print('kueuectl found')
    return

  xpk_print(
      'kueuectl not found. Please follow'
      ' https://kueue.sigs.k8s.io/docs/reference/kubectl-kueue/installation/'
      ' to install kueuectl.'
  )
  xpk_exit(return_code)
167+
168+
143169
def install_kueue_on_cluster(args) -> int:
144170
"""Install Kueue on the cluster.
145171

src/xpk/parser/core.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from .cluster import set_cluster_parser
2121
from .inspector import set_inspector_parser
2222
from .workload import set_workload_parsers
23+
from .info import set_info_parser
2324

2425

2526
def set_parser(parser: argparse.ArgumentParser):
@@ -37,6 +38,9 @@ def set_parser(parser: argparse.ArgumentParser):
3738
"inspector",
3839
help="commands around investigating workload, and Kueue failures.",
3940
)
41+
info_parser = xpk_subcommands.add_parser(
42+
"info", help="commands around listing kueue clusterqueues and localqueues"
43+
)
4044

4145
def default_subcommand_function(
4246
_args,
@@ -53,12 +57,15 @@ def default_subcommand_function(
5357
parser.print_help()
5458
cluster_parser.print_help()
5559
workload_parser.print_help()
60+
info_parser.print_help()
5661
return 0
5762

5863
parser.set_defaults(func=default_subcommand_function)
5964
workload_parser.set_defaults(func=default_subcommand_function)
6065
cluster_parser.set_defaults(func=default_subcommand_function)
66+
info_parser.set_defaults(func=default_subcommand_function)
6167

6268
set_workload_parsers(workload_parser=workload_parser)
6369
set_cluster_parser(cluster_parser=cluster_parser)
6470
set_inspector_parser(inspector_parser=inspector_parser)
71+
set_info_parser(info_parser=info_parser)

0 commit comments

Comments
 (0)