Skip to content

Commit 756762c

Browse files
committed
refactor cutover for MySQL 8.x rename feature && support OceanBase
1 parent 2ea0e60 commit 756762c

File tree

6 files changed

+173
-30
lines changed

6 files changed

+173
-30
lines changed

go/base/context.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ type MigrationContext struct {
102102
GoogleCloudPlatform bool
103103
AzureMySQL bool
104104
AttemptInstantDDL bool
105+
OceanBase bool
105106

106107
// SkipPortValidation allows skipping the port validation in `ValidateConnection`
107108
// This is useful when connecting to a MySQL instance where the external port

go/base/utils.go

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package base
77

88
import (
9+
"errors"
910
"fmt"
1011
"os"
1112
"regexp"
@@ -62,6 +63,10 @@ func StringContainsAll(s string, substrings ...string) bool {
6263
}
6364

6465
func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig, migrationContext *MigrationContext, name string) (string, error) {
66+
if err := validateOceanBaseConnection(db, migrationContext); err != nil {
67+
return "", err
68+
}
69+
6570
versionQuery := `select @@global.version`
6671

6772
var version string
@@ -84,7 +89,7 @@ func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig,
8489
// GCP set users port to "NULL", replace it by gh-ost param
8590
// Azure MySQL set users port to a different value by design, replace it by gh-ost para
8691
var port int
87-
if migrationContext.AliyunRDS || migrationContext.GoogleCloudPlatform || migrationContext.AzureMySQL {
92+
if migrationContext.AliyunRDS || migrationContext.GoogleCloudPlatform || migrationContext.AzureMySQL || migrationContext.OceanBase {
8893
port = connectionConfig.Key.Port
8994
} else {
9095
portQuery := `select @@global.port`
@@ -102,3 +107,27 @@ func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig,
102107
return "", fmt.Errorf("Unexpected database port reported: %+v / extra_port: %+v", port, extraPort)
103108
}
104109
}
110+
111+
func validateOceanBaseConnection(db *gosql.DB, migrationContext *MigrationContext) error {
112+
versionCommentQuery := `select @@global.version_comment`
113+
var versionComment string
114+
if err := db.QueryRow(versionCommentQuery).Scan(&versionComment); err != nil {
115+
return nil
116+
}
117+
if !strings.Contains(versionComment, "OceanBase") {
118+
return nil
119+
}
120+
121+
migrationContext.Log.Infof("OceanBase connection identified, version_comment: %v", versionComment)
122+
migrationContext.OceanBase = true
123+
124+
enableLockPriorityQuery := `select value from oceanbase.GV$OB_PARAMETERS where name='enable_lock_priority'`
125+
var enableLockPriority bool
126+
if err := db.QueryRow(enableLockPriorityQuery).Scan(&enableLockPriority); err != nil {
127+
return err
128+
}
129+
if !enableLockPriority {
130+
return errors.New("system parameter 'enable_lock_priority' should be true to support cut-over")
131+
}
132+
return nil
133+
}

go/logic/applier.go

Lines changed: 49 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ func (this *Applier) InitDBConnections() (err error) {
100100
if err := this.validateAndReadGlobalVariables(); err != nil {
101101
return err
102102
}
103-
if !this.migrationContext.AliyunRDS && !this.migrationContext.GoogleCloudPlatform && !this.migrationContext.AzureMySQL {
103+
if !this.migrationContext.AliyunRDS && !this.migrationContext.GoogleCloudPlatform && !this.migrationContext.AzureMySQL && !this.migrationContext.OceanBase {
104104
if impliedKey, err := mysql.GetInstanceKey(this.db); err != nil {
105105
return err
106106
} else {
@@ -723,24 +723,28 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected
723723
return chunkSize, rowsAffected, duration, nil
724724
}
725725

726-
// LockOriginalTable places a write lock on the original table
727-
func (this *Applier) LockOriginalTable() error {
728-
query := fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write`,
729-
sql.EscapeName(this.migrationContext.DatabaseName),
730-
sql.EscapeName(this.migrationContext.OriginalTableName),
731-
)
732-
this.migrationContext.Log.Infof("Locking %s.%s",
733-
sql.EscapeName(this.migrationContext.DatabaseName),
734-
sql.EscapeName(this.migrationContext.OriginalTableName),
735-
)
726+
// lockTable places a write lock on the specific table
727+
func (this *Applier) lockTable(databaseName, tableName string) error {
728+
query := fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write`, databaseName, tableName)
729+
this.migrationContext.Log.Infof("Locking %s.%s", databaseName, tableName)
736730
this.migrationContext.LockTablesStartTime = time.Now()
737731
if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil {
738732
return err
739733
}
740-
this.migrationContext.Log.Infof("Table locked")
734+
this.migrationContext.Log.Infof("Table %s.%s locked", databaseName, tableName)
741735
return nil
742736
}
743737

738+
// LockOriginalTable places a write lock on the original table
739+
func (this *Applier) LockOriginalTable() error {
740+
return this.lockTable(this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName)
741+
}
742+
743+
// LockGhostTable places a write lock on the ghost table
744+
func (this *Applier) LockGhostTable() error {
745+
return this.lockTable(this.migrationContext.DatabaseName, this.migrationContext.GetGhostTableName())
746+
}
747+
744748
// UnlockTables makes tea. No wait, it unlocks tables.
745749
func (this *Applier) UnlockTables() error {
746750
query := `unlock /* gh-ost */ tables`
@@ -1046,7 +1050,7 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke
10461050

10471051
tableLockTimeoutSeconds := this.migrationContext.CutOverLockTimeoutSeconds * 2
10481052
this.migrationContext.Log.Infof("Setting LOCK timeout as %d seconds", tableLockTimeoutSeconds)
1049-
query = fmt.Sprintf(`set /* gh-ost */ session lock_wait_timeout:=%d`, tableLockTimeoutSeconds)
1053+
query = fmt.Sprintf(`set /* gh-ost */ session lock_wait_timeout=%d`, tableLockTimeoutSeconds)
10501054
if _, err := tx.Exec(query); err != nil {
10511055
tableLocked <- err
10521056
return err
@@ -1121,25 +1125,31 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke
11211125
return nil
11221126
}
11231127

1124-
// AtomicCutoverRename
1125-
func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed chan<- error) error {
1126-
tx, err := this.db.Begin()
1128+
func (this *Applier) atomicCutoverRename(db *gosql.DB, sessionIdChan chan int64, tablesRenamed chan<- error) error {
1129+
tx, err := db.Begin()
11271130
if err != nil {
11281131
return err
11291132
}
11301133
defer func() {
11311134
tx.Rollback()
1132-
sessionIdChan <- -1
1133-
tablesRenamed <- fmt.Errorf("Unexpected error in AtomicCutoverRename(), injected to release blocking channel reads")
1135+
if sessionIdChan != nil {
1136+
sessionIdChan <- -1
1137+
}
1138+
if tablesRenamed != nil {
1139+
tablesRenamed <- fmt.Errorf("Unexpected error in AtomicCutoverRename(), injected to release blocking channel reads")
1140+
}
11341141
}()
1135-
var sessionId int64
1136-
if err := tx.QueryRow(`select /* gh-ost */ connection_id()`).Scan(&sessionId); err != nil {
1137-
return err
1142+
1143+
if sessionIdChan != nil {
1144+
var sessionId int64
1145+
if err := tx.QueryRow(`select /* gh-ost */ connection_id()`).Scan(&sessionId); err != nil {
1146+
return err
1147+
}
1148+
sessionIdChan <- sessionId
11381149
}
1139-
sessionIdChan <- sessionId
11401150

11411151
this.migrationContext.Log.Infof("Setting RENAME timeout as %d seconds", this.migrationContext.CutOverLockTimeoutSeconds)
1142-
query := fmt.Sprintf(`set /* gh-ost */ session lock_wait_timeout:=%d`, this.migrationContext.CutOverLockTimeoutSeconds)
1152+
query := fmt.Sprintf(`set /* gh-ost */ session lock_wait_timeout=%d`, this.migrationContext.CutOverLockTimeoutSeconds)
11431153
if _, err := tx.Exec(query); err != nil {
11441154
return err
11451155
}
@@ -1156,14 +1166,28 @@ func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed
11561166
)
11571167
this.migrationContext.Log.Infof("Issuing and expecting this to block: %s", query)
11581168
if _, err := tx.Exec(query); err != nil {
1159-
tablesRenamed <- err
1169+
if tablesRenamed != nil {
1170+
tablesRenamed <- err
1171+
}
11601172
return this.migrationContext.Log.Errore(err)
11611173
}
1162-
tablesRenamed <- nil
1174+
if tablesRenamed != nil {
1175+
tablesRenamed <- nil
1176+
}
11631177
this.migrationContext.Log.Infof("Tables renamed")
11641178
return nil
11651179
}
11661180

1181+
// AtomicCutoverRename renames tables for atomic cut over in non lock session
1182+
func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed chan<- error) error {
1183+
return this.atomicCutoverRename(this.db, sessionIdChan, tablesRenamed)
1184+
}
1185+
1186+
// AtomicCutoverRenameWithLock renames tables for atomic cut over in the lock session
1187+
func (this *Applier) AtomicCutoverRenameWithLock() error {
1188+
return this.atomicCutoverRename(this.singletonDB, nil, nil)
1189+
}
1190+
11671191
func (this *Applier) ShowStatusVariable(variableName string) (result int64, err error) {
11681192
query := fmt.Sprintf(`show /* gh-ost */ global status like '%s'`, variableName)
11691193
if err := this.db.QueryRow(query).Scan(&variableName, &result); err != nil {

go/logic/inspect.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func (this *Inspector) InitDBConnections() (err error) {
6060
}
6161
this.dbVersion = this.migrationContext.InspectorMySQLVersion
6262

63-
if !this.migrationContext.AliyunRDS && !this.migrationContext.GoogleCloudPlatform && !this.migrationContext.AzureMySQL {
63+
if !this.migrationContext.AliyunRDS && !this.migrationContext.GoogleCloudPlatform && !this.migrationContext.AzureMySQL && !this.migrationContext.OceanBase {
6464
if impliedKey, err := mysql.GetInstanceKey(this.db); err != nil {
6565
return err
6666
} else {

go/logic/migrator.go

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,11 @@ func (this *Migrator) canStopStreaming() bool {
201201

202202
// onChangelogEvent is called when a binlog event operation on the changelog table is intercepted.
203203
func (this *Migrator) onChangelogEvent(dmlEvent *binlog.BinlogDMLEvent) (err error) {
204+
if dmlEvent.NewColumnValues == nil {
205+
// in some compatible systems, such as OceanBase Binlog Service, an UPSERT event is
206+
// converted to a DELETE event and an INSERT event, we need to skip the DELETE event.
207+
return nil
208+
}
204209
// Hey, I created the changelog table, I know the type of columns it has!
205210
switch hint := dmlEvent.NewColumnValues.StringColumn(2); hint {
206211
case "state":
@@ -563,9 +568,15 @@ func (this *Migrator) cutOver() (err error) {
563568

564569
switch this.migrationContext.CutOverType {
565570
case base.CutOverAtomic:
566-
// Atomic solution: we use low timeout and multiple attempts. But for
567-
// each failed attempt, we throttle until replication lag is back to normal
568-
err = this.atomicCutOver()
571+
if this.migrationContext.OceanBase || !mysql.IsSmallerMinorVersion(this.migrationContext.ApplierMySQLVersion, "8.0.13") {
572+
// Atomic solution for latest MySQL: cut over the tables in the same session where the origin
573+
// table and ghost table are both locked, it can only work on MySQL 8.0.13 or later versions
574+
err = this.atomicCutOverMySQL8()
575+
} else {
576+
// Atomic solution: we use low timeout and multiple attempts. But for
577+
// each failed attempt, we throttle until replication lag is back to normal
578+
err = this.atomicCutOver()
579+
}
569580
case base.CutOverTwoStep:
570581
err = this.cutOverTwoStep()
571582
default:
@@ -644,6 +655,39 @@ func (this *Migrator) cutOverTwoStep() (err error) {
644655
return nil
645656
}
646657

658+
// atomicCutOverMySQL8 will lock down the original table and the ghost table, execute
659+
// what's left of last DML entries, and atomically swap original->old, then new->original.
660+
// It requires to execute RENAME TABLE when the table is LOCKED under WRITE LOCK, which is
661+
// supported from MySQL 8.0.13, see https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-13.html.
662+
func (this *Migrator) atomicCutOverMySQL8() (err error) {
663+
atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1)
664+
defer atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 0)
665+
atomic.StoreInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 0)
666+
667+
if err := this.retryOperation(this.applier.LockOriginalTable); err != nil {
668+
return err
669+
}
670+
671+
if err := this.retryOperation(this.waitForEventsUpToLock); err != nil {
672+
return err
673+
}
674+
if err := this.retryOperation(this.applier.LockGhostTable); err != nil {
675+
return err
676+
}
677+
678+
if err := this.applier.AtomicCutoverRenameWithLock(); err != nil {
679+
return err
680+
}
681+
if err := this.retryOperation(this.applier.UnlockTables); err != nil {
682+
return err
683+
}
684+
685+
lockAndRenameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.LockTablesStartTime)
686+
renameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.RenameTablesStartTime)
687+
this.migrationContext.Log.Debugf("Lock & rename duration: %s (rename only: %s). During this time, queries on %s were locked or failing", lockAndRenameDuration, renameDuration, sql.EscapeName(this.migrationContext.OriginalTableName))
688+
return nil
689+
}
690+
647691
// atomicCutOver
648692
func (this *Migrator) atomicCutOver() (err error) {
649693
atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1)

go/mysql/utils.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package mysql
88
import (
99
gosql "database/sql"
1010
"fmt"
11+
"strconv"
1112
"strings"
1213
"sync"
1314
"time"
@@ -221,3 +222,47 @@ func Kill(db *gosql.DB, connectionID string) error {
221222
_, err := db.Exec(`KILL QUERY %s`, connectionID)
222223
return err
223224
}
225+
226+
func versionTokens(version string, digits int) []int {
227+
v := strings.Split(version, "-")[0]
228+
tokens := strings.Split(v, ".")
229+
intTokens := make([]int, digits)
230+
for i := range tokens {
231+
if i >= digits {
232+
break
233+
}
234+
intTokens[i], _ = strconv.Atoi(tokens[i])
235+
}
236+
return intTokens
237+
}
238+
239+
func isSmallerVersion(version string, otherVersion string, digits int) bool {
240+
v := versionTokens(version, digits)
241+
o := versionTokens(otherVersion, digits)
242+
for i := 0; i < len(v); i++ {
243+
if v[i] < o[i] {
244+
return true
245+
}
246+
if v[i] > o[i] {
247+
return false
248+
}
249+
if i == digits {
250+
break
251+
}
252+
}
253+
return false
254+
}
255+
256+
// IsSmallerMajorVersion tests two versions against another and returns true if
257+
// the former is a smaller "major" version than the latter.
258+
// e.g. 5.5.36 is NOT a smaller major version as compared to 5.5.40, but IS as compared to 5.6.9
259+
func IsSmallerMajorVersion(version string, otherVersion string) bool {
260+
return isSmallerVersion(version, otherVersion, 2)
261+
}
262+
263+
// IsSmallerMinorVersion tests two versions against another and returns true if
264+
// the former is a smaller "minor" version than the latter.
265+
// e.g. 5.5.36 is a smaller major version as compared to 5.5.40, as well as compared to 5.6.7
266+
func IsSmallerMinorVersion(version string, otherVersion string) bool {
267+
return isSmallerVersion(version, otherVersion, 3)
268+
}

0 commit comments

Comments
 (0)