Skip to content

Commit ff1d6bb

Browse files
authored
feat(typehints): add Python 3.12 TypeAliasType support (#36709)
* feat(typehints): add Python 3.12 TypeAliasType support Handle Python 3.12's new type alias statements by unwrapping TypeAliasType to its underlying value in type conversion and pickling. This ensures compatibility with Beam's type checking and serialization for Python 3.12+. * ingore exec * use dispatcher * lint * added one unit test * lint
1 parent c2e72ac commit ff1d6bb

File tree

5 files changed

+90
-1
lines changed

5 files changed

+90
-1
lines changed

sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,6 @@ class CloudPickleConfig:
168168

169169

170170
DEFAULT_CONFIG = CloudPickleConfig()
171-
172171
builtin_code_type = None
173172
if PYPY:
174173
# builtin-code objects only exist in pypy

sdks/python/apache_beam/internal/cloudpickle_pickler.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,27 @@ def _get_proto_enum_descriptor_class():
9595
_LOGGER = logging.getLogger(__name__)
9696

9797

98+
# Helper to return an object directly during unpickling.
99+
def _return_obj(obj):
100+
return obj
101+
102+
103+
# Optional import for Python 3.12 TypeAliasType
104+
try: # pragma: no cover - dependent on Python version
105+
from typing import TypeAliasType as _TypeAliasType # type: ignore[attr-defined]
106+
except Exception:
107+
_TypeAliasType = None
108+
109+
110+
def _typealias_reduce(obj):
111+
# Unwrap typing.TypeAliasType to its underlying value for robust pickling.
112+
underlying = getattr(obj, '__value__', None)
113+
if underlying is None:
114+
# Fallback: return the object itself; lets default behavior handle it.
115+
return _return_obj, (obj, )
116+
return _return_obj, (underlying, )
117+
118+
98119
def _reconstruct_enum_descriptor(full_name):
99120
for _, module in list(sys.modules.items()):
100121
if not hasattr(module, 'DESCRIPTOR'):
@@ -171,6 +192,9 @@ def _dumps(
171192
pickler.dispatch_table[type(flags.FLAGS)] = _pickle_absl_flags
172193
except NameError:
173194
pass
195+
# Register Python 3.12 `type` alias reducer to unwrap to underlying value.
196+
if _TypeAliasType is not None:
197+
pickler.dispatch_table[_TypeAliasType] = _typealias_reduce
174198
try:
175199
pickler.dispatch_table[RLOCK_TYPE] = _pickle_rlock
176200
except NameError:

sdks/python/apache_beam/transforms/ptransform_test.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import pickle
2626
import random
2727
import re
28+
import sys
2829
import typing
2930
import unittest
3031
from functools import reduce
@@ -2910,6 +2911,37 @@ def test_threshold(self):
29102911
use_subprocess=self.use_subprocess))
29112912

29122913

2914+
class PTransformTypeAliasTest(unittest.TestCase):
2915+
@unittest.skipIf(sys.version_info < (3, 12), "Python 3.12 required")
2916+
def test_type_alias_statement_supported_in_with_output_types(self):
2917+
ns = {}
2918+
exec("type InputType = tuple[int, ...]", ns) # pylint: disable=exec-used
2919+
InputType = ns["InputType"]
2920+
2921+
def print_element(element: InputType) -> InputType:
2922+
return element
2923+
2924+
with beam.Pipeline() as p:
2925+
_ = (
2926+
p
2927+
| beam.Create([(1, 2)])
2928+
| beam.Map(lambda x: x)
2929+
| beam.Map(print_element))
2930+
2931+
@unittest.skipIf(sys.version_info < (3, 12), "Python 3.12 required")
2932+
def test_type_alias_supported_in_ptransform_with_output_types(self):
2933+
ns = {}
2934+
exec("type OutputType = tuple[int, int]", ns) # pylint: disable=exec-used
2935+
OutputType = ns["OutputType"]
2936+
2937+
with beam.Pipeline() as p:
2938+
_ = (
2939+
p
2940+
| beam.Create([(1, 2)])
2941+
| beam.Map(lambda x: x)
2942+
| beam.Map(lambda x: x).with_output_types(OutputType))
2943+
2944+
29132945
class TestPTransformFn(TypeHintTestCase):
29142946
def test_type_checking_fail(self):
29152947
@beam.ptransform_fn

sdks/python/apache_beam/typehints/native_type_compatibility.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,14 @@
3535
except ImportError:
3636
from typing_extensions import is_typeddict
3737

38+
# Python 3.12 adds TypeAliasType for `type` statements; keep optional import.
39+
# pylint: disable=ungrouped-imports
40+
# isort: off
41+
try:
42+
from typing import TypeAliasType # type: ignore[attr-defined]
43+
except Exception: # pragma: no cover - pre-3.12
44+
TypeAliasType = None # type: ignore[assignment]
45+
3846
T = TypeVar('T')
3947

4048
_LOGGER = logging.getLogger(__name__)
@@ -332,6 +340,14 @@ def convert_to_beam_type(typ):
332340
sys.version_info.minor >= 10) and (isinstance(typ, types.UnionType)):
333341
typ = typing.Union[typ]
334342

343+
# Unwrap Python 3.12 `type` aliases (TypeAliasType) to their underlying value.
344+
# This ensures Beam sees the actual aliased type (e.g., tuple[int, ...]).
345+
if sys.version_info >= (3, 12) and TypeAliasType is not None:
346+
if isinstance(typ, TypeAliasType): # pylint: disable=isinstance-second-argument-not-valid-type
347+
underlying = getattr(typ, '__value__', None)
348+
if underlying is not None:
349+
typ = underlying
350+
335351
if getattr(typ, '__module__', None) == 'typing':
336352
typ = convert_typing_to_builtin(typ)
337353

sdks/python/apache_beam/typehints/native_type_compatibility_test.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -491,6 +491,24 @@ def test_convert_typing_to_builtin(self):
491491
builtin_type = convert_typing_to_builtin(typing_type)
492492
self.assertEqual(builtin_type, expected_builtin_type, description)
493493

494+
def test_type_alias_type_unwrapped(self):
495+
# Only applicable on Python 3.12+, where typing.TypeAliasType exists
496+
# and the `type` statement is available.
497+
TypeAliasType = getattr(typing, 'TypeAliasType', None)
498+
if TypeAliasType is None:
499+
self.skipTest('TypeAliasType not available')
500+
501+
ns = {}
502+
try:
503+
exec('type AliasTuple = tuple[int, ...]', {}, ns) # pylint: disable=exec-used
504+
except SyntaxError:
505+
self.skipTest('type statement not supported')
506+
507+
AliasTuple = ns['AliasTuple']
508+
self.assertTrue(isinstance(AliasTuple, TypeAliasType)) # pylint: disable=isinstance-second-argument-not-valid-type
509+
self.assertEqual(
510+
typehints.Tuple[int, ...], convert_to_beam_type(AliasTuple))
511+
494512

495513
if __name__ == '__main__':
496514
unittest.main()

0 commit comments

Comments
 (0)