Skip to content

Commit 154bd93

Browse files
authored
Merge branch 'apache:master' into master
2 parents beb3039 + 56409b2 commit 154bd93

File tree

18 files changed

+227
-213
lines changed

18 files changed

+227
-213
lines changed

CHANGES.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858

5959
## Highlights
6060

61+
* Managed API for [Java](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/managed/Managed.html) and [Python](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.managed.html#module-apache_beam.transforms.managed) supports [key I/O connectors](https://beam.apache.org/documentation/io/connectors/) Iceberg, Kafka, and BigQuery.
6162
* New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)).
6263
* New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)).
6364

@@ -71,6 +72,7 @@
7172
* Support custom coders in Reshuffle ([#29908](https://github.com/apache/beam/issues/29908), [#33356](https://github.com/apache/beam/issues/33356)).
7273
* [Java] Upgrade SLF4J to 2.0.16. Update default Spark version to 3.5.0. ([#33574](https://github.com/apache/beam/pull/33574))
7374
* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
75+
* Managed API for [Java](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/managed/Managed.html) and [Python](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.managed.html#module-apache_beam.transforms.managed) supports [key I/O connectors](https://beam.apache.org/documentation/io/connectors/) Iceberg, Kafka, and BigQuery.
7476

7577
## Breaking Changes
7678

sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java

Lines changed: 126 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,16 @@
1717
*/
1818
package org.apache.beam.sdk.metrics;
1919

20+
import java.util.ArrayList;
2021
import java.util.HashSet;
22+
import java.util.Iterator;
23+
import java.util.List;
2124
import java.util.Set;
2225
import java.util.regex.Pattern;
2326
import org.apache.beam.sdk.annotations.Internal;
24-
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
27+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
28+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
29+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
2530
import org.checkerframework.checker.nullness.qual.Nullable;
2631

2732
/**
@@ -32,12 +37,13 @@ public class Lineage {
3237
public static final String LINEAGE_NAMESPACE = "lineage";
3338
private static final Lineage SOURCES = new Lineage(Type.SOURCE);
3439
private static final Lineage SINKS = new Lineage(Type.SINK);
35-
private static final Pattern RESERVED_CHARS = Pattern.compile("[:\\s.]");
40+
// Reserved characters are backtick, colon, whitespace (space, \t, \n) and dot.
41+
private static final Pattern RESERVED_CHARS = Pattern.compile("[:\\s.`]");
3642

37-
private final StringSet metric;
43+
private final BoundedTrie metric;
3844

3945
private Lineage(Type type) {
40-
this.metric = Metrics.stringSet(LINEAGE_NAMESPACE, type.toString());
46+
this.metric = Metrics.boundedTrie(LINEAGE_NAMESPACE, type.toString());
4147
}
4248

4349
/** {@link Lineage} representing sources and optionally side inputs. */
@@ -50,111 +56,135 @@ public static Lineage getSinks() {
5056
return SINKS;
5157
}
5258

53-
/**
54-
* Wrap segment to valid segment name.
55-
*
56-
* <p>Specifically, If there are reserved chars (colon, whitespace, dot), escape with backtick. If
57-
* the segment is already wrapped, return the original.
58-
*
59-
* <p>This helper method is for internal and testing usage only.
60-
*/
61-
@Internal
62-
public static String wrapSegment(String value) {
63-
if (value.startsWith("`") && value.endsWith("`")) {
64-
return value;
59+
@VisibleForTesting
60+
static Iterable<String> getFQNParts(
61+
String system,
62+
@Nullable String subtype,
63+
Iterable<String> segments,
64+
@Nullable String lastSegmentSep) {
65+
66+
List<String> parts = new ArrayList<>();
67+
parts.add(system + ":");
68+
if (subtype != null) {
69+
parts.add(subtype + ":");
6570
}
66-
if (RESERVED_CHARS.matcher(value).find()) {
67-
return String.format("`%s`", value);
68-
}
69-
return value;
70-
}
7171

72-
/**
73-
* Assemble fully qualified name (<a
74-
* href="https://cloud.google.com/data-catalog/docs/fully-qualified-names">FQN</a>). Format:
75-
*
76-
* <ul>
77-
* <li>{@code system:segment1.segment2}
78-
* <li>{@code system:subtype:segment1.segment2}
79-
* <li>{@code system:`segment1.with.dots:clons`.segment2}
80-
* </ul>
81-
*
82-
* <p>This helper method is for internal and testing usage only.
83-
*/
84-
@Internal
85-
public static String getFqName(
86-
String system, @Nullable String subtype, Iterable<String> segments) {
87-
StringBuilder builder = new StringBuilder(system);
88-
if (!Strings.isNullOrEmpty(subtype)) {
89-
builder.append(":").append(subtype);
90-
}
91-
int idx = 0;
92-
for (String segment : segments) {
93-
if (idx == 0) {
94-
builder.append(":");
95-
} else {
96-
builder.append(".");
72+
if (segments != null) {
73+
Iterator<String> iterator = segments.iterator();
74+
String previousSegment = null;
75+
while (iterator.hasNext()) {
76+
if (previousSegment != null) {
77+
parts.add(wrapSegment(previousSegment) + ".");
78+
}
79+
previousSegment = iterator.next();
80+
}
81+
82+
if (previousSegment != null) {
83+
if (lastSegmentSep != null) {
84+
List<String> subSegments =
85+
Splitter.onPattern(lastSegmentSep).splitToList(wrapSegment(previousSegment));
86+
for (int i = 0; i < subSegments.size() - 1; i++) {
87+
parts.add(subSegments.get(i) + lastSegmentSep);
88+
}
89+
parts.add(subSegments.get(subSegments.size() - 1));
90+
} else {
91+
parts.add(wrapSegment(previousSegment));
92+
}
9793
}
98-
builder.append(wrapSegment(segment));
99-
++idx;
10094
}
101-
return builder.toString();
95+
96+
return parts;
10297
}
10398

10499
/**
105-
* Assemble the FQN of given system, and segments.
106-
*
107-
* <p>This helper method is for internal and testing usage only.
100+
* Add a FQN (fully-qualified name) to Lineage. Segments will be processed via {@link
101+
* #getFQNParts}.
108102
*/
109-
@Internal
110-
public static String getFqName(String system, Iterable<String> segments) {
111-
return getFqName(system, null, segments);
103+
public void add(
104+
String system,
105+
@Nullable String subtype,
106+
Iterable<String> segments,
107+
@Nullable String lastSegmentSep) {
108+
add(getFQNParts(system, subtype, segments, lastSegmentSep));
112109
}
113110

114111
/**
115-
* Add a FQN (fully-qualified name) to Lineage. Segments will be processed via {@link #getFqName}.
112+
* Add a FQN (fully-qualified name) to Lineage. Segments will be processed via {@link
113+
* #getFQNParts}.
116114
*/
117-
public void add(String system, @Nullable String subtype, Iterable<String> segments) {
118-
add(getFqName(system, subtype, segments));
115+
public void add(String system, Iterable<String> segments, @Nullable String lastSegmentSep) {
116+
add(system, null, segments, lastSegmentSep);
119117
}
120118

121119
/**
122-
* Add a FQN (fully-qualified name) to Lineage. Segments will be processed via {@link #getFqName}.
120+
* Add a FQN (fully-qualified name) to Lineage. Segments will be processed via {@link
121+
* #getFQNParts}.
123122
*/
124123
public void add(String system, Iterable<String> segments) {
125-
add(system, null, segments);
124+
add(system, segments, null);
126125
}
127126

128127
/**
129-
* Adds the given details as Lineage. For asset level lineage the resource location should be
130-
* specified as Dataplex FQN https://cloud.google.com/data-catalog/docs/fully-qualified-names
128+
* Adds the given fqn as lineage.
129+
*
130+
* @param rollupSegments should be an iterable of strings whose concatenation is a valid <a
131+
* href="https://cloud.google.com/data-catalog/docs/fully-qualified-names">Dataplex FQN </a>
132+
* which is already escaped.
133+
* <p>In particular, this means they will often have trailing delimiters.
131134
*/
132-
public void add(String details) {
133-
metric.add(details);
135+
public void add(Iterable<String> rollupSegments) {
136+
ImmutableList<String> segments = ImmutableList.copyOf(rollupSegments);
137+
this.metric.add(segments);
134138
}
135139

136-
/** Query {@link StringSet} metrics from {@link MetricResults}. */
137-
public static Set<String> query(MetricResults results, Type type) {
140+
/**
141+
* Query {@link BoundedTrie} metrics from {@link MetricResults}.
142+
*
143+
* @param results FQNs from the result.
144+
* @param type sources or sinks.
145+
* @param truncatedMarker the marker to use to represent truncated FQNs.
146+
* @return A flat representation of all FQNs. If the FQN was truncated then it has a trailing
147+
* truncatedMarker.
148+
*/
149+
public static Set<String> query(MetricResults results, Type type, String truncatedMarker) {
138150
MetricsFilter filter =
139151
MetricsFilter.builder()
140152
.addNameFilter(MetricNameFilter.named(LINEAGE_NAMESPACE, type.toString()))
141153
.build();
142154
Set<String> result = new HashSet<>();
143-
for (MetricResult<StringSetResult> metrics : results.queryMetrics(filter).getStringSets()) {
155+
truncatedMarker = truncatedMarker == null ? "*" : truncatedMarker;
156+
for (MetricResult<BoundedTrieResult> metrics : results.queryMetrics(filter).getBoundedTries()) {
144157
try {
145-
result.addAll(metrics.getCommitted().getStringSet());
158+
for (List<String> fqn : metrics.getCommitted().getResult()) {
159+
String end = Boolean.parseBoolean(fqn.get(fqn.size() - 1)) ? truncatedMarker : "";
160+
result.add(String.join("", fqn.subList(0, fqn.size() - 1)) + end);
161+
}
146162
} catch (UnsupportedOperationException unused) {
147163
// MetricsResult.getCommitted throws this exception when runner support missing, just skip.
148164
}
149-
result.addAll(metrics.getAttempted().getStringSet());
165+
for (List<String> fqn : metrics.getAttempted().getResult()) {
166+
String end = Boolean.parseBoolean(fqn.get(fqn.size() - 1)) ? truncatedMarker : "";
167+
result.add(String.join("", fqn.subList(0, fqn.size() - 1)) + end);
168+
}
150169
}
151170
return result;
152171
}
153172

173+
/**
174+
* Query {@link BoundedTrie} metrics from {@link MetricResults}.
175+
*
176+
* @param results FQNs from the result
177+
* @param type sources or sinks
178+
* @return A flat representation of all FQNs. If the FQN was truncated then it has a trailing '*'.
179+
*/
180+
public static Set<String> query(MetricResults results, Type type) {
181+
return query(results, type, "*");
182+
}
183+
154184
/** Lineage metrics resource types. */
155185
public enum Type {
156-
SOURCE("sources"),
157-
SINK("sinks");
186+
SOURCE("sources_v2"),
187+
SINK("sinks_v2");
158188

159189
private final String name;
160190

@@ -167,4 +197,29 @@ public String toString() {
167197
return name;
168198
}
169199
}
200+
201+
/**
202+
* Wrap segment to valid segment name.
203+
*
204+
* <p>It escapes reserved characters
205+
*
206+
* <ul>
207+
* <li>Reserved characters are backtick, colon, whitespace (space, \t, \n) and dot.
208+
* <li>Only segments containing reserved characters must be escaped.
209+
* <li>Segments cannot be escaped partially (i.e. “bigquery:com`.`google.test”).
210+
* <li>Segments must be escaped using backticks (a.k.a. graves).
211+
* <li>Backticks must be escaped using backtick (i.e. bigquery:`test``test`) and the segment
212+
* itself must be escaped as well.
213+
* </ul>
214+
*/
215+
@Internal
216+
public static String wrapSegment(String value) {
217+
value = value.replace("`", "``"); // Escape backticks
218+
// the escaped backticks will not throw this off since escaping will
219+
// happen if it contains ` in first place.
220+
if (RESERVED_CHARS.matcher(value).find()) {
221+
return String.format("`%s`", value);
222+
}
223+
return value;
224+
}
170225
}

sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/LineageTest.java

Lines changed: 51 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import static org.junit.Assert.assertEquals;
2121

2222
import java.util.Map;
23+
import java.util.stream.Collectors;
24+
import java.util.stream.StreamSupport;
2325
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
2426
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
2527
import org.junit.Test;
@@ -30,30 +32,66 @@
3032
@RunWith(JUnit4.class)
3133
public class LineageTest {
3234
@Test
33-
public void testGetFqName() {
35+
public void testWrapSegment() {
3436
Map<String, String> testCases =
3537
ImmutableMap.<String, String>builder()
3638
.put("apache-beam", "apache-beam")
37-
.put("`apache-beam`", "`apache-beam`")
39+
.put("`apache-beam`", "```apache-beam```")
3840
.put("apache.beam", "`apache.beam`")
3941
.put("apache:beam", "`apache:beam`")
4042
.put("apache beam", "`apache beam`")
41-
.put("`apache beam`", "`apache beam`")
43+
.put("apache`beam", "`apache``beam`")
44+
.put("apache` beam", "`apache`` beam`")
45+
.put("`apache-beam", "```apache-beam`")
46+
.put("apache-beam`", "`apache-beam```")
47+
.put("`apache beam`", "```apache beam```")
4248
.put("apache\tbeam", "`apache\tbeam`")
4349
.put("apache\nbeam", "`apache\nbeam`")
4450
.build();
4551
testCases.forEach(
46-
(key, value) ->
47-
assertEquals("apache:" + value, Lineage.getFqName("apache", ImmutableList.of(key))));
52+
(key, value) -> {
53+
Iterable<String> fqnPartsIterator =
54+
Lineage.getFQNParts("apache", null, ImmutableList.of(key), null);
55+
String fqnPartsString = getFqnPartsString(fqnPartsIterator);
56+
assertEquals("apache:" + value, fqnPartsString);
57+
});
4858
testCases.forEach(
49-
(key, value) ->
50-
assertEquals(
51-
"apache:beam:" + value,
52-
Lineage.getFqName("apache", "beam", ImmutableList.of(key))));
59+
(key, value) -> {
60+
Iterable<String> fqnPartsIterator =
61+
Lineage.getFQNParts("apache", "beam", ImmutableList.of(key), null);
62+
String fqnPartsString = getFqnPartsString(fqnPartsIterator);
63+
assertEquals("apache:beam:" + value, fqnPartsString);
64+
});
5365
testCases.forEach(
54-
(key, value) ->
55-
assertEquals(
56-
"apache:beam:" + value + "." + value,
57-
Lineage.getFqName("apache", "beam", ImmutableList.of(key, key))));
66+
(key, value) -> {
67+
Iterable<String> fqnPartsIterator =
68+
Lineage.getFQNParts("apache", "beam", ImmutableList.of(key, key), null);
69+
String fqnPartsString = getFqnPartsString(fqnPartsIterator);
70+
assertEquals("apache:beam:" + value + "." + value, fqnPartsString);
71+
});
72+
}
73+
74+
@Test
75+
public void getFQNParts() {
76+
Iterable<String> simpleFQN =
77+
Lineage.getFQNParts("system", null, ImmutableList.of("project", "dataset", "table"), null);
78+
assertEquals("system:project.dataset.table", getFqnPartsString(simpleFQN));
79+
80+
Iterable<String> subTypeFQN =
81+
Lineage.getFQNParts("system", "topic", ImmutableList.of("project", "topicid"), null);
82+
assertEquals("system:topic:project.topicid", getFqnPartsString(subTypeFQN));
83+
84+
Iterable<String> pathFQN =
85+
Lineage.getFQNParts("system", null, ImmutableList.of("bucket", "dir1/dir2/file"), "/");
86+
assertEquals("system:bucket.dir1/dir2/file", getFqnPartsString(pathFQN));
87+
88+
Iterable<String> pathFQNReserved =
89+
Lineage.getFQNParts("system", null, ImmutableList.of("bucket", "dir1/dir.2/file"), "/");
90+
assertEquals("system:bucket.`dir1/dir.2/file`", getFqnPartsString(pathFQNReserved));
91+
}
92+
93+
private static String getFqnPartsString(Iterable<String> fqnPartsIterable) {
94+
return StreamSupport.stream(fqnPartsIterable.spliterator(), false)
95+
.collect(Collectors.joining(""));
5896
}
5997
}

sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ protected void reportLineage(GcsResourceId resourceId, Lineage lineage, LineageL
229229
if (level != LineageLevel.TOP_LEVEL && !path.getObject().isEmpty()) {
230230
segments.add(path.getObject());
231231
}
232-
lineage.add("gcs", segments.build());
232+
lineage.add("gcs", segments.build(), "/");
233233
} else {
234234
LOG.warn("Report Lineage on relative path {} is unsupported", path.getObject());
235235
}

sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ private void verifyLineage(String uri, List<String> expected) {
250250
GcsResourceId path = GcsResourceId.fromGcsPath(GcsPath.fromUri(uri));
251251
Lineage mockLineage = mock(Lineage.class);
252252
gcsFileSystem.reportLineage(path, mockLineage);
253-
verify(mockLineage, times(1)).add("gcs", expected);
253+
verify(mockLineage, times(1)).add("gcs", expected, "/");
254254
}
255255

256256
private StorageObject createStorageObject(String gcsFilename, long fileSize) {

sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystem.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -668,7 +668,7 @@ protected void reportLineage(S3ResourceId resourceId, Lineage lineage, LineageLe
668668
if (level != LineageLevel.TOP_LEVEL && !resourceId.getKey().isEmpty()) {
669669
segments.add(resourceId.getKey());
670670
}
671-
lineage.add("s3", segments.build());
671+
lineage.add("s3", segments.build(), "/");
672672
}
673673

674674
/**

0 commit comments

Comments
 (0)