Skip to content

Commit 9c4cf2a

Browse files
authored
Use tell when remove mapper data after execution (#3027)
1 parent e085051 commit 9c4cf2a

File tree

5 files changed

+34
-24
lines changed

5 files changed

+34
-24
lines changed

mars/lib/tbcode.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,7 @@
2424
from collections import defaultdict
2525

2626

27-
def dump_traceback_code(
28-
tb: types.TracebackType, number_of_lines_of_context: int = 5
29-
):
27+
def dump_traceback_code(tb: types.TracebackType, number_of_lines_of_context: int = 5):
3028
"""
3129
Dump codes before and after lines of tracebacks.
3230
@@ -56,9 +54,7 @@ def dump_traceback_code(
5654
dict(left=left_range, right=right_range, code=fragment)
5755
)
5856
results[file_name].update(
59-
dict(
60-
size=cache_data[0], lines=len(cache_data[2])
61-
)
57+
dict(size=cache_data[0], lines=len(cache_data[2]))
6258
)
6359
tb = tb.tb_next
6460
return dict(results)
@@ -90,7 +86,10 @@ def load_traceback_code(code_frags: dict, cache: dict = None):
9086
if file_name not in cache:
9187
# keep field 1 (mtime) as None to ensure lazy cache
9288
cache[file_name] = (
93-
profile["size"], None, [""] * profile["lines"], file_name
89+
profile["size"],
90+
None,
91+
[""] * profile["lines"],
92+
file_name,
9493
)
9594
for fragment in profile["fragments"]:
9695
left_range, right_range = fragment["left"], fragment["right"]

mars/services/scheduling/worker/execution.py

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import sys
2121
from collections import defaultdict
2222
from dataclasses import dataclass, field
23-
from typing import Dict, Optional, Union
23+
from typing import Dict, List, Optional, Union
2424

2525
from .... import oscar as mo
2626
from ....core import ExecutionError
@@ -344,6 +344,20 @@ def _check_cancelling(cls, subtask_info: SubtaskExecutionInfo):
344344
if subtask_info.cancelling:
345345
raise asyncio.CancelledError
346346

347+
async def _remove_mapper_data(
348+
self, session_id: str, band_name: str, remote_mapper_keys: List
349+
):
350+
storage_api = await StorageAPI.create(
351+
session_id, address=self.address, band_name=band_name
352+
)
353+
logger.debug("Delete mapper data %s", remote_mapper_keys)
354+
await storage_api.delete.batch(
355+
*[
356+
storage_api.delete.delay(key, error="ignore")
357+
for key in remote_mapper_keys
358+
]
359+
)
360+
347361
async def internal_run_subtask(self, subtask: Subtask, band_name: str):
348362
subtask_api = SubtaskAPI(self.address)
349363
subtask_info = self._subtask_info[subtask.subtask_id]
@@ -379,15 +393,8 @@ async def internal_run_subtask(self, subtask: Subtask, band_name: str):
379393
subtask, band_name, subtask_api, batch_quota_req
380394
)
381395
if remote_mapper_keys:
382-
storage_api = await StorageAPI.create(
383-
subtask.session_id, address=self.address, band_name=band_name
384-
)
385-
logger.debug("Delete mapper data %s", remote_mapper_keys)
386-
await storage_api.delete.batch(
387-
*[
388-
storage_api.delete.delay(key, error="ignore")
389-
for key in remote_mapper_keys
390-
]
396+
await self.ref()._remove_mapper_data.tell(
397+
subtask.session_id, band_name, remote_mapper_keys
391398
)
392399
except: # noqa: E722 # pylint: disable=bare-except
393400
_fill_subtask_result_with_exception(subtask, subtask_info)

mars/services/storage/transfer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ async def open_writers(
318318
raise
319319

320320
async def do_write(self, message: TransferMessage):
321-
# close may be a high cost operation, use create_task
321+
# close may be a high-cost operation, use create_task
322322
close_tasks = []
323323
finished_keys = []
324324
session_id = message.session_id

mars/services/task/api/web.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,12 @@ def _json_serial_task_result(result: Optional[TaskResult]):
3939
"profiling": result.profiling,
4040
}
4141
if result.error is not None:
42-
res_json["error"] = base64.b64encode(serialize_serializable(result.error)).decode()
43-
res_json["traceback"] = base64.b64encode(serialize_serializable(result.traceback)).decode()
42+
res_json["error"] = base64.b64encode(
43+
serialize_serializable(result.error)
44+
).decode()
45+
res_json["traceback"] = base64.b64encode(
46+
serialize_serializable(result.traceback)
47+
).decode()
4448
res_json["traceback_code"] = dump_traceback_code(result.traceback)
4549
return res_json
4650

pyproject.toml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ requires = [
99
build-backend = "setuptools.build_meta"
1010

1111
[tool.black]
12-
extend-exclude = [
13-
"^/mars/_version.py",
14-
"^/mars/lib/tblib/.*",
15-
]
12+
include = '\.pyi?$'
13+
extend-exclude = '''
14+
^/mars/(_version.py|lib/tblib/.*)
15+
'''

0 commit comments

Comments
 (0)