diff --git a/sdks/python/apache_beam/internal/cloudpickle_pickler.py b/sdks/python/apache_beam/internal/cloudpickle_pickler.py index eebba178e7c3..41d79a80ab16 100644 --- a/sdks/python/apache_beam/internal/cloudpickle_pickler.py +++ b/sdks/python/apache_beam/internal/cloudpickle_pickler.py @@ -37,13 +37,14 @@ from apache_beam.internal import code_object_pickler from apache_beam.internal.cloudpickle import cloudpickle +from apache_beam.internal.code_object_pickler import get_normalized_path DEFAULT_CONFIG = cloudpickle.CloudPickleConfig( - skip_reset_dynamic_type_state=True) -NO_DYNAMIC_CLASS_TRACKING_CONFIG = cloudpickle.CloudPickleConfig( - id_generator=None, skip_reset_dynamic_type_state=True) + skip_reset_dynamic_type_state=True, + filepath_interceptor=get_normalized_path) STABLE_CODE_IDENTIFIER_CONFIG = cloudpickle.CloudPickleConfig( skip_reset_dynamic_type_state=True, + filepath_interceptor=get_normalized_path, get_code_object_params=cloudpickle.GetCodeObjectParams( get_code_object_identifier=code_object_pickler. get_code_object_identifier, diff --git a/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py b/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py index b63ebd6c7109..4a51c56c24be 100644 --- a/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py +++ b/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py @@ -19,11 +19,15 @@ # pytype: skip-file +import os import threading import types import unittest +from unittest import mock from apache_beam.coders import proto2_coder_test_messages_pb2 +from apache_beam.internal import cloudpickle_pickler as beam_cloudpickle +from apache_beam.internal import code_object_pickler from apache_beam.internal import module_test from apache_beam.internal.cloudpickle_pickler import dumps from apache_beam.internal.cloudpickle_pickler import loads @@ -220,6 +224,26 @@ def test_best_effort_determinism_not_implemented(self): 'Ignoring unsupported option: enable_best_effort_determinism', '\n'.join(l.output)) + @mock.patch.object( + beam_cloudpickle.DEFAULT_CONFIG, 'filepath_interceptor', autospec=True) + def test_default_config_interceptor(self, mock_filepath_interceptor): + """Tests config.filepath_interceptor is called for CodeType pickling.""" + mock_filepath_interceptor.side_effect = ( + code_object_pickler.get_normalized_path) + + def sample_func(): + return "Beam" + + code_obj = sample_func.__code__ + original_filename = os.path.abspath(code_obj.co_filename) + pickled_code = beam_cloudpickle.dumps(code_obj) + unpickled_code = beam_cloudpickle.loads(pickled_code) + + mock_filepath_interceptor.assert_called() + + unpickled_filename = os.path.abspath(unpickled_code.co_filename) + self.assertEqual(unpickled_filename, original_filename) + if __name__ == '__main__': unittest.main()