@@ -937,6 +937,42 @@ func (this *Applier) CreateAtomicCutOverSentryTable() error {
937
937
return nil
938
938
}
939
939
940
+ // WaitForAtomicCutOverRename waits for the cut-over RENAME operation while periodically
941
+ // pinging the applier connection to avoid the connection becoming idle.
942
+ func (this * Applier ) WaitForAtomicCutOverRename (tx * gosql.Tx , okToUnlockTable <- chan bool ) {
943
+ ticker := time .NewTicker (time .Millisecond * 500 )
944
+ defer ticker .Stop ()
945
+ for {
946
+ select {
947
+ case <- okToUnlockTable :
948
+ this .migrationContext .Log .Infof ("Will now proceed to drop magic table and unlock tables" )
949
+ return
950
+ case <- ticker .C :
951
+ // keep connection alive as we wait for the RENAME.
952
+ if _ , err := tx .Query (`select /* gh-ost */ 1` ); err != nil {
953
+ this .migrationContext .Log .Errorf ("Failed to ping applier connection: %v" , err )
954
+ }
955
+ }
956
+ }
957
+ }
958
+
959
+ // InitAtomicCutOverIdleTimeout sets the cut-over session wait_timeout, returning a func() to rollback
960
+ // to the original wait_timeout and an error, if any.
961
+ func (this * Applier ) InitAtomicCutOverIdleTimeout (tx * gosql.Tx ) (restoreTimeoutFunc func (), err error ) {
962
+ this .migrationContext .Log .Infof ("Setting cut-over idle timeout as %d seconds" , this .migrationContext .CutOverIdleTimeoutSeconds )
963
+ query := fmt .Sprintf (`set /* gh-ost */ session wait_timeout:=%d` , this .migrationContext .CutOverIdleTimeoutSeconds )
964
+ _ , err = tx .Exec (query )
965
+ return func () {
966
+ this .migrationContext .Log .Infof ("Restoring applier idle timeout as %d seconds" , this .migrationContext .ApplierWaitTimeout )
967
+ query = fmt .Sprintf (`set /* gh-ost */ session wait_timeout:=%d` , this .migrationContext .ApplierWaitTimeout )
968
+ if _ , err := sqlutils .ExecNoPrepare (this .db , query ); err != nil {
969
+ this .migrationContext .Log .Errorf ("Failed to restore applier wait_timeout to %d seconds: %v" ,
970
+ this .migrationContext .ApplierWaitTimeout , err ,
971
+ )
972
+ }
973
+ }, err
974
+ }
975
+
940
976
// AtomicCutOverMagicLock
941
977
func (this * Applier ) AtomicCutOverMagicLock (sessionIdChan chan int64 , tableLocked chan <- error , okToUnlockTable <- chan bool , tableUnlocked chan <- error ) error {
942
978
tx , err := this .db .Begin ()
@@ -977,22 +1013,13 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke
977
1013
return err
978
1014
}
979
1015
980
- if this .migrationContext .CutOverIdleTimeoutSeconds > 0 {
981
- this .migrationContext .Log .Infof ("Setting cut-over idle timeout as %d seconds" , this .migrationContext .CutOverIdleTimeoutSeconds )
982
- query = fmt .Sprintf (`set /* gh-ost */ session wait_timeout:=%d` , this .migrationContext .CutOverIdleTimeoutSeconds )
983
- if _ , err := tx .Exec (query ); err != nil {
1016
+ if this .migrationContext .CutOverIdleTimeoutSeconds >= 1 {
1017
+ restoreIdleTimeoutFunc , err := this .InitAtomicCutOverIdleTimeout (tx )
1018
+ if err != nil {
984
1019
tableLocked <- err
985
1020
return err
986
1021
}
987
- defer func () {
988
- this .migrationContext .Log .Infof ("Restoring applier idle timeout as %d seconds" , this .migrationContext .ApplierWaitTimeout )
989
- query = fmt .Sprintf (`set /* gh-ost */ session wait_timeout:=%d` , this .migrationContext .ApplierWaitTimeout )
990
- if _ , err := sqlutils .ExecNoPrepare (this .db , query ); err != nil {
991
- this .migrationContext .Log .Errorf ("Failed to restore applier wait_timeout to %d seconds: %v" ,
992
- this .migrationContext .ApplierWaitTimeout , err ,
993
- )
994
- }
995
- }()
1022
+ defer restoreIdleTimeoutFunc ()
996
1023
}
997
1024
998
1025
if err := this .CreateAtomicCutOverSentryTable (); err != nil {
@@ -1025,8 +1052,7 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke
1025
1052
1026
1053
// The cut-over phase will proceed to apply remaining backlog onto ghost table,
1027
1054
// and issue RENAME. We wait here until told to proceed.
1028
- <- okToUnlockTable
1029
- this .migrationContext .Log .Infof ("Will now proceed to drop magic table and unlock tables" )
1055
+ this .WaitForAtomicCutOverRename (tx , okToUnlockTable )
1030
1056
1031
1057
// The magic table is here because we locked it. And we are the only ones allowed to drop it.
1032
1058
// And in fact, we will:
0 commit comments