Skip to content

Commit 9a7f2f4

Browse files
fix(v3): upgrade race condition (#3098)
* add retry for cluster nodes match * add retry for cluster node version check
1 parent 7965149 commit 9a7f2f4

File tree

2 files changed

+273
-6
lines changed

2 files changed

+273
-6
lines changed

pkg-new/upgrade/upgrade.go

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -154,12 +154,8 @@ func upgradeK0s(ctx context.Context, cli client.Client, rc runtimeconfig.Runtime
154154
return upgradeK0s(ctx, cli, rc, in, logger)
155155
}
156156

157-
match, err = clusterNodesMatchVersion(ctx, cli, desiredVersion)
158-
if err != nil {
159-
return fmt.Errorf("check cluster nodes match version after plan completion: %w", err)
160-
}
161-
if !match {
162-
return fmt.Errorf("cluster nodes did not match version after upgrade")
157+
if err := waitForClusterNodesMatchVersion(ctx, cli, desiredVersion, logger); err != nil {
158+
return fmt.Errorf("wait for cluster nodes to match version: %w", err)
163159
}
164160

165161
// the plan has been completed, so we can move on - kubernetes is now upgraded
@@ -404,3 +400,46 @@ func waitForAutopilotPlan(ctx context.Context, cli client.Client, logger logrus.
404400

405401
return plan, nil
406402
}
403+
404+
func waitForClusterNodesMatchVersion(ctx context.Context, cli client.Client, desiredVersion string, logger logrus.FieldLogger) error {
405+
return waitForClusterNodesMatchVersionWithBackoff(ctx, cli, desiredVersion, logger, wait.Backoff{
406+
Duration: 5 * time.Second,
407+
Steps: 60, // 60 attempts × 5s = 300s = 5 minutes
408+
Factor: 1.0,
409+
Jitter: 0.1,
410+
})
411+
}
412+
413+
func waitForClusterNodesMatchVersionWithBackoff(ctx context.Context, cli client.Client, desiredVersion string, logger logrus.FieldLogger, backoff wait.Backoff) error {
414+
var lastErr error
415+
416+
err := wait.ExponentialBackoffWithContext(ctx, backoff, func(ctx context.Context) (bool, error) {
417+
match, err := clusterNodesMatchVersion(ctx, cli, desiredVersion)
418+
if err != nil {
419+
lastErr = fmt.Errorf("check cluster nodes match version: %w", err)
420+
return false, nil
421+
}
422+
423+
if !match {
424+
logger.WithField("version", desiredVersion).Debug("Waiting for cluster nodes to report updated version")
425+
return false, nil
426+
}
427+
428+
return true, nil
429+
})
430+
431+
if err != nil {
432+
if errors.Is(err, context.Canceled) {
433+
if lastErr != nil {
434+
err = errors.Join(err, lastErr)
435+
}
436+
return err
437+
} else if lastErr != nil {
438+
return fmt.Errorf("timed out waiting for cluster nodes to match version %s: %w", desiredVersion, lastErr)
439+
} else {
440+
return fmt.Errorf("cluster nodes did not match version %s after upgrade", desiredVersion)
441+
}
442+
}
443+
444+
return nil
445+
}

pkg-new/upgrade/upgrade_test.go

Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"sync/atomic"
88
"testing"
9+
"time"
910

1011
apv1b2 "github.com/k0sproject/k0s/pkg/apis/autopilot/v1beta2"
1112
k0sv1beta1 "github.com/k0sproject/k0s/pkg/apis/k0s/v1beta1"
@@ -14,8 +15,10 @@ import (
1415
"github.com/sirupsen/logrus"
1516
"github.com/stretchr/testify/assert"
1617
"github.com/stretchr/testify/require"
18+
corev1 "k8s.io/api/core/v1"
1719
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1820
"k8s.io/apimachinery/pkg/runtime"
21+
"k8s.io/apimachinery/pkg/util/wait"
1922
"sigs.k8s.io/controller-runtime/pkg/client"
2023
"sigs.k8s.io/controller-runtime/pkg/client/fake"
2124
)
@@ -552,3 +555,228 @@ func (m *mockClientWithStateChange) Get(ctx context.Context, key client.ObjectKe
552555

553556
return nil
554557
}
558+
559+
func TestWaitForClusterNodesMatchVersion(t *testing.T) {
560+
logger := logrus.New()
561+
logger.SetLevel(logrus.ErrorLevel)
562+
563+
scheme := runtime.NewScheme()
564+
require.NoError(t, corev1.AddToScheme(scheme))
565+
566+
tests := []struct {
567+
name string
568+
nodes *corev1.NodeList
569+
targetVersion string
570+
mockClient func(*corev1.NodeList) client.Client
571+
backoff *wait.Backoff
572+
expectError bool
573+
errorContains string
574+
validate func(t *testing.T, cli client.Client)
575+
}{
576+
{
577+
name: "all nodes already match version",
578+
nodes: &corev1.NodeList{
579+
Items: []corev1.Node{
580+
{
581+
ObjectMeta: metav1.ObjectMeta{Name: "node1"},
582+
Status: corev1.NodeStatus{
583+
NodeInfo: corev1.NodeSystemInfo{
584+
KubeletVersion: "v1.30.0+k0s",
585+
},
586+
},
587+
},
588+
{
589+
ObjectMeta: metav1.ObjectMeta{Name: "node2"},
590+
Status: corev1.NodeStatus{
591+
NodeInfo: corev1.NodeSystemInfo{
592+
KubeletVersion: "v1.30.0+k0s",
593+
},
594+
},
595+
},
596+
},
597+
},
598+
targetVersion: "v1.30.0+k0s",
599+
mockClient: func(nodes *corev1.NodeList) client.Client {
600+
return fake.NewClientBuilder().WithScheme(scheme).WithLists(nodes).Build()
601+
},
602+
expectError: false,
603+
},
604+
{
605+
name: "nodes update after retries",
606+
nodes: &corev1.NodeList{
607+
Items: []corev1.Node{
608+
{
609+
ObjectMeta: metav1.ObjectMeta{Name: "node1"},
610+
Status: corev1.NodeStatus{
611+
NodeInfo: corev1.NodeSystemInfo{
612+
KubeletVersion: "v1.29.0+k0s",
613+
},
614+
},
615+
},
616+
},
617+
},
618+
targetVersion: "v1.30.0+k0s",
619+
mockClient: func(nodes *corev1.NodeList) client.Client {
620+
return &mockClientWithNodeVersionUpdate{
621+
Client: fake.NewClientBuilder().WithScheme(scheme).WithLists(nodes).Build(),
622+
callsUntil: 3,
623+
targetVersion: "v1.30.0+k0s",
624+
initialVersion: "v1.29.0+k0s",
625+
}
626+
},
627+
expectError: false,
628+
validate: func(t *testing.T, cli client.Client) {
629+
if mock, ok := cli.(*mockClientWithNodeVersionUpdate); ok {
630+
assert.Equal(t, 3, mock.callCount, "Should have retried until nodes reported correct version")
631+
}
632+
},
633+
},
634+
{
635+
name: "multi-node staggered updates",
636+
nodes: &corev1.NodeList{
637+
Items: []corev1.Node{
638+
{
639+
ObjectMeta: metav1.ObjectMeta{Name: "node1"},
640+
Status: corev1.NodeStatus{
641+
NodeInfo: corev1.NodeSystemInfo{
642+
KubeletVersion: "v1.29.0+k0s",
643+
},
644+
},
645+
},
646+
{
647+
ObjectMeta: metav1.ObjectMeta{Name: "node2"},
648+
Status: corev1.NodeStatus{
649+
NodeInfo: corev1.NodeSystemInfo{
650+
KubeletVersion: "v1.29.0+k0s",
651+
},
652+
},
653+
},
654+
{
655+
ObjectMeta: metav1.ObjectMeta{Name: "node3"},
656+
Status: corev1.NodeStatus{
657+
NodeInfo: corev1.NodeSystemInfo{
658+
KubeletVersion: "v1.29.0+k0s",
659+
},
660+
},
661+
},
662+
},
663+
},
664+
targetVersion: "v1.30.0+k0s",
665+
mockClient: func(nodes *corev1.NodeList) client.Client {
666+
return &mockClientWithStaggeredNodeUpdates{
667+
Client: fake.NewClientBuilder().WithScheme(scheme).WithLists(nodes).Build(),
668+
targetVersion: "v1.30.0+k0s",
669+
}
670+
},
671+
expectError: false,
672+
validate: func(t *testing.T, cli client.Client) {
673+
if mock, ok := cli.(*mockClientWithStaggeredNodeUpdates); ok {
674+
assert.GreaterOrEqual(t, mock.callCount, 3, "Should have waited for all nodes to update")
675+
}
676+
},
677+
},
678+
{
679+
name: "timeout when nodes never match",
680+
nodes: &corev1.NodeList{
681+
Items: []corev1.Node{
682+
{
683+
ObjectMeta: metav1.ObjectMeta{Name: "node1"},
684+
Status: corev1.NodeStatus{
685+
NodeInfo: corev1.NodeSystemInfo{
686+
KubeletVersion: "v1.29.0+k0s",
687+
},
688+
},
689+
},
690+
},
691+
},
692+
targetVersion: "v1.30.0+k0s",
693+
mockClient: func(nodes *corev1.NodeList) client.Client {
694+
return fake.NewClientBuilder().WithScheme(scheme).WithLists(nodes).Build()
695+
},
696+
backoff: &wait.Backoff{
697+
Duration: 100 * time.Millisecond,
698+
Steps: 3,
699+
Factor: 1.0,
700+
Jitter: 0.1,
701+
},
702+
expectError: true,
703+
errorContains: "cluster nodes did not match version v1.30.0+k0s after upgrade",
704+
},
705+
}
706+
707+
for _, tt := range tests {
708+
t.Run(tt.name, func(t *testing.T) {
709+
cli := tt.mockClient(tt.nodes)
710+
var err error
711+
if tt.backoff != nil {
712+
err = waitForClusterNodesMatchVersionWithBackoff(context.Background(), cli, tt.targetVersion, logger, *tt.backoff)
713+
} else {
714+
err = waitForClusterNodesMatchVersion(context.Background(), cli, tt.targetVersion, logger)
715+
}
716+
717+
if tt.expectError {
718+
require.Error(t, err)
719+
if tt.errorContains != "" {
720+
assert.Contains(t, err.Error(), tt.errorContains)
721+
}
722+
} else {
723+
require.NoError(t, err)
724+
if tt.validate != nil {
725+
tt.validate(t, cli)
726+
}
727+
}
728+
})
729+
}
730+
}
731+
732+
// Mock client that updates node versions after N calls
733+
type mockClientWithNodeVersionUpdate struct {
734+
client.Client
735+
callCount int
736+
callsUntil int
737+
targetVersion string
738+
initialVersion string
739+
}
740+
741+
func (m *mockClientWithNodeVersionUpdate) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error {
742+
m.callCount++
743+
err := m.Client.List(ctx, list, opts...)
744+
if err != nil {
745+
return err
746+
}
747+
748+
if m.callCount >= m.callsUntil {
749+
if nodeList, ok := list.(*corev1.NodeList); ok {
750+
for i := range nodeList.Items {
751+
nodeList.Items[i].Status.NodeInfo.KubeletVersion = m.targetVersion
752+
}
753+
}
754+
}
755+
756+
return nil
757+
}
758+
759+
// Mock client that updates nodes one at a time to simulate staggered upgrades
760+
type mockClientWithStaggeredNodeUpdates struct {
761+
client.Client
762+
callCount int
763+
targetVersion string
764+
}
765+
766+
func (m *mockClientWithStaggeredNodeUpdates) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error {
767+
m.callCount++
768+
err := m.Client.List(ctx, list, opts...)
769+
if err != nil {
770+
return err
771+
}
772+
773+
if nodeList, ok := list.(*corev1.NodeList); ok {
774+
for i := range nodeList.Items {
775+
if m.callCount > i {
776+
nodeList.Items[i].Status.NodeInfo.KubeletVersion = m.targetVersion
777+
}
778+
}
779+
}
780+
781+
return nil
782+
}

0 commit comments

Comments
 (0)