Skip to content

Commit 19f9724

Browse files
author
Claude
committed
Move options override to before validation.
1 parent 5a725b8 commit 19f9724

File tree

2 files changed

+14
-11
lines changed

2 files changed

+14
-11
lines changed

sdks/python/apache_beam/pipeline.py

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,17 @@ def __init__(
204204
'Runner %s is not a PipelineRunner object or the '
205205
'name of a registered runner.' % runner)
206206

207+
# Runner can specify the default runner to be used.
208+
if (self._options.view_as(SetupOptions).pickle_library == 'default' and
209+
runner.default_pickle_library_override() is not None):
210+
logging.info(
211+
"Runner defaulting to pickling library: %s.",
212+
runner.default_pickle_library_override())
213+
self._options.view_as(
214+
SetupOptions).pickle_library = runner.default_pickle_library_override(
215+
)
216+
pickler.set_library(self._options.view_as(SetupOptions).pickle_library)
217+
207218
# Validate pipeline options
208219
errors = PipelineOptionsValidator(self._options, runner).validate()
209220
if errors:
@@ -223,15 +234,6 @@ def __init__(
223234
# Default runner to be used.
224235
self.runner = runner
225236

226-
if (self._options.view_as(SetupOptions).pickle_library == 'default' and
227-
self.runner.default_pickle_library_override() is not None):
228-
logging.info(
229-
"Default pickling library set to : %s.",
230-
runner.default_pickle_library_override())
231-
self._options.view_as(
232-
SetupOptions).pickle_library = runner.default_pickle_library_override(
233-
)
234-
pickler.set_library(self._options.view_as(SetupOptions).pickle_library)
235237
# Stack of transforms generated by nested apply() calls. The stack will
236238
# contain a root node as an enclosing (parent) node for top transforms.
237239
self.transforms_stack = [

sdks/python/apache_beam/pipeline_test.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
from apache_beam.portability.api import beam_runner_api_pb2
4242
from apache_beam.pvalue import AsSingleton
4343
from apache_beam.pvalue import TaggedOutput
44+
from apache_beam.runners.direct import direct_runner
4445
from apache_beam.testing.test_pipeline import TestPipeline
4546
from apache_beam.testing.util import assert_that
4647
from apache_beam.testing.util import equal_to
@@ -62,7 +63,6 @@
6263
from apache_beam.transforms.window import TimestampedValue
6364
from apache_beam.utils import windowed_value
6465
from apache_beam.utils.timestamp import MIN_TIMESTAMP
65-
from apache_beam.runners.direct import direct_runner
6666

6767

6868
class FakeUnboundedSource(SourceBase):
@@ -169,7 +169,8 @@ def test_runner_overrides_default_pickler(self, mock_info):
169169
from apache_beam.internal import pickler
170170
from apache_beam.internal import dill_pickler
171171
self.assertIs(pickler.desired_pickle_lib, dill_pickler)
172-
mock_info.assert_any_call('Default pickling library set to : %s.', 'dill')
172+
mock_info.assert_any_call(
173+
'Runner defaulting to pickling library: %s.', 'dill')
173174

174175
def test_flatmap_builtin(self):
175176
with TestPipeline() as pipeline:

0 commit comments

Comments
 (0)