Skip to content

Commit c0e6f3a

Browse files
authored
PBM-1057: Forbidden running Backup & PITR on non-suitable agents (#1001)
* Add ListStadyAgents for fetching agents which are ready for Backup and PITR * Use only steady agents for backup nomination * Extract and expand backup candidates rules * Add PITR candidates rules logic * Extract MaxReplicationLagTimeSec to defs package * Expand agent's stat with replication lag * Add replication lag guard for Backup and PITR candidates
1 parent a60faa8 commit c0e6f3a

File tree

6 files changed

+58
-17
lines changed

6 files changed

+58
-17
lines changed

cmd/pbm-agent/agent.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -330,6 +330,13 @@ func (a *Agent) HbStatus(ctx context.Context) {
330330
} else {
331331
hb.State = n.State
332332
hb.StateStr = n.StateStr
333+
334+
rLag, err := topo.ReplicationLag(ctx, a.nodeConn, a.brief.Me)
335+
if err != nil {
336+
l.Error("get replication lag: %v", err)
337+
hb.Err += fmt.Sprintf("get replication lag: %v", err)
338+
}
339+
hb.ReplicationLag = rLag
333340
}
334341
}
335342

cmd/pbm-agent/backup.go

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -165,22 +165,15 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,
165165
}
166166
}
167167

168-
agents, err := topo.ListAgentStatuses(ctx, a.leadConn)
168+
agents, err := topo.ListSteadyAgents(ctx, a.leadConn)
169169
if err != nil {
170170
l.Error("get agents list: %v", err)
171171
return
172172
}
173173

174-
validCandidates := make([]topo.AgentStat, 0, len(agents))
175-
for _, s := range agents {
176-
if version.FeatureSupport(s.MongoVersion()).BackupType(cmd.Type) != nil {
177-
continue
178-
}
179-
180-
validCandidates = append(validCandidates, s)
181-
}
174+
candidates := a.getValidCandidates(agents, cmd.Type)
182175

183-
nodes := prio.CalcNodesPriority(c, cfg.Backup.Priority, validCandidates)
176+
nodes := prio.CalcNodesPriority(c, cfg.Backup.Priority, candidates)
184177

185178
shards, err := topo.ClusterMembers(ctx, a.leadConn.MongoClient())
186179
if err != nil {
@@ -256,6 +249,19 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,
256249
}
257250
}
258251

252+
// getValidCandidates filters out all agents that are not suitable for the backup.
253+
func (a *Agent) getValidCandidates(agents []topo.AgentStat, backupType defs.BackupType) []topo.AgentStat {
254+
validCandidates := []topo.AgentStat{}
255+
for _, agent := range agents {
256+
if version.FeatureSupport(agent.MongoVersion()).BackupType(backupType) != nil {
257+
continue
258+
}
259+
validCandidates = append(validCandidates, agent)
260+
}
261+
262+
return validCandidates
263+
}
264+
259265
const renominationFrame = 5 * time.Second
260266

261267
func (a *Agent) nominateRS(ctx context.Context, bcp, rs string, nodes [][]string) error {

cmd/pbm-agent/pitr.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -412,13 +412,13 @@ func (a *Agent) leadNomination(
412412
return
413413
}
414414

415-
agents, err := topo.ListAgentStatuses(ctx, a.leadConn)
415+
candidates, err := topo.ListSteadyAgents(ctx, a.leadConn)
416416
if err != nil {
417417
l.Error("get agents list: %v", err)
418418
return
419419
}
420420

421-
nodes := prio.CalcNodesPriority(nil, cfgPrio, agents)
421+
nodes := prio.CalcNodesPriority(nil, cfgPrio, candidates)
422422

423423
shards, err := topo.ClusterMembers(ctx, a.leadConn.MongoClient())
424424
if err != nil {
@@ -433,7 +433,7 @@ func (a *Agent) leadNomination(
433433
return
434434
}
435435

436-
err = a.reconcileReadyStatus(ctx, agents)
436+
err = a.reconcileReadyStatus(ctx, candidates)
437437
if err != nil {
438438
l.Error("reconciling ready status: %v", err)
439439
return
@@ -645,7 +645,7 @@ func (a *Agent) reconcileReadyStatus(ctx context.Context, agents []topo.AgentSta
645645
if err := oplog.SetClusterStatus(ctx, a.leadConn, oplog.StatusUnset); err != nil {
646646
l.Error("error while cleaning cluster status: %v", err)
647647
}
648-
return errors.New("timeout while roconciling ready status")
648+
return errors.New("timeout while reconciling ready status")
649649
}
650650
}
651651
}

pbm/defs/defs.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,8 @@ const (
136136

137137
const StaleFrameSec uint32 = 30
138138

139+
const MaxReplicationLagTimeSec = 21
140+
139141
const (
140142
// MetadataFileSuffix is a suffix for the metadata file on a storage
141143
MetadataFileSuffix = ".pbm.json"

pbm/topo/agent.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ type AgentStat struct {
4545
// DelaySecs is the node configured replication delay (lag).
4646
DelaySecs int32 `bson:"delay"`
4747

48+
// Replication lag for mongod.
49+
ReplicationLag int `bson:"repl_lag"`
50+
4851
// AgentVer has the PBM Agent version (looks like `v2.3.4`)
4952
AgentVer string `bson:"v"`
5053

@@ -192,6 +195,31 @@ func ListAgentStatuses(ctx context.Context, m connect.Client) ([]AgentStat, erro
192195
return ListAgents(ctx, m)
193196
}
194197

198+
// ListSteadyAgents returns agents which are in steady state for backup or PITR.
199+
func ListSteadyAgents(ctx context.Context, m connect.Client) ([]AgentStat, error) {
200+
agents, err := ListAgentStatuses(ctx, m)
201+
if err != nil {
202+
return nil, errors.Wrap(err, "listing agents")
203+
}
204+
steadyAgents := []AgentStat{}
205+
for _, a := range agents {
206+
if a.State != defs.NodeStatePrimary &&
207+
a.State != defs.NodeStateSecondary {
208+
continue
209+
}
210+
if a.Arbiter || a.DelaySecs > 0 {
211+
continue
212+
}
213+
if a.ReplicationLag >= defs.MaxReplicationLagTimeSec {
214+
continue
215+
}
216+
217+
steadyAgents = append(steadyAgents, a)
218+
}
219+
220+
return steadyAgents, nil
221+
}
222+
195223
func ListAgents(ctx context.Context, m connect.Client) ([]AgentStat, error) {
196224
cur, err := m.AgentsStatusCollection().Find(ctx, bson.D{})
197225
if err != nil {

pbm/topo/topo.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -148,8 +148,6 @@ func collectTopoCheckErrors(
148148
return nil
149149
}
150150

151-
const maxReplicationLagTimeSec = 21
152-
153151
// NodeSuits checks if node can perform backup
154152
func NodeSuits(ctx context.Context, m *mongo.Client, inf *NodeInfo) (bool, error) {
155153
status, err := GetNodeStatus(ctx, m, inf.Me)
@@ -165,7 +163,7 @@ func NodeSuits(ctx context.Context, m *mongo.Client, inf *NodeInfo) (bool, error
165163
return false, errors.Wrap(err, "get node replication lag")
166164
}
167165

168-
return replLag < maxReplicationLagTimeSec && status.Health == defs.NodeHealthUp &&
166+
return replLag < defs.MaxReplicationLagTimeSec && status.Health == defs.NodeHealthUp &&
169167
(status.State == defs.NodeStatePrimary || status.State == defs.NodeStateSecondary),
170168
nil
171169
}

0 commit comments

Comments
 (0)