Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_Python.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"https://github.com/apache/beam/pull/32440": "test new datastream runner for batch"
"modification": 9
"modification": 10
}

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 2
"modification": 3
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 10
"modification": 11
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 4
"modification": 5
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 2
"modification": 3
}
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/internal/pickler.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@

USE_CLOUDPICKLE = 'cloudpickle'
USE_DILL = 'dill'
DEFAULT_PICKLE_LIB = USE_DILL

desired_pickle_lib = dill_pickler
DEFAULT_PICKLE_LIB = USE_CLOUDPICKLE
desired_pickle_lib = cloudpickle_pickler


def dumps(
Expand Down
15 changes: 12 additions & 3 deletions sdks/python/apache_beam/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,6 @@ def __init__(

FileSystems.set_options(self._options)

pickle_library = self._options.view_as(SetupOptions).pickle_library
pickler.set_library(pickle_library)

if runner is None:
runner = self._options.view_as(StandardOptions).runner
if runner is None:
Expand All @@ -210,6 +207,17 @@ def __init__(
'Runner %s is not a PipelineRunner object or the '
'name of a registered runner.' % runner)

# Runner can override the default pickle library to be used.
if (self._options.view_as(SetupOptions).pickle_library == 'default' and
runner.default_pickle_library_override() is not None):
logging.info(
"Runner defaulting to pickling library: %s.",
runner.default_pickle_library_override())
self._options.view_as(
SetupOptions).pickle_library = runner.default_pickle_library_override(
)
pickler.set_library(self._options.view_as(SetupOptions).pickle_library)

# Validate pipeline options
errors = PipelineOptionsValidator(self._options, runner).validate()
if errors:
Expand All @@ -228,6 +236,7 @@ def __init__(

# Default runner to be used.
self.runner = runner

# Stack of transforms generated by nested apply() calls. The stack will
# contain a root node as an enclosing (parent) node for top transforms.
self.transforms_stack = [
Expand Down
34 changes: 30 additions & 4 deletions sdks/python/apache_beam/pipeline_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.pvalue import AsSingleton
from apache_beam.pvalue import TaggedOutput
from apache_beam.runners.runner import PipelineRunner
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
Expand Down Expand Up @@ -156,6 +157,21 @@ def test_create(self):
pcoll3 = pcoll2 | 'do' >> FlatMap(lambda x: [x + 10])
assert_that(pcoll3, equal_to([14, 15, 16]), label='pcoll3')

@mock.patch('logging.info')
def test_runner_overrides_default_pickler(self, mock_info):
  """Checks that a runner-provided pickle library override takes effect.

  PipelineRunner.default_pickle_library_override is patched to return
  'dill'. After a small pipeline runs, the test expects the pickler module
  to be using dill_pickler and the override to have been logged through
  logging.info.
  """
  # NOTE(review): block nesting was reconstructed from syntax because the
  # original indentation is not visible here -- confirm against the file.
  with mock.patch.object(PipelineRunner,
                         'default_pickle_library_override') as mock_fn:
    # Every runner now reports 'dill' as its preferred pickle library.
    mock_fn.return_value = 'dill'
    with TestPipeline() as pipeline:
      pcoll = pipeline | 'label1' >> Create([1, 2, 3])
      assert_that(pcoll, equal_to([1, 2, 3]))

    from apache_beam.internal import pickler
    from apache_beam.internal import dill_pickler
    # The module-level library selected in pickler must be the dill one.
    self.assertIs(pickler.desired_pickle_lib, dill_pickler)
    # assert_any_call (not assert_called_once_with): logging.info is patched
    # globally, so other info messages may also have been recorded.
    mock_info.assert_any_call(
        'Runner defaulting to pickling library: %s.', 'dill')

def test_flatmap_builtin(self):
with TestPipeline() as pipeline:
pcoll = pipeline | 'label1' >> Create([1, 2, 3])
Expand Down Expand Up @@ -279,17 +295,27 @@ def test_no_wait_until_finish(self, mock_info):
with Pipeline(runner='DirectRunner',
options=PipelineOptions(["--no_wait_until_finish"])) as p:
_ = p | beam.Create(['test'])
mock_info.assert_called_once_with(
mock_info.assert_any_call(
'Job execution continues without waiting for completion. '
'Use "wait_until_finish" in PipelineResult to block until finished.')
p.result.wait_until_finish()

def test_auto_unique_labels(self):

opts = PipelineOptions(["--auto_unique_labels"])
with mock.patch.object(uuid, 'uuid4') as mock_uuid_gen:
mock_uuids = [mock.Mock(hex='UUID01XXX'), mock.Mock(hex='UUID02XXX')]
mock_uuid_gen.side_effect = mock_uuids

mock_uuids = [mock.Mock(hex='UUID01XXX'), mock.Mock(hex='UUID02XXX')]
mock_uuid_gen = mock.Mock(side_effect=mock_uuids)

original_generate_unique_label = Pipeline._generate_unique_label

def patched_generate_unique_label(self, transform):
with mock.patch.object(uuid, 'uuid4', return_value=mock_uuid_gen()):
return original_generate_unique_label(self, transform)

with mock.patch.object(Pipeline,
'_generate_unique_label',
patched_generate_unique_label):
with TestPipeline(options=opts) as pipeline:
pcoll = pipeline | 'pcoll' >> Create([1, 2, 3])

Expand Down
3 changes: 3 additions & 0 deletions sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ class DataflowRunner(PipelineRunner):
def __init__(self, cache=None):
  """Creates the runner with no default environment set.

  Args:
    cache: Not used by the statements visible here.
      NOTE(review): presumably kept for signature compatibility -- confirm.
  """
  self._default_environment = None

def default_pickle_library_override(self):
  """Returns 'cloudpickle', the pickle library this runner defaults to."""
  library_name = 'cloudpickle'
  return library_name

def is_fnapi_compatible(self):
  """Always returns False for this runner."""
  compatible = False
  return compatible

Expand Down
24 changes: 16 additions & 8 deletions sdks/python/apache_beam/runners/direct/direct_runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@
from apache_beam.testing import test_pipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.utils import shared


class DictWrapper(dict):
  """A dict subclass that adds no behavior of its own.

  NOTE(review): presumably exists so the object handed to shared.Shared can
  be weakly referenced, which plain dict instances cannot -- confirm against
  apache_beam.utils.shared.
  """


def acquire_dict():
  """Builds and returns a new, empty DictWrapper."""
  fresh = DictWrapper()
  return fresh


class DirectPipelineResultTest(unittest.TestCase):
Expand Down Expand Up @@ -159,18 +168,17 @@ def test_retry_fork_graph(self):
# does not currently support retries.
p = beam.Pipeline(runner='BundleBasedDirectRunner')

# TODO(mariagh): Remove the use of globals from the test.
global count_b, count_c # pylint: disable=global-variable-undefined
count_b, count_c = 0, 0
shared_handler = shared.Shared()
counts = shared_handler.acquire(acquire_dict)
counts['count_b'] = 0
counts['count_c'] = 0

def f_b(x):
global count_b # pylint: disable=global-variable-undefined
count_b += 1
shared_handler.acquire(acquire_dict)['count_b'] += 1
raise Exception('exception in f_b')

def f_c(x):
global count_c # pylint: disable=global-variable-undefined
count_c += 1
shared_handler.acquire(acquire_dict)['count_c'] += 1
raise Exception('exception in f_c')

names = p | 'CreateNodeA' >> beam.Create(['Ann', 'Joe'])
Expand All @@ -180,7 +188,7 @@ def f_c(x):

with self.assertRaises(Exception):
p.run().wait_until_finish()
assert count_b == count_c == 4
assert counts['count_b'] == counts['count_c'] == 4

def test_no_partial_writeouts(self):
class TestTransformEvaluator(_TransformEvaluator):
Expand Down
4 changes: 4 additions & 0 deletions sdks/python/apache_beam/runners/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,10 @@ def check_requirements(
beam_runner_api_pb2.TimeDomain.PROCESSING_TIME):
raise NotImplementedError(timer.time_domain)

def default_pickle_library_override(self):
  """Returns the pickle library this runner prefers by default.

  The base implementation returns None, i.e. no override. Runner
  implementations may override this method to return a library name.
  """
  return None


# FIXME: replace with PipelineState(str, enum.Enum)
class PipelineState(object):
Expand Down
Loading