Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -151,18 +151,38 @@ limitations under the License.
<scope>test</scope>
</dependency>

<!-- geometry dependencies -->
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
<groupId>com.esri.geometry</groupId>
<artifactId>esri-geometry-api</artifactId>
<version>${geometry.version}</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
<version>1.9.8.Final</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-base</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-test-util</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.debezium.connector.tidb.Listeners;

import io.debezium.antlr.AntlrDdlParserListener;
import io.debezium.antlr.ProxyParseTreeListenerUtil;
import io.debezium.connector.mysql.antlr.listener.AlterTableParserListener;
import io.debezium.connector.mysql.antlr.listener.AlterViewParserListener;
import io.debezium.connector.mysql.antlr.listener.CreateAndAlterDatabaseParserListener;
import io.debezium.connector.mysql.antlr.listener.CreateTableParserListener;
import io.debezium.connector.mysql.antlr.listener.CreateUniqueIndexParserListener;
import io.debezium.connector.mysql.antlr.listener.CreateViewParserListener;
import io.debezium.connector.mysql.antlr.listener.DropDatabaseParserListener;
import io.debezium.connector.mysql.antlr.listener.DropTableParserListener;
import io.debezium.connector.mysql.antlr.listener.DropViewParserListener;
import io.debezium.connector.mysql.antlr.listener.RenameTableParserListener;
import io.debezium.connector.mysql.antlr.listener.SetStatementParserListener;
import io.debezium.connector.mysql.antlr.listener.TruncateTableParserListener;
import io.debezium.connector.mysql.antlr.listener.UseStatementParserListener;
import io.debezium.connector.tidb.TiDBAntlrDdlParser;
import io.debezium.ddl.parser.mysql.generated.MySqlParser;
import io.debezium.ddl.parser.mysql.generated.MySqlParserBaseListener;
import io.debezium.text.ParsingException;
import org.antlr.v4.runtime.ParserRuleContext;
import org.antlr.v4.runtime.tree.ErrorNode;
import org.antlr.v4.runtime.tree.ParseTreeListener;
import org.antlr.v4.runtime.tree.TerminalNode;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class TiDBAntlrDdlParserListener extends MySqlParserBaseListener
implements AntlrDdlParserListener {
private final List<ParseTreeListener> listeners = new CopyOnWriteArrayList<>();

/** Flag for skipping phase. */
private boolean skipNodes;

/**
* Count of skipped nodes. Each enter event during skipping phase will increase the counter and
* each exit event will decrease it. When counter will be decreased to 0, the skipping phase
* will end.
*/
private int skippedNodesCount = 0;

/** Collection of catched exceptions. */
private final Collection<ParsingException> errors = new ArrayList<>();

public TiDBAntlrDdlParserListener(TiDBAntlrDdlParser parser) {
// initialize listeners
listeners.add(new CreateAndAlterDatabaseParserListener(parser));
listeners.add(new DropDatabaseParserListener(parser));
listeners.add(new CreateTableParserListener(parser, listeners));
listeners.add(new AlterTableParserListener(parser, listeners));
listeners.add(new DropTableParserListener(parser));
listeners.add(new RenameTableParserListener(parser));
listeners.add(new TruncateTableParserListener(parser));
listeners.add(new CreateViewParserListener(parser, listeners));
listeners.add(new AlterViewParserListener(parser, listeners));
listeners.add(new DropViewParserListener(parser));
listeners.add(new CreateUniqueIndexParserListener(parser));
listeners.add(new SetStatementParserListener(parser));
listeners.add(new UseStatementParserListener(parser));
}

/**
* Returns all caught errors during tree walk.
*
* @return list of Parsing exceptions
*/
@Override
public Collection<ParsingException> getErrors() {
return errors;
}

@Override
public void enterEveryRule(ParserRuleContext ctx) {
if (skipNodes) {
skippedNodesCount++;
} else {
ProxyParseTreeListenerUtil.delegateEnterRule(ctx, listeners, errors);
}
}

@Override
public void exitEveryRule(ParserRuleContext ctx) {
if (skipNodes) {
if (skippedNodesCount == 0) {
// back in the node where skipping started
skipNodes = false;
} else {
// going up in a tree, means decreasing a number of skipped nodes
skippedNodesCount--;
}
} else {
ProxyParseTreeListenerUtil.delegateExitRule(ctx, listeners, errors);
}
}

@Override
public void visitErrorNode(ErrorNode node) {
ProxyParseTreeListenerUtil.visitErrorNode(node, listeners, errors);
}

@Override
public void visitTerminal(TerminalNode node) {
ProxyParseTreeListenerUtil.visitTerminal(node, listeners, errors);
}

@Override
public void enterRoutineBody(MySqlParser.RoutineBodyContext ctx) {
// this is a grammar rule for BEGIN ... END part of statements. Skip it.
skipNodes = true;
}
}
Loading
Loading