Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions sdks/python/apache_beam/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -1188,6 +1188,21 @@ def enter_composite_transform(self, transform_node):
self._perform_exernal_transform_test(transform_node.transform)


class DisplayDataContainer(HasDisplayData):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this class actually used?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah sorry, leftover from a prior implementation that I scrapped. I originally grabbed all display data from the transform at AppliedPTransform instantiation time, but turns out some transforms define their display_data at transform expansion time (which is after AppliedPTransform instantiation time).

Will remove this

def __init__(self, display_data_dict, display_data_namespace):
self.display_data_dict = display_data_dict
self.display_data_namespace = display_data_namespace

def display_data(self):
# type: () -> Dict[str, DisplayData]

"""Returns the display data for this object."""
return self.display_data_dict

def _get_display_data_namespace(self): # type: () -> str
return self.display_data_namespace


class AppliedPTransform(object):
"""For internal use only; no backwards-compatibility guarantees.

Expand Down Expand Up @@ -1232,6 +1247,8 @@ def __init__(

self.annotations = annotations

self.display_data = {}

@property
def inputs(self):
return tuple(self.main_inputs.values())
Expand Down Expand Up @@ -1435,6 +1452,11 @@ def transform_to_runner_api(
(transform_urn not in Pipeline.runner_implemented_transforms())):
environment_id = context.get_environment_id_for_resource_hints(
self.resource_hints)
if self.transform is not None:
display_data = DisplayData.create_from(
self.transform, extra_items=self.display_data)
else:
display_data = None

return beam_runner_api_pb2.PTransform(
unique_name=self.full_label,
Expand All @@ -1454,8 +1476,7 @@ def transform_to_runner_api(
environment_id=environment_id,
annotations=self.annotations,
# TODO(https://github.com/apache/beam/issues/18012): Add display_data.
display_data=DisplayData.create_from(self.transform).to_proto()
if self.transform else None)
display_data=display_data.to_proto() if display_data else None)

@staticmethod
def from_runner_api(
Expand Down
6 changes: 4 additions & 2 deletions sdks/python/apache_beam/transforms/display.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ def create_from_options(cls, pipeline_options):
return cls(pipeline_options._get_display_data_namespace(), items)

@classmethod
def create_from(cls, has_display_data):
def create_from(cls, has_display_data, extra_items=None):
""" Creates :class:`~apache_beam.transforms.display.DisplayData` from a
:class:`HasDisplayData` instance.

Expand All @@ -243,9 +243,11 @@ def create_from(cls, has_display_data):
raise ValueError(
'Element of class {}.{} does not subclass HasDisplayData'.format(
has_display_data.__module__, has_display_data.__class__.__name__))
if extra_items is None:
extra_items = {}
return cls(
has_display_data._get_display_data_namespace(),
has_display_data.display_data())
dict(**has_display_data.display_data(), **extra_items))


class DisplayDataItem(object):
Expand Down
8 changes: 8 additions & 0 deletions sdks/python/apache_beam/transforms/display_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,14 @@ def display_data(self):
]

hc.assert_that(dd.items, hc.has_items(*expected_items))
expected_items.append(
DisplayDataItemMatcher(
key='extra_key', value='extra_value', namespace=nspace))
hc.assert_that(
DisplayData.create_from(fn, extra_items={
'extra_key': 'extra_value'
}).items,
hc.has_items(*expected_items))

def test_drop_if_none(self):
class MyDoFn(beam.DoFn):
Expand Down
Loading