Skip to content
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@

* Support configuring Firestore database on ReadFn transforms (Java) ([#36904](https://github.com/apache/beam/issues/36904)).
* (Python) Inference args are now allowed in most model handlers, except where they are explicitly/intentionally disallowed ([#37093](https://github.com/apache/beam/issues/37093)).
* Added plugin mechanism to support different Lineage implementations (Java) ([#36790](https://github.com/apache/beam/issues/36790)).

## Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,7 @@ public static void setDefaultPipelineOptions(PipelineOptions options) {

// entry to set other PipelineOption determined flags
Metrics.setDefaultPipelineOptions(options);
Lineage.setDefaultPipelineOptions(options);

while (true) {
KV<Long, Integer> revision = FILESYSTEM_REVISION.get();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.lineage;

import javax.annotation.Nullable;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.options.PipelineOptions;

/**
 * Plugin hook that lets an alternative {@link Lineage} implementation be supplied for a pipeline.
 *
 * <p>{@link Lineage} loads implementations of this interface through {@link
 * java.util.ServiceLoader} and asks each one for a {@link Lineage} per direction; the first
 * non-null answer is used, and the Metrics-based implementation is used when every registrar
 * declines.
 */
public interface LineageRegistrar {

/**
 * Creates the {@link Lineage} to use for the given direction, or declines.
 *
 * @param options the pipeline options the lineage is being configured from
 * @param direction whether the requested instance records sources or sinks
 * @return the {@link Lineage} to use, or {@code null} if this registrar does not provide one for
 *     these options and direction
 */
@Nullable
Lineage fromOptions(PipelineOptions options, Lineage.LineageDirection direction);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Lineage tracking support for Apache Beam pipelines.
*
* <p>This package provides a plugin mechanism to support different lineage implementations through
* the {@link org.apache.beam.sdk.lineage.LineageRegistrar} interface. Lineage implementations can
* be registered and discovered at runtime to track data lineage information during pipeline
* execution.
*
* <p>For lineage capabilities, see {@link org.apache.beam.sdk.metrics.Lineage}.
*/
@DefaultAnnotation(NonNull.class)
package org.apache.beam.sdk.lineage;

import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import org.checkerframework.checker.nullness.qual.NonNull;
125 changes: 101 additions & 24 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,51 +17,131 @@
*/
package org.apache.beam.sdk.metrics;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.lineage.LineageRegistrar;
import org.apache.beam.sdk.metrics.Metrics.MetricsFlag;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Standard collection of metrics used to record source and sinks information for lineage tracking.
*/
public class Lineage {

public abstract class Lineage {
public static final String LINEAGE_NAMESPACE = "lineage";
private static final Lineage SOURCES = new Lineage(Type.SOURCE);
private static final Lineage SINKS = new Lineage(Type.SINK);
private static final Logger LOG = LoggerFactory.getLogger(Lineage.class);
private static final AtomicReference<Lineage> SOURCES = new AtomicReference<>();
private static final AtomicReference<Lineage> SINKS = new AtomicReference<>();

private static final AtomicReference<KV<Long, Integer>> LINEAGE_REVISION =
new AtomicReference<>();

// Reserved characters are backtick, colon, whitespace (space, \t, \n) and dot.
private static final Pattern RESERVED_CHARS = Pattern.compile("[:\\s.`]");

private final Metric metric;
public enum LineageDirection {
SOURCE,
SINK
}

private Lineage(Type type) {
if (MetricsFlag.lineageRollupEnabled()) {
this.metric =
Metrics.boundedTrie(
LINEAGE_NAMESPACE,
type == Type.SOURCE ? Type.SOURCEV2.toString() : Type.SINKV2.toString());
} else {
this.metric = Metrics.stringSet(LINEAGE_NAMESPACE, type.toString());
protected Lineage() {}

/**
 * Installs the source and sink {@link Lineage} instances derived from {@code options}.
 *
 * <p>The call does nothing when the recorded options ID equals {@code options.getOptionsId()} and
 * the recorded revision is already at least {@code options.revision()}. Otherwise the new
 * (ID, revision) pair is recorded with a compare-and-set, and the thread that wins the
 * compare-and-set creates and publishes both instances.
 *
 * @param options the pipeline options to configure lineage from; must not be null
 */
@Internal
public static void setDefaultPipelineOptions(PipelineOptions options) {
  checkNotNull(options, "options cannot be null");
  final long id = options.getOptionsId();
  final int revision = options.revision();

  for (; ; ) {
    final KV<Long, Integer> recorded = LINEAGE_REVISION.get();

    // Same options at the same or a newer revision are already installed: nothing to do.
    if (recorded != null && recorded.getKey().equals(id) && recorded.getValue() >= revision) {
      LOG.debug(
          "Lineage already initialized with options ID {} revision {}, skipping",
          id,
          recorded.getValue());
      return;
    }

    // Another thread changed the recorded revision in between; re-read and re-evaluate.
    if (!LINEAGE_REVISION.compareAndSet(recorded, KV.of(id, revision))) {
      continue;
    }

    // Build both instances before publishing either of them.
    final Lineage newSources = createLineage(options, LineageDirection.SOURCE);
    final Lineage newSinks = createLineage(options, LineageDirection.SINK);
    SOURCES.set(newSources);
    SINKS.set(newSinks);

    if (recorded != null) {
      LOG.info(
          "Lineage re-initialized from options ID {} to {} (revision {} -> {})",
          recorded.getKey(),
          id,
          recorded.getValue(),
          revision);
    } else {
      LOG.info("Lineage initialized with options ID {} revision {}", id, revision);
    }
    return;
  }
}

/**
 * Resolves the {@link Lineage} implementation for one direction.
 *
 * <p>All {@link LineageRegistrar} services visible to the class loader returned by {@link
 * ReflectHelpers#findClassLoader()} are collected into a set ordered by {@code
 * ReflectHelpers.ObjectsClassComparator} and consulted in that order. The first registrar that
 * returns a non-null {@link Lineage} wins; if none does, a {@link MetricsLineage} is returned.
 *
 * @param options the pipeline options handed to each registrar
 * @param direction whether a source or a sink lineage is requested
 * @return the plugin-provided lineage, or the Metrics-based default
 */
private static Lineage createLineage(PipelineOptions options, LineageDirection direction) {
  ServiceLoader<LineageRegistrar> loader =
      ServiceLoader.load(LineageRegistrar.class, ReflectHelpers.findClassLoader());
  Set<LineageRegistrar> ordered = Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE);
  ordered.addAll(Lists.newArrayList(loader));

  Iterator<LineageRegistrar> candidates = ordered.iterator();
  while (candidates.hasNext()) {
    Lineage custom = candidates.next().fromOptions(options, direction);
    if (custom == null) {
      // This registrar declined; try the next one.
      continue;
    }
    LOG.info("Using {} for lineage direction {}", custom.getClass().getName(), direction);
    return custom;
  }

  LOG.debug("Using default Metrics-based lineage for direction {}", direction);
  return new MetricsLineage(direction);
}

/**
 * Returns the {@link Lineage} representing sources and optionally side inputs.
 *
 * <p>If no instance has been installed yet, lineage is first initialized from a default {@link
 * PipelineOptions} created by {@link PipelineOptionsFactory#create()}.
 *
 * @return the currently installed source lineage
 */
public static Lineage getSources() {
  Lineage sources = SOURCES.get();
  if (sources == null) {
    // Nothing installed yet: fall back to default options, then re-read the published instance.
    setDefaultPipelineOptions(PipelineOptionsFactory.create());
    sources = SOURCES.get();
  }
  return sources;
}

/**
 * Returns the {@link Lineage} representing sinks.
 *
 * <p>If no instance has been installed yet, lineage is first initialized from a default {@link
 * PipelineOptions} created by {@link PipelineOptionsFactory#create()}.
 *
 * @return the currently installed sink lineage
 */
public static Lineage getSinks() {
  Lineage sinks = SINKS.get();
  if (sinks == null) {
    // Nothing installed yet: fall back to default options, then re-read the published instance.
    setDefaultPipelineOptions(PipelineOptionsFactory.create());
    sinks = SINKS.get();
  }
  return sinks;
}

@VisibleForTesting
Expand Down Expand Up @@ -139,14 +219,7 @@ public void add(String system, Iterable<String> segments) {
* which is already escaped.
* <p>In particular, this means they will often have trailing delimiters.
*/
public void add(Iterable<String> rollupSegments) {
ImmutableList<String> segments = ImmutableList.copyOf(rollupSegments);
if (MetricsFlag.lineageRollupEnabled()) {
((BoundedTrie) this.metric).add(segments);
} else {
((StringSet) this.metric).add(String.join("", segments));
}
}
public abstract void add(Iterable<String> rollupSegments);

/**
* Query {@link BoundedTrie} metrics from {@link MetricResults}.
Expand All @@ -156,6 +229,8 @@ public void add(Iterable<String> rollupSegments) {
* @param truncatedMarker the marker to use to represent truncated FQNs.
* @return A flat representation of all FQNs. If the FQN was truncated then it has a trailing
* truncatedMarker.
* <p>NOTE: When using a custom Lineage plugin, this method will return empty results since
* lineage is not stored in Metrics.
*/
public static Set<String> query(MetricResults results, Type type, String truncatedMarker) {
MetricQueryResults lineageQueryResults = getLineageQueryResults(results, type);
Expand Down Expand Up @@ -184,6 +259,8 @@ public static Set<String> query(MetricResults results, Type type, String truncat
* @param results FQNs from the result
* @param type sources or sinks
* @return A flat representation of all FQNs. If the FQN was truncated then it has a trailing '*'.
* <p>NOTE: When using a custom Lineage plugin, this method will return empty results since
* lineage is not stored in Metrics.
*/
public static Set<String> query(MetricResults results, Type type) {
if (MetricsFlag.lineageRollupEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.metrics;

import org.apache.beam.sdk.metrics.Metrics.MetricsFlag;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;

/**
 * {@link Lineage} implementation that records lineage in Beam {@link Metrics}.
 *
 * <p>When {@code MetricsFlag.lineageRollupEnabled()} is true at construction time the segments are
 * stored in a {@link BoundedTrie}; otherwise they are joined and stored in a {@link StringSet}.
 */
public class MetricsLineage extends Lineage {

  /** Either a {@link BoundedTrie} or a {@link StringSet}, chosen once in the constructor. */
  private final Metric metric;

  /**
   * Creates the Metrics-backed lineage for one direction.
   *
   * @param direction whether this instance records sources or sinks
   */
  public MetricsLineage(final Lineage.LineageDirection direction) {
    final boolean isSource = direction == Lineage.LineageDirection.SOURCE;

    if (MetricsFlag.lineageRollupEnabled()) {
      // Rollup uses the V2 metric names backed by a bounded trie.
      Lineage.Type rollupType = isSource ? Lineage.Type.SOURCEV2 : Lineage.Type.SINKV2;
      this.metric = Metrics.boundedTrie(Lineage.LINEAGE_NAMESPACE, rollupType.toString());
    } else {
      Lineage.Type type = isSource ? Lineage.Type.SOURCE : Lineage.Type.SINK;
      this.metric = Metrics.stringSet(Lineage.LINEAGE_NAMESPACE, type.toString());
    }
  }

  /**
   * Records one lineage entry made of the given rollup segments.
   *
   * <p>The storage form is decided by the metric this instance actually holds rather than by
   * re-reading the global rollup flag, so an instance created before the flag changed keeps
   * working instead of failing with a {@link ClassCastException}.
   *
   * @param rollupSegments the segments of the entry, in order
   */
  @Override
  public void add(final Iterable<String> rollupSegments) {
    ImmutableList<String> segments = ImmutableList.copyOf(rollupSegments);
    if (this.metric instanceof BoundedTrie) {
      ((BoundedTrie) this.metric).add(segments);
    } else {
      ((StringSet) this.metric).add(String.join("", segments));
    }
  }
}
Loading
Loading