Skip to content

Commit cca9468

Browse files
mvashishtha, devin-petersohn, YarShev
authored
PERF-#4493: Use partition size caches more in Modin dataframe. (#4495)
Co-authored-by: Devin Petersohn <[email protected]> Co-authored-by: Yaroslav Igoshev <[email protected]> Signed-off-by: mvashishtha <[email protected]>
1 parent eb20cbc commit cca9468

File tree

3 files changed

+132
-59
lines changed

3 files changed

+132
-59
lines changed

docs/release_notes/release_notes-0.15.0.rst

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -29,6 +29,7 @@ Key Features and Updates
2929
* FIX-#4503: Stop the memory logging thread after session exit (#4515)
3030
* Performance enhancements
3131
* FEAT-#4320: Add connectorx as an alternative engine for read_sql (#4346)
32+
* PERF-#4493: Use partition size caches more in Modin dataframe (#4495)
3233
* Benchmarking enhancements
3334
* FEAT-#4371: Add logging to Modin (#4372)
3435
* Refactor Codebase

modin/core/dataframe/pandas/dataframe/dataframe.py

Lines changed: 111 additions & 48 deletions
Original file line number | Diff line number | Diff line change
@@ -187,24 +187,32 @@ def __init__(
187187
self._index_cache = ensure_index(index)
188188
self._columns_cache = ensure_index(columns)
189189
if row_lengths is not None and len(self.index) > 0:
190-
ErrorMessage.catch_bugs_and_request_email(
191-
sum(row_lengths) != len(self._index_cache),
192-
"Row lengths: {} != {}".format(
193-
sum(row_lengths), len(self._index_cache)
194-
),
195-
)
190+
# An empty frame can have 0 rows but a nonempty index. If the frame
191+
# does have rows, the number of rows must equal the size of the
192+
# index.
193+
num_rows = sum(row_lengths)
194+
if num_rows > 0:
195+
ErrorMessage.catch_bugs_and_request_email(
196+
num_rows != len(self._index_cache),
197+
"Row lengths: {} != {}".format(num_rows, len(self._index_cache)),
198+
)
196199
ErrorMessage.catch_bugs_and_request_email(
197200
any(val < 0 for val in row_lengths),
198201
"Row lengths cannot be negative: {}".format(row_lengths),
199202
)
200203
self._row_lengths_cache = row_lengths
201204
if column_widths is not None and len(self.columns) > 0:
202-
ErrorMessage.catch_bugs_and_request_email(
203-
sum(column_widths) != len(self._columns_cache),
204-
"Column widths: {} != {}".format(
205-
sum(column_widths), len(self._columns_cache)
206-
),
207-
)
205+
# An empty frame can have 0 columns but a nonempty column index. If
206+
# the frame does have columns, the number of columns must equal the
207+
# size of the column index.
208+
num_columns = sum(column_widths)
209+
if num_columns > 0:
210+
ErrorMessage.catch_bugs_and_request_email(
211+
num_columns != len(self._columns_cache),
212+
"Column widths: {} != {}".format(
213+
num_columns, len(self._columns_cache)
214+
),
215+
)
208216
ErrorMessage.catch_bugs_and_request_email(
209217
any(val < 0 for val in column_widths),
210218
"Column widths cannot be negative: {}".format(column_widths),
@@ -2101,7 +2109,12 @@ def broadcast_apply(
21012109
New Modin DataFrame.
21022110
"""
21032111
# Only sort the indices if they do not match
2104-
left_parts, right_parts, joined_index = self._copartition(
2112+
(
2113+
left_parts,
2114+
right_parts,
2115+
joined_index,
2116+
partition_sizes_along_axis,
2117+
) = self._copartition(
21052118
axis, other, join_type, sort=not self.axes[axis].equals(other.axes[axis])
21062119
)
21072120
# unwrap list returned by `copartition`.
@@ -2113,13 +2126,24 @@ def broadcast_apply(
21132126
dtypes = self._dtypes
21142127
new_index = self.index
21152128
new_columns = self.columns
2129+
# Pass shape caches instead of values in order to not trigger shape
2130+
# computation.
2131+
new_row_lengths = self._row_lengths_cache
2132+
new_column_widths = self._column_widths_cache
21162133
if not preserve_labels:
21172134
if axis == 1:
21182135
new_columns = joined_index
2136+
new_column_widths = partition_sizes_along_axis
21192137
else:
21202138
new_index = joined_index
2139+
new_row_lengths = partition_sizes_along_axis
21212140
return self.__constructor__(
2122-
new_frame, new_index, new_columns, None, None, dtypes=dtypes
2141+
new_frame,
2142+
new_index,
2143+
new_columns,
2144+
new_row_lengths,
2145+
new_column_widths,
2146+
dtypes=dtypes,
21232147
)
21242148

21252149
def _prepare_frame_to_broadcast(self, axis, indices, broadcast_all):
@@ -2149,17 +2173,8 @@ def _prepare_frame_to_broadcast(self, axis, indices, broadcast_all):
21492173
broadcast [self[key1], self[key2]] partitions and internal indices for `self` must be [[0, 1], [5]]
21502174
"""
21512175
if broadcast_all:
2152-
2153-
def get_len(part):
2154-
return part.width() if not axis else part.length()
2155-
2156-
parts = self._partitions if not axis else self._partitions.T
2157-
return {
2158-
key: {
2159-
i: np.arange(get_len(parts[0][i])) for i in np.arange(len(parts[0]))
2160-
}
2161-
for key in indices.keys()
2162-
}
2176+
sizes = self._row_lengths if axis else self._column_widths
2177+
return {key: dict(enumerate(sizes)) for key in indices.keys()}
21632178
passed_len = 0
21642179
result_dict = {}
21652180
for part_num, internal in indices.items():
@@ -2372,17 +2387,17 @@ def _copartition(self, axis, other, how, sort, force_repartition=False):
23722387
Returns
23732388
-------
23742389
tuple
2375-
Tuple of (left data, right data list, joined index).
2390+
Tuple containing:
2391+
1) 2-d NumPy array of aligned left partitions
2392+
2) list of 2-d NumPy arrays of aligned right partitions
2393+
3) joined index along ``axis``
2394+
4) List with sizes of partitions along axis that partitioning
2395+
was done on. This list will be empty if and only if all
2396+
the frames are empty.
23762397
"""
23772398
if isinstance(other, type(self)):
23782399
other = [other]
23792400

2380-
# define helper functions
2381-
def get_axis_lengths(partitions, axis):
2382-
if axis:
2383-
return [obj.width() for obj in partitions[0]]
2384-
return [obj.length() for obj in partitions.T[0]]
2385-
23862401
self_index = self.axes[axis]
23872402
others_index = [o.axes[axis] for o in other]
23882403
joined_index, make_reindexer = self._join_index_objects(
@@ -2396,7 +2411,14 @@ def get_axis_lengths(partitions, axis):
23962411

23972412
# If all frames are empty
23982413
if len(non_empty_frames_idx) == 0:
2399-
return self._partitions, [o._partitions for o in other], joined_index
2414+
return (
2415+
self._partitions,
2416+
[o._partitions for o in other],
2417+
joined_index,
2418+
# There are no partition sizes because the resulting dataframe
2419+
# has no partitions.
2420+
[],
2421+
)
24002422

24012423
base_frame_idx = non_empty_frames_idx[0]
24022424
other_frames = frames[base_frame_idx + 1 :]
@@ -2409,18 +2431,23 @@ def get_axis_lengths(partitions, axis):
24092431
do_reindex_base = not base_index.equals(joined_index)
24102432
do_repartition_base = force_repartition or do_reindex_base
24112433

2412-
# perform repartitioning and reindexing for `base_frame` if needed
2434+
# Perform repartitioning and reindexing for `base_frame` if needed.
2435+
# Also define the lengths of the base and `other` frames. We will need to know the
2436+
# lengths for alignment.
24132437
if do_repartition_base:
24142438
reindexed_base = base_frame._partition_mgr_cls.map_axis_partitions(
24152439
axis,
24162440
base_frame._partitions,
24172441
make_reindexer(do_reindex_base, base_frame_idx),
24182442
)
2443+
if axis:
2444+
base_lengths = [obj.width() for obj in reindexed_base[0]]
2445+
else:
2446+
base_lengths = [obj.length() for obj in reindexed_base.T[0]]
24192447
else:
24202448
reindexed_base = base_frame._partitions
2449+
base_lengths = self._column_widths if axis else self._row_lengths
24212450

2422-
# define length of base and `other` frames to aligning purpose
2423-
base_lengths = get_axis_lengths(reindexed_base, axis)
24242451
others_lengths = [o._axes_lengths[axis] for o in other_frames]
24252452

24262453
# define conditions for reindexing and repartitioning `other` frames
@@ -2456,7 +2483,7 @@ def get_axis_lengths(partitions, axis):
24562483
+ [reindexed_base]
24572484
+ reindexed_other_list
24582485
)
2459-
return reindexed_frames[0], reindexed_frames[1:], joined_index
2486+
return (reindexed_frames[0], reindexed_frames[1:], joined_index, base_lengths)
24602487

24612488
@lazy_metadata_decorator(apply_axis="both")
24622489
def binary_op(self, op, right_frame, join_type="outer"):
@@ -2477,7 +2504,7 @@ def binary_op(self, op, right_frame, join_type="outer"):
24772504
PandasDataframe
24782505
New Modin DataFrame.
24792506
"""
2480-
left_parts, right_parts, joined_index = self._copartition(
2507+
left_parts, right_parts, joined_index, row_lengths = self._copartition(
24812508
0, right_frame, join_type, sort=True
24822509
)
24832510
# unwrap list returned by `copartition`.
@@ -2486,7 +2513,13 @@ def binary_op(self, op, right_frame, join_type="outer"):
24862513
1, left_parts, lambda l, r: op(l, r), right_parts
24872514
)
24882515
new_columns = self.columns.join(right_frame.columns, how=join_type)
2489-
return self.__constructor__(new_frame, joined_index, new_columns, None, None)
2516+
return self.__constructor__(
2517+
new_frame,
2518+
joined_index,
2519+
new_columns,
2520+
row_lengths,
2521+
column_widths=self._column_widths_cache,
2522+
)
24902523

24912524
@lazy_metadata_decorator(apply_axis="both")
24922525
def concat(
@@ -2525,8 +2558,7 @@ def concat(
25252558
joined_index = self.columns
25262559
left_parts = self._partitions
25272560
right_parts = [o._partitions for o in others]
2528-
new_lengths = None
2529-
new_widths = self._column_widths
2561+
new_widths = self._column_widths_cache
25302562
elif (
25312563
axis == Axis.COL_WISE
25322564
and all(o.index.equals(self.index) for o in others)
@@ -2535,16 +2567,20 @@ def concat(
25352567
joined_index = self.index
25362568
left_parts = self._partitions
25372569
right_parts = [o._partitions for o in others]
2538-
new_lengths = self._row_lengths
2539-
new_widths = self._column_widths + [
2540-
length for o in others for length in o._column_widths
2541-
]
2570+
new_lengths = self._row_lengths_cache
25422571
else:
2543-
left_parts, right_parts, joined_index = self._copartition(
2572+
(
2573+
left_parts,
2574+
right_parts,
2575+
joined_index,
2576+
partition_sizes_along_axis,
2577+
) = self._copartition(
25442578
axis.value ^ 1, others, how, sort, force_repartition=False
25452579
)
2546-
new_lengths = None
2547-
new_widths = None
2580+
if axis == Axis.COL_WISE:
2581+
new_lengths = partition_sizes_along_axis
2582+
else:
2583+
new_widths = partition_sizes_along_axis
25482584
new_partitions = self._partition_mgr_cls.concat(
25492585
axis.value, left_parts, right_parts
25502586
)
@@ -2553,13 +2589,40 @@ def concat(
25532589
new_columns = joined_index
25542590
# TODO: Can optimize by combining if all dtypes are materialized
25552591
new_dtypes = None
2592+
# If we have already cached the length of each row in at least one
2593+
# of the row's partitions, we can build new_lengths for the new
2594+
# frame. Typically, if we know the length for any partition in a
2595+
# row, we know the length for the first partition in the row. So
2596+
# just check the lengths of the first column of partitions.
2597+
new_lengths = []
2598+
if new_partitions.size > 0:
2599+
for part in new_partitions.T[0]:
2600+
if part._length_cache is not None:
2601+
new_lengths.append(part.length())
2602+
else:
2603+
new_lengths = None
2604+
break
25562605
else:
25572606
new_columns = self.columns.append([other.columns for other in others])
25582607
new_index = joined_index
25592608
if self._dtypes is not None and all(o._dtypes is not None for o in others):
25602609
new_dtypes = self.dtypes.append([o.dtypes for o in others])
25612610
else:
25622611
new_dtypes = None
2612+
# If we have already cached the width of each column in at least one
2613+
# of the column's partitions, we can build new_widths for the new
2614+
# frame. Typically, if we know the width for any partition in a
2615+
# column, we know the width for the first partition in the column.
2616+
# So just check the widths of the first row of partitions.
2617+
new_widths = []
2618+
if new_partitions.size > 0:
2619+
for part in new_partitions[0]:
2620+
if part._width_cache is not None:
2621+
new_widths.append(part.width())
2622+
else:
2623+
new_widths = None
2624+
break
2625+
25632626
return self.__constructor__(
25642627
new_partitions, new_index, new_columns, new_lengths, new_widths, new_dtypes
25652628
)

modin/pandas/test/dataframe/test_map_metadata.py

Lines changed: 20 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -1273,18 +1273,27 @@ def test_transpose(data):
12731273
({"A": [1, 2, 3], "B": [400, 500, 600]}, {"B": [4, np.nan, 6]}),
12741274
],
12751275
)
1276-
def test_update(data, other_data):
1277-
modin_df, pandas_df = pd.DataFrame(data), pandas.DataFrame(data)
1278-
other_modin_df, other_pandas_df = (
1279-
pd.DataFrame(other_data),
1280-
pandas.DataFrame(other_data),
1281-
)
1282-
modin_df.update(other_modin_df)
1283-
pandas_df.update(other_pandas_df)
1284-
df_equals(modin_df, pandas_df)
1276+
@pytest.mark.parametrize(
1277+
"raise_errors", bool_arg_values, ids=arg_keys("raise_errors", bool_arg_keys)
1278+
)
1279+
def test_update(data, other_data, raise_errors):
1280+
modin_df, pandas_df = create_test_dfs(data)
1281+
other_modin_df, other_pandas_df = create_test_dfs(other_data)
12851282

1286-
with pytest.raises(ValueError):
1287-
modin_df.update(other_modin_df, errors="raise")
1283+
if raise_errors:
1284+
kwargs = {"errors": "raise"}
1285+
else:
1286+
kwargs = {}
1287+
1288+
eval_general(
1289+
modin_df,
1290+
pandas_df,
1291+
lambda df: df.update(other_modin_df)
1292+
if isinstance(df, pd.DataFrame)
1293+
else df.update(other_pandas_df),
1294+
__inplace__=True,
1295+
**kwargs,
1296+
)
12881297

12891298

12901299
@pytest.mark.parametrize("data", test_data_values, ids=test_data_keys)

0 commit comments

Comments
 (0)