
Commit cc9d34f

Reduce estimation time cost (#2577)
1 parent b709458 commit cc9d34f

File tree: 7 files changed, +167 -24 lines


mars/core/operand/core.py

Lines changed: 15 additions & 10 deletions

@@ -335,20 +335,22 @@ def post_execute(cls, ctx: Union[dict, Context], op: OperandType):
     def estimate_size(cls, ctx: dict, op: OperandType):
         from .fetch import FetchShuffle

-        exec_size = 0
+        # when sizes of all outputs are deterministic, return directly
         outputs = op.outputs
-        pure_dep_keys = set(
-            inp.key
-            for inp, is_dep in zip(op.inputs or (), op.pure_depends or ())
-            if is_dep
-        )
         if all(
             not c.is_sparse() and hasattr(c, "nbytes") and not np.isnan(c.nbytes)
             for c in outputs
         ):
             for out in outputs:
                 ctx[out.key] = (out.nbytes, out.nbytes)
+            return

+        pure_dep_keys = set(
+            inp.key
+            for inp, is_dep in zip(op.inputs or (), op.pure_depends or ())
+            if is_dep
+        )
+        exec_sizes = [0]
         for inp in op.inputs or ():
             if inp.key in pure_dep_keys:
                 continue
@@ -361,13 +363,16 @@ def estimate_size(cls, ctx: dict, op: OperandType):
                 # execution size of a specific data chunk may be
                 # larger than stored type due to objects
                 for key, shape in keys_and_shapes:
-                    exec_size += ctx[key][0]
+                    exec_sizes.append(ctx[key][0])
             except KeyError:
                 if not op.sparse:
                     inp_size = calc_data_size(inp)
                     if not np.isnan(inp_size):
-                        exec_size += inp_size
-        exec_size = int(exec_size)
+                        exec_sizes.append(inp_size)
+        if any(c.is_sparse() for c in op.inputs):
+            exec_size = sum(exec_sizes)
+        else:
+            exec_size = max(exec_sizes)

         total_out_size = 0
         chunk_sizes = dict()
@@ -408,7 +413,7 @@ def estimate_size(cls, ctx: dict, op: OperandType):
                 max_sparse_size = np.nan
             if not np.isnan(max_sparse_size):
                 result_size = min(result_size, max_sparse_size)
-            ctx[out.key] = (result_size, exec_size * memory_scale // len(outputs))
+            ctx[out.key] = (result_size, int(exec_size * memory_scale // len(outputs)))

     @classmethod
     def concat_tileable_chunks(cls, tileable: TileableType):
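The change above replaces the running sum `exec_size` with an `exec_sizes` list: when every input is dense, the execution-memory estimate becomes the largest single input rather than the total of all inputs, while any sparse input falls back to the old sum. A minimal sketch of that rule, using illustrative names rather than the actual operand API:

    def estimate_exec_size(input_sizes, input_is_sparse):
        # exec_sizes starts at 0 so max() is defined even with no inputs
        exec_sizes = [0] + list(input_sizes)
        if any(input_is_sparse):
            # sparse chunks may all need to be materialized at once during computation
            return sum(exec_sizes)
        # dense chunks: peak extra memory is roughly bounded by the largest input
        return max(exec_sizes)

    print(estimate_exec_size([1024, 2048], [False, False]))  # 2048 instead of 3072
    print(estimate_exec_size([1024, 2048], [True, False]))   # 3072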

mars/lib/groupby_wrapper.py

Lines changed: 4 additions & 0 deletions

@@ -20,6 +20,7 @@
 import pandas as pd
 from pandas.core.groupby import DataFrameGroupBy, SeriesGroupBy

+from ..utils import estimate_pandas_size
 from .version import parse as parse_version

 _HAS_SQUEEZE = parse_version(pd.__version__) < parse_version("1.1.0")
@@ -124,6 +125,9 @@ def __sizeof__(self):
             getattr(self.groupby_obj.grouper, "_cache", None)
         )

+    def estimate_size(self):
+        return estimate_pandas_size(self.obj) + estimate_pandas_size(self.obj.index)
+
     def __reduce__(self):
         return (
             type(self).from_tuple,
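The new `estimate_size` method lets `calc_data_size` (see the `mars/utils.py` hunk below) size a wrapped groupby by estimating its underlying frame and index with `estimate_pandas_size`, instead of calling `sys.getsizeof` on the whole groupby object. A short usage sketch, assuming `wrapped_groupby` forwards its arguments to pandas `groupby` (the sample data here is illustrative):

    import numpy as np
    import pandas as pd
    from mars.lib.groupby_wrapper import wrapped_groupby

    df = pd.DataFrame({"a": np.random.randint(0, 10, 10_000), "b": np.random.rand(10_000)})
    grouped = wrapped_groupby(df, "a")
    # sizes the wrapped frame and its index via estimate_pandas_size
    print(grouped.estimate_size())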

mars/lib/tests/test_lib.py

Lines changed: 2 additions & 1 deletion

@@ -19,7 +19,7 @@
 import numpy as np

 from ...tests.core import assert_groupby_equal
-from ...utils import calc_data_size
+from ...utils import calc_data_size, estimate_pandas_size
 from ..groupby_wrapper import wrapped_groupby


@@ -42,6 +42,7 @@ def test_groupby_wrapper():
     assert grouped.is_frame is True
     assert sys.getsizeof(grouped) > sys.getsizeof(grouped.groupby_obj)
     assert calc_data_size(grouped) > sys.getsizeof(grouped.groupby_obj)
+    assert grouped.estimate_size() > estimate_pandas_size(grouped.groupby_obj)

     grouped = conv_func(wrapped_groupby(df, level=0).C)
     assert_groupby_equal(grouped, df.groupby(level=0).C)

mars/services/scheduling/worker/execution.py

Lines changed: 18 additions & 8 deletions

@@ -207,19 +207,22 @@ async def _collect_input_sizes(
             *(storage_api.get_infos.delay(k) for k in fetch_keys)
         )

+        # compute memory quota size. when data located in shared memory, the cost
+        # should be differences between deserialized memory cost and serialized cost,
+        # otherwise we should take deserialized memory cost
         for key, meta, infos in zip(fetch_keys, fetch_metas, data_infos):
             level = functools.reduce(operator.or_, (info.level for info in infos))
             if level & StorageLevel.MEMORY:
                 mem_cost = max(0, meta["memory_size"] - meta["store_size"])
             else:
                 mem_cost = meta["memory_size"]
-            sizes[key] = (mem_cost, mem_cost)
+            sizes[key] = (meta["store_size"], mem_cost)

         return sizes

     @classmethod
     def _estimate_sizes(cls, subtask: Subtask, input_sizes: Dict):
-        size_context = {k: (s, 0) for k, (s, _c) in input_sizes.items()}
+        size_context = dict(input_sizes.items())
         graph = subtask.chunk_graph

         key_to_ops = defaultdict(set)
@@ -243,7 +246,7 @@ def _estimate_sizes(cls, subtask: Subtask, input_sizes: Dict):

         visited_op_keys = set()
         total_memory_cost = 0
-        max_memory_cost = 0
+        max_memory_cost = sum(calc_size for _, calc_size in size_context.values())
         while key_stack:
             key = key_stack.pop()
             op = key_to_ops[key][0]
@@ -255,24 +258,31 @@ def _estimate_sizes(cls, subtask: Subtask, input_sizes: Dict):
             total_memory_cost += calc_cost
             max_memory_cost = max(total_memory_cost, max_memory_cost)

-            result_cost = sum(size_context[out.key][0] for out in op.outputs)
-            total_memory_cost += result_cost - calc_cost
+            if not isinstance(op, Fetch):
+                # when calculation result is stored, memory cost of calculation
+                # can be replaced with result memory cost
+                result_cost = sum(size_context[out.key][0] for out in op.outputs)
+                total_memory_cost += result_cost - calc_cost

-            visited_op_keys.add(op.key)
+            visited_op_keys.add(key)

             for succ_op_key in op_key_graph.iter_successors(key):
                 pred_ref_count[succ_op_key] -= 1
                 if pred_ref_count[succ_op_key] == 0:
                     key_stack.append(succ_op_key)
+
             for pred_op_key in op_key_graph.iter_predecessors(key):
                 succ_ref_count[pred_op_key] -= 1
                 if succ_ref_count[pred_op_key] == 0:
+                    pred_op = key_to_ops[pred_op_key][0]
+                    # when clearing fetches, subtract memory size, otherwise subtract store size
+                    account_idx = 1 if isinstance(pred_op, Fetch) else 0
                     pop_result_cost = sum(
-                        size_context.pop(out.key, (0, 0))[0]
+                        size_context.pop(out.key, (0, 0))[account_idx]
                         for out in key_to_ops[pred_op_key][0].outputs
                     )
                     total_memory_cost -= pop_result_cost
-        return sum(t[1] for t in size_context.values()), max_memory_cost
+        return sum(t[0] for t in size_context.values()), max_memory_cost

     @classmethod
     def _check_cancelling(cls, subtask_info: SubtaskExecutionInfo):
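Each `size_context` entry is now the `(store_size, mem_cost)` pair produced by `_collect_input_sizes`: when an input already sits in shared memory, only the deserialization overhead beyond the stored bytes is charged against the memory quota; otherwise the full in-memory size is charged. A minimal standalone sketch of that quota rule, with a plain boolean standing in for the `StorageLevel.MEMORY` check:

    def input_quota_entry(memory_size: int, store_size: int, in_shared_memory: bool):
        # returns the (store_size, mem_cost) pair recorded per input key
        if in_shared_memory:
            # data is readable from shared memory; only the extra cost of
            # deserializing it needs new memory quota
            mem_cost = max(0, memory_size - store_size)
        else:
            # data has to be loaded entirely into memory
            mem_cost = memory_size
        return store_size, mem_cost

    print(input_quota_entry(2048, 1024, True))   # (1024, 1024)
    print(input_quota_entry(2048, 1024, False))  # (1024, 2048)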

mars/services/scheduling/worker/tests/test_execution.py

Lines changed: 49 additions & 1 deletion

@@ -21,11 +21,18 @@
 from typing import Tuple

 import numpy as np
+import pandas as pd
 import pytest

 from ..... import oscar as mo
 from ..... import remote as mr
-from .....core import ChunkGraph, ChunkGraphBuilder, TileableGraph, TileableGraphBuilder
+from .....core import (
+    ChunkGraph,
+    ChunkGraphBuilder,
+    TileableGraph,
+    TileableGraphBuilder,
+    OutputType,
+)
 from .....remote.core import RemoteFunction
 from .....tensor.fetch import TensorFetch
 from .....tensor.arithmetic import TensorTreeAdd
@@ -384,6 +391,47 @@ def delay_fun(delay, _inp1):
     )


+def test_estimate_size():
+    from ..execution import SubtaskExecutionActor
+    from .....dataframe.arithmetic import DataFrameAdd
+    from .....dataframe.fetch import DataFrameFetch
+    from .....dataframe.utils import parse_index
+
+    index_value = parse_index(pd.Int64Index([10, 20, 30]))
+
+    input1 = DataFrameFetch(output_types=[OutputType.series],).new_chunk(
+        [], _key="INPUT1", shape=(np.nan,), dtype=np.dtype("O"), index_value=index_value
+    )
+    input2 = DataFrameFetch(output_types=[OutputType.series],).new_chunk(
+        [], _key="INPUT2", shape=(np.nan,), dtype=np.dtype("O"), index_value=index_value
+    )
+    result_chunk = DataFrameAdd(
+        axis=0, output_types=[OutputType.series], lhs=input1, rhs=input2
+    ).new_chunk(
+        [input1, input2],
+        _key="ADD_RESULT",
+        shape=(np.nan,),
+        dtype=np.dtype("O"),
+        index_value=index_value,
+    )
+
+    chunk_graph = ChunkGraph([result_chunk])
+    chunk_graph.add_node(input1)
+    chunk_graph.add_node(input2)
+    chunk_graph.add_node(result_chunk)
+    chunk_graph.add_edge(input1, result_chunk)
+    chunk_graph.add_edge(input2, result_chunk)
+
+    input_sizes = {
+        "INPUT1": (1024, 1024),
+        "INPUT2": (1024, 1024),
+    }
+
+    subtask = Subtask("test_subtask", session_id="session_id", chunk_graph=chunk_graph)
+    result = SubtaskExecutionActor._estimate_sizes(subtask, input_sizes)
+    assert result[0] == 1024
+
+
 @pytest.mark.asyncio
 @pytest.mark.parametrize("actor_pool", [(1, False)], indirect=True)
 async def test_cancel_without_kill(actor_pool):

mars/tests/test_utils.py

Lines changed: 36 additions & 2 deletions

@@ -188,8 +188,8 @@ def test_lazy_import():
     old_sys_path = sys.path
     mock_mod = textwrap.dedent(
         """
-            __version__ = '0.1.0b1'
-            """.strip()
+        __version__ = '0.1.0b1'
+        """.strip()
     )

     temp_dir = tempfile.mkdtemp(prefix="mars-utils-test-")
@@ -482,6 +482,40 @@ def test_readable_size():
     assert utils.readable_size(14354000000000) == "13.05T"


+def test_estimate_pandas_size():
+    df1 = pd.DataFrame(np.random.rand(50, 10))
+    assert utils.estimate_pandas_size(df1) == sys.getsizeof(df1)
+
+    df2 = pd.DataFrame(np.random.rand(1000, 10))
+    assert utils.estimate_pandas_size(df2) == sys.getsizeof(df2)
+
+    df3 = pd.DataFrame(
+        {
+            "A": np.random.choice(["abcd", "def", "gh"], size=(1000,)),
+            "B": np.random.rand(1000),
+            "C": np.random.rand(1000),
+        }
+    )
+    assert utils.estimate_pandas_size(df3) != sys.getsizeof(df3)
+
+    s1 = pd.Series(np.random.rand(1000))
+    assert utils.estimate_pandas_size(s1) == sys.getsizeof(s1)
+
+    from ..dataframe.arrays import ArrowStringArray
+
+    array = ArrowStringArray(np.random.choice(["abcd", "def", "gh"], size=(1000,)))
+    s2 = pd.Series(array)
+    assert utils.estimate_pandas_size(s2) == sys.getsizeof(s2)
+
+    s3 = pd.Series(np.random.choice(["abcd", "def", "gh"], size=(1000,)))
+    assert utils.estimate_pandas_size(s3) != sys.getsizeof(s3)
+
+    idx1 = pd.MultiIndex.from_arrays(
+        [np.arange(0, 1000), np.random.choice(["abcd", "def", "gh"], size=(1000,))]
+    )
+    assert utils.estimate_pandas_size(idx1) != sys.getsizeof(idx1)
+
+
 @require_ray
 def test_web_serialize_lambda():
     register_ray_serializers()

mars/utils.py

Lines changed: 43 additions & 2 deletions

@@ -380,8 +380,10 @@ def calc_data_size(dt: Any, shape: Tuple[int] = None) -> int:
         return sum(calc_data_size(c) for c in dt)

     shape = getattr(dt, "shape", None) or shape
-    if hasattr(dt, "memory_usage") or hasattr(dt, "groupby_obj"):
-        return sys.getsizeof(dt)
+    if isinstance(dt, (pd.DataFrame, pd.Series)):
+        return estimate_pandas_size(dt)
+    if hasattr(dt, "estimate_size"):
+        return dt.estimate_size()
     if hasattr(dt, "nbytes"):
         return max(sys.getsizeof(dt), dt.nbytes)
     if hasattr(dt, "shape") and len(dt.shape) == 0:
@@ -404,6 +406,45 @@ def calc_data_size(dt: Any, shape: Tuple[int] = None) -> int:
     return sys.getsizeof(dt)


+def estimate_pandas_size(
+    df_obj, max_samples: int = 10, min_sample_rows: int = 100
+) -> int:
+    if len(df_obj) <= min_sample_rows or isinstance(df_obj, pd.RangeIndex):
+        return sys.getsizeof(df_obj)
+
+    from .dataframe.arrays import ArrowDtype
+
+    def _is_fast_dtype(dtype):
+        if isinstance(dtype, np.dtype):
+            return np.issubdtype(dtype, np.number)
+        else:
+            return isinstance(dtype, ArrowDtype)
+
+    dtypes = []
+    if isinstance(df_obj, pd.DataFrame):
+        dtypes.extend(df_obj.dtypes)
+        index_obj = df_obj.index
+    elif isinstance(df_obj, pd.Series):
+        dtypes.append(df_obj.dtype)
+        index_obj = df_obj.index
+    else:
+        index_obj = df_obj
+
+    # handling possible MultiIndex
+    if hasattr(index_obj, "dtypes"):
+        dtypes.extend(index_obj.dtypes)
+    else:
+        dtypes.append(index_obj.dtype)
+
+    if all(_is_fast_dtype(dtype) for dtype in dtypes):
+        return sys.getsizeof(df_obj)
+
+    indices = np.sort(np.random.choice(len(df_obj), size=max_samples, replace=False))
+    iloc = df_obj if isinstance(df_obj, pd.Index) else df_obj.iloc
+    sample_size = sys.getsizeof(iloc[indices])
+    return sample_size * len(df_obj) // max_samples
+
+
 def build_fetch_chunk(
     chunk: ChunkType, input_chunk_keys: List[str] = None, **kwargs
 ) -> ChunkType:
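`estimate_pandas_size` keeps the exact `sys.getsizeof` answer for small or all-numeric objects and otherwise extrapolates linearly from a small random row sample. A stripped-down sketch of just that sampling step (assuming an object-dtype Series that does not hit the fast numeric path):

    import sys
    import numpy as np
    import pandas as pd

    def sampled_size(series: pd.Series, max_samples: int = 10) -> int:
        # measure max_samples random rows, then scale linearly to the full length
        indices = np.sort(np.random.choice(len(series), size=max_samples, replace=False))
        sample_size = sys.getsizeof(series.iloc[indices])
        return sample_size * len(series) // max_samples

    s = pd.Series(np.random.choice(["abcd", "def", "gh"], size=1000))
    print(sampled_size(s))   # estimated bytes from the sample
    print(sys.getsizeof(s))  # exact, but walks every string object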
