@@ -389,11 +389,18 @@ func (this *Migrator) stopWritesAndCompleteMigration() (err error) {
 	atomic.StoreInt64(&this.migrationContext.IsPostponingCutOver, 0)
 
 	if this.migrationContext.TestOnReplica {
-		return this.stopWritesAndCompleteMigrationOnReplica()
+		// With `--test-on-replica` we stop the replication thread, and then proceed to use
+		// the same cut-over phase as the master would use. That means we take locks
+		// and swap the tables.
+		// The difference is that we will later swap the tables back.
+		log.Debugf("testing on replica. Stopping replication IO thread")
+		if err := this.retryOperation(this.applier.StopSlaveNicely); err != nil {
+			return err
+		}
+		// We're merely testing; we don't want to keep this state. Roll back the renames when possible.
+		defer this.applier.RenameTablesRollback()
 	}
-	// Running on master
-
-	{
+	if this.migrationContext.CutOverType == base.CutOverSafe {
 		// Lock-based solution: we use low timeout and multiple attempts. But for
 		// each failed attempt, we throttle until replication lag is back to normal
 		err := this.retryOperation(
@@ -404,20 +411,10 @@ func (this *Migrator) stopWritesAndCompleteMigration() (err error) {
 		return err
 	}
 	if this.migrationContext.CutOverType == base.CutOverTwoStep {
-		return this.stopWritesAndCompleteMigrationOnMasterQuickAndBumpy()
-	}
-
-	{
-		// Lock-based solution: we use low timeout and multiple attempts. But for
-		// each failed attempt, we throttle until replication lag is back to normal
-		if err := this.retryOperation(
-			func() error {
-				return this.executeAndThrottleOnError(this.stopWritesAndCompleteMigrationOnMasterViaLock)
-			}); err != nil {
-			return err
-		}
+		err := this.retryOperation(this.cutOverTwoStep)
+		return err
 	}
-	return
+	return nil
 }
 
 // Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs,
@@ -440,11 +437,11 @@ func (this *Migrator) waitForEventsUpToLock() (err error) {
 	return nil
 }
 
-// stopWritesAndCompleteMigrationOnMasterQuickAndBumpy will lock down the original table, execute
+// cutOverTwoStep will lock down the original table, execute
 // what's left of last DML entries, and **non-atomically** swap original->old, then new->original.
 // There is a point in time where the "original" table does not exist and queries are non-blocked
 // and failing.
-func (this *Migrator) stopWritesAndCompleteMigrationOnMasterQuickAndBumpy() (err error) {
+func (this *Migrator) cutOverTwoStep() (err error) {
 	if err := this.retryOperation(this.applier.LockOriginalTable); err != nil {
 		return err
 	}
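
The gap that makes this path bumpy comes from issuing two separate renames instead of one atomic statement. Here is a minimal sketch of the contrast; the table names and statements are illustrative, not copied from gh-ost's applier (MySQL executes a multi-table RENAME TABLE atomically, which is what the safe cut-over relies on).

// Sketch contrasting the two-step swap with an atomic swap; assumed names.
package main

import "fmt"

func main() {
	// Two-step: between these statements `orig` does not exist,
	// so concurrent queries against it fail.
	twoStep := []string{
		"RENAME TABLE `orig` TO `_orig_old`",
		"RENAME TABLE `_orig_new` TO `orig`",
	}
	// Atomic: a single multi-table rename; queries block on the metadata
	// lock and resume against the swapped table, never seeing a gap.
	atomicSwap := "RENAME TABLE `orig` TO `_orig_old`, `_orig_new` TO `orig`"

	for _, stmt := range twoStep {
		fmt.Println("two-step:", stmt)
	}
	fmt.Println("atomic:  ", atomicSwap)
}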
@@ -465,69 +462,6 @@ func (this *Migrator) stopWritesAndCompleteMigrationOnMasterQuickAndBumpy() (err error) {
 	return nil
 }
 
-// stopWritesAndCompleteMigrationOnMasterViaLock will lock down the original table, execute
-// what's left of last DML entries, and atomically swap & unlock (original->old && new->original)
-func (this *Migrator) stopWritesAndCompleteMigrationOnMasterViaLock() (err error) {
-	lockGrabbed := make(chan error, 1)
-	okToReleaseLock := make(chan bool, 1)
-	swapResult := make(chan error, 1)
-	go func() {
-		if err := this.applier.GrabVoluntaryLock(lockGrabbed, okToReleaseLock); err != nil {
-			log.Errore(err)
-		}
-	}()
-	if err := <-lockGrabbed; err != nil {
-		return log.Errore(err)
-	}
-	blockingQuerySessionIdChan := make(chan int64, 1)
-	go func() {
-		this.applier.IssueBlockingQueryOnVoluntaryLock(blockingQuerySessionIdChan)
-	}()
-	blockingQuerySessionId := <-blockingQuerySessionIdChan
-	log.Infof("Intentional blocking query connection id is %+v", blockingQuerySessionId)
-
-	if err := this.retryOperation(
-		func() error {
-			return this.applier.ExpectProcess(blockingQuerySessionId, "User lock", this.migrationContext.GetVoluntaryLockName())
-		}); err != nil {
-		return err
-	}
-	log.Infof("Found blocking query to be executing")
-	swapSessionIdChan := make(chan int64, 1)
-	go func() {
-		swapResult <- this.applier.SwapTablesAtomic(swapSessionIdChan)
-	}()
-
-	swapSessionId := <-swapSessionIdChan
-	log.Infof("RENAME connection id is %+v", swapSessionId)
-	if err := this.retryOperation(
-		func() error {
-			return this.applier.ExpectProcess(swapSessionId, "metadata lock", "rename")
-		}); err != nil {
-		return err
-	}
-	log.Infof("Found RENAME to be executing")
-
-	// OK, at this time we know any newly incoming DML on original table is blocked.
-	this.waitForEventsUpToLock()
-
-	okToReleaseLock <- true
-	// BAM: voluntary lock is released, blocking query is released, rename is released.
-	// We now check the RENAME result. We have a lock_wait_timeout set on purpose, to avoid
-	// locking the tables for too long. If lock time exceeds said timeout, the RENAME fails
-	// and returns a non-nil error, in which case tables have not been swapped, and we are
-	// not really done. We are, however, good to go for more retries.
-	if err := <-swapResult; err != nil {
-		// Bummer. We shall rest a while and try again
-		return err
-	}
-	// ooh nice! We're actually truly and thankfully done
-	lockAndRenameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.LockTablesStartTime)
-	renameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.RenameTablesStartTime)
-	log.Debugf("Lock & rename duration: %s. Of this, rename time was %s. During rename time, queries on %s were blocked", lockAndRenameDuration, renameDuration, sql.EscapeName(this.migrationContext.OriginalTableName))
-	return nil
-}
-
 // cutOverSafe performs a safe cut over, where normally (no failure) the original table
 // is being locked until swapped, hence DML queries being locked and unaware of the cut-over.
 // In the worst case, there will be a minor outage, where the original table would not exist.
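
The voluntary-lock choreography removed above (and echoed in cutOverSafe) hinges on buffered channels coordinating separate MySQL sessions: one goroutine confirms the lock is held, the migrator drains remaining events, then signals release so the pending rename can complete. Below is a minimal sketch of that handshake, with placeholder work standing in for the real sessions and signatures assumed for illustration.

// Sketch of the channel handshake; placeholder work, not gh-ost's applier code.
package main

import (
	"fmt"
	"time"
)

// grabVoluntaryLock simulates a session acquiring a lock: it reports success
// on lockGrabbed and holds the lock until okToReleaseLock is signaled.
func grabVoluntaryLock(lockGrabbed chan<- error, okToReleaseLock <-chan bool) {
	lockGrabbed <- nil // lock acquired (real code: GET_LOCK on its own connection)
	<-okToReleaseLock  // hold until the migrator says releasing is safe
}

func main() {
	lockGrabbed := make(chan error, 1)
	okToReleaseLock := make(chan bool, 1)
	swapResult := make(chan error, 1)

	go grabVoluntaryLock(lockGrabbed, okToReleaseLock)
	if err := <-lockGrabbed; err != nil {
		fmt.Println("could not grab lock:", err)
		return
	}

	// With the lock held, the migrator drains remaining binlog events
	// (waitForEventsUpToLock in the real code) before allowing the swap.
	go func() {
		// Real code: issue the atomic RENAME here; it blocks until the lock
		// is released, then either succeeds or times out (lock_wait_timeout).
		time.Sleep(10 * time.Millisecond)
		swapResult <- nil
	}()

	okToReleaseLock <- true // release: the pending rename can now go through
	if err := <-swapResult; err != nil {
		fmt.Println("swap failed; safe to retry:", err)
		return
	}
	fmt.Println("tables swapped atomically")
}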