|
22 | 22 | from .... import oscar as mo |
23 | 23 | from ....core import ChunkGraph, OperandType, enter_mode, ExecutionError |
24 | 24 | from ....core.context import get_context, set_context |
25 | | -from ....core.operand import ( |
26 | | - Fetch, |
27 | | - FetchShuffle, |
28 | | - execute, |
29 | | - MapReduceOperand, |
30 | | - OperandStage, |
31 | | -) |
| 25 | +from ....core.operand import Fetch, FetchShuffle, execute |
32 | 26 | from ....metrics import Metrics |
33 | 27 | from ....optimization.physical import optimize |
34 | 28 | from ....typing import BandType, ChunkType |
@@ -426,7 +420,7 @@ async def set_chunks_meta(): |
426 | 420 | # set result data size |
427 | 421 | self.result.data_size = result_data_size |
428 | 422 |
|
429 | | - async def push_mapper_data(self, chunk_graph): |
| 423 | + async def _push_mapper_data(self, chunk_graph): |
430 | 424 | # TODO: use task api to get reducer bands |
431 | 425 | reducer_idx_to_band = dict() |
432 | 426 | if not reducer_idx_to_band: |
@@ -520,7 +514,7 @@ async def run(self): |
520 | 514 |
|
521 | 515 | await self.done() |
522 | 516 | # after done, we push mapper data to reducers in advance. |
523 | | - await self.push_mapper_data(chunk_graph) |
| 517 | + await self.ref()._push_mapper_data.tell(chunk_graph) |
524 | 518 | if self.result.status == SubtaskStatus.succeeded: |
525 | 519 | cost_time_secs = ( |
526 | 520 | self.result.execution_end_time - self.result.execution_start_time |
|
0 commit comments