Skip to content

Commit 4c17091

Browse files
Refactor libvirt event handling + reconcile when domains change (#41)
With this change, we now reconcile the hypervisor resource if any libvirt domain lifecycle events occur. This keeps the hypervisor allocation up to date and simplifies how we manage the domain list. We can only subscribe to libvirt events once. This means we have to pull out the subscription logic from the `runMigrationListener` function in `libvirt_events.go`. We provide a new `WatchDomainChanges` interface function which can be used to subscribe to libvirt events, and reuse it for the existing migration watching and lifecycle logging. Furthermore, the distribution of libvirt events to the listeners is handled by a new event loop. We also separate out the logic unrelated to "migration watching", improving the overall structure and clarity of the code. Finally, we make the hypervisor controller reconcile as soon as we obtain a new domain lifecycle event. This is done with the proper mechanism provided by the controller-runtime client library: raw channel sources. The libvirt event subscription is wired together with the controller-runtime event channel on startup of the manager.
1 parent c16be82 commit 4c17091

File tree

11 files changed

+1297
-364
lines changed

11 files changed

+1297
-364
lines changed

internal/controller/hypervisor_controller.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,16 @@ import (
2525
"time"
2626

2727
kvmv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1"
28+
golibvirt "github.com/digitalocean/go-libvirt"
2829
"k8s.io/apimachinery/pkg/api/meta"
2930
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3031
"k8s.io/apimachinery/pkg/runtime"
3132
ctrl "sigs.k8s.io/controller-runtime"
3233
"sigs.k8s.io/controller-runtime/pkg/client"
34+
"sigs.k8s.io/controller-runtime/pkg/event"
35+
"sigs.k8s.io/controller-runtime/pkg/handler"
3336
logger "sigs.k8s.io/controller-runtime/pkg/log"
37+
"sigs.k8s.io/controller-runtime/pkg/source"
3438

3539
"github.com/cobaltcore-dev/kvm-node-agent/internal/certificates"
3640
"github.com/cobaltcore-dev/kvm-node-agent/internal/evacuation"
@@ -48,6 +52,9 @@ type HypervisorReconciler struct {
4852

4953
osDescriptor *systemd.Descriptor
5054
evacuateOnReboot bool
55+
56+
// Channel that can be used to trigger reconcile events.
57+
reconcileCh chan event.GenericEvent
5158
}
5259

5360
const (
@@ -287,6 +294,63 @@ func (r *HypervisorReconciler) Reconcile(ctx context.Context, req ctrl.Request)
287294
return ctrl.Result{RequeueAfter: 1 * time.Minute}, nil
288295
}
289296

297+
// Trigger a reconcile event for the managed hypervisor through the
298+
// event channel which is watched by the controller manager.
299+
func (r *HypervisorReconciler) triggerReconcile() {
300+
r.reconcileCh <- event.GenericEvent{
301+
Object: &kvmv1.Hypervisor{
302+
TypeMeta: metav1.TypeMeta{
303+
Kind: "Hypervisor",
304+
APIVersion: "kvm.cloud.sap/v1",
305+
},
306+
ObjectMeta: metav1.ObjectMeta{
307+
Name: sys.Hostname,
308+
Namespace: sys.Namespace,
309+
},
310+
},
311+
}
312+
}
313+
314+
// Start is called when the manager starts. It starts the libvirt
315+
// event subscription to receive events when the hypervisor needs to be
316+
// reconciled.
317+
func (r *HypervisorReconciler) Start(ctx context.Context) error {
318+
log := logger.FromContext(ctx, "controller", "hypervisor")
319+
log.Info("starting libvirt event subscription")
320+
321+
// Ensure we're connected to libvirt.
322+
if err := r.Libvirt.Connect(); err != nil {
323+
log.Error(err, "unable to connect to libvirt")
324+
return err
325+
}
326+
327+
// Run a ticker which reconciles the hypervisor resource every minute.
328+
// This ensures that we periodically reconcile the hypervisor even
329+
// if no events are received from libvirt.
330+
go func() {
331+
ticker := time.NewTicker(1 * time.Minute)
332+
defer ticker.Stop()
333+
for {
334+
select {
335+
case <-ticker.C:
336+
r.triggerReconcile()
337+
case <-ctx.Done():
338+
return
339+
}
340+
}
341+
}()
342+
343+
// Domain lifecycle events impact the list of active/inactive domains,
344+
// as well as the allocation of resources on the hypervisor.
345+
r.Libvirt.WatchDomainChanges(
346+
golibvirt.DomainEventIDLifecycle,
347+
"reconcile-on-domain-lifecycle",
348+
func(_ context.Context, _ any) { r.triggerReconcile() },
349+
)
350+
351+
return nil
352+
}
353+
290354
// SetupWithManager sets up the controller with the Manager.
291355
func (r *HypervisorReconciler) SetupWithManager(mgr ctrl.Manager) error {
292356
ctx := context.Background()
@@ -296,7 +360,16 @@ func (r *HypervisorReconciler) SetupWithManager(mgr ctrl.Manager) error {
296360
return fmt.Errorf("unable to get Systemd hostname describe(): %w", err)
297361
}
298362

363+
// Prepare an event channel that will trigger a reconcile event.
364+
r.reconcileCh = make(chan event.GenericEvent)
365+
src := source.Channel(r.reconcileCh, &handler.EnqueueRequestForObject{})
366+
// Run the Start(ctx context.Context) method when the manager starts.
367+
if err := mgr.Add(r); err != nil {
368+
return err
369+
}
370+
299371
return ctrl.NewControllerManagedBy(mgr).
300372
For(&kvmv1.Hypervisor{}).
373+
WatchesRawSource(src).
301374
Complete(r)
302375
}

internal/controller/hypervisor_controller_test.go

Lines changed: 155 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,20 @@ package controller
1919

2020
import (
2121
"context"
22+
"errors"
23+
"time"
2224

2325
kvmv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1"
2426
"github.com/coreos/go-systemd/v22/dbus"
27+
golibvirt "github.com/digitalocean/go-libvirt"
2528
. "github.com/onsi/ginkgo/v2"
2629
. "github.com/onsi/gomega"
27-
"k8s.io/apimachinery/pkg/api/errors"
30+
apierrors "k8s.io/apimachinery/pkg/api/errors"
2831
"k8s.io/apimachinery/pkg/api/resource"
2932
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3033
"k8s.io/apimachinery/pkg/types"
34+
ctrl "sigs.k8s.io/controller-runtime"
35+
"sigs.k8s.io/controller-runtime/pkg/event"
3136
"sigs.k8s.io/controller-runtime/pkg/reconcile"
3237

3338
"github.com/cobaltcore-dev/kvm-node-agent/internal/libvirt"
@@ -36,6 +41,154 @@ import (
3641
)
3742

3843
var _ = Describe("Hypervisor Controller", func() {
44+
Context("When testing Start method", func() {
45+
It("should successfully start and subscribe to libvirt events", func() {
46+
ctx := context.Background()
47+
eventCallbackCalled := false
48+
49+
controllerReconciler := &HypervisorReconciler{
50+
Client: k8sClient,
51+
Scheme: k8sClient.Scheme(),
52+
Libvirt: &libvirt.InterfaceMock{
53+
ConnectFunc: func() error {
54+
return nil
55+
},
56+
WatchDomainChangesFunc: func(eventId golibvirt.DomainEventID, handlerId string, handler func(context.Context, any)) {
57+
eventCallbackCalled = true
58+
Expect(handlerId).To(Equal("reconcile-on-domain-lifecycle"))
59+
},
60+
},
61+
reconcileCh: make(chan event.GenericEvent, 1),
62+
}
63+
64+
err := controllerReconciler.Start(ctx)
65+
Expect(err).NotTo(HaveOccurred())
66+
Expect(eventCallbackCalled).To(BeTrue())
67+
})
68+
69+
It("should fail when libvirt connection fails", func() {
70+
ctx := context.Background()
71+
72+
controllerReconciler := &HypervisorReconciler{
73+
Client: k8sClient,
74+
Scheme: k8sClient.Scheme(),
75+
Libvirt: &libvirt.InterfaceMock{
76+
ConnectFunc: func() error {
77+
return errors.New("connection failed")
78+
},
79+
},
80+
reconcileCh: make(chan event.GenericEvent, 1),
81+
}
82+
83+
err := controllerReconciler.Start(ctx)
84+
Expect(err).To(HaveOccurred())
85+
Expect(err.Error()).To(ContainSubstring("connection failed"))
86+
})
87+
})
88+
89+
Context("When testing triggerReconcile method", func() {
90+
It("should send an event to reconcile channel", func() {
91+
const testHostname = "test-host"
92+
const testNamespace = "test-namespace"
93+
94+
// Override hostname and namespace for this test
95+
originalHostname := sys.Hostname
96+
originalNamespace := sys.Namespace
97+
sys.Hostname = testHostname
98+
sys.Namespace = testNamespace
99+
defer func() {
100+
sys.Hostname = originalHostname
101+
sys.Namespace = originalNamespace
102+
}()
103+
104+
controllerReconciler := &HypervisorReconciler{
105+
Client: k8sClient,
106+
Scheme: k8sClient.Scheme(),
107+
reconcileCh: make(chan event.GenericEvent, 1),
108+
}
109+
110+
// Trigger reconcile in a goroutine to avoid blocking
111+
go controllerReconciler.triggerReconcile()
112+
113+
// Wait for the event with a timeout
114+
select {
115+
case evt := <-controllerReconciler.reconcileCh:
116+
Expect(evt.Object).NotTo(BeNil())
117+
hv, ok := evt.Object.(*kvmv1.Hypervisor)
118+
Expect(ok).To(BeTrue())
119+
Expect(hv.Name).To(Equal(testHostname))
120+
Expect(hv.Namespace).To(Equal(testNamespace))
121+
Expect(hv.Kind).To(Equal("Hypervisor"))
122+
Expect(hv.APIVersion).To(Equal("kvm.cloud.sap/v1"))
123+
case <-time.After(2 * time.Second):
124+
Fail("timeout waiting for reconcile event")
125+
}
126+
})
127+
})
128+
129+
Context("When testing SetupWithManager method", func() {
130+
It("should successfully setup controller with manager", func() {
131+
// Create a test manager
132+
mgr, err := ctrl.NewManager(cfg, ctrl.Options{
133+
Scheme: k8sClient.Scheme(),
134+
})
135+
Expect(err).NotTo(HaveOccurred())
136+
137+
controllerReconciler := &HypervisorReconciler{
138+
Client: k8sClient,
139+
Scheme: k8sClient.Scheme(),
140+
Systemd: &systemd.InterfaceMock{
141+
DescribeFunc: func(ctx context.Context) (*systemd.Descriptor, error) {
142+
return &systemd.Descriptor{
143+
OperatingSystemReleaseData: []string{
144+
"PRETTY_NAME=\"Garden Linux 1877.8\"",
145+
"GARDENLINUX_VERSION=1877.8",
146+
},
147+
KernelVersion: "6.1.0",
148+
KernelRelease: "6.1.0-gardenlinux",
149+
KernelName: "Linux",
150+
HardwareVendor: "Test Vendor",
151+
HardwareModel: "Test Model",
152+
HardwareSerial: "TEST123",
153+
FirmwareVersion: "1.0",
154+
FirmwareVendor: "Test BIOS",
155+
FirmwareDate: time.Now().UnixMicro(),
156+
}, nil
157+
},
158+
},
159+
}
160+
161+
err = controllerReconciler.SetupWithManager(mgr)
162+
Expect(err).NotTo(HaveOccurred())
163+
Expect(controllerReconciler.reconcileCh).NotTo(BeNil())
164+
Expect(controllerReconciler.osDescriptor).NotTo(BeNil())
165+
Expect(controllerReconciler.osDescriptor.OperatingSystemReleaseData).To(HaveLen(2))
166+
})
167+
168+
It("should fail when systemd Describe returns error", func() {
169+
// Create a test manager
170+
mgr, err := ctrl.NewManager(cfg, ctrl.Options{
171+
Scheme: k8sClient.Scheme(),
172+
})
173+
Expect(err).NotTo(HaveOccurred())
174+
175+
controllerReconciler := &HypervisorReconciler{
176+
Client: k8sClient,
177+
Scheme: k8sClient.Scheme(),
178+
Systemd: &systemd.InterfaceMock{
179+
DescribeFunc: func(ctx context.Context) (*systemd.Descriptor, error) {
180+
return nil, errors.New("systemd describe failed")
181+
},
182+
},
183+
}
184+
185+
err = controllerReconciler.SetupWithManager(mgr)
186+
Expect(err).To(HaveOccurred())
187+
Expect(err.Error()).To(ContainSubstring("unable to get Systemd hostname describe()"))
188+
Expect(err.Error()).To(ContainSubstring("systemd describe failed"))
189+
})
190+
})
191+
39192
Context("When reconciling a resource", func() {
40193
const resourceName = "test-resource"
41194

@@ -50,7 +203,7 @@ var _ = Describe("Hypervisor Controller", func() {
50203
BeforeEach(func() {
51204
By("creating the custom resource for the Kind Hypervisor")
52205
err := k8sClient.Get(ctx, typeNamespacedName, hypervisor)
53-
if err != nil && errors.IsNotFound(err) {
206+
if err != nil && apierrors.IsNotFound(err) {
54207
resource := &kvmv1.Hypervisor{
55208
ObjectMeta: metav1.ObjectMeta{
56209
Name: resourceName,

internal/libvirt/dominfo/client.go

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,10 @@ import (
2727
// Client that returns information for all domains on our host.
2828
type Client interface {
2929
// Return information for all domains on our host.
30-
Get(virt *libvirt.Libvirt) ([]DomainInfo, error)
30+
Get(
31+
virt *libvirt.Libvirt,
32+
flags ...libvirt.ConnectListAllDomainsFlags,
33+
) ([]DomainInfo, error)
3134
}
3235

3336
// Implementation of the Client interface.
@@ -39,9 +42,16 @@ func NewClient() Client {
3942
}
4043

4144
// Return information for all domains on our host.
42-
func (m *client) Get(virt *libvirt.Libvirt) ([]DomainInfo, error) {
43-
domains, _, err := virt.ConnectListAllDomains(1,
44-
libvirt.ConnectListDomainsActive|libvirt.ConnectListDomainsInactive)
45+
func (m *client) Get(
46+
virt *libvirt.Libvirt,
47+
flags ...libvirt.ConnectListAllDomainsFlags,
48+
) ([]DomainInfo, error) {
49+
50+
flag := libvirt.ConnectListAllDomainsFlags(0)
51+
for _, f := range flags {
52+
flag |= f
53+
}
54+
domains, _, err := virt.ConnectListAllDomains(1, flag)
4555
if err != nil {
4656
log.Log.Error(err, "failed to list all domains")
4757
return nil, err
@@ -72,7 +82,11 @@ func NewClientEmulator() Client {
7282
}
7383

7484
// Get the domain infos of the host we are mounted on.
75-
func (c *clientEmulator) Get(virt *libvirt.Libvirt) ([]DomainInfo, error) {
85+
func (c *clientEmulator) Get(
86+
virt *libvirt.Libvirt,
87+
flags ...libvirt.ConnectListAllDomainsFlags,
88+
) ([]DomainInfo, error) {
89+
7690
var info DomainInfo
7791
if err := xml.Unmarshal(exampleXML, &info); err != nil {
7892
log.Log.Error(err, "failed to unmarshal example capabilities")

internal/libvirt/dominfo/client_test.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,3 +347,28 @@ func TestClientTypes_AreDistinct(t *testing.T) {
347347
t.Error("Expected NewClient() and NewClientEmulator() to return different types")
348348
}
349349
}
350+
351+
func TestClientEmulator_Get_WithTwoFlags(t *testing.T) {
352+
client := NewClientEmulator()
353+
354+
// Test that Get accepts multiple flags without error
355+
// The emulator doesn't use libvirt, so we pass nil and arbitrary flags
356+
domainInfos, err := client.Get(nil, 1, 2)
357+
358+
if err != nil {
359+
t.Fatalf("Get() with 2 flags returned unexpected error: %v", err)
360+
}
361+
362+
if len(domainInfos) == 0 {
363+
t.Fatal("Expected at least one domain info from emulator")
364+
}
365+
366+
// Verify the returned domain info has expected structure
367+
if domainInfos[0].Name == "" {
368+
t.Error("Expected domain to have a name")
369+
}
370+
371+
if domainInfos[0].UUID == "" {
372+
t.Error("Expected domain to have a UUID")
373+
}
374+
}

0 commit comments

Comments
 (0)