Skip to content

Commit 24ec0c2

Browse files
authored
Merge branch 'master' into grodowski/coding-chimp/on-retry-hook
2 parents 7795e8d + 7c3b9a1 commit 24ec0c2

File tree

5 files changed

+155
-4
lines changed

5 files changed

+155
-4
lines changed

go/base/context.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,9 @@ type MigrationContext struct {
247247

248248
recentBinlogCoordinates mysql.BinlogCoordinates
249249

250-
BinlogSyncerMaxReconnectAttempts int
250+
BinlogSyncerMaxReconnectAttempts int
251+
AllowSetupMetadataLockInstruments bool
252+
IsOpenMetadataLockInstruments bool
251253

252254
Log Logger
253255
}

go/cmd/gh-ost/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ func main() {
137137
flag.Int64Var(&migrationContext.HooksStatusIntervalSec, "hooks-status-interval", 60, "how many seconds to wait between calling onStatus hook")
138138

139139
flag.UintVar(&migrationContext.ReplicaServerId, "replica-server-id", 99999, "server id used by gh-ost process. Default: 99999")
140+
flag.BoolVar(&migrationContext.AllowSetupMetadataLockInstruments, "allow-setup-metadata-lock-instruments", false, "validate rename session hold the MDL of original table before unlock tables in cut-over phase")
140141
flag.IntVar(&migrationContext.BinlogSyncerMaxReconnectAttempts, "binlogsyncer-max-reconnect-attempts", 0, "when master node fails, the maximum number of binlog synchronization attempts to reconnect. 0 is unlimited")
141142

142143
flag.BoolVar(&migrationContext.IncludeTriggers, "include-triggers", false, "When true, the triggers (if exist) will be created on the new table")

go/logic/applier.go

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,28 @@ func (this *Applier) dropTable(tableName string) error {
416416
return nil
417417
}
418418

419+
func (this *Applier) StateMetadataLockInstrument() error {
420+
query := `select /*+ MAX_EXECUTION_TIME(300) */ ENABLED, TIMED from performance_schema.setup_instruments WHERE NAME = 'wait/lock/metadata/sql/mdl'`
421+
var enabled, timed string
422+
if err := this.db.QueryRow(query).Scan(&enabled, &timed); err != nil {
423+
return this.migrationContext.Log.Errorf("query performance_schema.setup_instruments with name wait/lock/metadata/sql/mdl error: %s", err)
424+
}
425+
if strings.EqualFold(enabled, "YES") && strings.EqualFold(timed, "YES") {
426+
this.migrationContext.IsOpenMetadataLockInstruments = true
427+
return nil
428+
}
429+
if !this.migrationContext.AllowSetupMetadataLockInstruments {
430+
return nil
431+
}
432+
this.migrationContext.Log.Infof("instrument wait/lock/metadata/sql/mdl state: enabled %s, timed %s", enabled, timed)
433+
if _, err := this.db.Exec(`UPDATE performance_schema.setup_instruments SET ENABLED = 'YES', TIMED = 'YES' WHERE NAME = 'wait/lock/metadata/sql/mdl'`); err != nil {
434+
return this.migrationContext.Log.Errorf("enable instrument wait/lock/metadata/sql/mdl error: %s", err)
435+
}
436+
this.migrationContext.IsOpenMetadataLockInstruments = true
437+
this.migrationContext.Log.Infof("instrument wait/lock/metadata/sql/mdl enabled")
438+
return nil
439+
}
440+
419441
// dropTriggers drop the triggers on the applied host
420442
func (this *Applier) DropTriggersFromGhost() error {
421443
if len(this.migrationContext.Triggers) > 0 {
@@ -1091,7 +1113,7 @@ func (this *Applier) RevertAtomicCutOverWaitTimeout() {
10911113
}
10921114

10931115
// AtomicCutOverMagicLock
1094-
func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocked chan<- error, okToUnlockTable <-chan bool, tableUnlocked chan<- error) error {
1116+
func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocked chan<- error, okToUnlockTable <-chan bool, tableUnlocked chan<- error, renameLockSessionId *int64) error {
10951117
tx, err := this.db.Begin()
10961118
if err != nil {
10971119
tableLocked <- err
@@ -1182,6 +1204,20 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke
11821204
// We DO NOT return here because we must `UNLOCK TABLES`!
11831205
}
11841206

1207+
this.migrationContext.Log.Infof("Session renameLockSessionId is %+v", *renameLockSessionId)
1208+
// Checking the lock is held by rename session
1209+
if *renameLockSessionId > 0 && this.migrationContext.IsOpenMetadataLockInstruments {
1210+
sleepDuration := time.Duration(10*this.migrationContext.CutOverLockTimeoutSeconds) * time.Millisecond
1211+
for i := 1; i <= 100; i++ {
1212+
err := this.ExpectMetadataLock(*renameLockSessionId)
1213+
if err == nil {
1214+
this.migrationContext.Log.Infof("Rename session is pending lock on the origin table !")
1215+
break
1216+
} else {
1217+
time.Sleep(sleepDuration)
1218+
}
1219+
}
1220+
}
11851221
// Tables still locked
11861222
this.migrationContext.Log.Infof("Releasing lock from %s.%s, %s.%s",
11871223
sql.EscapeName(this.migrationContext.DatabaseName),
@@ -1401,3 +1437,27 @@ func (this *Applier) Teardown() {
14011437
this.singletonDB.Close()
14021438
atomic.StoreInt64(&this.finishedMigrating, 1)
14031439
}
1440+
1441+
func (this *Applier) ExpectMetadataLock(sessionId int64) error {
1442+
found := false
1443+
query := `
1444+
select /* gh-ost */ m.owner_thread_id
1445+
from performance_schema.metadata_locks m join performance_schema.threads t
1446+
on m.owner_thread_id=t.thread_id
1447+
where m.object_type = 'TABLE' and m.object_schema = ? and m.object_name = ?
1448+
and m.lock_type = 'EXCLUSIVE' and m.lock_status = 'PENDING'
1449+
and t.processlist_id = ?
1450+
`
1451+
err := sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error {
1452+
found = true
1453+
return nil
1454+
}, this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, sessionId)
1455+
if err != nil {
1456+
return err
1457+
}
1458+
if !found {
1459+
err = fmt.Errorf("cannot find PENDING metadata lock on original table: `%s`.`%s`", this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName)
1460+
return this.migrationContext.Log.Errore(err)
1461+
}
1462+
return nil
1463+
}

go/logic/migrator.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -681,8 +681,9 @@ func (this *Migrator) atomicCutOver() (err error) {
681681
lockOriginalSessionIdChan := make(chan int64, 2)
682682
tableLocked := make(chan error, 2)
683683
tableUnlocked := make(chan error, 2)
684+
var renameLockSessionId int64
684685
go func() {
685-
if err := this.applier.AtomicCutOverMagicLock(lockOriginalSessionIdChan, tableLocked, okToUnlockTable, tableUnlocked); err != nil {
686+
if err := this.applier.AtomicCutOverMagicLock(lockOriginalSessionIdChan, tableLocked, okToUnlockTable, tableUnlocked, &renameLockSessionId); err != nil {
686687
this.migrationContext.Log.Errore(err)
687688
}
688689
}()
@@ -747,6 +748,7 @@ func (this *Migrator) atomicCutOver() (err error) {
747748
// Now that we've found the RENAME blocking, AND the locking connection still alive,
748749
// we know it is safe to proceed to release the lock
749750

751+
renameLockSessionId = renameSessionId
750752
okToUnlockTable <- true
751753
// BAM! magic table dropped, original table lock is released
752754
// -> RENAME released -> queries on original are unblocked.
@@ -1215,6 +1217,10 @@ func (this *Migrator) initiateApplier() error {
12151217
}
12161218
}
12171219
this.applier.WriteChangelogState(string(GhostTableMigrated))
1220+
if err := this.applier.StateMetadataLockInstrument(); err != nil {
1221+
this.migrationContext.Log.Errorf("Unable to enable metadata lock instrument, see further error details. Bailing out")
1222+
return err
1223+
}
12181224
go this.applier.InitiateHeartbeat()
12191225
return nil
12201226
}

go/logic/migrator_test.go

Lines changed: 83 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
gosql "database/sql"
1212
"errors"
1313
"io"
14+
"fmt"
1415
"os"
1516
"path/filepath"
1617
"strings"
@@ -24,7 +25,7 @@ import (
2425
"github.com/stretchr/testify/suite"
2526
"github.com/testcontainers/testcontainers-go/modules/mysql"
2627

27-
"fmt"
28+
"runtime"
2829

2930
"github.com/github/gh-ost/go/base"
3031
"github.com/github/gh-ost/go/binlog"
@@ -339,6 +340,7 @@ func (suite *MigratorTestSuite) TestMigrateEmpty() {
339340
migrationContext.ApplierConnectionConfig = connectionConfig
340341
migrationContext.InspectorConnectionConfig = connectionConfig
341342
migrationContext.SetConnectionConfig("innodb")
343+
migrationContext.InitiallyDropOldTable = true
342344

343345
migrationContext.AlterStatementOptions = "ADD COLUMN foobar varchar(255), ENGINE=InnoDB"
344346

@@ -697,6 +699,86 @@ func TestMigratorRetryWithExponentialBackoff(t *testing.T) {
697699
assert.Equal(t, tries, 100)
698700
}
699701

702+
func (suite *MigratorTestSuite) TestCutOverLossDataCaseLockGhostBeforeRename() {
703+
ctx := context.Background()
704+
705+
_, err := suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, name VARCHAR(64))", getTestTableName()))
706+
suite.Require().NoError(err)
707+
708+
_, err = suite.db.ExecContext(ctx, fmt.Sprintf("insert into %s values(1,'a')", getTestTableName()))
709+
suite.Require().NoError(err)
710+
711+
done := make(chan error, 1)
712+
go func() {
713+
connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer)
714+
if err != nil {
715+
done <- err
716+
return
717+
}
718+
migrationContext := newTestMigrationContext()
719+
migrationContext.ApplierConnectionConfig = connectionConfig
720+
migrationContext.InspectorConnectionConfig = connectionConfig
721+
migrationContext.SetConnectionConfig("innodb")
722+
migrationContext.AllowSetupMetadataLockInstruments = true
723+
migrationContext.AlterStatementOptions = "ADD COLUMN foobar varchar(255)"
724+
migrationContext.HeartbeatIntervalMilliseconds = 100
725+
migrationContext.CutOverLockTimeoutSeconds = 4
726+
727+
_, filename, _, _ := runtime.Caller(0)
728+
migrationContext.PostponeCutOverFlagFile = filepath.Join(filepath.Dir(filename), "../../tmp/ghost.postpone.flag")
729+
730+
migrator := NewMigrator(migrationContext, "0.0.0")
731+
732+
//nolint:contextcheck
733+
done <- migrator.Migrate()
734+
}()
735+
736+
time.Sleep(2 * time.Second)
737+
//nolint:dogsled
738+
_, filename, _, _ := runtime.Caller(0)
739+
err = os.Remove(filepath.Join(filepath.Dir(filename), "../../tmp/ghost.postpone.flag"))
740+
if err != nil {
741+
suite.Require().NoError(err)
742+
}
743+
time.Sleep(1 * time.Second)
744+
go func() {
745+
holdConn, err := suite.db.Conn(ctx)
746+
suite.Require().NoError(err)
747+
_, err = holdConn.ExecContext(ctx, "SELECT *, sleep(2) FROM test._testing_gho WHERE id = 1")
748+
suite.Require().NoError(err)
749+
}()
750+
751+
dmlConn, err := suite.db.Conn(ctx)
752+
suite.Require().NoError(err)
753+
754+
_, err = dmlConn.ExecContext(ctx, fmt.Sprintf("insert into %s (id, name) values(2,'b')", getTestTableName()))
755+
fmt.Println("insert into table original table")
756+
suite.Require().NoError(err)
757+
758+
migrateErr := <-done
759+
suite.Require().NoError(migrateErr)
760+
761+
// Verify the new column was added
762+
var delValue, OriginalValue int64
763+
err = suite.db.QueryRow(
764+
fmt.Sprintf("select count(*) from %s._%s_del", testMysqlDatabase, testMysqlTableName),
765+
).Scan(&delValue)
766+
suite.Require().NoError(err)
767+
768+
err = suite.db.QueryRow("select count(*) from " + getTestTableName()).Scan(&OriginalValue)
769+
suite.Require().NoError(err)
770+
771+
suite.Require().LessOrEqual(delValue, OriginalValue)
772+
773+
var tableName, createTableSQL string
774+
//nolint:execinquery
775+
err = suite.db.QueryRow("SHOW CREATE TABLE "+getTestTableName()).Scan(&tableName, &createTableSQL)
776+
suite.Require().NoError(err)
777+
778+
suite.Require().Equal(testMysqlTableName, tableName)
779+
suite.Require().Equal("CREATE TABLE `testing` (\n `id` int NOT NULL,\n `name` varchar(64) DEFAULT NULL,\n `foobar` varchar(255) DEFAULT NULL,\n PRIMARY KEY (`id`)\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci", createTableSQL)
780+
}
781+
700782
func TestMigrator(t *testing.T) {
701783
suite.Run(t, new(MigratorTestSuite))
702784
}

0 commit comments

Comments
 (0)