Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions sdks/python/apache_beam/internal/cloudpickle/cloudpickle.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,17 @@ class CloudPickleConfig:

DEFAULT_CONFIG = CloudPickleConfig()


# Minimal helper to return the provided object during unpickling.
def _return_obj(obj):
return obj


# Optional import for Python 3.12 TypeAliasType
try: # pragma: no cover - dependent on Python version
from typing import TypeAliasType # type: ignore[attr-defined]
except Exception:
TypeAliasType = None # type: ignore[assignment]
builtin_code_type = None
if PYPY:
# builtin-code objects only exist in pypy
Expand Down Expand Up @@ -1535,6 +1546,14 @@ def reducer_override(self, obj):
return _class_reduce(obj, self.config)
elif isinstance(obj, typing.TypeVar): # Add this check
return _typevar_reduce(obj, self.config)
elif TypeAliasType is not None and isinstance(obj, TypeAliasType):
# Unwrap typing.TypeAliasType to its underlying value to make pickling
# robust for locally-defined `type` aliases (Python 3.12+).
underlying = getattr(obj, '__value__', None)
if underlying is not None:
return (_return_obj, (underlying, ))
# Fallback to default behavior if no underlying value.
return NotImplemented
elif isinstance(obj, types.CodeType):
return _code_reduce(obj, self.config)
elif isinstance(obj, types.FunctionType):
Expand Down
32 changes: 32 additions & 0 deletions sdks/python/apache_beam/transforms/ptransform_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import pickle
import random
import re
import sys
import typing
import unittest
from functools import reduce
Expand Down Expand Up @@ -2910,6 +2911,37 @@ def test_threshold(self):
use_subprocess=self.use_subprocess))


class PTransformTypeAliasTest(unittest.TestCase):
@unittest.skipIf(sys.version_info < (3, 12), "Python 3.12 required")
def test_type_alias_statement_supported_in_with_output_types(self):
ns = {}
exec("type InputType = tuple[int, ...]", ns) # pylint: disable=exec-used
InputType = ns["InputType"]

def print_element(element: InputType) -> InputType:
return element

with beam.Pipeline() as p:
_ = (
p
| beam.Create([(1, 2)])
| beam.Map(lambda x: x)
| beam.Map(print_element))

@unittest.skipIf(sys.version_info < (3, 12), "Python 3.12 required")
def test_type_alias_supported_in_ptransform_with_output_types(self):
ns = {}
exec("type OutputType = tuple[int, int]", ns) # pylint: disable=exec-used
OutputType = ns["OutputType"]

with beam.Pipeline() as p:
_ = (
p
| beam.Create([(1, 2)])
| beam.Map(lambda x: x)
| beam.Map(lambda x: x).with_output_types(OutputType))


class TestPTransformFn(TypeHintTestCase):
def test_type_checking_fail(self):
@beam.ptransform_fn
Expand Down
14 changes: 14 additions & 0 deletions sdks/python/apache_beam/typehints/native_type_compatibility.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@
except ImportError:
from typing_extensions import is_typeddict

# Python 3.12 adds TypeAliasType for `type` statements; keep optional import.
try:
from typing import TypeAliasType # type: ignore[attr-defined]
except Exception: # pragma: no cover - pre-3.12
TypeAliasType = None # type: ignore[assignment]

T = TypeVar('T')

_LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -332,6 +338,14 @@ def convert_to_beam_type(typ):
sys.version_info.minor >= 10) and (isinstance(typ, types.UnionType)):
typ = typing.Union[typ]

# Unwrap Python 3.12 `type` aliases (TypeAliasType) to their underlying value.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like one or two unit test cases for this just to cover us. Nothing fancy, just alias a simple built-in type and make sure that the type gets converted as expected

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. Added one unit test test_type_alias_type_unwrapped in sdks/python/apache_beam/typehints/native_type_compatibility_test.py

# This ensures Beam sees the actual aliased type (e.g., tuple[int, ...]).
if sys.version_info >= (3, 12) and TypeAliasType is not None:
if isinstance(typ, TypeAliasType): # pylint: disable=isinstance-second-argument-not-valid-type
underlying = getattr(typ, '__value__', None)
if underlying is not None:
typ = underlying

if getattr(typ, '__module__', None) == 'typing':
typ = convert_typing_to_builtin(typ)

Expand Down
Loading