forked from apache/beam
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcloudpickle_pickler_test.py
More file actions
144 lines (108 loc) · 4.7 KB
/
cloudpickle_pickler_test.py
File metadata and controls
144 lines (108 loc) · 4.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Unit tests for the cloudpickle_pickler module."""
# pytype: skip-file
import threading
import types
import unittest
from apache_beam.coders import proto2_coder_test_messages_pb2
from apache_beam.internal import module_test
from apache_beam.internal.cloudpickle_pickler import dumps
from apache_beam.internal.cloudpickle_pickler import loads
class PicklerTest(unittest.TestCase):
  """Round-trip tests for the cloudpickle-based ``dumps``/``loads`` helpers.

  Each test serializes an object with ``dumps``, restores it with ``loads``
  and checks that the restored object still behaves like the original.
  """

  # True when the running interpreter's ``types`` module has no
  # MappingProxyType; used to skip the mapping-proxy test below.
  NO_MAPPINGPROXYTYPE = not hasattr(types, "MappingProxyType")

  def test_pickle_nested_enum_descriptor(self):
    """A function closing over a nested proto enum survives a round trip."""
    NestedEnum = proto2_coder_test_messages_pb2.MessageD.NestedEnum

    def fn():
      return NestedEnum.TWO

    self.assertEqual(fn(), loads(dumps(fn))())

  def test_pickle_top_level_enum_descriptor(self):
    """A function closing over a top-level proto enum survives a round trip."""
    TopLevelEnum = proto2_coder_test_messages_pb2.TopLevelEnum

    def fn():
      return TopLevelEnum.ONE

    self.assertEqual(fn(), loads(dumps(fn))())

  def test_basics(self):
    """Plain containers and a simple lambda survive a round trip."""
    self.assertEqual([1, 'a', ('z', )], loads(dumps([1, 'a', ('z', )])))
    fun = lambda x: 'xyz-%s' % x
    self.assertEqual('xyz-abc', loads(dumps(fun))('abc'))

  def test_lambda_with_globals(self):
    """Tests that the globals of a function are preserved."""
    # The point of the test is that the lambda being called after unpickling
    # relies on having the re module being loaded.
    self.assertEqual(['abc', 'def'],
                     loads(dumps(
                         module_test.get_lambda_with_globals()))('abc def'))

  def test_lambda_with_main_globals(self):
    """A lambda referencing a global of this module still resolves it."""
    self.assertEqual(unittest, loads(dumps(lambda: unittest))())

  def test_lambda_with_closure(self):
    """Tests that the closure of a function is preserved."""
    self.assertEqual(
        'closure: abc',
        loads(dumps(module_test.get_lambda_with_closure('abc')))())

  def test_class_object_pickled(self):
    """A class object can be restored, instantiated and used."""
    self.assertEqual(['abc', 'def'],
                     loads(dumps(module_test.Xyz))().foo('abc def'))

  def test_class_instance_pickled(self):
    """A class instance can be restored and its method called."""
    self.assertEqual(['abc', 'def'],
                     loads(dumps(module_test.XYZ_OBJECT)).foo('abc def'))

  def test_pickling_preserves_closure_of_a_function(self):
    """Instances of nested classes keep their ``datum`` after a round trip."""
    self.assertEqual(
        'X:abc', loads(dumps(module_test.TopClass.NestedClass('abc'))).datum)
    self.assertEqual(
        'Y:abc',
        loads(dumps(module_test.TopClass.MiddleClass.NestedClass('abc'))).datum)

  def test_pickle_dynamic_class(self):
    """An object produced by ``module_test.create_class`` round-trips."""
    self.assertEqual(
        'Z:abc', loads(dumps(module_test.create_class('abc'))).get())

  def test_generators(self):
    """``dumps`` raises TypeError when handed a generator object."""
    with self.assertRaises(TypeError):
      dumps((_ for _ in range(10)))

  def test_recursive_class(self):
    """A ``RecursiveClass`` instance keeps its ``datum`` after a round trip."""
    # Pickle the instance itself (as the nested-class tests above do) rather
    # than only its ``datum`` string; pickling just the string would never
    # send the class through the pickler.
    self.assertEqual(
        'RecursiveClass:abc',
        loads(dumps(module_test.RecursiveClass('abc'))).datum)

  def test_function_with_external_reference(self):
    """A nested function keeps the enclosing-scope variable it reads."""
    out_of_scope_var = 'expected_value'

    def foo():
      return out_of_scope_var

    self.assertEqual('expected_value', loads(dumps(foo))())

  def test_pickle_rlock(self):
    """A ``threading.RLock`` round-trips to an object of the same type."""
    rlock_instance = threading.RLock()
    rlock_type = type(rlock_instance)

    self.assertIsInstance(loads(dumps(rlock_instance)), rlock_type)

  def test_pickle_lock(self):
    """A ``threading.Lock`` round-trips to an object of the same type."""
    lock_instance = threading.Lock()
    lock_type = type(lock_instance)

    self.assertIsInstance(loads(dumps(lock_instance)), lock_type)

  @unittest.skipIf(NO_MAPPINGPROXYTYPE, 'test if MappingProxyType introduced')
  def test_dump_and_load_mapping_proxy(self):
    """A ``MappingProxyType`` keeps both its contents and its type."""
    self.assertEqual(
        'def', loads(dumps(types.MappingProxyType({'abc': 'def'})))['abc'])
    self.assertEqual(
        types.MappingProxyType, type(loads(dumps(types.MappingProxyType({})))))

  def test_dataclass(self):
    """A ``module_test.DataClass`` instance compares equal after a round trip."""
    # Plain attribute access on the already-imported module replaces the
    # former exec()-wrapped import of the same class.
    original = module_test.DataClass(datum='abc')
    self.assertEqual(original, loads(dumps(original)))

  def test_best_effort_determinism_not_implemented(self):
    """``enable_best_effort_determinism=True`` raises NotImplementedError."""
    with self.assertRaises(NotImplementedError):
      dumps(123, enable_best_effort_determinism=True)
# Run the test suite when this file is executed directly as a script.
if __name__ == '__main__':
  unittest.main()