
Commit a9a2d2b

use PEP-585 generics
1 parent 576acab commit a9a2d2b
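
Note: the commit replaces the deprecated typing aliases (Dict, List, Set, Iterable) with the built-in generics standardized in PEP 585, pulling abstract container types from collections.abc where a named import is still needed. A minimal before/after sketch of the pattern (the function and variable names here are illustrative, not taken from the diff):

    # Before: generic aliases imported from typing (deprecated since Python 3.9).
    from typing import Dict, List

    def summarize(counts: Dict[str, int]) -> List[str]:
        return [f'{key}={value}' for key, value in counts.items()]

    # After: built-in types subscripted directly (PEP 585); abstract container
    # types such as Iterable now come from collections.abc.
    from collections.abc import Iterable

    def summarize(counts: dict[str, int]) -> list[str]:
        return [f'{key}={value}' for key, value in counts.items()]

    def total(values: Iterable[int]) -> int:
        return sum(values)

Optional and Union remain imported from typing in this commit; the PEP 604 spelling X | Y would need Python 3.10 at runtime (or postponed annotation evaluation).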

File tree: 2 files changed (+22 additions, -27 deletions)
  sdks/python/apache_beam/runners/interactive/interactive_beam.py
  sdks/python/apache_beam/runners/interactive/recording_manager.py


sdks/python/apache_beam/runners/interactive/interactive_beam.py

Lines changed: 8 additions & 10 deletions
@@ -35,11 +35,9 @@
 # pytype: skip-file

 import logging
+from collections.abc import Iterable
 from datetime import timedelta
 from typing import Any
-from typing import Dict
-from typing import Iterable
-from typing import List
 from typing import Optional
 from typing import Union

@@ -276,7 +274,7 @@ class Recordings():
   """
   def describe(
       self,
-      pipeline: Optional[beam.Pipeline] = None) -> Dict[str, Any]:  # noqa: F821
+      pipeline: Optional[beam.Pipeline] = None) -> dict[str, Any]:  # noqa: F821
     """Returns a description of all the recordings for the given pipeline.

     If no pipeline is given then this returns a dictionary of descriptions for
@@ -418,10 +416,10 @@ class Clusters:
   # DATAPROC_IMAGE_VERSION = '2.0.XX-debian10'

   def __init__(self) -> None:
-    self.dataproc_cluster_managers: Dict[ClusterMetadata,
+    self.dataproc_cluster_managers: dict[ClusterMetadata,
                                          DataprocClusterManager] = {}
-    self.master_urls: Dict[str, ClusterMetadata] = {}
-    self.pipelines: Dict[beam.Pipeline, DataprocClusterManager] = {}
+    self.master_urls: dict[str, ClusterMetadata] = {}
+    self.pipelines: dict[beam.Pipeline, DataprocClusterManager] = {}
     self.default_cluster_metadata: Optional[ClusterMetadata] = None

   def create(
@@ -512,7 +510,7 @@ def cleanup(
   def describe(
       self,
       cluster_identifier: Optional[ClusterIdentifier] = None
-  ) -> Union[ClusterMetadata, List[ClusterMetadata]]:
+  ) -> Union[ClusterMetadata, list[ClusterMetadata]]:
     """Describes the ClusterMetadata by a ClusterIdentifier.

     If no cluster_identifier is given or if the cluster_identifier is unknown,
@@ -680,7 +678,7 @@ def run_pipeline(self):

 @progress_indicated
 def show(
-    *pcolls: Union[Dict[Any, PCollection], Iterable[PCollection], PCollection],
+    *pcolls: Union[dict[Any, PCollection], Iterable[PCollection], PCollection],
     include_window_info: bool = False,
     visualize_data: bool = False,
     n: Union[int, str] = 'inf',
@@ -1015,7 +1013,7 @@ def as_pcollection(pcoll_or_df):

 @progress_indicated
 def compute(
-    *pcolls: Union[Dict[Any, PCollection], Iterable[PCollection], PCollection],
+    *pcolls: Union[dict[Any, PCollection], Iterable[PCollection], PCollection],
     wait_for_inputs: bool = True,
     blocking: bool = False,
     runner=None,
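
In interactive_beam.py the only container import that still needs a name is Iterable, which now comes from collections.abc instead of typing (the typing version is a deprecated alias of the abc one). The change is invisible to callers of show() and compute(); a hedged usage sketch follows, with a made-up pipeline purely for illustration and intended for an interactive (IPython/notebook) session:

    import apache_beam as beam
    from apache_beam.runners.interactive import interactive_beam as ib
    from apache_beam.runners.interactive.interactive_runner import InteractiveRunner

    # Illustrative pipeline; any PCollection would do.
    p = beam.Pipeline(InteractiveRunner())
    words = p | beam.Create(['a', 'b', 'a'])
    counts = words | beam.combiners.Count.PerElement()

    # show() still accepts a single PCollection, an iterable of PCollections,
    # or a dict keyed by arbitrary labels, exactly as before the annotation change.
    ib.show(counts, include_window_info=False, n=10)
    ib.show({'counts': counts})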

sdks/python/apache_beam/runners/interactive/recording_manager.py

Lines changed: 14 additions & 17 deletions
@@ -25,10 +25,7 @@
 from concurrent.futures import Future
 from concurrent.futures import ThreadPoolExecutor
 from typing import Any
-from typing import Dict
-from typing import List
 from typing import Optional
-from typing import Set
 from typing import Union

 import pandas as pd
@@ -66,7 +63,7 @@ class AsyncComputationResult:
   def __init__(
       self,
       future: Future,
-      pcolls: Set[beam.pvalue.PCollection],
+      pcolls: set[beam.pvalue.PCollection],
       user_pipeline: beam.Pipeline,
       recording_manager: 'RecordingManager',
   ):
@@ -323,7 +320,7 @@ class Recording:
   def __init__(
       self,
       user_pipeline: beam.Pipeline,
-      pcolls: List[beam.pvalue.PCollection],  # noqa: F821
+      pcolls: list[beam.pvalue.PCollection],  # noqa: F821
       result: 'beam.runner.PipelineResult',
       max_n: int,
       max_duration_secs: float,
@@ -416,7 +413,7 @@ def wait_until_finish(self) -> None:
     self._mark_computed.join()
     return self._result.state

-  def describe(self) -> Dict[str, int]:
+  def describe(self) -> dict[str, int]:
     """Returns a dictionary describing the cache and recording."""
     cache_manager = ie.current_env().get_cache_manager(self._user_pipeline)

@@ -431,7 +428,7 @@ def __init__(
       self,
       user_pipeline: beam.Pipeline,
       pipeline_var: str = None,
-      test_limiters: List['Limiter'] = None) -> None:  # noqa: F821
+      test_limiters: list['Limiter'] = None) -> None:  # noqa: F821

     self.user_pipeline: beam.Pipeline = user_pipeline
     self.pipeline_var: str = pipeline_var if pipeline_var else ''
@@ -440,12 +437,12 @@ def __init__(
     self._test_limiters = test_limiters if test_limiters else []
     self._executor = ThreadPoolExecutor(max_workers=os.cpu_count())
     self._env = ie.current_env()
-    self._async_computations: Dict[str, AsyncComputationResult] = {}
+    self._async_computations: dict[str, AsyncComputationResult] = {}
     self._pipeline_graph = PipelineGraph(self.user_pipeline)

   def _execute_pipeline_fragment(
       self,
-      pcolls_to_compute: Set[beam.pvalue.PCollection],
+      pcolls_to_compute: set[beam.pvalue.PCollection],
       async_result: Optional['AsyncComputationResult'] = None,
       runner: runner.PipelineRunner = None,
       options: pipeline_options.PipelineOptions = None,
@@ -483,7 +480,7 @@ def _execute_pipeline_fragment(

   def _run_async_computation(
       self,
-      pcolls_to_compute: Set[beam.pvalue.PCollection],
+      pcolls_to_compute: set[beam.pvalue.PCollection],
       async_result: 'AsyncComputationResult',
       wait_for_inputs: bool,
       runner: runner.PipelineRunner = None,
@@ -522,7 +519,7 @@ def _run_async_computation(
     # finally:
     #   self._env.unmark_pcollection_computing(pcolls_to_compute)

-  def _watch(self, pcolls: List[beam.pvalue.PCollection]) -> None:
+  def _watch(self, pcolls: list[beam.pvalue.PCollection]) -> None:
     """Watch any pcollections not being watched.

     This allows for the underlying caching layer to identify the PCollection as
@@ -592,7 +589,7 @@ def cancel(self: None) -> None:
     # evict the BCJ after they complete.
     ie.current_env().evict_background_caching_job(self.user_pipeline)

-  def describe(self) -> Dict[str, int]:
+  def describe(self) -> dict[str, int]:
     """Returns a dictionary describing the cache and recording."""

     cache_manager = ie.current_env().get_cache_manager(self.user_pipeline)
@@ -643,7 +640,7 @@ def record_pipeline(self) -> bool:

   def compute_async(
       self,
-      pcolls: Set[beam.pvalue.PCollection],
+      pcolls: set[beam.pvalue.PCollection],
       wait_for_inputs: bool = True,
       blocking: bool = False,
       runner: runner.PipelineRunner = None,
@@ -721,7 +718,7 @@ def _get_pcoll_id_map(self):

   def _get_all_dependencies(
       self,
-      pcolls: Set[beam.pvalue.PCollection]) -> Set[beam.pvalue.PCollection]:
+      pcolls: set[beam.pvalue.PCollection]) -> set[beam.pvalue.PCollection]:
     """Gets all upstream PCollection dependencies
     for the given set of PCollections."""
     if not self._pipeline_graph:
@@ -780,13 +777,13 @@ def _get_all_dependencies(

   def _wait_for_dependencies(
       self,
-      pcolls: Set[beam.pvalue.PCollection],
+      pcolls: set[beam.pvalue.PCollection],
       async_result: Optional[AsyncComputationResult] = None,
   ) -> bool:
     """Waits for any dependencies of the given
     PCollections that are currently being computed."""
     dependencies = self._get_all_dependencies(pcolls)
-    computing_deps: Dict[beam.pvalue.PCollection, AsyncComputationResult] = {}
+    computing_deps: dict[beam.pvalue.PCollection, AsyncComputationResult] = {}

     for dep in dependencies:
       if self._env.is_pcollection_computing(dep):
@@ -829,7 +826,7 @@ def _wait_for_dependencies(

   def record(
       self,
-      pcolls: List[beam.pvalue.PCollection],
+      pcolls: list[beam.pvalue.PCollection],
       *,
       max_n: int,
       max_duration: Union[int, str],
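
One consequence worth noting: subscripting the built-in types at runtime (dict[str, int], list[...], set[...]) requires Python 3.9+, so this change presumes the SDK already targets 3.9 or newer; on 3.7/3.8 the same spelling is only legal inside annotations under postponed evaluation. A small standalone sketch of the distinction (not Beam code):

    from __future__ import annotations  # annotations become lazily evaluated strings
    import sys

    # Annotation-only use: never evaluated at definition time, so the built-in
    # generic spelling works even on Python 3.7/3.8 with the future import above.
    def flatten(rows: list[dict[str, int]]) -> set[str]:
        return {key for row in rows for key in row}

    # Runtime use: building an alias object evaluates the subscription immediately,
    # which needs Python 3.9+ (PEP 585 made dict[...] return types.GenericAlias).
    if sys.version_info >= (3, 9):
        Row = dict[str, int]
        print(type(Row))  # <class 'types.GenericAlias'>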
