Skip to content
Merged
Show file tree
Hide file tree
Changes from 62 commits
Commits
Show all changes
71 commits
Select commit Hold shift + click to select a range
f777002
implement lambda name pickling in cloudpickle
kristynsmith Aug 19, 2025
d172299
add enable_lambda_name to __init__
kristynsmith Aug 19, 2025
1f9725e
fix formatting and lint
kristynsmith Aug 19, 2025
a9e6652
fix typo
kristynsmith Aug 20, 2025
c5fa831
fix code paths in test
kristynsmith Aug 20, 2025
c32c32f
fix tests
kristynsmith Aug 20, 2025
df614e0
fix lint
kristynsmith Aug 20, 2025
261104a
fix formatting and failing test
kristynsmith Aug 20, 2025
d1618f4
fix formatting again
kristynsmith Aug 21, 2025
475e7f1
Merge branch 'master' into main
kristynsmith Aug 27, 2025
2eb780a
fix formatting
kristynsmith Aug 27, 2025
577513e
add conditionals for error handling
kristynsmith Sep 3, 2025
42ce261
formatting
kristynsmith Sep 3, 2025
e5632c1
fix typo
kristynsmith Sep 3, 2025
c8e864f
formatting
kristynsmith Sep 4, 2025
bde162c
Merge branch 'master' into main
kristynsmith Sep 15, 2025
2e9b8ad
remove enable_lambda_pickle from __init__ and add to config
kristynsmith Sep 18, 2025
7d731e5
fix typo
kristynsmith Sep 18, 2025
81befec
remove enable_lambda_name from dumps()
kristynsmith Sep 23, 2025
3d7f8b8
do monkey-patch of cloudpickle
kristynsmith Sep 24, 2025
3e90b99
fix typo
kristynsmith Sep 24, 2025
a776234
Update cloudpickle.py
kristynsmith Sep 24, 2025
5674ef1
remove context manager and patched_function_getnewargs
kristynsmith Sep 25, 2025
c07a044
rename enable_stable_code_indentifier_pickling to enable_stable_funct…
kristynsmith Sep 25, 2025
8d7b663
address comments and formatting
kristynsmith Sep 25, 2025
59b23aa
remove enable_lambda_name form pipeline_context and remove enable_lam…
kristynsmith Sep 30, 2025
d3acc1e
make newargs a tuple
kristynsmith Sep 30, 2025
2ada3f3
Merge branch 'master' into main
kristynsmith Sep 30, 2025
2b8becb
remove unnecessary changes to cloudpickle.py
kristynsmith Sep 30, 2025
3a4dc75
remove lambda pickling tests
kristynsmith Oct 1, 2025
5207565
fix formatting
kristynsmith Oct 1, 2025
55de26c
fix lint
kristynsmith Oct 1, 2025
599cd4a
format
kristynsmith Oct 1, 2025
73df5fe
Update cloudpickle_pickler.py
kristynsmith Oct 1, 2025
af7d95a
fix pickler_test tests
kristynsmith Oct 1, 2025
ecaf70e
implement new design
kristynsmith Oct 2, 2025
40cc2c7
fix indent
kristynsmith Oct 3, 2025
cfb3224
fix naming typo
kristynsmith Oct 7, 2025
0935a45
typo
kristynsmith Oct 7, 2025
c1b48b7
fix naming
kristynsmith Oct 7, 2025
e8de95a
remove get_code_object_identifier form DEFAULT_CONFIG
kristynsmith Oct 7, 2025
cbf40db
change conditional
kristynsmith Oct 7, 2025
5cb1566
formatting
kristynsmith Oct 7, 2025
053911a
fix error
kristynsmith Oct 7, 2025
a636c89
format
kristynsmith Oct 7, 2025
eea0652
fix tests
kristynsmith Oct 7, 2025
f2ccacc
add import
kristynsmith Oct 8, 2025
9aa75d3
fix test again
kristynsmith Oct 8, 2025
0c26254
edit _stable_identifier_function_reduce to make tests pass
kristynsmith Oct 8, 2025
b2b6675
add GetCodeObjectParams and edit STABLE_CODE_IDENTIFIER_CONFIG
kristynsmith Oct 8, 2025
4fd61ce
still trying to fix test
kristynsmith Oct 9, 2025
31dd574
fix test and cleanup
kristynsmith Oct 9, 2025
413a840
format
kristynsmith Oct 9, 2025
788a7ab
fix typo
kristynsmith Oct 9, 2025
c9799ae
add filtering for globals in _stable_identifier_function_reduce
kristynsmith Oct 9, 2025
d0749b9
add test_stable_identifier_uses_current_code
kristynsmith Oct 13, 2025
2395e24
fix indent
kristynsmith Oct 14, 2025
a177cbc
format
kristynsmith Oct 14, 2025
8b446d4
add mutable_test_function
kristynsmith Oct 14, 2025
82525d0
format
kristynsmith Oct 14, 2025
029fdea
fix string indent
kristynsmith Oct 15, 2025
0e5baa1
fix string indent again
kristynsmith Oct 15, 2025
9b91709
update _make_function_from_identifier to fix test
kristynsmith Oct 15, 2025
363455e
use textwrap for string formatting
kristynsmith Oct 15, 2025
d4f06e2
format
kristynsmith Oct 15, 2025
24d160e
fix indent
kristynsmith Oct 15, 2025
0ecc485
edit docstring
kristynsmith Oct 17, 2025
e10235c
Merge branch 'apache:master' into main
kristynsmith Oct 20, 2025
5313064
Update _dumps function to include stable code identifier
kristynsmith Oct 20, 2025
a161511
fix syntax
kristynsmith Oct 20, 2025
449031e
format
kristynsmith Oct 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@ def uuid_generator(_):
return uuid.uuid4().hex


@dataclasses.dataclass
class GetCodeObjectParams:
get_code_object_identifier: typing.Optional[callable]
get_code_from_identifier: typing.Optional[callable]


@dataclasses.dataclass
class CloudPickleConfig:
"""Configuration for cloudpickle behavior.
Expand All @@ -127,10 +133,18 @@ class CloudPickleConfig:

filepath_interceptor: Used to modify filepaths in `co_filename` and
function.__globals__['__file__'].

get_code_object_params: Use identifiers derived from code
location when pickling dynamic functions (e.g. lambdas). Enabling
this setting results in pickled payloads becoming more stable to
code changes: when a particular lambda function is slightly
modified but the location of the function in the codebase has not
changed, the pickled representation might stay the same.
"""
id_generator: typing.Optional[callable] = uuid_generator
skip_reset_dynamic_type_state: bool = False
filepath_interceptor: typing.Optional[callable] = None
get_code_object_params: typing.Optional[GetCodeObjectParams] = None


DEFAULT_CONFIG = CloudPickleConfig()
Expand Down Expand Up @@ -567,6 +581,12 @@ def _make_function(code, globals, name, argdefs, closure):
return types.FunctionType(code, globals, name, argdefs, closure)


def _make_function_from_identifier(
get_code_from_identifier, code_path, globals, name, argdefs, closure):
fcode = get_code_from_identifier(code_path)
return _make_function(fcode, globals, name, argdefs, closure)


def _make_empty_cell():
if False:
# trick the compiler into creating an empty cell in our lambda
Expand Down Expand Up @@ -1305,6 +1325,44 @@ class Pickler(pickle.Pickler):

dispatch_table = ChainMap(_dispatch_table, copyreg.dispatch_table)

def _stable_identifier_function_reduce(self, func):
code_object_params = self.config.get_code_object_params
if code_object_params is None:
return self._dynamic_function_reduce(func)
code_path = code_object_params.get_code_object_identifier(func)
if not code_path:
return self._dynamic_function_reduce(func)
base_globals = self.globals_ref.setdefault(id(func.__globals__), {})

if base_globals == {}:
if "__file__" in func.__globals__:
# Apply normalization ONLY to the __file__ attribute
file_path = func.__globals__["__file__"]
if self.config.filepath_interceptor:
file_path = self.config.filepath_interceptor(file_path)
base_globals["__file__"] = file_path
# Add module attributes used to resolve relative imports
# instructions inside func.
for k in ["__package__", "__name__", "__path__"]:
if k in func.__globals__:
base_globals[k] = func.__globals__[k]
newargs = (
code_path,
base_globals,
func.__name__,
func.__defaults__,
func.__closure__)
state = _function_getstate(func)
return (
functools.partial(
_make_function_from_identifier,
code_object_params.get_code_from_identifier),
newargs,
state,
None,
None,
_function_setstate)

# function reducers are defined as instance methods of cloudpickle.Pickler
# objects, as they rely on a cloudpickle.Pickler attribute (globals_ref)
def _dynamic_function_reduce(self, func):
Expand All @@ -1324,6 +1382,8 @@ def _function_reduce(self, obj):
"""
if _should_pickle_by_reference(obj):
return NotImplemented
elif self.config.get_code_object_params is not None:
return self._stable_identifier_function_reduce(obj)
else:
return self._dynamic_function_reduce(obj)

Expand Down
10 changes: 10 additions & 0 deletions sdks/python/apache_beam/internal/cloudpickle_pickler.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,19 @@
import threading
import zlib

from apache_beam.internal import code_object_pickler
from apache_beam.internal.cloudpickle import cloudpickle

DEFAULT_CONFIG = cloudpickle.CloudPickleConfig(
skip_reset_dynamic_type_state=True)
NO_DYNAMIC_CLASS_TRACKING_CONFIG = cloudpickle.CloudPickleConfig(
id_generator=None, skip_reset_dynamic_type_state=True)
STABLE_CODE_IDENTIFIER_CONFIG = cloudpickle.CloudPickleConfig(
skip_reset_dynamic_type_state=True,
get_code_object_params=cloudpickle.GetCodeObjectParams(
get_code_object_identifier=code_object_pickler.
get_code_object_identifier,
get_code_from_identifier=code_object_pickler.get_code_from_identifier))

try:
from absl import flags
Expand Down Expand Up @@ -119,6 +126,7 @@ def dumps(
enable_trace=True,
use_zlib=False,
enable_best_effort_determinism=False,
enable_stable_code_identifier_pickling=False,
config: cloudpickle.CloudPickleConfig = DEFAULT_CONFIG) -> bytes:
"""For internal use only; no backwards-compatibility guarantees."""
if enable_best_effort_determinism:
Expand All @@ -129,6 +137,8 @@ def dumps(
'This has only been implemented for dill.')
with _pickle_lock:
with io.BytesIO() as file:
if enable_stable_code_identifier_pickling:
config = STABLE_CODE_IDENTIFIER_CONFIG
pickler = cloudpickle.CloudPickler(file, config=config)
try:
pickler.dispatch_table[type(flags.FLAGS)] = _pickle_absl_flags
Expand Down
90 changes: 58 additions & 32 deletions sdks/python/apache_beam/internal/code_object_pickler.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

This module provides helper functions to improve pickling code objects,
especially lambdas, in a consistent way by using code object identifiers. These
helper functions will be used to patch pickler implementations used by Beam
helper functions are used to patch pickler implementations used by Beam
(e.g. Cloudpickle).

A code object identifier is a unique identifier for a code object that provides
Expand Down Expand Up @@ -81,8 +81,9 @@ def get_code_object_identifier(callable: types.FunctionType):
- __main__.ClassWithNestedLambda.process.__code__.co_consts[
<lambda>, ('x',), 1234567890]
"""
if not hasattr(callable, '__module__') or not hasattr(callable,
'__qualname__'):
if (not hasattr(callable, '__module__') or
not hasattr(callable, '__qualname__') or not callable.__module__ or
callable.__module__ not in sys.modules):
return None
code_path: str = _extend_path(
callable.__module__,
Expand All @@ -100,7 +101,7 @@ def _extend_path(prefix: str, current_path: Optional[str]):

Args:
prefix: The prefix of the path.
suffix: The rest of the path.
current_path: The rest of the path.

Returns:
The extended path.
Expand Down Expand Up @@ -189,6 +190,8 @@ def _search_module_or_class(
if path is not None:
return _extend_path(name, _extend_path(f'__defaults__[{i}]', path))
else:
if not hasattr(node, first_part):
return None
return _extend_path(
first_part, _search(callable, getattr(node, first_part), rest))

Expand Down Expand Up @@ -281,6 +284,8 @@ def _search_lambda(
lambda_code_objects_by_name = collections.defaultdict(list)
name = qual_name_parts[0]
code_objects = code_objects_by_name[name]
if not code_objects:
return None
if name == '<lambda>':
for code_object in code_objects:
lambda_name = f'<lambda>, {_signature(code_object)}'
Expand Down Expand Up @@ -315,10 +320,10 @@ def _search_lambda(
_SINGLE_NAME_PATTERN = re.compile(r'co_consts\[([a-zA-Z0-9\<\>_-]+)]')
# Matches a path like: co_consts[<lambda>, ('x',)]
_LAMBDA_WITH_ARGS_PATTERN = re.compile(
r"co_consts\[(<[^>]+>),\s*(\('[^']*'\s*,\s*\))\]")
r"co_consts\[(<.*?>),\s(\('[^']+'(?:,\s*'[^']+')*,?\))\]")
# Matches a path like: co_consts[<lambda>, ('x',), 1234567890]
_LAMBDA_WITH_HASH_PATTERN = re.compile(
r"co_consts\[(<[^>]+>),\s*(\('[^']*'\s*,\s*\)),\s*(.+)\]")
r"co_consts\[(<[^>]+>),\s*(\([^\)]*\)),?\s*(.*)\]")
# Matches a path like: __defaults__[0]
_DEFAULT_PATTERN = re.compile(r'(__defaults__)\[(\d+)\]')
# Matches an argument like: 'x'
Expand All @@ -345,9 +350,10 @@ def _get_code_object_from_single_name_pattern(
raise ValueError(f'Invalid pattern for single name: {name_result.group(0)}')
# Groups are indexed starting at 1, group(0) is the entire match.
name = name_result.group(1)
for co_const in obj.co_consts:
if inspect.iscode(co_const) and co_const.co_name == name:
return co_const
if hasattr(obj, 'co_consts'):
for co_const in obj.co_consts:
if inspect.iscode(co_const) and co_const.co_name == name:
return co_const
raise AttributeError(f'Could not find code object with path: {path}')


Expand All @@ -368,15 +374,16 @@ def _get_code_object_from_lambda_with_args_pattern(
"""
name = lambda_with_args_result.group(1)
code_objects = collections.defaultdict(list)
for co_const in obj.co_consts:
if inspect.iscode(co_const) and co_const.co_name == name:
code_objects[co_const.co_name].append(co_const)
for name, objects in code_objects.items():
for obj_ in objects:
args = tuple(
re.findall(_ARGUMENT_PATTERN, lambda_with_args_result.group(2)))
if obj_.co_varnames == args:
return obj_
if hasattr(obj, 'co_consts'):
for co_const in obj.co_consts:
if inspect.iscode(co_const) and co_const.co_name == name:
code_objects[co_const.co_name].append(co_const)
for name, objects in code_objects.items():
for obj_ in objects:
args = tuple(
re.findall(_ARGUMENT_PATTERN, lambda_with_args_result.group(2)))
if obj_.co_varnames[:_get_arg_count(obj_)] == args:
return obj_
raise AttributeError(f'Could not find code object with path: {path}')


Expand All @@ -397,17 +404,18 @@ def _get_code_object_from_lambda_with_hash_pattern(
"""
name = lambda_with_hash_result.group(1)
code_objects = collections.defaultdict(list)
for co_const in obj.co_consts:
if inspect.iscode(co_const) and co_const.co_name == name:
code_objects[co_const.co_name].append(co_const)
for name, objects in code_objects.items():
for obj_ in objects:
args = tuple(
re.findall(_ARGUMENT_PATTERN, lambda_with_hash_result.group(2)))
if obj_.co_varnames == args:
hash_value = lambda_with_hash_result.group(3)
if hash_value == str(_create_bytecode_hash(obj_)):
return obj_
if hasattr(obj, 'co_consts'):
for co_const in obj.co_consts:
if inspect.iscode(co_const) and co_const.co_name == name:
code_objects[co_const.co_name].append(co_const)
for name, objects in code_objects.items():
for obj_ in objects:
args = tuple(
re.findall(_ARGUMENT_PATTERN, lambda_with_hash_result.group(2)))
if obj_.co_varnames[:_get_arg_count(obj_)] == args:
hash_value = lambda_with_hash_result.group(3)
if hash_value == str(_create_bytecode_hash(obj_)):
return obj_
raise AttributeError(f'Could not find code object with path: {path}')


Expand All @@ -427,6 +435,8 @@ def get_code_from_identifier(code_object_identifier: str):
if not code_object_identifier:
raise ValueError('Path must not be empty.')
parts = code_object_identifier.split('.')
if parts[0] not in sys.modules:
raise AttributeError(f'Module {parts[0]} not found in sys.modules')
obj = sys.modules[parts[0]]
for part in parts[1:]:
if name_result := _SINGLE_NAME_PATTERN.fullmatch(part):
Expand All @@ -447,7 +457,11 @@ def get_code_from_identifier(code_object_identifier: str):
obj = getattr(obj, '__defaults__')[index]
else:
obj = getattr(obj, part)
return obj
if isinstance(obj, types.CodeType):
return obj
else:
raise AttributeError(
f'Could not find code object with path: {code_object_identifier}')


def _signature(obj: types.CodeType):
Expand All @@ -462,12 +476,24 @@ def _signature(obj: types.CodeType):
Returns:
A tuple of the names of the arguments of the code object.
"""
arg_count = (
return obj.co_varnames[:_get_arg_count(obj)]


def _get_arg_count(obj: types.CodeType):
"""Returns the number of arguments of a code object.

Args:
obj: A code object, function, method, or cell.

Returns:
The number of arguments of the code object, or None if the object is not a
code object.
"""
return (
obj.co_argcount + obj.co_kwonlyargcount +
(obj.co_flags & 4 == 4) # PyCF_VARARGS
+ (obj.co_flags & 8 == 8) # PyCF_VARKEYWORDS
)
return obj.co_varnames[:arg_count]


def _create_bytecode_hash(code_object: types.CodeType):
Expand Down
27 changes: 16 additions & 11 deletions sdks/python/apache_beam/internal/code_object_pickler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,12 +274,14 @@ def test_adding_lambda_variable_in_class_preserves_object(self):
module_2_modified.AddLambdaVariable.my_method(self).__code__,
)

def test_removing_lambda_variable_in_class_changes_object(self):
with self.assertRaisesRegex(AttributeError, "object has no attribute"):
code_object_pickler.get_code_from_identifier(
code_object_pickler.get_code_object_identifier(
module_2.RemoveLambdaVariable.my_method(self)).replace(
"module_2", "module_2_modified"))
def test_removing_lambda_variable_in_class_preserves_object(self):
self.assertEqual(
code_object_pickler.get_code_from_identifier(
code_object_pickler.get_code_object_identifier(
module_2.RemoveLambdaVariable.my_method(self)).replace(
"module_2", "module_2_modified")),
module_2_modified.RemoveLambdaVariable.my_method(self).__code__,
)

def test_adding_nested_function_in_class_preserves_object(self):
self.assertEqual(
Expand Down Expand Up @@ -391,11 +393,14 @@ def test_adding_lambda_variable_in_function_preserves_object(self):
module_1_lambda_variable_added.my_function().__code__,
)

def test_removing_lambda_variable_in_function_raises_exception(self):
with self.assertRaisesRegex(AttributeError, "object has no attribute"):
code_object_pickler.get_code_from_identifier(
code_object_pickler.get_code_object_identifier(
module_3.my_function()).replace("module_3", "module_3_modified"))
def test_removing_lambda_variable_in_function_preserves_object(self):
self.assertEqual(
code_object_pickler.get_code_from_identifier(
code_object_pickler.get_code_object_identifier(
module_3.my_function()).replace(
"module_3", "module_3_modified")),
module_3_modified.my_function().__code__,
)


class CodePathStabilityTest(unittest.TestCase):
Expand Down
7 changes: 7 additions & 0 deletions sdks/python/apache_beam/internal/module_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@
GLOBAL_DICT = {}


def mutable_test_function():
def dynamic_function():
return 'version1'

return dynamic_function


class UnPicklable:
def __init__(self, x):
self.x = x
Expand Down
14 changes: 12 additions & 2 deletions sdks/python/apache_beam/internal/pickler.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,18 @@ def dumps(
o,
enable_trace=True,
use_zlib=False,
enable_best_effort_determinism=False) -> bytes:

enable_best_effort_determinism=False,
enable_stable_code_identifier_pickling=False) -> bytes:

if (desired_pickle_lib == cloudpickle_pickler):
return cloudpickle_pickler.dumps(
o,
enable_trace=enable_trace,
use_zlib=use_zlib,
enable_best_effort_determinism=enable_best_effort_determinism,
enable_stable_code_identifier_pickling=
enable_stable_code_identifier_pickling,
)
return desired_pickle_lib.dumps(
o,
enable_trace=enable_trace,
Expand Down
Loading
Loading