Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions sdks/python/apache_beam/options/pipeline_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -486,11 +486,12 @@ def get_all_options(
drop_default=False,
add_extra_args_fn: Optional[Callable[[_BeamArgumentParser], None]] = None,
retain_unknown_options=False,
display_warnings=False) -> Dict[str, Any]:
display_warnings=False,
hierarchy_only=False,
) -> Dict[str, Any]:
"""Returns a dictionary of all defined arguments.

Returns a dictionary of all defined arguments (arguments that are defined in
any subclass of PipelineOptions) into a dictionary.
Returns a dictionary of all defined arguments into a dictionary.

Args:
drop_default: If set to true, options that are equal to their default
Expand All @@ -500,6 +501,9 @@ def get_all_options(
retain_unknown_options: If set to true, options not recognized by any
known pipeline options class will still be included in the result. If
set to false, they will be discarded.
hierarchy_only: If set to true, returns only options defined in this class
and its superclasses. Otherwise, arguments that are defined in
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have use cases for returning options from superclasses? Would it be sufficient to return only options defined in the current class?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, for the current one (DaskOptions) it is sufficient. Simplified.

any subclass of PipelineOptions are returned (default).

Returns:
Dictionary of all args and values.
Expand All @@ -510,8 +514,13 @@ def get_all_options(
# instance of each subclass to avoid conflicts.
subset = {}
parser = _BeamArgumentParser(allow_abbrev=False)
for cls in PipelineOptions.__subclasses__():
subset.setdefault(str(cls), cls)
if not hierarchy_only:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would read more easily if we reversed the order of the conditions (if hierarchy_only: ...).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed

for cls in PipelineOptions.__subclasses__():
subset.setdefault(str(cls), cls)
else:
for cls in self.__class__.__mro__:
if issubclass(cls, PipelineOptions):
subset.setdefault(str(cls), cls)
for cls in subset.values():
cls._add_argparse_args(parser) # pylint: disable=protected-access
if add_extra_args_fn:
Expand Down Expand Up @@ -562,7 +571,7 @@ def add_new_arg(arg, **kwargs):
continue
parsed_args, _ = parser.parse_known_args(self._flags)
else:
if unknown_args:
if unknown_args and not hierarchy_only:
_LOGGER.warning("Discarding unparseable args: %s", unknown_args)
parsed_args = known_args
result = vars(parsed_args)
Expand All @@ -580,7 +589,7 @@ def add_new_arg(arg, **kwargs):
if overrides:
if retain_unknown_options:
result.update(overrides)
else:
elif not hierarchy_only:
_LOGGER.warning("Discarding invalid overrides: %s", overrides)

return result
Expand Down
27 changes: 27 additions & 0 deletions sdks/python/apache_beam/options/pipeline_options_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,11 @@ def _add_argparse_args(cls, parser):
parser.add_argument(
'--fake_multi_option', action='append', help='fake multi option')

class FakeSubclassOptions(FakeOptions):
  """Test-only options class derived from FakeOptions.

  Registers one additional flag, --fake_sub_option, on the given parser.
  """
  @classmethod
  def _add_argparse_args(cls, parser):
    # Only this subclass's own flag is registered here.
    parser.add_argument(
        '--fake_sub_option',
        help='fake option',
    )

@parameterized.expand(TEST_CASES)
def test_display_data(self, flags, _, display_data):
options = PipelineOptions(flags=flags)
Expand Down Expand Up @@ -238,6 +243,28 @@ def test_get_all_options(self, flags, expected, _):
options.view_as(PipelineOptionsTest.MockOptions).mock_multi_option,
expected['mock_multi_option'])

def test_get_superclass_options(self):
  """Checks get_all_options(hierarchy_only=True) against two option views.

  The same flags are viewed first as FakeSubclassOptions and then as
  MockOptions. With hierarchy_only=True, each view is expected to report
  only the options belonging to its own class hierarchy and none from the
  other one.
  """
  flags = [
      "--mock_option",
      "mock",
      "--fake_option",
      "fake",
      "--fake_sub_option",
      "fake_sub",
  ]
  options = PipelineOptions(flags=flags).view_as(
      PipelineOptionsTest.FakeSubclassOptions)

  # Viewed as FakeSubclassOptions: the fake_* options are present and the
  # mock option is absent.
  items = options.get_all_options(hierarchy_only=True).items()
  self.assertIn(('fake_option', 'fake'), items)
  self.assertIn(('fake_sub_option', 'fake_sub'), items)
  self.assertNotIn(('mock_option', 'mock'), items)

  # Viewed as MockOptions: only the mock option is present.
  items = options.view_as(PipelineOptionsTest.MockOptions).get_all_options(
      hierarchy_only=True).items()
  self.assertNotIn(('fake_option', 'fake'), items)
  self.assertNotIn(('fake_sub_option', 'fake_sub'), items)
  self.assertIn(('mock_option', 'mock'), items)

@parameterized.expand(TEST_CASES)
def test_subclasses_of_pipeline_options_can_be_instantiated(
self, flags, expected, _):
Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/runners/dask/dask_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ def run_pipeline(self, pipeline, options):
'DaskRunner is not available. Please install apache_beam[dask].')

dask_options = options.view_as(DaskOptions).get_all_options(
drop_default=True)
drop_default=True, hierarchy_only=True)
bag_kwargs = DaskOptions._extract_bag_kwargs(dask_options)
client = ddist.Client(**dask_options)

Expand Down
Loading