From df9ff36092567812762f09cbd37d8e517a46c85a Mon Sep 17 00:00:00 2001 From: Andrew Kabas Date: Mon, 10 Nov 2025 17:20:37 -0500 Subject: [PATCH 01/14] LineageReporter plugin initial implementation --- .../beam/sdk/lineage/LineageReporter.java | 40 +++++++ .../sdk/lineage/LineageReporterRegistrar.java | 12 ++ .../org/apache/beam/sdk/metrics/Lineage.java | 113 ++++++++++++++++-- .../sdk/metrics/MetricsLineageReporter.java | 36 ++++++ 4 files changed, 192 insertions(+), 9 deletions(-) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/LineageReporter.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/LineageReporterRegistrar.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsLineageReporter.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/LineageReporter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/LineageReporter.java new file mode 100644 index 000000000000..a50a72eca9a9 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/LineageReporter.java @@ -0,0 +1,40 @@ +package org.apache.beam.sdk.lineage; + +import com.facebook.presto.hadoop.$internal.org.apache.avro.reflect.Nullable; + +public interface LineageReporter { + /** + * Adds lineage information using pre-formatted FQN segments. + * + * @param rollupSegments FQN segments already escaped per Dataplex format + */ + void add(Iterable rollupSegments); + + /** + * Adds lineage with system, optional subtype, and hierarchical segments. + * + * @param system The data system identifier (e.g., "bigquery", "kafka") + * @param subtype Optional subtype (e.g., "table", "topic"), may be null + * @param segments Hierarchical path segments + * @param lastSegmentSep Separator for the last segment, may be null + */ + void add( + String system, + @Nullable String subtype, + Iterable segments, + @Nullable String lastSegmentSep); + + /** + * Add a FQN (fully-qualified name) to Lineage. + */ + default void add(String system, Iterable segments, @Nullable String sep) { + add(system, null, segments, sep); + } + + /** + * Add a FQN (fully-qualified name) to Lineage. + */ + default void add(String system, Iterable segments) { + add(system, segments, null); + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/LineageReporterRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/LineageReporterRegistrar.java new file mode 100644 index 000000000000..beba19d4f243 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/LineageReporterRegistrar.java @@ -0,0 +1,12 @@ +package org.apache.beam.sdk.lineage; + +import javax.annotation.Nullable; +import org.apache.beam.sdk.metrics.Lineage; +import org.apache.beam.sdk.options.PipelineOptions; + +public interface LineageReporterRegistrar { + + @Nullable + LineageReporter fromOptions(PipelineOptions options, Lineage.Type type); + +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java index 1e0124fc518b..260b2cd5c181 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java @@ -17,27 +17,46 @@ */ package org.apache.beam.sdk.metrics; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; + import java.util.ArrayList; import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.ServiceLoader; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.lineage.LineageReporter; +import org.apache.beam.sdk.lineage.LineageReporterRegistrar; import org.apache.beam.sdk.metrics.Metrics.MetricsFlag; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.util.common.ReflectHelpers; +import org.apache.beam.sdk.values.KV; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Standard collection of metrics used to record source and sinks information for lineage tracking. */ public class Lineage { - + // Namespace for lineage metrics; used to filter queries in Lineage.query() and in MetricsLineageReporter public static final String LINEAGE_NAMESPACE = "lineage"; - private static final Lineage SOURCES = new Lineage(Type.SOURCE); - private static final Lineage SINKS = new Lineage(Type.SINK); + private static final Logger LOG = LoggerFactory.getLogger(Lineage.class); + private static final AtomicReference SOURCES = new AtomicReference<>(); + private static final AtomicReference SINKS = new AtomicReference<>(); + + private static final AtomicReference> LINEAGE_REVISION = + new AtomicReference<>(); + // Reserved characters are backtick, colon, whitespace (space, \t, \n) and dot. private static final Pattern RESERVED_CHARS = Pattern.compile("[:\\s.`]"); @@ -54,14 +73,84 @@ private Lineage(Type type) { } } - /** {@link Lineage} representing sources and optionally side inputs. */ - public static Lineage getSources() { - return SOURCES; + @Internal + public static void initialize(PipelineOptions options) { + checkNotNull(options, "options cannot be null"); + long optionsId = options.getOptionsId(); + int nextRevision = options.revision(); + + while (true) { + KV currentRevision = LINEAGE_REVISION.get(); + + // Skip re-initialization if same options and revision hasn't changed + if (currentRevision != null + && currentRevision.getKey().equals(optionsId) + && currentRevision.getValue() >= nextRevision) { + LOG.debug("Lineage already initialized with options ID {} revision {}, skipping", + optionsId, currentRevision.getValue()); + return; + } + + if (LINEAGE_REVISION.compareAndSet(currentRevision, KV.of(optionsId, nextRevision))) { + LineageReporter sources = createReporter(options, Type.SOURCE); + LineageReporter sinks = createReporter(options, Type.SINK); + + SOURCES.set(sources); + SINKS.set(sinks); + + if (currentRevision == null) { + LOG.info("Lineage initialized with options ID {} revision {}", optionsId, nextRevision); + } else { + LOG.info("Lineage re-initialized from options ID {} to {} (revision {} -> {})", + currentRevision.getKey(), optionsId, + currentRevision.getValue(), nextRevision); + } + return; + } + } } - /** {@link Lineage} representing sinks. */ - public static Lineage getSinks() { - return SINKS; + /// //// NEW METHOD + private static LineageReporter createReporter(PipelineOptions options, Type type) { + Set registrars = Sets.newTreeSet( + ReflectHelpers.ObjectsClassComparator.INSTANCE); + registrars.addAll(Lists.newArrayList( + ServiceLoader.load(LineageReporterRegistrar.class, + ReflectHelpers.findClassLoader()))); + + for (LineageReporterRegistrar registrar : registrars) { + LineageReporter reporter = registrar.fromOptions(options, type); + if (reporter != null) { + LOG.info("Using {} for lineage type {}", + reporter.getClass().getName(), type); + return reporter; + } + } + + LOG.debug("Using default Metrics-based lineage for type {}", type); + return new MetricsLineageReporter(type); + } + + /** + * Get {@link LineageReporter} representing sources and optionally side inputs. + */ + public static LineageReporter getSources() { + LineageReporter sources = SOURCES.get(); + if (sources == null) { + initialize(PipelineOptionsFactory.create()); + sources = SOURCES.get(); + } + return sources; + } + + /** {@link LineageReporter} representing sinks. */ + public static LineageReporter getSinks() { + LineageReporter sinks = SINKS.get(); + if (sinks == null) { + initialize(PipelineOptionsFactory.create()); + sinks = SINKS.get(); + } + return sinks; } @VisibleForTesting @@ -156,6 +245,9 @@ public void add(Iterable rollupSegments) { * @param truncatedMarker the marker to use to represent truncated FQNs. * @return A flat representation of all FQNs. If the FQN was truncated then it has a trailing * truncatedMarker. + * + *

NOTE: When using a custom LineageReporter plugin, this method + * will return empty results since lineage is not stored in Metrics. */ public static Set query(MetricResults results, Type type, String truncatedMarker) { MetricQueryResults lineageQueryResults = getLineageQueryResults(results, type); @@ -184,6 +276,9 @@ public static Set query(MetricResults results, Type type, String truncat * @param results FQNs from the result * @param type sources or sinks * @return A flat representation of all FQNs. If the FQN was truncated then it has a trailing '*'. + * + *

NOTE: When using a custom LineageReporter plugin, this method + * will return empty results since lineage is not stored in Metrics. */ public static Set query(MetricResults results, Type type) { if (MetricsFlag.lineageRollupEnabled()) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsLineageReporter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsLineageReporter.java new file mode 100644 index 000000000000..73f57188a3d2 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsLineageReporter.java @@ -0,0 +1,36 @@ +package org.apache.beam.sdk.metrics; + +import org.apache.beam.sdk.lineage.LineageReporter; +import org.apache.beam.sdk.metrics.Metrics.MetricsFlag; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; + +public class MetricsLineageReporter implements LineageReporter { + + private final Metric metric; + + public MetricsLineageReporter(final Lineage.Type type) { + if (MetricsFlag.lineageRollupEnabled()) { + this.metric = Metrics.boundedTrie( + Lineage.LINEAGE_NAMESPACE, + type == Lineage.Type.SOURCE ? Lineage.Type.SOURCEV2.toString() : Lineage.Type.SINKV2.toString()); + } else { + this.metric = Metrics.stringSet(Lineage.LINEAGE_NAMESPACE, type.toString()); + } + } + + @Override + public void add(final Iterable rollupSegments) { + ImmutableList segments = ImmutableList.copyOf(rollupSegments); + if (MetricsFlag.lineageRollupEnabled()) { + ((BoundedTrie) this.metric).add(segments); + } else { + ((StringSet) this.metric).add(String.join("", segments)); + } + } + + @Override + public void add(final String system, final String subtype, final Iterable segments, + final String lastSegmentSep) { + add(Lineage.getFQNParts(system, subtype, segments, lastSegmentSep)); + } +} From 979e4350ea3723e6bbbae76d8cc5963762e6fb3b Mon Sep 17 00:00:00 2001 From: Andrew Kabas Date: Mon, 10 Nov 2025 20:28:08 -0500 Subject: [PATCH 02/14] Change to Lineage as a base abstract class --- .../beam/sdk/lineage/LineageRegistrar.java | 28 ++++++ .../beam/sdk/lineage/LineageReporter.java | 93 +++++++++-------- .../sdk/lineage/LineageReporterRegistrar.java | 12 --- .../org/apache/beam/sdk/metrics/Lineage.java | 99 +++++++------------ .../beam/sdk/metrics/MetricsLineage.java | 59 +++++++++++ .../sdk/metrics/MetricsLineageReporter.java | 36 ------- 6 files changed, 178 insertions(+), 149 deletions(-) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/LineageRegistrar.java delete mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/LineageReporterRegistrar.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsLineage.java delete mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsLineageReporter.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/LineageRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/LineageRegistrar.java new file mode 100644 index 000000000000..a716e1f529d7 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/LineageRegistrar.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.lineage; + +import javax.annotation.Nullable; +import org.apache.beam.sdk.metrics.Lineage; +import org.apache.beam.sdk.options.PipelineOptions; + +public interface LineageRegistrar { + + @Nullable + Lineage fromOptions(PipelineOptions options, Lineage.Type type); +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/LineageReporter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/LineageReporter.java index a50a72eca9a9..45c6605308d9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/LineageReporter.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/LineageReporter.java @@ -1,40 +1,53 @@ -package org.apache.beam.sdk.lineage; - -import com.facebook.presto.hadoop.$internal.org.apache.avro.reflect.Nullable; - -public interface LineageReporter { - /** - * Adds lineage information using pre-formatted FQN segments. - * - * @param rollupSegments FQN segments already escaped per Dataplex format - */ - void add(Iterable rollupSegments); - - /** - * Adds lineage with system, optional subtype, and hierarchical segments. - * - * @param system The data system identifier (e.g., "bigquery", "kafka") - * @param subtype Optional subtype (e.g., "table", "topic"), may be null - * @param segments Hierarchical path segments - * @param lastSegmentSep Separator for the last segment, may be null - */ - void add( - String system, - @Nullable String subtype, - Iterable segments, - @Nullable String lastSegmentSep); - - /** - * Add a FQN (fully-qualified name) to Lineage. - */ - default void add(String system, Iterable segments, @Nullable String sep) { - add(system, null, segments, sep); - } - - /** - * Add a FQN (fully-qualified name) to Lineage. - */ - default void add(String system, Iterable segments) { - add(system, segments, null); - } -} +///* +// * Licensed to the Apache Software Foundation (ASF) under one +// * or more contributor license agreements. See the NOTICE file +// * distributed with this work for additional information +// * regarding copyright ownership. The ASF licenses this file +// * to you under the Apache License, Version 2.0 (the +// * "License"); you may not use this file except in compliance +// * with the License. You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. +// */ +//package org.apache.beam.sdk.lineage; +// +//import com.facebook.presto.hadoop.$internal.org.apache.avro.reflect.Nullable; +// +//public interface LineageReporter { +// /** +// * Adds lineage information using pre-formatted FQN segments. +// * +// * @param rollupSegments FQN segments already escaped per Dataplex format +// */ +// void add(Iterable rollupSegments); +// +// /** +// * Adds lineage with system, optional subtype, and hierarchical segments. +// * +// * @param system The data system identifier (e.g., "bigquery", "kafka") +// * @param subtype Optional subtype (e.g., "table", "topic"), may be null +// * @param segments Hierarchical path segments +// * @param lastSegmentSep Separator for the last segment, may be null +// */ +// void add( +// String system, +// @Nullable String subtype, +// Iterable segments, +// @Nullable String lastSegmentSep); +// +// /** Add a FQN (fully-qualified name) to Lineage. */ +// default void add(String system, Iterable segments, @Nullable String sep) { +// add(system, null, segments, sep); +// } +// +// /** Add a FQN (fully-qualified name) to Lineage. */ +// default void add(String system, Iterable segments) { +// add(system, segments, null); +// } +//} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/LineageReporterRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/LineageReporterRegistrar.java deleted file mode 100644 index beba19d4f243..000000000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/LineageReporterRegistrar.java +++ /dev/null @@ -1,12 +0,0 @@ -package org.apache.beam.sdk.lineage; - -import javax.annotation.Nullable; -import org.apache.beam.sdk.metrics.Lineage; -import org.apache.beam.sdk.options.PipelineOptions; - -public interface LineageReporterRegistrar { - - @Nullable - LineageReporter fromOptions(PipelineOptions options, Lineage.Type type); - -} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java index 260b2cd5c181..5ef0819eb44f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java @@ -28,8 +28,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; import org.apache.beam.sdk.annotations.Internal; -import org.apache.beam.sdk.lineage.LineageReporter; -import org.apache.beam.sdk.lineage.LineageReporterRegistrar; +import org.apache.beam.sdk.lineage.LineageRegistrar; import org.apache.beam.sdk.metrics.Metrics.MetricsFlag; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -37,7 +36,6 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; import org.checkerframework.checker.nullness.qual.Nullable; @@ -47,12 +45,11 @@ /** * Standard collection of metrics used to record source and sinks information for lineage tracking. */ -public class Lineage { - // Namespace for lineage metrics; used to filter queries in Lineage.query() and in MetricsLineageReporter +public abstract class Lineage { public static final String LINEAGE_NAMESPACE = "lineage"; private static final Logger LOG = LoggerFactory.getLogger(Lineage.class); - private static final AtomicReference SOURCES = new AtomicReference<>(); - private static final AtomicReference SINKS = new AtomicReference<>(); + private static final AtomicReference SOURCES = new AtomicReference<>(); + private static final AtomicReference SINKS = new AtomicReference<>(); private static final AtomicReference> LINEAGE_REVISION = new AtomicReference<>(); @@ -60,18 +57,7 @@ public class Lineage { // Reserved characters are backtick, colon, whitespace (space, \t, \n) and dot. private static final Pattern RESERVED_CHARS = Pattern.compile("[:\\s.`]"); - private final Metric metric; - - private Lineage(Type type) { - if (MetricsFlag.lineageRollupEnabled()) { - this.metric = - Metrics.boundedTrie( - LINEAGE_NAMESPACE, - type == Type.SOURCE ? Type.SOURCEV2.toString() : Type.SINKV2.toString()); - } else { - this.metric = Metrics.stringSet(LINEAGE_NAMESPACE, type.toString()); - } - } + protected Lineage() {} @Internal public static void initialize(PipelineOptions options) { @@ -82,18 +68,19 @@ public static void initialize(PipelineOptions options) { while (true) { KV currentRevision = LINEAGE_REVISION.get(); - // Skip re-initialization if same options and revision hasn't changed if (currentRevision != null && currentRevision.getKey().equals(optionsId) && currentRevision.getValue() >= nextRevision) { - LOG.debug("Lineage already initialized with options ID {} revision {}, skipping", - optionsId, currentRevision.getValue()); + LOG.debug( + "Lineage already initialized with options ID {} revision {}, skipping", + optionsId, + currentRevision.getValue()); return; } if (LINEAGE_REVISION.compareAndSet(currentRevision, KV.of(optionsId, nextRevision))) { - LineageReporter sources = createReporter(options, Type.SOURCE); - LineageReporter sinks = createReporter(options, Type.SINK); + Lineage sources = createLineage(options, Type.SOURCE); + Lineage sinks = createLineage(options, Type.SINK); SOURCES.set(sources); SINKS.set(sinks); @@ -101,41 +88,40 @@ public static void initialize(PipelineOptions options) { if (currentRevision == null) { LOG.info("Lineage initialized with options ID {} revision {}", optionsId, nextRevision); } else { - LOG.info("Lineage re-initialized from options ID {} to {} (revision {} -> {})", - currentRevision.getKey(), optionsId, - currentRevision.getValue(), nextRevision); + LOG.info( + "Lineage re-initialized from options ID {} to {} (revision {} -> {})", + currentRevision.getKey(), + optionsId, + currentRevision.getValue(), + nextRevision); } return; } } } - /// //// NEW METHOD - private static LineageReporter createReporter(PipelineOptions options, Type type) { - Set registrars = Sets.newTreeSet( - ReflectHelpers.ObjectsClassComparator.INSTANCE); - registrars.addAll(Lists.newArrayList( - ServiceLoader.load(LineageReporterRegistrar.class, - ReflectHelpers.findClassLoader()))); + private static Lineage createLineage(PipelineOptions options, Type type) { + Set registrars = + Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE); + registrars.addAll( + Lists.newArrayList( + ServiceLoader.load(LineageRegistrar.class, ReflectHelpers.findClassLoader()))); - for (LineageReporterRegistrar registrar : registrars) { - LineageReporter reporter = registrar.fromOptions(options, type); + for (LineageRegistrar registrar : registrars) { + Lineage reporter = registrar.fromOptions(options, type); if (reporter != null) { - LOG.info("Using {} for lineage type {}", - reporter.getClass().getName(), type); + LOG.info("Using {} for lineage type {}", reporter.getClass().getName(), type); return reporter; } } LOG.debug("Using default Metrics-based lineage for type {}", type); - return new MetricsLineageReporter(type); + return new MetricsLineage(type); } - /** - * Get {@link LineageReporter} representing sources and optionally side inputs. - */ - public static LineageReporter getSources() { - LineageReporter sources = SOURCES.get(); + /** Get {@link Lineage} representing sources and optionally side inputs. */ + public static Lineage getSources() { + Lineage sources = SOURCES.get(); if (sources == null) { initialize(PipelineOptionsFactory.create()); sources = SOURCES.get(); @@ -143,9 +129,9 @@ public static LineageReporter getSources() { return sources; } - /** {@link LineageReporter} representing sinks. */ - public static LineageReporter getSinks() { - LineageReporter sinks = SINKS.get(); + /** {@link Lineage} representing sinks. */ + public static Lineage getSinks() { + Lineage sinks = SINKS.get(); if (sinks == null) { initialize(PipelineOptionsFactory.create()); sinks = SINKS.get(); @@ -228,14 +214,7 @@ public void add(String system, Iterable segments) { * which is already escaped. *

In particular, this means they will often have trailing delimiters. */ - public void add(Iterable rollupSegments) { - ImmutableList segments = ImmutableList.copyOf(rollupSegments); - if (MetricsFlag.lineageRollupEnabled()) { - ((BoundedTrie) this.metric).add(segments); - } else { - ((StringSet) this.metric).add(String.join("", segments)); - } - } + public abstract void add(Iterable rollupSegments); /** * Query {@link BoundedTrie} metrics from {@link MetricResults}. @@ -245,9 +224,8 @@ public void add(Iterable rollupSegments) { * @param truncatedMarker the marker to use to represent truncated FQNs. * @return A flat representation of all FQNs. If the FQN was truncated then it has a trailing * truncatedMarker. - * - *

NOTE: When using a custom LineageReporter plugin, this method - * will return empty results since lineage is not stored in Metrics. + *

NOTE: When using a custom LineageReporter plugin, this method will return empty results + * since lineage is not stored in Metrics. */ public static Set query(MetricResults results, Type type, String truncatedMarker) { MetricQueryResults lineageQueryResults = getLineageQueryResults(results, type); @@ -276,9 +254,8 @@ public static Set query(MetricResults results, Type type, String truncat * @param results FQNs from the result * @param type sources or sinks * @return A flat representation of all FQNs. If the FQN was truncated then it has a trailing '*'. - * - *

NOTE: When using a custom LineageReporter plugin, this method - * will return empty results since lineage is not stored in Metrics. + *

NOTE: When using a custom LineageReporter plugin, this method will return empty results + * since lineage is not stored in Metrics. */ public static Set query(MetricResults results, Type type) { if (MetricsFlag.lineageRollupEnabled()) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsLineage.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsLineage.java new file mode 100644 index 000000000000..9076cfc525eb --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsLineage.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import org.apache.beam.sdk.lineage.LineageReporter; +import org.apache.beam.sdk.metrics.Metrics.MetricsFlag; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; + +public class MetricsLineage extends Lineage { + + private final Metric metric; + + public MetricsLineage(final Lineage.Type type) { + if (MetricsFlag.lineageRollupEnabled()) { + this.metric = + Metrics.boundedTrie( + Lineage.LINEAGE_NAMESPACE, + type == Lineage.Type.SOURCE + ? Lineage.Type.SOURCEV2.toString() + : Lineage.Type.SINKV2.toString()); + } else { + this.metric = Metrics.stringSet(Lineage.LINEAGE_NAMESPACE, type.toString()); + } + } + + @Override + public void add(final Iterable rollupSegments) { + ImmutableList segments = ImmutableList.copyOf(rollupSegments); + if (MetricsFlag.lineageRollupEnabled()) { + ((BoundedTrie) this.metric).add(segments); + } else { + ((StringSet) this.metric).add(String.join("", segments)); + } + } + + @Override + public void add( + final String system, + final String subtype, + final Iterable segments, + final String lastSegmentSep) { + add(Lineage.getFQNParts(system, subtype, segments, lastSegmentSep)); + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsLineageReporter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsLineageReporter.java deleted file mode 100644 index 73f57188a3d2..000000000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsLineageReporter.java +++ /dev/null @@ -1,36 +0,0 @@ -package org.apache.beam.sdk.metrics; - -import org.apache.beam.sdk.lineage.LineageReporter; -import org.apache.beam.sdk.metrics.Metrics.MetricsFlag; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; - -public class MetricsLineageReporter implements LineageReporter { - - private final Metric metric; - - public MetricsLineageReporter(final Lineage.Type type) { - if (MetricsFlag.lineageRollupEnabled()) { - this.metric = Metrics.boundedTrie( - Lineage.LINEAGE_NAMESPACE, - type == Lineage.Type.SOURCE ? Lineage.Type.SOURCEV2.toString() : Lineage.Type.SINKV2.toString()); - } else { - this.metric = Metrics.stringSet(Lineage.LINEAGE_NAMESPACE, type.toString()); - } - } - - @Override - public void add(final Iterable rollupSegments) { - ImmutableList segments = ImmutableList.copyOf(rollupSegments); - if (MetricsFlag.lineageRollupEnabled()) { - ((BoundedTrie) this.metric).add(segments); - } else { - ((StringSet) this.metric).add(String.join("", segments)); - } - } - - @Override - public void add(final String system, final String subtype, final Iterable segments, - final String lastSegmentSep) { - add(Lineage.getFQNParts(system, subtype, segments, lastSegmentSep)); - } -} From bd5f44c62b5722202888c08c66a70c275433438d Mon Sep 17 00:00:00 2001 From: Andrew Kabas Date: Mon, 10 Nov 2025 21:14:50 -0500 Subject: [PATCH 03/14] Add tests --- .../beam/sdk/lineage/LineageRegistrar.java | 2 +- .../beam/sdk/lineage/LineageReporter.java | 53 --------------- .../org/apache/beam/sdk/metrics/Lineage.java | 29 ++++---- .../beam/sdk/metrics/MetricsLineage.java | 18 ++--- .../sdk/lineage/LineageRegistrarTest.java | 67 +++++++++++++++++++ .../apache/beam/sdk/lineage/TestLineage.java | 41 ++++++++++++ .../sdk/lineage/TestLineageRegistrar.java | 38 +++++++++++ 7 files changed, 170 insertions(+), 78 deletions(-) delete mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/LineageReporter.java create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/LineageRegistrarTest.java create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/TestLineage.java create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/TestLineageRegistrar.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/LineageRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/LineageRegistrar.java index a716e1f529d7..278564701f0c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/LineageRegistrar.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/LineageRegistrar.java @@ -24,5 +24,5 @@ public interface LineageRegistrar { @Nullable - Lineage fromOptions(PipelineOptions options, Lineage.Type type); + Lineage fromOptions(PipelineOptions options, Lineage.LineageDirection direction); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/LineageReporter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/LineageReporter.java deleted file mode 100644 index 45c6605308d9..000000000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/LineageReporter.java +++ /dev/null @@ -1,53 +0,0 @@ -///* -// * Licensed to the Apache Software Foundation (ASF) under one -// * or more contributor license agreements. See the NOTICE file -// * distributed with this work for additional information -// * regarding copyright ownership. The ASF licenses this file -// * to you under the Apache License, Version 2.0 (the -// * "License"); you may not use this file except in compliance -// * with the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. -// */ -//package org.apache.beam.sdk.lineage; -// -//import com.facebook.presto.hadoop.$internal.org.apache.avro.reflect.Nullable; -// -//public interface LineageReporter { -// /** -// * Adds lineage information using pre-formatted FQN segments. -// * -// * @param rollupSegments FQN segments already escaped per Dataplex format -// */ -// void add(Iterable rollupSegments); -// -// /** -// * Adds lineage with system, optional subtype, and hierarchical segments. -// * -// * @param system The data system identifier (e.g., "bigquery", "kafka") -// * @param subtype Optional subtype (e.g., "table", "topic"), may be null -// * @param segments Hierarchical path segments -// * @param lastSegmentSep Separator for the last segment, may be null -// */ -// void add( -// String system, -// @Nullable String subtype, -// Iterable segments, -// @Nullable String lastSegmentSep); -// -// /** Add a FQN (fully-qualified name) to Lineage. */ -// default void add(String system, Iterable segments, @Nullable String sep) { -// add(system, null, segments, sep); -// } -// -// /** Add a FQN (fully-qualified name) to Lineage. */ -// default void add(String system, Iterable segments) { -// add(system, segments, null); -// } -//} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java index 5ef0819eb44f..cafbb9fa6d39 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java @@ -57,6 +57,11 @@ public abstract class Lineage { // Reserved characters are backtick, colon, whitespace (space, \t, \n) and dot. private static final Pattern RESERVED_CHARS = Pattern.compile("[:\\s.`]"); + public enum LineageDirection { + SOURCE, + SINK + } + protected Lineage() {} @Internal @@ -79,8 +84,8 @@ public static void initialize(PipelineOptions options) { } if (LINEAGE_REVISION.compareAndSet(currentRevision, KV.of(optionsId, nextRevision))) { - Lineage sources = createLineage(options, Type.SOURCE); - Lineage sinks = createLineage(options, Type.SINK); + Lineage sources = createLineage(options, LineageDirection.SOURCE); + Lineage sinks = createLineage(options, LineageDirection.SINK); SOURCES.set(sources); SINKS.set(sinks); @@ -100,7 +105,7 @@ public static void initialize(PipelineOptions options) { } } - private static Lineage createLineage(PipelineOptions options, Type type) { + private static Lineage createLineage(PipelineOptions options, LineageDirection direction) { Set registrars = Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE); registrars.addAll( @@ -108,18 +113,18 @@ private static Lineage createLineage(PipelineOptions options, Type type) { ServiceLoader.load(LineageRegistrar.class, ReflectHelpers.findClassLoader()))); for (LineageRegistrar registrar : registrars) { - Lineage reporter = registrar.fromOptions(options, type); + Lineage reporter = registrar.fromOptions(options, direction); if (reporter != null) { - LOG.info("Using {} for lineage type {}", reporter.getClass().getName(), type); + LOG.info("Using {} for lineage direction {}", reporter.getClass().getName(), direction); return reporter; } } - LOG.debug("Using default Metrics-based lineage for type {}", type); - return new MetricsLineage(type); + LOG.debug("Using default Metrics-based lineage for direction {}", direction); + return new MetricsLineage(direction); } - /** Get {@link Lineage} representing sources and optionally side inputs. */ + /** {@link Lineage} representing sources and optionally side inputs. */ public static Lineage getSources() { Lineage sources = SOURCES.get(); if (sources == null) { @@ -224,8 +229,8 @@ public void add(String system, Iterable segments) { * @param truncatedMarker the marker to use to represent truncated FQNs. * @return A flat representation of all FQNs. If the FQN was truncated then it has a trailing * truncatedMarker. - *

NOTE: When using a custom LineageReporter plugin, this method will return empty results - * since lineage is not stored in Metrics. + *

NOTE: When using a custom Lineage plugin, this method will return empty results since + * lineage is not stored in Metrics. */ public static Set query(MetricResults results, Type type, String truncatedMarker) { MetricQueryResults lineageQueryResults = getLineageQueryResults(results, type); @@ -254,8 +259,8 @@ public static Set query(MetricResults results, Type type, String truncat * @param results FQNs from the result * @param type sources or sinks * @return A flat representation of all FQNs. If the FQN was truncated then it has a trailing '*'. - *

NOTE: When using a custom LineageReporter plugin, this method will return empty results - * since lineage is not stored in Metrics. + *

NOTE: When using a custom Lineage plugin, this method will return empty results since + * lineage is not stored in Metrics. */ public static Set query(MetricResults results, Type type) { if (MetricsFlag.lineageRollupEnabled()) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsLineage.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsLineage.java index 9076cfc525eb..836abe4c4cc3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsLineage.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsLineage.java @@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.metrics; -import org.apache.beam.sdk.lineage.LineageReporter; import org.apache.beam.sdk.metrics.Metrics.MetricsFlag; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; @@ -25,12 +24,16 @@ public class MetricsLineage extends Lineage { private final Metric metric; - public MetricsLineage(final Lineage.Type type) { + public MetricsLineage(final Lineage.LineageDirection direction) { + // Derive Metrics-specific Type from LineageDirection + Lineage.Type type = + (direction == Lineage.LineageDirection.SOURCE) ? Lineage.Type.SOURCE : Lineage.Type.SINK; + if (MetricsFlag.lineageRollupEnabled()) { this.metric = Metrics.boundedTrie( Lineage.LINEAGE_NAMESPACE, - type == Lineage.Type.SOURCE + direction == Lineage.LineageDirection.SOURCE ? Lineage.Type.SOURCEV2.toString() : Lineage.Type.SINKV2.toString()); } else { @@ -47,13 +50,4 @@ public void add(final Iterable rollupSegments) { ((StringSet) this.metric).add(String.join("", segments)); } } - - @Override - public void add( - final String system, - final String subtype, - final Iterable segments, - final String lastSegmentSep) { - add(Lineage.getFQNParts(system, subtype, segments, lastSegmentSep)); - } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/LineageRegistrarTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/LineageRegistrarTest.java new file mode 100644 index 000000000000..20370a2ac161 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/LineageRegistrarTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.lineage; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.util.ServiceLoader; +import org.apache.beam.sdk.metrics.Lineage; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link LineageRegistrar} ServiceLoader discovery. */ +@RunWith(JUnit4.class) +public class LineageRegistrarTest { + + @Test + public void testServiceLoaderDiscovery() { + // Load all LineageRegistrar implementations via ServiceLoader + for (LineageRegistrar registrar : + Lists.newArrayList(ServiceLoader.load(LineageRegistrar.class).iterator())) { + + // Check if we found the TestLineageRegistrar + if (registrar instanceof TestLineageRegistrar) { + + // Test with SOURCE direction + Lineage sourceLineage = + registrar.fromOptions(PipelineOptionsFactory.create(), Lineage.LineageDirection.SOURCE); + assertThat(sourceLineage, notNullValue()); + assertThat(sourceLineage, instanceOf(TestLineage.class)); + assertEquals(Lineage.LineageDirection.SOURCE, ((TestLineage) sourceLineage).getDirection()); + + // Test with SINK direction + Lineage sinkLineage = + registrar.fromOptions(PipelineOptionsFactory.create(), Lineage.LineageDirection.SINK); + assertThat(sinkLineage, notNullValue()); + assertThat(sinkLineage, instanceOf(TestLineage.class)); + assertEquals(Lineage.LineageDirection.SINK, ((TestLineage) sinkLineage).getDirection()); + + return; + } + } + + fail("Expected to find " + TestLineageRegistrar.class); + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/TestLineage.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/TestLineage.java new file mode 100644 index 000000000000..7ccd73a1e2d0 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/TestLineage.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.lineage; + +import org.apache.beam.sdk.metrics.Lineage; + +/** + * A test implementation of {@link Lineage} for testing LineageRegistrar ServiceLoader discovery. + */ +public class TestLineage extends Lineage { + + private final LineageDirection direction; + + public TestLineage(LineageDirection direction) { + this.direction = direction; + } + + @Override + public void add(Iterable rollupSegments) { + // Test implementation - no-op for discovery testing + } + + public LineageDirection getDirection() { + return direction; + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/TestLineageRegistrar.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/TestLineageRegistrar.java new file mode 100644 index 000000000000..3e06bfca4c82 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/TestLineageRegistrar.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.lineage; + +import com.google.auto.service.AutoService; +import org.apache.beam.sdk.metrics.Lineage; +import org.apache.beam.sdk.options.PipelineOptions; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * A test {@link LineageRegistrar} for ServiceLoader discovery testing. This registrar always + * returns a TestLineage instance. + */ +@AutoService(LineageRegistrar.class) +public class TestLineageRegistrar implements LineageRegistrar { + + @Override + public @Nullable Lineage fromOptions( + PipelineOptions options, Lineage.LineageDirection direction) { + // For testing, always return a TestLineage instance + return new TestLineage(direction); + } +} From 74912ac4c14eeaafe9a4f4ca58253743512bff34 Mon Sep 17 00:00:00 2001 From: Andrew Kabas Date: Tue, 11 Nov 2025 10:17:54 -0500 Subject: [PATCH 04/14] Add a TestPipeline integration test --- .../sdk/lineage/LineageRegistrarTest.java | 212 +++++++++++++++++- .../apache/beam/sdk/lineage/TestLineage.java | 35 ++- .../beam/sdk/lineage/TestLineageOptions.java | 32 +++ .../sdk/lineage/TestLineageRegistrar.java | 15 +- 4 files changed, 283 insertions(+), 11 deletions(-) create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/TestLineageOptions.java diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/LineageRegistrarTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/LineageRegistrarTest.java index 20370a2ac161..66959dee8c0e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/LineageRegistrarTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/LineageRegistrarTest.java @@ -18,23 +18,46 @@ package org.apache.beam.sdk.lineage; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.notNullValue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; +import java.util.Arrays; +import java.util.List; import java.util.ServiceLoader; +import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.metrics.Lineage; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; +import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -/** Tests for {@link LineageRegistrar} ServiceLoader discovery. */ +/** + * Tests for {@link LineageRegistrar} ServiceLoader discovery and DirectRunner integration. + */ @RunWith(JUnit4.class) public class LineageRegistrarTest { + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + + @Before + public void setUp() { + // Clear any recorded lineage from previous tests + TestLineage.clearRecorded(); + } + @Test public void testServiceLoaderDiscovery() { // Load all LineageRegistrar implementations via ServiceLoader @@ -44,16 +67,18 @@ public void testServiceLoaderDiscovery() { // Check if we found the TestLineageRegistrar if (registrar instanceof TestLineageRegistrar) { + // Create options with test lineage enabled + TestLineageOptions options = PipelineOptionsFactory.create().as(TestLineageOptions.class); + options.setEnableTestLineage(true); + // Test with SOURCE direction - Lineage sourceLineage = - registrar.fromOptions(PipelineOptionsFactory.create(), Lineage.LineageDirection.SOURCE); + Lineage sourceLineage = registrar.fromOptions(options, Lineage.LineageDirection.SOURCE); assertThat(sourceLineage, notNullValue()); assertThat(sourceLineage, instanceOf(TestLineage.class)); assertEquals(Lineage.LineageDirection.SOURCE, ((TestLineage) sourceLineage).getDirection()); // Test with SINK direction - Lineage sinkLineage = - registrar.fromOptions(PipelineOptionsFactory.create(), Lineage.LineageDirection.SINK); + Lineage sinkLineage = registrar.fromOptions(options, Lineage.LineageDirection.SINK); assertThat(sinkLineage, notNullValue()); assertThat(sinkLineage, instanceOf(TestLineage.class)); assertEquals(Lineage.LineageDirection.SINK, ((TestLineage) sinkLineage).getDirection()); @@ -64,4 +89,181 @@ public void testServiceLoaderDiscovery() { fail("Expected to find " + TestLineageRegistrar.class); } + + @Test + public void testLineageIntegrationWithSimpleFQN() { + // Enable test lineage plugin + TestLineageOptions options = pipeline.getOptions().as(TestLineageOptions.class); + options.setEnableTestLineage(true); + Lineage.initialize(pipeline.getOptions()); + + // Run pipeline that records lineage + pipeline + .apply(Create.of("a", "b", "c")) + .apply(ParDo.of(new RecordSourceLineageDoFn("testsystem", Arrays.asList("db", "table")))); + + PipelineResult result = pipeline.run(); + result.waitUntilFinish(); + + // Verify lineage was recorded + List sources = TestLineage.getRecordedSources(); + assertThat(sources, hasItem("testsystem:db.table")); + } + + @Test + public void testLineageIntegrationWithSubtype() { + // Enable test lineage plugin + TestLineageOptions options = pipeline.getOptions().as(TestLineageOptions.class); + options.setEnableTestLineage(true); + Lineage.initialize(pipeline.getOptions()); + + // Run pipeline that records lineage with subtype + pipeline + .apply(Create.of(1, 2, 3)) + .apply( + ParDo.of( + new RecordSourceLineageWithSubtypeDoFn( + "spanner", "table", Arrays.asList("project", "instance", "database", "table")))); + + PipelineResult result = pipeline.run(); + result.waitUntilFinish(); + + // Verify lineage was recorded with subtype + List sources = TestLineage.getRecordedSources(); + assertThat(sources, hasItem("spanner:table:project.instance.database.table")); + } + + @Test + public void testLineageIntegrationWithLastSegmentSeparator() { + // Enable test lineage plugin + TestLineageOptions options = pipeline.getOptions().as(TestLineageOptions.class); + options.setEnableTestLineage(true); + Lineage.initialize(pipeline.getOptions()); + + // Run pipeline that records lineage with custom separator + pipeline + .apply(Create.of("x", "y", "z")) + .apply( + ParDo.of( + new RecordSourceLineageWithSeparatorDoFn( + "gcs", Arrays.asList("bucket", "path/to/file.txt"), "/"))); + + PipelineResult result = pipeline.run(); + result.waitUntilFinish(); + + // Verify lineage was recorded with separator + List sources = TestLineage.getRecordedSources(); + assertThat(sources, hasItem("gcs:bucket.path/to/file.txt")); + } + + @Test + public void testLineageIntegrationWithBothSourcesAndSinks() { + // Enable test lineage plugin + TestLineageOptions options = pipeline.getOptions().as(TestLineageOptions.class); + options.setEnableTestLineage(true); + Lineage.initialize(pipeline.getOptions()); + + // Run pipeline that records both source and sink lineage + pipeline + .apply(Create.of("data1", "data2")) + .apply(ParDo.of(new RecordBothSourceAndSinkLineageDoFn())); + + PipelineResult result = pipeline.run(); + result.waitUntilFinish(); + + // Verify both source and sink lineage were recorded + List sources = TestLineage.getRecordedSources(); + List sinks = TestLineage.getRecordedSinks(); + + assertThat(sources, hasItem("input-system:input-db.input-table")); + assertThat(sinks, hasItem("output-system:output-db.output-table")); + } + + @Test + public void testLineageIntegrationWithMultipleElements() { + // Enable test lineage plugin + TestLineageOptions options = pipeline.getOptions().as(TestLineageOptions.class); + options.setEnableTestLineage(true); + Lineage.initialize(pipeline.getOptions()); + + // Run pipeline with multiple elements to test thread safety + pipeline + .apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) + .apply(ParDo.of(new RecordSourceLineageDoFn("system", Arrays.asList("resource")))); + + PipelineResult result = pipeline.run(); + result.waitUntilFinish(); + + // Verify lineage was recorded for all elements (may have duplicates) + List sources = TestLineage.getRecordedSources(); + assertThat(sources, hasSize(10)); // One per element + assertThat(sources, hasItem("system:resource")); + } + + // Helper DoFn classes for recording lineage + + /** DoFn that records source lineage with simple FQN. */ + private static class RecordSourceLineageDoFn extends DoFn { + private final String system; + private final List segments; + + RecordSourceLineageDoFn(String system, List segments) { + this.system = system; + this.segments = segments; + } + + @ProcessElement + public void processElement(ProcessContext c) { + Lineage.getSources().add(system, segments); + c.output(c.element()); + } + } + + /** DoFn that records source lineage with subtype. */ + private static class RecordSourceLineageWithSubtypeDoFn extends DoFn { + private final String system; + private final String subtype; + private final List segments; + + RecordSourceLineageWithSubtypeDoFn(String system, String subtype, List segments) { + this.system = system; + this.subtype = subtype; + this.segments = segments; + } + + @ProcessElement + public void processElement(ProcessContext c) { + Lineage.getSources().add(system, subtype, segments, null); + c.output(c.element()); + } + } + + /** DoFn that records source lineage with custom last segment separator. */ + private static class RecordSourceLineageWithSeparatorDoFn extends DoFn { + private final String system; + private final List segments; + private final String separator; + + RecordSourceLineageWithSeparatorDoFn(String system, List segments, String separator) { + this.system = system; + this.segments = segments; + this.separator = separator; + } + + @ProcessElement + public void processElement(ProcessContext c) { + Lineage.getSources().add(system, segments, separator); + c.output(c.element()); + } + } + + /** DoFn that records both source and sink lineage. */ + private static class RecordBothSourceAndSinkLineageDoFn extends DoFn { + @ProcessElement + public void processElement(ProcessContext c) { + Lineage.getSources().add("input-system", ImmutableList.of("input-db", "input-table")); + Lineage.getSinks().add("output-system", ImmutableList.of("output-db", "output-table")); + c.output(c.element()); + } + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/TestLineage.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/TestLineage.java index 7ccd73a1e2d0..c0b3e67470c2 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/TestLineage.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/TestLineage.java @@ -17,13 +17,25 @@ */ package org.apache.beam.sdk.lineage; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import org.apache.beam.sdk.metrics.Lineage; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; /** - * A test implementation of {@link Lineage} for testing LineageRegistrar ServiceLoader discovery. + * A test implementation of {@link Lineage} for testing LineageRegistrar ServiceLoader discovery + * and integration testing with DirectRunner. + * + *

This implementation records all lineage FQNs in thread-safe static storage for test + * assertions. */ public class TestLineage extends Lineage { + // Thread-safe storage for recorded lineage, keyed by direction + private static final ConcurrentHashMap> RECORDED_LINEAGE = + new ConcurrentHashMap<>(); + private final LineageDirection direction; public TestLineage(LineageDirection direction) { @@ -32,10 +44,29 @@ public TestLineage(LineageDirection direction) { @Override public void add(Iterable rollupSegments) { - // Test implementation - no-op for discovery testing + // Record the FQN for test assertions + String fqn = String.join("", rollupSegments); + RECORDED_LINEAGE.computeIfAbsent(direction, k -> new CopyOnWriteArrayList<>()).add(fqn); } public LineageDirection getDirection() { return direction; } + + /** Returns all recorded source lineage FQNs. */ + public static List getRecordedSources() { + return ImmutableList.copyOf( + RECORDED_LINEAGE.getOrDefault(LineageDirection.SOURCE, ImmutableList.of())); + } + + /** Returns all recorded sink lineage FQNs. */ + public static List getRecordedSinks() { + return ImmutableList.copyOf( + RECORDED_LINEAGE.getOrDefault(LineageDirection.SINK, ImmutableList.of())); + } + + /** Clears all recorded lineage. Should be called in @Before to ensure test isolation. */ + public static void clearRecorded() { + RECORDED_LINEAGE.clear(); + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/TestLineageOptions.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/TestLineageOptions.java new file mode 100644 index 000000000000..e3437a55bb3c --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/TestLineageOptions.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.lineage; + +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; + +/** PipelineOptions for configuring the test lineage plugin. */ +public interface TestLineageOptions extends PipelineOptions { + + @Description("Enable test lineage plugin for integration testing") + @Default.Boolean(false) + Boolean getEnableTestLineage(); + + void setEnableTestLineage(Boolean value); +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/TestLineageRegistrar.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/TestLineageRegistrar.java index 3e06bfca4c82..23598d4420cf 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/TestLineageRegistrar.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/TestLineageRegistrar.java @@ -23,8 +23,10 @@ import org.checkerframework.checker.nullness.qual.Nullable; /** - * A test {@link LineageRegistrar} for ServiceLoader discovery testing. This registrar always - * returns a TestLineage instance. + * A test {@link LineageRegistrar} for ServiceLoader discovery testing. + * + *

This registrar only activates when {@link TestLineageOptions#getEnableTestLineage()} is true, + * ensuring it doesn't interfere with other tests in the suite. */ @AutoService(LineageRegistrar.class) public class TestLineageRegistrar implements LineageRegistrar { @@ -32,7 +34,12 @@ public class TestLineageRegistrar implements LineageRegistrar { @Override public @Nullable Lineage fromOptions( PipelineOptions options, Lineage.LineageDirection direction) { - // For testing, always return a TestLineage instance - return new TestLineage(direction); + // Only activate if explicitly enabled via TestLineageOptions + TestLineageOptions testOptions = options.as(TestLineageOptions.class); + if (testOptions.getEnableTestLineage()) { + return new TestLineage(direction); + } + // Return null to use default MetricsLineage + return null; } } From 4fdc0c2fda1dcc25e211577eea213e6100bb1780 Mon Sep 17 00:00:00 2001 From: Andrew Kabas Date: Tue, 11 Nov 2025 14:23:24 -0500 Subject: [PATCH 05/14] Initialize Lineage from FileSystem --- .../main/java/org/apache/beam/sdk/io/FileSystems.java | 1 + .../main/java/org/apache/beam/sdk/metrics/Lineage.java | 6 +++--- .../apache/beam/sdk/lineage/LineageRegistrarTest.java | 10 +++++----- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java index 7e2940a2c35b..06e08a1a53df 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java @@ -578,6 +578,7 @@ public static void setDefaultPipelineOptions(PipelineOptions options) { // entry to set other PipelineOption determined flags Metrics.setDefaultPipelineOptions(options); + Lineage.setDefaultPipelineOptions(options); while (true) { KV revision = FILESYSTEM_REVISION.get(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java index cafbb9fa6d39..0b01579f367b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java @@ -65,7 +65,7 @@ public enum LineageDirection { protected Lineage() {} @Internal - public static void initialize(PipelineOptions options) { + public static void setDefaultPipelineOptions(PipelineOptions options) { checkNotNull(options, "options cannot be null"); long optionsId = options.getOptionsId(); int nextRevision = options.revision(); @@ -128,7 +128,7 @@ private static Lineage createLineage(PipelineOptions options, LineageDirection d public static Lineage getSources() { Lineage sources = SOURCES.get(); if (sources == null) { - initialize(PipelineOptionsFactory.create()); + setDefaultPipelineOptions(PipelineOptionsFactory.create()); sources = SOURCES.get(); } return sources; @@ -138,7 +138,7 @@ public static Lineage getSources() { public static Lineage getSinks() { Lineage sinks = SINKS.get(); if (sinks == null) { - initialize(PipelineOptionsFactory.create()); + setDefaultPipelineOptions(PipelineOptionsFactory.create()); sinks = SINKS.get(); } return sinks; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/LineageRegistrarTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/LineageRegistrarTest.java index 66959dee8c0e..c03a487e0ba9 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/LineageRegistrarTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/LineageRegistrarTest.java @@ -95,7 +95,7 @@ public void testLineageIntegrationWithSimpleFQN() { // Enable test lineage plugin TestLineageOptions options = pipeline.getOptions().as(TestLineageOptions.class); options.setEnableTestLineage(true); - Lineage.initialize(pipeline.getOptions()); + Lineage.setDefaultPipelineOptions(pipeline.getOptions()); // Run pipeline that records lineage pipeline @@ -115,7 +115,7 @@ public void testLineageIntegrationWithSubtype() { // Enable test lineage plugin TestLineageOptions options = pipeline.getOptions().as(TestLineageOptions.class); options.setEnableTestLineage(true); - Lineage.initialize(pipeline.getOptions()); + Lineage.setDefaultPipelineOptions(pipeline.getOptions()); // Run pipeline that records lineage with subtype pipeline @@ -138,7 +138,7 @@ public void testLineageIntegrationWithLastSegmentSeparator() { // Enable test lineage plugin TestLineageOptions options = pipeline.getOptions().as(TestLineageOptions.class); options.setEnableTestLineage(true); - Lineage.initialize(pipeline.getOptions()); + Lineage.setDefaultPipelineOptions(pipeline.getOptions()); // Run pipeline that records lineage with custom separator pipeline @@ -161,7 +161,7 @@ public void testLineageIntegrationWithBothSourcesAndSinks() { // Enable test lineage plugin TestLineageOptions options = pipeline.getOptions().as(TestLineageOptions.class); options.setEnableTestLineage(true); - Lineage.initialize(pipeline.getOptions()); + Lineage.setDefaultPipelineOptions(pipeline.getOptions()); // Run pipeline that records both source and sink lineage pipeline @@ -184,7 +184,7 @@ public void testLineageIntegrationWithMultipleElements() { // Enable test lineage plugin TestLineageOptions options = pipeline.getOptions().as(TestLineageOptions.class); options.setEnableTestLineage(true); - Lineage.initialize(pipeline.getOptions()); + Lineage.setDefaultPipelineOptions(pipeline.getOptions()); // Run pipeline with multiple elements to test thread safety pipeline From a088374dad281f1cf11ecbf11b332f845fb5d933 Mon Sep 17 00:00:00 2001 From: Andrew Kabas Date: Tue, 11 Nov 2025 15:50:54 -0500 Subject: [PATCH 06/14] Fix formatting --- .../apache/beam/sdk/lineage/LineageRegistrarTest.java | 9 ++++----- .../java/org/apache/beam/sdk/lineage/TestLineage.java | 4 ++-- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/LineageRegistrarTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/LineageRegistrarTest.java index c03a487e0ba9..2de4f9a4260f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/LineageRegistrarTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/LineageRegistrarTest.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.lineage; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; @@ -44,9 +43,7 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -/** - * Tests for {@link LineageRegistrar} ServiceLoader discovery and DirectRunner integration. - */ +/** Tests for {@link LineageRegistrar} ServiceLoader discovery and DirectRunner integration. */ @RunWith(JUnit4.class) public class LineageRegistrarTest { @@ -123,7 +120,9 @@ public void testLineageIntegrationWithSubtype() { .apply( ParDo.of( new RecordSourceLineageWithSubtypeDoFn( - "spanner", "table", Arrays.asList("project", "instance", "database", "table")))); + "spanner", + "table", + Arrays.asList("project", "instance", "database", "table")))); PipelineResult result = pipeline.run(); result.waitUntilFinish(); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/TestLineage.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/TestLineage.java index c0b3e67470c2..89997661915f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/TestLineage.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/TestLineage.java @@ -24,8 +24,8 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; /** - * A test implementation of {@link Lineage} for testing LineageRegistrar ServiceLoader discovery - * and integration testing with DirectRunner. + * A test implementation of {@link Lineage} for testing LineageRegistrar ServiceLoader discovery and + * integration testing with DirectRunner. * *

This implementation records all lineage FQNs in thread-safe static storage for test * assertions. From d3b91a278c182f5df3c71f22fc4c56314477a6a3 Mon Sep 17 00:00:00 2001 From: Andrew Kabas Date: Tue, 11 Nov 2025 16:21:51 -0500 Subject: [PATCH 07/14] Fix Flaky JmsIOTest --- .../src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java index 7f3b394d7f6a..9c59f434995d 100644 --- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java +++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java @@ -660,6 +660,9 @@ private JmsIO.UnboundedJmsReader setupReaderForTest() throws JMSException { final int delay = 10; return connectorClass == JmsConnectionFactory.class ? (JmsTextMessage message) -> { + if (message == null) { + return null; + } final JmsAcknowledgeCallback originalCallback = message.getAcknowledgeCallback(); JmsAcknowledgeCallback jmsAcknowledgeCallbackMock = Mockito.mock(JmsAcknowledgeCallback.class); @@ -679,6 +682,9 @@ private JmsIO.UnboundedJmsReader setupReaderForTest() throws JMSException { return message; } : (ActiveMQMessage message) -> { + if (message == null) { + return null; + } final Callback originalCallback = message.getAcknowledgeCallback(); message.setAcknowledgeCallback( () -> { From a54d7c2349b4368004ab6ef0d9c170876ef290d4 Mon Sep 17 00:00:00 2001 From: Andrew Kabas Date: Tue, 11 Nov 2025 16:32:23 -0500 Subject: [PATCH 08/14] Adding to change log and fixing style error --- CHANGES.md | 1 + .../apache/beam/sdk/lineage/package-info.java | 32 +++++++++++++++++++ 2 files changed, 33 insertions(+) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/package-info.java diff --git a/CHANGES.md b/CHANGES.md index e5632b2608fe..4bbb32015582 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -74,6 +74,7 @@ * Support configuring Firestore database on ReadFn transforms (Java) ([#36904](https://github.com/apache/beam/issues/36904)). * (Python) Inference args are now allowed in most model handlers, except where they are explicitly/intentionally disallowed ([#37093](https://github.com/apache/beam/issues/37093)). +* Added plugin mechanism to support different Lineage implementations (Java) ([#36790](https://github.com/apache/beam/issues/36790)). ## Breaking Changes diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/package-info.java new file mode 100644 index 000000000000..d65e9fc966d1 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/package-info.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Lineage tracking support for Apache Beam pipelines. + * + *

This package provides a plugin mechanism to support different lineage implementations through + * the {@link org.apache.beam.sdk.lineage.LineageRegistrar} interface. Lineage implementations can + * be registered and discovered at runtime to track data lineage information during pipeline + * execution. + * + *

For lineage capabilities, see {@link org.apache.beam.sdk.metrics.Lineage}. + */ +@DefaultAnnotation(NonNull.class) +package org.apache.beam.sdk.lineage; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import org.checkerframework.checker.nullness.qual.NonNull; \ No newline at end of file From c4422e5578efe370216a234c1846c5e06ad2aa4d Mon Sep 17 00:00:00 2001 From: Andrew Kabas Date: Tue, 11 Nov 2025 17:18:02 -0500 Subject: [PATCH 09/14] fix build --- .../src/main/java/org/apache/beam/sdk/lineage/package-info.java | 2 +- .../jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOIT.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/package-info.java index d65e9fc966d1..30fbde839023 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/package-info.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/lineage/package-info.java @@ -29,4 +29,4 @@ package org.apache.beam.sdk.lineage; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; -import org.checkerframework.checker.nullness.qual.NonNull; \ No newline at end of file +import org.checkerframework.checker.nullness.qual.NonNull; diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOIT.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOIT.java index 266d04342d1f..212a27b1b2a7 100644 --- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOIT.java +++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOIT.java @@ -215,7 +215,7 @@ public void testPublishingThenReadingAll() throws IOException, JMSException { int unackRecords = countRemain(QUEUE); assertTrue( String.format("Too many unacknowledged messages: %d", unackRecords), - unackRecords < OPTIONS.getNumberOfRecords() * 0.003); + unackRecords < OPTIONS.getNumberOfRecords() * 0.005); // acknowledged records int ackRecords = OPTIONS.getNumberOfRecords() - unackRecords; From 8411bdaf26a288debe814b5cfcb4c637d45d67ce Mon Sep 17 00:00:00 2001 From: Andrew Kabas Date: Wed, 12 Nov 2025 16:05:28 -0500 Subject: [PATCH 10/14] fix tests --- .../sdk/lineage/LineageRegistrarTest.java | 50 ++++++++++--------- 1 file changed, 27 insertions(+), 23 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/LineageRegistrarTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/LineageRegistrarTest.java index 2de4f9a4260f..86b8ad075f89 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/LineageRegistrarTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/LineageRegistrarTest.java @@ -31,6 +31,7 @@ import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.metrics.Lineage; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; @@ -38,8 +39,8 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.junit.Before; -import org.junit.Rule; import org.junit.Test; +import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -47,14 +48,22 @@ @RunWith(JUnit4.class) public class LineageRegistrarTest { - @Rule public final transient TestPipeline pipeline = TestPipeline.create(); - @Before public void setUp() { // Clear any recorded lineage from previous tests TestLineage.clearRecorded(); } + /** Helper to create a TestPipeline with test lineage enabled. */ + private TestPipeline createTestPipelineWithLineage() { + TestLineageOptions options = PipelineOptionsFactory.create().as(TestLineageOptions.class); + options.setEnableTestLineage(true); + TestPipeline pipeline = TestPipeline.fromOptions(options); + // Disable enforcement since we're not using @Rule + pipeline.enableAbandonedNodeEnforcement(false); + return pipeline; + } + @Test public void testServiceLoaderDiscovery() { // Load all LineageRegistrar implementations via ServiceLoader @@ -88,11 +97,10 @@ public void testServiceLoaderDiscovery() { } @Test + @Category(NeedsRunner.class) public void testLineageIntegrationWithSimpleFQN() { - // Enable test lineage plugin - TestLineageOptions options = pipeline.getOptions().as(TestLineageOptions.class); - options.setEnableTestLineage(true); - Lineage.setDefaultPipelineOptions(pipeline.getOptions()); + // Create pipeline with test lineage enabled - Lineage will be initialized during pipeline.run() + TestPipeline pipeline = createTestPipelineWithLineage(); // Run pipeline that records lineage pipeline @@ -108,11 +116,10 @@ public void testLineageIntegrationWithSimpleFQN() { } @Test + @Category(NeedsRunner.class) public void testLineageIntegrationWithSubtype() { - // Enable test lineage plugin - TestLineageOptions options = pipeline.getOptions().as(TestLineageOptions.class); - options.setEnableTestLineage(true); - Lineage.setDefaultPipelineOptions(pipeline.getOptions()); + // Create pipeline with test lineage enabled - Lineage will be initialized during pipeline.run() + TestPipeline pipeline = createTestPipelineWithLineage(); // Run pipeline that records lineage with subtype pipeline @@ -133,11 +140,10 @@ public void testLineageIntegrationWithSubtype() { } @Test + @Category(NeedsRunner.class) public void testLineageIntegrationWithLastSegmentSeparator() { - // Enable test lineage plugin - TestLineageOptions options = pipeline.getOptions().as(TestLineageOptions.class); - options.setEnableTestLineage(true); - Lineage.setDefaultPipelineOptions(pipeline.getOptions()); + // Create pipeline with test lineage enabled - Lineage will be initialized during pipeline.run() + TestPipeline pipeline = createTestPipelineWithLineage(); // Run pipeline that records lineage with custom separator pipeline @@ -156,11 +162,10 @@ public void testLineageIntegrationWithLastSegmentSeparator() { } @Test + @Category(NeedsRunner.class) public void testLineageIntegrationWithBothSourcesAndSinks() { - // Enable test lineage plugin - TestLineageOptions options = pipeline.getOptions().as(TestLineageOptions.class); - options.setEnableTestLineage(true); - Lineage.setDefaultPipelineOptions(pipeline.getOptions()); + // Create pipeline with test lineage enabled - Lineage will be initialized during pipeline.run() + TestPipeline pipeline = createTestPipelineWithLineage(); // Run pipeline that records both source and sink lineage pipeline @@ -179,11 +184,10 @@ public void testLineageIntegrationWithBothSourcesAndSinks() { } @Test + @Category(NeedsRunner.class) public void testLineageIntegrationWithMultipleElements() { - // Enable test lineage plugin - TestLineageOptions options = pipeline.getOptions().as(TestLineageOptions.class); - options.setEnableTestLineage(true); - Lineage.setDefaultPipelineOptions(pipeline.getOptions()); + // Create pipeline with test lineage enabled - Lineage will be initialized during pipeline.run() + TestPipeline pipeline = createTestPipelineWithLineage(); // Run pipeline with multiple elements to test thread safety pipeline From 43fcb57448e165cd4843244c1b39ac3ad65e8609 Mon Sep 17 00:00:00 2001 From: Andrew Kabas Date: Thu, 13 Nov 2025 17:05:13 -0500 Subject: [PATCH 11/14] Improve test logging --- .../sdk/lineage/LineageRegistrarTest.java | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/LineageRegistrarTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/LineageRegistrarTest.java index 86b8ad075f89..96dd445d753a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/LineageRegistrarTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/LineageRegistrarTest.java @@ -39,8 +39,11 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -48,6 +51,35 @@ @RunWith(JUnit4.class) public class LineageRegistrarTest { + /** + * TestWatcher that logs detailed lineage diagnostics only when tests fail. + * This keeps successful test output clean while providing deep debugging for failures. + */ + @Rule + public TestWatcher lineageDebugLogger = new TestWatcher() { + @Override + protected void failed(Throwable e, Description description) { + System.err.println("=== Lineage Test Failure Diagnostics ==="); + System.err.println("Test: " + description.getMethodName()); + System.err.println("Error: " + e.getMessage()); + + List sources = TestLineage.getRecordedSources(); + List sinks = TestLineage.getRecordedSinks(); + + System.err.println("\nRecorded Sources (" + sources.size() + "):"); + for (int i = 0; i < sources.size(); i++) { + System.err.println(" [" + i + "] \"" + sources.get(i) + "\""); + } + + System.err.println("\nRecorded Sinks (" + sinks.size() + "):"); + for (int i = 0; i < sinks.size(); i++) { + System.err.println(" [" + i + "] \"" + sinks.get(i) + "\""); + } + + System.err.println("========================================"); + } + }; + @Before public void setUp() { // Clear any recorded lineage from previous tests From 4065277e1689c0db538d206ff65fb6d138799cb9 Mon Sep 17 00:00:00 2001 From: Andrew Kabas Date: Tue, 30 Dec 2025 12:46:11 -0500 Subject: [PATCH 12/14] fix test --- .../org/apache/beam/sdk/lineage/LineageRegistrarTest.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/LineageRegistrarTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/LineageRegistrarTest.java index 96dd445d753a..17bd95bfd352 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/LineageRegistrarTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/LineageRegistrarTest.java @@ -190,7 +190,7 @@ public void testLineageIntegrationWithLastSegmentSeparator() { // Verify lineage was recorded with separator List sources = TestLineage.getRecordedSources(); - assertThat(sources, hasItem("gcs:bucket.path/to/file.txt")); + assertThat(sources, hasItem("gcs:bucket.`path/to/file.txt`")); } @Test @@ -249,6 +249,7 @@ private static class RecordSourceLineageDoFn extends DoFn { @ProcessElement public void processElement(ProcessContext c) { + // !!! Lineage Caller !!! Lineage.getSources().add(system, segments); c.output(c.element()); } @@ -268,6 +269,7 @@ private static class RecordSourceLineageWithSubtypeDoFn extends DoFn { @ProcessElement public void processElement(ProcessContext c) { + // !!! Lineage Caller !!! Lineage.getSources().add("input-system", ImmutableList.of("input-db", "input-table")); + // !!! Lineage Caller !!! Lineage.getSinks().add("output-system", ImmutableList.of("output-db", "output-table")); c.output(c.element()); } From b9b20edb1c36694dbd9762a60c263d18bbe41a27 Mon Sep 17 00:00:00 2001 From: Andrew Kabas Date: Tue, 30 Dec 2025 17:19:18 -0500 Subject: [PATCH 13/14] fix formatting --- .../sdk/lineage/LineageRegistrarTest.java | 51 ++++++++++--------- 1 file changed, 26 insertions(+), 25 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/LineageRegistrarTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/LineageRegistrarTest.java index 17bd95bfd352..efccd6d20370 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/LineageRegistrarTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/lineage/LineageRegistrarTest.java @@ -52,33 +52,34 @@ public class LineageRegistrarTest { /** - * TestWatcher that logs detailed lineage diagnostics only when tests fail. - * This keeps successful test output clean while providing deep debugging for failures. + * TestWatcher that logs detailed lineage diagnostics only when tests fail. This keeps successful + * test output clean while providing deep debugging for failures. */ @Rule - public TestWatcher lineageDebugLogger = new TestWatcher() { - @Override - protected void failed(Throwable e, Description description) { - System.err.println("=== Lineage Test Failure Diagnostics ==="); - System.err.println("Test: " + description.getMethodName()); - System.err.println("Error: " + e.getMessage()); - - List sources = TestLineage.getRecordedSources(); - List sinks = TestLineage.getRecordedSinks(); - - System.err.println("\nRecorded Sources (" + sources.size() + "):"); - for (int i = 0; i < sources.size(); i++) { - System.err.println(" [" + i + "] \"" + sources.get(i) + "\""); - } - - System.err.println("\nRecorded Sinks (" + sinks.size() + "):"); - for (int i = 0; i < sinks.size(); i++) { - System.err.println(" [" + i + "] \"" + sinks.get(i) + "\""); - } - - System.err.println("========================================"); - } - }; + public TestWatcher lineageDebugLogger = + new TestWatcher() { + @Override + protected void failed(Throwable e, Description description) { + System.err.println("=== Lineage Test Failure Diagnostics ==="); + System.err.println("Test: " + description.getMethodName()); + System.err.println("Error: " + e.getMessage()); + + List sources = TestLineage.getRecordedSources(); + List sinks = TestLineage.getRecordedSinks(); + + System.err.println("\nRecorded Sources (" + sources.size() + "):"); + for (int i = 0; i < sources.size(); i++) { + System.err.println(" [" + i + "] \"" + sources.get(i) + "\""); + } + + System.err.println("\nRecorded Sinks (" + sinks.size() + "):"); + for (int i = 0; i < sinks.size(); i++) { + System.err.println(" [" + i + "] \"" + sinks.get(i) + "\""); + } + + System.err.println("========================================"); + } + }; @Before public void setUp() { From 032292ba6c025f394cd9c69951acd58547530f56 Mon Sep 17 00:00:00 2001 From: Andrew Kabas Date: Tue, 30 Dec 2025 18:34:27 -0500 Subject: [PATCH 14/14] Revert unnecessary fixes in tests --- .../src/test/java/org/apache/beam/sdk/io/jms/JmsIOIT.java | 2 +- .../src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java | 6 ------ 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOIT.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOIT.java index 212a27b1b2a7..266d04342d1f 100644 --- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOIT.java +++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOIT.java @@ -215,7 +215,7 @@ public void testPublishingThenReadingAll() throws IOException, JMSException { int unackRecords = countRemain(QUEUE); assertTrue( String.format("Too many unacknowledged messages: %d", unackRecords), - unackRecords < OPTIONS.getNumberOfRecords() * 0.005); + unackRecords < OPTIONS.getNumberOfRecords() * 0.003); // acknowledged records int ackRecords = OPTIONS.getNumberOfRecords() - unackRecords; diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java index 9c59f434995d..7f3b394d7f6a 100644 --- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java +++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java @@ -660,9 +660,6 @@ private JmsIO.UnboundedJmsReader setupReaderForTest() throws JMSException { final int delay = 10; return connectorClass == JmsConnectionFactory.class ? (JmsTextMessage message) -> { - if (message == null) { - return null; - } final JmsAcknowledgeCallback originalCallback = message.getAcknowledgeCallback(); JmsAcknowledgeCallback jmsAcknowledgeCallbackMock = Mockito.mock(JmsAcknowledgeCallback.class); @@ -682,9 +679,6 @@ private JmsIO.UnboundedJmsReader setupReaderForTest() throws JMSException { return message; } : (ActiveMQMessage message) -> { - if (message == null) { - return null; - } final Callback originalCallback = message.getAcknowledgeCallback(); message.setAcknowledgeCallback( () -> {