Skip to content

Commit d8b9ef9

Browse files
committed
Add tests
1 parent d054d3e commit d8b9ef9

File tree

7 files changed

+170
-78
lines changed

7 files changed

+170
-78
lines changed

sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/LineageRegistrar.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,5 +24,5 @@
2424
public interface LineageRegistrar {
2525

2626
@Nullable
27-
Lineage fromOptions(PipelineOptions options, Lineage.Type type);
27+
Lineage fromOptions(PipelineOptions options, Lineage.LineageDirection direction);
2828
}

sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/LineageReporter.java

Lines changed: 0 additions & 53 deletions
This file was deleted.

sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,11 @@ public abstract class Lineage {
5757
// Reserved characters are backtick, colon, whitespace (space, \t, \n) and dot.
5858
private static final Pattern RESERVED_CHARS = Pattern.compile("[:\\s.`]");
5959

60+
public enum LineageDirection {
61+
SOURCE,
62+
SINK
63+
}
64+
6065
protected Lineage() {}
6166

6267
@Internal
@@ -79,8 +84,8 @@ public static void initialize(PipelineOptions options) {
7984
}
8085

8186
if (LINEAGE_REVISION.compareAndSet(currentRevision, KV.of(optionsId, nextRevision))) {
82-
Lineage sources = createLineage(options, Type.SOURCE);
83-
Lineage sinks = createLineage(options, Type.SINK);
87+
Lineage sources = createLineage(options, LineageDirection.SOURCE);
88+
Lineage sinks = createLineage(options, LineageDirection.SINK);
8489

8590
SOURCES.set(sources);
8691
SINKS.set(sinks);
@@ -100,26 +105,26 @@ public static void initialize(PipelineOptions options) {
100105
}
101106
}
102107

103-
private static Lineage createLineage(PipelineOptions options, Type type) {
108+
private static Lineage createLineage(PipelineOptions options, LineageDirection direction) {
104109
Set<LineageRegistrar> registrars =
105110
Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE);
106111
registrars.addAll(
107112
Lists.newArrayList(
108113
ServiceLoader.load(LineageRegistrar.class, ReflectHelpers.findClassLoader())));
109114

110115
for (LineageRegistrar registrar : registrars) {
111-
Lineage reporter = registrar.fromOptions(options, type);
116+
Lineage reporter = registrar.fromOptions(options, direction);
112117
if (reporter != null) {
113-
LOG.info("Using {} for lineage type {}", reporter.getClass().getName(), type);
118+
LOG.info("Using {} for lineage direction {}", reporter.getClass().getName(), direction);
114119
return reporter;
115120
}
116121
}
117122

118-
LOG.debug("Using default Metrics-based lineage for type {}", type);
119-
return new MetricsLineage(type);
123+
LOG.debug("Using default Metrics-based lineage for direction {}", direction);
124+
return new MetricsLineage(direction);
120125
}
121126

122-
/** Get {@link Lineage} representing sources and optionally side inputs. */
127+
/** {@link Lineage} representing sources and optionally side inputs. */
123128
public static Lineage getSources() {
124129
Lineage sources = SOURCES.get();
125130
if (sources == null) {
@@ -224,8 +229,8 @@ public void add(String system, Iterable<String> segments) {
224229
* @param truncatedMarker the marker to use to represent truncated FQNs.
225230
* @return A flat representation of all FQNs. If the FQN was truncated then it has a trailing
226231
* truncatedMarker.
227-
* <p>NOTE: When using a custom LineageReporter plugin, this method will return empty results
228-
* since lineage is not stored in Metrics.
232+
* <p>NOTE: When using a custom Lineage plugin, this method will return empty results since
233+
* lineage is not stored in Metrics.
229234
*/
230235
public static Set<String> query(MetricResults results, Type type, String truncatedMarker) {
231236
MetricQueryResults lineageQueryResults = getLineageQueryResults(results, type);
@@ -254,8 +259,8 @@ public static Set<String> query(MetricResults results, Type type, String truncat
254259
* @param results FQNs from the result
255260
* @param type sources or sinks
256261
* @return A flat representation of all FQNs. If the FQN was truncated then it has a trailing '*'.
257-
* <p>NOTE: When using a custom LineageReporter plugin, this method will return empty results
258-
* since lineage is not stored in Metrics.
262+
* <p>NOTE: When using a custom Lineage plugin, this method will return empty results since
263+
* lineage is not stored in Metrics.
259264
*/
260265
public static Set<String> query(MetricResults results, Type type) {
261266
if (MetricsFlag.lineageRollupEnabled()) {

sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsLineage.java

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,23 @@
1717
*/
1818
package org.apache.beam.sdk.metrics;
1919

20-
import org.apache.beam.sdk.lineage.LineageReporter;
2120
import org.apache.beam.sdk.metrics.Metrics.MetricsFlag;
2221
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
2322

2423
public class MetricsLineage extends Lineage {
2524

2625
private final Metric metric;
2726

28-
public MetricsLineage(final Lineage.Type type) {
27+
public MetricsLineage(final Lineage.LineageDirection direction) {
28+
// Derive Metrics-specific Type from LineageDirection
29+
Lineage.Type type =
30+
(direction == Lineage.LineageDirection.SOURCE) ? Lineage.Type.SOURCE : Lineage.Type.SINK;
31+
2932
if (MetricsFlag.lineageRollupEnabled()) {
3033
this.metric =
3134
Metrics.boundedTrie(
3235
Lineage.LINEAGE_NAMESPACE,
33-
type == Lineage.Type.SOURCE
36+
direction == Lineage.LineageDirection.SOURCE
3437
? Lineage.Type.SOURCEV2.toString()
3538
: Lineage.Type.SINKV2.toString());
3639
} else {
@@ -47,13 +50,4 @@ public void add(final Iterable<String> rollupSegments) {
4750
((StringSet) this.metric).add(String.join("", segments));
4851
}
4952
}
50-
51-
@Override
52-
public void add(
53-
final String system,
54-
final String subtype,
55-
final Iterable<String> segments,
56-
final String lastSegmentSep) {
57-
add(Lineage.getFQNParts(system, subtype, segments, lastSegmentSep));
58-
}
5953
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.lineage;
19+
20+
import static org.hamcrest.MatcherAssert.assertThat;
21+
import static org.hamcrest.Matchers.instanceOf;
22+
import static org.hamcrest.Matchers.notNullValue;
23+
import static org.junit.Assert.assertEquals;
24+
import static org.junit.Assert.fail;
25+
26+
import java.util.ServiceLoader;
27+
import org.apache.beam.sdk.metrics.Lineage;
28+
import org.apache.beam.sdk.options.PipelineOptionsFactory;
29+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
30+
import org.junit.Test;
31+
import org.junit.runner.RunWith;
32+
import org.junit.runners.JUnit4;
33+
34+
/** Tests for {@link LineageRegistrar} ServiceLoader discovery. */
35+
@RunWith(JUnit4.class)
36+
public class LineageRegistrarTest {
37+
38+
@Test
39+
public void testServiceLoaderDiscovery() {
40+
// Load all LineageRegistrar implementations via ServiceLoader
41+
for (LineageRegistrar registrar :
42+
Lists.newArrayList(ServiceLoader.load(LineageRegistrar.class).iterator())) {
43+
44+
// Check if we found the TestLineageRegistrar
45+
if (registrar instanceof TestLineageRegistrar) {
46+
47+
// Test with SOURCE direction
48+
Lineage sourceLineage =
49+
registrar.fromOptions(PipelineOptionsFactory.create(), Lineage.LineageDirection.SOURCE);
50+
assertThat(sourceLineage, notNullValue());
51+
assertThat(sourceLineage, instanceOf(TestLineage.class));
52+
assertEquals(Lineage.LineageDirection.SOURCE, ((TestLineage) sourceLineage).getDirection());
53+
54+
// Test with SINK direction
55+
Lineage sinkLineage =
56+
registrar.fromOptions(PipelineOptionsFactory.create(), Lineage.LineageDirection.SINK);
57+
assertThat(sinkLineage, notNullValue());
58+
assertThat(sinkLineage, instanceOf(TestLineage.class));
59+
assertEquals(Lineage.LineageDirection.SINK, ((TestLineage) sinkLineage).getDirection());
60+
61+
return;
62+
}
63+
}
64+
65+
fail("Expected to find " + TestLineageRegistrar.class);
66+
}
67+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.lineage;
19+
20+
import org.apache.beam.sdk.metrics.Lineage;
21+
22+
/**
23+
* A test implementation of {@link Lineage} for testing LineageRegistrar ServiceLoader discovery.
24+
*/
25+
public class TestLineage extends Lineage {
26+
27+
private final LineageDirection direction;
28+
29+
public TestLineage(LineageDirection direction) {
30+
this.direction = direction;
31+
}
32+
33+
@Override
34+
public void add(Iterable<String> rollupSegments) {
35+
// Test implementation - no-op for discovery testing
36+
}
37+
38+
public LineageDirection getDirection() {
39+
return direction;
40+
}
41+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.lineage;
19+
20+
import com.google.auto.service.AutoService;
21+
import org.apache.beam.sdk.metrics.Lineage;
22+
import org.apache.beam.sdk.options.PipelineOptions;
23+
import org.checkerframework.checker.nullness.qual.Nullable;
24+
25+
/**
26+
* A test {@link LineageRegistrar} for ServiceLoader discovery testing. This registrar always
27+
* returns a TestLineage instance.
28+
*/
29+
@AutoService(LineageRegistrar.class)
30+
public class TestLineageRegistrar implements LineageRegistrar {
31+
32+
@Override
33+
public @Nullable Lineage fromOptions(
34+
PipelineOptions options, Lineage.LineageDirection direction) {
35+
// For testing, always return a TestLineage instance
36+
return new TestLineage(direction);
37+
}
38+
}

0 commit comments

Comments
 (0)