Skip to content

Commit 13889d3

Browse files
committed
support OceanBase Binlog Service
1 parent 59db6fa commit 13889d3

File tree

7 files changed

+146
-32
lines changed

7 files changed

+146
-32
lines changed

go/base/context.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ type MigrationContext struct {
102102
GoogleCloudPlatform bool
103103
AzureMySQL bool
104104
AttemptInstantDDL bool
105+
OceanBaseBinlogService bool
105106

106107
config ContextConfig
107108
configMutex *sync.Mutex

go/base/utils.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,13 @@
66
package base
77

88
import (
9+
gosql "database/sql"
910
"fmt"
1011
"os"
1112
"regexp"
1213
"strings"
1314
"time"
1415

15-
gosql "database/sql"
16-
1716
"github.com/github/gh-ost/go/mysql"
1817
)
1918

@@ -75,7 +74,7 @@ func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig,
7574
// AliyunRDS set users port to "NULL", replace it by gh-ost param
7675
// GCP set users port to "NULL", replace it by gh-ost param
7776
// Azure MySQL set users port to a different value by design, replace it by gh-ost para
78-
if migrationContext.AliyunRDS || migrationContext.GoogleCloudPlatform || migrationContext.AzureMySQL {
77+
if migrationContext.AliyunRDS || migrationContext.GoogleCloudPlatform || migrationContext.AzureMySQL || migrationContext.OceanBaseBinlogService {
7978
port = connectionConfig.Key.Port
8079
} else {
8180
portQuery := `select @@global.port`

go/cmd/gh-ost/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ func main() {
8787
flag.BoolVar(&migrationContext.AliyunRDS, "aliyun-rds", false, "set to 'true' when you execute on Aliyun RDS.")
8888
flag.BoolVar(&migrationContext.GoogleCloudPlatform, "gcp", false, "set to 'true' when you execute on a 1st generation Google Cloud Platform (GCP).")
8989
flag.BoolVar(&migrationContext.AzureMySQL, "azure", false, "set to 'true' when you execute on Azure Database on MySQL.")
90+
flag.BoolVar(&migrationContext.OceanBaseBinlogService, "oceanbase", false, "set to 'true' when you execute on OceanBase Binlog Service")
9091

9192
executeFlag := flag.Bool("execute", false, "actually execute the alter & migrate the table. Default is noop: do some tests and exit")
9293
flag.BoolVar(&migrationContext.TestOnReplica, "test-on-replica", false, "Have the migration run on a replica, not on the master. At the end of migration replication is stopped, and tables are swapped and immediately swap-revert. Replication remains stopped and you can compare the two tables for building trust")

go/logic/applier.go

Lines changed: 49 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ func (this *Applier) InitDBConnections() (err error) {
9292
if err := this.validateAndReadTimeZone(); err != nil {
9393
return err
9494
}
95-
if !this.migrationContext.AliyunRDS && !this.migrationContext.GoogleCloudPlatform && !this.migrationContext.AzureMySQL {
95+
if !this.migrationContext.AliyunRDS && !this.migrationContext.GoogleCloudPlatform && !this.migrationContext.AzureMySQL && !this.migrationContext.OceanBaseBinlogService {
9696
if impliedKey, err := mysql.GetInstanceKey(this.db); err != nil {
9797
return err
9898
} else {
@@ -670,24 +670,28 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected
670670
return chunkSize, rowsAffected, duration, nil
671671
}
672672

673-
// LockOriginalTable places a write lock on the original table
674-
func (this *Applier) LockOriginalTable() error {
675-
query := fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write`,
676-
sql.EscapeName(this.migrationContext.DatabaseName),
677-
sql.EscapeName(this.migrationContext.OriginalTableName),
678-
)
679-
this.migrationContext.Log.Infof("Locking %s.%s",
680-
sql.EscapeName(this.migrationContext.DatabaseName),
681-
sql.EscapeName(this.migrationContext.OriginalTableName),
682-
)
673+
// lockTable places a write lock on the specific table
674+
func (this *Applier) lockTable(databaseName, tableName string) error {
675+
query := fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write`, databaseName, tableName)
676+
this.migrationContext.Log.Infof("Locking %s.%s", databaseName, tableName)
683677
this.migrationContext.LockTablesStartTime = time.Now()
684678
if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil {
685679
return err
686680
}
687-
this.migrationContext.Log.Infof("Table locked")
681+
this.migrationContext.Log.Infof("Table %s.%s locked", databaseName, tableName)
688682
return nil
689683
}
690684

685+
// LockOriginalTable places a write lock on the original table
686+
func (this *Applier) LockOriginalTable() error {
687+
return this.lockTable(this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName)
688+
}
689+
690+
// LockGhostTable places a write lock on the ghost table
691+
func (this *Applier) LockGhostTable() error {
692+
return this.lockTable(this.migrationContext.DatabaseName, this.migrationContext.GetGhostTableName())
693+
}
694+
691695
// UnlockTables makes tea. No wait, it unlocks tables.
692696
func (this *Applier) UnlockTables() error {
693697
query := `unlock /* gh-ost */ tables`
@@ -968,7 +972,7 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke
968972

969973
tableLockTimeoutSeconds := this.migrationContext.CutOverLockTimeoutSeconds * 2
970974
this.migrationContext.Log.Infof("Setting LOCK timeout as %d seconds", tableLockTimeoutSeconds)
971-
query = fmt.Sprintf(`set /* gh-ost */ session lock_wait_timeout:=%d`, tableLockTimeoutSeconds)
975+
query = fmt.Sprintf(`set /* gh-ost */ session lock_wait_timeout=%d`, tableLockTimeoutSeconds)
972976
if _, err := tx.Exec(query); err != nil {
973977
tableLocked <- err
974978
return err
@@ -1037,25 +1041,31 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke
10371041
return nil
10381042
}
10391043

1040-
// AtomicCutoverRename
1041-
func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed chan<- error) error {
1042-
tx, err := this.db.Begin()
1044+
func (this *Applier) atomicCutoverRename(db *gosql.DB, sessionIdChan chan int64, tablesRenamed chan<- error) error {
1045+
tx, err := db.Begin()
10431046
if err != nil {
10441047
return err
10451048
}
10461049
defer func() {
10471050
tx.Rollback()
1048-
sessionIdChan <- -1
1049-
tablesRenamed <- fmt.Errorf("Unexpected error in AtomicCutoverRename(), injected to release blocking channel reads")
1051+
if sessionIdChan != nil {
1052+
sessionIdChan <- -1
1053+
}
1054+
if tablesRenamed != nil {
1055+
tablesRenamed <- fmt.Errorf("Unexpected error in AtomicCutoverRename(), injected to release blocking channel reads")
1056+
}
10501057
}()
1051-
var sessionId int64
1052-
if err := tx.QueryRow(`select /* gh-ost */ connection_id()`).Scan(&sessionId); err != nil {
1053-
return err
1058+
1059+
if sessionIdChan != nil {
1060+
var sessionId int64
1061+
if err := tx.QueryRow(`select /* gh-ost */ connection_id()`).Scan(&sessionId); err != nil {
1062+
return err
1063+
}
1064+
sessionIdChan <- sessionId
10541065
}
1055-
sessionIdChan <- sessionId
10561066

10571067
this.migrationContext.Log.Infof("Setting RENAME timeout as %d seconds", this.migrationContext.CutOverLockTimeoutSeconds)
1058-
query := fmt.Sprintf(`set /* gh-ost */ session lock_wait_timeout:=%d`, this.migrationContext.CutOverLockTimeoutSeconds)
1068+
query := fmt.Sprintf(`set /* gh-ost */ session lock_wait_timeout=%d`, this.migrationContext.CutOverLockTimeoutSeconds)
10591069
if _, err := tx.Exec(query); err != nil {
10601070
return err
10611071
}
@@ -1072,14 +1082,28 @@ func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed
10721082
)
10731083
this.migrationContext.Log.Infof("Issuing and expecting this to block: %s", query)
10741084
if _, err := tx.Exec(query); err != nil {
1075-
tablesRenamed <- err
1085+
if tablesRenamed != nil {
1086+
tablesRenamed <- err
1087+
}
10761088
return this.migrationContext.Log.Errore(err)
10771089
}
1078-
tablesRenamed <- nil
1090+
if tablesRenamed != nil {
1091+
tablesRenamed <- nil
1092+
}
10791093
this.migrationContext.Log.Infof("Tables renamed")
10801094
return nil
10811095
}
10821096

1097+
// AtomicCutoverRename renames tables for atomic cut over in non lock session
1098+
func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed chan<- error) error {
1099+
return this.atomicCutoverRename(this.db, sessionIdChan, tablesRenamed)
1100+
}
1101+
1102+
// AtomicCutoverRenameWithLock renames tables for atomic cut over in the lock session
1103+
func (this *Applier) AtomicCutoverRenameWithLock() error {
1104+
return this.atomicCutoverRename(this.singletonDB, nil, nil)
1105+
}
1106+
10831107
func (this *Applier) ShowStatusVariable(variableName string) (result int64, err error) {
10841108
query := fmt.Sprintf(`show /* gh-ost */ global status like '%s'`, variableName)
10851109
if err := this.db.QueryRow(query).Scan(&variableName, &result); err != nil {

go/logic/inspect.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func (this *Inspector) InitDBConnections() (err error) {
5656
if err := this.validateConnection(); err != nil {
5757
return err
5858
}
59-
if !this.migrationContext.AliyunRDS && !this.migrationContext.GoogleCloudPlatform && !this.migrationContext.AzureMySQL {
59+
if !this.migrationContext.AliyunRDS && !this.migrationContext.GoogleCloudPlatform && !this.migrationContext.AzureMySQL && !this.migrationContext.OceanBaseBinlogService {
6060
if impliedKey, err := mysql.GetInstanceKey(this.db); err != nil {
6161
return err
6262
} else {

go/logic/migrator.go

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,11 @@ func (this *Migrator) canStopStreaming() bool {
200200

201201
// onChangelogEvent is called when a binlog event operation on the changelog table is intercepted.
202202
func (this *Migrator) onChangelogEvent(dmlEvent *binlog.BinlogDMLEvent) (err error) {
203+
if dmlEvent.NewColumnValues == nil {
204+
// in some compatible systems, such as OceanBase Binlog Service, an UPSERT event is
205+
// converted to a DELETE event and an INSERT event, we need to skip the DELETE event.
206+
return nil
207+
}
203208
// Hey, I created the changelog table, I know the type of columns it has!
204209
switch hint := dmlEvent.NewColumnValues.StringColumn(2); hint {
205210
case "state":
@@ -551,9 +556,15 @@ func (this *Migrator) cutOver() (err error) {
551556

552557
switch this.migrationContext.CutOverType {
553558
case base.CutOverAtomic:
554-
// Atomic solution: we use low timeout and multiple attempts. But for
555-
// each failed attempt, we throttle until replication lag is back to normal
556-
err = this.atomicCutOver()
559+
if this.migrationContext.OceanBaseBinlogService || !mysql.IsSmallerMinorVersion(this.migrationContext.ApplierMySQLVersion, "8.0.13") {
560+
// Atomic solution for latest MySQL: cut over the tables in the same session where the origin
561+
// table and ghost table are both locked, it can only work on MySQL 8.0.13 or later versions
562+
err = this.atomicCutOverMySQL8()
563+
} else {
564+
// Atomic solution: we use low timeout and multiple attempts. But for
565+
// each failed attempt, we throttle until replication lag is back to normal
566+
err = this.atomicCutOver()
567+
}
557568
case base.CutOverTwoStep:
558569
err = this.cutOverTwoStep()
559570
default:
@@ -632,6 +643,39 @@ func (this *Migrator) cutOverTwoStep() (err error) {
632643
return nil
633644
}
634645

646+
// atomicCutOverMySQL8 will lock down the original table and the ghost table, execute
647+
// what's left of last DML entries, and atomically swap original->old, then new->original.
648+
// It requires to execute RENAME TABLE when the table is LOCKED under WRITE LOCK, which is
649+
// supported from MySQL 8.0.13, see https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-13.html.
650+
func (this *Migrator) atomicCutOverMySQL8() (err error) {
651+
atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1)
652+
defer atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 0)
653+
atomic.StoreInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 0)
654+
655+
if err := this.retryOperation(this.applier.LockOriginalTable); err != nil {
656+
return err
657+
}
658+
659+
if err := this.retryOperation(this.waitForEventsUpToLock); err != nil {
660+
return err
661+
}
662+
if err := this.retryOperation(this.applier.LockGhostTable); err != nil {
663+
return err
664+
}
665+
666+
if err := this.applier.AtomicCutoverRenameWithLock(); err != nil {
667+
return err
668+
}
669+
if err := this.retryOperation(this.applier.UnlockTables); err != nil {
670+
return err
671+
}
672+
673+
lockAndRenameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.LockTablesStartTime)
674+
renameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.RenameTablesStartTime)
675+
this.migrationContext.Log.Debugf("Lock & rename duration: %s (rename only: %s). During this time, queries on %s were locked or failing", lockAndRenameDuration, renameDuration, sql.EscapeName(this.migrationContext.OriginalTableName))
676+
return nil
677+
}
678+
635679
// atomicCutOver
636680
func (this *Migrator) atomicCutOver() (err error) {
637681
atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1)

go/mysql/utils.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package mysql
88
import (
99
gosql "database/sql"
1010
"fmt"
11+
"strconv"
1112
"strings"
1213
"sync"
1314
"time"
@@ -211,3 +212,47 @@ func Kill(db *gosql.DB, connectionID string) error {
211212
_, err := db.Exec(`KILL QUERY %s`, connectionID)
212213
return err
213214
}
215+
216+
func versionTokens(version string, digits int) []int {
217+
v := strings.Split(version, "-")[0]
218+
tokens := strings.Split(v, ".")
219+
intTokens := make([]int, digits)
220+
for i := range tokens {
221+
if i >= digits {
222+
break
223+
}
224+
intTokens[i], _ = strconv.Atoi(tokens[i])
225+
}
226+
return intTokens
227+
}
228+
229+
func isSmallerVersion(version string, otherVersion string, digits int) bool {
230+
v := versionTokens(version, digits)
231+
o := versionTokens(otherVersion, digits)
232+
for i := 0; i < len(v); i++ {
233+
if v[i] < o[i] {
234+
return true
235+
}
236+
if v[i] > o[i] {
237+
return false
238+
}
239+
if i == digits {
240+
break
241+
}
242+
}
243+
return false
244+
}
245+
246+
// IsSmallerMajorVersion tests two versions against another and returns true if
247+
// the former is a smaller "major" version than the latter.
248+
// e.g. 5.5.36 is NOT a smaller major version as compared to 5.5.40, but IS as compared to 5.6.9
249+
func IsSmallerMajorVersion(version string, otherVersion string) bool {
250+
return isSmallerVersion(version, otherVersion, 2)
251+
}
252+
253+
// IsSmallerMinorVersion tests two versions against another and returns true if
254+
// the former is a smaller "minor" version than the latter.
255+
// e.g. 5.5.36 is a smaller major version as compared to 5.5.40, as well as compared to 5.6.7
256+
func IsSmallerMinorVersion(version string, otherVersion string) bool {
257+
return isSmallerVersion(version, otherVersion, 3)
258+
}

0 commit comments

Comments
 (0)