Skip to content

Commit 08d7cfe

Browse files
author
Xuye (Chris) Qin
authored
Assign reducer ops in task assigner to make them more balanced across cluster (#3048)
1 parent 7840183 commit 08d7cfe

File tree

6 files changed

+104
-21
lines changed

6 files changed

+104
-21
lines changed

benchmarks/tpch/run_queries.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -617,7 +617,7 @@ def g2(x):
617617
def q13(customer, orders):
618618
customer_filtered = customer.loc[:, ["C_CUSTKEY"]]
619619
orders_filtered = orders[
620-
~orders["O_COMMENT"].str.contains("special(\S|\s)*requests")
620+
~orders["O_COMMENT"].str.contains("special[\S|\s]*requests")
621621
]
622622
orders_filtered = orders_filtered.loc[:, ["O_ORDERKEY", "O_CUSTKEY"]]
623623
c_o_merged = customer_filtered.merge(

mars/core/operand/shuffle.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,10 @@ class ShuffleProxy(VirtualOperand):
2727

2828

2929
class MapReduceOperand(Operand):
30-
reducer_index = TupleField("reducer_index", FieldTypes.uint64)
30+
# for mapper
3131
mapper_id = Int32Field("mapper_id", default=0)
32+
# for reducer
33+
reducer_index = TupleField("reducer_index", FieldTypes.uint64)
3234
reducer_phase = StringField("reducer_phase", default=None)
3335

3436
def _new_chunks(self, inputs, kws=None, **kw):

mars/dataframe/groupby/aggregation.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -806,9 +806,12 @@ def _choose_tree_method(
806806
len(ctx.get_worker_addresses()) > 1
807807
and estimate_size > chunk_store_limit
808808
and np.mean(agg_sizes) > 1024**2
809+
and total_count <= 256
809810
):
810811
# for distributed, if estimate size could be potentially large,
811812
# and each chunk size is large enough(>1M, small chunk means large error),
813+
# total count is relatively small(<=256, large number of chunks
814+
# is not quite efficient for shuffle)
812815
# we choose to use shuffle
813816
return False
814817
# calculate the coefficient of variation of aggregation sizes,

mars/services/task/analyzer/analyzer.py

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,15 @@
1919

2020
from ....config import Config
2121
from ....core import ChunkGraph, ChunkType, enter_mode
22-
from ....core.operand import Fetch, VirtualOperand, LogicKeyGenerator
22+
from ....core.operand import (
23+
Fetch,
24+
VirtualOperand,
25+
LogicKeyGenerator,
26+
MapReduceOperand,
27+
OperandStage,
28+
)
2329
from ....resource import Resource
24-
from ....typing import BandType
30+
from ....typing import BandType, OperandType
2531
from ....utils import build_fetch, tokenize
2632
from ...subtask import SubtaskGraph, Subtask
2733
from ..core import Task, new_task_id
@@ -31,6 +37,18 @@
3137
logger = logging.getLogger(__name__)
3238

3339

40+
def need_reassign_worker(op: OperandType) -> bool:
    """
    Tell whether the task assigner should choose a worker for ``op`` again.

    An operand qualifies when its ``reassign_worker`` attribute is truthy,
    or when it is a ``MapReduceOperand`` whose stage is
    ``OperandStage.reduce``.
    """
    # NOTE(qinxuye): reducers are special-cased here rather than carrying
    # ``reassign_worker = True`` themselves. In many cases a reducer op is
    # copied from a tileable op and only afterwards gets its stage set to
    # reduce, so keeping the flag in sync would require taking over
    # ``__setattr__``, which would be quite nasty.
    explicit_flag = op.reassign_worker
    if explicit_flag:
        return explicit_flag
    return isinstance(op, MapReduceOperand) and op.stage == OperandStage.reduce
50+
51+
3452
class GraphAnalyzer:
3553
def __init__(
3654
self,
@@ -294,8 +312,10 @@ def gen_subtask_graph(
294312
subtask_graph: SubtaskGraph
295313
Subtask graph.
296314
"""
315+
# reassign worker when specified reassign_worker = True
316+
# or it's a reducer operand
297317
reassign_worker_ops = [
298-
chunk.op for chunk in self._chunk_graph if chunk.op.reassign_worker
318+
chunk.op for chunk in self._chunk_graph if need_reassign_worker(chunk.op)
299319
]
300320
start_ops = (
301321
list(self._iter_start_ops(self._chunk_graph))

mars/services/task/analyzer/assigner.py

Lines changed: 38 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,13 @@
1515
from abc import ABC, abstractmethod
1616
from collections import defaultdict
1717
from operator import itemgetter
18-
from typing import List, Dict, Set, Union
18+
from typing import List, Dict, Union
1919

2020
import numpy as np
2121

2222
from ....core import ChunkGraph, ChunkData
2323
from ....core.operand import Operand
24+
from ....lib.ordered_set import OrderedSet
2425
from ....resource import Resource
2526
from ....typing import BandType
2627
from ....utils import implements
@@ -77,8 +78,9 @@ def __init__(
7778
band_resource: Dict[BandType, Resource],
7879
):
7980
super().__init__(chunk_graph, start_ops, band_resource)
80-
self._undirected_chunk_graph = None
81-
self._op_keys: Set[str] = {start_op.key for start_op in start_ops}
81+
self._op_keys: OrderedSet[str] = OrderedSet(
82+
[start_op.key for start_op in start_ops]
83+
)
8284

8385
def _calc_band_assign_limits(
8486
self, initial_count: int, occupied: Dict[BandType, int]
@@ -124,13 +126,15 @@ def _calc_band_assign_limits(
124126
pos = (pos + 1) % len(counts)
125127
return dict(zip(bands, counts))
126128

129+
@classmethod
127130
def _assign_by_bfs(
128-
self,
131+
cls,
132+
undirected_chunk_graph: ChunkGraph,
129133
start: ChunkData,
130134
band: BandType,
131135
initial_sizes: Dict[BandType, int],
132136
spread_limits: Dict[BandType, float],
133-
key_to_assign: Set[str],
137+
key_to_assign: OrderedSet[str],
134138
assigned_record: Dict[str, Union[str, BandType]],
135139
):
136140
"""
@@ -140,19 +144,15 @@ def _assign_by_bfs(
140144
if initial_sizes[band] <= 0:
141145
return
142146

143-
graph = self._chunk_graph
144-
if self._undirected_chunk_graph is None:
145-
self._undirected_chunk_graph = graph.build_undirected()
146-
undirected_chunk_graph = self._undirected_chunk_graph
147-
148147
assigned = 0
149148
spread_range = 0
150149
for chunk in undirected_chunk_graph.bfs(start=start, visit_predicate="all"):
151150
op_key = chunk.op.key
152151
if op_key in assigned_record:
153152
continue
154153
spread_range += 1
155-
# `op_key` may not be in `key_to_assign`, but we need to record it to avoid iterate the node repeatedly.
154+
# `op_key` may not be in `key_to_assign`,
155+
# but we need to record it to avoid iterating over the node repeatedly.
156156
assigned_record[op_key] = band
157157
if op_key not in key_to_assign:
158158
continue
@@ -161,8 +161,22 @@ def _assign_by_bfs(
161161
break
162162
initial_sizes[band] -= assigned
163163

164+
def _build_undirected_chunk_graph(
    self, chunk_to_assign: List[ChunkData]
) -> ChunkGraph:
    """
    Build the undirected graph that BFS-based assignment walks over.

    Operates on the result of ``self._chunk_graph.copy()``: every edge
    pointing into a chunk listed in ``chunk_to_assign`` is removed from it,
    then ``build_undirected()`` of that pruned graph is returned.
    """
    pruned_graph = self._chunk_graph.copy()
    for target in chunk_to_assign:
        if pruned_graph.count_predecessors(target) <= 0:
            # no incoming edge, nothing to cut for this chunk
            continue
        # `chunk_to_assign` may contain chunks that need to be reassigned;
        # take a snapshot of the predecessors because edges are removed
        # while we iterate
        for source in tuple(pruned_graph.predecessors(target)):
            pruned_graph.remove_edge(source, target)
    return pruned_graph.build_undirected()
175+
164176
@implements(AbstractGraphAssigner.assign)
165-
def assign(self, cur_assigns: Dict[str, str] = None) -> Dict[ChunkData, BandType]:
177+
def assign(
178+
self, cur_assigns: Dict[str, BandType] = None
179+
) -> Dict[ChunkData, BandType]:
166180
graph = self._chunk_graph
167181
assign_result = dict()
168182
cur_assigns = cur_assigns or dict()
@@ -173,7 +187,7 @@ def assign(self, cur_assigns: Dict[str, str] = None) -> Dict[ChunkData, BandType
173187
for chunk in graph:
174188
op_key_to_chunks[chunk.op.key].append(chunk)
175189

176-
op_keys = set(self._op_keys)
190+
op_keys = OrderedSet(self._op_keys)
177191
chunk_to_assign = [
178192
op_key_to_chunks[op_key][0]
179193
for op_key in op_keys
@@ -183,6 +197,9 @@ def assign(self, cur_assigns: Dict[str, str] = None) -> Dict[ChunkData, BandType
183197
for band in cur_assigns.values():
184198
assigned_counts[band] += 1
185199

200+
# build undirected graph
201+
undirected_chunk_graph = self._build_undirected_chunk_graph(chunk_to_assign)
202+
186203
# calculate the number of chunks to be assigned to each band
187204
# given number of bands and existing assignments
188205
band_quotas = self._calc_band_assign_limits(
@@ -195,14 +212,20 @@ def assign(self, cur_assigns: Dict[str, str] = None) -> Dict[ChunkData, BandType
195212
spread_ranges = defaultdict(lambda: average_spread_range)
196213
# assign from other chunks to be assigned
197214
# TODO: sort by what?
198-
sorted_candidates = [v for v in chunk_to_assign]
215+
sorted_candidates = chunk_to_assign.copy()
199216
while max(band_quotas.values()):
200217
band = max(band_quotas, key=lambda k: band_quotas[k])
201218
cur = sorted_candidates.pop()
202219
while cur.op.key in cur_assigns:
203220
cur = sorted_candidates.pop()
204221
self._assign_by_bfs(
205-
cur, band, band_quotas, spread_ranges, op_keys, cur_assigns
222+
undirected_chunk_graph,
223+
cur,
224+
band,
225+
band_quotas,
226+
spread_ranges,
227+
op_keys,
228+
cur_assigns,
206229
)
207230

208231
key_to_assign = {n.op.key for n in chunk_to_assign} | initial_assigned_op_keys

mars/services/task/analyzer/tests/test_assigner.py

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,19 @@
1313
# limitations under the License.
1414

1515
import numpy as np
16+
import pandas as pd
1617

18+
from ..... import dataframe as md
1719
from .....config import Config
1820
from .....core import ChunkGraph
21+
from .....core.graph.builder.utils import build_graph
22+
from .....core.operand import OperandStage
1923
from .....tensor.random import TensorRand
2024
from .....tensor.arithmetic import TensorAdd
2125
from .....tensor.fetch import TensorFetch
2226
from .....resource import Resource
2327
from ...core import Task
24-
from ..analyzer import GraphAnalyzer
28+
from ..analyzer import GraphAnalyzer, need_reassign_worker
2529
from ..assigner import GraphAssigner
2630

2731

@@ -71,3 +75,34 @@ def test_assigner_with_fetch_inputs():
7175
for inp in input_chunks:
7276
if not isinstance(inp.op, TensorFetch):
7377
assert subtask.expect_band == key_to_assign[inp.key]
78+
79+
80+
def test_shuffle_assign():
    """Start chunks and reducer chunks of a shuffle groupby each cover all bands."""
    band_num = 8
    all_bands = [(f"address_{i}", "numa-0") for i in range(band_num)]

    raw = pd.DataFrame(np.random.rand(32, 4))
    df = md.DataFrame(raw, chunk_size=4)
    grouped = df.groupby(0).sum(method="shuffle")
    chunk_graph = build_graph([grouped], tile=True)

    band_resource = {band: Resource(num_cpus=1) for band in all_bands}

    # ops to hand to the assigner: graph start ops plus everything
    # `need_reassign_worker` selects
    reassign_worker_ops = [
        node.op for node in chunk_graph if need_reassign_worker(node.op)
    ]
    start_ops = list(GraphAnalyzer._iter_start_ops(chunk_graph))
    to_assign_ops = start_ops + reassign_worker_ops

    assigner = GraphAssigner(chunk_graph, to_assign_ops, band_resource)
    assigns = assigner.assign()
    assert len(assigns) == 16

    # split the assigned bands by whether the chunk's op is in the reduce stage
    reducer_assigns = {
        band
        for chunk, band in assigns.items()
        if chunk.op.stage == OperandStage.reduce
    }
    init_assigns = {
        band
        for chunk, band in assigns.items()
        if chunk.op.stage != OperandStage.reduce
    }
    # init and reducers are assigned on all bands
    assert len(init_assigns) == len(reducer_assigns) == band_num

0 commit comments

Comments
 (0)