Skip to content

Commit 1d22198

Browse files
author
xuye.qin
committed
Make push mapper data work
1 parent cb61b00 commit 1d22198

File tree

15 files changed

+241
-35
lines changed

15 files changed

+241
-35
lines changed

benchmarks/tpch/run_queries.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -611,7 +611,7 @@ def g2(x):
611611
def q13(customer, orders):
612612
customer_filtered = customer.loc[:, ["C_CUSTKEY"]]
613613
orders_filtered = orders[
614-
~orders["O_COMMENT"].str.contains("special(\S|\s)*requests")
614+
~orders["O_COMMENT"].str.contains("special[\S|\s]*requests")
615615
]
616616
orders_filtered = orders_filtered.loc[:, ["O_ORDERKEY", "O_CUSTKEY"]]
617617
c_o_merged = customer_filtered.merge(

mars/core/operand/shuffle.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,10 @@ class ShuffleProxy(VirtualOperand):
2727

2828

2929
class MapReduceOperand(Operand):
30-
reducer_index = TupleField("reducer_index", FieldTypes.uint64)
30+
# for mapper
3131
mapper_id = Int32Field("mapper_id", default=0)
32+
# for reducer
33+
reducer_index = TupleField("reducer_index", FieldTypes.uint64)
3234
reducer_phase = StringField("reducer_phase", default=None)
3335

3436
def __init__(self, *args, **kwargs):

mars/services/subtask/worker/processor.py

Lines changed: 62 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,17 @@
1717
import sys
1818
import time
1919
from collections import defaultdict
20-
from typing import Any, Dict, List, Optional, Set, Type
20+
from typing import Any, Dict, List, Optional, Set, Type, Tuple
2121

2222
from .... import oscar as mo
2323
from ....core import ChunkGraph, OperandType, enter_mode, ExecutionError
2424
from ....core.context import get_context, set_context
25-
from ....core.operand import Fetch, FetchShuffle, execute
25+
from ....core.operand import (
26+
Fetch,
27+
FetchShuffle,
28+
execute,
29+
)
30+
from ....lib.aio import alru_cache
2631
from ....metrics import Metrics
2732
from ....optimization.physical import optimize
2833
from ....typing import BandType, ChunkType
@@ -420,26 +425,56 @@ async def set_chunks_meta():
420425
# set result data size
421426
self.result.data_size = result_data_size
422427

423-
async def _push_mapper_data(self, chunk_graph):
424-
# TODO: use task api to get reducer bands
425-
reducer_idx_to_band = dict()
426-
if not reducer_idx_to_band:
427-
return
428+
@classmethod
429+
@alru_cache(cache_exceptions=False)
430+
async def _gen_reducer_index_to_bands(
431+
cls, session_id: str, supervisor_address: str, task_id: str, map_reduce_id: int
432+
) -> Dict[Tuple[int], BandType]:
433+
task_api = await TaskAPI.create(session_id, supervisor_address)
434+
map_reduce_info = await task_api.get_map_reduce_info(task_id, map_reduce_id)
435+
assert len(map_reduce_info.reducer_indexes) == len(
436+
map_reduce_info.reducer_bands
437+
)
438+
return {
439+
reducer_index: band
440+
for reducer_index, band in zip(
441+
map_reduce_info.reducer_indexes, map_reduce_info.reducer_bands
442+
)
443+
}
444+
445+
async def _push_mapper_data(self):
428446
storage_api_to_fetch_tasks = defaultdict(list)
429-
for result_chunk in chunk_graph.result_chunks:
430-
key = result_chunk.key
431-
reducer_idx = key[1]
432-
if isinstance(key, tuple):
447+
skip = True
448+
for result_chunk in self._chunk_graph.result_chunks:
449+
map_reduce_id = getattr(result_chunk.op, "extra_params", dict()).get(
450+
"analyzer_map_reduce_id"
451+
)
452+
if map_reduce_id is None:
453+
continue
454+
skip = False
455+
reducer_index_to_bands = await self._gen_reducer_index_to_bands(
456+
self._session_id,
457+
self._supervisor_address,
458+
self.subtask.task_id,
459+
map_reduce_id,
460+
)
461+
for reducer_index, band in reducer_index_to_bands.items():
433462
# mapper key is a tuple
434-
address, band_name = reducer_idx_to_band[reducer_idx]
435-
storage_api = StorageAPI(address, self._session_id, band_name)
463+
address, band_name = band
464+
storage_api = await StorageAPI.create(
465+
self._session_id, address, band_name
466+
)
436467
fetch_task = storage_api.fetch.delay(
437-
key, band_name=self._band[1], remote_address=self._band[0]
468+
(result_chunk.key, reducer_index),
469+
band_name=self._band[1],
470+
remote_address=self._band[0],
438471
)
439472
storage_api_to_fetch_tasks[storage_api].append(fetch_task)
473+
if skip:
474+
return
440475
batch_tasks = []
441476
for storage_api, tasks in storage_api_to_fetch_tasks.items():
442-
batch_tasks.append(asyncio.create_task(storage_api.fetch.batch(*tasks)))
477+
batch_tasks.append(storage_api.fetch.batch(*tasks))
443478
await asyncio.gather(*batch_tasks)
444479

445480
async def done(self):
@@ -513,8 +548,6 @@ async def run(self):
513548
await self._unpin_data(input_keys)
514549

515550
await self.done()
516-
# after done, we push mapper data to reducers in advance.
517-
await self.ref()._push_mapper_data.tell(chunk_graph)
518551
if self.result.status == SubtaskStatus.succeeded:
519552
cost_time_secs = (
520553
self.result.execution_end_time - self.result.execution_start_time
@@ -536,6 +569,9 @@ async def run(self):
536569
pass
537570
return self.result
538571

572+
async def post_run(self):
573+
await self._push_mapper_data()
574+
539575
async def report_progress_periodically(self, interval=0.5, eps=0.001):
540576
last_progress = self.result.progress
541577
while not self.result.status.is_done:
@@ -618,7 +654,7 @@ async def _init_context(self, session_id: str):
618654
await context.init()
619655
set_context(context)
620656

621-
async def run(self, subtask: Subtask):
657+
async def run(self, subtask: Subtask, wait_post_run: bool = False):
622658
logger.info("Start to run subtask: %r on %s.", subtask, self.address)
623659

624660
assert subtask.session_id == self._session_id
@@ -639,10 +675,18 @@ async def run(self, subtask: Subtask):
639675
try:
640676
result = yield self._running_aio_task
641677
logger.info("Finished subtask: %s", subtask.subtask_id)
678+
# post run with actor tell which will not block
679+
if not wait_post_run:
680+
await self.ref().post_run.tell(processor)
681+
else:
682+
await self.post_run(processor)
642683
raise mo.Return(result)
643684
finally:
644685
self._processor = self._running_aio_task = None
645686

687+
async def post_run(self, processor: SubtaskProcessor):
688+
await processor.post_run()
689+
646690
async def wait(self):
647691
return self._processor.is_done.wait()
648692

mars/services/subtask/worker/runner.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ async def _get_supervisor_address(self, session_id: str):
8989
[address] = await self._cluster_api.get_supervisors_by_keys([session_id])
9090
return address
9191

92-
async def run_subtask(self, subtask: Subtask):
92+
async def run_subtask(self, subtask: Subtask, wait_post_run: bool = False):
9393
if self._running_processor is not None: # pragma: no cover
9494
running_subtask_id = await self._running_processor.get_running_subtask_id()
9595
# current subtask is still running
@@ -122,7 +122,9 @@ async def run_subtask(self, subtask: Subtask):
122122
processor = self._session_id_to_processors[session_id]
123123
try:
124124
self._running_processor = self._last_processor = processor
125-
result = yield self._running_processor.run(subtask)
125+
result = yield self._running_processor.run(
126+
subtask, wait_post_run=wait_post_run
127+
)
126128
finally:
127129
self._running_processor = None
128130
raise mo.Return(result)

mars/services/subtask/worker/tests/test_subtask.py

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,18 @@
1818
import time
1919

2020
import numpy as np
21+
import pandas as pd
2122
import pytest
2223
import pytest_asyncio
2324

2425
from ..... import oscar as mo
26+
from ..... import dataframe as md
2527
from ..... import tensor as mt
2628
from ..... import remote as mr
27-
from .....core import ExecutionError
29+
from .....core import ExecutionError, ChunkGraph
2830
from .....core.context import get_context
2931
from .....core.graph import TileableGraph, TileableGraphBuilder, ChunkGraphBuilder
30-
32+
from .....core.operand import OperandStage
3133
from .....resource import Resource
3234
from .....utils import Timer
3335
from ....cluster import MockClusterAPI
@@ -36,7 +38,7 @@
3638
from ....scheduling import MockSchedulingAPI
3739
from ....session import MockSessionAPI
3840
from ....storage import MockStorageAPI
39-
from ....task import new_task_id
41+
from ....task import new_task_id, MapReduceInfo
4042
from ....task.supervisor.manager import TaskManagerActor, TaskConfigurationActor
4143
from ....mutable import MockMutableAPI
4244
from ... import Subtask, SubtaskStatus, SubtaskResult
@@ -48,6 +50,13 @@ class FakeTaskManager(TaskManagerActor):
4850
def set_subtask_result(self, subtask_result: SubtaskResult):
4951
return
5052

53+
def get_map_reduce_info(self, task_id: str, map_reduce_id: int) -> MapReduceInfo:
54+
return MapReduceInfo(
55+
map_reduce_id=0,
56+
reducer_indexes=[(0, 0)],
57+
reducer_bands=[(self.address, "numa-0")],
58+
)
59+
5160

5261
@pytest_asyncio.fixture
5362
async def actor_pool():
@@ -144,6 +153,39 @@ async def test_subtask_success(actor_pool):
144153
assert await subtask_runner.is_runner_free() is True
145154

146155

156+
@pytest.mark.asyncio
157+
async def test_shuffle_subtask(actor_pool):
158+
pool, session_id, meta_api, storage_api, manager = actor_pool
159+
160+
pdf = pd.DataFrame({"f1": ["a", "b", "a"], "f2": [1, 2, 3]})
161+
df = md.DataFrame(pdf)
162+
result = df.groupby("f1").sum(method="shuffle")
163+
164+
graph = TileableGraph([result.data])
165+
next(TileableGraphBuilder(graph).build())
166+
chunk_graph = next(ChunkGraphBuilder(graph, fuse_enabled=False).build())
167+
result_chunks = []
168+
new_chunk_graph = ChunkGraph(result_chunks)
169+
chunk_graph_iter = chunk_graph.topological_iter()
170+
curr = None
171+
for _ in range(3):
172+
prev = curr
173+
curr = next(chunk_graph_iter)
174+
new_chunk_graph.add_node(curr)
175+
if prev is not None:
176+
new_chunk_graph.add_edge(prev, curr)
177+
assert curr.op.stage == OperandStage.map
178+
curr.op.extra_params = {"analyzer_map_reduce_id": 0}
179+
result_chunks.append(curr)
180+
subtask = Subtask(new_task_id(), session_id, new_task_id(), new_chunk_graph)
181+
subtask_runner: SubtaskRunnerRef = await mo.actor_ref(
182+
SubtaskRunnerActor.gen_uid("numa-0", 0), address=pool.external_address
183+
)
184+
await subtask_runner.run_subtask(subtask, wait_post_run=True)
185+
result = await subtask_runner.get_subtask_result()
186+
assert result.status == SubtaskStatus.succeeded
187+
188+
147189
@pytest.mark.asyncio
148190
async def test_subtask_failure(actor_pool):
149191
pool, session_id, meta_api, storage_api, manager = actor_pool

mars/services/task/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,5 @@
1414

1515
from .api import AbstractTaskAPI, TaskAPI, WebTaskAPI
1616
from .config import task_options
17-
from .core import Task, TaskStatus, TaskResult, new_task_id
17+
from .core import Task, TaskStatus, TaskResult, new_task_id, MapReduceInfo
1818
from .errors import TaskNotExist

mars/services/task/analyzer/analyzer.py

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,28 @@
1919

2020
from ....config import Config
2121
from ....core import ChunkGraph, ChunkType, enter_mode
22-
from ....core.operand import Fetch, VirtualOperand, LogicKeyGenerator
22+
from ....core.operand import (
23+
Fetch,
24+
VirtualOperand,
25+
LogicKeyGenerator,
26+
ShuffleProxy,
27+
OperandStage,
28+
)
29+
from ....lib.ordered_set import OrderedSet
2330
from ....resource import Resource
2431
from ....typing import BandType
2532
from ....utils import build_fetch, tokenize
2633
from ...subtask import SubtaskGraph, Subtask
27-
from ..core import Task, new_task_id
34+
from ..core import Task, new_task_id, MapReduceInfo
2835
from .assigner import AbstractGraphAssigner, GraphAssigner
2936
from .fusion import Coloring
3037

3138
logger = logging.getLogger(__name__)
3239

3340

3441
class GraphAnalyzer:
42+
_map_reduce_id = itertools.count()
43+
3544
def __init__(
3645
self,
3746
chunk_graph: ChunkGraph,
@@ -41,6 +50,7 @@ def __init__(
4150
chunk_to_subtasks: Dict[ChunkType, Subtask],
4251
graph_assigner_cls: Type[AbstractGraphAssigner] = None,
4352
stage_id: str = None,
53+
map_reduce_id_to_infos: Dict[int, MapReduceInfo] = None,
4454
):
4555
self._chunk_graph = chunk_graph
4656
self._band_resource = band_resource
@@ -50,12 +60,17 @@ def __init__(
5060
self._fuse_enabled = task.fuse_enabled
5161
self._extra_config = task.extra_config
5262
self._chunk_to_subtasks = chunk_to_subtasks
63+
self._map_reduce_id_to_infos = map_reduce_id_to_infos
5364
if graph_assigner_cls is None:
5465
graph_assigner_cls = GraphAssigner
5566
self._graph_assigner_cls = graph_assigner_cls
5667
self._chunk_to_copied = dict()
5768
self._logic_key_generator = LogicKeyGenerator()
5869

70+
@classmethod
71+
def next_map_reduce_id(cls) -> int:
72+
return next(cls._map_reduce_id)
73+
5974
@classmethod
6075
def _iter_start_ops(cls, chunk_graph: ChunkGraph):
6176
visited = set()
@@ -282,6 +297,38 @@ def _gen_logic_key(self, chunks: List[ChunkType]):
282297
*[self._logic_key_generator.get_logic_key(chunk.op) for chunk in chunks]
283298
)
284299

300+
def _gen_map_reduce_info(
301+
self, chunk: ChunkType, assign_results: Dict[ChunkType, BandType]
302+
):
303+
reducer_ops = OrderedSet(
304+
[
305+
c.op
306+
for c in self._chunk_graph.successors(chunk)
307+
if c.op.stage == OperandStage.reduce
308+
]
309+
)
310+
map_chunks = [
311+
c
312+
for c in self._chunk_graph.predecessors(chunk)
313+
if c.op.stage == OperandStage.map
314+
]
315+
map_reduce_id = self.next_map_reduce_id()
316+
for map_chunk in map_chunks:
317+
# record analyzer map reduce id for mapper op
318+
# copied chunk exists because map chunk must have
319+
# been processed before shuffle proxy
320+
copied_map_chunk_op = self._chunk_to_copied[map_chunk].op
321+
if not hasattr(copied_map_chunk_op, "extra_params"):
322+
copied_map_chunk_op.extra_params = dict()
323+
copied_map_chunk_op.extra_params["analyzer_map_reduce_id"] = map_reduce_id
324+
reducer_bands = [assign_results[r.outputs[0]] for r in reducer_ops]
325+
map_reduce_info = MapReduceInfo(
326+
map_reduce_id=map_reduce_id,
327+
reducer_indexes=[reducer_op.reducer_index for reducer_op in reducer_ops],
328+
reducer_bands=reducer_bands,
329+
)
330+
self._map_reduce_id_to_infos[map_reduce_id] = map_reduce_info
331+
285332
@enter_mode(build=True)
286333
def gen_subtask_graph(
287334
self, op_to_bands: Dict[str, BandType] = None
@@ -400,6 +447,10 @@ def gen_subtask_graph(
400447

401448
for c in same_color_chunks:
402449
chunk_to_subtask[c] = subtask
450+
if self._map_reduce_id_to_infos is not None and isinstance(
451+
chunk.op, ShuffleProxy
452+
):
453+
self._gen_map_reduce_info(chunk, chunk_to_bands)
403454
visited.update(same_color_chunks)
404455

405456
for subtasks in logic_key_to_subtasks.values():

mars/services/task/api/oscar.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from ....core import Tileable
1919
from ....lib.aio import alru_cache
2020
from ...subtask import SubtaskResult
21-
from ..core import TileableGraph, TaskResult
21+
from ..core import TileableGraph, TaskResult, MapReduceInfo
2222
from ..supervisor.manager import TaskManagerActor
2323
from .core import AbstractTaskAPI
2424

@@ -104,3 +104,8 @@ async def set_subtask_result(self, subtask_result: SubtaskResult):
104104

105105
async def get_last_idle_time(self) -> Union[float, None]:
106106
return await self._task_manager_ref.get_last_idle_time()
107+
108+
async def get_map_reduce_info(
109+
self, task_id: str, map_reduce_id: int
110+
) -> MapReduceInfo:
111+
return await self._task_manager_ref.get_map_reduce_info(task_id, map_reduce_id)

0 commit comments

Comments
 (0)