Commit 08b9420

DRA: call plugins for claims even if exist in cache
Today, the DRA manager does not call a plugin's NodePrepareResource for claims it previously handled successfully, i.e. claims present in the cache (checkpoint), even if the node rebooted. After a node reboot, the DRA plugins must be called again for resource claims so that they can re-prepare resources that do not persist across reboots. To achieve that, once the kubelet starts, we call the DRA plugins for each claim once if a pod sandbox needs to be created during PodSync.

Signed-off-by: adrianc <[email protected]>
1 parent 755644a
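In outline, the new decision logic in PrepareResources looks like the sketch below. It is a hypothetical condensed helper, not the verbatim kubelet code (the real logic is inlined in pkg/kubelet/cm/dra/manager.go, surrounded by checkpointing and per-resource-handle plugin calls). The key point: because `prepared` lives on the in-memory ClaimInfo rather than in the checkpointed ClaimInfoState, it resets to false whenever the kubelet restarts, so cached claims get prepared again exactly once after a reboot.

```go
// prepareClaimOnce is an illustrative helper condensing the new flow;
// callPlugins stands in for the NodePrepareResource calls made per
// resource handle in the real manager code.
func prepareClaimOnce(claimInfo *ClaimInfo, podUID types.UID, callPlugins func(*ClaimInfo) error) error {
	claimInfo.addPodReference(podUID)
	if claimInfo.prepared {
		// Prepared earlier in this kubelet run: skip the plugin calls.
		return nil
	}
	// First time in this kubelet run: either a fresh claim, or a
	// checkpointed claim seen again after a kubelet restart / node reboot.
	if err := callPlugins(claimInfo); err != nil {
		return err
	}
	// In-memory only; not part of the checkpointed state, so it resets
	// to false on kubelet restart.
	claimInfo.prepared = true
	return nil
}
```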

2 files changed (+64, -39)

pkg/kubelet/cm/dra/claiminfo.go (39 additions & 3 deletions)

```diff
@@ -33,9 +33,10 @@ import (
 type ClaimInfo struct {
 	sync.RWMutex
 	state.ClaimInfoState
-	// annotations is a list of container annotations associated with
+	// annotations is a mapping of container annotations per DRA plugin associated with
 	// a prepared resource
-	annotations []kubecontainer.Annotation
+	annotations map[string][]kubecontainer.Annotation
+	prepared    bool
 }
 
 func (info *ClaimInfo) addPodReference(podUID types.UID) {
@@ -69,11 +70,23 @@ func (info *ClaimInfo) addCDIDevices(pluginName string, cdiDevices []string) err
 	}
 
 	info.CDIDevices[pluginName] = cdiDevices
-	info.annotations = append(info.annotations, annotations...)
+	info.annotations[pluginName] = annotations
 
 	return nil
 }
 
+// annotationsAsList returns container annotations as a single list.
+func (info *ClaimInfo) annotationsAsList() []kubecontainer.Annotation {
+	info.RLock()
+	defer info.RUnlock()
+
+	var lst []kubecontainer.Annotation
+	for _, v := range info.annotations {
+		lst = append(lst, v...)
+	}
+	return lst
+}
+
 // claimInfoCache is a cache of processed resource claims keyed by namespace + claim name.
 type claimInfoCache struct {
 	sync.RWMutex
@@ -93,10 +106,33 @@ func newClaimInfo(driverName, className string, claimUID types.UID, claimName, n
 	}
 	claimInfo := ClaimInfo{
 		ClaimInfoState: claimInfoState,
+		annotations:    make(map[string][]kubecontainer.Annotation),
 	}
 	return &claimInfo
 }
 
+// newClaimInfoFromResourceClaim creates a new ClaimInfo object
+func newClaimInfoFromResourceClaim(resourceClaim *resourcev1alpha2.ResourceClaim) *ClaimInfo {
+	// Grab the allocation.resourceHandles. If there are no
+	// allocation.resourceHandles, create a single resourceHandle with no
+	// content. This will trigger processing of this claim by a single
+	// kubelet plugin whose name matches resourceClaim.Status.DriverName.
+	resourceHandles := resourceClaim.Status.Allocation.ResourceHandles
+	if len(resourceHandles) == 0 {
+		resourceHandles = make([]resourcev1alpha2.ResourceHandle, 1)
+	}
+
+	return newClaimInfo(
+		resourceClaim.Status.DriverName,
+		resourceClaim.Spec.ResourceClassName,
+		resourceClaim.UID,
+		resourceClaim.Name,
+		resourceClaim.Namespace,
+		make(sets.Set[string]),
+		resourceHandles,
+	)
+}
+
 // newClaimInfoCache is a function that returns an instance of the claimInfoCache.
 func newClaimInfoCache(stateDir, checkpointName string) (*claimInfoCache, error) {
 	stateImpl, err := state.NewCheckpointState(stateDir, checkpointName)
```
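Since `annotations` changed from a flat slice to a per-plugin map, consumers now flatten it through `annotationsAsList`. Below is a self-contained sketch of that flattening, using a local stand-in for `kubecontainer.Annotation` (a name/value pair) and made-up plugin names:

```go
package main

import "fmt"

// annotation stands in for kubecontainer.Annotation (a name/value pair).
type annotation struct {
	Name  string
	Value string
}

// flatten mirrors ClaimInfo.annotationsAsList: it merges the per-plugin
// annotation slices into the single flat list that GetResources consumes.
// Note that Go map iteration order is unspecified, so the order of the
// combined list may differ between calls.
func flatten(perPlugin map[string][]annotation) []annotation {
	var lst []annotation
	for _, v := range perPlugin {
		lst = append(lst, v...)
	}
	return lst
}

func main() {
	perPlugin := map[string][]annotation{
		"driver-a.example.com": {{Name: "cdi.k8s.io/claim-a", Value: "dev-0"}},
		"driver-b.example.com": {{Name: "cdi.k8s.io/claim-b", Value: "dev-1"}},
	}
	fmt.Println(flatten(perPlugin))
}
```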

pkg/kubelet/cm/dra/manager.go (25 additions & 36 deletions)

```diff
@@ -21,10 +21,8 @@ import (
 	"fmt"
 
 	v1 "k8s.io/api/core/v1"
-	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
-	"k8s.io/apimachinery/pkg/util/sets"
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/dynamic-resource-allocation/resourceclaim"
 	"k8s.io/klog/v2"
@@ -109,42 +107,30 @@ func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
 			continue
 		}
 
-		// Is the resource already prepared? Then add the pod UID to it.
-		if claimInfo := m.cache.get(*claimName, pod.Namespace); claimInfo != nil {
-			// We delay checkpointing of this change until this call
-			// returns successfully. It is OK to do this because we
-			// will only return successfully from this call if the
-			// checkpoint has succeeded. That means if the kubelet is
-			// ever restarted before this checkpoint succeeds, the pod
-			// whose resources are being prepared would never have
-			// started, so it's OK (actually correct) to not include it
-			// in the cache.
-			claimInfo.addPodReference(pod.UID)
+		claimInfo := m.cache.get(*claimName, pod.Namespace)
+		if claimInfo == nil {
+			// claim does not exist in cache, create new claimInfo object
+			// to be processed later.
+			claimInfo = newClaimInfoFromResourceClaim(resourceClaim)
+		}
+
+		// We delay checkpointing of this change until this call
+		// returns successfully. It is OK to do this because we
+		// will only return successfully from this call if the
+		// checkpoint has succeeded. That means if the kubelet is
+		// ever restarted before this checkpoint succeeds, the pod
+		// whose resources are being prepared would never have
+		// started, so it's OK (actually correct) to not include it
+		// in the cache.
+		claimInfo.addPodReference(pod.UID)
+
+		if claimInfo.prepared {
+			// Already prepared this claim, no need to prepare it again
 			continue
 		}
 
-		// Grab the allocation.resourceHandles. If there are no
-		// allocation.resourceHandles, create a single resourceHandle with no
-		// content. This will trigger processing of this claim by a single
-		// kubelet plugin whose name matches resourceClaim.Status.DriverName.
-		resourceHandles := resourceClaim.Status.Allocation.ResourceHandles
-		if len(resourceHandles) == 0 {
-			resourceHandles = make([]resourcev1alpha2.ResourceHandle, 1)
-		}
-
-		// Create a claimInfo object to store the relevant claim info.
-		claimInfo := newClaimInfo(
-			resourceClaim.Status.DriverName,
-			resourceClaim.Spec.ResourceClassName,
-			resourceClaim.UID,
-			resourceClaim.Name,
-			resourceClaim.Namespace,
-			sets.New(string(pod.UID)),
-			resourceHandles,
-		)
-
 		// Loop through all plugins and prepare for calling NodePrepareResources.
-		for _, resourceHandle := range resourceHandles {
+		for _, resourceHandle := range claimInfo.ResourceHandles {
 			// If no DriverName is provided in the resourceHandle, we
 			// use the DriverName from the status
 			pluginName := resourceHandle.DriverName
@@ -193,6 +179,8 @@ func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
 			if err != nil {
 				return fmt.Errorf("failed to add CDIDevices to claimInfo %+v: %+v", claimInfo, err)
 			}
+			// mark claim as (successfully) prepared by manager, so next time we dont prepare it.
+			claimInfo.prepared = true
 
 			// TODO: We (re)add the claimInfo object to the cache and
 			// sync it to the checkpoint *after* the
@@ -291,8 +279,9 @@ func (m *ManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*Conta
 	}
 
 	claimInfo.RLock()
-	klog.V(3).InfoS("Add resource annotations", "claim", *claimName, "annotations", claimInfo.annotations)
-	annotations = append(annotations, claimInfo.annotations...)
+	claimAnnotations := claimInfo.annotationsAsList()
+	klog.V(3).InfoS("Add resource annotations", "claim", *claimName, "annotations", claimAnnotations)
+	annotations = append(annotations, claimAnnotations...)
 	for _, devices := range claimInfo.CDIDevices {
 		for _, device := range devices {
 			cdiDevices = append(cdiDevices, kubecontainer.CDIDevice{Name: device})
```
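One consequence for driver authors, following from the commit message: NodePrepareResource may now be called again for a claim the plugin already prepared (for instance after a node reboot), so the plugin-side handler should be idempotent. A hypothetical sketch of that pattern; the function and helper names (prepareResource, resourceStillExists, setUpResource, the prepared map) are illustrative, not the actual DRA gRPC API:

```go
// prepareResource illustrates an idempotent plugin-side prepare:
// claimUID and handle stand in for the request fields of the real DRA
// kubelet plugin API, the returned strings for the CDI device IDs.
func (d *driver) prepareResource(claimUID, handle string) ([]string, error) {
	d.mu.Lock()
	defer d.mu.Unlock()

	// If this claim was prepared before and the resource survived the
	// reboot, just return the previously recorded CDI devices.
	if devices, ok := d.prepared[claimUID]; ok && d.resourceStillExists(claimUID) {
		return devices, nil
	}

	// Otherwise (first call, or the resource did not persist the
	// reboot), set the resource up again from the handle.
	devices, err := d.setUpResource(claimUID, handle)
	if err != nil {
		return nil, err
	}
	d.prepared[claimUID] = devices
	return devices, nil
}
```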
