Skip to content

Commit dba6d44

Browse files
authored
Enable using localhost for SR/PP internal clients in v1 (#1233)
Add opt-in configuration to use localhost instead of headless service FQDN for Schema Registry and Pandaproxy internal client broker addresses via `pandaproxy_client.use_localhost` and `schema_registry_client.use_localhost` in additionalConfiguration. Default behavior unchanged (continues using headless service FQDN for backward compatibility).
1 parent f1112cb commit dba6d44

File tree

2 files changed

+66
-2
lines changed

2 files changed

+66
-2
lines changed

operator/pkg/resources/configmap_test.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -518,6 +518,51 @@ func TestConfigMapResource_replicas(t *testing.T) { //nolint:funlen // test tabl
518518
}
519519
}
520520

521+
func TestConfigmap_InternalClientsUseLocalhost(t *testing.T) {
522+
panda := pandaCluster().DeepCopy()
523+
panda.Spec.Configuration.SchemaRegistry = &vectorizedv1alpha1.SchemaRegistryAPI{
524+
Port: 8081,
525+
}
526+
panda.Spec.Configuration.PandaproxyAPI = []vectorizedv1alpha1.PandaproxyAPI{
527+
{Port: 8082},
528+
}
529+
panda.Spec.AdditionalConfiguration = map[string]string{
530+
"pandaproxy_client.use_localhost": "true",
531+
"schema_registry_client.use_localhost": "true",
532+
}
533+
534+
c := fake.NewClientBuilder().Build()
535+
secret := corev1.Secret{
536+
ObjectMeta: metav1.ObjectMeta{
537+
Name: "archival",
538+
Namespace: "default",
539+
},
540+
Data: map[string][]byte{
541+
"archival": []byte("XXX"),
542+
},
543+
}
544+
require.NoError(t, c.Create(t.Context(), &secret))
545+
546+
cfg, err := resources.CreateConfiguration(
547+
t.Context(), c, nil, panda,
548+
"cluster.default.svc.cluster.local",
549+
types.NamespacedName{Namespace: "namespace", Name: "pandaproxy"},
550+
types.NamespacedName{Namespace: "namespace", Name: "schemaregistry"},
551+
types.NamespacedName{Namespace: "namespace", Name: "rpk"},
552+
TestBrokerTLSConfigProvider{},
553+
)
554+
require.NoError(t, err)
555+
556+
redpandaYaml, err := cfg.ReifyNodeConfiguration(t.Context())
557+
require.NoError(t, err)
558+
559+
// Should use localhost when override is set
560+
require.Equal(t, []config.SocketAddress{{Address: "localhost", Port: 123}},
561+
redpandaYaml.PandaproxyClient.Brokers)
562+
require.Equal(t, []config.SocketAddress{{Address: "localhost", Port: 123}},
563+
redpandaYaml.SchemaRegistryClient.Brokers)
564+
}
565+
521566
func TestConfigmap_BrokerTLSClients(t *testing.T) {
522567
panda := pandaCluster().DeepCopy()
523568
panda.Spec.Configuration.KafkaAPI[0].TLS = vectorizedv1alpha1.KafkaAPITLS{

operator/pkg/resources/configuration.go

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"fmt"
1515
"maps"
1616
"slices"
17+
"strconv"
1718

1819
cmmetav1 "github.com/cert-manager/cert-manager/pkg/apis/meta/v1"
1920
"github.com/cockroachdb/errors"
@@ -429,9 +430,18 @@ func preparePandaproxyClient(
429430
// Alternatively, localhost could be used to the same end but using DNS
430431
// might save us in some cases where the local broker is having some
431432
// trouble and it makes the config more generally reusable.
433+
// Can be overridden to localhost via additionalConfiguration:
434+
// pandaproxy_client.use_localhost: "true"
435+
brokerAddress := serviceFQDN
436+
if useLocalhost, exists := pandaCluster.Spec.AdditionalConfiguration["pandaproxy_client.use_localhost"]; exists {
437+
if parsedBool, err := strconv.ParseBool(useLocalhost); err == nil && parsedBool {
438+
brokerAddress = "localhost"
439+
}
440+
}
441+
432442
cfg.Node.PandaproxyClient = &config.KafkaClient{
433443
Brokers: []config.SocketAddress{
434-
{Address: serviceFQDN, Port: pandaCluster.InternalListener().Port},
444+
{Address: brokerAddress, Port: pandaCluster.InternalListener().Port},
435445
},
436446
}
437447

@@ -497,9 +507,18 @@ func prepareSchemaRegistryClient(
497507
// Alternatively, localhost could be used to the same end but using DNS
498508
// might save us in some cases where the local broker is having some
499509
// trouble and it makes the config more generally reusable.
510+
// Can be overridden to localhost via additionalConfiguration:
511+
// schema_registry_client.use_localhost: "true"
512+
brokerAddress := serviceFQDN
513+
if useLocalhost, exists := pandaCluster.Spec.AdditionalConfiguration["schema_registry_client.use_localhost"]; exists {
514+
if parsedBool, err := strconv.ParseBool(useLocalhost); err == nil && parsedBool {
515+
brokerAddress = "localhost"
516+
}
517+
}
518+
500519
cfg.Node.SchemaRegistryClient = &config.KafkaClient{
501520
Brokers: []config.SocketAddress{
502-
{Address: serviceFQDN, Port: pandaCluster.InternalListener().Port},
521+
{Address: brokerAddress, Port: pandaCluster.InternalListener().Port},
503522
},
504523
}
505524

0 commit comments

Comments
 (0)