88
99import java .time .Duration ;
1010import java .util .List ;
11+ import java .util .Map ;
1112import java .util .Set ;
1213
1314import org .assertj .core .api .AbstractStringAssert ;
4041import io .kroxylicious .kubernetes .api .v1alpha1 .KafkaServiceBuilder ;
4142import io .kroxylicious .kubernetes .api .v1alpha1 .VirtualKafkaCluster ;
4243import io .kroxylicious .kubernetes .api .v1alpha1 .VirtualKafkaClusterBuilder ;
44+ import io .kroxylicious .kubernetes .filter .api .v1alpha1 .KafkaProtocolFilter ;
45+ import io .kroxylicious .kubernetes .filter .api .v1alpha1 .KafkaProtocolFilterBuilder ;
4346import io .kroxylicious .kubernetes .operator .config .RuntimeDecl ;
4447
4548import static io .kroxylicious .kubernetes .api .v1alpha1 .kafkaproxyingressspec .ClusterIP .Protocol .TCP ;
@@ -56,6 +59,7 @@ class ProxyReconcilerIT {
5659 private static final String PROXY_A = "proxy-a" ;
5760 private static final String PROXY_B = "proxy-b" ;
5861 private static final String CLUSTER_FOO_REF = "fooref" ;
62+ private static final String FILTER_NAME = "validation" ;
5963 private static final String CLUSTER_FOO = "foo" ;
6064 private static final String CLUSTER_FOO_CLUSTERIP_INGRESS = "foo-cluster-ip" ;
6165 private static final String CLUSTER_FOO_BOOTSTRAP = "my-cluster-kafka-bootstrap.foo.svc.cluster.local:9092" ;
@@ -88,6 +92,7 @@ void beforeEach() {
8892 .withAdditionalCustomResourceDefinition (VirtualKafkaCluster .class )
8993 .withAdditionalCustomResourceDefinition (KafkaService .class )
9094 .withAdditionalCustomResourceDefinition (KafkaProxyIngress .class )
95+ .withAdditionalCustomResourceDefinition (KafkaProtocolFilter .class )
9196 .waitForNamespaceDeletion (true )
9297 .withConfigurationService (x -> x .withCloseClientOnStop (false ))
9398 .build ();
@@ -119,12 +124,13 @@ public KafkaProxyIngress ingress(String name) {
119124
120125 CreatedResources doCreate () {
121126 KafkaProxy proxy = extension .create (kafkaProxy (PROXY_A ));
127+ KafkaProtocolFilter filter = extension .create (filter (FILTER_NAME ));
122128 KafkaService barClusterRef = extension .create (clusterRef (CLUSTER_BAR_REF , CLUSTER_BAR_BOOTSTRAP ));
123129 KafkaProxyIngress ingressBar = extension .create (clusterIpIngress (CLUSTER_BAR_CLUSTERIP_INGRESS , proxy ));
124130 Set <KafkaService > clusterRefs = Set .of (barClusterRef );
125- VirtualKafkaCluster clusterBar = extension .create (virtualKafkaCluster (CLUSTER_BAR , proxy , barClusterRef , ingressBar ));
131+ VirtualKafkaCluster clusterBar = extension .create (virtualKafkaCluster (CLUSTER_BAR , proxy , barClusterRef , ingressBar , filter ));
126132 Set <VirtualKafkaCluster > clusters = Set .of (clusterBar );
127- assertProxyConfigContents (proxy , Set .of (CLUSTER_BAR_BOOTSTRAP ), Set .of ());
133+ assertProxyConfigContents (proxy , Set .of (CLUSTER_BAR_BOOTSTRAP , filter . getSpec (). getType () ), Set .of ());
128134 assertDeploymentMountsConfigConfigMap (proxy );
129135 assertDeploymentBecomesReady (proxy );
130136 assertServiceTargetsProxyInstances (proxy , clusterBar , ingressBar );
@@ -274,9 +280,10 @@ void moveVirtualKafkaClusterToAnotherKafkaProxy() {
274280
275281 KafkaService fooClusterRef = extension .create (clusterRef (CLUSTER_FOO_REF , CLUSTER_FOO_BOOTSTRAP ));
276282 KafkaService barClusterRef = extension .create (clusterRef (CLUSTER_BAR_REF , CLUSTER_BAR_BOOTSTRAP ));
283+ KafkaProtocolFilter filter = extension .create (filter (FILTER_NAME ));
277284
278- VirtualKafkaCluster clusterFoo = extension .create (virtualKafkaCluster (CLUSTER_FOO , proxyA , fooClusterRef , ingressFoo ));
279- VirtualKafkaCluster barCluster = extension .create (virtualKafkaCluster (CLUSTER_BAR , proxyB , barClusterRef , ingressBar ));
285+ VirtualKafkaCluster clusterFoo = extension .create (virtualKafkaCluster (CLUSTER_FOO , proxyA , fooClusterRef , ingressFoo , filter ));
286+ VirtualKafkaCluster barCluster = extension .create (virtualKafkaCluster (CLUSTER_BAR , proxyB , barClusterRef , ingressBar , filter ));
280287
281288 assertProxyConfigContents (proxyA , Set .of (CLUSTER_FOO_BOOTSTRAP ), Set .of ());
282289 assertProxyConfigContents (proxyB , Set .of (CLUSTER_BAR_BOOTSTRAP ), Set .of ());
@@ -312,13 +319,13 @@ private AbstractStringAssert<?> assertThatProxyConfigFor(KafkaProxy proxy) {
312319 }
313320
314321 private static VirtualKafkaCluster virtualKafkaCluster (String clusterName , KafkaProxy proxy , KafkaService clusterRef ,
315- KafkaProxyIngress ingress ) {
322+ KafkaProxyIngress ingress , KafkaProtocolFilter filter ) {
316323 return new VirtualKafkaClusterBuilder ().withNewMetadata ().withName (clusterName ).endMetadata ()
317324 .withNewSpec ()
318325 .withTargetKafkaServiceRef (new KafkaServiceRefBuilder ().withName (name (clusterRef )).build ())
319326 .withNewProxyRef ().withName (name (proxy )).endProxyRef ()
320327 .addNewIngressRef ().withName (name (ingress )).endIngressRef ()
321- .withFilterRefs ()
328+ .withFilterRefs (). addNewFilterRef (). withName ( name ( filter )). endFilterRef ()
322329 .endSpec ().build ();
323330 }
324331
@@ -329,6 +336,12 @@ private static KafkaService clusterRef(String clusterRefName, String clusterBoot
329336 .endSpec ().build ();
330337 }
331338
339+ private static KafkaProtocolFilter filter (String name ) {
340+ return new KafkaProtocolFilterBuilder ().withNewMetadata ().withName (name ).endMetadata ()
341+ .withNewSpec ().withType ("RecordValidation" ).withConfigTemplate (Map .of ("rules" , List .of (Map .of ("allowNulls" , false ))))
342+ .endSpec ().build ();
343+ }
344+
332345 KafkaProxy kafkaProxy (String name ) {
333346 // @formatter:off
334347 return new KafkaProxyBuilder ()
0 commit comments