Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 42 additions & 22 deletions internal/controller/aggregates_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,18 @@ package controller

import (
"context"
"errors"
"fmt"
"slices"

"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
k8sclient "sigs.k8s.io/controller-runtime/pkg/client"
logger "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"

"github.com/gophercloud/gophercloud/v2"
"github.com/gophercloud/gophercloud/v2/openstack/compute/v2/aggregates"
Expand Down Expand Up @@ -73,30 +76,24 @@ func (ac *AggregatesController) Reconcile(ctx context.Context, req ctrl.Request)

aggs, err := aggregatesByName(ctx, ac.computeClient)
if err != nil {
meta.SetStatusCondition(&hv.Status.Conditions, metav1.Condition{
Type: ConditionTypeAggregatesUpdated,
Status: metav1.ConditionFalse,
Reason: ConditionAggregatesFailed,
Message: err.Error(),
})
return ctrl.Result{}, ac.Status().Update(ctx, hv)
return ctrl.Result{}, ac.trackError(ctx, hv, "failed fetching aggregates", err)
}

toAdd := Difference(hv.Status.Aggregates, hv.Spec.Aggregates)
toRemove := Difference(hv.Spec.Aggregates, hv.Status.Aggregates)

// We need to add first the host to the aggregates, because if we first drop
// an aggregate with a filter criterion and then add a new one, we leave the host
// open for period of time. Still, this may fail due to a conflict of aggregates
// with different availability zones, so we collect all the errors and return them
// so it hopefully will converge eventually.
var errs []error
if len(toAdd) > 0 {
log.Info("Adding", "aggregates", toAdd)
for item := range slices.Values(toAdd) {
err = addToAggregate(ctx, ac.computeClient, aggs, hv.Name, item, "")
if err != nil {
meta.SetStatusCondition(&hv.Status.Conditions, metav1.Condition{
Type: ConditionTypeAggregatesUpdated,
Status: metav1.ConditionFalse,
Reason: ConditionAggregatesFailed,
Message: err.Error(),
})
return ctrl.Result{}, ac.Status().Update(ctx, hv)
errs = append(errs, err)
}
}
}
Expand All @@ -106,17 +103,15 @@ func (ac *AggregatesController) Reconcile(ctx context.Context, req ctrl.Request)
for item := range slices.Values(toRemove) {
err = removeFromAggregate(ctx, ac.computeClient, aggs, hv.Name, item)
if err != nil {
meta.SetStatusCondition(&hv.Status.Conditions, metav1.Condition{
Type: ConditionTypeAggregatesUpdated,
Status: metav1.ConditionFalse,
Reason: ConditionAggregatesFailed,
Message: err.Error(),
})
return ctrl.Result{}, ac.Status().Update(ctx, hv)
errs = append(errs, err)
}
}
}

if errs != nil {
return ctrl.Result{}, ac.trackError(ctx, hv, "failed updating aggregates", errs...)
}

hv.Status.Aggregates = hv.Spec.Aggregates
meta.SetStatusCondition(&hv.Status.Conditions, metav1.Condition{
Type: ConditionTypeAggregatesUpdated,
Expand All @@ -127,6 +122,31 @@ func (ac *AggregatesController) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, ac.Status().Update(ctx, hv)
}

func (ac *AggregatesController) trackError(ctx context.Context, hv *kvmv1.Hypervisor, msg string, errs ...error) error {
err := errors.Join(errs...)
if err == nil {
return nil
}

condition := metav1.Condition{
Type: ConditionTypeAggregatesUpdated,
Status: metav1.ConditionFalse,
Reason: ConditionAggregatesFailed,
Message: err.Error(),
}

if meta.SetStatusCondition(&hv.Status.Conditions, condition) {
if err2 := ac.Status().Update(ctx, hv); err2 != nil {
return errors.Join(err, err2)
}
logger.FromContext(ctx).
WithCallDepth(1). // Where did we call trackError() from?
Error(err, msg)
}

return err
}

// SetupWithManager sets up the controller with the Manager.
func (ac *AggregatesController) SetupWithManager(mgr ctrl.Manager) error {
ctx := context.Background()
Expand All @@ -140,7 +160,7 @@ func (ac *AggregatesController) SetupWithManager(mgr ctrl.Manager) error {

return ctrl.NewControllerManagedBy(mgr).
Named(AggregatesControllerName).
For(&kvmv1.Hypervisor{}).
For(&kvmv1.Hypervisor{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Complete(ac)
}

Expand Down
Loading