Skip to content
4 changes: 3 additions & 1 deletion go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,9 @@ type MigrationContext struct {

recentBinlogCoordinates mysql.BinlogCoordinates

BinlogSyncerMaxReconnectAttempts int
BinlogSyncerMaxReconnectAttempts int
AllowSetupMetadataLockInstruments bool
IsOpenMetadataLockInstruments bool

Log Logger
}
Expand Down
1 change: 1 addition & 0 deletions go/cmd/gh-ost/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ func main() {
flag.Int64Var(&migrationContext.HooksStatusIntervalSec, "hooks-status-interval", 60, "how many seconds to wait between calling onStatus hook")

flag.UintVar(&migrationContext.ReplicaServerId, "replica-server-id", 99999, "server id used by gh-ost process. Default: 99999")
flag.BoolVar(&migrationContext.AllowSetupMetadataLockInstruments, "allow-setup-metadata-lock-instruments", false, "validate rename session hold the MDL of original table before unlock tables in cut-over phase")
flag.IntVar(&migrationContext.BinlogSyncerMaxReconnectAttempts, "binlogsyncer-max-reconnect-attempts", 0, "when master node fails, the maximum number of binlog synchronization attempts to reconnect. 0 is unlimited")

flag.BoolVar(&migrationContext.IncludeTriggers, "include-triggers", false, "When true, the triggers (if exist) will be created on the new table")
Expand Down
62 changes: 61 additions & 1 deletion go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,28 @@ func (this *Applier) dropTable(tableName string) error {
return nil
}

func (this *Applier) StateMetadataLockInstrument() error {
query := `select /*+ MAX_EXECUTION_TIME(300) */ ENABLED, TIMED from performance_schema.setup_instruments WHERE NAME = 'wait/lock/metadata/sql/mdl'`
var enabled, timed string
if err := this.db.QueryRow(query).Scan(&enabled, &timed); err != nil {
return this.migrationContext.Log.Errorf("query performance_schema.setup_instruments with name wait/lock/metadata/sql/mdl error: %s", err)
}
if strings.EqualFold(enabled, "YES") && strings.EqualFold(timed, "YES") {
this.migrationContext.IsOpenMetadataLockInstruments = true
return nil
}
if !this.migrationContext.AllowSetupMetadataLockInstruments {
return nil
}
this.migrationContext.Log.Infof("instrument wait/lock/metadata/sql/mdl state: enabled %s, timed %s", enabled, timed)
if _, err := this.db.Exec(`UPDATE performance_schema.setup_instruments SET ENABLED = 'YES', TIMED = 'YES' WHERE NAME = 'wait/lock/metadata/sql/mdl'`); err != nil {
return this.migrationContext.Log.Errorf("enable instrument wait/lock/metadata/sql/mdl error: %s", err)
}
this.migrationContext.IsOpenMetadataLockInstruments = true
this.migrationContext.Log.Infof("instrument wait/lock/metadata/sql/mdl enabled")
return nil
}

// dropTriggers drop the triggers on the applied host
func (this *Applier) DropTriggersFromGhost() error {
if len(this.migrationContext.Triggers) > 0 {
Expand Down Expand Up @@ -1095,7 +1117,7 @@ func (this *Applier) RevertAtomicCutOverWaitTimeout() {
}

// AtomicCutOverMagicLock
func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocked chan<- error, okToUnlockTable <-chan bool, tableUnlocked chan<- error) error {
func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocked chan<- error, okToUnlockTable <-chan bool, tableUnlocked chan<- error, renameLockSessionId *int64) error {
tx, err := this.db.Begin()
if err != nil {
tableLocked <- err
Expand Down Expand Up @@ -1186,6 +1208,20 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke
// We DO NOT return here because we must `UNLOCK TABLES`!
}

this.migrationContext.Log.Infof("Session renameLockSessionId is %+v", *renameLockSessionId)
// Checking the lock is held by rename session
if *renameLockSessionId > 0 && this.migrationContext.IsOpenMetadataLockInstruments {
sleepDuration := time.Duration(10*this.migrationContext.CutOverLockTimeoutSeconds) * time.Millisecond
for i := 1; i <= 100; i++ {
err := this.ExpectMetadataLock(*renameLockSessionId)
if err == nil {
this.migrationContext.Log.Infof("Rename session is pending lock on the origin table !")
break
} else {
time.Sleep(sleepDuration)
}
}
}
// Tables still locked
this.migrationContext.Log.Infof("Releasing lock from %s.%s, %s.%s",
sql.EscapeName(this.migrationContext.DatabaseName),
Expand Down Expand Up @@ -1405,3 +1441,27 @@ func (this *Applier) Teardown() {
this.singletonDB.Close()
atomic.StoreInt64(&this.finishedMigrating, 1)
}

func (this *Applier) ExpectMetadataLock(sessionId int64) error {
found := false
query := `
select /* gh-ost */ m.owner_thread_id
from performance_schema.metadata_locks m join performance_schema.threads t
on m.owner_thread_id=t.thread_id
where m.object_type = 'TABLE' and m.object_schema = ? and m.object_name = ?
and m.lock_type = 'EXCLUSIVE' and m.lock_status = 'PENDING'
and t.processlist_id = ?
`
err := sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error {
found = true
return nil
}, this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, sessionId)
if err != nil {
return err
}
if !found {
err = fmt.Errorf("cannot find PENDING metadata lock on original table: `%s`.`%s`", this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName)
return this.migrationContext.Log.Errore(err)
}
return nil
}
8 changes: 7 additions & 1 deletion go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,8 +669,9 @@ func (this *Migrator) atomicCutOver() (err error) {
lockOriginalSessionIdChan := make(chan int64, 2)
tableLocked := make(chan error, 2)
tableUnlocked := make(chan error, 2)
var renameLockSessionId int64
go func() {
if err := this.applier.AtomicCutOverMagicLock(lockOriginalSessionIdChan, tableLocked, okToUnlockTable, tableUnlocked); err != nil {
if err := this.applier.AtomicCutOverMagicLock(lockOriginalSessionIdChan, tableLocked, okToUnlockTable, tableUnlocked, &renameLockSessionId); err != nil {
this.migrationContext.Log.Errore(err)
}
}()
Expand Down Expand Up @@ -735,6 +736,7 @@ func (this *Migrator) atomicCutOver() (err error) {
// Now that we've found the RENAME blocking, AND the locking connection still alive,
// we know it is safe to proceed to release the lock

renameLockSessionId = renameSessionId
okToUnlockTable <- true
// BAM! magic table dropped, original table lock is released
// -> RENAME released -> queries on original are unblocked.
Expand Down Expand Up @@ -1203,6 +1205,10 @@ func (this *Migrator) initiateApplier() error {
}
}
this.applier.WriteChangelogState(string(GhostTableMigrated))
if err := this.applier.StateMetadataLockInstrument(); err != nil {
this.migrationContext.Log.Errorf("Unable to enable metadata lock instrument, see further error details. Bailing out")
return err
}
go this.applier.InitiateHeartbeat()
return nil
}
Expand Down
84 changes: 83 additions & 1 deletion go/logic/migrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"context"
gosql "database/sql"
"errors"
"fmt"
"os"
"path/filepath"
"strings"
Expand All @@ -22,7 +23,7 @@ import (
"github.com/stretchr/testify/suite"
"github.com/testcontainers/testcontainers-go/modules/mysql"

"fmt"
"runtime"

"github.com/github/gh-ost/go/base"
"github.com/github/gh-ost/go/binlog"
Expand Down Expand Up @@ -335,6 +336,7 @@ func (suite *MigratorTestSuite) TestMigrateEmpty() {
migrationContext.ApplierConnectionConfig = connectionConfig
migrationContext.InspectorConnectionConfig = connectionConfig
migrationContext.SetConnectionConfig("innodb")
migrationContext.InitiallyDropOldTable = true

migrationContext.AlterStatementOptions = "ADD COLUMN foobar varchar(255), ENGINE=InnoDB"

Expand Down Expand Up @@ -573,6 +575,86 @@ func TestMigratorRetryWithExponentialBackoff(t *testing.T) {
assert.Equal(t, tries, 100)
}

func (suite *MigratorTestSuite) TestCutOverLossDataCaseLockGhostBeforeRename() {
ctx := context.Background()

_, err := suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, name VARCHAR(64))", getTestTableName()))
suite.Require().NoError(err)

_, err = suite.db.ExecContext(ctx, fmt.Sprintf("insert into %s values(1,'a')", getTestTableName()))
suite.Require().NoError(err)

done := make(chan error, 1)
go func() {
connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer)
if err != nil {
done <- err
return
}
migrationContext := newTestMigrationContext()
migrationContext.ApplierConnectionConfig = connectionConfig
migrationContext.InspectorConnectionConfig = connectionConfig
migrationContext.SetConnectionConfig("innodb")
migrationContext.AllowSetupMetadataLockInstruments = true
migrationContext.AlterStatementOptions = "ADD COLUMN foobar varchar(255)"
migrationContext.HeartbeatIntervalMilliseconds = 100
migrationContext.CutOverLockTimeoutSeconds = 4

_, filename, _, _ := runtime.Caller(0)
migrationContext.PostponeCutOverFlagFile = filepath.Join(filepath.Dir(filename), "../../tmp/ghost.postpone.flag")

migrator := NewMigrator(migrationContext, "0.0.0")

//nolint:contextcheck
done <- migrator.Migrate()
}()

time.Sleep(2 * time.Second)
//nolint:dogsled
_, filename, _, _ := runtime.Caller(0)
err = os.Remove(filepath.Join(filepath.Dir(filename), "../../tmp/ghost.postpone.flag"))
if err != nil {
suite.Require().NoError(err)
}
time.Sleep(1 * time.Second)
go func() {
holdConn, err := suite.db.Conn(ctx)
suite.Require().NoError(err)
_, err = holdConn.ExecContext(ctx, "SELECT *, sleep(2) FROM test._testing_gho WHERE id = 1")
suite.Require().NoError(err)
}()

dmlConn, err := suite.db.Conn(ctx)
suite.Require().NoError(err)

_, err = dmlConn.ExecContext(ctx, fmt.Sprintf("insert into %s (id, name) values(2,'b')", getTestTableName()))
fmt.Println("insert into table original table")
suite.Require().NoError(err)

migrateErr := <-done
suite.Require().NoError(migrateErr)

// Verify the new column was added
var delValue, OriginalValue int64
err = suite.db.QueryRow(
fmt.Sprintf("select count(*) from %s._%s_del", testMysqlDatabase, testMysqlTableName),
).Scan(&delValue)
suite.Require().NoError(err)

err = suite.db.QueryRow("select count(*) from " + getTestTableName()).Scan(&OriginalValue)
suite.Require().NoError(err)

suite.Require().LessOrEqual(delValue, OriginalValue)

var tableName, createTableSQL string
//nolint:execinquery
err = suite.db.QueryRow("SHOW CREATE TABLE "+getTestTableName()).Scan(&tableName, &createTableSQL)
suite.Require().NoError(err)

suite.Require().Equal(testMysqlTableName, tableName)
suite.Require().Equal("CREATE TABLE `testing` (\n `id` int NOT NULL,\n `name` varchar(64) DEFAULT NULL,\n `foobar` varchar(255) DEFAULT NULL,\n PRIMARY KEY (`id`)\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci", createTableSQL)
}

func TestMigrator(t *testing.T) {
suite.Run(t, new(MigratorTestSuite))
}
Loading