diff --git a/.changes/unreleased/operator-Fixed-20260122-100940.yaml b/.changes/unreleased/operator-Fixed-20260122-100940.yaml new file mode 100644 index 000000000..b9c907f8c --- /dev/null +++ b/.changes/unreleased/operator-Fixed-20260122-100940.yaml @@ -0,0 +1,4 @@ +project: operator +kind: Fixed +body: Allow Topic finalizer removal when broker is unreachable or credentials are missing during deletion, preventing stuck namespaces. +time: 2026-01-22T10:00:00.000000+01:00 diff --git a/operator/internal/controller/redpanda/resource_controller.go b/operator/internal/controller/redpanda/resource_controller.go index f2e05ae33..63ed068d7 100644 --- a/operator/internal/controller/redpanda/resource_controller.go +++ b/operator/internal/controller/redpanda/resource_controller.go @@ -12,11 +12,13 @@ package redpanda import ( "context" "fmt" + "net" "time" "github.com/cockroachdb/errors" "github.com/go-logr/logr" "github.com/redpanda-data/common-go/otelutil/log" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -27,6 +29,7 @@ import ( "github.com/redpanda-data/redpanda-operator/operator/internal/lifecycle" internalclient "github.com/redpanda-data/redpanda-operator/operator/pkg/client" "github.com/redpanda-data/redpanda-operator/pkg/multicluster" + "github.com/redpanda-data/redpanda-operator/pkg/secrets" ) type Resource[T any] interface { @@ -136,7 +139,9 @@ func ignoreAllConnectionErrors(logger logr.Logger, err error) error { // able to clean ourselves up anyway. if internalclient.IsTerminalClientError(err) || internalclient.IsConfigurationError(err) || - internalclient.IsInvalidClusterError(err) { + internalclient.IsInvalidClusterError(err) || + isNotFoundInChain(err) || + isNetworkDialError(err) { // We use Info rather than Error here because we don't want // to ignore the verbosity settings. This is really only for // debugging purposes. 
@@ -146,6 +151,28 @@ func ignoreAllConnectionErrors(logger logr.Logger, err error) error { return err } +// isNotFoundInChain walks the error chain to check if a "not found" error +// is wrapped anywhere. This handles both K8s NotFound (missing secrets/configmaps) +// and cloud secret not found errors. +func isNotFoundInChain(err error) bool { + for err != nil { + if apierrors.IsNotFound(err) || errors.Is(err, secrets.ErrSecretNotFound) { + return true + } + err = errors.Unwrap(err) + } + return false +} + +// isNetworkDialError reports whether the error chain contains a *net.OpError +// raised while dialing, i.e. the broker could not be reached at all (connection +// refused, dial timeout, DNS failure). Read/write failures on an established +// connection are not matched: the broker is reachable, so deletion should retry. +func isNetworkDialError(err error) bool { + var netErr *net.OpError + return errors.As(err, &netErr) && netErr.Op == "dial" +} + func handleResourceSyncErrors(err error) (metav1.Condition, error) { // If we have a known terminal error, just set the sync condition and don't re-run reconciliation. if internalclient.IsInvalidClusterError(err) { diff --git a/operator/internal/controller/redpanda/resource_controller_test.go b/operator/internal/controller/redpanda/resource_controller_test.go index 0d04c5d5b..af58d615a 100644 --- a/operator/internal/controller/redpanda/resource_controller_test.go +++ b/operator/internal/controller/redpanda/resource_controller_test.go @@ -12,12 +12,14 @@ package redpanda import ( "context" "fmt" + "net" "os" "sync/atomic" "testing" "time" "github.com/cockroachdb/errors" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/testcontainers/testcontainers-go/modules/redpanda" corev1 "k8s.io/api/core/v1" @@ -394,3 +396,74 @@ func TestResourceController(t *testing.T) { // nolint:funlen // These tests have require.Equal(t, int32(size/2), reconciler.deletes.Load()) require.Equal(t, int32(size), reconciler.syncs.Load()) } + +func TestIsNetworkDialError(t *testing.T) { + tests := []struct { + name string + err error + expected bool + }{ + { + name: "nil error", + 
err: nil, + expected: false, + }, + { + name: "plain error", + err: errors.New("some error"), + expected: false, + }, + { + name: "net.OpError directly", + err: &net.OpError{Op: "dial", Net: "tcp", Err: errors.New("connection refused")}, + expected: true, + }, + { + name: "net.OpError wrapped once (franz-go style)", + err: fmt.Errorf("unable to dial: %w", + &net.OpError{Op: "dial", Net: "tcp", Err: errors.New("connection refused")}), + expected: true, + }, + { + name: "net.OpError wrapped twice (recordErrorEvent + franz-go)", + err: fmt.Errorf("deleting topic (test) library error: %w", + fmt.Errorf("unable to dial: %w", + &net.OpError{Op: "dial", Net: "tcp", Err: errors.New("connection refused")})), + expected: true, + }, + { + name: "realistic connection refused error", + err: fmt.Errorf("deleting topic (test-topic) library error: %w", + fmt.Errorf("unable to dial: %w", + &net.OpError{ + Op: "dial", + Net: "tcp", + Addr: &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 9092}, + Err: &net.OpError{Op: "connect", Err: errors.New("connection refused")}, + })), + expected: true, + }, + { + name: "DNS resolution error", + err: fmt.Errorf("unable to dial: %w", + &net.OpError{ + Op: "dial", + Net: "tcp", + Err: &net.DNSError{Err: "no such host", Name: "nonexistent.local"}, + }), + expected: true, + }, + { + name: "non-dial net.OpError (read on established connection)", + err: &net.OpError{Op: "read", Net: "tcp", Err: errors.New("connection reset by peer")}, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := isNetworkDialError(tt.err) + assert.Equal(t, tt.expected, result, "error: %v", tt.err) + }) + } +} diff --git a/operator/internal/controller/redpanda/topic_controller.go b/operator/internal/controller/redpanda/topic_controller.go index 41ca41c38..c063c25e1 100644 --- a/operator/internal/controller/redpanda/topic_controller.go +++ b/operator/internal/controller/redpanda/topic_controller.go @@ -185,6 +185,12 @@ func (r *TopicReconciler) reconcile(ctx context.Context, recorder record.EventRe kafkaClient, err := r.createKafkaClient(ctx, topic, l) if err != nil { + // If topic is being deleted, 
allow finalizer removal when we can't + // establish a connection. This prevents namespaces from getting stuck + // in Terminating state when secrets are deleted before the Topic. + if !topic.ObjectMeta.DeletionTimestamp.IsZero() && ignoreAllConnectionErrors(l, err) == nil { + return redpandav1alpha2.TopicReady(topic), ctrl.Result{}, nil + } return redpandav1alpha2.TopicFailed(topic), ctrl.Result{}, err } defer kafkaClient.Close() @@ -203,6 +209,12 @@ func (r *TopicReconciler) reconcile(ctx context.Context, recorder record.EventRe l.V(log.DebugLevel).Info("delete topic", "topic-name", topic.GetTopicName()) err = r.deleteTopic(ctx, recorder, topic, kafkaClient) if err != nil { + // Allow finalizer removal if we can't connect to the broker. + // This prevents topics from getting stuck in Terminating state + // when the broker is unreachable. + if ignoreAllConnectionErrors(l, err) == nil { + return redpandav1alpha2.TopicReady(topic), ctrl.Result{}, nil + } return redpandav1alpha2.TopicFailed(topic), ctrl.Result{}, fmt.Errorf("unable to delete topic: %w", err) } return redpandav1alpha2.TopicReady(topic), ctrl.Result{}, nil diff --git a/operator/internal/controller/redpanda/topic_controller_test.go b/operator/internal/controller/redpanda/topic_controller_test.go index fd78d72e2..902b30370 100644 --- a/operator/internal/controller/redpanda/topic_controller_test.go +++ b/operator/internal/controller/redpanda/topic_controller_test.go @@ -21,6 +21,7 @@ import ( "github.com/twmb/franz-go/pkg/kadm" "github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/pkg/kmsg" + corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -823,4 +824,95 @@ func TestReconcile(t *testing.T) { // nolint:funlen // These tests have clear su assert.Equal(t, time.Duration(0), result.RequeueAfter) }) + t.Run("delete_topic_with_missing_credentials_succeeds", func(t *testing.T) { + // Deletion should 
succeed when credentials Secret is missing. + // This prevents namespaces from getting stuck in Terminating state + // when secrets are deleted before topics. + topicName := "delete-topic-missing-secret" + + topic := redpandav1alpha2.Topic{ + ObjectMeta: metav1.ObjectMeta{ + Name: topicName, + Namespace: testNamespace, + Finalizers: []string{FinalizerKey}, + }, + Spec: redpandav1alpha2.TopicSpec{ + Partitions: ptr.To(1), + ReplicationFactor: ptr.To(1), + KafkaAPISpec: &redpandav1alpha2.KafkaAPISpec{ + Brokers: []string{seedBroker}, + SASL: &redpandav1alpha2.KafkaSASL{ + Username: "testuser", + Password: &redpandav1alpha2.ValueSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "non-existent-secret", + }, + Key: "password", + }, + }, + Mechanism: redpandav1alpha2.SASLMechanismScramSHA256, + }, + }, + }, + } + + err := c.Create(ctx, &topic) + require.NoError(t, err) + + err = c.Delete(ctx, &topic) + require.NoError(t, err) + + key := types.NamespacedName{Name: topicName, Namespace: testNamespace} + req := mcreconcile.Request{Request: ctrl.Request{NamespacedName: key}, ClusterName: mcmanager.LocalCluster} + + // Reconcile should succeed (ignoring the not-found error during deletion) + result, err := tr.Reconcile(ctx, req) + assert.NoError(t, err, "reconciler should not error when secret is missing during deletion") + assert.Equal(t, time.Duration(0), result.RequeueAfter) + + // Topic should be deleted (finalizer removed) + err = c.Get(ctx, key, &topic) + assert.True(t, apierrors.IsNotFound(err), "topic should be deleted after finalizer removal") + }) + t.Run("delete_topic_with_unreachable_broker_succeeds", func(t *testing.T) { + // Deletion should succeed when broker is unreachable (connection refused). + // This prevents topics from getting stuck in Terminating state when the + // Kafka cluster is gone or unreachable. 
+ topicName := "delete-topic-unreachable-broker" + + topic := redpandav1alpha2.Topic{ + ObjectMeta: metav1.ObjectMeta{ + Name: topicName, + Namespace: testNamespace, + Finalizers: []string{FinalizerKey}, + }, + Spec: redpandav1alpha2.TopicSpec{ + Partitions: ptr.To(1), + ReplicationFactor: ptr.To(1), + KafkaAPISpec: &redpandav1alpha2.KafkaAPISpec{ + // Use a localhost address that will definitely fail to connect + Brokers: []string{"localhost:19092"}, + }, + }, + } + + err := c.Create(ctx, &topic) + require.NoError(t, err) + + err = c.Delete(ctx, &topic) + require.NoError(t, err) + + key := types.NamespacedName{Name: topicName, Namespace: testNamespace} + req := mcreconcile.Request{Request: ctrl.Request{NamespacedName: key}, ClusterName: mcmanager.LocalCluster} + + // Reconcile should succeed (ignoring the dial error during deletion) + result, err := tr.Reconcile(ctx, req) + assert.NoError(t, err, "reconciler should not error when broker is unreachable during deletion") + assert.Equal(t, time.Duration(0), result.RequeueAfter) + + // Topic should be deleted (finalizer removed) + err = c.Get(ctx, key, &topic) + assert.True(t, apierrors.IsNotFound(err), "topic should be deleted after finalizer removal") + }) } diff --git a/pkg/secrets/secrets.go b/pkg/secrets/secrets.go index b0df0cdcf..910076cc5 100644 --- a/pkg/secrets/secrets.go +++ b/pkg/secrets/secrets.go @@ -21,6 +21,9 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log" ) +// ErrSecretNotFound is returned when a cloud secret cannot be found. 
+var ErrSecretNotFound = errors.New("cloud secret not found") + type CloudExpander struct { client secrets.SecretAPI logger *slog.Logger @@ -97,7 +100,7 @@ func NewCloudExpanderFromAPI(api secrets.SecretAPI) *CloudExpander { func (t *CloudExpander) Expand(ctx context.Context, name string) (string, error) { value, found := t.client.GetSecretValue(ctx, name) if !found { - return "", errors.Newf("secret %s not found", name) + return "", errors.Wrapf(ErrSecretNotFound, "secret %s", name) } return value, nil }