Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions sdks/python/apache_beam/internal/cloudpickle_pickler.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@

from apache_beam.internal import code_object_pickler
from apache_beam.internal.cloudpickle import cloudpickle
from apache_beam.internal.code_object_pickler import get_normalized_path

DEFAULT_CONFIG = cloudpickle.CloudPickleConfig(
skip_reset_dynamic_type_state=True)
skip_reset_dynamic_type_state=True, filepath_interceptor=get_normalized_path)
NO_DYNAMIC_CLASS_TRACKING_CONFIG = cloudpickle.CloudPickleConfig(
id_generator=None, skip_reset_dynamic_type_state=True)
id_generator=None, skip_reset_dynamic_type_state=True, filepath_interceptor=get_normalized_path)
STABLE_CODE_IDENTIFIER_CONFIG = cloudpickle.CloudPickleConfig(
skip_reset_dynamic_type_state=True,
skip_reset_dynamic_type_state=True, filepath_interceptor=get_normalized_path,
get_code_object_params=cloudpickle.GetCodeObjectParams(
get_code_object_identifier=code_object_pickler.
get_code_object_identifier,
Expand Down
33 changes: 33 additions & 0 deletions sdks/python/apache_beam/internal/cloudpickle_pickler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@

# pytype: skip-file

import os
import threading
import types
import unittest

from apache_beam.coders import proto2_coder_test_messages_pb2
from apache_beam.internal import cloudpickle_pickler as beam_cloudpickle
from apache_beam.internal import code_object_pickler
from apache_beam.internal import module_test
from apache_beam.internal.cloudpickle_pickler import dumps
from apache_beam.internal.cloudpickle_pickler import loads
Expand Down Expand Up @@ -220,6 +223,36 @@ def test_best_effort_determinism_not_implemented(self):
'Ignoring unsupported option: enable_best_effort_determinism',
'\n'.join(l.output))

@unittest.mock.patch.object(
code_object_pickler,
'get_normalized_path',
wraps=code_object_pickler.get_normalized_path)
def test_default_config_interceptor(self, mock_get_normalized_path):
"""Tests config.filepath_interceptor is called for CodeType pickling."""

def sample_func():
return "Beam"

code_obj = sample_func.__code__
original_filename = os.path.abspath(code_obj.co_filename)

try:
pickled_code = beam_cloudpickle.dumps(code_obj)
unpickled_code = beam_cloudpickle.loads(pickled_code)

mock_get_normalized_path.assert_called()

unpickled_filename = os.path.abspath(unpickled_code.co_filename)
self.assertEqual(unpickled_filename, original_filename)

except AttributeError as e:
if 'get_code_object_params' in str(e):
self.fail(
"Vendored cloudpickle BUG: AttributeError 'get_code_object_params' "
f"raised during CodeType pickling. Error: {e}")
else:
raise


if __name__ == '__main__':
unittest.main()
Loading