Skip to content

Commit e9424b9

Browse files
authored
Merge pull request #33414 from shunping/revert-reshuffle-changes
Revert three commits related to supporting custom coder in reshuffle
2 parents 116df9f + 4cbf257 commit e9424b9

File tree

7 files changed: 2 additions & 119 deletions

7 files changed: 2 additions & 119 deletions

sdks/python/apache_beam/coders/coders.py

Lines changed: 0 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -1438,17 +1438,6 @@ def __hash__(self):
14381438
return hash(
14391439
(self.wrapped_value_coder, self.timestamp_coder, self.window_coder))
14401440

1441-
@classmethod
1442-
def from_type_hint(cls, typehint, registry):
1443-
# type: (Any, CoderRegistry) -> WindowedValueCoder
1444-
# Ideally this'd take two parameters so that one could hint at
1445-
# the window type as well instead of falling back to the
1446-
# pickle coders.
1447-
return cls(registry.get_coder(typehint.inner_type))
1448-
1449-
def to_type_hint(self):
1450-
return typehints.WindowedValue[self.wrapped_value_coder.to_type_hint()]
1451-
14521441

14531442
Coder.register_structured_urn(
14541443
common_urns.coders.WINDOWED_VALUE.urn, WindowedValueCoder)

sdks/python/apache_beam/coders/coders_test.py

Lines changed: 0 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -258,12 +258,6 @@ def test_numpy_int(self):
258258
_ = indata | "CombinePerKey" >> beam.CombinePerKey(sum)
259259

260260

261-
class WindowedValueCoderTest(unittest.TestCase):
262-
def test_to_type_hint(self):
263-
coder = coders.WindowedValueCoder(coders.VarIntCoder())
264-
self.assertEqual(coder.to_type_hint(), typehints.WindowedValue[int]) # type: ignore[misc]
265-
266-
267261
if __name__ == '__main__':
268262
logging.getLogger().setLevel(logging.INFO)
269263
unittest.main()

sdks/python/apache_beam/coders/typecoders.py

Lines changed: 0 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -94,8 +94,6 @@ def register_standard_coders(self, fallback_coder):
9494
self._register_coder_internal(str, coders.StrUtf8Coder)
9595
self._register_coder_internal(typehints.TupleConstraint, coders.TupleCoder)
9696
self._register_coder_internal(typehints.DictConstraint, coders.MapCoder)
97-
self._register_coder_internal(
98-
typehints.WindowedTypeConstraint, coders.WindowedValueCoder)
9997
# Default fallback coders applied in that order until the first matching
10098
# coder found.
10199
default_fallback_coders = [

sdks/python/apache_beam/transforms/util.py

Lines changed: 2 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -33,7 +33,6 @@
3333
from typing import Callable
3434
from typing import Iterable
3535
from typing import List
36-
from typing import Optional
3736
from typing import Tuple
3837
from typing import TypeVar
3938
from typing import Union
@@ -74,13 +73,11 @@
7473
from apache_beam.transforms.window import TimestampedValue
7574
from apache_beam.typehints import trivial_inference
7675
from apache_beam.typehints.decorators import get_signature
77-
from apache_beam.typehints.native_type_compatibility import TypedWindowedValue
7876
from apache_beam.typehints.sharded_key_type import ShardedKeyType
7977
from apache_beam.utils import shared
8078
from apache_beam.utils import windowed_value
8179
from apache_beam.utils.annotations import deprecated
8280
from apache_beam.utils.sharded_key import ShardedKey
83-
from apache_beam.utils.timestamp import Timestamp
8481

8582
if TYPE_CHECKING:
8683
from apache_beam.runners.pipeline_context import PipelineContext
@@ -956,10 +953,6 @@ def restore_timestamps(element):
956953
window.GlobalWindows.windowed_value((key, value), timestamp)
957954
for (value, timestamp) in values
958955
]
959-
960-
ungrouped = pcoll | Map(reify_timestamps).with_input_types(
961-
Tuple[K, V]).with_output_types(
962-
Tuple[K, Tuple[V, Optional[Timestamp]]])
963956
else:
964957

965958
# typing: All conditional function variants must have identical signatures
@@ -973,8 +966,7 @@ def restore_timestamps(element):
973966
key, windowed_values = element
974967
return [wv.with_value((key, wv.value)) for wv in windowed_values]
975968

976-
ungrouped = pcoll | Map(reify_timestamps).with_input_types(
977-
Tuple[K, V]).with_output_types(Tuple[K, TypedWindowedValue[V]])
969+
ungrouped = pcoll | Map(reify_timestamps).with_output_types(Any)
978970

979971
# TODO(https://github.com/apache/beam/issues/19785) Using global window as
980972
# one of the standard window. This is to mitigate the Dataflow Java Runner
@@ -1026,8 +1018,7 @@ def expand(self, pcoll):
10261018
pcoll | 'AddRandomKeys' >>
10271019
Map(lambda t: (random.randrange(0, self.num_buckets), t)
10281020
).with_input_types(T).with_output_types(Tuple[int, T])
1029-
| ReshufflePerKey().with_input_types(Tuple[int, T]).with_output_types(
1030-
Tuple[int, T])
1021+
| ReshufflePerKey()
10311022
| 'RemoveRandomKeys' >> Map(lambda t: t[1]).with_input_types(
10321023
Tuple[int, T]).with_output_types(T))
10331024

sdks/python/apache_beam/transforms/util_test.py

Lines changed: 0 additions & 54 deletions
Original file line number | Diff line number | Diff line change
@@ -1010,60 +1010,6 @@ def format_with_timestamp(element, timestamp=beam.DoFn.TimestampParam):
10101010
equal_to(expected_data),
10111011
label="formatted_after_reshuffle")
10121012

1013-
global _Unpicklable
1014-
global _UnpicklableCoder
1015-
1016-
class _Unpicklable(object):
1017-
def __init__(self, value):
1018-
self.value = value
1019-
1020-
def __getstate__(self):
1021-
raise NotImplementedError()
1022-
1023-
def __setstate__(self, state):
1024-
raise NotImplementedError()
1025-
1026-
class _UnpicklableCoder(beam.coders.Coder):
1027-
def encode(self, value):
1028-
return str(value.value).encode()
1029-
1030-
def decode(self, encoded):
1031-
return _Unpicklable(int(encoded.decode()))
1032-
1033-
def to_type_hint(self):
1034-
return _Unpicklable
1035-
1036-
def is_deterministic(self):
1037-
return True
1038-
1039-
def test_reshuffle_unpicklable_in_global_window(self):
1040-
beam.coders.registry.register_coder(_Unpicklable, _UnpicklableCoder)
1041-
1042-
with TestPipeline() as pipeline:
1043-
data = [_Unpicklable(i) for i in range(5)]
1044-
expected_data = [0, 10, 20, 30, 40]
1045-
result = (
1046-
pipeline
1047-
| beam.Create(data)
1048-
| beam.WindowInto(GlobalWindows())
1049-
| beam.Reshuffle()
1050-
| beam.Map(lambda u: u.value * 10))
1051-
assert_that(result, equal_to(expected_data))
1052-
1053-
def test_reshuffle_unpicklable_in_non_global_window(self):
1054-
beam.coders.registry.register_coder(_Unpicklable, _UnpicklableCoder)
1055-
1056-
with TestPipeline() as pipeline:
1057-
data = [_Unpicklable(i) for i in range(5)]
1058-
expected_data = [0, 0, 0, 10, 10, 10, 20, 20, 20, 30, 30, 30, 40, 40, 40]
1059-
result = (
1060-
pipeline
1061-
| beam.Create(data)
1062-
| beam.WindowInto(window.SlidingWindows(size=3, period=1))
1063-
| beam.Reshuffle()
1064-
| beam.Map(lambda u: u.value * 10))
1065-
assert_that(result, equal_to(expected_data))
1066-
10671013

10681014
class WithKeysTest(unittest.TestCase):
10691015
def setUp(self):

sdks/python/apache_beam/typehints/native_type_compatibility.py

Lines changed: 0 additions & 26 deletions
Original file line number | Diff line number | Diff line change
@@ -24,13 +24,9 @@
2424
import sys
2525
import types
2626
import typing
27-
from typing import Generic
28-
from typing import TypeVar
2927

3028
from apache_beam.typehints import typehints
3129

32-
T = TypeVar('T')
33-
3430
_LOGGER = logging.getLogger(__name__)
3531

3632
# Describes an entry in the type map in convert_to_beam_type.
@@ -220,18 +216,6 @@ def convert_collections_to_typing(typ):
220216
return typ
221217

222218

223-
# During type inference of WindowedValue, we need to pass in the inner value
224-
# type. This cannot be achieved immediately with WindowedValue class because it
225-
# is not parameterized. Changing it to a generic class (e.g. WindowedValue[T])
226-
# could work in theory. However, the class is cythonized and it seems that
227-
# cython does not handle generic classes well.
228-
# The workaround here is to create a separate class solely for the type
229-
# inference purpose. This class should never be used for creating instances.
230-
class TypedWindowedValue(Generic[T]):
231-
def __init__(self, *args, **kwargs):
232-
raise NotImplementedError("This class is solely for type inference")
233-
234-
235219
def convert_to_beam_type(typ):
236220
"""Convert a given typing type to a Beam type.
237221
@@ -283,12 +267,6 @@ def convert_to_beam_type(typ):
283267
# TODO(https://github.com/apache/beam/issues/20076): Currently unhandled.
284268
_LOGGER.info('Converting NewType type hint to Any: "%s"', typ)
285269
return typehints.Any
286-
elif typ_module == 'apache_beam.typehints.native_type_compatibility' and \
287-
getattr(typ, "__name__", typ.__origin__.__name__) == 'TypedWindowedValue':
288-
# Need to pass through WindowedValue class so that it can be converted
289-
# to the correct type constraint in Beam
290-
# This is needed to fix https://github.com/apache/beam/issues/33356
291-
pass
292270
elif (typ_module != 'typing') and (typ_module != 'collections.abc'):
293271
# Only translate types from the typing and collections.abc modules.
294272
return typ
@@ -346,10 +324,6 @@ def convert_to_beam_type(typ):
346324
match=_match_is_exactly_collection,
347325
arity=1,
348326
beam_type=typehints.Collection),
349-
_TypeMapEntry(
350-
match=_match_issubclass(TypedWindowedValue),
351-
arity=1,
352-
beam_type=typehints.WindowedValue),
353327
]
354328

355329
# Find the first matching entry.

sdks/python/apache_beam/typehints/typehints.py

Lines changed: 0 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -1213,15 +1213,6 @@ def type_check(self, instance):
12131213
repr(self.inner_type),
12141214
instance.value.__class__.__name__))
12151215

1216-
def bind_type_variables(self, bindings):
1217-
bound_inner_type = bind_type_variables(self.inner_type, bindings)
1218-
if bound_inner_type == self.inner_type:
1219-
return self
1220-
return WindowedValue[bound_inner_type]
1221-
1222-
def __repr__(self):
1223-
return 'WindowedValue[%s]' % repr(self.inner_type)
1224-
12251216

12261217
class GeneratorHint(IteratorHint):
12271218
"""A Generator type hint.

0 commit comments

Comments
 (0)