diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/pom.xml b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/pom.xml
index cd09cde5354..e5f330545c5 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/pom.xml
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/pom.xml
@@ -151,11 +151,30 @@ limitations under the License.
             <scope>test</scope>
         </dependency>
 
-        <dependency>
-            <groupId>org.testcontainers</groupId>
-            <artifactId>junit-jupiter</artifactId>
-            <version>${testcontainers.version}</version>
-            <scope>test</scope>
-        </dependency>
+        <dependency>
+            <groupId>com.esri.geometry</groupId>
+            <artifactId>esri-geometry-api</artifactId>
+            <version>${geometry.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>io.debezium</groupId>
+            <artifactId>debezium-connector-mysql</artifactId>
+            <version>1.9.8.Final</version>
+            <scope>compile</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-cdc-base</artifactId>
+            <version>${project.version}</version>
+            <scope>compile</scope>
+        </dependency>
 
         <dependency>
             <groupId>org.apache.flink</groupId>
@@ -163,6 +182,7 @@ limitations under the License.
             <version>${project.version}</version>
             <scope>test</scope>
+
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/io/debezium/connector/tidb/Listeners/TiDBAntlrDdlParserListener.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/io/debezium/connector/tidb/Listeners/TiDBAntlrDdlParserListener.java
new file mode 100644
index 00000000000..531370e6c42
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/io/debezium/connector/tidb/Listeners/TiDBAntlrDdlParserListener.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.debezium.connector.tidb.Listeners;
+
+import io.debezium.antlr.AntlrDdlParserListener;
+import io.debezium.antlr.ProxyParseTreeListenerUtil;
+import io.debezium.connector.mysql.antlr.listener.AlterTableParserListener;
+import io.debezium.connector.mysql.antlr.listener.AlterViewParserListener;
+import io.debezium.connector.mysql.antlr.listener.CreateAndAlterDatabaseParserListener;
+import io.debezium.connector.mysql.antlr.listener.CreateTableParserListener;
+import io.debezium.connector.mysql.antlr.listener.CreateUniqueIndexParserListener;
+import io.debezium.connector.mysql.antlr.listener.CreateViewParserListener;
+import io.debezium.connector.mysql.antlr.listener.DropDatabaseParserListener;
+import io.debezium.connector.mysql.antlr.listener.DropTableParserListener;
+import io.debezium.connector.mysql.antlr.listener.DropViewParserListener;
+import io.debezium.connector.mysql.antlr.listener.RenameTableParserListener;
+import io.debezium.connector.mysql.antlr.listener.SetStatementParserListener;
+import io.debezium.connector.mysql.antlr.listener.TruncateTableParserListener;
+import io.debezium.connector.mysql.antlr.listener.UseStatementParserListener;
+import io.debezium.connector.tidb.TiDBAntlrDdlParser;
+import io.debezium.ddl.parser.mysql.generated.MySqlParser;
+import io.debezium.ddl.parser.mysql.generated.MySqlParserBaseListener;
+import io.debezium.text.ParsingException;
+import org.antlr.v4.runtime.ParserRuleContext;
+import org.antlr.v4.runtime.tree.ErrorNode;
+import org.antlr.v4.runtime.tree.ParseTreeListener;
+import org.antlr.v4.runtime.tree.TerminalNode;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+public class TiDBAntlrDdlParserListener extends MySqlParserBaseListener
+ implements AntlrDdlParserListener {
+    private final List<ParseTreeListener> listeners = new CopyOnWriteArrayList<>();
+
+ /** Flag for skipping phase. */
+ private boolean skipNodes;
+
+ /**
+     * Count of skipped nodes. Each enter event during the skipping phase increases the counter
+     * and each exit event decreases it. When the counter reaches 0, the skipping phase ends.
+ */
+ private int skippedNodesCount = 0;
+
+    /** Collection of caught exceptions. */
+    private final Collection<ParsingException> errors = new ArrayList<>();
+
+ public TiDBAntlrDdlParserListener(TiDBAntlrDdlParser parser) {
+ // initialize listeners
+ listeners.add(new CreateAndAlterDatabaseParserListener(parser));
+ listeners.add(new DropDatabaseParserListener(parser));
+ listeners.add(new CreateTableParserListener(parser, listeners));
+ listeners.add(new AlterTableParserListener(parser, listeners));
+ listeners.add(new DropTableParserListener(parser));
+ listeners.add(new RenameTableParserListener(parser));
+ listeners.add(new TruncateTableParserListener(parser));
+ listeners.add(new CreateViewParserListener(parser, listeners));
+ listeners.add(new AlterViewParserListener(parser, listeners));
+ listeners.add(new DropViewParserListener(parser));
+ listeners.add(new CreateUniqueIndexParserListener(parser));
+ listeners.add(new SetStatementParserListener(parser));
+ listeners.add(new UseStatementParserListener(parser));
+ }
+
+ /**
+ * Returns all caught errors during tree walk.
+ *
+ * @return list of Parsing exceptions
+ */
+ @Override
+    public Collection<ParsingException> getErrors() {
+ return errors;
+ }
+
+ @Override
+ public void enterEveryRule(ParserRuleContext ctx) {
+ if (skipNodes) {
+ skippedNodesCount++;
+ } else {
+ ProxyParseTreeListenerUtil.delegateEnterRule(ctx, listeners, errors);
+ }
+ }
+
+ @Override
+ public void exitEveryRule(ParserRuleContext ctx) {
+ if (skipNodes) {
+ if (skippedNodesCount == 0) {
+ // back in the node where skipping started
+ skipNodes = false;
+ } else {
+                // going up in the tree means decreasing the number of skipped nodes
+ skippedNodesCount--;
+ }
+ } else {
+ ProxyParseTreeListenerUtil.delegateExitRule(ctx, listeners, errors);
+ }
+ }
+
+ @Override
+ public void visitErrorNode(ErrorNode node) {
+ ProxyParseTreeListenerUtil.visitErrorNode(node, listeners, errors);
+ }
+
+ @Override
+ public void visitTerminal(TerminalNode node) {
+ ProxyParseTreeListenerUtil.visitTerminal(node, listeners, errors);
+ }
+
+ @Override
+ public void enterRoutineBody(MySqlParser.RoutineBodyContext ctx) {
+ // this is a grammar rule for BEGIN ... END part of statements. Skip it.
+ skipNodes = true;
+ }
+}
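For orientation, a minimal hypothetical sketch (not part of this change) of how such a listener is driven: Debezium's AntlrDdlParser lexes and parses the DDL with the generated MySQL grammar and walks the tree with an ANTLR ParseTreeWalker, which fires the enter/exit/visit callbacks above. The ddlParser and ddl names are illustrative assumptions, and the sketch skips the case-insensitive stream wrapping the real parser applies.

import io.debezium.connector.tidb.Listeners.TiDBAntlrDdlParserListener;
import io.debezium.connector.tidb.TiDBAntlrDdlParser;
import io.debezium.ddl.parser.mysql.generated.MySqlLexer;
import io.debezium.ddl.parser.mysql.generated.MySqlParser;
import org.antlr.v4.runtime.CharStreams;
import org.antlr.v4.runtime.CommonTokenStream;
import org.antlr.v4.runtime.tree.ParseTree;
import org.antlr.v4.runtime.tree.ParseTreeWalker;

class ListenerWalkSketch {
    static void walk(TiDBAntlrDdlParser ddlParser, String ddl) {
        // Sketch only: the real parser applies a case-insensitive char stream before lexing.
        MySqlLexer lexer = new MySqlLexer(CharStreams.fromString(ddl));
        MySqlParser parser = new MySqlParser(new CommonTokenStream(lexer));
        ParseTree tree = parser.root(); // same entry rule as TiDBAntlrDdlParser#parseTree
        TiDBAntlrDdlParserListener listener = new TiDBAntlrDdlParserListener(ddlParser);
        new ParseTreeWalker().walk(listener, tree); // fires enterEveryRule/exitEveryRule/visit*
        listener.getErrors().forEach(Throwable::printStackTrace); // errors are collected, not thrown
    }
}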
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/io/debezium/connector/tidb/TiDBAntlrDdlParser.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/io/debezium/connector/tidb/TiDBAntlrDdlParser.java
new file mode 100644
index 00000000000..6352666b8d2
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/io/debezium/connector/tidb/TiDBAntlrDdlParser.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.debezium.connector.tidb;
+
+import org.apache.flink.cdc.connectors.tidb.source.converter.TiDBValueConverters;
+
+import io.debezium.antlr.AntlrDdlParserListener;
+import io.debezium.antlr.DataTypeResolver;
+import io.debezium.connector.mysql.MySqlSystemVariables;
+import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
+import io.debezium.connector.tidb.Listeners.TiDBAntlrDdlParserListener;
+import io.debezium.ddl.parser.mysql.generated.MySqlLexer;
+import io.debezium.ddl.parser.mysql.generated.MySqlParser;
+import io.debezium.relational.SystemVariables;
+import io.debezium.relational.Tables;
+import org.antlr.v4.runtime.CharStream;
+import org.antlr.v4.runtime.CommonTokenStream;
+import org.antlr.v4.runtime.tree.ParseTree;
+
+import java.sql.Types;
+import java.util.Arrays;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public class TiDBAntlrDdlParser extends MySqlAntlrDdlParser {
+    private final ConcurrentMap<String, String> charsetNameForDatabase = new ConcurrentHashMap<>();
+ private final TiDBValueConverters converters;
+ private final Tables.TableFilter tableFilter;
+
+ public TiDBAntlrDdlParser() {
+ this(null, Tables.TableFilter.includeAll());
+ }
+
+ public TiDBAntlrDdlParser(TiDBValueConverters converters) {
+ this(converters, Tables.TableFilter.includeAll());
+ }
+
+ public TiDBAntlrDdlParser(TiDBValueConverters converters, Tables.TableFilter tableFilter) {
+ this(true, false, false, converters, tableFilter);
+ }
+
+ public TiDBAntlrDdlParser(
+ boolean throwErrorsFromTreeWalk,
+ boolean includeViews,
+ boolean includeComments,
+ TiDBValueConverters converters,
+ Tables.TableFilter tableFilter) {
+ // super(throwErrorsFromTreeWalk, includeViews, includeComments);
+ systemVariables = new MySqlSystemVariables();
+ this.converters = converters;
+ this.tableFilter = tableFilter;
+ }
+
+ @Override
+ protected ParseTree parseTree(MySqlParser parser) {
+ return parser.root();
+ }
+
+ @Override
+ protected AntlrDdlParserListener createParseTreeWalkerListener() {
+ return new TiDBAntlrDdlParserListener(this);
+ }
+
+ @Override
+ protected MySqlLexer createNewLexerInstance(CharStream charStreams) {
+ return new MySqlLexer(charStreams);
+ }
+
+ @Override
+ protected MySqlParser createNewParserInstance(CommonTokenStream commonTokenStream) {
+ return new MySqlParser(commonTokenStream);
+ }
+
+ @Override
+ protected SystemVariables createNewSystemVariablesInstance() {
+ return new MySqlSystemVariables();
+ }
+
+ @Override
+ protected boolean isGrammarInUpperCase() {
+ return true;
+ }
+
+ @Override
+ protected DataTypeResolver initializeDataTypeResolver() {
+ DataTypeResolver.Builder dataTypeResolverBuilder = new DataTypeResolver.Builder();
+
+ dataTypeResolverBuilder.registerDataTypes(
+ MySqlParser.StringDataTypeContext.class.getCanonicalName(),
+ Arrays.asList(
+ new DataTypeResolver.DataTypeEntry(Types.CHAR, MySqlParser.CHAR),
+ new DataTypeResolver.DataTypeEntry(
+ Types.VARCHAR, MySqlParser.CHAR, MySqlParser.VARYING),
+ new DataTypeResolver.DataTypeEntry(Types.VARCHAR, MySqlParser.VARCHAR),
+ new DataTypeResolver.DataTypeEntry(Types.VARCHAR, MySqlParser.TINYTEXT),
+ new DataTypeResolver.DataTypeEntry(Types.VARCHAR, MySqlParser.TEXT),
+ new DataTypeResolver.DataTypeEntry(Types.VARCHAR, MySqlParser.MEDIUMTEXT),
+ new DataTypeResolver.DataTypeEntry(Types.VARCHAR, MySqlParser.LONGTEXT),
+ new DataTypeResolver.DataTypeEntry(Types.NCHAR, MySqlParser.NCHAR),
+ new DataTypeResolver.DataTypeEntry(
+ Types.NVARCHAR, MySqlParser.NCHAR, MySqlParser.VARYING),
+ new DataTypeResolver.DataTypeEntry(Types.NVARCHAR, MySqlParser.NVARCHAR),
+ new DataTypeResolver.DataTypeEntry(
+ Types.CHAR, MySqlParser.CHAR, MySqlParser.BINARY),
+ new DataTypeResolver.DataTypeEntry(
+ Types.VARCHAR, MySqlParser.VARCHAR, MySqlParser.BINARY),
+ new DataTypeResolver.DataTypeEntry(
+ Types.VARCHAR, MySqlParser.TINYTEXT, MySqlParser.BINARY),
+ new DataTypeResolver.DataTypeEntry(
+ Types.VARCHAR, MySqlParser.TEXT, MySqlParser.BINARY),
+ new DataTypeResolver.DataTypeEntry(
+ Types.VARCHAR, MySqlParser.MEDIUMTEXT, MySqlParser.BINARY),
+ new DataTypeResolver.DataTypeEntry(
+ Types.VARCHAR, MySqlParser.LONGTEXT, MySqlParser.BINARY),
+ new DataTypeResolver.DataTypeEntry(
+ Types.NCHAR, MySqlParser.NCHAR, MySqlParser.BINARY),
+ new DataTypeResolver.DataTypeEntry(
+ Types.NVARCHAR, MySqlParser.NVARCHAR, MySqlParser.BINARY),
+ new DataTypeResolver.DataTypeEntry(Types.CHAR, MySqlParser.CHARACTER),
+ new DataTypeResolver.DataTypeEntry(
+ Types.VARCHAR, MySqlParser.CHARACTER, MySqlParser.VARYING)));
+ dataTypeResolverBuilder.registerDataTypes(
+ MySqlParser.NationalStringDataTypeContext.class.getCanonicalName(),
+ Arrays.asList(
+ new DataTypeResolver.DataTypeEntry(
+ Types.NVARCHAR, MySqlParser.NATIONAL, MySqlParser.VARCHAR)
+ .setSuffixTokens(MySqlParser.BINARY),
+ new DataTypeResolver.DataTypeEntry(
+ Types.NCHAR, MySqlParser.NATIONAL, MySqlParser.CHARACTER)
+ .setSuffixTokens(MySqlParser.BINARY),
+ new DataTypeResolver.DataTypeEntry(
+ Types.NVARCHAR, MySqlParser.NCHAR, MySqlParser.VARCHAR)
+ .setSuffixTokens(MySqlParser.BINARY)));
+ dataTypeResolverBuilder.registerDataTypes(
+ MySqlParser.NationalVaryingStringDataTypeContext.class.getCanonicalName(),
+ Arrays.asList(
+ new DataTypeResolver.DataTypeEntry(
+ Types.NVARCHAR,
+ MySqlParser.NATIONAL,
+ MySqlParser.CHAR,
+ MySqlParser.VARYING),
+ new DataTypeResolver.DataTypeEntry(
+ Types.NVARCHAR,
+ MySqlParser.NATIONAL,
+ MySqlParser.CHARACTER,
+ MySqlParser.VARYING)));
+ dataTypeResolverBuilder.registerDataTypes(
+ MySqlParser.DimensionDataTypeContext.class.getCanonicalName(),
+ Arrays.asList(
+ new DataTypeResolver.DataTypeEntry(Types.SMALLINT, MySqlParser.TINYINT)
+ .setSuffixTokens(
+ MySqlParser.SIGNED,
+ MySqlParser.UNSIGNED,
+ MySqlParser.ZEROFILL),
+ new DataTypeResolver.DataTypeEntry(Types.SMALLINT, MySqlParser.INT1)
+ .setSuffixTokens(
+ MySqlParser.SIGNED,
+ MySqlParser.UNSIGNED,
+ MySqlParser.ZEROFILL),
+ new DataTypeResolver.DataTypeEntry(Types.SMALLINT, MySqlParser.SMALLINT)
+ .setSuffixTokens(
+ MySqlParser.SIGNED,
+ MySqlParser.UNSIGNED,
+ MySqlParser.ZEROFILL),
+ new DataTypeResolver.DataTypeEntry(Types.SMALLINT, MySqlParser.INT2)
+ .setSuffixTokens(
+ MySqlParser.SIGNED,
+ MySqlParser.UNSIGNED,
+ MySqlParser.ZEROFILL),
+ new DataTypeResolver.DataTypeEntry(Types.INTEGER, MySqlParser.MEDIUMINT)
+ .setSuffixTokens(
+ MySqlParser.SIGNED,
+ MySqlParser.UNSIGNED,
+ MySqlParser.ZEROFILL),
+ new DataTypeResolver.DataTypeEntry(Types.INTEGER, MySqlParser.INT3)
+ .setSuffixTokens(
+ MySqlParser.SIGNED,
+ MySqlParser.UNSIGNED,
+ MySqlParser.ZEROFILL),
+ new DataTypeResolver.DataTypeEntry(Types.INTEGER, MySqlParser.MIDDLEINT)
+ .setSuffixTokens(
+ MySqlParser.SIGNED,
+ MySqlParser.UNSIGNED,
+ MySqlParser.ZEROFILL),
+ new DataTypeResolver.DataTypeEntry(Types.INTEGER, MySqlParser.INT)
+ .setSuffixTokens(
+ MySqlParser.SIGNED,
+ MySqlParser.UNSIGNED,
+ MySqlParser.ZEROFILL),
+ new DataTypeResolver.DataTypeEntry(Types.INTEGER, MySqlParser.INTEGER)
+ .setSuffixTokens(
+ MySqlParser.SIGNED,
+ MySqlParser.UNSIGNED,
+ MySqlParser.ZEROFILL),
+ new DataTypeResolver.DataTypeEntry(Types.INTEGER, MySqlParser.INT4)
+ .setSuffixTokens(
+ MySqlParser.SIGNED,
+ MySqlParser.UNSIGNED,
+ MySqlParser.ZEROFILL),
+ new DataTypeResolver.DataTypeEntry(Types.BIGINT, MySqlParser.BIGINT)
+ .setSuffixTokens(
+ MySqlParser.SIGNED,
+ MySqlParser.UNSIGNED,
+ MySqlParser.ZEROFILL),
+ new DataTypeResolver.DataTypeEntry(Types.BIGINT, MySqlParser.INT8)
+ .setSuffixTokens(
+ MySqlParser.SIGNED,
+ MySqlParser.UNSIGNED,
+ MySqlParser.ZEROFILL),
+ new DataTypeResolver.DataTypeEntry(Types.REAL, MySqlParser.REAL)
+ .setSuffixTokens(
+ MySqlParser.SIGNED,
+ MySqlParser.UNSIGNED,
+ MySqlParser.ZEROFILL),
+ new DataTypeResolver.DataTypeEntry(Types.DOUBLE, MySqlParser.DOUBLE)
+ .setSuffixTokens(
+ MySqlParser.PRECISION,
+ MySqlParser.SIGNED,
+ MySqlParser.UNSIGNED,
+ MySqlParser.ZEROFILL),
+ new DataTypeResolver.DataTypeEntry(Types.DOUBLE, MySqlParser.FLOAT8)
+ .setSuffixTokens(
+ MySqlParser.PRECISION,
+ MySqlParser.SIGNED,
+ MySqlParser.UNSIGNED,
+ MySqlParser.ZEROFILL),
+ new DataTypeResolver.DataTypeEntry(Types.FLOAT, MySqlParser.FLOAT)
+ .setSuffixTokens(
+ MySqlParser.SIGNED,
+ MySqlParser.UNSIGNED,
+ MySqlParser.ZEROFILL),
+ new DataTypeResolver.DataTypeEntry(Types.FLOAT, MySqlParser.FLOAT4)
+ .setSuffixTokens(
+ MySqlParser.SIGNED,
+ MySqlParser.UNSIGNED,
+ MySqlParser.ZEROFILL),
+ new DataTypeResolver.DataTypeEntry(Types.DECIMAL, MySqlParser.DECIMAL)
+ .setSuffixTokens(
+ MySqlParser.SIGNED,
+ MySqlParser.UNSIGNED,
+ MySqlParser.ZEROFILL)
+ .setDefaultLengthScaleDimension(10, 0),
+ new DataTypeResolver.DataTypeEntry(Types.DECIMAL, MySqlParser.DEC)
+ .setSuffixTokens(
+ MySqlParser.SIGNED,
+ MySqlParser.UNSIGNED,
+ MySqlParser.ZEROFILL)
+ .setDefaultLengthScaleDimension(10, 0),
+ new DataTypeResolver.DataTypeEntry(Types.DECIMAL, MySqlParser.FIXED)
+ .setSuffixTokens(
+ MySqlParser.SIGNED,
+ MySqlParser.UNSIGNED,
+ MySqlParser.ZEROFILL)
+ .setDefaultLengthScaleDimension(10, 0),
+ new DataTypeResolver.DataTypeEntry(Types.NUMERIC, MySqlParser.NUMERIC)
+ .setSuffixTokens(
+ MySqlParser.SIGNED,
+ MySqlParser.UNSIGNED,
+ MySqlParser.ZEROFILL)
+ .setDefaultLengthScaleDimension(10, 0),
+ new DataTypeResolver.DataTypeEntry(Types.BIT, MySqlParser.BIT),
+ new DataTypeResolver.DataTypeEntry(Types.TIME, MySqlParser.TIME),
+ new DataTypeResolver.DataTypeEntry(
+ Types.TIMESTAMP_WITH_TIMEZONE, MySqlParser.TIMESTAMP),
+ new DataTypeResolver.DataTypeEntry(Types.TIMESTAMP, MySqlParser.DATETIME),
+ new DataTypeResolver.DataTypeEntry(Types.BINARY, MySqlParser.BINARY),
+ new DataTypeResolver.DataTypeEntry(Types.VARBINARY, MySqlParser.VARBINARY),
+ new DataTypeResolver.DataTypeEntry(Types.BLOB, MySqlParser.BLOB),
+ new DataTypeResolver.DataTypeEntry(Types.INTEGER, MySqlParser.YEAR)));
+ dataTypeResolverBuilder.registerDataTypes(
+ MySqlParser.SimpleDataTypeContext.class.getCanonicalName(),
+ Arrays.asList(
+ new DataTypeResolver.DataTypeEntry(Types.DATE, MySqlParser.DATE),
+ new DataTypeResolver.DataTypeEntry(Types.BLOB, MySqlParser.TINYBLOB),
+ new DataTypeResolver.DataTypeEntry(Types.BLOB, MySqlParser.MEDIUMBLOB),
+ new DataTypeResolver.DataTypeEntry(Types.BLOB, MySqlParser.LONGBLOB),
+ new DataTypeResolver.DataTypeEntry(Types.BOOLEAN, MySqlParser.BOOL),
+ new DataTypeResolver.DataTypeEntry(Types.BOOLEAN, MySqlParser.BOOLEAN),
+ new DataTypeResolver.DataTypeEntry(Types.BIGINT, MySqlParser.SERIAL)));
+ dataTypeResolverBuilder.registerDataTypes(
+ MySqlParser.CollectionDataTypeContext.class.getCanonicalName(),
+ Arrays.asList(
+ new DataTypeResolver.DataTypeEntry(Types.CHAR, MySqlParser.ENUM)
+ .setSuffixTokens(MySqlParser.BINARY),
+ new DataTypeResolver.DataTypeEntry(Types.CHAR, MySqlParser.SET)
+ .setSuffixTokens(MySqlParser.BINARY)));
+ dataTypeResolverBuilder.registerDataTypes(
+ MySqlParser.SpatialDataTypeContext.class.getCanonicalName(),
+ Arrays.asList(
+ new DataTypeResolver.DataTypeEntry(
+ Types.OTHER, MySqlParser.GEOMETRYCOLLECTION),
+ new DataTypeResolver.DataTypeEntry(Types.OTHER, MySqlParser.GEOMCOLLECTION),
+ new DataTypeResolver.DataTypeEntry(Types.OTHER, MySqlParser.LINESTRING),
+ new DataTypeResolver.DataTypeEntry(
+ Types.OTHER, MySqlParser.MULTILINESTRING),
+ new DataTypeResolver.DataTypeEntry(Types.OTHER, MySqlParser.MULTIPOINT),
+ new DataTypeResolver.DataTypeEntry(Types.OTHER, MySqlParser.MULTIPOLYGON),
+ new DataTypeResolver.DataTypeEntry(Types.OTHER, MySqlParser.POINT),
+ new DataTypeResolver.DataTypeEntry(Types.OTHER, MySqlParser.POLYGON),
+ new DataTypeResolver.DataTypeEntry(Types.OTHER, MySqlParser.JSON),
+ new DataTypeResolver.DataTypeEntry(Types.OTHER, MySqlParser.GEOMETRY)));
+ dataTypeResolverBuilder.registerDataTypes(
+ MySqlParser.LongVarbinaryDataTypeContext.class.getCanonicalName(),
+ Arrays.asList(
+ new DataTypeResolver.DataTypeEntry(Types.BLOB, MySqlParser.LONG)
+ .setSuffixTokens(MySqlParser.VARBINARY)));
+ dataTypeResolverBuilder.registerDataTypes(
+ MySqlParser.LongVarcharDataTypeContext.class.getCanonicalName(),
+ Arrays.asList(
+ new DataTypeResolver.DataTypeEntry(Types.VARCHAR, MySqlParser.LONG)
+ .setSuffixTokens(MySqlParser.VARCHAR)));
+
+ return dataTypeResolverBuilder.build();
+ }
+}
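A brief usage sketch, illustrative only and assuming the no-arg constructor leaves the parser usable despite the commented-out super call: since the class is a Debezium DdlParser, DDL captured from TiDB can be applied to a Tables catalog, and the JDBC types resolved by initializeDataTypeResolver() can then be inspected.

import io.debezium.connector.tidb.TiDBAntlrDdlParser;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;

class DdlParseSketch {
    public static void main(String[] args) {
        Tables tables = new Tables();
        TiDBAntlrDdlParser parser = new TiDBAntlrDdlParser(); // assumes defaults are sufficient
        parser.parse(
                "CREATE TABLE app_db.orders (id BIGINT PRIMARY KEY, note VARCHAR(64))", tables);
        Table table = tables.forTable(new TableId("app_db", null, "orders"));
        // e.g. BIGINT -> java.sql.Types.BIGINT, VARCHAR -> java.sql.Types.VARCHAR
        table.columns()
                .forEach(col -> System.out.println(col.name() + " -> " + col.jdbcType()));
    }
}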
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/io/debezium/connector/tidb/TiDBEventMetadataProvider.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/io/debezium/connector/tidb/TiDBEventMetadataProvider.java
new file mode 100644
index 00000000000..e1ca467d9fb
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/io/debezium/connector/tidb/TiDBEventMetadataProvider.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.debezium.connector.tidb;
+
+import io.debezium.connector.AbstractSourceInfo;
+import io.debezium.connector.mysql.MySqlOffsetContext;
+import io.debezium.data.Envelope;
+import io.debezium.pipeline.source.spi.EventMetadataProvider;
+import io.debezium.pipeline.spi.OffsetContext;
+import io.debezium.schema.DataCollectionId;
+import io.debezium.util.Collect;
+import org.apache.kafka.connect.data.Struct;
+
+import java.time.Instant;
+import java.util.Map;
+
+import static org.apache.flink.cdc.connectors.tidb.source.offset.TiDBSourceInfo.COMMIT_VERSION_KEY;
+
+public class TiDBEventMetadataProvider implements EventMetadataProvider {
+ @Override
+ public Instant getEventTimestamp(
+ DataCollectionId source, OffsetContext offset, Object key, Struct value) {
+ if (value == null) {
+ return null;
+ }
+ final Struct sourceInfo = value.getStruct(Envelope.FieldName.SOURCE);
+ if (sourceInfo == null) {
+ return null;
+ }
+ final Long timestamp = sourceInfo.getInt64(AbstractSourceInfo.TIMESTAMP_KEY);
+ return timestamp == null ? null : Instant.ofEpochMilli(timestamp);
+ }
+
+ @Override
+    public Map<String, String> getEventSourcePosition(
+ DataCollectionId source, OffsetContext offset, Object key, Struct value) {
+ if (value == null) {
+ return null;
+ }
+ final Struct sourceInfo = value.getStruct(Envelope.FieldName.SOURCE);
+        if (sourceInfo == null) {
+ return null;
+ }
+ return Collect.hashMapOf(COMMIT_VERSION_KEY, sourceInfo.getString(COMMIT_VERSION_KEY));
+ }
+
+ @Override
+ public String getTransactionId(
+ DataCollectionId source, OffsetContext offset, Object key, Struct value) {
+ return ((MySqlOffsetContext) offset).getTransactionId();
+ }
+}
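A hedged sketch of how the provider is queried. The envelopeValue Struct is an assumed Debezium envelope with a populated source block; only the value is needed for these two lookups.

import io.debezium.connector.tidb.TiDBEventMetadataProvider;
import org.apache.kafka.connect.data.Struct;

import java.time.Instant;
import java.util.Map;

class MetadataProviderSketch {
    static void inspect(Struct envelopeValue) {
        TiDBEventMetadataProvider provider = new TiDBEventMetadataProvider();
        // Collection id, offset and key are not used by these two methods.
        Instant ts = provider.getEventTimestamp(null, null, null, envelopeValue);
        Map<String, String> position = provider.getEventSourcePosition(null, null, null, envelopeValue);
        System.out.println("event time=" + ts + ", source position=" + position);
    }
}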
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/io/debezium/connector/tidb/TiDBPartition.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/io/debezium/connector/tidb/TiDBPartition.java
new file mode 100644
index 00000000000..f14e595ac65
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/io/debezium/connector/tidb/TiDBPartition.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.debezium.connector.tidb;
+
+import io.debezium.pipeline.spi.Partition;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+
+public class TiDBPartition implements Partition {
+ private final String serverName;
+
+ public TiDBPartition(String serverName) {
+ this.serverName = serverName;
+ }
+
+ @Override
+    public Map<String, String> getSourcePartition() {
+ return Collections.emptyMap();
+ }
+
+ @Override
+    public Map<String, String> getLoggingContext() {
+ return Partition.super.getLoggingContext();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+        final TiDBPartition other = (TiDBPartition) obj;
+ return Objects.equals(serverName, other.serverName);
+ }
+
+ @Override
+ public String toString() {
+ return super.toString();
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/io/debezium/connector/tidb/TiDBTaskContext.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/io/debezium/connector/tidb/TiDBTaskContext.java
new file mode 100644
index 00000000000..70a61a0eebe
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/io/debezium/connector/tidb/TiDBTaskContext.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.debezium.connector.tidb;
+
+import org.apache.flink.cdc.connectors.tidb.source.config.TiDBConnectorConfig;
+import org.apache.flink.cdc.connectors.tidb.source.schema.TiDBDatabaseSchema;
+
+import io.debezium.connector.common.CdcSourceTaskContext;
+import io.debezium.relational.TableId;
+import io.debezium.schema.TopicSelector;
+
+public class TiDBTaskContext extends CdcSourceTaskContext {
+ private final TiDBDatabaseSchema schema;
+    private final TopicSelector<TableId> topicSelector;
+
+ public TiDBTaskContext(TiDBConnectorConfig config, TiDBDatabaseSchema schema) {
+ super(config.getContextName(), config.getLogicalName(), schema::tableIds);
+ this.schema = schema;
+ topicSelector = TidbTopicSelector.defaultSelector(config);
+ }
+
+ public TiDBDatabaseSchema getSchema() {
+ return schema;
+ }
+
+    public TopicSelector<TableId> getTopicSelector() {
+ return topicSelector;
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/io/debezium/connector/tidb/TidbTopicSelector.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/io/debezium/connector/tidb/TidbTopicSelector.java
new file mode 100644
index 00000000000..5663029c5de
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/io/debezium/connector/tidb/TidbTopicSelector.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.debezium.connector.tidb;
+
+import org.apache.flink.cdc.connectors.tidb.source.config.TiDBConnectorConfig;
+
+import io.debezium.annotation.ThreadSafe;
+import io.debezium.relational.TableId;
+import io.debezium.schema.TopicSelector;
+
+@ThreadSafe
+public class TidbTopicSelector {
+
+ /**
+ * Get the default topic selector logic, which uses a '.' delimiter character when needed.
+ *
+ * @param prefix the name of the prefix to be used for all topics; may not be null and must not
+ * terminate in the {@code delimiter}
+ * @param heartbeatPrefix the name of the prefix to be used for all heartbeat topics; may not be
+ * null and must not terminate in the {@code delimiter}
+ * @return the topic selector; never null
+ */
+ @Deprecated
+    public static TopicSelector<TableId> defaultSelector(String prefix, String heartbeatPrefix) {
+ return TopicSelector.defaultSelector(
+ prefix,
+ heartbeatPrefix,
+ ".",
+ (t, pref, delimiter) -> String.join(delimiter, pref, t.catalog(), t.table()));
+ }
+
+    public static TopicSelector<TableId> defaultSelector(TiDBConnectorConfig connectorConfig) {
+ return TopicSelector.defaultSelector(
+ connectorConfig,
+ (tableId, prefix, delimiter) ->
+ String.join(delimiter, prefix, tableId.catalog(), tableId.table()));
+ }
+}
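A usage note with hypothetical names: the config-based selector produces topic names of the form "logical name"."catalog"."table", e.g. tidb_cluster.app_db.orders when the connector's logical name is tidb_cluster.

import org.apache.flink.cdc.connectors.tidb.source.config.TiDBConnectorConfig;

import io.debezium.connector.tidb.TidbTopicSelector;
import io.debezium.relational.TableId;
import io.debezium.schema.TopicSelector;

class TopicSelectorSketch {
    static String topicFor(TiDBConnectorConfig config) {
        TopicSelector<TableId> selector = TidbTopicSelector.defaultSelector(config);
        // With logical name "tidb_cluster" this returns "tidb_cluster.app_db.orders".
        return selector.topicNameFor(new TableId("app_db", null, "orders"));
    }
}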
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TDBSourceOptions.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TDBSourceOptions.java
deleted file mode 100644
index fc468e68512..00000000000
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TDBSourceOptions.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cdc.connectors.tidb;
-
-import org.apache.flink.cdc.connectors.tidb.table.utils.UriHostMapping;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
-import org.apache.flink.configuration.Configuration;
-
-import org.tikv.common.ConfigUtils;
-import org.tikv.common.TiConfiguration;
-
-import java.util.Map;
-import java.util.Optional;
-
-/** Configurations for {@link TiDBSource}. */
-public class TDBSourceOptions {
-
- private TDBSourceOptions() {}
-
-    public static final ConfigOption<String> DATABASE_NAME =
- ConfigOptions.key("database-name")
- .stringType()
- .noDefaultValue()
- .withDescription("Database name of the TiDB server to monitor.");
-
-    public static final ConfigOption<String> TABLE_NAME =
- ConfigOptions.key("table-name")
- .stringType()
- .noDefaultValue()
- .withDescription("Table name of the TiDB database to monitor.");
-
-    public static final ConfigOption<String> SCAN_STARTUP_MODE =
- ConfigOptions.key("scan.startup.mode")
- .stringType()
- .defaultValue("initial")
- .withDescription(
- "Optional startup mode for TiDB CDC consumer, valid enumerations are "
- + "\"initial\", \"latest-offset\"");
-
-    public static final ConfigOption<String> PD_ADDRESSES =
- ConfigOptions.key("pd-addresses")
- .stringType()
- .noDefaultValue()
- .withDescription("TiKV cluster's PD address");
-
-    public static final ConfigOption<String> HOST_MAPPING =
- ConfigOptions.key("host-mapping")
- .stringType()
- .noDefaultValue()
- .withDescription(
- "TiKV cluster's host-mapping used to configure public IP and intranet IP mapping. When the TiKV cluster is running on the intranet, you can map a set of intranet IPs to public IPs for an outside Flink cluster to access. The format is {Intranet IP1}:{Public IP1};{Intranet IP2}:{Public IP2}, e.g. 192.168.0.2:8.8.8.8;192.168.0.3:9.9.9.9.");
-    public static final ConfigOption<Long> TIKV_GRPC_TIMEOUT =
- ConfigOptions.key(ConfigUtils.TIKV_GRPC_TIMEOUT)
- .longType()
- .noDefaultValue()
- .withDescription("TiKV GRPC timeout in ms");
-
-    public static final ConfigOption<Long> TIKV_GRPC_SCAN_TIMEOUT =
- ConfigOptions.key(ConfigUtils.TIKV_GRPC_SCAN_TIMEOUT)
- .longType()
- .noDefaultValue()
- .withDescription("TiKV GRPC scan timeout in ms");
-
-    public static final ConfigOption<Integer> TIKV_BATCH_GET_CONCURRENCY =
- ConfigOptions.key(ConfigUtils.TIKV_BATCH_GET_CONCURRENCY)
- .intType()
- .noDefaultValue()
- .withDescription("TiKV GRPC batch get concurrency");
-
-    public static final ConfigOption<Integer> TIKV_BATCH_SCAN_CONCURRENCY =
- ConfigOptions.key(ConfigUtils.TIKV_BATCH_SCAN_CONCURRENCY)
- .intType()
- .noDefaultValue()
- .withDescription("TiKV GRPC batch scan concurrency");
-
- public static TiConfiguration getTiConfiguration(
-            final String pdAddrsStr, final String hostMapping, final Map<String, String> options) {
- final Configuration configuration = Configuration.fromMap(options);
-
- final TiConfiguration tiConf = TiConfiguration.createDefault(pdAddrsStr);
- Optional.of(new UriHostMapping(hostMapping)).ifPresent(tiConf::setHostMapping);
- configuration.getOptional(TIKV_GRPC_TIMEOUT).ifPresent(tiConf::setTimeout);
- configuration.getOptional(TIKV_GRPC_SCAN_TIMEOUT).ifPresent(tiConf::setScanTimeout);
- configuration
- .getOptional(TIKV_BATCH_GET_CONCURRENCY)
- .ifPresent(tiConf::setBatchGetConcurrency);
-
- configuration
- .getOptional(TIKV_BATCH_SCAN_CONCURRENCY)
- .ifPresent(tiConf::setBatchScanConcurrency);
- return tiConf;
- }
-}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TiDBSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TiDBSource.java
deleted file mode 100644
index fa74f69ba88..00000000000
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TiDBSource.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cdc.connectors.tidb;
-
-import org.apache.flink.cdc.connectors.tidb.table.StartupOptions;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-
-import org.tikv.common.TiConfiguration;
-
-/** A builder to build a SourceFunction which can read snapshot and continue to read CDC events. */
-public class TiDBSource {
-
-    public static <T> Builder<T> builder() {
- return new Builder<>();
- }
-
- /** Builder class of {@link TiDBSource}. */
-    public static class Builder<T> {
- private String database;
- private String tableName;
- private StartupOptions startupOptions = StartupOptions.initial();
- private TiConfiguration tiConf;
-
-        private TiKVSnapshotEventDeserializationSchema<T> snapshotEventDeserializationSchema;
-        private TiKVChangeEventDeserializationSchema<T> changeEventDeserializationSchema;
-
- /** Database name to be monitored. */
-        public Builder<T> database(String database) {
- this.database = database;
- return this;
- }
-
-        /** Table name to be monitored. */
-        public Builder<T> tableName(String tableName) {
- this.tableName = tableName;
- return this;
- }
-
- /** The deserializer used to convert from consumed snapshot event from TiKV. */
-        public Builder<T> snapshotEventDeserializer(
-                TiKVSnapshotEventDeserializationSchema<T> snapshotEventDeserializationSchema) {
- this.snapshotEventDeserializationSchema = snapshotEventDeserializationSchema;
- return this;
- }
-
- /** The deserializer used to convert from consumed change event from TiKV. */
-        public Builder<T> changeEventDeserializer(
-                TiKVChangeEventDeserializationSchema<T> changeEventDeserializationSchema) {
- this.changeEventDeserializationSchema = changeEventDeserializationSchema;
- return this;
- }
-
- /** Specifies the startup options. */
-        public Builder<T> startupOptions(StartupOptions startupOptions) {
- this.startupOptions = startupOptions;
- return this;
- }
-
-        /** TiDB config. */
-        public Builder<T> tiConf(TiConfiguration tiConf) {
- this.tiConf = tiConf;
- return this;
- }
-
-        public RichParallelSourceFunction<T> build() {
-
- return new TiKVRichParallelSourceFunction<>(
- snapshotEventDeserializationSchema,
- changeEventDeserializationSchema,
- tiConf,
- startupOptions.startupMode,
- database,
- tableName);
- }
- }
-}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TiKVChangeEventDeserializationSchema.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TiKVChangeEventDeserializationSchema.java
deleted file mode 100644
index bf652624fdf..00000000000
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TiKVChangeEventDeserializationSchema.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cdc.connectors.tidb;
-
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.cdc.common.annotation.PublicEvolving;
-import org.apache.flink.util.Collector;
-
-import org.tikv.kvproto.Cdcpb.Event.Row;
-
-import java.io.Serializable;
-
-/**
- * The deserialization schema describes how to turn the TiKV Change Event into data types
- * (Java/Scala objects) that are processed by Flink.
- *
- * @param <T> The type created by the deserialization schema.
- */
-@PublicEvolving
-public interface TiKVChangeEventDeserializationSchema<T>
-        extends Serializable, ResultTypeQueryable<T> {
-
- /** Deserialize the TiDB record. */
-    void deserialize(Row record, Collector<T> out) throws Exception;
-}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TiKVRichParallelSourceFunction.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TiKVRichParallelSourceFunction.java
deleted file mode 100644
index 9570f40ed23..00000000000
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TiKVRichParallelSourceFunction.java
+++ /dev/null
@@ -1,419 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cdc.connectors.tidb;
-
-import org.apache.flink.api.common.state.CheckpointListener;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.cdc.connectors.tidb.metrics.TiDBSourceMetrics;
-import org.apache.flink.cdc.connectors.tidb.table.StartupMode;
-import org.apache.flink.cdc.connectors.tidb.table.utils.TableKeyRangeUtils;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.flink.shaded.guava31.com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.tikv.cdc.CDCClient;
-import org.tikv.common.TiConfiguration;
-import org.tikv.common.TiSession;
-import org.tikv.common.key.RowKey;
-import org.tikv.common.meta.TiTableInfo;
-import org.tikv.common.meta.TiTimestamp;
-import org.tikv.kvproto.Cdcpb;
-import org.tikv.kvproto.Coprocessor;
-import org.tikv.kvproto.Kvrpcpb;
-import org.tikv.shade.com.google.protobuf.ByteString;
-import org.tikv.txn.KVClient;
-
-import java.util.List;
-import java.util.Objects;
-import java.util.TreeMap;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
-/**
- * The source implementation for TiKV that read snapshot events first and then read the change
- * event.
- */
-public class TiKVRichParallelSourceFunction<T> extends RichParallelSourceFunction<T>
-        implements CheckpointListener, CheckpointedFunction, ResultTypeQueryable<T> {
-
- private static final long serialVersionUID = 1L;
- private static final Logger LOG = LoggerFactory.getLogger(TiKVRichParallelSourceFunction.class);
- private static final long SNAPSHOT_VERSION_EPOCH = -1L;
- private static final long STREAMING_VERSION_START_EPOCH = 0L;
-
-    private final TiKVSnapshotEventDeserializationSchema<T> snapshotEventDeserializationSchema;
-    private final TiKVChangeEventDeserializationSchema<T> changeEventDeserializationSchema;
- private final TiConfiguration tiConf;
- private final StartupMode startupMode;
- private final String database;
- private final String tableName;
-
- /** Task local variables. */
- private transient TiSession session = null;
-
- private transient Coprocessor.KeyRange keyRange = null;
- private transient CDCClient cdcClient = null;
-    private transient SourceContext<T> sourceContext = null;
-    private transient volatile long resolvedTs = -1L;
-    private transient TreeMap<RowKeyWithTs, Cdcpb.Event.Row> prewrites = null;
-    private transient TreeMap<RowKeyWithTs, Cdcpb.Event.Row> commits = null;
-    private transient BlockingQueue<Cdcpb.Event.Row> committedEvents = null;
-    private transient OutputCollector<T> outputCollector;
-
- private transient boolean running = true;
- private transient ExecutorService executorService;
- private transient TiDBSourceMetrics sourceMetrics;
-
- /** offset state. */
-    private transient ListState<Long> offsetState;
-
- private static final long CLOSE_TIMEOUT = 30L;
-
- public TiKVRichParallelSourceFunction(
-            TiKVSnapshotEventDeserializationSchema<T> snapshotEventDeserializationSchema,
-            TiKVChangeEventDeserializationSchema<T> changeEventDeserializationSchema,
- TiConfiguration tiConf,
- StartupMode startupMode,
- String database,
- String tableName) {
- this.snapshotEventDeserializationSchema = snapshotEventDeserializationSchema;
- this.changeEventDeserializationSchema = changeEventDeserializationSchema;
- this.tiConf = tiConf;
- this.startupMode = startupMode;
- this.database = database;
- this.tableName = tableName;
- }
-
- @Override
- public void open(final Configuration config) throws Exception {
- super.open(config);
- session = TiSession.create(tiConf);
- TiTableInfo tableInfo = session.getCatalog().getTable(database, tableName);
- if (tableInfo == null) {
- throw new RuntimeException(
- String.format("Table %s.%s does not exist.", database, tableName));
- }
- long tableId = tableInfo.getId();
- keyRange =
- TableKeyRangeUtils.getTableKeyRange(
- tableId,
- getRuntimeContext().getNumberOfParallelSubtasks(),
- getRuntimeContext().getIndexOfThisSubtask());
- cdcClient = new CDCClient(session, keyRange);
- prewrites = new TreeMap<>();
- commits = new TreeMap<>();
- // cdc event will lose if pull cdc event block when region split
- // use queue to separate read and write to ensure pull event unblock.
- // since sink jdbc is slow, 5000W queue size may be safe size.
- committedEvents = new LinkedBlockingQueue<>();
- outputCollector = new OutputCollector<>();
- resolvedTs =
- startupMode == StartupMode.INITIAL
- ? SNAPSHOT_VERSION_EPOCH
- : STREAMING_VERSION_START_EPOCH;
- ThreadFactory threadFactory =
- new ThreadFactoryBuilder()
- .setNameFormat(
- "tidb-source-function-"
- + getRuntimeContext().getIndexOfThisSubtask())
- .build();
- executorService = Executors.newSingleThreadExecutor(threadFactory);
- final MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
- sourceMetrics = new TiDBSourceMetrics(metricGroup);
- sourceMetrics.registerMetrics();
- }
-
- @Override
-    public void run(final SourceContext<T> ctx) throws Exception {
- sourceContext = ctx;
- outputCollector.context = sourceContext;
-
- if (startupMode == StartupMode.INITIAL) {
- synchronized (sourceContext.getCheckpointLock()) {
- readSnapshotEvents();
- }
- } else {
- LOG.info("Skip snapshot read");
- resolvedTs = session.getTimestamp().getVersion();
- }
-
- LOG.info("start read change events");
- cdcClient.start(resolvedTs);
- running = true;
- readChangeEvents();
- }
-
- private void handleRow(final Cdcpb.Event.Row row) {
- if (!TableKeyRangeUtils.isRecordKey(row.getKey().toByteArray())) {
- // Don't handle index key for now
- return;
- }
- LOG.debug("binlog record, type: {}, data: {}", row.getType(), row);
- switch (row.getType()) {
- case COMMITTED:
- prewrites.put(RowKeyWithTs.ofStart(row), row);
- commits.put(RowKeyWithTs.ofCommit(row), row);
- break;
- case COMMIT:
- commits.put(RowKeyWithTs.ofCommit(row), row);
- break;
- case PREWRITE:
- prewrites.put(RowKeyWithTs.ofStart(row), row);
- break;
- case ROLLBACK:
- prewrites.remove(RowKeyWithTs.ofStart(row));
- break;
- default:
- LOG.warn("Unsupported row type:" + row.getType());
- }
- }
-
- protected void readSnapshotEvents() throws Exception {
- LOG.info("read snapshot events");
- try (KVClient scanClient = session.createKVClient()) {
- long startTs = session.getTimestamp().getVersion();
- ByteString start = keyRange.getStart();
- while (true) {
-                final List<Kvrpcpb.KvPair> segment =
-                        scanClient.scan(start, keyRange.getEnd(), startTs);
-
- if (segment.isEmpty()) {
- resolvedTs = startTs;
- break;
- }
-
- for (final Kvrpcpb.KvPair pair : segment) {
- if (TableKeyRangeUtils.isRecordKey(pair.getKey().toByteArray())) {
- snapshotEventDeserializationSchema.deserialize(pair, outputCollector);
- reportMetrics(0L, startTs);
- }
- }
-
- start =
- RowKey.toRawKey(segment.get(segment.size() - 1).getKey())
- .next()
- .toByteString();
- }
- }
- }
-
- protected void readChangeEvents() throws Exception {
- LOG.info("read change event from resolvedTs:{}", resolvedTs);
- // child thread to sink committed rows.
- executorService.execute(
- () -> {
- while (running) {
- try {
- Cdcpb.Event.Row committedRow = committedEvents.take();
- changeEventDeserializationSchema.deserialize(
- committedRow, outputCollector);
- // use startTs of row as messageTs, use commitTs of row as fetchTs
- reportMetrics(committedRow.getStartTs(), committedRow.getCommitTs());
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
- while (resolvedTs >= STREAMING_VERSION_START_EPOCH) {
- for (int i = 0; i < 1000; i++) {
- final Cdcpb.Event.Row row = cdcClient.get();
- if (row == null) {
- break;
- }
- handleRow(row);
- }
- resolvedTs = cdcClient.getMaxResolvedTs();
- if (commits.size() > 0) {
- flushRows(resolvedTs);
- }
- }
- }
-
- protected void flushRows(final long timestamp) throws Exception {
- Preconditions.checkState(sourceContext != null, "sourceContext shouldn't be null");
- synchronized (sourceContext) {
- while (!commits.isEmpty() && commits.firstKey().timestamp <= timestamp) {
- final Cdcpb.Event.Row commitRow = commits.pollFirstEntry().getValue();
- final Cdcpb.Event.Row prewriteRow =
- prewrites.remove(RowKeyWithTs.ofStart(commitRow));
- // if pull cdc event block when region split, cdc event will lose.
- committedEvents.offer(prewriteRow);
- }
- }
- }
-
- @Override
- public void cancel() {
- try {
- running = false;
- if (cdcClient != null) {
- cdcClient.close();
- }
- if (executorService != null) {
- executorService.shutdown();
- if (!executorService.awaitTermination(CLOSE_TIMEOUT, TimeUnit.SECONDS)) {
- LOG.warn(
- "Failed to close the tidb source function in {} seconds.",
- CLOSE_TIMEOUT);
- }
- }
- } catch (final Exception e) {
- LOG.error("Unable to close cdcClient", e);
- }
- }
-
- @Override
- public void snapshotState(final FunctionSnapshotContext context) throws Exception {
- LOG.info(
- "snapshotState checkpoint: {} at resolvedTs: {}",
- context.getCheckpointId(),
- resolvedTs);
- flushRows(resolvedTs);
- offsetState.clear();
- offsetState.add(resolvedTs);
- }
-
- @Override
- public void initializeState(final FunctionInitializationContext context) throws Exception {
- LOG.info("initialize checkpoint");
- offsetState =
- context.getOperatorStateStore()
- .getListState(
- new ListStateDescriptor<>(
- "resolvedTsState", LongSerializer.INSTANCE));
- if (context.isRestored()) {
- for (final Long offset : offsetState.get()) {
- resolvedTs = offset;
- LOG.info("Restore State from resolvedTs: {}", resolvedTs);
- return;
- }
- } else {
- resolvedTs = 0;
- LOG.info("Initialize State from resolvedTs: {}", resolvedTs);
- }
- }
-
- @Override
- public void notifyCheckpointComplete(long checkpointId) throws Exception {
- // do nothing
- }
-
- @Override
-    public TypeInformation<T> getProducedType() {
- return snapshotEventDeserializationSchema.getProducedType();
- }
-
- // ---------------------------------------
- // static Utils classes
- // ---------------------------------------
-    private static class RowKeyWithTs implements Comparable<RowKeyWithTs> {
- private final long timestamp;
- private final RowKey rowKey;
-
- private RowKeyWithTs(final long timestamp, final RowKey rowKey) {
- this.timestamp = timestamp;
- this.rowKey = rowKey;
- }
-
- private RowKeyWithTs(final long timestamp, final byte[] key) {
- this(timestamp, RowKey.decode(key));
- }
-
- @Override
- public int compareTo(final RowKeyWithTs that) {
- int res = Long.compare(this.timestamp, that.timestamp);
- if (res == 0) {
- res = Long.compare(this.rowKey.getTableId(), that.rowKey.getTableId());
- }
- if (res == 0) {
- res = Long.compare(this.rowKey.getHandle(), that.rowKey.getHandle());
- }
- return res;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(this.timestamp, this.rowKey.getTableId(), this.rowKey.getHandle());
- }
-
- @Override
- public boolean equals(final Object thatObj) {
- if (thatObj instanceof RowKeyWithTs) {
- final RowKeyWithTs that = (RowKeyWithTs) thatObj;
- return this.timestamp == that.timestamp && this.rowKey.equals(that.rowKey);
- }
- return false;
- }
-
- static RowKeyWithTs ofStart(final Cdcpb.Event.Row row) {
- return new RowKeyWithTs(row.getStartTs(), row.getKey().toByteArray());
- }
-
- static RowKeyWithTs ofCommit(final Cdcpb.Event.Row row) {
- return new RowKeyWithTs(row.getCommitTs(), row.getKey().toByteArray());
- }
- }
-
-    private static class OutputCollector<T> implements Collector<T> {
-
-        private SourceContext<T> context;
-
- @Override
- public void collect(T record) {
- context.collect(record);
- }
-
- @Override
- public void close() {
- // do nothing
- }
- }
-
- private void reportMetrics(long messageTs, long fetchTs) {
- long now = System.currentTimeMillis();
- // record the latest process time
- sourceMetrics.recordProcessTime(now);
- long messageTimestamp = TiTimestamp.extractPhysical(messageTs);
- long fetchTimestamp = TiTimestamp.extractPhysical(fetchTs);
- if (messageTimestamp > 0L) {
- // report fetch delay
- if (fetchTimestamp >= messageTimestamp) {
- sourceMetrics.recordFetchDelay(fetchTimestamp - messageTimestamp);
- }
- // report emit delay
- sourceMetrics.recordEmitDelay(now - messageTimestamp);
- }
- }
-}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TiKVSnapshotEventDeserializationSchema.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TiKVSnapshotEventDeserializationSchema.java
deleted file mode 100644
index a0a43658181..00000000000
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TiKVSnapshotEventDeserializationSchema.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cdc.connectors.tidb;
-
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.cdc.common.annotation.PublicEvolving;
-import org.apache.flink.util.Collector;
-
-import org.tikv.kvproto.Kvrpcpb.KvPair;
-
-import java.io.Serializable;
-
-/**
- * The deserialization schema describes how to turn the TiKV snapshot event into data types
- * (Java/Scala objects) that are processed by Flink.
- *
- * @param <T> The type created by the deserialization schema.
- */
-@PublicEvolving
-public interface TiKVSnapshotEventDeserializationSchema<T>
- extends Serializable, ResultTypeQueryable<T> {
-
- /** Deserialize the TiDB record. */
- void deserialize(KvPair record, Collector<T> out) throws Exception;
-}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/metrics/TiDBSourceMetrics.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/metrics/TiDBSourceMetrics.java
index 1f32c0f3411..2fe98827b5f 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/metrics/TiDBSourceMetrics.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/metrics/TiDBSourceMetrics.java
@@ -17,7 +17,6 @@
package org.apache.flink.cdc.connectors.tidb.metrics;
-import org.apache.flink.cdc.connectors.tidb.TiKVRichParallelSourceFunction;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/TiDBDialect.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/TiDBDialect.java
new file mode 100644
index 00000000000..d1598fcc0cb
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/TiDBDialect.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.tidb.source;
+
+import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
+import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
+import org.apache.flink.cdc.connectors.base.relational.connection.JdbcConnectionFactory;
+import org.apache.flink.cdc.connectors.base.relational.connection.JdbcConnectionPoolFactory;
+import org.apache.flink.cdc.connectors.base.source.assigner.splitter.ChunkSplitter;
+import org.apache.flink.cdc.connectors.base.source.assigner.state.ChunkSplitterState;
+import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
+import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
+import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
+import org.apache.flink.cdc.connectors.tidb.source.config.TiDBConnectorConfig;
+import org.apache.flink.cdc.connectors.tidb.source.config.TiDBSourceConfig;
+import org.apache.flink.cdc.connectors.tidb.source.connection.TiDBConnection;
+import org.apache.flink.cdc.connectors.tidb.source.connection.TiDBConnectionPoolFactory;
+import org.apache.flink.cdc.connectors.tidb.source.fetch.TiDBScanFetchTask;
+import org.apache.flink.cdc.connectors.tidb.source.fetch.TiDBSourceFetchTaskContext;
+import org.apache.flink.cdc.connectors.tidb.source.fetch.TiDBStreamFetchTask;
+import org.apache.flink.cdc.connectors.tidb.source.schema.TiDBSchema;
+import org.apache.flink.cdc.connectors.tidb.source.splitter.TiDBChunkSplitter;
+import org.apache.flink.cdc.connectors.tidb.utils.TableDiscoveryUtils;
+import org.apache.flink.cdc.connectors.tidb.utils.TiDBConnectionUtils;
+import org.apache.flink.cdc.connectors.tidb.utils.TiDBUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** TiDB data source dialect. */
+public class TiDBDialect implements JdbcDataSourceDialect {
+ private static final Logger LOG = LoggerFactory.getLogger(TiDBDialect.class);
+
+ private static final String QUOTED_CHARACTER = "`";
+ private static final long serialVersionUID = 1L;
+
+ private final TiDBSourceConfig sourceConfig;
+ private transient TiDBSchema tiDBSchema;
+ @Nullable private TiDBStreamFetchTask streamFetchTask;
+
+ public TiDBDialect(TiDBSourceConfig sourceConfig) {
+ this.sourceConfig = sourceConfig;
+ }
+
+ @Override
+ public String getName() {
+ return "TiDB";
+ }
+
+ @Override
+ public Offset displayCurrentOffset(JdbcSourceConfig sourceConfig) {
+ try (JdbcConnection jdbcConnection = openJdbcConnection(sourceConfig)) {
+ return TiDBUtils.currentBinlogOffset(jdbcConnection);
+ } catch (Exception e) {
+ throw new FlinkRuntimeException("Failed to read the current binlog offset", e);
+ }
+ }
+
+ @Override
+ public boolean isDataCollectionIdCaseSensitive(JdbcSourceConfig sourceConfig) {
+ try (JdbcConnection jdbcConnection = openJdbcConnection(sourceConfig)) {
+ return TiDBConnectionUtils.isTableIdCaseInsensitive(jdbcConnection);
+ } catch (SQLException e) {
+ throw new FlinkRuntimeException("Error reading TiDB variables: " + e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public ChunkSplitter createChunkSplitter(JdbcSourceConfig sourceConfig) {
+ return new TiDBChunkSplitter(
+ sourceConfig, this, ChunkSplitterState.NO_SPLITTING_TABLE_STATE);
+ }
+
+ @Override
+ public ChunkSplitter createChunkSplitter(
+ JdbcSourceConfig sourceConfig, ChunkSplitterState chunkSplitterState) {
+ return new TiDBChunkSplitter(this.sourceConfig, this, chunkSplitterState);
+ }
+
+ @Override
+ public FetchTask.Context createFetchTaskContext(JdbcSourceConfig sourceConfig) {
+ return new TiDBSourceFetchTaskContext(sourceConfig, this, openJdbcConnection());
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId, Offset offset) throws Exception {
+ if (streamFetchTask != null) {
+ streamFetchTask.commitCurrentOffset(offset);
+ }
+ }
+
+ @Override
+ public boolean isIncludeDataCollection(JdbcSourceConfig sourceConfig, TableId tableId) {
+ // TODO: respect the configured table filters; for now every discovered table is included.
+ return true;
+ }
+
+ @Override
+ public List<TableId> discoverDataCollections(JdbcSourceConfig sourceConfig) {
+ try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
+ List<TableId> tableIds =
+ TableDiscoveryUtils.listTables(
+ sourceConfig.getDatabaseList().get(0),
+ jdbc,
+ sourceConfig.getTableFilters());
+ if (tableIds.isEmpty()) {
+ throw new FlinkRuntimeException(
+ "No tables discovered for the given tables: " + sourceConfig.getTableList());
+ }
+ return tableIds;
+ } catch (SQLException e) {
+ throw new FlinkRuntimeException("Failed to discover tables: " + e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public Map<TableId, TableChanges.TableChange> discoverDataCollectionSchemas(
+ JdbcSourceConfig sourceConfig) {
+ final List<TableId> capturedTableIds = discoverDataCollections(sourceConfig);
+
+ try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
+ // fetch table schemas
+ Map<TableId, TableChanges.TableChange> tableSchemas = new HashMap<>();
+ for (TableId tableId : capturedTableIds) {
+ TableChanges.TableChange tableSchema = queryTableSchema(jdbc, tableId);
+ tableSchemas.put(tableId, tableSchema);
+ }
+ return tableSchemas;
+ } catch (Exception e) {
+ throw new FlinkRuntimeException(
+ "Failed to discover table schemas: " + e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public JdbcConnection openJdbcConnection(JdbcSourceConfig sourceConfig) {
+ TiDBSourceConfig tiDBSourceConfig = (TiDBSourceConfig) sourceConfig;
+ TiDBConnectorConfig dbzConfig = tiDBSourceConfig.getDbzConnectorConfig();
+
+ JdbcConnection jdbc =
+ new TiDBConnection(
+ dbzConfig.getJdbcConfig(),
+ new JdbcConnectionFactory(sourceConfig, getPooledDataSourceFactory()),
+ QUOTED_CHARACTER,
+ QUOTED_CHARACTER);
+ try {
+ jdbc.connect();
+ } catch (Exception e) {
+ LOG.error("Failed to open TiDB connection", e);
+ throw new FlinkRuntimeException(e);
+ }
+ return jdbc;
+ }
+
+ public TiDBConnection openJdbcConnection() {
+ return (TiDBConnection) openJdbcConnection(sourceConfig);
+ }
+
+ @Override
+ public JdbcConnectionPoolFactory getPooledDataSourceFactory() {
+ return new TiDBConnectionPoolFactory();
+ }
+
+ @Override
+ public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc, TableId tableId) {
+ if (tiDBSchema == null) {
+ tiDBSchema =
+ new TiDBSchema(sourceConfig, isDataCollectionIdCaseSensitive(sourceConfig));
+ }
+ return tiDBSchema.getTableSchema(jdbc, tableId);
+ }
+
+ @Override
+ public FetchTask<SourceSplitBase> createFetchTask(SourceSplitBase sourceSplitBase) {
+ if (sourceSplitBase.isSnapshotSplit()) {
+ return new TiDBScanFetchTask(sourceSplitBase.asSnapshotSplit());
+ } else {
+ this.streamFetchTask = new TiDBStreamFetchTask(sourceSplitBase.asStreamSplit());
+ return this.streamFetchTask;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ JdbcDataSourceDialect.super.close();
+ }
+}
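For orientation, a minimal sketch (not part of this change) of how the dialect above is typically exercised during split planning; the `sourceConfigFactory` instance is a hypothetical, already-configured `TiDBSourceConfigFactory`:

    // Sketch only: discover the captured tables and their schemas through the dialect.
    TiDBSourceConfig config = sourceConfigFactory.create(0);
    TiDBDialect dialect = new TiDBDialect(config);
    List<TableId> tables = dialect.discoverDataCollections(config);
    Map<TableId, TableChanges.TableChange> schemas =
            dialect.discoverDataCollectionSchemas(config);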
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/TiDBSourceBuilder.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/TiDBSourceBuilder.java
new file mode 100644
index 00000000000..93f4ff34b60
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/TiDBSourceBuilder.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.tidb.source;
+
+import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfigFactory;
+import org.apache.flink.cdc.connectors.base.options.StartupOptions;
+import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
+import org.apache.flink.cdc.connectors.tidb.source.config.TiDBSourceConfigFactory;
+import org.apache.flink.cdc.connectors.tidb.source.offset.EventOffsetFactory;
+import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.flink.table.catalog.ObjectPath;
+
+import org.tikv.common.TiConfiguration;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull;
+
+/** Builder for {@link JdbcIncrementalSource}. */
+public class TiDBSourceBuilder<T> {
+ private final TiDBSourceConfigFactory configFactory = new TiDBSourceConfigFactory();
+ private EventOffsetFactory offsetFactory;
+ private DebeziumDeserializationSchema<T> deserializer;
+ private TiDBDialect dialect;
+
+ private TiDBSourceBuilder() {}
+
+ public TiDBSourceBuilder startupOptions(StartupOptions startupOptions) {
+ this.configFactory.startupOptions(startupOptions);
+ return this;
+ }
+
+ public TiDBSourceBuilder hostname(String hostname) {
+ this.configFactory.hostname(hostname);
+ return this;
+ }
+
+ public TiDBSourceBuilder port(int port) {
+ this.configFactory.port(port);
+ return this;
+ }
+
+ public TiDBSourceBuilder driverClassName(String driverClassName) {
+ this.configFactory.driverClassName(driverClassName);
+ return this;
+ }
+
+ public TiDBSourceBuilder databaseList(String... databaseList) {
+ this.configFactory.databaseList(databaseList);
+ return this;
+ }
+
+ public TiDBSourceBuilder tableList(String... tableList) {
+ this.configFactory.tableList(tableList);
+ return this;
+ }
+
+ public TiDBSourceBuilder username(String username) {
+ this.configFactory.username(username);
+ return this;
+ }
+
+ public TiDBSourceBuilder password(String password) {
+ this.configFactory.password(password);
+ return this;
+ }
+
+ public TiDBSourceBuilder jdbcProperties(Properties properties) {
+ this.configFactory.jdbcProperties(properties);
+ return this;
+ }
+
+ public TiDBSourceBuilder tikvProperties(Properties properties) {
+ this.configFactory.tikvProperties(properties);
+ return this;
+ }
+
+ public TiDBSourceBuilder serverTimeZone(String timeZone) {
+ this.configFactory.serverTimeZone(timeZone);
+ return this;
+ }
+
+ public TiDBSourceBuilder connectTimeout(Duration connectTimeout) {
+ this.configFactory.connectTimeout(connectTimeout);
+ return this;
+ }
+
+ public TiDBSourceBuilder connectionPoolSize(int connectionPoolSize) {
+ this.configFactory.connectionPoolSize(connectionPoolSize);
+ return this;
+ }
+
+ public TiDBSourceBuilder connectMaxRetries(int connectMaxRetries) {
+ this.configFactory.connectMaxRetries(connectMaxRetries);
+ return this;
+ }
+
+ public TiDBSourceBuilder chunkKeyColumn(String chunkKeyColumn) {
+ this.configFactory.chunkKeyColumn(chunkKeyColumn);
+ return this;
+ }
+
+ public TiDBSourceBuilder<T> chunkKeyColumns(Map<ObjectPath, String> chunkKeyColumns) {
+ this.configFactory.chunkKeyColumns(chunkKeyColumns);
+ return this;
+ }
+
+ public TiDBSourceBuilder pdAddresses(String pdAddresses) {
+ this.configFactory.pdAddresses(pdAddresses);
+ return this;
+ }
+
+ public TiDBSourceBuilder hostMapping(String hostMapping) {
+ this.configFactory.hostMapping(hostMapping);
+ return this;
+ }
+
+ /**
+ * The split size (number of rows) of the table snapshot. Captured tables are split into
+ * multiple splits when reading the table snapshot.
+ */
+ public TiDBSourceBuilder splitSize(int splitSize) {
+ this.configFactory.splitSize(splitSize);
+ return this;
+ }
+
+ /** The maximum fetch size per poll when reading the table snapshot. */
+ public TiDBSourceBuilder fetchSize(int fetchSize) {
+ this.configFactory.fetchSize(fetchSize);
+ return this;
+ }
+
+ public TiDBSourceBuilder splitMetaGroupSize(int splitMetaGroupSize) {
+ this.configFactory.splitMetaGroupSize(splitMetaGroupSize);
+ return this;
+ }
+
+ public TiDBSourceBuilder distributionFactorUpper(double distributionFactorUpper) {
+ this.configFactory.distributionFactorUpper(distributionFactorUpper);
+ return this;
+ }
+
+ /**
+ * The lower bound of the split key distribution factor. The factor is used to determine
+ * whether the table is evenly distributed or not.
+ */
+ public TiDBSourceBuilder distributionFactorLower(double distributionFactorLower) {
+ this.configFactory.distributionFactorLower(distributionFactorLower);
+ return this;
+ }
+
+ public TiDBSourceBuilder scanNewlyAddedTableEnabled(boolean scanNewlyAddedTableEnabled) {
+ this.configFactory.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled);
+ return this;
+ }
+
+ public TiDBSourceBuilder<T> deserializer(DebeziumDeserializationSchema<T> deserializer) {
+ this.deserializer = deserializer;
+ return this;
+ }
+
+ public TiDBSourceBuilder tiConfiguration(TiConfiguration tiConfiguration) {
+ this.configFactory.tiConfiguration(tiConfiguration);
+ return this;
+ }
+
+ public TiDBIncrementalSource<T> build() {
+ this.offsetFactory = new EventOffsetFactory();
+ this.dialect = new TiDBDialect(configFactory.create(0));
+ return new TiDBIncrementalSource<>(
+ configFactory, checkNotNull(deserializer), offsetFactory, dialect);
+ }
+
+ /** TiDB incremental source. */
+ public static class TiDBIncrementalSource<T> extends JdbcIncrementalSource<T> {
+ public TiDBIncrementalSource(
+ JdbcSourceConfigFactory configFactory,
+ DebeziumDeserializationSchema<T> deserializationSchema,
+ EventOffsetFactory offsetFactory,
+ TiDBDialect dataSourceDialect) {
+ super(configFactory, deserializationSchema, offsetFactory, dataSourceDialect);
+ }
+
+ public static <T> TiDBSourceBuilder<T> builder() {
+ return new TiDBSourceBuilder<>();
+ }
+ }
+}
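A minimal usage sketch of the new builder API, assuming a String-producing deserializer such as the existing JsonDebeziumDeserializationSchema from flink-cdc-debezium; the connection values below are placeholders, not part of this change:

    // Sketch only: build an incremental TiDB source; values and the chosen
    // deserializer are illustrative.
    TiDBSourceBuilder.TiDBIncrementalSource<String> source =
            TiDBSourceBuilder.TiDBIncrementalSource.<String>builder()
                    .hostname("tidb-host")
                    .port(4000)
                    .username("root")
                    .password("")
                    .databaseList("app_db")
                    .tableList("app_db.orders")
                    .pdAddresses("pd0:2379")
                    .startupOptions(StartupOptions.initial())
                    .deserializer(new JsonDebeziumDeserializationSchema())
                    .build();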
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBConnectorConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBConnectorConfig.java
new file mode 100644
index 00000000000..3bd6e481a68
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBConnectorConfig.java
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.tidb.source.config;
+
+import org.apache.flink.cdc.connectors.tidb.source.offset.TiDBSourceInfoStructMaker;
+
+import io.debezium.config.CommonConnectorConfig;
+import io.debezium.config.Configuration;
+import io.debezium.config.EnumeratedValue;
+import io.debezium.config.Field;
+import io.debezium.connector.SourceInfoStructMaker;
+import io.debezium.connector.mysql.MySqlConnectorConfig;
+import io.debezium.jdbc.JdbcValueConverters;
+import io.debezium.relational.ColumnFilterMode;
+import io.debezium.relational.RelationalDatabaseConnectorConfig;
+import io.debezium.relational.TableId;
+import io.debezium.relational.Tables;
+import org.apache.kafka.common.config.ConfigDef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/** TiDB connector configuration. */
+public class TiDBConnectorConfig extends RelationalDatabaseConnectorConfig {
+ private static final Logger LOGGER = LoggerFactory.getLogger(TiDBConnectorConfig.class);
+
+ protected static final String LOGICAL_NAME = "tidb_cdc_connector";
+ protected static final int DEFAULT_SNAPSHOT_FETCH_SIZE = Integer.MIN_VALUE;
+ private final boolean readOnlyConnection = true;
+ protected static final List<String> BUILT_IN_DB_NAMES =
+ Collections.unmodifiableList(
+ Arrays.asList("information_schema", "mysql", "tidb", "LBACSYS", "ORAAUDITOR"));
+ private final TiDBSourceConfig sourceConfig;
+
+ public static final Field READ_ONLY_CONNECTION =
+ Field.create("read.only")
+ .withDisplayName("Read only connection")
+ .withType(ConfigDef.Type.BOOLEAN)
+ .withDefault(false)
+ .withWidth(ConfigDef.Width.SHORT)
+ .withImportance(ConfigDef.Importance.LOW)
+ .withDescription(
+ "Switched connector to use alternative methods to deliver signals to Debezium instead of writing to signaling table");
+
+ public static final Field BIGINT_UNSIGNED_HANDLING_MODE =
+ Field.create("bigint.unsigned.handling.mode")
+ .withDisplayName("BIGINT UNSIGNED Handling")
+ .withEnum(BigIntUnsignedHandlingMode.class, BigIntUnsignedHandlingMode.LONG)
+ .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 27))
+ .withWidth(ConfigDef.Width.SHORT)
+ .withImportance(ConfigDef.Importance.MEDIUM)
+ .withDescription(
+ "Specify how BIGINT UNSIGNED columns should be represented in change events, including:"
+ + "'precise' uses java.math.BigDecimal to represent values, which are encoded in the change events using a binary representation and Kafka Connect's 'org.apache.kafka.connect.data.Decimal' type; "
+ + "'long' (the default) represents values using Java's 'long', which may not offer the precision but will be far easier to use in consumers.");
+
+ public static final Field ENABLE_TIME_ADJUSTER =
+ Field.create("enable.time.adjuster")
+ .withDisplayName("Enable Time Adjuster")
+ .withType(ConfigDef.Type.BOOLEAN)
+ .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 22))
+ .withDefault(true)
+ .withWidth(ConfigDef.Width.SHORT)
+ .withImportance(ConfigDef.Importance.LOW)
+ .withDescription(
+ "MySQL allows user to insert year value as either 2-digit or 4-digit. In case of two digit the value is automatically mapped into 1970 - 2069."
+ + "false - delegates the implicit conversion to the database"
+ + "true - (the default) Debezium makes the conversion");
+
+ /** The set of predefined options for the handling mode configuration property. */
+ public enum BigIntUnsignedHandlingMode implements EnumeratedValue {
+ /**
+ * Represent {@code BIGINT UNSIGNED} values as precise {@link BigDecimal} values, which are
+ * represented in change events in a binary form. This is precise but difficult to use.
+ */
+ PRECISE("precise"),
+
+ /**
+ * Represent {@code BIGINT UNSIGNED} values as precise {@code long} values. This may be less
+ * precise but is far easier to use.
+ */
+ LONG("long");
+
+ private final String value;
+
+ private BigIntUnsignedHandlingMode(String value) {
+ this.value = value;
+ }
+
+ @Override
+ public String getValue() {
+ return value;
+ }
+
+ public JdbcValueConverters.BigIntUnsignedMode asBigIntUnsignedMode() {
+ switch (this) {
+ case LONG:
+ return JdbcValueConverters.BigIntUnsignedMode.LONG;
+ case PRECISE:
+ default:
+ return JdbcValueConverters.BigIntUnsignedMode.PRECISE;
+ }
+ }
+
+ /**
+ * Determine if the supplied value is one of the predefined options.
+ *
+ * @param value the configuration property value; may not be null
+ * @return the matching option, or null if no match is found
+ */
+ public static BigIntUnsignedHandlingMode parse(String value) {
+ if (value == null) {
+ return null;
+ }
+ value = value.trim();
+ for (BigIntUnsignedHandlingMode option : BigIntUnsignedHandlingMode.values()) {
+ if (option.getValue().equalsIgnoreCase(value)) {
+ return option;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Determine if the supplied value is one of the predefined options.
+ *
+ * @param value the configuration property value; may not be null
+ * @param defaultValue the default value; may be null
+ * @return the matching option, or null if no match is found and the non-null default is
+ * invalid
+ */
+ public static BigIntUnsignedHandlingMode parse(String value, String defaultValue) {
+ BigIntUnsignedHandlingMode mode = parse(value);
+ if (mode == null && defaultValue != null) {
+ mode = parse(defaultValue);
+ }
+ return mode;
+ }
+ }
+
+ @Override
+ public String getContextName() {
+ return "TiDB";
+ }
+
+ @Override
+ public String getConnectorName() {
+ return "TiDB";
+ }
+
+ public String databaseName() {
+ return getConfig().getString(DATABASE_NAME);
+ }
+
+ public TiDBConnectorConfig(TiDBSourceConfig sourceConfig) {
+ super(
+ Configuration.from(sourceConfig.getDbzProperties()),
+ LOGICAL_NAME,
+ Tables.TableFilter.fromPredicate(
+ tableId ->
+ "mysql".equalsIgnoreCase(sourceConfig.getCompatibleMode())
+ ? !BUILT_IN_DB_NAMES.contains(tableId.catalog())
+ : !BUILT_IN_DB_NAMES.contains(tableId.schema())),
+ TableId::identifier,
+ DEFAULT_SNAPSHOT_FETCH_SIZE,
+ "mysql".equalsIgnoreCase(sourceConfig.getCompatibleMode())
+ ? ColumnFilterMode.CATALOG
+ : ColumnFilterMode.SCHEMA);
+ this.sourceConfig = sourceConfig;
+ }
+
+ public TiDBSourceConfig getSourceConfig() {
+ return sourceConfig;
+ }
+
+ @Override
+ protected SourceInfoStructMaker<? extends AbstractSourceInfo> getSourceInfoStructMaker(Version version) {
+ return new TiDBSourceInfoStructMaker();
+ }
+
+ public static final Field SERVER_NAME =
+ RelationalDatabaseConnectorConfig.SERVER_NAME.withValidation(
+ CommonConnectorConfig::validateServerNameIsDifferentFromHistoryTopicName);
+
+ public boolean isReadOnlyConnection() {
+ return readOnlyConnection;
+ }
+
+ /** Whether to use SSL/TLS to connect to the database. */
+ public enum SecureConnectionMode implements EnumeratedValue {
+ /** Establish an unencrypted connection. */
+ DISABLED("disabled"),
+
+ /**
+ * Establish a secure (encrypted) connection if the server supports secure connections. Fall
+ * back to an unencrypted connection otherwise.
+ */
+ PREFERRED("preferred"),
+ /**
+ * Establish a secure connection if the server supports secure connections. The connection
+ * attempt fails if a secure connection cannot be established.
+ */
+ REQUIRED("required"),
+ /**
+ * Like REQUIRED, but additionally verify the server TLS certificate against the configured
+ * Certificate Authority (CA) certificates. The connection attempt fails if no valid
+ * matching CA certificates are found.
+ */
+ VERIFY_CA("verify_ca"),
+ /**
+ * Like VERIFY_CA, but additionally verify that the server certificate matches the host to
+ * which the connection is attempted.
+ */
+ VERIFY_IDENTITY("verify_identity");
+
+ private final String value;
+
+ private SecureConnectionMode(String value) {
+ this.value = value;
+ }
+
+ @Override
+ public String getValue() {
+ return value;
+ }
+
+ /**
+ * Determine if the supplied value is one of the predefined options.
+ *
+ * @param value the configuration property value; may not be null
+ * @return the matching option, or null if no match is found
+ */
+ public static SecureConnectionMode parse(String value) {
+ if (value == null) {
+ return null;
+ }
+ value = value.trim();
+ for (SecureConnectionMode option : SecureConnectionMode.values()) {
+ if (option.getValue().equalsIgnoreCase(value)) {
+ return option;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Determine if the supplied value is one of the predefined options.
+ *
+ * @param value the configuration property value; may not be null
+ * @param defaultValue the default value; may be null
+ * @return the matching option, or null if no match is found and the non-null default is
+ * invalid
+ */
+ public static SecureConnectionMode parse(String value, String defaultValue) {
+ SecureConnectionMode mode = parse(value);
+ if (mode == null && defaultValue != null) {
+ mode = parse(defaultValue);
+ }
+ return mode;
+ }
+ }
+
+ public static final Field SSL_MODE =
+ Field.create("database.ssl.mode")
+ .withDisplayName("SSL mode")
+ .withEnum(
+ MySqlConnectorConfig.SecureConnectionMode.class,
+ MySqlConnectorConfig.SecureConnectionMode.DISABLED)
+ .withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED_SSL, 0))
+ .withWidth(ConfigDef.Width.MEDIUM)
+ .withImportance(ConfigDef.Importance.MEDIUM)
+ .withDescription(
+ "Whether to use an encrypted connection to MySQL. Options include"
+ + "'disabled' (the default) to use an unencrypted connection; "
+ + "'preferred' to establish a secure (encrypted) connection if the server supports secure connections, "
+ + "but fall back to an unencrypted connection otherwise; "
+ + "'required' to use a secure (encrypted) connection, and fail if one cannot be established; "
+ + "'verify_ca' like 'required' but additionally verify the server TLS certificate against the configured Certificate Authority "
+ + "(CA) certificates, or fail if no valid matching CA certificates are found; or"
+ + "'verify_identity' like 'verify_ca' but additionally verify that the server certificate matches the host to which the connection is attempted.");
+
+ public static final Field SSL_KEYSTORE =
+ Field.create("database.ssl.keystore")
+ .withDisplayName("SSL Keystore")
+ .withType(ConfigDef.Type.STRING)
+ .withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED_SSL, 1))
+ .withWidth(ConfigDef.Width.LONG)
+ .withImportance(ConfigDef.Importance.MEDIUM)
+ .withDescription(
+ "The location of the key store file. "
+ + "This is optional and can be used for two-way authentication between the client and the MySQL Server.");
+
+ public static final Field SSL_KEYSTORE_PASSWORD =
+ Field.create("database.ssl.keystore.password")
+ .withDisplayName("SSL Keystore Password")
+ .withType(ConfigDef.Type.PASSWORD)
+ .withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED_SSL, 2))
+ .withWidth(ConfigDef.Width.MEDIUM)
+ .withImportance(ConfigDef.Importance.MEDIUM)
+ .withDescription(
+ "The password for the key store file. "
+ + "This is optional and only needed if 'database.ssl.keystore' is configured.");
+
+ public static final Field SSL_TRUSTSTORE =
+ Field.create("database.ssl.truststore")
+ .withDisplayName("SSL Truststore")
+ .withType(ConfigDef.Type.STRING)
+ .withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED_SSL, 3))
+ .withWidth(ConfigDef.Width.LONG)
+ .withImportance(ConfigDef.Importance.MEDIUM)
+ .withDescription(
+ "The location of the trust store file for the server certificate verification.");
+
+ public static final Field SSL_TRUSTSTORE_PASSWORD =
+ Field.create("database.ssl.truststore.password")
+ .withDisplayName("SSL Truststore Password")
+ .withType(ConfigDef.Type.PASSWORD)
+ .withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED_SSL, 4))
+ .withWidth(ConfigDef.Width.MEDIUM)
+ .withImportance(ConfigDef.Importance.MEDIUM)
+ .withDescription(
+ "The password for the trust store file. "
+ + "Used to check the integrity of the truststore, and unlock the truststore.");
+
+ public static final Field CONNECTION_TIMEOUT_MS =
+ Field.create("connect.timeout.ms")
+ .withDisplayName("Connection Timeout (ms)")
+ .withType(ConfigDef.Type.INT)
+ .withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED, 1))
+ .withWidth(ConfigDef.Width.SHORT)
+ .withImportance(ConfigDef.Importance.MEDIUM)
+ .withDescription(
+ "Maximum time to wait after trying to connect to the database before timing out, given in milliseconds. Defaults to 30 seconds (30,000 ms).")
+ .withDefault(30 * 1000)
+ .withValidation(Field::isPositiveInteger);
+
+ public static final Field EVENT_DESERIALIZATION_FAILURE_HANDLING_MODE =
+ Field.create("event.deserialization.failure.handling.mode")
+ .withDisplayName("Event deserialization failure handling")
+ .withEnum(
+ EventProcessingFailureHandlingMode.class,
+ EventProcessingFailureHandlingMode.FAIL)
+ .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 21))
+ .withValidation(
+ TiDBConnectorConfig
+ ::validateEventDeserializationFailureHandlingModeNotSet)
+ .withWidth(ConfigDef.Width.SHORT)
+ .withImportance(ConfigDef.Importance.MEDIUM)
+ .withDescription(
+ "Specify how failures during deserialization of binlog events (i.e. when encountering a corrupted event) should be handled, including:"
+ + "'fail' (the default) an exception indicating the problematic event and its binlog position is raised, causing the connector to be stopped; "
+ + "'warn' the problematic event and its binlog position will be logged and the event will be skipped;"
+ + "'ignore' the problematic event will be skipped.");
+
+ public static final Field INCONSISTENT_SCHEMA_HANDLING_MODE =
+ Field.create("inconsistent.schema.handling.mode")
+ .withDisplayName("Inconsistent schema failure handling")
+ .withEnum(
+ EventProcessingFailureHandlingMode.class,
+ EventProcessingFailureHandlingMode.FAIL)
+ .withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 2))
+ .withValidation(
+ TiDBConnectorConfig::validateInconsistentSchemaHandlingModeNotIgnore)
+ .withWidth(ConfigDef.Width.SHORT)
+ .withImportance(ConfigDef.Importance.MEDIUM)
+ .withDescription(
+ "Specify how binlog events that belong to a table missing from internal schema representation (i.e. internal representation is not consistent with database) should be handled, including:"
+ + "'fail' (the default) an exception indicating the problematic event and its binlog position is raised, causing the connector to be stopped; "
+ + "'warn' the problematic event and its binlog position will be logged and the event will be skipped;"
+ + "'skip' the problematic event will be skipped.");
+
+ private static int validateEventDeserializationFailureHandlingModeNotSet(
+ Configuration config, Field field, Field.ValidationOutput problems) {
+ final String modeName =
+ config.asMap().get(EVENT_DESERIALIZATION_FAILURE_HANDLING_MODE.name());
+ if (modeName != null) {
+ LOGGER.warn(
+ "Configuration option '{}' is renamed to '{}'",
+ EVENT_DESERIALIZATION_FAILURE_HANDLING_MODE.name(),
+ EVENT_PROCESSING_FAILURE_HANDLING_MODE.name());
+ if (EventProcessingFailureHandlingMode.OBSOLETE_NAME_FOR_SKIP_FAILURE_HANDLING.equals(
+ modeName)) {
+ LOGGER.warn(
+ "Value '{}' of configuration option '{}' is deprecated and should be replaced with '{}'",
+ EventProcessingFailureHandlingMode.OBSOLETE_NAME_FOR_SKIP_FAILURE_HANDLING,
+ EVENT_DESERIALIZATION_FAILURE_HANDLING_MODE.name(),
+ EventProcessingFailureHandlingMode.SKIP.getValue());
+ }
+ }
+ return 0;
+ }
+
+ private static int validateInconsistentSchemaHandlingModeNotIgnore(
+ Configuration config, Field field, Field.ValidationOutput problems) {
+ final String modeName = config.getString(INCONSISTENT_SCHEMA_HANDLING_MODE);
+ if (EventProcessingFailureHandlingMode.OBSOLETE_NAME_FOR_SKIP_FAILURE_HANDLING.equals(
+ modeName)) {
+ LOGGER.warn(
+ "Value '{}' of configuration option '{}' is deprecated and should be replaced with '{}'",
+ EventProcessingFailureHandlingMode.OBSOLETE_NAME_FOR_SKIP_FAILURE_HANDLING,
+ INCONSISTENT_SCHEMA_HANDLING_MODE.name(),
+ EventProcessingFailureHandlingMode.SKIP.getValue());
+ }
+ return 0;
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBSourceConfig.java
new file mode 100644
index 00000000000..7268a1d9f19
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBSourceConfig.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.tidb.source.config;
+
+import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
+import org.apache.flink.cdc.connectors.base.options.StartupOptions;
+import org.apache.flink.table.catalog.ObjectPath;
+
+import io.debezium.config.Configuration;
+import org.tikv.common.TiConfiguration;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/** The configuration for TiDB source. */
+public class TiDBSourceConfig extends JdbcSourceConfig {
+ private static final long serialVersionUID = 1L;
+ private final String compatibleMode;
+ private final String pdAddresses;
+
+ private final String hostMapping;
+ private TiConfiguration tiConfiguration;
+ private final Properties jdbcProperties;
+ private Map<ObjectPath, String> chunkKeyColumns;
+
+ public TiDBSourceConfig(
+ String compatibleMode,
+ StartupOptions startupOptions,
+ List<String> databaseList,
+ List<String> tableList,
+ String pdAddresses,
+ String hostMapping,
+ int splitSize,
+ int splitMetaGroupSize,
+ TiConfiguration tiConfiguration,
+ double distributionFactorUpper,
+ double distributionFactorLower,
+ boolean includeSchemaChanges,
+ boolean closeIdleReaders,
+ Properties jdbcProperties,
+ Configuration dbzConfiguration,
+ String driverClassName,
+ String hostname,
+ int port,
+ String username,
+ String password,
+ int fetchSize,
+ String serverTimeZone,
+ Duration connectTimeout,
+ int connectMaxRetries,
+ int connectionPoolSize,
+ String chunkKeyColumn,
+ Map<ObjectPath, String> chunkKeyColumns,
+ boolean skipSnapshotBackfill,
+ boolean isScanNewlyAddedTableEnabled,
+ boolean assignUnboundedChunkFirst) {
+ super(
+ startupOptions,
+ databaseList,
+ null,
+ tableList,
+ splitSize,
+ splitMetaGroupSize,
+ distributionFactorUpper,
+ distributionFactorLower,
+ includeSchemaChanges,
+ closeIdleReaders,
+ jdbcProperties,
+ dbzConfiguration,
+ driverClassName,
+ hostname,
+ port,
+ username,
+ password,
+ fetchSize,
+ serverTimeZone,
+ connectTimeout,
+ connectMaxRetries,
+ connectionPoolSize,
+ chunkKeyColumn,
+ skipSnapshotBackfill,
+ isScanNewlyAddedTableEnabled,
+ assignUnboundedChunkFirst);
+ this.compatibleMode = compatibleMode;
+ this.pdAddresses = pdAddresses;
+ this.hostMapping = hostMapping;
+ this.jdbcProperties = jdbcProperties;
+ this.tiConfiguration = tiConfiguration;
+ this.chunkKeyColumns = chunkKeyColumns;
+ }
+
+ public String getCompatibleMode() {
+ return compatibleMode;
+ }
+
+ public String getPdAddresses() {
+ return pdAddresses;
+ }
+
+ public String getHostMapping() {
+ return hostMapping;
+ }
+
+ public Properties getJdbcProperties() {
+ return this.jdbcProperties;
+ }
+
+ public TiConfiguration getTiConfiguration() {
+ return this.tiConfiguration;
+ }
+
+ public Map<ObjectPath, String> getChunkKeyColumns() {
+ return this.chunkKeyColumns;
+ }
+
+ @Override
+ public TiDBConnectorConfig getDbzConnectorConfig() {
+ return new TiDBConnectorConfig(this);
+ }
+
+ public StartupOptions getStartupOptions() {
+ return startupOptions;
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBSourceConfigFactory.java
new file mode 100644
index 00000000000..2e6afbf9496
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBSourceConfigFactory.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.tidb.source.config;
+
+import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfigFactory;
+import org.apache.flink.table.catalog.ObjectPath;
+
+import io.debezium.config.Configuration;
+import org.tikv.common.TiConfiguration;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull;
+import static org.apache.flink.cdc.connectors.base.utils.EnvironmentUtils.checkSupportCheckpointsAfterTasksFinished;
+
+/** A factory to initialize {@link TiDBSourceConfig}. */
+@SuppressWarnings("UnusedReturnValue")
+public class TiDBSourceConfigFactory extends JdbcSourceConfigFactory {
+ private static final long serialVersionUID = 1L;
+ private String compatibleMode;
+ private String driverClassName = "com.mysql.cj.jdbc.Driver";
+ private String pdAddresses;
+
+ private String hostMapping;
+ private TiConfiguration tiConfiguration;
+ private Properties tikvProperties;
+ private Properties jdbcProperties;
+ private Map<ObjectPath, String> chunkKeyColumns = new HashMap<>();
+
+ public TiDBSourceConfigFactory compatibleMode(String compatibleMode) {
+ this.compatibleMode = compatibleMode;
+ return this;
+ }
+
+ public TiDBSourceConfigFactory chunkKeyColumn(ObjectPath objectPath, String chunkKeyColumn) {
+ this.chunkKeyColumns.put(objectPath, chunkKeyColumn);
+ return this;
+ }
+
+ public TiDBSourceConfigFactory chunkKeyColumns(Map<ObjectPath, String> chunkKeyColumns) {
+ this.chunkKeyColumns.putAll(chunkKeyColumns);
+ return this;
+ }
+
+ public TiDBSourceConfigFactory driverClassName(String driverClassName) {
+ this.driverClassName = driverClassName;
+ return this;
+ }
+
+ public TiDBSourceConfigFactory pdAddresses(String pdAddresses) {
+ this.pdAddresses = pdAddresses;
+ return this;
+ }
+
+ public TiDBSourceConfigFactory hostMapping(String hostMapping) {
+ this.hostMapping = hostMapping;
+ return this;
+ }
+
+ public TiDBSourceConfigFactory tikvProperties(Properties tikvProperties) {
+ this.tikvProperties = tikvProperties;
+ return this;
+ }
+
+ public TiDBSourceConfigFactory jdbcProperties(Properties jdbcProperties) {
+ this.jdbcProperties = jdbcProperties;
+ return this;
+ }
+
+ public TiDBSourceConfigFactory tiConfiguration(TiConfiguration tiConfiguration) {
+ this.tiConfiguration = tiConfiguration;
+ return this;
+ }
+
+ @Override
+ public TiDBSourceConfig create(int subtask) {
+ checkSupportCheckpointsAfterTasksFinished(closeIdleReaders);
+ Properties props = new Properties();
+ props.setProperty("database.server.name", "tidb_cdc");
+ props.setProperty("database.hostname", checkNotNull(hostname));
+ props.setProperty("database.port", String.valueOf(port));
+ props.setProperty("database.user", checkNotNull(username));
+ props.setProperty("database.password", checkNotNull(password));
+ props.setProperty("database.dbname", checkNotNull(databaseList.get(0)));
+ props.setProperty("database.connect.timeout.ms", String.valueOf(connectTimeout.toMillis()));
+
+ // table filter
+ // props.put("database.include.list", String.join(",", databaseList));
+ if (tableList != null) {
+ props.put("table.include.list", String.join(",", tableList));
+ }
+ // value converter
+ props.put("decimal.handling.mode", "precise");
+ props.put("time.precision.mode", "adaptive_time_microseconds");
+ props.put("binary.handling.mode", "bytes");
+
+ if (jdbcProperties != null) {
+ props.putAll(jdbcProperties);
+ }
+
+ if (tikvProperties != null) {
+ props.putAll(tikvProperties);
+ }
+
+ Configuration dbzConfiguration = Configuration.from(props);
+ return new TiDBSourceConfig(
+ compatibleMode,
+ startupOptions,
+ databaseList,
+ tableList,
+ pdAddresses,
+ hostMapping,
+ splitSize,
+ splitMetaGroupSize,
+ tiConfiguration,
+ distributionFactorUpper,
+ distributionFactorLower,
+ includeSchemaChanges,
+ closeIdleReaders,
+ props,
+ dbzConfiguration,
+ driverClassName,
+ hostname,
+ port,
+ username,
+ password,
+ fetchSize,
+ serverTimeZone,
+ connectTimeout,
+ connectMaxRetries,
+ connectionPoolSize,
+ chunkKeyColumn,
+ chunkKeyColumns,
+ skipSnapshotBackfill,
+ scanNewlyAddedTableEnabled,
+ assignUnboundedChunkFirst);
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBSourceOptions.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBSourceOptions.java
new file mode 100644
index 00000000000..fb8a026e1b6
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/config/TiDBSourceOptions.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.tidb.source.config;
+
+import org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions;
+import org.apache.flink.cdc.connectors.tidb.utils.UriHostMapping;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+
+import org.tikv.common.TiConfiguration;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.Optional;
+
+/** Options for {@link org.apache.flink.cdc.connectors.tidb.table.TiDBTableSource}. */
+public class TiDBSourceOptions extends JdbcSourceOptions {
+
+ public static final ConfigOption<Integer> TIDB_PORT =
+ ConfigOptions.key("port")
+ .intType()
+ .defaultValue(4000)
+ .withDescription("Integer port number of the TiDB database server.");
+
+ public static final ConfigOption<String> PD_ADDRESSES =
+ ConfigOptions.key("pd-addresses")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("TiDB pd-server addresses");
+
+ public static final ConfigOption<Duration> HEARTBEAT_INTERVAL =
+ ConfigOptions.key("heartbeat.interval.ms")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(30))
+ .withDescription(
+ "Optional interval of sending heartbeat event for tracing the latest available replication slot offsets");
+
+ public static final ConfigOption<String> TABLE_LIST =
+ ConfigOptions.key("table-list")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "List of full names of tables, separated by commas, e.g. \"db1.table1, db2.table2\".");
+
+ public static final ConfigOption<String> HOST_MAPPING =
+ ConfigOptions.key("host-mapping")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "TiKV cluster's host-mapping used to configure public IP and intranet IP mapping. When the TiKV cluster is running on the intranet, you can map a set of intranet IPs to public IPs for an outside Flink cluster to access. The format is {Intranet IP1}:{Public IP1};{Intranet IP2}:{Public IP2}, e.g. 192.168.0.2:8.8.8.8;192.168.0.3:9.9.9.9.");
+
+ public static final ConfigOption<String> JDBC_DRIVER =
+ ConfigOptions.key("jdbc.driver")
+ .stringType()
+ .defaultValue("com.mysql.cj.jdbc.Driver")
+ .withDescription(
+ "JDBC driver class name, use 'com.mysql.cj.jdbc.Driver' by default.");
+
+ public static TiConfiguration getTiConfiguration(
+ final String pdAddrsStr, final String hostMapping, final Map<String, String> options) {
+ final Configuration configuration = Configuration.fromMap(options);
+
+ final TiConfiguration tiConf = TiConfiguration.createDefault(pdAddrsStr);
+ Optional.of(new UriHostMapping(hostMapping)).ifPresent(tiConf::setHostMapping);
+ // TODO: map more connector options onto the TiConfiguration.
+ return tiConf;
+ }
+}
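As a rough illustration of how the helper above is expected to be invoked, with placeholder PD addresses and a host mapping in the format described by PD_ADDRESSES and HOST_MAPPING:

    // Sketch only: build the TiKV client configuration from the connector options.
    TiConfiguration tiConf =
            TiDBSourceOptions.getTiConfiguration(
                    "pd0:2379,pd1:2379",
                    "192.168.0.2:8.8.8.8;192.168.0.3:9.9.9.9",
                    java.util.Collections.emptyMap());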
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/connection/TiDBConnection.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/connection/TiDBConnection.java
new file mode 100644
index 00000000000..48e4cd0d29c
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/connection/TiDBConnection.java
@@ -0,0 +1,527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.tidb.source.connection;
+
+import org.apache.flink.cdc.connectors.tidb.source.config.TiDBConnectorConfig;
+import org.apache.flink.cdc.connectors.tidb.source.offset.EventOffsetContext;
+import org.apache.flink.cdc.connectors.tidb.source.schema.TiDBDatabaseSchema;
+import org.apache.flink.cdc.connectors.tidb.utils.TiDBUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import io.debezium.connector.tidb.TiDBPartition;
+import io.debezium.jdbc.JdbcConfiguration;
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.Column;
+import io.debezium.relational.TableId;
+import io.debezium.relational.Tables;
+import io.debezium.relational.history.TableChanges;
+import io.debezium.schema.SchemaChangeEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.regex.Pattern;
+
+/** TiDB JDBC connection. */
+public class TiDBConnection extends JdbcConnection {
+ private static final Logger LOG = LoggerFactory.getLogger(TiDBConnection.class);
+
+ private static final Properties DEFAULT_JDBC_PROPERTIES = initializeDefaultJdbcProperties();
+ private static final String MYSQL_URL_PATTERN =
+ "jdbc:mysql://${hostname}:${port}/?connectTimeout=${connectTimeout}";
+ private static final String SHOW_CREATE_TABLE = "SHOW CREATE TABLE ";
+ private static final int TYPE_BINARY_FLOAT = 100;
+ private static final int TYPE_BINARY_DOUBLE = 101;
+ private static final int TYPE_TIMESTAMP_WITH_TIME_ZONE = -101;
+ private static final int TYPE_TIMESTAMP_WITH_LOCAL_TIME_ZONE = -102;
+ private static final int TYPE_INTERVAL_YEAR_TO_MONTH = -103;
+ private static final int TYPE_INTERVAL_DAY_TO_SECOND = -104;
+ private static final char quote = '`';
+ private static final String QUOTED_CHARACTER = "`";
+
+ public TiDBConnection(
+ String hostname,
+ Integer port,
+ String user,
+ String password,
+ Duration timeout,
+ String jdbcDriver,
+ Properties jdbcProperties,
+ ClassLoader classLoader) {
+ super(
+ config(hostname, port, user, password, timeout),
+ JdbcConnection.patternBasedFactory(
+ formatJdbcUrl(jdbcDriver, jdbcProperties), jdbcDriver, classLoader),
+ quote + "",
+ quote + "");
+ }
+
+ public TiDBConnection(
+ JdbcConfiguration config,
+ ConnectionFactory connectionFactory,
+ String openingQuoteCharacter,
+ String closingQuoteCharacter) {
+ super(config, connectionFactory, openingQuoteCharacter, closingQuoteCharacter);
+ }
+
+ public TiDBConnection(
+ JdbcConfiguration config,
+ ConnectionFactory connectionFactory,
+ Supplier<ClassLoader> classLoaderSupplier,
+ String openingQuoteCharacter,
+ String closingQuoteCharacter) {
+ super(
+ config,
+ connectionFactory,
+ classLoaderSupplier,
+ openingQuoteCharacter,
+ closingQuoteCharacter);
+ }
+
+ protected TiDBConnection(
+ JdbcConfiguration config,
+ ConnectionFactory connectionFactory,
+ Operations initialOperations,
+ Supplier<ClassLoader> classLoaderSupplier,
+ String openingQuotingChar,
+ String closingQuotingChar) {
+ super(
+ config,
+ connectionFactory,
+ initialOperations,
+ classLoaderSupplier,
+ openingQuotingChar,
+ closingQuotingChar);
+ }
+
+ private static JdbcConfiguration config(
+ String hostname, Integer port, String user, String password, Duration timeout) {
+ return JdbcConfiguration.create()
+ .with("hostname", hostname)
+ .with("port", port)
+ .with("user", user)
+ .with("password", password)
+ .with("connectTimeout", timeout == null ? 30000 : timeout.toMillis())
+ .build();
+ }
+
+ private static String formatJdbcUrl(String jdbcDriver, Properties jdbcProperties) {
+ Properties combinedProperties = new Properties();
+ combinedProperties.putAll(DEFAULT_JDBC_PROPERTIES);
+ if (jdbcProperties != null) {
+ combinedProperties.putAll(jdbcProperties);
+ }
+ StringBuilder jdbcUrlStringBuilder = new StringBuilder(MYSQL_URL_PATTERN);
+ combinedProperties.forEach(
+ (key, value) -> {
+ jdbcUrlStringBuilder.append("&").append(key).append("=").append(value);
+ });
+ return jdbcUrlStringBuilder.toString();
+ }
+
+ private static Properties initializeDefaultJdbcProperties() {
+ Properties defaultJdbcProperties = new Properties();
+ defaultJdbcProperties.setProperty("useInformationSchema", "true");
+ defaultJdbcProperties.setProperty("nullCatalogMeansCurrent", "false");
+ defaultJdbcProperties.setProperty("useUnicode", "true");
+ defaultJdbcProperties.setProperty("zeroDateTimeBehavior", "convertToNull");
+ defaultJdbcProperties.setProperty("characterEncoding", "UTF-8");
+ defaultJdbcProperties.setProperty("characterSetResults", "UTF-8");
+ return defaultJdbcProperties;
+ }
+
+ public long getCurrentTimestampS() throws SQLException {
+ try {
+ long globalTimestamp = getGlobalTimestamp();
+ LOG.info("Global timestamp: {}", globalTimestamp);
+ return Long.parseLong(String.valueOf(globalTimestamp).substring(0, 10));
+ } catch (Exception e) {
+ LOG.warn("Failed to get the global timestamp, using the local timestamp instead", e);
+ }
+ return getCurrentTimestamp()
+ .orElseThrow(IllegalStateException::new)
+ .toInstant()
+ .getEpochSecond();
+ }
+
+ private long getGlobalTimestamp() throws SQLException {
+ return querySingleValue(
+ connection(), "SELECT CURRENT_TIMESTAMP FROM DUAL", ps -> {}, rs -> rs.getLong(1));
+ }
+
+ @Override
+ public Optional<Timestamp> getCurrentTimestamp() throws SQLException {
+ return queryAndMap(
+ "SELECT LOCALTIMESTAMP FROM DUAL",
+ rs -> rs.next() ? Optional.of(rs.getTimestamp(1)) : Optional.empty());
+ }
+
+ @Override
+ protected String[] supportedTableTypes() {
+ return new String[] {"TABLE"};
+ }
+
+ @Override
+ public String quotedTableIdString(TableId tableId) {
+ return tableId.toQuotedString(quote);
+ }
+
+ public void readSchemaForCapturedTables(
+ Tables tables,
+ String databaseCatalog,
+ String schemaNamePattern,
+ Tables.ColumnNameFilter columnFilter,
+ boolean removeTablesNotFoundInJdbc,
+ Set<TableId> capturedTables)
+ throws SQLException {
+
+ Set<TableId> tableIdsBefore = new HashSet<>(tables.tableIds());
+
+ DatabaseMetaData metadata = connection().getMetaData();
+ Map<TableId, List<Column>> columnsByTable = new HashMap<>();
+
+ for (TableId tableId : capturedTables) {
+ try (ResultSet columnMetadata =
+ metadata.getColumns(
+ databaseCatalog, schemaNamePattern, tableId.table(), null)) {
+ while (columnMetadata.next()) {
+ // add all whitelisted columns
+ readTableColumn(columnMetadata, tableId, columnFilter)
+ .ifPresent(
+ column -> {
+ columnsByTable
+ .computeIfAbsent(tableId, t -> new ArrayList<>())
+ .add(column.create());
+ });
+ }
+ }
+ }
+
+ // Read the metadata for the primary keys ...
+ for (Map.Entry> tableEntry : columnsByTable.entrySet()) {
+ // First get the primary key information, which must be done for *each* table ...
+ List<String> pkColumnNames = readPrimaryKeyNames(metadata, tableEntry.getKey());
+
+ // Then define the table ...
+ List<Column> columns = tableEntry.getValue();
+ Collections.sort(columns);
+ tables.overwriteTable(tableEntry.getKey(), columns, pkColumnNames, null);
+ }
+
+ if (removeTablesNotFoundInJdbc) {
+ // Remove any definitions for tables that were not found in the database metadata ...
+ tableIdsBefore.removeAll(columnsByTable.keySet());
+ tableIdsBefore.forEach(tables::removeTable);
+ }
+ }
+
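+ /** Maps type names that JDBC metadata does not resolve directly to native JDBC type codes. */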
+ @Override
+ protected int resolveNativeType(String typeName) {
+ String upperCaseTypeName = typeName.toUpperCase();
+ if (upperCaseTypeName.startsWith("JSON")) {
+ return Types.VARCHAR;
+ }
+ if (upperCaseTypeName.startsWith("NCHAR")) {
+ return Types.NCHAR;
+ }
+ if (upperCaseTypeName.startsWith("NVARCHAR2")) {
+ return Types.NVARCHAR;
+ }
+ if (upperCaseTypeName.startsWith("TIMESTAMP")) {
+ if (upperCaseTypeName.contains("WITH TIME ZONE")) {
+ return TYPE_TIMESTAMP_WITH_TIME_ZONE;
+ }
+ if (upperCaseTypeName.contains("WITH LOCAL TIME ZONE")) {
+ return TYPE_TIMESTAMP_WITH_LOCAL_TIME_ZONE;
+ }
+ return Types.TIMESTAMP;
+ }
+ if (upperCaseTypeName.startsWith("INTERVAL")) {
+ if (upperCaseTypeName.contains("TO MONTH")) {
+ return TYPE_INTERVAL_YEAR_TO_MONTH;
+ }
+ if (upperCaseTypeName.contains("TO SECOND")) {
+ return TYPE_INTERVAL_DAY_TO_SECOND;
+ }
+ }
+ return Column.UNSET_INT_VALUE;
+ }
+
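+ /** Reads a single system variable value via {@code SHOW VARIABLES LIKE}. */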
+ public String readSystemVariable(String variable) throws SQLException {
+ return querySingleValue(
+ connection(),
+ "SHOW VARIABLES LIKE ?",
+ ps -> ps.setString(1, variable),
+ rs -> rs.getString("VALUE"));
+ }
+
+ @Override
+ protected int resolveJdbcType(int metadataJdbcType, int nativeType) {
+ switch (metadataJdbcType) {
+ case TYPE_BINARY_FLOAT:
+ return Types.REAL;
+ case TYPE_BINARY_DOUBLE:
+ return Types.DOUBLE;
+ case TYPE_TIMESTAMP_WITH_TIME_ZONE:
+ case TYPE_TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ case TYPE_INTERVAL_YEAR_TO_MONTH:
+ case TYPE_INTERVAL_DAY_TO_SECOND:
+ return Types.OTHER;
+ default:
+ return nativeType == Column.UNSET_INT_VALUE ? metadataJdbcType : nativeType;
+ }
+ }
+
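+ /** Lists the tables whose database name and table name match the given regex patterns. */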
+ public List<TableId> getTables(String dbPattern, String tbPattern) throws SQLException {
+ return listTables(
+ db -> Pattern.matches(dbPattern, db),
+ tableId -> Pattern.matches(tbPattern, tableId.table()));
+ }
+
+ private List<TableId> listTables(
+ Predicate<String> databaseFilter, Tables.TableFilter tableFilter) throws SQLException {
+ List<TableId> tableIds = new ArrayList<>();
+ DatabaseMetaData metaData = connection().getMetaData();
+ List<String> dbList = new ArrayList<>();
+ // Collect the databases that match the filter, closing the catalog result set afterwards.
+ try (ResultSet rs = metaData.getCatalogs()) {
+ while (rs.next()) {
+ String db = rs.getString("TABLE_CAT");
+ if (databaseFilter.test(db)) {
+ dbList.add(db);
+ }
+ }
+ }
+ for (String db : dbList) {
+ // List the tables of each matching database, again releasing the result set.
+ try (ResultSet rs = metaData.getTables(db, null, null, supportedTableTypes())) {
+ while (rs.next()) {
+ TableId tableId = new TableId(db, null, rs.getString("TABLE_NAME"));
+ if (tableFilter.isIncluded(tableId)) {
+ tableIds.add(tableId);
+ }
+ }
+ }
+ }
+ return tableIds;
+ }
+
+ /**
+ * Reads the schema of the captured TiDB tables and registers their definitions in
+ * {@code tables}, resolving column metadata by parsing {@code SHOW CREATE TABLE} output.
+ */
+ public void readTiDBSchema(
+ TiDBConnectorConfig config,
+ TiDBDatabaseSchema databaseSchema,
+ Tables tables,
+ String databaseCatalog,
+ String schemaNamePattern,
+ Tables.TableFilter tableFilter,
+ Tables.ColumnNameFilter columnFilter,
+ boolean removeTablesNotFoundInJdbc)
+ throws SQLException {
+ // Before we make any changes, get the copy of the set of table IDs ...
+ Set<TableId> tableIdsBefore = new HashSet<>(tables.tableIds());
+
+ // Read the metadata for the table columns ...
+ DatabaseMetaData metadata = connection().getMetaData();
+
+ // Find regular and materialized views as they cannot be snapshotted
+ final Set<TableId> viewIds = new HashSet<>();
+ final Set<TableId> tableIds = new HashSet<>();
+
+ int totalTables = 0;
+ try (final ResultSet rs =
+ metadata.getTables(
+ databaseCatalog, schemaNamePattern, null, supportedTableTypes())) {
+ while (rs.next()) {
+ final String catalogName = resolveCatalogName(rs.getString(1));
+ final String schemaName = rs.getString(2);
+ final String tableName = rs.getString(3);
+ final String tableType = rs.getString(4);
+ if (isTableType(tableType)) {
+ totalTables++;
+ TableId tableId = new TableId(catalogName, schemaName, tableName);
+ if (tableFilter == null || tableFilter.isIncluded(tableId)) {
+ tableIds.add(tableId);
+ }
+ } else {
+ TableId tableId = new TableId(catalogName, schemaName, tableName);
+ viewIds.add(tableId);
+ }
+ }
+ }
+
+ Map<TableId, List<Column>> columnsByTable = new HashMap<>();
+ if (totalTables == tableIds.size()) {
+ columnsByTable =
+ getColumnsDetailsWithTableChange(
+ config,
+ databaseSchema,
+ databaseCatalog,
+ schemaNamePattern,
+ null,
+ tableFilter,
+ columnFilter,
+ metadata,
+ viewIds);
+ } else {
+ for (TableId includeTable : tableIds) {
+ Map<TableId, List<Column>> cols =
+ getColumnsDetailsWithTableChange(
+ config,
+ databaseSchema,
+ databaseCatalog,
+ schemaNamePattern,
+ includeTable.table(),
+ tableFilter,
+ columnFilter,
+ metadata,
+ viewIds);
+ columnsByTable.putAll(cols);
+ }
+ }
+
+ // Read the metadata for the primary keys ...
+ for (Map.Entry<TableId, List<Column>> tableEntry : columnsByTable.entrySet()) {
+ // First get the primary key information, which must be done for *each* table ...
+ List<String> pkColumnNames =
+ readPrimaryKeyOrUniqueIndexNames(metadata, tableEntry.getKey());
+
+ // Then define the table ...
+ List<Column> columns = tableEntry.getValue();
+ Collections.sort(columns);
+ String defaultCharsetName = null; // JDBC does not expose character sets
+ tables.overwriteTable(tableEntry.getKey(), columns, pkColumnNames, defaultCharsetName);
+ }
+
+ if (removeTablesNotFoundInJdbc) {
+ // Remove any definitions for tables that were not found in the database metadata ...
+ tableIdsBefore.removeAll(columnsByTable.keySet());
+ tableIdsBefore.forEach(tables::removeTable);
+ }
+ }
+
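+ /**
+ * Collects the column definitions of each captured table by parsing its
+ * {@code SHOW CREATE TABLE} DDL, skipping views and tables excluded by the filter.
+ */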
+ protected Map<TableId, List<Column>> getColumnsDetailsWithTableChange(
+ TiDBConnectorConfig config,
+ TiDBDatabaseSchema databaseSchema,
+ String databaseCatalog,
+ String schemaNamePattern,
+ String tableName,
+ Tables.TableFilter tableFilter,
+ Tables.ColumnNameFilter columnFilter,
+ DatabaseMetaData metadata,
+ final Set<TableId> viewIds)
+ throws SQLException {
+ Map<TableId, List<Column>> columnsByTable = new HashMap<>();
+ try (ResultSet columnMetadata =
+ metadata.getColumns(databaseCatalog, schemaNamePattern, tableName, null)) {
+ while (columnMetadata.next()) {
+ String catalogName = resolveCatalogName(columnMetadata.getString(1));
+ String schemaName = columnMetadata.getString(2);
+ String metaTableName = columnMetadata.getString(3);
+ TableId tableId = new TableId(catalogName, schemaName, metaTableName);
+
+ // exclude views and non-captured tables
+ if (viewIds.contains(tableId)
+ || (tableFilter != null && !tableFilter.isIncluded(tableId))) {
+ continue;
+ }
+ TableChanges.TableChange tableChange =
+ readTableSchema(config, databaseSchema, tableId);
+ if (tableChange != null) {
+ ArrayList<Column> columns = new ArrayList<>(tableChange.getTable().columns());
+ columnsByTable.put(tableId, columns);
+ }
+ }
+ }
+ return columnsByTable;
+ }
+
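+ /** Reads the schema of a single table by parsing its {@code SHOW CREATE TABLE} output. */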
+ private TableChanges.TableChange readTableSchema(
+ TiDBConnectorConfig connectorConfig,
+ TiDBDatabaseSchema databaseSchema,
+ TableId tableId) {
+ final Map<TableId, TableChanges.TableChange> tableChangeMap = new HashMap<>();
+ final TiDBPartition partition = new TiDBPartition(connectorConfig.getLogicalName());
+ buildSchemaByShowCreateTable(
+ connectorConfig, databaseSchema, partition, this, tableId, tableChangeMap);
+ return tableChangeMap.get(tableId);
+ }
+
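+ /** Executes {@code SHOW CREATE TABLE} and feeds the returned DDL to the schema parser. */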
+ private void buildSchemaByShowCreateTable(
+ TiDBConnectorConfig config,
+ TiDBDatabaseSchema databaseSchema,
+ TiDBPartition partition,
+ JdbcConnection jdbc,
+ TableId tableId,
+ Map<TableId, TableChanges.TableChange> tableChangeMap) {
+ final String sql = SHOW_CREATE_TABLE + TiDBUtils.quote(tableId);
+ try {
+ jdbc.query(
+ sql,
+ rs -> {
+ if (rs.next()) {
+ final String ddl = rs.getString(2);
+ parseSchemaByDdl(
+ config,
+ databaseSchema,
+ partition,
+ ddl,
+ tableId,
+ tableChangeMap);
+ }
+ });
+ } catch (SQLException e) {
+ throw new FlinkRuntimeException(
+ String.format("Failed to read schema for table %s by running %s", tableId, sql),
+ e);
+ }
+ }
+
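+ /** Parses the DDL through the TiDB database schema and records the resulting table change. */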
+ private void parseSchemaByDdl(
+ TiDBConnectorConfig config,
+ TiDBDatabaseSchema databaseSchema,
+ TiDBPartition partition,
+ String ddl,
+ TableId tableId,
+ Map<TableId, TableChanges.TableChange> tableChangeMap) {
+ final EventOffsetContext offsetContext = EventOffsetContext.initial(config);
+ List<SchemaChangeEvent> schemaChangeEvents =
+ databaseSchema.parseSnapshotDdl(
+ partition, ddl, tableId.catalog(), offsetContext, Instant.now());
+ for (SchemaChangeEvent schemaChangeEvent : schemaChangeEvents) {
+ for (TableChanges.TableChange tableChange : schemaChangeEvent.getTableChanges()) {
+ tableChangeMap.put(tableId, tableChange);
+ }
+ }
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/connection/TiDBConnectionPoolFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/connection/TiDBConnectionPoolFactory.java
new file mode 100644
index 00000000000..5253ca99650
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/connection/TiDBConnectionPoolFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.tidb.source.connection;
+
+import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
+import org.apache.flink.cdc.connectors.base.relational.connection.JdbcConnectionPoolFactory;
+
+/** The factory to create {@link TiDBConnectionPool}. */
+public class TiDBConnectionPoolFactory extends JdbcConnectionPoolFactory {
+ private static final String MYSQL_URL_PATTERN =
+ "jdbc:mysql://%s:%s/?useUnicode=true&useSSL=false&useInformationSchema=true&nullCatalogMeansCurrent=false&zeroDateTimeBehavior=convertToNull&characterEncoding=UTF-8&characterSetResults=UTF-8";
+
+ @Override
+ public String getJdbcUrl(JdbcSourceConfig sourceConfig) {
+ String hostName = sourceConfig.getHostname();
+ int port = sourceConfig.getPort();
+ return String.format(MYSQL_URL_PATTERN, hostName, port);
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/converter/TiDBDefaultValueConverter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/converter/TiDBDefaultValueConverter.java
new file mode 100644
index 00000000000..f94f7db63a1
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/source/converter/TiDBDefaultValueConverter.java
@@ -0,0 +1,506 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.tidb.source.converter;
+
+import io.debezium.annotation.Immutable;
+import io.debezium.connector.mysql.MySqlDefaultValueConverter;
+import io.debezium.connector.mysql.MySqlValueConverters;
+import io.debezium.relational.Column;
+import io.debezium.relational.DefaultValueConverter;
+import io.debezium.relational.ValueConverter;
+import io.debezium.util.Collect;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.temporal.ChronoField;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/** Converts TiDB column default value expressions, following MySQL default value conversion rules. */
+public class TiDBDefaultValueConverter implements DefaultValueConverter {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(TiDBDefaultValueConverter.class);
+
+ private static final Pattern EPOCH_EQUIVALENT_TIMESTAMP =
+ Pattern.compile(
+ "(\\d{4}-\\d{2}-00|\\d{4}-00-\\d{2}|0000-\\d{2}-\\d{2}) (00:00:00(\\.\\d{1,6})?)");
+
+ private static final Pattern EPOCH_EQUIVALENT_DATE =
+ Pattern.compile("\\d{4}-\\d{2}-00|\\d{4}-00-\\d{2}|0000-\\d{2}-\\d{2}");
+
+ private static final String EPOCH_TIMESTAMP = "1970-01-01 00:00:00";
+
+ private static final String EPOCH_DATE = "1970-01-01";
+
+ private static final Pattern TIMESTAMP_PATTERN =
+ Pattern.compile("([0-9]*-[0-9]*-[0-9]*) ([0-9]*:[0-9]*:[0-9]*(\\.([0-9]*))?)");
+
+ private static final Pattern CHARSET_INTRODUCER_PATTERN =
+ Pattern.compile("^_[A-Za-z0-9]+'(.*)'$");
+
+ @Immutable
+ private static final Set<Integer> TRIM_DATA_TYPES =
+ Collect.unmodifiableSet(
+ Types.TINYINT,
+ Types.INTEGER,
+ Types.DATE,
+ Types.TIMESTAMP,
+ Types.TIMESTAMP_WITH_TIMEZONE,
+ Types.TIME,
+ Types.BOOLEAN,
+ Types.BIT,
+ Types.NUMERIC,
+ Types.DECIMAL,
+ Types.FLOAT,
+ Types.DOUBLE,
+ Types.REAL);
+
+ @Immutable
+ private static final Set<Integer> NUMBER_DATA_TYPES =
+ Collect.unmodifiableSet(
+ Types.BIT,
+ Types.TINYINT,
+ Types.SMALLINT,
+ Types.INTEGER,
+ Types.BIGINT,
+ Types.FLOAT,
+ Types.REAL,
+ Types.DOUBLE,
+ Types.NUMERIC,
+ Types.DECIMAL);
+
+ private static final DateTimeFormatter ISO_LOCAL_DATE_WITH_OPTIONAL_TIME =
+ new DateTimeFormatterBuilder()
+ .append(DateTimeFormatter.ISO_LOCAL_DATE)
+ .optionalStart()
+ .appendLiteral(" ")
+ .append(DateTimeFormatter.ISO_LOCAL_TIME)
+ .optionalEnd()
+ .toFormatter();
+
+ private final TiDBValueConverters converters;
+
+ public TiDBDefaultValueConverter(TiDBValueConverters converters) {
+ this.converters = converters;
+ }
+
+ @Override
+ public Optional<Object>