Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions sdks/python/apache_beam/internal/cloudpickle_pickler.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@

from apache_beam.internal import code_object_pickler
from apache_beam.internal.cloudpickle import cloudpickle
from apache_beam.internal.code_object_pickler import get_normalized_path

DEFAULT_CONFIG = cloudpickle.CloudPickleConfig(
skip_reset_dynamic_type_state=True)
NO_DYNAMIC_CLASS_TRACKING_CONFIG = cloudpickle.CloudPickleConfig(
id_generator=None, skip_reset_dynamic_type_state=True)
skip_reset_dynamic_type_state=True,
filepath_interceptor=get_normalized_path)
STABLE_CODE_IDENTIFIER_CONFIG = cloudpickle.CloudPickleConfig(
skip_reset_dynamic_type_state=True,
filepath_interceptor=get_normalized_path,
get_code_object_params=cloudpickle.GetCodeObjectParams(
get_code_object_identifier=code_object_pickler.
get_code_object_identifier,
Expand Down
24 changes: 24 additions & 0 deletions sdks/python/apache_beam/internal/cloudpickle_pickler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@

# pytype: skip-file

import os
import threading
import types
import unittest
from unittest import mock

from apache_beam.coders import proto2_coder_test_messages_pb2
from apache_beam.internal import cloudpickle_pickler as beam_cloudpickle
from apache_beam.internal import code_object_pickler
from apache_beam.internal import module_test
from apache_beam.internal.cloudpickle_pickler import dumps
from apache_beam.internal.cloudpickle_pickler import loads
Expand Down Expand Up @@ -220,6 +224,26 @@ def test_best_effort_determinism_not_implemented(self):
'Ignoring unsupported option: enable_best_effort_determinism',
'\n'.join(l.output))

@mock.patch.object(
beam_cloudpickle.DEFAULT_CONFIG, 'filepath_interceptor', autospec=True)
def test_default_config_interceptor(self, mock_filepath_interceptor):
"""Tests config.filepath_interceptor is called for CodeType pickling."""
mock_filepath_interceptor.side_effect = (
code_object_pickler.get_normalized_path)

def sample_func():
return "Beam"

code_obj = sample_func.__code__
original_filename = os.path.abspath(code_obj.co_filename)
pickled_code = beam_cloudpickle.dumps(code_obj)
unpickled_code = beam_cloudpickle.loads(pickled_code)

mock_filepath_interceptor.assert_called()

unpickled_filename = os.path.abspath(unpickled_code.co_filename)
self.assertEqual(unpickled_filename, original_filename)


if __name__ == '__main__':
unittest.main()
Loading