Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion sdks/python/apache_beam/transforms/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
# Generic type variables available to this module's type hints.
K = TypeVar('K')
V = TypeVar('V')
T = TypeVar('T')
U = TypeVar('U')

RESHUFFLE_TYPEHINT_BREAKING_CHANGE_VERSION = "2.64.0"

Expand Down Expand Up @@ -266,7 +267,9 @@ def collect_values(key, tagged_values):
]
| Flatten(pipeline=self.pipeline)
| GroupByKey()
| MapTuple(collect_values))
| MapTuple(collect_values).with_input_types(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this break some internal tests?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As with #33932, this change could break some tests, because it changes the coder for this particular step from a universal pickled coder to the actual coder for the element.

However, the impact should be much smaller than that of the previous PR, since CoGroupByKey is used less often than Reshuffle.

tuple[K, Iterable[tuple[U, V]]]).with_output_types(
tuple[K, dict[U, list[V]]]))


@ptransform_fn
Expand Down
65 changes: 39 additions & 26 deletions sdks/python/apache_beam/transforms/util_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,31 @@
'ignore', category=FutureWarning, module='apache_beam.transform.util_test')


class _Unpicklable(object):
def __init__(self, value):
self.value = value

def __getstate__(self):
raise NotImplementedError()

def __setstate__(self, state):
raise NotImplementedError()


class _UnpicklableCoder(beam.coders.Coder):
  """Coder for _Unpicklable that serializes only the wrapped value.

  The value is written as the bytes of its ``str()`` form and read back with
  ``int()``, so it round-trips integer payloads.
  """
  def encode(self, value):
    # Only the payload is serialized; the wrapper object itself never is.
    return str(value.value).encode()

  def decode(self, encoded):
    # Inverse of encode(): parse the text as an int and re-wrap it.
    return _Unpicklable(int(encoded.decode()))

  def to_type_hint(self):
    return _Unpicklable

  def is_deterministic(self):
    return True


class CoGroupByKeyTest(unittest.TestCase):
def test_co_group_by_key_on_tuple(self):
with TestPipeline() as pipeline:
Expand Down Expand Up @@ -186,6 +211,20 @@ def test_co_group_by_key_on_one(self):
equal_to(expected),
label='AssertOneDict')

def test_co_group_by_key_on_unpickled(self):
  """Runs CoGroupByKey over elements whose pickling hooks raise.

  _Unpicklable instances are used as both key and value, with
  _UnpicklableCoder registered for them, so the pipeline can only produce the
  expected output if it never tries to pickle the elements.
  """
  beam.coders.registry.register_coder(_Unpicklable, _UnpicklableCoder)
  values = [_Unpicklable(i) for i in range(5)]
  with TestPipeline() as pipeline:
    # Key every element by itself so key and value are both _Unpicklable.
    xs = pipeline | beam.Create(values) | beam.WithKeys(lambda x: x)
    pcoll = (
        {'x': xs}
        | beam.CoGroupByKey()
        | beam.FlatMapTuple(
            lambda k, tagged: (k.value, tagged['x'][0].value * 2)))
    # Each input i yields two outputs: i (from the key) and 2 * i (from the
    # single grouped value under tag 'x').
    expected = [0, 0, 1, 2, 2, 4, 3, 6, 4, 8]
    assert_that(pcoll, equal_to(expected))


class FakeClock(object):
def __init__(self, now=time.time()):
Expand Down Expand Up @@ -1205,32 +1244,6 @@ def format_with_timestamp(element, timestamp=beam.DoFn.TimestampParam):
equal_to(expected_data),
label="formatted_after_reshuffle")

# NOTE(review): this looks like the old copy of the helper classes that the
# diff deletes (the hunk header shrinks this region from 32 to 6 lines) --
# confirm before relying on it.
# The `global` statements make the class names defined below bind at module
# scope.
global _Unpicklable
global _UnpicklableCoder

# Test value whose pickling hooks (__getstate__/__setstate__) both raise
# NotImplementedError; the payload is stored on `value`.
class _Unpicklable(object):
def __init__(self, value):
self.value = value

def __getstate__(self):
raise NotImplementedError()

def __setstate__(self, state):
raise NotImplementedError()

# NOTE(review): presumably the old copy of the coder that this diff deletes --
# confirm against the hunk.
# Coder that serializes an _Unpicklable as the bytes of str(value.value) and
# rebuilds it with int() on decode.
class _UnpicklableCoder(beam.coders.Coder):
def encode(self, value):
return str(value.value).encode()

def decode(self, encoded):
return _Unpicklable(int(encoded.decode()))

def to_type_hint(self):
return _Unpicklable

def is_deterministic(self):
return True

def reshuffle_unpicklable_in_global_window_helper(
self, update_compatibility_version=None):
with TestPipeline(options=PipelineOptions(
Expand Down
Loading