Skip to content

Commit fe7064d

Browse files
author
Xuye (Chris) Qin
authored
Optimize tile of DataFrame.__setitem__ by reducing time of generating chunk meta (#3140)
1 parent 424cfb9 commit fe7064d

File tree

5 files changed

+62
-28
lines changed

5 files changed

+62
-28
lines changed

benchmarks/asv_bench/benchmarks/graph_builder.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,3 +30,10 @@ def setup(self):
3030
def time_filter(self):
3131
df = self.df[self.df["a"] < 0.8]
3232
build_graph([df], tile=True)
33+
34+
def time_setitem(self):
35+
df2 = self.df.copy()
36+
df2["k"] = df2["c"]
37+
df2["l"] = df2["a"] * (1 - df2["d"])
38+
df2["m"] = df2["e"] * (1 + df2["d"]) * (1 - df2["h"])
39+
build_graph([df2], tile=True)

benchmarks/tpch/run_queries.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1051,8 +1051,12 @@ def main():
10511051
queries = (
10521052
set(x.lower().strip() for x in args.query.split(",")) if args.query else None
10531053
)
1054-
mars.new_session(endpoint)
1055-
run_queries(folder, use_arrow_dtype=use_arrow_dtype)
1054+
sess = mars.new_session(endpoint)
1055+
try:
1056+
run_queries(folder, use_arrow_dtype=use_arrow_dtype)
1057+
finally:
1058+
if endpoint is None:
1059+
sess.stop_server()
10561060

10571061

10581062
if __name__ == "__main__":

mars/dataframe/align.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
build_split_idx_to_origin_idx,
4040
filter_index_value,
4141
hash_index,
42+
is_index_value_identical,
4243
)
4344

4445

@@ -974,6 +975,10 @@ def align_dataframe_series(left, right, axis="columns"):
974975

975976

976977
def align_series_series(left, right):
978+
if is_index_value_identical(left, right):
979+
# index identical, skip align
980+
return left.nsplits, left.chunk_shape, left.chunks, right.chunks
981+
977982
left_index_chunks = [c.index_value for c in left.chunks]
978983
right_index_chunks = [c.index_value for c in right.chunks]
979984

@@ -988,9 +993,6 @@ def align_series_series(left, right):
988993

989994
left_chunks = _gen_series_chunks(splits, out_chunk_shape, 0, left)
990995
right_chunks = _gen_series_chunks(splits, out_chunk_shape, 1, right)
991-
if _is_index_identical(left_index_chunks, right_index_chunks):
992-
index_nsplits = left.nsplits[0]
993-
else:
994-
index_nsplits = [np.nan for _ in range(out_chunk_shape[0])]
996+
index_nsplits = [np.nan for _ in range(out_chunk_shape[0])]
995997
nsplits = [index_nsplits]
996998
return nsplits, out_chunk_shape, left_chunks, right_chunks

mars/dataframe/indexing/setitem.py

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
from ..core import DATAFRAME_TYPE, SERIES_TYPE, DataFrame
2525
from ..initializer import DataFrame as asframe, Series as asseries
2626
from ..operands import DataFrameOperand, DataFrameOperandMixin
27-
from ..utils import parse_index
27+
from ..utils import parse_index, is_index_value_identical
2828

2929
# in pandas 1.0.x, __setitem__ with a list with missing items are not allowed
3030
_allow_set_missing_list = pd_release_version[:2] >= (1, 1)
@@ -161,16 +161,7 @@ def tile(cls, op: "DataFrameSetitem"):
161161
rechunk_arg = {}
162162

163163
# check if all chunk's index_value are identical
164-
target_chunk_index_values = [
165-
c.index_value for c in target.chunks if c.index[1] == 0
166-
]
167-
value_chunk_index_values = [v.index_value for v in value.chunks]
168-
is_identical = len(target_chunk_index_values) == len(
169-
value_chunk_index_values
170-
) and all(
171-
c.key == v.key
172-
for c, v in zip(target_chunk_index_values, value_chunk_index_values)
173-
)
164+
is_identical = is_index_value_identical(target, value)
174165
if not is_identical:
175166
# do rechunk
176167
if any(np.isnan(s) for s in target.nsplits[0]) or any(
@@ -202,8 +193,8 @@ def tile(cls, op: "DataFrameSetitem"):
202193

203194
out_chunks = []
204195
nsplits = [list(ns) for ns in target.nsplits]
205-
206196
nsplits[1][-1] += len(append_cols)
197+
nsplits = tuple(tuple(ns) for ns in nsplits)
207198

208199
column_chunk_shape = target.chunk_shape[1]
209200
for c in target.chunks:
@@ -239,26 +230,27 @@ def tile(cls, op: "DataFrameSetitem"):
239230

240231
chunk_inputs = [c, value_chunk]
241232

242-
dtypes, shape, columns_value = c.dtypes, c.shape, c.columns_value
243-
233+
shape = c.shape
244234
if append_cols and c.index[-1] == column_chunk_shape - 1:
245235
# some columns appended at the last column of chunks
246236
shape = (shape[0], shape[1] + len(append_cols))
247-
dtypes = pd.concat([dtypes, out.dtypes.iloc[-len(append_cols) :]])
248-
columns_value = parse_index(dtypes.index, store_data=True)
249237

250238
result_chunk = chunk_op.new_chunk(
251239
chunk_inputs,
252240
shape=shape,
253-
dtypes=dtypes,
254-
index_value=c.index_value,
255-
columns_value=columns_value,
256241
index=c.index,
257242
)
243+
result_chunk._set_tileable_meta(
244+
tileable_key=out.key,
245+
nsplits=nsplits,
246+
index_value=out.index_value,
247+
columns_value=out.columns_value,
248+
dtypes=out.dtypes,
249+
)
258250
out_chunks.append(result_chunk)
259251

260252
params = out.params
261-
params["nsplits"] = tuple(tuple(ns) for ns in nsplits)
253+
params["nsplits"] = nsplits
262254
params["chunks"] = out_chunks
263255
new_op = op.copy()
264256
return new_op.new_tileables(op.inputs, kws=[params])
@@ -270,10 +262,17 @@ def estimate_size(cls, ctx: dict, op: "DataFrameSetitem"):
270262

271263
@classmethod
272264
def execute(cls, ctx, op: "DataFrameSetitem"):
273-
target = ctx[op.target.key].copy()
265+
target = ctx[op.target.key]
266+
# only deep copy when updating
267+
indexes = (
268+
(op.indexes,)
269+
if not isinstance(op.indexes, (tuple, list, set))
270+
else op.indexes
271+
)
272+
deep = bool(set(indexes) & set(target.columns))
273+
target = ctx[op.target.key].copy(deep=deep)
274274
value = ctx[op.value.key] if not np.isscalar(op.value) else op.value
275275
try:
276-
277276
target[op.indexes] = value
278277
except KeyError:
279278
if _allow_set_missing_list: # pragma: no cover

mars/dataframe/utils.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -732,6 +732,28 @@ def build_concatenated_rows_frame(df):
732732
)
733733

734734

735+
def is_index_value_identical(left: TileableType, right: TileableType) -> bool:
736+
if (
737+
left.index_value.key == right.index_value.key
738+
and not np.isnan(sum(left.nsplits[0]))
739+
and not np.isnan(sum(right.nsplits[0]))
740+
and left.nsplits[0] == right.nsplits[0]
741+
):
742+
is_identical = True
743+
else:
744+
target_chunk_index_values = [
745+
c.index_value for c in left.chunks if len(c.index) <= 1 or c.index[1] == 0
746+
]
747+
value_chunk_index_values = [v.index_value for v in right.chunks]
748+
is_identical = len(target_chunk_index_values) == len(
749+
value_chunk_index_values
750+
) and all(
751+
c.key == v.key
752+
for c, v in zip(target_chunk_index_values, value_chunk_index_values)
753+
)
754+
return is_identical
755+
756+
735757
def _filter_range_index(pd_range_index, min_val, min_val_close, max_val, max_val_close):
736758
if is_pd_range_empty(pd_range_index):
737759
return pd_range_index

0 commit comments

Comments
 (0)