Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 44 additions & 58 deletions internal/exporter/host/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,44 +127,6 @@ func (e *ExporterHostSyncer) filterExporterInstances(hostName string, exporterIn
return exporterInstances
}

// handleBootcUpgrade inspects the bootc state of a host and, when an image
// update is pending, kicks off the upgrade.
//
// The returned bool tells the caller whether exporter instances on this host
// should be skipped: it is true when an upgrade is already running or was just
// started, and false otherwise. An error is returned only when triggering the
// upgrade timer fails.
func (e *ExporterHostSyncer) handleBootcUpgrade(hostSsh ssh.HostManager) (bool, error) {
	// The error is discarded on purpose; only the captured stdout (one state
	// per queried unit) is inspected.
	unitStates, _ := hostSsh.RunHostCommand("systemctl is-active bootc-fetch-apply-updates.service bootc-fetch-apply-updates.timer")
	if unitStates != nil {
		if fields := strings.Fields(unitStates.Stdout); len(fields) == 2 {
			for _, state := range fields {
				if state == "active" || state == "activating" {
					fmt.Printf(" ⚠️ Bootc upgrade in progress, skipping exporter instances for this host\n")
					return true, nil // skip = true
				}
			}
		}
	}

	// Ask bootc whether a newer image exists; the command only runs on
	// ostree-booted systems.
	check, err := hostSsh.RunHostCommand("[ -f /run/ostree-booted ] && bootc upgrade --check")
	if err != nil || check == nil || check.ExitCode != 0 || check.Stdout == "" {
		fmt.Printf(" ℹ️ Not a bootc managed host\n")
		return false, nil // skip = false
	}

	switch {
	case strings.HasPrefix(check.Stdout, "No changes"):
		if e.dryRun {
			fmt.Printf(" ✅ Bootc image is up to date\n")
		}
	case e.dryRun:
		fmt.Printf(" 📄 Would upgrade bootc image\n")
	default:
		// Restarting the timer fires the upgrade immediately. This assumes the
		// timer is configured for manual activation (e.g. OnActiveSec=0,
		// RandomizedDelaySec=1h, RemainAfterElapse=false).
		if _, err := hostSsh.RunHostCommand("systemctl restart bootc-fetch-apply-updates.timer"); err != nil {
			return false, fmt.Errorf("error triggering bootc upgrade service: %w", err)
		}
		fmt.Printf(" ✅ Bootc upgrade started, skipping exporter instances for this host\n")
		return true, nil // skip = true
	}
	return false, nil // skip = false
}

// processExporterInstance processes a single exporter instance
func (e *ExporterHostSyncer) processExporterInstance(exporterInstance *api.ExporterInstance, hostSsh ssh.HostManager) error {
if isDead, deadAnnotation := isExporterInstanceDead(exporterInstance); isDead {
Expand Down Expand Up @@ -222,8 +184,17 @@ func (e *ExporterHostSyncer) calculateBackoffDelay(attempts int) time.Duration {
return delay
}

// processExporterInstances processes exporter instances and adds failures to global retry queue
func (e *ExporterHostSyncer) processExporterInstances(exporterInstances []*api.ExporterInstance, hostSsh ssh.HostManager, hostName string, retryQueue *[]RetryItem) {
// addToRetryQueue records one more failed attempt on retryItem and appends a
// copy of it to nextRetryQueue.
//
// retryItem is updated in place: its attempt counter is bumped, err is stored
// as the last error and the last-attempt timestamp is set to the current time.
func (e *ExporterHostSyncer) addToRetryQueue(retryItem *RetryItem, err error, nextRetryQueue *[]RetryItem) {
	retryItem.Attempts++
	retryItem.LastError, retryItem.LastAttemptTime = err, time.Now()

	// The queue stores values, so the item is copied after it has been updated.
	pending := append(*nextRetryQueue, *retryItem)
	*nextRetryQueue = pending
}

// processExporterInstancesAndBootc processes exporter instances, then runs bootc upgrade handling for the host; failures of either are added to the global retry queue
func (e *ExporterHostSyncer) processExporterInstancesAndBootc(exporterInstances []*api.ExporterInstance, hostSsh ssh.HostManager, hostName string, retryQueue *[]RetryItem) {

for _, exporterInstance := range exporterInstances {
if err := e.processExporterInstance(exporterInstance, hostSsh); err != nil {
fmt.Printf(" ❌ Failed to process %s: %v\n", exporterInstance.Name, err)
Expand All @@ -237,6 +208,19 @@ func (e *ExporterHostSyncer) processExporterInstances(exporterInstances []*api.E
})
}
}

if err := hostSsh.HandleBootcUpgrade(e.dryRun); err != nil {
// Log the error and queue the bootc upgrade (ExporterInstance == nil) for a later retry
fmt.Printf(" ⚠️ Bootc upgrade error: %v\n", err)
*retryQueue = append(*retryQueue, RetryItem{
ExporterInstance: nil,
HostSSH: hostSsh,
HostName: hostName,
Attempts: 1,
LastError: err,
LastAttemptTime: time.Now(),
})
}
}

// processGlobalRetryQueue processes the global retry queue with exponential backoff
Expand Down Expand Up @@ -272,18 +256,26 @@ func (e *ExporterHostSyncer) processGlobalRetryQueue(retryQueue []RetryItem) err

// Second pass: retry items that are ready
for _, retryItem := range itemsToRetry {
fmt.Printf("🔄 Retrying %s on %s (attempt %d/%d)...\n",
retryItem.ExporterInstance.Name, retryItem.HostName, retryItem.Attempts+1, e.retryConfig.MaxAttempts)

if err := e.processExporterInstance(retryItem.ExporterInstance, retryItem.HostSSH); err != nil {
// Still failed, increment attempts and add to next retry queue
retryItem.Attempts++
retryItem.LastError = err
retryItem.LastAttemptTime = time.Now()
nextRetryQueue = append(nextRetryQueue, retryItem)
fmt.Printf("❌ Retry failed for %s on %s: %v\n", retryItem.ExporterInstance.Name, retryItem.HostName, err)
if retryItem.ExporterInstance == nil {
fmt.Printf("🔄 Retrying bootc upgrade on %s (attempt %d/%d)...\n",
retryItem.HostName, retryItem.Attempts+1, e.retryConfig.MaxAttempts)
if err := retryItem.HostSSH.HandleBootcUpgrade(e.dryRun); err != nil {
fmt.Printf("❌ Retry failed for bootc upgrade on %s: %v\n", retryItem.HostName, err)
e.addToRetryQueue(&retryItem, err, &nextRetryQueue)
} else {
fmt.Printf("✅ Retry succeeded for bootc upgrade on %s\n", retryItem.HostName)
}
} else {
fmt.Printf("✅ Retry succeeded for %s on %s\n", retryItem.ExporterInstance.Name, retryItem.HostName)
fmt.Printf("🔄 Retrying instance %s on %s (attempt %d/%d)...\n",
retryItem.ExporterInstance.Name, retryItem.HostName, retryItem.Attempts+1, e.retryConfig.MaxAttempts)

if err := e.processExporterInstance(retryItem.ExporterInstance, retryItem.HostSSH); err != nil {
// Still failed, increment attempts and add to next retry queue
fmt.Printf("❌ Retry failed for %s on %s: %v\n", retryItem.ExporterInstance.Name, retryItem.HostName, err)
e.addToRetryQueue(&retryItem, err, &nextRetryQueue)
} else {
fmt.Printf("✅ Retry succeeded for %s on %s\n", retryItem.ExporterInstance.Name, retryItem.HostName)
}
}
}

Expand Down Expand Up @@ -359,14 +351,8 @@ func (e *ExporterHostSyncer) SyncExporterHosts() error {
fmt.Printf(" ✅ Connection: %s\n", status)
}

if skip, err := e.handleBootcUpgrade(hostSsh); err != nil {
return err
} else if skip {
continue
}

// Process each exporter instance and add failures to global retry queue
e.processExporterInstances(exporterInstances, hostSsh, host.Name, &retryQueue)
e.processExporterInstancesAndBootc(exporterInstances, hostSsh, host.Name, &retryQueue)
}

// Second pass: retry all failed instances globally
Expand Down
77 changes: 77 additions & 0 deletions internal/exporter/ssh/ssh.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,24 @@ import (
"golang.org/x/crypto/ssh/agent"
)

// BootcStatus represents the status of bootc upgrade
// as classified by GetBootcStatus.
type BootcStatus int

const (
// BOOTC_UP_TO_DATE: `bootc upgrade --check` succeeded and its output starts
// with "No changes". This is the zero value of BootcStatus.
BOOTC_UP_TO_DATE BootcStatus = iota
// BOOTC_UPDATING: the bootc-fetch-apply-updates service or timer is reported
// as "active" or "activating" by `systemctl is-active`.
BOOTC_UPDATING
// BOOTC_WILL_UPDATE: `bootc upgrade --check` succeeded with output that does
// not start with "No changes", i.e. an upgrade is pending.
BOOTC_WILL_UPDATE
// BOOTC_NOT_MANAGED: the bootc check command errored, exited non-zero, or
// produced no output.
BOOTC_NOT_MANAGED
)

// HostManager is the set of operations the exporter host syncer performs
// against a single host.
// NOTE(review): Status, NeedsUpdate and Diff are not shown in this view; their
// exact semantics should be confirmed against the implementation.
type HostManager interface {
// Status returns a status string for the host, or an error.
Status() (string, error)
// NeedsUpdate reports whether an update is needed — TODO confirm what is compared.
NeedsUpdate() (bool, error)
// Diff returns a textual diff — TODO confirm what is diffed.
Diff() (string, error)
// Apply reconciles the given exporter config template on the host; dryRun is
// passed through so that changes are only reported rather than applied.
Apply(exporterConfig *v1alpha1.ExporterConfigTemplate, dryRun bool) error
// RunHostCommand runs a command on the host and returns its CommandResult.
RunHostCommand(command string) (*CommandResult, error)
// GetBootcStatus classifies the host's current bootc upgrade state.
GetBootcStatus() BootcStatus
// HandleBootcUpgrade reports the bootc state and, when an upgrade is pending
// and dryRun is false, triggers it.
HandleBootcUpgrade(dryRun bool) error
}

// CommandResult represents the result of running a command via SSH
Expand Down Expand Up @@ -202,6 +214,18 @@ func (m *SSHHostManager) Apply(exporterConfig *v1alpha1.ExporterConfigTemplate,
return fmt.Errorf("failed to reconcile exporter config file: %w", err)
}

if m.GetBootcStatus() == BOOTC_UPDATING {
if dryRun {
fmt.Printf(" 📄 Bootc upgrade in progress, would skip exporter service restarts/container updates\n")
} else {
fmt.Printf(" ⚠️ Bootc upgrade in progress, skipping exporter service restarts/container updates\n")
return nil
}
}

// Only if bootc is not updating, we restart/start services and pull containers
// otherwise it's too much pressure on the system

if changedExporterConfig || changedContainer || changedService {
if !dryRun {
// Apply the changes: reload systemd, enable service and restart the exporter
Expand Down Expand Up @@ -386,6 +410,59 @@ func (m *SSHHostManager) reconcileFile(path string, content string, dryRun bool)
return true, nil
}

// GetBootcStatus runs two commands on the host and maps their results onto a
// BootcStatus value.
//
// It returns BOOTC_UPDATING when the bootc-fetch-apply-updates service or timer
// is active or activating, BOOTC_UP_TO_DATE when `bootc upgrade --check`
// reports "No changes", BOOTC_WILL_UPDATE when the check reports anything else,
// and BOOTC_NOT_MANAGED when the check cannot be run successfully or prints
// nothing.
func (m *SSHHostManager) GetBootcStatus() BootcStatus {
	// The error is discarded on purpose; only the captured stdout (one state
	// per queried unit) is inspected.
	unitStates, _ := m.RunHostCommand("systemctl is-active bootc-fetch-apply-updates.service bootc-fetch-apply-updates.timer")
	if unitStates != nil {
		if fields := strings.Fields(unitStates.Stdout); len(fields) == 2 {
			for _, state := range fields {
				if state == "active" || state == "activating" {
					return BOOTC_UPDATING
				}
			}
		}
	}

	// The check only runs on ostree-booted systems; any failure or empty
	// output is treated as "not bootc managed".
	check, err := m.RunHostCommand("[ -f /run/ostree-booted ] && bootc upgrade --check")
	if err != nil || check == nil || check.ExitCode != 0 || check.Stdout == "" {
		return BOOTC_NOT_MANAGED
	}

	if strings.HasPrefix(check.Stdout, "No changes") {
		return BOOTC_UP_TO_DATE
	}
	return BOOTC_WILL_UPDATE
}

// HandleBootcUpgrade prints the host's bootc state and, when an upgrade is
// pending and dryRun is false, starts it by restarting the upgrade timer.
//
// An error is returned only when the timer restart command itself returns one.
// NOTE(review): the exit code of the restart command is not inspected here —
// confirm that RunHostCommand surfaces a failing exit status as an error.
func (m *SSHHostManager) HandleBootcUpgrade(dryRun bool) error {
	switch m.GetBootcStatus() {
	case BOOTC_NOT_MANAGED:
		fmt.Printf(" ℹ️ Not a bootc managed host\n")
	case BOOTC_UP_TO_DATE:
		fmt.Printf(" ✅ Bootc image is up to date\n")
	case BOOTC_UPDATING:
		fmt.Printf(" ⚠️ Bootc upgrade in progress\n")
	case BOOTC_WILL_UPDATE:
		if dryRun {
			fmt.Printf(" 📄 Would upgrade bootc image\n")
			return nil
		}
		// Restarting the timer fires the upgrade immediately. This assumes the
		// timer is configured for manual activation (e.g. OnActiveSec=0,
		// RandomizedDelaySec=1h, RemainAfterElapse=false).
		if _, err := m.RunHostCommand("systemctl restart bootc-fetch-apply-updates.timer"); err != nil {
			return fmt.Errorf("error triggering bootc upgrade service: %w", err)
		}
		fmt.Printf(" ✅ Bootc upgrade started\n")
	}
	return nil
}

func (m *SSHHostManager) createSshClient() (*ssh.Client, error) {

port := 22
Expand Down