2424from ....lib .aio import alru_cache
2525from ....metrics import Metrics
2626from ....resource import ZeroResource
27+ from ....typing import BandType
2728from ....utils import dataslots
2829from ...subtask import Subtask
2930from ...task import TaskAPI
@@ -48,6 +49,7 @@ class SubtaskQueueingActor(mo.Actor):
4849 _stid_to_bands : DefaultDict [str , List [Tuple ]]
4950 _stid_to_items : Dict [str , HeapItem ]
5051 _band_queues : DefaultDict [Tuple , List [HeapItem ]]
52+ _submit_requests : List [Optional [Dict [BandType , int ]]]
5153
5254 @classmethod
5355 def gen_uid (cls , session_id : str ):
@@ -61,6 +63,10 @@ def __init__(self, session_id: str, submit_period: Union[float, int] = None):
6163 # so that we can ensure band queue is busy if the band queue is not empty.
6264 self ._band_queues = defaultdict (list )
6365
66+ self ._submit_requests = []
67+ self ._submit_request_event = asyncio .Event ()
68+ self ._submit_request_task = None
69+
6470 self ._cluster_api = None
6571 self ._slots_ref = None
6672 self ._assigner_ref = None
@@ -69,7 +75,6 @@ def __init__(self, session_id: str, submit_period: Union[float, int] = None):
6975 self ._band_watch_task = None
7076 self ._max_enqueue_id = 0
7177
72- self ._periodical_submit_task = None
7378 self ._submit_period = submit_period or _DEFAULT_SUBMIT_PERIOD
7479 self ._submitted_subtask_number = Metrics .gauge (
7580 "mars.band.submitted_subtask_number" ,
@@ -133,23 +138,13 @@ async def watch_bands():
133138 AssignerActor .gen_uid (self ._session_id ), address = self .address
134139 )
135140
136- if self ._submit_period > 0 :
137- self ._periodical_submit_task = self .ref ().periodical_submit .tell_delay (
138- delay = self ._submit_period
139- )
141+ self ._submit_request_task = asyncio .create_task (self ._submission_task_func ())
140142
async def __pre_destroy__(self):
    """Cancel the actor's background tasks before the actor is destroyed.

    Both task handles are initialised to ``None`` in ``__init__`` and only
    become real tasks later, so each one is checked before it is cancelled.
    Destroying an actor whose start-up did not complete therefore does not
    raise ``AttributeError``.
    """
    if self._band_watch_task is not None:
        self._band_watch_task.cancel()
    if self._submit_request_task is not None:  # pragma: no branch
        self._submit_request_task.cancel()
151147
152- @alru_cache
async def _get_task_api(self):
    """Create a ``TaskAPI`` bound to this actor's session id and address."""
    task_api = await TaskAPI.create(self._session_id, self.address)
    return task_api
155150
@@ -180,114 +175,171 @@ async def add_subtasks(
180175 self ._max_enqueue_id += 1
181176 heapq .heappush (self ._band_queues [band ], heap_item )
182177 logger .debug (
183- "Subtask %s enqueued to band %s excluded from %s." ,
178+ "Subtask %s enqueued to band %s. exclude_bands= %s." ,
184179 subtask .subtask_id ,
185180 band ,
186181 exclude_bands ,
187182 )
188183 logger .debug ("%d subtasks enqueued" , len (subtasks ))
189184
190- async def submit_subtasks (self , band : Tuple = None , limit : Optional [int ] = None ):
191- logger .debug ("Submitting subtasks with limit %s" , limit )
def submit_subtasks(self, band_to_limit: Dict[BandType, int] = None):
    """Queue a submission request and wake up the background submission loop.

    The argument is appended to ``self._submit_requests`` exactly as given
    (including the default ``None``), then ``self._submit_request_event`` is
    set so the waiting loop picks the request up.
    """
    pending_requests = self._submit_requests
    pending_requests.append(band_to_limit)

    wake_up = self._submit_request_event
    wake_up.set()
188+
async def _submission_task_func(self):
    """Run the background loop that drains and serves submission requests.

    The loop waits until ``self._submit_request_event`` is set or, when
    ``self._submit_period`` is truthy, until that many seconds have elapsed.
    All requests accumulated in ``self._submit_requests`` are then merged
    into a single ``band -> limit`` mapping and handed to
    ``_submit_subtask_request``.  A ``None`` request makes the merged value
    ``None``; for the same band, a later request overrides an earlier one.

    The loop ends only on cancellation.  Any other exception raised while
    serving a round is logged and the loop keeps running, because a dead
    loop would silently stop every later submission.
    """
    while True:
        try:
            periodical_triggered = False
            if not self._submit_requests:  # pragma: no branch
                try:
                    if self._submit_period:
                        await asyncio.wait_for(
                            self._submit_request_event.wait(), self._submit_period
                        )
                    else:
                        await self._submit_request_event.wait()
                except asyncio.TimeoutError:
                    periodical_triggered = True

            # Reset the flag and take the pending requests without awaiting
            # in between: no request can slip in unnoticed, and a flag that
            # was set while the previous round was running does not cause a
            # spurious wake-up later.
            self._submit_request_event.clear()
            requests = self._submit_requests
            self._submit_requests = []
            if not periodical_triggered and not requests:  # pragma: no cover
                continue

            merged_band_to_limit = dict()
            for req in requests:
                if req is None:
                    # one unrestricted request overrides all per-band limits
                    merged_band_to_limit = None
                    break
                merged_band_to_limit.update(req)
            await self._submit_subtask_request(merged_band_to_limit)
        except asyncio.CancelledError:
            break
        except Exception:  # pylint: disable=broad-except
            # Keep serving later requests even if this round failed.
            logger.exception("Failed to process subtask submission requests")
220+
async def _submit_subtask_request(
    self, band_to_limit: Optional[Dict[BandType, int]] = None
):
    """Pick queued subtasks per band, apply for resources and submit them.

    For every requested band, up to ``limit`` valid items are popped from the
    band's heap.  Resources for the picked subtasks are applied for through
    ``self._slots_ref`` in one batch.  Subtasks that were granted resources
    are submitted to the manager in one batch and removed from the queue;
    the others are pushed back onto the band's heap.

    ``band_to_limit`` maps a band to the maximum number of subtasks to pick.
    A falsy limit falls back to the band's ``num_cpus`` (or ``num_gpus`` when
    that is falsy as well).  ``None`` or an empty mapping selects every band
    in ``self._band_to_resource`` with its fallback limit.
    """
    if band_to_limit:
        logger.debug(
            "TMP_QUEUE_PROBE: Submitting subtasks with limits: %r", band_to_limit
        )

    # Refresh the band table when it is empty, or when a requested band needs
    # its fallback limit but is not known yet.  ``.items()`` matters here:
    # iterating the mapping itself yields only the band keys, so ``limit``
    # would never be the real limit.
    if not self._band_to_resource or any(
        not limit and band not in self._band_to_resource
        for band, limit in (band_to_limit or {}).items()
    ):
        self._band_to_resource = await self._cluster_api.get_all_bands()

    if not band_to_limit:
        band_to_limit = {band: None for band in self._band_to_resource.keys()}

    apply_delays = []
    submit_items_list = []
    submitted_bands = []

    # The queue manipulation below deliberately runs on the event loop rather
    # than in a worker thread: the heaps are also mutated by other coroutines
    # of this actor, and touching them from a second thread would race.
    for band, limit in band_to_limit.items():
        band_limit = limit or (
            self._band_to_resource[band].num_cpus
            or self._band_to_resource[band].num_gpus
        )
        task_queue = self._band_queues[band]
        submit_items = dict()
        while (
            self._ensure_top_item_valid(task_queue)
            and len(submit_items) < band_limit
        ):
            item = heapq.heappop(task_queue)
            submit_items[item.subtask.subtask_id] = item

        subtask_ids = list(submit_items)
        if not subtask_ids:
            continue

        submitted_bands.append(band)
        submit_items_list.append(submit_items)

        # Before hbo, a manager that finished a subtask could always schedule
        # another one because a slot had become idle.  With memory
        # requirements, applying for resources from the supervisor may fail,
        # and such subtasks would then never get scheduled.
        # TODO: the periodical submission could be used to retry them.
        subtask_resources = [
            item.subtask.required_resource for item in submit_items.values()
        ]
        apply_delays.append(
            self._slots_ref.apply_subtask_resources.delay(
                band, self._session_id, subtask_ids, subtask_resources
            )
        )

    logger.debug("TMP_QUEUE_PROBE: Finished picking top subtasks")

    async with redirect_subtask_errors(
        self,
        (
            item.subtask
            for submit_items in submit_items_list
            for item in submit_items.values()
        ),
    ):
        submitted_ids_list = await self._slots_ref.apply_subtask_resources.batch(
            *apply_delays
        )

    logger.debug(
        "TMP_QUEUE_PROBE: Finished band resource allocation, %d subtasks submitted",
        sum(len(ids) for ids in submitted_ids_list),
    )

    manager_ref = await self._get_manager_ref()
    submit_delays = []

    for band, submit_items, submitted_ids in zip(
        submitted_bands, submit_items_list, submitted_ids_list
    ):
        subtask_ids = list(submit_items)
        task_queue = self._band_queues[band]
        # set membership keeps the checks below O(1) per subtask
        submitted_id_set = set(submitted_ids)

        non_submitted_ids = [k for k in submit_items if k not in submitted_id_set]
        tags = {
            "session_id": self._session_id,
            "band": band[0] if band else "",
        }
        self._submitted_subtask_number.record(len(submitted_ids), tags)
        self._unsubmitted_subtask_number.record(len(non_submitted_ids), tags)

        if not submitted_ids:
            if non_submitted_ids:
                logger.debug("No slots available on band %s", band)
        else:
            for stid in subtask_ids:
                if stid not in submitted_id_set:
                    continue
                item = submit_items[stid]
                logger.debug("Submit subtask %r to band %r", item.subtask, band)
                submit_delays.append(
                    manager_ref.submit_subtask_to_band.delay(
                        item.subtask.subtask_id, band
                    )
                )
                self.remove_queued_subtasks([item.subtask.subtask_id])

        for stid in non_submitted_ids:
            # TODO: if a subtask could not be submitted for lack of
            # memory/cpu/gpu resources, lower its priority so that other
            # subtasks can be submitted.
            heapq.heappush(task_queue, submit_items[stid])

    logger.debug("TMP_QUEUE_PROBE: Start subtask submission in batch")
    await manager_ref.submit_subtask_to_band.batch(*submit_delays)
    logger.debug("TMP_QUEUE_PROBE: Finished subtask submission")
291343
292344 def _ensure_top_item_valid (self , task_queue ):
293345 """Clean invalid subtask item from the queue to ensure that when the queue is not empty,
0 commit comments