Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/mysql-migtests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ jobs:
yes | ./installer_scripts/install-yb-voyager --install-from-local-source
env:
ON_INSTALLER_ERROR_OUTPUT_LOG: Y
DEBEZIUM_VERSION: latest
DEBEZIUM_VERSION: aneesh_ff-fb-remove
DEBEZIUM_RELEASE_TAG: voyager-debezium

- name: Start MySQL
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/pg-migtests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ jobs:
yes | ./install-yb-voyager --install-from-local-source --only-pg-support
env:
ON_INSTALLER_ERROR_OUTPUT_LOG: Y
DEBEZIUM_VERSION: latest
DEBEZIUM_VERSION: aneesh_ff-fb-remove
DEBEZIUM_RELEASE_TAG: voyager-debezium

- name: Test PostgreSQL Connection
Expand Down
20 changes: 10 additions & 10 deletions yb-voyager/cmd/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,10 +373,10 @@ func getCutoverStatus() string {
utils.ErrExit("get migration status record: %v", err)
}

a := msr.CutoverRequested
a := msr.CutoverToTargetRequested
b := msr.CutoverProcessedBySourceExporter
c := msr.CutoverProcessedByTargetImporter
d := msr.FallForwardSyncStarted
d := msr.ExportFromTargetFallForwardStarted
ffDBExists := msr.FallForwardEnabled
if !a {
return NOT_INITIATED
Expand All @@ -399,14 +399,14 @@ func checkStreamingMode() (bool, error) {
return streamChanges, nil
}

func getFallForwardStatus() string {
func getCutoverToSourceReplicaStatus() string {
msr, err := metaDB.GetMigrationStatusRecord()
if err != nil {
utils.ErrExit("get migration status record: %v", err)
}
a := msr.FallForwardSwitchRequested
b := msr.FallForwardSwitchProcessedByTargetExporter
c := msr.FallForwardSwitchProcessedByFFImporter
a := msr.CutoverToSourceReplicaRequested
b := msr.CutoverToSourceReplicaProcessedByTargetExporter
c := msr.CutoverToSourceReplicaProcessedBySRImporter

if !a {
return NOT_INITIATED
Expand All @@ -416,14 +416,14 @@ func getFallForwardStatus() string {
return INITIATED
}

func getFallBackStatus() string {
func getCutoverToSourceStatus() string {
msr, err := metaDB.GetMigrationStatusRecord()
if err != nil {
utils.ErrExit("get migration status record: %v", err)
}
a := msr.FallBackSwitchRequested
b := msr.FallBackSwitchProcessedByTargetExporter
c := msr.FallBackSwitchProcessedByFBImporter
a := msr.CutoverToSourceRequested
b := msr.CutoverToSourceProcessedByTargetExporter
c := msr.CutoverToSourceProcessedBySourceImporter

if !a {
return NOT_INITIATED
Expand Down
72 changes: 36 additions & 36 deletions yb-voyager/cmd/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,42 +16,42 @@ limitations under the License.
package cmd

const (
KB = 1024
MB = 1024 * 1024
META_INFO_DIR_NAME = "metainfo"
NEWLINE = '\n'
ORACLE_DEFAULT_PORT = 1521
MYSQL_DEFAULT_PORT = 3306
POSTGRES_DEFAULT_PORT = 5432
YUGABYTEDB_YSQL_DEFAULT_PORT = 5433
YUGABYTEDB_DEFAULT_DATABASE = "yugabyte"
YUGABYTEDB_DEFAULT_SCHEMA = "public"
ORACLE = "oracle"
MYSQL = "mysql"
POSTGRESQL = "postgresql"
YUGABYTEDB = "yugabytedb"
LAST_SPLIT_NUM = 0
SPLIT_INFO_PATTERN = "[0-9]*.[0-9]*.[0-9]*.[0-9]*"
LAST_SPLIT_PATTERN = "0.[0-9]*.[0-9]*.[0-9]*"
COPY_MAX_RETRY_COUNT = 10
MAX_SLEEP_SECOND = 60
DEFAULT_BATCH_SIZE_ORACLE = 10000000
DEFAULT_BATCH_SIZE_YUGABYTEDB = 20000
INDEX_RETRY_COUNT = 5
DDL_MAX_RETRY_COUNT = 5
SCHEMA_VERSION_MISMATCH_ERR = "Query error: schema version mismatch for table"
SNAPSHOT_ONLY = "snapshot-only"
SNAPSHOT_AND_CHANGES = "snapshot-and-changes"
CHANGES_ONLY = "changes-only"
TARGET_DB = "target"
FF_DB = "ff"
FF_DB_IMPORTER_ROLE = "ff_db_importer"
FB_DB_IMPORTER_ROLE = "fb_db_importer"
TARGET_DB_IMPORTER_ROLE = "target_db_importer"
SOURCE_DB_EXPORTER_ROLE = "source_db_exporter"
TARGET_DB_EXPORTER_FF_ROLE = "target_db_exporter_ff"
TARGET_DB_EXPORTER_FB_ROLE = "target_db_exporter_fb"
IMPORT_FILE_ROLE = "import_file"
KB = 1024
MB = 1024 * 1024
META_INFO_DIR_NAME = "metainfo"
NEWLINE = '\n'
ORACLE_DEFAULT_PORT = 1521
MYSQL_DEFAULT_PORT = 3306
POSTGRES_DEFAULT_PORT = 5432
YUGABYTEDB_YSQL_DEFAULT_PORT = 5433
YUGABYTEDB_DEFAULT_DATABASE = "yugabyte"
YUGABYTEDB_DEFAULT_SCHEMA = "public"
ORACLE = "oracle"
MYSQL = "mysql"
POSTGRESQL = "postgresql"
YUGABYTEDB = "yugabytedb"
LAST_SPLIT_NUM = 0
SPLIT_INFO_PATTERN = "[0-9]*.[0-9]*.[0-9]*.[0-9]*"
LAST_SPLIT_PATTERN = "0.[0-9]*.[0-9]*.[0-9]*"
COPY_MAX_RETRY_COUNT = 10
MAX_SLEEP_SECOND = 60
DEFAULT_BATCH_SIZE_ORACLE = 10000000
DEFAULT_BATCH_SIZE_YUGABYTEDB = 20000
INDEX_RETRY_COUNT = 5
DDL_MAX_RETRY_COUNT = 5
SCHEMA_VERSION_MISMATCH_ERR = "Query error: schema version mismatch for table"
SNAPSHOT_ONLY = "snapshot-only"
SNAPSHOT_AND_CHANGES = "snapshot-and-changes"
CHANGES_ONLY = "changes-only"
TARGET_DB = "target"
FF_DB = "ff"
SOURCE_REPLICA_DB_IMPORTER_ROLE = "source_replica_db_importer"
SOURCE_DB_IMPORTER_ROLE = "source_db_importer"
TARGET_DB_IMPORTER_ROLE = "target_db_importer"
SOURCE_DB_EXPORTER_ROLE = "source_db_exporter"
TARGET_DB_EXPORTER_FF_ROLE = "target_db_exporter_ff"
TARGET_DB_EXPORTER_FB_ROLE = "target_db_exporter_fb"
IMPORT_FILE_ROLE = "import_file"
)

var supportedSourceDBTypes = []string{ORACLE, MYSQL, POSTGRESQL, YUGABYTEDB}
Expand Down
179 changes: 65 additions & 114 deletions yb-voyager/cmd/cutover.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package cmd
import (
"fmt"

log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/yugabyte/yb-voyager/yb-voyager/src/dbzm"
"github.com/yugabyte/yb-voyager/yb-voyager/src/metadb"
Expand Down Expand Up @@ -49,149 +48,101 @@ func init() {
cutoverCmd.AddCommand(cutoverToCmd)
}

func InitiatePrimarySwitch(action string) error {
userFacingActionMsg := "cutover"
switch action {
case "cutover":
userFacingActionMsg = "cutover to target"
case "fallforward":
userFacingActionMsg = "cutover to source-replica"
case "fallback":
userFacingActionMsg = "cutover to source"
}
func InitiateCutover(dbRole string) error {
userFacingActionMsg := fmt.Sprintf("cutover to %s", dbRole)
if !utils.AskPrompt(fmt.Sprintf("Are you sure you want to initiate %s? (y/n)", userFacingActionMsg)) {
utils.PrintAndLog("Aborting %s", userFacingActionMsg)
return nil
}
triggerName := action
err := createTriggerIfNotExists(triggerName)
if err != nil {
return err
}
utils.PrintAndLog("%s initiated, wait for it to complete", userFacingActionMsg)
return nil
}
alreadyInitiatedMsg := fmt.Sprintf("cutover to %s already initiated, wait for it to complete", dbRole)

func createTriggerIfNotExists(triggerName string) error {
cutoverMsg := "cutover already initiated, wait for it to complete"
fallforwardMsg := "cutover to source-replica already initiated, wait for it to complete"
fallbackMsg := "cutover to source already initiated, wait for it to complete"
err := metaDB.UpdateMigrationStatusRecord(func(record *metadb.MigrationStatusRecord) {
switch triggerName {
case "cutover":
if record.CutoverRequested {
utils.PrintAndLog(cutoverMsg)
}
// if the above check fails, we will set this to true otherwise its a no-op
record.CutoverRequested = true
case "cutover.source":
if record.CutoverProcessedBySourceExporter {
utils.PrintAndLog(cutoverMsg)
}
record.CutoverProcessedBySourceExporter = true
case "cutover.target":
if record.CutoverProcessedByTargetImporter {
utils.PrintAndLog(cutoverMsg)
switch dbRole {
case "target":
if record.CutoverToTargetRequested {
utils.PrintAndLog(alreadyInitiatedMsg)
}
record.CutoverProcessedByTargetImporter = true
case "fallforward":
if record.FallForwardSwitchRequested {
utils.PrintAndLog(fallforwardMsg)
}
record.FallForwardSwitchRequested = true
case "fallforward.target":
if record.FallForwardSwitchProcessedByTargetExporter {
utils.PrintAndLog(fallforwardMsg)
}
record.FallForwardSwitchProcessedByTargetExporter = true
case "fallforward.ff":
if record.FallForwardSwitchProcessedByFFImporter {
utils.PrintAndLog(fallforwardMsg)
}
record.FallForwardSwitchProcessedByFFImporter = true
case "fallback":
if record.FallBackSwitchRequested {
utils.PrintAndLog(fallbackMsg)
}
record.FallBackSwitchRequested = true
case "fallback.target":
if record.FallBackSwitchProcessedByTargetExporter {
utils.PrintAndLog(fallbackMsg)
record.CutoverToTargetRequested = true
case "source-replica":
if record.CutoverToSourceReplicaRequested {
utils.PrintAndLog(alreadyInitiatedMsg)
}
record.FallBackSwitchProcessedByTargetExporter = true
case "fallback.source":
if record.FallBackSwitchProcessedByFBImporter {
utils.PrintAndLog(fallbackMsg)
record.CutoverToSourceReplicaRequested = true
case "source":
if record.CutoverToSourceRequested {
utils.PrintAndLog(alreadyInitiatedMsg)
}
record.FallBackSwitchProcessedByFBImporter = true
default:
panic("invalid trigger name")
record.CutoverToSourceRequested = true
}

})
if err != nil {
log.Errorf("creating trigger(%s): %v", triggerName, err)
return fmt.Errorf("creating trigger(%s): %w", triggerName, err)
return fmt.Errorf("failed to update MSR: %w", err)
}
utils.PrintAndLog("%s initiated, wait for it to complete", userFacingActionMsg)
return nil
}

func getTriggerName(importerOrExporterRole string) (string, error) {
switch importerOrExporterRole {
case SOURCE_DB_EXPORTER_ROLE:
return "cutover.source", nil
case TARGET_DB_IMPORTER_ROLE:
return "cutover.target", nil
case TARGET_DB_EXPORTER_FF_ROLE:
return "fallforward.target", nil
case TARGET_DB_EXPORTER_FB_ROLE:
return "fallback.target", nil
case FF_DB_IMPORTER_ROLE:
return "fallforward.ff", nil
case FB_DB_IMPORTER_ROLE:
return "fallback.source", nil
default:
return "", fmt.Errorf("invalid role %s", importerOrExporterRole)
}
func markCutoverProcessed(importerOrExporterRole string) error {
err := metaDB.UpdateMigrationStatusRecord(func(record *metadb.MigrationStatusRecord) {
switch importerOrExporterRole {
case SOURCE_DB_EXPORTER_ROLE:
record.CutoverProcessedBySourceExporter = true
case TARGET_DB_IMPORTER_ROLE:
record.CutoverProcessedByTargetImporter = true
case TARGET_DB_EXPORTER_FF_ROLE:
record.CutoverToSourceReplicaProcessedByTargetExporter = true
case TARGET_DB_EXPORTER_FB_ROLE:
record.CutoverToSourceProcessedByTargetExporter = true
case SOURCE_REPLICA_DB_IMPORTER_ROLE:
record.CutoverToSourceReplicaProcessedBySRImporter = true
case SOURCE_DB_IMPORTER_ROLE:
record.CutoverToSourceProcessedBySourceImporter = true
default:
panic(fmt.Sprintf("invalid role %s", importerOrExporterRole))
}
})
return err
}

func exitIfDBSwitchedOver(triggerName string) {
func ExitIfAlreadyCutover(importerOrExporterRole string) {
if !dbzm.IsMigrationInStreamingMode(exportDir) {
return
}

msr, err := metaDB.GetMigrationStatusRecord()
record, err := metaDB.GetMigrationStatusRecord()
if err != nil {
utils.ErrExit("exit if db switched over for trigger(%s) exists: load migration status record: %s", triggerName, err)
utils.ErrExit("exit if already cutover: load migration status record: %s", err)
}
cutoverMsg := "cutover already completed for this migration, aborting..."
fallforwardMsg := "cutover to source-replica already completed for this migration, aborting..."
fallbackMsg := "cutover to source already completed for this migration, aborting..."
switch triggerName { // only these trigger names required to be checked for db switch over
case "cutover.source":
if msr.CutoverProcessedBySourceExporter {
utils.ErrExit(cutoverMsg)
cTAlreadyCompleted := "cutover already completed for this migration, aborting..."
cSRAlreadyCompleted := "cutover to source-replica already completed for this migration, aborting..."
cSAlreadyCompleted := "cutover to source already completed for this migration, aborting..."
switch importerOrExporterRole {
case SOURCE_DB_EXPORTER_ROLE:
if record.CutoverProcessedBySourceExporter {
utils.ErrExit(cTAlreadyCompleted)
}
case "cutover.target":
if msr.CutoverProcessedByTargetImporter {
utils.ErrExit(cutoverMsg)
case TARGET_DB_IMPORTER_ROLE:
if record.CutoverProcessedByTargetImporter {
utils.ErrExit(cTAlreadyCompleted)
}
case "fallforward.target":
if msr.FallForwardSwitchProcessedByTargetExporter {
utils.ErrExit(fallforwardMsg)
case TARGET_DB_EXPORTER_FF_ROLE:
if record.CutoverToSourceReplicaProcessedByTargetExporter {
utils.ErrExit(cSRAlreadyCompleted)
}
case "fallforward.ff":
if msr.FallForwardSwitchProcessedByFFImporter {
utils.ErrExit(fallforwardMsg)
case TARGET_DB_EXPORTER_FB_ROLE:
if record.CutoverToSourceProcessedByTargetExporter {
utils.ErrExit(cSAlreadyCompleted)
}
case "fallback.source":
if msr.FallBackSwitchProcessedByFBImporter {
utils.ErrExit(fallbackMsg)
case SOURCE_REPLICA_DB_IMPORTER_ROLE:
if record.CutoverToSourceReplicaProcessedBySRImporter {
utils.ErrExit(cSRAlreadyCompleted)
}
case "fallback.target":
if msr.FallBackSwitchProcessedByTargetExporter {
utils.ErrExit(fallbackMsg)
case SOURCE_DB_IMPORTER_ROLE:
if record.CutoverToSourceProcessedBySourceImporter {
utils.ErrExit(cSAlreadyCompleted)
}
default:
panic("invalid trigger name - " + triggerName)
panic(fmt.Sprintf("invalid role %s", importerOrExporterRole))
}
}
Loading