Skip to content

Commit e77029f

Browse files
committed
Fix typehint in ReshufflePerKey on global window setting.
1 parent e1245d9 commit e77029f

File tree

1 file changed

+8
-8
lines changed
  • sdks/python/apache_beam/transforms

1 file changed

+8
-8
lines changed

sdks/python/apache_beam/transforms/util.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575
from apache_beam.typehints.decorators import get_signature
7676
from apache_beam.typehints.sharded_key_type import ShardedKeyType
7777
from apache_beam.utils import shared
78+
from apache_beam.utils.timestamp import Timestamp
7879
from apache_beam.utils import windowed_value
7980
from apache_beam.utils.annotations import deprecated
8081
from apache_beam.utils.sharded_key import ShardedKey
@@ -966,7 +967,7 @@ def restore_timestamps(element):
966967
key, windowed_values = element
967968
return [wv.with_value((key, wv.value)) for wv in windowed_values]
968969

969-
ungrouped = pcoll | Map(reify_timestamps).with_output_types(Any)
970+
ungrouped = pcoll | Map(reify_timestamps).with_input_types(Tuple[K, V]).with_output_types(Tuple[K, Tuple[V, Timestamp]])
970971

971972
# TODO(https://github.com/apache/beam/issues/19785) Using global window as
972973
# one of the standard window. This is to mitigate the Dataflow Java Runner
@@ -1005,7 +1006,6 @@ def __init__(self, num_buckets=None):
10051006
generated.
10061007
"""
10071008
self.num_buckets = num_buckets if num_buckets else self._DEFAULT_NUM_BUCKETS
1008-
10091009
valid_buckets = isinstance(num_buckets, int) and num_buckets > 0
10101010
if not (num_buckets is None or valid_buckets):
10111011
raise ValueError(
@@ -1015,12 +1015,12 @@ def __init__(self, num_buckets=None):
10151015
def expand(self, pcoll):
10161016
# type: (pvalue.PValue) -> pvalue.PCollection
10171017
return (
1018-
pcoll | 'AddRandomKeys' >>
1019-
Map(lambda t: (random.randrange(0, self.num_buckets), t)
1020-
).with_input_types(T).with_output_types(Tuple[int, T])
1021-
| ReshufflePerKey()
1022-
| 'RemoveRandomKeys' >> Map(lambda t: t[1]).with_input_types(
1023-
Tuple[int, T]).with_output_types(T))
1018+
pcoll | 'AddRandomKeys' >>
1019+
Map(lambda t: (random.randrange(0, self.num_buckets), t)
1020+
).with_input_types(T).with_output_types(Tuple[int, T])
1021+
| ReshufflePerKey().with_input_types(Tuple[int, T]).with_output_types(Tuple[int, T])
1022+
| 'RemoveRandomKeys' >> Map(lambda t: t[1]).with_input_types(
1023+
Tuple[int, T]).with_output_types(T))
10241024

10251025
def to_runner_api_parameter(self, unused_context):
10261026
# type: (PipelineContext) -> Tuple[str, None]

0 commit comments

Comments
 (0)