Skip to content

Commit 5969c65

Browse files
authored
ddl(ticdc): ignore ddl with schemaversion 0 (#11856) (#11923)
close #11839
1 parent 5e61d5a commit 5969c65

File tree

6 files changed

+191
-2
lines changed

6 files changed

+191
-2
lines changed

cdc/entry/schema_storage.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,11 +197,13 @@ func (s *schemaStorage) HandleDDLJob(job *timodel.Job) error {
197197
var snap *schema.Snapshot
198198
if len(s.snaps) > 0 {
199199
lastSnap := s.snaps[len(s.snaps)-1]
200+
// already-executed DDL could be filtered by finishedTs.
200201
if job.BinlogInfo.FinishedTS <= lastSnap.CurrentTs() {
201202
log.Info("schemaStorage: ignore foregone DDL",
202203
zap.String("namespace", s.id.Namespace),
203204
zap.String("changefeed", s.id.ID),
204205
zap.String("DDL", job.Query),
206+
zap.String("state", job.State.String()),
205207
zap.Int64("jobID", job.ID),
206208
zap.Uint64("finishTs", job.BinlogInfo.FinishedTS),
207209
zap.Int64("jobSchemaVersion", job.BinlogInfo.SchemaVersion),

cdc/puller/ddl_puller.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,8 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
396396
}
397397
}()
398398

399-
if job.BinlogInfo.FinishedTS <= p.getResolvedTs() {
399+
if job.BinlogInfo.FinishedTS <= p.getResolvedTs() ||
400+
job.BinlogInfo.SchemaVersion == 0 /* means the ddl is ignored in upstream */ {
400401
log.Info("ddl job finishedTs less than puller resolvedTs,"+
401402
"discard the ddl job",
402403
zap.Uint64("jobFinishedTS", job.BinlogInfo.FinishedTS),
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# diff Configuration.
# Config for sync_diff_inspector: compares the ddl_with_exists.* tables
# between the upstream and downstream clusters after the test run.
check-thread-count = 4

# Emit SQL statements that would fix any found differences.
export-fix-sql = true

# Compare row data, not just table structure.
check-struct-only = false

[task]
output-dir = "/tmp/tidb_cdc_test/ddl_with_exists/sync_diff/output"

source-instances = ["mysql1"]

target-instance = "tidb0"

# Check every table in the test database.
target-check-tables = ["ddl_with_exists.*"]

[data-sources]
# mysql1: upstream TiDB (port 4000 matches the UP_TIDB defaults in test.go).
[data-sources.mysql1]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""

# tidb0: downstream TiDB (port 3306 matches the mysql sink URI in run.sh).
[data-sources.tidb0]
host = "127.0.0.1"
port = 3306
user = "root"
password = ""
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
#!/bin/bash
# Integration test: concurrently run DDLs that use IF [NOT] EXISTS upstream
# (some of which become no-ops and are ignored by the upstream), then verify
# TiCDC keeps the downstream schema and data consistent.

set -eu

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1

function run() {
	rm -rf $WORK_DIR && mkdir -p $WORK_DIR

	start_tidb_cluster --workdir $WORK_DIR

	cd $WORK_DIR

	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
	owner_pid=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}')

	# NOTE(review): topic name keeps the historical "mamager" spelling; it is an
	# arbitrary identifier used consistently below, so it is left unchanged.
	TOPIC_NAME="ticdc-ddl-mamager-test-$RANDOM"
	case $SINK_TYPE in
	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
	storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
	pulsar)
		SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true"
		;;
	*) SINK_URI="mysql://root@127.0.0.1:3306/" ;;
	esac
	changefeed_id="ddl-with-exists"
	run_cdc_cli changefeed create --sink-uri="$SINK_URI" -c=${changefeed_id}

	# Non-mysql sinks need a consumer to apply changes to the downstream.
	case $SINK_TYPE in
	kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
	storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
	pulsar) run_pulsar_consumer $WORK_DIR $SINK_URI ;;
	esac

	run_sql "CREATE DATABASE ddl_with_exists"

	# Drive the concurrent IF [NOT] EXISTS DDL workload against the upstream.
	cd $CUR
	GO111MODULE=on go run test.go

	# The finish mark is created last; once it is replicated, all earlier DDLs
	# must have been handled by the changefeed.
	run_sql "CREATE TABLE ddl_with_exists.finish_mark (a int primary key);"
	check_table_exists ddl_with_exists.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
	# make sure all tables are equal in upstream and downstream
	check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 180
	cleanup_process $CDC_BINARY
}

trap stop_tidb_cluster EXIT
# Quote "$@" so arguments with spaces are forwarded intact ($* would re-split).
run "$@"
check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
// Copyright 2024 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package main
15+
16+
import (
17+
"database/sql"
18+
"fmt"
19+
"log"
20+
"math/rand"
21+
"os"
22+
"sync"
23+
"time"
24+
25+
_ "github.com/go-sql-driver/mysql"
26+
)
27+
28+
func main() {
29+
upHost := GetEnvDefault("UP_TIDB_HOST", "127.0.0.1")
30+
upPort := GetEnvDefault("UP_TIDB_PORT", "4000")
31+
dsn := fmt.Sprintf("root@tcp(%s:%s)/", upHost, upPort)
32+
db, err := sql.Open("mysql", dsn)
33+
if err != nil {
34+
log.Fatal("open db failed:", dsn, ", err: ", err)
35+
}
36+
defer db.Close()
37+
38+
if err = db.Ping(); err != nil {
39+
log.Fatal("ping db failed:", dsn, ", err: ", err)
40+
}
41+
log.Println("connect to tidb success, dsn: ", dsn)
42+
43+
createTable := `create table if not exists ddl_with_exists.t%d (
44+
id int primary key auto_increment,
45+
name varchar(255)
46+
);`
47+
addColumn := "alter table ddl_with_exists.t%d add column if not exists age int;"
48+
dropColumn := "alter table ddl_with_exists.t%d drop column if exists age;"
49+
addIndex := "alter table ddl_with_exists.t%d add index if not exists idx1(id);"
50+
dropIndex := "alter table ddl_with_exists.t%d drop index if exists idx1;"
51+
52+
concurrency := 16
53+
maxTableCnt := 20
54+
db.SetMaxOpenConns(concurrency)
55+
56+
start := time.Now()
57+
for i := 0; i < maxTableCnt; i++ {
58+
_, err := db.Exec(fmt.Sprintf(createTable, i))
59+
if err != nil {
60+
log.Fatal("create table failed:", i, ", err: ", err)
61+
}
62+
}
63+
log.Println("create table cost:", time.Since(start).Seconds(), "s")
64+
65+
var wg sync.WaitGroup
66+
for i := 0; i < concurrency; i++ {
67+
wg.Add(1)
68+
go func() {
69+
defer wg.Done()
70+
log.Println("worker start:", i)
71+
for j := 0; j < 20; j++ {
72+
idx := rand.Intn(maxTableCnt)
73+
ddl := fmt.Sprintf(createTable, idx)
74+
switch rand.Intn(5) {
75+
case 0:
76+
ddl = fmt.Sprintf(addColumn, idx)
77+
case 1:
78+
ddl = fmt.Sprintf(dropColumn, idx)
79+
case 2:
80+
ddl = fmt.Sprintf(addIndex, idx)
81+
case 3:
82+
ddl = fmt.Sprintf(dropIndex, idx)
83+
default:
84+
}
85+
_, err := db.Exec(ddl)
86+
if err != nil {
87+
log.Println(err)
88+
}
89+
}
90+
log.Println("worker exit:", i)
91+
}()
92+
}
93+
wg.Wait()
94+
}
95+
96+
// GetEnvDefault returns the value of the environment variable key, or
// defaultV when the variable is not set.
func GetEnvDefault(key, defaultV string) string {
	if v, ok := os.LookupEnv(key); ok {
		return v
	}
	return defaultV
}

tests/integration_tests/run_group.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ groups=(
3434
["G00"]="$mysql_only $kafka_only $storage_only"
3535
["G01"]="$mysql_only_http $kafka_only_protocol $storage_only_canal_json multi_tables_ddl"
3636
["G02"]="$mysql_only_consistent_replicate $kafka_only_v2 $storage_only_csv"
37-
["G03"]='row_format drop_many_tables processor_stop_delay partition_table'
37+
["G03"]='row_format drop_many_tables processor_stop_delay partition_table ddl_with_exists'
3838
["G04"]='foreign_key ddl_puller_lag ddl_only_block_related_table changefeed_auto_stop'
3939
["G05"]='charset_gbk ddl_manager multi_source'
4040
["G06"]='sink_retry changefeed_error ddl_sequence resourcecontrol'

0 commit comments

Comments
 (0)