diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 87d63d423156..26795b8a9833 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -59,17 +59,12 @@ import unicodedata import uuid from collections import defaultdict +from collections.abc import Iterable +from collections.abc import Mapping +from collections.abc import Sequence from typing import TYPE_CHECKING from typing import Any -from typing import Dict -from typing import FrozenSet -from typing import Iterable -from typing import List -from typing import Mapping from typing import Optional -from typing import Sequence -from typing import Set -from typing import Tuple from typing import Type from typing import Union @@ -131,8 +126,7 @@ class Pipeline(HasDisplayData): (e.g. ``input | "label" >> my_transform``). """ @classmethod - def runner_implemented_transforms(cls): - # type: () -> FrozenSet[str] + def runner_implemented_transforms(cls) -> frozenset[str]: # This set should only contain transforms which are required to be # implemented by a runner. @@ -145,8 +139,8 @@ def __init__( self, runner: Optional[Union[str, PipelineRunner]] = None, options: Optional[PipelineOptions] = None, - argv: Optional[List[str]] = None, - display_data: Optional[Dict[str, Any]] = None): + argv: Optional[list[str]] = None, + display_data: Optional[dict[str, Any]] = None): """Initialize a pipeline object. Args: @@ -158,11 +152,11 @@ def __init__( A configured :class:`~apache_beam.options.pipeline_options.PipelineOptions` object containing arguments that should be used for running the Beam job. - argv (List[str]): a list of arguments (such as :data:`sys.argv`) + argv (list[str]): a list of arguments (such as :data:`sys.argv`) to be used for building a :class:`~apache_beam.options.pipeline_options.PipelineOptions` object. This will only be used if argument **options** is :data:`None`. - display_data (Dict[str, Any]): a dictionary of static data associated + display_data (dict[str, Any]): a dictionary of static data associated with this pipeline that can be displayed when it runs. Raises: @@ -256,7 +250,7 @@ def __init__( # Set of transform labels (full labels) applied to the pipeline. # If a transform is applied and the full label is already in the set # then the transform will have to be cloned with a new label. - self.applied_labels = set() # type: Set[str] + self.applied_labels: set[str] = set() # Hints supplied via pipeline options are considered the outermost hints. self._root_transform().resource_hints = resource_hints_from_options(options) # Create a ComponentIdMap for assigning IDs to components. Ensures that any @@ -272,26 +266,21 @@ def __init__( self._error_handlers = [] self._annotations_stack = [{}] - def display_data(self): - # type: () -> Dict[str, Any] + def display_data(self) -> dict[str, Any]: return self._display_data @property # type: ignore[misc] # decorated property not supported - def options(self): - # type: () -> PipelineOptions + def options(self) -> PipelineOptions: return self._options @property - def allow_unsafe_triggers(self): - # type: () -> bool + def allow_unsafe_triggers(self) -> bool: return self._options.view_as(TypeOptions).allow_unsafe_triggers def _register_error_handler(self, error_handler): self._error_handlers.append(error_handler) - def _current_transform(self): - # type: () -> AppliedPTransform - + def _current_transform(self) -> 'AppliedPTransform': """Returns the transform currently on the top of the stack.""" return self.transforms_stack[-1] @@ -313,40 +302,38 @@ def _current_annotations(self): """Returns the set of annotations that should be used on apply.""" return {**_global_annotations_stack()[-1], **self._annotations_stack[-1]} - def _root_transform(self): - # type: () -> AppliedPTransform - + def _root_transform(self) -> 'AppliedPTransform': """Returns the root transform of the transform stack.""" return self.transforms_stack[0] - def _remove_labels_recursively(self, applied_transform): - # type: (AppliedPTransform) -> None + def _remove_labels_recursively( + self, applied_transform: 'AppliedPTransform') -> None: for part in applied_transform.parts: if part.full_label in self.applied_labels: self.applied_labels.remove(part.full_label) self._remove_labels_recursively(part) - def _replace(self, override): - # type: (PTransformOverride) -> None + def _replace(self, override: 'PTransformOverride') -> None: assert isinstance(override, PTransformOverride) # From original transform output --> replacement transform output - output_map = {} # type: Dict[pvalue.PValue, pvalue.PValue] - output_replacements = { - } # type: Dict[AppliedPTransform, List[Tuple[pvalue.PValue, Optional[str]]]] - input_replacements = { - } # type: Dict[AppliedPTransform, Mapping[str, Union[pvalue.PBegin, pvalue.PCollection]]] - side_input_replacements = { - } # type: Dict[AppliedPTransform, List[pvalue.AsSideInput]] + output_map: dict[pvalue.PValue, pvalue.PValue] = {} + output_replacements: dict[AppliedPTransform, + list[tuple[pvalue.PValue, Optional[str]]]] = {} + input_replacements: dict[AppliedPTransform, + Mapping[str, + Union[pvalue.PBegin, + pvalue.PCollection]]] = {} + side_input_replacements: dict[AppliedPTransform, + list[pvalue.AsSideInput]] = {} class TransformUpdater(PipelineVisitor): # pylint: disable=used-before-assignment """"A visitor that replaces the matching PTransforms.""" - def __init__(self, pipeline): - # type: (Pipeline) -> None + def __init__(self, pipeline: Pipeline) -> None: self.pipeline = pipeline - def _replace_if_needed(self, original_transform_node): - # type: (AppliedPTransform) -> None + def _replace_if_needed( + self, original_transform_node: AppliedPTransform) -> None: if override.matches(original_transform_node): assert isinstance(original_transform_node, AppliedPTransform) replacement_transform = ( @@ -449,12 +436,11 @@ def _replace_if_needed(self, original_transform_node): finally: self.pipeline.transforms_stack.pop() - def enter_composite_transform(self, transform_node): - # type: (AppliedPTransform) -> None + def enter_composite_transform( + self, transform_node: AppliedPTransform) -> None: self._replace_if_needed(transform_node) - def visit_transform(self, transform_node): - # type: (AppliedPTransform) -> None + def visit_transform(self, transform_node: AppliedPTransform) -> None: self._replace_if_needed(transform_node) self.visit(TransformUpdater(self)) @@ -475,16 +461,14 @@ class InputOutputUpdater(PipelineVisitor): # pylint: disable=used-before-assign We cannot update input and output values while visiting since that results in validation errors. """ - def __init__(self, pipeline): - # type: (Pipeline) -> None + def __init__(self, pipeline: Pipeline) -> None: self.pipeline = pipeline - def enter_composite_transform(self, transform_node): - # type: (AppliedPTransform) -> None + def enter_composite_transform( + self, transform_node: AppliedPTransform) -> None: self.visit_transform(transform_node) - def visit_transform(self, transform_node): - # type: (AppliedPTransform) -> None + def visit_transform(self, transform_node: AppliedPTransform) -> None: replace_output = False for tag in transform_node.outputs: if transform_node.outputs[tag] in output_map: @@ -539,11 +523,9 @@ def visit_transform(self, transform_node): for transform, side_input_replacement in side_input_replacements.items(): transform.replace_side_inputs(side_input_replacement) - def _check_replacement(self, override): - # type: (PTransformOverride) -> None + def _check_replacement(self, override: 'PTransformOverride') -> None: class ReplacementValidator(PipelineVisitor): - def visit_transform(self, transform_node): - # type: (AppliedPTransform) -> None + def visit_transform(self, transform_node: AppliedPTransform) -> None: if override.matches(transform_node): raise RuntimeError( 'Transform node %r was not replaced as expected.' % @@ -551,9 +533,7 @@ def visit_transform(self, transform_node): self.visit(ReplacementValidator()) - def replace_all(self, replacements): - # type: (Iterable[PTransformOverride]) -> None - + def replace_all(self, replacements: Iterable['PTransformOverride']) -> None: """ Dynamically replaces PTransforms in the currently populated hierarchy. Currently this only works for replacements where input and output types @@ -563,7 +543,7 @@ def replace_all(self, replacements): output types are different. Args: - replacements (List[~apache_beam.pipeline.PTransformOverride]): a list of + replacements (list[~apache_beam.pipeline.PTransformOverride]): a list of :class:`~apache_beam.pipeline.PTransformOverride` objects. """ for override in replacements: @@ -577,9 +557,7 @@ def replace_all(self, replacements): for override in replacements: self._check_replacement(override) - def run(self, test_runner_api='AUTO'): - # type: (Union[bool, str]) -> PipelineResult - + def run(self, test_runner_api: Union[bool, str] = 'AUTO') -> 'PipelineResult': """Runs the pipeline. Returns whatever our runner returns after running.""" # All pipeline options are finalized at this point. # Call get_all_options to print warnings on invalid options. @@ -648,8 +626,7 @@ def run(self, test_runner_api='AUTO'): shutil.rmtree(self.local_tempdir, ignore_errors=True) # else interactive beam handles the cleanup. - def __enter__(self): - # type: () -> Pipeline + def __enter__(self) -> 'Pipeline': self._extra_context = contextlib.ExitStack() self._extra_context.enter_context( subprocess_server.JavaJarServer.beam_services( @@ -660,11 +637,9 @@ def __enter__(self): def __exit__( self, - exc_type, # type: Optional[Type[BaseException]] - exc_val, # type: Optional[BaseException] - exc_tb # type: Optional[TracebackType] - ): - # type: (...) -> None + exc_type: Optional[Type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional['TracebackType']) -> None: try: if not exc_type: @@ -679,9 +654,7 @@ def __exit__( finally: self._extra_context.__exit__(exc_type, exc_val, exc_tb) - def visit(self, visitor): - # type: (PipelineVisitor) -> None - + def visit(self, visitor: 'PipelineVisitor') -> None: """Visits depth-first every node of a pipeline's DAG. Runner-internal implementation detail; no backwards-compatibility guarantees @@ -699,17 +672,14 @@ def visit(self, visitor): belong to this pipeline instance. """ - visited = set() # type: Set[pvalue.PValue] + visited: set[pvalue.PValue] = set() self._root_transform().visit(visitor, self, visited) def apply( self, - transform, # type: ptransform.PTransform - pvalueish=None, # type: Optional[pvalue.PValue] - label=None # type: Optional[str] - ): - # type: (...) -> pvalue.PValue - + transform: ptransform.PTransform, + pvalueish: Optional[pvalue.PValue] = None, + label: Optional[str] = None) -> pvalue.PValue: """Applies a custom transform using the pvalueish specified. Args: @@ -873,9 +843,8 @@ def apply( def _assert_not_applying_PDone( self, - pvalueish, # type: Optional[pvalue.PValue] - transform # type: ptransform.PTransform - ): + pvalueish: Optional[pvalue.PValue], + transform: ptransform.PTransform): if isinstance(pvalueish, pvalue.PDone) and isinstance(transform, ParDo): # If the input is a PDone, we cannot apply a ParDo transform. full_label = self._current_transform().full_label @@ -885,12 +854,7 @@ def _assert_not_applying_PDone( f'"{producer_label}" but "{producer_label.split("/")[-1]}" ' 'produces no PCollections.') - def _generate_unique_label( - self, - transform # type: str - ): - # type: (...) -> str - + def _generate_unique_label(self, transform: str) -> str: """ Given a transform, generate a unique label for it based on current label. """ @@ -899,11 +863,9 @@ def _generate_unique_label( def _infer_result_type( self, - transform, # type: ptransform.PTransform - inputs, # type: Sequence[Union[pvalue.PBegin, pvalue.PCollection]] - result_pcollection # type: Union[pvalue.PValue, pvalue.DoOutputsTuple] - ): - # type: (...) -> None + transform: ptransform.PTransform, + inputs: Sequence[Union[pvalue.PBegin, pvalue.PCollection]], + result_pcollection: Union[pvalue.PValue, pvalue.DoOutputsTuple]) -> None: # TODO(robertwb): Multi-input inference. type_options = self._options.view_as(TypeOptions) if type_options is None or not type_options.pipeline_type_check: @@ -943,16 +905,14 @@ def _infer_result_type( if pcoll.element_type is None: pcoll.element_type = typehints.Any - def __reduce__(self): - # type: () -> Tuple[Type, Tuple[str, ...]] + def __reduce__(self) -> tuple[Type, tuple[str, ...]]: # Some transforms contain a reference to their enclosing pipeline, # which in turn reference all other transforms (resulting in quadratic # time/space to pickle each transform individually). As we don't # require pickled pipelines to be executable, break the chain here. return str, ('Pickled pipeline stub.', ) - def _verify_runner_api_compatible(self): - # type: () -> bool + def _verify_runner_api_compatible(self) -> bool: if self._options.view_as(TypeOptions).runtime_type_check: # This option is incompatible with the runner API as it requires # the runner to inspect non-serialized hints on the transform @@ -962,12 +922,11 @@ def _verify_runner_api_compatible(self): class Visitor(PipelineVisitor): # pylint: disable=used-before-assignment ok = True # Really a nonlocal. - def enter_composite_transform(self, transform_node): - # type: (AppliedPTransform) -> None + def enter_composite_transform( + self, transform_node: AppliedPTransform) -> None: pass - def visit_transform(self, transform_node): - # type: (AppliedPTransform) -> None + def visit_transform(self, transform_node: AppliedPTransform) -> None: try: # Transforms must be picklable. pickler.loads( @@ -976,8 +935,7 @@ def visit_transform(self, transform_node): except Exception: Visitor.ok = False - def visit_value(self, value, _): - # type: (pvalue.PValue, AppliedPTransform) -> None + def visit_value(self, value: pvalue.PValue, _: AppliedPTransform) -> None: if isinstance(value, pvalue.PDone): Visitor.ok = False @@ -986,13 +944,11 @@ def visit_value(self, value, _): def to_runner_api( self, - return_context=False, # type: bool - context=None, # type: Optional[PipelineContext] - use_fake_coders=False, # type: bool - default_environment=None # type: Optional[environments.Environment] - ): - # type: (...) -> beam_runner_api_pb2.Pipeline - + return_context: bool = False, + context: Optional['PipelineContext'] = None, + use_fake_coders: bool = False, + default_environment: Optional['environments.Environment'] = None + ) -> beam_runner_api_pb2.Pipeline: """For internal use only; no backwards-compatibility guarantees.""" from apache_beam.runners import pipeline_context if context is None: @@ -1020,12 +976,11 @@ def to_runner_api( TypeOptions).allow_non_deterministic_key_coders class ForceKvInputTypes(PipelineVisitor): - def enter_composite_transform(self, transform_node): - # type: (AppliedPTransform) -> None + def enter_composite_transform( + self, transform_node: AppliedPTransform) -> None: self.visit_transform(transform_node) - def visit_transform(self, transform_node): - # type: (AppliedPTransform) -> None + def visit_transform(self, transform_node: AppliedPTransform) -> None: if not transform_node.transform: return if hasattr( @@ -1087,13 +1042,11 @@ def merge_compatible_environments(proto): @staticmethod def from_runner_api( - proto, # type: beam_runner_api_pb2.Pipeline - runner, # type: PipelineRunner - options, # type: PipelineOptions - return_context=False, # type: bool - ): - # type: (...) -> Pipeline - + proto: beam_runner_api_pb2.Pipeline, + runner: PipelineRunner, + options: PipelineOptions, + return_context: bool = False, + ) -> 'Pipeline': """For internal use only; no backwards-compatibility guarantees.""" p = Pipeline( runner=runner, @@ -1142,9 +1095,8 @@ class PipelineVisitor(object): Visitor pattern class used to traverse a DAG of transforms (used internally by Pipeline for bookkeeping purposes). """ - def visit_value(self, value, producer_node): - # type: (pvalue.PValue, AppliedPTransform) -> None - + def visit_value( + self, value: pvalue.PValue, producer_node: 'AppliedPTransform') -> None: """Callback for visiting a PValue in the pipeline DAG. Args: @@ -1154,21 +1106,17 @@ def visit_value(self, value, producer_node): """ pass - def visit_transform(self, transform_node): - # type: (AppliedPTransform) -> None - + def visit_transform(self, transform_node: 'AppliedPTransform') -> None: """Callback for visiting a transform leaf node in the pipeline DAG.""" pass - def enter_composite_transform(self, transform_node): - # type: (AppliedPTransform) -> None - + def enter_composite_transform( + self, transform_node: 'AppliedPTransform') -> None: """Callback for entering traversal of a composite transform node.""" pass - def leave_composite_transform(self, transform_node): - # type: (AppliedPTransform) -> None - + def leave_composite_transform( + self, transform_node: 'AppliedPTransform') -> None: """Callback for leaving traversal of a composite transform node.""" pass @@ -1193,12 +1141,11 @@ def _perform_exernal_transform_test(self, transform): if isinstance(transform, ExternalTransform): self._contains_external_transforms = True - def visit_transform(self, transform_node): - # type: (AppliedPTransform) -> None + def visit_transform(self, transform_node: 'AppliedPTransform') -> None: self._perform_exernal_transform_test(transform_node.transform) - def enter_composite_transform(self, transform_node): - # type: (AppliedPTransform) -> None + def enter_composite_transform( + self, transform_node: 'AppliedPTransform') -> None: # Python SDK object graph may represent an external transform that is a leaf # of the pipeline graph as a composite without sub-transforms. # Note that this visitor is just used to identify pipelines with external @@ -1215,14 +1162,14 @@ class AppliedPTransform(object): """ def __init__( self, - parent, # type: Optional[AppliedPTransform] - transform, # type: Optional[ptransform.PTransform] - full_label, # type: str - main_inputs, # type: Optional[Mapping[str, Union[pvalue.PBegin, pvalue.PCollection]]] - environment_id, # type: Optional[str] - annotations, # type: Optional[Dict[str, bytes]] - ): - # type: (...) -> None + parent: Optional['AppliedPTransform'], + transform: Optional[ptransform.PTransform], + full_label: str, + main_inputs: Optional[Mapping[str, + Union[pvalue.PBegin, pvalue.PCollection]]], + environment_id: Optional[str], + annotations: Optional[dict[str, bytes]], + ) -> None: self.parent = parent self.transform = transform # Note that we want the PipelineVisitor classes to use the full_label, @@ -1236,15 +1183,16 @@ def __init__( self.side_inputs = ( tuple() if transform is None else getattr( transform, 'side_inputs', tuple())) - self.outputs = {} # type: Dict[Union[str, int, None], pvalue.PValue] - self.parts = [] # type: List[AppliedPTransform] - self.environment_id = environment_id if environment_id else None # type: Optional[str] + self.outputs: dict[Union[str, int, None], pvalue.PValue] = {} + self.parts: list[AppliedPTransform] = [] + self.environment_id: Optional[ + str] = environment_id if environment_id else None # We may need to merge the hints with environment-provided hints here # once environment is a first-class citizen in Beam graph and we have # access to actual environment, not just an id. - self.resource_hints = dict( + self.resource_hints: dict[str, bytes] = dict( transform.get_resource_hints()) if transform and hasattr( - transform, 'get_resource_hints') else {} # type: Dict[str, bytes] + transform, 'get_resource_hints') else {} if transform and hasattr(transform, 'annotations'): annotations = { @@ -1259,18 +1207,14 @@ def __init__( def inputs(self): return tuple(self.main_inputs.values()) - def __repr__(self): - # type: () -> str + def __repr__(self) -> str: return "%s(%s, %s)" % ( self.__class__.__name__, self.full_label, type(self.transform).__name__) def replace_output( self, - output, # type: Union[pvalue.PValue, pvalue.DoOutputsTuple] - tag=None # type: Union[str, int, None] - ): - # type: (...) -> None - + output: Union[pvalue.PValue, pvalue.DoOutputsTuple], + tag: Union[str, int, None] = None) -> None: """Replaces the output defined by the given tag with the given output. Args: @@ -1310,10 +1254,8 @@ def replace_side_inputs(self, side_inputs): def add_output( self, - output, # type: Union[pvalue.DoOutputsTuple, pvalue.PValue] - tag # type: Union[str, int, None] - ): - # type: (...) -> None + output: Union[pvalue.DoOutputsTuple, pvalue.PValue], + tag: Union[str, int, None]) -> None: if isinstance(output, pvalue.DoOutputsTuple): self.add_output(output[tag], tag) elif isinstance(output, pvalue.PValue): @@ -1322,15 +1264,12 @@ def add_output( else: raise TypeError("Unexpected output type: %s" % output) - def add_part(self, part): - # type: (AppliedPTransform) -> None + def add_part(self, part: 'AppliedPTransform') -> None: assert isinstance(part, AppliedPTransform) part._merge_outer_resource_hints() self.parts.append(part) - def is_composite(self): - # type: () -> bool - + def is_composite(self) -> bool: """Returns whether this is a composite transform. A composite transform has parts (inner transforms) or isn't the @@ -1342,12 +1281,9 @@ def is_composite(self): def visit( self, - visitor, # type: PipelineVisitor - pipeline, # type: Pipeline - visited # type: Set[pvalue.PValue] - ): - # type: (...) -> None - + visitor: PipelineVisitor, + pipeline: Pipeline, + visited: set[pvalue.PValue]) -> None: """Visits all nodes reachable from the current node.""" for in_pval in self.inputs: @@ -1396,8 +1332,7 @@ def visit( visited.add(v) visitor.visit_value(v, self) - def named_inputs(self): - # type: () -> Dict[str, pvalue.PValue] + def named_inputs(self) -> dict[str, pvalue.PValue]: if self.transform is None: assert not self.main_inputs and not self.side_inputs return {} @@ -1414,8 +1349,7 @@ def named_inputs(self): named_inputs[f'__implicit_input_{name}'] = pc_out return named_inputs - def named_outputs(self): - # type: () -> Dict[str, pvalue.PCollection] + def named_outputs(self) -> dict[str, pvalue.PCollection]: if self.transform is None: assert not self.outputs return {} @@ -1425,8 +1359,8 @@ def named_outputs(self): else: return {} - def to_runner_api(self, context): - # type: (PipelineContext) -> beam_runner_api_pb2.PTransform + def to_runner_api( + self, context: 'PipelineContext') -> beam_runner_api_pb2.PTransform: # External transforms require more splicing than just setting the spec. from apache_beam.transforms import external if isinstance(self.transform, external.ExternalTransform): @@ -1436,10 +1370,8 @@ def to_runner_api(self, context): return self.transform.to_runner_api_transform(context, self.full_label) def transform_to_runner_api( - transform, # type: Optional[ptransform.PTransform] - context # type: PipelineContext - ): - # type: (...) -> Optional[beam_runner_api_pb2.FunctionSpec] + transform: Optional[ptransform.PTransform], context: 'PipelineContext' + ) -> Optional[beam_runner_api_pb2.FunctionSpec]: if transform is None: return None else: @@ -1494,10 +1426,8 @@ def transform_to_runner_api( @staticmethod def from_runner_api( - proto, # type: beam_runner_api_pb2.PTransform - context # type: PipelineContext - ): - # type: (...) -> AppliedPTransform + proto: beam_runner_api_pb2.PTransform, + context: 'PipelineContext') -> 'AppliedPTransform': if common_urns.primitives.PAR_DO.urn == proto.spec.urn: # Preserving side input tags. @@ -1574,7 +1504,7 @@ def _merge_outer_resource_hints(self): part._merge_outer_resource_hints() -def encode_annotations(annotations: Optional[Dict[str, Any]]): +def encode_annotations(annotations: Optional[dict[str, Any]]): """Encodes non-byte annotation values as bytes.""" if not annotations: return {} @@ -1627,9 +1557,7 @@ class PTransformOverride(metaclass=abc.ABCMeta): different. """ @abc.abstractmethod - def matches(self, applied_ptransform): - # type: (AppliedPTransform) -> bool - + def matches(self, applied_ptransform: AppliedPTransform) -> bool: """Determines whether the given AppliedPTransform matches. Note that the matching will happen *after* Runner API proto translation. @@ -1648,9 +1576,7 @@ def matches(self, applied_ptransform): raise NotImplementedError def get_replacement_transform_for_applied_ptransform( - self, applied_ptransform): - # type: (AppliedPTransform) -> ptransform.PTransform - + self, applied_ptransform: AppliedPTransform) -> ptransform.PTransform: """Provides a runner specific override for a given `AppliedPTransform`. Args: @@ -1666,9 +1592,9 @@ def get_replacement_transform_for_applied_ptransform( @deprecated( since='2.24', current='get_replacement_transform_for_applied_ptransform') - def get_replacement_transform(self, ptransform): - # type: (Optional[ptransform.PTransform]) -> ptransform.PTransform - + def get_replacement_transform( + self, + ptransform: Optional[ptransform.PTransform]) -> ptransform.PTransform: """Provides a runner specific override for a given PTransform. Args: @@ -1681,9 +1607,8 @@ def get_replacement_transform(self, ptransform): # Returns a PTransformReplacement raise NotImplementedError - def get_replacement_inputs(self, applied_ptransform): - # type: (AppliedPTransform) -> Iterable[pvalue.PValue] - + def get_replacement_inputs( + self, applied_ptransform: AppliedPTransform) -> Iterable[pvalue.PValue]: """Provides inputs that will be passed to the replacement PTransform. Args: @@ -1706,8 +1631,8 @@ class ComponentIdMap(object): """ def __init__(self, namespace="ref"): self.namespace = namespace - self._counters = defaultdict(lambda: 0) # type: Dict[type, int] - self._obj_to_id = {} # type: Dict[Any, str] + self._counters: dict[type, int] = defaultdict(lambda: 0) + self._obj_to_id: dict[Any, str] = {} def get_or_assign(self, obj=None, obj_type=None, label=None): if obj not in self._obj_to_id: