
Commit 191abe3

Merge pull request kubernetes#120550 from adrianchiris/fix-dra-node-reboot
DRA: call plugins for claims even if exist in cache
2 parents: d008435 + 3738111

3 files changed: +264 -76 lines changed
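
What changes, in short: PrepareResources used to return early when a claim was already present in the kubelet's claimInfo cache, so a claim restored from the checkpoint (judging by the source branch name, this is the node-reboot case) never reached the DRA plugin again. With this commit the cache lookup no longer short-circuits: the claim always ends up with a claimInfo object, the pod reference is recorded, and the plugin calls are skipped only when the new in-memory prepared flag is already set. The sketch below models that control flow with simplified stand-in types; claimInfo, prepareClaim and nodePrepareResources here are placeholders, not the kubelet's real API.

package main

import "fmt"

// claimInfo is a stripped-down stand-in for the kubelet's ClaimInfo type:
// podUIDs mimics the addPodReference bookkeeping, and prepared is the new
// in-memory flag introduced by this commit (it sits next to, not inside,
// the checkpointed ClaimInfoState).
type claimInfo struct {
	podUIDs  map[string]struct{}
	prepared bool
}

// prepareClaim models the flow of PrepareResources after this change:
// a cached-but-unprepared claim still reaches the plugin.
func prepareClaim(cache map[string]*claimInfo, claimKey, podUID string,
	nodePrepareResources func(claimKey string) error) error {
	info := cache[claimKey]
	if info == nil {
		// Claim not in the cache yet: build a fresh claimInfo to be processed
		// below (newClaimInfoFromResourceClaim plays this role in the real code).
		info = &claimInfo{podUIDs: map[string]struct{}{}}
		cache[claimKey] = info
	}
	info.podUIDs[podUID] = struct{}{}

	if info.prepared {
		// Already prepared during this kubelet lifetime, nothing more to do.
		return nil
	}

	// Cache hit or not, the plugin is called as long as the claim has not
	// been prepared yet, which is the point of the fix.
	if err := nodePrepareResources(claimKey); err != nil {
		return err
	}
	info.prepared = true
	return nil
}

func main() {
	cache := map[string]*claimInfo{
		// Entry as it might look right after a kubelet restart: present in the
		// cache (restored from the checkpoint) but with prepared == false.
		"default/claim-1": {podUIDs: map[string]struct{}{}},
	}
	plugin := func(claimKey string) error {
		fmt.Println("NodePrepareResources:", claimKey)
		return nil
	}

	_ = prepareClaim(cache, "default/claim-1", "pod-a", plugin) // plugin is called
	_ = prepareClaim(cache, "default/claim-1", "pod-b", plugin) // skipped: already prepared
}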

pkg/kubelet/cm/dra/claiminfo.go

Lines changed: 39 additions & 3 deletions

@@ -33,9 +33,10 @@ import (
 type ClaimInfo struct {
 	sync.RWMutex
 	state.ClaimInfoState
-	// annotations is a list of container annotations associated with
+	// annotations is a mapping of container annotations per DRA plugin associated with
 	// a prepared resource
-	annotations []kubecontainer.Annotation
+	annotations map[string][]kubecontainer.Annotation
+	prepared    bool
 }
 
 func (info *ClaimInfo) addPodReference(podUID types.UID) {
@@ -69,11 +70,23 @@ func (info *ClaimInfo) addCDIDevices(pluginName string, cdiDevices []string) err
 	}
 
 	info.CDIDevices[pluginName] = cdiDevices
-	info.annotations = append(info.annotations, annotations...)
+	info.annotations[pluginName] = annotations
 
 	return nil
 }
 
+// annotationsAsList returns container annotations as a single list.
+func (info *ClaimInfo) annotationsAsList() []kubecontainer.Annotation {
+	info.RLock()
+	defer info.RUnlock()
+
+	var lst []kubecontainer.Annotation
+	for _, v := range info.annotations {
+		lst = append(lst, v...)
+	}
+	return lst
+}
+
 // claimInfoCache is a cache of processed resource claims keyed by namespace + claim name.
 type claimInfoCache struct {
 	sync.RWMutex
@@ -93,10 +106,33 @@ func newClaimInfo(driverName, className string, claimUID types.UID, claimName, n
 	}
 	claimInfo := ClaimInfo{
 		ClaimInfoState: claimInfoState,
+		annotations:    make(map[string][]kubecontainer.Annotation),
 	}
 	return &claimInfo
 }
 
+// newClaimInfoFromResourceClaim creates a new ClaimInfo object
+func newClaimInfoFromResourceClaim(resourceClaim *resourcev1alpha2.ResourceClaim) *ClaimInfo {
+	// Grab the allocation.resourceHandles. If there are no
+	// allocation.resourceHandles, create a single resourceHandle with no
+	// content. This will trigger processing of this claim by a single
+	// kubelet plugin whose name matches resourceClaim.Status.DriverName.
+	resourceHandles := resourceClaim.Status.Allocation.ResourceHandles
+	if len(resourceHandles) == 0 {
+		resourceHandles = make([]resourcev1alpha2.ResourceHandle, 1)
+	}
+
+	return newClaimInfo(
+		resourceClaim.Status.DriverName,
+		resourceClaim.Spec.ResourceClassName,
+		resourceClaim.UID,
+		resourceClaim.Name,
+		resourceClaim.Namespace,
+		make(sets.Set[string]),
+		resourceHandles,
+	)
+}
+
 // newClaimInfoCache is a function that returns an instance of the claimInfoCache.
 func newClaimInfoCache(stateDir, checkpointName string) (*claimInfoCache, error) {
 	stateImpl, err := state.NewCheckpointState(stateDir, checkpointName)
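
A note on the annotation bookkeeping above: annotations are now stored per DRA plugin and flattened on demand with annotationsAsList, so re-preparing a claim overwrites that plugin's entry instead of appending to one flat slice. Below is a minimal, self-contained sketch of that pattern; the annotation type, helper name setAnnotations and plugin name are simplified placeholders rather than the kubecontainer types used in the real file, and the sketch omits the RWMutex locking.

package main

import "fmt"

// annotation is a simplified placeholder for kubecontainer.Annotation.
type annotation struct {
	Name  string
	Value string
}

// claimInfo keeps annotations keyed by DRA plugin name, mirroring the new
// map layout in the diff above.
type claimInfo struct {
	annotations map[string][]annotation
}

// setAnnotations stores a plugin's annotations, replacing any previous entry
// for that plugin (the old code appended everything to a single flat slice).
func (c *claimInfo) setAnnotations(pluginName string, anns []annotation) {
	c.annotations[pluginName] = anns
}

// annotationsAsList flattens the per-plugin map into one slice, which is the
// shape consumers such as GetResources expect.
func (c *claimInfo) annotationsAsList() []annotation {
	var lst []annotation
	for _, anns := range c.annotations {
		lst = append(lst, anns...)
	}
	return lst
}

func main() {
	ci := &claimInfo{annotations: map[string][]annotation{}}
	ci.setAnnotations("gpu.example.com", []annotation{
		{Name: "cdi.k8s.io/claim-1", Value: "example.com/gpu=gpu-0"},
	})
	// Preparing the same claim again replaces the plugin's entry rather than
	// accumulating duplicates.
	ci.setAnnotations("gpu.example.com", []annotation{
		{Name: "cdi.k8s.io/claim-1", Value: "example.com/gpu=gpu-1"},
	})
	fmt.Println(ci.annotationsAsList())
}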

pkg/kubelet/cm/dra/manager.go

Lines changed: 25 additions & 36 deletions

@@ -21,10 +21,8 @@ import (
 	"fmt"
 
 	v1 "k8s.io/api/core/v1"
-	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
-	"k8s.io/apimachinery/pkg/util/sets"
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/dynamic-resource-allocation/resourceclaim"
 	"k8s.io/klog/v2"
@@ -109,42 +107,30 @@ func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
 			continue
 		}
 
-		// Is the resource already prepared? Then add the pod UID to it.
-		if claimInfo := m.cache.get(*claimName, pod.Namespace); claimInfo != nil {
-			// We delay checkpointing of this change until this call
-			// returns successfully. It is OK to do this because we
-			// will only return successfully from this call if the
-			// checkpoint has succeeded. That means if the kubelet is
-			// ever restarted before this checkpoint succeeds, the pod
-			// whose resources are being prepared would never have
-			// started, so it's OK (actually correct) to not include it
-			// in the cache.
-			claimInfo.addPodReference(pod.UID)
+		claimInfo := m.cache.get(*claimName, pod.Namespace)
+		if claimInfo == nil {
+			// claim does not exist in cache, create new claimInfo object
+			// to be processed later.
+			claimInfo = newClaimInfoFromResourceClaim(resourceClaim)
+		}
+
+		// We delay checkpointing of this change until this call
+		// returns successfully. It is OK to do this because we
+		// will only return successfully from this call if the
+		// checkpoint has succeeded. That means if the kubelet is
+		// ever restarted before this checkpoint succeeds, the pod
+		// whose resources are being prepared would never have
+		// started, so it's OK (actually correct) to not include it
+		// in the cache.
+		claimInfo.addPodReference(pod.UID)
+
+		if claimInfo.prepared {
+			// Already prepared this claim, no need to prepare it again
 			continue
 		}
 
-		// Grab the allocation.resourceHandles. If there are no
-		// allocation.resourceHandles, create a single resourceHandle with no
-		// content. This will trigger processing of this claim by a single
-		// kubelet plugin whose name matches resourceClaim.Status.DriverName.
-		resourceHandles := resourceClaim.Status.Allocation.ResourceHandles
-		if len(resourceHandles) == 0 {
-			resourceHandles = make([]resourcev1alpha2.ResourceHandle, 1)
-		}
-
-		// Create a claimInfo object to store the relevant claim info.
-		claimInfo := newClaimInfo(
-			resourceClaim.Status.DriverName,
-			resourceClaim.Spec.ResourceClassName,
-			resourceClaim.UID,
-			resourceClaim.Name,
-			resourceClaim.Namespace,
-			sets.New(string(pod.UID)),
-			resourceHandles,
-		)
-
 		// Loop through all plugins and prepare for calling NodePrepareResources.
-		for _, resourceHandle := range resourceHandles {
+		for _, resourceHandle := range claimInfo.ResourceHandles {
 			// If no DriverName is provided in the resourceHandle, we
 			// use the DriverName from the status
 			pluginName := resourceHandle.DriverName
@@ -193,6 +179,8 @@ func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
 			if err != nil {
 				return fmt.Errorf("failed to add CDIDevices to claimInfo %+v: %+v", claimInfo, err)
 			}
+			// mark claim as (successfully) prepared by manager, so next time we dont prepare it.
+			claimInfo.prepared = true
 
 			// TODO: We (re)add the claimInfo object to the cache and
 			// sync it to the checkpoint *after* the
@@ -291,8 +279,9 @@ func (m *ManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*Conta
 		}
 
 		claimInfo.RLock()
-		klog.V(3).InfoS("Add resource annotations", "claim", *claimName, "annotations", claimInfo.annotations)
-		annotations = append(annotations, claimInfo.annotations...)
+		claimAnnotations := claimInfo.annotationsAsList()
+		klog.V(3).InfoS("Add resource annotations", "claim", *claimName, "annotations", claimAnnotations)
+		annotations = append(annotations, claimAnnotations...)
 		for _, devices := range claimInfo.CDIDevices {
 			for _, device := range devices {
 				cdiDevices = append(cdiDevices, kubecontainer.CDIDevice{Name: device})
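
On the consumer side, the last hunk has GetResources flatten the per-plugin annotations via annotationsAsList before merging them with the claim's CDI device names. A rough sketch of that merge follows, again with stand-in types; containerInfo and getResources are placeholders for the real signatures, which operate on pods, containers and kubecontainer types.

package main

import "fmt"

// Stand-ins for the pieces GetResources pulls together per container.
type annotation struct{ Name, Value string }

type cdiDevice struct{ Name string }

type claimInfo struct {
	annotations map[string][]annotation // per DRA plugin
	cdiDevices  map[string][]string     // per DRA plugin
}

// annotationsAsList flattens the per-plugin annotations, as in the diff above.
func (c *claimInfo) annotationsAsList() []annotation {
	var lst []annotation
	for _, anns := range c.annotations {
		lst = append(lst, anns...)
	}
	return lst
}

// containerInfo mirrors the shape of what GetResources returns: everything the
// runtime needs to wire the prepared resources into a container.
type containerInfo struct {
	annotations []annotation
	cdiDevices  []cdiDevice
}

// getResources merges annotations and CDI devices from every claim the
// container references, following the loop in the hunk above.
func getResources(claims []*claimInfo) *containerInfo {
	out := &containerInfo{}
	for _, ci := range claims {
		out.annotations = append(out.annotations, ci.annotationsAsList()...)
		for _, devices := range ci.cdiDevices {
			for _, device := range devices {
				out.cdiDevices = append(out.cdiDevices, cdiDevice{Name: device})
			}
		}
	}
	return out
}

func main() {
	ci := &claimInfo{
		annotations: map[string][]annotation{
			"gpu.example.com": {{Name: "cdi.k8s.io/claim-1", Value: "example.com/gpu=gpu-0"}},
		},
		cdiDevices: map[string][]string{
			"gpu.example.com": {"example.com/gpu=gpu-0"},
		},
	}
	fmt.Printf("%+v\n", getResources([]*claimInfo{ci}))
}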
