Skip to content

Commit 618d1ab

Browse files
wlwilliamxlidezhu
authored andcommitted
fix(ddl): wait running add index before CREATE TABLE LIKE (#3863)
* fix(ddl): fix create table like ddl don't wait add index * refactor ut * modify error handle * refactor * set BlockedTables for create table like ddl * make check * fix extract refer table info * fix test query * remove the useless func needWaitAsyncExecDone * modify search name with case sensitive
1 parent 24c2977 commit 618d1ab

File tree

8 files changed

+421
-46
lines changed

8 files changed

+421
-46
lines changed

logservice/schemastore/persist_storage_ddl_handlers.go

Lines changed: 119 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -517,6 +517,26 @@ func getSchemaID(tableMap map[int64]*BasicTableInfo, tableID int64) int64 {
517517
return tableInfo.SchemaID
518518
}
519519

520+
// schemaName should be "Name.O"
521+
func findSchemaIDByName(databaseMap map[int64]*BasicDatabaseInfo, schemaName string) (int64, bool) {
522+
for id, info := range databaseMap {
523+
if info.Name == schemaName {
524+
return id, true
525+
}
526+
}
527+
return 0, false
528+
}
529+
530+
// tableName should be "Name.O"
531+
func findTableIDByName(tableMap map[int64]*BasicTableInfo, schemaID int64, tableName string) (int64, bool) {
532+
for id, info := range tableMap {
533+
if info.SchemaID == schemaID && info.Name == tableName {
534+
return id, true
535+
}
536+
}
537+
return 0, false
538+
}
539+
520540
// =======
521541
// buildPersistedDDLEventFunc start
522542
// =======
@@ -588,9 +608,55 @@ func buildPersistedDDLEventForCreateTable(args buildPersistedDDLEventFuncArgs) P
588608
event := buildPersistedDDLEventCommon(args)
589609
event.SchemaName = getSchemaName(args.databaseMap, event.SchemaID)
590610
event.TableName = event.TableInfo.Name.O
611+
setReferTableForCreateTableLike(&event, args)
591612
return event
592613
}
593614

615+
func setReferTableForCreateTableLike(event *PersistedDDLEvent, args buildPersistedDDLEventFuncArgs) {
616+
if event.Query == "" {
617+
return
618+
}
619+
stmt, err := parser.New().ParseOneStmt(event.Query, "", "")
620+
if err != nil {
621+
log.Error("parse create table ddl failed",
622+
zap.String("query", event.Query),
623+
zap.Error(err))
624+
return
625+
}
626+
createStmt, ok := stmt.(*ast.CreateTableStmt)
627+
if !ok || createStmt.ReferTable == nil {
628+
return
629+
}
630+
refTable := createStmt.ReferTable.Name.O
631+
refSchema := createStmt.ReferTable.Schema.O
632+
if refSchema == "" {
633+
refSchema = event.SchemaName
634+
}
635+
refSchemaID, ok := findSchemaIDByName(args.databaseMap, refSchema)
636+
if !ok {
637+
log.Warn("refer schema not found for create table like",
638+
zap.String("schema", refSchema),
639+
zap.String("table", refTable),
640+
zap.String("query", event.Query))
641+
return
642+
}
643+
refTableID, ok := findTableIDByName(args.tableMap, refSchemaID, refTable)
644+
if !ok {
645+
log.Warn("refer table not found for create table like",
646+
zap.String("schema", refSchema),
647+
zap.String("table", refTable),
648+
zap.String("query", event.Query))
649+
return
650+
}
651+
event.ExtraTableID = refTableID
652+
if partitions, ok := args.partitionMap[refTableID]; ok {
653+
event.ReferTablePartitionIDs = event.ReferTablePartitionIDs[:0]
654+
for id := range partitions {
655+
event.ReferTablePartitionIDs = append(event.ReferTablePartitionIDs, id)
656+
}
657+
}
658+
}
659+
594660
func buildPersistedDDLEventForDropTable(args buildPersistedDDLEventFuncArgs) PersistedDDLEvent {
595661
event := buildPersistedDDLEventCommon(args)
596662
event.SchemaName = getSchemaName(args.databaseMap, event.SchemaID)
@@ -872,8 +938,8 @@ func updateDDLHistoryForSchemaDDL(args updateDDLHistoryFuncArgs) []uint64 {
872938

873939
func updateDDLHistoryForAddDropTable(args updateDDLHistoryFuncArgs) []uint64 {
874940
args.appendTableTriggerDDLHistory(args.ddlEvent.FinishedTs)
875-
// Note: for create table, this ddl event will not be sent to table dispatchers.
876-
// add it to ddl history is just for building table info store.
941+
// Note: for create table, this ddl event will not be sent to table dispatchers by default.
942+
// adding it to ddl history is just for building table info store.
877943
if isPartitionTable(args.ddlEvent.TableInfo) {
878944
// for partition table, we only care the ddl history of physical table ids.
879945
for _, partitionID := range getAllPartitionIDs(args.ddlEvent.TableInfo) {
@@ -882,6 +948,13 @@ func updateDDLHistoryForAddDropTable(args updateDDLHistoryFuncArgs) []uint64 {
882948
} else {
883949
args.appendTablesDDLHistory(args.ddlEvent.FinishedTs, args.ddlEvent.TableID)
884950
}
951+
if args.ddlEvent.Type == byte(model.ActionCreateTable) && args.ddlEvent.ExtraTableID != 0 {
952+
if len(args.ddlEvent.ReferTablePartitionIDs) > 0 {
953+
args.appendTablesDDLHistory(args.ddlEvent.FinishedTs, args.ddlEvent.ReferTablePartitionIDs...)
954+
} else {
955+
args.appendTablesDDLHistory(args.ddlEvent.FinishedTs, args.ddlEvent.ExtraTableID)
956+
}
957+
}
885958
return args.tableTriggerDDLHistory
886959
}
887960

@@ -1383,6 +1456,21 @@ func extractTableInfoFuncForSingleTableDDL(event *PersistedDDLEvent, tableID int
13831456
return common.WrapTableInfo(event.SchemaName, event.TableInfo), false
13841457
}
13851458
}
1459+
1460+
// The DDL "CREATE TABLE ... LIKE ..." may be added to the ddl history of the referenced table
1461+
// (or its physical partition IDs) to ensure those tables are blocked while this DDL is executing.
1462+
// It does not change the schema of the referenced table itself, so we should ignore it when
1463+
// building the table info store for the referenced table.
1464+
if event.Type == byte(model.ActionCreateTable) && event.ExtraTableID != 0 {
1465+
if tableID == event.ExtraTableID {
1466+
return nil, false
1467+
}
1468+
for _, id := range event.ReferTablePartitionIDs {
1469+
if tableID == id {
1470+
return nil, false
1471+
}
1472+
}
1473+
}
13861474
log.Panic("should not reach here",
13871475
zap.Any("type", event.Type),
13881476
zap.String("query", event.Query),
@@ -1783,6 +1871,13 @@ func buildDDLEventForNewTableDDL(rawEvent *PersistedDDLEvent, tableFilter filter
17831871
InfluenceType: commonEvent.InfluenceTypeNormal,
17841872
TableIDs: []int64{common.DDLSpanTableID},
17851873
}
1874+
if rawEvent.ExtraTableID != 0 {
1875+
if len(rawEvent.ReferTablePartitionIDs) > 0 {
1876+
ddlEvent.BlockedTables.TableIDs = append(ddlEvent.BlockedTables.TableIDs, rawEvent.ReferTablePartitionIDs...)
1877+
} else {
1878+
ddlEvent.BlockedTables.TableIDs = append(ddlEvent.BlockedTables.TableIDs, rawEvent.ExtraTableID)
1879+
}
1880+
}
17861881
if isPartitionTable(rawEvent.TableInfo) {
17871882
physicalIDs := getAllPartitionIDs(rawEvent.TableInfo)
17881883
ddlEvent.NeedAddedTables = make([]commonEvent.Table, 0, len(physicalIDs))
@@ -1811,6 +1906,28 @@ func buildDDLEventForNewTableDDL(rawEvent *PersistedDDLEvent, tableFilter filter
18111906
},
18121907
},
18131908
}
1909+
if rawEvent.Query != "" {
1910+
stmt, err := parser.New().ParseOneStmt(rawEvent.Query, "", "")
1911+
if err != nil {
1912+
log.Error("parse create table ddl failed",
1913+
zap.String("query", rawEvent.Query),
1914+
zap.Error(err))
1915+
return ddlEvent, false, err
1916+
}
1917+
if createStmt, ok := stmt.(*ast.CreateTableStmt); ok && createStmt.ReferTable != nil {
1918+
refTable := createStmt.ReferTable.Name.O
1919+
refSchema := createStmt.ReferTable.Schema.O
1920+
if refSchema == "" {
1921+
refSchema = rawEvent.SchemaName
1922+
}
1923+
ddlEvent.BlockedTableNames = []commonEvent.SchemaTableName{
1924+
{
1925+
SchemaName: refSchema,
1926+
TableName: refTable,
1927+
},
1928+
}
1929+
}
1930+
}
18141931
return ddlEvent, true, err
18151932
}
18161933

logservice/schemastore/persist_storage_test.go

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3129,3 +3129,171 @@ func TestRenameTable(t *testing.T) {
31293129
})
31303130
assert.Equal(t, "RENAME TABLE `test`.`t1` TO `test`.`t2`", ddl.Query)
31313131
}
3132+
3133+
func TestBuildDDLEventForNewTableDDL_CreateTableLikeBlockedTableNames(t *testing.T) {
3134+
cases := []struct {
3135+
name string
3136+
query string
3137+
expected []commonEvent.SchemaTableName
3138+
}{
3139+
{
3140+
name: "default schema",
3141+
query: "CREATE TABLE `b` LIKE `a`",
3142+
expected: []commonEvent.SchemaTableName{
3143+
{SchemaName: "test", TableName: "a"},
3144+
},
3145+
},
3146+
{
3147+
name: "explicit schema",
3148+
query: "CREATE TABLE `b` LIKE `other`.`a`",
3149+
expected: []commonEvent.SchemaTableName{
3150+
{SchemaName: "other", TableName: "a"},
3151+
},
3152+
},
3153+
}
3154+
3155+
for _, tc := range cases {
3156+
rawEvent := &PersistedDDLEvent{
3157+
Type: byte(model.ActionCreateTable),
3158+
SchemaID: 1,
3159+
TableID: 2,
3160+
SchemaName: "test",
3161+
TableName: "b",
3162+
Query: tc.query,
3163+
TableInfo: &model.TableInfo{},
3164+
}
3165+
3166+
ddlEvent, ok, err := buildDDLEventForNewTableDDL(rawEvent, nil, 0)
3167+
require.NoError(t, err)
3168+
require.True(t, ok)
3169+
require.Equal(t, tc.expected, ddlEvent.BlockedTableNames)
3170+
}
3171+
}
3172+
3173+
func TestBuildPersistedDDLEventForCreateTableLikeSetsReferTableID(t *testing.T) {
3174+
cases := []struct {
3175+
name string
3176+
query string
3177+
partitionIDs []int64
3178+
expectedReferID int64
3179+
}{
3180+
{
3181+
name: "non partition refer table",
3182+
query: "CREATE TABLE `b` LIKE `a`",
3183+
partitionIDs: nil,
3184+
expectedReferID: 101,
3185+
},
3186+
{
3187+
name: "partition refer table",
3188+
query: "CREATE TABLE `b` LIKE `a`",
3189+
partitionIDs: []int64{111, 112},
3190+
expectedReferID: 101,
3191+
},
3192+
}
3193+
3194+
for _, tc := range cases {
3195+
job := buildCreateTableJobForTest(100, 200, "b", 1010)
3196+
job.Query = tc.query
3197+
partitionMap := map[int64]BasicPartitionInfo{}
3198+
if len(tc.partitionIDs) > 0 {
3199+
partitionInfo := make(BasicPartitionInfo)
3200+
for _, id := range tc.partitionIDs {
3201+
partitionInfo[id] = nil
3202+
}
3203+
partitionMap[tc.expectedReferID] = partitionInfo
3204+
}
3205+
ddl := buildPersistedDDLEventForCreateTable(buildPersistedDDLEventFuncArgs{
3206+
job: job,
3207+
databaseMap: map[int64]*BasicDatabaseInfo{
3208+
100: {Name: "test", Tables: map[int64]bool{101: true, 200: true}},
3209+
},
3210+
tableMap: map[int64]*BasicTableInfo{
3211+
101: {SchemaID: 100, Name: "a"},
3212+
200: {SchemaID: 100, Name: "b"},
3213+
},
3214+
partitionMap: partitionMap,
3215+
})
3216+
require.Equal(t, tc.expectedReferID, ddl.ExtraTableID, tc.name)
3217+
if len(tc.partitionIDs) > 0 {
3218+
require.ElementsMatch(t, tc.partitionIDs, ddl.ReferTablePartitionIDs, tc.name)
3219+
} else {
3220+
require.Empty(t, ddl.ReferTablePartitionIDs, tc.name)
3221+
}
3222+
}
3223+
}
3224+
3225+
func TestBuildDDLEventForNewTableDDL_CreateTableLikeBlockedTables(t *testing.T) {
3226+
rawEvent := &PersistedDDLEvent{
3227+
Type: byte(model.ActionCreateTable),
3228+
SchemaID: 1,
3229+
TableID: 2,
3230+
SchemaName: "test",
3231+
TableName: "b",
3232+
Query: "CREATE TABLE `b` LIKE `a`",
3233+
TableInfo: &model.TableInfo{},
3234+
ExtraTableID: 101,
3235+
}
3236+
ddlEvent, ok, err := buildDDLEventForNewTableDDL(rawEvent, nil, 0)
3237+
require.NoError(t, err)
3238+
require.True(t, ok)
3239+
require.ElementsMatch(t, []int64{common.DDLSpanTableID, 101}, ddlEvent.BlockedTables.TableIDs)
3240+
3241+
rawEvent.ReferTablePartitionIDs = []int64{111, 112}
3242+
ddlEvent, ok, err = buildDDLEventForNewTableDDL(rawEvent, nil, 0)
3243+
require.NoError(t, err)
3244+
require.True(t, ok)
3245+
require.ElementsMatch(t, []int64{common.DDLSpanTableID, 111, 112}, ddlEvent.BlockedTables.TableIDs)
3246+
}
3247+
3248+
func TestUpdateDDLHistoryForAddDropTable_CreateTableLikeAddsReferTable(t *testing.T) {
3249+
args := updateDDLHistoryFuncArgs{
3250+
ddlEvent: &PersistedDDLEvent{
3251+
Type: byte(model.ActionCreateTable),
3252+
TableID: 200,
3253+
ExtraTableID: 101,
3254+
FinishedTs: 10,
3255+
TableInfo: &model.TableInfo{},
3256+
},
3257+
tablesDDLHistory: map[int64][]uint64{},
3258+
tableTriggerDDLHistory: []uint64{},
3259+
}
3260+
updateDDLHistoryForAddDropTable(args)
3261+
require.Equal(t, []uint64{10}, args.tablesDDLHistory[200])
3262+
require.Equal(t, []uint64{10}, args.tablesDDLHistory[101])
3263+
3264+
args.ddlEvent.ReferTablePartitionIDs = []int64{111, 112}
3265+
args.tablesDDLHistory = map[int64][]uint64{}
3266+
args.tableTriggerDDLHistory = []uint64{}
3267+
updateDDLHistoryForAddDropTable(args)
3268+
require.Equal(t, []uint64{10}, args.tablesDDLHistory[200])
3269+
require.Equal(t, []uint64{10}, args.tablesDDLHistory[111])
3270+
require.Equal(t, []uint64{10}, args.tablesDDLHistory[112])
3271+
require.Empty(t, args.tablesDDLHistory[101])
3272+
}
3273+
3274+
func TestExtractTableInfoFuncForSingleTableDDL_CreateTableLikeReferTableIgnored(t *testing.T) {
3275+
rawEvent := &PersistedDDLEvent{
3276+
Type: byte(model.ActionCreateTable),
3277+
TableID: 140,
3278+
ExtraTableID: 138,
3279+
Query: "CREATE TABLE `b` LIKE `a`",
3280+
}
3281+
3282+
require.NotPanics(t, func() {
3283+
tableInfo, deleted := extractTableInfoFuncForSingleTableDDL(rawEvent, 138)
3284+
require.Nil(t, tableInfo)
3285+
require.False(t, deleted)
3286+
})
3287+
3288+
rawEvent.ReferTablePartitionIDs = []int64{111, 112}
3289+
require.NotPanics(t, func() {
3290+
tableInfo, deleted := extractTableInfoFuncForSingleTableDDL(rawEvent, 111)
3291+
require.Nil(t, tableInfo)
3292+
require.False(t, deleted)
3293+
})
3294+
require.NotPanics(t, func() {
3295+
tableInfo, deleted := extractTableInfoFuncForSingleTableDDL(rawEvent, 112)
3296+
require.Nil(t, tableInfo)
3297+
require.False(t, deleted)
3298+
})
3299+
}

logservice/schemastore/types.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ type PersistedDDLEvent struct {
6161
// the following fields are only set when the ddl job involves a partition table
6262
// it is the partition info of the table before this ddl
6363
PrevPartitions []int64 `msg:"prev_partitions"`
64+
// ReferTablePartitionIDs is only set for CREATE TABLE ... LIKE ... when the referenced table is partitioned.
65+
// It records the physical partition IDs of the referenced table.
66+
ReferTablePartitionIDs []int64 `msg:"refer_table_partitions"`
6467

6568
Query string `msg:"query"`
6669
SchemaVersion int64 `msg:"schema_version"`

0 commit comments

Comments
 (0)