forked from apache/beam
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpickler_test.py
More file actions
125 lines (99 loc) · 4.3 KB
/
pickler_test.py
File metadata and controls
125 lines (99 loc) · 4.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Unit tests for the pickler module."""
# pytype: skip-file
import sys
import threading
import types
import unittest
from apache_beam.internal import module_test
from apache_beam.internal.pickler import dumps
from apache_beam.internal.pickler import loads
class PicklerTest(unittest.TestCase):
NO_MAPPINGPROXYTYPE = not hasattr(types, "MappingProxyType")
def test_basics(self):
self.assertEqual([1, 'a', ('z', )], loads(dumps([1, 'a', ('z', )])))
fun = lambda x: 'xyz-%s' % x
self.assertEqual('xyz-abc', loads(dumps(fun))('abc'))
def test_lambda_with_globals(self):
"""Tests that the globals of a function are preserved."""
# The point of the test is that the lambda being called after unpickling
# relies on having the re module being loaded.
self.assertEqual(['abc', 'def'],
loads(dumps(
module_test.get_lambda_with_globals()))('abc def'))
def test_lambda_with_main_globals(self):
self.assertEqual(unittest, loads(dumps(lambda: unittest))())
def test_lambda_with_closure(self):
"""Tests that the closure of a function is preserved."""
self.assertEqual(
'closure: abc',
loads(dumps(module_test.get_lambda_with_closure('abc')))())
def test_class(self):
"""Tests that a class object is pickled correctly."""
self.assertEqual(['abc', 'def'],
loads(dumps(module_test.Xyz))().foo('abc def'))
def test_object(self):
"""Tests that a class instance is pickled correctly."""
self.assertEqual(['abc', 'def'],
loads(dumps(module_test.XYZ_OBJECT)).foo('abc def'))
def test_nested_class(self):
"""Tests that a nested class object is pickled correctly."""
self.assertEqual(
'X:abc', loads(dumps(module_test.TopClass.NestedClass('abc'))).datum)
self.assertEqual(
'Y:abc',
loads(dumps(module_test.TopClass.MiddleClass.NestedClass('abc'))).datum)
def test_dynamic_class(self):
"""Tests that a nested class object is pickled correctly."""
self.assertEqual(
'Z:abc', loads(dumps(module_test.create_class('abc'))).get())
def test_generators(self):
with self.assertRaises(TypeError):
dumps((_ for _ in range(10)))
def test_recursive_class(self):
self.assertEqual(
'RecursiveClass:abc',
loads(dumps(module_test.RecursiveClass('abc').datum)))
def test_pickle_rlock(self):
rlock_instance = threading.RLock()
rlock_type = type(rlock_instance)
self.assertIsInstance(loads(dumps(rlock_instance)), rlock_type)
def test_save_paths(self):
f = loads(dumps(lambda x: x))
co_filename = f.__code__.co_filename
self.assertTrue(co_filename.endswith('pickler_test.py'))
@unittest.skipIf(NO_MAPPINGPROXYTYPE, 'test if MappingProxyType introduced')
def test_dump_and_load_mapping_proxy(self):
self.assertEqual(
'def', loads(dumps(types.MappingProxyType({'abc': 'def'})))['abc'])
self.assertEqual(
types.MappingProxyType, type(loads(dumps(types.MappingProxyType({})))))
# pylint: disable=exec-used
@unittest.skipIf(sys.version_info < (3, 7), 'Python 3.7 or above only')
def test_dataclass(self):
exec(
'''
from apache_beam.internal.module_test import DataClass
self.assertEqual(DataClass(datum='abc'), loads(dumps(DataClass(datum='abc'))))
''')
def test_best_effort_determinism(self):
self.assertEqual(
dumps({'a', 'b', 'c'}, enable_best_effort_determinism=True),
dumps({'c', 'b', 'a'}, enable_best_effort_determinism=True))
if __name__ == '__main__':
unittest.main()