Skip to content

Commit 6afd7ed

Browse files
authored
Fix backward compatibility for pandas 1.0 (#2628)
1 parent a969a17 commit 6afd7ed

30 files changed

+228 / -125 lines changed

azure-pipelines.yml

Lines changed: 11 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -94,8 +94,18 @@ jobs:
9494
- bash: |
9595
set -e
9696
source ci/reload-env.sh
97+
mkdir -p build
9798
pytest $PYTEST_CONFIG mars/$(mars.test.module)
98-
coverage report
99+
mv .coverage build/.coverage.main.file
100+
101+
# do compatibility test for earliest supported pandas release
102+
if [[ "$(mars.test.module)" == "dataframe" ]]; then
103+
pip install pandas==1.0.5
104+
pytest $PYTEST_CONFIG -m pd_compat mars/dataframe
105+
mv .coverage build/.coverage.pd_compat.file
106+
fi
107+
108+
coverage combine build/ && coverage report
99109
coverage xml
100110
displayName: 'Run tests'
101111

mars/conftest.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import pytest
1919

2020
from mars.config import option_context
21+
from mars.core.mode import is_kernel_mode, is_build_mode
2122
from mars.lib.aio import stop_isolation
2223
from mars.oscar.backends.router import Router
2324
from mars.oscar.backends.ray.communication import RayServer
@@ -168,6 +169,7 @@ def _new_gpu_test_session(_stop_isolation): # pragma: no cover
168169
def setup(_new_test_session):
169170
_new_test_session.as_default()
170171
yield _new_test_session
172+
assert not (is_build_mode() or is_kernel_mode())
171173

172174

173175
@pytest.fixture

mars/core/graph/builder/chunk.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -259,8 +259,14 @@ def _if_add_node(self, node: EntityType, visited: Set):
259259
return node not in visited and node not in self._processed_chunks
260260

261261
def _build(self) -> Iterable[Union[TileableGraph, ChunkGraph]]:
262-
yield from self.tiler
262+
tile_iterator = iter(self.tiler)
263+
while True:
264+
try:
265+
with enter_mode(build=True, kernel=True):
266+
graph = next(tile_iterator)
267+
yield graph
268+
except StopIteration:
269+
break
263270

264271
def build(self) -> Generator[Union[TileableGraph, ChunkGraph], None, None]:
265-
with enter_mode(build=True, kernel=True):
266-
yield from self._build()
272+
yield from self._build()

mars/core/graph/core.pyx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ cdef class DirectedGraph:
143143
try:
144144
return list(self._successors[n])
145145
except KeyError:
146-
return KeyError(f'Node {n} does not exist in the directed graph')
146+
raise KeyError(f'Node {n} does not exist in the directed graph')
147147

148148
def iter_predecessors(self, n):
149149
try:

mars/core/mode.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
from ..config import options
2020

21-
2221
_internal_mode = threading.local()
2322

2423

@@ -69,18 +68,19 @@ def __exit__(self, *_):
6968
setattr(_internal_mode, mode_name, mode_name_to_old_value[mode_name])
7069

7170
def __call__(self, func):
71+
mode_name_to_value = self.mode_name_to_value.copy()
7272
if not inspect.iscoroutinefunction(func):
7373
# sync
7474
@functools.wraps(func)
7575
def _inner(*args, **kwargs):
76-
with self:
76+
with enter_mode(**mode_name_to_value):
7777
return func(*args, **kwargs)
7878

7979
else:
8080
# async
8181
@functools.wraps(func)
8282
async def _inner(*args, **kwargs):
83-
with self:
83+
with enter_mode(**mode_name_to_value):
8484
return await func(*args, **kwargs)
8585

8686
return _inner

mars/dataframe/arrays.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,9 @@
5454

5555
from ..config import options
5656
from ..core import is_kernel_mode
57-
from ..lib.version import parse as parse_version
58-
from ..utils import tokenize
57+
from ..utils import pd_release_version, tokenize
5958

60-
_use_bool_any_all = parse_version(pd.__version__) >= parse_version("1.3.0")
59+
_use_bool_any_all = pd_release_version >= (1, 3, 0)
6160

6261

6362
class ArrowDtype(ExtensionDtype):

mars/dataframe/base/astype.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,14 @@
1818

1919
from ... import opcodes as OperandDef
2020
from ...core import recursive_tile
21-
from ...lib.version import parse as parse_version
2221
from ...serialization.serializables import AnyField, StringField, ListField
2322
from ...tensor.base import sort
23+
from ...utils import pd_release_version
2424
from ..core import DATAFRAME_TYPE, SERIES_TYPE
2525
from ..operands import DataFrameOperand, DataFrameOperandMixin
2626
from ..utils import build_empty_df, build_empty_series, parse_index
2727

28-
_need_astype_contiguous = parse_version(pd.__version__) == parse_version("1.3.0")
28+
_need_astype_contiguous = pd_release_version == (1, 3, 0)
2929

3030

3131
class DataFrameAstype(DataFrameOperand, DataFrameOperandMixin):

mars/dataframe/base/duplicated.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -148,19 +148,20 @@ def _execute_tree_map(cls, ctx, op):
148148
duplicated = cls._duplicated(inp, op)
149149
if not duplicated.name:
150150
duplicated.name = "_duplicated_"
151-
result.iloc[duplicated] = None
151+
result.iloc[duplicated.values] = None
152152
result = xdf.concat([result, duplicated], axis=1)
153153
ctx[op.outputs[0].key] = result
154154

155155
@classmethod
156156
def _execute_tree_combine(cls, ctx, op):
157157
inp = ctx[op.input.key]
158158
result = inp.copy()
159-
duplicates = inp[~inp.iloc[:, -1]]
159+
duplicated_filter = ~inp.iloc[:, -1]
160+
duplicates = inp.loc[duplicated_filter]
160161
dup_on_duplicated = cls._duplicated(duplicates, op)
161-
result.iloc[~inp.iloc[:, -1], -1] = dup_on_duplicated
162+
result.iloc[duplicated_filter.values, -1] = dup_on_duplicated
162163
duplicated = result.iloc[:, -1]
163-
result.iloc[duplicated, :-1] = None
164+
result.iloc[duplicated.values, :-1] = None
164165
ctx[op.outputs[0].key] = result
165166

166167
@classmethod

mars/dataframe/base/tests/test_base_execution.py

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
from ....tensor import arange, tensor
3030
from ....tensor.random import rand
3131
from ....tests.core import require_cudf
32-
from ....utils import lazy_import
32+
from ....utils import lazy_import, pd_release_version
3333
from ... import eval as mars_eval, cut, qcut, get_dummies
3434
from ...datasource.dataframe import from_pandas as from_pandas_df
3535
from ...datasource.series import from_pandas as from_pandas_series
@@ -38,8 +38,12 @@
3838
from ..to_numeric import to_numeric
3939
from ..rebalance import DataFrameRebalance
4040

41+
pytestmark = pytest.mark.pd_compat
42+
4143
cudf = lazy_import("cudf", globals=globals())
4244

45+
_explode_with_ignore_index = pd_release_version[:2] >= (1, 1)
46+
4347

4448
@require_cudf
4549
def test_to_gpu_execution(setup_gpu):
@@ -1968,7 +1972,12 @@ def test_stack_execution(setup):
19681972
assert_method(result, expected)
19691973

19701974

1971-
def test_explode_execution(setup):
1975+
@pytest.mark.parametrize(
1976+
"ignore_index", [False, True] if _explode_with_ignore_index else [False]
1977+
)
1978+
def test_explode_execution(setup, ignore_index):
1979+
explode_kw = {"ignore_index": True} if ignore_index else {}
1980+
19721981
raw = pd.DataFrame(
19731982
{
19741983
"a": np.random.rand(10),
@@ -1978,20 +1987,12 @@ def test_explode_execution(setup):
19781987
}
19791988
)
19801989
df = from_pandas_df(raw, chunk_size=(4, 2))
1981-
1982-
for ignore_index in [False, True]:
1983-
r = df.explode("b", ignore_index=ignore_index)
1984-
pd.testing.assert_frame_equal(
1985-
r.execute().fetch(), raw.explode("b", ignore_index=ignore_index)
1986-
)
1990+
r = df.explode("b", ignore_index=ignore_index)
1991+
pd.testing.assert_frame_equal(r.execute().fetch(), raw.explode("b", **explode_kw))
19871992

19881993
series = from_pandas_series(raw.b, chunk_size=4)
1989-
1990-
for ignore_index in [False, True]:
1991-
r = series.explode(ignore_index=ignore_index)
1992-
pd.testing.assert_series_equal(
1993-
r.execute().fetch(), raw.b.explode(ignore_index=ignore_index)
1994-
)
1994+
r = series.explode(ignore_index=ignore_index)
1995+
pd.testing.assert_series_equal(r.execute().fetch(), raw.b.explode(**explode_kw))
19951996

19961997

19971998
def test_eval_query_execution(setup):

mars/dataframe/base/transform.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,8 @@
1919
from ...config import options
2020
from ...core import OutputType, recursive_tile
2121
from ...core.custom_log import redirect_custom_log
22-
from ...lib.version import parse as parse_version
2322
from ...serialization.serializables import AnyField, BoolField, TupleField, DictField
24-
from ...utils import enter_current_session, quiet_stdio
23+
from ...utils import enter_current_session, quiet_stdio, pd_release_version
2524
from ..core import DATAFRAME_CHUNK_TYPE, DATAFRAME_TYPE
2625
from ..operands import DataFrameOperandMixin, DataFrameOperand
2726
from ..utils import (
@@ -33,6 +32,8 @@
3332
make_dtypes,
3433
)
3534

35+
_with_convert_dtype = pd_release_version < (1, 2, 0)
36+
3637

3738
class TransformOperand(DataFrameOperand, DataFrameOperandMixin):
3839
_op_type_ = opcodes.TRANSFORM
@@ -236,7 +237,7 @@ def _infer_df_func_returns(self, df, dtypes):
236237
if self.call_agg:
237238
infer_df = test_df.agg(self._func, args=self.args, **self.kwds)
238239
else:
239-
if parse_version(pd.__version__) >= parse_version("1.2.0"):
240+
if not _with_convert_dtype:
240241
infer_df = test_df.transform(
241242
self._func, *self.args, **self.kwds
242243
)

0 commit comments

Comments
 (0)