Skip to content

Commit 624b2fc

Browse files
robertwb and damccorm
authored
Avoid unreasonably long stage names for @ptransform_fn. (#35660)
* Avoid unreasonably long stage names for @ptransform_fn. This respects the update compatibility flag and truncates rather than elides long first arguments to avoid issues with the prior attempt. * yapf * Update sdks/python/apache_beam/transforms/ptransform.py Co-authored-by: Danny McCormick <dannymccormick@google.com> * add type hint * Slightly safer and more unique. Includes the tail as well as the prefix (useful for file paths) and ensures the result is a string (mock objects behave strangely). * Bump compat version. * Add a pair of tests. --------- Co-authored-by: Danny McCormick <dannymccormick@google.com>
1 parent ac2706a commit 624b2fc

File tree

3 files changed

+64
-7
lines changed

3 files changed

+64
-7
lines changed

sdks/python/apache_beam/pipeline.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -115,11 +115,11 @@
115115

116116

117117
class Pipeline(HasDisplayData):
118-
"""A pipeline object that manages a DAG of
119-
:class:`~apache_beam.transforms.ptransform.PTransform` s
118+
"""A pipeline object that manages a DAG of
119+
:class:`~apache_beam.transforms.ptransform.PTransform` s
120120
and their :class:`~apache_beam.pvalue.PValue` s.
121121
122-
Conceptually the :class:`~apache_beam.transforms.ptransform.PTransform` s are
122+
Conceptually the :class:`~apache_beam.transforms.ptransform.PTransform` s are
123123
the DAG's nodes and the :class:`~apache_beam.pvalue.PValue` s are the edges.
124124
125125
All the transforms applied to the pipeline must have distinct full labels.
@@ -722,6 +722,10 @@ def apply(
722722
return self.apply(
723723
transform.transform, pvalueish, label or transform.label)
724724

725+
if not label and isinstance(transform, ptransform._PTransformFnPTransform):
726+
# This must be set before label is inspected.
727+
transform.set_options(self._options)
728+
725729
if not isinstance(transform, ptransform.PTransform):
726730
raise TypeError("Expected a PTransform object, got %s" % transform)
727731

sdks/python/apache_beam/transforms/ptransform.py

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1002,6 +1002,7 @@ def __init__(self, fn, *args, **kwargs):
10021002
self._fn = fn
10031003
self._args = args
10041004
self._kwargs = kwargs
1005+
self._use_backwards_compatible_label = True
10051006

10061007
def display_data(self):
10071008
res = {
@@ -1030,11 +1031,30 @@ def expand(self, pcoll):
10301031
pass
10311032
return self._fn(pcoll, *args, **kwargs)
10321033

1033-
def default_label(self):
1034+
def set_options(self, options):
1035+
# Avoid circular import.
1036+
from apache_beam.transforms.util import is_compat_version_prior_to
1037+
self._use_backwards_compatible_label = is_compat_version_prior_to(
1038+
options, '2.68.0')
1039+
1040+
def default_label(self) -> str:
1041+
# Attempt to give a reasonable name to this transform.
1042+
# We want it to be reasonably unique, but also not sensitive to
1043+
# irrelevant parameters to minimize pipeline-to-pipeline variance.
1044+
# For now, use only the first argument (if any), truncating it to a short
1045+
# prefix...tail form when it would otherwise make the name unwieldy.
10341046
if self._args:
1035-
return '%s(%s)' % (
1036-
label_from_callable(self._fn), label_from_callable(self._args[0]))
1037-
return label_from_callable(self._fn)
1047+
first_arg_string = label_from_callable(self._args[0])
1048+
if (self._use_backwards_compatible_label or
1049+
not isinstance(first_arg_string, str) or len(first_arg_string) <= 19):
1050+
suffix = '(%s)' % first_arg_string
1051+
else:
1052+
suffix = ('(%s...%s)' %
1053+
(first_arg_string[:10], first_arg_string[-6:])).replace(
1054+
'\n', ' ')
1055+
else:
1056+
suffix = ''
1057+
return label_from_callable(self._fn) + suffix
10381058

10391059

10401060
def ptransform_fn(fn):

sdks/python/apache_beam/transforms/ptransform_test.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1157,6 +1157,39 @@ def test_apply_ptransform_using_decorator(self):
11571157
self.assertTrue('*Sample*/Group' in pipeline.applied_labels)
11581158
self.assertTrue('*Sample*/Distinct' in pipeline.applied_labels)
11591159

1160+
def test_ptransformfn_default_label(self):
1161+
@beam.ptransform_fn
1162+
def MyTransform(self, suffix="xyz"):
1163+
return pcoll | beam.Map(lambda s: s + suffix)
1164+
1165+
pipeline = TestPipeline()
1166+
pcoll = pipeline | beam.Create(['a', 'b', 'c'])
1167+
1168+
_ = pcoll | MyTransform()
1169+
self.assertIn('MyTransform', pipeline.applied_labels)
1170+
_ = pcoll | MyTransform("suffix")
1171+
self.assertIn('MyTransform(suffix)', pipeline.applied_labels)
1172+
_ = pcoll | MyTransform("looooooooooooooooooooooooooooooooooooooooong")
1173+
self.assertIn('MyTransform(looooooooo...oooong)', pipeline.applied_labels)
1174+
1175+
def test_ptransformfn_legacy_default_label(self):
1176+
@beam.ptransform_fn
1177+
def MyTransform(self, suffix="xyz"):
1178+
return pcoll | beam.Map(lambda s: s + suffix)
1179+
1180+
pipeline = TestPipeline(
1181+
options=PipelineOptions(update_compatibility_version='2.67.0'))
1182+
pcoll = pipeline | beam.Create(['a', 'b', 'c'])
1183+
1184+
_ = pcoll | MyTransform()
1185+
self.assertIn('MyTransform', pipeline.applied_labels)
1186+
_ = pcoll | MyTransform("suffix")
1187+
self.assertIn('MyTransform(suffix)', pipeline.applied_labels)
1188+
_ = pcoll | MyTransform("looooooooooooooooooooooooooooooooooooooooong")
1189+
self.assertIn(
1190+
'MyTransform(looooooooooooooooooooooooooooooooooooooooong)',
1191+
pipeline.applied_labels)
1192+
11601193
def test_combine_with_label(self):
11611194
vals = [1, 2, 3, 4, 5, 6, 7]
11621195
with TestPipeline() as pipeline:

0 commit comments

Comments
 (0)