Skip to content

Commit aa889f0

Browse files
committed
[FLINK-34467] add lineage integration for jdbc connector
1 parent 140f179 commit aa889f0

File tree

37 files changed

+1308
-13
lines changed

37 files changed

+1308
-13
lines changed

.java-version

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
11
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
org.apache.flink.connector.jdbc.postgres.database.dialect.CompatiblePostgresDialect.getRowConverter(org.apache.flink.table.types.logical.RowType): Returned leaf type org.apache.flink.connector.jdbc.postgres.database.dialect.CompatiblePostgresDialectConverter does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
2-
org.apache.flink.connector.jdbc.postgres.database.dialect.CompatiblePostgresDialect.getRowConverter(org.apache.flink.table.types.logical.RowType): Returned leaf type org.apache.flink.connector.jdbc.postgres.database.dialect.PostgresDialectConverter does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
2+
org.apache.flink.connector.jdbc.postgres.database.dialect.CompatiblePostgresDialect.getRowConverter(org.apache.flink.table.types.logical.RowType): Returned leaf type org.apache.flink.connector.jdbc.postgres.database.dialect.PostgresDialectConverter does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Method <org.apache.flink.connector.jdbc.core.datastream.source.JdbcSource.getLineageVertex()> calls method <org.apache.flink.connector.jdbc.core.datastream.source.enumerator.SqlTemplateSplitEnumerator.getSqlTemplate()> in (JdbcSource.java:215)

flink-connector-jdbc-core/archunit-violations/6cdea252-f400-4c13-bc99-b325f2ebe333

Lines changed: 266 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
org.apache.flink.connector.jdbc.lineage.DefaultJdbcExtractor.extract(java.lang.String, java.util.Properties): Returned leaf type org.apache.flink.connector.jdbc.lineage.JdbcLocation does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
2+
org.apache.flink.connector.jdbc.lineage.JdbcLocation$Builder.build(): Returned leaf type org.apache.flink.connector.jdbc.lineage.JdbcLocation does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
3+
org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractor.extract(java.lang.String, java.util.Properties): Returned leaf type org.apache.flink.connector.jdbc.lineage.JdbcLocation does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
4+
org.apache.flink.connector.jdbc.lineage.OverrideJdbcLocationExtractor.extract(java.lang.String, java.util.Properties): Returned leaf type org.apache.flink.connector.jdbc.lineage.JdbcLocation does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated

flink-connector-jdbc-core/pom.xml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,16 @@ under the License.
5656
<optional>true</optional>
5757
</dependency>
5858

59+
<dependency>
60+
<groupId>io.openlineage</groupId>
61+
<artifactId>openlineage-sql-java</artifactId>
62+
</dependency>
63+
64+
<dependency>
65+
<groupId>io.openlineage</groupId>
66+
<artifactId>openlineage-java</artifactId>
67+
</dependency>
68+
5969
<!-- Tests -->
6070

6171
<dependency>

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/JdbcInputFormat.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,23 @@
2424
import org.apache.flink.api.common.io.InputFormat;
2525
import org.apache.flink.api.common.io.RichInputFormat;
2626
import org.apache.flink.api.common.io.statistics.BaseStatistics;
27+
import org.apache.flink.api.connector.source.Boundedness;
2728
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
2829
import org.apache.flink.api.java.typeutils.RowTypeInfo;
2930
import org.apache.flink.configuration.Configuration;
3031
import org.apache.flink.connector.jdbc.core.datastream.source.JdbcSource;
3132
import org.apache.flink.connector.jdbc.core.datastream.source.JdbcSourceBuilder;
3233
import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
3334
import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;
35+
import org.apache.flink.connector.jdbc.lineage.DefaultTypeDatasetFacet;
36+
import org.apache.flink.connector.jdbc.lineage.LineageUtils;
3437
import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
3538
import org.apache.flink.core.io.GenericInputSplit;
3639
import org.apache.flink.core.io.InputSplit;
3740
import org.apache.flink.core.io.InputSplitAssigner;
41+
import org.apache.flink.streaming.api.lineage.LineageDataset;
42+
import org.apache.flink.streaming.api.lineage.LineageVertex;
43+
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
3844
import org.apache.flink.types.Row;
3945
import org.apache.flink.util.Preconditions;
4046

@@ -53,6 +59,8 @@
5359
import java.sql.Time;
5460
import java.sql.Timestamp;
5561
import java.util.Arrays;
62+
import java.util.Collections;
63+
import java.util.Optional;
5664

5765
/**
5866
* InputFormat to read data from a database and generate Rows. The InputFormat has to be configured
@@ -107,7 +115,7 @@
107115
@Deprecated
108116
@Experimental
109117
public class JdbcInputFormat extends RichInputFormat<Row, InputSplit>
110-
implements ResultTypeQueryable<Row> {
118+
implements LineageVertexProvider, ResultTypeQueryable<Row> {
111119

112120
protected static final long serialVersionUID = 2L;
113121
protected static final Logger LOG = LoggerFactory.getLogger(JdbcInputFormat.class);
@@ -344,6 +352,19 @@ public static JdbcInputFormatBuilder buildJdbcInputFormat() {
344352
return new JdbcInputFormatBuilder();
345353
}
346354

/**
 * Describes this input format as a lineage source vertex.
 *
 * <p>The vertex holds a single dataset. Its name comes from {@code
 * LineageUtils.tableNameOf(queryTemplate)}, its namespace from {@code
 * LineageUtils.namespaceOf(connectionProvider)}, and it carries one facet built from {@link
 * #getProducedType()}.
 *
 * @return a source lineage vertex with {@code Boundedness.BOUNDED} and exactly one dataset
 */
355+
@Override
356+
public LineageVertex getLineageVertex() {
357+
DefaultTypeDatasetFacet defaultTypeDatasetFacet =
358+
new DefaultTypeDatasetFacet(getProducedType());
359+
Optional<String> nameOpt = LineageUtils.tableNameOf(queryTemplate);
360+
String namespace = LineageUtils.namespaceOf(connectionProvider);
361+
LineageDataset dataset =
362+
LineageUtils.datasetOf(
// An empty string is used as the dataset name when tableNameOf yields no value.
363+
nameOpt.orElse(""), namespace, Arrays.asList(defaultTypeDatasetFacet));
364+
return LineageUtils.sourceLineageVertexOf(
365+
Boundedness.BOUNDED, Collections.singleton(dataset));
366+
}
367+
347368
/** Builder for {@link JdbcInputFormat}. */
348369
public static class JdbcInputFormatBuilder {
349370
private final JdbcConnectionOptions.JdbcConnectionOptionsBuilder connOptionsBuilder;

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/JdbcSink.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,16 @@
3737
import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
3838
import org.apache.flink.connector.jdbc.datasource.statements.JdbcQueryStatement;
3939
import org.apache.flink.connector.jdbc.internal.JdbcOutputSerializer;
40+
import org.apache.flink.connector.jdbc.lineage.LineageUtils;
4041
import org.apache.flink.core.io.SimpleVersionedSerializer;
42+
import org.apache.flink.streaming.api.lineage.LineageDataset;
43+
import org.apache.flink.streaming.api.lineage.LineageVertex;
44+
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
4145

4246
import java.io.IOException;
4347
import java.util.Collection;
4448
import java.util.Collections;
49+
import java.util.Optional;
4550

4651
/**
4752
* Flink Sink to produce data into a jdbc database.
@@ -50,7 +55,8 @@
5055
*/
5156
@PublicEvolving
5257
public class JdbcSink<IN>
53-
implements Sink<IN>,
58+
implements LineageVertexProvider,
59+
Sink<IN>,
5460
SupportsWriterState<IN, JdbcWriterState>,
5561
SupportsCommitter<JdbcCommitable> {
5662

@@ -120,4 +126,13 @@ public JdbcWriter<IN> restoreWriter(
120126
public SimpleVersionedSerializer<JdbcWriterState> getWriterStateSerializer() {
121127
return new JdbcWriterStateSerializer();
122128
}
129+
/**
 * Describes this sink as a lineage vertex.
 *
 * <p>The vertex holds a single dataset. Its name comes from {@code
 * LineageUtils.tableNameOf(queryStatement.query())}, its namespace from {@code
 * LineageUtils.namespaceOf(connectionProvider)}, and it carries no facets.
 *
 * @return a lineage vertex wrapping exactly one dataset
 */
130+
@Override
131+
public LineageVertex getLineageVertex() {
132+
Optional<String> nameOpt = LineageUtils.tableNameOf(queryStatement.query());
133+
String namespace = LineageUtils.namespaceOf(connectionProvider);
134+
LineageDataset dataset =
// An empty string is used as the dataset name when tableNameOf yields no value;
// the facet list is intentionally empty here.
135+
LineageUtils.datasetOf(nameOpt.orElse(""), namespace, Collections.emptyList());
136+
return LineageUtils.lineageVertexOf(Collections.singleton(dataset));
137+
}
123138
}

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSource.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,25 +35,35 @@
3535
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSourceEnumerator;
3636
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSourceEnumeratorState;
3737
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSqlSplitEnumeratorBase;
38+
import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.SqlTemplateSplitEnumerator;
3839
import org.apache.flink.connector.jdbc.core.datastream.source.reader.JdbcSourceReader;
3940
import org.apache.flink.connector.jdbc.core.datastream.source.reader.JdbcSourceSplitReader;
4041
import org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor;
4142
import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit;
4243
import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplitSerializer;
4344
import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
45+
import org.apache.flink.connector.jdbc.lineage.DefaultTypeDatasetFacet;
46+
import org.apache.flink.connector.jdbc.lineage.LineageUtils;
4447
import org.apache.flink.core.io.SimpleVersionedSerializer;
48+
import org.apache.flink.streaming.api.lineage.LineageDataset;
49+
import org.apache.flink.streaming.api.lineage.LineageVertex;
50+
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
4551
import org.apache.flink.util.Preconditions;
4652

4753
import javax.annotation.Nullable;
4854

4955
import java.io.Serializable;
5056
import java.util.ArrayList;
57+
import java.util.Arrays;
58+
import java.util.Collections;
5159
import java.util.Objects;
60+
import java.util.Optional;
5261

5362
/** JDBC source. */
5463
@PublicEvolving
5564
public class JdbcSource<OUT>
56-
implements Source<OUT, JdbcSourceSplit, JdbcSourceEnumeratorState>,
65+
implements LineageVertexProvider,
66+
Source<OUT, JdbcSourceSplit, JdbcSourceEnumeratorState>,
5767
ResultTypeQueryable<OUT> {
5868

5969
private final Boundedness boundedness;
@@ -195,4 +205,18 @@ public boolean equals(Object o) {
195205
&& deliveryGuarantee == that.deliveryGuarantee
196206
&& Objects.equals(continuousUnBoundingSettings, that.continuousUnBoundingSettings);
197207
}
208+
/**
 * Describes this source as a lineage source vertex.
 *
 * <p>The vertex holds a single dataset. Its name comes from the SQL template of an enumerator
 * obtained from {@code sqlSplitEnumeratorProvider}, its namespace from {@code
 * LineageUtils.namespaceOf(connectionProvider)}, and it carries one facet built from {@link
 * #getTypeInformation()}. The vertex uses this source's own {@code boundedness}.
 *
 * @return a source lineage vertex with exactly one dataset
 */
209+
@Override
210+
public LineageVertex getLineageVertex() {
211+
DefaultTypeDatasetFacet defaultTypeDatasetFacet =
212+
new DefaultTypeDatasetFacet(getTypeInformation());
// A fresh enumerator is created here only to read its SQL template.
// NOTE(review): the downcast below is unchecked; if the provider can return an
// enumerator that is not a SqlTemplateSplitEnumerator this throws
// ClassCastException. Confirm all providers satisfy this, or guard with instanceof.
213+
SqlTemplateSplitEnumerator enumerator =
214+
(SqlTemplateSplitEnumerator) sqlSplitEnumeratorProvider.create();
215+
Optional<String> nameOpt = LineageUtils.tableNameOf(enumerator.getSqlTemplate());
216+
String namespace = LineageUtils.namespaceOf(connectionProvider);
217+
LineageDataset dataset =
218+
LineageUtils.datasetOf(
// An empty string is used as the dataset name when tableNameOf yields no value.
219+
nameOpt.orElse(""), namespace, Arrays.asList(defaultTypeDatasetFacet));
220+
return LineageUtils.sourceLineageVertexOf(boundedness, Collections.singleton(dataset));
221+
}
198222
}

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcRowDataInputFormat.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,22 @@
2424
import org.apache.flink.api.common.io.RichInputFormat;
2525
import org.apache.flink.api.common.io.statistics.BaseStatistics;
2626
import org.apache.flink.api.common.typeinfo.TypeInformation;
27+
import org.apache.flink.api.connector.source.Boundedness;
2728
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
2829
import org.apache.flink.configuration.Configuration;
2930
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
3031
import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialectConverter;
3132
import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
3233
import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;
34+
import org.apache.flink.connector.jdbc.lineage.DefaultTypeDatasetFacet;
35+
import org.apache.flink.connector.jdbc.lineage.LineageUtils;
3336
import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
3437
import org.apache.flink.core.io.GenericInputSplit;
3538
import org.apache.flink.core.io.InputSplit;
3639
import org.apache.flink.core.io.InputSplitAssigner;
40+
import org.apache.flink.streaming.api.lineage.LineageDataset;
41+
import org.apache.flink.streaming.api.lineage.LineageVertex;
42+
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
3743
import org.apache.flink.table.data.RowData;
3844
import org.apache.flink.util.Preconditions;
3945

@@ -51,11 +57,13 @@
5157
import java.sql.Time;
5258
import java.sql.Timestamp;
5359
import java.util.Arrays;
60+
import java.util.Collections;
61+
import java.util.Optional;
5462

5563
/** InputFormat for {@link JdbcDynamicTableSource}. */
5664
@Internal
5765
public class JdbcRowDataInputFormat extends RichInputFormat<RowData, InputSplit>
58-
implements ResultTypeQueryable<RowData> {
66+
implements LineageVertexProvider, ResultTypeQueryable<RowData> {
5967

6068
private static final long serialVersionUID = 2L;
6169
private static final Logger LOG = LoggerFactory.getLogger(JdbcRowDataInputFormat.class);
@@ -296,6 +304,19 @@ public static Builder builder() {
296304
return new Builder();
297305
}
298306

/**
 * Describes this input format as a lineage source vertex.
 *
 * <p>The vertex holds a single dataset. Its name comes from {@code
 * LineageUtils.tableNameOf(queryTemplate)}, its namespace from {@code
 * LineageUtils.namespaceOf(connectionProvider)}, and it carries one facet built from {@link
 * #getProducedType()}.
 *
 * @return a source lineage vertex with {@code Boundedness.BOUNDED} and exactly one dataset
 */
307+
@Override
308+
public LineageVertex getLineageVertex() {
309+
DefaultTypeDatasetFacet defaultTypeDatasetFacet =
310+
new DefaultTypeDatasetFacet(getProducedType());
311+
Optional<String> nameOpt = LineageUtils.tableNameOf(queryTemplate);
312+
String namespace = LineageUtils.namespaceOf(connectionProvider);
313+
LineageDataset dataset =
314+
LineageUtils.datasetOf(
// An empty string is used as the dataset name when tableNameOf yields no value.
315+
nameOpt.orElse(""), namespace, Arrays.asList(defaultTypeDatasetFacet));
316+
return LineageUtils.sourceLineageVertexOf(
317+
Boundedness.BOUNDED, Collections.singleton(dataset));
318+
}
319+
299320
/** Builder for {@link JdbcRowDataInputFormat}. */
300321
public static class Builder {
301322
private JdbcConnectionOptions.JdbcConnectionOptionsBuilder connOptionsBuilder;

0 commit comments

Comments
 (0)