diff --git a/cmd/pbm-agent/agent.go b/cmd/pbm-agent/agent.go
index 9e2781cae..fb58b9379 100644
--- a/cmd/pbm-agent/agent.go
+++ b/cmd/pbm-agent/agent.go
@@ -26,6 +26,7 @@ import (
 type Agent struct {
     leadConn connect.Client
+    ccrsConn connect.Client
     nodeConn *mongo.Client
     bcp      *currentBackup
     pitrjob  *currentPitr
@@ -47,6 +48,7 @@ type Agent struct {
 func newAgent(
     ctx context.Context,
     leadConn connect.Client,
+    ccrsConn connect.Client,
     uri string,
     numParallelColls int,
 ) (*Agent, error) {
@@ -67,6 +69,7 @@ func newAgent(
     a := &Agent{
         leadConn: leadConn,
+        ccrsConn: ccrsConn,
         closeCMD: make(chan struct{}),
         nodeConn: nodeConn,
         brief: topo.NodeBrief{
@@ -154,8 +157,9 @@ func (a *Agent) Start(ctx context.Context) error {
     logger.Printf("conn level ReadConcern: %v; WriteConcern: %v",
         a.leadConn.MongoOptions().ReadConcern.Level,
         a.leadConn.MongoOptions().WriteConcern.W)
+    // todo: also log for ccrsConn?

-    c, cerr := ctrl.ListenCmd(ctx, a.leadConn, a.closeCMD)
+    c, cerr := ctrl.ListenCmd(ctx, a.ccrsConn, a.closeCMD)

     logger.Printf("listening for the commands")

@@ -169,7 +173,7 @@ func (a *Agent) Start(ctx context.Context) error {
         logger.Printf("got command %s, opid: %s", cmd, cmd.OPID)

-        ep, err := config.GetEpoch(ctx, a.leadConn)
+        ep, err := config.GetEpoch(ctx, a.ccrsConn)
         if err != nil {
             logger.Error(string(cmd.Cmd), "", cmd.OPID.String(), ep.TS(), "get epoch: %v", err)
             continue
@@ -210,7 +214,7 @@ func (a *Agent) Start(ctx context.Context) error {
             return errors.Wrap(err, "stop listening")
         }

-        ep, _ := config.GetEpoch(ctx, a.leadConn)
+        ep, _ := config.GetEpoch(ctx, a.ccrsConn)
         logger.Error("", "", "", ep.TS(), "listening commands: %v", err)
     }
 }
@@ -284,14 +288,14 @@ func (a *Agent) HbStatus(ctx context.Context) {
     }
     updateAgentStat(ctx, a, l, true, &hb)
-    err = topo.SetAgentStatus(ctx, a.leadConn, &hb)
+    err = topo.SetAgentStatus(ctx, a.ccrsConn, &hb)
     if err != nil {
         l.Error("set status: %v", err)
     }

     defer func() {
         l.Debug("deleting agent status")
-        err := topo.RemoveAgentStatus(context.Background(), a.leadConn, hb)
+        err := topo.RemoveAgentStatus(context.Background(), a.ccrsConn, hb)
         if err != nil {
             logger := logger.NewEvent("agentCheckup", "", "", primitive.Timestamp{})
             logger.Error("remove agent heartbeat: %v", err)
@@ -326,13 +330,13 @@ func (a *Agent) HbStatus(ctx context.Context) {
         if now.Sub(storageCheckTime) >= storageCheckInterval {
             updateAgentStat(ctx, a, l, true, &hb)
-            err = topo.SetAgentStatus(ctx, a.leadConn, &hb)
+            err = topo.SetAgentStatus(ctx, a.ccrsConn, &hb)
             if err == nil {
                 storageCheckTime = now
             }
         } else {
             updateAgentStat(ctx, a, l, false, &hb)
-            err = topo.SetAgentStatus(ctx, a.leadConn, &hb)
+            err = topo.SetAgentStatus(ctx, a.ccrsConn, &hb)
         }
         if err != nil {
             l.Error("set status: %v", err)
@@ -410,7 +414,7 @@ func (a *Agent) warnIfParallelAgentDetected(
     l log.LogEvent,
     lastHeartbeat primitive.Timestamp,
 ) {
-    s, err := topo.GetAgentStatus(ctx, a.leadConn, a.brief.SetName, a.brief.Me)
+    s, err := topo.GetAgentStatus(ctx, a.ccrsConn, a.brief.SetName, a.brief.Me)
     if err != nil {
         if errors.Is(err, mongo.ErrNoDocuments) {
             return
@@ -456,7 +460,7 @@ func (a *Agent) storStatus(
         return topo.SubsysStatus{OK: true}
     }

-    stg, err := util.GetStorage(ctx, a.leadConn, a.brief.Me, log)
+    stg, err := util.GetStorage(ctx, a.ccrsConn, a.brief.Me, log)
     if err != nil {
         return topo.SubsysStatus{Err: fmt.Sprintf("unable to get storage: %v", err)}
     }
diff --git a/cmd/pbm-agent/backup.go b/cmd/pbm-agent/backup.go
index 2e18eb82b..b43a53158 100644
--- a/cmd/pbm-agent/backup.go
+++ b/cmd/pbm-agent/backup.go
@@ -97,7 +97,7 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,
         go a.sliceNow(opid)
     }

-    cfg, err := config.GetProfiledConfig(ctx, a.leadConn, cmd.Profile)
+    cfg, err := config.GetProfiledConfig(ctx, a.ccrsConn, cmd.Profile)
     if err != nil {
         l.Error("get profiled config: %v", err)
         return
@@ -106,11 +106,11 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,
     var bcp *backup.Backup
     switch cmd.Type {
     case defs.PhysicalBackup:
-        bcp = backup.NewPhysical(a.leadConn, a.nodeConn, a.brief)
+        bcp = backup.NewPhysical(a.leadConn, a.ccrsConn, a.nodeConn, a.brief)
     case defs.ExternalBackup:
-        bcp = backup.NewExternal(a.leadConn, a.nodeConn, a.brief)
+        bcp = backup.NewExternal(a.leadConn, a.ccrsConn, a.nodeConn, a.brief)
     case defs.IncrementalBackup:
-        bcp = backup.NewIncremental(a.leadConn, a.nodeConn, a.brief, cmd.IncrBase)
+        bcp = backup.NewIncremental(a.leadConn, a.ccrsConn, a.nodeConn, a.brief, cmd.IncrBase)
     case defs.LogicalBackup:
         fallthrough
     default:
@@ -118,7 +118,7 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,
         if cfg.Backup != nil && cfg.Backup.NumParallelCollections > 0 {
             numParallelColls = cfg.Backup.NumParallelCollections
         }
-        bcp = backup.New(a.leadConn, a.nodeConn, a.brief, numParallelColls)
+        bcp = backup.New(a.leadConn, a.ccrsConn, a.nodeConn, a.brief, numParallelColls)
     }

     bcp.SetConfig(cfg)
@@ -145,8 +145,8 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,
     }

     l.Debug("init backup meta")
-    if err = topo.CheckTopoForBackup(ctx, a.leadConn, cmd.Type); err != nil {
-        ferr := backup.ChangeBackupState(a.leadConn, cmd.Name, defs.StatusError, err.Error())
+    if err = topo.CheckTopoForBackup(ctx, a.leadConn, a.ccrsConn, cmd.Type); err != nil {
+        ferr := backup.ChangeBackupState(a.ccrsConn, cmd.Name, defs.StatusError, err.Error())
         l.Info("mark backup as %s `%v`: %v", defs.StatusError, err, ferr)
         return
     }
@@ -157,7 +157,7 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,
     const srcHostMultiplier = 3.0
     var c map[string]float64
     if cmd.Type == defs.IncrementalBackup && !cmd.IncrBase {
-        src, err := backup.LastIncrementalBackup(ctx, a.leadConn)
+        src, err := backup.LastIncrementalBackup(ctx, a.ccrsConn)
         if err != nil {
             // try backup anyway
             l.Warning("define source backup: %v", err)
@@ -169,7 +169,7 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,
         }
     }

-    agents, err := topo.ListSteadyAgents(ctx, a.leadConn)
+    agents, err := topo.ListSteadyAgents(ctx, a.leadConn, a.ccrsConn)
     if err != nil {
         l.Error("get agents list: %v", err)
         return
@@ -204,7 +204,7 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,
     }

     epoch := ep.TS()
-    lck := lock.NewLock(a.leadConn, lock.LockHeader{
+    lck := lock.NewLock(a.ccrsConn, lock.LockHeader{
         Type:    ctrl.CmdBackup,
         Replset: a.brief.SetName,
         Node:    a.brief.Me,
@@ -229,7 +229,7 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,
         }
     }()

-    err = backup.SetRSNomineeACK(ctx, a.leadConn, cmd.Name, nodeInfo.SetName, nodeInfo.Me)
+    err = backup.SetRSNomineeACK(ctx, a.ccrsConn, cmd.Name, nodeInfo.SetName, nodeInfo.Me)
     if err != nil {
         l.Warning("set nominee ack: %v", err)
     }
@@ -272,13 +272,13 @@ func (a *Agent) nominateRS(ctx context.Context, bcp, rs string, nodes [][]string
     l := log.LogEventFromContext(ctx)
     l.Debug("nomination list for %s: %v", rs, nodes)

-    err := backup.SetRSNomination(ctx, a.leadConn, bcp, rs)
+    err := backup.SetRSNomination(ctx, a.ccrsConn, bcp, rs)
     if err != nil {
         return errors.Wrap(err, "set nomination meta")
     }

     for _, n := range nodes {
-        nms, err := backup.GetRSNominees(ctx, a.leadConn, bcp, rs)
+        nms, err := backup.GetRSNominees(ctx, a.ccrsConn, bcp, rs)
         if err != nil && !errors.Is(err, errors.ErrNotFound) {
             return errors.Wrap(err, "get nomination meta")
         }
@@ -287,13 +287,13 @@ func (a *Agent) nominateRS(ctx context.Context, bcp, rs string, nodes [][]string
             return nil
         }

-        err = backup.SetRSNominees(ctx, a.leadConn, bcp, rs, n)
+        err = backup.SetRSNominees(ctx, a.ccrsConn, bcp, rs, n)
         if err != nil {
             return errors.Wrap(err, "set nominees")
         }
         l.Debug("nomination %s, set candidates %v", rs, n)

-        err = backup.BackupHB(ctx, a.leadConn, bcp)
+        err = backup.BackupHB(ctx, a.leadConn, a.ccrsConn, bcp)
         if err != nil {
             l.Warning("send heartbeat: %v", err)
         }
@@ -316,7 +316,7 @@ func (a *Agent) waitNomination(ctx context.Context, bcp string) (bool, error) {
     for {
         select {
         case <-tk.C:
-            nm, err := backup.GetRSNominees(ctx, a.leadConn, bcp, a.brief.SetName)
+            nm, err := backup.GetRSNominees(ctx, a.ccrsConn, bcp, a.brief.SetName)
             if err != nil {
                 if errors.Is(err, errors.ErrNotFound) {
                     continue
@@ -344,7 +344,7 @@ func (a *Agent) waitNomination(ctx context.Context, bcp string) (bool, error) {
 // false is returned in case a single active lock exists or error happens.
 // true means that there's no active locks.
 func (a *Agent) startBcpLockCheck(ctx context.Context) (bool, error) {
-    locks, err := lock.GetLocks(ctx, a.leadConn, &lock.LockHeader{})
+    locks, err := lock.GetLocks(ctx, a.ccrsConn, &lock.LockHeader{})
     if err != nil {
         return false, errors.Wrap(err, "get all locks for backup start")
     }
diff --git a/cmd/pbm-agent/main.go b/cmd/pbm-agent/main.go
index f5f5dedae..5dc27630d 100644
--- a/cmd/pbm-agent/main.go
+++ b/cmd/pbm-agent/main.go
@@ -21,7 +21,10 @@ import (
     "github.com/percona/percona-backup-mongodb/pbm/version"
 )

-const mongoConnFlag = "mongodb-uri"
+const (
+    mongoConnFlag = "mongodb-uri"
+    ccrsConnFlag  = "ccrs-uri"
+)

 func main() {
     rootCmd := rootCommand()
@@ -45,6 +48,9 @@ func rootCommand() *cobra.Command {
         },
         Run: func(cmd *cobra.Command, args []string) {
             url := "mongodb://" + strings.Replace(viper.GetString(mongoConnFlag), "mongodb://", "", 1)
+            ccrsURL := viper.GetString(ccrsConnFlag)
+
+            fmt.Println("--- ccrsURL:", ccrsURL)

             hidecreds()

@@ -52,7 +58,7 @@ func rootCommand() *cobra.Command {
             l := log.NewWithOpts(nil, "", "", logOpts).NewDefaultEvent()

-            err := runAgent(url, viper.GetInt("backup.dump-parallel-collections"), logOpts)
+            err := runAgent(url, ccrsURL, viper.GetInt("backup.dump-parallel-collections"), logOpts)
             if err != nil {
                 l.Error("Exit: %v", err)
                 os.Exit(1)
@@ -102,6 +108,10 @@ func setRootFlags(rootCmd *cobra.Command) {
     _ = viper.BindPFlag(mongoConnFlag, rootCmd.Flags().Lookup(mongoConnFlag))
     _ = viper.BindEnv(mongoConnFlag, "PBM_MONGODB_URI")

+    rootCmd.Flags().String(ccrsConnFlag, "", "Control Collection RS connection string")
+    _ = viper.BindPFlag(ccrsConnFlag, rootCmd.Flags().Lookup(ccrsConnFlag))
+    _ = viper.BindEnv(ccrsConnFlag, "PBM_CCRS_URI")
+
     rootCmd.Flags().Int("dump-parallel-collections", 0, "Number of collections to dump in parallel")
     _ = viper.BindPFlag("backup.dump-parallel-collections", rootCmd.Flags().Lookup("dump-parallel-collections"))
     _ = viper.BindEnv("backup.dump-parallel-collections", "PBM_DUMP_PARALLEL_COLLECTIONS")
@@ -182,6 +192,7 @@ func buildLogOpts() *log.Opts {
 func runAgent(
     mongoURI string,
+    ccrsURI string,
     dumpConns int,
     logOpts *log.Opts,
 ) error {
@@ -193,18 +204,26 @@ func runAgent(
         return errors.Wrap(err, "connect to PBM")
     }

-    err = setupNewDB(ctx, leadConn)
+    ccrsConn := leadConn
+    if ccrsURI != "" && ccrsURI != mongoURI {
+        ccrsConn, err = connect.Connect(ctx, ccrsURI, "pbm-agent-ccrs")
+        if err != nil {
+            return errors.Wrap(err, "connect to CCRS")
+        }
+    }
+
+    err = setupNewDB(ctx, ccrsConn)
     if err != nil {
         return errors.Wrap(err, "setup pbm collections")
     }

-    agent, err := newAgent(ctx, leadConn, mongoURI, dumpConns)
+    agent, err := newAgent(ctx, leadConn, ccrsConn, mongoURI, dumpConns)
     if err != nil {
         return errors.Wrap(err, "connect to the node")
     }

     logger := log.NewWithOpts(
-        agent.leadConn,
+        agent.ccrsConn,
         agent.brief.SetName,
         agent.brief.Me,
         logOpts)
diff --git a/cmd/pbm-agent/oplog.go b/cmd/pbm-agent/oplog.go
index ff48d45e2..434df953f 100644
--- a/cmd/pbm-agent/oplog.go
+++ b/cmd/pbm-agent/oplog.go
@@ -77,7 +77,7 @@ func (a *Agent) OplogReplay(ctx context.Context, r *ctrl.ReplayCmd, opID ctrl.OP
     }

     l.Info("oplog replay started")
-    rr := restore.New(a.leadConn, a.nodeConn, a.brief, cfg, r.RSMap, 0, 1)
+    rr := restore.New(a.leadConn, a.ccrsConn, a.nodeConn, a.brief, cfg, r.RSMap, 0, 1)
     err = rr.ReplayOplog(ctx, r, opID, l)
     if err != nil {
         if errors.Is(err, restore.ErrNoDataForShard) {
diff --git a/cmd/pbm-agent/pitr.go b/cmd/pbm-agent/pitr.go
index b92627ebe..a2f4804a2 100644
--- a/cmd/pbm-agent/pitr.go
+++ b/cmd/pbm-agent/pitr.go
@@ -122,7 +122,7 @@ func (a *Agent) PITR(ctx context.Context) {
         if err != nil {
             // we need epoch just to log pitr err with an extra context
             // so not much care if we get it or not
-            ep, _ := config.GetEpoch(ctx, a.leadConn)
+            ep, _ := config.GetEpoch(ctx, a.ccrsConn)
             l.Error(string(ctrl.CmdPITR), "", "", ep.TS(), "init: %v", err)
         }

@@ -420,7 +420,7 @@ func (a *Agent) leadNomination(
         return
     }

-    candidates, err := topo.ListSteadyAgents(ctx, a.leadConn)
+    candidates, err := topo.ListSteadyAgents(ctx, a.leadConn, a.ccrsConn)
     if err != nil {
         l.Error("get agents list: %v", err)
         return
@@ -530,7 +530,7 @@ func (a *Agent) waitAllOpLockRelease(ctx context.Context) (bool, error) {
     for {
         select {
         case <-tick.C:
-            running, err := oplog.IsOplogSlicing(ctx, a.leadConn)
+            running, err := oplog.IsOplogSlicing(ctx, a.leadConn, a.ccrsConn)
             if err != nil {
                 return false, errors.Wrap(err, "is oplog slicing check")
             }
@@ -664,7 +664,7 @@ func (a *Agent) getPITRClusterAndStaleStatus(ctx context.Context) (oplog.Status,
     l := log.LogEventFromContext(ctx)
     isStale := false

-    meta, err := oplog.GetMeta(ctx, a.leadConn)
+    meta, err := oplog.GetMeta(ctx, a.ccrsConn)
     if err != nil {
         if !errors.Is(err, errors.ErrNotFound) {
             l.Error("getting metta for reconfig status check: %v", err)
diff --git a/cmd/pbm-agent/restore.go b/cmd/pbm-agent/restore.go
index 3207703dd..5b15767fb 100644
--- a/cmd/pbm-agent/restore.go
+++ b/cmd/pbm-agent/restore.go
@@ -50,7 +50,7 @@ func (a *Agent) Restore(ctx context.Context, r *ctrl.RestoreCmd, opid ctrl.OPID,
     var lck *lock.Lock
     if nodeInfo.IsPrimary {
         epts := ep.TS()
-        lck = lock.NewLock(a.leadConn, lock.LockHeader{
+        lck = lock.NewLock(a.ccrsConn, lock.LockHeader{
             Type:    ctrl.CmdRestore,
             Replset: nodeInfo.SetName,
             Node:    nodeInfo.Me,
@@ -79,7 +79,7 @@ func (a *Agent) Restore(ctx context.Context, r *ctrl.RestoreCmd, opid ctrl.OPID,
             }
         }()

-        err = config.SetConfigVar(ctx, a.leadConn, "pitr.enabled", "false")
+        err = config.SetConfigVar(ctx, a.ccrsConn, "pitr.enabled", "false")
         if err != nil {
             l.Error("disable oplog slicer: %v", err)
         } else {
@@ -122,9 +122,9 @@ func (a *Agent) Restore(ctx context.Context, r *ctrl.RestoreCmd, opid ctrl.OPID,
         l.Info("backup: %s", r.BackupName)
         // XXX: why is backup searched on storage?
-        bcp, err = restore.LookupBackupMeta(ctx, a.leadConn, r.BackupName, a.brief.Me)
+        bcp, err = restore.LookupBackupMeta(ctx, a.ccrsConn, r.BackupName, a.brief.Me)
         if err != nil {
-            err1 := addRestoreMetaWithError(ctx, a.leadConn, l, opid, r, nodeInfo.SetName,
+            err1 := addRestoreMetaWithError(ctx, a.ccrsConn, l, opid, r, nodeInfo.SetName,
                 "define base backup: %v", err)
             if err1 != nil {
                 l.Error("failed to save meta: %v", err1)
@@ -133,7 +133,7 @@ func (a *Agent) Restore(ctx context.Context, r *ctrl.RestoreCmd, opid ctrl.OPID,
         }

         if !r.OplogTS.IsZero() && bcp.LastWriteTS.Compare(r.OplogTS) >= 0 {
-            err1 := addRestoreMetaWithError(ctx, a.leadConn, l, opid, r, nodeInfo.SetName,
+            err1 := addRestoreMetaWithError(ctx, a.ccrsConn, l, opid, r, nodeInfo.SetName,
                 "snapshot's last write is later than the target time. "+
                     "Try to set an earlier snapshot. Or leave the snapshot empty "+
                     "so PBM will choose one.")
@@ -146,7 +146,7 @@ func (a *Agent) Restore(ctx context.Context, r *ctrl.RestoreCmd, opid ctrl.OPID,
         r.BackupName = bcp.Name
     }

-    cfg, err := config.GetConfig(ctx, a.leadConn)
+    cfg, err := config.GetConfig(ctx, a.ccrsConn)
     if err != nil {
         l.Error("get PBM configuration: %v", err)
         return
@@ -164,7 +164,16 @@ func (a *Agent) Restore(ctx context.Context, r *ctrl.RestoreCmd, opid ctrl.OPID,
         numParallelColls := getNumParallelCollsConfig(r.NumParallelColls, cfg.Restore)
         numInsertionWorkersPerCol := getNumInsertionWorkersConfig(r.NumInsertionWorkers, cfg.Restore)

-        rr := restore.New(a.leadConn, a.nodeConn, a.brief, cfg, r.RSMap, numParallelColls, numInsertionWorkersPerCol)
+        rr := restore.New(
+            a.leadConn,
+            a.ccrsConn,
+            a.nodeConn,
+            a.brief,
+            cfg,
+            r.RSMap,
+            numParallelColls,
+            numInsertionWorkersPerCol,
+        )
         if r.OplogTS.IsZero() {
             err = rr.Snapshot(ctx, r, opid, bcp)
         } else {
@@ -193,6 +202,7 @@ func (a *Agent) Restore(ctx context.Context, r *ctrl.RestoreCmd, opid ctrl.OPID,
         rstr, err = restore.NewPhysical(
             ctx,
             a.leadConn,
+            a.ccrsConn,
             a.nodeConn,
             nodeInfo,
             r.RSMap,
@@ -216,7 +226,7 @@ func (a *Agent) Restore(ctx context.Context, r *ctrl.RestoreCmd, opid ctrl.OPID,
     }

     if bcpType == defs.LogicalBackup && nodeInfo.IsLeader() {
-        epch, err := config.ResetEpoch(ctx, a.leadConn)
+        epch, err := config.ResetEpoch(ctx, a.ccrsConn)
         if err != nil {
             l.Error("reset epoch: %v", err)
         }
diff --git a/cmd/pbm-agent/resync.go b/cmd/pbm-agent/resync.go
index 61e1e297b..d99bfd46b 100644
--- a/cmd/pbm-agent/resync.go
+++ b/cmd/pbm-agent/resync.go
@@ -135,12 +135,12 @@ func (a *Agent) helpSyncProfileBackups(ctx context.Context, profile *config.Conf
 }

 func (a *Agent) handleSyncMainStorage(ctx context.Context, includeRestores bool) error {
-    cfg, err := config.GetConfig(ctx, a.leadConn)
+    cfg, err := config.GetConfig(ctx, a.ccrsConn)
     if err != nil {
         return errors.Wrap(err, "get config")
     }

-    err = resync.Resync(ctx, a.leadConn, &cfg.Storage, a.brief.Me, includeRestores)
+    err = resync.Resync(ctx, a.ccrsConn, &cfg.Storage, a.brief.Me, includeRestores)
     if err != nil {
         return errors.Wrap(err, "resync")
     }
diff --git a/cmd/pbm/backup.go b/cmd/pbm/backup.go
index 1583ca864..b5be8a22f 100644
--- a/cmd/pbm/backup.go
+++ b/cmd/pbm/backup.go
@@ -85,6 +85,7 @@ type descBcp struct {
 func runBackup(
     ctx context.Context,
     conn connect.Client,
+    ccrsConn connect.Client,
     pbm *sdk.Client,
     b *backupOpts,
     outf outFormat,
@@ -101,7 +102,7 @@ func runBackup(
         return nil, errors.New("--ns flag is only allowed for logical backup")
     }

-    if err := topo.CheckTopoForBackup(ctx, conn, defs.BackupType(b.typ)); err != nil {
+    if err := topo.CheckTopoForBackup(ctx, conn, ccrsConn, defs.BackupType(b.typ)); err != nil {
         return nil, errors.Wrap(err, "backup pre-check")
     }

@@ -109,7 +110,7 @@ func runBackup(
         return nil, err
     }

-    cfg, err := config.GetProfiledConfig(ctx, conn, b.profile)
+    cfg, err := config.GetProfiledConfig(ctx, ccrsConn, b.profile)
     if err != nil {
         if errors.Is(err, config.ErrMissedConfig) {
             return nil, errors.New("no config set. Set config with ")
@@ -130,7 +131,7 @@ func runBackup(
         level = &b.compressionLevel[0]
     }

-    err = sendCmd(ctx, conn, ctrl.Cmd{
+    err = sendCmd(ctx, ccrsConn, ctrl.Cmd{
         Cmd: ctrl.CmdBackup,
         Backup: &ctrl.BackupCmd{
             Type:             defs.BackupType(b.typ),
@@ -155,13 +156,13 @@ func runBackup(
     }
     startCtx, cancel := context.WithTimeout(ctx, cfg.Backup.Timeouts.StartingStatus())
     defer cancel()
-    err = waitForBcpStatus(startCtx, conn, b.name, showProgress)
+    err = waitForBcpStatus(startCtx, ccrsConn, b.name, showProgress)
     if err != nil {
         return nil, err
     }

     if b.typ == string(defs.ExternalBackup) {
-        s, err := waitBackup(ctx, conn, b.name, defs.StatusCopyReady, showProgress)
+        s, err := waitBackup(ctx, ccrsConn, b.name, defs.StatusCopyReady, showProgress)
         if err != nil {
             return nil, errors.Wrap(err, "waiting for the `copyReady` status")
         }
@@ -198,7 +199,7 @@ func runBackup(
     if showProgress {
         fmt.Printf("\nWaiting for '%s' backup...", b.name)
     }
-    s, err := waitBackup(ctx, conn, b.name, defs.StatusDone, showProgress)
+    s, err := waitBackup(ctx, ccrsConn, b.name, defs.StatusDone, showProgress)
     if s != nil && showProgress {
         fmt.Printf(" %s\n", *s)
     }
diff --git a/cmd/pbm/config.go b/cmd/pbm/config.go
index 237609c07..b99ef8ecd 100644
--- a/cmd/pbm/config.go
+++ b/cmd/pbm/config.go
@@ -51,6 +51,7 @@ func (c confVals) String() string {
 func runConfig(
     ctx context.Context,
     conn connect.Client,
+    ccrsConn connect.Client,
     pbm *sdk.Client,
     c *configOpts,
 ) (fmt.Stringer, error) {
@@ -65,7 +66,7 @@ func runConfig(
         var o confVals
         rsnc := false
         for k, v := range c.set {
-            err := config.SetConfigVar(ctx, conn, k, v)
+            err := config.SetConfigVar(ctx, ccrsConn, k, v)
             if err != nil {
                 return nil, errors.Wrapf(err, "set %s", k)
             }
@@ -90,7 +91,7 @@ func runConfig(
         }
         return o, nil
     case len(c.key) > 0:
-        k, err := config.GetConfigVar(ctx, conn, c.key)
+        k, err := config.GetConfigVar(ctx, ccrsConn, c.key)
         if err != nil {
             if errors.Is(err, config.ErrUnsetConfigPath) {
                 return confKV{c.key, ""}, nil // unset config path
@@ -134,7 +135,8 @@ func runConfig(
             oldCfg = &config.Config{}
         }

-        if err := config.SetConfig(ctx, conn, newCfg); err != nil {
+        // TODO: determine if both conn
+        if err := config.SetConfig(ctx, ccrsConn, newCfg); err != nil {
             return nil, errors.Wrap(err, "unable to set config: write to db")
         }
diff --git a/cmd/pbm/main.go b/cmd/pbm/main.go
index 886456e26..f763d118e 100644
--- a/cmd/pbm/main.go
+++ b/cmd/pbm/main.go
@@ -32,6 +32,7 @@ const (
 const (
     mongoConnFlag   = "mongodb-uri"
+    ccrsConnFlag    = "ccrs-uri"
     RSMappingEnvVar = "PBM_REPLSET_REMAPPING"
     RSMappingFlag   = "replset-remapping"
     RSMappingDoc    = "re-map replset names for backups/oplog (e.g. to_name_1=from_name_1,to_name_2=from_name_2)"
@@ -63,13 +64,15 @@ type cliResult interface {
 type pbmApp struct {
     rootCmd *cobra.Command

-    ctx     context.Context
-    cancel  context.CancelFunc
-    pbmOutF outFormat
-    mURL    string
-    conn    connect.Client
-    pbm     *sdk.Client
-    node    string
+    ctx      context.Context
+    cancel   context.CancelFunc
+    pbmOutF  outFormat
+    mURL     string
+    ccrsURI  string
+    conn     connect.Client
+    ccrsConn connect.Client
+    pbm      *sdk.Client
+    node     string
 }

 func main() {
@@ -128,6 +131,14 @@ func newPbmApp() *pbmApp {
     _ = viper.BindPFlag(mongoConnFlag, app.rootCmd.PersistentFlags().Lookup(mongoConnFlag))
     _ = viper.BindEnv(mongoConnFlag, "PBM_MONGODB_URI")

+    app.rootCmd.PersistentFlags().String(
+        ccrsConnFlag,
+        "",
+        "Control Collection RS connection string (Default = PBM_CCRS_URI environment variable)",
+    )
+    _ = viper.BindPFlag(ccrsConnFlag, app.rootCmd.PersistentFlags().Lookup(ccrsConnFlag))
+    _ = viper.BindEnv(ccrsConnFlag, "PBM_CCRS_URI")
+
     app.rootCmd.PersistentFlags().StringP("out", "o", string(outText), "Output format /")
     _ = viper.BindPFlag("out", app.rootCmd.PersistentFlags().Lookup("out"))

@@ -170,6 +181,8 @@ func (app *pbmApp) persistentPreRun(cmd *cobra.Command, args []string) error {
         )
     }

+    app.ccrsURI = viper.GetString(ccrsConnFlag)
+
     if viper.GetString("describe-restore.config") != "" || viper.GetString("restore-finish.config") != "" {
         return nil
     }
@@ -179,6 +192,7 @@ func (app *pbmApp) persistentPreRun(cmd *cobra.Command, args []string) error {
     if err != nil {
         exitErr(errors.Wrap(err, "connect to mongodb"), app.pbmOutF)
     }
+
     app.ctx = log.SetLoggerToContext(app.ctx, log.New(app.conn, "", ""))

     ver, err := version.GetMongoVersion(app.ctx, app.conn.MongoClient())
@@ -190,11 +204,21 @@ func (app *pbmApp) persistentPreRun(cmd *cobra.Command, args []string) error {
         fmt.Fprintf(os.Stderr, "WARNING: %v\n", err)
     }

-    app.pbm, err = sdk.NewClient(app.ctx, app.mURL)
+    app.pbm, err = sdk.NewClient(app.ctx, app.mURL, app.ccrsURI)
     if err != nil {
         exitErr(errors.Wrap(err, "init sdk"), app.pbmOutF)
     }

+    if app.ccrsURI == "" || app.ccrsURI == app.mURL {
+        app.ccrsConn = app.conn
+        app.ccrsURI = app.mURL
+    } else {
+        app.ccrsConn, err = connect.Connect(app.ctx, app.ccrsURI, "pbm-ctl-ccrs")
+        if err != nil {
+            exitErr(errors.Wrap(err, "connect to ccrs"), app.pbmOutF)
+        }
+    }
+
     inf, err := topo.GetNodeInfo(app.ctx, app.conn.MongoClient())
     if err != nil {
         exitErr(errors.Wrap(err, "unable to obtain node info"), app.pbmOutF)
@@ -263,7 +287,7 @@ func (app *pbmApp) buildBackupCmd() *cobra.Command {
             }
             backupOptions.name = time.Now().UTC().Format(time.RFC3339)

-            return runBackup(app.ctx, app.conn, app.pbm, &backupOptions, app.pbmOutF)
+            return runBackup(app.ctx, app.conn, app.ccrsConn, app.pbm, &backupOptions, app.pbmOutF)
         }),
     }

@@ -377,7 +401,7 @@ func (app *pbmApp) buildConfigCmd() *cobra.Command {
             if len(args) == 1 {
                 cfg.key = args[0]
             }
-            return runConfig(app.ctx, app.conn, app.pbm, &cfg)
+            return runConfig(app.ctx, app.conn, app.ccrsConn, app.pbm, &cfg)
         }),
     }

@@ -713,7 +737,7 @@ func (app *pbmApp) buildLogCmd() *cobra.Command {
                 return nil, err
             }

-            return runLogs(app.ctx, app.conn, &logOptions, app.pbmOutF)
+            return runLogs(app.ctx, app.ccrsConn, &logOptions, app.pbmOutF)
         }),
     }

@@ -762,7 +786,7 @@ func (app *pbmApp) buildRestoreCmd() *cobra.Command {
                 val, _ := cmd.Flags().GetBool("allow-partly-done")
                 restoreOptions.allowPartlyDone = &val
             }
-            return runRestore(app.ctx, app.conn, app.pbm, &restoreOptions, app.node, app.pbmOutF)
+            return runRestore(app.ctx, app.conn, app.ccrsConn, app.pbm, &restoreOptions, app.node, app.pbmOutF)
         }),
     }

@@ -866,7 +890,7 @@ func (app *pbmApp) buildReplayCmd() *cobra.Command {
         Use:   "oplog-replay",
         Short: "Replay oplog",
         RunE: app.wrapRunE(func(cmd *cobra.Command, args []string) (fmt.Stringer, error) {
-            return replayOplog(app.ctx, app.conn, app.pbm, replayOpts, app.node, app.pbmOutF)
+            return replayOplog(app.ctx, app.conn, app.ccrsConn, app.pbm, replayOpts, app.node, app.pbmOutF)
         }),
     }

@@ -910,7 +934,7 @@ func (app *pbmApp) buildStatusCmd() *cobra.Command {
                 }
             }

-            return status(app.ctx, app.conn, app.pbm, app.mURL, statusOpts, app.pbmOutF == outJSONpretty)
+            return status(app.ctx, app.conn, app.ccrsConn, app.pbm, app.mURL, statusOpts, app.pbmOutF == outJSONpretty)
         }),
     }
diff --git a/cmd/pbm/oplog.go b/cmd/pbm/oplog.go
index 168f7e135..ea05a8e17 100644
--- a/cmd/pbm/oplog.go
+++ b/cmd/pbm/oplog.go
@@ -44,6 +44,7 @@ func (r oplogReplayResult) String() string {
 func replayOplog(
     ctx context.Context,
     conn connect.Client,
+    ccrsConn connect.Client,
     pbm *sdk.Client,
     o replayOptions,
     node string,
@@ -106,7 +107,7 @@ func replayOplog(
     }

     fmt.Print("Started.\nWaiting to finish")
-    err = waitRestore(ctx, conn, m, node, defs.StatusDone, 0)
+    err = waitRestore(ctx, conn, ccrsConn, m, node, defs.StatusDone, 0)
     if err != nil {
         if errors.Is(err, context.DeadlineExceeded) {
             err = errWaitTimeout
diff --git a/cmd/pbm/restore.go b/cmd/pbm/restore.go
index b370d02a5..148df91d6 100644
--- a/cmd/pbm/restore.go
+++ b/cmd/pbm/restore.go
@@ -118,6 +118,7 @@ func (r externRestoreRet) String() string {
 func runRestore(
     ctx context.Context,
     conn connect.Client,
+    ccrsConn connect.Client,
     pbm *sdk.Client,
     o *restoreOpts,
     node string,
@@ -164,12 +165,25 @@ func runRestore(
     }
     tdiff := time.Now().Unix() - int64(clusterTime.T)

-    m, err := doRestore(ctx, conn, o, numParallelColls, numInsertionWorkers, nss, o.nsFrom, o.nsTo, rsMap, node, outf)
+    m, err := doRestore(
+        ctx,
+        conn,
+        ccrsConn,
+        o,
+        numParallelColls,
+        numInsertionWorkers,
+        nss,
+        o.nsFrom,
+        o.nsTo,
+        rsMap,
+        node,
+        outf,
+    )
     if err != nil {
         return nil, err
     }
     if o.extern && outf == outText {
-        err = waitRestore(ctx, conn, m, node, defs.StatusCopyReady, tdiff)
+        err = waitRestore(ctx, conn, ccrsConn, m, node, defs.StatusCopyReady, tdiff)
         if err != nil {
             return nil, errors.Wrap(err, "waiting for the `copyReady` status")
         }
@@ -195,7 +209,7 @@ func runRestore(
             typ = " physical restore.\nWaiting to finish"
         }
         fmt.Printf("Started%s", typ)
-        err = waitRestore(ctx, conn, m, node, defs.StatusDone, tdiff)
+        err = waitRestore(ctx, conn, ccrsConn, m, node, defs.StatusDone, tdiff)
         if err == nil {
             return restoreRet{
                 Name: m.Name,
@@ -221,12 +235,13 @@ func runRestore(
 func waitRestore(
     ctx context.Context,
     conn connect.Client,
+    ccrsConn connect.Client,
     m *restore.RestoreMeta,
     node string,
     status defs.Status,
     tskew int64,
 ) error {
-    ep, _ := config.GetEpoch(ctx, conn)
+    ep, _ := config.GetEpoch(ctx, ccrsConn)
     l := log.FromContext(ctx).
         NewEvent(string(ctrl.CmdRestore), m.Backup, m.OPID, ep.TS())

     stg, err := util.GetStorage(ctx, conn, node, l)
@@ -383,6 +398,7 @@ func nsIsTaken(
 func doRestore(
     ctx context.Context,
     conn connect.Client,
+    ccrsConn connect.Client,
     o *restoreOpts,
     numParallelColls *int32,
     numInsertionWorkers *int32,
@@ -393,7 +409,7 @@ func doRestore(
     node string,
     outf outFormat,
 ) (*restore.RestoreMeta, error) {
-    bcp, bcpType, err := checkBackup(ctx, conn, o, nss, nsFrom, nsTo)
+    bcp, bcpType, err := checkBackup(ctx, ccrsConn, o, nss, nsFrom, nsTo)
     if err != nil {
         return nil, err
     }
@@ -453,7 +469,7 @@ func doRestore(
         }
     }

-    err = sendCmd(ctx, conn, cmd)
+    err = sendCmd(ctx, ccrsConn, cmd)
     if err != nil {
         return nil, errors.Wrap(err, "send command")
     }
@@ -491,10 +507,10 @@ func doRestore(
         fn = restore.GetRestoreMeta
         startCtx, cancel = context.WithTimeout(ctx, defs.WaitActionStart)
     } else {
-        ep, _ := config.GetEpoch(ctx, conn)
+        ep, _ := config.GetEpoch(ctx, ccrsConn)
         l := log.FromContext(ctx).NewEvent(string(ctrl.CmdRestore), bcp, "", ep.TS())

-        stg, err := util.GetStorage(ctx, conn, node, l)
+        stg, err := util.GetStorage(ctx, ccrsConn, node, l)
         if err != nil {
             return nil, errors.Wrap(err, "get storage")
         }
@@ -510,7 +526,7 @@ func doRestore(
     }
     defer cancel()

-    return waitForRestoreStatus(startCtx, conn, name, fn)
+    return waitForRestoreStatus(startCtx, ccrsConn, name, fn)
 }

 func runFinishRestore(o descrRestoreOpts, node string) (fmt.Stringer, error) {
diff --git a/cmd/pbm/status.go b/cmd/pbm/status.go
index ba4d26594..cc8a88777 100644
--- a/cmd/pbm/status.go
+++ b/cmd/pbm/status.go
@@ -68,14 +68,19 @@ type statusSect struct {
     Name     string
     longName string
     Obj      fmt.Stringer
-    f        func(ctx context.Context, conn connect.Client) (fmt.Stringer, error)
+    f        func(ctx context.Context, conn, ccrsConn connect.Client) (fmt.Stringer, error)
 }

 func (f statusSect) String() string {
     return fmt.Sprintf("%s\n%s\n", sprinth(f.longName), f.Obj)
 }

-func (o statusOut) set(ctx context.Context, conn connect.Client, sfilter map[string]bool) error {
+func (o statusOut) set(
+    ctx context.Context,
+    conn connect.Client,
+    ccrsConn connect.Client,
+    sfilter map[string]bool,
+) error {
     for _, se := range o.data {
         if sfilter != nil && !sfilter[se.Name] {
             se.Obj = nil
@@ -83,7 +88,7 @@ func (o statusOut) set(ctx context.Context, conn connect.Client, sfilter map[str
         }

         var err error
-        se.Obj, err = se.f(ctx, conn)
+        se.Obj, err = se.f(ctx, conn, ccrsConn)
         if err != nil {
             return errors.Wrapf(err, "get status of %s", se.Name)
         }
@@ -95,6 +100,7 @@ func (o statusOut) set(ctx context.Context, conn connect.Client, sfilter map[str
 func status(
     ctx context.Context,
     conn connect.Client,
+    ccrsConn connect.Client,
     pbm *sdk.Client,
     curi string,
     opts statusOptions,
@@ -109,21 +115,21 @@ func status(
         data: []*statusSect{
             {
                 "cluster", "Cluster", nil,
-                func(ctx context.Context, _ connect.Client) (fmt.Stringer, error) {
+                func(ctx context.Context, _, _ connect.Client) (fmt.Stringer, error) {
                     return clusterStatus(ctx, pbm, cli.RSConfGetter(curi), opts.priority)
                 },
             },
             {"pitr", "PITR incremental backup", nil, getPitrStatus},
             {
                 "running", "Currently running", nil,
-                func(ctx context.Context, _ connect.Client) (fmt.Stringer, error) {
+                func(ctx context.Context, _, _ connect.Client) (fmt.Stringer, error) {
                     return getCurrOps(ctx, pbm)
                 },
             },
             {
                 "backups", "Backups", nil,
-                func(ctx context.Context, conn connect.Client) (fmt.Stringer, error) {
-                    return getStorageStat(ctx, conn, pbm, rsMap)
+                func(ctx context.Context, conn, ccrsConn connect.Client) (fmt.Stringer, error) {
+                    return getStorageStat(ctx, conn, ccrsConn, pbm, rsMap)
                 },
             },
         },
@@ -138,7 +144,7 @@ func status(
         }
     }

-    err = out.set(ctx, conn, sfilter)
+    err = out.set(ctx, conn, ccrsConn, sfilter)

     return out, err
 }
@@ -281,33 +287,33 @@ func (p pitrStat) String() string {
     return s
 }

-func getPitrStatus(ctx context.Context, conn connect.Client) (fmt.Stringer, error) {
+func getPitrStatus(ctx context.Context, conn, ccrsConn connect.Client) (fmt.Stringer, error) {
     var p pitrStat
     var err error
-    p.InConf, _, err = config.IsPITREnabled(ctx, conn)
+    p.InConf, _, err = config.IsPITREnabled(ctx, ccrsConn)
     if err != nil {
         return p, errors.Wrap(err, "unable check PITR config status")
     }

-    p.Running, err = oplog.IsOplogSlicing(ctx, conn)
+    p.Running, err = oplog.IsOplogSlicing(ctx, conn, ccrsConn)
     if err != nil {
         return p, errors.Wrap(err, "unable check PITR running status")
     }

     if p.InConf && p.Running {
-        p.RunningNodes, err = oplog.GetAgentsWithACK(ctx, conn)
+        p.RunningNodes, err = oplog.GetAgentsWithACK(ctx, ccrsConn)
         if err != nil && !errors.Is(err, errors.ErrNotFound) {
             return p, errors.Wrap(err, "unable to fetch PITR running nodes")
         }
     }

-    p.Err, err = getPitrErr(ctx, conn)
+    p.Err, err = getPitrErr(ctx, conn, ccrsConn)

     return p, errors.Wrap(err, "check for errors")
 }

-func getPitrErr(ctx context.Context, conn connect.Client) (string, error) {
-    epch, err := config.GetEpoch(ctx, conn)
+func getPitrErr(ctx context.Context, conn, ccrsConn connect.Client) (string, error) {
+    epch, err := config.GetEpoch(ctx, ccrsConn)
     if err != nil {
         return "", errors.Wrap(err, "get current epoch")
     }
@@ -321,7 +327,7 @@ func getPitrErr(ctx context.Context, conn connect.Client) (string, error) {
 LOOP:
     for _, s := range shards {
         l, err := log.LogGetExactSeverity(ctx,
-            conn,
+            ccrsConn,
             &log.LogRequest{
                 LogKeys: log.LogKeys{
                     Severity: log.Error,
@@ -341,7 +347,7 @@ LOOP:
         // check if some node in the RS had successfully restarted slicing
         nl, err := log.LogGetExactSeverity(ctx,
-            conn,
+            ccrsConn,
             &log.LogRequest{
                 LogKeys: log.LogKeys{
                     Severity: log.Debug,
@@ -529,12 +535,13 @@ func (s storageStat) String() string {
 func getStorageStat(
     ctx context.Context,
     conn connect.Client,
+    ccrsConn connect.Client,
     pbm *sdk.Client,
     rsMap map[string]string,
 ) (fmt.Stringer, error) {
     var s storageStat

-    cfg, err := config.GetConfig(ctx, conn)
+    cfg, err := config.GetConfig(ctx, ccrsConn)
     if err != nil {
         return s, errors.Wrap(err, "get config")
     }
@@ -573,7 +580,7 @@ func getStorageStat(
     // which the `confsrv` param in `bcpMatchCluster` is all about
     bcpsMatchCluster(bcps, ver.VersionString, fcv, shards, inf.SetName, rsMap)

-    stg, err := util.GetStorage(ctx, conn, inf.Me,
+    stg, err := util.GetStorage(ctx, ccrsConn, inf.Me,
         log.FromContext(ctx).NewEvent("", "", "", primitive.Timestamp{}))
     if err != nil {
         return s, errors.Wrap(err, "get storage")
     }
@@ -633,7 +640,7 @@ func getStorageStat(
         s.Snapshot = append(s.Snapshot, snpsht)
     }

-    s.PITR, err = getPITRranges(ctx, conn, bcps, rsMap)
+    s.PITR, err = getPITRranges(ctx, conn, ccrsConn, bcps, rsMap)
     if err != nil {
         return s, errors.Wrap(err, "get PITR chunks")
     }
@@ -644,6 +651,7 @@ func getStorageStat(
 func getPITRranges(
     ctx context.Context,
     conn connect.Client,
+    ccrsConn connect.Client,
     bcps []backup.BackupMeta,
     rsMap map[string]string,
 ) (*pitrRanges, error) {
@@ -661,7 +669,7 @@ func getPITRranges(
     var size int64
     var rstlines [][]oplog.Timeline
     for _, s := range shards {
-        tlns, err := oplog.PITRGetValidTimelines(ctx, conn, mapRevRS(s.RS), now)
+        tlns, err := oplog.PITRGetValidTimelines(ctx, ccrsConn, mapRevRS(s.RS), now)
         if err != nil {
             return nil, errors.Wrapf(err, "get PITR timelines for %s replset: %s", s.RS, err)
         }
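Every cmd/ change above follows the same selection rule: the CCRS connection defaults to the lead connection and is only dialed separately when a distinct --ccrs-uri (or PBM_CCRS_URI) is supplied. The following is a minimal sketch of that fallback, using the repo's connect package exactly as runAgent and persistentPreRun do above; the resolveCCRSConn name and the ccrsutil package are illustrative, not part of the patch.

package ccrsutil

import (
    "context"

    "github.com/percona/percona-backup-mongodb/pbm/connect"
    "github.com/percona/percona-backup-mongodb/pbm/errors"
)

// resolveCCRSConn reuses the lead connection unless a dedicated CCRS URI is set.
func resolveCCRSConn(
    ctx context.Context,
    leadConn connect.Client,
    mainURI, ccrsURI, appName string,
) (connect.Client, error) {
    if ccrsURI == "" || ccrsURI == mainURI {
        // no separate Control Collection RS configured: control collections
        // stay on the cluster the lead connection already points to
        return leadConn, nil
    }

    // a distinct CCRS URI was given: dial it with its own app name,
    // mirroring connect.Connect(ctx, ccrsURI, "pbm-agent-ccrs") above
    ccrsConn, err := connect.Connect(ctx, ccrsURI, appName)
    if err != nil {
        return nil, errors.Wrap(err, "connect to CCRS")
    }
    return ccrsConn, nil
}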
diff --git a/pbm/backup/backup.go b/pbm/backup/backup.go
index a4162af9b..6b16cdd9d 100644
--- a/pbm/backup/backup.go
+++ b/pbm/backup/backup.go
@@ -25,6 +25,7 @@ import (
 type Backup struct {
     leadConn connect.Client
+    ccrsConn connect.Client
     nodeConn *mongo.Client
     brief    topo.NodeBrief
     config   *config.Config
@@ -36,9 +37,10 @@ type Backup struct {
     oplogSlicerInterval time.Duration
 }

-func New(leadConn connect.Client, conn *mongo.Client, brief topo.NodeBrief, dumpConns int) *Backup {
+func New(leadConn, ccrsConn connect.Client, conn *mongo.Client, brief topo.NodeBrief, dumpConns int) *Backup {
     return &Backup{
         leadConn: leadConn,
+        ccrsConn: ccrsConn,
         nodeConn: conn,
         brief:    brief,
         typ:      defs.LogicalBackup,
@@ -46,27 +48,30 @@ func New(leadConn connect.Client, conn *mongo.Client, brief topo.NodeBrief, dump
     }
 }

-func NewPhysical(leadConn connect.Client, conn *mongo.Client, brief topo.NodeBrief) *Backup {
+func NewPhysical(leadConn, ccrsConn connect.Client, conn *mongo.Client, brief topo.NodeBrief) *Backup {
     return &Backup{
         leadConn: leadConn,
+        ccrsConn: ccrsConn,
         nodeConn: conn,
         brief:    brief,
         typ:      defs.PhysicalBackup,
     }
 }

-func NewExternal(leadConn connect.Client, conn *mongo.Client, brief topo.NodeBrief) *Backup {
+func NewExternal(leadConn, ccrsConn connect.Client, conn *mongo.Client, brief topo.NodeBrief) *Backup {
     return &Backup{
         leadConn: leadConn,
+        ccrsConn: ccrsConn,
         nodeConn: conn,
         brief:    brief,
         typ:      defs.ExternalBackup,
     }
 }

-func NewIncremental(leadConn connect.Client, conn *mongo.Client, brief topo.NodeBrief, base bool) *Backup {
+func NewIncremental(leadConn, ccrsConn connect.Client, conn *mongo.Client, brief topo.NodeBrief, base bool) *Backup {
     return &Backup{
         leadConn: leadConn,
+        ccrsConn: ccrsConn,
         nodeConn: conn,
         brief:    brief,
         typ:      defs.IncrementalBackup,
@@ -158,7 +163,7 @@ func (b *Backup) Init(
         }
     }

-    return saveBackupMeta(ctx, b.leadConn, meta)
+    return saveBackupMeta(ctx, b.ccrsConn, meta)
 }

 // Run runs backup.
@@ -202,7 +207,7 @@ func (b *Backup) Run(ctx context.Context, bcp *ctrl.BackupCmd, opid ctrl.OPID, l
         return errors.Wrap(err, "unable to get PBM storage configuration settings")
     }

-    bcpm, err := NewDBManager(b.leadConn).GetBackupByName(ctx, bcp.Name)
+    bcpm, err := NewDBManager(b.ccrsConn).GetBackupByName(ctx, bcp.Name)
     if err != nil {
         return errors.Wrap(err, "balancer status, get backup meta")
     }
@@ -215,11 +220,11 @@ func (b *Backup) Run(ctx context.Context, bcp *ctrl.BackupCmd, opid ctrl.OPID, l
             status = defs.StatusCancelled
         }

-        ferr := ChangeRSState(b.leadConn, bcp.Name, rsMeta.Name, status, err.Error())
+        ferr := ChangeRSState(b.ccrsConn, bcp.Name, rsMeta.Name, status, err.Error())
         l.Info("mark RS as %s `%v`: %v", status, err, ferr)

         if inf.IsLeader() {
-            ferr := ChangeBackupState(b.leadConn, bcp.Name, status, err.Error())
+            ferr := ChangeBackupState(b.ccrsConn, bcp.Name, status, err.Error())
             l.Info("mark backup as %s `%v`: %v", status, err, ferr)
         }
     }
@@ -246,7 +251,7 @@ func (b *Backup) Run(ctx context.Context, bcp *ctrl.BackupCmd, opid ctrl.OPID, l
     hbstop := make(chan struct{})
     defer close(hbstop)
-    err := BackupHB(ctx, b.leadConn, bcp.Name)
+    err := BackupHB(ctx, b.leadConn, b.ccrsConn, bcp.Name)
     if err != nil {
         return errors.Wrap(err, "init heartbeat")
     }
@@ -260,7 +265,7 @@ func (b *Backup) Run(ctx context.Context, bcp *ctrl.BackupCmd, opid ctrl.OPID, l
             case <-ctx.Done():
                 return
             case <-tk.C:
-                err = BackupHB(ctx, b.leadConn, bcp.Name)
+                err = BackupHB(ctx, b.leadConn, b.ccrsConn, bcp.Name)
                 if err != nil {
                     l.Error("send pbm heartbeat: %v", err)
                 }
@@ -333,7 +338,7 @@ func (b *Backup) Run(ctx context.Context, bcp *ctrl.BackupCmd, opid ctrl.OPID, l
         return err
     }

-    err = ChangeRSState(b.leadConn, bcp.Name, rsMeta.Name, defs.StatusDone, "")
+    err = ChangeRSState(b.ccrsConn, bcp.Name, rsMeta.Name, defs.StatusDone, "")
     if err != nil {
         return errors.Wrap(err, "set shard's StatusDone")
     }
@@ -350,7 +355,7 @@ func (b *Backup) Run(ctx context.Context, bcp *ctrl.BackupCmd, opid ctrl.OPID, l
             return err
         }

-        bcpm, err = NewDBManager(b.leadConn).GetBackupByName(ctx, bcp.Name)
+        bcpm, err = NewDBManager(b.ccrsConn).GetBackupByName(ctx, bcp.Name)
         if err != nil {
             return errors.Wrap(err, "get backup metadata")
         }
@@ -374,7 +379,7 @@ func (b *Backup) Run(ctx context.Context, bcp *ctrl.BackupCmd, opid ctrl.OPID, l
             return errors.Wrap(err, "check backup files")
         }

-        err = ChangeBackupStateWithUnixTime(ctx, b.leadConn, bcp.Name, defs.StatusDone, unix, "")
+        err = ChangeBackupStateWithUnixTime(ctx, b.ccrsConn, bcp.Name, defs.StatusDone, unix, "")
         return errors.Wrapf(err, "check cluster for backup done: update backup meta with %s",
             defs.StatusDone)
     } else {
@@ -391,7 +396,7 @@ func (b *Backup) toState(
     inf *topo.NodeInfo,
     wait *time.Duration,
 ) error {
-    err := ChangeRSState(b.leadConn, bcp, inf.SetName, status, "")
+    err := ChangeRSState(b.ccrsConn, bcp, inf.SetName, status, "")
     if err != nil {
         return errors.Wrap(err, "set shard's status")
     }
@@ -436,7 +441,7 @@ func (b *Backup) reconcileStatus(
         return err
     }

-    err = ChangeBackupState(b.leadConn, bcpName, status, "")
+    err = ChangeBackupState(b.ccrsConn, bcpName, status, "")
     return errors.Wrapf(err, "update backup meta with %s", status)
 }
@@ -509,7 +514,7 @@ func (b *Backup) converged(
     status defs.Status,
 ) (bool, error) {
     shardsToFinish := len(shards)
-    bmeta, err := NewDBManager(b.leadConn).GetBackupByName(ctx, bcpName)
+    bmeta, err := NewDBManager(b.ccrsConn).GetBackupByName(ctx, bcpName)
     if err != nil {
         return false, errors.Wrap(err, "get backup metadata")
     }
@@ -523,7 +528,7 @@ func (b *Backup) converged(
         for _, shard := range bmeta.Replsets {
             if shard.Name == sh.RS {
                 // check if node alive
-                lck, err := lock.GetLockData(ctx, b.leadConn, &lock.LockHeader{
+                lck, err := lock.GetLockData(ctx, b.ccrsConn, &lock.LockHeader{
                     Type:    ctrl.CmdBackup,
                     OPID:    opid,
                     Replset: shard.Name,
@@ -576,7 +581,7 @@ func (b *Backup) waitForStatus(
     for {
         select {
         case <-tk.C:
-            bmeta, err := NewDBManager(b.leadConn).GetBackupByName(ctx, bcpName)
+            bmeta, err := NewDBManager(b.ccrsConn).GetBackupByName(ctx, bcpName)
             if errors.Is(err, errors.ErrNotFound) {
                 continue
             }
@@ -620,7 +625,7 @@ func (b *Backup) waitForFirstLastWrite(
     for {
         select {
         case <-tk.C:
-            bmeta, err := NewDBManager(b.leadConn).GetBackupByName(ctx, bcpName)
+            bmeta, err := NewDBManager(b.ccrsConn).GetBackupByName(ctx, bcpName)
             if err != nil {
                 return first, last, errors.Wrap(err, "get backup metadata")
             }
@@ -656,7 +661,7 @@ func writeMeta(stg storage.Storage, meta *BackupMeta) error {
 func (b *Backup) setClusterFirstWrite(ctx context.Context, bcpName string) error {
     var err error
     var bcp *BackupMeta
-    dbManager := NewDBManager(b.leadConn)
+    dbManager := NewDBManager(b.ccrsConn)

     // make sure all replset has the first write ts
     for {
@@ -682,27 +687,28 @@ func (b *Backup) setClusterFirstWrite(ctx context.Context, bcpName string) error
         }
     }

-    err = SetFirstWrite(ctx, b.leadConn, bcpName, fw)
+    err = SetFirstWrite(ctx, b.ccrsConn, bcpName, fw)
     return errors.Wrap(err, "set timestamp")
 }

 func (b *Backup) setClusterLastWrite(ctx context.Context, bcpName string) error {
-    return setClusterLastWriteImpl(ctx, b.leadConn, primitive.Timestamp.Before, bcpName)
+    return setClusterLastWriteImpl(ctx, b.leadConn, b.ccrsConn, primitive.Timestamp.Before, bcpName)
 }

 func (b *Backup) setClusterLastWriteForPhysical(ctx context.Context, bcpName string) error {
-    return setClusterLastWriteImpl(ctx, b.leadConn, primitive.Timestamp.After, bcpName)
+    return setClusterLastWriteImpl(ctx, b.leadConn, b.ccrsConn, primitive.Timestamp.After, bcpName)
 }

 func setClusterLastWriteImpl(
     ctx context.Context,
     conn connect.Client,
+    ccrsConn connect.Client,
     cmp func(a, b primitive.Timestamp) bool,
     bcpName string,
 ) error {
     var err error
     var bcp *BackupMeta
-    dbManager := NewDBManager(conn)
+    dbManager := NewDBManager(ccrsConn)

     // make sure all replset has the last write ts
     for {
@@ -724,7 +730,7 @@ func setClusterLastWriteImpl(
         return errors.Wrap(err, "read cluster time")
     }

-    locks, err := lock.GetLocks(ctx, conn, &lock.LockHeader{
+    locks, err := lock.GetLocks(ctx, ccrsConn, &lock.LockHeader{
         Type: ctrl.CmdBackup,
         OPID: bcp.OPID,
     })
@@ -758,7 +764,7 @@ func setClusterLastWriteImpl(
         }
     }

-    err = SetLastWrite(ctx, conn, bcpName, lw)
+    err = SetLastWrite(ctx, ccrsConn, bcpName, lw)
     return errors.Wrap(err, "set timestamp")
 }
diff --git a/pbm/backup/logical.go b/pbm/backup/logical.go
index 7821e725d..8cbcb5b59 100644
--- a/pbm/backup/logical.go
+++ b/pbm/backup/logical.go
@@ -55,7 +55,7 @@ func (b *Backup) doLogical(
     rsMeta.Status = defs.StatusRunning
     rsMeta.OplogName = path.Join(bcp.Name, rsMeta.Name, "oplog")
     rsMeta.DumpName = path.Join(bcp.Name, rsMeta.Name, archive.MetaFile)
-    err = AddRSMeta(ctx, b.leadConn, bcp.Name, *rsMeta)
+    err = AddRSMeta(ctx, b.ccrsConn, bcp.Name, *rsMeta)
     if err != nil {
         return errors.Wrap(err, "add shard's metadata")
     }
@@ -202,7 +202,7 @@ func (b *Backup) doLogical(
     l.Info("dump finished, waiting for the oplog")

-    err = ChangeRSState(b.leadConn, bcp.Name, rsMeta.Name, defs.StatusDumpDone, "")
+    err = ChangeRSState(b.ccrsConn, bcp.Name, rsMeta.Name, defs.StatusDumpDone, "")
     if err != nil {
         return errors.Wrap(err, "set shard's StatusDumpDone")
     }
@@ -224,7 +224,7 @@ func (b *Backup) doLogical(
         return errors.Wrap(err, "oplog")
     }

-    err = SetRSLastWrite(b.leadConn, bcp.Name, rsMeta.Name, lastSavedTS)
+    err = SetRSLastWrite(b.ccrsConn, bcp.Name, rsMeta.Name, lastSavedTS)
     if err != nil {
         return errors.Wrap(err, "set shard's last write ts")
     }
@@ -236,7 +236,7 @@ func (b *Backup) doLogical(
         }
     }

-    err = IncBackupSize(ctx, b.leadConn, bcp.Name, snapshotSize+oplogSize, nil)
+    err = IncBackupSize(ctx, b.ccrsConn, bcp.Name, snapshotSize+oplogSize, nil)
     if err != nil {
         return errors.Wrap(err, "inc backup size")
     }
diff --git a/pbm/backup/physical.go b/pbm/backup/physical.go
index 11d816a2f..983f3028b 100644
--- a/pbm/backup/physical.go
+++ b/pbm/backup/physical.go
@@ -216,7 +216,7 @@ func (b *Backup) doPhysical(
         {"incrementalBackup", true},
     }
     if !b.incrBase {
-        src, err := LastIncrementalBackup(ctx, b.leadConn)
+        src, err := LastIncrementalBackup(ctx, b.ccrsConn)
         if err != nil {
             return errors.Wrap(err, "define source backup")
         }
@@ -226,7 +226,7 @@ func (b *Backup) doPhysical(
         // ? should be done during Init()?
         if inf.IsLeader() {
-            err := SetSrcBackup(ctx, b.leadConn, bcp.Name, src.Name)
+            err := SetSrcBackup(ctx, b.ccrsConn, bcp.Name, src.Name)
             if err != nil {
                 return errors.Wrap(err, "set source backup in meta")
             }
@@ -306,7 +306,7 @@ func (b *Backup) doPhysical(
         // custom thisBackupName was used
         rsMeta.CustomThisID = cursor.CustomThisID
     }
-    err = AddRSMeta(ctx, b.leadConn, bcp.Name, *rsMeta)
+    err = AddRSMeta(ctx, b.ccrsConn, bcp.Name, *rsMeta)
     if err != nil {
         return errors.Wrap(err, "add shard's metadata")
     }
@@ -390,7 +390,7 @@ func (b *Backup) handleExternal(
     // original LastWriteTS in the meta stored on PBM storage. As rsMeta might
     // be used outside of this method.
     fsMeta := *rsMeta
-    bmeta, err := NewDBManager(b.leadConn).GetBackupByName(ctx, bcp.Name)
+    bmeta, err := NewDBManager(b.ccrsConn).GetBackupByName(ctx, bcp.Name)
     if err == nil {
         fsMeta.LastWriteTS = bmeta.LastWriteTS
     } else {
@@ -532,7 +532,7 @@ func (b *Backup) uploadPhysical(
     totalUncompressed := sizeUncompressed + flSize
     err = IncBackupSize(
         ctx,
-        b.leadConn,
+        b.ccrsConn,
         bcp.Name,
         totalSize,
         &totalUncompressed,
@@ -542,7 +542,7 @@ func (b *Backup) uploadPhysical(
     }
     err = SetBackupSizeForRS(
         ctx,
-        b.leadConn,
+        b.ccrsConn,
         bcp.Name,
         rsMeta.Name,
         totalSize,
diff --git a/pbm/backup/query.go b/pbm/backup/query.go
index 9e9990c2b..f405de81d 100644
--- a/pbm/backup/query.go
+++ b/pbm/backup/query.go
@@ -135,13 +135,13 @@ func saveBackupMeta(ctx context.Context, conn connect.Client, meta *BackupMeta)
     return err
 }

-func BackupHB(ctx context.Context, conn connect.Client, bcpName string) error {
+func BackupHB(ctx context.Context, conn, ccrsConn connect.Client, bcpName string) error {
     ts, err := topo.GetClusterTime(ctx, conn)
     if err != nil {
         return errors.Wrap(err, "read cluster time")
     }

-    _, err = conn.BcpCollection().UpdateOne(
+    _, err = ccrsConn.BcpCollection().UpdateOne(
         ctx,
         bson.D{{"name", bcpName}},
         bson.D{
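BackupHB above (and RestoreHB in pbm/restore/query.go further down) illustrate the split this patch introduces: the heartbeat timestamp is still read from the data cluster through the lead connection, while the document update goes to the control collections on the CCRS. Below is a hedged sketch of that pattern; the heartbeatViaCCRS name and the "$set"/"hb" update payload are assumptions for illustration, and the bson import path assumes the v1 mongo driver used elsewhere in the repo.

package ccrsutil

import (
    "context"

    "go.mongodb.org/mongo-driver/bson"

    "github.com/percona/percona-backup-mongodb/pbm/connect"
    "github.com/percona/percona-backup-mongodb/pbm/errors"
    "github.com/percona/percona-backup-mongodb/pbm/topo"
)

// heartbeatViaCCRS sketches the read-from-lead / write-to-CCRS split.
func heartbeatViaCCRS(ctx context.Context, conn, ccrsConn connect.Client, bcpName string) error {
    // cluster time comes from the cluster the lead connection points to
    ts, err := topo.GetClusterTime(ctx, conn)
    if err != nil {
        return errors.Wrap(err, "read cluster time")
    }

    // the backup document itself lives in the control collections on the CCRS
    _, err = ccrsConn.BcpCollection().UpdateOne(
        ctx,
        bson.D{{"name", bcpName}},
        bson.D{{"$set", bson.M{"hb": ts}}}, // assumed payload, mirroring the existing heartbeat helpers
    )
    return errors.Wrap(err, "write heartbeat")
}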
diff --git a/pbm/oplog/oplog.go b/pbm/oplog/oplog.go
index 14e6264d8..5f85006cf 100644
--- a/pbm/oplog/oplog.go
+++ b/pbm/oplog/oplog.go
@@ -77,8 +77,8 @@ func findLastOplogTS(ctx context.Context, m *mongo.Client) (primitive.Timestamp,
 // IsOplogSlicing checks if PITR slicing is running. It looks for PITR locks
 // and returns true if there is at least one not stale.
-func IsOplogSlicing(ctx context.Context, conn connect.Client) (bool, error) {
-    locks, err := lock.GetOpLocks(ctx, conn, &lock.LockHeader{Type: ctrl.CmdPITR})
+func IsOplogSlicing(ctx context.Context, conn, ccrsConn connect.Client) (bool, error) {
+    locks, err := lock.GetOpLocks(ctx, ccrsConn, &lock.LockHeader{Type: ctrl.CmdPITR})
     if err != nil {
         return false, errors.Wrap(err, "get locks")
     }
diff --git a/pbm/restore/logical.go b/pbm/restore/logical.go
index d0ea38f08..59f4f9e97 100644
--- a/pbm/restore/logical.go
+++ b/pbm/restore/logical.go
@@ -41,6 +41,7 @@ import (
 type Restore struct {
     name     string
     leadConn connect.Client
+    ccrsConn connect.Client
     nodeConn *mongo.Client
     brief    topo.NodeBrief
     stopHB   chan struct{}
@@ -90,6 +91,7 @@ type restoreUsersAndRolesOption bool
 // New creates a new restore object
 func New(
     leadConn connect.Client,
+    ccrsConn connect.Client,
     nodeConn *mongo.Client,
     brief topo.NodeBrief,
     cfg *config.Config,
@@ -103,6 +105,7 @@ func New(
     return &Restore{
         leadConn: leadConn,
+        ccrsConn: ccrsConn,
         nodeConn: nodeConn,
         brief:    brief,
         rsMap:    rsMap,
@@ -219,7 +222,7 @@ func (r *Restore) Snapshot(
         cloneNS,
         cmd.UsersAndRoles)

-    err = setRestoreBackup(ctx, r.leadConn, r.name, cmd.BackupName, nss)
+    err = setRestoreBackup(ctx, r.ccrsConn, r.name, cmd.BackupName, nss)
     if err != nil {
         return errors.Wrap(err, "set backup name")
     }
@@ -363,7 +366,7 @@ func (r *Restore) PITR(
     if err != nil {
         return errors.Wrap(err, "get backup storage")
     }
-    r.oplogStg, err = util.GetStorage(ctx, r.leadConn, r.nodeInfo.Me, log.LogEventFromContext(ctx))
+    r.oplogStg, err = util.GetStorage(ctx, r.ccrsConn, r.nodeInfo.Me, log.LogEventFromContext(ctx))
     if err != nil {
         return errors.Wrap(err, "get oplog storage")
     }
@@ -385,13 +388,13 @@ func (r *Restore) PITR(
         cmd.UsersAndRoles)

     if r.nodeInfo.IsLeader() {
-        err = SetOplogTimestamps(ctx, r.leadConn, r.name, 0, int64(cmd.OplogTS.T))
+        err = SetOplogTimestamps(ctx, r.ccrsConn, r.name, 0, int64(cmd.OplogTS.T))
         if err != nil {
             return errors.Wrap(err, "set PITR timestamp")
         }
     }

-    err = setRestoreBackup(ctx, r.leadConn, r.name, bcp.Name, nss)
+    err = setRestoreBackup(ctx, r.ccrsConn, r.name, bcp.Name, nss)
     if err != nil {
         return errors.Wrap(err, "set backup name")
     }
@@ -515,13 +518,13 @@ func (r *Restore) ReplayOplog(ctx context.Context, cmd *ctrl.ReplayCmd, opid ctr
     }

     if r.nodeInfo.IsLeader() {
-        err := SetOplogTimestamps(ctx, r.leadConn, r.name, int64(cmd.Start.T), int64(cmd.End.T))
+        err := SetOplogTimestamps(ctx, r.ccrsConn, r.name, int64(cmd.Start.T), int64(cmd.End.T))
         if err != nil {
             return errors.Wrap(err, "set oplog timestamps")
         }
     }

-    oplogShards, err := oplog.AllOplogRSNames(ctx, r.leadConn, cmd.Start, cmd.End)
+    oplogShards, err := oplog.AllOplogRSNames(ctx, r.ccrsConn, cmd.Start, cmd.End)
     if err != nil {
         return err
     }
@@ -535,7 +538,7 @@ func (r *Restore) ReplayOplog(ctx context.Context, cmd *ctrl.ReplayCmd, opid ctr
         return r.Done(ctx) // skip. no oplog for current rs
     }

-    r.oplogStg, err = util.GetStorage(ctx, r.leadConn, r.nodeInfo.Me, log.LogEventFromContext(ctx))
+    r.oplogStg, err = util.GetStorage(ctx, r.ccrsConn, r.nodeInfo.Me, log.LogEventFromContext(ctx))
     if err != nil {
         return errors.Wrapf(err, "get oplog storage")
     }
@@ -595,7 +598,7 @@ func (r *Restore) init(ctx context.Context, name string, opid ctrl.OPID, l log.L
         Replsets: []RestoreReplset{},
         Hb:       ts,
     }
-    err = SetRestoreMeta(ctx, r.leadConn, meta)
+    err = SetRestoreMeta(ctx, r.ccrsConn, meta)
     if err != nil {
         return errors.Wrap(err, "write backup meta to db")
     }
@@ -608,7 +611,7 @@ func (r *Restore) init(ctx context.Context, name string, opid ctrl.OPID, l log.L
         for {
             select {
             case <-tk.C:
-                err := RestoreHB(ctx, r.leadConn, r.name)
+                err := RestoreHB(ctx, r.leadConn, r.ccrsConn, r.name)
                 if err != nil {
                     l.Error("send heartbeat: %v", err)
                 }
@@ -633,7 +636,7 @@ func (r *Restore) init(ctx context.Context, name string, opid ctrl.OPID, l log.L
         Conditions: Conditions{},
     }

-    err = AddRestoreRSMeta(ctx, r.leadConn, r.name, rsMeta)
+    err = AddRestoreRSMeta(ctx, r.ccrsConn, r.name, rsMeta)
     if err != nil {
         return errors.Wrap(err, "add shard's metadata")
     }
@@ -670,7 +673,7 @@ func (r *Restore) checkTopologyForOplog(currShards []topo.Shard, oplogShards []s
 // is contiguous - there are no gaps), checks for respective files on storage and returns
 // chunks list if all checks passed
 func (r *Restore) chunks(ctx context.Context, from, to primitive.Timestamp) ([]oplog.OplogChunk, error) {
-    return chunks(ctx, r.leadConn, r.oplogStg, from, to, r.nodeInfo.SetName, r.rsMap)
+    return chunks(ctx, r.ccrsConn, r.oplogStg, from, to, r.nodeInfo.SetName, r.rsMap)
 }

 // LookupBackupMeta fetches backup metadata.
@@ -859,7 +862,7 @@ func (r *Restore) checkSnapshot(ctx context.Context, bcp *backup.BackupMeta, nss
 func (r *Restore) toState(ctx context.Context, status defs.Status, wait *time.Duration) error {
     r.log.Info("moving to state %s", status)
-    return toState(ctx, r.leadConn, status, r.name, r.nodeInfo, r.reconcileStatus, wait)
+    return toState(ctx, r.leadConn, r.ccrsConn, status, r.name, r.nodeInfo, r.reconcileStatus, wait)
 }

 // dropShardedDBs drop all sharded databases present in the backup.
@@ -1346,7 +1349,7 @@ func updateChunksRouterTable(ctx context.Context, m connect.Client, sMap map[str
 }

 func (r *Restore) setcommittedTxn(ctx context.Context, txn []phys.RestoreTxn) error {
-    return RestoreSetRSTxn(ctx, r.leadConn, r.name, r.nodeInfo.SetName, txn)
+    return RestoreSetRSTxn(ctx, r.ccrsConn, r.name, r.nodeInfo.SetName, txn)
 }

 func (r *Restore) getcommittedTxn(ctx context.Context) (map[string]primitive.Timestamp, error) {
@@ -1358,11 +1361,12 @@ func (r *Restore) getcommittedTxn(ctx context.Context) (map[string]primitive.Tim
     }

     for len(shards) > 0 {
-        bmeta, err := GetRestoreMeta(ctx, r.leadConn, r.name)
+        bmeta, err := GetRestoreMeta(ctx, r.ccrsConn, r.name)
         if err != nil {
             return nil, errors.Wrap(err, "get restore metadata")
         }

+        // needs r.ccrsConn and r.leadConn
         clusterTime, err := topo.GetClusterTime(ctx, r.leadConn)
         if err != nil {
             return nil, errors.Wrap(err, "read cluster time")
         }
@@ -1375,7 +1379,7 @@ func (r *Restore) getcommittedTxn(ctx context.Context) (map[string]primitive.Tim
                 continue
             }
             // check if node alive
-            lck, err := lock.GetLockData(ctx, r.leadConn, &lock.LockHeader{
+            lck, err := lock.GetLockData(ctx, r.ccrsConn, &lock.LockHeader{
                 Type:    ctrl.CmdRestore,
                 OPID:    r.opid,
                 Replset: shard.Name,
@@ -1437,13 +1441,13 @@ func (r *Restore) applyOplog(ctx context.Context, ranges []oplogRange, options *
             tops = append(tops, t.Oplog...)
         }

-        err = RestoreSetRSPartTxn(ctx, r.leadConn, r.name, r.nodeInfo.SetName, tops)
+        err = RestoreSetRSPartTxn(ctx, r.ccrsConn, r.name, r.nodeInfo.SetName, tops)
         if err != nil {
             return errors.Wrap(err, "set partial transactions")
         }
     }

-    err = RestoreSetRSStat(ctx, r.leadConn, r.name, r.nodeInfo.SetName, stat)
+    err = RestoreSetRSStat(ctx, r.ccrsConn, r.name, r.nodeInfo.SetName, stat)
     if err != nil {
         r.log.Warning("applyOplog: failed to set stat: %v", err)
     }
@@ -1469,7 +1473,7 @@ func (r *Restore) snapshot(input io.Reader, cloneNS snapshot.CloneNS, excludeRou
 // Done waits for the replicas to finish the job
 // and marks restore as done
 func (r *Restore) Done(ctx context.Context) error {
-    err := ChangeRestoreRSState(ctx, r.leadConn, r.name, r.nodeInfo.SetName, defs.StatusDone, "")
+    err := ChangeRestoreRSState(ctx, r.ccrsConn, r.name, r.nodeInfo.SetName, defs.StatusDone, "")
     if err != nil {
         return errors.Wrap(err, "set shard's StatusDone")
     }
@@ -1480,7 +1484,7 @@ func (r *Restore) Done(ctx context.Context) error {
         return errors.Wrap(err, "check cluster for the restore done")
     }

-    m, err := GetRestoreMeta(ctx, r.leadConn, r.name)
+    m, err := GetRestoreMeta(ctx, r.ccrsConn, r.name)
     if err != nil {
         return errors.Wrap(err, "update stat: get restore meta")
     }
@@ -1500,7 +1504,7 @@ func (r *Restore) Done(ctx context.Context) error {
         }
     }

-    err = RestoreSetStat(ctx, r.leadConn, r.name, phys.RestoreStat{RS: stat})
+    err = RestoreSetStat(ctx, r.ccrsConn, r.name, phys.RestoreStat{RS: stat})
     if err != nil {
         return errors.Wrap(err, "set restore stat")
     }
@@ -1598,24 +1602,24 @@ func (r *Restore) swapUsers(ctx context.Context, exclude *topo.AuthInfo, nss []s
 func (r *Restore) reconcileStatus(ctx context.Context, status defs.Status, timeout *time.Duration) error {
     if timeout != nil {
-        err := convergeClusterWithTimeout(ctx, r.leadConn, r.name, r.opid, r.shards, status, *timeout)
+        err := convergeClusterWithTimeout(ctx, r.leadConn, r.ccrsConn, r.name, r.opid, r.shards, status, *timeout)
         return errors.Wrap(err, "convergeClusterWithTimeout")
     }

-    err := convergeCluster(ctx, r.leadConn, r.name, r.opid, r.shards, status)
+    err := convergeCluster(ctx, r.leadConn, r.ccrsConn, r.name, r.opid, r.shards, status)
     return errors.Wrap(err, "convergeCluster")
 }

 func (r *Restore) waitForStatus(ctx context.Context, status defs.Status) error {
     r.log.Debug("waiting for '%s' status", status)
-    return waitForStatus(ctx, r.leadConn, r.name, status)
+    return waitForStatus(ctx, r.leadConn, r.ccrsConn, r.name, status)
 }

 // MarkFailed sets the restore and rs state as failed with the given message
 func (r *Restore) MarkFailed(ctx context.Context, e error) error {
-    err := ChangeRestoreState(ctx, r.leadConn, r.name, defs.StatusError, e.Error())
+    err := ChangeRestoreState(ctx, r.ccrsConn, r.name, defs.StatusError, e.Error())
     if err != nil {
         return errors.Wrap(err, "set restore state")
     }

-    err = ChangeRestoreRSState(ctx, r.leadConn, r.name, r.nodeInfo.SetName, defs.StatusError, e.Error())
+    err = ChangeRestoreRSState(ctx, r.ccrsConn, r.name, r.nodeInfo.SetName, defs.StatusError, e.Error())
     return errors.Wrap(err, "set replset state")
 }
diff --git a/pbm/restore/physical.go b/pbm/restore/physical.go
index 5388295a1..ab6904070 100644
--- a/pbm/restore/physical.go
+++ b/pbm/restore/physical.go
@@ -75,6 +75,7 @@ type files struct {
 type PhysRestore struct {
     leadConn connect.Client
+    ccrsConn connect.Client
     node     *mongo.Client
     dbpath   string
     // an ephemeral port to restart mongod on during the restore
@@ -124,6 +125,7 @@ type PhysRestore struct {
 func NewPhysical(
     ctx context.Context,
     leadConn connect.Client,
+    ccrsConn connect.Client,
     node *mongo.Client,
     inf *topo.NodeInfo,
     rsMap map[string]string,
@@ -185,6 +187,7 @@ func NewPhysical(
     return &PhysRestore{
         leadConn: leadConn,
+        ccrsConn: ccrsConn,
         node:     node,
         dbpath:   p,
         rsConf:   rcf,
@@ -1027,6 +1030,8 @@ func (l *logBuff) Flush() error {
 // metadata as with logical restores. But later, since mongod being shutdown,
 // status sync going via storage (see `PhysRestore.toState`)
 //
+// TODO: should ccrs change the behavior instead of storage?
+//
 // Unlike in logical restore, _every_ node of the replicaset is taking part in
 // a physical restore. In that way, we avoid logical resync between nodes after
 // the restore. Each node in the cluster does:
Each node in the cluster does: @@ -1115,7 +1120,7 @@ func (r *PhysRestore) Snapshot( var oplogRanges []oplogRange if !pitr.IsZero() { - chunks, err := chunks(ctx, r.leadConn, r.stg, r.restoreTS, pitr, r.rsConf.ID, r.rsMap) + chunks, err := chunks(ctx, r.ccrsConn, r.stg, r.restoreTS, pitr, r.rsConf.ID, r.rsMap) if err != nil { return err } @@ -2177,7 +2182,7 @@ func (r *PhysRestore) startMongo(opts ...string) error { const hbFrameSec = 60 * 2 func (r *PhysRestore) init(ctx context.Context, name string, opid ctrl.OPID, l log.LogEvent) error { - cfg, err := config.GetConfig(ctx, r.leadConn) + cfg, err := config.GetConfig(ctx, r.ccrsConn) if err != nil { return errors.Wrap(err, "get pbm config") } @@ -2448,7 +2453,7 @@ func (r *PhysRestore) setBcpFiles(ctx context.Context) error { r.log.Debug("get src %s", bcp.SrcBackup) var err error - bcp, err = backup.NewDBManager(r.leadConn).GetBackupByName(ctx, bcp.SrcBackup) + bcp, err = backup.NewDBManager(r.ccrsConn).GetBackupByName(ctx, bcp.SrcBackup) if err != nil { return errors.Wrapf(err, "get source backup") } @@ -2543,7 +2548,7 @@ func getRS(bcp *backup.BackupMeta, rs string) *backup.BackupReplset { func (r *PhysRestore) prepareBackup(ctx context.Context, backupName string) error { var err error - r.bcp, err = backup.NewDBManager(r.leadConn).GetBackupByName(ctx, backupName) + r.bcp, err = backup.NewDBManager(r.ccrsConn).GetBackupByName(ctx, backupName) if errors.Is(err, errors.ErrNotFound) { r.bcp, err = GetMetaFromStore(r.stg, backupName) } @@ -2560,7 +2565,7 @@ func (r *PhysRestore) prepareBackup(ctx context.Context, backupName string) erro return errors.New("snapshot name doesn't set") } - err = setRestoreBackup(ctx, r.leadConn, r.name, r.bcp.Name, nil) + err = setRestoreBackup(ctx, r.ccrsConn, r.name, r.bcp.Name, nil) if err != nil { return errors.Wrap(err, "set backup name") } diff --git a/pbm/restore/query.go b/pbm/restore/query.go index 6a222bb2e..778095d9c 100644 --- a/pbm/restore/query.go +++ b/pbm/restore/query.go @@ -207,13 +207,13 @@ func AddRestoreRSMeta(ctx context.Context, m connect.Client, name string, rs Res return err } -func RestoreHB(ctx context.Context, m connect.Client, name string) error { +func RestoreHB(ctx context.Context, m, ccrsConn connect.Client, name string) error { ts, err := topo.GetClusterTime(ctx, m) if err != nil { return errors.Wrap(err, "read cluster time") } - _, err = m.RestoresCollection().UpdateOne( + _, err = ccrsConn.RestoresCollection().UpdateOne( ctx, bson.D{{"name", name}}, bson.D{ diff --git a/pbm/restore/restore.go b/pbm/restore/restore.go index 574acc6ba..995758a05 100644 --- a/pbm/restore/restore.go +++ b/pbm/restore/restore.go @@ -43,13 +43,14 @@ func GetMetaFromStore(stg storage.Storage, bcpName string) (*backup.BackupMeta, func toState( ctx context.Context, conn connect.Client, + ccrsConn connect.Client, status defs.Status, bcp string, inf *topo.NodeInfo, reconcileFn reconcileStatus, wait *time.Duration, ) error { - err := ChangeRestoreRSState(ctx, conn, bcp, inf.SetName, status, "") + err := ChangeRestoreRSState(ctx, ccrsConn, bcp, inf.SetName, status, "") if err != nil { return errors.Wrap(err, "set shard's status") } @@ -64,7 +65,7 @@ func toState( } } - err = waitForStatus(ctx, conn, bcp, status) + err = waitForStatus(ctx, conn, ccrsConn, bcp, status) if err != nil { return errors.Wrapf(err, "waiting for %s", status) } @@ -78,6 +79,7 @@ type reconcileStatus func(ctx context.Context, status defs.Status, timeout *time func convergeCluster( ctx context.Context, conn connect.Client, + ccrsConn 
connect.Client, name, opid string, shards []topo.Shard, status defs.Status, @@ -88,7 +90,7 @@ func convergeCluster( for { select { case <-tk.C: - ok, err := converged(ctx, conn, name, opid, shards, status) + ok, err := converged(ctx, conn, ccrsConn, name, opid, shards, status) if err != nil { return err } @@ -108,6 +110,7 @@ var errConvergeTimeOut = errors.New("reached converge timeout") func convergeClusterWithTimeout( ctx context.Context, conn connect.Client, + ccrsConn connect.Client, name, opid string, shards []topo.Shard, @@ -124,7 +127,7 @@ func convergeClusterWithTimeout( select { case <-tk.C: var ok bool - ok, err := converged(ctx, conn, name, opid, shards, status) + ok, err := converged(ctx, conn, ccrsConn, name, opid, shards, status) if err != nil { return err } @@ -142,12 +145,13 @@ func convergeClusterWithTimeout( func converged( ctx context.Context, conn connect.Client, + ccrsConn connect.Client, name, opid string, shards []topo.Shard, status defs.Status, ) (bool, error) { shardsToFinish := len(shards) - bmeta, err := GetRestoreMeta(ctx, conn, name) + bmeta, err := GetRestoreMeta(ctx, ccrsConn, name) if err != nil { return false, errors.Wrap(err, "get backup metadata") } @@ -161,7 +165,7 @@ func converged( for _, shard := range bmeta.Replsets { if shard.Name == sh.RS { // check if node alive - lck, err := lock.GetLockData(ctx, conn, &lock.LockHeader{ + lck, err := lock.GetLockData(ctx, ccrsConn, &lock.LockHeader{ Type: ctrl.CmdRestore, OPID: opid, Replset: shard.Name, @@ -192,7 +196,7 @@ func converged( } if shardsToFinish == 0 { - err := ChangeRestoreState(ctx, conn, name, status, "") + err := ChangeRestoreState(ctx, ccrsConn, name, status, "") if err != nil { return false, errors.Wrapf(err, "update backup meta with %s", status) } @@ -202,14 +206,14 @@ func converged( return false, nil } -func waitForStatus(ctx context.Context, conn connect.Client, name string, status defs.Status) error { +func waitForStatus(ctx context.Context, conn, ccrsConn connect.Client, name string, status defs.Status) error { tk := time.NewTicker(time.Second * 1) defer tk.Stop() for { select { case <-tk.C: - meta, err := GetRestoreMeta(ctx, conn, name) + meta, err := GetRestoreMeta(ctx, ccrsConn, name) if errors.Is(err, errors.ErrNotFound) { continue } diff --git a/pbm/topo/agent.go b/pbm/topo/agent.go index 07a738a17..2a28f9cfd 100644 --- a/pbm/topo/agent.go +++ b/pbm/topo/agent.go @@ -157,7 +157,7 @@ func GetAgentStatus(ctx context.Context, m connect.Client, rs, node string) (Age } // AgentStatusGC cleans up stale agent statuses -func AgentStatusGC(ctx context.Context, m connect.Client) error { +func AgentStatusGC(ctx context.Context, m, ccrsConn connect.Client) error { ct, err := GetClusterTime(ctx, m) if err != nil { return errors.Wrap(err, "get cluster time") @@ -172,7 +172,7 @@ func AgentStatusGC(ctx context.Context, m connect.Client) error { stalesec = 35 } ct.T -= uint32(stalesec) - _, err = m.AgentsStatusCollection().DeleteMany( + _, err = ccrsConn.AgentsStatusCollection().DeleteMany( ctx, bson.M{"hb": bson.M{"$lt": ct}}, ) @@ -181,17 +181,17 @@ func AgentStatusGC(ctx context.Context, m connect.Client) error { } // ListAgentStatuses returns list of registered agents -func ListAgentStatuses(ctx context.Context, m connect.Client) ([]AgentStat, error) { - if err := AgentStatusGC(ctx, m); err != nil { +func ListAgentStatuses(ctx context.Context, m, ccrsConn connect.Client) ([]AgentStat, error) { + if err := AgentStatusGC(ctx, m, ccrsConn); err != nil { return nil, errors.Wrap(err, "remove stale 
statuses") } - return ListAgents(ctx, m) + return ListAgents(ctx, ccrsConn) } // ListSteadyAgents returns agents which are in steady state for backup or PITR. -func ListSteadyAgents(ctx context.Context, m connect.Client) ([]AgentStat, error) { - agents, err := ListAgentStatuses(ctx, m) +func ListSteadyAgents(ctx context.Context, m, ccrsConn connect.Client) ([]AgentStat, error) { + agents, err := ListAgentStatuses(ctx, m, ccrsConn) if err != nil { return nil, errors.Wrap(err, "listing agents") } diff --git a/pbm/topo/cluster.go b/pbm/topo/cluster.go index eb3e20a1b..341365238 100644 --- a/pbm/topo/cluster.go +++ b/pbm/topo/cluster.go @@ -43,7 +43,7 @@ func GetClusterTime(ctx context.Context, m connect.Client) (primitive.Timestamp, // Make a read to force the cluster timestamp update. // Otherwise, cluster timestamp could remain the same between node info reads, // while in fact time has been moved forward. - err := m.LockCollection().FindOne(ctx, bson.D{}).Err() + err := m.MongoClient().Database("admin").Collection("system.version").FindOne(ctx, bson.D{}).Err() if err != nil && !errors.Is(err, mongo.ErrNoDocuments) { return primitive.Timestamp{}, errors.Wrap(err, "void read") } diff --git a/pbm/topo/topo.go b/pbm/topo/topo.go index bc136cab1..b64425073 100644 --- a/pbm/topo/topo.go +++ b/pbm/topo/topo.go @@ -36,7 +36,7 @@ type AuthUserRoles struct { DB string `bson:"db" json:"db"` } -func CheckTopoForBackup(ctx context.Context, m connect.Client, type_ defs.BackupType) error { +func CheckTopoForBackup(ctx context.Context, m, ccrsConn connect.Client, type_ defs.BackupType) error { members, err := ClusterMembers(ctx, m.MongoClient()) if err != nil { return errors.Wrap(err, "get cluster members") @@ -47,7 +47,7 @@ func CheckTopoForBackup(ctx context.Context, m connect.Client, type_ defs.Backup return errors.Wrap(err, "get cluster time") } - agentList, err := ListAgents(ctx, m) + agentList, err := ListAgents(ctx, ccrsConn) if err != nil { return errors.Wrap(err, "list agents") } diff --git a/sdk/impl.go b/sdk/impl.go index 044416a80..00fd8b9a3 100644 --- a/sdk/impl.go +++ b/sdk/impl.go @@ -35,12 +35,22 @@ var ( ) type Client struct { - conn connect.Client - node string + conn connect.Client + ccrsConn connect.Client + node string } func (c *Client) Close(ctx context.Context) error { - return c.conn.Disconnect(ctx) + var firstErr error + if err := c.conn.Disconnect(ctx); err != nil { + firstErr = err + } + if c.ccrsConn != c.conn { + if err := c.ccrsConn.Disconnect(ctx); err != nil { + return err + } + } + return firstErr } func (c *Client) CommandInfo(ctx context.Context, id CommandID) (*Command, error) { @@ -49,7 +59,7 @@ func (c *Client) CommandInfo(ctx context.Context, id CommandID) (*Command, error return nil, ErrInvalidCommandID } - res := c.conn.CmdStreamCollection().FindOne(ctx, bson.D{{"_id", opid.Obj()}}) + res := c.ccrsConn.CmdStreamCollection().FindOne(ctx, bson.D{{"_id", opid.Obj()}}) if err := res.Err(); err != nil { if errors.Is(err, mongo.ErrNoDocuments) { return nil, ErrNotFound @@ -67,19 +77,19 @@ func (c *Client) CommandInfo(ctx context.Context, id CommandID) (*Command, error } func (c *Client) GetConfig(ctx context.Context) (*Config, error) { - return config.GetConfig(ctx, c.conn) + return config.GetConfig(ctx, c.ccrsConn) } func (c *Client) SetConfig(ctx context.Context, cfg Config) (CommandID, error) { - return NoOpID, config.SetConfig(ctx, c.conn, &cfg) + return NoOpID, config.SetConfig(ctx, c.ccrsConn, &cfg) } func (c *Client) GetAllConfigProfiles(ctx context.Context) 
([]config.Config, error) { - return config.ListProfiles(ctx, c.conn) + return config.ListProfiles(ctx, c.ccrsConn) } func (c *Client) GetConfigProfile(ctx context.Context, name string) (*config.Config, error) { - profile, err := config.GetProfile(ctx, c.conn, name) + profile, err := config.GetProfile(ctx, c.ccrsConn, name) if err != nil { if errors.Is(err, mongo.ErrNoDocuments) { err = config.ErrMissedConfigProfile @@ -91,17 +101,17 @@ func (c *Client) GetConfigProfile(ctx context.Context, name string) (*config.Con } func (c *Client) AddConfigProfile(ctx context.Context, name string, cfg *Config) (CommandID, error) { - opid, err := ctrl.SendAddConfigProfile(ctx, c.conn, name, cfg.Storage) + opid, err := ctrl.SendAddConfigProfile(ctx, c.ccrsConn, name, cfg.Storage) return CommandID(opid.String()), err } func (c *Client) RemoveConfigProfile(ctx context.Context, name string) (CommandID, error) { - opid, err := ctrl.SendRemoveConfigProfile(ctx, c.conn, name) + opid, err := ctrl.SendRemoveConfigProfile(ctx, c.ccrsConn, name) return CommandID(opid.String()), err } func (c *Client) GetAllBackups(ctx context.Context) ([]BackupMetadata, error) { - return backup.BackupsList(ctx, c.conn, 0) + return backup.BackupsList(ctx, c.ccrsConn, 0) } func (c *Client) GetAllRestores( @@ -113,7 +123,7 @@ func (c *Client) GetAllRestores( if limit < 0 { limit = 0 } - return restore.RestoreList(ctx, c.conn, limit) + return restore.RestoreList(ctx, c.ccrsConn, limit) } func (c *Client) GetBackupByName( @@ -121,7 +131,7 @@ func (c *Client) GetBackupByName( name string, options GetBackupByNameOptions, ) (*BackupMetadata, error) { - bcp, err := backup.NewDBManager(c.conn).GetBackupByName(ctx, name) + bcp, err := backup.NewDBManager(c.ccrsConn).GetBackupByName(ctx, name) if err != nil { return nil, errors.Wrap(err, "get backup meta") } @@ -134,7 +144,7 @@ func (c *Client) GetBackupByOpID( opid string, options GetBackupByNameOptions, ) (*BackupMetadata, error) { - bcp, err := backup.NewDBManager(c.conn).GetBackupByOpID(ctx, opid) + bcp, err := backup.NewDBManager(c.ccrsConn).GetBackupByOpID(ctx, opid) if err != nil { return nil, errors.Wrap(err, "get backup meta") } @@ -152,7 +162,7 @@ func (c *Client) getBackupHelper( return nil, ErrNotBaseIncrement } - increments, err := backup.FetchAllIncrements(ctx, c.conn, bcp) + increments, err := backup.FetchAllIncrements(ctx, c.ccrsConn, bcp) if err != nil { return nil, errors.New("get increments") } @@ -255,11 +265,11 @@ func (c *Client) getStorageForRead(ctx context.Context, bcp *backup.BackupMeta) } func (c *Client) GetRestoreByName(ctx context.Context, name string) (*RestoreMetadata, error) { - return restore.GetRestoreMeta(ctx, c.conn, name) + return restore.GetRestoreMeta(ctx, c.ccrsConn, name) } func (c *Client) GetRestoreByOpID(ctx context.Context, opid string) (*RestoreMetadata, error) { - return restore.GetRestoreMetaByOPID(ctx, c.conn, opid) + return restore.GetRestoreMetaByOPID(ctx, c.ccrsConn, opid) } func (c *Client) SyncFromStorage(ctx context.Context, includeRestores bool) (CommandID, error) { @@ -267,7 +277,7 @@ func (c *Client) SyncFromStorage(ctx context.Context, includeRestores bool) (Com if includeRestores { opts = &ctrl.ResyncCmd{IncludeRestores: true} } - opid, err := ctrl.SendResync(ctx, c.conn, opts) + opid, err := ctrl.SendResync(ctx, c.ccrsConn, opts) return CommandID(opid.String()), err } @@ -276,12 +286,12 @@ func (c *Client) SyncFromExternalStorage(ctx context.Context, name string) (Comm return NoOpID, errors.New("name is not provided") } - opid, err 
:= ctrl.SendResync(ctx, c.conn, &ctrl.ResyncCmd{Name: name}) + opid, err := ctrl.SendResync(ctx, c.ccrsConn, &ctrl.ResyncCmd{Name: name}) return CommandID(opid.String()), err } func (c *Client) SyncFromAllExternalStorages(ctx context.Context) (CommandID, error) { - opid, err := ctrl.SendResync(ctx, c.conn, &ctrl.ResyncCmd{All: true}) + opid, err := ctrl.SendResync(ctx, c.ccrsConn, &ctrl.ResyncCmd{All: true}) return CommandID(opid.String()), err } @@ -290,12 +300,12 @@ func (c *Client) ClearSyncFromExternalStorage(ctx context.Context, name string) return NoOpID, errors.New("name is not provided") } - opid, err := ctrl.SendResync(ctx, c.conn, &ctrl.ResyncCmd{Name: name, Clear: true}) + opid, err := ctrl.SendResync(ctx, c.ccrsConn, &ctrl.ResyncCmd{Name: name, Clear: true}) return CommandID(opid.String()), err } func (c *Client) ClearSyncFromAllExternalStorages(ctx context.Context) (CommandID, error) { - opid, err := ctrl.SendResync(ctx, c.conn, &ctrl.ResyncCmd{All: true, Clear: true}) + opid, err := ctrl.SendResync(ctx, c.ccrsConn, &ctrl.ResyncCmd{All: true, Clear: true}) return CommandID(opid.String()), err } @@ -314,7 +324,7 @@ func (c *Client) DeleteBackupByName(ctx context.Context, name string) (CommandID return NoOpID, err } - opid, err := ctrl.SendDeleteBackupByName(ctx, c.conn, name) + opid, err := ctrl.SendDeleteBackupByName(ctx, c.ccrsConn, name) return CommandID(opid.String()), err } @@ -323,12 +333,12 @@ func (c *Client) DeleteBackupBefore( beforeTS Timestamp, options DeleteBackupBeforeOptions, ) (CommandID, error) { - opid, err := ctrl.SendDeleteBackupBefore(ctx, c.conn, beforeTS, options.Type) + opid, err := ctrl.SendDeleteBackupBefore(ctx, c.ccrsConn, beforeTS, options.Type) return CommandID(opid.String()), err } func (c *Client) DeleteOplogRange(ctx context.Context, until Timestamp) (CommandID, error) { - opid, err := ctrl.SendDeleteOplogRangeBefore(ctx, c.conn, until) + opid, err := ctrl.SendDeleteOplogRangeBefore(ctx, c.ccrsConn, until) return CommandID(opid.String()), err } @@ -337,12 +347,12 @@ func (c *Client) CleanupReport(ctx context.Context, beforeTS Timestamp) (Cleanup } func (c *Client) RunCleanup(ctx context.Context, beforeTS Timestamp) (CommandID, error) { - opid, err := ctrl.SendCleanup(ctx, c.conn, beforeTS) + opid, err := ctrl.SendCleanup(ctx, c.ccrsConn, beforeTS) return CommandID(opid.String()), err } func (c *Client) CancelBackup(ctx context.Context) (CommandID, error) { - opid, err := ctrl.SendCancelBackup(ctx, c.conn) + opid, err := ctrl.SendCancelBackup(ctx, c.ccrsConn) return CommandID(opid.String()), err } @@ -365,7 +375,7 @@ func (c *Client) Restore(ctx context.Context, backupName string, clusterTS Times var ErrStaleHearbeat = errors.New("stale heartbeat") func (c *Client) OpLocks(ctx context.Context) ([]OpLock, error) { - locks, err := lock.GetLocks(ctx, c.conn, &lock.LockHeader{}) + locks, err := lock.GetLocks(ctx, c.ccrsConn, &lock.LockHeader{}) if err != nil { return nil, errors.Wrap(err, "get locks") } diff --git a/sdk/sdk.go b/sdk/sdk.go index 65ef5db0d..1191b6337 100644 --- a/sdk/sdk.go +++ b/sdk/sdk.go @@ -132,18 +132,26 @@ func (l *OpLock) Err() error { return l.err } -func NewClient(ctx context.Context, uri string) (*Client, error) { +func NewClient(ctx context.Context, uri, ccrsURI string) (*Client, error) { conn, err := connect.Connect(ctx, uri, "sdk") if err != nil { return nil, err } + ccrsConn := conn + if ccrsURI != "" && ccrsURI != uri { + ccrsConn, err = connect.Connect(ctx, ccrsURI, "sdk-ccrs") + if err != nil { + return nil, err + } + 
} + inf, err := topo.GetNodeInfo(ctx, conn.MongoClient()) if err != nil { return nil, errors.Wrap(err, "get node info") } - return &Client{conn: conn, node: inf.Me}, nil + return &Client{conn: conn, ccrsConn: ccrsConn, node: inf.Me}, nil } func CommandLogCursor(ctx context.Context, c *Client, cid CommandID) (*log.Cursor, error) { @@ -193,7 +201,7 @@ func WaitForDeleteOplogRange(ctx context.Context, client *Client) error { } func WaitForErrorLog(ctx context.Context, client *Client, cmd *Command) (string, error) { - return lastLogErr(ctx, client.conn, cmd.Cmd, cmd.TS) + return lastLogErr(ctx, client.ccrsConn, cmd.Cmd, cmd.TS) } func CanDeleteBackup(ctx context.Context, client *Client, bcp *BackupMetadata) error { diff --git a/sdk/util.go b/sdk/util.go index e9c97eace..d4d78415f 100644 --- a/sdk/util.go +++ b/sdk/util.go @@ -73,7 +73,7 @@ func WaitForResync(ctx context.Context, c *Client, cid CommandID) error { }, } - outC, errC := log.Follow(ctx, c.conn, r, false) + outC, errC := log.Follow(ctx, c.ccrsConn, r, false) for { select {
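
Note: a minimal usage sketch of the updated SDK constructor, for reviewers who want the new surface in one place. The import path and connection strings are assumptions (placeholders); per the new signature, an empty or identical second URI makes the client reuse the lead connection for control-collection (ccrs) reads.

package main

import (
    "context"
    "log"

    "github.com/percona/percona-backup-mongodb/sdk" // assumed import path
)

func main() {
    ctx := context.Background()

    // First URI: the cluster being backed up / restored (lead connection).
    // Second URI: where the PBM control collections live (ccrs connection).
    // An empty string or the same URI reuses the lead connection.
    client, err := sdk.NewClient(ctx,
        "mongodb://localhost:27017/?replicaSet=rs0",       // placeholder
        "mongodb://ccrs.example.com:27017/?replicaSet=rs1", // placeholder
    )
    if err != nil {
        log.Fatalf("connect: %v", err)
    }
    defer client.Close(ctx) // disconnects both connections when they differ

    // PBM metadata reads now go through the ccrs connection.
    backups, err := client.GetAllBackups(ctx)
    if err != nil {
        log.Fatalf("list backups: %v", err)
    }
    log.Printf("found %d backups", len(backups))
}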
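Note: the new Close disconnects both clients, but when both disconnects fail it returns the ccrs error and drops the one recorded in firstErr. Below is a sketch of a symmetric variation that always reports the first failure; closeBoth is a hypothetical helper, not part of this patch, and the connect import path is assumed.

package sdk

import (
    "context"

    "github.com/percona/percona-backup-mongodb/pbm/connect" // assumed import path
)

// closeBoth disconnects both clients, attempting the second disconnect even if
// the first one failed, and returns the first error encountered.
func closeBoth(ctx context.Context, conn, ccrsConn connect.Client) error {
    var firstErr error
    if err := conn.Disconnect(ctx); err != nil {
        firstErr = err
    }
    if ccrsConn != conn {
        if err := ccrsConn.Disconnect(ctx); err != nil && firstErr == nil {
            firstErr = err
        }
    }
    return firstErr
}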
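Note: several topo helpers now take both connections, and callers have to keep the pair straight: the lead connection drives cluster-time and membership reads, while the ccrs connection holds the agent-status documents. A hedged caller sketch follows; steadyAgentNames is a hypothetical helper, the import paths are assumed, and it presumes AgentStat exposes RS and Node fields.

package example

import (
    "context"

    "github.com/percona/percona-backup-mongodb/pbm/connect" // assumed import path
    "github.com/percona/percona-backup-mongodb/pbm/errors"  // assumed import path
    "github.com/percona/percona-backup-mongodb/pbm/topo"    // assumed import path
)

// steadyAgentNames lists "rs/node" identifiers of agents in a steady state,
// passing the lead connection for cluster time and the ccrs connection for
// the agent-status collection, per the new ListSteadyAgents signature.
func steadyAgentNames(ctx context.Context, leadConn, ccrsConn connect.Client) ([]string, error) {
    agents, err := topo.ListSteadyAgents(ctx, leadConn, ccrsConn)
    if err != nil {
        return nil, errors.Wrap(err, "list steady agents")
    }

    names := make([]string, 0, len(agents))
    for _, a := range agents {
        names = append(names, a.RS+"/"+a.Node)
    }
    return names, nil
}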