77package io .kroxylicious .kubernetes .operator ;
88
99import java .time .Duration ;
10+ import java .util .List ;
11+ import java .util .Map ;
1012import java .util .Set ;
1113
1214import org .assertj .core .api .AbstractStringAssert ;
4042import io .kroxylicious .kubernetes .api .v1alpha1 .VirtualKafkaCluster ;
4143import io .kroxylicious .kubernetes .api .v1alpha1 .VirtualKafkaClusterBuilder ;
4244import io .kroxylicious .kubernetes .filter .api .v1alpha1 .KafkaProtocolFilter ;
45+ import io .kroxylicious .kubernetes .filter .api .v1alpha1 .KafkaProtocolFilterBuilder ;
4346
4447import static io .kroxylicious .kubernetes .api .v1alpha1 .kafkaproxyingressspec .ClusterIP .Protocol .TCP ;
4548import static io .kroxylicious .kubernetes .operator .ResourcesUtil .findOnlyResourceNamed ;
@@ -55,6 +58,7 @@ class ProxyReconcilerIT {
5558 private static final String PROXY_A = "proxy-a" ;
5659 private static final String PROXY_B = "proxy-b" ;
5760 private static final String CLUSTER_FOO_REF = "fooref" ;
61+ private static final String FILTER_NAME = "validation" ;
5862 private static final String CLUSTER_FOO = "foo" ;
5963 private static final String CLUSTER_FOO_CLUSTERIP_INGRESS = "foo-cluster-ip" ;
6064 private static final String CLUSTER_FOO_BOOTSTRAP = "my-cluster-kafka-bootstrap.foo.svc.cluster.local:9092" ;
@@ -117,12 +121,13 @@ public KafkaProxyIngress ingress(String name) {
117121
118122 CreatedResources doCreate () {
119123 KafkaProxy proxy = extension .create (kafkaProxy (PROXY_A ));
124+ KafkaProtocolFilter filter = extension .create (filter (FILTER_NAME ));
120125 KafkaService barClusterRef = extension .create (clusterRef (CLUSTER_BAR_REF , CLUSTER_BAR_BOOTSTRAP ));
121126 KafkaProxyIngress ingressBar = extension .create (clusterIpIngress (CLUSTER_BAR_CLUSTERIP_INGRESS , proxy ));
122127 Set <KafkaService > clusterRefs = Set .of (barClusterRef );
123- VirtualKafkaCluster clusterBar = extension .create (virtualKafkaCluster (CLUSTER_BAR , proxy , barClusterRef , ingressBar ));
128+ VirtualKafkaCluster clusterBar = extension .create (virtualKafkaCluster (CLUSTER_BAR , proxy , barClusterRef , ingressBar , filter ));
124129 Set <VirtualKafkaCluster > clusters = Set .of (clusterBar );
125- assertProxyConfigContents (proxy , Set .of (CLUSTER_BAR_BOOTSTRAP ), Set .of ());
130+ assertProxyConfigContents (proxy , Set .of (CLUSTER_BAR_BOOTSTRAP , filter . getSpec (). getType () ), Set .of ());
126131 assertDeploymentMountsConfigConfigMap (proxy );
127132 assertDeploymentBecomesReady (proxy );
128133 assertServiceTargetsProxyInstances (proxy , clusterBar , ingressBar );
@@ -272,9 +277,10 @@ void moveVirtualKafkaClusterToAnotherKafkaProxy() {
272277
273278 KafkaService fooClusterRef = extension .create (clusterRef (CLUSTER_FOO_REF , CLUSTER_FOO_BOOTSTRAP ));
274279 KafkaService barClusterRef = extension .create (clusterRef (CLUSTER_BAR_REF , CLUSTER_BAR_BOOTSTRAP ));
280+ KafkaProtocolFilter filter = extension .create (filter (FILTER_NAME ));
275281
276- VirtualKafkaCluster clusterFoo = extension .create (virtualKafkaCluster (CLUSTER_FOO , proxyA , fooClusterRef , ingressFoo ));
277- VirtualKafkaCluster barCluster = extension .create (virtualKafkaCluster (CLUSTER_BAR , proxyB , barClusterRef , ingressBar ));
282+ VirtualKafkaCluster clusterFoo = extension .create (virtualKafkaCluster (CLUSTER_FOO , proxyA , fooClusterRef , ingressFoo , filter ));
283+ VirtualKafkaCluster barCluster = extension .create (virtualKafkaCluster (CLUSTER_BAR , proxyB , barClusterRef , ingressBar , filter ));
278284
279285 assertProxyConfigContents (proxyA , Set .of (CLUSTER_FOO_BOOTSTRAP ), Set .of ());
280286 assertProxyConfigContents (proxyB , Set .of (CLUSTER_BAR_BOOTSTRAP ), Set .of ());
@@ -310,13 +316,13 @@ private AbstractStringAssert<?> assertThatProxyConfigFor(KafkaProxy proxy) {
310316 }
311317
312318 private static VirtualKafkaCluster virtualKafkaCluster (String clusterName , KafkaProxy proxy , KafkaService clusterRef ,
313- KafkaProxyIngress ingress ) {
319+ KafkaProxyIngress ingress , KafkaProtocolFilter filter ) {
314320 return new VirtualKafkaClusterBuilder ().withNewMetadata ().withName (clusterName ).endMetadata ()
315321 .withNewSpec ()
316322 .withTargetKafkaServiceRef (new KafkaServiceRefBuilder ().withName (name (clusterRef )).build ())
317323 .withNewProxyRef ().withName (name (proxy )).endProxyRef ()
318324 .addNewIngressRef ().withName (name (ingress )).endIngressRef ()
319- .withFilterRefs ()
325+ .withFilterRefs (). addNewFilterRef (). withName ( name ( filter )). endFilterRef ()
320326 .endSpec ().build ();
321327 }
322328
@@ -327,6 +333,12 @@ private static KafkaService clusterRef(String clusterRefName, String clusterBoot
327333 .endSpec ().build ();
328334 }
329335
336+ private static KafkaProtocolFilter filter (String name ) {
337+ return new KafkaProtocolFilterBuilder ().withNewMetadata ().withName (name ).endMetadata ()
338+ .withNewSpec ().withType ("RecordValidation" ).withConfigTemplate (Map .of ("rules" , List .of (Map .of ("allowNulls" , false ))))
339+ .endSpec ().build ();
340+ }
341+
330342 KafkaProxy kafkaProxy (String name ) {
331343 // @formatter:off
332344 return new KafkaProxyBuilder ()
0 commit comments