PBM-1547 Spike PoC for control collection rs #1167

Open · wants to merge 9 commits into dev

22 changes: 13 additions & 9 deletions cmd/pbm-agent/agent.go
@@ -26,6 +26,7 @@ import (

type Agent struct {
leadConn connect.Client
+ ccrsConn connect.Client
nodeConn *mongo.Client
bcp *currentBackup
pitrjob *currentPitr
@@ -47,6 +48,7 @@ type Agent struct {
func newAgent(
ctx context.Context,
leadConn connect.Client,
+ ccrsConn connect.Client,
uri string,
numParallelColls int,
) (*Agent, error) {
@@ -67,6 +69,7 @@

a := &Agent{
leadConn: leadConn,
+ ccrsConn: ccrsConn,
closeCMD: make(chan struct{}),
nodeConn: nodeConn,
brief: topo.NodeBrief{
@@ -154,8 +157,9 @@ func (a *Agent) Start(ctx context.Context) error {
logger.Printf("conn level ReadConcern: %v; WriteConcern: %v",
a.leadConn.MongoOptions().ReadConcern.Level,
a.leadConn.MongoOptions().WriteConcern.W)
+ // todo: also log for ccrsConn?

- c, cerr := ctrl.ListenCmd(ctx, a.leadConn, a.closeCMD)
+ c, cerr := ctrl.ListenCmd(ctx, a.ccrsConn, a.closeCMD)

logger.Printf("listening for the commands")

@@ -169,7 +173,7 @@ func (a *Agent) Start(ctx context.Context) error {

logger.Printf("got command %s, opid: %s", cmd, cmd.OPID)

- ep, err := config.GetEpoch(ctx, a.leadConn)
+ ep, err := config.GetEpoch(ctx, a.ccrsConn)
if err != nil {
logger.Error(string(cmd.Cmd), "", cmd.OPID.String(), ep.TS(), "get epoch: %v", err)
continue
@@ -210,7 +214,7 @@ func (a *Agent) Start(ctx context.Context) error {
return errors.Wrap(err, "stop listening")
}

- ep, _ := config.GetEpoch(ctx, a.leadConn)
+ ep, _ := config.GetEpoch(ctx, a.ccrsConn)
logger.Error("", "", "", ep.TS(), "listening commands: %v", err)
}
}
@@ -284,14 +288,14 @@ func (a *Agent) HbStatus(ctx context.Context) {
}

updateAgentStat(ctx, a, l, true, &hb)
- err = topo.SetAgentStatus(ctx, a.leadConn, &hb)
+ err = topo.SetAgentStatus(ctx, a.ccrsConn, &hb)
if err != nil {
l.Error("set status: %v", err)
}

defer func() {
l.Debug("deleting agent status")
- err := topo.RemoveAgentStatus(context.Background(), a.leadConn, hb)
+ err := topo.RemoveAgentStatus(context.Background(), a.ccrsConn, hb)
if err != nil {
logger := logger.NewEvent("agentCheckup", "", "", primitive.Timestamp{})
logger.Error("remove agent heartbeat: %v", err)
@@ -326,13 +330,13 @@ func (a *Agent) HbStatus(ctx context.Context) {

if now.Sub(storageCheckTime) >= storageCheckInterval {
updateAgentStat(ctx, a, l, true, &hb)
- err = topo.SetAgentStatus(ctx, a.leadConn, &hb)
+ err = topo.SetAgentStatus(ctx, a.ccrsConn, &hb)
if err == nil {
storageCheckTime = now
}
} else {
updateAgentStat(ctx, a, l, false, &hb)
- err = topo.SetAgentStatus(ctx, a.leadConn, &hb)
+ err = topo.SetAgentStatus(ctx, a.ccrsConn, &hb)
}
if err != nil {
l.Error("set status: %v", err)
@@ -410,7 +414,7 @@ func (a *Agent) warnIfParallelAgentDetected(
l log.LogEvent,
lastHeartbeat primitive.Timestamp,
) {
- s, err := topo.GetAgentStatus(ctx, a.leadConn, a.brief.SetName, a.brief.Me)
+ s, err := topo.GetAgentStatus(ctx, a.ccrsConn, a.brief.SetName, a.brief.Me)
if err != nil {
if errors.Is(err, mongo.ErrNoDocuments) {
return
@@ -456,7 +460,7 @@ func (a *Agent) storStatus(
return topo.SubsysStatus{OK: true}
}

- stg, err := util.GetStorage(ctx, a.leadConn, a.brief.Me, log)
+ stg, err := util.GetStorage(ctx, a.ccrsConn, a.brief.Me, log)
if err != nil {
return topo.SubsysStatus{Err: fmt.Sprintf("unable to get storage: %v", err)}
}
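Taken together, the agent.go changes route every control-plane call (command stream, epoch/config, locks, agent status, storage config) through the new ccrsConn, while leadConn keeps pointing at the cluster being backed up. A minimal sketch of that split, using plain mongo-driver clients instead of PBM's connect.Client so it stays self-contained (the names and the "admin" database are illustrative assumptions, not code from this PR):

```go
package agentsketch

import "go.mongodb.org/mongo-driver/mongo"

// agentConns mirrors the Agent fields after this change: control-plane traffic
// is separated from the connection to the backed-up cluster.
type agentConns struct {
	lead *mongo.Client // backed-up cluster: topology checks, backup/restore data paths
	ccrs *mongo.Client // control collection RS: commands, config/epoch, locks, agent status
	node *mongo.Client // direct connection to the local mongod
}

// controlDB is where such an agent would read and write PBM metadata;
// when no separate CCRS is configured, ccrs is simply the lead client.
func controlDB(c agentConns) *mongo.Database {
	return c.ccrs.Database("admin")
}
```
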
34 changes: 17 additions & 17 deletions cmd/pbm-agent/backup.go
@@ -97,7 +97,7 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,
go a.sliceNow(opid)
}

- cfg, err := config.GetProfiledConfig(ctx, a.leadConn, cmd.Profile)
+ cfg, err := config.GetProfiledConfig(ctx, a.ccrsConn, cmd.Profile)
if err != nil {
l.Error("get profiled config: %v", err)
return
@@ -106,19 +106,19 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,
var bcp *backup.Backup
switch cmd.Type {
case defs.PhysicalBackup:
- bcp = backup.NewPhysical(a.leadConn, a.nodeConn, a.brief)
+ bcp = backup.NewPhysical(a.leadConn, a.ccrsConn, a.nodeConn, a.brief)
case defs.ExternalBackup:
- bcp = backup.NewExternal(a.leadConn, a.nodeConn, a.brief)
+ bcp = backup.NewExternal(a.leadConn, a.ccrsConn, a.nodeConn, a.brief)
case defs.IncrementalBackup:
- bcp = backup.NewIncremental(a.leadConn, a.nodeConn, a.brief, cmd.IncrBase)
+ bcp = backup.NewIncremental(a.leadConn, a.ccrsConn, a.nodeConn, a.brief, cmd.IncrBase)
case defs.LogicalBackup:
fallthrough
default:
numParallelColls := a.numParallelColls
if cfg.Backup != nil && cfg.Backup.NumParallelCollections > 0 {
numParallelColls = cfg.Backup.NumParallelCollections
}
- bcp = backup.New(a.leadConn, a.nodeConn, a.brief, numParallelColls)
+ bcp = backup.New(a.leadConn, a.ccrsConn, a.nodeConn, a.brief, numParallelColls)
}

bcp.SetConfig(cfg)
@@ -145,8 +145,8 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,
}
l.Debug("init backup meta")

- if err = topo.CheckTopoForBackup(ctx, a.leadConn, cmd.Type); err != nil {
- ferr := backup.ChangeBackupState(a.leadConn, cmd.Name, defs.StatusError, err.Error())
+ if err = topo.CheckTopoForBackup(ctx, a.leadConn, a.ccrsConn, cmd.Type); err != nil {
+ ferr := backup.ChangeBackupState(a.ccrsConn, cmd.Name, defs.StatusError, err.Error())
l.Info("mark backup as %s `%v`: %v", defs.StatusError, err, ferr)
return
}
@@ -157,7 +157,7 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,
const srcHostMultiplier = 3.0
var c map[string]float64
if cmd.Type == defs.IncrementalBackup && !cmd.IncrBase {
- src, err := backup.LastIncrementalBackup(ctx, a.leadConn)
+ src, err := backup.LastIncrementalBackup(ctx, a.ccrsConn)
if err != nil {
// try backup anyway
l.Warning("define source backup: %v", err)
@@ -169,7 +169,7 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,
}
}

- agents, err := topo.ListSteadyAgents(ctx, a.leadConn)
+ agents, err := topo.ListSteadyAgents(ctx, a.leadConn, a.ccrsConn)
if err != nil {
l.Error("get agents list: %v", err)
return
@@ -204,7 +204,7 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,
}

epoch := ep.TS()
- lck := lock.NewLock(a.leadConn, lock.LockHeader{
+ lck := lock.NewLock(a.ccrsConn, lock.LockHeader{
Type: ctrl.CmdBackup,
Replset: a.brief.SetName,
Node: a.brief.Me,
@@ -229,7 +229,7 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,
}
}()

- err = backup.SetRSNomineeACK(ctx, a.leadConn, cmd.Name, nodeInfo.SetName, nodeInfo.Me)
+ err = backup.SetRSNomineeACK(ctx, a.ccrsConn, cmd.Name, nodeInfo.SetName, nodeInfo.Me)
if err != nil {
l.Warning("set nominee ack: %v", err)
}
@@ -272,13 +272,13 @@ func (a *Agent) nominateRS(ctx context.Context, bcp, rs string, nodes [][]string
l := log.LogEventFromContext(ctx)
l.Debug("nomination list for %s: %v", rs, nodes)

- err := backup.SetRSNomination(ctx, a.leadConn, bcp, rs)
+ err := backup.SetRSNomination(ctx, a.ccrsConn, bcp, rs)
if err != nil {
return errors.Wrap(err, "set nomination meta")
}

for _, n := range nodes {
- nms, err := backup.GetRSNominees(ctx, a.leadConn, bcp, rs)
+ nms, err := backup.GetRSNominees(ctx, a.ccrsConn, bcp, rs)
if err != nil && !errors.Is(err, errors.ErrNotFound) {
return errors.Wrap(err, "get nomination meta")
}
@@ -287,13 +287,13 @@ func (a *Agent) nominateRS(ctx context.Context, bcp, rs string, nodes [][]string
return nil
}

- err = backup.SetRSNominees(ctx, a.leadConn, bcp, rs, n)
+ err = backup.SetRSNominees(ctx, a.ccrsConn, bcp, rs, n)
if err != nil {
return errors.Wrap(err, "set nominees")
}
l.Debug("nomination %s, set candidates %v", rs, n)

- err = backup.BackupHB(ctx, a.leadConn, bcp)
+ err = backup.BackupHB(ctx, a.leadConn, a.ccrsConn, bcp)
if err != nil {
l.Warning("send heartbeat: %v", err)
}
@@ -316,7 +316,7 @@ func (a *Agent) waitNomination(ctx context.Context, bcp string) (bool, error) {
for {
select {
case <-tk.C:
- nm, err := backup.GetRSNominees(ctx, a.leadConn, bcp, a.brief.SetName)
+ nm, err := backup.GetRSNominees(ctx, a.ccrsConn, bcp, a.brief.SetName)
if err != nil {
if errors.Is(err, errors.ErrNotFound) {
continue
@@ -344,7 +344,7 @@ func (a *Agent) waitNomination(ctx context.Context, bcp string) (bool, error) {
// false is returned in case a single active lock exists or error happens.
// true means that there's no active locks.
func (a *Agent) startBcpLockCheck(ctx context.Context) (bool, error) {
- locks, err := lock.GetLocks(ctx, a.leadConn, &lock.LockHeader{})
+ locks, err := lock.GetLocks(ctx, a.ccrsConn, &lock.LockHeader{})
if err != nil {
return false, errors.Wrap(err, "get all locks for backup start")
}
29 changes: 24 additions & 5 deletions cmd/pbm-agent/main.go
@@ -21,7 +21,10 @@ import (
"github.com/percona/percona-backup-mongodb/pbm/version"
)

- const mongoConnFlag = "mongodb-uri"
+ const (
+ mongoConnFlag = "mongodb-uri"
+ ccrsConnFlag = "ccrs-uri"
+ )

func main() {
rootCmd := rootCommand()
@@ -45,14 +48,17 @@ func rootCommand() *cobra.Command {
},
Run: func(cmd *cobra.Command, args []string) {
url := "mongodb://" + strings.Replace(viper.GetString(mongoConnFlag), "mongodb://", "", 1)
+ ccrsURL := viper.GetString(ccrsConnFlag)

+ fmt.Println("--- ccrsURL:", ccrsURL)

hidecreds()

logOpts := buildLogOpts()

l := log.NewWithOpts(nil, "", "", logOpts).NewDefaultEvent()

- err := runAgent(url, viper.GetInt("backup.dump-parallel-collections"), logOpts)
+ err := runAgent(url, ccrsURL, viper.GetInt("backup.dump-parallel-collections"), logOpts)
if err != nil {
l.Error("Exit: %v", err)
os.Exit(1)
@@ -102,6 +108,10 @@ func setRootFlags(rootCmd *cobra.Command) {
_ = viper.BindPFlag(mongoConnFlag, rootCmd.Flags().Lookup(mongoConnFlag))
_ = viper.BindEnv(mongoConnFlag, "PBM_MONGODB_URI")

+ rootCmd.Flags().String(ccrsConnFlag, "", "Control Collection RS connection string")
+ _ = viper.BindPFlag(ccrsConnFlag, rootCmd.Flags().Lookup(ccrsConnFlag))
+ _ = viper.BindEnv(ccrsConnFlag, "PBM_CCRS_URI")

rootCmd.Flags().Int("dump-parallel-collections", 0, "Number of collections to dump in parallel")
_ = viper.BindPFlag("backup.dump-parallel-collections", rootCmd.Flags().Lookup("dump-parallel-collections"))
_ = viper.BindEnv("backup.dump-parallel-collections", "PBM_DUMP_PARALLEL_COLLECTIONS")
@@ -182,6 +192,7 @@ func buildLogOpts() *log.Opts {

func runAgent(
mongoURI string,
+ ccrsURI string,
dumpConns int,
logOpts *log.Opts,
) error {
@@ -193,18 +204,26 @@
return errors.Wrap(err, "connect to PBM")
}

- err = setupNewDB(ctx, leadConn)
+ ccrsConn := leadConn
+ if ccrsURI != "" && ccrsURI != mongoURI {
+ ccrsConn, err = connect.Connect(ctx, ccrsURI, "pbm-agent-ccrs")
+ if err != nil {
+ return errors.Wrap(err, "connect to CCRS")
+ }
+ }

+ err = setupNewDB(ctx, ccrsConn)
if err != nil {
return errors.Wrap(err, "setup pbm collections")
}

- agent, err := newAgent(ctx, leadConn, mongoURI, dumpConns)
+ agent, err := newAgent(ctx, leadConn, ccrsConn, mongoURI, dumpConns)
if err != nil {
return errors.Wrap(err, "connect to the node")
}

logger := log.NewWithOpts(
- agent.leadConn,
+ agent.ccrsConn,
agent.brief.SetName,
agent.brief.Me,
logOpts)
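The key piece in main.go is the fallback: when --ccrs-uri (or PBM_CCRS_URI) is empty or equal to the --mongodb-uri value, the agent reuses leadConn as ccrsConn; otherwise it opens a second client, and setupNewDB then creates the PBM control collections on that CCRS connection. A self-contained sketch of the same pattern with the plain mongo driver (connectCtrl and the URIs are illustrative, not PBM's connect package):

```go
package main

import (
	"context"
	"fmt"
	"time"

	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

// connectCtrl mirrors the fallback in runAgent: reuse the lead client unless a
// distinct control-collection RS URI was supplied.
func connectCtrl(ctx context.Context, lead *mongo.Client, leadURI, ccrsURI string) (*mongo.Client, error) {
	if ccrsURI == "" || ccrsURI == leadURI {
		// No separate CCRS configured: control collections stay on the lead cluster.
		return lead, nil
	}
	return mongo.Connect(ctx, options.Client().ApplyURI(ccrsURI))
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	leadURI := "mongodb://localhost:27017"        // would come from --mongodb-uri / PBM_MONGODB_URI
	ccrsURI := "mongodb://ccrs.example.com:27017" // would come from --ccrs-uri / PBM_CCRS_URI

	lead, err := mongo.Connect(ctx, options.Client().ApplyURI(leadURI))
	if err != nil {
		panic(err)
	}
	ctrlConn, err := connectCtrl(ctx, lead, leadURI, ccrsURI)
	if err != nil {
		panic(err)
	}
	fmt.Println("separate control connection:", ctrlConn != lead)
}
```

Note that the PoC compares the raw URI strings, so two different spellings of the same cluster's URI would still open a second connection.
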
2 changes: 1 addition & 1 deletion cmd/pbm-agent/oplog.go
@@ -77,7 +77,7 @@ func (a *Agent) OplogReplay(ctx context.Context, r *ctrl.ReplayCmd, opID ctrl.OP
}

l.Info("oplog replay started")
- rr := restore.New(a.leadConn, a.nodeConn, a.brief, cfg, r.RSMap, 0, 1)
+ rr := restore.New(a.leadConn, a.ccrsConn, a.nodeConn, a.brief, cfg, r.RSMap, 0, 1)
err = rr.ReplayOplog(ctx, r, opID, l)
if err != nil {
if errors.Is(err, restore.ErrNoDataForShard) {
8 changes: 4 additions & 4 deletions cmd/pbm-agent/pitr.go
@@ -122,7 +122,7 @@ func (a *Agent) PITR(ctx context.Context) {
if err != nil {
// we need epoch just to log pitr err with an extra context
// so not much care if we get it or not
- ep, _ := config.GetEpoch(ctx, a.leadConn)
+ ep, _ := config.GetEpoch(ctx, a.ccrsConn)
l.Error(string(ctrl.CmdPITR), "", "", ep.TS(), "init: %v", err)
}

@@ -420,7 +420,7 @@ func (a *Agent) leadNomination(
return
}

- candidates, err := topo.ListSteadyAgents(ctx, a.leadConn)
+ candidates, err := topo.ListSteadyAgents(ctx, a.leadConn, a.ccrsConn)
if err != nil {
l.Error("get agents list: %v", err)
return
@@ -530,7 +530,7 @@ func (a *Agent) waitAllOpLockRelease(ctx context.Context) (bool, error) {
for {
select {
case <-tick.C:
- running, err := oplog.IsOplogSlicing(ctx, a.leadConn)
+ running, err := oplog.IsOplogSlicing(ctx, a.leadConn, a.ccrsConn)
if err != nil {
return false, errors.Wrap(err, "is oplog slicing check")
}
@@ -664,7 +664,7 @@ func (a *Agent) getPITRClusterAndStaleStatus(ctx context.Context) (oplog.Status,
l := log.LogEventFromContext(ctx)
isStale := false

- meta, err := oplog.GetMeta(ctx, a.leadConn)
+ meta, err := oplog.GetMeta(ctx, a.ccrsConn)
if err != nil {
if !errors.Is(err, errors.ErrNotFound) {
l.Error("getting metta for reconfig status check: %v", err)