Skip to content

Commit 5f0d149

Browse files
committed
Merge branch 'master' of github.com:armadaproject/armada into f/chrisma/add-logfile-config
2 parents c4119d5 + fcee91c commit 5f0d149

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+1565
-121
lines changed

client/python/armada_client/asyncio_client.py

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ def __init__(
105105
event_timeout: timedelta = timedelta(minutes=15),
106106
) -> None:
107107
self.submit_stub = submit_pb2_grpc.SubmitStub(channel)
108+
self.queue_stub = submit_pb2_grpc.QueueServiceStub(channel)
108109
self.event_stub = event_pb2_grpc.EventStub(channel)
109110
self.job_stub = job_pb2_grpc.JobsStub(channel)
110111
self.event_timeout = event_timeout
@@ -390,7 +391,7 @@ async def create_queue(self, queue: submit_pb2.Queue) -> empty_pb2.Empty:
390391
:param queue: A queue to create.
391392
"""
392393

393-
response = await self.submit_stub.CreateQueue(queue)
394+
response = await self.queue_stub.CreateQueue(queue)
394395
return response
395396

396397
async def update_queue(self, queue: submit_pb2.Queue) -> empty_pb2.Empty:
@@ -400,7 +401,7 @@ async def update_queue(self, queue: submit_pb2.Queue) -> empty_pb2.Empty:
400401
:param queue: A queue to update.
401402
"""
402403

403-
response = await self.submit_stub.UpdateQueue(queue)
404+
response = await self.queue_stub.UpdateQueue(queue)
404405
return response
405406

406407
async def create_queues(
@@ -413,7 +414,7 @@ async def create_queues(
413414
"""
414415

415416
queue_list = submit_pb2.QueueList(queues=queues)
416-
response = await self.submit_stub.CreateQueues(queue_list)
417+
response = await self.queue_stub.CreateQueues(queue_list)
417418
return response
418419

419420
async def update_queues(
@@ -426,7 +427,7 @@ async def update_queues(
426427
"""
427428

428429
queue_list = submit_pb2.QueueList(queues=queues)
429-
response = await self.submit_stub.UpdateQueues(queue_list)
430+
response = await self.queue_stub.UpdateQueues(queue_list)
430431
return response
431432

432433
async def delete_queue(self, name: str) -> None:
@@ -438,7 +439,7 @@ async def delete_queue(self, name: str) -> None:
438439
:return: None
439440
"""
440441
request = submit_pb2.QueueDeleteRequest(name=name)
441-
await self.submit_stub.DeleteQueue(request)
442+
await self.queue_stub.DeleteQueue(request)
442443

443444
async def get_queue(self, name: str) -> submit_pb2.Queue:
444445
"""Get the queue by name.
@@ -449,9 +450,23 @@ async def get_queue(self, name: str) -> submit_pb2.Queue:
449450
:return: A queue object. See the api definition.
450451
"""
451452
request = submit_pb2.QueueGetRequest(name=name)
452-
response = await self.submit_stub.GetQueue(request)
453+
response = await self.queue_stub.GetQueue(request)
453454
return response
454455

456+
async def get_queues(self) -> list:
457+
"""Retrieves all queues
458+
:return: List containing all queues.
459+
"""
460+
queues = []
461+
request = submit_pb2.StreamingQueueGetRequest()
462+
async for message in self.queue_stub.GetQueues(request):
463+
event_type = message.WhichOneof("event")
464+
if event_type == "queue":
465+
queues.append(message.queue)
466+
elif event_type == "end":
467+
break
468+
return queues
469+
455470
@staticmethod
456471
def unwatch_events(event_stream) -> None:
457472
"""Closes gRPC event streams

client/python/armada_client/client.py

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,9 @@ class ArmadaClient:
103103
def __init__(self, channel, event_timeout: timedelta = timedelta(minutes=15)):
104104
self.submit_stub = submit_pb2_grpc.SubmitStub(channel)
105105
self.event_stub = event_pb2_grpc.EventStub(channel)
106-
self.event_timeout = event_timeout
107106
self.job_stub = job_pb2_grpc.JobsStub(channel)
107+
self.queue_stub = submit_pb2_grpc.QueueServiceStub(channel)
108+
self.event_timeout = event_timeout
108109

109110
def get_job_events_stream(
110111
self,
@@ -290,6 +291,7 @@ def cancel_jobset(
290291
) -> empty_pb2.Empty:
291292
"""Cancel jobs in a given queue.
292293
294+
293295
Uses the CancelJobSet RPC to cancel jobs.
294296
A filter is used to only cancel jobs in certain states.
295297
@@ -378,7 +380,7 @@ def create_queue(self, queue: submit_pb2.Queue) -> empty_pb2.Empty:
378380
:param queue: A queue to create.
379381
"""
380382

381-
response = self.submit_stub.CreateQueue(queue)
383+
response = self.queue_stub.CreateQueue(queue)
382384
return response
383385

384386
def update_queue(self, queue: submit_pb2.Queue) -> empty_pb2.Empty:
@@ -388,7 +390,7 @@ def update_queue(self, queue: submit_pb2.Queue) -> empty_pb2.Empty:
388390
:param queue: A queue to update.
389391
"""
390392

391-
response = self.submit_stub.UpdateQueue(queue)
393+
response = self.queue_stub.UpdateQueue(queue)
392394
return response
393395

394396
def create_queues(
@@ -401,7 +403,7 @@ def create_queues(
401403
"""
402404

403405
queue_list = submit_pb2.QueueList(queues=queues)
404-
response = self.submit_stub.CreateQueues(queue_list)
406+
response = self.queue_stub.CreateQueues(queue_list)
405407
return response
406408

407409
def update_queues(
@@ -414,7 +416,7 @@ def update_queues(
414416
"""
415417

416418
queue_list = submit_pb2.QueueList(queues=queues)
417-
response = self.submit_stub.UpdateQueues(queue_list)
419+
response = self.queue_stub.UpdateQueues(queue_list)
418420
return response
419421

420422
def delete_queue(self, name: str) -> None:
@@ -426,7 +428,7 @@ def delete_queue(self, name: str) -> None:
426428
:return: None
427429
"""
428430
request = submit_pb2.QueueDeleteRequest(name=name)
429-
self.submit_stub.DeleteQueue(request)
431+
self.queue_stub.DeleteQueue(request)
430432

431433
def get_queue(self, name: str) -> submit_pb2.Queue:
432434
"""Get the queue by name.
@@ -437,9 +439,27 @@ def get_queue(self, name: str) -> submit_pb2.Queue:
437439
:return: A queue object. See the api definition.
438440
"""
439441
request = submit_pb2.QueueGetRequest(name=name)
440-
response = self.submit_stub.GetQueue(request)
442+
response = self.queue_stub.GetQueue(request)
441443
return response
442444

445+
def get_queues(self) -> List[submit_pb2.Queue]:
446+
"""Get all queues.
447+
448+
Uses the GetQueues RPC to get the queues.
449+
450+
:return: list containing all queues
451+
"""
452+
queues = []
453+
454+
request = submit_pb2.StreamingQueueGetRequest()
455+
456+
for message in self.queue_stub.GetQueues(request):
457+
if message.HasField("queue"):
458+
queues.append(message.queue)
459+
elif message.HasField("end"):
460+
break
461+
return queues
462+
443463
@staticmethod
444464
def unwatch_events(event_stream) -> None:
445465
"""Closes gRPC event streams

client/python/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "armada_client"
3-
version = "0.4.10"
3+
version = "0.4.11"
44
description = "Armada gRPC API python client"
55
readme = "README.md"
66
requires-python = ">=3.9"

client/python/tests/unit/server_mock.py

Lines changed: 35 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
from armada_client.armada.submit_pb2 import JobState
1616

1717

18-
class SubmitService(submit_pb2_grpc.SubmitServicer):
18+
class QueueService(submit_pb2_grpc.QueueServiceServicer):
1919
def CreateQueue(self, request, context):
2020
return empty_pb2.Empty()
2121

@@ -25,6 +25,40 @@ def DeleteQueue(self, request, context):
2525
def GetQueue(self, request, context):
2626
return submit_pb2.Queue(name=request.name)
2727

28+
def GetQueues(self, request, context):
29+
queue_names = ["test_queue1", "test_queue2", "test_queue3"]
30+
for name in queue_names:
31+
queue_message = submit_pb2.StreamingQueueMessage(
32+
queue=submit_pb2.Queue(name=name)
33+
)
34+
yield queue_message
35+
36+
yield submit_pb2.StreamingQueueMessage(end=submit_pb2.EndMarker())
37+
38+
def GetQueueInfo(self, request, context):
39+
return submit_pb2.QueueInfo(name=request.name)
40+
41+
def CreateQueues(self, request, context):
42+
return submit_pb2.BatchQueueCreateResponse(
43+
failed_queues=[
44+
submit_pb2.QueueCreateResponse(queue=submit_pb2.Queue(name=queue.name))
45+
for queue in request.queues
46+
]
47+
)
48+
49+
def UpdateQueues(self, request, context):
50+
return submit_pb2.BatchQueueUpdateResponse(
51+
failed_queues=[
52+
submit_pb2.QueueUpdateResponse(queue=submit_pb2.Queue(name=queue.name))
53+
for queue in request.queues
54+
]
55+
)
56+
57+
def UpdateQueue(self, request, context):
58+
return empty_pb2.Empty()
59+
60+
61+
class SubmitService(submit_pb2_grpc.SubmitServicer):
2862
def SubmitJobs(self, request, context):
2963
# read job_ids from request.job_request_items
3064
job_ids = [f"job-{i}" for i in range(1, len(request.job_request_items) + 1)]
@@ -35,9 +69,6 @@ def SubmitJobs(self, request, context):
3569

3670
return submit_pb2.JobSubmitResponse(job_response_items=job_response_items)
3771

38-
def GetQueueInfo(self, request, context):
39-
return submit_pb2.QueueInfo(name=request.name)
40-
4172
def CancelJobs(self, request, context):
4273
return submit_pb2.CancellationResult(
4374
cancelled_ids=["job-1"],
@@ -72,25 +103,6 @@ def ReprioritizeJobs(self, request, context):
72103

73104
return submit_pb2.JobReprioritizeResponse(reprioritization_results=results)
74105

75-
def UpdateQueue(self, request, context):
76-
return empty_pb2.Empty()
77-
78-
def CreateQueues(self, request, context):
79-
return submit_pb2.BatchQueueCreateResponse(
80-
failed_queues=[
81-
submit_pb2.QueueCreateResponse(queue=submit_pb2.Queue(name=queue.name))
82-
for queue in request.queues
83-
]
84-
)
85-
86-
def UpdateQueues(self, request, context):
87-
return submit_pb2.BatchQueueUpdateResponse(
88-
failed_queues=[
89-
submit_pb2.QueueUpdateResponse(queue=submit_pb2.Queue(name=queue.name))
90-
for queue in request.queues
91-
]
92-
)
93-
94106
def Health(self, request, context):
95107
return health_pb2.HealthCheckResponse(
96108
status=health_pb2.HealthCheckResponse.SERVING

client/python/tests/unit/test_asyncio_client.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
from armada_client.typings import JobState
88
from armada_client.armada.job_pb2 import JobRunState
9-
from server_mock import EventService, SubmitService, QueryAPIService
9+
from server_mock import EventService, SubmitService, QueueService, QueryAPIService
1010

1111
from armada_client.armada import (
1212
event_pb2_grpc,
@@ -28,6 +28,7 @@
2828
def server_mock():
2929
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
3030
submit_pb2_grpc.add_SubmitServicer_to_server(SubmitService(), server)
31+
submit_pb2_grpc.add_QueueServiceServicer_to_server(QueueService(), server)
3132
event_pb2_grpc.add_EventServicer_to_server(EventService(), server)
3233
job_pb2_grpc.add_JobsServicer_to_server(QueryAPIService(), server)
3334
server.add_insecure_port("[::]:50051")
@@ -175,6 +176,13 @@ async def test_get_queue(aio_client):
175176
assert queue.name == "test"
176177

177178

179+
@pytest.mark.asyncio
180+
async def test_get_queues(aio_client):
181+
queues = await aio_client.get_queues()
182+
queue_names = [q.name for q in queues]
183+
assert queue_names == ["test_queue1", "test_queue2", "test_queue3"]
184+
185+
178186
@pytest.mark.asyncio
179187
async def test_delete_queue(aio_client):
180188
await aio_client.delete_queue("test")

client/python/tests/unit/test_client.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
from armada_client.typings import JobState
77
from armada_client.armada.job_pb2 import JobRunState
8-
from server_mock import EventService, SubmitService, QueryAPIService
8+
from server_mock import EventService, SubmitService, QueryAPIService, QueueService
99

1010
from armada_client.armada import (
1111
event_pb2_grpc,
@@ -27,6 +27,7 @@
2727
def server_mock():
2828
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
2929
submit_pb2_grpc.add_SubmitServicer_to_server(SubmitService(), server)
30+
submit_pb2_grpc.add_QueueServiceServicer_to_server(QueueService(), server)
3031
event_pb2_grpc.add_EventServicer_to_server(EventService(), server)
3132
job_pb2_grpc.add_JobsServicer_to_server(QueryAPIService(), server)
3233
server.add_insecure_port("[::]:50051")
@@ -165,6 +166,12 @@ def test_get_queue():
165166
assert tester.get_queue("test").name == "test"
166167

167168

169+
def test_get_queues():
170+
queues = tester.get_queues()
171+
queue_names = [q.name for q in queues]
172+
assert queue_names == ["test_queue1", "test_queue2", "test_queue3"]
173+
174+
168175
def test_delete_queue():
169176
tester.delete_queue("test")
170177

config/scheduler/config.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ pulsar:
2727
armadaApi:
2828
armadaUrl: "server:50051"
2929
forceNoTls: true
30+
priorityMultiplier:
31+
enabled: false
3032
postgres:
3133
connection:
3234
host: postgres
@@ -118,4 +120,3 @@ scheduling:
118120
experimentalIndicativePricing:
119121
basePrice: 100.0
120122
basePriority: 500.0
121-

docs/floating_resources.md

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Floating Resources
2+
3+
Floating resources are designed to constrain the usage of resources that are not tied to nodes. For example, if you have a fileserver outside your Kubernetes clusters, you may want to limit how many connections to the fileserver can exist at once. In that case you would add config like the below (this goes under the `scheduling` section of the Armada scheduler config).
4+
5+
```
6+
floatingResources:
7+
- name: fileserver-connections
8+
resolution: "1"
9+
pools:
10+
- name: cpu
11+
quantity: 1000
12+
- name: gpu
13+
quantity: 500
14+
```
15+
When submitting a job, floating resources are specified in the same way as normal Kubernetes resources such as `cpu`. For example if a job needs 3 cpu cores and opens 10 connections to the fileserver, the job should specify
16+
```
17+
resources:
18+
requests:
19+
cpu: "3"
20+
fileserver-connections: "10"
21+
limits:
22+
cpu: "3"
23+
fileserver-connections: "10"
24+
```
25+
The `requests` section is used for scheduling. For floating resources, the `limits` section is not enforced by Armada (this it not possible in the general case). Instead the workload must be trusted to respect its limit.
26+
27+
If the jobs submitted to Armada request more of a floating resource than is available, they queue just as if they had exceeded the amount available of a standard Kubernetes resource (e.g. `cpu`). Floating resources generally behave like standard Kubernetes resources. They use the same code for queue ordering, pre-emption, etc.

docs/python_armada_client.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -447,6 +447,24 @@ Uses the GetQueue RPC to get the queue.
447447

448448

449449

450+
#### get_queues()
451+
Get all queues.
452+
453+
Uses the GetQueues RPC to get the queues.
454+
455+
456+
* **Returns**
457+
458+
list containing all queues
459+
460+
461+
462+
* **Return type**
463+
464+
*List*[armada.submit_pb2.Queue]
465+
466+
467+
450468
#### preempt_jobs(queue, job_set_id, job_id)
451469
Preempt jobs in a given queue.
452470

0 commit comments

Comments
 (0)