Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions internal/controller/hypervisor_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,16 @@ import (
"time"

kvmv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1"
golibvirt "github.com/digitalocean/go-libvirt"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
logger "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/source"

"github.com/cobaltcore-dev/kvm-node-agent/internal/certificates"
"github.com/cobaltcore-dev/kvm-node-agent/internal/evacuation"
Expand All @@ -48,6 +52,9 @@ type HypervisorReconciler struct {

osDescriptor *systemd.Descriptor
evacuateOnReboot bool

// Channel that can be used to trigger reconcile events.
reconcileCh chan event.GenericEvent
}

const (
Expand Down Expand Up @@ -287,6 +294,63 @@ func (r *HypervisorReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{RequeueAfter: 1 * time.Minute}, nil
}

// Trigger a reconcile event for the managed hypervisor through the
// event channel which is watched by the controller manager.
func (r *HypervisorReconciler) triggerReconcile() {
r.reconcileCh <- event.GenericEvent{
Object: &kvmv1.Hypervisor{
TypeMeta: metav1.TypeMeta{
Kind: "Hypervisor",
APIVersion: "kvm.cloud.sap/v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: sys.Hostname,
Namespace: sys.Namespace,
},
},
}
}

// Start is called when the manager starts. It starts the libvirt
// event subscription to receive events when the hypervisor needs to be
// reconciled.
func (r *HypervisorReconciler) Start(ctx context.Context) error {
log := logger.FromContext(ctx, "controller", "hypervisor")
log.Info("starting libvirt event subscription")

// Ensure we're connected to libvirt.
if err := r.Libvirt.Connect(); err != nil {
log.Error(err, "unable to connect to libvirt")
return err
}

// Run a ticker which reconciles the hypervisor resource every minute.
// This ensures that we periodically reconcile the hypervisor even
// if no events are received from libvirt.
go func() {
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
r.triggerReconcile()
case <-ctx.Done():
return
}
}
}()

// Domain lifecycle events impact the list of active/inactive domains,
// as well as the allocation of resources on the hypervisor.
r.Libvirt.WatchDomainChanges(
golibvirt.DomainEventIDLifecycle,
"reconcile-on-domain-lifecycle",
func(_ context.Context, _ any) { r.triggerReconcile() },
)

return nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *HypervisorReconciler) SetupWithManager(mgr ctrl.Manager) error {
ctx := context.Background()
Expand All @@ -296,7 +360,16 @@ func (r *HypervisorReconciler) SetupWithManager(mgr ctrl.Manager) error {
return fmt.Errorf("unable to get Systemd hostname describe(): %w", err)
}

// Prepare an event channel that will trigger a reconcile event.
r.reconcileCh = make(chan event.GenericEvent)
src := source.Channel(r.reconcileCh, &handler.EnqueueRequestForObject{})
// Run the Start(ctx context.Context) method when the manager starts.
if err := mgr.Add(r); err != nil {
return err
}

return ctrl.NewControllerManagedBy(mgr).
For(&kvmv1.Hypervisor{}).
WatchesRawSource(src).
Complete(r)
}
157 changes: 155 additions & 2 deletions internal/controller/hypervisor_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,20 @@ package controller

import (
"context"
"errors"
"time"

kvmv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1"
"github.com/coreos/go-systemd/v22/dbus"
golibvirt "github.com/digitalocean/go-libvirt"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"k8s.io/apimachinery/pkg/api/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/cobaltcore-dev/kvm-node-agent/internal/libvirt"
Expand All @@ -36,6 +41,154 @@ import (
)

var _ = Describe("Hypervisor Controller", func() {
Context("When testing Start method", func() {
It("should successfully start and subscribe to libvirt events", func() {
ctx := context.Background()
eventCallbackCalled := false

controllerReconciler := &HypervisorReconciler{
Client: k8sClient,
Scheme: k8sClient.Scheme(),
Libvirt: &libvirt.InterfaceMock{
ConnectFunc: func() error {
return nil
},
WatchDomainChangesFunc: func(eventId golibvirt.DomainEventID, handlerId string, handler func(context.Context, any)) {
eventCallbackCalled = true
Expect(handlerId).To(Equal("reconcile-on-domain-lifecycle"))
},
},
reconcileCh: make(chan event.GenericEvent, 1),
}

err := controllerReconciler.Start(ctx)
Expect(err).NotTo(HaveOccurred())
Expect(eventCallbackCalled).To(BeTrue())
})

It("should fail when libvirt connection fails", func() {
ctx := context.Background()

controllerReconciler := &HypervisorReconciler{
Client: k8sClient,
Scheme: k8sClient.Scheme(),
Libvirt: &libvirt.InterfaceMock{
ConnectFunc: func() error {
return errors.New("connection failed")
},
},
reconcileCh: make(chan event.GenericEvent, 1),
}

err := controllerReconciler.Start(ctx)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("connection failed"))
})
})

Context("When testing triggerReconcile method", func() {
It("should send an event to reconcile channel", func() {
const testHostname = "test-host"
const testNamespace = "test-namespace"

// Override hostname and namespace for this test
originalHostname := sys.Hostname
originalNamespace := sys.Namespace
sys.Hostname = testHostname
sys.Namespace = testNamespace
defer func() {
sys.Hostname = originalHostname
sys.Namespace = originalNamespace
}()

controllerReconciler := &HypervisorReconciler{
Client: k8sClient,
Scheme: k8sClient.Scheme(),
reconcileCh: make(chan event.GenericEvent, 1),
}

// Trigger reconcile in a goroutine to avoid blocking
go controllerReconciler.triggerReconcile()

// Wait for the event with a timeout
select {
case evt := <-controllerReconciler.reconcileCh:
Expect(evt.Object).NotTo(BeNil())
hv, ok := evt.Object.(*kvmv1.Hypervisor)
Expect(ok).To(BeTrue())
Expect(hv.Name).To(Equal(testHostname))
Expect(hv.Namespace).To(Equal(testNamespace))
Expect(hv.Kind).To(Equal("Hypervisor"))
Expect(hv.APIVersion).To(Equal("kvm.cloud.sap/v1"))
case <-time.After(2 * time.Second):
Fail("timeout waiting for reconcile event")
}
})
})

Context("When testing SetupWithManager method", func() {
It("should successfully setup controller with manager", func() {
// Create a test manager
mgr, err := ctrl.NewManager(cfg, ctrl.Options{
Scheme: k8sClient.Scheme(),
})
Expect(err).NotTo(HaveOccurred())

controllerReconciler := &HypervisorReconciler{
Client: k8sClient,
Scheme: k8sClient.Scheme(),
Systemd: &systemd.InterfaceMock{
DescribeFunc: func(ctx context.Context) (*systemd.Descriptor, error) {
return &systemd.Descriptor{
OperatingSystemReleaseData: []string{
"PRETTY_NAME=\"Garden Linux 1877.8\"",
"GARDENLINUX_VERSION=1877.8",
},
KernelVersion: "6.1.0",
KernelRelease: "6.1.0-gardenlinux",
KernelName: "Linux",
HardwareVendor: "Test Vendor",
HardwareModel: "Test Model",
HardwareSerial: "TEST123",
FirmwareVersion: "1.0",
FirmwareVendor: "Test BIOS",
FirmwareDate: time.Now().UnixMicro(),
}, nil
},
},
}

err = controllerReconciler.SetupWithManager(mgr)
Expect(err).NotTo(HaveOccurred())
Expect(controllerReconciler.reconcileCh).NotTo(BeNil())
Expect(controllerReconciler.osDescriptor).NotTo(BeNil())
Expect(controllerReconciler.osDescriptor.OperatingSystemReleaseData).To(HaveLen(2))
})

It("should fail when systemd Describe returns error", func() {
// Create a test manager
mgr, err := ctrl.NewManager(cfg, ctrl.Options{
Scheme: k8sClient.Scheme(),
})
Expect(err).NotTo(HaveOccurred())

controllerReconciler := &HypervisorReconciler{
Client: k8sClient,
Scheme: k8sClient.Scheme(),
Systemd: &systemd.InterfaceMock{
DescribeFunc: func(ctx context.Context) (*systemd.Descriptor, error) {
return nil, errors.New("systemd describe failed")
},
},
}

err = controllerReconciler.SetupWithManager(mgr)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("unable to get Systemd hostname describe()"))
Expect(err.Error()).To(ContainSubstring("systemd describe failed"))
})
})

Context("When reconciling a resource", func() {
const resourceName = "test-resource"

Expand All @@ -50,7 +203,7 @@ var _ = Describe("Hypervisor Controller", func() {
BeforeEach(func() {
By("creating the custom resource for the Kind Hypervisor")
err := k8sClient.Get(ctx, typeNamespacedName, hypervisor)
if err != nil && errors.IsNotFound(err) {
if err != nil && apierrors.IsNotFound(err) {
resource := &kvmv1.Hypervisor{
ObjectMeta: metav1.ObjectMeta{
Name: resourceName,
Expand Down
24 changes: 19 additions & 5 deletions internal/libvirt/dominfo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ import (
// Client that returns information for all domains on our host.
type Client interface {
// Return information for all domains on our host.
Get(virt *libvirt.Libvirt) ([]DomainInfo, error)
Get(
virt *libvirt.Libvirt,
flags ...libvirt.ConnectListAllDomainsFlags,
) ([]DomainInfo, error)
}

// Implementation of the Client interface.
Expand All @@ -39,9 +42,16 @@ func NewClient() Client {
}

// Return information for all domains on our host.
func (m *client) Get(virt *libvirt.Libvirt) ([]DomainInfo, error) {
domains, _, err := virt.ConnectListAllDomains(1,
libvirt.ConnectListDomainsActive|libvirt.ConnectListDomainsInactive)
func (m *client) Get(
virt *libvirt.Libvirt,
flags ...libvirt.ConnectListAllDomainsFlags,
) ([]DomainInfo, error) {

flag := libvirt.ConnectListAllDomainsFlags(0)
for _, f := range flags {
flag |= f
}
domains, _, err := virt.ConnectListAllDomains(1, flag)
if err != nil {
log.Log.Error(err, "failed to list all domains")
return nil, err
Expand Down Expand Up @@ -72,7 +82,11 @@ func NewClientEmulator() Client {
}

// Get the domain infos of the host we are mounted on.
func (c *clientEmulator) Get(virt *libvirt.Libvirt) ([]DomainInfo, error) {
func (c *clientEmulator) Get(
virt *libvirt.Libvirt,
flags ...libvirt.ConnectListAllDomainsFlags,
) ([]DomainInfo, error) {

var info DomainInfo
if err := xml.Unmarshal(exampleXML, &info); err != nil {
log.Log.Error(err, "failed to unmarshal example capabilities")
Expand Down
25 changes: 25 additions & 0 deletions internal/libvirt/dominfo/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,3 +347,28 @@ func TestClientTypes_AreDistinct(t *testing.T) {
t.Error("Expected NewClient() and NewClientEmulator() to return different types")
}
}

func TestClientEmulator_Get_WithTwoFlags(t *testing.T) {
client := NewClientEmulator()

// Test that Get accepts multiple flags without error
// The emulator doesn't use libvirt, so we pass nil and arbitrary flags
domainInfos, err := client.Get(nil, 1, 2)

if err != nil {
t.Fatalf("Get() with 2 flags returned unexpected error: %v", err)
}

if len(domainInfos) == 0 {
t.Fatal("Expected at least one domain info from emulator")
}

// Verify the returned domain info has expected structure
if domainInfos[0].Name == "" {
t.Error("Expected domain to have a name")
}

if domainInfos[0].UUID == "" {
t.Error("Expected domain to have a UUID")
}
}
Loading