
Commit 6bbcfda

Merge pull request kubernetes-sigs#9077 from killianmuldoon/pr-scale-test-upgrade
🌱 Add scale testing for upgrades
2 parents 90e7cb8 + 71bc6fe commit 6bbcfda

File tree

7 files changed: +193 -24 lines changed


test/e2e/scale.go

Lines changed: 153 additions & 5 deletions
@@ -41,6 +41,7 @@ import (
     "sigs.k8s.io/controller-runtime/pkg/client"
 
     clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
+    controlplanev1 "sigs.k8s.io/cluster-api/controlplane/kubeadm/api/v1beta1"
     "sigs.k8s.io/cluster-api/test/e2e/internal/log"
     "sigs.k8s.io/cluster-api/test/framework"
     "sigs.k8s.io/cluster-api/test/framework/clusterctl"
@@ -123,6 +124,9 @@ type scaleSpecInput struct {
     // if the test suit should fail as soon as c6 fails or if it should fail after all cluster creations are done.
     FailFast bool
 
+    // SkipUpgrade if set to true will skip upgrading the workload clusters.
+    SkipUpgrade bool
+
     // SkipCleanup if set to true will skip deleting the workload clusters.
     SkipCleanup bool
 
@@ -133,7 +137,7 @@ type scaleSpecInput struct {
     SkipWaitForCreation bool
 }
 
-// scaleSpec implements a scale test.
+// scaleSpec implements a scale test for clusters with MachineDeployments.
 func scaleSpec(ctx context.Context, inputGetter func() scaleSpecInput) {
     var (
         specName = "scale"
@@ -253,7 +257,7 @@ func scaleSpec(ctx context.Context, inputGetter func() scaleSpecInput) {
             Flavor: flavor,
             Namespace: scaleClusterNamespacePlaceholder,
             ClusterName: scaleClusterNamePlaceholder,
-            KubernetesVersion: input.E2EConfig.GetVariable(KubernetesVersion),
+            KubernetesVersion: input.E2EConfig.GetVariable(KubernetesVersionUpgradeFrom),
             ControlPlaneMachineCount: controlPlaneMachineCount,
             WorkerMachineCount: workerMachineCount,
         })
@@ -333,6 +337,47 @@ func scaleSpec(ctx context.Context, inputGetter func() scaleSpecInput) {
         Fail("")
     }
 
+    if !input.SkipUpgrade {
+        By("Upgrade the workload clusters concurrently")
+        // Get the upgrade function for upgrading the workload clusters.
+        upgrader := getClusterUpgradeAndWaitFn(framework.UpgradeClusterTopologyAndWaitForUpgradeInput{
+            ClusterProxy: input.BootstrapClusterProxy,
+            KubernetesUpgradeVersion: input.E2EConfig.GetVariable(KubernetesVersionUpgradeTo),
+            EtcdImageTag: input.E2EConfig.GetVariable(EtcdVersionUpgradeTo),
+            DNSImageTag: input.E2EConfig.GetVariable(CoreDNSVersionUpgradeTo),
+            WaitForMachinesToBeUpgraded: input.E2EConfig.GetIntervals(specName, "wait-machine-upgrade"),
+            WaitForKubeProxyUpgrade: input.E2EConfig.GetIntervals(specName, "wait-machine-upgrade"),
+            WaitForDNSUpgrade: input.E2EConfig.GetIntervals(specName, "wait-machine-upgrade"),
+            WaitForEtcdUpgrade: input.E2EConfig.GetIntervals(specName, "wait-machine-upgrade"),
+        })
+
+        clusterNamesToUpgrade := []string{}
+        for _, result := range clusterCreateResults {
+            clusterNamesToUpgrade = append(clusterNamesToUpgrade, result.clusterName)
+        }
+
+        // Upgrade all the workload clusters.
+        _, err = workConcurrentlyAndWait(ctx, workConcurrentlyAndWaitInput{
+            ClusterNames: clusterNamesToUpgrade,
+            Concurrency: concurrency,
+            FailFast: input.FailFast,
+            WorkerFunc: func(ctx context.Context, inputChan chan string, resultChan chan workResult, wg *sync.WaitGroup) {
+                upgradeClusterAndWaitWorker(ctx, inputChan, resultChan, wg, namespace.Name, input.DeployClusterInSeparateNamespaces, baseClusterTemplateYAML, upgrader)
+            },
+        })
+        if err != nil {
+            // Call Fail to notify ginkgo that the suit has failed.
+            // Ginkgo will print the first observed error failure in this case.
+            // Example: If cluster c1, c2 and c3 failed then ginkgo will only print the first
+            // observed failure among the these 3 clusters.
+            // Since ginkgo only captures one failure, to help with this we are logging the error
+            // that will contain the full stack trace of failure for each cluster to help with debugging.
+            // TODO(ykakarap): Follow-up: Explore options for improved error reporting.
+            log.Logf("Failed to upgrade clusters. Error: %s", err.Error())
+            Fail("")
+        }
+    }
+
     // TODO(ykakarap): Follow-up: Dump resources for the failed clusters (creation).
 
     clusterNamesToDelete := []string{}
@@ -494,11 +539,9 @@ func getClusterCreateAndWaitFn(input clusterctl.ApplyCustomClusterTemplateAndWai
             CustomTemplateYAML: clusterTemplateYAML,
             ClusterName: clusterName,
             Namespace: namespace,
-            CNIManifestPath: input.CNIManifestPath,
             WaitForClusterIntervals: input.WaitForClusterIntervals,
             WaitForControlPlaneIntervals: input.WaitForControlPlaneIntervals,
             WaitForMachineDeployments: input.WaitForMachineDeployments,
-            WaitForMachinePools: input.WaitForMachinePools,
             Args: input.Args,
             PreWaitForCluster: input.PreWaitForCluster,
             PostMachinesProvisioned: input.PostMachinesProvisioned,
@@ -594,7 +637,7 @@ func deleteClusterAndWaitWorker(ctx context.Context, inputChan <-chan string, re
             return true
         case clusterName, open := <-inputChan:
             // Read the cluster name from the channel.
-            // If the channel is closed it implies there is not more work to be done. Return.
+            // If the channel is closed it implies there is no more work to be done. Return.
             if !open {
                 return true
             }
@@ -648,6 +691,111 @@ func deleteClusterAndWaitWorker(ctx context.Context, inputChan <-chan string, re
         }
     }
 
+type clusterUpgrader func(ctx context.Context, namespace, clusterName string, clusterTemplateYAML []byte)
+
+func getClusterUpgradeAndWaitFn(input framework.UpgradeClusterTopologyAndWaitForUpgradeInput) clusterUpgrader {
+    return func(ctx context.Context, namespace, clusterName string, clusterTemplateYAML []byte) {
+        resources := getClusterResourcesForUpgrade(ctx, input.ClusterProxy.GetClient(), namespace, clusterName)
+        // Nb. We cannot directly modify and use `input` in this closure function because this function
+        // will be called multiple times and this closure will keep modifying the same `input` multiple
+        // times. It is safer to pass the values explicitly into `UpgradeClusterTopologyAndWaitForUpgradeInput`.
+        framework.UpgradeClusterTopologyAndWaitForUpgrade(ctx, framework.UpgradeClusterTopologyAndWaitForUpgradeInput{
+            ClusterProxy: input.ClusterProxy,
+            Cluster: resources.cluster,
+            ControlPlane: resources.controlPlane,
+            MachineDeployments: resources.machineDeployments,
+            KubernetesUpgradeVersion: input.KubernetesUpgradeVersion,
+            WaitForMachinesToBeUpgraded: input.WaitForMachinesToBeUpgraded,
+            WaitForKubeProxyUpgrade: input.WaitForKubeProxyUpgrade,
+            WaitForDNSUpgrade: input.WaitForDNSUpgrade,
+            WaitForEtcdUpgrade: input.WaitForEtcdUpgrade,
+            // TODO: (killianmuldoon) Checking the kube-proxy, etcd and DNS version doesn't work as we can't access the control plane endpoint for the workload cluster
+            // from the host. Need to figure out a way to route the calls to the workload Cluster correctly.
+            EtcdImageTag: "",
+            DNSImageTag: "",
+            SkipKubeProxyCheck: true,
+        })
+    }
+}
+
+func upgradeClusterAndWaitWorker(ctx context.Context, inputChan <-chan string, resultChan chan<- workResult, wg *sync.WaitGroup, defaultNamespace string, deployClusterInSeparateNamespaces bool, clusterTemplateYAML []byte, upgrade clusterUpgrader) {
+    defer wg.Done()
+
+    for {
+        done := func() bool {
+            select {
+            case <-ctx.Done():
+                // If the context is cancelled, return and shutdown the worker.
+                return true
+            case clusterName, open := <-inputChan:
+                // Read the cluster name from the channel.
+                // If the channel is closed it implies there is no more work to be done. Return.
+                if !open {
+                    return true
+                }
+                log.Logf("Upgrading cluster %s", clusterName)
+
+                // This defer will catch ginkgo failures and record them.
+                // The recorded panics are then handled by the parent goroutine.
+                defer func() {
+                    e := recover()
+                    resultChan <- workResult{
+                        clusterName: clusterName,
+                        err: e,
+                    }
+                }()
+
+                // Calculate namespace.
+                namespaceName := defaultNamespace
+                if deployClusterInSeparateNamespaces {
+                    namespaceName = clusterName
+                }
+                upgrade(ctx, namespaceName, clusterName, clusterTemplateYAML)
+                return false
+            }
+        }()
+        if done {
+            break
+        }
+    }
+}
+
+type clusterResources struct {
+    cluster *clusterv1.Cluster
+    machineDeployments []*clusterv1.MachineDeployment
+    controlPlane *controlplanev1.KubeadmControlPlane
+}
+
+func getClusterResourcesForUpgrade(ctx context.Context, c client.Client, namespace, clusterName string) clusterResources {
+    cluster := &clusterv1.Cluster{}
+    err := c.Get(ctx, client.ObjectKey{Namespace: namespace, Name: clusterName}, cluster)
+    Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("error getting Cluster %s: %s", klog.KRef(namespace, clusterName), err))
+
+    controlPlane := &controlplanev1.KubeadmControlPlane{}
+    err = c.Get(ctx, client.ObjectKey{Namespace: namespace, Name: cluster.Spec.ControlPlaneRef.Name}, controlPlane)
+    Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("error getting ControlPlane for Cluster %s: %s,", klog.KObj(cluster), err))
+
+    mds := []*clusterv1.MachineDeployment{}
+    machineDeployments := &clusterv1.MachineDeploymentList{}
+    err = c.List(ctx, machineDeployments,
+        client.MatchingLabels{
+            clusterv1.ClusterNameLabel: cluster.Name,
+            clusterv1.ClusterTopologyOwnedLabel: "",
+        },
+        client.InNamespace(cluster.Namespace),
+    )
+    Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("error getting MachineDeployments for Cluster %s: %s", klog.KObj(cluster), err))
+    for _, md := range machineDeployments.Items {
+        mds = append(mds, md.DeepCopy())
+    }
+
+    return clusterResources{
+        cluster: cluster,
+        machineDeployments: mds,
+        controlPlane: controlPlane,
+    }
+}
+
 type workResult struct {
     clusterName string
     err any
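The new upgrade worker above reuses the same channel-driven worker-pool pattern as the existing create and delete workers: each worker drains cluster names from a shared channel and turns a Ginkgo assertion panic into a per-cluster workResult via defer/recover, so one failing cluster does not tear down the whole pool. A minimal, self-contained sketch of that pattern in plain Go (names are hypothetical and this is not part of the diff above):

package main

import (
	"context"
	"fmt"
	"sync"
)

type workItemResult struct {
	name string
	err  any
}

func upgradeWorker(ctx context.Context, in <-chan string, out chan<- workItemResult, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case <-ctx.Done():
			// Context cancelled: shut the worker down.
			return
		case name, open := <-in:
			if !open {
				return // channel closed: no more work
			}
			func() {
				// recover() converts a panic (e.g. a failed assertion) into a reported
				// result instead of killing the whole worker pool.
				defer func() { out <- workItemResult{name: name, err: recover()} }()
				fmt.Println("upgrading", name) // the real worker calls the upgrade function here
			}()
		}
	}
}

func main() {
	ctx := context.Background()
	clusters := []string{"c1", "c2", "c3"}

	in := make(chan string)
	out := make(chan workItemResult, len(clusters))
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ { // concurrency = 2
		wg.Add(1)
		go upgradeWorker(ctx, in, out, &wg)
	}
	for _, c := range clusters {
		in <- c
	}
	close(in)
	wg.Wait()
	close(out)
	for r := range out {
		fmt.Printf("%s: err=%v\n", r.name, r.err)
	}
}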

test/e2e/scale_test.go

Lines changed: 1 addition & 0 deletions
@@ -25,6 +25,7 @@ import (
 )
 
 var _ = Describe("When testing the machinery for scale testing using in-memory provider", func() {
+    // Note: This test does not support MachinePools.
     scaleSpec(ctx, func() scaleSpecInput {
         return scaleSpecInput{
             E2EConfig: e2eConfig,

test/framework/cluster_topology_helpers.go

Lines changed: 9 additions & 5 deletions
@@ -75,6 +75,7 @@ type UpgradeClusterTopologyAndWaitForUpgradeInput struct {
     WaitForEtcdUpgrade []interface{}
     PreWaitForControlPlaneToBeUpgraded func()
     PreWaitForWorkersToBeUpgraded func()
+    SkipKubeProxyCheck bool
 }
 
 // UpgradeClusterTopologyAndWaitForUpgrade upgrades a Cluster topology and waits for it to be upgraded.
@@ -123,13 +124,16 @@ func UpgradeClusterTopologyAndWaitForUpgrade(ctx context.Context, input UpgradeC
         KubernetesUpgradeVersion: input.KubernetesUpgradeVersion,
     }, input.WaitForMachinesToBeUpgraded...)
 
-    log.Logf("Waiting for kube-proxy to have the upgraded Kubernetes version")
     workloadCluster := input.ClusterProxy.GetWorkloadCluster(ctx, input.Cluster.Namespace, input.Cluster.Name)
     workloadClient := workloadCluster.GetClient()
-    WaitForKubeProxyUpgrade(ctx, WaitForKubeProxyUpgradeInput{
-        Getter: workloadClient,
-        KubernetesVersion: input.KubernetesUpgradeVersion,
-    }, input.WaitForKubeProxyUpgrade...)
+
+    if !input.SkipKubeProxyCheck {
+        log.Logf("Waiting for kube-proxy to have the upgraded Kubernetes version")
+        WaitForKubeProxyUpgrade(ctx, WaitForKubeProxyUpgradeInput{
+            Getter: workloadClient,
+            KubernetesVersion: input.KubernetesUpgradeVersion,
+        }, input.WaitForKubeProxyUpgrade...)
+    }
 
     // Wait for the CoreDNS upgrade if the DNSImageTag is set.
     if input.DNSImageTag != "" {

test/framework/controlplane_helpers.go

Lines changed: 0 additions & 7 deletions
@@ -393,13 +393,6 @@ func UpgradeControlPlaneAndWaitForUpgrade(ctx context.Context, input UpgradeCont
     }
 }
 
-// controlPlaneMachineOptions returns a set of ListOptions that allows to get all machine objects belonging to control plane.
-func controlPlaneMachineOptions() []client.ListOption {
-    return []client.ListOption{
-        client.HasLabels{clusterv1.MachineControlPlaneLabel},
-    }
-}
-
 type ScaleAndWaitControlPlaneInput struct {
     ClusterProxy ClusterProxy
     Cluster *clusterv1.Cluster

test/framework/machine_helpers.go

Lines changed: 7 additions & 1 deletion
@@ -127,7 +127,13 @@ func GetControlPlaneMachinesByCluster(ctx context.Context, input GetControlPlane
     Expect(input.ClusterName).ToNot(BeEmpty(), "Invalid argument. input.ClusterName can't be empty when calling GetControlPlaneMachinesByCluster")
     Expect(input.Namespace).ToNot(BeEmpty(), "Invalid argument. input.Namespace can't be empty when calling GetControlPlaneMachinesByCluster")
 
-    options := append(byClusterOptions(input.ClusterName, input.Namespace), controlPlaneMachineOptions()...)
+    options := []client.ListOption{
+        client.InNamespace(input.Namespace),
+        client.MatchingLabels{
+            clusterv1.ClusterNameLabel: input.ClusterName,
+            clusterv1.MachineControlPlaneLabel: "",
+        },
+    }
 
     machineList := &clusterv1.MachineList{}
     Eventually(func() error {
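For context on the change above: the removed controlPlaneMachineOptions helper used client.HasLabels, which matches on label presence alone, while the inlined options use client.MatchingLabels, which matches exact values; the control-plane label is typically set with an empty value, so matching it against "" selects the same Machines while also filtering by cluster name. A small illustrative sketch of the two option styles (label keys written out literally, other values hypothetical):

package main

import (
	"fmt"

	"sigs.k8s.io/controller-runtime/pkg/client"
)

func main() {
	// Presence-only match, as the removed helper did: selects objects that carry
	// the control-plane label, whatever its value.
	byPresence := []client.ListOption{
		client.InNamespace("default"),
		client.HasLabels{"cluster.x-k8s.io/control-plane"},
	}

	// Exact-value match, as the new inlined options do: the control-plane label is
	// typically set with an empty value, so "" is the value to match on, and the
	// cluster-name label narrows the result to one cluster.
	byValue := []client.ListOption{
		client.InNamespace("default"),
		client.MatchingLabels{
			"cluster.x-k8s.io/cluster-name":  "my-cluster",
			"cluster.x-k8s.io/control-plane": "",
		},
	}

	fmt.Printf("built %d and %d list options\n", len(byPresence), len(byValue))
}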

test/infrastructure/inmemory/pkg/server/api/handler.go

Lines changed: 13 additions & 4 deletions
@@ -35,6 +35,7 @@ import (
     "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
     "k8s.io/apimachinery/pkg/apis/meta/v1/validation"
     "k8s.io/apimachinery/pkg/fields"
+    "k8s.io/apimachinery/pkg/labels"
     "k8s.io/apimachinery/pkg/runtime"
     "k8s.io/apimachinery/pkg/runtime/schema"
     "k8s.io/apimachinery/pkg/runtime/serializer"
@@ -317,16 +318,24 @@ func (h *apiServerHandler) apiV1List(req *restful.Request, resp *restful.Respons
         listOpts = append(listOpts, client.InNamespace(req.PathParameter("namespace")))
     }
 
-    // TODO: The only field selector which works is for `spec.nodeName` on pods.
-    selector, err := fields.ParseSelector(req.QueryParameter("fieldSelector"))
+    // TODO: The only field Selector which works is for `spec.nodeName` on pods.
+    fieldSelector, err := fields.ParseSelector(req.QueryParameter("fieldSelector"))
     if err != nil {
         _ = resp.WriteErrorString(http.StatusInternalServerError, err.Error())
         return
     }
-    if selector != nil {
-        listOpts = append(listOpts, client.MatchingFieldsSelector{Selector: selector})
+    if fieldSelector != nil {
+        listOpts = append(listOpts, client.MatchingFieldsSelector{Selector: fieldSelector})
     }
 
+    labelSelector, err := labels.Parse(req.QueryParameter("labelSelector"))
+    if err != nil {
+        _ = resp.WriteErrorString(http.StatusInternalServerError, err.Error())
+        return
+    }
+    if labelSelector != nil {
+        listOpts = append(listOpts, client.MatchingLabelsSelector{Selector: labelSelector})
+    }
     if err := inmemoryClient.List(ctx, list, listOpts...); err != nil {
         if status, ok := err.(apierrors.APIStatus); ok || errors.As(err, &status) {
             _ = resp.WriteHeaderAndEntity(int(status.Status().Code), status)
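The list handler above now feeds the raw labelSelector query parameter through labels.Parse, so the standard selector syntax (equality and set-based requirements) is accepted, and an empty parameter parses to a selector with no requirements that still matches everything. A small, self-contained sketch of that parsing behaviour (the selector strings below are only examples):

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/labels"
)

func main() {
	// Equality and set-based requirements in one selector string.
	sel, err := labels.Parse("name=labelSelectedPod,environment notin (prod)")
	if err != nil {
		panic(err)
	}

	podLabels := labels.Set{"name": "labelSelectedPod", "environment": "test"}
	fmt.Println(sel.Matches(podLabels)) // true

	// An empty labelSelector produces a selector with no requirements,
	// so unfiltered list requests keep matching every object.
	empty, err := labels.Parse("")
	if err != nil {
		panic(err)
	}
	fmt.Println(empty.Matches(podLabels)) // true
}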

test/infrastructure/inmemory/pkg/server/mux_test.go

Lines changed: 10 additions & 2 deletions
@@ -36,6 +36,7 @@ import (
     rbacv1 "k8s.io/api/rbac/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/fields"
+    "k8s.io/apimachinery/pkg/labels"
     "k8s.io/apimachinery/pkg/runtime"
     "k8s.io/klog/v2"
     ctrl "sigs.k8s.io/controller-runtime"
@@ -155,7 +156,7 @@ func TestAPI_corev1_CRUD(t *testing.T) {
         Spec: corev1.PodSpec{NodeName: n.Name},
     })).To(Succeed())
     g.Expect(c.Create(ctx, &corev1.Pod{
-        ObjectMeta: metav1.ObjectMeta{Name: "notSelectedPod", Namespace: metav1.NamespaceDefault},
+        ObjectMeta: metav1.ObjectMeta{Name: "labelSelectedPod", Namespace: metav1.NamespaceDefault, Labels: map[string]string{"name": "labelSelectedPod"}},
     })).To(Succeed())
 
     pl := &corev1.PodList{}
@@ -166,8 +167,15 @@ func TestAPI_corev1_CRUD(t *testing.T) {
     g.Expect(pl.Items).To(HaveLen(1))
     g.Expect(pl.Items[0].Name).To(Equal("bar"))
 
-    // get
+    // list with label selector on pod
+    labelSelector := &client.ListOptions{
+        LabelSelector: labels.SelectorFromSet(labels.Set{"name": "labelSelectedPod"}),
+    }
+    g.Expect(c.List(ctx, pl, labelSelector)).To(Succeed())
+    g.Expect(pl.Items).To(HaveLen(1))
+    g.Expect(pl.Items[0].Name).To(Equal("labelSelectedPod"))
 
+    // get
     n = &corev1.Node{}
     err = c.Get(ctx, client.ObjectKey{Name: "foo"}, n)
     g.Expect(err).ToNot(HaveOccurred())
