
Commit 45bdf1d

add ccrs conn to agent and cli

1 parent ba29f80

File tree: 18 files changed, +197 -117 lines changed
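In short, the agent now holds two connections: leadConn keeps pointing at the cluster being backed up, while the new ccrsConn points at the Control Collection RS (CCRS) and takes over the control-collection traffic (command listening, epoch, agent status, config, locks, resync). The sketch below is not part of the commit; it only condenses the runAgent/newAgent wiring from the diffs that follow, using the calls exactly as they appear there (the package paths are assumed from imports used elsewhere in this repository).

// Sketch only (not in the commit): the connection wiring runAgent ends up with.
package sketch

import (
    "context"

    "github.com/percona/percona-backup-mongodb/pbm/connect"
    "github.com/percona/percona-backup-mongodb/pbm/errors"
)

func dial(ctx context.Context, mongoURI, ccrsURI string) (connect.Client, connect.Client, error) {
    // Connection to the cluster that is being backed up.
    leadConn, err := connect.Connect(ctx, mongoURI, "pbm-agent")
    if err != nil {
        return nil, nil, errors.Wrap(err, "connect to PBM")
    }

    // Connection to the Control Collection RS. The CLI falls back to the main
    // mongodb-uri when --ccrs-uri / PBM_CCRS_URI is not set, so both clients
    // may point at the same cluster.
    ccrsConn, err := connect.Connect(ctx, ccrsURI, "pbm-agent-ccrs")
    if err != nil {
        return nil, nil, errors.Wrap(err, "connect to CCRS")
    }

    return leadConn, ccrsConn, nil
}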

cmd/pbm-agent/agent.go

Lines changed: 13 additions & 9 deletions
@@ -26,6 +26,7 @@ import (
 
 type Agent struct {
     leadConn connect.Client
+    ccrsConn connect.Client
     nodeConn *mongo.Client
     bcp      *currentBackup
     pitrjob  *currentPitr
@@ -47,6 +48,7 @@ type Agent struct {
 func newAgent(
     ctx context.Context,
     leadConn connect.Client,
+    ccrsConn connect.Client,
     uri string,
     numParallelColls int,
 ) (*Agent, error) {
@@ -67,6 +69,7 @@ func newAgent(
 
     a := &Agent{
         leadConn: leadConn,
+        ccrsConn: ccrsConn,
         closeCMD: make(chan struct{}),
         nodeConn: nodeConn,
         brief: topo.NodeBrief{
@@ -154,8 +157,9 @@ func (a *Agent) Start(ctx context.Context) error {
     logger.Printf("conn level ReadConcern: %v; WriteConcern: %v",
         a.leadConn.MongoOptions().ReadConcern.Level,
         a.leadConn.MongoOptions().WriteConcern.W)
+    // todo: also log for ccrsConn?
 
-    c, cerr := ctrl.ListenCmd(ctx, a.leadConn, a.closeCMD)
+    c, cerr := ctrl.ListenCmd(ctx, a.ccrsConn, a.closeCMD)
 
     logger.Printf("listening for the commands")
 
@@ -169,7 +173,7 @@ func (a *Agent) Start(ctx context.Context) error {
 
         logger.Printf("got command %s, opid: %s", cmd, cmd.OPID)
 
-        ep, err := config.GetEpoch(ctx, a.leadConn)
+        ep, err := config.GetEpoch(ctx, a.ccrsConn)
         if err != nil {
             logger.Error(string(cmd.Cmd), "", cmd.OPID.String(), ep.TS(), "get epoch: %v", err)
             continue
@@ -210,7 +214,7 @@ func (a *Agent) Start(ctx context.Context) error {
             return errors.Wrap(err, "stop listening")
         }
 
-        ep, _ := config.GetEpoch(ctx, a.leadConn)
+        ep, _ := config.GetEpoch(ctx, a.ccrsConn)
         logger.Error("", "", "", ep.TS(), "listening commands: %v", err)
     }
 }
@@ -284,14 +288,14 @@ func (a *Agent) HbStatus(ctx context.Context) {
     }
 
     updateAgentStat(ctx, a, l, true, &hb)
-    err = topo.SetAgentStatus(ctx, a.leadConn, &hb)
+    err = topo.SetAgentStatus(ctx, a.ccrsConn, &hb)
     if err != nil {
         l.Error("set status: %v", err)
     }
 
     defer func() {
         l.Debug("deleting agent status")
-        err := topo.RemoveAgentStatus(context.Background(), a.leadConn, hb)
+        err := topo.RemoveAgentStatus(context.Background(), a.ccrsConn, hb)
         if err != nil {
             logger := logger.NewEvent("agentCheckup", "", "", primitive.Timestamp{})
             logger.Error("remove agent heartbeat: %v", err)
@@ -326,13 +330,13 @@ func (a *Agent) HbStatus(ctx context.Context) {
 
         if now.Sub(storageCheckTime) >= storageCheckInterval {
             updateAgentStat(ctx, a, l, true, &hb)
-            err = topo.SetAgentStatus(ctx, a.leadConn, &hb)
+            err = topo.SetAgentStatus(ctx, a.ccrsConn, &hb)
             if err == nil {
                 storageCheckTime = now
             }
         } else {
             updateAgentStat(ctx, a, l, false, &hb)
-            err = topo.SetAgentStatus(ctx, a.leadConn, &hb)
+            err = topo.SetAgentStatus(ctx, a.ccrsConn, &hb)
         }
         if err != nil {
             l.Error("set status: %v", err)
@@ -410,7 +414,7 @@ func (a *Agent) warnIfParallelAgentDetected(
     l log.LogEvent,
     lastHeartbeat primitive.Timestamp,
 ) {
-    s, err := topo.GetAgentStatus(ctx, a.leadConn, a.brief.SetName, a.brief.Me)
+    s, err := topo.GetAgentStatus(ctx, a.ccrsConn, a.brief.SetName, a.brief.Me)
     if err != nil {
         if errors.Is(err, mongo.ErrNoDocuments) {
             return
@@ -456,7 +460,7 @@ func (a *Agent) storStatus(
         return topo.SubsysStatus{OK: true}
     }
 
-    stg, err := util.GetStorage(ctx, a.leadConn, a.brief.Me, log)
+    stg, err := util.GetStorage(ctx, a.ccrsConn, a.brief.Me, log)
     if err != nil {
         return topo.SubsysStatus{Err: fmt.Sprintf("unable to get storage: %v", err)}
     }

cmd/pbm-agent/main.go

Lines changed: 25 additions & 5 deletions
@@ -21,7 +21,10 @@ import (
     "github.com/percona/percona-backup-mongodb/pbm/version"
 )
 
-const mongoConnFlag = "mongodb-uri"
+const (
+    mongoConnFlag = "mongodb-uri"
+    ccrsConnFlag  = "ccrs-uri"
+)
 
 func main() {
     rootCmd := rootCommand()
@@ -45,14 +48,21 @@ func rootCommand() *cobra.Command {
         },
         Run: func(cmd *cobra.Command, args []string) {
             url := "mongodb://" + strings.Replace(viper.GetString(mongoConnFlag), "mongodb://", "", 1)
+            ccrsURL := viper.GetString(ccrsConnFlag)
+
+            fmt.Println("--- ccrsURL:", ccrsURL)
+
+            if ccrsURL == "" {
+                ccrsURL = url
+            }
 
             hidecreds()
 
             logOpts := buildLogOpts()
 
             l := log.NewWithOpts(nil, "", "", logOpts).NewDefaultEvent()
 
-            err := runAgent(url, viper.GetInt("backup.dump-parallel-collections"), logOpts)
+            err := runAgent(url, ccrsURL, viper.GetInt("backup.dump-parallel-collections"), logOpts)
             if err != nil {
                 l.Error("Exit: %v", err)
                 os.Exit(1)
@@ -102,6 +112,10 @@ func setRootFlags(rootCmd *cobra.Command) {
     _ = viper.BindPFlag(mongoConnFlag, rootCmd.Flags().Lookup(mongoConnFlag))
     _ = viper.BindEnv(mongoConnFlag, "PBM_MONGODB_URI")
 
+    rootCmd.Flags().String(ccrsConnFlag, "", "Control Collection RS connection string")
+    _ = viper.BindPFlag(ccrsConnFlag, rootCmd.Flags().Lookup(ccrsConnFlag))
+    _ = viper.BindEnv(ccrsConnFlag, "PBM_CCRS_URI")
+
     rootCmd.Flags().Int("dump-parallel-collections", 0, "Number of collections to dump in parallel")
     _ = viper.BindPFlag("backup.dump-parallel-collections", rootCmd.Flags().Lookup("dump-parallel-collections"))
     _ = viper.BindEnv("backup.dump-parallel-collections", "PBM_DUMP_PARALLEL_COLLECTIONS")
@@ -182,6 +196,7 @@ func buildLogOpts() *log.Opts {
 
 func runAgent(
     mongoURI string,
+    ccrsURI string,
     dumpConns int,
     logOpts *log.Opts,
 ) error {
@@ -193,18 +208,23 @@ func runAgent(
         return errors.Wrap(err, "connect to PBM")
     }
 
-    err = setupNewDB(ctx, leadConn)
+    ccrsConn, err := connect.Connect(ctx, ccrsURI, "pbm-agent-ccrs")
+    if err != nil {
+        return errors.Wrap(err, "connect to CCRS")
+    }
+
+    err = setupNewDB(ctx, ccrsConn)
     if err != nil {
         return errors.Wrap(err, "setup pbm collections")
     }
 
-    agent, err := newAgent(ctx, leadConn, mongoURI, dumpConns)
+    agent, err := newAgent(ctx, leadConn, ccrsConn, mongoURI, dumpConns)
     if err != nil {
         return errors.Wrap(err, "connect to the node")
     }
 
     logger := log.NewWithOpts(
-        agent.leadConn,
+        agent.ccrsConn,
         agent.brief.SetName,
         agent.brief.Me,
         logOpts)
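On the CLI side, the CCRS address can be supplied with the new --ccrs-uri flag or the PBM_CCRS_URI environment variable; when neither is set, the agent reuses the main mongodb-uri. The snippet below is not part of the commit; it is a hedged restatement that pulls the flag binding and URI resolution from the two hunks above into one place, using the cobra/viper calls as they appear there.

// Sketch only (not in the commit): CCRS flag/env binding and URI resolution.
package sketch

import (
    "strings"

    "github.com/spf13/cobra"
    "github.com/spf13/viper"
)

const (
    mongoConnFlag = "mongodb-uri"
    ccrsConnFlag  = "ccrs-uri"
)

// registerCCRSFlag mirrors the binding added in setRootFlags above.
func registerCCRSFlag(rootCmd *cobra.Command) {
    rootCmd.Flags().String(ccrsConnFlag, "", "Control Collection RS connection string")
    _ = viper.BindPFlag(ccrsConnFlag, rootCmd.Flags().Lookup(ccrsConnFlag))
    _ = viper.BindEnv(ccrsConnFlag, "PBM_CCRS_URI")
}

// resolveURIs mirrors the Run closure above: the CCRS URI falls back to the
// main MongoDB URI when --ccrs-uri / PBM_CCRS_URI is not provided.
func resolveURIs() (url, ccrsURL string) {
    url = "mongodb://" + strings.Replace(viper.GetString(mongoConnFlag), "mongodb://", "", 1)
    ccrsURL = viper.GetString(ccrsConnFlag)
    if ccrsURL == "" {
        ccrsURL = url
    }
    return url, ccrsURL
}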

cmd/pbm-agent/oplog.go

Lines changed: 1 addition & 1 deletion
@@ -77,7 +77,7 @@ func (a *Agent) OplogReplay(ctx context.Context, r *ctrl.ReplayCmd, opID ctrl.OP
     }
 
     l.Info("oplog replay started")
-    rr := restore.New(a.leadConn, a.nodeConn, a.brief, cfg, r.RSMap, 0, 1)
+    rr := restore.New(a.leadConn, a.ccrsConn, a.nodeConn, a.brief, cfg, r.RSMap, 0, 1)
     err = rr.ReplayOplog(ctx, r, opID, l)
     if err != nil {
         if errors.Is(err, restore.ErrNoDataForShard) {

cmd/pbm-agent/pitr.go

Lines changed: 3 additions & 3 deletions
@@ -122,7 +122,7 @@ func (a *Agent) PITR(ctx context.Context) {
     if err != nil {
         // we need epoch just to log pitr err with an extra context
         // so not much care if we get it or not
-        ep, _ := config.GetEpoch(ctx, a.leadConn)
+        ep, _ := config.GetEpoch(ctx, a.ccrsConn)
         l.Error(string(ctrl.CmdPITR), "", "", ep.TS(), "init: %v", err)
     }
 
@@ -530,7 +530,7 @@ func (a *Agent) waitAllOpLockRelease(ctx context.Context) (bool, error) {
     for {
         select {
         case <-tick.C:
-            running, err := oplog.IsOplogSlicing(ctx, a.leadConn)
+            running, err := oplog.IsOplogSlicing(ctx, a.leadConn, a.ccrsConn)
             if err != nil {
                 return false, errors.Wrap(err, "is oplog slicing check")
             }
@@ -664,7 +664,7 @@ func (a *Agent) getPITRClusterAndStaleStatus(ctx context.Context) (oplog.Status,
     l := log.LogEventFromContext(ctx)
     isStale := false
 
-    meta, err := oplog.GetMeta(ctx, a.leadConn)
+    meta, err := oplog.GetMeta(ctx, a.ccrsConn)
     if err != nil {
         if !errors.Is(err, errors.ErrNotFound) {
             l.Error("getting metta for reconfig status check: %v", err)

cmd/pbm-agent/restore.go

Lines changed: 9 additions & 8 deletions
@@ -50,7 +50,7 @@ func (a *Agent) Restore(ctx context.Context, r *ctrl.RestoreCmd, opid ctrl.OPID,
     var lck *lock.Lock
     if nodeInfo.IsPrimary {
         epts := ep.TS()
-        lck = lock.NewLock(a.leadConn, lock.LockHeader{
+        lck = lock.NewLock(a.ccrsConn, lock.LockHeader{
             Type:    ctrl.CmdRestore,
             Replset: nodeInfo.SetName,
             Node:    nodeInfo.Me,
@@ -79,7 +79,7 @@ func (a *Agent) Restore(ctx context.Context, r *ctrl.RestoreCmd, opid ctrl.OPID,
         }
     }()
 
-    err = config.SetConfigVar(ctx, a.leadConn, "pitr.enabled", "false")
+    err = config.SetConfigVar(ctx, a.ccrsConn, "pitr.enabled", "false")
     if err != nil {
         l.Error("disable oplog slicer: %v", err)
     } else {
@@ -122,9 +122,9 @@ func (a *Agent) Restore(ctx context.Context, r *ctrl.RestoreCmd, opid ctrl.OPID,
         l.Info("backup: %s", r.BackupName)
 
         // XXX: why is backup searched on storage?
-        bcp, err = restore.LookupBackupMeta(ctx, a.leadConn, r.BackupName, a.brief.Me)
+        bcp, err = restore.LookupBackupMeta(ctx, a.ccrsConn, r.BackupName, a.brief.Me)
         if err != nil {
-            err1 := addRestoreMetaWithError(ctx, a.leadConn, l, opid, r, nodeInfo.SetName,
+            err1 := addRestoreMetaWithError(ctx, a.ccrsConn, l, opid, r, nodeInfo.SetName,
                 "define base backup: %v", err)
             if err1 != nil {
                 l.Error("failed to save meta: %v", err1)
@@ -133,7 +133,7 @@ func (a *Agent) Restore(ctx context.Context, r *ctrl.RestoreCmd, opid ctrl.OPID,
         }
 
         if !r.OplogTS.IsZero() && bcp.LastWriteTS.Compare(r.OplogTS) >= 0 {
-            err1 := addRestoreMetaWithError(ctx, a.leadConn, l, opid, r, nodeInfo.SetName,
+            err1 := addRestoreMetaWithError(ctx, a.ccrsConn, l, opid, r, nodeInfo.SetName,
                 "snapshot's last write is later than the target time. "+
                     "Try to set an earlier snapshot. Or leave the snapshot empty "+
                     "so PBM will choose one.")
@@ -146,7 +146,7 @@ func (a *Agent) Restore(ctx context.Context, r *ctrl.RestoreCmd, opid ctrl.OPID,
         r.BackupName = bcp.Name
     }
 
-    cfg, err := config.GetConfig(ctx, a.leadConn)
+    cfg, err := config.GetConfig(ctx, a.ccrsConn)
     if err != nil {
         l.Error("get PBM configuration: %v", err)
         return
@@ -164,7 +164,7 @@ func (a *Agent) Restore(ctx context.Context, r *ctrl.RestoreCmd, opid ctrl.OPID,
     numParallelColls := getNumParallelCollsConfig(r.NumParallelColls, cfg.Restore)
     numInsertionWorkersPerCol := getNumInsertionWorkersConfig(r.NumInsertionWorkers, cfg.Restore)
 
-    rr := restore.New(a.leadConn, a.nodeConn, a.brief, cfg, r.RSMap, numParallelColls, numInsertionWorkersPerCol)
+    rr := restore.New(a.leadConn, a.ccrsConn, a.nodeConn, a.brief, cfg, r.RSMap, numParallelColls, numInsertionWorkersPerCol)
     if r.OplogTS.IsZero() {
         err = rr.Snapshot(ctx, r, opid, bcp)
     } else {
@@ -193,6 +193,7 @@ func (a *Agent) Restore(ctx context.Context, r *ctrl.RestoreCmd, opid ctrl.OPID,
         rstr, err = restore.NewPhysical(
             ctx,
             a.leadConn,
+            a.ccrsConn,
             a.nodeConn,
             nodeInfo,
             r.RSMap,
@@ -216,7 +217,7 @@ func (a *Agent) Restore(ctx context.Context, r *ctrl.RestoreCmd, opid ctrl.OPID,
     }
 
     if bcpType == defs.LogicalBackup && nodeInfo.IsLeader() {
-        epch, err := config.ResetEpoch(ctx, a.leadConn)
+        epch, err := config.ResetEpoch(ctx, a.ccrsConn)
         if err != nil {
             l.Error("reset epoch: %v", err)
         }

cmd/pbm-agent/resync.go

Lines changed: 2 additions & 2 deletions
@@ -135,12 +135,12 @@ func (a *Agent) helpSyncProfileBackups(ctx context.Context, profile *config.Conf
 }
 
 func (a *Agent) handleSyncMainStorage(ctx context.Context, includeRestores bool) error {
-    cfg, err := config.GetConfig(ctx, a.leadConn)
+    cfg, err := config.GetConfig(ctx, a.ccrsConn)
     if err != nil {
         return errors.Wrap(err, "get config")
     }
 
-    err = resync.Resync(ctx, a.leadConn, &cfg.Storage, a.brief.Me, includeRestores)
+    err = resync.Resync(ctx, a.ccrsConn, &cfg.Storage, a.brief.Me, includeRestores)
     if err != nil {
         return errors.Wrap(err, "resync")
     }

cmd/pbm/config.go

Lines changed: 5 additions & 3 deletions
@@ -51,6 +51,7 @@ func (c confVals) String() string {
 func runConfig(
     ctx context.Context,
     conn connect.Client,
+    ccrsConn connect.Client,
     pbm *sdk.Client,
     c *configOpts,
 ) (fmt.Stringer, error) {
@@ -65,7 +66,7 @@ func runConfig(
     var o confVals
     rsnc := false
     for k, v := range c.set {
-        err := config.SetConfigVar(ctx, conn, k, v)
+        err := config.SetConfigVar(ctx, ccrsConn, k, v)
         if err != nil {
             return nil, errors.Wrapf(err, "set %s", k)
         }
@@ -90,7 +91,7 @@ func runConfig(
         }
         return o, nil
     case len(c.key) > 0:
-        k, err := config.GetConfigVar(ctx, conn, c.key)
+        k, err := config.GetConfigVar(ctx, ccrsConn, c.key)
         if err != nil {
             if errors.Is(err, config.ErrUnsetConfigPath) {
                 return confKV{c.key, ""}, nil // unset config path
@@ -134,7 +135,8 @@ func runConfig(
         oldCfg = &config.Config{}
     }
 
-    if err := config.SetConfig(ctx, conn, newCfg); err != nil {
+    // TODO: determine if both conn
+    if err := config.SetConfig(ctx, ccrsConn, newCfg); err != nil {
         return nil, errors.Wrap(err, "unable to set config: write to db")
     }
 