88
99import java .time .Duration ;
1010import java .util .List ;
11+ import java .util .Map ;
1112import java .util .Set ;
1213
14+ import io .kroxylicious .kubernetes .filter .api .v1alpha1 .KafkaProtocolFilter ;
15+
16+ import io .kroxylicious .kubernetes .filter .api .v1alpha1 .KafkaProtocolFilterBuilder ;
17+
1318import org .assertj .core .api .AbstractStringAssert ;
1419import org .assertj .core .api .InstanceOfAssertFactories ;
1520import org .awaitility .core .ConditionFactory ;
@@ -56,6 +61,7 @@ class ProxyReconcilerIT {
5661 private static final String PROXY_A = "proxy-a" ;
5762 private static final String PROXY_B = "proxy-b" ;
5863 private static final String CLUSTER_FOO_REF = "fooref" ;
64+ private static final String FILTER_NAME = "validation" ;
5965 private static final String CLUSTER_FOO = "foo" ;
6066 private static final String CLUSTER_FOO_CLUSTERIP_INGRESS = "foo-cluster-ip" ;
6167 private static final String CLUSTER_FOO_BOOTSTRAP = "my-cluster-kafka-bootstrap.foo.svc.cluster.local:9092" ;
@@ -88,6 +94,7 @@ void beforeEach() {
8894 .withAdditionalCustomResourceDefinition (VirtualKafkaCluster .class )
8995 .withAdditionalCustomResourceDefinition (KafkaService .class )
9096 .withAdditionalCustomResourceDefinition (KafkaProxyIngress .class )
97+ .withAdditionalCustomResourceDefinition (KafkaProtocolFilter .class )
9198 .waitForNamespaceDeletion (true )
9299 .withConfigurationService (x -> x .withCloseClientOnStop (false ))
93100 .build ();
@@ -119,12 +126,13 @@ public KafkaProxyIngress ingress(String name) {
119126
120127 CreatedResources doCreate () {
121128 KafkaProxy proxy = extension .create (kafkaProxy (PROXY_A ));
129+ KafkaProtocolFilter filter = extension .create (filter (FILTER_NAME ));
122130 KafkaService barClusterRef = extension .create (clusterRef (CLUSTER_BAR_REF , CLUSTER_BAR_BOOTSTRAP ));
123131 KafkaProxyIngress ingressBar = extension .create (clusterIpIngress (CLUSTER_BAR_CLUSTERIP_INGRESS , proxy ));
124132 Set <KafkaService > clusterRefs = Set .of (barClusterRef );
125- VirtualKafkaCluster clusterBar = extension .create (virtualKafkaCluster (CLUSTER_BAR , proxy , barClusterRef , ingressBar ));
133+ VirtualKafkaCluster clusterBar = extension .create (virtualKafkaCluster (CLUSTER_BAR , proxy , barClusterRef , ingressBar , filter ));
126134 Set <VirtualKafkaCluster > clusters = Set .of (clusterBar );
127- assertProxyConfigContents (proxy , Set .of (CLUSTER_BAR_BOOTSTRAP ), Set .of ());
135+ assertProxyConfigContents (proxy , Set .of (CLUSTER_BAR_BOOTSTRAP , filter . getSpec (). getType () ), Set .of ());
128136 assertDeploymentMountsConfigConfigMap (proxy );
129137 assertDeploymentBecomesReady (proxy );
130138 assertServiceTargetsProxyInstances (proxy , clusterBar , ingressBar );
@@ -274,9 +282,10 @@ void moveVirtualKafkaClusterToAnotherKafkaProxy() {
274282
275283 KafkaService fooClusterRef = extension .create (clusterRef (CLUSTER_FOO_REF , CLUSTER_FOO_BOOTSTRAP ));
276284 KafkaService barClusterRef = extension .create (clusterRef (CLUSTER_BAR_REF , CLUSTER_BAR_BOOTSTRAP ));
285+ KafkaProtocolFilter filter = extension .create (filter (FILTER_NAME ));
277286
278- VirtualKafkaCluster clusterFoo = extension .create (virtualKafkaCluster (CLUSTER_FOO , proxyA , fooClusterRef , ingressFoo ));
279- VirtualKafkaCluster barCluster = extension .create (virtualKafkaCluster (CLUSTER_BAR , proxyB , barClusterRef , ingressBar ));
287+ VirtualKafkaCluster clusterFoo = extension .create (virtualKafkaCluster (CLUSTER_FOO , proxyA , fooClusterRef , ingressFoo , filter ));
288+ VirtualKafkaCluster barCluster = extension .create (virtualKafkaCluster (CLUSTER_BAR , proxyB , barClusterRef , ingressBar , filter ));
280289
281290 assertProxyConfigContents (proxyA , Set .of (CLUSTER_FOO_BOOTSTRAP ), Set .of ());
282291 assertProxyConfigContents (proxyB , Set .of (CLUSTER_BAR_BOOTSTRAP ), Set .of ());
@@ -312,13 +321,13 @@ private AbstractStringAssert<?> assertThatProxyConfigFor(KafkaProxy proxy) {
312321 }
313322
314323 private static VirtualKafkaCluster virtualKafkaCluster (String clusterName , KafkaProxy proxy , KafkaService clusterRef ,
315- KafkaProxyIngress ingress ) {
324+ KafkaProxyIngress ingress , KafkaProtocolFilter filter ) {
316325 return new VirtualKafkaClusterBuilder ().withNewMetadata ().withName (clusterName ).endMetadata ()
317326 .withNewSpec ()
318327 .withTargetKafkaServiceRef (new KafkaServiceRefBuilder ().withName (name (clusterRef )).build ())
319328 .withNewProxyRef ().withName (name (proxy )).endProxyRef ()
320329 .addNewIngressRef ().withName (name (ingress )).endIngressRef ()
321- .withFilterRefs ()
330+ .withFilterRefs (). addNewFilterRef (). withName ( name ( filter )). endFilterRef ()
322331 .endSpec ().build ();
323332 }
324333
@@ -329,6 +338,12 @@ private static KafkaService clusterRef(String clusterRefName, String clusterBoot
329338 .endSpec ().build ();
330339 }
331340
341+ private static KafkaProtocolFilter filter (String name ) {
342+ return new KafkaProtocolFilterBuilder ().withNewMetadata ().withName (name ).endMetadata ()
343+ .withNewSpec ().withType ("RecordValidation" ).withConfigTemplate (Map .of ("rules" , List .of (Map .of ("allowNulls" , false ))))
344+ .endSpec ().build ();
345+ }
346+
332347 KafkaProxy kafkaProxy (String name ) {
333348 // @formatter:off
334349 return new KafkaProxyBuilder ()
0 commit comments