Skip to content

Commit 1888e45

Browse files
authored
Add client factory support for v1 CRD for Kafka and SchemaRegistry API Clients (#1074)
* Add client factory support for v1 CRD for Kafka and SchemaRegistry API clients * DRY up pod-based URL construction and auth for listeners
1 parent 6c63e57 commit 1888e45

File tree

9 files changed

+410
-9
lines changed

9 files changed

+410
-9
lines changed

operator/api/vectorized/v1alpha1/cluster_types.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1330,6 +1330,20 @@ func (r *Cluster) SchemaRegistryAPITLS() *SchemaRegistryAPI {
13301330
return nil
13311331
}
13321332

1333+
// SchemaRegistryInternalListener returns internal listener.
1334+
func (r *Cluster) SchemaRegistryInternalListener() *SchemaRegistryAPI {
1335+
if r == nil {
1336+
return nil
1337+
}
1338+
for i := range r.Spec.Configuration.SchemaRegistryAPI {
1339+
el := &r.Spec.Configuration.SchemaRegistryAPI[i]
1340+
if el.External == nil || !el.External.Enabled {
1341+
return el
1342+
}
1343+
}
1344+
return nil
1345+
}
1346+
13331347
// SchemaRegistryListeners returns all schema registry listeners
13341348
func (r *Cluster) SchemaRegistryListeners() []SchemaRegistryAPI {
13351349
if r == nil || r.Spec.Configuration.SchemaRegistry == nil {

operator/pkg/client/cluster.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,29 @@ func (c *Factory) schemaRegistryForCluster(cluster *redpandav1alpha2.Redpanda) (
9696
return client, nil
9797
}
9898

99+
func (c *Factory) schemaRegistryForV1Cluster(cluster *vectorizedv1alpha1.Cluster) (*sr.Client, error) {
100+
ctx := context.Background()
101+
102+
fqdn, certs, err := v1ClusterCerts(ctx, c.Client, cluster)
103+
if err != nil {
104+
return nil, err
105+
}
106+
107+
client, err := newNodePoolInternalSchemaRegistryAPI(ctx, c.Client, cluster, fqdn, certs, c.dialer, nil)
108+
if err != nil {
109+
return nil, err
110+
}
111+
112+
if c.userAuth != nil {
113+
client, err = sr.NewClient(append(client.Opts(), sr.BasicAuth(c.userAuth.Username, c.userAuth.Password))...)
114+
if err != nil {
115+
return nil, err
116+
}
117+
}
118+
119+
return client, nil
120+
}
121+
99122
// kafkaForCluster returns a simple kgo.Client able to communicate with the given cluster specified via a Redpanda cluster.
100123
func (c *Factory) kafkaForCluster(cluster *redpandav1alpha2.Redpanda, opts ...kgo.Opt) (*kgo.Client, error) {
101124
dot, err := cluster.GetDot(c.config)
@@ -129,3 +152,34 @@ func (c *Factory) kafkaForCluster(cluster *redpandav1alpha2.Redpanda, opts ...kg
129152

130153
return client, nil
131154
}
155+
156+
func (c *Factory) kafkaForV1Cluster(cluster *vectorizedv1alpha1.Cluster, opts ...kgo.Opt) (*kgo.Client, error) {
157+
ctx := context.Background()
158+
159+
fqdn, certs, err := v1ClusterCerts(ctx, c.Client, cluster)
160+
if err != nil {
161+
return nil, err
162+
}
163+
164+
client, err := newNodePoolInternalKafkaAPI(ctx, c.Client, cluster, fqdn, certs, c.dialer, opts)
165+
if err != nil {
166+
return nil, err
167+
}
168+
169+
authOpt, err := c.kafkaUserAuth()
170+
if err != nil {
171+
// close the client since it's no longer usable
172+
client.Close()
173+
174+
return nil, err
175+
}
176+
177+
if authOpt != nil {
178+
// close this client since we're not going to use it anymore
179+
client.Close()
180+
181+
return kgo.NewClient(append(client.Opts(), authOpt)...)
182+
}
183+
184+
return client, nil
185+
}

operator/pkg/client/factory.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,10 @@ func (c *Factory) KafkaClient(ctx context.Context, obj any, opts ...kgo.Opt) (*k
170170
return c.kafkaForCluster(cluster, opts...)
171171
}
172172

173+
if cluster, ok := obj.(*vectorizedv1alpha1.Cluster); ok {
174+
return c.kafkaForV1Cluster(cluster)
175+
}
176+
173177
if profile, ok := obj.(*rpkconfig.RpkProfile); ok {
174178
return c.kafkaForRPKProfile(profile, opts...)
175179
}
@@ -236,6 +240,10 @@ func (c *Factory) SchemaRegistryClient(ctx context.Context, obj any) (*sr.Client
236240
return c.schemaRegistryForCluster(cluster)
237241
}
238242

243+
if cluster, ok := obj.(*vectorizedv1alpha1.Cluster); ok {
244+
return c.schemaRegistryForV1Cluster(cluster)
245+
}
246+
239247
if profile, ok := obj.(*rpkconfig.RpkProfile); ok {
240248
return c.schemaRegistryForRPKProfile(profile)
241249
}

operator/pkg/client/factory_test.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,11 @@ func TestIntegrationFactoryOperatorV1(t *testing.T) {
174174
Port: 9644,
175175
},
176176
},
177+
SchemaRegistryAPI: []vectorizedv1alpha1.SchemaRegistryAPI{
178+
{
179+
Port: 8081,
180+
},
181+
},
177182
DeveloperMode: true,
178183
},
179184
},
@@ -192,13 +197,27 @@ func TestIntegrationFactoryOperatorV1(t *testing.T) {
192197
return cluster.Status.GetConditionStatus(vectorizedv1alpha1.OperatorQuiescentConditionType) == corev1.ConditionTrue
193198
}, time.Minute*5, time.Second*5, "didn't work")
194199

200+
// check admin api
195201
adminClient, err := clientFactory.RedpandaAdminClient(testutil.Context(t), &cr)
196202
require.NoError(t, err)
197-
198203
brokers, err := adminClient.Brokers(context.Background())
199204
require.NoError(t, err)
200205
require.Len(t, brokers, 1)
201206
require.Equal(t, rpadmin.MembershipStatusActive, brokers[0].MembershipStatus)
207+
208+
// check kafka api
209+
kafkaClient, err := clientFactory.KafkaClient(testutil.Context(t), &cr)
210+
require.NoError(t, err)
211+
defer kafkaClient.Close()
212+
metadata, err := kadm.NewClient(kafkaClient).BrokerMetadata(context.Background())
213+
require.NoError(t, err)
214+
require.Len(t, metadata.Brokers.NodeIDs(), 1)
215+
216+
// check schema registry api
217+
srClient, err := clientFactory.SchemaRegistryClient(testutil.Context(t), &cr)
218+
require.NoError(t, err)
219+
_, err = srClient.SupportedTypes(context.Background())
220+
require.NoError(t, err)
202221
}
203222

204223
func TestIntegrationClientFactory(t *testing.T) {

0 commit comments

Comments
 (0)