diff --git a/pkg/app/pipedv1/plugin/kubernetes_multicluster/config/application.go b/pkg/app/pipedv1/plugin/kubernetes_multicluster/config/application.go index 034d849011..0f39e9a86f 100644 --- a/pkg/app/pipedv1/plugin/kubernetes_multicluster/config/application.go +++ b/pkg/app/pipedv1/plugin/kubernetes_multicluster/config/application.go @@ -56,7 +56,6 @@ type KubernetesApplicationSpec struct { // The label will be configured to variant manifests used to distinguish them. VariantLabel KubernetesVariantLabel `json:"variantLabel"` - // TODO: Define fields for KubernetesApplicationSpec. } func (s *KubernetesApplicationSpec) UnmarshalJSON(data []byte) error { @@ -201,6 +200,32 @@ type K8sCanaryRolloutStageOptions struct { // K8sCanaryCleanStageOptions contains all configurable values for a K8S_CANARY_CLEAN stage. type K8sCanaryCleanStageOptions struct{} +// K8sPrimaryRolloutStageOptions contains all configurable values for a K8S_PRIMARY_ROLLOUT stage. +type K8sPrimaryRolloutStageOptions struct { + // Suffix that should be used when naming the PRIMARY variant's resources. + // Default is "primary". + Suffix string `json:"suffix" default:"primary"` + // Whether the PRIMARY service should be created. + CreateService bool `json:"createService"` + // Whether the PRIMARY variant label should be added to manifests if they were missing. + AddVariantLabelToSelector bool `json:"addVariantLabelToSelector"` + // Whether the resources that are no longer defined in Git should be removed or not. + Prune bool `json:"prune"` +} + +func (o *K8sPrimaryRolloutStageOptions) UnmarshalJSON(data []byte) error { + type alias K8sPrimaryRolloutStageOptions + var a alias + if err := json.Unmarshal(data, &a); err != nil { + return err + } + *o = K8sPrimaryRolloutStageOptions(a) + if err := defaults.Set(o); err != nil { + return err + } + return nil +} + // K8sResourcePatch represents a patch operation for a Kubernetes resource. type K8sResourcePatch struct { // The target of the patch operation. diff --git a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/pipeline.go b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/pipeline.go index 63894ae1b7..88c72e44d1 100644 --- a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/pipeline.go +++ b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/pipeline.go @@ -30,6 +30,8 @@ const ( StageK8sMultiCanaryRollout = "K8S_CANARY_ROLLOUT" // StageK8sMultiCanaryClean represents the state where all canary resources should be removed. StageK8sMultiCanaryClean = "K8S_CANARY_CLEAN" + // StageK8sMultiPrimaryRollout represents the state where the new version is promoted as PRIMARY to all targets. + StageK8sMultiPrimaryRollout = "K8S_PRIMARY_ROLLOUT" ) var allStages = []string{ @@ -37,6 +39,7 @@ var allStages = []string{ StageK8sMultiRollback, StageK8sMultiCanaryRollout, StageK8sMultiCanaryClean, + StageK8sMultiPrimaryRollout, } const ( @@ -48,6 +51,8 @@ const ( StageDescriptionK8sMultiCanaryRollout = "Rollout the new version as CANARY to all targets" // StageDescriptionK8sMultiCanaryClean represents the description of the K8sCanaryClean stage. StageDescriptionK8sMultiCanaryClean = "Remove all canary resources" + // StageDescriptionK8sMultiPrimaryRollout represents the description of the K8sPrimaryRollout stage. 
+ StageDescriptionK8sMultiPrimaryRollout = "Rollout the new version as PRIMARY to all targets" ) func buildQuickSyncPipeline(autoRollback bool) []sdk.QuickSyncStage { diff --git a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/plugin.go b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/plugin.go index f1e56b3f64..5ac987dd14 100644 --- a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/plugin.go +++ b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/plugin.go @@ -73,6 +73,8 @@ func (p *Plugin) ExecuteStage(ctx context.Context, _ *sdk.ConfigNone, dts []*sdk return &sdk.ExecuteStageResponse{ Status: p.executeK8sMultiCanaryCleanStage(ctx, input, dts), }, nil + case StageK8sMultiPrimaryRollout: + return &sdk.ExecuteStageResponse{Status: p.executeK8sMultiPrimaryRolloutStage(ctx, input, dts)}, nil default: return nil, errors.New("unimplemented or unsupported stage") } diff --git a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/primary.go b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/primary.go new file mode 100644 index 0000000000..d94e72a49c --- /dev/null +++ b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/primary.go @@ -0,0 +1,263 @@ +// Copyright 2025 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package deployment + +import ( + "cmp" + "context" + "encoding/json" + "fmt" + "time" + + "golang.org/x/sync/errgroup" + + sdk "github.com/pipe-cd/piped-plugin-sdk-go" + + kubeconfig "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes_multicluster/config" + "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes_multicluster/provider" + "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes_multicluster/toolregistry" +) + +func (p *Plugin) executeK8sMultiPrimaryRolloutStage(ctx context.Context, input *sdk.ExecuteStageInput[kubeconfig.KubernetesApplicationSpec], dts []*sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig]) sdk.StageStatus { + lp := input.Client.LogPersister() + + cfg, err := input.Request.TargetDeploymentSource.AppConfig() + if err != nil { + lp.Errorf("Failed while decoding application config (%v)", err.Error()) + return sdk.StageStatusFailure + } + + var stageCfg kubeconfig.K8sPrimaryRolloutStageOptions + if len(input.Request.StageConfig) > 0 { + if err := json.Unmarshal(input.Request.StageConfig, &stageCfg); err != nil { + lp.Errorf("Failed while unmarshalling stage config (%v)", err) + return sdk.StageStatusFailure + } + } + + type targetConfig struct { + deployTarget *sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig] + multiTarget *kubeconfig.KubernetesMultiTarget + } + + deployTargetMap := make(map[string]*sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig]) + targetConfigs := make([]targetConfig, 0, len(dts)) + + for _, target := range dts { + deployTargetMap[target.Name] = target + } + + // If no multi-targets are specified, roll out primary to all deploy targets. 
+ if len(cfg.Spec.Input.MultiTargets) == 0 { + for _, dt := range dts { + targetConfigs = append(targetConfigs, targetConfig{ + deployTarget: dt, + multiTarget: nil, + }) + } + } else { + for _, multiTarget := range cfg.Spec.Input.MultiTargets { + dt, ok := deployTargetMap[multiTarget.Target.Name] + if !ok { + lp.Infof("Ignore multi target '%s': not matched any deployTarget", multiTarget.Target.Name) + continue + } + targetConfigs = append(targetConfigs, targetConfig{ + deployTarget: dt, + multiTarget: &multiTarget, + }) + } + } + + eg, ctx := errgroup.WithContext(ctx) + for _, tc := range targetConfigs { + eg.Go(func() error { + lp.Infof("Start primary rollout for target %s", tc.deployTarget.Name) + status := p.primaryRollout(ctx, input, tc.deployTarget, tc.multiTarget, stageCfg) + if status == sdk.StageStatusFailure { + return fmt.Errorf("failed to primary rollout for target %s", tc.deployTarget.Name) + } + return nil + }) + } + + if err := eg.Wait(); err != nil { + lp.Errorf("Failed while rolling out primary (%v)", err) + return sdk.StageStatusFailure + } + + return sdk.StageStatusSuccess +} + +func (p *Plugin) primaryRollout( + ctx context.Context, + input *sdk.ExecuteStageInput[kubeconfig.KubernetesApplicationSpec], + dt *sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig], + multiTarget *kubeconfig.KubernetesMultiTarget, + stageCfg kubeconfig.K8sPrimaryRolloutStageOptions, +) sdk.StageStatus { + lp := input.Client.LogPersister() + + cfg, err := input.Request.TargetDeploymentSource.AppConfig() + if err != nil { + lp.Errorf("Failed while loading application config (%v)", err) + return sdk.StageStatusFailure + } + + var ( + appCfg = cfg.Spec + variantLabel = appCfg.VariantLabel.Key + primaryVariant = appCfg.VariantLabel.PrimaryValue + ) + + toolRegistry := toolregistry.NewRegistry(input.Client.ToolRegistry()) + loader := provider.NewLoader(toolRegistry) + + lp.Infof("Loading manifests at commit %s for handling", input.Request.TargetDeploymentSource.CommitHash) + manifests, err := p.loadManifests(ctx, &input.Request.Deployment, cfg.Spec, &input.Request.TargetDeploymentSource, loader, input.Logger, multiTarget) + if err != nil { + lp.Errorf("Failed while loading manifests (%v)", err) + return sdk.StageStatusFailure + } + lp.Successf("Successfully loaded %d manifests", len(manifests)) + + if len(manifests) == 0 { + lp.Error("This application has no Kubernetes manifests to handle") + return sdk.StageStatusFailure + } + + // Generate the manifests for applying. 
+ lp.Info("Start generating manifests for PRIMARY variant") + primaryManifests, err := generatePrimaryManifests(appCfg, manifests, stageCfg, variantLabel, primaryVariant) + if err != nil { + lp.Errorf("Unable to generate manifests for PRIMARY variant (%v)", err) + return sdk.StageStatusFailure + } + lp.Successf("Successfully generated %d manifests for PRIMARY variant", len(primaryManifests)) + + addVariantLabelsAndAnnotations(primaryManifests, variantLabel, primaryVariant) + + if err := annotateConfigHash(primaryManifests); err != nil { + lp.Errorf("Unable to set %q annotation into the workload manifest (%v)", provider.AnnotationConfigHash, err) + return sdk.StageStatusFailure + } + + deployTargetConfig := dt.Config + + // Resolve kubectl version: multiTarget > spec > deployTarget + kubectlVersion := cmp.Or(appCfg.Input.KubectlVersion, deployTargetConfig.KubectlVersion) + if multiTarget != nil { + kubectlVersion = cmp.Or(multiTarget.KubectlVersion, kubectlVersion) + } + + kubectlPath, err := toolRegistry.Kubectl(ctx, kubectlVersion) + if err != nil { + lp.Errorf("Failed while getting kubectl tool (%v)", err) + return sdk.StageStatusFailure + } + + kubectl := provider.NewKubectl(kubectlPath) + applier := provider.NewApplier(kubectl, appCfg.Input, deployTargetConfig, input.Logger) + + lp.Info("Start rolling out PRIMARY variant...") + if err := applyManifests(ctx, applier, primaryManifests, appCfg.Input.Namespace, lp); err != nil { + lp.Errorf("Failed while applying manifests (%v)", err) + return sdk.StageStatusFailure + } + + if !stageCfg.Prune { + lp.Info("Resource GC was skipped because prune was not configured") + return sdk.StageStatusSuccess + } + + // Wait for all applied manifests to be stable. + // In theory, we don't need to wait for them to be stable before going to the next step + // but waiting for a while reduces the number of Kubernetes changes in a short time. + lp.Info("Waiting for the applied manifests to be stable") + select { + case <-time.After(15 * time.Second): + break + case <-ctx.Done(): + break + } + + // Find the running resources that are not defined in Git. + lp.Info("Start finding all running PRIMARY resources but no longer defined in Git") + namespacedLiveResources, clusterScopedLiveResources, err := provider.GetLiveResources(ctx, kubectl, deployTargetConfig.KubeConfigPath, input.Request.Deployment.ApplicationID, fmt.Sprintf("%s=%s", variantLabel, primaryVariant)) + if err != nil { + lp.Errorf("Failed while getting live resources (%v)", err) + return sdk.StageStatusFailure + } + + if len(namespacedLiveResources)+len(clusterScopedLiveResources) == 0 { + lp.Info("There is no data about live resource so no resource will be removed") + return sdk.StageStatusSuccess + } + + lp.Successf("Successfully loaded %d live resources", len(namespacedLiveResources)+len(clusterScopedLiveResources)) + + removeKeys := provider.FindRemoveResources(primaryManifests, namespacedLiveResources, clusterScopedLiveResources) + if len(removeKeys) == 0 { + lp.Info("There are no live resources should be removed") + return sdk.StageStatusSuccess + } + + lp.Infof("Start pruning %d resources", len(removeKeys)) + deletedCount := deleteResources(ctx, lp, applier, removeKeys) + lp.Successf("Successfully deleted %d resources", deletedCount) + + return sdk.StageStatusSuccess +} + +// generatePrimaryManifests generates manifests for the PRIMARY variant. 
+// It deep-copies the input manifests, adds the variant label to workload selectors +// if requested, and generates a variant Service manifest if requested. +func generatePrimaryManifests(appCfg *kubeconfig.KubernetesApplicationSpec, manifests []provider.Manifest, stageCfg kubeconfig.K8sPrimaryRolloutStageOptions, variantLabel, variant string) ([]provider.Manifest, error) { + suffix := variant + if stageCfg.Suffix != "" { + suffix = stageCfg.Suffix + } + + primaryManifests := provider.DeepCopyManifests(manifests) + + // Add the variant label to workload selectors if requested. + if stageCfg.AddVariantLabelToSelector { + workloads := findWorkloadManifests(primaryManifests, nil) + for _, m := range workloads { + if err := ensureVariantSelectorInWorkload(m, variantLabel, variant); err != nil { + return nil, fmt.Errorf("unable to check/set %q in selector of workload %s (%w)", variantLabel+": "+variant, m.Key().ReadableString(), err) + } + } + } + + // Generate Service manifests for the PRIMARY variant if requested. + if stageCfg.CreateService { + serviceName := appCfg.Service.Name + services := findManifests(provider.KindService, serviceName, primaryManifests) + if len(services) == 0 { + return nil, fmt.Errorf("unable to find any service for PRIMARY variant") + } + // Deep-copy the services to avoid mutating the shared primaryManifests slice entries. + services = provider.DeepCopyManifests(services) + + generatedServices, err := generateVariantServiceManifests(services, variantLabel, variant, suffix) + if err != nil { + return nil, fmt.Errorf("failed to generate service manifests: %w", err) + } + primaryManifests = append(primaryManifests, generatedServices...) + } + + return primaryManifests, nil +} diff --git a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/primary_test.go b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/primary_test.go new file mode 100644 index 0000000000..d20e134e9c --- /dev/null +++ b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/primary_test.go @@ -0,0 +1,440 @@ +// Copyright 2025 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package deployment + +import ( + "context" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + + sdk "github.com/pipe-cd/piped-plugin-sdk-go" + "github.com/pipe-cd/piped-plugin-sdk-go/logpersister/logpersistertest" + "github.com/pipe-cd/piped-plugin-sdk-go/toolregistry/toolregistrytest" + + kubeconfig "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes_multicluster/config" +) + +func TestPlugin_executeK8sMultiPrimaryRolloutStage_SingleCluster(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + appCfg := sdk.LoadApplicationConfigForTest[kubeconfig.KubernetesApplicationSpec](t, filepath.Join("testdata", "primary_rollout", "app.pipecd.yaml"), "kubernetes_multicluster") + + testRegistry := toolregistrytest.NewTestToolRegistry(t) + + input := &sdk.ExecuteStageInput[kubeconfig.KubernetesApplicationSpec]{ + Request: sdk.ExecuteStageRequest[kubeconfig.KubernetesApplicationSpec]{ + StageName: StageK8sMultiPrimaryRollout, + StageConfig: []byte(`{}`), + TargetDeploymentSource: sdk.DeploymentSource[kubeconfig.KubernetesApplicationSpec]{ + ApplicationDirectory: filepath.Join("testdata", "primary_rollout"), + CommitHash: "0123456789", + ApplicationConfig: appCfg, + ApplicationConfigFilename: "app.pipecd.yaml", + }, + Deployment: sdk.Deployment{ + PipedID: "piped-id", + ApplicationID: "app-id", + }, + }, + Client: sdk.NewClient(nil, "kubernetes_multicluster", "app-id", "stage-id", logpersistertest.NewTestLogPersister(t), testRegistry), + Logger: zaptest.NewLogger(t), + } + + dtConfig, dynamicClient := setupTestDeployTargetConfigAndDynamicClient(t) + + plugin := &Plugin{} + status := plugin.executeK8sMultiPrimaryRolloutStage(ctx, input, []*sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig]{ + {Name: "default", Config: *dtConfig}, + }) + + assert.Equal(t, sdk.StageStatusSuccess, status) + + deploymentRes := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"} + deployment, err := dynamicClient.Resource(deploymentRes).Namespace("default").Get(ctx, "simple", metav1.GetOptions{}) + require.NoError(t, err) + + assert.Equal(t, "simple", deployment.GetName()) + assert.Equal(t, "primary", deployment.GetLabels()["pipecd.dev/variant"]) + assert.Equal(t, "primary", deployment.GetAnnotations()["pipecd.dev/variant"]) + assert.Equal(t, "piped-id", deployment.GetLabels()["pipecd.dev/piped"]) + assert.Equal(t, "app-id", deployment.GetLabels()["pipecd.dev/application"]) +} + +func TestPlugin_executeK8sMultiPrimaryRolloutStage_MultiCluster(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + appCfg := sdk.LoadApplicationConfigForTest[kubeconfig.KubernetesApplicationSpec](t, filepath.Join("testdata", "primary_rollout", "app.pipecd.yaml"), "kubernetes_multicluster") + + testRegistry := toolregistrytest.NewTestToolRegistry(t) + + input := &sdk.ExecuteStageInput[kubeconfig.KubernetesApplicationSpec]{ + Request: sdk.ExecuteStageRequest[kubeconfig.KubernetesApplicationSpec]{ + StageName: StageK8sMultiPrimaryRollout, + StageConfig: []byte(`{}`), + TargetDeploymentSource: sdk.DeploymentSource[kubeconfig.KubernetesApplicationSpec]{ + ApplicationDirectory: filepath.Join("testdata", "primary_rollout"), + CommitHash: "0123456789", + ApplicationConfig: appCfg, + 
ApplicationConfigFilename: "app.pipecd.yaml", + }, + Deployment: sdk.Deployment{ + PipedID: "piped-id", + ApplicationID: "app-id", + }, + }, + Client: sdk.NewClient(nil, "kubernetes_multicluster", "app-id", "stage-id", logpersistertest.NewTestLogPersister(t), testRegistry), + Logger: zaptest.NewLogger(t), + } + + cluster1 := setupCluster(t, "cluster1") + cluster2 := setupCluster(t, "cluster2") + + dts := []*sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig]{ + {Name: "cluster1", Config: *cluster1.dtc}, + {Name: "cluster2", Config: *cluster2.dtc}, + } + + plugin := &Plugin{} + status := plugin.executeK8sMultiPrimaryRolloutStage(ctx, input, dts) + + require.Equal(t, sdk.StageStatusSuccess, status) + + deploymentRes := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"} + + // Both clusters should have the primary deployment. + for _, cl := range []*cluster{cluster1, cluster2} { + deployment, err := cl.cli.Resource(deploymentRes).Namespace("default").Get(ctx, "simple", metav1.GetOptions{}) + require.NoError(t, err) + + assert.Equal(t, "simple", deployment.GetName()) + assert.Equal(t, "primary", deployment.GetLabels()["pipecd.dev/variant"]) + assert.Equal(t, "piped-id", deployment.GetLabels()["pipecd.dev/piped"]) + assert.Equal(t, "app-id", deployment.GetLabels()["pipecd.dev/application"]) + } +} + +func TestPlugin_executeK8sMultiPrimaryRolloutStage_WithCreateService(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + configDir := filepath.Join("testdata", "primary_rollout_with_create_service") + appCfg := sdk.LoadApplicationConfigForTest[kubeconfig.KubernetesApplicationSpec](t, filepath.Join(configDir, "app.pipecd.yaml"), "kubernetes_multicluster") + + testRegistry := toolregistrytest.NewTestToolRegistry(t) + + input := &sdk.ExecuteStageInput[kubeconfig.KubernetesApplicationSpec]{ + Request: sdk.ExecuteStageRequest[kubeconfig.KubernetesApplicationSpec]{ + StageName: StageK8sMultiPrimaryRollout, + StageConfig: []byte(`{"createService": true}`), + TargetDeploymentSource: sdk.DeploymentSource[kubeconfig.KubernetesApplicationSpec]{ + ApplicationDirectory: configDir, + CommitHash: "0123456789", + ApplicationConfig: appCfg, + ApplicationConfigFilename: "app.pipecd.yaml", + }, + Deployment: sdk.Deployment{ + PipedID: "piped-id", + ApplicationID: "app-id", + }, + }, + Client: sdk.NewClient(nil, "kubernetes_multicluster", "app-id", "stage-id", logpersistertest.NewTestLogPersister(t), testRegistry), + Logger: zaptest.NewLogger(t), + } + + dtConfig, dynamicClient := setupTestDeployTargetConfigAndDynamicClient(t) + + plugin := &Plugin{} + status := plugin.executeK8sMultiPrimaryRolloutStage(ctx, input, []*sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig]{ + {Name: "default", Config: *dtConfig}, + }) + + assert.Equal(t, sdk.StageStatusSuccess, status) + + // Primary deployment should exist. + deploymentRes := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"} + deployment, err := dynamicClient.Resource(deploymentRes).Namespace("default").Get(ctx, "simple", metav1.GetOptions{}) + require.NoError(t, err) + assert.Equal(t, "simple", deployment.GetName()) + assert.Equal(t, "primary", deployment.GetLabels()["pipecd.dev/variant"]) + + // Primary variant service should be created with variant selector added. 
+ serviceRes := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "services"} + service, err := dynamicClient.Resource(serviceRes).Namespace("default").Get(ctx, "simple-primary", metav1.GetOptions{}) + require.NoError(t, err) + assert.Equal(t, "simple-primary", service.GetName()) + + selector, found, err := unstructured.NestedStringMap(service.Object, "spec", "selector") + require.NoError(t, err) + require.True(t, found) + assert.Equal(t, map[string]string{"app": "simple", "pipecd.dev/variant": "primary"}, selector) +} + +func TestPlugin_executeK8sMultiPrimaryRolloutStage_WithAddVariantLabelToSelector(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + appCfg := sdk.LoadApplicationConfigForTest[kubeconfig.KubernetesApplicationSpec](t, filepath.Join("testdata", "primary_rollout", "app.pipecd.yaml"), "kubernetes_multicluster") + + testRegistry := toolregistrytest.NewTestToolRegistry(t) + + input := &sdk.ExecuteStageInput[kubeconfig.KubernetesApplicationSpec]{ + Request: sdk.ExecuteStageRequest[kubeconfig.KubernetesApplicationSpec]{ + StageName: StageK8sMultiPrimaryRollout, + StageConfig: []byte(`{"addVariantLabelToSelector": true}`), + TargetDeploymentSource: sdk.DeploymentSource[kubeconfig.KubernetesApplicationSpec]{ + ApplicationDirectory: filepath.Join("testdata", "primary_rollout"), + CommitHash: "0123456789", + ApplicationConfig: appCfg, + ApplicationConfigFilename: "app.pipecd.yaml", + }, + Deployment: sdk.Deployment{ + PipedID: "piped-id", + ApplicationID: "app-id", + }, + }, + Client: sdk.NewClient(nil, "kubernetes_multicluster", "app-id", "stage-id", logpersistertest.NewTestLogPersister(t), testRegistry), + Logger: zaptest.NewLogger(t), + } + + dtConfig, dynamicClient := setupTestDeployTargetConfigAndDynamicClient(t) + + plugin := &Plugin{} + status := plugin.executeK8sMultiPrimaryRolloutStage(ctx, input, []*sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig]{ + {Name: "default", Config: *dtConfig}, + }) + + assert.Equal(t, sdk.StageStatusSuccess, status) + + deploymentRes := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"} + deployment, err := dynamicClient.Resource(deploymentRes).Namespace("default").Get(ctx, "simple", metav1.GetOptions{}) + require.NoError(t, err) + + // Variant label should be present in spec.selector.matchLabels. 
+ matchLabels, found, err := unstructured.NestedStringMap(deployment.Object, "spec", "selector", "matchLabels") + require.NoError(t, err) + require.True(t, found) + assert.Equal(t, "primary", matchLabels["pipecd.dev/variant"]) +} + +func TestPlugin_executeK8sMultiPrimaryRolloutStage_WithPrune(t *testing.T) { + t.Parallel() + + ctx := t.Context() + + testRegistry := toolregistrytest.NewTestToolRegistry(t) + dtConfig, dynamicClient := setupTestDeployTargetConfigAndDynamicClient(t) + + deploymentRes := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"} + serviceRes := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "services"} + + runningOk := t.Run("prepare running state", func(t *testing.T) { + running := filepath.Join("testdata", "primary_rollout_prune", "running") + runningCfg := sdk.LoadApplicationConfigForTest[kubeconfig.KubernetesApplicationSpec](t, filepath.Join(running, "app.pipecd.yaml"), "kubernetes_multicluster") + + runningInput := &sdk.ExecuteStageInput[kubeconfig.KubernetesApplicationSpec]{ + Request: sdk.ExecuteStageRequest[kubeconfig.KubernetesApplicationSpec]{ + StageName: StageK8sMultiPrimaryRollout, + StageConfig: []byte(`{"prune": true}`), + TargetDeploymentSource: sdk.DeploymentSource[kubeconfig.KubernetesApplicationSpec]{ + ApplicationDirectory: running, + CommitHash: "0123456789", + ApplicationConfig: runningCfg, + ApplicationConfigFilename: "app.pipecd.yaml", + }, + Deployment: sdk.Deployment{ + PipedID: "piped-id", + ApplicationID: "app-id", + }, + }, + Client: sdk.NewClient(nil, "kubernetes_multicluster", "app-id", "stage-id", logpersistertest.NewTestLogPersister(t), testRegistry), + Logger: zaptest.NewLogger(t), + } + + plugin := &Plugin{} + status := plugin.executeK8sMultiPrimaryRolloutStage(ctx, runningInput, []*sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig]{ + {Name: "default", Config: *dtConfig}, + }) + assert.Equal(t, sdk.StageStatusSuccess, status) + + // Both deployment and service should exist after running state deployment. 
+ _, err := dynamicClient.Resource(deploymentRes).Namespace("default").Get(ctx, "simple", metav1.GetOptions{}) + assert.NoError(t, err) + _, err = dynamicClient.Resource(serviceRes).Namespace("default").Get(ctx, "simple", metav1.GetOptions{}) + assert.NoError(t, err) + }) + require.True(t, runningOk, "prepare running state subtest failed, aborting") + + t.Run("prune with target state", func(t *testing.T) { + target := filepath.Join("testdata", "primary_rollout_prune", "target") + targetCfg := sdk.LoadApplicationConfigForTest[kubeconfig.KubernetesApplicationSpec](t, filepath.Join(target, "app.pipecd.yaml"), "kubernetes_multicluster") + + running := filepath.Join("testdata", "primary_rollout_prune", "running") + runningCfg := sdk.LoadApplicationConfigForTest[kubeconfig.KubernetesApplicationSpec](t, filepath.Join(running, "app.pipecd.yaml"), "kubernetes_multicluster") + + targetInput := &sdk.ExecuteStageInput[kubeconfig.KubernetesApplicationSpec]{ + Request: sdk.ExecuteStageRequest[kubeconfig.KubernetesApplicationSpec]{ + StageName: StageK8sMultiPrimaryRollout, + StageConfig: []byte(`{"prune": true}`), + RunningDeploymentSource: sdk.DeploymentSource[kubeconfig.KubernetesApplicationSpec]{ + ApplicationDirectory: running, + CommitHash: "0123456789", + ApplicationConfig: runningCfg, + ApplicationConfigFilename: "app.pipecd.yaml", + }, + TargetDeploymentSource: sdk.DeploymentSource[kubeconfig.KubernetesApplicationSpec]{ + ApplicationDirectory: target, + CommitHash: "0012345678", + ApplicationConfig: targetCfg, + ApplicationConfigFilename: "app.pipecd.yaml", + }, + Deployment: sdk.Deployment{ + PipedID: "piped-id", + ApplicationID: "app-id", + }, + }, + Client: sdk.NewClient(nil, "kubernetes_multicluster", "app-id", "stage-id", logpersistertest.NewTestLogPersister(t), testRegistry), + Logger: zaptest.NewLogger(t), + } + + plugin := &Plugin{} + status := plugin.executeK8sMultiPrimaryRolloutStage(ctx, targetInput, []*sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig]{ + {Name: "default", Config: *dtConfig}, + }) + assert.Equal(t, sdk.StageStatusSuccess, status) + + // Deployment should still exist. + _, err := dynamicClient.Resource(deploymentRes).Namespace("default").Get(ctx, "simple", metav1.GetOptions{}) + assert.NoError(t, err) + + // Service should have been pruned because it's not in the target manifests. + _, err = dynamicClient.Resource(serviceRes).Namespace("default").Get(ctx, "simple", metav1.GetOptions{}) + require.Error(t, err) + assert.True(t, apierrors.IsNotFound(err), "expected service to be pruned, but got %v", err) + }) +} + +func TestPlugin_executeK8sMultiPrimaryRolloutStage_WithPrune_ManualPreCreate(t *testing.T) { + t.Parallel() + + ctx := t.Context() + + appCfg := sdk.LoadApplicationConfigForTest[kubeconfig.KubernetesApplicationSpec](t, filepath.Join("testdata", "primary_rollout", "app.pipecd.yaml"), "kubernetes_multicluster") + testRegistry := toolregistrytest.NewTestToolRegistry(t) + + dtConfig, dynamicClient := setupTestDeployTargetConfigAndDynamicClient(t) + + deploymentRes := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"} + + // Pre-create a stale primary deployment that is NOT in the target manifests. 
+ staleDeployment := &unstructured.Unstructured{ + Object: map[string]any{ + "apiVersion": "apps/v1", + "kind": "Deployment", + "metadata": map[string]any{ + "name": "simple-stale", + "namespace": "default", + "labels": map[string]any{ + "app": "simple", + "pipecd.dev/managed-by": "piped", + "pipecd.dev/piped": "piped-id", + "pipecd.dev/application": "app-id", + "pipecd.dev/variant": "primary", + }, + "annotations": map[string]any{ + "pipecd.dev/managed-by": "piped", + "pipecd.dev/application": "app-id", + "pipecd.dev/variant": "primary", + }, + }, + "spec": map[string]any{ + "replicas": int64(1), + "selector": map[string]any{ + "matchLabels": map[string]any{ + "app": "simple", + "pipecd.dev/variant": "primary", + }, + }, + "template": map[string]any{ + "metadata": map[string]any{ + "labels": map[string]any{ + "app": "simple", + "pipecd.dev/variant": "primary", + }, + }, + "spec": map[string]any{ + "containers": []any{ + map[string]any{ + "name": "helloworld", + "image": "ghcr.io/pipe-cd/helloworld:v0.31.0", + }, + }, + }, + }, + }, + }, + } + + _, err := dynamicClient.Resource(deploymentRes).Namespace("default").Create(ctx, staleDeployment, metav1.CreateOptions{}) + require.NoError(t, err) + + input := &sdk.ExecuteStageInput[kubeconfig.KubernetesApplicationSpec]{ + Request: sdk.ExecuteStageRequest[kubeconfig.KubernetesApplicationSpec]{ + StageName: StageK8sMultiPrimaryRollout, + StageConfig: []byte(`{"prune": true}`), + TargetDeploymentSource: sdk.DeploymentSource[kubeconfig.KubernetesApplicationSpec]{ + ApplicationDirectory: filepath.Join("testdata", "primary_rollout"), + CommitHash: "0123456789", + ApplicationConfig: appCfg, + ApplicationConfigFilename: "app.pipecd.yaml", + }, + Deployment: sdk.Deployment{ + PipedID: "piped-id", + ApplicationID: "app-id", + }, + }, + Client: sdk.NewClient(nil, "kubernetes_multicluster", "app-id", "stage-id", logpersistertest.NewTestLogPersister(t), testRegistry), + Logger: zaptest.NewLogger(t), + } + + plugin := &Plugin{} + status := plugin.executeK8sMultiPrimaryRolloutStage(ctx, input, []*sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig]{ + {Name: "default", Config: *dtConfig}, + }) + + assert.Equal(t, sdk.StageStatusSuccess, status) + + // The target deployment should exist. + _, err = dynamicClient.Resource(deploymentRes).Namespace("default").Get(ctx, "simple", metav1.GetOptions{}) + assert.NoError(t, err) + + // The stale deployment should have been pruned. 
+ _, err = dynamicClient.Resource(deploymentRes).Namespace("default").Get(ctx, "simple-stale", metav1.GetOptions{}) + require.Error(t, err) + assert.True(t, apierrors.IsNotFound(err), "expected stale deployment to be pruned, but got: %v", err) +} diff --git a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/primary_rollout/app.pipecd.yaml b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/primary_rollout/app.pipecd.yaml new file mode 100644 index 0000000000..496c3a72ba --- /dev/null +++ b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/primary_rollout/app.pipecd.yaml @@ -0,0 +1,14 @@ +apiVersion: pipecd.dev/v1beta1 +kind: KubernetesApp +spec: + name: simple + labels: + env: example + team: product + plugins: + kubernetes_multicluster: + input: + manifests: + - deployment.yaml + - service.yaml + kubectlVersion: 1.32.2 diff --git a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/primary_rollout/deployment.yaml b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/primary_rollout/deployment.yaml new file mode 100644 index 0000000000..429fc2cd59 --- /dev/null +++ b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/primary_rollout/deployment.yaml @@ -0,0 +1,25 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: simple + labels: + app: simple +spec: + replicas: 2 + selector: + matchLabels: + app: simple + pipecd.dev/variant: primary + template: + metadata: + labels: + app: simple + pipecd.dev/variant: primary + spec: + containers: + - name: helloworld + image: ghcr.io/pipe-cd/helloworld:v0.32.0 + args: + - server + ports: + - containerPort: 9085 diff --git a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/primary_rollout/service.yaml b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/primary_rollout/service.yaml new file mode 100644 index 0000000000..52ca9d1f59 --- /dev/null +++ b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/primary_rollout/service.yaml @@ -0,0 +1,11 @@ +apiVersion: v1 +kind: Service +metadata: + name: simple +spec: + selector: + app: simple + ports: + - protocol: TCP + port: 9085 + targetPort: 9085 diff --git a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/primary_rollout_prune/running/app.pipecd.yaml b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/primary_rollout_prune/running/app.pipecd.yaml new file mode 100644 index 0000000000..496c3a72ba --- /dev/null +++ b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/primary_rollout_prune/running/app.pipecd.yaml @@ -0,0 +1,14 @@ +apiVersion: pipecd.dev/v1beta1 +kind: KubernetesApp +spec: + name: simple + labels: + env: example + team: product + plugins: + kubernetes_multicluster: + input: + manifests: + - deployment.yaml + - service.yaml + kubectlVersion: 1.32.2 diff --git a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/primary_rollout_prune/running/deployment.yaml b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/primary_rollout_prune/running/deployment.yaml new file mode 100644 index 0000000000..429fc2cd59 --- /dev/null +++ b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/primary_rollout_prune/running/deployment.yaml @@ -0,0 +1,25 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: simple + labels: + app: simple +spec: + replicas: 2 + selector: + matchLabels: + app: simple + pipecd.dev/variant: primary + template: + metadata: + labels: 
+ app: simple + pipecd.dev/variant: primary + spec: + containers: + - name: helloworld + image: ghcr.io/pipe-cd/helloworld:v0.32.0 + args: + - server + ports: + - containerPort: 9085 diff --git a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/primary_rollout_prune/running/service.yaml b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/primary_rollout_prune/running/service.yaml new file mode 100644 index 0000000000..52ca9d1f59 --- /dev/null +++ b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/primary_rollout_prune/running/service.yaml @@ -0,0 +1,11 @@ +apiVersion: v1 +kind: Service +metadata: + name: simple +spec: + selector: + app: simple + ports: + - protocol: TCP + port: 9085 + targetPort: 9085 diff --git a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/primary_rollout_prune/target/app.pipecd.yaml b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/primary_rollout_prune/target/app.pipecd.yaml new file mode 100644 index 0000000000..55b01e7b45 --- /dev/null +++ b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/primary_rollout_prune/target/app.pipecd.yaml @@ -0,0 +1,13 @@ +apiVersion: pipecd.dev/v1beta1 +kind: KubernetesApp +spec: + name: simple + labels: + env: example + team: product + plugins: + kubernetes_multicluster: + input: + manifests: + - deployment.yaml + kubectlVersion: 1.32.2 diff --git a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/primary_rollout_prune/target/deployment.yaml b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/primary_rollout_prune/target/deployment.yaml new file mode 100644 index 0000000000..429fc2cd59 --- /dev/null +++ b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/primary_rollout_prune/target/deployment.yaml @@ -0,0 +1,25 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: simple + labels: + app: simple +spec: + replicas: 2 + selector: + matchLabels: + app: simple + pipecd.dev/variant: primary + template: + metadata: + labels: + app: simple + pipecd.dev/variant: primary + spec: + containers: + - name: helloworld + image: ghcr.io/pipe-cd/helloworld:v0.32.0 + args: + - server + ports: + - containerPort: 9085 diff --git a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/primary_rollout_with_create_service/app.pipecd.yaml b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/primary_rollout_with_create_service/app.pipecd.yaml new file mode 100644 index 0000000000..d2c4ce708c --- /dev/null +++ b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/primary_rollout_with_create_service/app.pipecd.yaml @@ -0,0 +1,16 @@ +apiVersion: pipecd.dev/v1beta1 +kind: KubernetesApp +spec: + name: simple + labels: + env: example + team: product + plugins: + kubernetes_multicluster: + service: + name: simple + input: + manifests: + - deployment.yaml + - service.yaml + kubectlVersion: 1.32.2 diff --git a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/primary_rollout_with_create_service/deployment.yaml b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/primary_rollout_with_create_service/deployment.yaml new file mode 100644 index 0000000000..429fc2cd59 --- /dev/null +++ b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/primary_rollout_with_create_service/deployment.yaml @@ -0,0 +1,25 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: simple + labels: + app: simple +spec: + replicas: 2 + 
selector: + matchLabels: + app: simple + pipecd.dev/variant: primary + template: + metadata: + labels: + app: simple + pipecd.dev/variant: primary + spec: + containers: + - name: helloworld + image: ghcr.io/pipe-cd/helloworld:v0.32.0 + args: + - server + ports: + - containerPort: 9085 diff --git a/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/primary_rollout_with_create_service/service.yaml b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/primary_rollout_with_create_service/service.yaml new file mode 100644 index 0000000000..52ca9d1f59 --- /dev/null +++ b/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment/testdata/primary_rollout_with_create_service/service.yaml @@ -0,0 +1,11 @@ +apiVersion: v1 +kind: Service +metadata: + name: simple +spec: + selector: + app: simple + ports: + - protocol: TCP + port: 9085 + targetPort: 9085
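For reference, a minimal (untested) app.pipecd.yaml sketch of how the new K8S_PRIMARY_ROLLOUT stage options added above could be configured. It assumes the usual spec.pipeline.stages / with: layout for stage options; the stage name, the option names, and the "primary" default are taken from this change, and the remaining fields mirror the testdata files in this diff.

apiVersion: pipecd.dev/v1beta1
kind: KubernetesApp
spec:
  name: simple
  pipeline:
    stages:
      # Promote the new version as PRIMARY on all matched deploy targets.
      - name: K8S_PRIMARY_ROLLOUT
        with:
          suffix: primary              # defaults to "primary" when omitted (default struct tag)
          createService: true          # generates a "simple-primary" Service with the variant selector added
          addVariantLabelToSelector: false
          prune: true                  # remove live PRIMARY resources no longer defined in Git
  plugins:
    kubernetes_multicluster:
      service:
        name: simple                   # used by createService to pick the Service manifest (as in the testdata above)
      input:
        manifests:
          - deployment.yaml
          - service.yaml
        kubectlVersion: 1.32.2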