Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 3 additions & 82 deletions internal/controller/aggregates_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
logger "sigs.k8s.io/controller-runtime/pkg/log"

"github.com/gophercloud/gophercloud/v2"
"github.com/gophercloud/gophercloud/v2/openstack/compute/v2/aggregates"

kvmv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1"
"github.com/cobaltcore-dev/openstack-hypervisor-operator/internal/openstack"
Expand Down Expand Up @@ -70,7 +69,7 @@ func (ac *AggregatesController) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, nil
}

aggs, err := aggregatesByName(ctx, ac.computeClient)
aggs, err := openstack.GetAggregatesByName(ctx, ac.computeClient)
if err != nil {
err = fmt.Errorf("failed listing aggregates: %w", err)
if err2 := ac.setErrorCondition(ctx, hv, err.Error()); err2 != nil {
Expand All @@ -91,7 +90,7 @@ func (ac *AggregatesController) Reconcile(ctx context.Context, req ctrl.Request)
if len(toAdd) > 0 {
log.Info("Adding", "aggregates", toAdd)
for item := range slices.Values(toAdd) {
if err = addToAggregate(ctx, ac.computeClient, aggs, hv.Name, item, ""); err != nil {
if err = openstack.AddToAggregate(ctx, ac.computeClient, aggs, hv.Name, item, ""); err != nil {
errs = append(errs, err)
}
}
Expand All @@ -100,7 +99,7 @@ func (ac *AggregatesController) Reconcile(ctx context.Context, req ctrl.Request)
if len(toRemove) > 0 {
log.Info("Removing", "aggregates", toRemove)
for item := range slices.Values(toRemove) {
if err = removeFromAggregate(ctx, ac.computeClient, aggs, hv.Name, item); err != nil {
if err = openstack.RemoveFromAggregate(ctx, ac.computeClient, aggs, hv.Name, item); err != nil {
errs = append(errs, err)
}
}
Expand Down Expand Up @@ -162,81 +161,3 @@ func (ac *AggregatesController) SetupWithManager(mgr ctrl.Manager) error {
For(&kvmv1.Hypervisor{}, builder.WithPredicates(utils.LifecycleEnabledPredicate)).
Complete(ac)
}

func aggregatesByName(ctx context.Context, serviceClient *gophercloud.ServiceClient) (map[string]*aggregates.Aggregate, error) {
pages, err := aggregates.List(serviceClient).AllPages(ctx)
if err != nil {
return nil, fmt.Errorf("cannot list aggregates due to %w", err)
}

aggs, err := aggregates.ExtractAggregates(pages)
if err != nil {
return nil, fmt.Errorf("cannot list aggregates due to %w", err)
}

aggregateMap := make(map[string]*aggregates.Aggregate, len(aggs))
for _, aggregate := range aggs {
aggregateMap[aggregate.Name] = &aggregate
}
return aggregateMap, nil
}

func addToAggregate(ctx context.Context, serviceClient *gophercloud.ServiceClient, aggs map[string]*aggregates.Aggregate, host, name, zone string) (err error) {
aggregate, found := aggs[name]
log := logger.FromContext(ctx)
if !found {
aggregate, err = aggregates.Create(ctx, serviceClient,
aggregates.CreateOpts{
Name: name,
AvailabilityZone: zone,
}).Extract()
if err != nil {
return fmt.Errorf("failed to create aggregate %v due to %w", name, err)
}
aggs[name] = aggregate
}

if slices.Contains(aggregate.Hosts, host) {
log.Info("Found host in aggregate", "host", host, "name", name)
return nil
}

result, err := aggregates.AddHost(ctx, serviceClient, aggregate.ID, aggregates.AddHostOpts{Host: host}).Extract()
if err != nil {
return fmt.Errorf("failed to add host %v to aggregate %v due to %w", host, name, err)
}
log.Info("Added host to aggregate", "host", host, "name", name)
aggs[name] = result

return nil
}

func removeFromAggregate(ctx context.Context, serviceClient *gophercloud.ServiceClient, aggs map[string]*aggregates.Aggregate, host, name string) error {
aggregate, found := aggs[name]
log := logger.FromContext(ctx)
if !found {
log.Info("cannot find aggregate", "name", name)
return nil
}

found = false
for _, aggHost := range aggregate.Hosts {
if aggHost == host {
found = true
}
}

if !found {
log.Info("cannot find host in aggregate", "host", host, "name", name)
return nil
}

result, err := aggregates.RemoveHost(ctx, serviceClient, aggregate.ID, aggregates.RemoveHostOpts{Host: host}).Extract()
if err != nil {
return fmt.Errorf("failed to add host %v to aggregate %v due to %w", host, name, err)
}
aggs[name] = result
log.Info("removed host from aggregate", "host", host, "name", name)

return nil
}
2 changes: 1 addition & 1 deletion internal/controller/decomission_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (r *NodeDecommissionReconciler) Reconcile(ctx context.Context, req ctrl.Req

// Before removing the service, first take the node out of the aggregates,
// so when the node comes back, it doesn't up with the old associations
aggs, err := aggregatesByName(ctx, r.computeClient)
aggs, err := openstack.GetAggregatesByName(ctx, r.computeClient)
if err != nil {
return r.setDecommissioningCondition(ctx, hv, fmt.Sprintf("cannot list aggregates due to %v", err))
}
Expand Down
12 changes: 6 additions & 6 deletions internal/controller/onboarding_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,16 +197,16 @@ func (r *OnboardingController) initialOnboarding(ctx context.Context, hv *kvmv1.
return fmt.Errorf("cannot find availability-zone label %v on node", corev1.LabelTopologyZone)
}

aggs, err := aggregatesByName(ctx, r.computeClient)
aggs, err := openstack.GetAggregatesByName(ctx, r.computeClient)
if err != nil {
return fmt.Errorf("cannot list aggregates %w", err)
}

if err = addToAggregate(ctx, r.computeClient, aggs, host, zone, zone); err != nil {
if err = openstack.AddToAggregate(ctx, r.computeClient, aggs, host, zone, zone); err != nil {
return fmt.Errorf("failed to agg to availability-zone aggregate %w", err)
}

err = addToAggregate(ctx, r.computeClient, aggs, host, testAggregateName, "")
err = openstack.AddToAggregate(ctx, r.computeClient, aggs, host, testAggregateName, "")
if err != nil {
return fmt.Errorf("failed to agg to test aggregate %w", err)
}
Expand All @@ -217,7 +217,7 @@ func (r *OnboardingController) initialOnboarding(ctx context.Context, hv *kvmv1.
continue
}
if slices.Contains(aggregate.Hosts, host) {
if err := removeFromAggregate(ctx, r.computeClient, aggs, host, aggregateName); err != nil {
if err := openstack.RemoveFromAggregate(ctx, r.computeClient, aggs, host, aggregateName); err != nil {
errs = append(errs, err)
}
}
Expand Down Expand Up @@ -345,12 +345,12 @@ func (r *OnboardingController) completeOnboarding(ctx context.Context, host stri
return ctrl.Result{}, err
}

aggs, err := aggregatesByName(ctx, r.computeClient)
aggs, err := openstack.GetAggregatesByName(ctx, r.computeClient)
if err != nil {
return ctrl.Result{}, fmt.Errorf("failed to get aggregates %w", err)
}

err = removeFromAggregate(ctx, r.computeClient, aggs, host, testAggregateName)
err = openstack.RemoveFromAggregate(ctx, r.computeClient, aggs, host, testAggregateName)
if err != nil {
return ctrl.Result{}, fmt.Errorf("failed to remove from test aggregate %w", err)
}
Expand Down
107 changes: 107 additions & 0 deletions internal/openstack/aggregates.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
SPDX-FileCopyrightText: Copyright 2024 SAP SE or an SAP affiliate company and cobaltcore-dev contributors
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package openstack

import (
"context"
"fmt"
"slices"

logger "sigs.k8s.io/controller-runtime/pkg/log"

"github.com/gophercloud/gophercloud/v2"
"github.com/gophercloud/gophercloud/v2/openstack/compute/v2/aggregates"
)

// GetAggregatesByName retrieves all aggregates from nova and returns them as a map keyed by name.
func GetAggregatesByName(ctx context.Context, serviceClient *gophercloud.ServiceClient) (map[string]*aggregates.Aggregate, error) {
pages, err := aggregates.List(serviceClient).AllPages(ctx)
if err != nil {
return nil, fmt.Errorf("cannot list aggregates due to %w", err)
}

aggs, err := aggregates.ExtractAggregates(pages)
if err != nil {
return nil, fmt.Errorf("cannot list aggregates due to %w", err)
}

aggregateMap := make(map[string]*aggregates.Aggregate, len(aggs))
for _, aggregate := range aggs {
aggregateMap[aggregate.Name] = &aggregate
}
return aggregateMap, nil
}

// AddToAggregate adds the given host to the named aggregate, creating the aggregate if it does not yet exist.
func AddToAggregate(ctx context.Context, serviceClient *gophercloud.ServiceClient, aggs map[string]*aggregates.Aggregate, host, name, zone string) (err error) {
aggregate, found := aggs[name]
log := logger.FromContext(ctx)
if !found {
aggregate, err = aggregates.Create(ctx, serviceClient,
aggregates.CreateOpts{
Name: name,
AvailabilityZone: zone,
}).Extract()
if err != nil {
return fmt.Errorf("failed to create aggregate %v due to %w", name, err)
}
aggs[name] = aggregate
}

if slices.Contains(aggregate.Hosts, host) {
log.Info("Found host in aggregate", "host", host, "name", name)
return nil
}

result, err := aggregates.AddHost(ctx, serviceClient, aggregate.ID, aggregates.AddHostOpts{Host: host}).Extract()
if err != nil {
return fmt.Errorf("failed to add host %v to aggregate %v due to %w", host, name, err)
}
log.Info("Added host to aggregate", "host", host, "name", name)
aggs[name] = result

return nil
}

// RemoveFromAggregate removes the given host from the named aggregate.
func RemoveFromAggregate(ctx context.Context, serviceClient *gophercloud.ServiceClient, aggs map[string]*aggregates.Aggregate, host, name string) error {
aggregate, found := aggs[name]
log := logger.FromContext(ctx)
if !found {
log.Info("cannot find aggregate", "name", name)
return nil
}

found = false
for _, aggHost := range aggregate.Hosts {
if aggHost == host {
found = true
}
}

if !found {
log.Info("cannot find host in aggregate", "host", host, "name", name)
return nil
}

result, err := aggregates.RemoveHost(ctx, serviceClient, aggregate.ID, aggregates.RemoveHostOpts{Host: host}).Extract()
if err != nil {
return fmt.Errorf("failed to add host %v to aggregate %v due to %w", host, name, err)
}
aggs[name] = result
log.Info("removed host from aggregate", "host", host, "name", name)

return nil
}