Skip to content

Commit 7edafbe

Browse files
committed
refactor cutover for MySQL 8.x rename feature && support OceanBase
1 parent 42c0f08 commit 7edafbe

File tree

7 files changed

+145
-30
lines changed

7 files changed

+145
-30
lines changed

go/base/context.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ type MigrationContext struct {
102102
GoogleCloudPlatform bool
103103
AzureMySQL bool
104104
AttemptInstantDDL bool
105+
OceanBase bool
105106

106107
// SkipPortValidation allows skipping the port validation in `ValidateConnection`
107108
// This is useful when connecting to a MySQL instance where the external port

go/base/utils.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig,
8484
// GCP set users port to "NULL", replace it by gh-ost param
8585
// Azure MySQL set users port to a different value by design, replace it by gh-ost param
8686
var port int
87-
if migrationContext.AliyunRDS || migrationContext.GoogleCloudPlatform || migrationContext.AzureMySQL {
87+
if migrationContext.AliyunRDS || migrationContext.GoogleCloudPlatform || migrationContext.AzureMySQL || migrationContext.OceanBase {
8888
port = connectionConfig.Key.Port
8989
} else {
9090
portQuery := `select @@global.port`

go/cmd/gh-ost/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ func main() {
8686
flag.BoolVar(&migrationContext.AliyunRDS, "aliyun-rds", false, "set to 'true' when you execute on Aliyun RDS.")
8787
flag.BoolVar(&migrationContext.GoogleCloudPlatform, "gcp", false, "set to 'true' when you execute on a 1st generation Google Cloud Platform (GCP).")
8888
flag.BoolVar(&migrationContext.AzureMySQL, "azure", false, "set to 'true' when you execute on Azure Database on MySQL.")
89+
flag.BoolVar(&migrationContext.OceanBase, "oceanbase", false, "set to 'true' when you execute on OceanBase")
8990

9091
executeFlag := flag.Bool("execute", false, "actually execute the alter & migrate the table. Default is noop: do some tests and exit")
9192
flag.BoolVar(&migrationContext.TestOnReplica, "test-on-replica", false, "Have the migration run on a replica, not on the master. At the end of migration replication is stopped, and tables are swapped and immediately swap-revert. Replication remains stopped and you can compare the two tables for building trust")

go/logic/applier.go

Lines changed: 49 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ func (this *Applier) InitDBConnections() (err error) {
100100
if err := this.validateAndReadGlobalVariables(); err != nil {
101101
return err
102102
}
103-
if !this.migrationContext.AliyunRDS && !this.migrationContext.GoogleCloudPlatform && !this.migrationContext.AzureMySQL {
103+
if !this.migrationContext.AliyunRDS && !this.migrationContext.GoogleCloudPlatform && !this.migrationContext.AzureMySQL && !this.migrationContext.OceanBase {
104104
if impliedKey, err := mysql.GetInstanceKey(this.db); err != nil {
105105
return err
106106
} else {
@@ -714,24 +714,28 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected
714714
return chunkSize, rowsAffected, duration, nil
715715
}
716716

717-
// LockOriginalTable places a write lock on the original table
718-
func (this *Applier) LockOriginalTable() error {
719-
query := fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write`,
720-
sql.EscapeName(this.migrationContext.DatabaseName),
721-
sql.EscapeName(this.migrationContext.OriginalTableName),
722-
)
723-
this.migrationContext.Log.Infof("Locking %s.%s",
724-
sql.EscapeName(this.migrationContext.DatabaseName),
725-
sql.EscapeName(this.migrationContext.OriginalTableName),
726-
)
717+
// lockTable places a write lock on the specific table
718+
func (this *Applier) lockTable(databaseName, tableName string) error {
719+
query := fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write`, databaseName, tableName)
720+
this.migrationContext.Log.Infof("Locking %s.%s", databaseName, tableName)
727721
this.migrationContext.LockTablesStartTime = time.Now()
728722
if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil {
729723
return err
730724
}
731-
this.migrationContext.Log.Infof("Table locked")
725+
this.migrationContext.Log.Infof("Table %s.%s locked", databaseName, tableName)
732726
return nil
733727
}
734728

729+
// LockOriginalTable places a write lock on the original table
730+
func (this *Applier) LockOriginalTable() error {
731+
return this.lockTable(this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName)
732+
}
733+
734+
// LockGhostTable places a write lock on the ghost table
735+
func (this *Applier) LockGhostTable() error {
736+
return this.lockTable(this.migrationContext.DatabaseName, this.migrationContext.GetGhostTableName())
737+
}
738+
735739
// UnlockTables makes tea. No wait, it unlocks tables.
736740
func (this *Applier) UnlockTables() error {
737741
query := `unlock /* gh-ost */ tables`
@@ -1033,7 +1037,7 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke
10331037

10341038
tableLockTimeoutSeconds := this.migrationContext.CutOverLockTimeoutSeconds * 2
10351039
this.migrationContext.Log.Infof("Setting LOCK timeout as %d seconds", tableLockTimeoutSeconds)
1036-
query = fmt.Sprintf(`set /* gh-ost */ session lock_wait_timeout:=%d`, tableLockTimeoutSeconds)
1040+
query = fmt.Sprintf(`set /* gh-ost */ session lock_wait_timeout=%d`, tableLockTimeoutSeconds)
10371041
if _, err := tx.Exec(query); err != nil {
10381042
tableLocked <- err
10391043
return err
@@ -1108,25 +1112,31 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke
11081112
return nil
11091113
}
11101114

1111-
// AtomicCutoverRename
1112-
func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed chan<- error) error {
1113-
tx, err := this.db.Begin()
1115+
func (this *Applier) atomicCutoverRename(db *gosql.DB, sessionIdChan chan int64, tablesRenamed chan<- error) error {
1116+
tx, err := db.Begin()
11141117
if err != nil {
11151118
return err
11161119
}
11171120
defer func() {
11181121
tx.Rollback()
1119-
sessionIdChan <- -1
1120-
tablesRenamed <- fmt.Errorf("Unexpected error in AtomicCutoverRename(), injected to release blocking channel reads")
1122+
if sessionIdChan != nil {
1123+
sessionIdChan <- -1
1124+
}
1125+
if tablesRenamed != nil {
1126+
tablesRenamed <- fmt.Errorf("Unexpected error in AtomicCutoverRename(), injected to release blocking channel reads")
1127+
}
11211128
}()
1122-
var sessionId int64
1123-
if err := tx.QueryRow(`select /* gh-ost */ connection_id()`).Scan(&sessionId); err != nil {
1124-
return err
1129+
1130+
if sessionIdChan != nil {
1131+
var sessionId int64
1132+
if err := tx.QueryRow(`select /* gh-ost */ connection_id()`).Scan(&sessionId); err != nil {
1133+
return err
1134+
}
1135+
sessionIdChan <- sessionId
11251136
}
1126-
sessionIdChan <- sessionId
11271137

11281138
this.migrationContext.Log.Infof("Setting RENAME timeout as %d seconds", this.migrationContext.CutOverLockTimeoutSeconds)
1129-
query := fmt.Sprintf(`set /* gh-ost */ session lock_wait_timeout:=%d`, this.migrationContext.CutOverLockTimeoutSeconds)
1139+
query := fmt.Sprintf(`set /* gh-ost */ session lock_wait_timeout=%d`, this.migrationContext.CutOverLockTimeoutSeconds)
11301140
if _, err := tx.Exec(query); err != nil {
11311141
return err
11321142
}
@@ -1143,14 +1153,28 @@ func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed
11431153
)
11441154
this.migrationContext.Log.Infof("Issuing and expecting this to block: %s", query)
11451155
if _, err := tx.Exec(query); err != nil {
1146-
tablesRenamed <- err
1156+
if tablesRenamed != nil {
1157+
tablesRenamed <- err
1158+
}
11471159
return this.migrationContext.Log.Errore(err)
11481160
}
1149-
tablesRenamed <- nil
1161+
if tablesRenamed != nil {
1162+
tablesRenamed <- nil
1163+
}
11501164
this.migrationContext.Log.Infof("Tables renamed")
11511165
return nil
11521166
}
11531167

1168+
// AtomicCutoverRename renames tables for atomic cut over in non lock session
1169+
func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed chan<- error) error {
1170+
return this.atomicCutoverRename(this.db, sessionIdChan, tablesRenamed)
1171+
}
1172+
1173+
// AtomicCutoverRenameWithLock renames tables for atomic cut over in the lock session
1174+
func (this *Applier) AtomicCutoverRenameWithLock() error {
1175+
return this.atomicCutoverRename(this.singletonDB, nil, nil)
1176+
}
1177+
11541178
func (this *Applier) ShowStatusVariable(variableName string) (result int64, err error) {
11551179
query := fmt.Sprintf(`show /* gh-ost */ global status like '%s'`, variableName)
11561180
if err := this.db.QueryRow(query).Scan(&variableName, &result); err != nil {

go/logic/inspect.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func (this *Inspector) InitDBConnections() (err error) {
5656
if err := this.validateConnection(); err != nil {
5757
return err
5858
}
59-
if !this.migrationContext.AliyunRDS && !this.migrationContext.GoogleCloudPlatform && !this.migrationContext.AzureMySQL {
59+
if !this.migrationContext.AliyunRDS && !this.migrationContext.GoogleCloudPlatform && !this.migrationContext.AzureMySQL && !this.migrationContext.OceanBase {
6060
if impliedKey, err := mysql.GetInstanceKey(this.db); err != nil {
6161
return err
6262
} else {

go/logic/migrator.go

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,11 @@ func (this *Migrator) canStopStreaming() bool {
200200

201201
// onChangelogEvent is called when a binlog event operation on the changelog table is intercepted.
202202
func (this *Migrator) onChangelogEvent(dmlEvent *binlog.BinlogDMLEvent) (err error) {
203+
if dmlEvent.NewColumnValues == nil {
204+
// in some compatible systems, such as OceanBase Binlog Service, an UPSERT event is
205+
// converted to a DELETE event and an INSERT event; we need to skip the DELETE event.
206+
return nil
207+
}
203208
// Hey, I created the changelog table, I know the type of columns it has!
204209
switch hint := dmlEvent.NewColumnValues.StringColumn(2); hint {
205210
case "state":
@@ -562,9 +567,15 @@ func (this *Migrator) cutOver() (err error) {
562567

563568
switch this.migrationContext.CutOverType {
564569
case base.CutOverAtomic:
565-
// Atomic solution: we use low timeout and multiple attempts. But for
566-
// each failed attempt, we throttle until replication lag is back to normal
567-
err = this.atomicCutOver()
570+
if this.migrationContext.OceanBase || !mysql.IsSmallerMinorVersion(this.migrationContext.ApplierMySQLVersion, "8.0.13") {
571+
// Atomic solution for latest MySQL: cut over the tables in the same session where the original
572+
// table and ghost table are both locked, it can only work on MySQL 8.0.13 or later versions
573+
err = this.atomicCutOverMySQL8()
574+
} else {
575+
// Atomic solution: we use low timeout and multiple attempts. But for
576+
// each failed attempt, we throttle until replication lag is back to normal
577+
err = this.atomicCutOver()
578+
}
568579
case base.CutOverTwoStep:
569580
err = this.cutOverTwoStep()
570581
default:
@@ -643,6 +654,39 @@ func (this *Migrator) cutOverTwoStep() (err error) {
643654
return nil
644655
}
645656

657+
// atomicCutOverMySQL8 will lock down the original table and the ghost table, execute
658+
// what's left of last DML entries, and atomically swap original->old, then new->original.
659+
// It requires to execute RENAME TABLE when the table is LOCKED under WRITE LOCK, which is
660+
// supported from MySQL 8.0.13, see https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-13.html.
661+
func (this *Migrator) atomicCutOverMySQL8() (err error) {
662+
atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1)
663+
defer atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 0)
664+
atomic.StoreInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 0)
665+
666+
if err := this.retryOperation(this.applier.LockOriginalTable); err != nil {
667+
return err
668+
}
669+
670+
if err := this.retryOperation(this.waitForEventsUpToLock); err != nil {
671+
return err
672+
}
673+
if err := this.retryOperation(this.applier.LockGhostTable); err != nil {
674+
return err
675+
}
676+
677+
if err := this.applier.AtomicCutoverRenameWithLock(); err != nil {
678+
return err
679+
}
680+
if err := this.retryOperation(this.applier.UnlockTables); err != nil {
681+
return err
682+
}
683+
684+
lockAndRenameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.LockTablesStartTime)
685+
renameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.RenameTablesStartTime)
686+
this.migrationContext.Log.Debugf("Lock & rename duration: %s (rename only: %s). During this time, queries on %s were locked or failing", lockAndRenameDuration, renameDuration, sql.EscapeName(this.migrationContext.OriginalTableName))
687+
return nil
688+
}
689+
646690
// atomicCutOver
647691
func (this *Migrator) atomicCutOver() (err error) {
648692
atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1)

go/mysql/utils.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package mysql
88
import (
99
gosql "database/sql"
1010
"fmt"
11+
"strconv"
1112
"strings"
1213
"sync"
1314
"time"
@@ -211,3 +212,47 @@ func Kill(db *gosql.DB, connectionID string) error {
211212
_, err := db.Exec(`KILL QUERY %s`, connectionID)
212213
return err
213214
}
215+
// leadingInt returns the integer formed by the leading decimal digits of token,
// or 0 when token does not start with a digit.
func leadingInt(token string) int {
	end := 0
	for end < len(token) && token[end] >= '0' && token[end] <= '9' {
		end++
	}
	// The error is deliberately ignored: an empty digit run yields 0, which is the
	// value wanted for a missing or non-numeric component.
	n, _ := strconv.Atoi(token[:end])
	return n
}

// versionTokens splits a version string such as "8.0.13-log" into exactly `digits`
// numeric components. Anything from the first "-" onwards is discarded, components
// beyond `digits` are dropped, and missing components are reported as 0. A component
// with a non-numeric suffix (e.g. "13rc1") contributes its leading number.
func versionTokens(version string, digits int) []int {
	v := strings.Split(version, "-")[0]
	tokens := strings.Split(v, ".")
	intTokens := make([]int, digits)
	for i := range tokens {
		if i >= digits {
			break
		}
		intTokens[i] = leadingInt(tokens[i])
	}
	return intTokens
}

// isSmallerVersion reports whether version is strictly smaller than otherVersion
// when only the first `digits` numeric components of each are compared, from most
// to least significant. Equal versions are not "smaller".
func isSmallerVersion(version string, otherVersion string, digits int) bool {
	v := versionTokens(version, digits)
	o := versionTokens(otherVersion, digits)
	// Both slices have exactly `digits` elements, so indexing o by v's range is safe.
	for i := range v {
		if v[i] != o[i] {
			return v[i] < o[i]
		}
	}
	return false
}

// IsSmallerMajorVersion tests two versions against another and returns true if
// the former is a smaller "major" version than the latter.
// e.g. 5.5.36 is NOT a smaller major version as compared to 5.5.40, but IS as compared to 5.6.9
func IsSmallerMajorVersion(version string, otherVersion string) bool {
	return isSmallerVersion(version, otherVersion, 2)
}

// IsSmallerMinorVersion tests two versions against another and returns true if
// the former is a smaller "minor" version than the latter.
// e.g. 5.5.36 is a smaller minor version as compared to 5.5.40, as well as compared to 5.6.7
func IsSmallerMinorVersion(version string, otherVersion string) bool {
	return isSmallerVersion(version, otherVersion, 3)
}

0 commit comments

Comments
 (0)