Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# Implementation Tasks: PR #1601 - Address Elezar's Review Concerns

## Issue Reference
- PR: #1601
- Review: Elezar's concerns about health check refactoring
- Document: FINAL_SYNTHESIS_PR1601.md

## Tasks

### Phase 1: Constructor Initialization (Eliminates Race Condition)

- [DONE] **Task 1**: Modify `devicePluginForResource()` to initialize health context at construction time
- File: `internal/plugin/server.go`
- Lines: 78-104
- Changes: Create `healthCtx` and `healthCancel` before plugin struct initialization
- Addresses: Elezar's concern #1 (line 281 - synchronization)
- Commit: 651a76091

- [DONE] **Task 2**: Remove health context creation from `initialize()`
- File: `internal/plugin/server.go`
- Lines: 114-118
- Changes: Remove `context.WithCancel()` call (already done in constructor)
- Addresses: Cleanup redundant initialization
- Commit: d055f1e0c

### Phase 2: Restart-Safe Cleanup

- [DONE] **Task 3**: Modify `cleanup()` to recreate context after cancellation
- File: `internal/plugin/server.go`
- Lines: 120-129
- Changes: Recreate `healthCtx` and `healthCancel` after cancelling for restart support
- Addresses: Elezar's concern #2 (line 128 - why nil these fields), fixes plugin restart
- Commit: cc2a0a77c

### Phase 3: Health Channel Lifecycle

- [DONE] **Task 4**: Close health channel properly in `cleanup()`
- File: `internal/plugin/server.go`
- Lines: 120-129
- Changes: Close channel before niling to prevent panics
- Addresses: Devil's advocate blocker - channel never closed
- Commit: 795807362

- [DONE] **Task 5**: Handle closed channel in `ListAndWatch()`
- File: `internal/plugin/server.go`
- Lines: 287-298
- Changes: Add `ok` check when receiving from health channel
- Addresses: Graceful handling of channel closure
- Commit: 20860c46f

### Phase 4: Error Handling Improvements

- [DONE] **Task 6**: Improve error handling in health check goroutine
- File: `internal/plugin/server.go`
- Lines: 160-168
- Changes: Use switch statement to distinguish error types and log success
- Addresses: Elezar's concern #3 (line 167 - error handling)
- Commit: 6bc227110

## Progress
- Total Tasks: 6
- Completed: 6 ✅
- In Progress: 0
- Blocked: 0

## Implementation Complete! 🎉

All 6 tasks have been successfully implemented and committed:
1. ✅ Constructor initialization (651a76091)
2. ✅ Remove redundant initialization (d055f1e0c)
3. ✅ Restart-safe cleanup (cc2a0a77c)
4. ✅ Close health channel (795807362)
5. ✅ Handle closed channel (20860c46f)
6. ✅ Improve error handling (6bc227110)

## Notes
- All changes are in a single file: `internal/plugin/server.go`
- Changes are backward compatible
- Plugin restart functionality is critical - must test after implementation
61 changes: 51 additions & 10 deletions internal/plugin/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"path"
"path/filepath"
"strings"
"sync"
"time"

cdiapi "tags.cncf.io/container-device-interface/pkg/cdi"
Expand Down Expand Up @@ -62,7 +63,12 @@ type nvidiaDevicePlugin struct {
socket string
server *grpc.Server
health chan *rm.Device
stop chan interface{}

// healthCtx and healthCancel control the health check goroutine lifecycle.
// healthWg is used to wait for the health check goroutine to complete during cleanup.
healthCtx context.Context
healthCancel context.CancelFunc
healthWg sync.WaitGroup

imexChannels imex.Channels

Expand All @@ -76,6 +82,10 @@ func (o *options) devicePluginForResource(ctx context.Context, resourceManager r
return nil, err
}

// Initialize health context at construction time to eliminate race condition
// where ListAndWatch() might access healthCtx before initialize() completes.
healthCtx, healthCancel := context.WithCancel(ctx)

plugin := nvidiaDevicePlugin{
ctx: ctx,
rm: resourceManager,
Expand All @@ -94,7 +104,10 @@ func (o *options) devicePluginForResource(ctx context.Context, resourceManager r
// time the plugin server is restarted.
server: nil,
health: nil,
stop: nil,

// Health context initialized at construction to prevent race conditions
healthCtx: healthCtx,
healthCancel: healthCancel,
}
return &plugin, nil
}
Expand All @@ -109,14 +122,27 @@ func getPluginSocketPath(resource spec.ResourceName) string {
func (plugin *nvidiaDevicePlugin) initialize() {
plugin.server = grpc.NewServer([]grpc.ServerOption{}...)
plugin.health = make(chan *rm.Device)
plugin.stop = make(chan interface{})
// healthCtx and healthCancel already initialized at construction time
}

func (plugin *nvidiaDevicePlugin) cleanup() {
close(plugin.stop)
if plugin.healthCancel != nil {
plugin.healthCancel()
// Recreate context for potential plugin restart. The same plugin instance
// may be restarted via Start() after Stop(), so we need a fresh context.
plugin.healthCtx, plugin.healthCancel = context.WithCancel(plugin.ctx)
}
plugin.healthWg.Wait()

// Close health channel before niling to prevent panics in ListAndWatch()
if plugin.health != nil {
close(plugin.health)
}

plugin.server = nil
plugin.health = nil
plugin.stop = nil
// Do not nil healthCtx or healthCancel - they are needed for restart
// and are recreated above if they were cancelled
}

// Devices returns the full set of devices associated with the plugin.
Expand Down Expand Up @@ -148,10 +174,17 @@ func (plugin *nvidiaDevicePlugin) Start(kubeletSocket string) error {
}
klog.Infof("Registered device plugin for '%s' with Kubelet", plugin.rm.Resource())

plugin.healthWg.Add(1)
go func() {
defer plugin.healthWg.Done()
// TODO: add MPS health check
err := plugin.rm.CheckHealth(plugin.stop, plugin.health)
if err != nil {
err := plugin.rm.CheckHealth(plugin.healthCtx, plugin.health)
switch {
case err == nil:
klog.Infof("Health check completed successfully for '%s'", plugin.rm.Resource())
case errors.Is(err, context.Canceled):
klog.V(4).Infof("Health check canceled for '%s' (plugin shutdown)", plugin.rm.Resource())
default:
klog.Errorf("Failed to start health check: %v; continuing with health checks disabled", err)
}
}()
Expand Down Expand Up @@ -265,15 +298,23 @@ func (plugin *nvidiaDevicePlugin) GetDevicePluginOptions(context.Context, *plugi

// ListAndWatch lists devices and update that list according to the health status
func (plugin *nvidiaDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
// Capture references at start to avoid race with cleanup() which may nil these fields.
healthCtx := plugin.healthCtx
health := plugin.health

if err := s.Send(&pluginapi.ListAndWatchResponse{Devices: plugin.apiDevices()}); err != nil {
return err
}

for {
select {
case <-plugin.stop:
case <-healthCtx.Done():
return nil
case d := <-plugin.health:
case d, ok := <-health:
if !ok {
// Health channel closed, health checks stopped
return nil
}
// FIXME: there is no way to recover from the Unhealthy state.
d.Health = pluginapi.Unhealthy
klog.Infof("'%s' device marked unhealthy: %s", plugin.rm.Resource(), d.ID)
Expand Down Expand Up @@ -368,7 +409,7 @@ func (plugin *nvidiaDevicePlugin) getAllocateResponse(requestIds []string) (*plu
// updateResponseForMPS ensures that the ContainerAllocate response contains the information required to use MPS.
// This includes per-resource pipe and log directories as well as a global daemon-specific shm
// and assumes that an MPS control daemon has already been started.
func (plugin nvidiaDevicePlugin) updateResponseForMPS(response *pluginapi.ContainerAllocateResponse) {
func (plugin *nvidiaDevicePlugin) updateResponseForMPS(response *pluginapi.ContainerAllocateResponse) {
plugin.mps.updateReponse(response)
}

Expand Down
Loading
Loading