@@ -100,7 +100,7 @@ func (this *Applier) InitDBConnections() (err error) {
100
100
if err := this .validateAndReadGlobalVariables (); err != nil {
101
101
return err
102
102
}
103
- if ! this .migrationContext .AliyunRDS && ! this .migrationContext .GoogleCloudPlatform && ! this .migrationContext .AzureMySQL {
103
+ if ! this .migrationContext .AliyunRDS && ! this .migrationContext .GoogleCloudPlatform && ! this .migrationContext .AzureMySQL && ! this . migrationContext . OceanBase {
104
104
if impliedKey , err := mysql .GetInstanceKey (this .db ); err != nil {
105
105
return err
106
106
} else {
@@ -714,24 +714,28 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected
714
714
return chunkSize , rowsAffected , duration , nil
715
715
}
716
716
717
- // LockOriginalTable places a write lock on the original table
718
- func (this * Applier ) LockOriginalTable () error {
719
- query := fmt .Sprintf (`lock /* gh-ost */ tables %s.%s write` ,
720
- sql .EscapeName (this .migrationContext .DatabaseName ),
721
- sql .EscapeName (this .migrationContext .OriginalTableName ),
722
- )
723
- this .migrationContext .Log .Infof ("Locking %s.%s" ,
724
- sql .EscapeName (this .migrationContext .DatabaseName ),
725
- sql .EscapeName (this .migrationContext .OriginalTableName ),
726
- )
717
+ // lockTable places a write lock on the specific table
718
+ func (this * Applier ) lockTable (databaseName , tableName string ) error {
719
+ query := fmt .Sprintf (`lock /* gh-ost */ tables %s.%s write` , databaseName , tableName )
720
+ this .migrationContext .Log .Infof ("Locking %s.%s" , databaseName , tableName )
727
721
this .migrationContext .LockTablesStartTime = time .Now ()
728
722
if _ , err := sqlutils .ExecNoPrepare (this .singletonDB , query ); err != nil {
729
723
return err
730
724
}
731
- this .migrationContext .Log .Infof ("Table locked" )
725
+ this .migrationContext .Log .Infof ("Table %s.%s locked" , databaseName , tableName )
732
726
return nil
733
727
}
734
728
729
+ // LockOriginalTable places a write lock on the original table
730
+ func (this * Applier ) LockOriginalTable () error {
731
+ return this .lockTable (this .migrationContext .DatabaseName , this .migrationContext .OriginalTableName )
732
+ }
733
+
734
+ // LockGhostTable places a write lock on the ghost table
735
+ func (this * Applier ) LockGhostTable () error {
736
+ return this .lockTable (this .migrationContext .DatabaseName , this .migrationContext .GetGhostTableName ())
737
+ }
738
+
735
739
// UnlockTables makes tea. No wait, it unlocks tables.
736
740
func (this * Applier ) UnlockTables () error {
737
741
query := `unlock /* gh-ost */ tables`
@@ -1033,7 +1037,7 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke
1033
1037
1034
1038
tableLockTimeoutSeconds := this .migrationContext .CutOverLockTimeoutSeconds * 2
1035
1039
this .migrationContext .Log .Infof ("Setting LOCK timeout as %d seconds" , tableLockTimeoutSeconds )
1036
- query = fmt .Sprintf (`set /* gh-ost */ session lock_wait_timeout: =%d` , tableLockTimeoutSeconds )
1040
+ query = fmt .Sprintf (`set /* gh-ost */ session lock_wait_timeout=%d` , tableLockTimeoutSeconds )
1037
1041
if _ , err := tx .Exec (query ); err != nil {
1038
1042
tableLocked <- err
1039
1043
return err
@@ -1108,25 +1112,31 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke
1108
1112
return nil
1109
1113
}
1110
1114
1111
- // AtomicCutoverRename
1112
- func (this * Applier ) AtomicCutoverRename (sessionIdChan chan int64 , tablesRenamed chan <- error ) error {
1113
- tx , err := this .db .Begin ()
1115
+ func (this * Applier ) atomicCutoverRename (db * gosql.DB , sessionIdChan chan int64 , tablesRenamed chan <- error ) error {
1116
+ tx , err := db .Begin ()
1114
1117
if err != nil {
1115
1118
return err
1116
1119
}
1117
1120
defer func () {
1118
1121
tx .Rollback ()
1119
- sessionIdChan <- - 1
1120
- tablesRenamed <- fmt .Errorf ("Unexpected error in AtomicCutoverRename(), injected to release blocking channel reads" )
1122
+ if sessionIdChan != nil {
1123
+ sessionIdChan <- - 1
1124
+ }
1125
+ if tablesRenamed != nil {
1126
+ tablesRenamed <- fmt .Errorf ("Unexpected error in AtomicCutoverRename(), injected to release blocking channel reads" )
1127
+ }
1121
1128
}()
1122
- var sessionId int64
1123
- if err := tx .QueryRow (`select /* gh-ost */ connection_id()` ).Scan (& sessionId ); err != nil {
1124
- return err
1129
+
1130
+ if sessionIdChan != nil {
1131
+ var sessionId int64
1132
+ if err := tx .QueryRow (`select /* gh-ost */ connection_id()` ).Scan (& sessionId ); err != nil {
1133
+ return err
1134
+ }
1135
+ sessionIdChan <- sessionId
1125
1136
}
1126
- sessionIdChan <- sessionId
1127
1137
1128
1138
this .migrationContext .Log .Infof ("Setting RENAME timeout as %d seconds" , this .migrationContext .CutOverLockTimeoutSeconds )
1129
- query := fmt .Sprintf (`set /* gh-ost */ session lock_wait_timeout: =%d` , this .migrationContext .CutOverLockTimeoutSeconds )
1139
+ query := fmt .Sprintf (`set /* gh-ost */ session lock_wait_timeout=%d` , this .migrationContext .CutOverLockTimeoutSeconds )
1130
1140
if _ , err := tx .Exec (query ); err != nil {
1131
1141
return err
1132
1142
}
@@ -1143,14 +1153,28 @@ func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed
1143
1153
)
1144
1154
this .migrationContext .Log .Infof ("Issuing and expecting this to block: %s" , query )
1145
1155
if _ , err := tx .Exec (query ); err != nil {
1146
- tablesRenamed <- err
1156
+ if tablesRenamed != nil {
1157
+ tablesRenamed <- err
1158
+ }
1147
1159
return this .migrationContext .Log .Errore (err )
1148
1160
}
1149
- tablesRenamed <- nil
1161
+ if tablesRenamed != nil {
1162
+ tablesRenamed <- nil
1163
+ }
1150
1164
this .migrationContext .Log .Infof ("Tables renamed" )
1151
1165
return nil
1152
1166
}
1153
1167
1168
+ // AtomicCutoverRename renames tables for atomic cut over in non lock session
1169
+ func (this * Applier ) AtomicCutoverRename (sessionIdChan chan int64 , tablesRenamed chan <- error ) error {
1170
+ return this .atomicCutoverRename (this .db , sessionIdChan , tablesRenamed )
1171
+ }
1172
+
1173
+ // AtomicCutoverRenameWithLock renames tables for atomic cut over in the lock session
1174
+ func (this * Applier ) AtomicCutoverRenameWithLock () error {
1175
+ return this .atomicCutoverRename (this .singletonDB , nil , nil )
1176
+ }
1177
+
1154
1178
func (this * Applier ) ShowStatusVariable (variableName string ) (result int64 , err error ) {
1155
1179
query := fmt .Sprintf (`show /* gh-ost */ global status like '%s'` , variableName )
1156
1180
if err := this .db .QueryRow (query ).Scan (& variableName , & result ); err != nil {
0 commit comments