2828import java .util .concurrent .atomic .AtomicReference ;
2929import java .util .regex .Pattern ;
3030import org .apache .beam .sdk .annotations .Internal ;
31- import org .apache .beam .sdk .lineage .LineageReporter ;
32- import org .apache .beam .sdk .lineage .LineageReporterRegistrar ;
31+ import org .apache .beam .sdk .lineage .LineageRegistrar ;
3332import org .apache .beam .sdk .metrics .Metrics .MetricsFlag ;
3433import org .apache .beam .sdk .options .PipelineOptions ;
3534import org .apache .beam .sdk .options .PipelineOptionsFactory ;
3635import org .apache .beam .sdk .util .common .ReflectHelpers ;
3736import org .apache .beam .sdk .values .KV ;
3837import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .annotations .VisibleForTesting ;
3938import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .base .Splitter ;
40- import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .collect .ImmutableList ;
4139import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .collect .Lists ;
4240import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .collect .Sets ;
4341import org .checkerframework .checker .nullness .qual .Nullable ;
4745/**
4846 * Standard collection of metrics used to record source and sinks information for lineage tracking.
4947 */
50- public class Lineage {
51- // Namespace for lineage metrics; used to filter queries in Lineage.query() and in MetricsLineageReporter
48+ public abstract class Lineage {
5249 public static final String LINEAGE_NAMESPACE = "lineage" ;
5350 private static final Logger LOG = LoggerFactory .getLogger (Lineage .class );
54- private static final AtomicReference <LineageReporter > SOURCES = new AtomicReference <>();
55- private static final AtomicReference <LineageReporter > SINKS = new AtomicReference <>();
51+ private static final AtomicReference <Lineage > SOURCES = new AtomicReference <>();
52+ private static final AtomicReference <Lineage > SINKS = new AtomicReference <>();
5653
5754 private static final AtomicReference <KV <Long , Integer >> LINEAGE_REVISION =
5855 new AtomicReference <>();
5956
6057 // Reserved characters are backtick, colon, whitespace (space, \t, \n) and dot.
6158 private static final Pattern RESERVED_CHARS = Pattern .compile ("[:\\ s.`]" );
6259
63- private final Metric metric ;
64-
65- private Lineage (Type type ) {
66- if (MetricsFlag .lineageRollupEnabled ()) {
67- this .metric =
68- Metrics .boundedTrie (
69- LINEAGE_NAMESPACE ,
70- type == Type .SOURCE ? Type .SOURCEV2 .toString () : Type .SINKV2 .toString ());
71- } else {
72- this .metric = Metrics .stringSet (LINEAGE_NAMESPACE , type .toString ());
73- }
74- }
60+ protected Lineage () {}
7561
7662 @ Internal
7763 public static void initialize (PipelineOptions options ) {
@@ -82,70 +68,70 @@ public static void initialize(PipelineOptions options) {
8268 while (true ) {
8369 KV <Long , Integer > currentRevision = LINEAGE_REVISION .get ();
8470
85- // Skip re-initialization if same options and revision hasn't changed
8671 if (currentRevision != null
8772 && currentRevision .getKey ().equals (optionsId )
8873 && currentRevision .getValue () >= nextRevision ) {
89- LOG .debug ("Lineage already initialized with options ID {} revision {}, skipping" ,
90- optionsId , currentRevision .getValue ());
74+ LOG .debug (
75+ "Lineage already initialized with options ID {} revision {}, skipping" ,
76+ optionsId ,
77+ currentRevision .getValue ());
9178 return ;
9279 }
9380
9481 if (LINEAGE_REVISION .compareAndSet (currentRevision , KV .of (optionsId , nextRevision ))) {
95- LineageReporter sources = createReporter (options , Type .SOURCE );
96- LineageReporter sinks = createReporter (options , Type .SINK );
82+ Lineage sources = createLineage (options , Type .SOURCE );
83+ Lineage sinks = createLineage (options , Type .SINK );
9784
9885 SOURCES .set (sources );
9986 SINKS .set (sinks );
10087
10188 if (currentRevision == null ) {
10289 LOG .info ("Lineage initialized with options ID {} revision {}" , optionsId , nextRevision );
10390 } else {
104- LOG .info ("Lineage re-initialized from options ID {} to {} (revision {} -> {})" ,
105- currentRevision .getKey (), optionsId ,
106- currentRevision .getValue (), nextRevision );
91+ LOG .info (
92+ "Lineage re-initialized from options ID {} to {} (revision {} -> {})" ,
93+ currentRevision .getKey (),
94+ optionsId ,
95+ currentRevision .getValue (),
96+ nextRevision );
10797 }
10898 return ;
10999 }
110100 }
111101 }
112102
113- /// //// NEW METHOD
114- private static LineageReporter createReporter (PipelineOptions options , Type type ) {
115- Set <LineageReporterRegistrar > registrars = Sets .newTreeSet (
116- ReflectHelpers .ObjectsClassComparator .INSTANCE );
117- registrars .addAll (Lists .newArrayList (
118- ServiceLoader .load (LineageReporterRegistrar .class ,
119- ReflectHelpers .findClassLoader ())));
103+ private static Lineage createLineage (PipelineOptions options , Type type ) {
104+ Set <LineageRegistrar > registrars =
105+ Sets .newTreeSet (ReflectHelpers .ObjectsClassComparator .INSTANCE );
106+ registrars .addAll (
107+ Lists .newArrayList (
108+ ServiceLoader .load (LineageRegistrar .class , ReflectHelpers .findClassLoader ())));
120109
121- for (LineageReporterRegistrar registrar : registrars ) {
122- LineageReporter reporter = registrar .fromOptions (options , type );
110+ for (LineageRegistrar registrar : registrars ) {
111+ Lineage reporter = registrar .fromOptions (options , type );
123112 if (reporter != null ) {
124- LOG .info ("Using {} for lineage type {}" ,
125- reporter .getClass ().getName (), type );
113+ LOG .info ("Using {} for lineage type {}" , reporter .getClass ().getName (), type );
126114 return reporter ;
127115 }
128116 }
129117
130118 LOG .debug ("Using default Metrics-based lineage for type {}" , type );
131- return new MetricsLineageReporter (type );
119+ return new MetricsLineage (type );
132120 }
133121
134- /**
135- * Get {@link LineageReporter} representing sources and optionally side inputs.
136- */
137- public static LineageReporter getSources () {
138- LineageReporter sources = SOURCES .get ();
122+ /** Get {@link Lineage} representing sources and optionally side inputs. */
123+ public static Lineage getSources () {
124+ Lineage sources = SOURCES .get ();
139125 if (sources == null ) {
140126 initialize (PipelineOptionsFactory .create ());
141127 sources = SOURCES .get ();
142128 }
143129 return sources ;
144130 }
145131
146- /** {@link LineageReporter } representing sinks. */
147- public static LineageReporter getSinks () {
148- LineageReporter sinks = SINKS .get ();
132+ /** {@link Lineage } representing sinks. */
133+ public static Lineage getSinks () {
134+ Lineage sinks = SINKS .get ();
149135 if (sinks == null ) {
150136 initialize (PipelineOptionsFactory .create ());
151137 sinks = SINKS .get ();
@@ -228,14 +214,7 @@ public void add(String system, Iterable<String> segments) {
228214 * which is already escaped.
229215 * <p>In particular, this means they will often have trailing delimiters.
230216 */
231- public void add (Iterable <String > rollupSegments ) {
232- ImmutableList <String > segments = ImmutableList .copyOf (rollupSegments );
233- if (MetricsFlag .lineageRollupEnabled ()) {
234- ((BoundedTrie ) this .metric ).add (segments );
235- } else {
236- ((StringSet ) this .metric ).add (String .join ("" , segments ));
237- }
238- }
217+ public abstract void add (Iterable <String > rollupSegments );
239218
240219 /**
241220 * Query {@link BoundedTrie} metrics from {@link MetricResults}.
@@ -245,9 +224,8 @@ public void add(Iterable<String> rollupSegments) {
245224 * @param truncatedMarker the marker to use to represent truncated FQNs.
246225 * @return A flat representation of all FQNs. If the FQN was truncated then it has a trailing
247226 * truncatedMarker.
248- *
249- * <p>NOTE: When using a custom LineageReporter plugin, this method
250- * will return empty results since lineage is not stored in Metrics.
227+ * <p>NOTE: When using a custom LineageReporter plugin, this method will return empty results
228+ * since lineage is not stored in Metrics.
251229 */
252230 public static Set <String > query (MetricResults results , Type type , String truncatedMarker ) {
253231 MetricQueryResults lineageQueryResults = getLineageQueryResults (results , type );
@@ -276,9 +254,8 @@ public static Set<String> query(MetricResults results, Type type, String truncat
276254 * @param results FQNs from the result
277255 * @param type sources or sinks
278256 * @return A flat representation of all FQNs. If the FQN was truncated then it has a trailing '*'.
279- *
280- * <p>NOTE: When using a custom LineageReporter plugin, this method
281- * will return empty results since lineage is not stored in Metrics.
257+ * <p>NOTE: When using a custom LineageReporter plugin, this method will return empty results
258+ * since lineage is not stored in Metrics.
282259 */
283260 public static Set <String > query (MetricResults results , Type type ) {
284261 if (MetricsFlag .lineageRollupEnabled ()) {
0 commit comments