Skip to content

Commit 30a56f2

Browse files
authored
use fsnotify and klog (#5)
* use fsnotify and klog Signed-off-by: Simon Davies <simongdavies@users.noreply.github.com> * fix: address PR review comments - Rename newHealth to health (Go naming conventions) - Remove dead kubeletSock check (wrong directory being watched) - Handle closed fsnotify channels with fallback to polling - Update comment to reflect actual behavior Signed-off-by: Simon Davies <simongdavies@users.noreply.github.com> --------- Signed-off-by: Simon Davies <simongdavies@users.noreply.github.com>
1 parent ff6cc35 commit 30a56f2

File tree

3 files changed

+100
-30
lines changed

3 files changed

+100
-30
lines changed

device-plugin/go.mod

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,14 @@ module github.com/hyperlight-dev/hyperlight-device-plugin
33
go 1.25.0
44

55
require (
6+
github.com/fsnotify/fsnotify v1.8.0
67
google.golang.org/grpc v1.78.0
8+
k8s.io/klog/v2 v2.130.1
79
k8s.io/kubelet v0.35.0
810
)
911

1012
require (
13+
github.com/go-logr/logr v1.4.3 // indirect
1114
golang.org/x/net v0.48.0 // indirect
1215
golang.org/x/sys v0.39.0 // indirect
1316
golang.org/x/text v0.32.0 // indirect

device-plugin/go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
github.com/fsnotify/fsnotify v1.8.0 h1:dAwr6QBTBZIkG8roQaJjGof0pp0EeF+tNV7YBP3F/8M=
2+
github.com/fsnotify/fsnotify v1.8.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
13
github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
24
github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
35
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
@@ -34,5 +36,7 @@ google.golang.org/grpc v1.78.0 h1:K1XZG/yGDJnzMdd/uZHAkVqJE+xIDOcmdSFZkBUicNc=
3436
google.golang.org/grpc v1.78.0/go.mod h1:I47qjTo4OKbMkjA/aOOwxDIiPSBofUtQUI5EfpWvW7U=
3537
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
3638
google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
39+
k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk=
40+
k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE=
3741
k8s.io/kubelet v0.35.0 h1:8cgJHCBCKLYuuQ7/Pxb/qWbJfX1LXIw7790ce9xHq7c=
3842
k8s.io/kubelet v0.35.0/go.mod h1:ciRzAXn7C4z5iB7FhG1L2CGPPXLTVCABDlbXt/Zz8YA=

device-plugin/main.go

Lines changed: 93 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ package main
1818

1919
import (
2020
"context"
21+
"flag"
2122
"fmt"
22-
"log"
2323
"net"
2424
"os"
2525
"os/signal"
@@ -28,8 +28,10 @@ import (
2828
"syscall"
2929
"time"
3030

31+
"github.com/fsnotify/fsnotify"
3132
"google.golang.org/grpc"
3233
"google.golang.org/grpc/credentials/insecure"
34+
"k8s.io/klog/v2"
3335
pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
3436
)
3537

@@ -66,7 +68,7 @@ func NewHyperlightDevicePlugin() (*HyperlightDevicePlugin, error) {
6668
return nil, fmt.Errorf("no supported hypervisor found (/dev/kvm or /dev/mshv)")
6769
}
6870

69-
log.Printf("Detected hypervisor: %s at %s", hypervisor, devicePath)
71+
klog.Infof("Detected hypervisor: %s at %s", hypervisor, devicePath)
7072

7173
// Create CDI spec
7274
if err := writeCDISpec(hypervisor, devicePath); err != nil {
@@ -84,7 +86,7 @@ func NewHyperlightDevicePlugin() (*HyperlightDevicePlugin, error) {
8486
if count, err := strconv.Atoi(countStr); err == nil && count > 0 {
8587
numDevices = count
8688
} else {
87-
log.Printf("Invalid DEVICE_COUNT '%s', using default %d", countStr, defaultDeviceCount)
89+
klog.Warningf("Invalid DEVICE_COUNT '%s', using default %d", countStr, defaultDeviceCount)
8890
}
8991
}
9092

@@ -95,7 +97,7 @@ func NewHyperlightDevicePlugin() (*HyperlightDevicePlugin, error) {
9597
Health: pluginapi.Healthy,
9698
}
9799
}
98-
log.Printf("Advertising %d hypervisor devices (configurable via DEVICE_COUNT)", numDevices)
100+
klog.Infof("Advertising %d hypervisor devices (configurable via DEVICE_COUNT)", numDevices)
99101

100102
return &HyperlightDevicePlugin{
101103
devices: devices,
@@ -113,7 +115,7 @@ func writeCDISpec(hypervisor, devicePath string) error {
113115
if parsed, err := strconv.Atoi(uidStr); err == nil && parsed >= 0 {
114116
uid = parsed
115117
} else {
116-
log.Printf("Invalid DEVICE_UID '%s', using default %d", uidStr, defaultDeviceUID)
118+
klog.Warningf("Invalid DEVICE_UID '%s', using default %d", uidStr, defaultDeviceUID)
117119
}
118120
}
119121

@@ -122,11 +124,11 @@ func writeCDISpec(hypervisor, devicePath string) error {
122124
if parsed, err := strconv.Atoi(gidStr); err == nil && parsed >= 0 {
123125
gid = parsed
124126
} else {
125-
log.Printf("Invalid DEVICE_GID '%s', using default %d", gidStr, defaultDeviceGID)
127+
klog.Warningf("Invalid DEVICE_GID '%s', using default %d", gidStr, defaultDeviceGID)
126128
}
127129
}
128130

129-
log.Printf("CDI device ownership: uid=%d, gid=%d (configurable via DEVICE_UID/DEVICE_GID)", uid, gid)
131+
klog.Infof("CDI device ownership: uid=%d, gid=%d (configurable via DEVICE_UID/DEVICE_GID)", uid, gid)
130132

131133
spec := fmt.Sprintf(`{
132134
"cdiVersion": "0.6.0",
@@ -159,7 +161,7 @@ func writeCDISpec(hypervisor, devicePath string) error {
159161
if err := os.WriteFile(cdiSpecPath, []byte(spec), 0644); err != nil {
160162
return err
161163
}
162-
log.Printf("CDI spec written to %s", cdiSpecPath)
164+
klog.Infof("CDI spec written to %s", cdiSpecPath)
163165
return nil
164166
}
165167

@@ -173,7 +175,7 @@ func (p *HyperlightDevicePlugin) GetDevicePluginOptions(ctx context.Context, req
173175

174176
// ListAndWatch lists devices and watches for changes
175177
func (p *HyperlightDevicePlugin) ListAndWatch(req *pluginapi.Empty, srv pluginapi.DevicePlugin_ListAndWatchServer) error {
176-
log.Printf("ListAndWatch called, sending %d devices", len(p.devices))
178+
klog.Infof("ListAndWatch called, sending %d devices", len(p.devices))
177179

178180
if err := srv.Send(&pluginapi.ListAndWatchResponse{Devices: p.devices}); err != nil {
179181
return err
@@ -191,12 +193,16 @@ func (p *HyperlightDevicePlugin) ListAndWatch(req *pluginapi.Empty, srv pluginap
191193
health := pluginapi.Healthy
192194
if _, err := os.Stat(p.devicePath); err != nil {
193195
health = pluginapi.Unhealthy
194-
log.Printf("Device %s not found, marking unhealthy", p.devicePath)
196+
klog.Warningf("Device %s not found, marking all devices unhealthy", p.devicePath)
195197
}
196198

199+
// Check if health changed (compare against first device as representative)
197200
if p.devices[0].Health != health {
198-
p.devices[0].Health = health
199-
log.Printf("Device health changed to %s", health)
201+
// Update ALL devices - they all share the same underlying hypervisor device
202+
for i := range p.devices {
203+
p.devices[i].Health = health
204+
}
205+
klog.Infof("Device health changed to %s for all %d devices", health, len(p.devices))
200206
if err := srv.Send(&pluginapi.ListAndWatchResponse{Devices: p.devices}); err != nil {
201207
return err
202208
}
@@ -207,7 +213,7 @@ func (p *HyperlightDevicePlugin) ListAndWatch(req *pluginapi.Empty, srv pluginap
207213

208214
// Allocate allocates devices to a container
209215
func (p *HyperlightDevicePlugin) Allocate(ctx context.Context, req *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
210-
log.Printf("Allocate called for %d containers", len(req.ContainerRequests))
216+
klog.V(2).Infof("Allocate called for %d containers", len(req.ContainerRequests))
211217

212218
responses := make([]*pluginapi.ContainerAllocateResponse, len(req.ContainerRequests))
213219

@@ -220,7 +226,7 @@ func (p *HyperlightDevicePlugin) Allocate(ctx context.Context, req *pluginapi.Al
220226
},
221227
},
222228
}
223-
log.Printf("Allocated CDI device: hyperlight.dev/hypervisor=%s", p.hypervisor)
229+
klog.V(2).Infof("Allocated CDI device: hyperlight.dev/hypervisor=%s", p.hypervisor)
224230
}
225231

226232
return &pluginapi.AllocateResponse{ContainerResponses: responses}, nil
@@ -242,7 +248,7 @@ func (p *HyperlightDevicePlugin) Start() error {
242248

243249
// Remove old socket
244250
if err := os.Remove(serverSock); err != nil && !os.IsNotExist(err) {
245-
log.Printf("Warning: failed to remove old socket: %v", err)
251+
klog.Warningf("Failed to remove old socket: %v", err)
246252
}
247253

248254
listener, err := net.Listen("unix", serverSock)
@@ -254,9 +260,9 @@ func (p *HyperlightDevicePlugin) Start() error {
254260
pluginapi.RegisterDevicePluginServer(p.server, p)
255261

256262
go func() {
257-
log.Printf("Starting gRPC server on %s", serverSock)
263+
klog.Infof("Starting gRPC server on %s", serverSock)
258264
if err := p.server.Serve(listener); err != nil {
259-
log.Printf("gRPC server stopped: %v", err)
265+
klog.V(1).Infof("gRPC server stopped: %v", err)
260266
}
261267
}()
262268

@@ -297,7 +303,7 @@ func (p *HyperlightDevicePlugin) Register() error {
297303
return fmt.Errorf("failed to register with kubelet: %v", err)
298304
}
299305

300-
log.Printf("Registered with kubelet as %s", resourceName)
306+
klog.Infof("Registered with kubelet as %s", resourceName)
301307
return nil
302308
}
303309

@@ -307,14 +313,69 @@ func (p *HyperlightDevicePlugin) Stop() {
307313
p.server.Stop()
308314
}
309315
os.Remove(serverSock)
310-
log.Println("Device plugin stopped")
316+
klog.Info("Device plugin stopped")
317+
}
318+
319+
// newFSWatcher creates a filesystem watcher for kubelet restart detection.
320+
func newFSWatcher(files ...string) (*fsnotify.Watcher, error) {
321+
watcher, err := fsnotify.NewWatcher()
322+
if err != nil {
323+
return nil, err
324+
}
325+
326+
for _, f := range files {
327+
if err := watcher.Add(f); err != nil {
328+
watcher.Close()
329+
return nil, err
330+
}
331+
}
332+
333+
return watcher, nil
311334
}
312335

313-
// watchKubeletRestart monitors for kubelet restarts by watching the plugin socket.
336+
// watchKubeletRestart monitors for kubelet restarts using fsnotify.
314337
// When kubelet restarts, it deletes all sockets in /var/lib/kubelet/device-plugins/.
315-
// This function blocks until the socket is deleted, signaling a kubelet restart.
338+
// This function blocks until it detects our plugin socket being deleted.
316339
func (p *HyperlightDevicePlugin) watchKubeletRestart() {
317-
log.Println("Watching for kubelet restart (socket deletion)...")
340+
klog.Info("Watching for kubelet restart using fsnotify...")
341+
342+
watcher, err := newFSWatcher(pluginapi.DevicePluginPath)
343+
if err != nil {
344+
klog.Errorf("Failed to create fsnotify watcher, falling back to polling: %v", err)
345+
p.watchKubeletRestartPolling()
346+
return
347+
}
348+
defer watcher.Close()
349+
350+
for {
351+
select {
352+
case <-p.stopCh:
353+
return
354+
case event, ok := <-watcher.Events:
355+
if !ok {
356+
klog.Warning("fsnotify events channel closed, falling back to polling")
357+
p.watchKubeletRestartPolling()
358+
return
359+
}
360+
if event.Name == serverSock && (event.Op&fsnotify.Remove) == fsnotify.Remove {
361+
klog.Info("Plugin socket deleted - kubelet may have restarted")
362+
return
363+
}
364+
case err, ok := <-watcher.Errors:
365+
if !ok {
366+
klog.Warning("fsnotify errors channel closed, falling back to polling")
367+
p.watchKubeletRestartPolling()
368+
return
369+
}
370+
klog.Warningf("fsnotify error: %v", err)
371+
}
372+
}
373+
}
374+
375+
// watchKubeletRestartPolling is a fallback method using polling.
376+
// Used when fsnotify is unavailable.
377+
func (p *HyperlightDevicePlugin) watchKubeletRestartPolling() {
378+
klog.Info("Watching for kubelet restart (polling)...")
318379

319380
ticker := time.NewTicker(time.Second)
320381
defer ticker.Stop()
@@ -324,22 +385,24 @@ func (p *HyperlightDevicePlugin) watchKubeletRestart() {
324385
case <-p.stopCh:
325386
return
326387
case <-ticker.C:
327-
// Check if our socket still exists
328388
if _, err := os.Stat(serverSock); os.IsNotExist(err) {
329-
log.Println("Plugin socket deleted - kubelet may have restarted")
389+
klog.Info("Plugin socket deleted - kubelet may have restarted")
330390
return
331391
}
332392
}
333393
}
334394
}
335395

336396
func main() {
337-
log.SetFlags(log.LstdFlags | log.Lshortfile)
338-
log.Println("Starting Hyperlight Device Plugin")
397+
klog.InitFlags(nil)
398+
flag.Parse()
399+
defer klog.Flush()
400+
401+
klog.Info("Starting Hyperlight Device Plugin")
339402

340403
plugin, err := NewHyperlightDevicePlugin()
341404
if err != nil {
342-
log.Fatalf("Failed to create device plugin: %v", err)
405+
klog.Fatalf("Failed to create device plugin: %v", err)
343406
}
344407

345408
// Handle signals for graceful shutdown
@@ -350,7 +413,7 @@ func main() {
350413
go func() {
351414
for {
352415
if err := plugin.Start(); err != nil {
353-
log.Printf("Failed to start device plugin: %v", err)
416+
klog.Errorf("Failed to start device plugin: %v", err)
354417
time.Sleep(5 * time.Second)
355418
continue
356419
}
@@ -360,13 +423,13 @@ func main() {
360423
plugin.watchKubeletRestart()
361424

362425
// If we get here, kubelet restarted - stop current server and re-register
363-
log.Println("Detected kubelet restart, re-registering...")
426+
klog.Info("Detected kubelet restart, re-registering...")
364427
plugin.server.Stop()
365428
time.Sleep(time.Second) // Brief pause before restart
366429
}
367430
}()
368431

369432
sig := <-sigCh
370-
log.Printf("Received signal %v, shutting down", sig)
433+
klog.Infof("Received signal %v, shutting down", sig)
371434
plugin.Stop()
372435
}

0 commit comments

Comments
 (0)