Skip to content

Commit f25794e

Browse files
authored
Merge branch 'NVIDIA:main' into two-container-one-pod
2 parents 4066752 + 0317271 commit f25794e

File tree

942 files changed

+68554
-25796
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

942 files changed

+68554
-25796
lines changed

.github/workflows/golang.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ jobs:
3434
go-version: ${{ inputs.golang_versions }}
3535

3636
- name: Lint
37-
uses: golangci/golangci-lint-action@v7
37+
uses: golangci/golangci-lint-action@v8
3838
with:
3939
version: latest
4040
args: -v --timeout 5m

cmd/compute-domain-controller/computedomain.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ func (m *ComputeDomainManager) RemoveFinalizer(ctx context.Context, uid string)
191191
return nil
192192
}
193193

194-
// AssertWorkloadsCompletes ensures that all workloads asssociated with a ComputeDomain have completed.
194+
// AssertWorkloadsCompletes ensures that all workloads associated with a ComputeDomain have completed.
195195
//
196196
// TODO: We should probably also check to ensure that all ResourceClaims
197197
// generated from our ResourceClaimTemplate for workloads are gone. Doing

cmd/compute-domain-kubelet-plugin/checkpoint.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package main
33
import (
44
"encoding/json"
55

6+
resourceapi "k8s.io/api/resource/v1beta1"
67
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
78
)
89

@@ -12,14 +13,22 @@ type Checkpoint struct {
1213
}
1314

1415
type CheckpointV1 struct {
15-
PreparedClaims PreparedClaims `json:"preparedClaims,omitempty"`
16+
PreparedClaims PreparedClaimsByUID `json:"preparedClaims,omitempty"`
17+
}
18+
19+
// key: stringified claim UUID
20+
type PreparedClaimsByUID map[string]PreparedClaim
21+
22+
type PreparedClaim struct {
23+
Status resourceapi.ResourceClaimStatus `json:"status,omitempty"`
24+
PreparedDevices PreparedDevices `json:"preparedDevices,omitempty"`
1625
}
1726

1827
func newCheckpoint() *Checkpoint {
1928
pc := &Checkpoint{
2029
Checksum: 0,
2130
V1: &CheckpointV1{
22-
PreparedClaims: make(PreparedClaims),
31+
PreparedClaims: make(PreparedClaimsByUID),
2332
},
2433
}
2534
return pc

cmd/compute-domain-kubelet-plugin/computedomain.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ func (m *ComputeDomainManager) GetNodeIPs(ctx context.Context, cdUID string) ([]
325325
}
326326

327327
if cd.Status.Nodes == nil {
328-
return nil, fmt.Errorf("error getting status of nodes in ComputeDomain: %w", err)
328+
return nil, fmt.Errorf("no nodes set for ComputeDomain")
329329
}
330330

331331
if len(cd.Status.Nodes) != cd.Spec.NumNodes {
@@ -437,7 +437,7 @@ func (m *ComputeDomainManager) periodicCleanup(ctx context.Context) {
437437
continue
438438
}
439439
if err != nil {
440-
klog.Errorf("error checking for existenc of directory '%s': %v", m.configFilesRoot, err)
440+
klog.Errorf("error checking for existence of directory '%s': %v", m.configFilesRoot, err)
441441
continue
442442
}
443443

cmd/compute-domain-kubelet-plugin/device_state.go

Lines changed: 55 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ import (
2424

2525
resourceapi "k8s.io/api/resource/v1beta1"
2626
"k8s.io/apimachinery/pkg/runtime"
27+
"k8s.io/dynamic-resource-allocation/kubeletplugin"
2728
"k8s.io/klog/v2"
28-
drapbv1 "k8s.io/kubelet/pkg/apis/dra/v1beta1"
2929
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
3030
cdiapi "tags.cncf.io/container-device-interface/pkg/cdi"
3131

@@ -115,33 +115,37 @@ func NewDeviceState(ctx context.Context, config *Config) (*DeviceState, error) {
115115
}
116116

117117
for _, c := range checkpoints {
118-
if c == DriverPluginCheckpointFile {
118+
if c == DriverPluginCheckpointFileBasename {
119119
return state, nil
120120
}
121121
}
122122

123123
checkpoint := newCheckpoint()
124-
if err := state.checkpointManager.CreateCheckpoint(DriverPluginCheckpointFile, checkpoint); err != nil {
124+
if err := state.checkpointManager.CreateCheckpoint(DriverPluginCheckpointFileBasename, checkpoint); err != nil {
125125
return nil, fmt.Errorf("unable to sync to checkpoint: %v", err)
126126
}
127127

128128
return state, nil
129129
}
130130

131-
func (s *DeviceState) Prepare(ctx context.Context, claim *resourceapi.ResourceClaim) ([]*drapbv1.Device, error) {
131+
func (s *DeviceState) Prepare(ctx context.Context, claim *resourceapi.ResourceClaim) ([]kubeletplugin.Device, error) {
132132
s.Lock()
133133
defer s.Unlock()
134134

135135
claimUID := string(claim.UID)
136136

137137
checkpoint := newCheckpoint()
138-
if err := s.checkpointManager.GetCheckpoint(DriverPluginCheckpointFile, checkpoint); err != nil {
139-
return nil, fmt.Errorf("unable to sync from checkpoint: %v", err)
138+
if err := s.checkpointManager.GetCheckpoint(DriverPluginCheckpointFileBasename, checkpoint); err != nil {
139+
return nil, fmt.Errorf("unable to get checkpoint: %w", err)
140140
}
141-
preparedClaims := checkpoint.V1.PreparedClaims
142141

143-
if preparedClaims[claimUID] != nil {
144-
return preparedClaims[claimUID].GetDevices(), nil
142+
preparedClaim, exists := checkpoint.V1.PreparedClaims[claimUID]
143+
if exists {
144+
// Make this a noop. Associated device(s) has/ave been prepared by us.
145+
// Prepare() must be idempotent, as it may be invoked more than once per
146+
// claim (and actual device preparation must happen at most once).
147+
klog.V(6).Infof("skip prepare: claim %v found in checkpoint", claimUID)
148+
return preparedClaim.PreparedDevices.GetDevices(), nil
145149
}
146150

147151
preparedDevices, err := s.prepareDevices(ctx, claim)
@@ -153,50 +157,65 @@ func (s *DeviceState) Prepare(ctx context.Context, claim *resourceapi.ResourceCl
153157
return nil, fmt.Errorf("unable to create CDI spec file for claim: %w", err)
154158
}
155159

156-
preparedClaims[claimUID] = preparedDevices
157-
if err := s.checkpointManager.CreateCheckpoint(DriverPluginCheckpointFile, checkpoint); err != nil {
158-
return nil, fmt.Errorf("unable to sync to checkpoint: %v", err)
160+
// Add ResourceClaimStatus API object to node-local checkpoint: the
161+
// 'unprepare' code path must use local state exclusively (ResourceClaim
162+
// object might have been deleted from the API server).
163+
checkpoint.V1.PreparedClaims[claimUID] = PreparedClaim{
164+
Status: claim.Status,
165+
PreparedDevices: preparedDevices,
166+
}
167+
if err := s.checkpointManager.CreateCheckpoint(DriverPluginCheckpointFileBasename, checkpoint); err != nil {
168+
return nil, fmt.Errorf("unable to create checkpoint: %w", err)
159169
}
170+
klog.V(6).Infof("checkpoint written for claim %v", claimUID)
160171

161-
return preparedClaims[claimUID].GetDevices(), nil
172+
return preparedDevices.GetDevices(), nil
162173
}
163174

164-
func (s *DeviceState) Unprepare(ctx context.Context, claim *resourceapi.ResourceClaim) error {
175+
func (s *DeviceState) Unprepare(ctx context.Context, claimRef kubeletplugin.NamespacedObject) error {
165176
s.Lock()
166177
defer s.Unlock()
167178

168-
claimUID := string(claim.UID)
169-
170-
if err := s.unprepareDevices(ctx, claim); err != nil {
171-
return fmt.Errorf("unprepare devices failed: %w", err)
172-
}
179+
claimUID := string(claimRef.UID)
173180

181+
// Rely on local checkpoint state for ability to clean up.
174182
checkpoint := newCheckpoint()
175-
if err := s.checkpointManager.GetCheckpoint(DriverPluginCheckpointFile, checkpoint); err != nil {
176-
return fmt.Errorf("unable to sync from checkpoint: %v", err)
183+
if err := s.checkpointManager.GetCheckpoint(DriverPluginCheckpointFileBasename, checkpoint); err != nil {
184+
return fmt.Errorf("unable to get checkpoint: %w", err)
177185
}
178-
preparedClaims := checkpoint.V1.PreparedClaims
179186

180-
if preparedClaims[claimUID] == nil {
187+
pc, exists := checkpoint.V1.PreparedClaims[claimUID]
188+
if !exists {
189+
// Not an error: if this claim UID is not in the checkpoint then this
190+
// device was never prepared or has already been unprepared (assume that
191+
// Prepare+Checkpoint are done transactionally). Note that
192+
// claimRef.String() contains namespace, name, UID.
193+
klog.Infof("unprepare noop: claim not found in checkpoint data: %v", claimRef.String())
181194
return nil
182195
}
183196

197+
if err := s.unprepareDevices(ctx, &pc.Status); err != nil {
198+
return fmt.Errorf("unprepare devices failed: %w", err)
199+
}
200+
184201
err := s.cdi.DeleteClaimSpecFile(claimUID)
185202
if err != nil {
186203
return fmt.Errorf("unable to delete CDI spec file for claim: %w", err)
187204
}
188205

189-
delete(preparedClaims, claimUID)
190-
if err := s.checkpointManager.CreateCheckpoint(DriverPluginCheckpointFile, checkpoint); err != nil {
191-
return fmt.Errorf("unable to sync to checkpoint: %v", err)
206+
// Write new checkpoint reflecting that all devices for this claim have been
207+
// unprepared (by virtue of removing its UID from all mappings).
208+
delete(checkpoint.V1.PreparedClaims, claimUID)
209+
if err := s.checkpointManager.CreateCheckpoint(DriverPluginCheckpointFileBasename, checkpoint); err != nil {
210+
return fmt.Errorf("create checkpoint failed: %w", err)
192211
}
193212

194213
return nil
195214
}
196215

197216
func (s *DeviceState) prepareDevices(ctx context.Context, claim *resourceapi.ResourceClaim) (PreparedDevices, error) {
198217
// Generate a mapping of each OpaqueDeviceConfigs to the Device.Results it applies to
199-
configResultsMap, err := s.getConfigResultsMap(claim)
218+
configResultsMap, err := s.getConfigResultsMap(&claim.Status)
200219
if err != nil {
201220
return nil, fmt.Errorf("error generating configResultsMap: %w", err)
202221
}
@@ -254,8 +273,8 @@ func (s *DeviceState) prepareDevices(ctx context.Context, claim *resourceapi.Res
254273
cdiDevices = append(cdiDevices, d)
255274
}
256275

257-
device := &drapbv1.Device{
258-
RequestNames: []string{result.Request},
276+
device := kubeletplugin.Device{
277+
Requests: []string{result.Request},
259278
PoolName: result.Pool,
260279
DeviceName: result.Device,
261280
CDIDeviceIDs: cdiDevices,
@@ -266,12 +285,12 @@ func (s *DeviceState) prepareDevices(ctx context.Context, claim *resourceapi.Res
266285
case ComputeDomainChannelType:
267286
preparedDevice.Channel = &PreparedComputeDomainChannel{
268287
Info: s.allocatable[result.Device].Channel,
269-
Device: device,
288+
Device: &device,
270289
}
271290
case ComputeDomainDaemonType:
272291
preparedDevice.Daemon = &PreparedComputeDomainDaemon{
273292
Info: s.allocatable[result.Device].Daemon,
274-
Device: device,
293+
Device: &device,
275294
}
276295
}
277296

@@ -283,9 +302,9 @@ func (s *DeviceState) prepareDevices(ctx context.Context, claim *resourceapi.Res
283302
return preparedDevices, nil
284303
}
285304

286-
func (s *DeviceState) unprepareDevices(ctx context.Context, claim *resourceapi.ResourceClaim) error {
305+
func (s *DeviceState) unprepareDevices(ctx context.Context, cs *resourceapi.ResourceClaimStatus) error {
287306
// Generate a mapping of each OpaqueDeviceConfigs to the Device.Results it applies to
288-
configResultsMap, err := s.getConfigResultsMap(claim)
307+
configResultsMap, err := s.getConfigResultsMap(cs)
289308
if err != nil {
290309
return fmt.Errorf("error generating configResultsMap: %w", err)
291310
}
@@ -407,12 +426,12 @@ func (s *DeviceState) applyComputeDomainDaemonConfig(ctx context.Context, config
407426
return &configState, nil
408427
}
409428

410-
func (s *DeviceState) getConfigResultsMap(claim *resourceapi.ResourceClaim) (map[runtime.Object][]*resourceapi.DeviceRequestAllocationResult, error) {
429+
func (s *DeviceState) getConfigResultsMap(rcs *resourceapi.ResourceClaimStatus) (map[runtime.Object][]*resourceapi.DeviceRequestAllocationResult, error) {
411430
// Retrieve the full set of device configs for the driver.
412431
configs, err := GetOpaqueDeviceConfigs(
413432
configapi.Decoder,
414433
DriverName,
415-
claim.Status.Allocation.Devices.Config,
434+
rcs.Allocation.Devices.Config,
416435
)
417436
if err != nil {
418437
return nil, fmt.Errorf("error getting opaque device configs: %v", err)
@@ -433,7 +452,7 @@ func (s *DeviceState) getConfigResultsMap(claim *resourceapi.ResourceClaim) (map
433452
// Look through the configs and figure out which one will be applied to
434453
// each device allocation result based on their order of precedence and type.
435454
configResultsMap := make(map[runtime.Object][]*resourceapi.DeviceRequestAllocationResult)
436-
for _, result := range claim.Status.Allocation.Devices.Results {
455+
for _, result := range rcs.Allocation.Devices.Results {
437456
device, exists := s.allocatable[result.Device]
438457
if !exists {
439458
return nil, fmt.Errorf("requested device is not allocatable: %v", result.Device)

0 commit comments

Comments
 (0)