Skip to content

Commit 1d4ab9a

Browse files
author
Claude
committed
Move options override to before validation.
1 parent d9cca49 commit 1d4ab9a

File tree

4 files changed

+18
-16
lines changed

4 files changed

+18
-16
lines changed

sdks/python/apache_beam/pipeline.py

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,17 @@ def __init__(
207207
'Runner %s is not a PipelineRunner object or the '
208208
'name of a registered runner.' % runner)
209209

210+
# Runner can oerride the default runner to be used.
211+
if (self._options.view_as(SetupOptions).pickle_library == 'default' and
212+
runner.default_pickle_library_override() is not None):
213+
logging.info(
214+
"Runner defaulting to pickling library: %s.",
215+
runner.default_pickle_library_override())
216+
self._options.view_as(
217+
SetupOptions).pickle_library = runner.default_pickle_library_override(
218+
)
219+
pickler.set_library(self._options.view_as(SetupOptions).pickle_library)
220+
210221
# Validate pipeline options
211222
errors = PipelineOptionsValidator(self._options, runner).validate()
212223
if errors:
@@ -226,15 +237,6 @@ def __init__(
226237
# Default runner to be used.
227238
self.runner = runner
228239

229-
if (self._options.view_as(SetupOptions).pickle_library == 'default' and
230-
self.runner.default_pickle_library_override() is not None):
231-
logging.info(
232-
"Default pickling library set to : %s.",
233-
runner.default_pickle_library_override())
234-
self._options.view_as(
235-
SetupOptions).pickle_library = runner.default_pickle_library_override(
236-
)
237-
pickler.set_library(self._options.view_as(SetupOptions).pickle_library)
238240
# Stack of transforms generated by nested apply() calls. The stack will
239241
# contain a root node as an enclosing (parent) node for top transforms.
240242
self.transforms_stack = [

sdks/python/apache_beam/pipeline_test.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
from apache_beam.portability.api import beam_runner_api_pb2
4242
from apache_beam.pvalue import AsSingleton
4343
from apache_beam.pvalue import TaggedOutput
44+
from apache_beam.runners.runner import PipelineRunner
4445
from apache_beam.testing.test_pipeline import TestPipeline
4546
from apache_beam.testing.util import assert_that
4647
from apache_beam.testing.util import equal_to
@@ -62,7 +63,6 @@
6263
from apache_beam.transforms.window import TimestampedValue
6364
from apache_beam.utils import windowed_value
6465
from apache_beam.utils.timestamp import MIN_TIMESTAMP
65-
from apache_beam.runners.direct import direct_runner
6666

6767

6868
class FakeUnboundedSource(SourceBase):
@@ -159,7 +159,7 @@ def test_create(self):
159159

160160
@mock.patch('logging.info')
161161
def test_runner_overrides_default_pickler(self, mock_info):
162-
with mock.patch.object(direct_runner.SwitchingDirectRunner,
162+
with mock.patch.object(PipelineRunner,
163163
'default_pickle_library_override') as mock_fn:
164164
mock_fn.return_value = 'dill'
165165
with TestPipeline() as pipeline:
@@ -169,7 +169,8 @@ def test_runner_overrides_default_pickler(self, mock_info):
169169
from apache_beam.internal import pickler
170170
from apache_beam.internal import dill_pickler
171171
self.assertIs(pickler.desired_pickle_lib, dill_pickler)
172-
mock_info.assert_any_call('Default pickling library set to : %s.', 'dill')
172+
mock_info.assert_any_call(
173+
'Runner defaulting to pickling library: %s.', 'dill')
173174

174175
def test_flatmap_builtin(self):
175176
with TestPipeline() as pipeline:

sdks/python/apache_beam/runners/dataflow/dataflow_runner.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,9 @@ class DataflowRunner(PipelineRunner):
9898
def __init__(self, cache=None):
9999
self._default_environment = None
100100

101+
def default_pickle_library_override(self):
102+
return 'cloudpickle'
103+
101104
def is_fnapi_compatible(self):
102105
return False
103106

sdks/python/apache_beam/runners/direct/direct_runner.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,6 @@ class SwitchingDirectRunner(PipelineRunner):
6666
which supports streaming execution and certain primitives not yet
6767
implemented in the FnApiRunner.
6868
"""
69-
def default_pickle_library_override(self):
70-
"""Default pickle library, can be overridden by runner implementation."""
71-
return 'cloudpickle'
72-
7369
def is_fnapi_compatible(self):
7470
return BundleBasedDirectRunner.is_fnapi_compatible()
7571

0 commit comments

Comments
 (0)