3131from apache_beam .internal .cloudpickle_pickler import dumps
3232from apache_beam .internal .cloudpickle_pickler import loads
3333from apache_beam .utils import shared
34+ from unittest import mock
3435
3536GLOBAL_DICT_REF = module_test .GLOBAL_DICT
3637
@@ -223,12 +224,14 @@ def test_best_effort_determinism_not_implemented(self):
223224 'Ignoring unsupported option: enable_best_effort_determinism' ,
224225 '\n ' .join (l .output ))
225226
226- @unittest .mock .patch .object (
227- code_object_pickler ,
228- 'get_normalized_path' ,
229- wraps = code_object_pickler .get_normalized_path )
230- def test_default_config_interceptor (self , mock_get_normalized_path ):
227+ @mock .patch .object (
228+ beam_cloudpickle .DEFAULT_CONFIG , 'filepath_interceptor' , autospec = True
229+ )
230+ def test_default_config_interceptor (self , mock_filepath_interceptor ):
231231 """Tests config.filepath_interceptor is called for CodeType pickling."""
232+ mock_filepath_interceptor .side_effect = (
233+ code_object_pickler .get_normalized_path
234+ )
232235
233236 def sample_func ():
234237 return "Beam"
@@ -240,16 +243,18 @@ def sample_func():
240243 pickled_code = beam_cloudpickle .dumps (code_obj )
241244 unpickled_code = beam_cloudpickle .loads (pickled_code )
242245
243- mock_get_normalized_path .assert_called ()
246+ mock_filepath_interceptor .assert_called ()
244247
245248 unpickled_filename = os .path .abspath (unpickled_code .co_filename )
246249 self .assertEqual (unpickled_filename , original_filename )
247250
248251 except AttributeError as e :
249252 if 'get_code_object_params' in str (e ):
250253 self .fail (
251- "Vendored cloudpickle BUG: AttributeError 'get_code_object_params' "
252- f"raised during CodeType pickling. Error: { e } " )
254+ "Vendored cloudpickle BUG: AttributeError "
255+ f"'get_code_object_params' raised during CodeType pickling. "
256+ f"Error: { e } "
257+ )
253258 else :
254259 raise
255260
0 commit comments