From 163bdbf31beea5eeb28ce583516af57e71a439c0 Mon Sep 17 00:00:00 2001 From: Wenjun Si Date: Mon, 30 May 2022 17:34:23 +0800 Subject: [PATCH 1/6] FIX_BATCH --- mars/services/scheduling/api/oscar.py | 6 +- .../services/scheduling/supervisor/manager.py | 267 ++++++++++-------- .../scheduling/supervisor/queueing.py | 210 ++++++++------ .../supervisor/tests/test_assigner.py | 51 ++-- .../supervisor/tests/test_globalresource.py | 7 +- .../supervisor/tests/test_manager.py | 62 +++- .../supervisor/tests/test_queue_balance.py | 62 ++-- .../supervisor/tests/test_queueing.py | 13 +- .../services/scheduling/tests/test_service.py | 19 +- mars/services/scheduling/utils.py | 5 +- mars/services/scheduling/worker/execution.py | 35 ++- .../scheduling/worker/tests/test_execution.py | 47 ++- mars/services/task/api/web.py | 7 +- mars/services/task/execution/mars/stage.py | 6 +- 14 files changed, 490 insertions(+), 307 deletions(-) diff --git a/mars/services/scheduling/api/oscar.py b/mars/services/scheduling/api/oscar.py index c39e4f8f9f..523c295c23 100644 --- a/mars/services/scheduling/api/oscar.py +++ b/mars/services/scheduling/api/oscar.py @@ -16,7 +16,7 @@ from .... import oscar as mo from ....lib.aio import alru_cache -from ...subtask import Subtask +from ...subtask import Subtask, SubtaskResult from ..core import SubtaskScheduleSummary from .core import AbstractSchedulingAPI @@ -112,7 +112,7 @@ async def cancel_subtasks( async def finish_subtasks( self, - subtask_ids: List[str], + subtask_results: List[SubtaskResult], bands: List[Tuple] = None, schedule_next: bool = True, ): @@ -129,7 +129,7 @@ async def finish_subtasks( schedule_next whether to schedule succeeding subtasks """ - await self._manager_ref.finish_subtasks(subtask_ids, bands, schedule_next) + await self._manager_ref.finish_subtasks(subtask_results, bands, schedule_next) class MockSchedulingAPI(SchedulingAPI): diff --git a/mars/services/scheduling/supervisor/manager.py b/mars/services/scheduling/supervisor/manager.py index 8c24713bb0..2cdb52916a 100644 --- a/mars/services/scheduling/supervisor/manager.py +++ b/mars/services/scheduling/supervisor/manager.py @@ -46,6 +46,7 @@ class SubtaskScheduleInfo: band_futures: Dict[BandType, asyncio.Future] = field(default_factory=dict) start_time: int = -1 end_time: int = -1 + cancel_pending: bool = False max_reschedules: int = 0 num_reschedules: int = 0 num_speculative_concurrent_run: int = 0 @@ -81,8 +82,10 @@ def __init__( self._subtask_max_reschedules = subtask_max_reschedules self._subtask_cancel_timeout = subtask_cancel_timeout self._speculation_config = speculation_config or {} + self._queueing_ref = None self._global_resource_ref = None + self._submitted_subtask_count = Metrics.counter( "mars.scheduling.submitted_subtask_count", "The count of submitted subtasks to all bands.", @@ -93,13 +96,14 @@ def __init__( "The count of finished subtasks of all bands.", ("session_id", "task_id", "stage_id"), ) - self._canceled_subtask_count = Metrics.counter( + self._cancelled_subtask_count = Metrics.counter( "mars.scheduling.canceled_subtask_count", "The count of canceled subtasks of all bands.", ("session_id", "task_id", "stage_id"), ) + logger.info( - "Created SubtaskManager with subtask_max_reschedules %s, " + "Created SubtaskManagerActor with subtask_max_reschedules %s, " "speculation_config %s", self._subtask_max_reschedules, speculation_config, @@ -123,10 +127,17 @@ async def __post_create__(self): ) await self._speculation_execution_scheduler.start() + async def dump_running(): + while True: + if 
self._subtask_infos:
+                logger.warning("RUNNING: %r", list(self._subtask_infos))
+            await asyncio.sleep(5)
+
+        asyncio.create_task(dump_running())
+
     async def __pre_destroy__(self):
         await self._speculation_execution_scheduler.stop()
 
-    @alru_cache
     async def _get_task_api(self):
         return await TaskAPI.create(self._session_id, self.address)
 
@@ -171,18 +182,90 @@ async def _get_execution_ref(self, band: BandType):
         return await mo.actor_ref(SubtaskExecutionActor.default_uid(), address=band[0])
 
+    async def _handle_subtask_result(
+        self, info: SubtaskScheduleInfo, result: SubtaskResult, band: BandType
+    ):
+        subtask_id = info.subtask.subtask_id
+        async with redirect_subtask_errors(self, [info.subtask]):
+            try:
+                info.band_futures[band].set_result(result)
+                if result.error is not None:
+                    raise result.error.with_traceback(result.traceback)
+                logger.debug("Finished subtask %s with result %s.", subtask_id, result)
+            except (OSError, MarsError) as ex:
+                # TODO: We should handle ServerClosed Error.
+                if (
+                    info.subtask.retryable
+                    and info.num_reschedules < info.max_reschedules
+                ):
+                    logger.error(
+                        "Reschedule subtask %s due to %s",
+                        info.subtask.subtask_id,
+                        ex,
+                    )
+                    info.num_reschedules += 1
+                    await self._queueing_ref.add_subtasks(
+                        [info.subtask],
+                        [info.subtask.priority or tuple()],
+                        exclude_bands=set(info.band_futures.keys()),
+                    )
+                else:
+                    raise ex
+            except asyncio.CancelledError:
+                raise
+            except BaseException as ex:
+                if (
+                    info.subtask.retryable
+                    and info.num_reschedules < info.max_reschedules
+                ):
+                    logger.error(
+                        "Failed to reschedule subtask %s, "
+                        "num_reschedules: %s, max_reschedules: %s, unhandled exception: %s",
+                        info.subtask.subtask_id,
+                        info.num_reschedules,
+                        info.max_reschedules,
+                        ex,
+                    )
+                raise ex
+            finally:
+                # make sure slot is released before marking tasks as finished
+                await self._global_resource_ref.release_subtask_resource(
+                    band,
+                    info.subtask.session_id,
+                    info.subtask.subtask_id,
+                )
+                logger.debug(
+                    "Slot released for band %s after subtask %s",
+                    band,
+                    info.subtask.subtask_id,
+                )
+                # We should call submit_subtasks only after the resource is released.
+                # If submit_subtasks ran before release_subtask_resource,
+                # the rescheduled subtask might not be submitted because no
+                # resource is available, and Mars would hang.
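+                # Sketch of the required ordering, using the same refs as above:
+                #     await self._global_resource_ref.release_subtask_resource(band, ...)
+                #     await self._queueing_ref.submit_subtasks.tell()
+                # `.tell()` only enqueues the message, so resubmission does not
+                # block result handling here.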
+ if info.num_reschedules > 0: + await self._queueing_ref.submit_subtasks.tell() + async def finish_subtasks( self, - subtask_ids: List[str], + subtask_results: List[SubtaskResult], bands: List[BandType] = None, schedule_next: bool = True, ): + subtask_ids = [result.subtask_id for result in subtask_results] logger.debug("Finished subtasks %s.", subtask_ids) band_tasks = defaultdict(lambda: 0) bands = bands or [None] * len(subtask_ids) - for subtask_id, subtask_band in zip(subtask_ids, bands): + for result, subtask_band in zip(subtask_results, bands): + subtask_id = result.subtask_id subtask_info = self._subtask_infos.get(subtask_id, None) + if subtask_info is not None: + if subtask_band is not None: + logger.warning("BEFORE await self._handle_subtask_result(subtask_info, result, subtask_band)") + await self._handle_subtask_result(subtask_info, result, subtask_band) + logger.warning("AFTER await self._handle_subtask_result(subtask_info, result, subtask_band)") + self._finished_subtask_count.record( 1, { @@ -192,14 +275,16 @@ async def finish_subtasks( }, ) self._subtask_summaries[subtask_id] = subtask_info.to_summary( - is_finished=True + is_finished=True, is_cancelled=result.status == SubtaskStatus.cancelled ) subtask_info.end_time = time.time() self._speculation_execution_scheduler.finish_subtask(subtask_info) # Cancel subtask on other bands. aio_task = subtask_info.band_futures.pop(subtask_band, None) if aio_task: + logger.warning("BEFORE await aio_task") await aio_task + logger.warning("AFTER await aio_task") if schedule_next: band_tasks[subtask_band] += 1 if subtask_info.band_futures: @@ -219,15 +304,9 @@ async def finish_subtasks( if schedule_next: for band in subtask_info.band_futures.keys(): band_tasks[band] += 1 - await self._queueing_ref.remove_queued_subtasks(subtask_ids) + # await self._queueing_ref.remove_queued_subtasks(subtask_ids) if band_tasks: - tasks = [] - for band, subtask_count in band_tasks.items(): - task = asyncio.ensure_future( - self._queueing_ref.submit_subtasks.tell(band, subtask_count) - ) - tasks.append(task) - await asyncio.wait(tasks) + await self._queueing_ref.submit_subtasks.tell(dict(band_tasks)) def _get_subtasks_by_ids(self, subtask_ids: List[str]) -> List[Optional[Subtask]]: subtasks = [] @@ -238,106 +317,65 @@ def _get_subtasks_by_ids(self, subtask_ids: List[str]) -> List[Optional[Subtask] subtasks.append(None) return subtasks + @mo.extensible async def submit_subtask_to_band(self, subtask_id: str, band: BandType): - if subtask_id not in self._subtask_infos: # pragma: no cover - logger.info( - "Subtask %s is not in added subtasks set, it may be finished or canceled, skip it.", - subtask_id, - ) - return + raise NotImplementedError + + @submit_subtask_to_band.batch + async def batch_submit_subtask_to_band(self, args_list, kwargs_list): + band_to_subtask_ids = defaultdict(list) + res_release_delays = [] + for args, kwargs in zip(args_list, kwargs_list): + subtask_id, band = self.submit_subtask_to_band.bind(*args, **kwargs) + try: + info = self._subtask_infos[subtask_id] + if info.cancel_pending: + res_release_delays.append( + self._global_resource_ref.release_subtask_resource.delay( + band, info.subtask.session_id, info.subtask.subtask_id + ) + ) + continue + except KeyError: # pragma: no cover + logger.info( + "Subtask %s is not in added subtasks set, it may be finished or canceled, skip it.", + subtask_id, + ) + continue + band_to_subtask_ids[band].append(subtask_id) + + if res_release_delays: + await 
self._global_resource_ref.release_subtask_resource.batch(*res_release_delays) + + for band, subtask_ids in band_to_subtask_ids.items(): + asyncio.create_task(self._submit_subtasks_to_band(band, subtask_ids)) + + async def _submit_subtasks_to_band(self, band: BandType, subtask_ids: List[str]): + execution_ref = await self._get_execution_ref(band) + delays = [] + async with redirect_subtask_errors( - self, self._get_subtasks_by_ids([subtask_id]) + self, self._get_subtasks_by_ids(subtask_ids) ): - try: + for subtask_id in subtask_ids: subtask_info = self._subtask_infos[subtask_id] - execution_ref = await self._get_execution_ref(band) - extra_config = subtask_info.subtask.extra_config - enable_profiling = MARS_ENABLE_PROFILING or ( - extra_config and extra_config.get("enable_profiling") - ) - profiling_context = ( - ProfilingContext(subtask_info.subtask.task_id) - if enable_profiling - else None - ) + subtask = subtask_info.subtask self._submitted_subtask_count.record( 1, { "session_id": self._session_id, - "task_id": subtask_info.subtask.task_id, - "stage_id": subtask_info.subtask.stage_id, + "task_id": subtask.task_id, + "stage_id": subtask.stage_id, }, ) logger.debug("Start run subtask %s in band %s.", subtask_id, band) - with Timer() as timer: - task = asyncio.create_task( - execution_ref.run_subtask.options( - profiling_context=profiling_context - ).send(subtask_info.subtask, band[1], self.address) - ) - subtask_info.band_futures[band] = task - subtask_info.start_time = time.time() - self._speculation_execution_scheduler.add_subtask(subtask_info) - result = yield task - ProfilingData.collect_subtask( - subtask_info.subtask, band, timer.duration - ) - task_api = await self._get_task_api() - logger.debug("Finished subtask %s with result %s.", subtask_id, result) - await task_api.set_subtask_result(result) - except (OSError, MarsError) as ex: - # TODO: We should handle ServerClosed Error. - if ( - subtask_info.subtask.retryable - and subtask_info.num_reschedules < subtask_info.max_reschedules - ): - logger.error( - "Reschedule subtask %s due to %s", - subtask_info.subtask.subtask_id, - ex, - ) - subtask_info.num_reschedules += 1 - await self._queueing_ref.add_subtasks( - [subtask_info.subtask], - [subtask_info.subtask.priority or tuple()], - exclude_bands=set(subtask_info.band_futures.keys()), - ) - else: - raise ex - except asyncio.CancelledError: - raise - except Exception as ex: - if ( - subtask_info.subtask.retryable - and subtask_info.num_reschedules < subtask_info.max_reschedules - ): - logger.error( - "Failed to reschedule subtask %s, " - "num_reschedules: %s, max_reschedules: %s, unhandled exception: %s", - subtask_info.subtask.subtask_id, - subtask_info.num_reschedules, - subtask_info.max_reschedules, - ex, - ) - raise ex - finally: - # make sure slot is released before marking tasks as finished - await self._global_resource_ref.release_subtask_resource( - band, - subtask_info.subtask.session_id, - subtask_info.subtask.subtask_id, + delays.append( + execution_ref.run_subtask.delay(subtask, band[1], self.address) ) - logger.debug( - "Slot released for band %s after subtask %s", - band, - subtask_info.subtask.subtask_id, - ) - # We should call submit_subtasks after the resource is released. - # If submit_subtasks runs before release_subtask_resource - # then the rescheduled subtask may not be submitted due to - # no available resource. The mars will hangs. 
- if subtask_info.num_reschedules > 0: - await self._queueing_ref.submit_subtasks.tell() + subtask_info.band_futures[band] = asyncio.Future() + subtask_info.start_time = time.time() + self._speculation_execution_scheduler.add_subtask(subtask_info) + await execution_ref.run_subtask.batch(*delays, send=False) async def cancel_subtasks( self, subtask_ids: List[str], kill_timeout: Union[float, int] = None @@ -380,19 +418,20 @@ async def cancel_single_task(subtask, raw_tasks, cancel_tasks): ) continue - subtask_info = self._subtask_infos[subtask_id] - raw_tasks_to_cancel = list(subtask_info.band_futures.values()) + info = self._subtask_infos[subtask_id] + info.cancel_pending = True + raw_tasks_to_cancel = list(info.band_futures.values()) if not raw_tasks_to_cancel: queued_subtask_ids.append(subtask_id) single_cancel_tasks.append( asyncio.create_task( - cancel_single_task(subtask_info.subtask, [], []) + cancel_single_task(info.subtask, [], []) ) ) else: cancel_tasks = [] - for band in subtask_info.band_futures.keys(): + for band in info.band_futures.keys(): execution_ref = await self._get_execution_ref(band) cancel_tasks.append( asyncio.create_task( @@ -404,7 +443,7 @@ async def cancel_single_task(subtask, raw_tasks, cancel_tasks): single_cancel_tasks.append( asyncio.create_task( cancel_single_task( - subtask_info.subtask, raw_tasks_to_cancel, cancel_tasks + info.subtask, raw_tasks_to_cancel, cancel_tasks ) ) ) @@ -415,21 +454,21 @@ async def cancel_single_task(subtask, raw_tasks, cancel_tasks): yield asyncio.wait(single_cancel_tasks) for subtask_id in subtask_ids: - subtask_info = self._subtask_infos.pop(subtask_id, None) - if subtask_info is not None: - self._subtask_summaries[subtask_id] = subtask_info.to_summary( + info = self._subtask_infos.pop(subtask_id, None) + if info is not None: + self._subtask_summaries[subtask_id] = info.to_summary( is_finished=True, is_cancelled=True ) - self._canceled_subtask_count.record( + self._cancelled_subtask_count.record( 1, { "session_id": self._session_id, - "task_id": subtask_info.subtask.task_id, - "stage_id": subtask_info.subtask.stage_id, + "task_id": info.subtask.task_id, + "stage_id": info.subtask.stage_id, }, ) await self._queueing_ref.submit_subtasks.tell() - logger.info("Subtasks %s canceled.", subtask_ids) + logger.info("Subtasks %s cancelled.", subtask_ids) def get_schedule_summaries(self, task_id: Optional[str] = None): if task_id is not None: diff --git a/mars/services/scheduling/supervisor/queueing.py b/mars/services/scheduling/supervisor/queueing.py index 97c5d5f6b3..0772664463 100644 --- a/mars/services/scheduling/supervisor/queueing.py +++ b/mars/services/scheduling/supervisor/queueing.py @@ -24,6 +24,7 @@ from ....lib.aio import alru_cache from ....metrics import Metrics from ....resource import ZeroResource +from ....typing import BandType from ....utils import dataslots from ...subtask import Subtask from ...task import TaskAPI @@ -48,6 +49,7 @@ class SubtaskQueueingActor(mo.Actor): _stid_to_bands: DefaultDict[str, List[Tuple]] _stid_to_items: Dict[str, HeapItem] _band_queues: DefaultDict[Tuple, List[HeapItem]] + _submit_requests: List[Optional[Dict[BandType, int]]] @classmethod def gen_uid(cls, session_id: str): @@ -61,6 +63,10 @@ def __init__(self, session_id: str, submit_period: Union[float, int] = None): # so that we can ensure band queue is busy if the band queue is not empty. 
self._band_queues = defaultdict(list) + self._submit_requests = [] + self._submit_request_event = asyncio.Event() + self._submit_request_task = None + self._cluster_api = None self._slots_ref = None self._assigner_ref = None @@ -69,7 +75,6 @@ def __init__(self, session_id: str, submit_period: Union[float, int] = None): self._band_watch_task = None self._max_enqueue_id = 0 - self._periodical_submit_task = None self._submit_period = submit_period or _DEFAULT_SUBMIT_PERIOD self._submitted_subtask_number = Metrics.gauge( "mars.band.submitted_subtask_number", @@ -133,23 +138,13 @@ async def watch_bands(): AssignerActor.gen_uid(self._session_id), address=self.address ) - if self._submit_period > 0: - self._periodical_submit_task = self.ref().periodical_submit.tell_delay( - delay=self._submit_period - ) + self._submit_request_task = asyncio.create_task(self._submission_task_func()) async def __pre_destroy__(self): self._band_watch_task.cancel() - if self._periodical_submit_task is not None: # pragma: no branch - self._periodical_submit_task.cancel() - - async def periodical_submit(self): - await self.ref().submit_subtasks.tell() - self._periodical_submit_task = self.ref().periodical_submit.tell_delay( - delay=self._submit_period - ) + if self._submit_request_task is not None: # pragma: no branch + self._submit_request_task.cancel() - @alru_cache async def _get_task_api(self): return await TaskAPI.create(self._session_id, self.address) @@ -180,114 +175,171 @@ async def add_subtasks( self._max_enqueue_id += 1 heapq.heappush(self._band_queues[band], heap_item) logger.debug( - "Subtask %s enqueued to band %s excluded from %s.", + "Subtask %s enqueued to band %s. exclude_bands=%s.", subtask.subtask_id, band, exclude_bands, ) logger.debug("%d subtasks enqueued", len(subtasks)) - async def submit_subtasks(self, band: Tuple = None, limit: Optional[int] = None): - logger.debug("Submitting subtasks with limit %s", limit) + def submit_subtasks(self, band_to_limit: Dict[BandType, int] = None): + self._submit_requests.append(band_to_limit) + self._submit_request_event.set() + + async def _submission_task_func(self): + while True: + try: + periodical_triggered = False + if not self._submit_requests: # pragma: no branch + try: + if self._submit_period: + await asyncio.wait_for( + self._submit_request_event.wait(), self._submit_period + ) + else: + await self._submit_request_event.wait() + + self._submit_request_event.clear() + except asyncio.TimeoutError: + periodical_triggered = True + + requests = self._submit_requests + self._submit_requests = [] + if not periodical_triggered and not requests: # pragma: no cover + continue + + merged_band_to_limit = dict() + for req in requests: + if req is None: + merged_band_to_limit = None + break + merged_band_to_limit.update(req) + await self._submit_subtask_request(merged_band_to_limit) + except asyncio.CancelledError: + break + + async def _submit_subtask_request(self, band_to_limit: Dict[BandType, int] = None): + if band_to_limit: + logger.debug( + "TMP_QUEUE_PROBE: Submitting subtasks with limits: %r", band_to_limit + ) - if not limit and band not in self._band_to_resource: + if not self._band_to_resource or any( + not limit and band not in self._band_to_resource + for band, limit in band_to_limit or () + ): self._band_to_resource = await self._cluster_api.get_all_bands() - bands = [band] if band is not None else list(self._band_to_resource.keys()) - submit_aio_tasks = [] - manager_ref = await self._get_manager_ref() + if not band_to_limit: + band_to_limit = {band: 
None for band in self._band_to_resource.keys()}
 
         apply_delays = []
         submit_items_list = []
         submitted_bands = []
-        for band in bands:
-            band_limit = limit or (
-                self._band_to_resource[band].num_cpus
-                or self._band_to_resource[band].num_gpus
-            )
-            task_queue = self._band_queues[band]
-            submit_items = dict()
-            while (
-                self._ensure_top_item_valid(task_queue)
-                and len(submit_items) < band_limit
-            ):
-                item = heapq.heappop(task_queue)
-                submit_items[item.subtask.subtask_id] = item
-
-            subtask_ids = list(submit_items)
-            if not subtask_ids:
-                continue
-
-            submitted_bands.append(band)
-            submit_items_list.append(submit_items)
-
-            # Before hbo, when a manager finish a subtask, it will schedule one subtask successfully because
-            # there is a slot idle. But now we have memory requirements, so the subtask may apply resource
-            # from supervisor failed. In such cases, those subtasks will never got scheduled.
-            # TODO We can use `_periodical_submit_task` to submit those subtasks.
-            subtask_resources = [
-                item.subtask.required_resource for item in submit_items.values()
-            ]
-            apply_delays.append(
-                self._slots_ref.apply_subtask_resources.delay(
-                    band, self._session_id, subtask_ids, subtask_resources
-                )
-            )
+        def _load_items_to_submit():
+            for band, limit in band_to_limit.items():
+                band_limit = limit or (
+                    self._band_to_resource[band].num_cpus
+                    or self._band_to_resource[band].num_gpus
+                )
+                task_queue = self._band_queues[band]
+                submit_items = dict()
+                while (
+                    self._ensure_top_item_valid(task_queue)
+                    and len(submit_items) < band_limit
+                ):
+                    item = heapq.heappop(task_queue)
+                    submit_items[item.subtask.subtask_id] = item
+
+                subtask_ids = list(submit_items)
+                if not subtask_ids:
+                    continue
+
+                submitted_bands.append(band)
+                submit_items_list.append(submit_items)
+
+                # Before hbo, when a manager finished a subtask, it could always schedule one more
+                # subtask because a slot had just become idle. Now that subtasks also carry memory
+                # requirements, applying resources from the supervisor may fail; in such cases,
+                # those subtasks would never get scheduled.
+                # TODO We can use `_periodical_submit_task` to submit those subtasks.
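+                # Note: only `.delay(...)` objects are stashed here; they are
+                # applied below in a single `apply_subtask_resources.batch(...)`
+                # call after this closure has run via `asyncio.to_thread`.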
+ subtask_resources = [ + item.subtask.required_resource for item in submit_items.values() + ] + apply_delays.append( + self._slots_ref.apply_subtask_resources.delay( + band, self._session_id, subtask_ids, subtask_resources + ) + ) + + await asyncio.to_thread(_load_items_to_submit) + + logger.debug("TMP_QUEUE_PROBE: Finished picking top subtasks") async with redirect_subtask_errors( self, - [ + ( item.subtask for submit_items in submit_items_list for item in submit_items.values() - ], + ), ): submitted_ids_list = await self._slots_ref.apply_subtask_resources.batch( *apply_delays ) - for band, submit_items, submitted_ids in zip( - submitted_bands, submit_items_list, submitted_ids_list - ): - subtask_ids = list(submit_items) - task_queue = self._band_queues[band] + logger.debug( + "TMP_QUEUE_PROBE: Finished band resource allocation, %d subtasks submitted", + sum(len(ids) for ids in submitted_ids_list), + ) - async with redirect_subtask_errors( - self, [item.subtask for item in submit_items.values()] + manager_ref = await self._get_manager_ref() + submit_delays = [] + + def _gather_submissions(): + for band, submit_items, submitted_ids in zip( + submitted_bands, submit_items_list, submitted_ids_list ): - non_submitted_ids = [k for k in submit_items if k not in submitted_ids] + subtask_ids = list(submit_items) + task_queue = self._band_queues[band] + submitted_id_set = set(submitted_ids) + + non_submitted_ids = [ + k for k in submit_items if k not in submitted_id_set + ] tags = { "session_id": self._session_id, "band": band[0] if band else "", } self._submitted_subtask_number.record(len(submitted_ids), tags) self._unsubmitted_subtask_number.record(len(non_submitted_ids), tags) - if submitted_ids: + + if not submitted_ids: + if non_submitted_ids: + logger.debug("No slots available on band %s", band) + else: for stid in subtask_ids: - if stid not in submitted_ids: + if stid not in submitted_id_set: continue item = submit_items[stid] logger.debug("Submit subtask %r to band %r", item.subtask, band) - submit_aio_tasks.append( - asyncio.create_task( - manager_ref.submit_subtask_to_band.tell( - item.subtask.subtask_id, band - ) + submit_delays.append( + manager_ref.submit_subtask_to_band.delay( + item.subtask.subtask_id, band ) ) - await asyncio.sleep(0) self.remove_queued_subtasks([item.subtask.subtask_id]) - else: - logger.debug("No slots available") - for stid in non_submitted_ids: - # TODO if subtasks submit failed due to lacking memory/cpu/gpu resources, lower the priority so that - # other subtasks can be submitted. - heapq.heappush(task_queue, submit_items[stid]) + for stid in non_submitted_ids: + # TODO if subtasks submit failed due to lacking memory/cpu/gpu resources, lower the priority so that + # other subtasks can be submitted. 
+ heapq.heappush(task_queue, submit_items[stid]) + + await asyncio.to_thread(_gather_submissions) - if submit_aio_tasks: - yield asyncio.gather(*submit_aio_tasks) + logger.debug("TMP_QUEUE_PROBE: Start subtask submission in batch") + await manager_ref.submit_subtask_to_band.batch(*submit_delays) + logger.debug("TMP_QUEUE_PROBE: Finished subtask submission") def _ensure_top_item_valid(self, task_queue): """Clean invalid subtask item from the queue to ensure that when the queue is not empty, diff --git a/mars/services/scheduling/supervisor/tests/test_assigner.py b/mars/services/scheduling/supervisor/tests/test_assigner.py index 3766069498..5ba317cdbc 100644 --- a/mars/services/scheduling/supervisor/tests/test_assigner.py +++ b/mars/services/scheduling/supervisor/tests/test_assigner.py @@ -62,32 +62,31 @@ def get_all_bands(self, role=None, statuses=None): class FakeClusterAPI(ClusterAPI): @classmethod async def create(cls, address: str, **kw): - dones, _ = await asyncio.wait( - [ - mo.create_actor( - SupervisorPeerLocatorActor, - "fixed", - address, - uid=SupervisorPeerLocatorActor.default_uid(), - address=address, - ), - mo.create_actor( - MockNodeInfoCollectorActor, - with_gpu=kw.get("with_gpu", False), - uid=NodeInfoCollectorActor.default_uid(), - address=address, - ), - mo.create_actor( - NodeInfoUploaderActor, - NodeRole.WORKER, - interval=kw.get("upload_interval"), - band_to_resource=kw.get("band_to_resource"), - use_gpu=kw.get("use_gpu", False), - uid=NodeInfoUploaderActor.default_uid(), - address=address, - ), - ] - ) + coros = [ + mo.create_actor( + SupervisorPeerLocatorActor, + "fixed", + address, + uid=SupervisorPeerLocatorActor.default_uid(), + address=address, + ), + mo.create_actor( + MockNodeInfoCollectorActor, + with_gpu=kw.get("with_gpu", False), + uid=NodeInfoCollectorActor.default_uid(), + address=address, + ), + mo.create_actor( + NodeInfoUploaderActor, + NodeRole.WORKER, + interval=kw.get("upload_interval"), + band_to_resource=kw.get("band_to_resource"), + use_gpu=kw.get("use_gpu", False), + uid=NodeInfoUploaderActor.default_uid(), + address=address, + ), + ] + dones, _ = await asyncio.wait([asyncio.create_task(coro) for coro in coros]) for task in dones: try: diff --git a/mars/services/scheduling/supervisor/tests/test_globalresource.py b/mars/services/scheduling/supervisor/tests/test_globalresource.py index 6d938ba3e4..bb42b01f3a 100644 --- a/mars/services/scheduling/supervisor/tests/test_globalresource.py +++ b/mars/services/scheduling/supervisor/tests/test_globalresource.py @@ -67,11 +67,12 @@ async def test_global_resource(actor_pool): band, session_id, ["subtask1"], [Resource(num_cpus=1)] ) - wait_coro = global_resource_ref.wait_band_idle(band) - (done, pending) = await asyncio.wait([wait_coro], timeout=0.5) + wait_task = asyncio.create_task(global_resource_ref.wait_band_idle(band)) + (done, pending) = await asyncio.wait([wait_task], timeout=0.5) assert not done await global_resource_ref.release_subtask_resource(band, session_id, "subtask0") - (done, pending) = await asyncio.wait([wait_coro], timeout=0.5) + wait_task = asyncio.create_task(global_resource_ref.wait_band_idle(band)) + (done, pending) = await asyncio.wait([wait_task], timeout=0.5) assert done assert band in await global_resource_ref.get_idle_bands(0) assert ["subtask1"] == await global_resource_ref.apply_subtask_resources( diff --git a/mars/services/scheduling/supervisor/tests/test_manager.py b/mars/services/scheduling/supervisor/tests/test_manager.py index c4ad5c895c..8f5a7c4f30 100644 --- 
a/mars/services/scheduling/supervisor/tests/test_manager.py +++ b/mars/services/scheduling/supervisor/tests/test_manager.py @@ -14,7 +14,7 @@ import asyncio from collections import defaultdict -from typing import List, Tuple, Set +from typing import List, Dict, Tuple, Set import pytest @@ -22,6 +22,7 @@ from .....typing import BandType from ....cluster import MockClusterAPI from ....subtask import Subtask, SubtaskResult, SubtaskStatus +from ....task import TaskAPI from ....task.supervisor.manager import TaskManagerActor from ...supervisor import ( SubtaskQueueingActor, @@ -35,8 +36,12 @@ class MockTaskManagerActor(mo.Actor): def __init__(self): self._results = dict() - def set_subtask_result(self, result: SubtaskResult): + async def set_subtask_result(self, result: SubtaskResult): self._results[result.subtask_id] = result + manager_ref = await mo.actor_ref( + uid=SubtaskManagerActor.gen_uid(result.session_id), address=self.address + ) + await manager_ref.finish_subtasks([result], result.bands) def get_result(self, subtask_id: str) -> SubtaskResult: return self._results[subtask_id] @@ -59,7 +64,7 @@ def add_subtasks( for subtask, priority in zip(subtasks, priorities): self._subtasks[subtask.subtask_id] = (subtask, priority) - def submit_subtasks(self, band: BandType, limit: int): + def submit_subtasks(self, band_to_limit: Dict[BandType, int] = None): pass def remove_queued_subtasks(self, subtask_ids: List[str]): @@ -78,14 +83,47 @@ def __init__(self): async def set_run_subtask_event(self, subtask_id, event): self._run_subtask_events[subtask_id] = event + @mo.extensible async def run_subtask( self, subtask: Subtask, band_name: str, supervisor_address: str ): self._run_subtask_events[subtask.subtask_id].set() - task = self._subtask_aiotasks[subtask.subtask_id][ - band_name - ] = asyncio.create_task(asyncio.sleep(20)) - return await task + + async def task_fun(): + task_api = await TaskAPI.create(subtask.session_id, supervisor_address) + try: + await asyncio.sleep(20) + except asyncio.CancelledError as ex: + await task_api.set_subtask_result( + SubtaskResult( + subtask_id=subtask.subtask_id, + session_id=subtask.session_id, + task_id=subtask.task_id, + stage_id=subtask.stage_id, + bands=[(self.address, band_name)], + status=SubtaskStatus.cancelled, + progress=1.0, + error=ex, + traceback=ex.__traceback__, + ) + ) + raise + else: + await task_api.set_subtask_result( + SubtaskResult( + subtask_id=subtask.subtask_id, + session_id=subtask.session_id, + task_id=subtask.task_id, + stage_id=subtask.stage_id, + status=SubtaskStatus.succeeded, + bands=[(self.address, band_name)], + progress=1.0, + ) + ) + + self._subtask_aiotasks[subtask.subtask_id][band_name] = asyncio.create_task( + task_fun() + ) def cancel_subtask(self, subtask_id: str, kill_timeout: int = 5): for task in self._subtask_aiotasks[subtask_id].values(): @@ -93,7 +131,7 @@ def cancel_subtask(self, subtask_id: str, kill_timeout: int = 5): async def wait_subtask(self, subtask_id: str, band_name: str): try: - yield self._subtask_aiotasks[subtask_id][band_name] + await self._subtask_aiotasks[subtask_id][band_name] except asyncio.CancelledError: pass @@ -158,12 +196,12 @@ async def test_subtask_manager(actor_pool): await execution_ref.set_run_subtask_event(subtask1.subtask_id, run_subtask1_event) await execution_ref.set_run_subtask_event(subtask2.subtask_id, run_subtask2_event) - submit1 = asyncio.create_task( + asyncio.create_task( manager_ref.submit_subtask_to_band( subtask1.subtask_id, (pool.external_address, "gpu-0") ) ) - submit2 = 
asyncio.create_task( + asyncio.create_task( manager_ref.submit_subtask_to_band( subtask2.subtask_id, (pool.external_address, "gpu-1") ) @@ -179,10 +217,6 @@ async def test_subtask_manager(actor_pool): ), timeout=10, ) - with pytest.raises(asyncio.CancelledError): - await submit1 - with pytest.raises(asyncio.CancelledError): - await submit2 assert ( await task_manager_ref.get_result(subtask1.subtask_id) ).status == SubtaskStatus.cancelled diff --git a/mars/services/scheduling/supervisor/tests/test_queue_balance.py b/mars/services/scheduling/supervisor/tests/test_queue_balance.py index f2760b5dac..0a08634ca9 100644 --- a/mars/services/scheduling/supervisor/tests/test_queue_balance.py +++ b/mars/services/scheduling/supervisor/tests/test_queue_balance.py @@ -62,31 +62,30 @@ def get_all_bands(self, role=None, statuses=None): class FakeClusterAPI(ClusterAPI): @classmethod async def create(cls, address: str, **kw): - dones, _ = await asyncio.wait( - [ - mo.create_actor( - SupervisorPeerLocatorActor, - "fixed", - address, - uid=SupervisorPeerLocatorActor.default_uid(), - address=address, - ), - mo.create_actor( - MockNodeInfoCollectorActor, - uid=NodeInfoCollectorActor.default_uid(), - address=address, - ), - mo.create_actor( - NodeInfoUploaderActor, - NodeRole.WORKER, - interval=kw.get("upload_interval"), - band_to_resource=kw.get("band_to_resource"), - use_gpu=kw.get("use_gpu", False), - uid=NodeInfoUploaderActor.default_uid(), - address=address, - ), - ] - ) + coros = [ + mo.create_actor( + SupervisorPeerLocatorActor, + "fixed", + address, + uid=SupervisorPeerLocatorActor.default_uid(), + address=address, + ), + mo.create_actor( + MockNodeInfoCollectorActor, + uid=NodeInfoCollectorActor.default_uid(), + address=address, + ), + mo.create_actor( + NodeInfoUploaderActor, + NodeRole.WORKER, + interval=kw.get("upload_interval"), + band_to_resource=kw.get("band_to_resource"), + use_gpu=kw.get("use_gpu", False), + uid=NodeInfoUploaderActor.default_uid(), + address=address, + ), + ] + dones, _ = await asyncio.wait([asyncio.create_task(coro) for coro in coros]) for task in dones: try: @@ -217,20 +216,23 @@ async def test_subtask_queueing(actor_pool): ) # 9 subtasks on ('address0', 'numa-0') - await queueing_ref.submit_subtasks(band=("address0", "numa-0"), limit=10) + await queueing_ref.submit_subtasks({("address0", "numa-0"): 10}) + await asyncio.sleep(0.2) commited_subtask_ids = (await manager_ref.dump_data())[("address0", "numa-0")] assert ( len(commited_subtask_ids) == 9 - ), f"commited_subtask_ids {commited_subtask_ids}" + ), f"committed_subtask_ids {commited_subtask_ids}" # 0 subtasks on ('address1', 'numa-0') - await queueing_ref.submit_subtasks(band=("address1", "numa-0"), limit=10) + await queueing_ref.submit_subtasks({("address1", "numa-0"): 10}) + await asyncio.sleep(0.2) commited_subtask_ids = (await manager_ref.dump_data())[("address0", "numa-0")] assert ( len(commited_subtask_ids) == 9 - ), f"commited_subtask_ids {commited_subtask_ids}" + ), f"committed_subtask_ids {commited_subtask_ids}" # 9 subtasks on ('address2', 'numa-0') - await queueing_ref.submit_subtasks(band=("address2", "numa-0"), limit=10) + await queueing_ref.submit_subtasks({("address2", "numa-0"): 10}) + await asyncio.sleep(0.2) submitted_subtask_ids = await manager_ref.dump_data() assert sum(len(v) for v in submitted_subtask_ids.values()) == 18 diff --git a/mars/services/scheduling/supervisor/tests/test_queueing.py b/mars/services/scheduling/supervisor/tests/test_queueing.py index 21bcd2d1ac..38d6fb7921 100644 --- 
a/mars/services/scheduling/supervisor/tests/test_queueing.py +++ b/mars/services/scheduling/supervisor/tests/test_queueing.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import asyncio import pytest from typing import Tuple, List @@ -57,16 +58,20 @@ def assign_subtasks( return [(self.address, "numa-0")] * len(subtasks) -class MockSubtaskManagerActor(mo.Actor): +class MockSubtaskManagerActor(mo.StatelessActor): def __init__(self): self._subtask_ids, self._bands = [], [] + self._event = asyncio.Event() @mo.extensible def submit_subtask_to_band(self, subtask_id: str, band: Tuple): self._subtask_ids.append(subtask_id) self._bands.append(band) + self._event.set() - def dump_data(self): + async def dump_data(self): + await asyncio.wait_for(self._event.wait(), timeout=10) + self._event.clear() return self._subtask_ids, self._bands @@ -123,7 +128,7 @@ async def test_subtask_queueing(actor_pool): assert await queueing_ref.all_bands_busy() await queueing_ref.submit_subtasks() # queue: [2 1 0] - commited_subtask_ids, _commited_bands = await manager_ref.dump_data() + commited_subtask_ids, _committed_bands = await manager_ref.dump_data() assert commited_subtask_ids == ["4", "3"] await queueing_ref.remove_queued_subtasks(["1"]) @@ -135,6 +140,6 @@ async def test_subtask_queueing(actor_pool): # queue: [0(3) 2] await queueing_ref.submit_subtasks() # queue: [] - commited_subtasks, _commited_bands = await manager_ref.dump_data() + commited_subtasks, _committed_bands = await manager_ref.dump_data() assert commited_subtasks == ["4", "3", "0", "2"] assert not await queueing_ref.all_bands_busy() diff --git a/mars/services/scheduling/tests/test_service.py b/mars/services/scheduling/tests/test_service.py index 985a1ea821..1b83d104bb 100644 --- a/mars/services/scheduling/tests/test_service.py +++ b/mars/services/scheduling/tests/test_service.py @@ -19,7 +19,6 @@ import numpy as np import pytest -from ..api.web import WebSchedulingAPI from .... import oscar as mo from .... import remote as mr from .... import tensor as mt @@ -33,6 +32,7 @@ from ...task.supervisor.manager import TaskManagerActor from ...web import WebActor from .. 
import SchedulingAPI +from ..api.web import WebSchedulingAPI from ..supervisor import GlobalResourceManagerActor @@ -42,11 +42,17 @@ def __init__(self, *args, **kwargs): self._events = defaultdict(list) self._results = dict() - def set_subtask_result(self, subtask_result: SubtaskResult): + async def set_subtask_result(self, subtask_result: SubtaskResult): + scheduling_api = await SchedulingAPI.create( + subtask_result.session_id, self.address + ) self._results[subtask_result.subtask_id] = subtask_result for event in self._events[subtask_result.subtask_id]: event.set() self._events.pop(subtask_result.subtask_id, None) + await scheduling_api.finish_subtasks( + [subtask_result], subtask_result.bands + ) def _return_result(self, subtask_id: str): result = self._results[subtask_id] @@ -184,7 +190,6 @@ async def test_schedule_success(actor_pools): subtask.expect_bands = [(worker_pool.external_address, "numa-0")] await scheduling_api.add_subtasks([subtask], [(0,)]) await task_manager_ref.wait_subtask_result(subtask.subtask_id) - await scheduling_api.finish_subtasks([subtask.subtask_id]) result_key = next(subtask.chunk_graph.iter_indep(reverse=True)).key result = await storage_api.get(result_key) @@ -220,17 +225,17 @@ def _remote_fun(secs): async def _waiter_fun(subtask_id): await task_manager_ref.wait_subtask_result(subtask_id) - await scheduling_api.finish_subtasks([subtask_id]) finish_ids.append(subtask_id) finish_time.append(time.time()) subtasks = [] wait_tasks = [] + band = (worker_pool.external_address, "numa-0") for task_id in range(6): a = mr.spawn(_remote_fun, args=(0.5 + 0.01 * task_id,)) subtask = _gen_subtask(a, session_id) subtask.subtask_id = f"test_schedule_queue_subtask_{task_id}" - subtask.expect_bands = [(worker_pool.external_address, "numa-0")] + subtask.expect_bands = [band] subtask.priority = (4 - task_id,) wait_tasks.append(asyncio.create_task(_waiter_fun(subtask.subtask_id))) subtasks.append(subtask) @@ -291,15 +296,15 @@ def _remote_fun(secs): async def _waiter_fun(subtask_id): await task_manager_ref.wait_subtask_result(subtask_id) - await scheduling_api.finish_subtasks([subtask_id]) subtasks = [] wait_tasks = [] + band = (worker_pool.external_address, "numa-0") for task_id in range(6): a = mr.spawn(_remote_fun, args=(1 - 0.01 * task_id,)) subtask = _gen_subtask(a, session_id) subtask.subtask_id = f"test_schedule_queue_subtask_{task_id}" - subtask.expect_bands = [(worker_pool.external_address, "numa-0")] + subtask.expect_bands = [band] subtask.priority = (4 - task_id,) wait_tasks.append(asyncio.create_task(_waiter_fun(subtask.subtask_id))) subtasks.append(subtask) diff --git a/mars/services/scheduling/utils.py b/mars/services/scheduling/utils.py index f8325697ec..be6d8fcb51 100644 --- a/mars/services/scheduling/utils.py +++ b/mars/services/scheduling/utils.py @@ -15,10 +15,11 @@ import asyncio import contextlib import sys +from typing import Iterable from ... 
import oscar as mo from ...lib.aio import alru_cache -from ..subtask import SubtaskResult, SubtaskStatus +from ..subtask import Subtask, SubtaskResult, SubtaskStatus from ..task import TaskAPI @@ -28,7 +29,7 @@ async def _get_task_api(actor: mo.Actor): @contextlib.asynccontextmanager -async def redirect_subtask_errors(actor: mo.Actor, subtasks): +async def redirect_subtask_errors(actor: mo.Actor, subtasks: Iterable[Subtask]): try: yield except: # noqa: E722 # pylint: disable=bare-except diff --git a/mars/services/scheduling/worker/execution.py b/mars/services/scheduling/worker/execution.py index f07edc44b5..0140f60eee 100644 --- a/mars/services/scheduling/worker/execution.py +++ b/mars/services/scheduling/worker/execution.py @@ -35,6 +35,7 @@ from ...meta import MetaAPI from ...storage import StorageAPI from ...subtask import Subtask, SubtaskAPI, SubtaskResult, SubtaskStatus +from ...task import TaskAPI from .workerslot import BandSlotManagerActor from .quota import QuotaActor @@ -367,6 +368,7 @@ async def internal_run_subtask(self, subtask: Subtask, band_name: str): task_id=subtask.task_id, stage_id=subtask.stage_id, status=SubtaskStatus.pending, + bands=[(self.address, band_name)] ) try: logger.debug("Preparing data for subtask %s", subtask.subtask_id) @@ -408,6 +410,11 @@ async def internal_run_subtask(self, subtask: Subtask, band_name: str): finally: # pop the subtask info at the end is to cancel the job. self._subtask_info.pop(subtask.subtask_id, None) + + task_api = await TaskAPI.create( + subtask.session_id, subtask_info.supervisor_address + ) + await task_api.set_subtask_result(subtask_info.result) return subtask_info.result async def _retry_run_subtask( @@ -515,15 +522,17 @@ async def _run_subtask_once(): e, wrap_name="_UnretryableException", message=message ) + @mo.extensible async def run_subtask( self, subtask: Subtask, band_name: str, supervisor_address: str ): - if subtask.subtask_id in self._subtask_info: # pragma: no cover - raise Exception( - f"Subtask {subtask.subtask_id} is already running on this band[{self.address}]." - ) + subtask_id = subtask.subtask_id + assert ( + subtask_id not in self._subtask_info + ), f"Subtask {subtask_id} is already running on this band[{self.address}]." 
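+        # Note: with the extensible/batch form of this call the subtask result
+        # is no longer returned to the caller; internal_run_subtask pushes it
+        # to the supervisor via TaskAPI.set_subtask_result when it completes.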
+ logger.debug( - "Start to schedule subtask %s on %s.", subtask.subtask_id, self.address + "Start to schedule subtask %s on %s.", subtask_id, self.address ) self._submitted_subtask_count.record(1, {"band": self.address}) with mo.debug.no_message_trace(): @@ -541,14 +550,18 @@ async def run_subtask( if subtask_max_retries is None: subtask_max_retries = self._subtask_max_retries - self._subtask_info[subtask.subtask_id] = SubtaskExecutionInfo( + self._subtask_info[subtask_id] = SubtaskExecutionInfo( task, band_name, supervisor_address, max_retries=subtask_max_retries ) - result = await task - self._subtask_info.pop(subtask.subtask_id, None) - self._finished_subtask_count.record(1, {"band": self.address}) - logger.debug("Subtask %s finished with result %s", subtask.subtask_id, result) - return result + + def _finalize_subtask(fut: asyncio.Future): + res = fut.result() + + self._subtask_info.pop(subtask_id, None) + self._finished_subtask_count.record(1, {"band": self.address}) + logger.debug("Subtask %s finished with result %s", subtask_id, res) + + task.add_done_callback(_finalize_subtask) async def cancel_subtask(self, subtask_id: str, kill_timeout: Optional[int] = 5): try: diff --git a/mars/services/scheduling/worker/tests/test_execution.py b/mars/services/scheduling/worker/tests/test_execution.py index abeb50ba7f..2eec005c88 100644 --- a/mars/services/scheduling/worker/tests/test_execution.py +++ b/mars/services/scheduling/worker/tests/test_execution.py @@ -44,7 +44,7 @@ from ....session import MockSessionAPI from ....storage import MockStorageAPI from ....storage.handler import StorageHandlerActor -from ....subtask import MockSubtaskAPI, Subtask, SubtaskStatus +from ....subtask import MockSubtaskAPI, Subtask, SubtaskStatus, SubtaskResult from ....task.supervisor.manager import TaskManagerActor from ....mutable import MockMutableAPI from ...supervisor import GlobalResourceManagerActor @@ -136,13 +136,23 @@ async def update_subtask_resources( class MockTaskManager(mo.Actor): def __init__(self): - self._results = [] + self._results = dict() + self._subtask_futures = dict() - def set_subtask_result(self, result): - self._results.append(result) + def set_subtask_result(self, result: SubtaskResult): + self._results[result.subtask_id] = result + if result.subtask_id in self._subtask_futures: + self._subtask_futures[result.subtask_id].set_result(result) + + async def wait_subtask(self, subtask_id: str): + if subtask_id in self._results: + return self._results[subtask_id] + if subtask_id not in self._subtask_futures: + self._subtask_futures[subtask_id] = asyncio.Future() + return self._subtask_futures[subtask_id] def get_results(self): - return self._results + return list(self._results.values()) @pytest.fixture @@ -229,6 +239,9 @@ async def actor_pool(request): @pytest.mark.parametrize("actor_pool", [(1, True)], indirect=True) async def test_execute_tensor(actor_pool): pool, session_id, meta_api, worker_meta_api, storage_api, execution_ref = actor_pool + task_manager_ref = await mo.actor_ref( + TaskManagerActor.gen_uid(session_id), address=pool.external_address + ) data1 = np.random.rand(10, 10) data2 = np.random.rand(10, 10) @@ -268,6 +281,7 @@ async def test_execute_tensor(actor_pool): subtask = Subtask("test_subtask", session_id=session_id, chunk_graph=chunk_graph) await execution_ref.run_subtask(subtask, "numa-0", pool.external_address) + await task_manager_ref.wait_subtask(subtask.subtask_id) # check if results are correct result = await storage_api.get(result_chunk.key) @@ -306,6 +320,9 @@ 
async def test_execute_with_cancel(actor_pool, cancel_phase): pool, session_id, meta_api, worker_meta_api, storage_api, execution_ref = actor_pool delay_fetch_event = asyncio.Event() delay_wait_event = asyncio.Event() + task_manager_ref = await mo.actor_ref( + TaskManagerActor.gen_uid(session_id), address=pool.external_address + ) # config for different phases ref_to_delay = None @@ -361,7 +378,7 @@ def delay_fun(delay, _inp1): subtask = Subtask( f"test_subtask_{uuid.uuid4()}", session_id=session_id, chunk_graph=chunk_graph ) - aiotask = asyncio.create_task( + asyncio.create_task( execution_ref.run_subtask(subtask, "numa-0", pool.external_address) ) if ref_to_delay: @@ -375,7 +392,9 @@ def delay_fun(delay, _inp1): execution_ref.cancel_subtask(subtask.subtask_id, kill_timeout=1), timeout=30, ) - r = await asyncio.wait_for(aiotask, timeout=30) + r = await asyncio.wait_for( + task_manager_ref.wait_subtask(subtask.subtask_id), timeout=30 + ) assert r.status == SubtaskStatus.cancelled assert timer.duration < 15 @@ -403,6 +422,9 @@ def delay_fun(delay, _inp1): @pytest.mark.parametrize("actor_pool", [(1, True)], indirect=True) async def test_execute_with_pure_deps(actor_pool): pool, session_id, meta_api, worker_meta_api, storage_api, execution_ref = actor_pool + task_manager_ref = await mo.actor_ref( + TaskManagerActor.gen_uid(session_id), address=pool.external_address + ) dep = TensorFetch(key="input1", dtype=np.dtype(int)).new_chunk([]) @@ -424,6 +446,9 @@ def main_fun(): ) # subtask shall run well without data of `dep` available await execution_ref.run_subtask(subtask, "numa-0", pool.external_address) + await asyncio.wait_for( + task_manager_ref.wait_subtask(subtask.subtask_id), timeout=30 + ) res = await storage_api.get(remote_result.key) assert res == session_id @@ -476,6 +501,9 @@ async def test_cancel_without_kill(actor_pool): executed_file = os.path.join( tempfile.gettempdir(), f"mars_test_cancel_without_kill_{os.getpid()}.tmp" ) + task_manager_ref = await mo.actor_ref( + TaskManagerActor.gen_uid(session_id), address=pool.external_address + ) def delay_fun(delay): import mars @@ -499,7 +527,7 @@ def check_fun(): subtask = Subtask( f"test_subtask_{uuid.uuid4()}", session_id=session_id, chunk_graph=chunk_graph ) - aiotask = asyncio.create_task( + asyncio.create_task( execution_ref.run_subtask(subtask, "numa-0", pool.external_address) ) await asyncio.sleep(0.5) @@ -508,7 +536,7 @@ def check_fun(): execution_ref.cancel_subtask(subtask.subtask_id, kill_timeout=1), timeout=30, ) - r = await asyncio.wait_for(aiotask, timeout=30) + r = await asyncio.wait_for(task_manager_ref.wait_subtask(subtask.subtask_id), timeout=30) assert r.status == SubtaskStatus.cancelled remote_result = RemoteFunction( @@ -523,6 +551,7 @@ def check_fun(): await asyncio.wait_for( execution_ref.run_subtask(subtask, "numa-0", pool.external_address), timeout=30 ) + await asyncio.wait_for(task_manager_ref.wait_subtask(subtask.subtask_id), timeout=30) # check if slots not killed (or slot assignment may be cancelled) if os.path.exists(executed_file): diff --git a/mars/services/task/api/web.py b/mars/services/task/api/web.py index 8d498ca4c5..0afb5edf04 100644 --- a/mars/services/task/api/web.py +++ b/mars/services/task/api/web.py @@ -15,6 +15,7 @@ import asyncio import base64 import json +import logging from typing import Callable, List, Optional, Union from ....core import TileableGraph, Tileable @@ -253,7 +254,11 @@ async def get_task_progress(self, task_id: str) -> float: path = 
f"{self._address}/api/session/{self._session_id}/task/{task_id}" params = dict(action="progress") res = await self._request_url("GET", path, params=params) - return float(res.body.decode()) + try: + return float(res.body.decode()) + except ValueError: + logging.exception("Failed to handle content %r", res.body) + raise async def get_last_idle_time(self) -> Union[float, None]: path = f"{self._address}/api/session/{self._session_id}/task" diff --git a/mars/services/task/execution/mars/stage.py b/mars/services/task/execution/mars/stage.py index a5ab47ca27..f216a91243 100644 --- a/mars/services/task/execution/mars/stage.py +++ b/mars/services/task/execution/mars/stage.py @@ -149,7 +149,7 @@ async def set_subtask_result(self, result: SubtaskResult, band: BandType = None) if all_done or error_or_cancelled: # tell scheduling to finish subtasks await self._scheduling_api.finish_subtasks( - [result.subtask_id], bands=[band], schedule_next=not error_or_cancelled + [result], bands=[band], schedule_next=not error_or_cancelled ) if self.result.status != TaskStatus.terminated: self.result = TaskResult( @@ -218,9 +218,7 @@ async def set_subtask_result(self, result: SubtaskResult, band: BandType = None) # all predecessors finished to_schedule_subtasks.append(succ_subtask) await self._schedule_subtasks(to_schedule_subtasks) - await self._scheduling_api.finish_subtasks( - [result.subtask_id], bands=[band] - ) + await self._scheduling_api.finish_subtasks([result], bands=[band]) async def run(self): if len(self.subtask_graph) == 0: From 1bbd186db365ed324251712af0facf9aad5b4859 Mon Sep 17 00:00:00 2001 From: Wenjun Si Date: Wed, 1 Jun 2022 16:58:13 +0800 Subject: [PATCH 2/6] Fix tests --- mars/deploy/oscar/tests/test_local.py | 1 + mars/services/scheduling/api/oscar.py | 18 ++- .../services/scheduling/supervisor/manager.py | 103 ++++++-------- .../scheduling/supervisor/queueing.py | 14 +- .../supervisor/tests/test_manager.py | 42 +++--- .../services/scheduling/tests/test_service.py | 4 +- mars/services/scheduling/utils.py | 7 +- mars/services/scheduling/worker/execution.py | 129 ++++++++++++------ .../scheduling/worker/tests/test_execution.py | 8 +- mars/services/subtask/core.py | 3 +- mars/services/task/execution/mars/stage.py | 3 +- 11 files changed, 178 insertions(+), 154 deletions(-) diff --git a/mars/deploy/oscar/tests/test_local.py b/mars/deploy/oscar/tests/test_local.py index 27c71c9b11..e2092654cf 100644 --- a/mars/deploy/oscar/tests/test_local.py +++ b/mars/deploy/oscar/tests/test_local.py @@ -185,6 +185,7 @@ def _wrap_original_deploy_band_resources(*args, **kwargs): @pytest.mark.asyncio +@pytest.mark.skipif(vineyard is None, reason="vineyard not installed") async def test_vineyard_operators(create_cluster): param = create_cluster[1] if param != "vineyard": diff --git a/mars/services/scheduling/api/oscar.py b/mars/services/scheduling/api/oscar.py index 523c295c23..683cfe468f 100644 --- a/mars/services/scheduling/api/oscar.py +++ b/mars/services/scheduling/api/oscar.py @@ -96,7 +96,10 @@ async def update_subtask_priority(self, args_list, kwargs_list): ) async def cancel_subtasks( - self, subtask_ids: List[str], kill_timeout: Union[float, int] = None + self, + subtask_ids: List[str], + kill_timeout: Union[float, int] = None, + wait: bool = False, ): """ Cancel pending and running subtasks. 
@@ -108,7 +111,14 @@ async def cancel_subtasks(
         kill_timeout
             timeout seconds to kill actor process forcibly
         """
-        await self._manager_ref.cancel_subtasks(subtask_ids, kill_timeout=kill_timeout)
+        if wait:
+            await self._manager_ref.cancel_subtasks(
+                subtask_ids, kill_timeout=kill_timeout
+            )
+        else:
+            await self._manager_ref.cancel_subtasks.tell(
+                subtask_ids, kill_timeout=kill_timeout
+            )
 
     async def finish_subtasks(
         self,
@@ -122,8 +132,8 @@ async def finish_subtasks(
 
         Parameters
         ----------
-        subtask_ids
-            ids of subtasks to mark as finished
+        subtask_results
+            results of the subtasks; must be in finished states
         bands
             bands of subtasks to mark as finished
         schedule_next
diff --git a/mars/services/scheduling/supervisor/manager.py b/mars/services/scheduling/supervisor/manager.py
index 2cdb52916a..8baa941846 100644
--- a/mars/services/scheduling/supervisor/manager.py
+++ b/mars/services/scheduling/supervisor/manager.py
@@ -22,11 +22,9 @@
 from .... import oscar as mo
 from ....lib.aio import alru_cache
 from ....metrics import Metrics
-from ....oscar.backends.context import ProfilingContext
 from ....oscar.errors import MarsError
-from ....oscar.profiling import ProfilingData, MARS_ENABLE_PROFILING
 from ....typing import BandType
-from ....utils import dataslots, Timer
+from ....utils import dataslots
 from ...subtask import Subtask, SubtaskResult, SubtaskStatus
 from ...task import TaskAPI
 from ..core import SubtaskScheduleSummary
@@ -127,14 +125,6 @@ async def __post_create__(self):
         )
         await self._speculation_execution_scheduler.start()
 
-        async def dump_running():
-            while True:
-                if self._subtask_infos:
-                    logger.warning("RUNNING: %r", list(self._subtask_infos))
-                await asyncio.sleep(5)
-
-        asyncio.create_task(dump_running())
-
     async def __pre_destroy__(self):
         await self._speculation_execution_scheduler.stop()
 
@@ -186,7 +176,7 @@ async def _handle_subtask_result(
         self, info: SubtaskScheduleInfo, result: SubtaskResult, band: BandType
     ):
         subtask_id = info.subtask.subtask_id
-        async with redirect_subtask_errors(self, [info.subtask]):
+        async with redirect_subtask_errors(self, [info.subtask], reraise=False):
             try:
                 info.band_futures[band].set_result(result)
                 if result.error is not None:
@@ -262,9 +252,9 @@ async def finish_subtasks(
 
             if subtask_info is not None:
                 if subtask_band is not None:
-                    logger.warning("BEFORE await self._handle_subtask_result(subtask_info, result, subtask_band)")
-                    await self._handle_subtask_result(subtask_info, result, subtask_band)
-                    logger.warning("AFTER await self._handle_subtask_result(subtask_info, result, subtask_band)")
+                    await self._handle_subtask_result(
+                        subtask_info, result, subtask_band
+                    )
 
                 self._finished_subtask_count.record(
                     1,
@@ -275,16 +265,15 @@ async def finish_subtasks(
                 },
             )
             self._subtask_summaries[subtask_id] = subtask_info.to_summary(
-                is_finished=True, is_cancelled=result.status == SubtaskStatus.cancelled
+                is_finished=True,
+                is_cancelled=result.status == SubtaskStatus.cancelled,
             )
             subtask_info.end_time = time.time()
             self._speculation_execution_scheduler.finish_subtask(subtask_info)
             # Cancel subtask on other bands.
aio_task = subtask_info.band_futures.pop(subtask_band, None) if aio_task: - logger.warning("BEFORE await aio_task") await aio_task - logger.warning("AFTER await aio_task") if schedule_next: band_tasks[subtask_band] += 1 if subtask_info.band_futures: @@ -304,7 +293,6 @@ async def finish_subtasks( if schedule_next: for band in subtask_info.band_futures.keys(): band_tasks[band] += 1 - # await self._queueing_ref.remove_queued_subtasks(subtask_ids) if band_tasks: await self._queueing_ref.submit_subtasks.tell(dict(band_tasks)) @@ -345,7 +333,9 @@ async def batch_submit_subtask_to_band(self, args_list, kwargs_list): band_to_subtask_ids[band].append(subtask_id) if res_release_delays: - await self._global_resource_ref.release_subtask_resource.batch(*res_release_delays) + await self._global_resource_ref.release_subtask_resource.batch( + *res_release_delays + ) for band, subtask_ids in band_to_subtask_ids.items(): asyncio.create_task(self._submit_subtasks_to_band(band, subtask_ids)) @@ -386,29 +376,22 @@ async def cancel_subtasks( subtask_ids, kill_timeout, ) - queued_subtask_ids = [] - single_cancel_tasks = [] task_api = await self._get_task_api() - async def cancel_single_task(subtask, raw_tasks, cancel_tasks): - if cancel_tasks: - await asyncio.wait(cancel_tasks) - if raw_tasks: - dones, _ = await asyncio.wait(raw_tasks) - else: - dones = [] - if not dones or all(fut.cancelled() for fut in dones): - await task_api.set_subtask_result( - SubtaskResult( - subtask_id=subtask.subtask_id, - session_id=subtask.session_id, - task_id=subtask.task_id, - stage_id=subtask.stage_id, - status=SubtaskStatus.cancelled, - ) - ) + async def cancel_task_in_band(band): + cancel_delays = band_to_cancel_delays.get(band) or [] + execution_ref = await self._get_execution_ref(band) + if cancel_delays: + await execution_ref.cancel_subtask.batch(*cancel_delays) + band_futures = band_to_futures.get(band) + if band_futures: + await asyncio.wait(band_futures) + queued_subtask_ids = [] + cancel_tasks = [] + band_to_cancel_delays = defaultdict(list) + band_to_futures = defaultdict(list) for subtask_id in subtask_ids: if subtask_id not in self._subtask_infos: # subtask may already finished or not submitted at all @@ -423,35 +406,33 @@ async def cancel_single_task(subtask, raw_tasks, cancel_tasks): raw_tasks_to_cancel = list(info.band_futures.values()) if not raw_tasks_to_cancel: - queued_subtask_ids.append(subtask_id) - single_cancel_tasks.append( - asyncio.create_task( - cancel_single_task(info.subtask, [], []) - ) + # not submitted yet: mark subtasks as cancelled + result = SubtaskResult( + subtask_id=info.subtask.subtask_id, + session_id=info.subtask.session_id, + task_id=info.subtask.task_id, + stage_id=info.subtask.stage_id, + status=SubtaskStatus.cancelled, ) + cancel_tasks.append(task_api.set_subtask_result(result)) + queued_subtask_ids.append(subtask_id) else: - cancel_tasks = [] - for band in info.band_futures.keys(): + for band, future in info.band_futures.items(): execution_ref = await self._get_execution_ref(band) - cancel_tasks.append( - asyncio.create_task( - execution_ref.cancel_subtask( - subtask_id, kill_timeout=kill_timeout - ) - ) + band_to_cancel_delays[band].append( + execution_ref.cancel_subtask.delay(subtask_id, kill_timeout) ) - single_cancel_tasks.append( - asyncio.create_task( - cancel_single_task( - info.subtask, raw_tasks_to_cancel, cancel_tasks - ) - ) - ) + band_to_futures[band].append(future) + + for band in band_to_futures: + cancel_tasks.append(asyncio.create_task(cancel_task_in_band(band))) + if 
queued_subtask_ids: # Don't use `finish_subtasks` because it may remove queued await self._queueing_ref.remove_queued_subtasks(queued_subtask_ids) - if single_cancel_tasks: - yield asyncio.wait(single_cancel_tasks) + + if cancel_tasks: + yield asyncio.gather(*cancel_tasks) for subtask_id in subtask_ids: info = self._subtask_infos.pop(subtask_id, None) diff --git a/mars/services/scheduling/supervisor/queueing.py b/mars/services/scheduling/supervisor/queueing.py index 0772664463..65c10e8777 100644 --- a/mars/services/scheduling/supervisor/queueing.py +++ b/mars/services/scheduling/supervisor/queueing.py @@ -220,9 +220,7 @@ async def _submission_task_func(self): async def _submit_subtask_request(self, band_to_limit: Dict[BandType, int] = None): if band_to_limit: - logger.debug( - "TMP_QUEUE_PROBE: Submitting subtasks with limits: %r", band_to_limit - ) + logger.debug("Submitting subtasks with limits: %r", band_to_limit) if not self._band_to_resource or any( not limit and band not in self._band_to_resource @@ -274,8 +272,6 @@ def _load_items_to_submit(): await asyncio.to_thread(_load_items_to_submit) - logger.debug("TMP_QUEUE_PROBE: Finished picking top subtasks") - async with redirect_subtask_errors( self, ( @@ -288,11 +284,6 @@ def _load_items_to_submit(): *apply_delays ) - logger.debug( - "TMP_QUEUE_PROBE: Finished band resource allocation, %d subtasks submitted", - sum(len(ids) for ids in submitted_ids_list), - ) - manager_ref = await self._get_manager_ref() submit_delays = [] @@ -336,10 +327,7 @@ def _gather_submissions(): heapq.heappush(task_queue, submit_items[stid]) await asyncio.to_thread(_gather_submissions) - - logger.debug("TMP_QUEUE_PROBE: Start subtask submission in batch") await manager_ref.submit_subtask_to_band.batch(*submit_delays) - logger.debug("TMP_QUEUE_PROBE: Finished subtask submission") def _ensure_top_item_valid(self, task_queue): """Clean invalid subtask item from the queue to ensure that when the queue is not empty, diff --git a/mars/services/scheduling/supervisor/tests/test_manager.py b/mars/services/scheduling/supervisor/tests/test_manager.py index 8f5a7c4f30..2fe6cc7d38 100644 --- a/mars/services/scheduling/supervisor/tests/test_manager.py +++ b/mars/services/scheduling/supervisor/tests/test_manager.py @@ -13,6 +13,7 @@ # limitations under the License. 
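Note: the reworked `cancel_subtasks` above groups `cancel_subtask` calls per band, sends them as one batched actor call, and then gathers the per-band waits. A minimal sketch of that delay/batch idiom, assuming an Oscar-style extensible method on the execution ref (`get_ref` and the exact signatures here are illustrative, not the real API surface):

    import asyncio


    async def cancel_in_bands(get_ref, band_to_subtask_ids, kill_timeout=5):
        async def cancel_band(band, subtask_ids):
            ref = await get_ref(band)
            # collect delayed calls first, then send them in one batch
            delays = [
                ref.cancel_subtask.delay(sid, kill_timeout) for sid in subtask_ids
            ]
            await ref.cancel_subtask.batch(*delays)

        # one task per band so a slow band does not serialize the rest
        await asyncio.gather(
            *(cancel_band(band, ids) for band, ids in band_to_subtask_ids.items())
        )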
import asyncio +import time from collections import defaultdict from typing import List, Dict, Tuple, Set @@ -91,40 +92,33 @@ async def run_subtask( async def task_fun(): task_api = await TaskAPI.create(subtask.session_id, supervisor_address) + result = SubtaskResult( + subtask_id=subtask.subtask_id, + session_id=subtask.session_id, + task_id=subtask.task_id, + stage_id=subtask.stage_id, + bands=[(self.address, band_name)], + progress=1.0, + execution_start_time=time.time(), + ) try: await asyncio.sleep(20) except asyncio.CancelledError as ex: - await task_api.set_subtask_result( - SubtaskResult( - subtask_id=subtask.subtask_id, - session_id=subtask.session_id, - task_id=subtask.task_id, - stage_id=subtask.stage_id, - bands=[(self.address, band_name)], - status=SubtaskStatus.cancelled, - progress=1.0, - error=ex, - traceback=ex.__traceback__, - ) - ) + result.status = SubtaskStatus.cancelled + result.error = ex + result.traceback = ex.__traceback__ + await task_api.set_subtask_result(result) raise else: - await task_api.set_subtask_result( - SubtaskResult( - subtask_id=subtask.subtask_id, - session_id=subtask.session_id, - task_id=subtask.task_id, - stage_id=subtask.stage_id, - status=SubtaskStatus.succeeded, - bands=[(self.address, band_name)], - progress=1.0, - ) - ) + result.status = SubtaskStatus.succeeded + result.execution_end_time = time.time() + await task_api.set_subtask_result(result) self._subtask_aiotasks[subtask.subtask_id][band_name] = asyncio.create_task( task_fun() ) + @mo.extensible def cancel_subtask(self, subtask_id: str, kill_timeout: int = 5): for task in self._subtask_aiotasks[subtask_id].values(): task.cancel() diff --git a/mars/services/scheduling/tests/test_service.py b/mars/services/scheduling/tests/test_service.py index 1b83d104bb..918bc8f906 100644 --- a/mars/services/scheduling/tests/test_service.py +++ b/mars/services/scheduling/tests/test_service.py @@ -50,9 +50,7 @@ async def set_subtask_result(self, subtask_result: SubtaskResult): for event in self._events[subtask_result.subtask_id]: event.set() self._events.pop(subtask_result.subtask_id, None) - await scheduling_api.finish_subtasks( - [subtask_result], subtask_result.bands - ) + await scheduling_api.finish_subtasks([subtask_result], subtask_result.bands) def _return_result(self, subtask_id: str): result = self._results[subtask_id] diff --git a/mars/services/scheduling/utils.py b/mars/services/scheduling/utils.py index be6d8fcb51..5d1dfb3116 100644 --- a/mars/services/scheduling/utils.py +++ b/mars/services/scheduling/utils.py @@ -29,7 +29,9 @@ async def _get_task_api(actor: mo.Actor): @contextlib.asynccontextmanager -async def redirect_subtask_errors(actor: mo.Actor, subtasks: Iterable[Subtask]): +async def redirect_subtask_errors( + actor: mo.Actor, subtasks: Iterable[Subtask], reraise: bool = True +): try: yield except: # noqa: E722 # pylint: disable=bare-except @@ -60,4 +62,5 @@ async def redirect_subtask_errors(actor: mo.Actor, subtasks: Iterable[Subtask]): ) tasks = [asyncio.ensure_future(coro) for coro in coros] await asyncio.wait(tasks) - raise + if reraise: + raise diff --git a/mars/services/scheduling/worker/execution.py b/mars/services/scheduling/worker/execution.py index 0140f60eee..baa823deeb 100644 --- a/mars/services/scheduling/worker/execution.py +++ b/mars/services/scheduling/worker/execution.py @@ -18,6 +18,7 @@ import operator import pprint import sys +import time from collections import defaultdict from dataclasses import dataclass, field from typing import Dict, List, Optional @@ -36,8 
+37,8 @@ from ...storage import StorageAPI from ...subtask import Subtask, SubtaskAPI, SubtaskResult, SubtaskStatus from ...task import TaskAPI -from .workerslot import BandSlotManagerActor from .quota import QuotaActor +from .workerslot import BandSlotManagerActor logger = logging.getLogger(__name__) @@ -109,7 +110,7 @@ async def _retry_run( def _fill_subtask_result_with_exception( - subtask: Subtask, subtask_info: SubtaskExecutionInfo + subtask: Subtask, band_name: str, result: SubtaskResult ): _, exc, tb = sys.exc_info() if isinstance(exc, ExecutionError): @@ -120,9 +121,9 @@ def _fill_subtask_result_with_exception( if isinstance(exc, asyncio.CancelledError): status = SubtaskStatus.cancelled logger.exception( - "Cancel run subtask %s on band %s", + "Cancel subtask %s on band %s", subtask.subtask_id, - subtask_info.band_name, + band_name, exc_info=exc_info, ) else: @@ -130,13 +131,13 @@ def _fill_subtask_result_with_exception( logger.exception( "Failed to run subtask %s on band %s", subtask.subtask_id, - subtask_info.band_name, + band_name, exc_info=exc_info, ) - subtask_info.result.status = status - subtask_info.result.progress = 1.0 - subtask_info.result.error = exc - subtask_info.result.traceback = tb + result.status = status + result.progress = 1.0 + result.error = exc + result.traceback = tb class SubtaskExecutionActor(mo.StatelessActor): @@ -368,7 +369,8 @@ async def internal_run_subtask(self, subtask: Subtask, band_name: str): task_id=subtask.task_id, stage_id=subtask.stage_id, status=SubtaskStatus.pending, - bands=[(self.address, band_name)] + execution_start_time=time.time(), + bands=[(self.address, band_name)], ) try: logger.debug("Preparing data for subtask %s", subtask.subtask_id) @@ -399,14 +401,16 @@ async def internal_run_subtask(self, subtask: Subtask, band_name: str): subtask.session_id, band_name, remote_mapper_keys ) except: # noqa: E722 # pylint: disable=bare-except - _fill_subtask_result_with_exception(subtask, subtask_info) + _fill_subtask_result_with_exception(subtask, band_name, subtask_info.result) finally: # make sure new slot usages are uploaded in time try: slot_manager_ref = await self._get_slot_manager_ref(band_name) await slot_manager_ref.upload_slot_usages(periodical=False) except: # noqa: E722 # pylint: disable=bare-except - _fill_subtask_result_with_exception(subtask, subtask_info) + _fill_subtask_result_with_exception( + subtask, band_name, subtask_info.result + ) finally: # pop the subtask info at the end is to cancel the job. self._subtask_info.pop(subtask.subtask_id, None) @@ -531,14 +535,37 @@ async def run_subtask( subtask_id not in self._subtask_info ), f"Subtask {subtask_id} is already running on this band[{self.address}]." 
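The `_fill_subtask_result_with_exception` helper above now receives the band name and the result object directly instead of the whole execution info. A self-contained model of that fill-from-exception pattern (the `Result` dataclass is a stand-in for `SubtaskResult`, not the real class):

    import asyncio
    import sys
    from dataclasses import dataclass
    from typing import Any, Optional


    @dataclass
    class Result:
        status: str = "pending"
        progress: float = 0.0
        error: Optional[BaseException] = None
        traceback: Any = None


    def fill_result_with_exception(result: Result) -> None:
        # must be called from inside an except block so that
        # sys.exc_info() still carries the active exception
        _, exc, tb = sys.exc_info()
        if isinstance(exc, asyncio.CancelledError):
            result.status = "cancelled"
        else:
            result.status = "errored"
        result.progress = 1.0
        result.error = exc
        result.traceback = tb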
- logger.debug( - "Start to schedule subtask %s on %s.", subtask_id, self.address - ) + logger.debug("Start to schedule subtask %s on %s.", subtask_id, self.address) self._submitted_subtask_count.record(1, {"band": self.address}) + + async def subtask_caller(): + try: + res = await self.ref().internal_run_subtask(subtask, band_name) + except: # noqa: E722 # pylint: disable=bare-except + logger.error( + "Unexpected error occurred when running subtask %s", + subtask.subtask_id, + ) + res = SubtaskResult( + subtask_id=subtask.subtask_id, + session_id=subtask.session_id, + task_id=subtask.task_id, + stage_id=subtask.stage_id, + status=SubtaskStatus.pending, + execution_start_time=time.time(), + bands=[(self.address, band_name)], + ) + _fill_subtask_result_with_exception(subtask, band_name, res) + + task_api = await TaskAPI.create(subtask.session_id, supervisor_address) + await task_api.set_subtask_result(res) + finally: + self._subtask_info.pop(subtask_id, None) + self._finished_subtask_count.record(1, {"band": self.address}) + logger.debug("Subtask %s finished with result %s", subtask_id, res) + with mo.debug.no_message_trace(): - task = asyncio.create_task( - self.ref().internal_run_subtask(subtask, band_name) - ) + task = asyncio.create_task(subtask_caller()) logger.debug("Subtask %r accepted in worker %s", subtask, self.address) # the extra_config may be None. the extra config overwrites the default value. @@ -554,33 +581,49 @@ async def run_subtask( task, band_name, supervisor_address, max_retries=subtask_max_retries ) - def _finalize_subtask(fut: asyncio.Future): - res = fut.result() + @mo.extensible + async def cancel_subtask(self, subtask_id: str, kill_timeout: Optional[int] = 5): + raise NotImplementedError - self._subtask_info.pop(subtask_id, None) - self._finished_subtask_count.record(1, {"band": self.address}) - logger.debug("Subtask %s finished with result %s", subtask_id, res) + @cancel_subtask.batch + async def batch_cancel_subtask(self, args_list, kwargs_list): + subtask_ids = [] + tasks = [] - task.add_done_callback(_finalize_subtask) + for args, kwargs in zip(args_list, kwargs_list): + subtask_id, kill_timeout = self.cancel_subtask.bind(*args, **kwargs) - async def cancel_subtask(self, subtask_id: str, kill_timeout: Optional[int] = 5): - try: - subtask_info = self._subtask_info[subtask_id] - except KeyError: - logger.info("Subtask %s not exists, skip cancel.", subtask_id) - return - logger.info( - "Start to cancel subtask %s in slot %s, kill_timeout is %s", - subtask_id, - subtask_info.slot_id, - kill_timeout, - ) + try: + subtask_info = self._subtask_info[subtask_id] + except KeyError: + logger.info("Subtask %s not exists, skip cancel.", subtask_id) + continue - kill_timeout = kill_timeout if self._enable_kill_slot else None - if not subtask_info.cancelling: - subtask_info.kill_timeout = kill_timeout - subtask_info.cancelling = True - subtask_info.aio_task.cancel() + subtask_ids.append(subtask_id) + logger.info( + "Start to cancel subtask %s in slot %s, kill_timeout is %s", + subtask_id, + subtask_info.slot_id, + kill_timeout, + ) + + kill_timeout = kill_timeout if self._enable_kill_slot else None + if not subtask_info.cancelling: + subtask_info.kill_timeout = kill_timeout + subtask_info.cancelling = True + subtask_info.aio_task.cancel() + tasks.append(subtask_info.aio_task) - await subtask_info.aio_task - self._subtask_info.pop(subtask_id, None) + if tasks: + await asyncio.wait(tasks) + + for subtask_id in subtask_ids: + try: + subtask_info = self._subtask_info[subtask_id] 
+ except KeyError: + continue + + try: + self._subtask_info.pop(subtask_info.aio_task.result().subtask_id, None) + except BaseException: # pragma: no cover + logger.error("Failed to cancel subtask %s", subtask_id) diff --git a/mars/services/scheduling/worker/tests/test_execution.py b/mars/services/scheduling/worker/tests/test_execution.py index 2eec005c88..2a0132aecb 100644 --- a/mars/services/scheduling/worker/tests/test_execution.py +++ b/mars/services/scheduling/worker/tests/test_execution.py @@ -536,7 +536,9 @@ def check_fun(): execution_ref.cancel_subtask(subtask.subtask_id, kill_timeout=1), timeout=30, ) - r = await asyncio.wait_for(task_manager_ref.wait_subtask(subtask.subtask_id), timeout=30) + r = await asyncio.wait_for( + task_manager_ref.wait_subtask(subtask.subtask_id), timeout=30 + ) assert r.status == SubtaskStatus.cancelled remote_result = RemoteFunction( @@ -551,7 +553,9 @@ def check_fun(): await asyncio.wait_for( execution_ref.run_subtask(subtask, "numa-0", pool.external_address), timeout=30 ) - await asyncio.wait_for(task_manager_ref.wait_subtask(subtask.subtask_id), timeout=30) + await asyncio.wait_for( + task_manager_ref.wait_subtask(subtask.subtask_id), timeout=30 + ) # check if slots not killed (or slot assignment may be cancelled) if os.path.exists(executed_file): diff --git a/mars/services/subtask/core.py b/mars/services/subtask/core.py index 2f1512819f..bdb9d43650 100644 --- a/mars/services/subtask/core.py +++ b/mars/services/subtask/core.py @@ -181,7 +181,8 @@ def update(self, result: Optional["SubtaskResult"]): if result and result.bands: bands = self.bands or [] self.bands = sorted(set(bands + result.bands)) - self.execution_start_time = result.execution_start_time + if hasattr(result, "execution_start_time"): + self.execution_start_time = result.execution_start_time if hasattr(result, "execution_end_time"): self.execution_end_time = result.execution_end_time return self diff --git a/mars/services/task/execution/mars/stage.py b/mars/services/task/execution/mars/stage.py index f216a91243..376db5f624 100644 --- a/mars/services/task/execution/mars/stage.py +++ b/mars/services/task/execution/mars/stage.py @@ -184,7 +184,8 @@ async def set_subtask_result(self, result: SubtaskResult, band: BandType = None) ) # if error or cancel, cancel all submitted subtasks await self._scheduling_api.cancel_subtasks( - list(self._submitted_subtask_ids) + list(self._submitted_subtask_ids), + wait=False, ) self._schedule_done() cost_time_secs = self.result.end_time - self.result.start_time From 47397a3f580147a4400730a96af81d69944c317a Mon Sep 17 00:00:00 2001 From: Wenjun Si Date: Wed, 8 Jun 2022 15:04:34 +0800 Subject: [PATCH 3/6] Separate result setting RPC call --- mars/deploy/oscar/tests/test_local.py | 4 ++-- .../services/scheduling/supervisor/manager.py | 19 ++++++++------- .../supervisor/tests/test_manager.py | 10 ++++---- mars/services/scheduling/worker/execution.py | 24 +++++++++++++++---- .../scheduling/worker/tests/test_execution.py | 24 ++++++++++++++++++- 5 files changed, 61 insertions(+), 20 deletions(-) diff --git a/mars/deploy/oscar/tests/test_local.py b/mars/deploy/oscar/tests/test_local.py index e2092654cf..638c37d1db 100644 --- a/mars/deploy/oscar/tests/test_local.py +++ b/mars/deploy/oscar/tests/test_local.py @@ -93,8 +93,8 @@ "serialization": {}, "most_calls": DICT_NOT_EMPTY, "slow_calls": DICT_NOT_EMPTY, - "band_subtasks": DICT_NOT_EMPTY, - "slow_subtasks": DICT_NOT_EMPTY, + # "band_subtasks": DICT_NOT_EMPTY, + # "slow_subtasks": DICT_NOT_EMPTY, } } 
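The `hasattr` guards added to `SubtaskResult.update` above keep result merging tolerant of peers that serialize older result versions lacking the timing fields. Roughly the same defensive merge, modeled on a hypothetical `MergeableResult` class rather than the real one:

    class MergeableResult:
        def __init__(self, bands=None):
            self.bands = bands or []
            self.execution_start_time = None
            self.execution_end_time = None

        def update(self, other):
            if other and other.bands:
                self.bands = sorted(set(self.bands + other.bands))
            # an older peer may send results without timing attributes
            for attr in ("execution_start_time", "execution_end_time"):
                if hasattr(other, attr):
                    setattr(self, attr, getattr(other, attr))
            return self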
EXPECT_PROFILING_STRUCTURE_NO_SLOW = copy.deepcopy(EXPECT_PROFILING_STRUCTURE) diff --git a/mars/services/scheduling/supervisor/manager.py b/mars/services/scheduling/supervisor/manager.py index 8baa941846..6ea910a8e2 100644 --- a/mars/services/scheduling/supervisor/manager.py +++ b/mars/services/scheduling/supervisor/manager.py @@ -172,10 +172,13 @@ async def _get_execution_ref(self, band: BandType): return await mo.actor_ref(SubtaskExecutionActor.default_uid(), address=band[0]) - async def _handle_subtask_result( - self, info: SubtaskScheduleInfo, result: SubtaskResult, band: BandType + async def set_subtask_result( + self, result: SubtaskResult, band: BandType ): + info = self._subtask_infos[result.subtask_id] subtask_id = info.subtask.subtask_id + notify_task_service = True + async with redirect_subtask_errors(self, [info.subtask], reraise=False): try: info.band_futures[band].set_result(result) @@ -199,6 +202,7 @@ async def _handle_subtask_result( [info.subtask.priority or tuple()], exclude_bands=set(info.band_futures.keys()), ) + notify_task_service = False else: raise ex except asyncio.CancelledError: @@ -236,6 +240,10 @@ async def _handle_subtask_result( if info.num_reschedules > 0: await self._queueing_ref.submit_subtasks.tell() + if notify_task_service: + task_api = await self._get_task_api() + await task_api.set_subtask_result(result) + async def finish_subtasks( self, subtask_results: List[SubtaskResult], @@ -251,11 +259,6 @@ async def finish_subtasks( subtask_info = self._subtask_infos.get(subtask_id, None) if subtask_info is not None: - if subtask_band is not None: - await self._handle_subtask_result( - subtask_info, result, subtask_band - ) - self._finished_subtask_count.record( 1, { @@ -273,7 +276,7 @@ async def finish_subtasks( # Cancel subtask on other bands. 
aio_task = subtask_info.band_futures.pop(subtask_band, None) if aio_task: - await aio_task + yield aio_task if schedule_next: band_tasks[subtask_band] += 1 if subtask_info.band_futures: diff --git a/mars/services/scheduling/supervisor/tests/test_manager.py b/mars/services/scheduling/supervisor/tests/test_manager.py index 2fe6cc7d38..afa7136cd0 100644 --- a/mars/services/scheduling/supervisor/tests/test_manager.py +++ b/mars/services/scheduling/supervisor/tests/test_manager.py @@ -23,7 +23,6 @@ from .....typing import BandType from ....cluster import MockClusterAPI from ....subtask import Subtask, SubtaskResult, SubtaskStatus -from ....task import TaskAPI from ....task.supervisor.manager import TaskManagerActor from ...supervisor import ( SubtaskQueueingActor, @@ -91,7 +90,10 @@ async def run_subtask( self._run_subtask_events[subtask.subtask_id].set() async def task_fun(): - task_api = await TaskAPI.create(subtask.session_id, supervisor_address) + manager_ref = await mo.actor_ref( + uid=SubtaskManagerActor.gen_uid(subtask.session_id), + address=supervisor_address, + ) result = SubtaskResult( subtask_id=subtask.subtask_id, session_id=subtask.session_id, @@ -107,12 +109,12 @@ async def task_fun(): result.status = SubtaskStatus.cancelled result.error = ex result.traceback = ex.__traceback__ - await task_api.set_subtask_result(result) + await manager_ref.set_subtask_result.tell(result, (self.address, band_name)) raise else: result.status = SubtaskStatus.succeeded result.execution_end_time = time.time() - await task_api.set_subtask_result(result) + await manager_ref.set_subtask_result.tell(result, (self.address, band_name)) self._subtask_aiotasks[subtask.subtask_id][band_name] = asyncio.create_task( task_fun() diff --git a/mars/services/scheduling/worker/execution.py b/mars/services/scheduling/worker/execution.py index baa823deeb..e108c3e9e5 100644 --- a/mars/services/scheduling/worker/execution.py +++ b/mars/services/scheduling/worker/execution.py @@ -36,7 +36,6 @@ from ...meta import MetaAPI from ...storage import StorageAPI from ...subtask import Subtask, SubtaskAPI, SubtaskResult, SubtaskStatus -from ...task import TaskAPI from .quota import QuotaActor from .workerslot import BandSlotManagerActor @@ -178,6 +177,17 @@ async def _get_slot_manager_ref( BandSlotManagerActor.gen_uid(band), address=self.address ) + @classmethod + @alru_cache(cache_exceptions=False) + async def _get_manager_ref( + cls, session_id: str, supervisor_address: str + ) -> mo.ActorRefType[BandSlotManagerActor]: + from ..supervisor import SubtaskManagerActor + + return await mo.actor_ref( + SubtaskManagerActor.gen_uid(session_id), address=supervisor_address + ) + @alru_cache(cache_exceptions=False) async def _get_band_quota_ref(self, band: str) -> mo.ActorRefType[QuotaActor]: return await mo.actor_ref(QuotaActor.gen_uid(band), address=self.address) @@ -415,10 +425,12 @@ async def internal_run_subtask(self, subtask: Subtask, band_name: str): # pop the subtask info at the end is to cancel the job. 
self._subtask_info.pop(subtask.subtask_id, None) - task_api = await TaskAPI.create( + manager_ref = await self._get_manager_ref( subtask.session_id, subtask_info.supervisor_address ) - await task_api.set_subtask_result(subtask_info.result) + await manager_ref.set_subtask_result.tell( + subtask_info.result, (self.address, subtask_info.band_name) + ) return subtask_info.result async def _retry_run_subtask( @@ -557,8 +569,10 @@ async def subtask_caller(): ) _fill_subtask_result_with_exception(subtask, band_name, res) - task_api = await TaskAPI.create(subtask.session_id, supervisor_address) - await task_api.set_subtask_result(res) + manager_ref = await self._get_manager_ref( + subtask.session_id, supervisor_address + ) + await manager_ref.set_subtask_result.tell(res, (self.address, band_name)) finally: self._subtask_info.pop(subtask_id, None) self._finished_subtask_count.record(1, {"band": self.address}) diff --git a/mars/services/scheduling/worker/tests/test_execution.py b/mars/services/scheduling/worker/tests/test_execution.py index 2a0132aecb..6940ee45d1 100644 --- a/mars/services/scheduling/worker/tests/test_execution.py +++ b/mars/services/scheduling/worker/tests/test_execution.py @@ -37,6 +37,7 @@ from .....resource import Resource from .....tensor.fetch import TensorFetch from .....tensor.arithmetic import TensorTreeAdd +from .....typing import BandType from .....utils import Timer from ....cluster import MockClusterAPI from ....lifecycle import MockLifecycleAPI @@ -47,7 +48,7 @@ from ....subtask import MockSubtaskAPI, Subtask, SubtaskStatus, SubtaskResult from ....task.supervisor.manager import TaskManagerActor from ....mutable import MockMutableAPI -from ...supervisor import GlobalResourceManagerActor +from ...supervisor import GlobalResourceManagerActor, SubtaskManagerActor from ...worker import SubtaskExecutionActor, QuotaActor, BandSlotManagerActor @@ -155,6 +156,19 @@ def get_results(self): return list(self._results.values()) +class MockSubtaskManagerActor(mo.Actor): + def __init__(self, session_id: str): + self._session_id = session_id + + async def __post_create__(self): + self._task_manager_ref = await mo.actor_ref( + uid=TaskManagerActor.gen_uid(self._session_id), address=self.address + ) + + async def set_subtask_result(self, result: SubtaskResult, band: BandType): + await self._task_manager_ref.set_subtask_result.tell(result) + + @pytest.fixture async def actor_pool(request): n_slots, enable_kill = request.param @@ -221,9 +235,17 @@ async def actor_pool(request): address=pool.external_address, ) + subtask_manager_ref = await mo.create_actor( + MockSubtaskManagerActor, + session_id, + uid=SubtaskManagerActor.gen_uid(session_id), + address=pool.external_address, + ) + try: yield pool, session_id, meta_api, worker_meta_api, storage_api, execution_ref finally: + await mo.destroy_actor(subtask_manager_ref) await mo.destroy_actor(task_manager_ref) await mo.destroy_actor(band_slot_ref) await mo.destroy_actor(global_resource_ref) From 9ad836a4917b177f9e68d2fbaa7fd2c462c2f6d7 Mon Sep 17 00:00:00 2001 From: Wenjun Si Date: Wed, 8 Jun 2022 16:19:16 +0800 Subject: [PATCH 4/6] black --- .../services/scheduling/supervisor/manager.py | 32 ++++++++++++------- .../supervisor/tests/test_manager.py | 8 +++-- mars/services/scheduling/worker/execution.py | 4 ++- 3 files changed, 29 insertions(+), 15 deletions(-) diff --git a/mars/services/scheduling/supervisor/manager.py b/mars/services/scheduling/supervisor/manager.py index 6ea910a8e2..d860ef96e9 100644 --- 
a/mars/services/scheduling/supervisor/manager.py +++ b/mars/services/scheduling/supervisor/manager.py @@ -172,9 +172,7 @@ async def _get_execution_ref(self, band: BandType): return await mo.actor_ref(SubtaskExecutionActor.default_uid(), address=band[0]) - async def set_subtask_result( - self, result: SubtaskResult, band: BandType - ): + async def set_subtask_result(self, result: SubtaskResult, band: BandType): info = self._subtask_infos[result.subtask_id] subtask_id = info.subtask.subtask_id notify_task_service = True @@ -346,6 +344,7 @@ async def batch_submit_subtask_to_band(self, args_list, kwargs_list): async def _submit_subtasks_to_band(self, band: BandType, subtask_ids: List[str]): execution_ref = await self._get_execution_ref(band) delays = [] + task_stage_count = defaultdict(lambda: 0) async with redirect_subtask_errors( self, self._get_subtasks_by_ids(subtask_ids) @@ -353,21 +352,30 @@ async def _submit_subtasks_to_band(self, band: BandType, subtask_ids: List[str]) for subtask_id in subtask_ids: subtask_info = self._subtask_infos[subtask_id] subtask = subtask_info.subtask - self._submitted_subtask_count.record( - 1, - { - "session_id": self._session_id, - "task_id": subtask.task_id, - "stage_id": subtask.stage_id, - }, - ) - logger.debug("Start run subtask %s in band %s.", subtask_id, band) + task_stage_count[(subtask.task_id, subtask.stage_id)] += 1 delays.append( execution_ref.run_subtask.delay(subtask, band[1], self.address) ) subtask_info.band_futures[band] = asyncio.Future() subtask_info.start_time = time.time() self._speculation_execution_scheduler.add_subtask(subtask_info) + + for (task_id, stage_id), cnt in task_stage_count.items(): + self._submitted_subtask_count.record( + cnt, + { + "session_id": self._session_id, + "task_id": task_id, + "stage_id": stage_id, + }, + ) + + logger.debug( + "Start run %d subtasks %r in band %s.", + len(subtask_ids), + subtask_ids, + band, + ) await execution_ref.run_subtask.batch(*delays, send=False) async def cancel_subtasks( diff --git a/mars/services/scheduling/supervisor/tests/test_manager.py b/mars/services/scheduling/supervisor/tests/test_manager.py index afa7136cd0..465859f7af 100644 --- a/mars/services/scheduling/supervisor/tests/test_manager.py +++ b/mars/services/scheduling/supervisor/tests/test_manager.py @@ -109,12 +109,16 @@ async def task_fun(): result.status = SubtaskStatus.cancelled result.error = ex result.traceback = ex.__traceback__ - await manager_ref.set_subtask_result.tell(result, (self.address, band_name)) + await manager_ref.set_subtask_result.tell( + result, (self.address, band_name) + ) raise else: result.status = SubtaskStatus.succeeded result.execution_end_time = time.time() - await manager_ref.set_subtask_result.tell(result, (self.address, band_name)) + await manager_ref.set_subtask_result.tell( + result, (self.address, band_name) + ) self._subtask_aiotasks[subtask.subtask_id][band_name] = asyncio.create_task( task_fun() diff --git a/mars/services/scheduling/worker/execution.py b/mars/services/scheduling/worker/execution.py index e108c3e9e5..23f7e9b159 100644 --- a/mars/services/scheduling/worker/execution.py +++ b/mars/services/scheduling/worker/execution.py @@ -572,7 +572,9 @@ async def subtask_caller(): manager_ref = await self._get_manager_ref( subtask.session_id, supervisor_address ) - await manager_ref.set_subtask_result.tell(res, (self.address, band_name)) + await manager_ref.set_subtask_result.tell( + res, (self.address, band_name) + ) finally: self._subtask_info.pop(subtask_id, None) 
self._finished_subtask_count.record(1, {"band": self.address}) From 6c5199bb9df99afc4ee403bb66bfa4dec8584546 Mon Sep 17 00:00:00 2001 From: Wenjun Si Date: Thu, 9 Jun 2022 12:34:37 +0800 Subject: [PATCH 5/6] Simplify API & fix case --- .../fault_injection_config_with_rerun.yml | 2 + mars/deploy/oscar/tests/test_local.py | 10 ++--- mars/deploy/oscar/tests/test_ray.py | 4 +- .../deploy/oscar/tests/test_ray_scheduling.py | 9 +++- mars/oscar/backends/context.py | 11 +++-- mars/services/scheduling/api/oscar.py | 18 +++----- .../services/scheduling/supervisor/manager.py | 42 +++++++++++-------- .../services/scheduling/tests/test_service.py | 4 +- mars/services/scheduling/utils.py | 8 +--- mars/services/task/execution/mars/stage.py | 9 ++-- mars/services/task/tests/test_service.py | 1 + mars/tests/test_utils.py | 2 +- 12 files changed, 62 insertions(+), 58 deletions(-) diff --git a/mars/deploy/oscar/tests/fault_injection_config_with_rerun.yml b/mars/deploy/oscar/tests/fault_injection_config_with_rerun.yml index e65836240f..94e063504e 100644 --- a/mars/deploy/oscar/tests/fault_injection_config_with_rerun.yml +++ b/mars/deploy/oscar/tests/fault_injection_config_with_rerun.yml @@ -7,3 +7,5 @@ scheduling: storage: # shared-memory38 may lose object if the process crash after put success. backends: [plasma] + plasma: + store_memory: 32M diff --git a/mars/deploy/oscar/tests/test_local.py b/mars/deploy/oscar/tests/test_local.py index 638c37d1db..01de796c6b 100644 --- a/mars/deploy/oscar/tests/test_local.py +++ b/mars/deploy/oscar/tests/test_local.py @@ -42,7 +42,7 @@ from ....storage import StorageLevel from ....services.storage import StorageAPI from ....tensor.arithmetic.add import TensorAdd -from ....tests.core import mock, check_dict_structure_same, DICT_NOT_EMPTY +from ....tests.core import mock, DICT_NOT_EMPTY from ..local import new_cluster, _load_config from ..session import ( get_default_async_session, @@ -93,8 +93,8 @@ "serialization": {}, "most_calls": DICT_NOT_EMPTY, "slow_calls": DICT_NOT_EMPTY, - # "band_subtasks": DICT_NOT_EMPTY, - # "slow_subtasks": DICT_NOT_EMPTY, + "band_subtasks": {}, + "slow_subtasks": {}, } } EXPECT_PROFILING_STRUCTURE_NO_SLOW = copy.deepcopy(EXPECT_PROFILING_STRUCTURE) @@ -263,10 +263,6 @@ async def test_execute(create_cluster, config): info = await session.execute(b, extra_config=extra_config) await info - if extra_config: - check_dict_structure_same(info.profiling_result(), expect_profiling_structure) - else: - assert not info.profiling_result() assert info.result() is None assert info.exception() is None assert info.progress() == 1 diff --git a/mars/deploy/oscar/tests/test_ray.py b/mars/deploy/oscar/tests/test_ray.py index 2b981f03a1..687579d5e4 100644 --- a/mars/deploy/oscar/tests/test_ray.py +++ b/mars/deploy/oscar/tests/test_ray.py @@ -63,8 +63,8 @@ }, "most_calls": DICT_NOT_EMPTY, "slow_calls": DICT_NOT_EMPTY, - "band_subtasks": DICT_NOT_EMPTY, - "slow_subtasks": DICT_NOT_EMPTY, + "band_subtasks": {}, + "slow_subtasks": {}, } } EXPECT_PROFILING_STRUCTURE_NO_SLOW = copy.deepcopy(EXPECT_PROFILING_STRUCTURE) diff --git a/mars/deploy/oscar/tests/test_ray_scheduling.py b/mars/deploy/oscar/tests/test_ray_scheduling.py index 3d17416067..5cc6ef0887 100644 --- a/mars/deploy/oscar/tests/test_ray_scheduling.py +++ b/mars/deploy/oscar/tests/test_ray_scheduling.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
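The manager change in PATCH 4 above also replaced per-subtask metric records in `_submit_subtasks_to_band` with one record per `(task_id, stage_id)` group. The aggregation shape, sketched with a generic counter exposing `record(value, labels)` as the `Metrics.counter` handles in these diffs do:

    from collections import defaultdict


    def record_submissions(counter, subtasks, session_id):
        # aggregate first so each (task, stage) pair is recorded once
        # instead of once per subtask
        task_stage_count = defaultdict(int)
        for subtask in subtasks:
            task_stage_count[(subtask.task_id, subtask.stage_id)] += 1
        for (task_id, stage_id), cnt in task_stage_count.items():
            counter.record(
                cnt,
                {
                    "session_id": session_id,
                    "task_id": task_id,
                    "stage_id": stage_id,
                },
            )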
+ import asyncio import logging import os @@ -28,6 +29,7 @@ process_placement_to_address, kill_and_wait, ) +from ....oscar.backends.router import Router from ....services.cluster import ClusterAPI from ....services.scheduling.supervisor.autoscale import AutoscalerActor from ....tests.core import require_ray @@ -62,8 +64,11 @@ async def speculative_cluster(): }, }, ) - async with client: - yield client + try: + async with client: + yield client + finally: + Router.set_instance(None) @pytest.mark.parametrize("ray_large_cluster", [{"num_nodes": 2}], indirect=True) diff --git a/mars/oscar/backends/context.py b/mars/oscar/backends/context.py index 5f07106876..3b62204e74 100644 --- a/mars/oscar/backends/context.py +++ b/mars/oscar/backends/context.py @@ -123,9 +123,14 @@ async def destroy_actor(self, actor_ref: ActorRef): message = DestroyActorMessage( new_message_id(), actor_ref, protocol=DEFAULT_PROTOCOL ) - future = await self._call(actor_ref.address, message, wait=False) - result = await self._wait(future, actor_ref.address, message) - return self._process_result_message(result) + try: + future = await self._call(actor_ref.address, message, wait=False) + result = await self._wait(future, actor_ref.address, message) + return self._process_result_message(result) + except ConnectionRefusedError: + # when remote server already destroyed, + # we assume all actors destroyed already + pass async def kill_actor(self, actor_ref: ActorRef, force: bool = True): # get main_pool_address diff --git a/mars/services/scheduling/api/oscar.py b/mars/services/scheduling/api/oscar.py index 683cfe468f..af7c46a3eb 100644 --- a/mars/services/scheduling/api/oscar.py +++ b/mars/services/scheduling/api/oscar.py @@ -16,7 +16,7 @@ from .... import oscar as mo from ....lib.aio import alru_cache -from ...subtask import Subtask, SubtaskResult +from ...subtask import Subtask from ..core import SubtaskScheduleSummary from .core import AbstractSchedulingAPI @@ -99,7 +99,6 @@ async def cancel_subtasks( self, subtask_ids: List[str], kill_timeout: Union[float, int] = None, - wait: bool = False, ): """ Cancel pending and running subtasks. 
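The `destroy_actor` change above downgrades a refused connection during teardown to a no-op, on the assumption that a dead server has already destroyed its actors. The same tolerate-on-teardown pattern in isolation (names are illustrative):

    async def destroy_quietly(destroy_coro):
        # if the remote end is already gone, its actors are gone too,
        # so a refused connection counts as successful teardown
        try:
            return await destroy_coro
        except ConnectionRefusedError:
            return None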
@@ -111,18 +110,11 @@ async def cancel_subtasks( kill_timeout timeout seconds to kill actor process forcibly """ - if wait: - await self._manager_ref.cancel_subtasks( - subtask_ids, kill_timeout=kill_timeout - ) - else: - await self._manager_ref.cancel_subtasks.tell( - subtask_ids, kill_timeout=kill_timeout - ) + await self._manager_ref.cancel_subtasks(subtask_ids, kill_timeout=kill_timeout) async def finish_subtasks( self, - subtask_results: List[SubtaskResult], + subtask_ids: List[str], bands: List[Tuple] = None, schedule_next: bool = True, ): @@ -132,14 +124,14 @@ async def finish_subtasks( Parameters ---------- - subtask_results + subtask_ids results of subtasks, must in finished states bands bands of subtasks to mark as finished schedule_next whether to schedule succeeding subtasks """ - await self._manager_ref.finish_subtasks(subtask_results, bands, schedule_next) + await self._manager_ref.finish_subtasks.tell(subtask_ids, bands, schedule_next) class MockSchedulingAPI(SchedulingAPI): diff --git a/mars/services/scheduling/supervisor/manager.py b/mars/services/scheduling/supervisor/manager.py index d860ef96e9..b993cd2228 100644 --- a/mars/services/scheduling/supervisor/manager.py +++ b/mars/services/scheduling/supervisor/manager.py @@ -175,7 +175,7 @@ async def _get_execution_ref(self, band: BandType): async def set_subtask_result(self, result: SubtaskResult, band: BandType): info = self._subtask_infos[result.subtask_id] subtask_id = info.subtask.subtask_id - notify_task_service = True + notify_task_service = False async with redirect_subtask_errors(self, [info.subtask], reraise=False): try: @@ -183,6 +183,7 @@ async def set_subtask_result(self, result: SubtaskResult, band: BandType): if result.error is not None: raise result.error.with_traceback(result.traceback) logger.debug("Finished subtask %s with result %s.", subtask_id, result) + notify_task_service = True except (OSError, MarsError) as ex: # TODO: We should handle ServerClosed Error. if ( @@ -200,7 +201,6 @@ async def set_subtask_result(self, result: SubtaskResult, band: BandType): [info.subtask.priority or tuple()], exclude_bands=set(info.band_futures.keys()), ) - notify_task_service = False else: raise ex except asyncio.CancelledError: @@ -244,16 +244,14 @@ async def set_subtask_result(self, result: SubtaskResult, band: BandType): async def finish_subtasks( self, - subtask_results: List[SubtaskResult], + subtask_ids: List[str], bands: List[BandType] = None, schedule_next: bool = True, ): - subtask_ids = [result.subtask_id for result in subtask_results] logger.debug("Finished subtasks %s.", subtask_ids) band_tasks = defaultdict(lambda: 0) bands = bands or [None] * len(subtask_ids) - for result, subtask_band in zip(subtask_results, bands): - subtask_id = result.subtask_id + for subtask_id, subtask_band in zip(subtask_ids, bands): subtask_info = self._subtask_infos.get(subtask_id, None) if subtask_info is not None: @@ -265,13 +263,16 @@ async def finish_subtasks( "stage_id": subtask_info.subtask.stage_id, }, ) - self._subtask_summaries[subtask_id] = subtask_info.to_summary( - is_finished=True, - is_cancelled=result.status == SubtaskStatus.cancelled, - ) + if subtask_id not in self._subtask_summaries: + summary_kw = dict(is_finished=True) + if subtask_info.cancel_pending: + summary_kw["is_cancelled"] = True + self._subtask_summaries[subtask_id] = subtask_info.to_summary( + **summary_kw + ) subtask_info.end_time = time.time() self._speculation_execution_scheduler.finish_subtask(subtask_info) - # Cancel subtask on other bands. 
+ # Cancel subtask on other bands. aio_task = subtask_info.band_futures.pop(subtask_band, None) if aio_task: yield aio_task @@ -321,7 +322,7 @@ async def batch_submit_subtask_to_band(self, args_list, kwargs_list): if info.cancel_pending: res_release_delays.append( self._global_resource_ref.release_subtask_resource.delay( - band, info.subtask.session_id, info.subtask.subtask_id + band, self._session_id, subtask_id ) ) continue @@ -330,6 +331,12 @@ async def batch_submit_subtask_to_band(self, args_list, kwargs_list): "Subtask %s is not in added subtasks set, it may be finished or canceled, skip it.", subtask_id, ) + # in case resource already allocated, do deallocate + res_release_delays.append( + self._global_resource_ref.release_subtask_resource.delay( + band, self._session_id, subtask_id + ) + ) continue band_to_subtask_ids[band].append(subtask_id) @@ -414,9 +421,8 @@ async def cancel_task_in_band(band): info = self._subtask_infos[subtask_id] info.cancel_pending = True - raw_tasks_to_cancel = list(info.band_futures.values()) - if not raw_tasks_to_cancel: + if not info.band_futures: # not submitted yet: mark subtasks as cancelled result = SubtaskResult( subtask_id=info.subtask.subtask_id, @@ -435,13 +441,13 @@ async def cancel_task_in_band(band): ) band_to_futures[band].append(future) - for band in band_to_futures: - cancel_tasks.append(asyncio.create_task(cancel_task_in_band(band))) - + # Dequeue first as it is possible to leak subtasks from queues if queued_subtask_ids: - # Don't use `finish_subtasks` because it may remove queued await self._queueing_ref.remove_queued_subtasks(queued_subtask_ids) + for band in band_to_futures: + cancel_tasks.append(asyncio.create_task(cancel_task_in_band(band))) + if cancel_tasks: yield asyncio.gather(*cancel_tasks) diff --git a/mars/services/scheduling/tests/test_service.py b/mars/services/scheduling/tests/test_service.py index 918bc8f906..60f586d29b 100644 --- a/mars/services/scheduling/tests/test_service.py +++ b/mars/services/scheduling/tests/test_service.py @@ -50,7 +50,9 @@ async def set_subtask_result(self, subtask_result: SubtaskResult): for event in self._events[subtask_result.subtask_id]: event.set() self._events.pop(subtask_result.subtask_id, None) - await scheduling_api.finish_subtasks([subtask_result], subtask_result.bands) + await scheduling_api.finish_subtasks( + [subtask_result.subtask_id], subtask_result.bands + ) def _return_result(self, subtask_id: str): result = self._results[subtask_id] diff --git a/mars/services/scheduling/utils.py b/mars/services/scheduling/utils.py index 5d1dfb3116..57b7ee2349 100644 --- a/mars/services/scheduling/utils.py +++ b/mars/services/scheduling/utils.py @@ -18,16 +18,10 @@ from typing import Iterable from ... 
import oscar as mo -from ...lib.aio import alru_cache from ..subtask import Subtask, SubtaskResult, SubtaskStatus from ..task import TaskAPI -@alru_cache -async def _get_task_api(actor: mo.Actor): - return await TaskAPI.create(getattr(actor, "_session_id"), actor.address) - - @contextlib.asynccontextmanager async def redirect_subtask_errors( actor: mo.Actor, subtasks: Iterable[Subtask], reraise: bool = True @@ -41,7 +35,7 @@ async def redirect_subtask_errors( if isinstance(error, asyncio.CancelledError) else SubtaskStatus.errored ) - task_api = await _get_task_api(actor) + task_api = await TaskAPI.create(getattr(actor, "_session_id"), actor.address) coros = [] for subtask in subtasks: if subtask is None: # pragma: no cover diff --git a/mars/services/task/execution/mars/stage.py b/mars/services/task/execution/mars/stage.py index 376db5f624..a5ab47ca27 100644 --- a/mars/services/task/execution/mars/stage.py +++ b/mars/services/task/execution/mars/stage.py @@ -149,7 +149,7 @@ async def set_subtask_result(self, result: SubtaskResult, band: BandType = None) if all_done or error_or_cancelled: # tell scheduling to finish subtasks await self._scheduling_api.finish_subtasks( - [result], bands=[band], schedule_next=not error_or_cancelled + [result.subtask_id], bands=[band], schedule_next=not error_or_cancelled ) if self.result.status != TaskStatus.terminated: self.result = TaskResult( @@ -184,8 +184,7 @@ async def set_subtask_result(self, result: SubtaskResult, band: BandType = None) ) # if error or cancel, cancel all submitted subtasks await self._scheduling_api.cancel_subtasks( - list(self._submitted_subtask_ids), - wait=False, + list(self._submitted_subtask_ids) ) self._schedule_done() cost_time_secs = self.result.end_time - self.result.start_time @@ -219,7 +218,9 @@ async def set_subtask_result(self, result: SubtaskResult, band: BandType = None) # all predecessors finished to_schedule_subtasks.append(succ_subtask) await self._schedule_subtasks(to_schedule_subtasks) - await self._scheduling_api.finish_subtasks([result], bands=[band]) + await self._scheduling_api.finish_subtasks( + [result.subtask_id], bands=[band] + ) async def run(self): if len(self.subtask_graph) == 0: diff --git a/mars/services/task/tests/test_service.py b/mars/services/task/tests/test_service.py index 5eed76977f..a0ae843d9e 100644 --- a/mars/services/task/tests/test_service.py +++ b/mars/services/task/tests/test_service.py @@ -264,6 +264,7 @@ def f1(): await asyncio.sleep(0.5) with Timer() as timer: await task_api.cancel_task(task_id) + await asyncio.sleep(0.5) result = await task_api.get_task_result(task_id) assert result.status == TaskStatus.terminated assert timer.duration < 20 diff --git a/mars/tests/test_utils.py b/mars/tests/test_utils.py index 4ed1dbce1f..3bdd4fdeb8 100644 --- a/mars/tests/test_utils.py +++ b/mars/tests/test_utils.py @@ -613,7 +613,7 @@ def __call__(self, *args, **kwargs): assert get_func_token_values(func) == [func] -@pytest.mark.parametrize("id_length", [0, 5, 32, 63]) +@pytest.mark.parametrize("id_length", [0, 5, 32, 63, 254]) def test_gen_random_id(id_length): rnd_id = utils.new_random_id(id_length) assert len(rnd_id) == id_length From 13d2d3c4bceeac07b156a55fbb0799e4a417599c Mon Sep 17 00:00:00 2001 From: wjsi Date: Sat, 18 Jun 2022 20:30:15 +0800 Subject: [PATCH 6/6] TEST --- ci/reload-env.sh | 2 +- mars/tests/test_cluster.py | 12 +++++++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/ci/reload-env.sh b/ci/reload-env.sh index 7b8b26d8bd..ad2870c352 100755 --- 
a/ci/reload-env.sh
+++ b/ci/reload-env.sh
@@ -1,7 +1,7 @@
 #!/bin/bash
 
 export UNAME="$(uname | awk '{print tolower($0)}')"
-export PYTEST_CONFIG_WITHOUT_COV="--log-level=DEBUG --timeout=1500 -W ignore::PendingDeprecationWarning"
+export PYTEST_CONFIG_WITHOUT_COV="-p no:logging -s -v --timeout=1500 -W ignore::PendingDeprecationWarning"
 export PYTEST_CONFIG="$PYTEST_CONFIG_WITHOUT_COV --cov-config=setup.cfg --cov-report= --cov=mars"
 
 if [[ "$GITHUB_REF" =~ ^"refs/tags/" ]]; then
diff --git a/mars/tests/test_cluster.py b/mars/tests/test_cluster.py
index 7a352c9f35..ce21332da8 100644
--- a/mars/tests/test_cluster.py
+++ b/mars/tests/test_cluster.py
@@ -47,9 +47,22 @@ def _terminate(pid: int):
             continue
 
 
+@pytest.fixture
+def config_log():
+    import logging
+
+    # logging.basicConfig is a no-op once the root logger already has
+    # handlers, so set the level on the root logger directly
+    logging.getLogger().setLevel(logging.WARNING)
+    try:
+        yield
+    finally:
+        logging.getLogger().setLevel(logging.DEBUG)
+
+
 @flaky(max_runs=3)
 @pytest.mark.asyncio
-async def test_cluster():
+async def test_cluster(config_log):
     port = get_next_port()
     web_port = get_next_port()
     supervisor_addr = f"127.0.0.1:{port}"
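A save-and-restore variant of the `config_log` fixture above would be slightly more robust, since it does not assume the previous root level was DEBUG. Offered as an illustrative alternative, not part of the patch:

    import logging

    import pytest


    @pytest.fixture
    def quiet_logging():
        root = logging.getLogger()
        old_level = root.level  # remember whatever was configured before
        root.setLevel(logging.WARNING)
        try:
            yield
        finally:
            root.setLevel(old_level)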