@@ -433,26 +433,21 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected
433
433
return chunkSize , rowsAffected , duration , nil
434
434
}
435
435
436
- // LockTables
437
- func (this * Applier ) LockTables () error {
438
- // query := fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write, %s.%s write, %s.%s write`,
439
- // sql.EscapeName(this.migrationContext.DatabaseName),
440
- // sql.EscapeName(this.migrationContext.OriginalTableName),
441
- // sql.EscapeName(this.migrationContext.DatabaseName),
442
- // sql.EscapeName(this.migrationContext.GetGhostTableName()),
443
- // sql.EscapeName(this.migrationContext.DatabaseName),
444
- // sql.EscapeName(this.migrationContext.GetChangelogTableName()),
445
- // )
436
+ // LockOriginalTable places a write lock on the original table
437
+ func (this * Applier ) LockOriginalTable () error {
446
438
query := fmt .Sprintf (`lock /* gh-ost */ tables %s.%s write` ,
447
439
sql .EscapeName (this .migrationContext .DatabaseName ),
448
440
sql .EscapeName (this .migrationContext .OriginalTableName ),
449
441
)
450
- log .Infof ("Locking tables" )
442
+ log .Infof ("Locking %s.%s" ,
443
+ sql .EscapeName (this .migrationContext .DatabaseName ),
444
+ sql .EscapeName (this .migrationContext .OriginalTableName ),
445
+ )
451
446
this .migrationContext .LockTablesStartTime = time .Now ()
452
447
if _ , err := sqlutils .ExecNoPrepare (this .singletonDB , query ); err != nil {
453
448
return err
454
449
}
455
- log .Infof ("Tables locked" )
450
+ log .Infof ("Table locked" )
456
451
return nil
457
452
}
458
453
@@ -494,6 +489,21 @@ func (this *Applier) SwapTablesQuickAndBumpy() error {
494
489
return nil
495
490
}
496
491
492
+ func (this * Applier ) RenameTable (fromName , toName string ) (err error ) {
493
+ query := fmt .Sprintf (`rename /* gh-ost */ table %s.%s to %s.%s` ,
494
+ sql .EscapeName (this .migrationContext .DatabaseName ),
495
+ sql .EscapeName (fromName ),
496
+ sql .EscapeName (this .migrationContext .DatabaseName ),
497
+ sql .EscapeName (toName ),
498
+ )
499
+ log .Infof ("Renaming %s to %s" , fromName , toName )
500
+ if _ , err := sqlutils .ExecNoPrepare (this .db , query ); err != nil {
501
+ return log .Errore (err )
502
+ }
503
+ log .Infof ("Table renamed" )
504
+ return nil
505
+ }
506
+
497
507
// SwapTablesAtomic issues a single two-table RENAME statement to swap ghost table
498
508
// into original's place
499
509
func (this * Applier ) SwapTablesAtomic (sessionIdChan chan int64 ) error {
@@ -587,6 +597,144 @@ func (this *Applier) StopSlaveNicely() error {
587
597
return nil
588
598
}
589
599
600
+ func (this * Applier ) GetSessionLockName (sessionId int64 ) string {
601
+ return fmt .Sprintf ("gh-ost.%d.lock" , sessionId )
602
+ }
603
+
604
+ // LockOriginalTableAndWait locks the original table, notifies the lock is in
605
+ // place, and awaits further instruction
606
+ func (this * Applier ) LockOriginalTableAndWait (sessionIdChan chan int64 , tableLocked chan <- error , okToUnlockTable <- chan bool , tableUnlocked chan <- error ) error {
607
+ tx , err := this .db .Begin ()
608
+ if err != nil {
609
+ tableLocked <- err
610
+ return err
611
+ }
612
+ defer func () {
613
+ tx .Rollback ()
614
+ }()
615
+
616
+ var sessionId int64
617
+ if err := tx .QueryRow (`select connection_id()` ).Scan (& sessionId ); err != nil {
618
+ return err
619
+ }
620
+ sessionIdChan <- sessionId
621
+
622
+ query := `select get_lock(?, 0)`
623
+ lockResult := 0
624
+ lockName := this .GetSessionLockName (sessionId )
625
+ log .Infof ("Grabbing voluntary lock: %s" , lockName )
626
+ if err := tx .QueryRow (query , lockName ).Scan (& lockResult ); err != nil || lockResult != 1 {
627
+ return fmt .Errorf ("Unable to acquire lock %s" , lockName )
628
+ }
629
+
630
+ query = fmt .Sprintf (`lock /* gh-ost */ tables %s.%s write` ,
631
+ sql .EscapeName (this .migrationContext .DatabaseName ),
632
+ sql .EscapeName (this .migrationContext .OriginalTableName ),
633
+ )
634
+ log .Infof ("Locking %s.%s" ,
635
+ sql .EscapeName (this .migrationContext .DatabaseName ),
636
+ sql .EscapeName (this .migrationContext .OriginalTableName ),
637
+ )
638
+ this .migrationContext .LockTablesStartTime = time .Now ()
639
+ if _ , err := tx .Exec (query ); err != nil {
640
+ tableLocked <- err
641
+ return err
642
+ }
643
+ log .Infof ("Table locked" )
644
+ tableLocked <- nil // No error.
645
+
646
+ // The cut-over phase will proceed to apply remaining backlon onto ghost table,
647
+ // and issue RENAMEs. We wait here until told to proceed.
648
+ <- okToUnlockTable
649
+ // Release
650
+ query = `unlock tables`
651
+ log .Infof ("Releasing lock from %s.%s" ,
652
+ sql .EscapeName (this .migrationContext .DatabaseName ),
653
+ sql .EscapeName (this .migrationContext .OriginalTableName ),
654
+ )
655
+ if _ , err := tx .Exec (query ); err != nil {
656
+ tableUnlocked <- err
657
+ return log .Errore (err )
658
+ }
659
+ log .Infof ("Table unlocked" )
660
+ tableUnlocked <- nil
661
+ return nil
662
+ }
663
+
664
+ // RenameOriginalTable will attempt renaming the original table into _old
665
+ func (this * Applier ) RenameOriginalTable (sessionIdChan chan int64 , originalTableRenamed chan <- error ) error {
666
+ tx , err := this .db .Begin ()
667
+ if err != nil {
668
+ return err
669
+ }
670
+ defer func () {
671
+ tx .Rollback ()
672
+ originalTableRenamed <- nil
673
+ }()
674
+ var sessionId int64
675
+ if err := tx .QueryRow (`select connection_id()` ).Scan (& sessionId ); err != nil {
676
+ return err
677
+ }
678
+ sessionIdChan <- sessionId
679
+
680
+ log .Infof ("Setting RENAME timeout as %d seconds" , this .migrationContext .SwapTablesTimeoutSeconds )
681
+ query := fmt .Sprintf (`set session lock_wait_timeout:=%d` , this .migrationContext .SwapTablesTimeoutSeconds )
682
+ if _ , err := tx .Exec (query ); err != nil {
683
+ return err
684
+ }
685
+
686
+ query = fmt .Sprintf (`rename /* gh-ost */ table %s.%s to %s.%s` ,
687
+ sql .EscapeName (this .migrationContext .DatabaseName ),
688
+ sql .EscapeName (this .migrationContext .OriginalTableName ),
689
+ sql .EscapeName (this .migrationContext .DatabaseName ),
690
+ sql .EscapeName (this .migrationContext .GetOldTableName ()),
691
+ )
692
+ log .Infof ("Issuing and expecting this to block: %s" , query )
693
+ if _ , err := tx .Exec (query ); err != nil {
694
+ return log .Errore (err )
695
+ }
696
+ log .Infof ("Original table renamed" )
697
+ return nil
698
+ }
699
+
700
+ // RenameGhostTable will attempt renaming the ghost table into original
701
+ func (this * Applier ) RenameGhostTable (sessionIdChan chan int64 , ghostTableRenamed chan <- error ) error {
702
+ tx , err := this .db .Begin ()
703
+ if err != nil {
704
+ return err
705
+ }
706
+ defer func () {
707
+ tx .Rollback ()
708
+ }()
709
+ var sessionId int64
710
+ if err := tx .QueryRow (`select connection_id()` ).Scan (& sessionId ); err != nil {
711
+ return err
712
+ }
713
+ sessionIdChan <- sessionId
714
+
715
+ log .Infof ("Setting RENAME timeout as %d seconds" , this .migrationContext .SwapTablesTimeoutSeconds )
716
+ query := fmt .Sprintf (`set session lock_wait_timeout:=%d` , this .migrationContext .SwapTablesTimeoutSeconds )
717
+ if _ , err := tx .Exec (query ); err != nil {
718
+ return err
719
+ }
720
+
721
+ query = fmt .Sprintf (`rename /* gh-ost */ table %s.%s to %s.%s` ,
722
+ sql .EscapeName (this .migrationContext .DatabaseName ),
723
+ sql .EscapeName (this .migrationContext .GetGhostTableName ()),
724
+ sql .EscapeName (this .migrationContext .DatabaseName ),
725
+ sql .EscapeName (this .migrationContext .OriginalTableName ),
726
+ )
727
+ log .Infof ("Issuing and expecting this to block: %s" , query )
728
+ if _ , err := tx .Exec (query ); err != nil {
729
+ ghostTableRenamed <- err
730
+ return log .Errore (err )
731
+ }
732
+ log .Infof ("Ghost table renamed" )
733
+ ghostTableRenamed <- nil
734
+
735
+ return nil
736
+ }
737
+
590
738
// GrabVoluntaryLock gets a named lock (`GET_LOCK`) and listens
591
739
// on a okToRelease in order to release it
592
740
func (this * Applier ) GrabVoluntaryLock (lockGrabbed chan <- error , okToRelease <- chan bool ) error {
@@ -673,6 +821,17 @@ func (this *Applier) IssueBlockingQueryOnVoluntaryLock(sessionIdChan chan int64)
673
821
return nil
674
822
}
675
823
824
+ func (this * Applier ) ExpectUsedLock (sessionId int64 ) error {
825
+ var result int64
826
+ query := `select is_used_lock(?)`
827
+ lockName := this .GetSessionLockName (sessionId )
828
+ log .Infof ("Checking session lock: %s" , lockName )
829
+ if err := this .db .QueryRow (query , lockName ).Scan (& result ); err != nil || result != sessionId {
830
+ return fmt .Errorf ("Session lock %s expected to be found but wasn't" , lockName )
831
+ }
832
+ return nil
833
+ }
834
+
676
835
func (this * Applier ) ExpectProcess (sessionId int64 , stateHint , infoHint string ) error {
677
836
found := false
678
837
query := `
0 commit comments