diff --git a/operator/pkg/resources/statefulset_update.go b/operator/pkg/resources/statefulset_update.go
index debf3e544..9c8adf47f 100644
--- a/operator/pkg/resources/statefulset_update.go
+++ b/operator/pkg/resources/statefulset_update.go
@@ -94,7 +94,7 @@ func (r *StatefulSetResource) runUpdate(
 
 	// Check if we should run update on this specific STS.
 	log.V(logger.DebugLevel).Info("Checking that we should update")
-	stsChanged, nodePoolRestarting, err := r.shouldUpdate(current, modified)
+	stsChanged, nodePoolRestarting, err := r.shouldUpdate(ctx, current, modified)
 	if err != nil {
 		return fmt.Errorf("unable to determine the update procedure: %w", err)
 	}
@@ -629,7 +629,7 @@ func (r *StatefulSetResource) updateStatefulSet(
 // If no differences are found, the function returns `false` as first value. The second value
 // depends on the Node Pool Status, indicating whether rolling restart is still in progress.
 func (r *StatefulSetResource) shouldUpdate(
-	current, modified *appsv1.StatefulSet,
+	ctx context.Context, current, modified *appsv1.StatefulSet,
 ) (bool, bool, error) {
 	log := r.logger.WithName("shouldUpdate")
 
@@ -648,6 +648,18 @@ func (r *StatefulSetResource) shouldUpdate(
 			return false, true, nil
 		}
 
+		// Check if any pods have ClusterUpdate condition set.
+		// This makes the controller idempotent: if a previous reconcile set the condition
+		// but the status flag wasn't persisted (due to controller restart, stale cache, etc.),
+		// we'll still detect that a rolling update is needed by checking actual pod state.
+		allPodsUpdated, podsErr := r.areAllPodsUpdated(ctx)
+		if podsErr != nil {
+			log.V(logger.DebugLevel).Error(podsErr, "error checking if pods need update")
+		} else if !allPodsUpdated {
+			log.Info("Found pods with ClusterUpdate condition, continuing rolling update")
+			return false, true, nil
+		}
+
 		return false, false, err
 	}
 	log.Info("Detected diff", "patchResult", string(patchResult.Patch))
diff --git a/operator/pkg/resources/statefulset_update_test.go b/operator/pkg/resources/statefulset_update_test.go
index 325b60711..f50800ff4 100644
--- a/operator/pkg/resources/statefulset_update_test.go
+++ b/operator/pkg/resources/statefulset_update_test.go
@@ -31,6 +31,7 @@ import (
 	"k8s.io/utils/ptr"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/client/fake"
 
 	redpanda "github.com/redpanda-data/redpanda-operator/charts/redpanda/v25/client"
 	vectorizedv1alpha1 "github.com/redpanda-data/redpanda-operator/operator/api/vectorized/v1alpha1"
@@ -146,13 +147,21 @@ func TestShouldUpdate_AnnotationChange(t *testing.T) {
 	}
 	stsWithAnnotation := sts.DeepCopy()
 	stsWithAnnotation.Spec.Template.Annotations = map[string]string{"test": "test2"}
+
+	fakeClient := fake.NewClientBuilder().Build()
+
 	ssres := StatefulSetResource{
+		Client: fakeClient,
 		nodePool: vectorizedv1alpha1.NodePoolSpecWithDeleted{
 			NodePoolSpec: vectorizedv1alpha1.NodePoolSpec{
 				Name: "default",
 			},
 		},
 		pandaCluster: &vectorizedv1alpha1.Cluster{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      "test",
+				Namespace: "default",
+			},
 			Status: vectorizedv1alpha1.ClusterStatus{
 				Restarting: false,
 				NodePools: map[string]vectorizedv1alpha1.NodePoolStatus{
@@ -163,22 +172,27 @@ func TestShouldUpdate_AnnotationChange(t *testing.T) {
 			},
 		},
 	}
-	stsChange, _, err := ssres.shouldUpdate(sts, stsWithAnnotation)
+	stsChange, _, err := ssres.shouldUpdate(context.Background(), sts, stsWithAnnotation)
 	require.NoError(t, err)
 	require.True(t, stsChange)
 
 	// same statefulset with same annotation
-	stsChange, _, err = ssres.shouldUpdate(stsWithAnnotation, stsWithAnnotation)
+	stsChange, _, err = ssres.shouldUpdate(context.Background(), stsWithAnnotation, stsWithAnnotation)
 	require.NoError(t, err)
 	require.False(t, stsChange)
 
 	ssres = StatefulSetResource{
+		Client: fakeClient,
 		nodePool: vectorizedv1alpha1.NodePoolSpecWithDeleted{
 			NodePoolSpec: vectorizedv1alpha1.NodePoolSpec{
 				Name: "default",
 			},
 		},
 		pandaCluster: &vectorizedv1alpha1.Cluster{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      "test",
+				Namespace: "default",
+			},
 			Status: vectorizedv1alpha1.ClusterStatus{
 				Restarting: false,
 				NodePools: map[string]vectorizedv1alpha1.NodePoolStatus{
@@ -190,7 +204,7 @@ func TestShouldUpdate_AnnotationChange(t *testing.T) {
 		},
 	}
 	// same statefulset with same annotation, but with node pool status restarting
-	_, nodePoolRestarting, err := ssres.shouldUpdate(stsWithAnnotation, stsWithAnnotation)
+	_, nodePoolRestarting, err := ssres.shouldUpdate(context.Background(), stsWithAnnotation, stsWithAnnotation)
 	require.NoError(t, err)
 	require.True(t, nodePoolRestarting)
 }
@@ -385,6 +399,111 @@ vectorized_cluster_partition_under_replicated_replicas{namespace="kafka",partiti
 	require.NoError(t, err)
 }
 
+func TestShouldUpdate_PodsWithClusterUpdateCondition(t *testing.T) {
+	var replicas int32 = 3
+	sts := &appsv1.StatefulSet{
+		ObjectMeta: metav1.ObjectMeta{
+			Namespace: "default",
+			Name:      "cluster-first",
+		},
+		Spec: appsv1.StatefulSetSpec{
+			Replicas: &replicas,
+			Selector: &metav1.LabelSelector{
+				MatchLabels: map[string]string{
+					"app.kubernetes.io/name":        "redpanda",
+					"app.kubernetes.io/instance":    "cluster",
+					"app.kubernetes.io/component":   "redpanda",
+					"cluster.redpanda.com/nodepool": "first",
+				},
+			},
+			Template: corev1.PodTemplateSpec{
+				ObjectMeta: metav1.ObjectMeta{
+					Labels: map[string]string{
+						"app.kubernetes.io/name":        "redpanda",
+						"app.kubernetes.io/instance":    "cluster",
+						"app.kubernetes.io/component":   "redpanda",
+						"cluster.redpanda.com/nodepool": "first",
+					},
+				},
+				Spec: corev1.PodSpec{
+					Containers: []corev1.Container{{Name: "redpanda", Image: "redpanda:v25.2.7"}},
+				},
+			},
+		},
+	}
+
+	// Create pods with ClusterUpdate condition set (simulating stuck state)
+	pods := []client.Object{
+		&corev1.Pod{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      "cluster-first-0",
+				Namespace: "default",
+				Labels: map[string]string{
+					"app.kubernetes.io/name":        "redpanda",
+					"app.kubernetes.io/instance":    "cluster",
+					"app.kubernetes.io/component":   "redpanda",
+					"cluster.redpanda.com/nodepool": "first",
+				},
+			},
+			Status: corev1.PodStatus{
+				Conditions: []corev1.PodCondition{
+					{Type: ClusterUpdatePodCondition, Status: corev1.ConditionTrue, Reason: ClusterUpdateReasonVersion},
+				},
+			},
+		},
+		&corev1.Pod{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      "cluster-first-1",
+				Namespace: "default",
+				Labels: map[string]string{
+					"app.kubernetes.io/name":        "redpanda",
+					"app.kubernetes.io/instance":    "cluster",
+					"app.kubernetes.io/component":   "redpanda",
+					"cluster.redpanda.com/nodepool": "first",
+				},
+			},
+			Status: corev1.PodStatus{
+				Conditions: []corev1.PodCondition{
+					{Type: ClusterUpdatePodCondition, Status: corev1.ConditionTrue, Reason: ClusterUpdateReasonVersion},
+				},
+			},
+		},
+	}
+
+	fakeClient := fake.NewClientBuilder().WithObjects(pods...).Build()
+
+	ssres := StatefulSetResource{
+		Client: fakeClient,
+		logger: ctrl.Log.WithName("test"),
+		nodePool: vectorizedv1alpha1.NodePoolSpecWithDeleted{
+			NodePoolSpec: vectorizedv1alpha1.NodePoolSpec{
+				Name: "first",
+			},
+		},
+		pandaCluster: &vectorizedv1alpha1.Cluster{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      "cluster",
+				Namespace: "default",
+			},
+			Status: vectorizedv1alpha1.ClusterStatus{
+				Restarting: false, // Status says not restarting (stale cache)
+				NodePools: map[string]vectorizedv1alpha1.NodePoolStatus{
+					"first": {
+						Restarting: false, // NodePool status also says not restarting
+					},
+				},
+			},
+		},
+	}
+
+	// No StatefulSet diff, status says not restarting, but pods have ClusterUpdate condition
+	// Should detect this and return nodePoolRestarting=true for idempotency
+	stsChange, nodePoolRestarting, err := ssres.shouldUpdate(context.Background(), sts, sts)
+	require.NoError(t, err)
+	require.False(t, stsChange, "StatefulSet should not have changed")
+	require.True(t, nodePoolRestarting, "Should detect pods need restart despite status saying otherwise")
+}
+
 //nolint:funlen // the test data doesn't count
 func Test_sortPodList(t *testing.T) {
 	const clusterName = "sortPodListCluster"