Skip to content

Commit e5a3593

Browse files
authored
Merge branch 'master' into sendToGitHub/fix-scheduling-reports-auth
2 parents 0a60f20 + 1d93b7a commit e5a3593

File tree

9 files changed

+122
-42
lines changed

9 files changed

+122
-42
lines changed

client/python/armada_client/asyncio_client.py

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ def __init__(
105105
event_timeout: timedelta = timedelta(minutes=15),
106106
) -> None:
107107
self.submit_stub = submit_pb2_grpc.SubmitStub(channel)
108+
self.queue_stub = submit_pb2_grpc.QueueServiceStub(channel)
108109
self.event_stub = event_pb2_grpc.EventStub(channel)
109110
self.job_stub = job_pb2_grpc.JobsStub(channel)
110111
self.event_timeout = event_timeout
@@ -390,7 +391,7 @@ async def create_queue(self, queue: submit_pb2.Queue) -> empty_pb2.Empty:
390391
:param queue: A queue to create.
391392
"""
392393

393-
response = await self.submit_stub.CreateQueue(queue)
394+
response = await self.queue_stub.CreateQueue(queue)
394395
return response
395396

396397
async def update_queue(self, queue: submit_pb2.Queue) -> empty_pb2.Empty:
@@ -400,7 +401,7 @@ async def update_queue(self, queue: submit_pb2.Queue) -> empty_pb2.Empty:
400401
:param queue: A queue to update.
401402
"""
402403

403-
response = await self.submit_stub.UpdateQueue(queue)
404+
response = await self.queue_stub.UpdateQueue(queue)
404405
return response
405406

406407
async def create_queues(
@@ -413,7 +414,7 @@ async def create_queues(
413414
"""
414415

415416
queue_list = submit_pb2.QueueList(queues=queues)
416-
response = await self.submit_stub.CreateQueues(queue_list)
417+
response = await self.queue_stub.CreateQueues(queue_list)
417418
return response
418419

419420
async def update_queues(
@@ -426,7 +427,7 @@ async def update_queues(
426427
"""
427428

428429
queue_list = submit_pb2.QueueList(queues=queues)
429-
response = await self.submit_stub.UpdateQueues(queue_list)
430+
response = await self.queue_stub.UpdateQueues(queue_list)
430431
return response
431432

432433
async def delete_queue(self, name: str) -> None:
@@ -438,7 +439,7 @@ async def delete_queue(self, name: str) -> None:
438439
:return: None
439440
"""
440441
request = submit_pb2.QueueDeleteRequest(name=name)
441-
await self.submit_stub.DeleteQueue(request)
442+
await self.queue_stub.DeleteQueue(request)
442443

443444
async def get_queue(self, name: str) -> submit_pb2.Queue:
444445
"""Get the queue by name.
@@ -449,9 +450,23 @@ async def get_queue(self, name: str) -> submit_pb2.Queue:
449450
:return: A queue object. See the api definition.
450451
"""
451452
request = submit_pb2.QueueGetRequest(name=name)
452-
response = await self.submit_stub.GetQueue(request)
453+
response = await self.queue_stub.GetQueue(request)
453454
return response
454455

456+
async def get_queues(self) -> list:
457+
"""Retrieves all queues
458+
:return: List containing all queues.
459+
"""
460+
queues = []
461+
request = submit_pb2.StreamingQueueGetRequest()
462+
async for message in self.queue_stub.GetQueues(request):
463+
event_type = message.WhichOneof("event")
464+
if event_type == "queue":
465+
queues.append(message.queue)
466+
elif event_type == "end":
467+
break
468+
return queues
469+
455470
@staticmethod
456471
def unwatch_events(event_stream) -> None:
457472
"""Closes gRPC event streams

client/python/armada_client/client.py

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,9 @@ class ArmadaClient:
103103
def __init__(self, channel, event_timeout: timedelta = timedelta(minutes=15)):
104104
self.submit_stub = submit_pb2_grpc.SubmitStub(channel)
105105
self.event_stub = event_pb2_grpc.EventStub(channel)
106-
self.event_timeout = event_timeout
107106
self.job_stub = job_pb2_grpc.JobsStub(channel)
107+
self.queue_stub = submit_pb2_grpc.QueueServiceStub(channel)
108+
self.event_timeout = event_timeout
108109

109110
def get_job_events_stream(
110111
self,
@@ -290,6 +291,7 @@ def cancel_jobset(
290291
) -> empty_pb2.Empty:
291292
"""Cancel jobs in a given queue.
292293
294+
293295
Uses the CancelJobSet RPC to cancel jobs.
294296
A filter is used to only cancel jobs in certain states.
295297
@@ -378,7 +380,7 @@ def create_queue(self, queue: submit_pb2.Queue) -> empty_pb2.Empty:
378380
:param queue: A queue to create.
379381
"""
380382

381-
response = self.submit_stub.CreateQueue(queue)
383+
response = self.queue_stub.CreateQueue(queue)
382384
return response
383385

384386
def update_queue(self, queue: submit_pb2.Queue) -> empty_pb2.Empty:
@@ -388,7 +390,7 @@ def update_queue(self, queue: submit_pb2.Queue) -> empty_pb2.Empty:
388390
:param queue: A queue to update.
389391
"""
390392

391-
response = self.submit_stub.UpdateQueue(queue)
393+
response = self.queue_stub.UpdateQueue(queue)
392394
return response
393395

394396
def create_queues(
@@ -401,7 +403,7 @@ def create_queues(
401403
"""
402404

403405
queue_list = submit_pb2.QueueList(queues=queues)
404-
response = self.submit_stub.CreateQueues(queue_list)
406+
response = self.queue_stub.CreateQueues(queue_list)
405407
return response
406408

407409
def update_queues(
@@ -414,7 +416,7 @@ def update_queues(
414416
"""
415417

416418
queue_list = submit_pb2.QueueList(queues=queues)
417-
response = self.submit_stub.UpdateQueues(queue_list)
419+
response = self.queue_stub.UpdateQueues(queue_list)
418420
return response
419421

420422
def delete_queue(self, name: str) -> None:
@@ -426,7 +428,7 @@ def delete_queue(self, name: str) -> None:
426428
:return: None
427429
"""
428430
request = submit_pb2.QueueDeleteRequest(name=name)
429-
self.submit_stub.DeleteQueue(request)
431+
self.queue_stub.DeleteQueue(request)
430432

431433
def get_queue(self, name: str) -> submit_pb2.Queue:
432434
"""Get the queue by name.
@@ -437,9 +439,27 @@ def get_queue(self, name: str) -> submit_pb2.Queue:
437439
:return: A queue object. See the api definition.
438440
"""
439441
request = submit_pb2.QueueGetRequest(name=name)
440-
response = self.submit_stub.GetQueue(request)
442+
response = self.queue_stub.GetQueue(request)
441443
return response
442444

445+
def get_queues(self) -> List[submit_pb2.Queue]:
446+
"""Get all queues.
447+
448+
Uses the GetQueues RPC to get the queues.
449+
450+
:return: list containing all queues
451+
"""
452+
queues = []
453+
454+
request = submit_pb2.StreamingQueueGetRequest()
455+
456+
for message in self.queue_stub.GetQueues(request):
457+
if message.HasField("queue"):
458+
queues.append(message.queue)
459+
elif message.HasField("end"):
460+
break
461+
return queues
462+
443463
@staticmethod
444464
def unwatch_events(event_stream) -> None:
445465
"""Closes gRPC event streams

client/python/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "armada_client"
3-
version = "0.4.10"
3+
version = "0.4.11"
44
description = "Armada gRPC API python client"
55
readme = "README.md"
66
requires-python = ">=3.9"

client/python/tests/unit/server_mock.py

Lines changed: 35 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
from armada_client.armada.submit_pb2 import JobState
1616

1717

18-
class SubmitService(submit_pb2_grpc.SubmitServicer):
18+
class QueueService(submit_pb2_grpc.QueueServiceServicer):
1919
def CreateQueue(self, request, context):
2020
return empty_pb2.Empty()
2121

@@ -25,6 +25,40 @@ def DeleteQueue(self, request, context):
2525
def GetQueue(self, request, context):
2626
return submit_pb2.Queue(name=request.name)
2727

28+
def GetQueues(self, request, context):
29+
queue_names = ["test_queue1", "test_queue2", "test_queue3"]
30+
for name in queue_names:
31+
queue_message = submit_pb2.StreamingQueueMessage(
32+
queue=submit_pb2.Queue(name=name)
33+
)
34+
yield queue_message
35+
36+
yield submit_pb2.StreamingQueueMessage(end=submit_pb2.EndMarker())
37+
38+
def GetQueueInfo(self, request, context):
39+
return submit_pb2.QueueInfo(name=request.name)
40+
41+
def CreateQueues(self, request, context):
42+
return submit_pb2.BatchQueueCreateResponse(
43+
failed_queues=[
44+
submit_pb2.QueueCreateResponse(queue=submit_pb2.Queue(name=queue.name))
45+
for queue in request.queues
46+
]
47+
)
48+
49+
def UpdateQueues(self, request, context):
50+
return submit_pb2.BatchQueueUpdateResponse(
51+
failed_queues=[
52+
submit_pb2.QueueUpdateResponse(queue=submit_pb2.Queue(name=queue.name))
53+
for queue in request.queues
54+
]
55+
)
56+
57+
def UpdateQueue(self, request, context):
58+
return empty_pb2.Empty()
59+
60+
61+
class SubmitService(submit_pb2_grpc.SubmitServicer):
2862
def SubmitJobs(self, request, context):
2963
# read job_ids from request.job_request_items
3064
job_ids = [f"job-{i}" for i in range(1, len(request.job_request_items) + 1)]
@@ -35,9 +69,6 @@ def SubmitJobs(self, request, context):
3569

3670
return submit_pb2.JobSubmitResponse(job_response_items=job_response_items)
3771

38-
def GetQueueInfo(self, request, context):
39-
return submit_pb2.QueueInfo(name=request.name)
40-
4172
def CancelJobs(self, request, context):
4273
return submit_pb2.CancellationResult(
4374
cancelled_ids=["job-1"],
@@ -72,25 +103,6 @@ def ReprioritizeJobs(self, request, context):
72103

73104
return submit_pb2.JobReprioritizeResponse(reprioritization_results=results)
74105

75-
def UpdateQueue(self, request, context):
76-
return empty_pb2.Empty()
77-
78-
def CreateQueues(self, request, context):
79-
return submit_pb2.BatchQueueCreateResponse(
80-
failed_queues=[
81-
submit_pb2.QueueCreateResponse(queue=submit_pb2.Queue(name=queue.name))
82-
for queue in request.queues
83-
]
84-
)
85-
86-
def UpdateQueues(self, request, context):
87-
return submit_pb2.BatchQueueUpdateResponse(
88-
failed_queues=[
89-
submit_pb2.QueueUpdateResponse(queue=submit_pb2.Queue(name=queue.name))
90-
for queue in request.queues
91-
]
92-
)
93-
94106
def Health(self, request, context):
95107
return health_pb2.HealthCheckResponse(
96108
status=health_pb2.HealthCheckResponse.SERVING

client/python/tests/unit/test_asyncio_client.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
from armada_client.typings import JobState
88
from armada_client.armada.job_pb2 import JobRunState
9-
from server_mock import EventService, SubmitService, QueryAPIService
9+
from server_mock import EventService, SubmitService, QueueService, QueryAPIService
1010

1111
from armada_client.armada import (
1212
event_pb2_grpc,
@@ -28,6 +28,7 @@
2828
def server_mock():
2929
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
3030
submit_pb2_grpc.add_SubmitServicer_to_server(SubmitService(), server)
31+
submit_pb2_grpc.add_QueueServiceServicer_to_server(QueueService(), server)
3132
event_pb2_grpc.add_EventServicer_to_server(EventService(), server)
3233
job_pb2_grpc.add_JobsServicer_to_server(QueryAPIService(), server)
3334
server.add_insecure_port("[::]:50051")
@@ -175,6 +176,13 @@ async def test_get_queue(aio_client):
175176
assert queue.name == "test"
176177

177178

179+
@pytest.mark.asyncio
180+
async def test_get_queues(aio_client):
181+
queues = await aio_client.get_queues()
182+
queue_names = [q.name for q in queues]
183+
assert queue_names == ["test_queue1", "test_queue2", "test_queue3"]
184+
185+
178186
@pytest.mark.asyncio
179187
async def test_delete_queue(aio_client):
180188
await aio_client.delete_queue("test")

client/python/tests/unit/test_client.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
from armada_client.typings import JobState
77
from armada_client.armada.job_pb2 import JobRunState
8-
from server_mock import EventService, SubmitService, QueryAPIService
8+
from server_mock import EventService, SubmitService, QueryAPIService, QueueService
99

1010
from armada_client.armada import (
1111
event_pb2_grpc,
@@ -27,6 +27,7 @@
2727
def server_mock():
2828
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
2929
submit_pb2_grpc.add_SubmitServicer_to_server(SubmitService(), server)
30+
submit_pb2_grpc.add_QueueServiceServicer_to_server(QueueService(), server)
3031
event_pb2_grpc.add_EventServicer_to_server(EventService(), server)
3132
job_pb2_grpc.add_JobsServicer_to_server(QueryAPIService(), server)
3233
server.add_insecure_port("[::]:50051")
@@ -165,6 +166,12 @@ def test_get_queue():
165166
assert tester.get_queue("test").name == "test"
166167

167168

169+
def test_get_queues():
170+
queues = tester.get_queues()
171+
queue_names = [q.name for q in queues]
172+
assert queue_names == ["test_queue1", "test_queue2", "test_queue3"]
173+
174+
168175
def test_delete_queue():
169176
tester.delete_queue("test")
170177

deployment/scheduler/templates/scheduler-pruner-secret.yaml renamed to deployment/scheduler/templates/scheduler-pruner-configmap.yaml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
apiVersion: v1
2-
kind: Secret
2+
kind: ConfigMap
33
metadata:
44
name: {{ include "armada-scheduler-pruner.config.name" . }}
55
namespace: {{ .Release.Namespace }}
66
labels:
77
{{- include "armada-scheduler-pruner.labels.all" . | nindent 4 }}
8-
type: Opaque
98
data:
109
{{ include "armada-scheduler-pruner.config.filename" . }}: |
1110
{{- if .Values.pruner.applicationConfig }}
12-
{{ toYaml .Values.pruner.applicationConfig | b64enc | indent 4 }}
11+
{{ toYaml .Values.pruner.applicationConfig | indent 4 }}
1312
{{- end }}

docs/python_armada_client.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -447,6 +447,24 @@ Uses the GetQueue RPC to get the queue.
447447

448448

449449

450+
#### get_queues()
451+
Get all queues.
452+
453+
Uses the GetQueues RPC to get the queues.
454+
455+
456+
* **Returns**
457+
458+
list containing all queues
459+
460+
461+
462+
* **Return type**
463+
464+
*List*[armada.submit_pb2.Queue]
465+
466+
467+
450468
#### preempt_jobs(queue, job_set_id, job_id)
451469
Preempt jobs in a given queue.
452470

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_runs_filtered ON runs (executor, succeeded, failed, cancelled, serial);

0 commit comments

Comments
 (0)