
Commit 3171999

Added cluster health check
1 parent 1c914ae commit 3171999

1 file changed (+72 −1)

service/upgrade_manager.go

Lines changed: 72 additions & 1 deletion
@@ -256,6 +256,13 @@ func (m *upgradeManager) StartDatabaseUpgrade(ctx context.Context, force bool) error {
         return nil
     }
 
+    // Check cluster health
+    if mode.IsClusterMode() {
+        if err := m.isClusterHealthy(ctx); err != nil {
+            return maskAny(errors.Wrap(err, "Found unhealthy cluster"))
+        }
+    }
+
     // Run upgrade with agency.
     // Create an agency lock, so we know we're the only one to create a plan.
     m.log.Debug().Msg("Creating agency API")
@@ -395,6 +402,17 @@ func (m *upgradeManager) RetryDatabaseUpgrade(ctx context.Context) error {
         return maskAny(client.NewBadRequestError("Retry needs an agency"))
     }
 
+    // Check cluster health
+    if mode.IsClusterMode() {
+        if err := m.isClusterHealthy(ctx); err != nil {
+            return maskAny(errors.Wrap(err, "Found unhealthy cluster"))
+        }
+    }
+
+    // Note that in contrast to StartDatabaseUpgrade we do not use an agency lock
+    // here. The reason for that is that we expect to have a plan and use
+    // the revision condition to ensure a "safe" update.
+
     // Retry upgrade with agency.
     plan, err := m.readUpgradePlan(ctx)
     if agency.IsKeyNotFound(err) {
@@ -777,7 +795,7 @@ func (m *upgradeManager) RunWatchUpgradePlan(ctx context.Context) {
 // processUpgradePlan inspects the first entry of the given plan and acts upon
 // it when needed.
 func (m *upgradeManager) processUpgradePlan(ctx context.Context, plan UpgradePlan) error {
-    _, myPeer, _ := m.upgradeManagerContext.ClusterConfig()
+    _, myPeer, mode := m.upgradeManagerContext.ClusterConfig()
     _, isRunning, _ := m.upgradeManagerContext.IsRunningMaster()
     if !isRunning {
         return maskAny(fmt.Errorf("Not in running phase"))
@@ -829,6 +847,13 @@ func (m *upgradeManager) processUpgradePlan(ctx context.Context, plan UpgradePlan) error {
         if err := m.waitUntil(ctx, m.isAgencyHealth, "Agency is not yet healthy: %v"); err != nil {
             return recordFailure(errors.Wrap(err, "Agency is not healthy in time"))
         }
+
+        // Wait until cluster healthy
+        if mode.IsClusterMode() {
+            if err := m.waitUntil(ctx, m.isClusterHealthy, "Cluster is not yet healthy: %v"); err != nil {
+                return recordFailure(errors.Wrap(err, "Cluster is not healthy in time"))
+            }
+        }
     case UpgradeEntryTypeDBServer:
         // Restart the dbserver in auto-upgrade mode
         m.log.Info().Msg("Upgrading dbserver")
@@ -858,6 +883,12 @@ func (m *upgradeManager) processUpgradePlan(ctx context.Context, plan UpgradePlan) error {
             if err := m.waitUntil(ctx, m.areDBServersResponding, "DBServers are not yet all responding: %v"); err != nil {
                 return recordFailure(errors.Wrap(err, "Not all DBServers are responding in time"))
             }
+
+            // Wait until cluster healthy
+            if err := m.waitUntil(ctx, m.isClusterHealthy, "Cluster is not yet healthy: %v"); err != nil {
+                return recordFailure(errors.Wrap(err, "Cluster is not healthy in time"))
+            }
+
             return nil
         }
         if err := upgrade(); err != nil {
@@ -881,6 +912,11 @@ func (m *upgradeManager) processUpgradePlan(ctx context.Context, plan UpgradePlan) error {
         if err := m.waitUntil(ctx, m.areCoordinatorsResponding, "Coordinator are not yet all responding: %v"); err != nil {
             return recordFailure(errors.Wrap(err, "Not all Coordinators are responding in time"))
         }
+
+        // Wait until cluster healthy
+        if err := m.waitUntil(ctx, m.isClusterHealthy, "Cluster is not yet healthy: %v"); err != nil {
+            return recordFailure(errors.Wrap(err, "Cluster is not healthy in time"))
+        }
     case UpgradeEntryTypeSingle:
         // Restart the activefailover single server in auto-upgrade mode
         m.log.Info().Msg("Upgrading single server")
@@ -1099,6 +1135,41 @@ func (m *upgradeManager) isAgencyHealth(ctx context.Context) error {
     return nil
 }
 
+// isClusterHealthy performs a check on the cluster health status.
+// If any of the servers is reported as not GOOD, an error is returned.
+func (m *upgradeManager) isClusterHealthy(ctx context.Context) error {
+    // Get cluster config
+    clusterConfig, _, _ := m.upgradeManagerContext.ClusterConfig()
+    // Build endpoint list
+    endpoints, err := clusterConfig.GetCoordinatorEndpoints()
+    if err != nil {
+        return maskAny(err)
+    }
+    // Build client
+    c, err := m.upgradeManagerContext.CreateClient(endpoints, ConnectionTypeDatabase)
+    if err != nil {
+        return maskAny(err)
+    }
+    // Check health
+    cl, err := c.Cluster(ctx)
+    if err != nil {
+        return maskAny(err)
+    }
+    h, err := cl.Health(ctx)
+    if err != nil {
+        return maskAny(err)
+    }
+    for id, sh := range h.Health {
+        if sh.Role == driver.ServerRoleAgent && sh.Status == "" {
+            continue
+        }
+        if sh.Status != driver.ServerStatusGood {
+            return maskAny(fmt.Errorf("Server '%s' has a '%s' status", id, sh.Status))
+        }
+    }
+    return nil
+}
+
 // areDBServersResponding performs a check if all dbservers are responding.
 func (m *upgradeManager) areDBServersResponding(ctx context.Context) error {
     // Get cluster config

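For reference, the new isClusterHealthy helper builds on the driver's cluster health API (the Cluster and Health calls above). Below is a minimal standalone sketch of the same check, assuming the github.com/arangodb/go-driver client that the driver.* identifiers appear to come from; the endpoint is a placeholder and no authentication is configured, unlike the starter's CreateClient wrapper.

package main

import (
    "context"
    "fmt"
    "log"

    driver "github.com/arangodb/go-driver"
    "github.com/arangodb/go-driver/http"
)

func main() {
    ctx := context.Background()

    // Placeholder coordinator endpoint; no authentication is configured here.
    conn, err := http.NewConnection(http.ConnectionConfig{
        Endpoints: []string{"http://localhost:8529"},
    })
    if err != nil {
        log.Fatal(err)
    }
    c, err := driver.NewClient(driver.ClientConfig{Connection: conn})
    if err != nil {
        log.Fatal(err)
    }

    // Ask a coordinator for the health of every server in the cluster.
    cl, err := c.Cluster(ctx)
    if err != nil {
        log.Fatal(err)
    }
    h, err := cl.Health(ctx)
    if err != nil {
        log.Fatal(err)
    }
    for id, sh := range h.Health {
        // Agents may report an empty status; the commit skips them as well.
        if sh.Role == driver.ServerRoleAgent && sh.Status == "" {
            continue
        }
        if sh.Status != driver.ServerStatusGood {
            fmt.Printf("server %s is not healthy: %s\n", id, sh.Status)
        }
    }
}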
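The checks added to processUpgradePlan are routed through the manager's waitUntil helper, whose implementation is not part of this diff. Purely as a hypothetical sketch of that call shape (a context, a health predicate, and a log format string), a retry loop along the following lines would poll until the predicate succeeds or the context expires; the actual helper in upgrade_manager.go may differ in its delay and logging.

package main

import (
    "context"
    "errors"
    "fmt"
    "log"
    "time"
)

// waitUntil repeatedly calls predicate until it returns nil, logging every
// failure with logFormat and giving up when ctx is cancelled or times out.
// This is a sketch only, not the implementation from upgrade_manager.go.
func waitUntil(ctx context.Context, predicate func(context.Context) error, logFormat string) error {
    for {
        err := predicate(ctx)
        if err == nil {
            return nil
        }
        log.Printf(logFormat, err)
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(5 * time.Second):
            // Retry after a short delay.
        }
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
    defer cancel()

    // Dummy predicate that becomes healthy on the third attempt.
    attempts := 0
    isClusterHealthy := func(context.Context) error {
        attempts++
        if attempts < 3 {
            return errors.New("not yet healthy")
        }
        return nil
    }
    fmt.Println(waitUntil(ctx, isClusterHealthy, "Cluster is not yet healthy: %v"))
}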