diff --git a/CHANGES.md b/CHANGES.md index e5632b2608fe..4bbb32015582 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -74,6 +74,7 @@ * Support configuring Firestore database on ReadFn transforms (Java) ([#36904](https://github.com/apache/beam/issues/36904)). * (Python) Inference args are now allowed in most model handlers, except where they are explicitly/intentionally disallowed ([#37093](https://github.com/apache/beam/issues/37093)). +* Added plugin mechanism to support different Lineage implementations (Java) ([#36790](https://github.com/apache/beam/issues/36790)). ## Breaking Changes diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java index 7e2940a2c35b..06e08a1a53df 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java @@ -578,6 +578,7 @@ public static void setDefaultPipelineOptions(PipelineOptions options) { // entry to set other PipelineOption determined flags Metrics.setDefaultPipelineOptions(options); + Lineage.setDefaultPipelineOptions(options); while (true) { KV revision = FILESYSTEM_REVISION.get(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/LineageRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/LineageRegistrar.java new file mode 100644 index 000000000000..278564701f0c --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/LineageRegistrar.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.lineage; + +import javax.annotation.Nullable; +import org.apache.beam.sdk.metrics.Lineage; +import org.apache.beam.sdk.options.PipelineOptions; + +public interface LineageRegistrar { + + @Nullable + Lineage fromOptions(PipelineOptions options, Lineage.LineageDirection direction); +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/package-info.java new file mode 100644 index 000000000000..30fbde839023 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/package-info.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Lineage tracking support for Apache Beam pipelines. + * + *

<p>This package provides a plugin mechanism to support different lineage implementations through + * the {@link org.apache.beam.sdk.lineage.LineageRegistrar} interface. Lineage implementations can + * be registered and discovered at runtime to track data lineage information during pipeline + * execution. + * + *

For lineage capabilities, see {@link org.apache.beam.sdk.metrics.Lineage}. + */ +@DefaultAnnotation(NonNull.class) +package org.apache.beam.sdk.lineage; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import org.checkerframework.checker.nullness.qual.NonNull; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java index 1e0124fc518b..0b01579f367b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java @@ -17,51 +17,131 @@ */ package org.apache.beam.sdk.metrics; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; + import java.util.ArrayList; import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.ServiceLoader; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.lineage.LineageRegistrar; import org.apache.beam.sdk.metrics.Metrics.MetricsFlag; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.util.common.ReflectHelpers; +import org.apache.beam.sdk.values.KV; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Standard collection of metrics used to 
record source and sinks information for lineage tracking. */ -public class Lineage { - +public abstract class Lineage { public static final String LINEAGE_NAMESPACE = "lineage"; - private static final Lineage SOURCES = new Lineage(Type.SOURCE); - private static final Lineage SINKS = new Lineage(Type.SINK); + private static final Logger LOG = LoggerFactory.getLogger(Lineage.class); + private static final AtomicReference SOURCES = new AtomicReference<>(); + private static final AtomicReference SINKS = new AtomicReference<>(); + + private static final AtomicReference> LINEAGE_REVISION = + new AtomicReference<>(); + // Reserved characters are backtick, colon, whitespace (space, \t, \n) and dot. private static final Pattern RESERVED_CHARS = Pattern.compile("[:\\s.`]"); - private final Metric metric; + public enum LineageDirection { + SOURCE, + SINK + } - private Lineage(Type type) { - if (MetricsFlag.lineageRollupEnabled()) { - this.metric = - Metrics.boundedTrie( - LINEAGE_NAMESPACE, - type == Type.SOURCE ? 
Type.SOURCEV2.toString() : Type.SINKV2.toString()); - } else { - this.metric = Metrics.stringSet(LINEAGE_NAMESPACE, type.toString()); + protected Lineage() {} + + @Internal + public static void setDefaultPipelineOptions(PipelineOptions options) { + checkNotNull(options, "options cannot be null"); + long optionsId = options.getOptionsId(); + int nextRevision = options.revision(); + + while (true) { + KV currentRevision = LINEAGE_REVISION.get(); + + if (currentRevision != null + && currentRevision.getKey().equals(optionsId) + && currentRevision.getValue() >= nextRevision) { + LOG.debug( + "Lineage already initialized with options ID {} revision {}, skipping", + optionsId, + currentRevision.getValue()); + return; + } + + if (LINEAGE_REVISION.compareAndSet(currentRevision, KV.of(optionsId, nextRevision))) { + Lineage sources = createLineage(options, LineageDirection.SOURCE); + Lineage sinks = createLineage(options, LineageDirection.SINK); + + SOURCES.set(sources); + SINKS.set(sinks); + + if (currentRevision == null) { + LOG.info("Lineage initialized with options ID {} revision {}", optionsId, nextRevision); + } else { + LOG.info( + "Lineage re-initialized from options ID {} to {} (revision {} -> {})", + currentRevision.getKey(), + optionsId, + currentRevision.getValue(), + nextRevision); + } + return; + } } } + private static Lineage createLineage(PipelineOptions options, LineageDirection direction) { + Set registrars = + Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE); + registrars.addAll( + Lists.newArrayList( + ServiceLoader.load(LineageRegistrar.class, ReflectHelpers.findClassLoader()))); + + for (LineageRegistrar registrar : registrars) { + Lineage reporter = registrar.fromOptions(options, direction); + if (reporter != null) { + LOG.info("Using {} for lineage direction {}", reporter.getClass().getName(), direction); + return reporter; + } + } + + LOG.debug("Using default Metrics-based lineage for direction {}", direction); + return new 
MetricsLineage(direction); + } + /** {@link Lineage} representing sources and optionally side inputs. */ public static Lineage getSources() { - return SOURCES; + Lineage sources = SOURCES.get(); + if (sources == null) { + setDefaultPipelineOptions(PipelineOptionsFactory.create()); + sources = SOURCES.get(); + } + return sources; } /** {@link Lineage} representing sinks. */ public static Lineage getSinks() { - return SINKS; + Lineage sinks = SINKS.get(); + if (sinks == null) { + setDefaultPipelineOptions(PipelineOptionsFactory.create()); + sinks = SINKS.get(); + } + return sinks; } @VisibleForTesting @@ -139,14 +219,7 @@ public void add(String system, Iterable segments) { * which is already escaped. *

In particular, this means they will often have trailing delimiters. */ - public void add(Iterable rollupSegments) { - ImmutableList segments = ImmutableList.copyOf(rollupSegments); - if (MetricsFlag.lineageRollupEnabled()) { - ((BoundedTrie) this.metric).add(segments); - } else { - ((StringSet) this.metric).add(String.join("", segments)); - } - } + public abstract void add(Iterable rollupSegments); /** * Query {@link BoundedTrie} metrics from {@link MetricResults}. @@ -156,6 +229,8 @@ public void add(Iterable rollupSegments) { * @param truncatedMarker the marker to use to represent truncated FQNs. * @return A flat representation of all FQNs. If the FQN was truncated then it has a trailing * truncatedMarker. + *

NOTE: When using a custom Lineage plugin, this method will return empty results since + * lineage is not stored in Metrics. */ public static Set query(MetricResults results, Type type, String truncatedMarker) { MetricQueryResults lineageQueryResults = getLineageQueryResults(results, type); @@ -184,6 +259,8 @@ public static Set query(MetricResults results, Type type, String truncat * @param results FQNs from the result * @param type sources or sinks * @return A flat representation of all FQNs. If the FQN was truncated then it has a trailing '*'. + *

NOTE: When using a custom Lineage plugin, this method will return empty results since + * lineage is not stored in Metrics. */ public static Set query(MetricResults results, Type type) { if (MetricsFlag.lineageRollupEnabled()) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsLineage.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsLineage.java new file mode 100644 index 000000000000..836abe4c4cc3 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsLineage.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import org.apache.beam.sdk.metrics.Metrics.MetricsFlag; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; + +public class MetricsLineage extends Lineage { + + private final Metric metric; + + public MetricsLineage(final Lineage.LineageDirection direction) { + // Derive Metrics-specific Type from LineageDirection + Lineage.Type type = + (direction == Lineage.LineageDirection.SOURCE) ? 
Lineage.Type.SOURCE : Lineage.Type.SINK; + + if (MetricsFlag.lineageRollupEnabled()) { + this.metric = + Metrics.boundedTrie( + Lineage.LINEAGE_NAMESPACE, + direction == Lineage.LineageDirection.SOURCE + ? Lineage.Type.SOURCEV2.toString() + : Lineage.Type.SINKV2.toString()); + } else { + this.metric = Metrics.stringSet(Lineage.LINEAGE_NAMESPACE, type.toString()); + } + } + + @Override + public void add(final Iterable rollupSegments) { + ImmutableList segments = ImmutableList.copyOf(rollupSegments); + if (MetricsFlag.lineageRollupEnabled()) { + ((BoundedTrie) this.metric).add(segments); + } else { + ((StringSet) this.metric).add(String.join("", segments)); + } + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/LineageRegistrarTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/LineageRegistrarTest.java new file mode 100644 index 000000000000..efccd6d20370 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/LineageRegistrarTest.java @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.lineage; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.util.Arrays; +import java.util.List; +import java.util.ServiceLoader; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.metrics.Lineage; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link LineageRegistrar} ServiceLoader discovery and DirectRunner integration. */ +@RunWith(JUnit4.class) +public class LineageRegistrarTest { + + /** + * TestWatcher that logs detailed lineage diagnostics only when tests fail. This keeps successful + * test output clean while providing deep debugging for failures. 
+ */ + @Rule + public TestWatcher lineageDebugLogger = + new TestWatcher() { + @Override + protected void failed(Throwable e, Description description) { + System.err.println("=== Lineage Test Failure Diagnostics ==="); + System.err.println("Test: " + description.getMethodName()); + System.err.println("Error: " + e.getMessage()); + + List sources = TestLineage.getRecordedSources(); + List sinks = TestLineage.getRecordedSinks(); + + System.err.println("\nRecorded Sources (" + sources.size() + "):"); + for (int i = 0; i < sources.size(); i++) { + System.err.println(" [" + i + "] \"" + sources.get(i) + "\""); + } + + System.err.println("\nRecorded Sinks (" + sinks.size() + "):"); + for (int i = 0; i < sinks.size(); i++) { + System.err.println(" [" + i + "] \"" + sinks.get(i) + "\""); + } + + System.err.println("========================================"); + } + }; + + @Before + public void setUp() { + // Clear any recorded lineage from previous tests + TestLineage.clearRecorded(); + } + + /** Helper to create a TestPipeline with test lineage enabled. 
*/ + private TestPipeline createTestPipelineWithLineage() { + TestLineageOptions options = PipelineOptionsFactory.create().as(TestLineageOptions.class); + options.setEnableTestLineage(true); + TestPipeline pipeline = TestPipeline.fromOptions(options); + // Disable enforcement since we're not using @Rule + pipeline.enableAbandonedNodeEnforcement(false); + return pipeline; + } + + @Test + public void testServiceLoaderDiscovery() { + // Load all LineageRegistrar implementations via ServiceLoader + for (LineageRegistrar registrar : + Lists.newArrayList(ServiceLoader.load(LineageRegistrar.class).iterator())) { + + // Check if we found the TestLineageRegistrar + if (registrar instanceof TestLineageRegistrar) { + + // Create options with test lineage enabled + TestLineageOptions options = PipelineOptionsFactory.create().as(TestLineageOptions.class); + options.setEnableTestLineage(true); + + // Test with SOURCE direction + Lineage sourceLineage = registrar.fromOptions(options, Lineage.LineageDirection.SOURCE); + assertThat(sourceLineage, notNullValue()); + assertThat(sourceLineage, instanceOf(TestLineage.class)); + assertEquals(Lineage.LineageDirection.SOURCE, ((TestLineage) sourceLineage).getDirection()); + + // Test with SINK direction + Lineage sinkLineage = registrar.fromOptions(options, Lineage.LineageDirection.SINK); + assertThat(sinkLineage, notNullValue()); + assertThat(sinkLineage, instanceOf(TestLineage.class)); + assertEquals(Lineage.LineageDirection.SINK, ((TestLineage) sinkLineage).getDirection()); + + return; + } + } + + fail("Expected to find " + TestLineageRegistrar.class); + } + + @Test + @Category(NeedsRunner.class) + public void testLineageIntegrationWithSimpleFQN() { + // Create pipeline with test lineage enabled - Lineage will be initialized during pipeline.run() + TestPipeline pipeline = createTestPipelineWithLineage(); + + // Run pipeline that records lineage + pipeline + .apply(Create.of("a", "b", "c")) + .apply(ParDo.of(new 
RecordSourceLineageDoFn("testsystem", Arrays.asList("db", "table")))); + + PipelineResult result = pipeline.run(); + result.waitUntilFinish(); + + // Verify lineage was recorded + List sources = TestLineage.getRecordedSources(); + assertThat(sources, hasItem("testsystem:db.table")); + } + + @Test + @Category(NeedsRunner.class) + public void testLineageIntegrationWithSubtype() { + // Create pipeline with test lineage enabled - Lineage will be initialized during pipeline.run() + TestPipeline pipeline = createTestPipelineWithLineage(); + + // Run pipeline that records lineage with subtype + pipeline + .apply(Create.of(1, 2, 3)) + .apply( + ParDo.of( + new RecordSourceLineageWithSubtypeDoFn( + "spanner", + "table", + Arrays.asList("project", "instance", "database", "table")))); + + PipelineResult result = pipeline.run(); + result.waitUntilFinish(); + + // Verify lineage was recorded with subtype + List sources = TestLineage.getRecordedSources(); + assertThat(sources, hasItem("spanner:table:project.instance.database.table")); + } + + @Test + @Category(NeedsRunner.class) + public void testLineageIntegrationWithLastSegmentSeparator() { + // Create pipeline with test lineage enabled - Lineage will be initialized during pipeline.run() + TestPipeline pipeline = createTestPipelineWithLineage(); + + // Run pipeline that records lineage with custom separator + pipeline + .apply(Create.of("x", "y", "z")) + .apply( + ParDo.of( + new RecordSourceLineageWithSeparatorDoFn( + "gcs", Arrays.asList("bucket", "path/to/file.txt"), "/"))); + + PipelineResult result = pipeline.run(); + result.waitUntilFinish(); + + // Verify lineage was recorded with separator + List sources = TestLineage.getRecordedSources(); + assertThat(sources, hasItem("gcs:bucket.`path/to/file.txt`")); + } + + @Test + @Category(NeedsRunner.class) + public void testLineageIntegrationWithBothSourcesAndSinks() { + // Create pipeline with test lineage enabled - Lineage will be initialized during pipeline.run() + 
TestPipeline pipeline = createTestPipelineWithLineage(); + + // Run pipeline that records both source and sink lineage + pipeline + .apply(Create.of("data1", "data2")) + .apply(ParDo.of(new RecordBothSourceAndSinkLineageDoFn())); + + PipelineResult result = pipeline.run(); + result.waitUntilFinish(); + + // Verify both source and sink lineage were recorded + List sources = TestLineage.getRecordedSources(); + List sinks = TestLineage.getRecordedSinks(); + + assertThat(sources, hasItem("input-system:input-db.input-table")); + assertThat(sinks, hasItem("output-system:output-db.output-table")); + } + + @Test + @Category(NeedsRunner.class) + public void testLineageIntegrationWithMultipleElements() { + // Create pipeline with test lineage enabled - Lineage will be initialized during pipeline.run() + TestPipeline pipeline = createTestPipelineWithLineage(); + + // Run pipeline with multiple elements to test thread safety + pipeline + .apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) + .apply(ParDo.of(new RecordSourceLineageDoFn("system", Arrays.asList("resource")))); + + PipelineResult result = pipeline.run(); + result.waitUntilFinish(); + + // Verify lineage was recorded for all elements (may have duplicates) + List sources = TestLineage.getRecordedSources(); + assertThat(sources, hasSize(10)); // One per element + assertThat(sources, hasItem("system:resource")); + } + + // Helper DoFn classes for recording lineage + + /** DoFn that records source lineage with simple FQN. */ + private static class RecordSourceLineageDoFn extends DoFn { + private final String system; + private final List segments; + + RecordSourceLineageDoFn(String system, List segments) { + this.system = system; + this.segments = segments; + } + + @ProcessElement + public void processElement(ProcessContext c) { + // !!! Lineage Caller !!! + Lineage.getSources().add(system, segments); + c.output(c.element()); + } + } + + /** DoFn that records source lineage with subtype. 
*/ + private static class RecordSourceLineageWithSubtypeDoFn extends DoFn { + private final String system; + private final String subtype; + private final List segments; + + RecordSourceLineageWithSubtypeDoFn(String system, String subtype, List segments) { + this.system = system; + this.subtype = subtype; + this.segments = segments; + } + + @ProcessElement + public void processElement(ProcessContext c) { + // !!! Lineage Caller !!! + Lineage.getSources().add(system, subtype, segments, null); + c.output(c.element()); + } + } + + /** DoFn that records source lineage with custom last segment separator. */ + private static class RecordSourceLineageWithSeparatorDoFn extends DoFn { + private final String system; + private final List segments; + private final String separator; + + RecordSourceLineageWithSeparatorDoFn(String system, List segments, String separator) { + this.system = system; + this.segments = segments; + this.separator = separator; + } + + @ProcessElement + public void processElement(ProcessContext c) { + // !!! Lineage Caller !!! + Lineage.getSources().add(system, segments, separator); + c.output(c.element()); + } + } + + /** DoFn that records both source and sink lineage. */ + private static class RecordBothSourceAndSinkLineageDoFn extends DoFn { + @ProcessElement + public void processElement(ProcessContext c) { + // !!! Lineage Caller !!! + Lineage.getSources().add("input-system", ImmutableList.of("input-db", "input-table")); + // !!! Lineage Caller !!! 
+ Lineage.getSinks().add("output-system", ImmutableList.of("output-db", "output-table")); + c.output(c.element()); + } + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/TestLineage.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/TestLineage.java new file mode 100644 index 000000000000..89997661915f --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/TestLineage.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.lineage; + +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import org.apache.beam.sdk.metrics.Lineage; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; + +/** + * A test implementation of {@link Lineage} for testing LineageRegistrar ServiceLoader discovery and + * integration testing with DirectRunner. + * + *

This implementation records all lineage FQNs in thread-safe static storage for test + * assertions. + */ +public class TestLineage extends Lineage { + + // Thread-safe storage for recorded lineage, keyed by direction + private static final ConcurrentHashMap> RECORDED_LINEAGE = + new ConcurrentHashMap<>(); + + private final LineageDirection direction; + + public TestLineage(LineageDirection direction) { + this.direction = direction; + } + + @Override + public void add(Iterable rollupSegments) { + // Record the FQN for test assertions + String fqn = String.join("", rollupSegments); + RECORDED_LINEAGE.computeIfAbsent(direction, k -> new CopyOnWriteArrayList<>()).add(fqn); + } + + public LineageDirection getDirection() { + return direction; + } + + /** Returns all recorded source lineage FQNs. */ + public static List getRecordedSources() { + return ImmutableList.copyOf( + RECORDED_LINEAGE.getOrDefault(LineageDirection.SOURCE, ImmutableList.of())); + } + + /** Returns all recorded sink lineage FQNs. */ + public static List getRecordedSinks() { + return ImmutableList.copyOf( + RECORDED_LINEAGE.getOrDefault(LineageDirection.SINK, ImmutableList.of())); + } + + /** Clears all recorded lineage. Should be called in @Before to ensure test isolation. */ + public static void clearRecorded() { + RECORDED_LINEAGE.clear(); + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/TestLineageOptions.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/TestLineageOptions.java new file mode 100644 index 000000000000..e3437a55bb3c --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/TestLineageOptions.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.lineage; + +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; + +/** PipelineOptions for configuring the test lineage plugin. */ +public interface TestLineageOptions extends PipelineOptions { + + @Description("Enable test lineage plugin for integration testing") + @Default.Boolean(false) + Boolean getEnableTestLineage(); + + void setEnableTestLineage(Boolean value); +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/TestLineageRegistrar.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/TestLineageRegistrar.java new file mode 100644 index 000000000000..23598d4420cf --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/TestLineageRegistrar.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.lineage; + +import com.google.auto.service.AutoService; +import org.apache.beam.sdk.metrics.Lineage; +import org.apache.beam.sdk.options.PipelineOptions; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * A test {@link LineageRegistrar} for ServiceLoader discovery testing. + * + *

This registrar only activates when {@link TestLineageOptions#getEnableTestLineage()} is true, + * ensuring it doesn't interfere with other tests in the suite. + */ +@AutoService(LineageRegistrar.class) +public class TestLineageRegistrar implements LineageRegistrar { + + @Override + public @Nullable Lineage fromOptions( + PipelineOptions options, Lineage.LineageDirection direction) { + // Only activate if explicitly enabled via TestLineageOptions + TestLineageOptions testOptions = options.as(TestLineageOptions.class); + if (testOptions.getEnableTestLineage()) { + return new TestLineage(direction); + } + // Return null to use default MetricsLineage + return null; + } +}