Skip to content

Commit 7c3b9a1

Browse files
abaowhydbkingmeiji163
authored
Before the successful renaming, a session accessed the ghost table, w… (#1536)
* Before the successful renaming, a session accessed the ghost table, which had already unlocked the original table. There is a very small probability that other sessions dml operations on the original table will occur, and this dml operation will appear in the original table after renaming, resulting in data loss. --------- Co-authored-by: dbking <[email protected]> Co-authored-by: meiji163 <[email protected]>
1 parent 7c40d9e commit 7c3b9a1

File tree

5 files changed

+155
-4
lines changed

5 files changed

+155
-4
lines changed

go/base/context.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,9 @@ type MigrationContext struct {
247247

248248
recentBinlogCoordinates mysql.BinlogCoordinates
249249

250-
BinlogSyncerMaxReconnectAttempts int
250+
BinlogSyncerMaxReconnectAttempts int
251+
AllowSetupMetadataLockInstruments bool
252+
IsOpenMetadataLockInstruments bool
251253

252254
Log Logger
253255
}

go/cmd/gh-ost/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ func main() {
137137
flag.Int64Var(&migrationContext.HooksStatusIntervalSec, "hooks-status-interval", 60, "how many seconds to wait between calling onStatus hook")
138138

139139
flag.UintVar(&migrationContext.ReplicaServerId, "replica-server-id", 99999, "server id used by gh-ost process. Default: 99999")
140+
flag.BoolVar(&migrationContext.AllowSetupMetadataLockInstruments, "allow-setup-metadata-lock-instruments", false, "validate rename session hold the MDL of original table before unlock tables in cut-over phase")
140141
flag.IntVar(&migrationContext.BinlogSyncerMaxReconnectAttempts, "binlogsyncer-max-reconnect-attempts", 0, "when master node fails, the maximum number of binlog synchronization attempts to reconnect. 0 is unlimited")
141142

142143
flag.BoolVar(&migrationContext.IncludeTriggers, "include-triggers", false, "When true, the triggers (if exist) will be created on the new table")

go/logic/applier.go

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,28 @@ func (this *Applier) dropTable(tableName string) error {
416416
return nil
417417
}
418418

419+
func (this *Applier) StateMetadataLockInstrument() error {
420+
query := `select /*+ MAX_EXECUTION_TIME(300) */ ENABLED, TIMED from performance_schema.setup_instruments WHERE NAME = 'wait/lock/metadata/sql/mdl'`
421+
var enabled, timed string
422+
if err := this.db.QueryRow(query).Scan(&enabled, &timed); err != nil {
423+
return this.migrationContext.Log.Errorf("query performance_schema.setup_instruments with name wait/lock/metadata/sql/mdl error: %s", err)
424+
}
425+
if strings.EqualFold(enabled, "YES") && strings.EqualFold(timed, "YES") {
426+
this.migrationContext.IsOpenMetadataLockInstruments = true
427+
return nil
428+
}
429+
if !this.migrationContext.AllowSetupMetadataLockInstruments {
430+
return nil
431+
}
432+
this.migrationContext.Log.Infof("instrument wait/lock/metadata/sql/mdl state: enabled %s, timed %s", enabled, timed)
433+
if _, err := this.db.Exec(`UPDATE performance_schema.setup_instruments SET ENABLED = 'YES', TIMED = 'YES' WHERE NAME = 'wait/lock/metadata/sql/mdl'`); err != nil {
434+
return this.migrationContext.Log.Errorf("enable instrument wait/lock/metadata/sql/mdl error: %s", err)
435+
}
436+
this.migrationContext.IsOpenMetadataLockInstruments = true
437+
this.migrationContext.Log.Infof("instrument wait/lock/metadata/sql/mdl enabled")
438+
return nil
439+
}
440+
419441
// dropTriggers drop the triggers on the applied host
420442
func (this *Applier) DropTriggersFromGhost() error {
421443
if len(this.migrationContext.Triggers) > 0 {
@@ -1095,7 +1117,7 @@ func (this *Applier) RevertAtomicCutOverWaitTimeout() {
10951117
}
10961118

10971119
// AtomicCutOverMagicLock
1098-
func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocked chan<- error, okToUnlockTable <-chan bool, tableUnlocked chan<- error) error {
1120+
func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocked chan<- error, okToUnlockTable <-chan bool, tableUnlocked chan<- error, renameLockSessionId *int64) error {
10991121
tx, err := this.db.Begin()
11001122
if err != nil {
11011123
tableLocked <- err
@@ -1186,6 +1208,20 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke
11861208
// We DO NOT return here because we must `UNLOCK TABLES`!
11871209
}
11881210

1211+
this.migrationContext.Log.Infof("Session renameLockSessionId is %+v", *renameLockSessionId)
1212+
// Checking the lock is held by rename session
1213+
if *renameLockSessionId > 0 && this.migrationContext.IsOpenMetadataLockInstruments {
1214+
sleepDuration := time.Duration(10*this.migrationContext.CutOverLockTimeoutSeconds) * time.Millisecond
1215+
for i := 1; i <= 100; i++ {
1216+
err := this.ExpectMetadataLock(*renameLockSessionId)
1217+
if err == nil {
1218+
this.migrationContext.Log.Infof("Rename session is pending lock on the origin table !")
1219+
break
1220+
} else {
1221+
time.Sleep(sleepDuration)
1222+
}
1223+
}
1224+
}
11891225
// Tables still locked
11901226
this.migrationContext.Log.Infof("Releasing lock from %s.%s, %s.%s",
11911227
sql.EscapeName(this.migrationContext.DatabaseName),
@@ -1405,3 +1441,27 @@ func (this *Applier) Teardown() {
14051441
this.singletonDB.Close()
14061442
atomic.StoreInt64(&this.finishedMigrating, 1)
14071443
}
1444+
1445+
func (this *Applier) ExpectMetadataLock(sessionId int64) error {
1446+
found := false
1447+
query := `
1448+
select /* gh-ost */ m.owner_thread_id
1449+
from performance_schema.metadata_locks m join performance_schema.threads t
1450+
on m.owner_thread_id=t.thread_id
1451+
where m.object_type = 'TABLE' and m.object_schema = ? and m.object_name = ?
1452+
and m.lock_type = 'EXCLUSIVE' and m.lock_status = 'PENDING'
1453+
and t.processlist_id = ?
1454+
`
1455+
err := sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error {
1456+
found = true
1457+
return nil
1458+
}, this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, sessionId)
1459+
if err != nil {
1460+
return err
1461+
}
1462+
if !found {
1463+
err = fmt.Errorf("cannot find PENDING metadata lock on original table: `%s`.`%s`", this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName)
1464+
return this.migrationContext.Log.Errore(err)
1465+
}
1466+
return nil
1467+
}

go/logic/migrator.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -669,8 +669,9 @@ func (this *Migrator) atomicCutOver() (err error) {
669669
lockOriginalSessionIdChan := make(chan int64, 2)
670670
tableLocked := make(chan error, 2)
671671
tableUnlocked := make(chan error, 2)
672+
var renameLockSessionId int64
672673
go func() {
673-
if err := this.applier.AtomicCutOverMagicLock(lockOriginalSessionIdChan, tableLocked, okToUnlockTable, tableUnlocked); err != nil {
674+
if err := this.applier.AtomicCutOverMagicLock(lockOriginalSessionIdChan, tableLocked, okToUnlockTable, tableUnlocked, &renameLockSessionId); err != nil {
674675
this.migrationContext.Log.Errore(err)
675676
}
676677
}()
@@ -735,6 +736,7 @@ func (this *Migrator) atomicCutOver() (err error) {
735736
// Now that we've found the RENAME blocking, AND the locking connection still alive,
736737
// we know it is safe to proceed to release the lock
737738

739+
renameLockSessionId = renameSessionId
738740
okToUnlockTable <- true
739741
// BAM! magic table dropped, original table lock is released
740742
// -> RENAME released -> queries on original are unblocked.
@@ -1203,6 +1205,10 @@ func (this *Migrator) initiateApplier() error {
12031205
}
12041206
}
12051207
this.applier.WriteChangelogState(string(GhostTableMigrated))
1208+
if err := this.applier.StateMetadataLockInstrument(); err != nil {
1209+
this.migrationContext.Log.Errorf("Unable to enable metadata lock instrument, see further error details. Bailing out")
1210+
return err
1211+
}
12061212
go this.applier.InitiateHeartbeat()
12071213
return nil
12081214
}

go/logic/migrator_test.go

Lines changed: 83 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"context"
1010
gosql "database/sql"
1111
"errors"
12+
"fmt"
1213
"os"
1314
"path/filepath"
1415
"strings"
@@ -22,7 +23,7 @@ import (
2223
"github.com/stretchr/testify/suite"
2324
"github.com/testcontainers/testcontainers-go/modules/mysql"
2425

25-
"fmt"
26+
"runtime"
2627

2728
"github.com/github/gh-ost/go/base"
2829
"github.com/github/gh-ost/go/binlog"
@@ -335,6 +336,7 @@ func (suite *MigratorTestSuite) TestMigrateEmpty() {
335336
migrationContext.ApplierConnectionConfig = connectionConfig
336337
migrationContext.InspectorConnectionConfig = connectionConfig
337338
migrationContext.SetConnectionConfig("innodb")
339+
migrationContext.InitiallyDropOldTable = true
338340

339341
migrationContext.AlterStatementOptions = "ADD COLUMN foobar varchar(255), ENGINE=InnoDB"
340342

@@ -573,6 +575,86 @@ func TestMigratorRetryWithExponentialBackoff(t *testing.T) {
573575
assert.Equal(t, tries, 100)
574576
}
575577

578+
func (suite *MigratorTestSuite) TestCutOverLossDataCaseLockGhostBeforeRename() {
579+
ctx := context.Background()
580+
581+
_, err := suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, name VARCHAR(64))", getTestTableName()))
582+
suite.Require().NoError(err)
583+
584+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("insert into %s values(1,'a')", getTestTableName()))
585+
suite.Require().NoError(err)
586+
587+
done := make(chan error, 1)
588+
go func() {
589+
connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer)
590+
if err != nil {
591+
done <- err
592+
return
593+
}
594+
migrationContext := newTestMigrationContext()
595+
migrationContext.ApplierConnectionConfig = connectionConfig
596+
migrationContext.InspectorConnectionConfig = connectionConfig
597+
migrationContext.SetConnectionConfig("innodb")
598+
migrationContext.AllowSetupMetadataLockInstruments = true
599+
migrationContext.AlterStatementOptions = "ADD COLUMN foobar varchar(255)"
600+
migrationContext.HeartbeatIntervalMilliseconds = 100
601+
migrationContext.CutOverLockTimeoutSeconds = 4
602+
603+
_, filename, _, _ := runtime.Caller(0)
604+
migrationContext.PostponeCutOverFlagFile = filepath.Join(filepath.Dir(filename), "../../tmp/ghost.postpone.flag")
605+
606+
migrator := NewMigrator(migrationContext, "0.0.0")
607+
608+
//nolint:contextcheck
609+
done <- migrator.Migrate()
610+
}()
611+
612+
time.Sleep(2 * time.Second)
613+
//nolint:dogsled
614+
_, filename, _, _ := runtime.Caller(0)
615+
err = os.Remove(filepath.Join(filepath.Dir(filename), "../../tmp/ghost.postpone.flag"))
616+
if err != nil {
617+
suite.Require().NoError(err)
618+
}
619+
time.Sleep(1 * time.Second)
620+
go func() {
621+
holdConn, err := suite.db.Conn(ctx)
622+
suite.Require().NoError(err)
623+
_, err = holdConn.ExecContext(ctx, "SELECT *, sleep(2) FROM test._testing_gho WHERE id = 1")
624+
suite.Require().NoError(err)
625+
}()
626+
627+
dmlConn, err := suite.db.Conn(ctx)
628+
suite.Require().NoError(err)
629+
630+
_, err = dmlConn.ExecContext(ctx, fmt.Sprintf("insert into %s (id, name) values(2,'b')", getTestTableName()))
631+
fmt.Println("insert into table original table")
632+
suite.Require().NoError(err)
633+
634+
migrateErr := <-done
635+
suite.Require().NoError(migrateErr)
636+
637+
// Verify the new column was added
638+
var delValue, OriginalValue int64
639+
err = suite.db.QueryRow(
640+
fmt.Sprintf("select count(*) from %s._%s_del", testMysqlDatabase, testMysqlTableName),
641+
).Scan(&delValue)
642+
suite.Require().NoError(err)
643+
644+
err = suite.db.QueryRow("select count(*) from " + getTestTableName()).Scan(&OriginalValue)
645+
suite.Require().NoError(err)
646+
647+
suite.Require().LessOrEqual(delValue, OriginalValue)
648+
649+
var tableName, createTableSQL string
650+
//nolint:execinquery
651+
err = suite.db.QueryRow("SHOW CREATE TABLE "+getTestTableName()).Scan(&tableName, &createTableSQL)
652+
suite.Require().NoError(err)
653+
654+
suite.Require().Equal(testMysqlTableName, tableName)
655+
suite.Require().Equal("CREATE TABLE `testing` (\n `id` int NOT NULL,\n `name` varchar(64) DEFAULT NULL,\n `foobar` varchar(255) DEFAULT NULL,\n PRIMARY KEY (`id`)\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci", createTableSQL)
656+
}
657+
576658
func TestMigrator(t *testing.T) {
577659
suite.Run(t, new(MigratorTestSuite))
578660
}

0 commit comments

Comments
 (0)