Skip to content

Commit 5c00fdb

Browse files
committed
Fix cases
1 parent d814d7c commit 5c00fdb

File tree

4 files changed

+11
-18
lines changed

4 files changed

+11
-18
lines changed

mars/deploy/oscar/tests/test_local.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,8 @@
9393
"serialization": {},
9494
"most_calls": DICT_NOT_EMPTY,
9595
"slow_calls": DICT_NOT_EMPTY,
96-
# "band_subtasks": DICT_NOT_EMPTY,
97-
# "slow_subtasks": DICT_NOT_EMPTY,
96+
"band_subtasks": {},
97+
"slow_subtasks": {},
9898
}
9999
}
100100
EXPECT_PROFILING_STRUCTURE_NO_SLOW = copy.deepcopy(EXPECT_PROFILING_STRUCTURE)

mars/services/scheduling/api/oscar.py

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,6 @@ async def cancel_subtasks(
9999
self,
100100
subtask_ids: List[str],
101101
kill_timeout: Union[float, int] = None,
102-
wait: bool = False,
103102
):
104103
"""
105104
Cancel pending and running subtasks.
@@ -111,14 +110,7 @@ async def cancel_subtasks(
111110
kill_timeout
112111
timeout seconds to kill actor process forcibly
113112
"""
114-
if wait:
115-
await self._manager_ref.cancel_subtasks(
116-
subtask_ids, kill_timeout=kill_timeout
117-
)
118-
else:
119-
await self._manager_ref.cancel_subtasks.tell(
120-
subtask_ids, kill_timeout=kill_timeout
121-
)
113+
await self._manager_ref.cancel_subtasks(subtask_ids, kill_timeout=kill_timeout)
122114

123115
async def finish_subtasks(
124116
self,
@@ -139,7 +131,9 @@ async def finish_subtasks(
139131
schedule_next
140132
whether to schedule succeeding subtasks
141133
"""
142-
await self._manager_ref.finish_subtasks(subtask_results, bands, schedule_next)
134+
await self._manager_ref.finish_subtasks.tell(
135+
subtask_results, bands, schedule_next
136+
)
143137

144138

145139
class MockSchedulingAPI(SchedulingAPI):

mars/services/scheduling/supervisor/manager.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -435,13 +435,13 @@ async def cancel_task_in_band(band):
435435
)
436436
band_to_futures[band].append(future)
437437

438-
for band in band_to_futures:
439-
cancel_tasks.append(asyncio.create_task(cancel_task_in_band(band)))
440-
438+
# Dequeue first as it is possible to leak subtasks from queues
441439
if queued_subtask_ids:
442-
# Don't use `finish_subtasks` because it may remove queued
443440
await self._queueing_ref.remove_queued_subtasks(queued_subtask_ids)
444441

442+
for band in band_to_futures:
443+
cancel_tasks.append(asyncio.create_task(cancel_task_in_band(band)))
444+
445445
if cancel_tasks:
446446
yield asyncio.gather(*cancel_tasks)
447447

mars/services/task/execution/mars/stage.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -184,8 +184,7 @@ async def set_subtask_result(self, result: SubtaskResult, band: BandType = None)
184184
)
185185
# if error or cancel, cancel all submitted subtasks
186186
await self._scheduling_api.cancel_subtasks(
187-
list(self._submitted_subtask_ids),
188-
wait=False,
187+
list(self._submitted_subtask_ids)
189188
)
190189
self._schedule_done()
191190
cost_time_secs = self.result.end_time - self.result.start_time

0 commit comments

Comments
 (0)