Skip to content

Commit 94ec1e8

Browse files
不涸zhongchun
authored andcommitted
PullRequest: 392 Add cleanup for FailOverContext when error recovery fails
Merge branch 'fix-worker-node-fo-context-cleanup-090dev of [email protected]:ray-project/mars.git into 0.9-dev https://code.alipay.com/ray-project/mars/pull_requests/392 Signed-off-by: 慕白 <[email protected]> * Add cleanup for FailOverContext when error recovery fails
1 parent ca11010 commit 94ec1e8

File tree

2 files changed

+9
-1
lines changed

2 files changed

+9
-1
lines changed

mars/services/context.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,5 +308,8 @@ def enable_lineage(self):
308308
def is_lineage_enabled(self):
309309
return self._enable_lineage
310310

311+
def cleanup(self):
312+
self.subtask_to_dependency_subtasks.clear()
313+
311314

312315
FailOverContext = _FailOverContext()

mars/services/task/execution/mars/stage.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -345,10 +345,10 @@ async def _run(self):
345345
return await self._get_stage_result()
346346

347347
async def cancel(self):
348-
logger.info("Start to cancel stage %s of task %s.", self.stage_id, self.task)
349348
if self._done.is_set() or self._cancelled.is_set(): # pragma: no cover
350349
# already finished, ignore cancel
351350
return
351+
logger.info("Start to cancel stage %s of task %s.", self.stage_id, self.task)
352352
self._cancelled.set()
353353
# cancel running subtasks
354354
await self._scheduling_api.cancel_subtasks(list(self._submitted_subtask_ids))
@@ -458,6 +458,7 @@ async def _detect_error(self, subtask, error, expect_error_cls_tuple):
458458
if not FailOverContext.is_lineage_enabled():
459459
logger.info("Lineage of failover is not enabled.")
460460
return False
461+
461462
# Note: There are some error that do not need to be handled,
462463
# like `DuplicatedSubtaskError`.
463464
if isinstance(error, DuplicatedSubtaskError):
@@ -510,6 +511,7 @@ async def _detect_error(self, subtask, error, expect_error_cls_tuple):
510511
s,
511512
subtask,
512513
)
514+
FailOverContext.cleanup()
513515
return False
514516
if s not in dependency_subtasks:
515517
order = await task_manager_ref.get_generation_order(
@@ -525,6 +527,7 @@ async def _detect_error(self, subtask, error, expect_error_cls_tuple):
525527
"No dependent subtasks to restore of subtask %s.",
526528
subtask.subtask_id,
527529
)
530+
FailOverContext.cleanup()
528531
return False
529532
priorities = [
530533
(pri,) + s.priority
@@ -550,9 +553,11 @@ async def _detect_error(self, subtask, error, expect_error_cls_tuple):
550553
)
551554
return True
552555
except:
556+
FailOverContext.cleanup()
553557
logger.exception("Error recovery failed.")
554558
return False
555559
else:
560+
FailOverContext.cleanup()
556561
logger.error("Could not to recover the error: %s", error)
557562
return False
558563

0 commit comments

Comments
 (0)