Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .changes/unreleased/operator-Fixed-20260122-100940.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
project: operator
kind: Fixed
body: Allow Topic finalizer removal when broker is unreachable or credentials are missing during deletion, preventing stuck namespaces.
time: 2026-01-22T10:00:00.000000+01:00
25 changes: 24 additions & 1 deletion operator/internal/controller/redpanda/resource_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ package redpanda
import (
"context"
"fmt"
"net"
"time"

"github.com/cockroachdb/errors"
"github.com/go-logr/logr"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -126,7 +128,9 @@ func ignoreAllConnectionErrors(logger logr.Logger, err error) error {
// able to clean ourselves up anyway.
if internalclient.IsTerminalClientError(err) ||
internalclient.IsConfigurationError(err) ||
internalclient.IsInvalidClusterError(err) {
internalclient.IsInvalidClusterError(err) ||
isNotFoundInChain(err) ||
isNetworkDialError(err) {
// We use Info rather than Error here because we don't want
// to ignore the verbosity settings. This is really only for
// debugging purposes.
Expand All @@ -136,6 +140,25 @@ func ignoreAllConnectionErrors(logger logr.Logger, err error) error {
return err
}

// isNotFoundInChain reports whether err, or any error reachable from it by
// repeated errors.Unwrap calls, is recognised by apierrors.IsNotFound.
//
// It is used to detect Kubernetes "not found" failures (for example a missing
// Secret or ConfigMap) that have been wrapped by intermediate layers. Note
// that the check is not limited to a particular resource kind: any wrapped
// "not found" API error matches. A nil err yields false.
func isNotFoundInChain(err error) bool {
	// Step through the chain one link at a time, testing each link.
	for link := err; link != nil; link = errors.Unwrap(link) {
		if apierrors.IsNotFound(link) {
			return true
		}
	}
	return false
}

// isNetworkDialError reports whether a *net.OpError appears anywhere in err's
// chain, as determined by errors.As.
//
// This is how an unreachable broker (connection refused, dial timeout, DNS
// failure surfaced through a dial) is recognised. The match is on the error
// type only: the OpError's Op field is not inspected, so OpErrors produced by
// operations other than "dial" are reported as well. A nil err yields false.
func isNetworkDialError(err error) bool {
	return errors.As(err, new(*net.OpError))
}

func handleResourceSyncErrors(err error) (metav1.Condition, error) {
// If we have a known terminal error, just set the sync condition and don't re-run reconciliation.
if internalclient.IsInvalidClusterError(err) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ package redpanda
import (
"context"
"fmt"
"net"
"os"
"sync/atomic"
"testing"
"time"

"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/modules/redpanda"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -365,3 +367,69 @@ func TestResourceController(t *testing.T) { // nolint:funlen // These tests have
require.Equal(t, int32(size/2), reconciler.deletes.Load())
require.Equal(t, int32(size), reconciler.syncs.Load())
}

func TestIsNetworkDialError(t *testing.T) {
tests := []struct {
name string
err error
expected bool
}{
{
name: "nil error",
err: nil,
expected: false,
},
{
name: "plain error",
err: errors.New("some error"),
expected: false,
},
{
name: "net.OpError directly",
err: &net.OpError{Op: "dial", Net: "tcp", Err: errors.New("connection refused")},
expected: true,
},
{
name: "net.OpError wrapped once (franz-go style)",
err: fmt.Errorf("unable to dial: %w",
&net.OpError{Op: "dial", Net: "tcp", Err: errors.New("connection refused")}),
expected: true,
},
{
name: "net.OpError wrapped twice (recordErrorEvent + franz-go)",
err: fmt.Errorf("deleting topic (test) library error: %w",
fmt.Errorf("unable to dial: %w",
&net.OpError{Op: "dial", Net: "tcp", Err: errors.New("connection refused")})),
expected: true,
},
{
name: "realistic connection refused error",
err: fmt.Errorf("deleting topic (test-topic) library error: %w",
fmt.Errorf("unable to dial: %w",
&net.OpError{
Op: "dial",
Net: "tcp",
Addr: &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 9092},
Err: &net.OpError{Op: "connect", Err: errors.New("connection refused")},
})),
expected: true,
},
{
name: "DNS resolution error",
err: fmt.Errorf("unable to dial: %w",
&net.OpError{
Op: "dial",
Net: "tcp",
Err: &net.DNSError{Err: "no such host", Name: "nonexistent.local"},
}),
expected: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := isNetworkDialError(tt.err)
assert.Equal(t, tt.expected, result, "error: %v", tt.err)
})
}
}
12 changes: 12 additions & 0 deletions operator/internal/controller/redpanda/topic_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,12 @@ func (r *TopicReconciler) reconcile(ctx context.Context, topic *redpandav1alpha2

kafkaClient, err := r.createKafkaClient(ctx, topic, l)
if err != nil {
// If topic is being deleted, allow finalizer removal when we can't
// establish a connection. This prevents namespaces from getting stuck
// in Terminating state when secrets are deleted before the Topic.
if !topic.ObjectMeta.DeletionTimestamp.IsZero() && ignoreAllConnectionErrors(l, err) == nil {
return redpandav1alpha2.TopicReady(topic), ctrl.Result{}, nil
}
return redpandav1alpha2.TopicFailed(topic), ctrl.Result{}, err
}
defer kafkaClient.Close()
Expand All @@ -157,6 +163,12 @@ func (r *TopicReconciler) reconcile(ctx context.Context, topic *redpandav1alpha2
l.V(log.DebugLevel).Info("delete topic", "topic-name", topic.GetTopicName())
err = r.deleteTopic(ctx, topic, kafkaClient)
if err != nil {
// Allow finalizer removal if we can't connect to the broker.
// This prevents topics from getting stuck in Terminating state
// when the broker is unreachable.
if ignoreAllConnectionErrors(l, err) == nil {
return redpandav1alpha2.TopicReady(topic), ctrl.Result{}, nil
}
return redpandav1alpha2.TopicFailed(topic), ctrl.Result{}, fmt.Errorf("unable to delete topic: %w", err)
}
return redpandav1alpha2.TopicReady(topic), ctrl.Result{}, nil
Expand Down
87 changes: 87 additions & 0 deletions operator/internal/controller/redpanda/topic_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -873,4 +873,91 @@ func TestReconcile(t *testing.T) { // nolint:funlen // These tests have clear su
assert.False(t, result.Requeue)
assert.Equal(t, time.Duration(0), result.RequeueAfter)
})
t.Run("delete_topic_with_missing_credentials_succeeds", func(t *testing.T) {
// Deletion should succeed when credentials Secret is missing.
// This prevents namespaces from getting stuck in Terminating state
// when secrets are deleted before topics.
topicName := "delete-topic-missing-secret"

topic := redpandav1alpha2.Topic{
ObjectMeta: metav1.ObjectMeta{
Name: topicName,
Namespace: testNamespace,
Finalizers: []string{FinalizerKey},
},
Spec: redpandav1alpha2.TopicSpec{
Partitions: ptr.To(1),
ReplicationFactor: ptr.To(1),
KafkaAPISpec: &redpandav1alpha2.KafkaAPISpec{
Brokers: []string{seedBroker},
SASL: &redpandav1alpha2.KafkaSASL{
Username: "testuser",
Password: redpandav1alpha2.SecretKeyRef{
Name: "non-existent-secret",
Key: "password",
},
Mechanism: redpandav1alpha2.SASLMechanismScramSHA256,
},
},
},
}

err := c.Create(ctx, &topic)
require.NoError(t, err)

err = c.Delete(ctx, &topic)
require.NoError(t, err)

key := types.NamespacedName{Name: topicName, Namespace: testNamespace}
req := ctrl.Request{NamespacedName: key}

// Reconcile should succeed (ignoring the not-found error during deletion)
result, err := tr.Reconcile(ctx, req)
assert.NoError(t, err, "reconciler should not error when secret is missing during deletion")
assert.Equal(t, time.Duration(0), result.RequeueAfter)

// Topic should be deleted (finalizer removed)
err = c.Get(ctx, key, &topic)
assert.True(t, apierrors.IsNotFound(err), "topic should be deleted after finalizer removal")
})
t.Run("delete_topic_with_unreachable_broker_succeeds", func(t *testing.T) {
// Deletion should succeed when broker is unreachable (connection refused).
// This prevents topics from getting stuck in Terminating state when the
// Kafka cluster is gone or unreachable.
topicName := "delete-topic-unreachable-broker"

topic := redpandav1alpha2.Topic{
ObjectMeta: metav1.ObjectMeta{
Name: topicName,
Namespace: testNamespace,
Finalizers: []string{FinalizerKey},
},
Spec: redpandav1alpha2.TopicSpec{
Partitions: ptr.To(1),
ReplicationFactor: ptr.To(1),
KafkaAPISpec: &redpandav1alpha2.KafkaAPISpec{
// Use a localhost address that will definitely fail to connect
Brokers: []string{"localhost:19092"},
},
},
}

err := c.Create(ctx, &topic)
require.NoError(t, err)

err = c.Delete(ctx, &topic)
require.NoError(t, err)

key := types.NamespacedName{Name: topicName, Namespace: testNamespace}
req := ctrl.Request{NamespacedName: key}

// Reconcile should succeed (ignoring the dial error during deletion)
result, err := tr.Reconcile(ctx, req)
assert.NoError(t, err, "reconciler should not error when broker is unreachable during deletion")
assert.Equal(t, time.Duration(0), result.RequeueAfter)

// Topic should be deleted (finalizer removed)
err = c.Get(ctx, key, &topic)
assert.True(t, apierrors.IsNotFound(err), "topic should be deleted after finalizer removal")
})
}
5 changes: 4 additions & 1 deletion operator/pkg/secrets/secrets.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log"
)

// ErrSecretNotFound is the sentinel error used to signal that a cloud secret
// could not be found. Compare against it with errors.Is rather than by
// inspecting the error message.
var (
	ErrSecretNotFound = errors.New("cloud secret not found")
)

type CloudExpander struct {
client secrets.SecretAPI
logger *slog.Logger
Expand Down Expand Up @@ -81,7 +84,7 @@ func NewCloudExpanderFromAPI(api secrets.SecretAPI) *CloudExpander {
// Expand looks up the cloud secret called name through the expander's
// SecretAPI client and returns its value.
//
// If the client reports the secret as absent, Expand returns an empty string
// and an error that wraps ErrSecretNotFound, annotated with the secret name.
// Callers can detect that case with errors.Is(err, ErrSecretNotFound) instead
// of matching on message text. On success the error is nil.
func (t *CloudExpander) Expand(ctx context.Context, name string) (string, error) {
	value, found := t.client.GetSecretValue(ctx, name)
	if !found {
		// Wrap the sentinel; a bare formatted error here would hide it from
		// errors.Is.
		return "", errors.Wrapf(ErrSecretNotFound, "secret %s", name)
	}
	return value, nil
}
Expand Down