Commit b4a21a3

Wait for volume teardown during graceful node shutdown
Signed-off-by: torredil <[email protected]>
1 parent d770dd6 commit b4a21a3

8 files changed, +229 -13 lines changed


pkg/kubelet/kubelet.go
Lines changed: 1 addition & 0 deletions

@@ -931,6 +931,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
 	shutdownManager, shutdownAdmitHandler := nodeshutdown.NewManager(&nodeshutdown.Config{
 		Logger: logger,
 		ProbeManager: klet.probeManager,
+		VolumeManager: klet.volumeManager,
 		Recorder: kubeDeps.Recorder,
 		NodeRef: nodeRef,
 		GetPodsFunc: klet.GetActivePods,

pkg/kubelet/kubelet_node_status_test.go
Lines changed: 1 addition & 1 deletion

@@ -1124,7 +1124,7 @@ func TestUpdateNodeStatusAndVolumesInUseWithNodeLease(t *testing.T) {
 	kubelet.setCachedMachineInfo(&cadvisorapi.MachineInfo{})

 	// override test volumeManager
-	fakeVolumeManager := kubeletvolume.NewFakeVolumeManager(tc.existingVolumes)
+	fakeVolumeManager := kubeletvolume.NewFakeVolumeManager(tc.existingVolumes, 0, nil)
 	kubelet.volumeManager = fakeVolumeManager

 	// Only test VolumesInUse setter

pkg/kubelet/nodeshutdown/nodeshutdown_manager.go
Lines changed: 2 additions & 0 deletions

@@ -26,6 +26,7 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/eviction"
 	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
 	"k8s.io/kubernetes/pkg/kubelet/prober"
+	"k8s.io/kubernetes/pkg/kubelet/volumemanager"
 	"k8s.io/utils/clock"
 )

@@ -40,6 +41,7 @@ type Manager interface {
 type Config struct {
 	Logger klog.Logger
 	ProbeManager prober.Manager
+	VolumeManager volumemanager.VolumeManager
 	Recorder record.EventRecorder
 	NodeRef *v1.ObjectReference
 	GetPodsFunc eviction.ActivePodsFunc

pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux.go
Lines changed: 32 additions & 2 deletions

@@ -21,6 +21,7 @@ limitations under the License.
 package nodeshutdown

 import (
+	"context"
 	"fmt"
 	"path/filepath"
 	"sort"
@@ -41,6 +42,7 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/metrics"
 	"k8s.io/kubernetes/pkg/kubelet/nodeshutdown/systemd"
 	"k8s.io/kubernetes/pkg/kubelet/prober"
+	"k8s.io/kubernetes/pkg/kubelet/volumemanager"
 	"k8s.io/utils/clock"
 )

@@ -73,6 +75,8 @@ type managerImpl struct {
 	nodeRef *v1.ObjectReference
 	probeManager prober.Manager

+	volumeManager volumemanager.VolumeManager
+
 	shutdownGracePeriodByPodPriority []kubeletconfig.ShutdownGracePeriodByPodPriority

 	getPods eviction.ActivePodsFunc
@@ -123,6 +127,7 @@ func NewManager(conf *Config) (Manager, lifecycle.PodAdmitHandler) {
 		logger: conf.Logger,
 		probeManager: conf.ProbeManager,
 		recorder: conf.Recorder,
+		volumeManager: conf.VolumeManager,
 		nodeRef: conf.NodeRef,
 		getPods: conf.GetPodsFunc,
 		killPodFunc: conf.KillPodFunc,
@@ -395,19 +400,44 @@ func (m *managerImpl) processShutdownEvent() error {
 			}(pod, group)
 		}

+		// This duration determines how long the shutdown manager will wait for the pods in this group
+		// to terminate before proceeding to the next group.
+		var groupTerminationWaitDuration = time.Duration(group.ShutdownGracePeriodSeconds) * time.Second
 		var (
-			doneCh = make(chan struct{})
-			timer  = m.clock.NewTimer(time.Duration(group.ShutdownGracePeriodSeconds) * time.Second)
+			doneCh         = make(chan struct{})
+			timer          = m.clock.NewTimer(groupTerminationWaitDuration)
+			ctx, ctxCancel = context.WithTimeout(context.Background(), groupTerminationWaitDuration)
 		)
 		go func() {
 			defer close(doneCh)
+			defer ctxCancel()
 			wg.Wait()
+			// The signal to kill a Pod was sent successfully to all the pods,
+			// let's wait until all the volumes are unmounted from all the pods before
+			// continuing to the next group. This is done so that the CSI Driver (assuming
+			// that it's part of the highest group) has a chance to perform unmounts.
+			if err := m.volumeManager.WaitForAllPodsUnmount(ctx, group.Pods); err != nil {
+				var podIdentifiers []string
+				for _, pod := range group.Pods {
+					podIdentifiers = append(podIdentifiers, fmt.Sprintf("%s/%s", pod.Namespace, pod.Name))
+				}
+
+				// Waiting for volume teardown is done on a best-effort basis;
+				// report an error and continue.
+				//
+				// Depending on the user-provided kubelet configuration value,
+				// either the `timer` will tick and we'll continue shutting down the next group, or
+				// WaitForAllPodsUnmount will time out, in which case this goroutine
+				// will close doneCh and we'll continue shutting down the next group.
+				m.logger.Error(err, "Failed while waiting for all the volumes belonging to Pods in this group to unmount", "pods", podIdentifiers)
+			}
 		}()

 		select {
 		case <-doneCh:
 			timer.Stop()
 		case <-timer.C():
+			ctxCancel()
 			m.logger.V(1).Info("Shutdown manager pod killing time out", "gracePeriod", group.ShutdownGracePeriodSeconds, "priority", group.Priority)
 		}
 	}
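The core of the change is the bounded wait above: once the kill signals are dispatched, the manager also waits for volume teardown, but never longer than the group's grace period. The sketch below is not the kubelet's actual code; it isolates that pattern with the standard library clock, and killPods and waitForUnmounts are hypothetical stand-ins for wg.Wait() and VolumeManager.WaitForAllPodsUnmount.

package main

import (
	"context"
	"fmt"
	"time"
)

// waitForGroup sketches the bounded wait from processShutdownEvent: give the group
// at most gracePeriod for pod termination plus volume teardown, then move on.
func waitForGroup(gracePeriod time.Duration, killPods func(), waitForUnmounts func(context.Context) error) {
	doneCh := make(chan struct{})
	timer := time.NewTimer(gracePeriod)
	defer timer.Stop()
	// The context bounds the unmount wait to the same grace period as the timer.
	ctx, cancel := context.WithTimeout(context.Background(), gracePeriod)

	go func() {
		defer close(doneCh)
		defer cancel()
		killPods() // stands in for wg.Wait() after the kill signals are sent
		// Best effort: report and continue if the volumes do not unmount in time.
		if err := waitForUnmounts(ctx); err != nil {
			fmt.Println("volume teardown incomplete:", err)
		}
	}()

	select {
	case <-doneCh:
		timer.Stop()
	case <-timer.C:
		cancel() // stop waiting on unmounts before moving to the next group
		fmt.Println("grace period expired before the group finished terminating")
	}
}

func main() {
	waitForGroup(2*time.Second,
		func() { time.Sleep(500 * time.Millisecond) },
		func(ctx context.Context) error { return nil },
	)
}

Either exit path is acceptable: if the pods and volumes finish early, doneCh closes and the timer is stopped; if the grace period elapses first, the context is cancelled so the unmount wait returns promptly.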

pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux_test.go
Lines changed: 82 additions & 8 deletions

@@ -30,6 +30,7 @@ import (
 	"github.com/google/go-cmp/cmp"
 	"github.com/google/go-cmp/cmp/cmpopts"
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
@@ -45,6 +46,7 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/nodeshutdown/systemd"
 	"k8s.io/kubernetes/pkg/kubelet/prober"
 	probetest "k8s.io/kubernetes/pkg/kubelet/prober/testing"
+	"k8s.io/kubernetes/pkg/kubelet/volumemanager"
 	"k8s.io/utils/clock"
 	testingclock "k8s.io/utils/clock/testing"
 )
@@ -348,10 +350,12 @@ func TestManager(t *testing.T) {

 	proberManager := probetest.FakeManager{}
 	fakeRecorder := &record.FakeRecorder{}
+	fakeVolumeManager := volumemanager.NewFakeVolumeManager([]v1.UniqueVolumeName{}, 0, nil)
 	nodeRef := &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}
 	manager, _ := NewManager(&Config{
 		Logger: logger,
 		ProbeManager: proberManager,
+		VolumeManager: fakeVolumeManager,
 		Recorder: fakeRecorder,
 		NodeRef: nodeRef,
 		GetPodsFunc: activePodsFunc,
@@ -452,11 +456,13 @@ func TestFeatureEnabled(t *testing.T) {

 	proberManager := probetest.FakeManager{}
 	fakeRecorder := &record.FakeRecorder{}
+	fakeVolumeManager := volumemanager.NewFakeVolumeManager([]v1.UniqueVolumeName{}, 0, nil)
 	nodeRef := &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}

 	manager, _ := NewManager(&Config{
 		Logger: logger,
 		ProbeManager: proberManager,
+		VolumeManager: fakeVolumeManager,
 		Recorder: fakeRecorder,
 		NodeRef: nodeRef,
 		GetPodsFunc: activePodsFunc,
@@ -509,10 +515,12 @@ func TestRestart(t *testing.T) {

 	proberManager := probetest.FakeManager{}
 	fakeRecorder := &record.FakeRecorder{}
+	fakeVolumeManager := volumemanager.NewFakeVolumeManager([]v1.UniqueVolumeName{}, 0, nil)
 	nodeRef := &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}
 	manager, _ := NewManager(&Config{
 		Logger: logger,
 		ProbeManager: proberManager,
+		VolumeManager: fakeVolumeManager,
 		Recorder: fakeRecorder,
 		NodeRef: nodeRef,
 		GetPodsFunc: activePodsFunc,
@@ -738,17 +746,19 @@ func Test_groupByPriority(t *testing.T) {

 func Test_managerImpl_processShutdownEvent(t *testing.T) {
 	var (
-		probeManager   = probetest.FakeManager{}
-		fakeRecorder   = &record.FakeRecorder{}
-		syncNodeStatus = func() {}
-		nodeRef        = &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}
-		fakeclock      = testingclock.NewFakeClock(time.Now())
+		probeManager      = probetest.FakeManager{}
+		fakeRecorder      = &record.FakeRecorder{}
+		fakeVolumeManager = volumemanager.NewFakeVolumeManager([]v1.UniqueVolumeName{}, 0, nil)
+		syncNodeStatus    = func() {}
+		nodeRef           = &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}
+		fakeclock         = testingclock.NewFakeClock(time.Now())
 	)

 	type fields struct {
 		recorder record.EventRecorder
 		nodeRef *v1.ObjectReference
 		probeManager prober.Manager
+		volumeManager volumemanager.VolumeManager
 		shutdownGracePeriodByPodPriority []kubeletconfig.ShutdownGracePeriodByPodPriority
 		getPods eviction.ActivePodsFunc
 		killPodFunc eviction.KillPodFunc
@@ -767,9 +777,10 @@ func Test_managerImpl_processShutdownEvent(t *testing.T) {
 		{
 			name: "kill pod func take too long",
 			fields: fields{
-				recorder:     fakeRecorder,
-				nodeRef:      nodeRef,
-				probeManager: probeManager,
+				recorder:      fakeRecorder,
+				nodeRef:       nodeRef,
+				probeManager:  probeManager,
+				volumeManager: fakeVolumeManager,
 				shutdownGracePeriodByPodPriority: []kubeletconfig.ShutdownGracePeriodByPodPriority{
 					{
 						Priority: 1,
@@ -808,6 +819,7 @@ func Test_managerImpl_processShutdownEvent(t *testing.T) {
 			)
 			m := &managerImpl{
 				logger: logger,
+				volumeManager: tt.fields.volumeManager,
 				recorder: tt.fields.recorder,
 				nodeRef: tt.fields.nodeRef,
 				probeManager: tt.fields.probeManager,
@@ -839,3 +851,65 @@ func Test_managerImpl_processShutdownEvent(t *testing.T) {
 		})
 	}
 }
+
+func Test_processShutdownEvent_VolumeUnmountTimeout(t *testing.T) {
+	var (
+		probeManager               = probetest.FakeManager{}
+		fakeRecorder               = &record.FakeRecorder{}
+		syncNodeStatus             = func() {}
+		nodeRef                    = &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}
+		fakeclock                  = testingclock.NewFakeClock(time.Now())
+		shutdownGracePeriodSeconds = 2
+	)
+
+	fakeVolumeManager := volumemanager.NewFakeVolumeManager(
+		[]v1.UniqueVolumeName{},
+		3*time.Second, // This value is intentionally longer than the shutdownGracePeriodSeconds (2s) to test the behavior
+		// for volume unmount operations that take longer than the allowed grace period.
+		fmt.Errorf("unmount timeout"),
+	)
+	logger := ktesting.NewLogger(t, ktesting.NewConfig(ktesting.BufferLogs(true)))
+	m := &managerImpl{
+		logger: logger,
+		volumeManager: fakeVolumeManager,
+		recorder: fakeRecorder,
+		nodeRef: nodeRef,
+		probeManager: probeManager,
+		shutdownGracePeriodByPodPriority: []kubeletconfig.ShutdownGracePeriodByPodPriority{
+			{
+				Priority: 1,
+				ShutdownGracePeriodSeconds: int64(shutdownGracePeriodSeconds),
+			},
+		},
+		getPods: func() []*v1.Pod {
+			return []*v1.Pod{
+				makePod("test-pod", 1, nil),
+			}
+		},
+		killPodFunc: func(pod *v1.Pod, isEvicted bool, gracePeriodOverride *int64, fn func(*v1.PodStatus)) error {
+			return nil
+		},
+		syncNodeStatus: syncNodeStatus,
+		dbusCon: &fakeDbus{},
+		clock: fakeclock,
+	}

+	start := fakeclock.Now()
+	err := m.processShutdownEvent()
+	end := fakeclock.Now()
+
+	require.NoError(t, err, "managerImpl.processShutdownEvent() should not return an error")
+
+	// Check if processShutdownEvent completed within the expected time
+	actualDuration := int(end.Sub(start).Seconds())
+	assert.LessOrEqual(t, actualDuration, shutdownGracePeriodSeconds, "processShutdownEvent took too long")
+
+	underlier, ok := logger.GetSink().(ktesting.Underlier)
+	if !ok {
+		t.Fatalf("Should have had a ktesting LogSink, got %T", logger.GetSink())
+	}
+
+	log := underlier.GetBuffer().String()
+	expectedLogMessage := "Failed while waiting for all the volumes belonging to Pods in this group to unmount"
+	assert.Contains(t, log, expectedLogMessage, "Expected log message not found")
+}
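The new test asserts on the kubelet's structured log output rather than on a returned error, since the unmount failure is deliberately non-fatal. Below is a minimal standalone sketch of that capture-and-assert pattern, assuming the ktesting package used here is k8s.io/klog/v2/ktesting (the API calls mirror the ones visible in the diff above).

package example

import (
	"errors"
	"strings"
	"testing"

	"k8s.io/klog/v2/ktesting"
)

// TestLogCapture shows the BufferLogs pattern: keep log output in memory, run the
// code under test with that logger, then assert on the captured buffer.
func TestLogCapture(t *testing.T) {
	logger := ktesting.NewLogger(t, ktesting.NewConfig(ktesting.BufferLogs(true)))

	// The code under test would receive this logger; here we log directly.
	logger.Error(errors.New("unmount timeout"), "Failed while waiting for all the volumes belonging to Pods in this group to unmount")

	underlier, ok := logger.GetSink().(ktesting.Underlier)
	if !ok {
		t.Fatalf("expected a ktesting LogSink, got %T", logger.GetSink())
	}
	if !strings.Contains(underlier.GetBuffer().String(), "Failed while waiting") {
		t.Fatal("expected log message not found in buffered output")
	}
}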

pkg/kubelet/volumemanager/volume_manager.go
Lines changed: 26 additions & 0 deletions

@@ -23,6 +23,7 @@ import (
 	"sort"
 	"strconv"
 	"strings"
+	"sync"
 	"time"

 	utilfeature "k8s.io/apiserver/pkg/util/feature"
@@ -31,6 +32,7 @@ import (

 	v1 "k8s.io/api/core/v1"
 	k8stypes "k8s.io/apimachinery/pkg/types"
+	utilerrors "k8s.io/apimachinery/pkg/util/errors"
 	"k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
@@ -106,6 +108,12 @@
 	// the duration defined in podAttachAndMountTimeout.
 	WaitForUnmount(ctx context.Context, pod *v1.Pod) error

+	// WaitForAllPodsUnmount is a version of WaitForUnmount that blocks and
+	// waits until all the volumes belonging to all the pods are unmounted.
+	// An error is returned if there's at least one Pod with volumes not unmounted
+	// within the duration defined in podAttachAndMountTimeout.
+	WaitForAllPodsUnmount(ctx context.Context, pods []*v1.Pod) error
+
 	// GetMountedVolumesForPod returns a VolumeMap containing the volumes
 	// referenced by the specified pod that are successfully attached and
 	// mounted. The key in the map is the OuterVolumeSpecName (i.e.
@@ -479,6 +487,24 @@ func (vm *volumeManager) WaitForUnmount(ctx context.Context, pod *v1.Pod) error
 	return nil
 }

+func (vm *volumeManager) WaitForAllPodsUnmount(ctx context.Context, pods []*v1.Pod) error {
+	var (
+		errors []error
+		wg     sync.WaitGroup
+	)
+	wg.Add(len(pods))
+	for _, pod := range pods {
+		go func(pod *v1.Pod) {
+			defer wg.Done()
+			if err := vm.WaitForUnmount(ctx, pod); err != nil {
+				errors = append(errors, err)
+			}
+		}(pod)
+	}
+	wg.Wait()
+	return utilerrors.NewAggregate(errors)
+}
+
 func (vm *volumeManager) getVolumesNotInDSW(uniquePodName types.UniquePodName, expectedVolumes []string) []string {
 	volumesNotInDSW := sets.New[string](expectedVolumes...)

pkg/kubelet/volumemanager/volume_manager_fake.go
Lines changed: 17 additions & 1 deletion

@@ -18,6 +18,7 @@ package volumemanager

 import (
 	"context"
+	"time"

 	v1 "k8s.io/api/core/v1"
 	"k8s.io/kubernetes/pkg/kubelet/config"
@@ -29,17 +30,23 @@
 type FakeVolumeManager struct {
 	volumes map[v1.UniqueVolumeName]bool
 	reportedInUse map[v1.UniqueVolumeName]bool
+	unmountDelay time.Duration
+	unmountError error
 }

+var _ VolumeManager = &FakeVolumeManager{}
+
 // NewFakeVolumeManager creates a new VolumeManager test instance
-func NewFakeVolumeManager(initialVolumes []v1.UniqueVolumeName) *FakeVolumeManager {
+func NewFakeVolumeManager(initialVolumes []v1.UniqueVolumeName, unmountDelay time.Duration, unmountError error) *FakeVolumeManager {
 	volumes := map[v1.UniqueVolumeName]bool{}
 	for _, v := range initialVolumes {
 		volumes[v] = true
 	}
 	return &FakeVolumeManager{
 		volumes: volumes,
 		reportedInUse: map[v1.UniqueVolumeName]bool{},
+		unmountDelay: unmountDelay,
+		unmountError: unmountError,
 	}
 }

@@ -57,6 +64,15 @@ func (f *FakeVolumeManager) WaitForUnmount(ctx context.Context, pod *v1.Pod) err
 	return nil
 }

+func (f *FakeVolumeManager) WaitForAllPodsUnmount(ctx context.Context, pods []*v1.Pod) error {
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-time.After(f.unmountDelay):
+		return f.unmountError
+	}
+}
+
 // GetMountedVolumesForPod is not implemented
 func (f *FakeVolumeManager) GetMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap {
 	return nil
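Callers of NewFakeVolumeManager now pass two extra arguments: zero values keep the previous behavior, while a delay plus an error lets a test simulate a slow, failing teardown. A usage sketch under those assumptions follows; the 2s/3s values and the test name are illustrative, not part of the commit.

package volumemanager_test

import (
	"context"
	"fmt"
	"testing"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/kubernetes/pkg/kubelet/volumemanager"
)

func TestFakeVolumeManagerUsage(t *testing.T) {
	// Existing callers: zero delay and nil error preserve the old behavior.
	_ = volumemanager.NewFakeVolumeManager([]v1.UniqueVolumeName{}, 0, nil)

	// Shutdown-style test: teardown (3s) outlives the caller's deadline (2s),
	// so the call returns ctx.Err() rather than the configured unmount error.
	slow := volumemanager.NewFakeVolumeManager(
		[]v1.UniqueVolumeName{},
		3*time.Second,
		fmt.Errorf("unmount timeout"),
	)
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	if err := slow.WaitForAllPodsUnmount(ctx, nil); err == nil {
		t.Fatal("expected an error once the context deadline passed")
	}
}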
