@shnapz shnapz commented Nov 10, 2025

Addresses #36790: "[Feature Request]: Make lineage tracking pluggable"

Key changes:

  • Added org.apache.beam.sdk.lineage.LineageRegistrar as the entry point of the plugin system.
  • Added org.apache.beam.sdk.metrics.MetricsLineage, which contains the original metrics-based implementation extracted from Lineage.
  • Changed org.apache.beam.sdk.metrics.Lineage:
    • It is now an abstract base class for custom plugin implementations.
    • It now contains the plugin discovery and initialization logic.
    • Instance methods are part of the plugin contract; static methods remain untouched.
  • Changed org.apache.beam.sdk.io.FileSystems:
    • setDefaultPipelineOptions() now also calls Lineage.setDefaultPipelineOptions(options).
  • Added tests.

Plugin Implementation

The Lineage class acts as an abstract base for pluggable implementations, defining these instance members:

public abstract class Lineage {
  protected Lineage() {}

  public void add(
      String system,
      @Nullable String subtype,
      Iterable<String> segments,
      @Nullable String lastSegmentSep) {
    add(getFQNParts(system, subtype, segments, lastSegmentSep));
  }

  public void add(String system, Iterable<String> segments, @Nullable String lastSegmentSep) {
    add(system, null, segments, lastSegmentSep);
  }

  public void add(String system, Iterable<String> segments) {
    add(system, segments, null);
  }

  public abstract void add(Iterable<String> rollupSegments);
}

An alternative approach would be to create a separate interface (e.g. interface LineageReporter) for plugin implementations, but that would require breaking changes to the main public API, which is already in use:

public static Lineage getSources()
public static Lineage getSinks()

Therefore, using Lineage as a base class is the best solution to maintain backward compatibility.

Initialization

Originally, Lineage instances were created in static field initializers and needed no external parameters. Plugins, however, rely on PipelineOptions by design, so Lineage.setDefaultPipelineOptions(options) must be called externally. The call is made from FileSystems.setDefaultPipelineOptions() (at FileSystems.java:581), following the same pattern used for Metrics.setDefaultPipelineOptions() (line 580).

Rationale: FileSystems.setDefaultPipelineOptions() is called at 48+ locations across the codebase, covering all execution scenarios: pipeline construction, worker startup across all runners (Flink, Spark, Dataflow, etc.), and deserialization points. This single-line addition ensures Lineage is initialized everywhere without modifying 48+ call sites.

Known limitations: While FileSystems.setDefaultPipelineOptions() has known issues (see #18430 regarding race conditions and initialization semantics), this PR follows the established pattern rather than introducing a divergent approach. Any broader architectural improvements to subsystem initialization would naturally address Lineage initialization as part of that larger effort.

Thread Safety

Lineage.setDefaultPipelineOptions is expected to be called concurrently because it is invoked from the same call sites as FileSystems.setDefaultPipelineOptions and Metrics.setDefaultPipelineOptions. The implementation uses three AtomicReference<> fields:

  • One AtomicReference<KV<Long, Integer>> that tracks PipelineOptions identity via optionsId and revision
  • Two AtomicReference<Lineage> fields, one each for the SOURCES and SINKS instances

The method follows the exact same concurrent resolution pattern as FileSystems using an infinite loop with compareAndSet to handle race conditions during initialization.
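
The retry loop described above can be sketched roughly as follows. This is a standalone illustration, not the code from this PR: the class name RevisionGuard, the update method, and the Runnable callback are hypothetical, and Map.Entry stands in for Beam's KV so that the sketch compiles without Beam on the classpath.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical standalone sketch of a compareAndSet-guarded (re)initialization. */
public class RevisionGuard {
  // (optionsId, revision) of the options applied most recently; null before the first call.
  private final AtomicReference<Map.Entry<Long, Integer>> applied = new AtomicReference<>();

  /**
   * Runs {@code init} only if the options identity changed or the revision increased.
   * Returns true if this call performed the initialization.
   */
  public boolean update(long optionsId, int revision, Runnable init) {
    while (true) {
      Map.Entry<Long, Integer> current = applied.get();
      // Nothing to do if the same options at this revision (or a newer one) were already applied.
      if (current != null && current.getKey() == optionsId && current.getValue() >= revision) {
        return false;
      }
      // Only the thread that wins the CAS runs init; a loser re-reads the state and retries.
      if (applied.compareAndSet(current, new SimpleImmutableEntry<>(optionsId, revision))) {
        init.run();
        return true;
      }
    }
  }
}
```

In this sketch a thread that loses the race loops again and normally exits through the early return, because the winner has already recorded the same or a newer revision. Note that the early return does not wait for the winner's init to finish, which is one of the initialization-semantics caveats mentioned under known limitations.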

Sample Custom Plugin Implementation - OpenLineage

The OpenLineage integration below is for demonstration purposes only:

  • OpenLineage itself is out of scope of this work
  • Cross-worker deduplication is out of scope

package org.apache.beam.sdk.extensions.openlineage;

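import com.google.auto.service.AutoService;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.lineage.LineageRegistrar;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;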

// 1. Define PipelineOptions for configuration
public interface OpenLineageOptions extends PipelineOptions {
  @Description("OpenLineage endpoint URL")
  @Default.String("http://localhost:5000")
  String getOpenLineageUrl();
  void setOpenLineageUrl(String url);

  @Description("Enable OpenLineage plugin")
  @Default.Boolean(false)
  Boolean getEnableOpenLineage();
  void setEnableOpenLineage(Boolean enable);
}

// 2. Implement the Lineage plugin
class OpenLineageReporter extends Lineage {
  private final String endpoint;
  private final Lineage.LineageDirection direction;

  OpenLineageReporter(String endpoint, Lineage.LineageDirection direction) {
    this.endpoint = endpoint;
    this.direction = direction;
  }

  @Override
  public void add(Iterable<String> rollupSegments) {
    String fqn = String.join("", rollupSegments);
    // Send to OpenLineage endpoint
    sendToOpenLineage(endpoint, direction, fqn);
  }

  private void sendToOpenLineage(String url, LineageDirection dir, String fqn) {
    // Implementation: POST lineage event to OpenLineage HTTP endpoint
    // Example: { "eventType": "START", "inputs": [...], "outputs": [...] }
  }
}

// 3. Implement LineageRegistrar for ServiceLoader discovery
@AutoService(LineageRegistrar.class)
public class OpenLineageRegistrar implements LineageRegistrar {
  @Override
  public Lineage fromOptions(PipelineOptions options, Lineage.LineageDirection direction) {
    OpenLineageOptions opts = options.as(OpenLineageOptions.class);

    if (opts.getEnableOpenLineage()) {
      return new OpenLineageReporter(opts.getOpenLineageUrl(), direction);
    }

    return null; // Fall back to default metrics-based lineage
  }
}

// 4. Usage in pipeline:
OpenLineageOptions options = PipelineOptionsFactory.create().as(OpenLineageOptions.class);
options.setEnableOpenLineage(true);
options.setOpenLineageUrl("https://my-openlineage.com");
Pipeline p = Pipeline.create(options);
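
For context, the "return null to fall back" contract in step 3 could be resolved along the following lines. This is a hedged, standalone sketch and not the discovery code in this PR: PluginSelection, Reporter, Registrar, select, and discover are hypothetical names, a plain Map stands in for PipelineOptions, and the "first non-null registrar wins" rule is an assumption.

```java
import java.util.Map;
import java.util.ServiceLoader;

/** Hypothetical sketch of registrar discovery with a default fallback. */
public class PluginSelection {
  /** Stand-in for a Lineage plugin instance. */
  public interface Reporter {
    void add(Iterable<String> rollupSegments);
  }

  /** Stand-in for LineageRegistrar: returns null to decline. */
  public interface Registrar {
    Reporter fromOptions(Map<String, String> options);
  }

  /** Returns the first non-null reporter offered by a registrar, else the fallback. */
  public static Reporter select(
      Iterable<Registrar> registrars, Map<String, String> options, Reporter fallback) {
    for (Registrar registrar : registrars) {
      Reporter candidate = registrar.fromOptions(options);
      if (candidate != null) {
        return candidate;
      }
    }
    return fallback;
  }

  /** Discovers registrars on the classpath via ServiceLoader (META-INF/services entries). */
  public static Reporter discover(Map<String, String> options, Reporter fallback) {
    return select(ServiceLoader.load(Registrar.class), options, fallback);
  }
}
```

Keeping select separate from the ServiceLoader call makes the fallback rule testable without registering any service providers.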


@github-actions github-actions bot added the java label Nov 10, 2025
@shnapz shnapz changed the title from "Add pluggable LineageReporter interface" to "Make Lineage pluggable" Nov 11, 2025
@shnapz shnapz changed the title from "Make Lineage pluggable" to "Implement pluggable Lineage in Java SDK" Nov 11, 2025
? (JmsTextMessage message) -> {
if (message == null) {
return null;
}
Author

Unrelated change to fix flaky tests.

  assertTrue(
  String.format("Too many unacknowledged messages: %d", unackRecords),
- unackRecords < OPTIONS.getNumberOfRecords() * 0.003);
+ unackRecords < OPTIONS.getNumberOfRecords() * 0.005);
Author

Unrelated change to fix flaky tests.

@shnapz shnapz marked this pull request as ready for review December 31, 2025 17:51
@github-actions
Contributor

Assigning reviewers:

R: @kennknowles for label java.

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@codecov
codecov bot commented Dec 31, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 55.15%. Comparing base (7b13d42) to head (032292b).
⚠️ Report is 7 commits behind head on master.

Additional details and impacted files
@@            Coverage Diff            @@
##             master   #36781   +/-   ##
=========================================
  Coverage     55.15%   55.15%           
  Complexity     1676     1676           
=========================================
  Files          1067     1067           
  Lines        167149   167149           
  Branches       1208     1208           
=========================================
  Hits          92189    92189           
  Misses        72779    72779           
  Partials       2181     2181           
