Skip to content

Commit 8a4551b

Browse files
committed
Use PEP-585 types.
1 parent 8db33ca commit 8a4551b

File tree

1 file changed

+36
-41
lines changed

1 file changed

+36
-41
lines changed

sdks/python/apache_beam/pipeline.py

Lines changed: 36 additions & 41 deletions
Original file line number | Diff line number | Diff line change
@@ -59,17 +59,12 @@
5959
import unicodedata
6060
import uuid
6161
from collections import defaultdict
62+
from collections.abc import Iterable
63+
from collections.abc import Mapping
64+
from collections.abc import Sequence
6265
from typing import TYPE_CHECKING
6366
from typing import Any
64-
from typing import Dict
65-
from typing import FrozenSet
66-
from typing import Iterable
67-
from typing import List
68-
from typing import Mapping
6967
from typing import Optional
70-
from typing import Sequence
71-
from typing import Set
72-
from typing import Tuple
7368
from typing import Type
7469
from typing import Union
7570

@@ -131,7 +126,7 @@ class Pipeline(HasDisplayData):
131126
(e.g. ``input | "label" >> my_transform``).
132127
"""
133128
@classmethod
134-
def runner_implemented_transforms(cls) -> FrozenSet[str]:
129+
def runner_implemented_transforms(cls) -> frozenset[str]:
135130

136131
# This set should only contain transforms which are required to be
137132
# implemented by a runner.
@@ -144,8 +139,8 @@ def __init__(
144139
self,
145140
runner: Optional[Union[str, PipelineRunner]] = None,
146141
options: Optional[PipelineOptions] = None,
147-
argv: Optional[List[str]] = None,
148-
display_data: Optional[Dict[str, Any]] = None):
142+
argv: Optional[list[str]] = None,
143+
display_data: Optional[dict[str, Any]] = None):
149144
"""Initialize a pipeline object.
150145
151146
Args:
@@ -157,11 +152,11 @@ def __init__(
157152
A configured
158153
:class:`~apache_beam.options.pipeline_options.PipelineOptions` object
159154
containing arguments that should be used for running the Beam job.
160-
argv (List[str]): a list of arguments (such as :data:`sys.argv`)
155+
argv (list[str]): a list of arguments (such as :data:`sys.argv`)
161156
to be used for building a
162157
:class:`~apache_beam.options.pipeline_options.PipelineOptions` object.
163158
This will only be used if argument **options** is :data:`None`.
164-
display_data (Dict[str, Any]): a dictionary of static data associated
159+
display_data (dict[str, Any]): a dictionary of static data associated
165160
with this pipeline that can be displayed when it runs.
166161
167162
Raises:
@@ -255,7 +250,7 @@ def __init__(
255250
# Set of transform labels (full labels) applied to the pipeline.
256251
# If a transform is applied and the full label is already in the set
257252
# then the transform will have to be cloned with a new label.
258-
self.applied_labels: Set[str] = set()
253+
self.applied_labels: set[str] = set()
259254
# Hints supplied via pipeline options are considered the outermost hints.
260255
self._root_transform().resource_hints = resource_hints_from_options(options)
261256
# Create a ComponentIdMap for assigning IDs to components. Ensures that any
@@ -271,7 +266,7 @@ def __init__(
271266
self._error_handlers = []
272267
self._annotations_stack = [{}]
273268

274-
def display_data(self) -> Dict[str, Any]:
269+
def display_data(self) -> dict[str, Any]:
275270
return self._display_data
276271

277272
@property # type: ignore[misc] # decorated property not supported
@@ -322,15 +317,15 @@ def _replace(self, override: 'PTransformOverride') -> None:
322317
assert isinstance(override, PTransformOverride)
323318

324319
# From original transform output --> replacement transform output
325-
output_map: Dict[pvalue.PValue, pvalue.PValue] = {}
326-
output_replacements: Dict[AppliedPTransform,
327-
List[Tuple[pvalue.PValue, Optional[str]]]] = {}
328-
input_replacements: Dict[AppliedPTransform,
320+
output_map: dict[pvalue.PValue, pvalue.PValue] = {}
321+
output_replacements: dict[AppliedPTransform,
322+
list[tuple[pvalue.PValue, Optional[str]]]] = {}
323+
input_replacements: dict[AppliedPTransform,
329324
Mapping[str,
330325
Union[pvalue.PBegin,
331326
pvalue.PCollection]]] = {}
332-
side_input_replacements: Dict[AppliedPTransform,
333-
List[pvalue.AsSideInput]] = {}
327+
side_input_replacements: dict[AppliedPTransform,
328+
list[pvalue.AsSideInput]] = {}
334329

335330
class TransformUpdater(PipelineVisitor): # pylint: disable=used-before-assignment
336331
""""A visitor that replaces the matching PTransforms."""
@@ -548,7 +543,7 @@ def replace_all(self, replacements: Iterable['PTransformOverride']) -> None:
548543
output types are different.
549544
550545
Args:
551-
replacements (List[~apache_beam.pipeline.PTransformOverride]): a list of
546+
replacements (list[~apache_beam.pipeline.PTransformOverride]): a list of
552547
:class:`~apache_beam.pipeline.PTransformOverride` objects.
553548
"""
554549
for override in replacements:
@@ -562,7 +557,7 @@ def replace_all(self, replacements: Iterable['PTransformOverride']) -> None:
562557
for override in replacements:
563558
self._check_replacement(override)
564559

565-
def run(self, test_runner_api: Union[bool, str] = 'AUTO') -> PipelineResult:
560+
def run(self, test_runner_api: Union[bool, str] = 'AUTO') -> 'PipelineResult':
566561
"""Runs the pipeline. Returns whatever our runner returns after running."""
567562
# All pipeline options are finalized at this point.
568563
# Call get_all_options to print warnings on invalid options.
@@ -677,7 +672,7 @@ def visit(self, visitor: 'PipelineVisitor') -> None:
677672
belong to this pipeline instance.
678673
"""
679674

680-
visited: Set[pvalue.PValue] = set()
675+
visited: set[pvalue.PValue] = set()
681676
self._root_transform().visit(visitor, self, visited)
682677

683678
def apply(
@@ -910,7 +905,7 @@ def _infer_result_type(
910905
if pcoll.element_type is None:
911906
pcoll.element_type = typehints.Any
912907

913-
def __reduce__(self) -> Tuple[Type, Tuple[str, ...]]:
908+
def __reduce__(self) -> tuple[Type, tuple[str, ...]]:
914909
# Some transforms contain a reference to their enclosing pipeline,
915910
# which in turn reference all other transforms (resulting in quadratic
916911
# time/space to pickle each transform individually). As we don't
@@ -950,9 +945,9 @@ def visit_value(self, value: pvalue.PValue, _: AppliedPTransform) -> None:
950945
def to_runner_api(
951946
self,
952947
return_context: bool = False,
953-
context: Optional[PipelineContext] = None,
948+
context: Optional['PipelineContext'] = None,
954949
use_fake_coders: bool = False,
955-
default_environment: Optional[environments.Environment] = None
950+
default_environment: Optional['environments.Environment'] = None
956951
) -> beam_runner_api_pb2.Pipeline:
957952
"""For internal use only; no backwards-compatibility guarantees."""
958953
from apache_beam.runners import pipeline_context
@@ -1173,7 +1168,7 @@ def __init__(
11731168
main_inputs: Optional[Mapping[str,
11741169
Union[pvalue.PBegin, pvalue.PCollection]]],
11751170
environment_id: Optional[str],
1176-
annotations: Optional[Dict[str, bytes]],
1171+
annotations: Optional[dict[str, bytes]],
11771172
) -> None:
11781173
self.parent = parent
11791174
self.transform = transform
@@ -1188,14 +1183,14 @@ def __init__(
11881183
self.side_inputs = (
11891184
tuple() if transform is None else getattr(
11901185
transform, 'side_inputs', tuple()))
1191-
self.outputs: Dict[Union[str, int, None], pvalue.PValue] = {}
1192-
self.parts: List[AppliedPTransform] = []
1186+
self.outputs: dict[Union[str, int, None], pvalue.PValue] = {}
1187+
self.parts: list[AppliedPTransform] = []
11931188
self.environment_id: Optional[
11941189
str] = environment_id if environment_id else None
11951190
# We may need to merge the hints with environment-provided hints here
11961191
# once environment is a first-class citizen in Beam graph and we have
11971192
# access to actual environment, not just an id.
1198-
self.resource_hints: Dict[str, bytes] = dict(
1193+
self.resource_hints: dict[str, bytes] = dict(
11991194
transform.get_resource_hints()) if transform and hasattr(
12001195
transform, 'get_resource_hints') else {}
12011196

@@ -1288,7 +1283,7 @@ def visit(
12881283
self,
12891284
visitor: PipelineVisitor,
12901285
pipeline: Pipeline,
1291-
visited: Set[pvalue.PValue]) -> None:
1286+
visited: set[pvalue.PValue]) -> None:
12921287
"""Visits all nodes reachable from the current node."""
12931288

12941289
for in_pval in self.inputs:
@@ -1337,7 +1332,7 @@ def visit(
13371332
visited.add(v)
13381333
visitor.visit_value(v, self)
13391334

1340-
def named_inputs(self) -> Dict[str, pvalue.PValue]:
1335+
def named_inputs(self) -> dict[str, pvalue.PValue]:
13411336
if self.transform is None:
13421337
assert not self.main_inputs and not self.side_inputs
13431338
return {}
@@ -1354,7 +1349,7 @@ def named_inputs(self) -> Dict[str, pvalue.PValue]:
13541349
named_inputs[f'__implicit_input_{name}'] = pc_out
13551350
return named_inputs
13561351

1357-
def named_outputs(self) -> Dict[str, pvalue.PCollection]:
1352+
def named_outputs(self) -> dict[str, pvalue.PCollection]:
13581353
if self.transform is None:
13591354
assert not self.outputs
13601355
return {}
@@ -1365,7 +1360,7 @@ def named_outputs(self) -> Dict[str, pvalue.PCollection]:
13651360
return {}
13661361

13671362
def to_runner_api(
1368-
self, context: PipelineContext) -> beam_runner_api_pb2.PTransform:
1363+
self, context: 'PipelineContext') -> beam_runner_api_pb2.PTransform:
13691364
# External transforms require more splicing than just setting the spec.
13701365
from apache_beam.transforms import external
13711366
if isinstance(self.transform, external.ExternalTransform):
@@ -1375,8 +1370,8 @@ def to_runner_api(
13751370
return self.transform.to_runner_api_transform(context, self.full_label)
13761371

13771372
def transform_to_runner_api(
1378-
transform: Optional[ptransform.PTransform],
1379-
context: PipelineContext) -> Optional[beam_runner_api_pb2.FunctionSpec]:
1373+
transform: Optional[ptransform.PTransform], context: 'PipelineContext'
1374+
) -> Optional[beam_runner_api_pb2.FunctionSpec]:
13801375
if transform is None:
13811376
return None
13821377
else:
@@ -1432,7 +1427,7 @@ def transform_to_runner_api(
14321427
@staticmethod
14331428
def from_runner_api(
14341429
proto: beam_runner_api_pb2.PTransform,
1435-
context: PipelineContext) -> 'AppliedPTransform':
1430+
context: 'PipelineContext') -> 'AppliedPTransform':
14361431

14371432
if common_urns.primitives.PAR_DO.urn == proto.spec.urn:
14381433
# Preserving side input tags.
@@ -1509,7 +1504,7 @@ def _merge_outer_resource_hints(self):
15091504
part._merge_outer_resource_hints()
15101505

15111506

1512-
def encode_annotations(annotations: Optional[Dict[str, Any]]):
1507+
def encode_annotations(annotations: Optional[dict[str, Any]]):
15131508
"""Encodes non-byte annotation values as bytes."""
15141509
if not annotations:
15151510
return {}
@@ -1636,8 +1631,8 @@ class ComponentIdMap(object):
16361631
"""
16371632
def __init__(self, namespace="ref"):
16381633
self.namespace = namespace
1639-
self._counters: Dict[type, int] = defaultdict(lambda: 0)
1640-
self._obj_to_id: Dict[Any, str] = {}
1634+
self._counters: dict[type, int] = defaultdict(lambda: 0)
1635+
self._obj_to_id: dict[Any, str] = {}
16411636

16421637
def get_or_assign(self, obj=None, obj_type=None, label=None):
16431638
if obj not in self._obj_to_id:

0 commit comments

Comments
 (0)