Skip to content

Commit 82fef20

Browse files
authored
PBM-1211: Multi-endpoint feature (#1034)
PBM's multiple endpoint feature enables possibility to specify different endpoint urls for the different cluster members. Supported storage types are: S3 and Azure.
1 parent 94ad374 commit 82fef20

File tree

26 files changed

+200
-127
lines changed

26 files changed

+200
-127
lines changed

cmd/pbm-agent/agent.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -456,7 +456,7 @@ func (a *Agent) storStatus(
456456
return topo.SubsysStatus{OK: true}
457457
}
458458

459-
stg, err := util.GetStorage(ctx, a.leadConn, log)
459+
stg, err := util.GetStorage(ctx, a.leadConn, a.brief.Me, log)
460460
if err != nil {
461461
return topo.SubsysStatus{Err: fmt.Sprintf("unable to get storage: %v", err)}
462462
}

cmd/pbm-agent/delete.go

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111

1212
"github.com/percona/percona-backup-mongodb/pbm/backup"
1313
"github.com/percona/percona-backup-mongodb/pbm/config"
14-
"github.com/percona/percona-backup-mongodb/pbm/connect"
1514
"github.com/percona/percona-backup-mongodb/pbm/ctrl"
1615
"github.com/percona/percona-backup-mongodb/pbm/errors"
1716
"github.com/percona/percona-backup-mongodb/pbm/lock"
@@ -97,7 +96,7 @@ func (a *Agent) Delete(ctx context.Context, d *ctrl.DeleteBackupCmd, opid ctrl.O
9796
}
9897

9998
l.Info("deleting backups older than %v", t)
100-
err = backup.DeleteBackupBefore(ctx, a.leadConn, t, bcpType)
99+
err = backup.DeleteBackupBefore(ctx, a.leadConn, t, bcpType, nodeInfo.Me)
101100
if err != nil {
102101
l.Error("deleting: %v", err)
103102
return
@@ -107,7 +106,7 @@ func (a *Agent) Delete(ctx context.Context, d *ctrl.DeleteBackupCmd, opid ctrl.O
107106
ctx := log.SetLogEventToContext(ctx, l)
108107

109108
l.Info("deleting backup")
110-
err := backup.DeleteBackup(ctx, a.leadConn, d.Backup)
109+
err := backup.DeleteBackup(ctx, a.leadConn, d.Backup, nodeInfo.Me)
111110
if err != nil {
112111
l.Error("deleting: %v", err)
113112
return
@@ -188,7 +187,7 @@ func (a *Agent) DeletePITR(ctx context.Context, d *ctrl.DeletePITRCmd, opid ctrl
188187

189188
ts := primitive.Timestamp{T: uint32(t.Unix())}
190189
l.Info("deleting pitr chunks older than %v", t)
191-
err = deletePITRImpl(ctx, a.leadConn, ts)
190+
err = a.deletePITRImpl(ctx, ts)
192191
if err != nil {
193192
l.Error("deleting: %v", err)
194193
return
@@ -260,7 +259,7 @@ func (a *Agent) Cleanup(ctx context.Context, d *ctrl.CleanupCmd, opid ctrl.OPID,
260259
l.Error("get config: %v", err)
261260
}
262261

263-
stg, err := util.StorageFromConfig(&cfg.Storage, l)
262+
stg, err := util.StorageFromConfig(&cfg.Storage, a.brief.Me, l)
264263
if err != nil {
265264
l.Error("get storage: " + err.Error())
266265
}
@@ -298,16 +297,16 @@ func (a *Agent) Cleanup(ctx context.Context, d *ctrl.CleanupCmd, opid ctrl.OPID,
298297
l.Error(err.Error())
299298
}
300299

301-
err = resync.Resync(ctx, a.leadConn, &cfg.Storage)
300+
err = resync.Resync(ctx, a.leadConn, &cfg.Storage, a.brief.Me)
302301
if err != nil {
303302
l.Error("storage resync: " + err.Error())
304303
}
305304
}
306305

307-
func deletePITRImpl(ctx context.Context, conn connect.Client, ts primitive.Timestamp) error {
306+
func (a *Agent) deletePITRImpl(ctx context.Context, ts primitive.Timestamp) error {
308307
l := log.LogEventFromContext(ctx)
309308

310-
r, err := backup.MakeCleanupInfo(ctx, conn, ts)
309+
r, err := backup.MakeCleanupInfo(ctx, a.leadConn, ts)
311310
if err != nil {
312311
return errors.Wrap(err, "get pitr chunks")
313312
}
@@ -316,15 +315,15 @@ func deletePITRImpl(ctx context.Context, conn connect.Client, ts primitive.Times
316315
return nil
317316
}
318317

319-
stg, err := util.GetStorage(ctx, conn, l)
318+
stg, err := util.GetStorage(ctx, a.leadConn, a.brief.Me, l)
320319
if err != nil {
321320
return errors.Wrap(err, "get storage")
322321
}
323322

324-
return deleteChunks(ctx, conn, stg, r.Chunks)
323+
return a.deleteChunks(ctx, stg, r.Chunks)
325324
}
326325

327-
func deleteChunks(ctx context.Context, m connect.Client, stg storage.Storage, chunks []oplog.OplogChunk) error {
326+
func (a *Agent) deleteChunks(ctx context.Context, stg storage.Storage, chunks []oplog.OplogChunk) error {
328327
l := log.LogEventFromContext(ctx)
329328

330329
for _, chnk := range chunks {
@@ -333,7 +332,7 @@ func deleteChunks(ctx context.Context, m connect.Client, stg storage.Storage, ch
333332
return errors.Wrapf(err, "delete pitr chunk '%s' (%v) from storage", chnk.FName, chnk)
334333
}
335334

336-
_, err = m.PITRChunksCollection().DeleteOne(
335+
_, err = a.leadConn.PITRChunksCollection().DeleteOne(
337336
ctx,
338337
bson.D{
339338
{"rs", chnk.RS},

cmd/pbm-agent/pitr.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,7 @@ func (a *Agent) pitr(ctx context.Context) error {
284284
}
285285
}()
286286

287-
stg, err := util.StorageFromConfig(&cfg.Storage, l)
287+
stg, err := util.StorageFromConfig(&cfg.Storage, a.brief.Me, l)
288288
if err != nil {
289289
if err := lck.Release(); err != nil {
290290
l.Error("release lock: %v", err)

cmd/pbm-agent/profile.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ func (a *Agent) handleAddConfigProfile(
8686
return
8787
}
8888

89-
stg, err := util.StorageFromConfig(&cmd.Storage, log.LogEventFromContext(ctx))
89+
stg, err := util.StorageFromConfig(&cmd.Storage, a.brief.Me, log.LogEventFromContext(ctx))
9090
if err != nil {
9191
err = errors.Wrap(err, "storage from config")
9292
return

cmd/pbm-agent/restore.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ func (a *Agent) Restore(ctx context.Context, r *ctrl.RestoreCmd, opid ctrl.OPID,
9191
l.Info("backup: %s", r.BackupName)
9292

9393
// XXX: why is backup searched on storage?
94-
bcp, err = restore.LookupBackupMeta(ctx, a.leadConn, r.BackupName)
94+
bcp, err = restore.LookupBackupMeta(ctx, a.leadConn, r.BackupName, a.brief.Me)
9595
if err != nil {
9696
l.Error("define base backup: %v", err)
9797
return

cmd/pbm-agent/resync.go

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"golang.org/x/sync/errgroup"
88

99
"github.com/percona/percona-backup-mongodb/pbm/config"
10-
"github.com/percona/percona-backup-mongodb/pbm/connect"
1110
"github.com/percona/percona-backup-mongodb/pbm/ctrl"
1211
"github.com/percona/percona-backup-mongodb/pbm/errors"
1312
"github.com/percona/percona-backup-mongodb/pbm/lock"
@@ -68,11 +67,11 @@ func (a *Agent) Resync(ctx context.Context, cmd *ctrl.ResyncCmd, opid ctrl.OPID,
6867
l.Info("started")
6968

7069
if cmd.All {
71-
err = handleSyncAllProfiles(ctx, a.leadConn, cmd.Clear)
70+
err = a.handleSyncAllProfiles(ctx, cmd.Clear)
7271
} else if cmd.Name != "" {
73-
err = handleSyncProfile(ctx, a.leadConn, cmd.Name, cmd.Clear)
72+
err = a.handleSyncProfile(ctx, cmd.Name, cmd.Clear)
7473
} else {
75-
err = handleSyncMainStorage(ctx, a.leadConn)
74+
err = a.handleSyncMainStorage(ctx)
7675
}
7776
if err != nil {
7877
l.Error(err.Error())
@@ -82,32 +81,32 @@ func (a *Agent) Resync(ctx context.Context, cmd *ctrl.ResyncCmd, opid ctrl.OPID,
8281
l.Info("succeed")
8382
}
8483

85-
func handleSyncAllProfiles(ctx context.Context, conn connect.Client, clear bool) error {
86-
profiles, err := config.ListProfiles(ctx, conn)
84+
func (a *Agent) handleSyncAllProfiles(ctx context.Context, clearProfile bool) error {
85+
profiles, err := config.ListProfiles(ctx, a.leadConn)
8786
if err != nil {
8887
return errors.Wrap(err, "get config profiles")
8988
}
9089

9190
eg, ctx := errgroup.WithContext(ctx)
92-
if clear {
91+
if clearProfile {
9392
for i := range profiles {
9493
eg.Go(func() error {
95-
return helpClearProfileBackups(ctx, conn, profiles[i].Name)
94+
return a.helpClearProfileBackups(ctx, profiles[i].Name)
9695
})
9796
}
9897
} else {
9998
for i := range profiles {
10099
eg.Go(func() error {
101-
return helpSyncProfileBackups(ctx, conn, &profiles[i])
100+
return a.helpSyncProfileBackups(ctx, &profiles[i])
102101
})
103102
}
104103
}
105104

106105
return eg.Wait()
107106
}
108107

109-
func handleSyncProfile(ctx context.Context, conn connect.Client, name string, clear bool) error {
110-
profile, err := config.GetProfile(ctx, conn, name)
108+
func (a *Agent) handleSyncProfile(ctx context.Context, name string, clearProfile bool) error {
109+
profile, err := config.GetProfile(ctx, a.leadConn, name)
111110
if err != nil {
112111
if errors.Is(err, mongo.ErrNoDocuments) {
113112
err = errors.Errorf("profile %q not found", name)
@@ -116,37 +115,37 @@ func handleSyncProfile(ctx context.Context, conn connect.Client, name string, cl
116115
return errors.Wrap(err, "get config profile")
117116
}
118117

119-
if clear {
120-
err = helpClearProfileBackups(ctx, conn, profile.Name)
118+
if clearProfile {
119+
err = a.helpClearProfileBackups(ctx, profile.Name)
121120
} else {
122-
err = helpSyncProfileBackups(ctx, conn, profile)
121+
err = a.helpSyncProfileBackups(ctx, profile)
123122
}
124123

125124
return err
126125
}
127126

128-
func helpClearProfileBackups(ctx context.Context, conn connect.Client, profileName string) error {
129-
err := resync.ClearBackupList(ctx, conn, profileName)
127+
func (a *Agent) helpClearProfileBackups(ctx context.Context, profileName string) error {
128+
err := resync.ClearBackupList(ctx, a.leadConn, profileName)
130129
return errors.Wrapf(err, "clear backup list for %q", profileName)
131130
}
132131

133-
func helpSyncProfileBackups(ctx context.Context, conn connect.Client, profile *config.Config) error {
134-
err := resync.SyncBackupList(ctx, conn, &profile.Storage, profile.Name)
132+
func (a *Agent) helpSyncProfileBackups(ctx context.Context, profile *config.Config) error {
133+
err := resync.SyncBackupList(ctx, a.leadConn, &profile.Storage, profile.Name, a.brief.Me)
135134
return errors.Wrapf(err, "sync backup list for %q", profile.Name)
136135
}
137136

138-
func handleSyncMainStorage(ctx context.Context, conn connect.Client) error {
139-
cfg, err := config.GetConfig(ctx, conn)
137+
func (a *Agent) handleSyncMainStorage(ctx context.Context) error {
138+
cfg, err := config.GetConfig(ctx, a.leadConn)
140139
if err != nil {
141140
return errors.Wrap(err, "get config")
142141
}
143142

144-
err = resync.Resync(ctx, conn, &cfg.Storage)
143+
err = resync.Resync(ctx, a.leadConn, &cfg.Storage, a.brief.Me)
145144
if err != nil {
146145
return errors.Wrap(err, "resync")
147146
}
148147

149-
epch, err := config.ResetEpoch(ctx, conn)
148+
epch, err := config.ResetEpoch(ctx, a.leadConn)
150149
if err != nil {
151150
return errors.Wrap(err, "reset epoch")
152151
}

cmd/pbm-speed-test/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ func testStorage(mURL string, compression compress.CompressionType, level *int,
120120
}
121121
defer client.Disconnect(context.Background()) //nolint:errcheck
122122

123-
stg, err := util.GetStorage(context.Background(), client, log.DiscardEvent)
123+
stg, err := util.GetStorage(context.Background(), client, "", log.DiscardEvent)
124124
if err != nil {
125125
stdlog.Fatalln("Error: get storage:", err)
126126
}

cmd/pbm/backup.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -378,7 +378,12 @@ func byteCountIEC(b int64) string {
378378
return fmt.Sprintf("%.1f %ciB", float64(b)/float64(div), "KMGTPE"[exp])
379379
}
380380

381-
func describeBackup(ctx context.Context, pbm *sdk.Client, b *descBcp) (fmt.Stringer, error) {
381+
func describeBackup(
382+
ctx context.Context,
383+
pbm *sdk.Client,
384+
b *descBcp,
385+
node string,
386+
) (fmt.Stringer, error) {
382387
bcp, err := pbm.GetBackupByName(ctx, b.name, sdk.GetBackupByNameOptions{})
383388
if err != nil {
384389
return nil, errors.Wrap(err, "get backup meta")
@@ -388,7 +393,7 @@ func describeBackup(ctx context.Context, pbm *sdk.Client, b *descBcp) (fmt.Strin
388393
if b.coll || bcp.Size == 0 {
389394
// to read backed up collection names
390395
// or calculate size of files for legacy backups
391-
stg, err = util.StorageFromConfig(&bcp.Store.StorageConf, log.LogEventFromContext(ctx))
396+
stg, err = util.StorageFromConfig(&bcp.Store.StorageConf, node, log.LogEventFromContext(ctx))
392397
if err != nil {
393398
return nil, errors.Wrap(err, "get storage")
394399
}

cmd/pbm/main.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/percona/percona-backup-mongodb/pbm/errors"
1919
"github.com/percona/percona-backup-mongodb/pbm/log"
2020
"github.com/percona/percona-backup-mongodb/pbm/oplog"
21+
"github.com/percona/percona-backup-mongodb/pbm/topo"
2122
"github.com/percona/percona-backup-mongodb/pbm/version"
2223
"github.com/percona/percona-backup-mongodb/sdk"
2324
)
@@ -468,6 +469,7 @@ func main() {
468469

469470
var conn connect.Client
470471
var pbm *sdk.Client
472+
var node string
471473
// we don't need pbm connection if it is `pbm describe-restore -c ...`
472474
// or `pbm restore-finish `
473475
if describeRestoreOpts.cfg == "" && finishRestore.cfg == "" {
@@ -491,6 +493,12 @@ func main() {
491493
exitErr(errors.Wrap(err, "init sdk"), pbmOutF)
492494
}
493495
defer pbm.Close(context.Background())
496+
497+
inf, err := topo.GetNodeInfo(ctx, conn.MongoClient())
498+
if err != nil {
499+
exitErr(errors.Wrap(err, "unable to obtain node info"), pbmOutF)
500+
}
501+
node = inf.Me
494502
}
495503

496504
switch cmd {
@@ -514,13 +522,13 @@ func main() {
514522
case backupFinishCmd.FullCommand():
515523
out, err = runFinishBcp(ctx, conn, finishBackupName)
516524
case restoreFinishCmd.FullCommand():
517-
out, err = runFinishRestore(finishRestore)
525+
out, err = runFinishRestore(finishRestore, node)
518526
case descBcpCmd.FullCommand():
519-
out, err = describeBackup(ctx, pbm, &descBcp)
527+
out, err = describeBackup(ctx, pbm, &descBcp, node)
520528
case restoreCmd.FullCommand():
521-
out, err = runRestore(ctx, conn, pbm, &restore, pbmOutF)
529+
out, err = runRestore(ctx, conn, pbm, &restore, node, pbmOutF)
522530
case replayCmd.FullCommand():
523-
out, err = replayOplog(ctx, conn, pbm, replayOpts, pbmOutF)
531+
out, err = replayOplog(ctx, conn, pbm, replayOpts, node, pbmOutF)
524532
case listCmd.FullCommand():
525533
out, err = runList(ctx, conn, pbm, &list)
526534
case deleteBcpCmd.FullCommand():
@@ -534,7 +542,7 @@ func main() {
534542
case statusCmd.FullCommand():
535543
out, err = status(ctx, conn, pbm, *mURL, statusOpts, pbmOutF == outJSONpretty)
536544
case describeRestoreCmd.FullCommand():
537-
out, err = describeRestore(ctx, conn, describeRestoreOpts)
545+
out, err = describeRestore(ctx, conn, describeRestoreOpts, node)
538546
}
539547

540548
if err != nil {

cmd/pbm/oplog.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ func replayOplog(
4646
conn connect.Client,
4747
pbm *sdk.Client,
4848
o replayOptions,
49+
node string,
4950
outf outFormat,
5051
) (fmt.Stringer, error) {
5152
rsMap, err := parseRSNamesMapping(o.rsMap)
@@ -105,7 +106,7 @@ func replayOplog(
105106
}
106107

107108
fmt.Print("Started.\nWaiting to finish")
108-
err = waitRestore(ctx, conn, m, defs.StatusDone, 0)
109+
err = waitRestore(ctx, conn, m, node, defs.StatusDone, 0)
109110
if err != nil {
110111
if errors.Is(err, context.DeadlineExceeded) {
111112
err = errWaitTimeout

0 commit comments

Comments
 (0)