[Feature Request]: Support generic PTransforms in Python SDK #33802

@mattalbr

Description

What would you like to happen?

It would be awesome for the following code to work (written in 3.12 syntax, but it would be great in <=3.11 syntax too):

import dataclasses
from typing import Any, Callable, TypeVar

import apache_beam as beam

# Only needed for the <=3.11 Generic[T] spelling; the 3.12 syntax below declares T inline.
T = TypeVar("T")

@dataclasses.dataclass
class NeatOutputClass[T]:
  data: T

class ReusableTransform[T](beam.PTransform):
  def __init__(self, value_extractor: Callable[[Any], T]):
    # This is a bit contrived, but not unreasonable for a transform acting
    # on slightly different data. Could be done as a generic DoFn too.
    self.value_extractor = value_extractor

  def expand(self, pcoll: beam.PCollection) -> NeatOutputClass[T]:
    # Overly simplified for simple reproducibility.
    return pcoll | beam.Map(lambda x: self.value_extractor(x))

Unfortunately, this raises TypeError: Subscripted generics cannot be used with class and instance checks

Full Stack Trace
/home/vscode/.cache/pants/named_caches/pex_root/venvs/s/192fb705/venv/lib/python3.11/site-packages/apache_beam/transforms/ptransform.py:622: in __ror__
    result = p.apply(self, pvalueish, label)
/home/vscode/.cache/pants/named_caches/pex_root/venvs/s/192fb705/venv/lib/python3.11/site-packages/apache_beam/pipeline.py:754: in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
/home/vscode/.cache/pants/named_caches/pex_root/venvs/s/192fb705/venv/lib/python3.11/site-packages/apache_beam/runners/runner.py:191: in apply
    return self.apply_PTransform(transform, input, options)
/home/vscode/.cache/pants/named_caches/pex_root/venvs/s/192fb705/venv/lib/python3.11/site-packages/apache_beam/runners/runner.py:195: in apply_PTransform
    return transform.expand(input)
my_repo/transformation.py:263: in expand
    pcolls
/home/vscode/.cache/pants/named_caches/pex_root/venvs/s/192fb705/venv/lib/python3.11/site-packages/apache_beam/transforms/ptransform.py:622: in __ror__
    result = p.apply(self, pvalueish, label)
/home/vscode/.cache/pants/named_caches/pex_root/venvs/s/192fb705/venv/lib/python3.11/site-packages/apache_beam/pipeline.py:757: in apply
    transform.type_check_outputs(pvalueish_result)
/home/vscode/.cache/pants/named_caches/pex_root/venvs/s/192fb705/venv/lib/python3.11/site-packages/apache_beam/transforms/ptransform.py:475: in type_check_outputs
    self.type_check_inputs_or_outputs(pvalueish, 'output')
/home/vscode/.cache/pants/named_caches/pex_root/venvs/s/192fb705/venv/lib/python3.11/site-packages/apache_beam/transforms/ptransform.py:496: in type_check_inputs_or_outputs
    if hint and not typehints.is_consistent_with(pvalue_.element_type, hint):
/home/vscode/.cache/pants/named_caches/pex_root/venvs/s/192fb705/venv/lib/python3.11/site-packages/apache_beam/typehints/typehints.py:1323: in is_consistent_with
    return base._consistent_with_check_(sub)
/home/vscode/.cache/pants/named_caches/pex_root/venvs/s/192fb705/venv/lib/python3.11/site-packages/apache_beam/typehints/typehints.py:711: in _consistent_with_check_
    len(sub.tuple_types) == len(self.tuple_types) and all(
/home/vscode/.cache/pants/named_caches/pex_root/venvs/s/192fb705/venv/lib/python3.11/site-packages/apache_beam/typehints/typehints.py:712: in <genexpr>
    is_consistent_with(sub_elem, elem) for sub_elem,
/home/vscode/.cache/pants/named_caches/pex_root/venvs/s/192fb705/venv/lib/python3.11/site-packages/apache_beam/typehints/typehints.py:1323: in is_consistent_with
    return base._consistent_with_check_(sub)
/home/vscode/.cache/pants/named_caches/pex_root/venvs/s/192fb705/venv/lib/python3.11/site-packages/apache_beam/typehints/typehints.py:271: in _consistent_with_check_
    is_consistent_with(sub.inner_type, self.inner_type))
/home/vscode/.cache/pants/named_caches/pex_root/venvs/s/192fb705/venv/lib/python3.11/site-packages/apache_beam/typehints/typehints.py:1329: in is_consistent_with
    return issubclass(sub, base)
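
The last frame of the trace, issubclass(sub, base), points at the likely cause: a subscripted generic such as NeatOutputClass[T] reaching a plain issubclass call. Below is a minimal sketch of that behavior using only the standard library, written with Generic[T] so it also runs on Python <=3.11. The helper names try_issubclass and origin_issubclass are hypothetical and are not part of Beam. Comparing against typing.get_origin is only one possible direction, an assumption for illustration rather than a description of how Beam's typehints module works.

```python
import dataclasses
import typing
from typing import Generic, TypeVar

T = TypeVar("T")


@dataclasses.dataclass
class NeatOutputClass(Generic[T]):
    data: T


def try_issubclass(sub, base):
    """Hypothetical helper: return issubclass's result, or the TypeError message."""
    try:
        return issubclass(sub, base)
    except TypeError as exc:
        return str(exc)


def origin_issubclass(sub, base):
    """Hypothetical helper: compare unsubscripted origins, ignoring type arguments."""
    sub_origin = typing.get_origin(sub) or sub
    base_origin = typing.get_origin(base) or base
    return issubclass(sub_origin, base_origin)


# A subscripted generic as the second argument raises the error seen in the trace.
print(try_issubclass(NeatOutputClass, NeatOutputClass[int]))
# Stripping the subscript first makes the check well-defined, though less precise.
print(origin_issubclass(NeatOutputClass[int], NeatOutputClass[int]))
```

Note that the origin-based check discards the type argument, so it would treat NeatOutputClass[int] and NeatOutputClass[str] as consistent. Proper support would presumably need to compare the arguments as well.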

Issue Priority

Priority: 2 (default / most feature requests should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
