
Commit 7516484

Merge pull request #46 from ActiDoo/codex/prevent-reboot-when-node-is-unhealthy
Add cluster health coordination and enforce cluster policies before reboots
2 parents 442286f + 2dab587 commit 7516484

9 files changed: +1207 −40 lines changed


README.md

Lines changed: 2 additions & 0 deletions
@@ -16,6 +16,8 @@ service that favours safety, explicit configuration, and verifiable supply-chain
   timing/exit-code data for diagnostics.
 - **Health gating** – executes an operator-supplied script twice (pre- and post-lock) with rich environment variables for
   node identity, cluster policies, maintenance windows, and optional metrics endpoints.
+- **Cluster-wide health coordination** – persists unhealthy node markers in etcd so peers refuse to reboot while any script is
+  reporting failure, keeps publishing each node's health even when no reboot is pending, applies configured cluster policy thresholds (minimum healthy counts, fractions, fallback protections) before allowing another reboot, and clears the block automatically once the node becomes healthy again.【F:pkg/clusterhealth/etcd.go†L18-L153】【F:pkg/orchestrator/runner.go†L321-L469】
 - **Distributed coordination** – etcd-backed mutex with annotated metadata (`node`, `pid`, `acquired_at`) so operators can
   inspect lock holders during incidents.
 - **Safeguards** – kill switch file, dry-run mode, deny/allow maintenance windows, a configurable cooldown between
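
To make the coordination described in that bullet concrete, here is a minimal, self-contained Go sketch of the gating idea — not the project's actual `pkg/clusterhealth` implementation. The `Record` fields and the in-memory `store` are illustrative stand-ins for the etcd-backed markers: every pass publishes or clears the node's marker, and a reboot is refused while any marker remains.

```go
package main

import (
	"fmt"
	"time"
)

// Record mirrors the kind of marker described above: which node is unhealthy,
// why, and when it last reported. Field names here are illustrative.
type Record struct {
	Node       string
	Stage      string
	Reason     string
	ReportedAt time.Time
}

// store stands in for the etcd prefix holding one marker per unhealthy node.
type store map[string]Record

// reportHealth publishes or clears this node's marker after every script run,
// even when no reboot is pending, so peers always see a current cluster view.
func (s store) reportHealth(node string, healthy bool, reason string) {
	if healthy {
		delete(s, node) // a healthy pass clears the block automatically
		return
	}
	s[node] = Record{Node: node, Stage: "pre-lock", Reason: reason, ReportedAt: time.Now().UTC()}
}

// rebootAllowed refuses to proceed while any peer still has a marker.
func (s store) rebootAllowed() (bool, []Record) {
	blockers := make([]Record, 0, len(s))
	for _, rec := range s {
		blockers = append(blockers, rec)
	}
	return len(blockers) == 0, blockers
}

func main() {
	s := store{}
	s.reportHealth("node-b", false, "kubelet not ready")

	if ok, blockers := s.rebootAllowed(); !ok {
		fmt.Println("reboot blocked by cluster health markers:")
		for _, rec := range blockers {
			fmt.Printf("  - %s (stage=%s, reason=%s)\n", rec.Node, rec.Stage, rec.Reason)
		}
	}

	s.reportHealth("node-b", true, "") // a later healthy pass clears the marker
	if ok, _ := s.rebootAllowed(); ok {
		fmt.Println("reboot allowed")
	}
}
```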

cmd/clusterrebootd/main.go

Lines changed: 78 additions & 4 deletions
@@ -17,6 +17,7 @@ import (
 	"syscall"
 	"time"
 
+	"github.com/clusterrebootd/clusterrebootd/pkg/clusterhealth"
 	"github.com/clusterrebootd/clusterrebootd/pkg/config"
 	"github.com/clusterrebootd/clusterrebootd/pkg/cooldown"
 	"github.com/clusterrebootd/clusterrebootd/pkg/detector"
@@ -141,7 +142,10 @@ func commandRunWithWriters(args []string, stdout, stderr io.Writer) int {
 	}
 	defer locker.Close()
 
-	var cooldownManager *cooldown.EtcdManager
+	var (
+		cooldownManager      *cooldown.EtcdManager
+		clusterHealthManager *clusterhealth.EtcdManager
+	)
 	if cfg.MinRebootIntervalSec > 0 {
 		cdMgr, cdErr := cooldown.NewEtcdManager(cooldown.EtcdManagerOptions{
 			Endpoints: cfg.EtcdEndpoints,
@@ -159,6 +163,21 @@ func commandRunWithWriters(args []string, stdout, stderr io.Writer) int {
 		defer cooldownManager.Close()
 	}
 
+	chMgr, chErr := clusterhealth.NewEtcdManager(clusterhealth.EtcdManagerOptions{
+		Endpoints:   cfg.EtcdEndpoints,
+		DialTimeout: 5 * time.Second,
+		Namespace:   cfg.EtcdNamespace,
+		Prefix:      "cluster_health",
+		TLS:         tlsConfig,
+		NodeName:    cfg.NodeName,
+	})
+	if chErr != nil {
+		fmt.Fprintf(stderr, "failed to initialise cluster health manager: %v\n", chErr)
+		return exitRunError
+	}
+	clusterHealthManager = chMgr
+	defer clusterHealthManager.Close()
+
 	jsonLogger := observability.NewJSONLogger(stderr)
 	metricsCollector := observability.MetricsCollector(observability.NoopMetricsCollector{})
 	var (
@@ -214,6 +233,7 @@ func commandRunWithWriters(args []string, stdout, stderr io.Writer) int {
 	if cooldownManager != nil {
 		runnerOptions = append(runnerOptions, orchestrator.WithCooldownManager(cooldownManager))
 	}
+	runnerOptions = append(runnerOptions, orchestrator.WithClusterHealthManager(clusterHealthManager))
 	runner, err := orchestrator.NewRunner(cfg, engine, healthRunner, locker, runnerOptions...)
 	if err != nil {
 		fmt.Fprintf(stderr, "failed to initialise orchestrator: %v\n", err)
@@ -380,7 +400,10 @@ func commandStatusWithWriters(args []string, stdout, stderr io.Writer) int {
 		}
 		locker = etcdManager
 	}
-	var cooldownManager *cooldown.EtcdManager
+	var (
+		cooldownManager      *cooldown.EtcdManager
+		clusterHealthManager *clusterhealth.EtcdManager
+	)
 	if cfgCopy.MinRebootIntervalSec > 0 {
 		cdMgr, cdErr := cooldown.NewEtcdManager(cooldown.EtcdManagerOptions{
 			Endpoints: cfgCopy.EtcdEndpoints,
@@ -403,13 +426,32 @@ func commandStatusWithWriters(args []string, stdout, stderr io.Writer) int {
 		defer cooldownManager.Close()
 	}
 
+	if !*skipLock {
+		chMgr, chErr := clusterhealth.NewEtcdManager(clusterhealth.EtcdManagerOptions{
+			Endpoints:   cfgCopy.EtcdEndpoints,
+			DialTimeout: 5 * time.Second,
+			Namespace:   cfgCopy.EtcdNamespace,
+			Prefix:      "cluster_health",
+			TLS:         tlsConfig,
+			NodeName:    cfgCopy.NodeName,
+		})
+		if chErr != nil {
+			fmt.Fprintf(stderr, "failed to initialise cluster health manager: %v\n", chErr)
+			return exitRunError
+		}
+		clusterHealthManager = chMgr
+		defer clusterHealthManager.Close()
+	}
+
 	runnerOptions := []orchestrator.Option{orchestrator.WithMaxLockAttempts(1), orchestrator.WithCommandEnvironment(baseEnv)}
 	if *skipLock {
 		runnerOptions = append(runnerOptions, orchestrator.WithLockAcquisition(false, "lock acquisition skipped (--skip-lock)"))
 	}
 	if cooldownManager != nil {
 		runnerOptions = append(runnerOptions, orchestrator.WithCooldownManager(cooldownManager))
 	}
+	runnerOptions = append(runnerOptions, orchestrator.WithClusterHealthManager(clusterHealthManager))
+	runnerOptions = append(runnerOptions, orchestrator.WithClusterHealthReporting(false))
 
 	runner, err := orchestrator.NewRunner(&cfgCopy, engine, healthRunner, locker, runnerOptions...)
 	if err != nil {
@@ -585,11 +627,19 @@ func writeHealthResult(w io.Writer, label string, res *health.Result) {
 	}
 }
 
+func healthLabel(phase, fallback string) string {
+	phase = strings.TrimSpace(phase)
+	if phase == "" {
+		phase = fallback
+	}
+	return fmt.Sprintf("%s health", phase)
+}
+
 func reportOutcome(stdout io.Writer, outcome orchestrator.Outcome) {
 	fmt.Fprintln(stdout, "pre-lock detector evaluations:")
 	writeDetectorResults(stdout, outcome.DetectorResults)
 	if outcome.PreLockHealthResult != nil {
-		writeHealthResult(stdout, "pre-lock health", outcome.PreLockHealthResult)
+		writeHealthResult(stdout, healthLabel(outcome.PreLockHealthPhase, "pre-lock"), outcome.PreLockHealthResult)
 	}
 	if outcome.LockAcquired {
 		fmt.Fprintln(stdout, "lock acquired")
@@ -599,7 +649,11 @@ func reportOutcome(stdout io.Writer, outcome orchestrator.Outcome) {
 		writeDetectorResults(stdout, outcome.PostLockDetectorResults)
 	}
 	if outcome.PostLockHealthResult != nil {
-		writeHealthResult(stdout, "post-lock health", outcome.PostLockHealthResult)
+		writeHealthResult(stdout, healthLabel(outcome.PostLockHealthPhase, "post-lock"), outcome.PostLockHealthResult)
+	}
+	if len(outcome.ClusterUnhealthy) > 0 {
+		fmt.Fprintln(stdout, "cluster health blockers:")
+		writeClusterHealthRecords(stdout, outcome.ClusterUnhealthy)
 	}
 	fmt.Fprintf(stdout, "outcome: %s - %s\n", outcome.Status, outcome.Message)
 	if len(outcome.Command) > 0 {
@@ -610,6 +664,26 @@ func reportOutcome(stdout io.Writer, outcome orchestrator.Outcome) {
 	}
 }
 
+func writeClusterHealthRecords(w io.Writer, records []clusterhealth.Record) {
+	for _, rec := range records {
+		details := []string{}
+		if rec.Stage != "" {
+			details = append(details, fmt.Sprintf("stage=%s", rec.Stage))
+		}
+		if rec.Reason != "" {
+			details = append(details, fmt.Sprintf("reason=%s", rec.Reason))
+		}
+		if !rec.ReportedAt.IsZero() {
+			details = append(details, fmt.Sprintf("reported_at=%s", rec.ReportedAt.UTC().Format(time.RFC3339Nano)))
+		}
+		if len(details) > 0 {
+			fmt.Fprintf(w, " - %s (%s)\n", rec.Node, strings.Join(details, ", "))
+		} else {
+			fmt.Fprintf(w, " - %s\n", rec.Node)
+		}
+	}
+}
+
 func exitCodeForOutcome(out orchestrator.Outcome) int {
 	switch out.Status {
 	case orchestrator.OutcomeKillSwitch:
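
The `Record` fields consumed by `writeClusterHealthRecords` (Node, Stage, Reason, ReportedAt) are only visible through their use in the diff; the snippet below re-creates that formatting path with a local stand-in type to show roughly what `status` prints when blockers exist. Treat the type definition as an assumption inferred from the diff, not the actual `clusterhealth.Record` declaration.

```go
package main

import (
	"fmt"
	"io"
	"os"
	"strings"
	"time"
)

// record carries the fields the real writeClusterHealthRecords reads; the
// actual type lives in pkg/clusterhealth and may differ.
type record struct {
	Node       string
	Stage      string
	Reason     string
	ReportedAt time.Time
}

// writeClusterHealthRecords mirrors the formatting added to reportOutcome:
// one line per blocker, with optional stage/reason/reported_at details.
func writeClusterHealthRecords(w io.Writer, records []record) {
	for _, rec := range records {
		details := []string{}
		if rec.Stage != "" {
			details = append(details, fmt.Sprintf("stage=%s", rec.Stage))
		}
		if rec.Reason != "" {
			details = append(details, fmt.Sprintf("reason=%s", rec.Reason))
		}
		if !rec.ReportedAt.IsZero() {
			details = append(details, fmt.Sprintf("reported_at=%s", rec.ReportedAt.UTC().Format(time.RFC3339Nano)))
		}
		if len(details) > 0 {
			fmt.Fprintf(w, " - %s (%s)\n", rec.Node, strings.Join(details, ", "))
		} else {
			fmt.Fprintf(w, " - %s\n", rec.Node)
		}
	}
}

func main() {
	fmt.Println("cluster health blockers:")
	writeClusterHealthRecords(os.Stdout, []record{
		{Node: "node-b", Stage: "post-lock", Reason: "exit status 1", ReportedAt: time.Now()},
		{Node: "node-c"},
	})
}
```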

docs/OPERATIONS.md

Lines changed: 13 additions & 4 deletions
@@ -62,10 +62,12 @@ it for your environment, and run the daemon with `clusterrebootd run
    the interval in etcd and refuses new reboot attempts until the window expires,
    preventing back-to-back maintenance events.【F:examples/config.yaml†L23-L30】【F:cmd/clusterrebootd/main.go†L233-L272】【F:pkg/orchestrator/runner.go†L311-L376】
 5. **Set cluster policies and maintenance windows** – `cluster_policies`
-   expresses minimum healthy nodes and fallback protections. Maintenance windows
-   allow operators to block or explicitly permit reboots using cron-like day/time
-   ranges; deny rules always win, while allow rules opt the coordinator into the
-   listed windows.【F:examples/config.yaml†L85-L112】【F:pkg/windows/windows.go†L1-L123】
+   expresses minimum healthy nodes and fallback protections, which the
+   orchestrator enforces automatically by evaluating cluster-wide health records
+   before each reboot attempt. Maintenance windows allow operators to block or
+   explicitly permit reboots using cron-like day/time ranges; deny rules always
+   win, while allow rules opt the coordinator into the listed
+   windows.【F:examples/config.yaml†L85-L112】【F:pkg/windows/windows.go†L1-L123】【F:pkg/orchestrator/runner.go†L321-L469】
 6. **Wire observability and safety toggles** – Define `kill_switch_file` so a
    single touch blocks reboots, and enable the Prometheus listener via
    `metrics.enabled`/`metrics.listen` when metrics are required.【F:examples/config.yaml†L41-L47】【F:examples/config.yaml†L114-L118】【F:cmd/clusterrebootd/main.go†L193-L252】
@@ -98,6 +100,13 @@ Health scripts are the final safeguard before a reboot. Follow these practices:
   Diagnostics invoked with `status --skip-health` or `--skip-lock` set
   `RC_SKIP_HEALTH`/`RC_SKIP_LOCK` to `true`, allowing scripts to short-circuit
   optional checks when operators intentionally bypass them.【F:cmd/clusterrebootd/main.go†L298-L305】【F:pkg/orchestrator/runner.go†L485-L500】
+- **Expect global gating on failure** – The coordinator now stores an unhealthy
+  marker in etcd whenever the script exits non-zero, runs the script even when
+  no reboot is pending so the cluster view stays current, and evaluates the
+  configured cluster policy thresholds before allowing another reboot. Peers
+  block their own reboots until a later pass succeeds and clears the entry.
+  Use the `status` command with health checks enabled to verify the marker
+  clears after remediation.【F:pkg/clusterhealth/etcd.go†L18-L153】【F:pkg/orchestrator/runner.go†L321-L469】
 - **Return meaningful exit codes** – Exit `0` to allow the reboot, non-zero to
   block it. Write concise status details to stdout/stderr; they are captured in
   the JSON logs and CLI output for incident response.【F:cmd/clusterrebootd/main.go†L482-L517】
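
As a rough illustration of the threshold evaluation the operations guide describes — minimum healthy counts and fractions — consider the sketch below. The `policy` fields and the `rebootPermitted` helper are hypothetical; the real schema lives in `cluster_policies` in examples/config.yaml and the enforcement in pkg/orchestrator/runner.go.

```go
package main

import "fmt"

// policy captures the kind of thresholds cluster_policies expresses; the field
// names here are illustrative, not the daemon's actual configuration schema.
type policy struct {
	MinHealthyCount    int     // absolute floor of healthy nodes that must remain
	MinHealthyFraction float64 // fractional floor, e.g. 0.5 for "half the cluster"
}

// rebootPermitted applies both thresholds to the cluster state after the
// candidate node is removed from the healthy set: a reboot must not push the
// cluster below either configured floor.
func rebootPermitted(p policy, totalNodes, healthyNodes int) bool {
	remaining := healthyNodes - 1 // the rebooting node stops serving
	if remaining < p.MinHealthyCount {
		return false
	}
	if totalNodes > 0 && float64(remaining)/float64(totalNodes) < p.MinHealthyFraction {
		return false
	}
	return true
}

func main() {
	p := policy{MinHealthyCount: 2, MinHealthyFraction: 0.5}

	// 5-node cluster, 4 healthy: rebooting one leaves 3 healthy (>= 2 and >= 50%).
	fmt.Println(rebootPermitted(p, 5, 4)) // true

	// 5-node cluster, 2 healthy: rebooting would leave 1 healthy, below both floors.
	fmt.Println(rebootPermitted(p, 5, 2)) // false
}
```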

docs/STATE.md

Lines changed: 6 additions & 0 deletions
@@ -42,6 +42,12 @@
 - The health script base environment now includes cluster policy thresholds,
   fallback node lists, and configured maintenance windows so gating logic can
   enforce operator intent without re-reading the configuration file.
+- Cluster health coordination now records unhealthy nodes in etcd so any peer
+  that detects a reboot requirement blocks until the failing node reports a
+  healthy script outcome again, and the daemon runs the gate script even when no
+  reboot is pending so the cluster view stays accurate while applying the
+  configured cluster policy thresholds to prevent cascading outages when the
+  cluster is already degraded.【F:pkg/clusterhealth/etcd.go†L18-L153】【F:pkg/orchestrator/runner.go†L321-L469】
 - Reboot command execution now expands the same environment placeholders (e.g.
   `RC_NODE_NAME`) so the logged and invoked command reflects the active node
   context without depending on shell-specific substitution.
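
A condensed sketch of the per-pass behaviour this entry describes, under assumed helper names (`runScript`, `publishHealth`, `clusterAllowsReboot`) and a hypothetical script path: the health script runs on every cycle so the node's marker stays current, and the reboot path is only considered afterwards.

```go
package main

import (
	"fmt"
	"os/exec"
)

// onePass illustrates one coordinator cycle: publish health unconditionally,
// then gate the reboot on node health and the cluster-wide policy check.
func onePass(rebootRequired bool) {
	healthy := runScript("/etc/clusterrebootd/health.sh") // hypothetical script path
	publishHealth(healthy)                                // marker written or cleared even with no reboot pending

	if !rebootRequired {
		return
	}
	if !healthy || !clusterAllowsReboot() {
		fmt.Println("reboot deferred: node or cluster health gate failed")
		return
	}
	fmt.Println("proceeding to lock acquisition and reboot")
}

func runScript(path string) bool { return exec.Command(path).Run() == nil }
func publishHealth(healthy bool) { fmt.Printf("published health: healthy=%v\n", healthy) }
func clusterAllowsReboot() bool  { return true } // would consult etcd markers and cluster policies

func main() {
	onePass(false) // no reboot pending: health is still published
	onePass(true)  // reboot pending: gates are evaluated before locking
}
```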
