
Commit 5098404

PBM-397: update agent status right after startup
1 parent 953255a commit 5098404

File tree

cmd/pbm-agent/agent.go
pbm/topo/agent.go

2 files changed: +124 -66

cmd/pbm-agent/agent.go

Lines changed: 122 additions & 58 deletions
@@ -282,8 +282,17 @@ func (a *Agent) HbStatus(ctx context.Context) {
 		MongoVer: nodeVersion.VersionString,
 		PerconaVer: nodeVersion.PSMDBVersion,
 	}
+
+	updateAgentStat(ctx, a, l, true, &hb)
+	err = topo.SetAgentStatus(ctx, a.leadConn, &hb)
+	if err != nil {
+		l.Error("set status: %v", err)
+	}
+
 	defer func() {
-		if err := topo.RemoveAgentStatus(ctx, a.leadConn, hb); err != nil {
+		l.Debug("deleting agent status")
+		err := topo.RemoveAgentStatus(context.Background(), a.leadConn, hb)
+		if err != nil {
 			logger := logger.NewEvent("agentCheckup", "", "", primitive.Timestamp{})
 			logger.Error("remove agent heartbeat: %v", err)
 		}
@@ -292,74 +301,128 @@ func (a *Agent) HbStatus(ctx context.Context) {
 	tk := time.NewTicker(defs.AgentsStatCheckRange)
 	defer tk.Stop()

+	storageCheckTime := time.Now()
+	parallelAgentCheckTime := time.Now()
+
 	// check storage once in a while if all is ok (see https://jira.percona.com/browse/PBM-647)
-	const checkStoreIn = int(60 / (defs.AgentsStatCheckRange / time.Second))
-	cc := 0
-	for range tk.C {
-		// don't check if on pause (e.g. physical restore)
-		if !a.HbIsRun() {
-			continue
-		}
+	const storageCheckInterval = 15 * time.Second
+	const parallelAgentCheckInternval = 20 * time.Second

-		hb.PBMStatus = a.pbmStatus(ctx)
-		logHbStatus("PBM connection", hb.PBMStatus, l)
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-tk.C:
+			// don't check if on pause (e.g. physical restore)
+			if !a.HbIsRun() {
+				continue
+			}

-		hb.NodeStatus = a.nodeStatus(ctx)
-		logHbStatus("node connection", hb.NodeStatus, l)
+			now := time.Now()
+			if now.Sub(parallelAgentCheckTime) >= parallelAgentCheckInternval {
+				a.warnIfParallelAgentDetected(ctx, l, hb.Heartbeat)
+				parallelAgentCheckTime = now
+			}

-		cc++
-		hb.StorageStatus = a.storStatus(ctx, l, cc == checkStoreIn)
-		logHbStatus("storage connection", hb.StorageStatus, l)
-		if cc == checkStoreIn {
-			cc = 0
+			if now.Sub(storageCheckTime) >= storageCheckInterval {
+				updateAgentStat(ctx, a, l, true, &hb)
+				err = topo.SetAgentStatus(ctx, a.leadConn, &hb)
+				if err == nil {
+					storageCheckTime = now
+				}
+			} else {
+				updateAgentStat(ctx, a, l, false, &hb)
+				err = topo.SetAgentStatus(ctx, a.leadConn, &hb)
+			}
+			if err != nil {
+				l.Error("set status: %v", err)
+			}
 		}
+	}
+}

-		hb.Err = ""
-		hb.Hidden = false
-		hb.Passive = false
+func updateAgentStat(
+	ctx context.Context,
+	agent *Agent,
+	l log.LogEvent,
+	checkStore bool,
+	hb *topo.AgentStat,
+) {
+	hb.PBMStatus = agent.pbmStatus(ctx)
+	logHbStatus("PBM connection", hb.PBMStatus, l)

-		inf, err := topo.GetNodeInfo(ctx, a.nodeConn)
-		if err != nil {
-			l.Error("get NodeInfo: %v", err)
-			hb.Err += fmt.Sprintf("get NodeInfo: %v", err)
+	hb.NodeStatus = agent.nodeStatus(ctx)
+	logHbStatus("node connection", hb.NodeStatus, l)
+
+	hb.StorageStatus = agent.storStatus(ctx, l, checkStore, hb)
+	logHbStatus("storage connection", hb.StorageStatus, l)
+
+	hb.Err = ""
+	hb.Hidden = false
+	hb.Passive = false
+
+	inf, err := topo.GetNodeInfo(ctx, agent.nodeConn)
+	if err != nil {
+		l.Error("get NodeInfo: %v", err)
+		hb.Err += fmt.Sprintf("get NodeInfo: %v", err)
+	} else {
+		hb.Hidden = inf.Hidden
+		hb.Passive = inf.Passive
+		hb.Arbiter = inf.ArbiterOnly
+		if inf.SecondaryDelayOld != 0 {
+			hb.DelaySecs = inf.SecondaryDelayOld
 		} else {
-			hb.Hidden = inf.Hidden
-			hb.Passive = inf.Passive
-			hb.Arbiter = inf.ArbiterOnly
-			if inf.SecondaryDelayOld != 0 {
-				hb.DelaySecs = inf.SecondaryDelayOld
-			} else {
-				hb.DelaySecs = inf.SecondaryDelaySecs
-			}
+			hb.DelaySecs = inf.SecondaryDelaySecs
 		}

-		if inf != nil && inf.ArbiterOnly {
-			hb.State = defs.NodeStateArbiter
-			hb.StateStr = "ARBITER"
+		hb.Heartbeat, err = topo.ClusterTimeFromNodeInfo(inf)
+		if err != nil {
+			hb.Err += fmt.Sprintf("get cluster time: %v", err)
+		}
+	}
+
+	if inf != nil && inf.ArbiterOnly {
+		hb.State = defs.NodeStateArbiter
+		hb.StateStr = "ARBITER"
+	} else {
+		n, err := topo.GetNodeStatus(ctx, agent.nodeConn, agent.brief.Me)
+		if err != nil {
+			l.Error("get replSetGetStatus: %v", err)
+			hb.Err += fmt.Sprintf("get replSetGetStatus: %v", err)
+			hb.State = defs.NodeStateUnknown
+			hb.StateStr = "UNKNOWN"
 		} else {
-			n, err := topo.GetNodeStatus(ctx, a.nodeConn, a.brief.Me)
-			if err != nil {
-				l.Error("get replSetGetStatus: %v", err)
-				hb.Err += fmt.Sprintf("get replSetGetStatus: %v", err)
-				hb.State = defs.NodeStateUnknown
-				hb.StateStr = "UNKNOWN"
-			} else {
-				hb.State = n.State
-				hb.StateStr = n.StateStr
+			hb.State = n.State
+			hb.StateStr = n.StateStr

-				rLag, err := topo.ReplicationLag(ctx, a.nodeConn, a.brief.Me)
-				if err != nil {
-					l.Error("get replication lag: %v", err)
-					hb.Err += fmt.Sprintf("get replication lag: %v", err)
-				}
-				hb.ReplicationLag = rLag
+			rLag, err := topo.ReplicationLag(ctx, agent.nodeConn, agent.brief.Me)
+			if err != nil {
+				l.Error("get replication lag: %v", err)
+				hb.Err += fmt.Sprintf("get replication lag: %v", err)
 			}
+			hb.ReplicationLag = rLag
 		}
+	}
+}

-		err = topo.SetAgentStatus(ctx, a.leadConn, hb)
-		if err != nil {
-			l.Error("set status: %v", err)
+func (a *Agent) warnIfParallelAgentDetected(
+	ctx context.Context,
+	l log.LogEvent,
+	lastHeartbeat primitive.Timestamp,
+) {
+	s, err := topo.GetAgentStatus(ctx, a.leadConn, a.brief.SetName, a.brief.Me)
+	if err != nil {
+		if errors.Is(err, mongo.ErrNoDocuments) {
+			return
 		}
+		l.Error("detecting parallel agent: get status: %v", err)
+		return
+	}
+	if !s.Heartbeat.Equal(lastHeartbeat) {
+		l.Warning("detected possible parallel agent for the node: "+
+			"expected last heartbeat to be %d.%d, actual is %d.%d",
+			lastHeartbeat.T, lastHeartbeat.I, s.Heartbeat.T, s.Heartbeat.I)
+		return
 	}
 }

@@ -381,13 +444,14 @@ func (a *Agent) nodeStatus(ctx context.Context) topo.SubsysStatus {
 	return topo.SubsysStatus{OK: true}
 }

-func (a *Agent) storStatus(ctx context.Context, log log.LogEvent, forceCheckStorage bool) topo.SubsysStatus {
+func (a *Agent) storStatus(
+	ctx context.Context,
+	log log.LogEvent,
+	forceCheckStorage bool,
+	stat *topo.AgentStat,
+) topo.SubsysStatus {
 	// check storage once in a while if all is ok (see https://jira.percona.com/browse/PBM-647)
 	// but if storage was(is) failed, check it always
-	stat, err := topo.GetAgentStatus(ctx, a.leadConn, a.brief.SetName, a.brief.Me)
-	if err != nil {
-		log.Warning("get current storage status: %v", err)
-	}
 	if !forceCheckStorage && stat.StorageStatus.OK {
 		return topo.SubsysStatus{OK: true}
 	}
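
The new loop above replaces the old tick-counter heuristic (cc / checkStoreIn) with wall-clock intervals and a context-aware select: a cheap status update is written on every tick, a full storage-checking update only once storageCheckInterval has elapsed, and one full update is pushed unconditionally at startup. Below is a minimal, standalone Go sketch of that scheduling pattern; it is not PBM code, and the tick period, interval value, and the run/doFullUpdate/doLightUpdate names are illustrative assumptions.

package main

import (
	"context"
	"fmt"
	"time"
)

// run emulates the heartbeat loop: push one full update at startup, then on
// every tick do a cheap update, promoting it to a full (storage-checking)
// update only when enough wall-clock time has passed since the last
// successful full update.
func run(ctx context.Context, doFullUpdate, doLightUpdate func() error) {
	tk := time.NewTicker(1 * time.Second) // illustrative tick period
	defer tk.Stop()

	const fullCheckInterval = 15 * time.Second // mirrors storageCheckInterval

	// Mirror the commit's intent: report status right after startup instead
	// of waiting for the first tick.
	if err := doFullUpdate(); err != nil {
		fmt.Println("initial update failed:", err)
	}
	lastFull := time.Now()

	for {
		select {
		case <-ctx.Done():
			return // the loop now exits when the context is cancelled
		case <-tk.C:
			now := time.Now()
			var err error
			if now.Sub(lastFull) >= fullCheckInterval {
				err = doFullUpdate()
				if err == nil {
					lastFull = now // advance only on success, so a failed full update retries soon
				}
			} else {
				err = doLightUpdate()
			}
			if err != nil {
				fmt.Println("update failed:", err)
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	run(ctx,
		func() error { fmt.Println("full update"); return nil },
		func() error { fmt.Println("light update"); return nil })
}

Advancing lastFull only on success mirrors the commit's `if err == nil { storageCheckTime = now }`, so a failed storage check is retried on the next tick instead of waiting out the whole interval.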

pbm/topo/agent.go

Lines changed: 2 additions & 8 deletions
@@ -124,14 +124,8 @@ func (s *AgentStat) MongoVersion() version.MongoVersion {
 	return v
 }

-func SetAgentStatus(ctx context.Context, m connect.Client, stat AgentStat) error {
-	ct, err := GetClusterTime(ctx, m)
-	if err != nil {
-		return errors.Wrap(err, "get cluster time")
-	}
-	stat.Heartbeat = ct
-
-	_, err = m.AgentsStatusCollection().ReplaceOne(
+func SetAgentStatus(ctx context.Context, m connect.Client, stat *AgentStat) error {
+	_, err := m.AgentsStatusCollection().ReplaceOne(
 		ctx,
 		bson.D{{"n", stat.Node}, {"rs", stat.RS}},
 		stat,
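
After this change SetAgentStatus persists the *AgentStat exactly as given and no longer stamps Heartbeat from GetClusterTime; the caller fills it in (updateAgentStat does so via topo.ClusterTimeFromNodeInfo). Because the agent now owns the heartbeat value it writes, warnIfParallelAgentDetected can read the stored document back and treat any mismatch as a sign of a second agent on the same node. Here is a rough, self-contained Go sketch of that ownership pattern; the stat, setStatus, and warnIfParallel names are hypothetical, and a map stands in for the agents status collection.

package main

import "fmt"

// stat is a stand-in for topo.AgentStat, reduced to what the example needs.
type stat struct {
	Node      string
	Heartbeat int64 // stand-in for primitive.Timestamp
}

// setStatus mirrors the new SetAgentStatus contract: it stores the document
// exactly as given, heartbeat included, instead of stamping the cluster time itself.
func setStatus(store map[string]stat, s *stat) {
	store[s.Node] = *s
}

// warnIfParallel mirrors warnIfParallelAgentDetected: a stored heartbeat that
// differs from the last one this agent wrote must have come from another agent.
func warnIfParallel(store map[string]stat, node string, lastWritten int64) {
	cur, ok := store[node]
	if !ok {
		return // no status yet, nothing to compare (analogous to mongo.ErrNoDocuments)
	}
	if cur.Heartbeat != lastWritten {
		fmt.Printf("possible parallel agent: expected heartbeat %d, found %d\n",
			lastWritten, cur.Heartbeat)
	}
}

func main() {
	store := map[string]stat{}

	// Our agent writes its status with its own heartbeat value.
	mine := stat{Node: "rs0/node-1:27017", Heartbeat: 100}
	setStatus(store, &mine)

	// A second agent for the same node overwrites it with a different heartbeat.
	other := stat{Node: "rs0/node-1:27017", Heartbeat: 105}
	setStatus(store, &other)

	warnIfParallel(store, "rs0/node-1:27017", mine.Heartbeat) // reports a mismatch
}

The mismatch printed here corresponds to the "detected possible parallel agent for the node" warning in the real code.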

Comments (0)