Skip to content

Commit 2ade2f5

Browse files
claudevdmClaude
authored andcommitted
Initial.
1 parent ae7bf20 commit 2ade2f5

File tree

3 files changed

+31
-13
lines changed

3 files changed

+31
-13
lines changed

sdks/python/apache_beam/internal/pickler.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,9 @@
3333

3434
USE_CLOUDPICKLE = 'cloudpickle'
3535
USE_DILL = 'dill'
36-
DEFAULT_PICKLE_LIB = USE_DILL
3736

38-
desired_pickle_lib = dill_pickler
37+
DEFAULT_PICKLE_LIB = USE_CLOUDPICKLE
38+
desired_pickle_lib = cloudpickle_pickler
3939

4040

4141
def dumps(

sdks/python/apache_beam/pipeline_test.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -287,9 +287,19 @@ def test_no_wait_until_finish(self, mock_info):
287287
def test_auto_unique_labels(self):
288288

289289
opts = PipelineOptions(["--auto_unique_labels"])
290-
with mock.patch.object(uuid, 'uuid4') as mock_uuid_gen:
291-
mock_uuids = [mock.Mock(hex='UUID01XXX'), mock.Mock(hex='UUID02XXX')]
292-
mock_uuid_gen.side_effect = mock_uuids
290+
291+
mock_uuids = [mock.Mock(hex='UUID01XXX'), mock.Mock(hex='UUID02XXX')]
292+
mock_uuid_gen = mock.Mock(side_effect=mock_uuids)
293+
294+
original_generate_unique_label = Pipeline._generate_unique_label
295+
296+
def patched_generate_unique_label(self, transform):
297+
with mock.patch.object(uuid, 'uuid4', return_value=mock_uuid_gen()):
298+
return original_generate_unique_label(self, transform)
299+
300+
with mock.patch.object(Pipeline,
301+
'_generate_unique_label',
302+
patched_generate_unique_label):
293303
with TestPipeline(options=opts) as pipeline:
294304
pcoll = pipeline | 'pcoll' >> Create([1, 2, 3])
295305

sdks/python/apache_beam/runners/direct/direct_runner_test.py

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,15 @@
4141
from apache_beam.testing import test_pipeline
4242
from apache_beam.testing.util import assert_that
4343
from apache_beam.testing.util import equal_to
44+
from apache_beam.utils import shared
45+
46+
47+
class DictWrapper(dict):
48+
pass
49+
50+
51+
def acquire_dict():
52+
return DictWrapper()
4453

4554

4655
class DirectPipelineResultTest(unittest.TestCase):
@@ -159,18 +168,17 @@ def test_retry_fork_graph(self):
159168
# currently does not currently support retries.
160169
p = beam.Pipeline(runner='BundleBasedDirectRunner')
161170

162-
# TODO(mariagh): Remove the use of globals from the test.
163-
global count_b, count_c # pylint: disable=global-variable-undefined
164-
count_b, count_c = 0, 0
171+
shared_handler = shared.Shared()
172+
counts = shared_handler.acquire(acquire_dict)
173+
counts['count_b'] = 0
174+
counts['count_c'] = 0
165175

166176
def f_b(x):
167-
global count_b # pylint: disable=global-variable-undefined
168-
count_b += 1
177+
shared_handler.acquire(acquire_dict)['count_b'] += 1
169178
raise Exception('exception in f_b')
170179

171180
def f_c(x):
172-
global count_c # pylint: disable=global-variable-undefined
173-
count_c += 1
181+
shared_handler.acquire(acquire_dict)['count_c'] += 1
174182
raise Exception('exception in f_c')
175183

176184
names = p | 'CreateNodeA' >> beam.Create(['Ann', 'Joe'])
@@ -180,7 +188,7 @@ def f_c(x):
180188

181189
with self.assertRaises(Exception):
182190
p.run().wait_until_finish()
183-
assert count_b == count_c == 4
191+
assert counts['count_b'] == counts['count_c'] == 4
184192

185193
def test_no_partial_writeouts(self):
186194
class TestTransformEvaluator(_TransformEvaluator):

0 commit comments

Comments
 (0)