diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/RowDataResultExtractor.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/RowDataResultExtractor.java new file mode 100644 index 000000000..acb18f633 --- /dev/null +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/RowDataResultExtractor.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.core.table; + +import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialectConverter; +import org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor; +import org.apache.flink.table.data.RowData; +import org.apache.flink.util.Preconditions; + +import java.sql.ResultSet; +import java.sql.SQLException; + +/** The result extractor for {@link RowData}. */ +public class RowDataResultExtractor implements ResultExtractor { + + private final JdbcDialectConverter jdbcDialectConverter; + + public RowDataResultExtractor(JdbcDialectConverter jdbcDialectConverter) { + this.jdbcDialectConverter = Preconditions.checkNotNull(jdbcDialectConverter); + } + + @Override + public RowData extract(ResultSet resultSet) throws SQLException { + return jdbcDialectConverter.toInternal(resultSet); + } +} diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSource.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSource.java index d26898a53..d732230a2 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSource.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSource.java @@ -19,18 +19,27 @@ package org.apache.flink.connector.jdbc.core.table.source; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialect; +import org.apache.flink.connector.jdbc.core.datastream.source.JdbcSource; +import org.apache.flink.connector.jdbc.core.datastream.source.JdbcSourceBuilder; import org.apache.flink.connector.jdbc.core.table.FilterHandlingPolicy; +import org.apache.flink.connector.jdbc.core.table.RowDataResultExtractor; import org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions; import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions; import org.apache.flink.connector.jdbc.split.CompositeJdbcParameterValuesProvider; import org.apache.flink.connector.jdbc.split.JdbcGenericParameterValuesProvider; import org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider; import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.Projection; +import org.apache.flink.table.connector.ProviderContext; +import org.apache.flink.table.connector.source.DataStreamScanProvider; import org.apache.flink.table.connector.source.DynamicTableSource; -import org.apache.flink.table.connector.source.InputFormatProvider; import org.apache.flink.table.connector.source.LookupTableSource; import org.apache.flink.table.connector.source.ScanTableSource; import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown; @@ -39,6 +48,7 @@ import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider; import org.apache.flink.table.connector.source.lookup.PartialCachingLookupProvider; import org.apache.flink.table.connector.source.lookup.cache.LookupCache; +import org.apache.flink.table.data.RowData; import org.apache.flink.table.expressions.CallExpression; import org.apache.flink.table.expressions.ResolvedExpression; import org.apache.flink.table.types.DataType; @@ -70,6 +80,8 @@ public class JdbcDynamicTableSource SupportsFilterPushDown { private static final Logger LOG = LoggerFactory.getLogger(JdbcDynamicTableSource.class); + private static final String JDBC_TRANSFORMATION = "jdbc"; + private final InternalJdbcConnectionOptions options; private final JdbcReadOptions readOptions; private final int lookupMaxRetryTimes; @@ -126,17 +138,17 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) { } @Override - public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) { - final JdbcRowDataInputFormat.Builder builder = - JdbcRowDataInputFormat.builder() - .setDrivername(options.getDriverName()) + public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { + final JdbcSourceBuilder builder = + JdbcSource.builder() + .setDriverName(options.getDriverName()) .setDBUrl(options.getDbURL()) .setUsername(options.getUsername().orElse(null)) .setPassword(options.getPassword().orElse(null)) .setAutoCommit(readOptions.getAutoCommit()); if (readOptions.getFetchSize() != 0) { - builder.setFetchSize(readOptions.getFetchSize()); + builder.setResultSetFetchSize(readOptions.getFetchSize()); } final JdbcDialect dialect = options.getDialect(); String query = @@ -158,19 +170,19 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon .ofBatchNum(numPartitions), new JdbcGenericParameterValuesProvider(allPushdownParams)); - builder.setParametersProvider(allParams); + builder.setJdbcParameterValuesProvider(allParams); predicates.add( dialect.quoteIdentifier(readOptions.getPartitionColumnName().get()) + " BETWEEN ? AND ?"); } else { - builder.setParametersProvider( + builder.setJdbcParameterValuesProvider( new JdbcGenericParameterValuesProvider(replicatePushdownParamsForN(1))); } predicates.addAll(this.resolvedPredicates); - if (predicates.size() > 0) { + if (!predicates.isEmpty()) { String joinedConditions = predicates.stream() .map(pred -> String.format("(%s)", pred)) @@ -184,13 +196,15 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon LOG.debug("Query generated for JDBC scan: " + query); - builder.setQuery(query); + builder.setSql(query); final RowType rowType = (RowType) physicalRowDataType.getLogicalType(); - builder.setRowConverter(dialect.getRowConverter(rowType)); - builder.setRowDataTypeInfo( - runtimeProviderContext.createTypeInformation(physicalRowDataType)); - - return InputFormatProvider.of(builder.build()); + builder.setResultExtractor(new RowDataResultExtractor(dialect.getRowConverter(rowType))); + builder.setTypeInformation(scanContext.createTypeInformation(physicalRowDataType)); + options.getProperties() + .forEach( + (key, value) -> + builder.setConnectionProperty(key.toString(), value.toString())); + return new JdbcDataStreamScanProvider(builder.build()); } @Override @@ -314,4 +328,30 @@ private Serializable[][] replicatePushdownParamsForN(int n) { } return allPushdownParams; } + + private static class JdbcDataStreamScanProvider implements DataStreamScanProvider { + + private final JdbcSource source; + + public JdbcDataStreamScanProvider(JdbcSource source) { + this.source = Preconditions.checkNotNull(source); + } + + @Override + public DataStream produceDataStream( + ProviderContext providerContext, StreamExecutionEnvironment execEnv) { + DataStreamSource sourceStream = + execEnv.fromSource( + source, + WatermarkStrategy.noWatermarks(), + JdbcDynamicTableSource.class.getSimpleName()); + providerContext.generateUid(JDBC_TRANSFORMATION).ifPresent(sourceStream::uid); + return sourceStream; + } + + @Override + public boolean isBounded() { + return source.getBoundedness() == Boundedness.BOUNDED; + } + } }