@@ -504,51 +504,25 @@ func (this *Applier) RenameTable(fromName, toName string) (err error) {
504
504
return nil
505
505
}
506
506
507
- // SwapTablesAtomic issues a single two-table RENAME statement to swap ghost table
508
- // into original's place
509
- func (this * Applier ) SwapTablesAtomic (sessionIdChan chan int64 ) error {
510
-
511
- tx , err := this .db .Begin ()
512
- if err != nil {
513
- return err
514
- }
515
- log .Infof ("Setting timeout for RENAME for %d seconds" , this .migrationContext .SwapTablesTimeoutSeconds )
516
- query := fmt .Sprintf (`set session lock_wait_timeout:=%d` , this .migrationContext .SwapTablesTimeoutSeconds )
517
- if _ , err := tx .Exec (query ); err != nil {
518
- return err
519
- }
520
-
521
- var sessionId int64
522
- if err := tx .QueryRow (`select connection_id()` ).Scan (& sessionId ); err != nil {
523
- return err
524
- }
525
- sessionIdChan <- sessionId
526
-
527
- query = fmt .Sprintf (`rename /* gh-ost */ table %s.%s to %s.%s, %s.%s to %s.%s` ,
507
+ func (this * Applier ) RenameTablesRollback () (renameError error ) {
508
+ // Restoring tables to original names.
509
+ // We prefer the single, atomic operation:
510
+ query := fmt .Sprintf (`rename /* gh-ost */ table %s.%s to %s.%s, %s.%s to %s.%s` ,
528
511
sql .EscapeName (this .migrationContext .DatabaseName ),
529
512
sql .EscapeName (this .migrationContext .OriginalTableName ),
530
513
sql .EscapeName (this .migrationContext .DatabaseName ),
531
- sql .EscapeName (this .migrationContext .GetOldTableName ()),
532
- sql .EscapeName (this .migrationContext .DatabaseName ),
533
514
sql .EscapeName (this .migrationContext .GetGhostTableName ()),
534
515
sql .EscapeName (this .migrationContext .DatabaseName ),
516
+ sql .EscapeName (this .migrationContext .GetOldTableName ()),
517
+ sql .EscapeName (this .migrationContext .DatabaseName ),
535
518
sql .EscapeName (this .migrationContext .OriginalTableName ),
536
519
)
537
- log .Infof ("Renaming tables" )
538
-
539
- this .migrationContext .RenameTablesStartTime = time .Now ()
540
- if _ , err := tx .Exec (query ); err != nil {
541
- return err
520
+ log .Infof ("Renaming back both tables" )
521
+ if _ , err := sqlutils .ExecNoPrepare (this .db , query ); err == nil {
522
+ return nil
542
523
}
543
- this .migrationContext .RenameTablesEndTime = time .Now ()
544
- tx .Commit ()
545
- log .Infof ("Tables renamed" )
546
- return nil
547
- }
548
-
549
- func (this * Applier ) RenameTablesRollback () (renameError error ) {
550
-
551
- query := fmt .Sprintf (`rename /* gh-ost */ table %s.%s to %s.%s` ,
524
+ // But, if for some reason the above was impossible to do, we rename one by one.
525
+ query = fmt .Sprintf (`rename /* gh-ost */ table %s.%s to %s.%s` ,
552
526
sql .EscapeName (this .migrationContext .DatabaseName ),
553
527
sql .EscapeName (this .migrationContext .OriginalTableName ),
554
528
sql .EscapeName (this .migrationContext .DatabaseName ),
@@ -573,7 +547,7 @@ func (this *Applier) RenameTablesRollback() (renameError error) {
573
547
574
548
// StopSlaveIOThread is applicable with --test-on-replica; it stops the IO thread, duh.
575
549
// We need to keep the SQL thread active so as to complete processing received events,
576
- // and have them written to the binary log, so that we can then read them via streamer
550
+ // and have them written to the binary log, so that we can then read them via streamer.
577
551
func (this * Applier ) StopSlaveIOThread () error {
578
552
query := `stop /* gh-ost */ slave io_thread`
579
553
log .Infof ("Stopping replication" )
@@ -595,18 +569,6 @@ func (this *Applier) StartSlaveSQLThread() error {
595
569
return nil
596
570
}
597
571
598
- // MasterPosWait is applicable with --test-on-replica
599
- func (this * Applier ) MasterPosWait (binlogCoordinates * mysql.BinlogCoordinates ) error {
600
- var appliedRows int64
601
- if err := this .db .QueryRow (`select master_pos_wait(?, ?, ?)` , binlogCoordinates .LogFile , binlogCoordinates .LogPos , 3 ).Scan (& appliedRows ); err != nil {
602
- return log .Errore (err )
603
- }
604
- if appliedRows < 0 {
605
- return fmt .Errorf ("Timeout waiting on master_pos_wait()" )
606
- }
607
- return nil
608
- }
609
-
610
572
func (this * Applier ) StopSlaveNicely () error {
611
573
if err := this .StopSlaveIOThread (); err != nil {
612
574
return err
@@ -760,92 +722,6 @@ func (this *Applier) RenameGhostTable(sessionIdChan chan int64, ghostTableRename
760
722
return nil
761
723
}
762
724
763
- // GrabVoluntaryLock gets a named lock (`GET_LOCK`) and listens
764
- // on a okToRelease in order to release it
765
- func (this * Applier ) GrabVoluntaryLock (lockGrabbed chan <- error , okToRelease <- chan bool ) error {
766
- lockName := this .migrationContext .GetVoluntaryLockName ()
767
-
768
- tx , err := this .db .Begin ()
769
- if err != nil {
770
- lockGrabbed <- err
771
- return err
772
- }
773
- // Grab
774
- query := `select get_lock(?, 0)`
775
- lockResult := 0
776
- log .Infof ("Grabbing voluntary lock: %s" , lockName )
777
- if err := tx .QueryRow (query , lockName ).Scan (& lockResult ); err != nil {
778
- lockGrabbed <- err
779
- return err
780
- }
781
- if lockResult != 1 {
782
- err := fmt .Errorf ("Lock was not acquired" )
783
- lockGrabbed <- err
784
- return err
785
- }
786
- log .Infof ("Voluntary lock grabbed" )
787
- lockGrabbed <- nil // No error.
788
-
789
- // Listeners on the above will proceed to submit the "all queries till lock have been found"
790
- // We will wait here till we're told to. This will happen once all DML events up till lock
791
- // have been appleid on the ghost table
792
- <- okToRelease
793
- // Release
794
- query = `select ifnull(release_lock(?),0)`
795
- log .Infof ("Releasing voluntary lock" )
796
- if err := tx .QueryRow (query , lockName ).Scan (& lockResult ); err != nil {
797
- return log .Errore (err )
798
- }
799
- if lockResult != 1 {
800
- // Generally speaking we should never get this.
801
- return log .Errorf ("release_lock result was %+v" , lockResult )
802
- }
803
- tx .Rollback ()
804
-
805
- log .Infof ("Voluntary lock released" )
806
- return nil
807
-
808
- }
809
-
810
- // IssueBlockingQueryOnVoluntaryLock will SELECT on the original table using a
811
- // conditional on a known to be occupied lock. This query is expected to block,
812
- // and will further block the followup RENAME statement
813
- func (this * Applier ) IssueBlockingQueryOnVoluntaryLock (sessionIdChan chan int64 ) error {
814
- lockName := this .migrationContext .GetVoluntaryLockName ()
815
-
816
- tx , err := this .db .Begin ()
817
- if err != nil {
818
- return err
819
- }
820
- var sessionId int64
821
- if err := tx .QueryRow (`select connection_id()` ).Scan (& sessionId ); err != nil {
822
- return err
823
- }
824
- sessionIdChan <- sessionId
825
-
826
- // Grab
827
- query := fmt .Sprintf (`
828
- select /* gh-ost blocking-query-%s */
829
- release_lock(?)
830
- from %s.%s
831
- where
832
- get_lock(?, 86400) >= 0
833
- limit 1
834
- ` ,
835
- lockName ,
836
- sql .EscapeName (this .migrationContext .DatabaseName ),
837
- sql .EscapeName (this .migrationContext .OriginalTableName ),
838
- )
839
-
840
- dummyResult := 0
841
- log .Infof ("Issuing blocking query" )
842
- this .migrationContext .LockTablesStartTime = time .Now ()
843
- tx .QueryRow (query , lockName , lockName ).Scan (& dummyResult )
844
- tx .Rollback ()
845
- log .Infof ("Blocking query released" )
846
- return nil
847
- }
848
-
849
725
func (this * Applier ) ExpectUsedLock (sessionId int64 ) error {
850
726
var result int64
851
727
query := `select is_used_lock(?)`
0 commit comments