
Commit fe46713

Call allocationManager directly
1 parent 84ec78e commit fe46713

14 files changed: +52 −91 lines

pkg/kubelet/allocation/allocation_manager.go

Lines changed: 10 additions & 6 deletions
@@ -20,6 +20,7 @@ import (
  v1 "k8s.io/api/core/v1"
  apiequality "k8s.io/apimachinery/pkg/api/equality"
  "k8s.io/apimachinery/pkg/types"
+ "k8s.io/apimachinery/pkg/util/sets"
  utilfeature "k8s.io/apiserver/pkg/util/feature"
  "k8s.io/klog/v2"
  podutil "k8s.io/kubernetes/pkg/api/v1/pod"
@@ -44,10 +45,10 @@ type Manager interface {
  SetPodAllocation(pod *v1.Pod) error

  // DeletePodAllocation removes any stored state for the given pod UID.
- DeletePodAllocation(uid types.UID) error
+ DeletePodAllocation(uid types.UID)

- // RemoveOrphanedPods removes the stored state for any pods not included in the list of remaining pod UIDs.
- RemoveOrphanedPods(remainingPods map[types.UID]bool)
+ // RemoveOrphanedPods removes the stored state for any pods not included in the set of remaining pods.
+ RemoveOrphanedPods(remainingPods sets.Set[types.UID])
  }

  type manager struct {
@@ -151,10 +152,13 @@ func (m *manager) SetPodAllocation(pod *v1.Pod) error {
  return m.state.SetPodResourceAllocation(string(pod.UID), podAlloc)
  }

- func (m *manager) DeletePodAllocation(uid types.UID) error {
- return m.state.Delete(string(uid), "")
+ func (m *manager) DeletePodAllocation(uid types.UID) {
+ if err := m.state.Delete(string(uid), ""); err != nil {
+ // If the deletion fails, it will be retried by RemoveOrphanedPods, so we can safely ignore the error.
+ klog.V(3).ErrorS(err, "Failed to delete pod allocation", "podUID", uid)
+ }
  }

- func (m *manager) RemoveOrphanedPods(remainingPods map[types.UID]bool) {
+ func (m *manager) RemoveOrphanedPods(remainingPods sets.Set[types.UID]) {
  m.state.RemoveOrphanedPods(remainingPods)
  }
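
For orientation, a minimal sketch (not part of this commit) of what a call site looks like against the revised signatures; the helper name and UID values are illustrative:

    package example

    import (
    	"k8s.io/apimachinery/pkg/types"
    	"k8s.io/apimachinery/pkg/util/sets"

    	"k8s.io/kubernetes/pkg/kubelet/allocation"
    )

    // pruneAllocations shows the two changed calls: DeletePodAllocation no longer
    // returns an error, and RemoveOrphanedPods expects a typed set rather than a
    // map[types.UID]bool.
    func pruneAllocations(m allocation.Manager, deleted types.UID, remaining []types.UID) {
    	m.DeletePodAllocation(deleted)
    	m.RemoveOrphanedPods(sets.New(remaining...))
    }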

pkg/kubelet/allocation/state/state.go

Lines changed: 3 additions & 2 deletions
@@ -19,6 +19,7 @@ package state
  import (
  v1 "k8s.io/api/core/v1"
  "k8s.io/apimachinery/pkg/types"
+ "k8s.io/apimachinery/pkg/util/sets"
  )

  // PodResourceAllocation type is used in tracking resources allocated to pod's containers
@@ -49,8 +50,8 @@ type writer interface {
  SetContainerResourceAllocation(podUID string, containerName string, alloc v1.ResourceRequirements) error
  SetPodResourceAllocation(podUID string, alloc map[string]v1.ResourceRequirements) error
  Delete(podUID string, containerName string) error
- // RemoveOrphanedPods removes the stored state for any pods not included in the list of remaining pod UIDs.
- RemoveOrphanedPods(remainingPods map[types.UID]bool)
+ // RemoveOrphanedPods removes the stored state for any pods not included in the set of remaining pods.
+ RemoveOrphanedPods(remainingPods sets.Set[types.UID])
  }

  // State interface provides methods for tracking and setting pod resource allocation

pkg/kubelet/allocation/state/state_checkpoint.go

Lines changed: 3 additions & 2 deletions
@@ -23,6 +23,7 @@ import (

  v1 "k8s.io/api/core/v1"
  "k8s.io/apimachinery/pkg/types"
+ "k8s.io/apimachinery/pkg/util/sets"
  "k8s.io/klog/v2"
  "k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
  "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
@@ -140,7 +141,7 @@ func (sc *stateCheckpoint) Delete(podUID string, containerName string) error {
  return sc.storeState()
  }

- func (sc *stateCheckpoint) RemoveOrphanedPods(remainingPods map[types.UID]bool) {
+ func (sc *stateCheckpoint) RemoveOrphanedPods(remainingPods sets.Set[types.UID]) {
  sc.cache.RemoveOrphanedPods(remainingPods)
  // Don't bother updating the stored state. If Kubelet is restarted before the cache is written,
  // the orphaned pods will be removed the next time this method is called.
@@ -173,4 +174,4 @@ func (sc *noopStateCheckpoint) Delete(_ string, _ string) error {
  return nil
  }

- func (sc *noopStateCheckpoint) RemoveOrphanedPods(_ map[types.UID]bool) {}
+ func (sc *noopStateCheckpoint) RemoveOrphanedPods(_ sets.Set[types.UID]) {}

pkg/kubelet/allocation/state/state_mem.go

Lines changed: 2 additions & 1 deletion
@@ -21,6 +21,7 @@ import (

  v1 "k8s.io/api/core/v1"
  "k8s.io/apimachinery/pkg/types"
+ "k8s.io/apimachinery/pkg/util/sets"
  "k8s.io/klog/v2"
  )

@@ -98,7 +99,7 @@ func (s *stateMemory) Delete(podUID string, containerName string) error {
  return nil
  }

- func (s *stateMemory) RemoveOrphanedPods(remainingPods map[types.UID]bool) {
+ func (s *stateMemory) RemoveOrphanedPods(remainingPods sets.Set[types.UID]) {
  s.Lock()
  defer s.Unlock()

pkg/kubelet/kubelet.go

Lines changed: 11 additions & 5 deletions
@@ -77,6 +77,7 @@ import (
  podutil "k8s.io/kubernetes/pkg/api/v1/pod"
  "k8s.io/kubernetes/pkg/api/v1/resource"
  "k8s.io/kubernetes/pkg/features"
+ "k8s.io/kubernetes/pkg/kubelet/allocation"
  kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
  "k8s.io/kubernetes/pkg/kubelet/apis/config/v1beta1"
  "k8s.io/kubernetes/pkg/kubelet/apis/podresources"
@@ -662,7 +663,8 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
  klet.mirrorPodClient = kubepod.NewBasicMirrorClient(klet.kubeClient, string(nodeName), nodeLister)
  klet.podManager = kubepod.NewBasicPodManager()

- klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet, kubeDeps.PodStartupLatencyTracker, klet.getRootDir())
+ klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet, kubeDeps.PodStartupLatencyTracker)
+ klet.allocationManager = allocation.NewManager(klet.getRootDir())

  klet.resourceAnalyzer = serverstats.NewResourceAnalyzer(klet, kubeCfg.VolumeStatsAggPeriod.Duration, kubeDeps.Recorder)

@@ -1147,6 +1149,9 @@ type Kubelet struct {
  // consult the pod worker.
  statusManager status.Manager

+ // allocationManager manages allocated resources for pods.
+ allocationManager allocation.Manager
+
  // resyncInterval is the interval between periodic full reconciliations of
  // pods on this node.
  resyncInterval time.Duration
@@ -2644,7 +2649,7 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
  if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
  // To handle kubelet restarts, test pod admissibility using AllocatedResources values
  // (for cpu & memory) from checkpoint store. If found, that is the source of truth.
- allocatedPod, _ := kl.statusManager.UpdatePodFromAllocation(pod)
+ allocatedPod, _ := kl.allocationManager.UpdatePodFromAllocation(pod)

  // Check if we can admit the pod; if not, reject it.
  if ok, reason, message := kl.canAdmitPod(allocatedPods, allocatedPod); !ok {
@@ -2657,7 +2662,7 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
  continue
  }
  // For new pod, checkpoint the resource values at which the Pod has been admitted
- if err := kl.statusManager.SetPodAllocation(allocatedPod); err != nil {
+ if err := kl.allocationManager.SetPodAllocation(allocatedPod); err != nil {
  //TODO(vinaykul,InPlacePodVerticalScaling): Can we recover from this in some way? Investigate
  klog.ErrorS(err, "SetPodAllocation failed", "pod", klog.KObj(pod))
  }
@@ -2713,6 +2718,7 @@ func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) {
  start := kl.clock.Now()
  for _, pod := range pods {
  kl.podManager.RemovePod(pod)
+ kl.allocationManager.DeletePodAllocation(pod.UID)

  pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod)
  if wasMirror {
@@ -2876,7 +2882,7 @@ func (kl *Kubelet) canResizePod(pod *v1.Pod) (bool, v1.PodResizeStatus, string)
  // calculations after this function is called. It also updates the cached ResizeStatus according to
  // the allocation decision and pod status.
  func (kl *Kubelet) handlePodResourcesResize(pod *v1.Pod, podStatus *kubecontainer.PodStatus) (*v1.Pod, error) {
- allocatedPod, updated := kl.statusManager.UpdatePodFromAllocation(pod)
+ allocatedPod, updated := kl.allocationManager.UpdatePodFromAllocation(pod)
  if !updated {
  // Desired resources == allocated resources. Check whether a resize is in progress.
  resizeInProgress := !allocatedResourcesMatchStatus(allocatedPod, podStatus)
@@ -2897,7 +2903,7 @@ func (kl *Kubelet) handlePodResourcesResize(pod *v1.Pod, podStatus *kubecontaine
  fit, resizeStatus, resizeMsg := kl.canResizePod(pod)
  if fit {
  // Update pod resource allocation checkpoint
- if err := kl.statusManager.SetPodAllocation(pod); err != nil {
+ if err := kl.allocationManager.SetPodAllocation(pod); err != nil {
  return nil, err
  }
  for i, container := range pod.Spec.Containers {

pkg/kubelet/kubelet_pods.go

Lines changed: 6 additions & 5 deletions
@@ -217,7 +217,7 @@ func (kl *Kubelet) getAllocatedPods() []*v1.Pod {

  allocatedPods := make([]*v1.Pod, len(activePods))
  for i, pod := range activePods {
- allocatedPods[i], _ = kl.statusManager.UpdatePodFromAllocation(pod)
+ allocatedPods[i], _ = kl.allocationManager.UpdatePodFromAllocation(pod)
  }
  return allocatedPods
  }
@@ -1169,9 +1169,9 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error {
  // desired pods. Pods that must be restarted due to UID reuse, or leftover
  // pods from previous runs, are not known to the pod worker.

- allPodsByUID := make(map[types.UID]*v1.Pod)
+ allPodsByUID := make(sets.Set[types.UID])
  for _, pod := range allPods {
- allPodsByUID[pod.UID] = pod
+ allPodsByUID.Insert(pod.UID)
  }

  // Identify the set of pods that have workers, which should be all pods
@@ -1218,6 +1218,7 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error {
  // Remove orphaned pod statuses not in the total list of known config pods
  klog.V(3).InfoS("Clean up orphaned pod statuses")
  kl.removeOrphanedPodStatuses(allPods, mirrorPods)
+ kl.allocationManager.RemoveOrphanedPods(allPodsByUID)

  // Remove orphaned pod user namespace allocations (if any).
  klog.V(3).InfoS("Clean up orphaned pod user namespace allocations")
@@ -2147,7 +2148,7 @@ func (kl *Kubelet) convertToAPIContainerStatuses(pod *v1.Pod, podStatus *kubecon

  // Always set the status to the latest allocated resources, even if it differs from the
  // allocation used by the current sync loop.
- alloc, found := kl.statusManager.GetContainerResourceAllocation(string(pod.UID), cName)
+ alloc, found := kl.allocationManager.GetContainerResourceAllocation(string(pod.UID), cName)
  if !found {
  // This case is expected for non-resizable containers (ephemeral & non-restartable init containers).
  // Don't set status.Resources in this case.
@@ -2367,7 +2368,7 @@ func (kl *Kubelet) convertToAPIContainerStatuses(pod *v1.Pod, podStatus *kubecon
  status.Resources = convertContainerStatusResources(cName, status, cStatus, oldStatuses)

  if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScalingAllocatedStatus) {
- if alloc, found := kl.statusManager.GetContainerResourceAllocation(string(pod.UID), cName); found {
+ if alloc, found := kl.allocationManager.GetContainerResourceAllocation(string(pod.UID), cName); found {
  status.AllocatedResources = alloc.Requests
  }
  }
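
A small sketch (illustrative helper name, not part of the commit) of the read path that convertToAPIContainerStatuses relies on above: look up the checkpointed allocation by pod UID and container name, and fall back when nothing was stored, which is expected for non-resizable containers:

    package example

    import (
    	v1 "k8s.io/api/core/v1"

    	"k8s.io/kubernetes/pkg/kubelet/allocation"
    )

    // allocatedRequests returns the checkpointed resource requests for one
    // container, or false when no allocation was recorded for it.
    func allocatedRequests(am allocation.Manager, podUID, containerName string) (v1.ResourceList, bool) {
    	alloc, found := am.GetContainerResourceAllocation(podUID, containerName)
    	if !found {
    		return nil, false
    	}
    	return alloc.Requests, true
    }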

pkg/kubelet/kubelet_pods_test.go

Lines changed: 2 additions & 1 deletion
@@ -5087,7 +5087,8 @@ func TestConvertToAPIContainerStatusesForResources(t *testing.T) {
  } else {
  tPod.Spec.Containers[0].Resources = tc.Resources
  }
- kubelet.statusManager.SetPodAllocation(tPod)
+ err := kubelet.allocationManager.SetPodAllocation(tPod)
+ require.NoError(t, err)
  resources := tc.ActualResources
  if resources == nil {
  resources = &kubecontainer.ContainerResources{

pkg/kubelet/kubelet_test.go

Lines changed: 8 additions & 6 deletions
@@ -62,6 +62,7 @@ import (
  fakeremote "k8s.io/cri-client/pkg/fake"
  "k8s.io/klog/v2"
  "k8s.io/kubernetes/pkg/features"
+ "k8s.io/kubernetes/pkg/kubelet/allocation"
  "k8s.io/kubernetes/pkg/kubelet/allocation/state"
  kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
  cadvisortest "k8s.io/kubernetes/pkg/kubelet/cadvisor/testing"
@@ -272,7 +273,8 @@ func newTestKubeletWithImageList(
  kubelet.mirrorPodClient = fakeMirrorClient
  kubelet.podManager = kubepod.NewBasicPodManager()
  podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker()
- kubelet.statusManager = status.NewManager(fakeKubeClient, kubelet.podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, kubelet.getRootDir())
+ kubelet.statusManager = status.NewManager(fakeKubeClient, kubelet.podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker)
+ kubelet.allocationManager = allocation.NewInMemoryManager()
  kubelet.nodeStartupLatencyTracker = kubeletutil.NewNodeStartupLatencyTracker()

  kubelet.containerRuntime = fakeRuntime
@@ -2566,14 +2568,14 @@ func TestPodResourceAllocationReset(t *testing.T) {
  t.Run(tc.name, func(t *testing.T) {
  if tc.existingPodAllocation != nil {
  // when kubelet restarts, AllocatedResources has already existed before adding pod
- err := kubelet.statusManager.SetPodAllocation(tc.existingPodAllocation)
+ err := kubelet.allocationManager.SetPodAllocation(tc.existingPodAllocation)
  if err != nil {
  t.Fatalf("failed to set pod allocation: %v", err)
  }
  }
  kubelet.HandlePodAdditions([]*v1.Pod{tc.pod})

- allocatedResources, found := kubelet.statusManager.GetContainerResourceAllocation(string(tc.pod.UID), tc.pod.Spec.Containers[0].Name)
+ allocatedResources, found := kubelet.allocationManager.GetContainerResourceAllocation(string(tc.pod.UID), tc.pod.Spec.Containers[0].Name)
  if !found {
  t.Fatalf("resource allocation should exist: (pod: %#v, container: %s)", tc.pod, tc.pod.Spec.Containers[0].Name)
  }
@@ -2903,9 +2905,9 @@ func TestHandlePodResourcesResize(t *testing.T) {
  }

  if !tt.newResourcesAllocated {
- require.NoError(t, kubelet.statusManager.SetPodAllocation(originalPod))
+ require.NoError(t, kubelet.allocationManager.SetPodAllocation(originalPod))
  } else {
- require.NoError(t, kubelet.statusManager.SetPodAllocation(newPod))
+ require.NoError(t, kubelet.allocationManager.SetPodAllocation(newPod))
  }

  podStatus := &kubecontainer.PodStatus{
@@ -2951,7 +2953,7 @@ func TestHandlePodResourcesResize(t *testing.T) {
  assert.Equal(t, tt.expectedAllocatedReqs, updatedPodCtr.Resources.Requests, "updated pod spec requests")
  assert.Equal(t, tt.expectedAllocatedLims, updatedPodCtr.Resources.Limits, "updated pod spec limits")

- alloc, found := kubelet.statusManager.GetContainerResourceAllocation(string(newPod.UID), updatedPodCtr.Name)
+ alloc, found := kubelet.allocationManager.GetContainerResourceAllocation(string(newPod.UID), updatedPodCtr.Name)
  require.True(t, found, "container allocation")
  assert.Equal(t, tt.expectedAllocatedReqs, alloc.Requests, "stored container request allocation")
  assert.Equal(t, tt.expectedAllocatedLims, alloc.Limits, "stored container limit allocation")
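
The tests above swap the checkpoint-backed manager for allocation.NewInMemoryManager, which removes the need for a temp root directory. A hedged sketch of the kind of round trip that wiring enables (test body and pod fixture are illustrative, not from this commit):

    package example

    import (
    	"testing"

    	v1 "k8s.io/api/core/v1"
    	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

    	"k8s.io/kubernetes/pkg/kubelet/allocation"
    )

    func TestAllocationRoundTrip(t *testing.T) {
    	// In-memory state: no checkpoint files, so no temp root dir is needed.
    	am := allocation.NewInMemoryManager()

    	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "pod-uid", Name: "p"}}
    	if err := am.SetPodAllocation(pod); err != nil {
    		t.Fatalf("SetPodAllocation: %v", err)
    	}

    	// Deleting is now fire-and-forget; the orphan sweep is the backstop.
    	am.DeletePodAllocation(pod.UID)
    }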

pkg/kubelet/prober/common_test.go

Lines changed: 1 addition & 8 deletions
@@ -17,7 +17,6 @@ limitations under the License.
  package prober

  import (
- "os"
  "reflect"
  "sync"

@@ -114,14 +113,8 @@ func newTestManager() *manager {
  podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker()
  // Add test pod to pod manager, so that status manager can get the pod from pod manager if needed.
  podManager.AddPod(getTestPod())
- testRootDir := ""
- if tempDir, err := os.MkdirTemp("", "kubelet_test."); err != nil {
- return nil
- } else {
- testRootDir = tempDir
- }
  m := NewManager(
- status.NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, testRootDir),
+ status.NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker),
  results.NewManager(),
  results.NewManager(),
  results.NewManager(),

pkg/kubelet/prober/scale_test.go

Lines changed: 1 addition & 8 deletions
@@ -21,7 +21,6 @@ import (
  "fmt"
  "net"
  "net/http"
- "os"
  "sync"
  "sync/atomic"
  "testing"
@@ -81,16 +80,10 @@ func TestTCPPortExhaustion(t *testing.T) {
  }
  for _, tt := range tests {
  t.Run(tt.name, func(t *testing.T) {
- testRootDir := ""
- if tempDir, err := os.MkdirTemp("", "kubelet_test."); err != nil {
- t.Fatalf("can't make a temp rootdir: %v", err)
- } else {
- testRootDir = tempDir
- }
  podManager := kubepod.NewBasicPodManager()
  podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker()
  m := NewManager(
- status.NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, testRootDir),
+ status.NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker),
  results.NewManager(),
  results.NewManager(),
  results.NewManager(),
