Skip to content

Commit fecfe6c

Browse files
authored
Use batched request to apply for slots (#2601)
1 parent 889ffb7 commit fecfe6c

File tree

4 files changed

+35
-5
lines changed

4 files changed

+35
-5
lines changed

mars/services/scheduling/supervisor/globalslot.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ async def __pre_destroy__(self):
6262
async def refresh_bands(self):
6363
self._band_total_slots = await self._cluster_api.get_all_bands()
6464

65+
@mo.extensible
6566
async def apply_subtask_slots(
6667
self,
6768
band: BandType,

mars/services/scheduling/supervisor/queueing.py

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,10 @@ async def submit_subtasks(self, band: Tuple = None, limit: Optional[int] = None)
169169
submit_aio_tasks = []
170170
manager_ref = await self._get_manager_ref()
171171

172+
apply_delays = []
173+
submit_items_list = []
174+
submitted_bands = []
175+
172176
for band in bands:
173177
band_limit = limit or self._band_slot_nums[band]
174178
task_queue = self._band_queues[band]
@@ -181,17 +185,40 @@ async def submit_subtasks(self, band: Tuple = None, limit: Optional[int] = None)
181185
subtask_ids = list(submit_items)
182186
if not subtask_ids:
183187
continue
188+
189+
submitted_bands.append(band)
190+
submit_items_list.append(submit_items)
191+
184192
# todo it is possible to provide slot data with more accuracy
185193
subtask_slots = [1] * len(subtask_ids)
186194

195+
apply_delays.append(
196+
self._slots_ref.apply_subtask_slots.delay(
197+
band, self._session_id, subtask_ids, subtask_slots
198+
)
199+
)
200+
201+
async with redirect_subtask_errors(
202+
self,
203+
[
204+
item.subtask
205+
for submit_items in submit_items_list
206+
for item in submit_items.values()
207+
],
208+
):
209+
submitted_ids_list = await self._slots_ref.apply_subtask_slots.batch(
210+
*apply_delays
211+
)
212+
213+
for band, submit_items, submitted_ids in zip(
214+
submitted_bands, submit_items_list, submitted_ids_list
215+
):
216+
subtask_ids = list(submit_items)
217+
task_queue = self._band_queues[band]
218+
187219
async with redirect_subtask_errors(
188220
self, [item.subtask for item in submit_items.values()]
189221
):
190-
submitted_ids = set(
191-
await self._slots_ref.apply_subtask_slots(
192-
band, self._session_id, subtask_ids, subtask_slots
193-
)
194-
)
195222
non_submitted_ids = [k for k in submit_items if k not in submitted_ids]
196223
if submitted_ids:
197224
for stid in subtask_ids:

mars/services/scheduling/supervisor/tests/test_queue_balance.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ async def create(cls, address: str, **kw):
9999

100100

101101
class MockSlotsActor(mo.Actor):
102+
@mo.extensible
102103
def apply_subtask_slots(
103104
self,
104105
band: Tuple,

mars/services/scheduling/supervisor/tests/test_queueing.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ def __init__(self):
3333
def set_capacity(self, capacity: int):
3434
self._capacity = capacity
3535

36+
@mo.extensible
3637
def apply_subtask_slots(
3738
self,
3839
band: Tuple,

0 commit comments

Comments
 (0)