
Commit 1211cc8

claudevdm and Claude authored
Make dill optional (#36093)
* Make dill optional
* Fix some tests that use update compat flag.
* Lint and revert unrelated change.
* Fix lint.
* Lint fixes.
* Fix messages and actually use dill when dill_unsafe flag is used.
* Trigger postcommit.
* Fix docstring

Co-authored-by: Claude <[email protected]>
1 parent 1c74621 commit 1211cc8
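
For orientation (this example is not part of the diff): the default cloudpickle path keeps working without extra dependencies, while selecting dill through the pickle_library pipeline option (the option named in the new error messages below) now assumes the apache-beam[dill] extra is installed. A minimal sketch, assuming the standard --pickle_library option:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Assumption: --pickle_library is supplied as a pipeline option; after this
# commit the 'dill' value only works when the dill extra is installed.
options = PipelineOptions(['--pickle_library=dill'])
with beam.Pipeline(options=options) as p:
  _ = p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x + 1)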

File tree: 20 files changed (+236, −19 lines)
Lines changed: 1 addition & 1 deletion
@@ -1,5 +1,5 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run.",
-  "modification": 27
+  "modification": 28
 }
 
sdks/python/apache_beam/coders/coders.py

Lines changed: 8 additions & 3 deletions
@@ -85,9 +85,7 @@
   # occurs.
   from apache_beam.internal.dill_pickler import dill
 except ImportError:
-  # We fall back to using the stock dill library in tests that don't use the
-  # full Python SDK.
-  import dill
+  dill = None
 
 __all__ = [
     'Coder',
@@ -900,6 +898,13 @@ def to_type_hint(self):
 
 class DillCoder(_PickleCoderBase):
   """Coder using dill's pickle functionality."""
+  def __init__(self):
+    if not dill:
+      raise RuntimeError(
+          "This pipeline contains a DillCoder which requires "
+          "the dill package. Install the dill package with the dill extra "
+          "e.g. apache-beam[dill]")
+
   def _create_impl(self):
     return coder_impl.CallbackCoderImpl(maybe_dill_dumps, maybe_dill_loads)
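
A minimal sketch (not from this commit) of how the new guard surfaces to users: constructing a DillCoder without the dill extra installed now fails immediately with the RuntimeError above, rather than failing later inside serialization.

from apache_beam.coders import coders

try:
  c = coders.DillCoder()  # succeeds only when apache-beam[dill] is installed
except RuntimeError as err:
  # Raised by the new __init__ check when dill is unavailable.
  print('DillCoder unavailable:', err)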

sdks/python/apache_beam/coders/coders_test_common.py

Lines changed: 26 additions & 0 deletions
@@ -59,6 +59,11 @@
 except ImportError:
   dataclasses = None  # type: ignore
 
+try:
+  import dill
+except ImportError:
+  dill = None
+
 MyNamedTuple = collections.namedtuple('A', ['x', 'y'])  # type: ignore[name-match]
 AnotherNamedTuple = collections.namedtuple('AnotherNamedTuple', ['x', 'y'])
 MyTypedNamedTuple = NamedTuple('MyTypedNamedTuple', [('f1', int), ('f2', str)])
@@ -116,6 +121,7 @@ class UnFrozenDataClass:
 # These tests need to all be run in the same process due to the asserts
 # in tearDownClass.
 @pytest.mark.no_xdist
+@pytest.mark.uses_dill
 class CodersTest(unittest.TestCase):
 
   # These class methods ensure that we test each defined coder in both
@@ -173,6 +179,9 @@ def tearDownClass(cls):
         coders.BigIntegerCoder,  # tested in DecimalCoder
         coders.TimestampPrefixingOpaqueWindowCoder,
     ])
+    if not dill:
+      standard -= set(
+          [coders.DillCoder, coders.DeterministicFastPrimitivesCoder])
     cls.seen_nested -= set(
         [coders.ProtoCoder, coders.ProtoPlusCoder, CustomCoder])
     assert not standard - cls.seen, str(standard - cls.seen)
@@ -241,8 +250,13 @@ def test_memoizing_pickle_coder(self):
       param(compat_version="2.67.0"),
   ])
   def test_deterministic_coder(self, compat_version):
+
     typecoders.registry.update_compatibility_version = compat_version
     coder = coders.FastPrimitivesCoder()
+    if not dill and compat_version:
+      with self.assertRaises(RuntimeError):
+        coder.as_deterministic_coder(step_label="step")
+      self.skipTest('Dill not installed')
     deterministic_coder = coder.as_deterministic_coder(step_label="step")
 
     self.check_coder(deterministic_coder, *self.test_values_deterministic)
@@ -321,6 +335,11 @@ def test_deterministic_map_coder_is_update_compatible(self, compat_version):
     coder = coders.MapCoder(
         coders.FastPrimitivesCoder(), coders.FastPrimitivesCoder())
 
+    if not dill and compat_version:
+      with self.assertRaises(RuntimeError):
+        coder.as_deterministic_coder(step_label="step")
+      self.skipTest('Dill not installed')
+
     deterministic_coder = coder.as_deterministic_coder(step_label="step")
 
     assert isinstance(
@@ -331,6 +350,11 @@ def test_deterministic_map_coder_is_update_compatible(self, compat_version):
     self.check_coder(deterministic_coder, *values)
 
   def test_dill_coder(self):
+    if not dill:
+      with self.assertRaises(RuntimeError):
+        coders.DillCoder()
+      self.skipTest('Dill not installed')
+
     cell_value = (lambda x: lambda: x)(0).__closure__[0]
     self.check_coder(coders.DillCoder(), 'a', 1, cell_value)
     self.check_coder(
@@ -661,6 +685,8 @@ def test_param_windowed_value_coder(self):
   def test_cross_process_encoding_of_special_types_is_deterministic(
       self, compat_version):
     """Test cross-process determinism for all special deterministic types"""
+    if compat_version:
+      pytest.importorskip("dill")
 
     if sys.executable is None:
       self.skipTest('No Python interpreter found')
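
These tests lean on two mechanisms: a module-level dill = None fallback paired with skipTest, and pytest.importorskip for cases that are meaningless without dill. A minimal sketch of the importorskip behaviour, assuming only pytest and an optional dill install:

import pytest


def test_requires_dill():
  # Returns the module if it is importable, otherwise marks the test skipped.
  dill = pytest.importorskip("dill")
  assert dill.loads(dill.dumps([1, 2, 3])) == [1, 2, 3]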

sdks/python/apache_beam/internal/pickler.py

Lines changed: 23 additions & 3 deletions
@@ -29,10 +29,15 @@
 """
 
 from apache_beam.internal import cloudpickle_pickler
-from apache_beam.internal import dill_pickler
+
+try:
+  from apache_beam.internal import dill_pickler
+except ImportError:
+  dill_pickler = None  # type: ignore[assignment]
 
 USE_CLOUDPICKLE = 'cloudpickle'
 USE_DILL = 'dill'
+USE_DILL_UNSAFE = 'dill_unsafe'
 
 DEFAULT_PICKLE_LIB = USE_CLOUDPICKLE
 desired_pickle_lib = cloudpickle_pickler
@@ -74,14 +79,29 @@ def load_session(file_path):
 def set_library(selected_library=DEFAULT_PICKLE_LIB):
   """ Sets pickle library that will be used. """
   global desired_pickle_lib
+
+  if selected_library == USE_DILL and not dill_pickler:
+    raise ImportError(
+        "Pipeline option pickle_library=dill is set, but dill is not "
+        "installed. Install apache-beam with the dill extras package "
+        "e.g. apache-beam[dill].")
+  if selected_library == USE_DILL_UNSAFE and not dill_pickler:
+    raise ImportError(
+        "Pipeline option pickle_library=dill_unsafe is set, but dill is not "
+        "installed. Install dill in job submission and runtime environments.")
+
+  is_currently_dill = (desired_pickle_lib == dill_pickler)
+  dill_is_requested = (
+      selected_library == USE_DILL or selected_library == USE_DILL_UNSAFE)
+
   # If switching to or from dill, update the pickler hook overrides.
-  if (selected_library == USE_DILL) != (desired_pickle_lib == dill_pickler):
+  if is_currently_dill != dill_is_requested:
     dill_pickler.override_pickler_hooks(selected_library == USE_DILL)
 
   if selected_library == 'default':
     selected_library = DEFAULT_PICKLE_LIB
 
-  if selected_library == USE_DILL:
+  if dill_is_requested:
     desired_pickle_lib = dill_pickler
   elif selected_library == USE_CLOUDPICKLE:
     desired_pickle_lib = cloudpickle_pickler
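
Hedged usage sketch (not part of the diff): with this change, set_library keeps cloudpickle as the default and raises ImportError for 'dill' or the new 'dill_unsafe' value whenever the dill_pickler module cannot be imported.

from apache_beam.internal import pickler

pickler.set_library('cloudpickle')  # default, always available
try:
  pickler.set_library('dill')  # requires the apache-beam[dill] extra
except ImportError as err:
  print('dill pickler not installed:', err)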

sdks/python/apache_beam/internal/pickler_test.py

Lines changed: 24 additions & 0 deletions
@@ -25,6 +25,7 @@
 import types
 import unittest
 
+import pytest
 from parameterized import param
 from parameterized import parameterized
 
@@ -34,6 +35,12 @@
 from apache_beam.internal.pickler import loads
 
 
+def maybe_skip_if_no_dill(pickle_library):
+  if pickle_library == 'dill':
+    pytest.importorskip("dill")
+
+
+@pytest.mark.uses_dill
 class PicklerTest(unittest.TestCase):
 
   NO_MAPPINGPROXYTYPE = not hasattr(types, "MappingProxyType")
@@ -43,6 +50,7 @@ class PicklerTest(unittest.TestCase):
       param(pickle_lib='cloudpickle'),
   ])
   def test_basics(self, pickle_lib):
+    maybe_skip_if_no_dill(pickle_lib)
     pickler.set_library(pickle_lib)
 
     self.assertEqual([1, 'a', ('z', )], loads(dumps([1, 'a', ('z', )])))
@@ -55,6 +63,7 @@ def test_basics(self, pickle_lib):
   ])
   def test_lambda_with_globals(self, pickle_lib):
     """Tests that the globals of a function are preserved."""
+    maybe_skip_if_no_dill(pickle_lib)
     pickler.set_library(pickle_lib)
 
     # The point of the test is that the lambda being called after unpickling
@@ -68,6 +77,7 @@ def test_lambda_with_globals(self, pickle_lib):
       param(pickle_lib='cloudpickle'),
   ])
   def test_lambda_with_main_globals(self, pickle_lib):
+    maybe_skip_if_no_dill(pickle_lib)
     pickler.set_library(pickle_lib)
     self.assertEqual(unittest, loads(dumps(lambda: unittest))())
 
@@ -77,6 +87,7 @@ def test_lambda_with_main_globals(self, pickle_lib):
   ])
   def test_lambda_with_closure(self, pickle_lib):
     """Tests that the closure of a function is preserved."""
+    maybe_skip_if_no_dill(pickle_lib)
     pickler.set_library(pickle_lib)
     self.assertEqual(
         'closure: abc',
@@ -88,6 +99,7 @@ def test_lambda_with_closure(self, pickle_lib):
   ])
   def test_class(self, pickle_lib):
     """Tests that a class object is pickled correctly."""
+    maybe_skip_if_no_dill(pickle_lib)
     pickler.set_library(pickle_lib)
     self.assertEqual(['abc', 'def'],
                      loads(dumps(module_test.Xyz))().foo('abc def'))
@@ -98,6 +110,7 @@ def test_class(self, pickle_lib):
   ])
   def test_object(self, pickle_lib):
     """Tests that a class instance is pickled correctly."""
+    maybe_skip_if_no_dill(pickle_lib)
     pickler.set_library(pickle_lib)
     self.assertEqual(['abc', 'def'],
                      loads(dumps(module_test.XYZ_OBJECT)).foo('abc def'))
@@ -108,6 +121,7 @@ def test_object(self, pickle_lib):
   ])
   def test_nested_class(self, pickle_lib):
     """Tests that a nested class object is pickled correctly."""
+    maybe_skip_if_no_dill(pickle_lib)
     pickler.set_library(pickle_lib)
     self.assertEqual(
         'X:abc', loads(dumps(module_test.TopClass.NestedClass('abc'))).datum)
@@ -121,6 +135,7 @@ def test_nested_class(self, pickle_lib):
   ])
   def test_dynamic_class(self, pickle_lib):
     """Tests that a nested class object is pickled correctly."""
+    maybe_skip_if_no_dill(pickle_lib)
     pickler.set_library(pickle_lib)
     self.assertEqual(
         'Z:abc', loads(dumps(module_test.create_class('abc'))).get())
@@ -130,6 +145,7 @@ def test_dynamic_class(self, pickle_lib):
       param(pickle_lib='cloudpickle'),
   ])
   def test_generators(self, pickle_lib):
+    maybe_skip_if_no_dill(pickle_lib)
     pickler.set_library(pickle_lib)
     with self.assertRaises(TypeError):
       dumps((_ for _ in range(10)))
@@ -139,6 +155,7 @@ def test_generators(self, pickle_lib):
      param(pickle_lib='cloudpickle'),
   ])
   def test_recursive_class(self, pickle_lib):
+    maybe_skip_if_no_dill(pickle_lib)
     pickler.set_library(pickle_lib)
     self.assertEqual(
         'RecursiveClass:abc',
@@ -149,6 +166,7 @@ def test_recursive_class(self, pickle_lib):
      param(pickle_lib='cloudpickle'),
   ])
   def test_pickle_rlock(self, pickle_lib):
+    maybe_skip_if_no_dill(pickle_lib)
     pickler.set_library(pickle_lib)
     rlock_instance = threading.RLock()
     rlock_type = type(rlock_instance)
@@ -160,6 +178,7 @@ def test_pickle_rlock(self, pickle_lib):
      param(pickle_lib='cloudpickle'),
   ])
   def test_save_paths(self, pickle_lib):
+    maybe_skip_if_no_dill(pickle_lib)
     pickler.set_library(pickle_lib)
     f = loads(dumps(lambda x: x))
     co_filename = f.__code__.co_filename
@@ -171,6 +190,7 @@ def test_save_paths(self, pickle_lib):
      param(pickle_lib='cloudpickle'),
   ])
   def test_dump_and_load_mapping_proxy(self, pickle_lib):
+    maybe_skip_if_no_dill(pickle_lib)
     pickler.set_library(pickle_lib)
     self.assertEqual(
         'def', loads(dumps(types.MappingProxyType({'abc': 'def'})))['abc'])
@@ -184,6 +204,7 @@ def test_dump_and_load_mapping_proxy(self, pickle_lib):
      param(pickle_lib='cloudpickle'),
   ])
   def test_dataclass(self, pickle_lib):
+    maybe_skip_if_no_dill(pickle_lib)
     exec(
         '''
 from apache_beam.internal.module_test import DataClass
@@ -195,6 +216,7 @@ def test_dataclass(self, pickle_lib):
      param(pickle_lib='cloudpickle'),
   ])
   def test_class_states_not_changed_at_subsequent_loading(self, pickle_lib):
+    maybe_skip_if_no_dill(pickle_lib)
     pickler.set_library(pickle_lib)
 
     class Local:
@@ -255,6 +277,7 @@ def maybe_get_sets_with_different_iteration_orders(self):
     return set1, set2
 
   def test_best_effort_determinism(self):
+    maybe_skip_if_no_dill('dill')
     pickler.set_library('dill')
     set1, set2 = self.maybe_get_sets_with_different_iteration_orders()
     self.assertEqual(
@@ -267,6 +290,7 @@ def test_best_effort_determinism(self):
     self.skipTest('Set iteration orders matched. Test results inconclusive.')
 
   def test_disable_best_effort_determinism(self):
+    maybe_skip_if_no_dill('dill')
     pickler.set_library('dill')
     set1, set2 = self.maybe_get_sets_with_different_iteration_orders()
     # The test relies on the sets having different iteration orders for the

sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py

Lines changed: 15 additions & 0 deletions
@@ -63,6 +63,11 @@
 except ImportError:
   raise unittest.SkipTest('GCP dependencies are not installed')
 
+try:
+  import dill
+except ImportError:
+  dill = None
+
 _LOGGER = logging.getLogger(__name__)
 
 _DESTINATION_ELEMENT_PAIRS = [
@@ -406,6 +411,13 @@ def test_partition_files_dofn_size_split(self):
         label='CheckSinglePartition')
 
 
+def maybe_skip(compat_version):
+  if compat_version and not dill:
+    raise unittest.SkipTest(
+        'Dill dependency not installed which is required for compat_version'
+        ' <= 2.67.0')
+
+
 class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
   def test_trigger_load_jobs_with_empty_files(self):
     destination = "project:dataset.table"
@@ -485,7 +497,9 @@ def test_records_traverse_transform_with_mocks(self):
       param(compat_version=None),
       param(compat_version="2.64.0"),
   ])
+  @pytest.mark.uses_dill
   def test_reshuffle_before_load(self, compat_version):
+    maybe_skip(compat_version)
     destination = 'project1:dataset1.table1'
 
     job_reference = bigquery_api.JobReference()
@@ -994,6 +1008,7 @@ def dynamic_destination_resolver(element, *side_inputs):
   ])
   def test_triggering_frequency(
       self, is_streaming, with_auto_sharding, compat_version):
+    maybe_skip(compat_version)
     destination = 'project1:dataset1.table1'
 
     job_reference = bigquery_api.JobReference()

sdks/python/apache_beam/ml/anomaly/specifiable_test.py

Lines changed: 8 additions & 1 deletion
@@ -22,6 +22,7 @@
 import unittest
 from typing import Optional
 
+import pytest
 from parameterized import parameterized
 
 from apache_beam.internal.cloudpickle import cloudpickle
@@ -323,7 +324,10 @@ def __init__(self, arg):
       self.my_arg = arg * 10
       type(self).counter += 1
 
-  def test_on_pickle(self):
+  @pytest.mark.uses_dill
+  def test_on_dill_pickle(self):
+    pytest.importorskip("dill")
+
     FooForPickle = TestInitCallCount.FooForPickle
 
     import dill
@@ -339,6 +343,9 @@ def test_on_pickle(self):
     self.assertEqual(FooForPickle.counter, 1)
     self.assertEqual(new_foo_2.__dict__, foo.__dict__)
 
+  def test_on_pickle(self):
+    FooForPickle = TestInitCallCount.FooForPickle
+
     # Note that pickle does not support classes/functions nested in a function.
     import pickle
     FooForPickle.counter = 0
