Skip to content

Commit 889ffb7

Browse files
authored
Fix duplicate decref of subtask input chunk (#2611)
1 parent 866a5fa commit 889ffb7

File tree

2 files changed

+11
-4
lines changed

2 files changed

+11
-4
lines changed

mars/services/task/supervisor/processor.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -215,8 +215,9 @@ def _get_decref_stage_chunk_keys(
215215
if error_or_cancelled:
216216
# error or cancel, rollback incref for subtask results
217217
for subtask in subtask_graph:
218-
if stage_processor.subtask_results.get(subtask):
218+
if subtask.subtask_id in stage_processor.decref_subtask:
219219
continue
220+
stage_processor.decref_subtask.add(subtask.subtask_id)
220221
# if subtask not executed, rollback incref of predecessors
221222
for inp_subtask in subtask_graph.predecessors(subtask):
222223
for result_chunk in inp_subtask.chunk_graph.results:
@@ -821,9 +822,12 @@ async def set_subtask_result(self, subtask_result: SubtaskResult):
821822
# Since every worker will call supervisor to set subtask result,
822823
# we need to release actor lock to make `decref_chunks` parallel to avoid blocking
823824
# other `set_subtask_result` calls.
824-
yield self._decref_input_subtasks(
825-
subtask, stage_processor.subtask_graph
826-
)
825+
if subtask.subtask_id not in stage_processor.decref_subtask:
826+
stage_processor.decref_subtask.add(subtask.subtask_id)
827+
yield self._decref_input_subtasks(
828+
subtask, stage_processor.subtask_graph
829+
)
830+
827831
except: # noqa: E722 # nosec # pylint: disable=bare-except # pragma: no cover
828832
_, err, tb = sys.exc_info()
829833
if subtask_result.status not in (

mars/services/task/supervisor/stage.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ def __init__(
6767
self.subtask_results: Dict[Subtask, SubtaskResult] = dict()
6868
self._submitted_subtask_ids = set()
6969

70+
# All subtask IDs whose input chunk reference count is reduced.
71+
self.decref_subtask = set()
72+
7073
self._band_manager: Dict[BandType, mo.ActorRef] = dict()
7174

7275
# result

0 commit comments

Comments
 (0)