Skip to content

Commit 14f7caf

Browse files
committed
Update lineage query function.
1 parent 71a5ced commit 14f7caf

File tree

2 files changed

+11
-4
lines changed

2 files changed

+11
-4
lines changed

sdks/python/apache_beam/metrics/cells.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -811,6 +811,9 @@ def contains(self, value):
811811
else:
812812
return False
813813

814+
def flattened(self):
815+
return self.as_trie().flattened()
816+
814817
def to_proto(self) -> metrics_pb2.BoundedTrie:
815818
return metrics_pb2.BoundedTrie(
816819
bound=self._bound,

sdks/python/apache_beam/metrics/metric.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -456,14 +456,18 @@ def add_raw(self, *rollup_segments: str) -> None:
456456
self.metric.add(rollup_segments)
457457

458458
@staticmethod
459-
def query(results: MetricResults, label: str) -> Set[str]:
459+
def query(results: MetricResults,
460+
label: str,
461+
truncated_marker: str = '*') -> Set[str]:
460462
if not label in Lineage._METRICS:
461463
raise ValueError("Label {} does not exist for Lineage", label)
462464
response = results.query(
463465
MetricsFilter().with_namespace(Lineage.LINEAGE_NAMESPACE).with_name(
464-
label))[MetricResults.STRINGSETS]
466+
label))[MetricResults.BOUNDED_TRIES]
465467
result = set()
466468
for metric in response:
467-
result.update(metric.committed)
468-
result.update(metric.attempted)
469+
for fqn in metric.committed.flattened():
470+
result.add(''.join(fqn[:-1]) + (truncated_marker if fqn[-1] else ''))
471+
for fqn in metric.attempted.flattened():
472+
result.add(''.join(fqn[:-1]) + (truncated_marker if fqn[-1] else ''))
469473
return result

0 commit comments

Comments
 (0)