Skip to content

Commit f0e0f54

Browse files
committed
update backup to support ccrs
1 parent 45bdf1d commit f0e0f54

File tree

10 files changed

+79
-72
lines changed

10 files changed

+79
-72
lines changed

cmd/pbm-agent/backup.go

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,
9797
go a.sliceNow(opid)
9898
}
9999

100-
cfg, err := config.GetProfiledConfig(ctx, a.leadConn, cmd.Profile)
100+
cfg, err := config.GetProfiledConfig(ctx, a.ccrsConn, cmd.Profile)
101101
if err != nil {
102102
l.Error("get profiled config: %v", err)
103103
return
@@ -106,19 +106,19 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,
106106
var bcp *backup.Backup
107107
switch cmd.Type {
108108
case defs.PhysicalBackup:
109-
bcp = backup.NewPhysical(a.leadConn, a.nodeConn, a.brief)
109+
bcp = backup.NewPhysical(a.leadConn, a.ccrsConn, a.nodeConn, a.brief)
110110
case defs.ExternalBackup:
111-
bcp = backup.NewExternal(a.leadConn, a.nodeConn, a.brief)
111+
bcp = backup.NewExternal(a.leadConn, a.ccrsConn, a.nodeConn, a.brief)
112112
case defs.IncrementalBackup:
113-
bcp = backup.NewIncremental(a.leadConn, a.nodeConn, a.brief, cmd.IncrBase)
113+
bcp = backup.NewIncremental(a.leadConn, a.ccrsConn, a.nodeConn, a.brief, cmd.IncrBase)
114114
case defs.LogicalBackup:
115115
fallthrough
116116
default:
117117
numParallelColls := a.numParallelColls
118118
if cfg.Backup != nil && cfg.Backup.NumParallelCollections > 0 {
119119
numParallelColls = cfg.Backup.NumParallelCollections
120120
}
121-
bcp = backup.New(a.leadConn, a.nodeConn, a.brief, numParallelColls)
121+
bcp = backup.New(a.leadConn, a.ccrsConn, a.nodeConn, a.brief, numParallelColls)
122122
}
123123

124124
bcp.SetConfig(cfg)
@@ -145,8 +145,8 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,
145145
}
146146
l.Debug("init backup meta")
147147

148-
if err = topo.CheckTopoForBackup(ctx, a.leadConn, cmd.Type); err != nil {
149-
ferr := backup.ChangeBackupState(a.leadConn, cmd.Name, defs.StatusError, err.Error())
148+
if err = topo.CheckTopoForBackup(ctx, a.leadConn, a.ccrsConn, cmd.Type); err != nil {
149+
ferr := backup.ChangeBackupState(a.ccrsConn, cmd.Name, defs.StatusError, err.Error())
150150
l.Info("mark backup as %s `%v`: %v", defs.StatusError, err, ferr)
151151
return
152152
}
@@ -157,7 +157,7 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,
157157
const srcHostMultiplier = 3.0
158158
var c map[string]float64
159159
if cmd.Type == defs.IncrementalBackup && !cmd.IncrBase {
160-
src, err := backup.LastIncrementalBackup(ctx, a.leadConn)
160+
src, err := backup.LastIncrementalBackup(ctx, a.ccrsConn)
161161
if err != nil {
162162
// try backup anyway
163163
l.Warning("define source backup: %v", err)
@@ -169,7 +169,7 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,
169169
}
170170
}
171171

172-
agents, err := topo.ListSteadyAgents(ctx, a.leadConn)
172+
agents, err := topo.ListSteadyAgents(ctx, a.leadConn, a.ccrsConn)
173173
if err != nil {
174174
l.Error("get agents list: %v", err)
175175
return
@@ -204,7 +204,7 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,
204204
}
205205

206206
epoch := ep.TS()
207-
lck := lock.NewLock(a.leadConn, lock.LockHeader{
207+
lck := lock.NewLock(a.ccrsConn, lock.LockHeader{
208208
Type: ctrl.CmdBackup,
209209
Replset: a.brief.SetName,
210210
Node: a.brief.Me,
@@ -229,7 +229,7 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,
229229
}
230230
}()
231231

232-
err = backup.SetRSNomineeACK(ctx, a.leadConn, cmd.Name, nodeInfo.SetName, nodeInfo.Me)
232+
err = backup.SetRSNomineeACK(ctx, a.ccrsConn, cmd.Name, nodeInfo.SetName, nodeInfo.Me)
233233
if err != nil {
234234
l.Warning("set nominee ack: %v", err)
235235
}
@@ -272,13 +272,13 @@ func (a *Agent) nominateRS(ctx context.Context, bcp, rs string, nodes [][]string
272272
l := log.LogEventFromContext(ctx)
273273
l.Debug("nomination list for %s: %v", rs, nodes)
274274

275-
err := backup.SetRSNomination(ctx, a.leadConn, bcp, rs)
275+
err := backup.SetRSNomination(ctx, a.ccrsConn, bcp, rs)
276276
if err != nil {
277277
return errors.Wrap(err, "set nomination meta")
278278
}
279279

280280
for _, n := range nodes {
281-
nms, err := backup.GetRSNominees(ctx, a.leadConn, bcp, rs)
281+
nms, err := backup.GetRSNominees(ctx, a.ccrsConn, bcp, rs)
282282
if err != nil && !errors.Is(err, errors.ErrNotFound) {
283283
return errors.Wrap(err, "get nomination meta")
284284
}
@@ -287,13 +287,13 @@ func (a *Agent) nominateRS(ctx context.Context, bcp, rs string, nodes [][]string
287287
return nil
288288
}
289289

290-
err = backup.SetRSNominees(ctx, a.leadConn, bcp, rs, n)
290+
err = backup.SetRSNominees(ctx, a.ccrsConn, bcp, rs, n)
291291
if err != nil {
292292
return errors.Wrap(err, "set nominees")
293293
}
294294
l.Debug("nomination %s, set candidates %v", rs, n)
295295

296-
err = backup.BackupHB(ctx, a.leadConn, bcp)
296+
err = backup.BackupHB(ctx, a.leadConn, a.ccrsConn, bcp)
297297
if err != nil {
298298
l.Warning("send heartbeat: %v", err)
299299
}
@@ -316,7 +316,7 @@ func (a *Agent) waitNomination(ctx context.Context, bcp string) (bool, error) {
316316
for {
317317
select {
318318
case <-tk.C:
319-
nm, err := backup.GetRSNominees(ctx, a.leadConn, bcp, a.brief.SetName)
319+
nm, err := backup.GetRSNominees(ctx, a.ccrsConn, bcp, a.brief.SetName)
320320
if err != nil {
321321
if errors.Is(err, errors.ErrNotFound) {
322322
continue
@@ -344,7 +344,7 @@ func (a *Agent) waitNomination(ctx context.Context, bcp string) (bool, error) {
344344
// false is returned in case a single active lock exists or error happens.
345345
// true means that there's no active locks.
346346
func (a *Agent) startBcpLockCheck(ctx context.Context) (bool, error) {
347-
locks, err := lock.GetLocks(ctx, a.leadConn, &lock.LockHeader{})
347+
locks, err := lock.GetLocks(ctx, a.ccrsConn, &lock.LockHeader{})
348348
if err != nil {
349349
return false, errors.Wrap(err, "get all locks for backup start")
350350
}

cmd/pbm-agent/pitr.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -420,7 +420,7 @@ func (a *Agent) leadNomination(
420420
return
421421
}
422422

423-
candidates, err := topo.ListSteadyAgents(ctx, a.leadConn)
423+
candidates, err := topo.ListSteadyAgents(ctx, a.leadConn, a.ccrsConn)
424424
if err != nil {
425425
l.Error("get agents list: %v", err)
426426
return

cmd/pbm/backup.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ type descBcp struct {
8585
func runBackup(
8686
ctx context.Context,
8787
conn connect.Client,
88+
ccrsConn connect.Client,
8889
pbm *sdk.Client,
8990
b *backupOpts,
9091
outf outFormat,
@@ -101,15 +102,15 @@ func runBackup(
101102
return nil, errors.New("--ns flag is only allowed for logical backup")
102103
}
103104

104-
if err := topo.CheckTopoForBackup(ctx, conn, defs.BackupType(b.typ)); err != nil {
105+
if err := topo.CheckTopoForBackup(ctx, conn, ccrsConn, defs.BackupType(b.typ)); err != nil {
105106
return nil, errors.Wrap(err, "backup pre-check")
106107
}
107108

108109
if err := checkForAnotherOperation(ctx, pbm); err != nil {
109110
return nil, err
110111
}
111112

112-
cfg, err := config.GetProfiledConfig(ctx, conn, b.profile)
113+
cfg, err := config.GetProfiledConfig(ctx, ccrsConn, b.profile)
113114
if err != nil {
114115
if errors.Is(err, config.ErrMissedConfig) {
115116
return nil, errors.New("no config set. Set config with <pbm config>")
@@ -130,7 +131,7 @@ func runBackup(
130131
level = &b.compressionLevel[0]
131132
}
132133

133-
err = sendCmd(ctx, conn, ctrl.Cmd{
134+
err = sendCmd(ctx, ccrsConn, ctrl.Cmd{
134135
Cmd: ctrl.CmdBackup,
135136
Backup: &ctrl.BackupCmd{
136137
Type: defs.BackupType(b.typ),
@@ -155,13 +156,13 @@ func runBackup(
155156
}
156157
startCtx, cancel := context.WithTimeout(ctx, cfg.Backup.Timeouts.StartingStatus())
157158
defer cancel()
158-
err = waitForBcpStatus(startCtx, conn, b.name, showProgress)
159+
err = waitForBcpStatus(startCtx, ccrsConn, b.name, showProgress)
159160
if err != nil {
160161
return nil, err
161162
}
162163

163164
if b.typ == string(defs.ExternalBackup) {
164-
s, err := waitBackup(ctx, conn, b.name, defs.StatusCopyReady, showProgress)
165+
s, err := waitBackup(ctx, ccrsConn, b.name, defs.StatusCopyReady, showProgress)
165166
if err != nil {
166167
return nil, errors.Wrap(err, "waiting for the `copyReady` status")
167168
}
@@ -198,7 +199,7 @@ func runBackup(
198199
if showProgress {
199200
fmt.Printf("\nWaiting for '%s' backup...", b.name)
200201
}
201-
s, err := waitBackup(ctx, conn, b.name, defs.StatusDone, showProgress)
202+
s, err := waitBackup(ctx, ccrsConn, b.name, defs.StatusDone, showProgress)
202203
if s != nil && showProgress {
203204
fmt.Printf(" %s\n", *s)
204205
}

cmd/pbm/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ func (app *pbmApp) buildBackupCmd() *cobra.Command {
286286
}
287287

288288
backupOptions.name = time.Now().UTC().Format(time.RFC3339)
289-
return runBackup(app.ctx, app.conn, app.pbm, &backupOptions, app.pbmOutF)
289+
return runBackup(app.ctx, app.conn, app.ccrsConn, app.pbm, &backupOptions, app.pbmOutF)
290290
}),
291291
}
292292

0 commit comments

Comments
 (0)