@@ -172,9 +172,7 @@ async def _get_execution_ref(self, band: BandType):
172172
173173 return await mo .actor_ref (SubtaskExecutionActor .default_uid (), address = band [0 ])
174174
175- async def set_subtask_result (
176- self , result : SubtaskResult , band : BandType
177- ):
175+ async def set_subtask_result (self , result : SubtaskResult , band : BandType ):
178176 info = self ._subtask_infos [result .subtask_id ]
179177 subtask_id = info .subtask .subtask_id
180178 notify_task_service = True
@@ -346,28 +344,38 @@ async def batch_submit_subtask_to_band(self, args_list, kwargs_list):
346344 async def _submit_subtasks_to_band (self , band : BandType , subtask_ids : List [str ]):
347345 execution_ref = await self ._get_execution_ref (band )
348346 delays = []
347+ task_stage_count = defaultdict (lambda : 0 )
349348
350349 async with redirect_subtask_errors (
351350 self , self ._get_subtasks_by_ids (subtask_ids )
352351 ):
353352 for subtask_id in subtask_ids :
354353 subtask_info = self ._subtask_infos [subtask_id ]
355354 subtask = subtask_info .subtask
356- self ._submitted_subtask_count .record (
357- 1 ,
358- {
359- "session_id" : self ._session_id ,
360- "task_id" : subtask .task_id ,
361- "stage_id" : subtask .stage_id ,
362- },
363- )
364- logger .debug ("Start run subtask %s in band %s." , subtask_id , band )
355+ task_stage_count [(subtask .task_id , subtask .stage_id )] += 1
365356 delays .append (
366357 execution_ref .run_subtask .delay (subtask , band [1 ], self .address )
367358 )
368359 subtask_info .band_futures [band ] = asyncio .Future ()
369360 subtask_info .start_time = time .time ()
370361 self ._speculation_execution_scheduler .add_subtask (subtask_info )
362+
363+ for (task_id , stage_id ), cnt in task_stage_count .items ():
364+ self ._submitted_subtask_count .record (
365+ cnt ,
366+ {
367+ "session_id" : self ._session_id ,
368+ "task_id" : task_id ,
369+ "stage_id" : stage_id ,
370+ },
371+ )
372+
373+ logger .debug (
374+ "Start run %d subtasks %r in band %s." ,
375+ len (subtask_ids ),
376+ subtask_ids ,
377+ band ,
378+ )
371379 await execution_ref .run_subtask .batch (* delays , send = False )
372380
373381 async def cancel_subtasks (
0 commit comments