Skip to content

Commit 1cc5270

Browse files
committed
feat(controller): add lease expiration cleanup and cooldown enforcement
Add two optional enhancements to the Node Doctor Controller: 1. Lease Expiration Background Job: - Background goroutine runs every 30 seconds to clean expired leases - Marks stale leases as "expired" in-memory and syncs to database - Proper lifecycle management with start/stop in server lifecycle 2. Cooldown Period Enforcement: - Prevents rapid repeated remediations on the same node - Uses existing CooldownPeriod config (10 minutes default) - Returns HTTP 429 with retry time when cooldown is active - Added GetLastCompletedLease() to storage interface Also adds CompletedAt field to Lease struct and comprehensive tests for both features.
1 parent c1ab204 commit 1cc5270

File tree

5 files changed

+535
-1
lines changed

5 files changed

+535
-1
lines changed

pkg/controller/server.go

Lines changed: 100 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@ type Server struct {
3232
ready bool
3333
startTime time.Time
3434

35+
// Lease cleanup goroutine control
36+
leaseCleanupStopCh chan struct{}
37+
leaseCleanupWg sync.WaitGroup
38+
3539
// In-memory state (fallback when storage is nil)
3640
nodeReports map[string]*NodeReport // nodeName -> latest report
3741
leases map[string]*Lease // leaseID -> lease
@@ -119,6 +123,10 @@ func (s *Server) Start(ctx context.Context) error {
119123
}
120124
}
121125

126+
// Start lease cleanup background job
127+
s.leaseCleanupStopCh = make(chan struct{})
128+
s.startLeaseCleanup(ctx)
129+
122130
s.started = true
123131
s.ready = true
124132
s.startTime = time.Now()
@@ -138,7 +146,13 @@ func (s *Server) Stop(ctx context.Context) error {
138146

139147
log.Printf("[INFO] Stopping controller server...")
140148

141-
// Stop correlator first
149+
// Stop lease cleanup goroutine
150+
if s.leaseCleanupStopCh != nil {
151+
close(s.leaseCleanupStopCh)
152+
s.leaseCleanupWg.Wait()
153+
}
154+
155+
// Stop correlator
142156
if s.correlator != nil {
143157
if err := s.correlator.Stop(); err != nil {
144158
log.Printf("[WARN] Failed to stop correlator: %v", err)
@@ -638,6 +652,28 @@ func (s *Server) requestLease(w http.ResponseWriter, r *http.Request) {
638652
}
639653
}
640654

655+
// Check cooldown period - prevent rapid repeated remediations
656+
if s.config.Coordination.CooldownPeriod > 0 && s.storage != nil {
657+
lastLease, err := s.storage.GetLastCompletedLease(context.Background(), req.NodeName)
658+
if err != nil {
659+
log.Printf("[WARN] Failed to check cooldown for node %s: %v", req.NodeName, err)
660+
} else if lastLease != nil {
661+
cooldownEnds := lastLease.CompletedAt.Add(s.config.Coordination.CooldownPeriod)
662+
if time.Now().Before(cooldownEnds) {
663+
remaining := time.Until(cooldownEnds).Round(time.Second)
664+
s.metrics.RecordLeaseDenied("cooldown")
665+
response := LeaseResponse{
666+
Approved: false,
667+
Message: fmt.Sprintf("Cooldown active: wait %s before next remediation (last: %s)", remaining, lastLease.RemediationType),
668+
RetryAt: cooldownEnds,
669+
}
670+
s.writeSuccess(w, http.StatusTooManyRequests, response)
671+
log.Printf("[INFO] Denied lease for node %s: cooldown period active (ends %s)", req.NodeName, cooldownEnds.Format(time.RFC3339))
672+
return
673+
}
674+
}
675+
}
676+
641677
// Grant lease
642678
duration := s.config.Coordination.DefaultLeaseDuration
643679
if req.RequestedDuration != "" {
@@ -821,6 +857,69 @@ func (s *Server) IsReady() bool {
821857
return s.ready
822858
}
823859

860+
// =====================
861+
// Lease Cleanup
862+
// =====================
863+
864+
// startLeaseCleanup runs a background goroutine that periodically expires stale leases
865+
// from the in-memory map and syncs with the database.
866+
func (s *Server) startLeaseCleanup(ctx context.Context) {
867+
s.leaseCleanupWg.Add(1)
868+
go func() {
869+
defer s.leaseCleanupWg.Done()
870+
871+
ticker := time.NewTicker(30 * time.Second)
872+
defer ticker.Stop()
873+
874+
log.Printf("[INFO] Lease cleanup goroutine started (interval: 30s)")
875+
876+
for {
877+
select {
878+
case <-ctx.Done():
879+
log.Printf("[INFO] Lease cleanup goroutine stopped (context cancelled)")
880+
return
881+
case <-s.leaseCleanupStopCh:
882+
log.Printf("[INFO] Lease cleanup goroutine stopped")
883+
return
884+
case <-ticker.C:
885+
s.cleanupExpiredLeases(ctx)
886+
}
887+
}
888+
}()
889+
}
890+
891+
// cleanupExpiredLeases marks expired leases in the in-memory map and updates storage.
892+
func (s *Server) cleanupExpiredLeases(ctx context.Context) {
893+
s.mu.Lock()
894+
defer s.mu.Unlock()
895+
896+
now := time.Now()
897+
expiredCount := 0
898+
899+
for id, lease := range s.leases {
900+
if lease.Status == "active" && now.After(lease.ExpiresAt) {
901+
lease.Status = "expired"
902+
lease.CompletedAt = now
903+
s.leases[id] = lease
904+
expiredCount++
905+
906+
// Also update storage if available
907+
if s.storage != nil {
908+
if err := s.storage.UpdateLeaseStatus(ctx, id, "expired"); err != nil {
909+
log.Printf("[WARN] Failed to update expired lease %s in storage: %v", id, err)
910+
}
911+
}
912+
913+
log.Printf("[INFO] Lease %s auto-expired for node %s (was due: %s)",
914+
id, lease.NodeName, lease.ExpiresAt.Format(time.RFC3339))
915+
}
916+
}
917+
918+
if expiredCount > 0 {
919+
log.Printf("[INFO] Cleaned up %d expired leases from memory", expiredCount)
920+
}
921+
}
922+
824923
// SetReady sets the readiness state
825924
func (s *Server) SetReady(ready bool) {
826925
s.mu.Lock()

0 commit comments

Comments
 (0)