Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions sdks/python/apache_beam/transforms/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -3573,6 +3573,10 @@ class ApplyPartitionFnFn(DoFn):
"""A DoFn that applies a PartitionFn."""
def process(self, element, partitionfn, n, *args, **kwargs):
partition = partitionfn.partition_for(element, n, *args, **kwargs)
if isinstance(partition, bool) or not isinstance(partition, int):
raise ValueError(
f"PartitionFn yielded a '{type(partition).__name__}' "
"when it should only yields integers")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The condition isinstance(partition, bool) or not isinstance(partition, int) is correct, but it can be simplified. Since bool is a subclass of int, isinstance(True, int) evaluates to True. A more direct way to check if a value is strictly an integer (and not a subclass like bool) is to use type(partition) is not int. This is more readable and achieves the same result. I've also corrected a small grammatical error in the error message ('yields' to 'yield').

Suggested change
if isinstance(partition, bool) or not isinstance(partition, int):
raise ValueError(
f"PartitionFn yielded a '{type(partition).__name__}' "
"when it should only yields integers")
if type(partition) is not int:
raise ValueError(
f"PartitionFn yielded a '{type(partition).__name__}' "
"when it should only yield integers")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about this. I am wondering whether we could do partition = int(partition) although partition is bool, this could make the changes less aggressive.

if not 0 <= partition < n:
raise ValueError(
'PartitionFn specified out-of-bounds partition index: '
Expand Down
8 changes: 8 additions & 0 deletions sdks/python/apache_beam/transforms/core_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,14 @@ def test_dofn_with_implicit_return_none_return_without_value(self):


class PartitionTest(unittest.TestCase):
def test_partition_with_bools(self):
with pytest.raises(
Exception,
match="PartitionFn yielded a 'bool' when it should only yields integers"
):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The test correctly checks for the exception, but it's good practice to be more specific. The code raises a ValueError, so the test should expect a ValueError instead of a generic Exception. This makes the test more robust. Also, there's a small grammatical error in the match string; it should be 'yield' instead of 'yields'.

Suggested change
with pytest.raises(
Exception,
match="PartitionFn yielded a 'bool' when it should only yields integers"
):
with pytest.raises(
ValueError,
match="PartitionFn yielded a 'bool' when it should only yield integers"
):

Copy link
Copy Markdown
Contributor

@liferoad liferoad Aug 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please fix this.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I initially checked for ValueError and the test works with the directrunner but the portable runner will cast all exceptions to RuntimeError. (https://github.com/apache/beam/runs/48183396870)

I can narrow this to check for either RuntimeError or ValueError though

with beam.testing.test_pipeline.TestPipeline() as p:
_ = (p | beam.Create([True]) | beam.Partition(lambda x, _: x, 2))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This test covers the case where the partition function returns a boolean. To make the tests more comprehensive, it would be beneficial to add another test case for other non-integer types that should be rejected, such as floats. This would ensure the validation logic is robust against various invalid types.

For example, you could add a test like this:

  def test_partition_with_floats(self):
    with pytest.raises(
        ValueError,
        match="PartitionFn yielded a 'float' when it should only yield integers"
    ):
      with beam.testing.test_pipeline.TestPipeline() as p:
        _ = (p | beam.Create([1.0]) | beam.Partition(lambda x, _: x, 2))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let us add another type like floats to test.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added floats and a couple of other types


def test_partition_boundedness(self):
def partition_fn(val, num_partitions):
return val % num_partitions
Expand Down
Loading