diff --git a/.gitignore b/.gitignore index 5f0068cda..fd822cb73 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,7 @@ scalastyle-output.xml .metadata .settings .project +.java-version .version.properties filter.properties logs.zip diff --git a/flink-connector-jdbc-core/archunit-violations/4bca2274-65a9-4a61-81ef-767d58233ea0 b/flink-connector-jdbc-core/archunit-violations/4bca2274-65a9-4a61-81ef-767d58233ea0 new file mode 100644 index 000000000..3f29182c3 --- /dev/null +++ b/flink-connector-jdbc-core/archunit-violations/4bca2274-65a9-4a61-81ef-767d58233ea0 @@ -0,0 +1 @@ +Method calls method in (JdbcSource.java:215) diff --git a/flink-connector-jdbc-core/archunit-violations/d45c3af5-52c6-45fd-9926-75e75e77473a b/flink-connector-jdbc-core/archunit-violations/d45c3af5-52c6-45fd-9926-75e75e77473a new file mode 100644 index 000000000..c248851cb --- /dev/null +++ b/flink-connector-jdbc-core/archunit-violations/d45c3af5-52c6-45fd-9926-75e75e77473a @@ -0,0 +1,4 @@ +org.apache.flink.connector.jdbc.lineage.DefaultJdbcExtractor.extract(java.lang.String, java.util.Properties): Returned leaf type org.apache.flink.connector.jdbc.lineage.JdbcLocation does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated +org.apache.flink.connector.jdbc.lineage.JdbcLocation$Builder.build(): Returned leaf type org.apache.flink.connector.jdbc.lineage.JdbcLocation does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated +org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractor.extract(java.lang.String, java.util.Properties): Returned leaf type org.apache.flink.connector.jdbc.lineage.JdbcLocation does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated +org.apache.flink.connector.jdbc.lineage.OverrideJdbcLocationExtractor.extract(java.lang.String, java.util.Properties): Returned leaf type org.apache.flink.connector.jdbc.lineage.JdbcLocation does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated diff --git a/flink-connector-jdbc-core/pom.xml b/flink-connector-jdbc-core/pom.xml index 43d844496..c31423909 100644 --- a/flink-connector-jdbc-core/pom.xml +++ b/flink-connector-jdbc-core/pom.xml @@ -56,6 +56,16 @@ under the License. true + + io.openlineage + openlineage-sql-java + + + + io.openlineage + openlineage-java + + diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/JdbcInputFormat.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/JdbcInputFormat.java index dacf3c6c6..fb8b11fa2 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/JdbcInputFormat.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/JdbcInputFormat.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.io.RichInputFormat; import org.apache.flink.api.common.io.statistics.BaseStatistics; +import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.configuration.Configuration; @@ -31,10 +32,15 @@ import org.apache.flink.connector.jdbc.core.datastream.source.JdbcSourceBuilder; import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider; import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider; +import org.apache.flink.connector.jdbc.lineage.DefaultTypeDatasetFacet; +import org.apache.flink.connector.jdbc.lineage.LineageUtils; import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider; import org.apache.flink.core.io.GenericInputSplit; import org.apache.flink.core.io.InputSplit; import org.apache.flink.core.io.InputSplitAssigner; +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.LineageVertexProvider; import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions; @@ -53,6 +59,8 @@ import java.sql.Time; import java.sql.Timestamp; import java.util.Arrays; +import java.util.Collections; +import java.util.Optional; /** * InputFormat to read data from a database and generate Rows. The InputFormat has to be configured @@ -107,7 +115,7 @@ @Deprecated @Experimental public class JdbcInputFormat extends RichInputFormat - implements ResultTypeQueryable { + implements LineageVertexProvider, ResultTypeQueryable { protected static final long serialVersionUID = 2L; protected static final Logger LOG = LoggerFactory.getLogger(JdbcInputFormat.class); @@ -344,6 +352,19 @@ public static JdbcInputFormatBuilder buildJdbcInputFormat() { return new JdbcInputFormatBuilder(); } + @Override + public LineageVertex getLineageVertex() { + DefaultTypeDatasetFacet defaultTypeDatasetFacet = + new DefaultTypeDatasetFacet(getProducedType()); + Optional nameOpt = LineageUtils.tableNameOf(queryTemplate, true); + String namespace = LineageUtils.namespaceOf(connectionProvider); + LineageDataset dataset = + LineageUtils.datasetOf( + nameOpt.orElse(""), namespace, Arrays.asList(defaultTypeDatasetFacet)); + return LineageUtils.sourceLineageVertexOf( + Boundedness.BOUNDED, Collections.singleton(dataset)); + } + /** Builder for {@link JdbcInputFormat}. */ public static class JdbcInputFormatBuilder { private final JdbcConnectionOptions.JdbcConnectionOptionsBuilder connOptionsBuilder; diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/JdbcSink.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/JdbcSink.java index 5753eb9d4..5ccb6e49c 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/JdbcSink.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/JdbcSink.java @@ -37,11 +37,16 @@ import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider; import org.apache.flink.connector.jdbc.datasource.statements.JdbcQueryStatement; import org.apache.flink.connector.jdbc.internal.JdbcOutputSerializer; +import org.apache.flink.connector.jdbc.lineage.LineageUtils; import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.LineageVertexProvider; import java.io.IOException; import java.util.Collection; import java.util.Collections; +import java.util.Optional; /** * Flink Sink to produce data into a jdbc database. @@ -50,7 +55,8 @@ */ @PublicEvolving public class JdbcSink - implements Sink, + implements LineageVertexProvider, + Sink, SupportsWriterState, SupportsCommitter { @@ -120,4 +126,13 @@ public JdbcWriter restoreWriter( public SimpleVersionedSerializer getWriterStateSerializer() { return new JdbcWriterStateSerializer(); } + + @Override + public LineageVertex getLineageVertex() { + Optional nameOpt = LineageUtils.tableNameOf(queryStatement.query(), false); + String namespace = LineageUtils.namespaceOf(connectionProvider); + LineageDataset dataset = + LineageUtils.datasetOf(nameOpt.orElse(""), namespace, Collections.emptyList()); + return LineageUtils.lineageVertexOf(Collections.singleton(dataset)); + } } diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSource.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSource.java index d2c055e42..bdc505c12 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSource.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSource.java @@ -35,25 +35,35 @@ import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSourceEnumerator; import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSourceEnumeratorState; import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSqlSplitEnumeratorBase; +import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.SqlTemplateSplitEnumerator; import org.apache.flink.connector.jdbc.core.datastream.source.reader.JdbcSourceReader; import org.apache.flink.connector.jdbc.core.datastream.source.reader.JdbcSourceSplitReader; import org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor; import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit; import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplitSerializer; import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider; +import org.apache.flink.connector.jdbc.lineage.DefaultTypeDatasetFacet; +import org.apache.flink.connector.jdbc.lineage.LineageUtils; import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.LineageVertexProvider; import org.apache.flink.util.Preconditions; import javax.annotation.Nullable; import java.io.Serializable; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.Objects; +import java.util.Optional; /** JDBC source. */ @PublicEvolving public class JdbcSource - implements Source, + implements LineageVertexProvider, + Source, ResultTypeQueryable { private final Boundedness boundedness; @@ -195,4 +205,18 @@ public boolean equals(Object o) { && deliveryGuarantee == that.deliveryGuarantee && Objects.equals(continuousUnBoundingSettings, that.continuousUnBoundingSettings); } + + @Override + public LineageVertex getLineageVertex() { + DefaultTypeDatasetFacet defaultTypeDatasetFacet = + new DefaultTypeDatasetFacet(getTypeInformation()); + SqlTemplateSplitEnumerator enumerator = + (SqlTemplateSplitEnumerator) sqlSplitEnumeratorProvider.create(); + Optional nameOpt = LineageUtils.tableNameOf(enumerator.getSqlTemplate(), true); + String namespace = LineageUtils.namespaceOf(connectionProvider); + LineageDataset dataset = + LineageUtils.datasetOf( + nameOpt.orElse(""), namespace, Arrays.asList(defaultTypeDatasetFacet)); + return LineageUtils.sourceLineageVertexOf(boundedness, Collections.singleton(dataset)); + } } diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcRowDataInputFormat.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcRowDataInputFormat.java index b06f555da..de02666f9 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcRowDataInputFormat.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcRowDataInputFormat.java @@ -24,16 +24,22 @@ import org.apache.flink.api.common.io.RichInputFormat; import org.apache.flink.api.common.io.statistics.BaseStatistics; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.jdbc.JdbcConnectionOptions; import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialectConverter; import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider; import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider; +import org.apache.flink.connector.jdbc.lineage.DefaultTypeDatasetFacet; +import org.apache.flink.connector.jdbc.lineage.LineageUtils; import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider; import org.apache.flink.core.io.GenericInputSplit; import org.apache.flink.core.io.InputSplit; import org.apache.flink.core.io.InputSplitAssigner; +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.LineageVertexProvider; import org.apache.flink.table.data.RowData; import org.apache.flink.util.Preconditions; @@ -51,11 +57,13 @@ import java.sql.Time; import java.sql.Timestamp; import java.util.Arrays; +import java.util.Collections; +import java.util.Optional; /** InputFormat for {@link JdbcDynamicTableSource}. */ @Internal public class JdbcRowDataInputFormat extends RichInputFormat - implements ResultTypeQueryable { + implements LineageVertexProvider, ResultTypeQueryable { private static final long serialVersionUID = 2L; private static final Logger LOG = LoggerFactory.getLogger(JdbcRowDataInputFormat.class); @@ -296,6 +304,19 @@ public static Builder builder() { return new Builder(); } + @Override + public LineageVertex getLineageVertex() { + DefaultTypeDatasetFacet defaultTypeDatasetFacet = + new DefaultTypeDatasetFacet(getProducedType()); + Optional nameOpt = LineageUtils.tableNameOf(queryTemplate, true); + String namespace = LineageUtils.namespaceOf(connectionProvider); + LineageDataset dataset = + LineageUtils.datasetOf( + nameOpt.orElse(""), namespace, Arrays.asList(defaultTypeDatasetFacet)); + return LineageUtils.sourceLineageVertexOf( + Boundedness.BOUNDED, Collections.singleton(dataset)); + } + /** Builder for {@link JdbcRowDataInputFormat}. */ public static class Builder { private JdbcConnectionOptions.JdbcConnectionOptionsBuilder connOptionsBuilder; diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcRowDataLookupFunction.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcRowDataLookupFunction.java index dd39fa27e..1466df45c 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcRowDataLookupFunction.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcRowDataLookupFunction.java @@ -20,18 +20,26 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialect; import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialectConverter; import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider; import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider; import org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions; +import org.apache.flink.connector.jdbc.lineage.DefaultTypeDatasetFacet; +import org.apache.flink.connector.jdbc.lineage.LineageUtils; import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatement; +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.LineageVertexProvider; import org.apache.flink.table.data.RowData; import org.apache.flink.table.functions.FunctionContext; import org.apache.flink.table.functions.LookupFunction; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter; +import org.apache.flink.table.types.utils.TypeConversions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,6 +54,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkArgument; @@ -53,7 +62,7 @@ /** A lookup function for {@link JdbcDynamicTableSource}. */ @Internal -public class JdbcRowDataLookupFunction extends LookupFunction { +public class JdbcRowDataLookupFunction extends LookupFunction implements LineageVertexProvider { private static final Logger LOG = LoggerFactory.getLogger(JdbcRowDataLookupFunction.class); private static final long serialVersionUID = 2L; @@ -67,6 +76,7 @@ public class JdbcRowDataLookupFunction extends LookupFunction { private final List resolvedPredicates; private final Serializable[] pushdownParams; + private final RowType producedType; private transient FieldNamedPreparedStatement statement; @@ -106,12 +116,12 @@ public JdbcRowDataLookupFunction( .getSelectFromStatement(options.getTableName(), fieldNames, keyNames); JdbcDialect jdbcDialect = options.getDialect(); this.jdbcDialectConverter = jdbcDialect.getRowConverter(rowType); - this.lookupKeyRowConverter = - jdbcDialect.getRowConverter( - RowType.of( - Arrays.stream(keyTypes) - .map(DataType::getLogicalType) - .toArray(LogicalType[]::new))); + this.producedType = + RowType.of( + Arrays.stream(keyTypes) + .map(DataType::getLogicalType) + .toArray(LogicalType[]::new)); + this.lookupKeyRowConverter = jdbcDialect.getRowConverter(producedType); this.resolvedPredicates = resolvedPredicates; this.pushdownParams = pushdownParams; } @@ -224,4 +234,19 @@ public void close() throws IOException { public Connection getDbConnection() { return connectionProvider.getConnection(); } + + @Override + public LineageVertex getLineageVertex() { + DefaultTypeDatasetFacet defaultTypeDatasetFacet = + new DefaultTypeDatasetFacet( + LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo( + TypeConversions.fromLogicalToDataType(producedType))); + Optional nameOpt = LineageUtils.tableNameOf(query, true); + String namespace = LineageUtils.namespaceOf(connectionProvider); + LineageDataset dataset = + LineageUtils.datasetOf( + nameOpt.orElse(""), namespace, Arrays.asList(defaultTypeDatasetFacet)); + return LineageUtils.sourceLineageVertexOf( + Boundedness.BOUNDED, Collections.singleton(dataset)); + } } diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/datasource/connections/SimpleJdbcConnectionProvider.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/datasource/connections/SimpleJdbcConnectionProvider.java index 4c48f799e..3c0c00e40 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/datasource/connections/SimpleJdbcConnectionProvider.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/datasource/connections/SimpleJdbcConnectionProvider.java @@ -149,4 +149,8 @@ public Connection reestablishConnection() throws SQLException, ClassNotFoundExce closeConnection(); return getOrEstablishConnection(); } + + public String getDbURL() { + return this.jdbcOptions.getDbURL(); + } } diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/GenericJdbcSinkFunction.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/GenericJdbcSinkFunction.java index c031419db..d01afee8b 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/GenericJdbcSinkFunction.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/GenericJdbcSinkFunction.java @@ -27,6 +27,8 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.sink.legacy.RichSinkFunction; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.LineageVertexProvider; import org.apache.flink.util.Preconditions; import javax.annotation.Nonnull; @@ -36,7 +38,7 @@ /** A generic SinkFunction for JDBC. */ @Internal public class GenericJdbcSinkFunction extends RichSinkFunction - implements CheckpointedFunction, InputTypeConfigurable { + implements LineageVertexProvider, CheckpointedFunction, InputTypeConfigurable { private final JdbcOutputFormat outputFormat; private JdbcOutputSerializer serializer; @@ -78,4 +80,9 @@ public void setInputType(TypeInformation type, ExecutionConfig executionConfi ((TypeInformation) type) .createSerializer(executionConfig.getSerializerConfig())); } + + @Override + public LineageVertex getLineageVertex() { + return outputFormat.getLineageVertex(); + } } diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java index 7668236ef..e02657bf2 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java @@ -23,6 +23,10 @@ import org.apache.flink.connector.jdbc.JdbcExecutionOptions; import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider; import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor; +import org.apache.flink.connector.jdbc.lineage.LineageUtils; +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.LineageVertexProvider; import org.apache.flink.util.concurrent.ExecutorThreadFactory; import org.apache.flink.util.function.SerializableSupplier; @@ -36,6 +40,8 @@ import java.io.Serializable; import java.sql.Connection; import java.sql.SQLException; +import java.util.Collections; +import java.util.Optional; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -47,7 +53,7 @@ /** A JDBC outputFormat that supports batching records before writing records to database. */ @Internal public class JdbcOutputFormat> - implements Flushable, AutoCloseable, Serializable { + implements LineageVertexProvider, Flushable, AutoCloseable, Serializable { protected final JdbcConnectionProvider connectionProvider; @@ -247,4 +253,16 @@ public JdbcExecutionOptions getExecutionOptions() { public Connection getConnection() { return connectionProvider.getConnection(); } + + @Override + public LineageVertex getLineageVertex() { + Optional nameOpt = + jdbcStatementExecutor == null + ? Optional.empty() + : LineageUtils.tableNameOf(jdbcStatementExecutor.insertSql(), false); + String namespace = LineageUtils.namespaceOf(connectionProvider); + LineageDataset dataset = + LineageUtils.datasetOf(nameOpt.orElse(""), namespace, Collections.emptyList()); + return LineageUtils.lineageVertexOf(Collections.singleton(dataset)); + } } diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/InsertOrUpdateJdbcExecutor.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/InsertOrUpdateJdbcExecutor.java index 838cbc529..bcf519616 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/InsertOrUpdateJdbcExecutor.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/InsertOrUpdateJdbcExecutor.java @@ -108,6 +108,11 @@ public void executeBatch() throws SQLException { } } + @Override + public String insertSql() { + return insertSQL; + } + private void processOneRowInBatch(K pk, V row) throws SQLException { if (exist(pk)) { updateSetter.accept(updateStatement, row); diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/JdbcBatchStatementExecutor.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/JdbcBatchStatementExecutor.java index 3e52e87f8..577f04223 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/JdbcBatchStatementExecutor.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/JdbcBatchStatementExecutor.java @@ -40,6 +40,9 @@ public interface JdbcBatchStatementExecutor { /** Close JDBC related statements. */ void closeStatements() throws SQLException; + /** return the insert sql of the executor. */ + String insertSql(); + static JdbcBatchStatementExecutor keyed( String sql, Function keyExtractor, JdbcStatementBuilder statementBuilder) { return new KeyedBatchStatementExecutor<>(sql, keyExtractor, statementBuilder); diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/KeyedBatchStatementExecutor.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/KeyedBatchStatementExecutor.java index c16f275f6..e14eedd09 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/KeyedBatchStatementExecutor.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/KeyedBatchStatementExecutor.java @@ -79,6 +79,11 @@ public void executeBatch() throws SQLException { } } + @Override + public String insertSql() { + return sql; + } + @Override public void closeStatements() throws SQLException { if (st != null) { diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/SimpleBatchStatementExecutor.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/SimpleBatchStatementExecutor.java index 253fdb42b..76102a81e 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/SimpleBatchStatementExecutor.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/SimpleBatchStatementExecutor.java @@ -71,6 +71,11 @@ public void executeBatch() throws SQLException { } } + @Override + public String insertSql() { + return sql; + } + @Override public void closeStatements() throws SQLException { if (st != null) { diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableBufferReducedStatementExecutor.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableBufferReducedStatementExecutor.java index fd062ccc2..ac013d85b 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableBufferReducedStatementExecutor.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableBufferReducedStatementExecutor.java @@ -84,6 +84,11 @@ private boolean changeFlag(RowKind rowKind) { } } + @Override + public String insertSql() { + return upsertExecutor.insertSql(); + } + @Override public void executeBatch() throws SQLException { if (!reduceBuffer.isEmpty()) { diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableBufferedStatementExecutor.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableBufferedStatementExecutor.java index 7e5e50ec0..17e2f5df1 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableBufferedStatementExecutor.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableBufferedStatementExecutor.java @@ -61,6 +61,11 @@ public void executeBatch() throws SQLException { } } + @Override + public String insertSql() { + return statementExecutor.insertSql(); + } + @Override public void closeStatements() throws SQLException { statementExecutor.closeStatements(); diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableInsertOrUpdateStatementExecutor.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableInsertOrUpdateStatementExecutor.java index 61ef4cc45..7df38535c 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableInsertOrUpdateStatementExecutor.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableInsertOrUpdateStatementExecutor.java @@ -82,6 +82,11 @@ public void addToBatch(RowData record) throws SQLException { processOneRowInBatch(keyExtractor.apply(record), record); } + @Override + public String insertSql() { + return insertStatement.getQuery(); + } + private void processOneRowInBatch(RowData pk, RowData row) throws SQLException { if (exist(pk)) { updateSetter.toExternal(row, updateStatement); diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableSimpleStatementExecutor.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableSimpleStatementExecutor.java index a3fc7b7e2..88a1ad503 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableSimpleStatementExecutor.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableSimpleStatementExecutor.java @@ -60,6 +60,11 @@ public void addToBatch(RowData record) throws SQLException { st.addBatch(); } + @Override + public String insertSql() { + return st.getQuery(); + } + @Override public void executeBatch() throws SQLException { st.executeBatch(); diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/DefaultTypeDatasetFacet.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/DefaultTypeDatasetFacet.java new file mode 100644 index 000000000..989340fdd --- /dev/null +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/DefaultTypeDatasetFacet.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.lineage; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +import java.util.Objects; + +/** Default implementation of {@link TypeDatasetFacet}. */ +@PublicEvolving +public class DefaultTypeDatasetFacet implements TypeDatasetFacet { + + public static final String TYPE_FACET_NAME = "type"; + + private final TypeInformation typeInformation; + + public DefaultTypeDatasetFacet(TypeInformation typeInformation) { + this.typeInformation = typeInformation; + } + + public TypeInformation getTypeInformation() { + return typeInformation; + } + + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DefaultTypeDatasetFacet that = (DefaultTypeDatasetFacet) o; + return Objects.equals(typeInformation, that.typeInformation); + } + + @Override + public int hashCode() { + return Objects.hash(typeInformation); + } + + @Override + public String name() { + return TYPE_FACET_NAME; + } +} diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/JdbcLocation.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/JdbcLocation.java new file mode 100644 index 000000000..cf224da81 --- /dev/null +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/JdbcLocation.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.lineage; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.PublicEvolving; + +import org.apache.commons.lang3.StringUtils; + +import java.util.List; +import java.util.Locale; +import java.util.Optional; + +/** JDBC connection URL location. */ +@Internal +public class JdbcLocation { + private final String scheme; + private final Optional authority; + private final Optional instance; + private final Optional database; + + private JdbcLocation( + String scheme, + Optional authority, + Optional instance, + Optional database) { + this.scheme = scheme; + this.authority = authority; + this.instance = instance; + this.database = database; + } + + public String toNamespace() { + String result = scheme.toLowerCase(Locale.ROOT) + ":"; + if (authority.isPresent()) { + result = String.format("%s//%s", result, authority.get().toLowerCase(Locale.ROOT)); + } + if (instance.isPresent()) { + result = String.format("%s/%s", result, StringUtils.stripStart(instance.get(), "/")); + } + return result; + } + + public String toName(List parts) { + if (database.isPresent()) { + parts.add(0, database.get()); + } + return String.join(".", parts); + } + + public String getScheme() { + return this.scheme; + } + + public Optional getAuthority() { + return this.authority; + } + + public Optional getInstance() { + return this.instance; + } + + public Optional getDatabase() { + return this.database; + } + + public static JdbcLocation.Builder builder() { + return new JdbcLocation.Builder(); + } + + /** Builder for {@link JdbcLocation}. */ + @PublicEvolving + public static final class Builder { + private String scheme = ""; + private Optional authority = Optional.empty(); + private Optional instance = Optional.empty(); + private Optional database = Optional.empty(); + + public Builder withScheme(String scheme) { + this.scheme = scheme; + return this; + } + + public Builder withAuthority(Optional authority) { + this.authority = authority; + return this; + } + + public Builder withInstance(Optional instance) { + this.instance = instance; + return this; + } + + public Builder withDatabase(Optional database) { + this.database = database; + return this; + } + + public JdbcLocation build() { + return new JdbcLocation(scheme, authority, instance, database); + } + } +} diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/JdbcLocationExtractorFactory.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/JdbcLocationExtractorFactory.java new file mode 100644 index 000000000..ec5d9e8ef --- /dev/null +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/JdbcLocationExtractorFactory.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.lineage; + +import org.apache.flink.annotation.PublicEvolving; + +import io.openlineage.client.utils.jdbc.JdbcExtractor; + +/** + * A factory to create a specific {@link JdbcExtractor}. This factory is used with Java's Service + * Provider Interfaces (SPI) for discovering. + * + *

Classes that implement this interface can be added to the + * "META_INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory" file of + * a JAR file in the current classpath to be found. + * + * @see JdbcExtractor + */ +@PublicEvolving +public interface JdbcLocationExtractorFactory { + + JdbcExtractor createExtractor(); +} diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/JdbcUtils.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/JdbcUtils.java new file mode 100644 index 000000000..71775d8ea --- /dev/null +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/JdbcUtils.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.lineage; + +import io.openlineage.client.utils.jdbc.JdbcExtractor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.ServiceLoader; + +/** Utils for JDBC url preprocess and namespace extraction. */ +public class JdbcUtils { + private static final Logger LOG = LoggerFactory.getLogger(JdbcUtils.class); + private static final String SLASH_DELIMITER_USER_PASSWORD_REGEX = + "[A-Za-z0-9_%]+//?[A-Za-z0-9_%]*@"; + private static final String COLON_DELIMITER_USER_PASSWORD_REGEX = + "([/|,])[A-Za-z0-9_%]+:?[A-Za-z0-9_%]*@"; + private static final String PARAMS_USER_PASSWORD_REGEX = + "(?i)[,;&:]?(?:user|username|password)=[^,;&:()]+[,;&:]?"; + private static final String DUPLICATED_DELIMITERS = "(\\(\\)){2,}|[,;&:]{2,}"; + private static final String QUERY_PARAMS_REGEX = "\\?.*$"; + + private static final List extractors = new ArrayList<>(); + + static { + for (JdbcLocationExtractorFactory factory : + ServiceLoader.load(JdbcLocationExtractorFactory.class)) { + extractors.add(factory.createExtractor()); + } + } + + /** + * Get JDBC namespace from JdbcUrl. + * + * @param jdbcUrl JDBC URL + * @param properties connection properties + * @return namespace String + */ + public static String getJdbcNamespace(String jdbcUrl, Properties properties) { + String uri = jdbcUrl.replaceAll("^(?i)jdbc:", ""); + try { + JdbcExtractor extractor = getExtractor(uri); + return extractor.extract(uri, properties).toNamespace(); + } catch (URISyntaxException e) { + LOG.debug("Failed to parse jdbc url", e); + return dropSensitiveData(uri); + } + } + + /** + * Get the corresponding JdbcExtractor of the jdbc Url. + * + * @param jdbcUrl JDBC URL + * @return JdbcExtractor + * @throws URISyntaxException + */ + public static JdbcExtractor getExtractor(String jdbcUrl) throws URISyntaxException { + for (JdbcExtractor extractor : extractors) { + if (extractor.isDefinedAt(jdbcUrl)) { + return extractor; + } + } + + throw new URISyntaxException(jdbcUrl, "Unsupported JDBC URL"); + } + + /** + * JdbcUrl can contain username and password this method clean-up credentials from jdbcUrl. Also + * drop query params as they include a lot of useless options, like timeout + * + * @param jdbcUrl url to database + * @return String + */ + private static String dropSensitiveData(String jdbcUrl) { + return jdbcUrl.replaceAll(SLASH_DELIMITER_USER_PASSWORD_REGEX, "@") + .replaceAll(COLON_DELIMITER_USER_PASSWORD_REGEX, "$1") + .replaceAll(PARAMS_USER_PASSWORD_REGEX, "") + .replaceAll(DUPLICATED_DELIMITERS, "") + .replaceAll(QUERY_PARAMS_REGEX, ""); + } +} diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/LineageUtils.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/LineageUtils.java new file mode 100644 index 000000000..0a09671c4 --- /dev/null +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/LineageUtils.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.lineage; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider; +import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider; +import org.apache.flink.connector.jdbc.datasource.statements.JdbcQueryStatement; +import org.apache.flink.connector.jdbc.datasource.statements.SimpleJdbcQueryStatement; +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.SourceLineageVertex; + +import io.openlineage.sql.DbTableMeta; +import io.openlineage.sql.OpenLineageSql; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +/** Utils for Lineage metadata extraction. */ +@PublicEvolving +public class LineageUtils { + + public static Optional nameOf( + JdbcQueryStatement jdbcQueryStatement, boolean isSource) { + if (!(jdbcQueryStatement instanceof SimpleJdbcQueryStatement)) { + return Optional.empty(); + } + + SimpleJdbcQueryStatement simpleJdbcQueryStatement = + (SimpleJdbcQueryStatement) jdbcQueryStatement; + return tableNameOf(simpleJdbcQueryStatement.query(), isSource); + } + + public static Optional tableNameOf(String query, boolean isSource) { + return OpenLineageSql.parse(Arrays.asList(query)) + .map( + sqlMeta -> + isSource + ? getFirstQualifiedName(sqlMeta.inTables()) + : getFirstQualifiedName(sqlMeta.outTables())); + } + + public static String namespaceOf(JdbcConnectionProvider jdbcConnectionProvider) { + if (!(jdbcConnectionProvider instanceof SimpleJdbcConnectionProvider)) { + return ""; + } + + SimpleJdbcConnectionProvider simpleJdbcConnectionProvider = + (SimpleJdbcConnectionProvider) jdbcConnectionProvider; + + return JdbcUtils.getJdbcNamespace( + simpleJdbcConnectionProvider.getDbURL(), + simpleJdbcConnectionProvider.getProperties()); + } + + public static LineageDataset datasetOf( + String name, String namespace, List facets) { + + return new LineageDataset() { + @Override + public String name() { + return name; + } + + @Override + public String namespace() { + return namespace; + } + + @Override + public Map facets() { + Map facetMap = new HashMap<>(); + facetMap.putAll( + facets.stream() + .collect( + Collectors.toMap(LineageDatasetFacet::name, item -> item))); + return facetMap; + } + }; + } + + public static LineageVertex lineageVertexOf(Collection datasets) { + return new LineageVertex() { + @Override + public List datasets() { + return new ArrayList<>(datasets); + } + }; + } + + public static SourceLineageVertex sourceLineageVertexOf( + Boundedness boundedness, Collection datasets) { + return new SourceLineageVertex() { + @Override + public Boundedness boundedness() { + return boundedness; + } + + @Override + public List datasets() { + return new ArrayList<>(datasets); + } + }; + } + + private static String getFirstQualifiedName(List tableMetas) { + return tableMetas.isEmpty() ? "" : tableMetas.get(0).qualifiedName(); + } +} diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/TypeDatasetFacet.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/TypeDatasetFacet.java new file mode 100644 index 000000000..4f343f394 --- /dev/null +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/lineage/TypeDatasetFacet.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.lineage; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; + +/** Facet definition to contain type information of source and sink. */ +@PublicEvolving +public interface TypeDatasetFacet extends LineageDatasetFacet { + + TypeInformation getTypeInformation(); +} diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatement.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatement.java index 85814ece9..d82297839 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatement.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatement.java @@ -83,6 +83,13 @@ static FieldNamedPreparedStatement prepareStatement( connection, sql, fieldNames, additionalPredicates, numberOfDynamicParams); } + /** + * Returns the final prepared query. + * + * @return prepared query + */ + String getQuery(); + /** * Clears the current parameter values immediately. * diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImpl.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImpl.java index fc05b90bf..ea2bac1ef 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImpl.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImpl.java @@ -39,10 +39,18 @@ public class FieldNamedPreparedStatementImpl implements FieldNamedPreparedStatem private final PreparedStatement statement; private final int[][] indexMapping; + private final String query; - private FieldNamedPreparedStatementImpl(PreparedStatement statement, int[][] indexMapping) { + private FieldNamedPreparedStatementImpl( + PreparedStatement statement, int[][] indexMapping, String query) { this.statement = statement; this.indexMapping = indexMapping; + this.query = query; + } + + @Override + public String getQuery() { + return query; } @Override @@ -221,7 +229,7 @@ public static FieldNamedPreparedStatement prepareStatement( } return new FieldNamedPreparedStatementImpl( - connection.prepareStatement(parsedSQL), indexMapping); + connection.prepareStatement(parsedSQL), indexMapping, parsedSQL); } /** diff --git a/flink-connector-jdbc-core/src/main/resources/META-INF/NOTICE b/flink-connector-jdbc-core/src/main/resources/META-INF/NOTICE new file mode 100644 index 000000000..0be718a06 --- /dev/null +++ b/flink-connector-jdbc-core/src/main/resources/META-INF/NOTICE @@ -0,0 +1,11 @@ +flink-connector-jdbc-core + +Copyright 2014-2024 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt) +- io.openlineage:openlineage-sql-java:1.32.0 +- io.openlineage:openlineage-java:1.32.0 + diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcInputFormatTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcInputFormatTest.java index f72434db7..61b897e5e 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcInputFormatTest.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcInputFormatTest.java @@ -18,10 +18,14 @@ package org.apache.flink.connector.jdbc; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.connector.jdbc.lineage.DefaultTypeDatasetFacet; import org.apache.flink.connector.jdbc.split.JdbcGenericParameterValuesProvider; import org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider; import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider; import org.apache.flink.core.io.InputSplit; +import org.apache.flink.streaming.api.lineage.LineageDataset; +import org.apache.flink.streaming.api.lineage.SourceLineageVertex; import org.apache.flink.types.Row; import org.junit.jupiter.api.AfterEach; @@ -380,6 +384,34 @@ void testJdbcInputFormatWithParallelismAndGenericSplitting() throws IOException jdbcInputFormat.closeInputFormat(); } + @Test + void testGetLineageVertex() { + jdbcInputFormat = + JdbcInputFormat.buildJdbcInputFormat() + .setDrivername(getMetadata().getDriverClass()) + .setDBUrl(getMetadata().getJdbcUrl()) + .setQuery(SELECT_ALL_BOOKS) + .setRowTypeInfo(ROW_TYPE_INFO) + .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE) + .finish(); + + SourceLineageVertex lineageVertex = + (SourceLineageVertex) jdbcInputFormat.getLineageVertex(); + + assertThat(lineageVertex.datasets().size()).isEqualTo(1); + assertThat(lineageVertex.boundedness()).isEqualTo(Boundedness.BOUNDED); + LineageDataset lineageDataset = lineageVertex.datasets().get(0); + assertThat(lineageDataset.name()).isEqualTo("books"); + assertThat(lineageDataset.namespace()).isEqualTo("derby:memory:test"); + assertThat(lineageDataset.facets().size()).isEqualTo(1); + assertThat(lineageDataset.facets().size()).isEqualTo(1); + assertThat( + ((DefaultTypeDatasetFacet) lineageDataset.facets().values().toArray()[0]) + .getTypeInformation() + .getArity()) + .isEqualTo(5); + } + private void verifySplit(InputSplit split, int expectedIDSum) throws IOException { int sum = 0; diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcRowOutputFormatTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcRowOutputFormatTest.java index 4d8a20b6c..305837e7b 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcRowOutputFormatTest.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcRowOutputFormatTest.java @@ -20,6 +20,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.connector.jdbc.internal.JdbcOutputSerializer; +import org.apache.flink.streaming.api.lineage.LineageVertex; import org.apache.flink.types.Row; import org.junit.jupiter.api.AfterEach; @@ -282,6 +283,24 @@ void testJdbcOutputFormat() throws IOException, SQLException { } } + @Test + void testGetLineageVertex() throws Exception { + jdbcOutputFormat = + JdbcRowOutputFormat.buildJdbcOutputFormat() + .setDrivername(getMetadata().getDriverClass()) + .setDBUrl(getMetadata().getJdbcUrl()) + .setQuery(String.format(INSERT_TEMPLATE, OUTPUT_TABLE)) + .finish(); + JdbcOutputSerializer serializer = + JdbcOutputSerializer.of(getSerializer(TypeInformation.of(Row.class), true)); + jdbcOutputFormat.open(serializer); + + LineageVertex lineageVertex = jdbcOutputFormat.getLineageVertex(); + assertThat(lineageVertex.datasets().size()).isEqualTo(1); + assertThat(lineageVertex.datasets().get(0).name()).isEqualTo("newbooks"); + assertThat(lineageVertex.datasets().get(0).namespace()).isEqualTo("derby:memory:test"); + } + @Test void testFlush() throws SQLException, IOException { jdbcOutputFormat = diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/BaseJdbcSinkTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/BaseJdbcSinkTest.java index 48fe94f16..d8ab0d717 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/BaseJdbcSinkTest.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/BaseJdbcSinkTest.java @@ -21,6 +21,7 @@ import org.apache.flink.connector.jdbc.testutils.TableManaged; import org.apache.flink.connector.jdbc.testutils.tables.templates.BooksTable; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.lineage.LineageVertex; import org.apache.flink.streaming.util.RestartStrategyUtils; import org.junit.jupiter.api.Test; @@ -108,6 +109,19 @@ public void testInsertWithObjectReuse() throws Exception { assertResult(BOOKS); } + @Test + public void testGetLineageVertex() { + JdbcSink sink = + finishSink( + new JdbcSinkBuilder() + .withQueryStatement( + TEST_TABLE.getInsertIntoQuery(), + TEST_TABLE.getStatementBuilder())); + + LineageVertex lineageVertex = sink.getLineageVertex(); + assertThat(lineageVertex.datasets().size()).isEqualTo(1); + } + private void assertResult(List expected) throws SQLException { assertThat(TEST_TABLE.selectAllTable(getMetadata().getConnection())).isEqualTo(expected); } diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceITCase.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceITCase.java index a22e6859c..0f20a9c82 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceITCase.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceITCase.java @@ -20,11 +20,13 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.connector.jdbc.JdbcDataTestBase; import org.apache.flink.connector.jdbc.split.JdbcGenericParameterValuesProvider; import org.apache.flink.connector.jdbc.testutils.JdbcITCaseBase; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction; +import org.apache.flink.streaming.api.lineage.SourceLineageVertex; import org.apache.flink.streaming.util.RestartStrategyUtils; import org.junit.jupiter.api.BeforeEach; @@ -135,6 +137,27 @@ void testReadWithParallelismWithParamsProvider() throws Exception { assertThat(collectedRecords).containsExactlyInAnyOrder(TEST_DATA); } + @Test + void testGetLineageVertex() { + JdbcSource jdbcSource = + JdbcSource.builder() + .setTypeInformation(TypeInformation.of(TestEntry.class)) + .setSql(sql + " where id >= ? and id <= ?") + .setJdbcParameterValuesProvider( + new JdbcGenericParameterValuesProvider( + new Serializable[][] {{1001, 1005}, {1006, 1010}})) + .setDBUrl(getMetadata().getJdbcUrl()) + .setDriverName(getMetadata().getDriverClass()) + .setResultExtractor(extractor) + .build(); + + SourceLineageVertex lineageVertex = (SourceLineageVertex) jdbcSource.getLineageVertex(); + assertThat(lineageVertex.boundedness()).isEqualTo(Boundedness.BOUNDED); + assertThat(lineageVertex.datasets().size()).isEqualTo(1); + assertThat(lineageVertex.datasets().get(0).name()).isEqualTo("books"); + assertThat(lineageVertex.datasets().get(0).namespace()).isEqualTo("derby:memory:test"); + } + /** A sink function to collect the records. */ static class TestingSinkFunction implements SinkFunction { diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcOutputFormatTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcOutputFormatTest.java index 32618d920..af549ec6d 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcOutputFormatTest.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcOutputFormatTest.java @@ -25,6 +25,7 @@ import org.apache.flink.connector.jdbc.internal.JdbcOutputSerializer; import org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions; import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions; +import org.apache.flink.streaming.api.lineage.LineageVertex; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; @@ -371,6 +372,39 @@ void testJdbcOutputFormat() throws IOException, SQLException { } } + @Test + void testGetLineageVertex() throws Exception { + InternalJdbcConnectionOptions jdbcOptions = + InternalJdbcConnectionOptions.builder() + .setDriverName(getMetadata().getDriverClass()) + .setDBUrl(getMetadata().getJdbcUrl()) + .setTableName(OUTPUT_TABLE) + .build(); + JdbcDmlOptions dmlOptions = + JdbcDmlOptions.builder() + .withTableName(jdbcOptions.getTableName()) + .withDialect(jdbcOptions.getDialect()) + .withFieldNames(fieldNames) + .build(); + + outputFormat = + new JdbcOutputFormatBuilder() + .setJdbcOptions(jdbcOptions) + .setFieldDataTypes(fieldDataTypes) + .setJdbcDmlOptions(dmlOptions) + .setJdbcExecutionOptions(JdbcExecutionOptions.builder().build()) + .build(); + + JdbcOutputSerializer serializer = + JdbcOutputSerializer.of(getSerializer(TypeInformation.of(RowData.class), true)); + outputFormat.open(serializer); + + LineageVertex lineageVertex = outputFormat.getLineageVertex(); + assertThat(lineageVertex.datasets().size()).isEqualTo(1); + assertThat(lineageVertex.datasets().get(0).name()).isEqualTo("newbooks"); + assertThat(lineageVertex.datasets().get(0).namespace()).isEqualTo("derby:memory:test"); + } + @Test void testFlush() throws SQLException, IOException { InternalJdbcConnectionOptions jdbcOptions = diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java index ba986cf52..171536d02 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java @@ -25,6 +25,7 @@ import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor; import org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions; import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions; +import org.apache.flink.streaming.api.lineage.LineageVertex; import org.apache.flink.types.Row; import org.junit.jupiter.api.AfterEach; @@ -121,6 +122,11 @@ public void executeBatch() throws SQLException { } } + @Override + public String insertSql() { + return ""; + } + @Override public void prepareStatements(Connection connection) {} @@ -139,6 +145,11 @@ public void prepareStatements(Connection connection) { } } + @Override + public String insertSql() { + return ""; + } + @Override public void addToBatch(Row record) {} @@ -213,6 +224,36 @@ void testJdbcOutputFormat() throws Exception { check(expected); } + @Test + void testGetLineageVertex() throws Exception { + InternalJdbcConnectionOptions options = + InternalJdbcConnectionOptions.builder() + .setDBUrl(getMetadata().getJdbcUrl()) + .setTableName(OUTPUT_TABLE) + .build(); + JdbcDmlOptions dmlOptions = + JdbcDmlOptions.builder() + .withTableName(options.getTableName()) + .withDialect(options.getDialect()) + .withFieldNames(fieldNames) + .withKeyFields(keyFields) + .build(); + format = + new TableJdbcUpsertOutputFormat( + new SimpleJdbcConnectionProvider(options), + dmlOptions, + JdbcExecutionOptions.defaults()); + + JdbcOutputSerializer serializer = + JdbcOutputSerializer.of(getSerializer(TypeInformation.of(Row.class), true)); + format.open(serializer); + + LineageVertex lineageVertex = format.getLineageVertex(); + assertThat(lineageVertex.datasets().size()).isEqualTo(1); + assertThat(lineageVertex.datasets().get(0).name()).isEqualTo(OUTPUT_TABLE); + assertThat(lineageVertex.datasets().get(0).namespace()).isEqualTo("derby:memory:test"); + } + private void check(Row[] rows) throws SQLException { check(rows, getMetadata().getJdbcUrl(), OUTPUT_TABLE, fieldNames); } diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/lineage/JdbcUtilsTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/lineage/JdbcUtilsTest.java new file mode 100644 index 000000000..ad90e123a --- /dev/null +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/lineage/JdbcUtilsTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.lineage; + +import io.openlineage.client.utils.jdbc.JdbcExtractor; +import org.junit.jupiter.api.Test; + +import java.net.URISyntaxException; +import java.util.Properties; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link JdbcUtils}. */ +class JdbcUtilsTest { + private static final String TEST_JDBC_URL = "jdbc:test://localhost/testdb"; + private static final String UNSUPPORTED_JDBC_URL = "jdbc:unsupported://localhost:8990/testdb"; + + @Test + void testGetTestJdbcExtractor() throws Exception { + JdbcExtractor jdbcExtractor = JdbcUtils.getExtractor("test://localhost/testdb"); + assertThat(jdbcExtractor).isInstanceOf(TestJdbcExtractor.class); + } + + @Test + void testGetUnsupportedJdbcExtractor() { + assertThatThrownBy( + () -> { + JdbcUtils.getExtractor("unsupported://localhost:8990/testdb"); + }) + .isInstanceOf(URISyntaxException.class) + .hasMessage("Unsupported JDBC URL: unsupported://localhost:8990/testdb"); + } + + @Test + void testGetValidJdbcNamespace() { + String test = JdbcUtils.getJdbcNamespace(TEST_JDBC_URL, new Properties()); + assertThat(test).isEqualTo("test://localhost:10051"); + } + + @Test + void testUnsupportedJdbcNamespace() { + String test = JdbcUtils.getJdbcNamespace(UNSUPPORTED_JDBC_URL, new Properties()); + assertThat(test).isEqualTo("unsupported://localhost:8990/testdb"); + } +} diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/lineage/LineageUtilsTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/lineage/LineageUtilsTest.java new file mode 100644 index 000000000..4597a7e4e --- /dev/null +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/lineage/LineageUtilsTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.lineage; + +import org.apache.flink.connector.jdbc.JdbcConnectionOptions; +import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider; +import org.apache.flink.connector.jdbc.datasource.statements.SimpleJdbcQueryStatement; + +import org.junit.jupiter.api.Test; + +import java.util.Optional; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link LineageUtils}. */ +class LineageUtilsTest { + + @Test + void testGetNameFromJdbcQueryStatement() { + SimpleJdbcQueryStatement simpleStatement = + new SimpleJdbcQueryStatement("select * from test_table", null); + + Optional tableNameOpt = LineageUtils.nameOf(simpleStatement, true); + assertThat(tableNameOpt).isNotEmpty(); + assertThat(tableNameOpt.get()).isEqualTo("test_table"); + } + + @Test + void testGetNamespaceFromJdbcConnectionProvider() { + JdbcConnectionOptions options = + new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() + .withUrl("jdbc:test://localhost/test_db") + .build(); + SimpleJdbcConnectionProvider provider = new SimpleJdbcConnectionProvider(options); + String namespace = LineageUtils.namespaceOf(provider); + assertThat(namespace).isEqualTo("test://localhost:10051"); + } +} diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/lineage/TestJdbcExtractor.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/lineage/TestJdbcExtractor.java new file mode 100644 index 000000000..84021db66 --- /dev/null +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/lineage/TestJdbcExtractor.java @@ -0,0 +1,25 @@ +package org.apache.flink.connector.jdbc.lineage; + +import io.openlineage.client.utils.jdbc.GenericJdbcExtractor; +import io.openlineage.client.utils.jdbc.JdbcLocation; +import io.openlineage.client.utils.jdbc.OverridingJdbcExtractor; + +import java.net.URISyntaxException; +import java.util.Properties; + +/** Implementation of {@link io.openlineage.client.utils.jdbc.JdbcExtractor} for test purpose. */ +public class TestJdbcExtractor extends GenericJdbcExtractor { + private OverridingJdbcExtractor delegate; + + public TestJdbcExtractor() { + this.delegate = new OverridingJdbcExtractor("test", "10051"); + } + + public boolean isDefinedAt(String jdbcUri) { + return this.delegate.isDefinedAt(jdbcUri); + } + + public JdbcLocation extract(String rawUri, Properties properties) throws URISyntaxException { + return this.delegate.extract(rawUri, properties); + } +} diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/lineage/TestJdbcLocationExtractorFactory.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/lineage/TestJdbcLocationExtractorFactory.java new file mode 100644 index 000000000..70c2a6fed --- /dev/null +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/lineage/TestJdbcLocationExtractorFactory.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.lineage; + +import io.openlineage.client.utils.jdbc.JdbcExtractor; + +/** JdbcLocationExtractorFactory for {@link TestJdbcExtractor}. */ +public class TestJdbcLocationExtractorFactory implements JdbcLocationExtractorFactory { + + @Override + public JdbcExtractor createExtractor() { + return new TestJdbcExtractor(); + } +} diff --git a/flink-connector-jdbc-core/src/test/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory b/flink-connector-jdbc-core/src/test/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory new file mode 100644 index 000000000..00d341737 --- /dev/null +++ b/flink-connector-jdbc-core/src/test/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.connector.jdbc.lineage.TestJdbcLocationExtractorFactory \ No newline at end of file diff --git a/flink-connector-jdbc-cratedb/src/main/java/org/apache/flink/connector/jdbc/cratedb/database/lineage/CrateLocationExtractorFactory.java b/flink-connector-jdbc-cratedb/src/main/java/org/apache/flink/connector/jdbc/cratedb/database/lineage/CrateLocationExtractorFactory.java new file mode 100644 index 000000000..ad17dfd3e --- /dev/null +++ b/flink-connector-jdbc-cratedb/src/main/java/org/apache/flink/connector/jdbc/cratedb/database/lineage/CrateLocationExtractorFactory.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.cratedb.database.lineage; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory; + +import io.openlineage.client.utils.jdbc.CrateJdbcExtractor; +import io.openlineage.client.utils.jdbc.JdbcExtractor; + +/** Implementation of {@link JdbcLocationExtractorFactory} for CrateDB. */ +@Internal +public class CrateLocationExtractorFactory implements JdbcLocationExtractorFactory { + + @Override + public JdbcExtractor createExtractor() { + return new CrateJdbcExtractor(); + } +} diff --git a/flink-connector-jdbc-cratedb/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory b/flink-connector-jdbc-cratedb/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory new file mode 100644 index 000000000..bcd4dfebd --- /dev/null +++ b/flink-connector-jdbc-cratedb/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.connector.jdbc.cratedb.database.lineage.CrateLocationExtractorFactory \ No newline at end of file diff --git a/flink-connector-jdbc-db2/src/main/java/org/apache/flink/connector/jdbc/db2/database/lineage/Db2LocationExtractorFactory.java b/flink-connector-jdbc-db2/src/main/java/org/apache/flink/connector/jdbc/db2/database/lineage/Db2LocationExtractorFactory.java new file mode 100644 index 000000000..d4a0de2da --- /dev/null +++ b/flink-connector-jdbc-db2/src/main/java/org/apache/flink/connector/jdbc/db2/database/lineage/Db2LocationExtractorFactory.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.db2.database.lineage; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory; + +import io.openlineage.client.utils.jdbc.Db2JdbcExtractor; +import io.openlineage.client.utils.jdbc.JdbcExtractor; + +/** Implementation of {@link JdbcLocationExtractorFactory} for DB2. */ +@Internal +public class Db2LocationExtractorFactory implements JdbcLocationExtractorFactory { + + @Override + public JdbcExtractor createExtractor() { + return new Db2JdbcExtractor(); + } +} diff --git a/flink-connector-jdbc-db2/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory b/flink-connector-jdbc-db2/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory new file mode 100644 index 000000000..9f066f72e --- /dev/null +++ b/flink-connector-jdbc-db2/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.connector.jdbc.db2.database.lineage.Db2LocationExtractorFactory \ No newline at end of file diff --git a/flink-connector-jdbc-mysql/src/main/java/org/apache/flink/connector/jdbc/mysql/database/lineage/MySqlLocationExtractorFactory.java b/flink-connector-jdbc-mysql/src/main/java/org/apache/flink/connector/jdbc/mysql/database/lineage/MySqlLocationExtractorFactory.java new file mode 100644 index 000000000..d1c5e00e7 --- /dev/null +++ b/flink-connector-jdbc-mysql/src/main/java/org/apache/flink/connector/jdbc/mysql/database/lineage/MySqlLocationExtractorFactory.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.mysql.database.lineage; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory; + +import io.openlineage.client.utils.jdbc.JdbcExtractor; +import io.openlineage.client.utils.jdbc.MySqlJdbcExtractor; + +/** Implementation of {@link JdbcLocationExtractorFactory} for Mysql. */ +@Internal +public class MySqlLocationExtractorFactory implements JdbcLocationExtractorFactory { + + @Override + public JdbcExtractor createExtractor() { + return new MySqlJdbcExtractor(); + } +} diff --git a/flink-connector-jdbc-mysql/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory b/flink-connector-jdbc-mysql/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory new file mode 100644 index 000000000..b681048a5 --- /dev/null +++ b/flink-connector-jdbc-mysql/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.connector.jdbc.mysql.database.lineage.MySqlLocationExtractorFactory \ No newline at end of file diff --git a/flink-connector-jdbc-oceanbase/src/main/java/org/apache/flink/connector/jdbc/oceanbase/database/lineage/OceanBaseLocationExtractorFactory.java b/flink-connector-jdbc-oceanbase/src/main/java/org/apache/flink/connector/jdbc/oceanbase/database/lineage/OceanBaseLocationExtractorFactory.java new file mode 100644 index 000000000..f60e504a8 --- /dev/null +++ b/flink-connector-jdbc-oceanbase/src/main/java/org/apache/flink/connector/jdbc/oceanbase/database/lineage/OceanBaseLocationExtractorFactory.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.oceanbase.database.lineage; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory; + +import io.openlineage.client.utils.jdbc.JdbcExtractor; +import io.openlineage.client.utils.jdbc.OceanBaseJdbcExtractor; + +/** Implementation of {@link JdbcLocationExtractorFactory} for OceanBase. */ +@Internal +public class OceanBaseLocationExtractorFactory implements JdbcLocationExtractorFactory { + + @Override + public JdbcExtractor createExtractor() { + return new OceanBaseJdbcExtractor(); + } +} diff --git a/flink-connector-jdbc-oceanbase/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory b/flink-connector-jdbc-oceanbase/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory new file mode 100644 index 000000000..678ddb655 --- /dev/null +++ b/flink-connector-jdbc-oceanbase/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.connector.jdbc.oceanbase.database.lineage.OceanBaseLocationExtractorFactory \ No newline at end of file diff --git a/flink-connector-jdbc-oracle/src/main/java/org/apache/flink/connector/jdbc/oracle/database/lineage/OracleLocationExtractorFactory.java b/flink-connector-jdbc-oracle/src/main/java/org/apache/flink/connector/jdbc/oracle/database/lineage/OracleLocationExtractorFactory.java new file mode 100644 index 000000000..7ace87a4c --- /dev/null +++ b/flink-connector-jdbc-oracle/src/main/java/org/apache/flink/connector/jdbc/oracle/database/lineage/OracleLocationExtractorFactory.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.oracle.database.lineage; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory; + +import io.openlineage.client.utils.jdbc.JdbcExtractor; +import io.openlineage.client.utils.jdbc.OracleJdbcExtractor; + +/** Implementation of {@link JdbcLocationExtractorFactory} for Oracle. */ +@Internal +public class OracleLocationExtractorFactory implements JdbcLocationExtractorFactory { + @Override + public JdbcExtractor createExtractor() { + return new OracleJdbcExtractor(); + } +} diff --git a/flink-connector-jdbc-oracle/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory b/flink-connector-jdbc-oracle/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory new file mode 100644 index 000000000..7428410d8 --- /dev/null +++ b/flink-connector-jdbc-oracle/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.connector.jdbc.oracle.database.lineage.OracleLocationExtractorFactory \ No newline at end of file diff --git a/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/lineage/PostgresLocationExtractorFactory.java b/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/lineage/PostgresLocationExtractorFactory.java new file mode 100644 index 000000000..4ecb74be8 --- /dev/null +++ b/flink-connector-jdbc-postgres/src/main/java/org/apache/flink/connector/jdbc/postgres/database/lineage/PostgresLocationExtractorFactory.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.postgres.database.lineage; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory; + +import io.openlineage.client.utils.jdbc.JdbcExtractor; +import io.openlineage.client.utils.jdbc.PostgresJdbcExtractor; + +/** Implementation of {@link JdbcLocationExtractorFactory} for Postgres. */ +@Internal +public class PostgresLocationExtractorFactory implements JdbcLocationExtractorFactory { + + @Override + public JdbcExtractor createExtractor() { + return new PostgresJdbcExtractor(); + } +} diff --git a/flink-connector-jdbc-postgres/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory b/flink-connector-jdbc-postgres/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory new file mode 100644 index 000000000..0ea5acd7e --- /dev/null +++ b/flink-connector-jdbc-postgres/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.connector.jdbc.postgres.database.lineage.PostgresLocationExtractorFactory \ No newline at end of file diff --git a/flink-connector-jdbc-sqlserver/src/main/java/org/apache/flink/connector/jdbc/sqlserver/database/lineage/SqlServerLocationExtractorFactory.java b/flink-connector-jdbc-sqlserver/src/main/java/org/apache/flink/connector/jdbc/sqlserver/database/lineage/SqlServerLocationExtractorFactory.java new file mode 100644 index 000000000..94fff01a8 --- /dev/null +++ b/flink-connector-jdbc-sqlserver/src/main/java/org/apache/flink/connector/jdbc/sqlserver/database/lineage/SqlServerLocationExtractorFactory.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.sqlserver.database.lineage; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory; + +import io.openlineage.client.utils.jdbc.JdbcExtractor; +import io.openlineage.client.utils.jdbc.SqlServerJdbcExtractor; + +/** Implementation of {@link JdbcLocationExtractorFactory} for SqlServer. */ +@Internal +public class SqlServerLocationExtractorFactory implements JdbcLocationExtractorFactory { + + @Override + public JdbcExtractor createExtractor() { + return new SqlServerJdbcExtractor(); + } +} diff --git a/flink-connector-jdbc-sqlserver/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory b/flink-connector-jdbc-sqlserver/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory new file mode 100644 index 000000000..2598cebb7 --- /dev/null +++ b/flink-connector-jdbc-sqlserver/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.connector.jdbc.sqlserver.database.lineage.SqlServerLocationExtractorFactory \ No newline at end of file diff --git a/flink-connector-jdbc-trino/src/main/java/org/apache/flink/connector/jdbc/trino/database/lineage/TrinoLocationExtractorFactory.java b/flink-connector-jdbc-trino/src/main/java/org/apache/flink/connector/jdbc/trino/database/lineage/TrinoLocationExtractorFactory.java new file mode 100644 index 000000000..f239f9788 --- /dev/null +++ b/flink-connector-jdbc-trino/src/main/java/org/apache/flink/connector/jdbc/trino/database/lineage/TrinoLocationExtractorFactory.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.trino.database.lineage; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory; + +import io.openlineage.client.utils.jdbc.JdbcExtractor; +import io.openlineage.client.utils.jdbc.TrinoJdbcExtractor; + +/** Implementation of {@link JdbcLocationExtractorFactory} for Trino. */ +@Internal +public class TrinoLocationExtractorFactory implements JdbcLocationExtractorFactory { + + @Override + public JdbcExtractor createExtractor() { + return new TrinoJdbcExtractor(); + } +} diff --git a/flink-connector-jdbc-trino/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory b/flink-connector-jdbc-trino/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory new file mode 100644 index 000000000..8be9bbb75 --- /dev/null +++ b/flink-connector-jdbc-trino/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.lineage.JdbcLocationExtractorFactory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.connector.jdbc.trino.database.lineage.TrinoLocationExtractorFactory \ No newline at end of file diff --git a/pom.xml b/pom.xml index fe2fc778b..a21db8ef3 100644 --- a/pom.xml +++ b/pom.xml @@ -65,6 +65,7 @@ under the License. 3.24.2 1.20.1 3.12.4 + 1.32.0 3.0.0-1.16 @@ -253,6 +254,18 @@ under the License. ${log4j.version} + + io.openlineage + openlineage-sql-java + ${openlineage.version} + + + + io.openlineage + openlineage-java + ${openlineage.version} + + com.fasterxml.jackson