Skip to content

Commit f076283

Browse files
committed
[PBM-780] PBM SDK (inc impl) (#882)
1 parent 53d08f2 commit f076283

File tree

116 files changed

+3995
-3546
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

116 files changed

+3995
-3546
lines changed

cmd/pbm-agent/agent.go

Lines changed: 142 additions & 206 deletions
Large diffs are not rendered by default.

cmd/pbm-agent/backup.go

Lines changed: 36 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,23 @@
11
package main
22

33
import (
4+
"context"
45
"time"
56

7+
"github.com/percona/percona-backup-mongodb/internal/backup"
68
"github.com/percona/percona-backup-mongodb/internal/config"
7-
"github.com/percona/percona-backup-mongodb/internal/context"
9+
"github.com/percona/percona-backup-mongodb/internal/ctrl"
810
"github.com/percona/percona-backup-mongodb/internal/defs"
911
"github.com/percona/percona-backup-mongodb/internal/errors"
1012
"github.com/percona/percona-backup-mongodb/internal/lock"
1113
"github.com/percona/percona-backup-mongodb/internal/log"
12-
"github.com/percona/percona-backup-mongodb/internal/priority"
13-
"github.com/percona/percona-backup-mongodb/internal/query"
1414
"github.com/percona/percona-backup-mongodb/internal/storage"
1515
"github.com/percona/percona-backup-mongodb/internal/topo"
16-
"github.com/percona/percona-backup-mongodb/internal/types"
1716
"github.com/percona/percona-backup-mongodb/internal/version"
18-
"github.com/percona/percona-backup-mongodb/pbm/backup"
1917
)
2018

2119
type currentBackup struct {
22-
header *types.BackupCmd
20+
header *ctrl.BackupCmd
2321
cancel context.CancelFunc
2422
}
2523

@@ -52,17 +50,19 @@ func (a *Agent) CancelBackup() {
5250
}
5351

5452
// Backup starts backup
55-
func (a *Agent) Backup(ctx context.Context, cmd *types.BackupCmd, opid types.OPID, ep config.Epoch) {
53+
func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID, ep config.Epoch) {
54+
logger := log.FromContext(ctx)
55+
5656
if cmd == nil {
57-
l := a.log.NewEvent(string(defs.CmdBackup), "", opid.String(), ep.TS())
57+
l := logger.NewEvent(string(ctrl.CmdBackup), "", opid.String(), ep.TS())
5858
l.Error("missed command")
5959
return
6060
}
6161

62-
l := a.log.NewEvent(string(defs.CmdBackup), cmd.Name, opid.String(), ep.TS())
63-
ctx = log.SetLoggerToContext(ctx, a.log)
62+
l := logger.NewEvent(string(ctrl.CmdBackup), cmd.Name, opid.String(), ep.TS())
63+
ctx = log.SetLoggerToContext(ctx, logger)
6464

65-
nodeInfo, err := topo.GetNodeInfoExt(ctx, a.node.Session())
65+
nodeInfo, err := topo.GetNodeInfoExt(ctx, a.nodeConn)
6666
if err != nil {
6767
l.Error("get node info: %v", err)
6868
return
@@ -75,7 +75,7 @@ func (a *Agent) Backup(ctx context.Context, cmd *types.BackupCmd, opid types.OPI
7575
}
7676

7777
isClusterLeader := nodeInfo.IsClusterLeader()
78-
canRunBackup, err := topo.NodeSuitsExt(ctx, a.node.Session(), nodeInfo, cmd.Type)
78+
canRunBackup, err := topo.NodeSuitsExt(ctx, a.nodeConn, nodeInfo, cmd.Type)
7979
if err != nil {
8080
l.Error("node check: %v", err)
8181
if !isClusterLeader {
@@ -97,18 +97,18 @@ func (a *Agent) Backup(ctx context.Context, cmd *types.BackupCmd, opid types.OPI
9797
var bcp *backup.Backup
9898
switch cmd.Type {
9999
case defs.PhysicalBackup:
100-
bcp = backup.NewPhysical(a.pbm, a.node)
100+
bcp = backup.NewPhysical(a.leadConn, a.nodeConn, a.brief)
101101
case defs.ExternalBackup:
102-
bcp = backup.NewExternal(a.pbm, a.node)
102+
bcp = backup.NewExternal(a.leadConn, a.nodeConn, a.brief)
103103
case defs.IncrementalBackup:
104-
bcp = backup.NewIncremental(a.pbm, a.node, cmd.IncrBase)
104+
bcp = backup.NewIncremental(a.leadConn, a.nodeConn, a.brief, cmd.IncrBase)
105105
case defs.LogicalBackup:
106106
fallthrough
107107
default:
108-
bcp = backup.New(a.pbm, a.node)
108+
bcp = backup.New(a.leadConn, a.nodeConn, a.brief, a.dumpConns)
109109
}
110110

111-
cfg, err := config.GetConfig(ctx, a.pbm.Conn)
111+
cfg, err := config.GetConfig(ctx, a.leadConn)
112112
if err != nil {
113113
l.Error("unable to get PBM config settings: " + err.Error())
114114
return
@@ -122,7 +122,7 @@ func (a *Agent) Backup(ctx context.Context, cmd *types.BackupCmd, opid types.OPI
122122
if isClusterLeader {
123123
balancer := topo.BalancerModeOff
124124
if nodeInfo.IsSharded() {
125-
bs, err := topo.GetBalancerStatus(ctx, a.pbm.Conn)
125+
bs, err := topo.GetBalancerStatus(ctx, a.leadConn)
126126
if err != nil {
127127
l.Error("get balancer status: %v", err)
128128
return
@@ -138,8 +138,8 @@ func (a *Agent) Backup(ctx context.Context, cmd *types.BackupCmd, opid types.OPI
138138
}
139139
l.Debug("init backup meta")
140140

141-
if err = topo.CheckTopoForBackup(ctx, a.pbm.Conn, cmd.Type); err != nil {
142-
ferr := query.ChangeBackupState(a.pbm.Conn, cmd.Name, defs.StatusError, err.Error())
141+
if err = topo.CheckTopoForBackup(ctx, a.leadConn, cmd.Type); err != nil {
142+
ferr := backup.ChangeBackupState(a.leadConn, cmd.Name, defs.StatusError, err.Error())
143143
l.Info("mark backup as %s `%v`: %v", defs.StatusError, err, ferr)
144144
return
145145
}
@@ -150,7 +150,7 @@ func (a *Agent) Backup(ctx context.Context, cmd *types.BackupCmd, opid types.OPI
150150
const srcHostMultiplier = 3.0
151151
var c map[string]float64
152152
if cmd.Type == defs.IncrementalBackup && !cmd.IncrBase {
153-
src, err := query.LastIncrementalBackup(ctx, a.pbm.Conn)
153+
src, err := backup.LastIncrementalBackup(ctx, a.leadConn)
154154
if err != nil {
155155
// try backup anyway
156156
l.Warning("define source backup: %v", err)
@@ -162,7 +162,7 @@ func (a *Agent) Backup(ctx context.Context, cmd *types.BackupCmd, opid types.OPI
162162
}
163163
}
164164

165-
agents, err := topo.ListAgentStatuses(ctx, a.pbm.Conn)
165+
agents, err := topo.ListAgentStatuses(ctx, a.leadConn)
166166
if err != nil {
167167
l.Error("get agents list: %v", err)
168168
return
@@ -177,12 +177,12 @@ func (a *Agent) Backup(ctx context.Context, cmd *types.BackupCmd, opid types.OPI
177177
validCandidates = append(validCandidates, s)
178178
}
179179

180-
nodes, err := priority.BcpNodesPriority(ctx, a.pbm.Conn, c, validCandidates)
180+
nodes, err := BcpNodesPriority(ctx, a.leadConn, c, validCandidates)
181181
if err != nil {
182182
l.Error("get nodes priority: %v", err)
183183
return
184184
}
185-
shards, err := topo.ClusterMembers(ctx, a.pbm.Conn.MongoClient())
185+
shards, err := topo.ClusterMembers(ctx, a.leadConn.MongoClient())
186186
if err != nil {
187187
l.Error("get cluster members: %v", err)
188188
return
@@ -208,8 +208,8 @@ func (a *Agent) Backup(ctx context.Context, cmd *types.BackupCmd, opid types.OPI
208208
}
209209

210210
epoch := ep.TS()
211-
lck := lock.NewLock(a.pbm.Conn, lock.LockHeader{
212-
Type: defs.CmdBackup,
211+
lck := lock.NewLock(a.leadConn, lock.LockHeader{
212+
Type: ctrl.CmdBackup,
213213
Replset: nodeInfo.SetName,
214214
Node: nodeInfo.Me,
215215
OPID: opid.String(),
@@ -219,8 +219,8 @@ func (a *Agent) Backup(ctx context.Context, cmd *types.BackupCmd, opid types.OPI
219219
// install a backup lock despite having PITR one
220220
got, err := a.acquireLock(ctx, lck, l, func(ctx context.Context) (bool, error) {
221221
return lck.Rewrite(ctx, &lock.LockHeader{
222-
Replset: a.node.RS(),
223-
Type: defs.CmdPITR,
222+
Replset: a.brief.SetName,
223+
Type: ctrl.CmdPITR,
224224
})
225225
})
226226
if err != nil {
@@ -232,7 +232,7 @@ func (a *Agent) Backup(ctx context.Context, cmd *types.BackupCmd, opid types.OPI
232232
return
233233
}
234234

235-
err = query.SetRSNomineeACK(ctx, a.pbm.Conn, cmd.Name, nodeInfo.SetName, nodeInfo.Me)
235+
err = backup.SetRSNomineeACK(ctx, a.leadConn, cmd.Name, nodeInfo.SetName, nodeInfo.Me)
236236
if err != nil {
237237
l.Warning("set nominee ack: %v", err)
238238
}
@@ -264,15 +264,15 @@ func (a *Agent) Backup(ctx context.Context, cmd *types.BackupCmd, opid types.OPI
264264

265265
const renominationFrame = 5 * time.Second
266266

267-
func (a *Agent) nominateRS(ctx context.Context, bcp, rs string, nodes [][]string, l *log.Event) error {
267+
func (a *Agent) nominateRS(ctx context.Context, bcp, rs string, nodes [][]string, l log.LogEvent) error {
268268
l.Debug("nomination list for %s: %v", rs, nodes)
269-
err := query.SetRSNomination(ctx, a.pbm.Conn, bcp, rs)
269+
err := backup.SetRSNomination(ctx, a.leadConn, bcp, rs)
270270
if err != nil {
271271
return errors.Wrap(err, "set nomination meta")
272272
}
273273

274274
for _, n := range nodes {
275-
nms, err := query.GetRSNominees(ctx, a.pbm.Conn, bcp, rs)
275+
nms, err := backup.GetRSNominees(ctx, a.leadConn, bcp, rs)
276276
if err != nil && !errors.Is(err, errors.ErrNotFound) {
277277
return errors.Wrap(err, "get nomination meta")
278278
}
@@ -281,13 +281,13 @@ func (a *Agent) nominateRS(ctx context.Context, bcp, rs string, nodes [][]string
281281
return nil
282282
}
283283

284-
err = query.SetRSNominees(ctx, a.pbm.Conn, bcp, rs, n)
284+
err = backup.SetRSNominees(ctx, a.leadConn, bcp, rs, n)
285285
if err != nil {
286286
return errors.Wrap(err, "set nominees")
287287
}
288288
l.Debug("nomination %s, set candidates %v", rs, n)
289289

290-
err = query.BackupHB(ctx, a.pbm.Conn, bcp)
290+
err = backup.BackupHB(ctx, a.leadConn, bcp)
291291
if err != nil {
292292
l.Warning("send heartbeat: %v", err)
293293
}
@@ -298,7 +298,7 @@ func (a *Agent) nominateRS(ctx context.Context, bcp, rs string, nodes [][]string
298298
return nil
299299
}
300300

301-
func (a *Agent) waitNomination(ctx context.Context, bcp, rs, node string, l *log.Event) (bool, error) {
301+
func (a *Agent) waitNomination(ctx context.Context, bcp, rs, node string, l log.LogEvent) (bool, error) {
302302
tk := time.NewTicker(time.Millisecond * 500)
303303
defer tk.Stop()
304304

@@ -308,7 +308,7 @@ func (a *Agent) waitNomination(ctx context.Context, bcp, rs, node string, l *log
308308
for {
309309
select {
310310
case <-tk.C:
311-
nm, err := query.GetRSNominees(ctx, a.pbm.Conn, bcp, rs)
311+
nm, err := backup.GetRSNominees(ctx, a.leadConn, bcp, rs)
312312
if err != nil {
313313
if errors.Is(err, errors.ErrNotFound) {
314314
continue

cmd/pbm-agent/lock.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package main
2+
3+
import (
4+
"context"
5+
6+
"go.mongodb.org/mongo-driver/bson/primitive"
7+
8+
"github.com/percona/percona-backup-mongodb/internal/backup"
9+
"github.com/percona/percona-backup-mongodb/internal/ctrl"
10+
"github.com/percona/percona-backup-mongodb/internal/defs"
11+
"github.com/percona/percona-backup-mongodb/internal/errors"
12+
"github.com/percona/percona-backup-mongodb/internal/lock"
13+
"github.com/percona/percona-backup-mongodb/internal/log"
14+
"github.com/percona/percona-backup-mongodb/internal/restore"
15+
)
16+
17+
func markBcpStale(ctx context.Context, l *lock.Lock, opid string) error {
18+
bcp, err := backup.GetBackupByOPID(ctx, l.Connect(), opid)
19+
if err != nil {
20+
return errors.Wrap(err, "get backup meta")
21+
}
22+
23+
// not to rewrite an error emitted by the agent
24+
if bcp.Status == defs.StatusError || bcp.Status == defs.StatusDone {
25+
return nil
26+
}
27+
28+
if logger := log.FromContext(ctx); logger != nil {
29+
logger.Debug(string(ctrl.CmdBackup), "", opid, primitive.Timestamp{}, "mark stale meta")
30+
}
31+
return backup.ChangeBackupStateOPID(l.Connect(), opid, defs.StatusError,
32+
"some of pbm-agents were lost during the backup")
33+
}
34+
35+
func markRestoreStale(ctx context.Context, l *lock.Lock, opid string) error {
36+
r, err := restore.GetRestoreMetaByOPID(ctx, l.Connect(), opid)
37+
if err != nil {
38+
return errors.Wrap(err, "get retore meta")
39+
}
40+
41+
// not to rewrite an error emitted by the agent
42+
if r.Status == defs.StatusError || r.Status == defs.StatusDone {
43+
return nil
44+
}
45+
46+
if logger := log.FromContext(ctx); logger != nil {
47+
logger.Debug(string(ctrl.CmdRestore), "", opid, primitive.Timestamp{}, "mark stale meta")
48+
}
49+
return restore.ChangeRestoreStateOPID(ctx, l.Connect(), opid, defs.StatusError,
50+
"some of pbm-agents were lost during the restore")
51+
}

cmd/pbm-agent/main.go

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,22 @@
11
package main
22

33
import (
4+
"context"
45
"fmt"
5-
"log"
6+
stdlog "log"
67
"os"
78
"runtime"
89
"strconv"
910
"strings"
1011

1112
"github.com/alecthomas/kingpin"
12-
mlog "github.com/mongodb/mongo-tools/common/log"
13+
mtLog "github.com/mongodb/mongo-tools/common/log"
1314
"github.com/mongodb/mongo-tools/common/options"
1415

15-
"github.com/percona/percona-backup-mongodb/internal/context"
16+
"github.com/percona/percona-backup-mongodb/internal/connect"
1617
"github.com/percona/percona-backup-mongodb/internal/errors"
17-
plog "github.com/percona/percona-backup-mongodb/internal/log"
18+
"github.com/percona/percona-backup-mongodb/internal/log"
1819
"github.com/percona/percona-backup-mongodb/internal/version"
19-
"github.com/percona/percona-backup-mongodb/pbm"
2020
)
2121

2222
const mongoConnFlag = "mongodb-uri"
@@ -51,7 +51,7 @@ func main() {
5151

5252
cmd, err := pbmCmd.DefaultEnvars().Parse(os.Args[1:])
5353
if err != nil && cmd != versionCmd.FullCommand() {
54-
log.Println("Error: Parse command line parameters:", err)
54+
stdlog.Println("Error: Parse command line parameters:", err)
5555
return
5656
}
5757

@@ -73,38 +73,45 @@ func main() {
7373
hidecreds()
7474

7575
err = runAgent(url, *dumpConns)
76-
log.Println("Exit:", err)
76+
stdlog.Println("Exit:", err)
7777
if err != nil {
7878
os.Exit(1)
7979
}
8080
}
8181

8282
func runAgent(mongoURI string, dumpConns int) error {
83-
mlog.SetDateFormat(plog.LogTimeFormat)
84-
mlog.SetVerbosity(&options.Verbosity{VLevel: mlog.DebugLow})
83+
mtLog.SetDateFormat(log.LogTimeFormat)
84+
mtLog.SetVerbosity(&options.Verbosity{VLevel: mtLog.DebugLow})
8585

8686
ctx, cancel := context.WithCancel(context.Background())
8787
defer cancel()
8888

89-
pbmClient, err := pbm.New(ctx, mongoURI, "pbm-agent")
89+
leadConn, err := connect.Connect(ctx, mongoURI, &connect.ConnectOptions{AppName: "pbm-agent"})
9090
if err != nil {
9191
return errors.Wrap(err, "connect to PBM")
9292
}
9393

94-
agnt := newAgent(pbmClient)
95-
defer agnt.Close()
96-
err = agnt.AddNode(ctx, mongoURI, dumpConns)
94+
agent, err := newAgent(ctx, leadConn, mongoURI, dumpConns)
9795
if err != nil {
9896
return errors.Wrap(err, "connect to the node")
9997
}
100-
agnt.InitLogger()
10198

102-
if err := agnt.CanStart(ctx); err != nil {
99+
logger := log.New(agent.leadConn.LogCollection(), agent.brief.SetName, agent.brief.Me)
100+
defer logger.Close()
101+
102+
ctx = log.SetLoggerToContext(ctx, logger)
103+
104+
if err := agent.CanStart(ctx); err != nil {
103105
return errors.Wrap(err, "pre-start check")
104106
}
105107

106-
go agnt.PITR(ctx)
107-
go agnt.HbStatus(ctx)
108+
err = setupNewDB(ctx, agent.leadConn)
109+
if err != nil {
110+
return errors.Wrap(err, "setup pbm collections")
111+
}
112+
113+
go agent.PITR(ctx)
114+
go agent.HbStatus(ctx)
108115

109-
return errors.Wrap(agnt.Start(ctx), "listen the commands stream")
116+
return errors.Wrap(agent.Start(ctx), "listen the commands stream")
110117
}

0 commit comments

Comments
 (0)