
Commit 8266787

Merge pull request kubernetes#130599 from tallclair/acknowledged-resources
[FG:InPlacePodVerticalScaling] Track actuated resources to trigger resizes
2 parents: b90ff89 + 6d0b627

File tree

9 files changed (+486, -602 lines)


pkg/kubelet/allocation/allocation_manager.go

Lines changed: 70 additions & 28 deletions
@@ -17,6 +17,8 @@ limitations under the License.
 package allocation
 
 import (
+	"path/filepath"
+
 	v1 "k8s.io/api/core/v1"
 	apiequality "k8s.io/apimachinery/pkg/api/equality"
 	"k8s.io/apimachinery/pkg/types"
@@ -29,7 +31,10 @@ import (
 )
 
 // podStatusManagerStateFile is the file name where status manager stores its state
-const podStatusManagerStateFile = "pod_status_manager_state"
+const (
+	allocatedPodsStateFile = "allocated_pods_state"
+	actuatedPodsStateFile  = "actuated_pods_state"
+)
 
 // AllocationManager tracks pod resource allocations.
 type Manager interface {
@@ -41,57 +46,71 @@ type Manager interface {
 	// Returns the updated (or original) pod, and whether there was an allocation stored.
 	UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool)
 
-	// SetPodAllocation checkpoints the resources allocated to a pod's containers.
-	SetPodAllocation(pod *v1.Pod) error
+	// SetAllocatedResources checkpoints the resources allocated to a pod's containers.
+	SetAllocatedResources(allocatedPod *v1.Pod) error
+
+	// SetActuatedResources records the actuated resources of the given container (or the entire
+	// pod, if actuatedContainer is nil).
+	SetActuatedResources(allocatedPod *v1.Pod, actuatedContainer *v1.Container) error
 
-	// DeletePodAllocation removes any stored state for the given pod UID.
-	DeletePodAllocation(uid types.UID)
+	// GetActuatedResources returns the stored actuated resources for the container, and whether they exist.
+	GetActuatedResources(podUID types.UID, containerName string) (v1.ResourceRequirements, bool)
+
+	// RemovePod removes any stored state for the given pod UID.
+	RemovePod(uid types.UID)
 
 	// RemoveOrphanedPods removes the stored state for any pods not included in the set of remaining pods.
 	RemoveOrphanedPods(remainingPods sets.Set[types.UID])
 }
 
 type manager struct {
-	state state.State
+	allocated state.State
+	actuated  state.State
 }
 
 func NewManager(checkpointDirectory string) Manager {
-	m := &manager{}
-
-	if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
-		stateImpl, err := state.NewStateCheckpoint(checkpointDirectory, podStatusManagerStateFile)
-		if err != nil {
-			// This is a crictical, non-recoverable failure.
-			klog.ErrorS(err, "Failed to initialize allocation checkpoint manager")
-			panic(err)
-		}
-		m.state = stateImpl
-	} else {
-		m.state = state.NewNoopStateCheckpoint()
+	return &manager{
+		allocated: newStateImpl(checkpointDirectory, allocatedPodsStateFile),
+		actuated:  newStateImpl(checkpointDirectory, actuatedPodsStateFile),
+	}
+}
+
+func newStateImpl(checkpointDirectory, checkpointName string) state.State {
+	if !utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
+		return state.NewNoopStateCheckpoint()
 	}
 
-	return m
+	stateImpl, err := state.NewStateCheckpoint(checkpointDirectory, checkpointName)
+	if err != nil {
+		// This is a critical, non-recoverable failure.
+		klog.ErrorS(err, "Failed to initialize allocation checkpoint manager",
+			"checkpointPath", filepath.Join(checkpointDirectory, checkpointName))
+		panic(err)
+	}
+
+	return stateImpl
 }
 
 // NewInMemoryManager returns an allocation manager that doesn't persist state.
 // For testing purposes only!
 func NewInMemoryManager() Manager {
 	return &manager{
-		state: state.NewStateMemory(nil),
+		allocated: state.NewStateMemory(nil),
+		actuated:  state.NewStateMemory(nil),
 	}
 }
 
 // GetContainerResourceAllocation returns the last checkpointed AllocatedResources values
 // If checkpoint manager has not been initialized, it returns nil, false
 func (m *manager) GetContainerResourceAllocation(podUID types.UID, containerName string) (v1.ResourceRequirements, bool) {
-	return m.state.GetContainerResourceAllocation(podUID, containerName)
+	return m.allocated.GetContainerResourceAllocation(podUID, containerName)
 }
 
 // UpdatePodFromAllocation overwrites the pod spec with the allocation.
 // This function does a deep copy only if updates are needed.
func (m *manager) UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool) {
 	// TODO(tallclair): This clones the whole cache, but we only need 1 pod.
-	allocs := m.state.GetPodResourceAllocation()
+	allocs := m.allocated.GetPodResourceAllocation()
 	return updatePodFromAllocation(pod, allocs)
 }
 
@@ -132,8 +151,12 @@ func updatePodFromAllocation(pod *v1.Pod, allocs state.PodResourceAllocation) (*
 	return pod, updated
 }
 
-// SetPodAllocation checkpoints the resources allocated to a pod's containers
-func (m *manager) SetPodAllocation(pod *v1.Pod) error {
+// SetAllocatedResources checkpoints the resources allocated to a pod's containers
+func (m *manager) SetAllocatedResources(pod *v1.Pod) error {
+	return m.allocated.SetPodResourceAllocation(pod.UID, allocationFromPod(pod))
+}
+
+func allocationFromPod(pod *v1.Pod) map[string]v1.ResourceRequirements {
 	podAlloc := make(map[string]v1.ResourceRequirements)
 	for _, container := range pod.Spec.Containers {
 		alloc := *container.Resources.DeepCopy()
@@ -149,16 +172,35 @@ func (m *manager) SetPodAllocation(pod *v1.Pod) error {
 		}
 	}
 
-	return m.state.SetPodResourceAllocation(pod.UID, podAlloc)
+	return podAlloc
 }
 
-func (m *manager) DeletePodAllocation(uid types.UID) {
-	if err := m.state.Delete(uid, ""); err != nil {
+func (m *manager) RemovePod(uid types.UID) {
+	if err := m.allocated.Delete(uid, ""); err != nil {
+		// If the deletion fails, it will be retried by RemoveOrphanedPods, so we can safely ignore the error.
+		klog.V(3).ErrorS(err, "Failed to delete pod allocation", "podUID", uid)
+	}
+
+	if err := m.actuated.Delete(uid, ""); err != nil {
 		// If the deletion fails, it will be retried by RemoveOrphanedPods, so we can safely ignore the error.
 		klog.V(3).ErrorS(err, "Failed to delete pod allocation", "podUID", uid)
 	}
 }
 
 func (m *manager) RemoveOrphanedPods(remainingPods sets.Set[types.UID]) {
-	m.state.RemoveOrphanedPods(remainingPods)
+	m.allocated.RemoveOrphanedPods(remainingPods)
+	m.actuated.RemoveOrphanedPods(remainingPods)
+}
+
+func (m *manager) SetActuatedResources(allocatedPod *v1.Pod, actuatedContainer *v1.Container) error {
+	if actuatedContainer == nil {
+		alloc := allocationFromPod(allocatedPod)
+		return m.actuated.SetPodResourceAllocation(allocatedPod.UID, alloc)
+	}
+
+	return m.actuated.SetContainerResourceAllocation(allocatedPod.UID, actuatedContainer.Name, actuatedContainer.Resources)
+}
+
+func (m *manager) GetActuatedResources(podUID types.UID, containerName string) (v1.ResourceRequirements, bool) {
	return m.actuated.GetContainerResourceAllocation(podUID, containerName)
}
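
The split into an allocated and an actuated checkpoint is the core of this change: allocated resources are recorded when a pod (or a resize) is admitted, actuated resources once the runtime has applied them (via SetActuatedResources), and a resize remains pending for a running container while the two records disagree. The standalone Go sketch below illustrates that bookkeeping; the tracker type and resizePending helper are hypothetical names for illustration only (this is not the kubelet's allocation package), and for brevity it compares only CPU requests.

```go
// Standalone sketch of the allocated-vs-actuated bookkeeping idea.
// "allocated" is written at admission time, "actuated" once the runtime has
// applied the resources; a resize is pending while the two differ.
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// containerResources is keyed by container name (hypothetical helper type).
type containerResources map[string]v1.ResourceRequirements

type tracker struct {
	allocated containerResources
	actuated  containerResources
}

func newTracker() *tracker {
	return &tracker{allocated: containerResources{}, actuated: containerResources{}}
}

// resizePending reports whether the allocated CPU request has not yet been
// actuated. (The kubelet compares CPU requests/limits and memory limits; this
// sketch looks only at CPU requests for brevity.)
func (t *tracker) resizePending(name string) bool {
	alloc := t.allocated[name].Requests[v1.ResourceCPU]
	act := t.actuated[name].Requests[v1.ResourceCPU]
	return !alloc.Equal(act)
}

func main() {
	t := newTracker()
	req := func(cpu string) v1.ResourceRequirements {
		return v1.ResourceRequirements{Requests: v1.ResourceList{
			v1.ResourceCPU: resource.MustParse(cpu),
		}}
	}

	t.allocated["app"] = req("500m")                        // resize admitted
	t.actuated["app"] = req("250m")                         // runtime still at the old value
	fmt.Println("resize pending:", t.resizePending("app"))  // true

	t.actuated["app"] = req("500m")                         // runtime applied the new value
	fmt.Println("resize pending:", t.resizePending("app"))  // false
}
```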

pkg/kubelet/kubelet.go

Lines changed: 47 additions & 7 deletions
@@ -740,6 +740,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
 		kubeDeps.ContainerManager,
 		klet.containerLogManager,
 		klet.runtimeClassManager,
+		klet.allocationManager,
 		seccompDefault,
 		kubeCfg.MemorySwap.SwapBehavior,
 		kubeDeps.ContainerManager.GetNodeAllocatableAbsolute,
@@ -2666,7 +2667,7 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
 			continue
 		}
 		// For new pod, checkpoint the resource values at which the Pod has been admitted
-		if err := kl.allocationManager.SetPodAllocation(allocatedPod); err != nil {
+		if err := kl.allocationManager.SetAllocatedResources(allocatedPod); err != nil {
 			//TODO(vinaykul,InPlacePodVerticalScaling): Can we recover from this in some way? Investigate
 			klog.ErrorS(err, "SetPodAllocation failed", "pod", klog.KObj(pod))
 		}
@@ -2722,7 +2723,7 @@ func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) {
 	start := kl.clock.Now()
 	for _, pod := range pods {
 		kl.podManager.RemovePod(pod)
-		kl.allocationManager.DeletePodAllocation(pod.UID)
+		kl.allocationManager.RemovePod(pod.UID)
 
 		pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod)
 		if wasMirror {
@@ -2886,8 +2887,7 @@ func (kl *Kubelet) handlePodResourcesResize(pod *v1.Pod, podStatus *kubecontaine
 
 	if !updated {
 		// Desired resources == allocated resources. Check whether a resize is in progress.
-		resizeInProgress := !allocatedResourcesMatchStatus(allocatedPod, podStatus)
-		if resizeInProgress {
+		if kl.isPodResizeInProgress(allocatedPod, podStatus) {
 			// If a resize is in progress, make sure the cache has the correct state in case the Kubelet restarted.
 			kl.statusManager.SetPodResizeStatus(pod.UID, v1.PodResizeStatusInProgress)
 		} else {
@@ -2909,7 +2909,7 @@ func (kl *Kubelet) handlePodResourcesResize(pod *v1.Pod, podStatus *kubecontaine
 	fit, resizeStatus, resizeMsg := kl.canResizePod(pod)
 	if fit {
 		// Update pod resource allocation checkpoint
-		if err := kl.allocationManager.SetPodAllocation(pod); err != nil {
+		if err := kl.allocationManager.SetAllocatedResources(pod); err != nil {
 			return nil, err
 		}
 		for i, container := range pod.Spec.Containers {
@@ -2928,11 +2928,11 @@ func (kl *Kubelet) handlePodResourcesResize(pod *v1.Pod, podStatus *kubecontaine
 		}
 		allocatedPod = pod
 
-		// Special case when the updated allocation matches the actual resources. This can occur
+		// Special case when the updated allocation matches the actuated resources. This can occur
 		// when reverting a resize that hasn't been actuated, or when making an equivalent change
 		// (such as CPU requests below MinShares). This is an optimization to clear the resize
 		// status immediately, rather than waiting for the next SyncPod iteration.
-		if allocatedResourcesMatchStatus(allocatedPod, podStatus) {
+		if !kl.isPodResizeInProgress(allocatedPod, podStatus) {
 			// In this case, consider the resize complete.
 			kl.statusManager.SetPodResizeStatus(pod.UID, "")
 			return allocatedPod, nil
@@ -2952,6 +2952,46 @@ func (kl *Kubelet) handlePodResourcesResize(pod *v1.Pod, podStatus *kubecontaine
 	return allocatedPod, nil
 }
 
+// isPodResizeInProgress checks whether the actuated resizable resources differ from the allocated resources
+// for any running containers. Specifically, the following differences are ignored:
+// - Non-resizable containers: non-restartable init containers, ephemeral containers
+// - Non-resizable resources: only CPU & memory are resizable
+// - Non-actuated resources: memory requests are not actuated
+// - Non-running containers: they will be sized correctly when (re)started
+func (kl *Kubelet) isPodResizeInProgress(allocatedPod *v1.Pod, podStatus *kubecontainer.PodStatus) bool {
+	return !podutil.VisitContainers(&allocatedPod.Spec, podutil.InitContainers|podutil.Containers,
+		func(allocatedContainer *v1.Container, containerType podutil.ContainerType) (shouldContinue bool) {
+			if !isResizableContainer(allocatedContainer, containerType) {
+				return true
+			}
+
+			containerStatus := podStatus.FindContainerStatusByName(allocatedContainer.Name)
+			if containerStatus == nil || containerStatus.State != kubecontainer.ContainerStateRunning {
+				// If the container isn't running, it doesn't need to be resized.
+				return true
+			}
+
+			actuatedResources, _ := kl.allocationManager.GetActuatedResources(allocatedPod.UID, allocatedContainer.Name)
+			allocatedResources := allocatedContainer.Resources
+
+			// Memory requests are excluded since they don't need to be actuated.
+			return allocatedResources.Requests[v1.ResourceCPU].Equal(actuatedResources.Requests[v1.ResourceCPU]) &&
+				allocatedResources.Limits[v1.ResourceCPU].Equal(actuatedResources.Limits[v1.ResourceCPU]) &&
+				allocatedResources.Limits[v1.ResourceMemory].Equal(actuatedResources.Limits[v1.ResourceMemory])
+		})
+}
+
+func isResizableContainer(container *v1.Container, containerType podutil.ContainerType) bool {
+	switch containerType {
+	case podutil.InitContainers:
+		return podutil.IsRestartableInitContainer(container)
+	case podutil.Containers:
+		return true
+	default:
+		return false
+	}
+}
+
 // LatestLoopEntryTime returns the last time in the sync loop monitor.
 func (kl *Kubelet) LatestLoopEntryTime() time.Time {
 	val := kl.syncLoopMonitor.Load()
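
The new isPodResizeInProgress helper compares only the resources the kubelet actually actuates: CPU requests, CPU limits, and memory limits; memory requests are deliberately skipped. The sketch below restates that per-container comparison as a standalone resourcesActuated function (a hypothetical name, not a kubelet API) to show why a differing memory request alone does not keep a resize "in progress".

```go
// Standalone illustration of the comparison used per running container:
// only CPU requests, CPU limits, and memory limits are checked.
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// resourcesActuated reports whether the actuated resources already match the
// allocated ones for the resources the kubelet cares about.
func resourcesActuated(allocated, actuated v1.ResourceRequirements) bool {
	return allocated.Requests[v1.ResourceCPU].Equal(actuated.Requests[v1.ResourceCPU]) &&
		allocated.Limits[v1.ResourceCPU].Equal(actuated.Limits[v1.ResourceCPU]) &&
		allocated.Limits[v1.ResourceMemory].Equal(actuated.Limits[v1.ResourceMemory])
}

func main() {
	allocated := v1.ResourceRequirements{
		Requests: v1.ResourceList{
			v1.ResourceCPU:    resource.MustParse("500m"),
			v1.ResourceMemory: resource.MustParse("256Mi"), // memory request: not compared
		},
		Limits: v1.ResourceList{v1.ResourceMemory: resource.MustParse("512Mi")},
	}
	actuated := v1.ResourceRequirements{
		Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("500m")},
		Limits:   v1.ResourceList{v1.ResourceMemory: resource.MustParse("512Mi")},
	}

	// Memory requests differ, but that difference is ignored, so no resize is pending.
	fmt.Println("actuated matches allocated:", resourcesActuated(allocated, actuated)) // true
}
```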

pkg/kubelet/kubelet_pods.go

Lines changed: 0 additions & 81 deletions
@@ -60,7 +60,6 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/metrics"
 	"k8s.io/kubernetes/pkg/kubelet/status"
 	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
-	"k8s.io/kubernetes/pkg/kubelet/util/format"
 	utilfs "k8s.io/kubernetes/pkg/util/filesystem"
 	utilpod "k8s.io/kubernetes/pkg/util/pod"
 	volumeutil "k8s.io/kubernetes/pkg/volume/util"
@@ -1746,86 +1745,6 @@ func (kl *Kubelet) determinePodResizeStatus(allocatedPod *v1.Pod, podStatus *kub
 	return resizeStatus
 }
 
-// allocatedResourcesMatchStatus tests whether the resizeable resources in the pod spec match the
-// resources reported in the status.
-func allocatedResourcesMatchStatus(allocatedPod *v1.Pod, podStatus *kubecontainer.PodStatus) bool {
-	for _, c := range allocatedPod.Spec.Containers {
-		if !allocatedContainerResourcesMatchStatus(allocatedPod, &c, podStatus) {
-			return false
-		}
-	}
-	if utilfeature.DefaultFeatureGate.Enabled(features.SidecarContainers) {
-		for _, c := range allocatedPod.Spec.InitContainers {
-			if podutil.IsRestartableInitContainer(&c) && !allocatedContainerResourcesMatchStatus(allocatedPod, &c, podStatus) {
-				return false
-			}
-		}
-	}
-	return true
-}
-
-// allocatedContainerResourcesMatchStatus returns true if the container resources matches with the container statuses resources.
-func allocatedContainerResourcesMatchStatus(allocatedPod *v1.Pod, c *v1.Container, podStatus *kubecontainer.PodStatus) bool {
-	if cs := podStatus.FindContainerStatusByName(c.Name); cs != nil {
-		if cs.State != kubecontainer.ContainerStateRunning {
-			// If the container isn't running, it isn't resizing.
-			return true
-		}
-
-		cpuReq, hasCPUReq := c.Resources.Requests[v1.ResourceCPU]
-		cpuLim, hasCPULim := c.Resources.Limits[v1.ResourceCPU]
-		memLim, hasMemLim := c.Resources.Limits[v1.ResourceMemory]
-
-		if cs.Resources == nil {
-			if hasCPUReq || hasCPULim || hasMemLim {
-				// Container status is missing Resources information, but the container does
-				// have resizable resources configured.
-				klog.ErrorS(nil, "Missing runtime resources information for resizing container",
-					"pod", format.Pod(allocatedPod), "container", c.Name)
-				return false // We don't want to clear resize status with insufficient information.
-			} else {
-				// No resizable resources configured; this might be ok.
-				return true
-			}
-		}
-
-		// Only compare resizeable resources, and only compare resources that are explicitly configured.
-		if hasCPUReq {
-			if cs.Resources.CPURequest == nil {
-				if !cpuReq.IsZero() {
-					return false
-				}
-			} else if !cpuReq.Equal(*cs.Resources.CPURequest) &&
-				(cpuReq.MilliValue() > cm.MinShares || cs.Resources.CPURequest.MilliValue() > cm.MinShares) {
-				// If both allocated & status CPU requests are at or below MinShares then they are considered equal.
-				return false
-			}
-		}
-		if hasCPULim {
-			if cs.Resources.CPULimit == nil {
-				if !cpuLim.IsZero() {
-					return false
-				}
-			} else if !cpuLim.Equal(*cs.Resources.CPULimit) &&
-				(cpuLim.MilliValue() > cm.MinMilliCPULimit || cs.Resources.CPULimit.MilliValue() > cm.MinMilliCPULimit) {
-				// If both allocated & status CPU limits are at or below the minimum limit, then they are considered equal.
-				return false
-			}
-		}
-		if hasMemLim {
-			if cs.Resources.MemoryLimit == nil {
-				if !memLim.IsZero() {
-					return false
-				}
-			} else if !memLim.Equal(*cs.Resources.MemoryLimit) {
-				return false
-			}
-		}
-	}
-
-	return true
-}
-
 // generateAPIPodStatus creates the final API pod status for a pod, given the
 // internal pod status. This method should only be called from within sync*Pod methods.
 func (kl *Kubelet) generateAPIPodStatus(pod *v1.Pod, podStatus *kubecontainer.PodStatus, podIsTerminal bool) v1.PodStatus {
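
The removed helper also carried the MinShares equivalence rule that handlePodResourcesResize still refers to ("CPU requests below MinShares"): two CPU requests that both fall at or below the kernel's minimum cpu.shares cannot be distinguished once applied, so they are treated as equal. Below is a minimal standalone sketch of that rule; the local minShares = 2 constant stands in for cm.MinShares as an assumption, and cpuRequestsEquivalent is a hypothetical name for illustration.

```go
// Sketch of the MinShares equivalence rule applied when comparing CPU requests.
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

const minShares = 2 // assumed value of cm.MinShares (~2m CPU)

// cpuRequestsEquivalent treats requests as equal when they match exactly, or
// when both are at or below minShares (the kernel can't represent the difference).
func cpuRequestsEquivalent(allocated, status resource.Quantity) bool {
	if allocated.Equal(status) {
		return true
	}
	return allocated.MilliValue() <= minShares && status.MilliValue() <= minShares
}

func main() {
	fmt.Println(cpuRequestsEquivalent(resource.MustParse("1m"), resource.MustParse("2m"))) // true: both clamp to the minimum
	fmt.Println(cpuRequestsEquivalent(resource.MustParse("1m"), resource.MustParse("5m"))) // false
}
```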
