Skip to content

Commit 7840183

Browse files
authored
Patch pandas to make pickle compatible between 1.2 and 1.3 (#3047)
1 parent 3b0130e commit 7840183

File tree

10 files changed

+89
-39
lines changed

10 files changed

+89
-39
lines changed

benchmarks/asv_bench/benchmarks/graph_assigner.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import random
16+
1517
import mars.tensor as mt
1618
import mars.dataframe as md
1719
from mars.core.graph import TileableGraph, TileableGraphBuilder, ChunkGraphBuilder
@@ -25,7 +27,11 @@ class ChunkGraphAssignerSuite:
2527
Benchmark that times performance of chunk graph assigner
2628
"""
2729

30+
repeat = 10
31+
2832
def setup(self):
33+
random.seed()
34+
2935
num_rows = 10000
3036
df1 = md.DataFrame(
3137
mt.random.rand(num_rows, 4, chunk_size=10), columns=list("abcd")

mars/_utils.pxd

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ cdef class TypeDispatcher:
1717
cdef dict _handlers
1818
cdef dict _lazy_handlers
1919
cdef dict _inherit_handlers
20-
cpdef object __weakref__
20+
cdef object __weakref__
2121

2222
cpdef void register(self, object type_, object handler)
2323
cpdef void unregister(self, object type_)
@@ -29,3 +29,5 @@ cpdef str to_str(s, encoding=*)
2929
cpdef bytes to_binary(s, encoding=*)
3030
cpdef unicode to_text(s, encoding=*)
3131
cpdef register_tokenizer(cls, handler)
32+
cpdef void reset_id_random_seed() except *
33+
cpdef bytes new_random_id(int byte_len)

mars/_utils.pyx

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
# distutils: language = c++
12
# Copyright 1999-2021 Alibaba Group Holding Ltd.
23
#
34
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -22,12 +23,16 @@ import uuid
2223
from datetime import date, datetime, timedelta, tzinfo
2324
from enum import Enum
2425
from functools import lru_cache, partial
26+
from random import getrandbits
2527
from weakref import WeakSet
2628

2729
import numpy as np
2830
import pandas as pd
2931
import cloudpickle
3032
cimport cython
33+
from libc.stdint cimport uint_fast64_t
34+
from libc.stdlib cimport malloc, free
35+
from .lib.cython.libcpp cimport mt19937_64
3136
try:
3237
from pandas.tseries.offsets import Tick as PDTick
3338
except ImportError:
@@ -420,5 +425,46 @@ cdef class Timer:
420425
self.duration = time.time() - self._start
421426

422427

428+
cdef mt19937_64 _rnd_gen
429+
cdef bint _rnd_is_seed_set = False
430+
431+
432+
cpdef void reset_id_random_seed() except *:
    """
    Re-seed the module-level ``mt19937_64`` generator used for random ids.

    A fresh 64-bit seed is drawn from Python's ``random.getrandbits``, so
    the ids generated afterwards follow the current state of the ``random``
    module. The function also records that seeding has happened, which lets
    ``new_random_id`` skip its lazy seeding step.
    """
    # Assigning to a typed C variable lets Cython convert the Python int
    # directly. This avoids writing the value into a bytes object and
    # reading it back through a pointer cast, whose result depends on the
    # host byte order and on the alignment of the bytes buffer.
    cdef uint_fast64_t seed_value = getrandbits(64)
    global _rnd_is_seed_set

    _rnd_gen.seed(seed_value)
    _rnd_is_seed_set = True
439+
440+
441+
cpdef bytes new_random_id(int byte_len):
    """
    Generate ``byte_len`` random bytes from the module-level generator.

    Parameters
    ----------
    byte_len : int
        Number of bytes to return; must be non-negative.

    Returns
    -------
    bytes
        A bytes object of exactly ``byte_len`` bytes.

    Raises
    ------
    ValueError
        If ``byte_len`` is negative.
    MemoryError
        If the temporary buffer needed for a long id cannot be allocated.
    """
    cdef uint_fast64_t *res_ptr
    cdef uint_fast64_t res_data[4]
    cdef int i, qw_num

    # reject negative lengths up front instead of passing a negative
    # size to the bytes slice below
    if byte_len < 0:
        raise ValueError("byte_len must be non-negative")

    # seed lazily on first use
    if not _rnd_is_seed_set:
        reset_id_random_seed()

    # number of 64-bit words needed to cover byte_len, rounded up
    qw_num = byte_len >> 3
    if (qw_num << 3) < byte_len:
        qw_num += 1

    if qw_num <= 4:
        # use stack memory to accelerate
        res_ptr = res_data
    else:
        # size the buffer from the element type rather than assuming 8 bytes
        res_ptr = <uint_fast64_t *>malloc(<size_t>qw_num * sizeof(uint_fast64_t))
        if res_ptr == NULL:
            # malloc failed: nothing to free, report it as a Python error
            raise MemoryError()

    try:
        for i in range(qw_num):
            res_ptr[i] = _rnd_gen()
        # copy only the requested number of bytes out of the word buffer
        return <bytes>((<char *>res_ptr)[:byte_len])
    finally:
        # free memory if allocated by malloc
        if res_ptr != res_data:
            free(res_ptr)
467+
468+
423469
__all__ = ['to_str', 'to_binary', 'to_text', 'TypeDispatcher', 'tokenize', 'tokenize_int',
424-
'register_tokenizer', 'ceildiv', 'Timer']
470+
'register_tokenizer', 'ceildiv', 'Timer', 'reset_id_random_seed', 'new_random_id']

mars/oscar/backends/mars/pool.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@
2525
from types import TracebackType
2626
from typing import List
2727

28-
from ....utils import get_next_port, dataslots, ensure_coverage
28+
from ....utils import get_next_port, dataslots, ensure_coverage, reset_id_random_seed
2929
from ..config import ActorPoolConfig
30-
from ..message import CreateActorMessage, reset_random_seed as reset_message_seed
30+
from ..message import CreateActorMessage
3131
from ..pool import MainActorPoolBase, SubActorPoolBase, _register_message_handler
3232

3333

@@ -168,7 +168,7 @@ def _start_sub_pool(
168168

169169
# make sure enough randomness for every sub pool
170170
random.seed(uuid.uuid1().bytes)
171-
reset_message_seed()
171+
reset_id_random_seed()
172172

173173
conf = actor_config.get_pool_config(process_index)
174174
suspend_sigint = conf["suspend_sigint"]

mars/oscar/backends/message.pyi

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,5 +210,4 @@ class DeserializeMessageFailed(RuntimeError):
210210
def __init__(self, message_id: bytes): ...
211211
def __str__(self): ...
212212

213-
def reset_random_seed(): ...
214213
def new_message_id() -> bytes: ...

mars/oscar/backends/message.pyx

Lines changed: 2 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
# distutils: language = c++
21
# Copyright 1999-2022 Alibaba Group Holding Ltd.
32
#
43
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -14,15 +13,12 @@
1413
# limitations under the License.
1514

1615
from enum import Enum
17-
from random import getrandbits
1816
from types import TracebackType
1917
from typing import Any, Type
2018

21-
from libc.stdint cimport uint_fast64_t
22-
23-
from ...lib.cython.libcpp cimport mt19937_64
2419
from ...lib.tblib import pickling_support
2520
from ...serialization.core cimport Serializer
21+
from ..._utils cimport new_random_id
2622
from ...utils import wrap_exception
2723
from ..core cimport ActorRef
2824

@@ -32,9 +28,6 @@ pickling_support.install()
3228
cdef int _DEFAULT_PROTOCOL = 0
3329
DEFAULT_PROTOCOL = _DEFAULT_PROTOCOL
3430

35-
cdef mt19937_64 _rnd_gen
36-
cdef bint _rnd_is_seed_set = False
37-
3831

3932
class MessageType(Enum):
4033
control = 0
@@ -551,22 +544,5 @@ cdef class MessageSerializer(Serializer):
551544
MessageSerializer.register(_MessageBase)
552545

553546

554-
cpdef reset_random_seed():
555-
cdef bytes seed_bytes
556-
global _rnd_is_seed_set
557-
558-
seed_bytes = getrandbits(64).to_bytes(8, "little")
559-
_rnd_gen.seed((<uint_fast64_t *><char *>seed_bytes)[0])
560-
_rnd_is_seed_set = True
561-
562-
563547
cpdef bytes new_message_id():
564-
cdef uint_fast64_t res_array[4]
565-
cdef int i
566-
567-
if not _rnd_is_seed_set:
568-
reset_random_seed()
569-
570-
for i in range(4):
571-
res_array[i] = _rnd_gen()
572-
return <bytes>((<char *>&(res_array[0]))[:32])
548+
return new_random_id(32)

mars/oscar/utils.pyx

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,14 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
from random import getrandbits
1615
from typing import AsyncGenerator
1716

18-
from .._utils cimport to_str
17+
from .._utils cimport to_str, new_random_id
1918
from .core cimport ActorRef, LocalActorRef
2019

2120

2221
cpdef bytes new_actor_id():
23-
return getrandbits(256).to_bytes(32, "little")
22+
return new_random_id(32)
2423

2524

2625
def create_actor_ref(*args, **kwargs):
@@ -34,6 +33,7 @@ def create_actor_ref(*args, **kwargs):
3433

3534
cdef str address
3635
cdef object uid
36+
cdef ActorRef existing_ref
3737

3838
address = to_str(kwargs.pop('address', None))
3939
uid = kwargs.pop('uid', None)
@@ -49,8 +49,9 @@ def create_actor_ref(*args, **kwargs):
4949
elif len(args) == 1:
5050
tp0 = type(args[0])
5151
if tp0 is ActorRef or tp0 is LocalActorRef:
52-
uid = args[0].uid
53-
address = to_str(address or args[0].address)
52+
existing_ref = <ActorRef>(args[0])
53+
uid = existing_ref.uid
54+
address = to_str(address or existing_ref.address)
5455
else:
5556
uid = args[0]
5657

mars/serialization/core.pyx

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,15 @@ if sys.version_info[:2] < (3, 8): # pragma: no cover
4040
else:
4141
import pickle # nosec # pylint: disable=import_pickle
4242

43+
# Work around a pandas pickle incompatibility between pandas 1.2 and 1.3.
# NOTE(review): presumably data pickled under the newer pandas refers to
# ``new_block`` when it is loaded - confirm against pandas internals.
try:
    from pandas.core.internals import blocks as pd_blocks
    if not hasattr(pd_blocks, "new_block") and hasattr(pd_blocks, "make_block"):
        # this pandas only provides ``make_block``: expose it under the
        # missing ``new_block`` name so lookups of that name do not fail
        pd_blocks.new_block = pd_blocks.make_block
except (ImportError, AttributeError):
    # best effort: leave pandas untouched if its internals cannot be patched
    pass
51+
4352
BUFFER_PICKLE_PROTOCOL = max(pickle.DEFAULT_PROTOCOL, 5)
4453
cdef bint HAS_PICKLE_BUFFER = pickle.HIGHEST_PROTOCOL >= 5
4554
cdef bint _PANDAS_HAS_MGR = hasattr(pd.Series([0]), "_mgr")
@@ -195,7 +204,8 @@ def buffered(func):
195204

196205

197206
def pickle_buffers(obj):
198-
buffers = [None]
207+
cdef list buffers = [None]
208+
199209
if HAS_PICKLE_BUFFER:
200210

201211
def buffer_cb(x):

mars/tests/test_utils.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -610,3 +610,9 @@ def __call__(self, *args, **kwargs):
610610

611611
func = Func()
612612
assert get_func_token_values(func) == [func]
613+
614+
615+
@pytest.mark.parametrize("id_length", [0, 5, 32, 63])
def test_gen_random_id(id_length):
    """Check that a generated random id has exactly the requested length."""
    # the parametrized lengths include the empty id and a length that is
    # not a multiple of 8
    generated = utils.new_random_id(id_length)
    actual_length = len(generated)
    assert actual_length == id_length

mars/utils.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@
6868
tokenize_int,
6969
register_tokenizer,
7070
ceildiv,
71+
reset_id_random_seed,
72+
new_random_id,
7173
Timer,
7274
)
7375
from .lib.version import parse as parse_version
@@ -84,6 +86,8 @@
8486
tokenize = tokenize
8587
register_tokenizer = register_tokenizer
8688
ceildiv = ceildiv
89+
reset_id_random_seed = reset_id_random_seed
90+
new_random_id = new_random_id
8791
_create_task = asyncio.create_task
8892

8993

@@ -163,7 +167,7 @@ def on_serialize_nsplits(value: Tuple[Tuple[int]]):
163167
return None
164168
new_nsplits = []
165169
for dim_splits in value:
166-
new_nsplits.append(tuple(None if np.isnan(v) else v for v in dim_splits))
170+
new_nsplits.append(tuple(None if pd.isna(v) else v for v in dim_splits))
167171
return tuple(new_nsplits)
168172

169173

0 commit comments

Comments
 (0)