Skip to content

Commit bfb2126

Browse files
committed
Optimize SubtaskGraph generation
1 parent 19aa2d1 commit bfb2126

File tree

2 files changed

+7
-29
lines changed

2 files changed

+7
-29
lines changed

mars/services/task/analyzer/analyzer.py

Lines changed: 6 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,6 @@ def __init__(
128128
if graph_assigner_cls is None:
129129
graph_assigner_cls = GraphAssigner
130130
self._graph_assigner_cls = graph_assigner_cls
131-
self._chunk_to_copied = dict()
132131
self._logic_key_generator = LogicKeyGenerator()
133132

134133
@classmethod
@@ -227,7 +226,6 @@ def _gen_subtask_info(
227226
result_chunks_set = set()
228227
chunk_graph = ChunkGraph(result_chunks)
229228
out_of_scope_chunks = []
230-
chunk_to_copied = self._chunk_to_copied
231229
update_meta_chunks = []
232230
# subtask properties
233231
band = None
@@ -276,7 +274,7 @@ def _gen_subtask_info(
276274
build_fetch_index_to_chunks = dict()
277275
for i, inp_chunk in enumerate(chunk.inputs):
278276
if inp_chunk in chunks_set:
279-
inp_chunks.append(chunk_to_copied[inp_chunk])
277+
inp_chunks.append(inp_chunk)
280278
else:
281279
build_fetch_index_to_chunks[i] = inp_chunk
282280
inp_chunks.append(None)
@@ -289,20 +287,10 @@ def _gen_subtask_info(
289287
inp_chunks[i] = fetch_chunk
290288
copied_op = chunk.op.copy()
291289
copied_op._key = chunk.op.key
292-
out_chunks = [
293-
c.data
294-
for c in copied_op.new_chunks(
295-
inp_chunks, kws=[c.params.copy() for c in chunk.op.outputs]
296-
)
297-
]
298-
for src_chunk, out_chunk in zip(chunk.op.outputs, out_chunks):
299-
processed.add(src_chunk)
300-
out_chunk._key = src_chunk.key
290+
for out_chunk in chunk.op.outputs:
291+
processed.add(out_chunk)
301292
chunk_graph.add_node(out_chunk)
302-
# cannot be copied twice
303-
assert src_chunk not in chunk_to_copied
304-
chunk_to_copied[src_chunk] = out_chunk
305-
if src_chunk in self._final_result_chunks_set:
293+
if out_chunk in self._final_result_chunks_set:
306294
if out_chunk not in result_chunks_set:
307295
# add to result chunks
308296
result_chunks.append(out_chunk)
@@ -330,18 +318,12 @@ def _gen_subtask_info(
330318
if out_of_scope_chunks:
331319
inp_subtasks = []
332320
for out_of_scope_chunk in out_of_scope_chunks:
333-
copied_out_of_scope_chunk = chunk_to_copied[out_of_scope_chunk]
334321
inp_subtask = chunk_to_subtask[out_of_scope_chunk]
335-
if (
336-
copied_out_of_scope_chunk
337-
not in inp_subtask.chunk_graph.result_chunks
338-
):
322+
if out_of_scope_chunk not in inp_subtask.chunk_graph.result_chunks:
339323
# make sure the chunk that out of scope
340324
# is in the input subtask's results,
341325
# or the meta may be lost
342-
inp_subtask.chunk_graph.result_chunks.append(
343-
copied_out_of_scope_chunk
344-
)
326+
inp_subtask.chunk_graph.result_chunks.append(out_of_scope_chunk)
345327
inp_subtasks.append(inp_subtask)
346328
depth = max(st.priority[0] for st in inp_subtasks) + 1
347329
else:

mars/services/task/supervisor/tests/task_preprocessor.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -180,11 +180,7 @@ def analyze(
180180
map_reduce_id_to_infos=self.map_reduce_id_to_infos,
181181
)
182182
subtask_graph = analyzer.gen_subtask_graph()
183-
results = set(
184-
analyzer._chunk_to_copied[c]
185-
for c in chunk_graph.results
186-
if not isinstance(c.op, Fetch)
187-
)
183+
results = set(c for c in chunk_graph.results if not isinstance(c.op, Fetch))
188184
for subtask in subtask_graph:
189185
if subtask.extra_config is None:
190186
subtask.extra_config = dict()

0 commit comments

Comments
 (0)