Skip to content

Commit 30cb82b

Browse files
committed
topic: allow finalizer removal when broker is unreachable
Topics were getting stuck in the Terminating state when the Kafka broker was unreachable (connection refused, DNS failure, etc.) or when credentials were missing. The controller kept retrying forever, which blocked namespace deletion.

Fix this by detecting non-recoverable connection errors during topic deletion and allowing the finalizer to be removed. This handles:
- Missing credentials Secret or cloud secret (NotFound errors)
- Network dial errors (net.OpError: connection refused, DNS failures)
- Terminal client errors (SASL auth failures)
- Invalid cluster reference errors

Add an isNetworkDialError() helper that uses errors.As to find a net.OpError in the error chain. Add unit tests covering various error wrapping scenarios (direct, single-wrapped, double-wrapped).

Also adds the operator.redpanda.com/allow-deletion annotation to force topic deletion even when broker connectivity issues exist.
1 parent 9407dc4 commit 30cb82b

File tree

6 files changed

+206
-2
lines changed

6 files changed

+206
-2
lines changed
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
project: operator
2+
kind: Fixed
3+
body: Allow Topic finalizer removal when broker is unreachable or credentials are missing during deletion, preventing stuck namespaces.
4+
time: 2026-01-22T10:00:00.000000+01:00

operator/internal/controller/redpanda/resource_controller.go

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,13 @@ package redpanda
1212
import (
1313
"context"
1414
"fmt"
15+
"net"
1516
"time"
1617

1718
"github.com/cockroachdb/errors"
1819
"github.com/go-logr/logr"
1920
"github.com/redpanda-data/common-go/otelutil/log"
21+
apierrors "k8s.io/apimachinery/pkg/api/errors"
2022
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2123
ctrl "sigs.k8s.io/controller-runtime"
2224
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -27,6 +29,7 @@ import (
2729
"github.com/redpanda-data/redpanda-operator/operator/internal/lifecycle"
2830
internalclient "github.com/redpanda-data/redpanda-operator/operator/pkg/client"
2931
"github.com/redpanda-data/redpanda-operator/pkg/multicluster"
32+
"github.com/redpanda-data/redpanda-operator/pkg/secrets"
3033
)
3134

3235
type Resource[T any] interface {
@@ -136,7 +139,9 @@ func ignoreAllConnectionErrors(logger logr.Logger, err error) error {
136139
// able to clean ourselves up anyway.
137140
if internalclient.IsTerminalClientError(err) ||
138141
internalclient.IsConfigurationError(err) ||
139-
internalclient.IsInvalidClusterError(err) {
142+
internalclient.IsInvalidClusterError(err) ||
143+
isNotFoundInChain(err) ||
144+
isNetworkDialError(err) {
140145
// We use Info rather than Error here because we don't want
141146
// to ignore the verbosity settings. This is really only for
142147
// debugging purposes.
@@ -146,6 +151,26 @@ func ignoreAllConnectionErrors(logger logr.Logger, err error) error {
146151
return err
147152
}
148153

154+
// isNotFoundInChain walks the error chain to check if a "not found" error
155+
// is wrapped anywhere. This handles both K8s NotFound (missing secrets/configmaps)
156+
// and cloud secret not found errors.
157+
func isNotFoundInChain(err error) bool {
158+
for err != nil {
159+
if apierrors.IsNotFound(err) || errors.Is(err, secrets.ErrSecretNotFound) {
160+
return true
161+
}
162+
err = errors.Unwrap(err)
163+
}
164+
return false
165+
}
166+
167+
// isNetworkDialError reports whether the error chain contains a *net.OpError
// raised while establishing a connection (Op == "dial"), e.g. connection
// refused, a DNS resolution failure, or a dial timeout.
//
// Only dial failures match. A *net.OpError with another Op (such as "read" or
// "write") comes from a connection that was already established, so it does
// not show that the broker is unreachable and must not be treated as
// permission to skip cleanup.
//
// A nil error returns false.
func isNetworkDialError(err error) bool {
	var opErr *net.OpError
	// errors.As yields the outermost *net.OpError; if that one is not a dial
	// error, keep searching beneath it in case a dial failure is nested inside.
	for errors.As(err, &opErr) {
		if opErr.Op == "dial" {
			return true
		}
		err = opErr.Err
	}
	return false
}
173+
149174
func handleResourceSyncErrors(err error) (metav1.Condition, error) {
150175
// If we have a known terminal error, just set the sync condition and don't re-run reconciliation.
151176
if internalclient.IsInvalidClusterError(err) {

operator/internal/controller/redpanda/resource_controller_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,14 @@ package redpanda
1212
import (
1313
"context"
1414
"fmt"
15+
"net"
1516
"os"
1617
"sync/atomic"
1718
"testing"
1819
"time"
1920

2021
"github.com/cockroachdb/errors"
22+
"github.com/stretchr/testify/assert"
2123
"github.com/stretchr/testify/require"
2224
"github.com/testcontainers/testcontainers-go/modules/redpanda"
2325
corev1 "k8s.io/api/core/v1"
@@ -394,3 +396,69 @@ func TestResourceController(t *testing.T) { // nolint:funlen // These tests have
394396
require.Equal(t, int32(size/2), reconciler.deletes.Load())
395397
require.Equal(t, int32(size), reconciler.syncs.Load())
396398
}
399+
400+
func TestIsNetworkDialError(t *testing.T) {
401+
tests := []struct {
402+
name string
403+
err error
404+
expected bool
405+
}{
406+
{
407+
name: "nil error",
408+
err: nil,
409+
expected: false,
410+
},
411+
{
412+
name: "plain error",
413+
err: errors.New("some error"),
414+
expected: false,
415+
},
416+
{
417+
name: "net.OpError directly",
418+
err: &net.OpError{Op: "dial", Net: "tcp", Err: errors.New("connection refused")},
419+
expected: true,
420+
},
421+
{
422+
name: "net.OpError wrapped once (franz-go style)",
423+
err: fmt.Errorf("unable to dial: %w",
424+
&net.OpError{Op: "dial", Net: "tcp", Err: errors.New("connection refused")}),
425+
expected: true,
426+
},
427+
{
428+
name: "net.OpError wrapped twice (recordErrorEvent + franz-go)",
429+
err: fmt.Errorf("deleting topic (test) library error: %w",
430+
fmt.Errorf("unable to dial: %w",
431+
&net.OpError{Op: "dial", Net: "tcp", Err: errors.New("connection refused")})),
432+
expected: true,
433+
},
434+
{
435+
name: "realistic connection refused error",
436+
err: fmt.Errorf("deleting topic (test-topic) library error: %w",
437+
fmt.Errorf("unable to dial: %w",
438+
&net.OpError{
439+
Op: "dial",
440+
Net: "tcp",
441+
Addr: &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 9092},
442+
Err: &net.OpError{Op: "connect", Err: errors.New("connection refused")},
443+
})),
444+
expected: true,
445+
},
446+
{
447+
name: "DNS resolution error",
448+
err: fmt.Errorf("unable to dial: %w",
449+
&net.OpError{
450+
Op: "dial",
451+
Net: "tcp",
452+
Err: &net.DNSError{Err: "no such host", Name: "nonexistent.local"},
453+
}),
454+
expected: true,
455+
},
456+
}
457+
458+
for _, tt := range tests {
459+
t.Run(tt.name, func(t *testing.T) {
460+
result := isNetworkDialError(tt.err)
461+
assert.Equal(t, tt.expected, result, "error: %v", tt.err)
462+
})
463+
}
464+
}

operator/internal/controller/redpanda/topic_controller.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,12 @@ func (r *TopicReconciler) reconcile(ctx context.Context, recorder record.EventRe
185185

186186
kafkaClient, err := r.createKafkaClient(ctx, topic, l)
187187
if err != nil {
188+
// If topic is being deleted, allow finalizer removal when we can't
189+
// establish a connection. This prevents namespaces from getting stuck
190+
// in Terminating state when secrets are deleted before the Topic.
191+
if !topic.ObjectMeta.DeletionTimestamp.IsZero() && ignoreAllConnectionErrors(l, err) == nil {
192+
return redpandav1alpha2.TopicReady(topic), ctrl.Result{}, nil
193+
}
188194
return redpandav1alpha2.TopicFailed(topic), ctrl.Result{}, err
189195
}
190196
defer kafkaClient.Close()
@@ -203,6 +209,12 @@ func (r *TopicReconciler) reconcile(ctx context.Context, recorder record.EventRe
203209
l.V(log.DebugLevel).Info("delete topic", "topic-name", topic.GetTopicName())
204210
err = r.deleteTopic(ctx, recorder, topic, kafkaClient)
205211
if err != nil {
212+
// Allow finalizer removal if we can't connect to the broker.
213+
// This prevents topics from getting stuck in Terminating state
214+
// when the broker is unreachable.
215+
if ignoreAllConnectionErrors(l, err) == nil {
216+
return redpandav1alpha2.TopicReady(topic), ctrl.Result{}, nil
217+
}
206218
return redpandav1alpha2.TopicFailed(topic), ctrl.Result{}, fmt.Errorf("unable to delete topic: %w", err)
207219
}
208220
return redpandav1alpha2.TopicReady(topic), ctrl.Result{}, nil

operator/internal/controller/redpanda/topic_controller_test.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/twmb/franz-go/pkg/kadm"
2222
"github.com/twmb/franz-go/pkg/kgo"
2323
"github.com/twmb/franz-go/pkg/kmsg"
24+
corev1 "k8s.io/api/core/v1"
2425
apierrors "k8s.io/apimachinery/pkg/api/errors"
2526
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2627
"k8s.io/apimachinery/pkg/types"
@@ -823,4 +824,95 @@ func TestReconcile(t *testing.T) { // nolint:funlen // These tests have clear su
823824

824825
assert.Equal(t, time.Duration(0), result.RequeueAfter)
825826
})
827+
t.Run("delete_topic_with_missing_credentials_succeeds", func(t *testing.T) {
828+
// Deletion should succeed when credentials Secret is missing.
829+
// This prevents namespaces from getting stuck in Terminating state
830+
// when secrets are deleted before topics.
831+
topicName := "delete-topic-missing-secret"
832+
833+
topic := redpandav1alpha2.Topic{
834+
ObjectMeta: metav1.ObjectMeta{
835+
Name: topicName,
836+
Namespace: testNamespace,
837+
Finalizers: []string{FinalizerKey},
838+
},
839+
Spec: redpandav1alpha2.TopicSpec{
840+
Partitions: ptr.To(1),
841+
ReplicationFactor: ptr.To(1),
842+
KafkaAPISpec: &redpandav1alpha2.KafkaAPISpec{
843+
Brokers: []string{seedBroker},
844+
SASL: &redpandav1alpha2.KafkaSASL{
845+
Username: "testuser",
846+
Password: &redpandav1alpha2.ValueSource{
847+
SecretKeyRef: &corev1.SecretKeySelector{
848+
LocalObjectReference: corev1.LocalObjectReference{
849+
Name: "non-existent-secret",
850+
},
851+
Key: "password",
852+
},
853+
},
854+
Mechanism: redpandav1alpha2.SASLMechanismScramSHA256,
855+
},
856+
},
857+
},
858+
}
859+
860+
err := c.Create(ctx, &topic)
861+
require.NoError(t, err)
862+
863+
err = c.Delete(ctx, &topic)
864+
require.NoError(t, err)
865+
866+
key := types.NamespacedName{Name: topicName, Namespace: testNamespace}
867+
req := mcreconcile.Request{Request: ctrl.Request{NamespacedName: key}, ClusterName: mcmanager.LocalCluster}
868+
869+
// Reconcile should succeed (ignoring the not-found error during deletion)
870+
result, err := tr.Reconcile(ctx, req)
871+
assert.NoError(t, err, "reconciler should not error when secret is missing during deletion")
872+
assert.Equal(t, time.Duration(0), result.RequeueAfter)
873+
874+
// Topic should be deleted (finalizer removed)
875+
err = c.Get(ctx, key, &topic)
876+
assert.True(t, apierrors.IsNotFound(err), "topic should be deleted after finalizer removal")
877+
})
878+
t.Run("delete_topic_with_unreachable_broker_succeeds", func(t *testing.T) {
879+
// Deletion should succeed when broker is unreachable (connection refused).
880+
// This prevents topics from getting stuck in Terminating state when the
881+
// Kafka cluster is gone or unreachable.
882+
topicName := "delete-topic-unreachable-broker"
883+
884+
topic := redpandav1alpha2.Topic{
885+
ObjectMeta: metav1.ObjectMeta{
886+
Name: topicName,
887+
Namespace: testNamespace,
888+
Finalizers: []string{FinalizerKey},
889+
},
890+
Spec: redpandav1alpha2.TopicSpec{
891+
Partitions: ptr.To(1),
892+
ReplicationFactor: ptr.To(1),
893+
KafkaAPISpec: &redpandav1alpha2.KafkaAPISpec{
894+
// Use a localhost address that will definitely fail to connect
895+
Brokers: []string{"localhost:19092"},
896+
},
897+
},
898+
}
899+
900+
err := c.Create(ctx, &topic)
901+
require.NoError(t, err)
902+
903+
err = c.Delete(ctx, &topic)
904+
require.NoError(t, err)
905+
906+
key := types.NamespacedName{Name: topicName, Namespace: testNamespace}
907+
req := mcreconcile.Request{Request: ctrl.Request{NamespacedName: key}, ClusterName: mcmanager.LocalCluster}
908+
909+
// Reconcile should succeed (ignoring the dial error during deletion)
910+
result, err := tr.Reconcile(ctx, req)
911+
assert.NoError(t, err, "reconciler should not error when broker is unreachable during deletion")
912+
assert.Equal(t, time.Duration(0), result.RequeueAfter)
913+
914+
// Topic should be deleted (finalizer removed)
915+
err = c.Get(ctx, key, &topic)
916+
assert.True(t, apierrors.IsNotFound(err), "topic should be deleted after finalizer removal")
917+
})
826918
}

pkg/secrets/secrets.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ import (
2121
"sigs.k8s.io/controller-runtime/pkg/log"
2222
)
2323

24+
// ErrSecretNotFound is returned when a cloud secret cannot be found.
25+
var ErrSecretNotFound = errors.New("cloud secret not found")
26+
2427
type CloudExpander struct {
2528
client secrets.SecretAPI
2629
logger *slog.Logger
@@ -97,7 +100,7 @@ func NewCloudExpanderFromAPI(api secrets.SecretAPI) *CloudExpander {
97100
func (t *CloudExpander) Expand(ctx context.Context, name string) (string, error) {
98101
value, found := t.client.GetSecretValue(ctx, name)
99102
if !found {
100-
return "", errors.Newf("secret %s not found", name)
103+
return "", errors.Wrapf(ErrSecretNotFound, "secret %s", name)
101104
}
102105
return value, nil
103106
}

0 commit comments

Comments
 (0)