
Commit a0dac27

Support customizing how built-in types are pickled for cloudpickle
This enables customizing how sets are serialized, to make pickling more deterministic. I'm modifying the vendored cloudpickle as a stop-gap measure until the cloudpickle maintainers review cloudpipe/cloudpickle#563. Issue: apache#34410
1 parent d0def26 commit a0dac27
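To make the intent concrete, the snippet below is a minimal sketch, not part of this commit: a hypothetical DeterministicSetPickler built on the new PurePythonPickler whose reducer_override writes built-in sets with their elements sorted, so equal sets always produce identical bytes. It assumes the set elements are mutually comparable and that the vendored package is importable as apache_beam.internal.cloudpickle; the pure-Python pickler matters here because CPython's C pickler may skip reducer_override for exact instances of built-in containers such as set.

import io
import pickle

from apache_beam.internal import cloudpickle


class DeterministicSetPickler(cloudpickle.PurePythonPickler):
  # Hypothetical subclass, shown for illustration only.
  def reducer_override(self, obj):
    if type(obj) is set:
      # Rebuild the set from a sorted list so that equal sets always
      # serialize to the same byte stream.
      return set, (sorted(obj), )
    # Defer every other object to the normal cloudpickle machinery.
    return NotImplemented


with io.BytesIO() as f:
  DeterministicSetPickler(f).dump({"b", "a", "c"})
  payload = f.getvalue()

assert pickle.loads(payload) == {"a", "b", "c"}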

3 files changed: +147 / -104 lines


sdks/python/apache_beam/internal/cloudpickle/__init__.py

Lines changed: 1 addition & 0 deletions
@@ -9,6 +9,7 @@
     "__version__",
     "Pickler",
     "CloudPickler",
+    "PurePythonPickler",
     "dumps",
     "loads",
     "dump",

sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py

Lines changed: 145 additions & 103 deletions
@@ -1205,7 +1205,14 @@ def _get_dataclass_field_type_sentinel(name):
   return _DATACLASSE_FIELD_TYPE_SENTINELS[name]


-class Pickler(pickle.Pickler):
+class BaseCloudPickler:
+  """Class for logic that is common between FastPickler and PurePythonPickler.
+  Cloudpickle provides two picklers: one extending the C implementation of
+  the CPython pickler and another extending the pure-Python pickler.
+  FastPickler and PurePythonPickler inherit from BaseCloudPickler and provide
+  BaseCloudPickler access to either the C or pure-Python pickler by
+  implementing the _super_pickler() method.
+  """
   # set of reducers defined and used by cloudpickle (private)
   _dispatch_table = {}
   _dispatch_table[classmethod] = _classmethod_reduce
@@ -1294,7 +1301,7 @@ def _function_getnewargs(self, func):

   def dump(self, obj):
     try:
-      return super().dump(obj)
+      return self._super_pickler().dump(obj)
     except RuntimeError as e:
       if len(e.args) > 0 and "recursion" in e.args[0]:
         msg = "Could not pickle object as excessively deep recursion required."
@@ -1305,14 +1312,32 @@ def dump(self, obj):
   def __init__(self, file, protocol=None, buffer_callback=None):
     if protocol is None:
       protocol = DEFAULT_PROTOCOL
-    super().__init__(file, protocol=protocol, buffer_callback=buffer_callback)
     # map functions __globals__ attribute ids, to ensure that functions
     # sharing the same global namespace at pickling time also share
     # their global namespace at unpickling time.
     self.globals_ref = {}
     self.proto = int(protocol)
+    self._super_pickler().__init__(
+        file, protocol=protocol, buffer_callback=buffer_callback)
+
+  def _super_pickler(self):
+    """Returns a proxy object for an instance of the pickler being extended."""
+    raise NotImplemented
+
+
+if not PYPY:
+
+  class FastPickler(BaseCloudPickler, pickle.Pickler):
+    """Fast pickler extending the C implementation of the CPython pickler.
+    The FastPickler is not available for PYPY and does not support
+    overriding how built-in types are pickled.
+    """
+    def __init__(self, file, protocol=None, buffer_callback=None):
+      super().__init__(file, protocol, buffer_callback)
+
+    def _super_pickler(self):
+      return super(BaseCloudPickler, self)

-  if not PYPY:
     # pickle.Pickler is the C implementation of the CPython pickler and
     # therefore we rely on reduce_override method to customize the pickler
     # behavior.
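For readers unfamiliar with the two-argument form of super() used in _super_pickler() above, here is a self-contained toy sketch of the pattern; the _Fake*, _Base, _Fast and _Pure names are illustrative only and are not Beam or cloudpickle code. super(Base, self) returns a proxy that starts attribute lookup after Base in self's MRO, so shared logic in the base class reaches whichever concrete pickler the subclass mixes in.

class _FakeCPickler:
  def dump(self):
    return "dumped by the C pickler"


class _FakePurePythonPickler:
  def dump(self):
    return "dumped by the pure-Python pickler"


class _Base:
  def dump_via_base(self):
    # Shared logic: delegate to whichever pickler the subclass mixed in.
    return self._super_pickler().dump()

  def _super_pickler(self):
    raise NotImplementedError


class _Fast(_Base, _FakeCPickler):
  def _super_pickler(self):
    return super(_Base, self)


class _Pure(_Base, _FakePurePythonPickler):
  def _super_pickler(self):
    return super(_Base, self)


assert _Fast().dump_via_base() == "dumped by the C pickler"
assert _Pure().dump_via_base() == "dumped by the pure-Python pickler"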
@@ -1328,7 +1353,7 @@ def __init__(self, file, protocol=None, buffer_callback=None):
     # name was not a great choice given because it would collide with a
     # similarly named attribute in the pure-Python `pickle._Pickler`
     # implementation in the standard library.
-    dispatch = dispatch_table
+    dispatch = BaseCloudPickler.dispatch_table

     # Implementation of the reducer_override callback, in order to
     # efficiently serialize dynamic functions and classes by subclassing
@@ -1385,111 +1410,121 @@ def reducer_override(self, obj):
       # dispatch_table
       return NotImplemented

-  else:
-    # When reducer_override is not available, hack the pure-Python
-    # Pickler's types.FunctionType and type savers. Note: the type saver
-    # must override Pickler.save_global, because pickle.py contains a
-    # hard-coded call to save_global when pickling meta-classes.
-    dispatch = pickle.Pickler.dispatch.copy()
-
-    def _save_reduce_pickle5(
-        self,
+
+class PurePythonPickler(BaseCloudPickler, pickle._Pickler):
+  """Pure-Python pickler.
+  This picker supports overriding how built-in types are pickled.
+  """
+  def __init__(self, file, protocol=None, buffer_callback=None):
+    super().__init__(file, protocol, buffer_callback)
+
+  def _super_pickler(self):
+    return super(BaseCloudPickler, self)
+
+  # When reducer_override is not available, hack the pure-Python
+  # Pickler's types.FunctionType and type savers. Note: the type saver
+  # must override Pickler.save_global, because pickle.py contains a
+  # hard-coded call to save_global when pickling meta-classes.
+  dispatch = pickle._Pickler.dispatch.copy()
+
+  def _save_reduce_pickle5(
+      self,
+      func,
+      args,
+      state=None,
+      listitems=None,
+      dictitems=None,
+      state_setter=None,
+      obj=None,
+  ):
+    save = self.save
+    write = self.write
+    self.save_reduce(
         func,
         args,
         state=None,
-        listitems=None,
-        dictitems=None,
-        state_setter=None,
-        obj=None,
-    ):
-      save = self.save
-      write = self.write
-      self.save_reduce(
-          func,
-          args,
-          state=None,
-          listitems=listitems,
-          dictitems=dictitems,
-          obj=obj,
-      )
-      # backport of the Python 3.8 state_setter pickle operations
-      save(state_setter)
-      save(obj)  # simple BINGET opcode as obj is already memoized.
-      save(state)
-      write(pickle.TUPLE2)
-      # Trigger a state_setter(obj, state) function call.
-      write(pickle.REDUCE)
-      # The purpose of state_setter is to carry-out an
-      # inplace modification of obj. We do not care about what the
-      # method might return, so its output is eventually removed from
-      # the stack.
-      write(pickle.POP)
-
-    def save_global(self, obj, name=None, pack=struct.pack):
-      """Main dispatch method.
-
-      The name of this method is somewhat misleading: all types get
-      dispatched here.
-      """
-      if obj is type(None):  # noqa
-        return self.save_reduce(type, (None, ), obj=obj)
-      elif obj is type(Ellipsis):
-        return self.save_reduce(type, (Ellipsis, ), obj=obj)
-      elif obj is type(NotImplemented):
-        return self.save_reduce(type, (NotImplemented, ), obj=obj)
-      elif obj in _BUILTIN_TYPE_NAMES:
-        return self.save_reduce(
-            _builtin_type, (_BUILTIN_TYPE_NAMES[obj], ), obj=obj)
-
-      if name is not None:
-        super().save_global(obj, name=name)
-      elif not _should_pickle_by_reference(obj, name=name):
-        self._save_reduce_pickle5(*_dynamic_class_reduce(obj), obj=obj)
-      else:
-        super().save_global(obj, name=name)
+        listitems=listitems,
+        dictitems=dictitems,
+        obj=obj,
+    )
+    # backport of the Python 3.8 state_setter pickle operations
+    save(state_setter)
+    save(obj)  # simple BINGET opcode as obj is already memoized.
+    save(state)
+    write(pickle.TUPLE2)
+    # Trigger a state_setter(obj, state) function call.
+    write(pickle.REDUCE)
+    # The purpose of state_setter is to carry-out an
+    # inplace modification of obj. We do not care about what the
+    # method might return, so its output is eventually removed from
+    # the stack.
+    write(pickle.POP)
+
+  def save_global(self, obj, name=None, pack=struct.pack):
+    """Main dispatch method.
+
+    The name of this method is somewhat misleading: all types get
+    dispatched here.
+    """
+    if obj is type(None):  # noqa
+      return self.save_reduce(type, (None, ), obj=obj)
+    elif obj is type(Ellipsis):
+      return self.save_reduce(type, (Ellipsis, ), obj=obj)
+    elif obj is type(NotImplemented):
+      return self.save_reduce(type, (NotImplemented, ), obj=obj)
+    elif obj in _BUILTIN_TYPE_NAMES:
+      return self.save_reduce(
+          _builtin_type, (_BUILTIN_TYPE_NAMES[obj], ), obj=obj)
+
+    if name is not None:
+      super().save_global(obj, name=name)
+    elif not _should_pickle_by_reference(obj, name=name):
+      self._save_reduce_pickle5(*_dynamic_class_reduce(obj), obj=obj)
+    else:
+      super().save_global(obj, name=name)

-    dispatch[type] = save_global
+  dispatch[type] = save_global

-    def save_function(self, obj, name=None):
-      """Registered with the dispatch to handle all function types.
+  def save_function(self, obj, name=None):
+    """Registered with the dispatch to handle all function types.

-      Determines what kind of function obj is (e.g. lambda, defined at
-      interactive prompt, etc) and handles the pickling appropriately.
-      """
-      if _should_pickle_by_reference(obj, name=name):
-        return super().save_global(obj, name=name)
-      elif PYPY and isinstance(obj.__code__, builtin_code_type):
-        return self.save_pypy_builtin_func(obj)
-      else:
-        return self._save_reduce_pickle5(
-            *self._dynamic_function_reduce(obj), obj=obj)
-
-    def save_pypy_builtin_func(self, obj):
-      """Save pypy equivalent of builtin functions.
-
-      PyPy does not have the concept of builtin-functions. Instead,
-      builtin-functions are simple function instances, but with a
-      builtin-code attribute.
-      Most of the time, builtin functions should be pickled by attribute.
-      But PyPy has flaky support for __qualname__, so some builtin
-      functions such as float.__new__ will be classified as dynamic. For
-      this reason only, we created this special routine. Because
-      builtin-functions are not expected to have closure or globals,
-      there is no additional hack (compared the one already implemented
-      in pickle) to protect ourselves from reference cycles. A simple
-      (reconstructor, newargs, obj.__dict__) tuple is save_reduced. Note
-      also that PyPy improved their support for __qualname__ in v3.6, so
-      this routing should be removed when cloudpickle supports only PyPy
-      3.6 and later.
-      """
-      rv = (
-          types.FunctionType,
-          (obj.__code__, {}, obj.__name__, obj.__defaults__, obj.__closure__),
-          obj.__dict__,
-      )
-      self.save_reduce(*rv, obj=obj)
+    Determines what kind of function obj is (e.g. lambda, defined at
+    interactive prompt, etc) and handles the pickling appropriately.
+    """
+    if _should_pickle_by_reference(obj, name=name):
+      return super().save_global(obj, name=name)
+    elif PYPY and isinstance(obj.__code__, builtin_code_type):
+      return self.save_pypy_builtin_func(obj)
+    else:
+      return self._save_reduce_pickle5(
+          *self._dynamic_function_reduce(obj), obj=obj)
+
+  def save_pypy_builtin_func(self, obj):
+    """Save pypy equivalent of builtin functions.
+
+    PyPy does not have the concept of builtin-functions. Instead,
+    builtin-functions are simple function instances, but with a
+    builtin-code attribute.
+    Most of the time, builtin functions should be pickled by attribute.
+    But PyPy has flaky support for __qualname__, so some builtin
+    functions such as float.__new__ will be classified as dynamic. For
+    this reason only, we created this special routine. Because
+    builtin-functions are not expected to have closure or globals,
+    there is no additional hack (compared the one already implemented
+    in pickle) to protect ourselves from reference cycles. A simple
+    (reconstructor, newargs, obj.__dict__) tuple is save_reduced. Note
+    also that PyPy improved their support for __qualname__ in v3.6, so
+    this routing should be removed when cloudpickle supports only PyPy
+    3.6 and later.
+    """
+    rv = (
+        types.FunctionType,
+        (obj.__code__, {}, obj.__name__, obj.__defaults__, obj.__closure__),
+        obj.__dict__,
+    )
+    self.save_reduce(*rv, obj=obj)

-    dispatch[types.FunctionType] = save_function
+  dispatch[types.FunctionType] = save_function


 # Shorthands similar to pickle.dump/pickle.dumps
@@ -1533,5 +1568,12 @@ def dumps(obj, protocol=None, buffer_callback=None):
 # Include pickles unloading functions in this namespace for convenience.
 load, loads = pickle.load, pickle.loads

+# Use the fast pickler extending the C implementation of pickler if it is
+# available.
+if PYPY:
+  Pickler = PurePythonPickler
+else:
+  Pickler = FastPickler
+
 # Backward compat alias.
 CloudPickler = Pickler
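A small usage sketch of the selection above, assuming the vendored package is importable as apache_beam.internal.cloudpickle: the long-standing Pickler and CloudPickler names keep resolving to the fast pickler where it exists, while PurePythonPickler can be named explicitly whenever built-in types must be customizable.

from apache_beam.internal import cloudpickle

# CloudPickler remains a strict alias of Pickler, as before.
assert cloudpickle.CloudPickler is cloudpickle.Pickler

# On CPython the alias points at the C-backed FastPickler; on PyPy it falls
# back to PurePythonPickler, which is also importable directly.
print(cloudpickle.Pickler.__name__)
print(cloudpickle.PurePythonPickler.__name__)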

sdks/python/apache_beam/internal/cloudpickle_pickler.py

Lines changed: 1 addition & 1 deletion
@@ -111,7 +111,7 @@ def dumps(o, enable_trace=True, use_zlib=False) -> bytes:
   """For internal use only; no backwards-compatibility guarantees."""
   with _pickle_lock:
     with io.BytesIO() as file:
-      pickler = cloudpickle.CloudPickler(file)
+      pickler = cloudpickle.PurePythonPickler(file)
       try:
         pickler.dispatch_table[type(flags.FLAGS)] = _pickle_absl_flags
       except NameError:
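The dispatch_table registration above is the per-instance hook for customizing how a specific (non-built-in) class is pickled, and it works the same way on PurePythonPickler. Below is a minimal sketch of that pattern with a hypothetical Config class and reducer, neither of which exists in Beam; note that the real code performs its registration while holding _pickle_lock.

import io

from apache_beam.internal import cloudpickle


class Config:
  def __init__(self, values):
    self.values = dict(values)


def _reduce_config(cfg):
  # (callable, args): rebuild the Config from its items, sorted so that
  # equal configs always pickle to the same bytes.
  return Config, (sorted(cfg.values.items()), )


with io.BytesIO() as f:
  pickler = cloudpickle.PurePythonPickler(f)
  pickler.dispatch_table[Config] = _reduce_config
  pickler.dump(Config({"b": 2, "a": 1}))
  payload = f.getvalue()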
