66
77package io .kroxylicious .kubernetes .operator ;
88
9+ import java .time .Duration ;
910import java .util .Base64 ;
1011import java .util .List ;
1112import java .util .Map ;
1617import org .assertj .core .api .AbstractStringAssert ;
1718import org .assertj .core .api .Assumptions ;
1819import org .assertj .core .api .InstanceOfAssertFactories ;
20+ import org .awaitility .core .ConditionFactory ;
1921import org .junit .jupiter .api .AfterEach ;
2022import org .junit .jupiter .api .BeforeAll ;
2123import org .junit .jupiter .api .Test ;
2931import io .fabric8 .kubernetes .api .model .Volume ;
3032import io .fabric8 .kubernetes .api .model .apps .Deployment ;
3133import io .fabric8 .kubernetes .client .KubernetesClient ;
34+ import io .fabric8 .kubernetes .client .readiness .Readiness ;
3235import io .javaoperatorsdk .operator .junit .LocallyRunOperatorExtension ;
3336
3437import io .kroxylicious .kubernetes .api .v1alpha1 .KafkaClusterRef ;
@@ -49,20 +52,21 @@ class ProxyReconcilerIT {
4952
5053 private static final Logger LOGGER = LoggerFactory .getLogger (ProxyReconcilerIT .class );
5154
52- public static final String PROXY_A = "proxy-a" ;
53- public static final String PROXY_B = "proxy-b" ;
54- public static final String CLUSTER_FOO_REF = "fooref" ;
55- public static final String CLUSTER_FOO = "foo" ;
56- public static final String CLUSTER_FOO_BOOTSTRAP = "my-cluster-kafka-bootstrap.foo.svc.cluster.local:9092" ;
57- public static final String CLUSTER_BAR_REF = "barref" ;
58- public static final String CLUSTER_BAR = "bar" ;
59- public static final String CLUSTER_BAR_BOOTSTRAP = "my-cluster-kafka-bootstrap.bar.svc.cluster.local:9092" ;
60- public static final String NEW_BOOTSTRAP = "new-bootstrap:9092" ;
61- public static final String CLUSTER_BAZ = "baz" ;
62- public static final String CLUSTER_BAZ_REF = "bazref" ;
63- public static final String CLUSTER_BAZ_BOOTSTRAP = "my-cluster-kafka-bootstrap.baz.svc.cluster.local:9092" ;
64-
65- static KubernetesClient client ;
55+ private static final String PROXY_A = "proxy-a" ;
56+ private static final String PROXY_B = "proxy-b" ;
57+ private static final String CLUSTER_FOO_REF = "fooref" ;
58+ private static final String CLUSTER_FOO = "foo" ;
59+ private static final String CLUSTER_FOO_BOOTSTRAP = "my-cluster-kafka-bootstrap.foo.svc.cluster.local:9092" ;
60+ private static final String CLUSTER_BAR_REF = "barref" ;
61+ private static final String CLUSTER_BAR = "bar" ;
62+ private static final String CLUSTER_BAR_BOOTSTRAP = "my-cluster-kafka-bootstrap.bar.svc.cluster.local:9092" ;
63+ private static final String NEW_BOOTSTRAP = "new-bootstrap:9092" ;
64+ private static final String CLUSTER_BAZ = "baz" ;
65+ private static final String CLUSTER_BAZ_REF = "bazref" ;
66+ private static final String CLUSTER_BAZ_BOOTSTRAP = "my-cluster-kafka-bootstrap.baz.svc.cluster.local:9092" ;
67+
68+ private static KubernetesClient client ;
69+ private final ConditionFactory AWAIT = await ().timeout (Duration .ofSeconds (20 ));
6670
6771 @ BeforeAll
6872 static void checkKubeAvailable () {
@@ -134,19 +138,16 @@ CreatedResources doCreate() {
134138
135139 private void assertDeploymentBecomesReady (KafkaProxy proxy ) {
136140 // wait longer for initial operator image download
137- await () .alias ("Deployment as expected" ).untilAsserted (() -> {
141+ AWAIT .alias ("Deployment as expected" ).untilAsserted (() -> {
138142 var deployment = extension .get (Deployment .class , ProxyDeployment .deploymentName (proxy ));
139- assertThat (deployment ).isNotNull ()
140- .extracting (Deployment ::getStatus )
143+ assertThat (deployment )
141144 .describedAs ("All deployment replicas should become ready" )
142- .satisfies (status -> {
143- assertThat (status .getReplicas ()).isEqualTo (status .getReadyReplicas ());
144- });
145+ .returns (true , Readiness ::isDeploymentReady );
145146 });
146147 }
147148
148149 private void assertServiceTargetsProxyInstances (KafkaProxy proxy , Set <VirtualKafkaCluster > clusters ) {
149- await () .alias ("cluster Services as expected" ).untilAsserted (() -> {
150+ AWAIT .alias ("cluster Services as expected" ).untilAsserted (() -> {
150151 for (var cluster : clusters ) {
151152 var service = extension .get (Service .class , ClusterService .serviceName (cluster ));
152153 assertThat (service ).isNotNull ()
@@ -159,7 +160,7 @@ private void assertServiceTargetsProxyInstances(KafkaProxy proxy, Set<VirtualKaf
159160 }
160161
161162 private void assertDeploymentMountsConfigSecret (KafkaProxy proxy ) {
162- await () .alias ("Deployment as expected" ).untilAsserted (() -> {
163+ AWAIT .alias ("Deployment as expected" ).untilAsserted (() -> {
163164 var deployment = extension .get (Deployment .class , ProxyDeployment .deploymentName (proxy ));
164165 assertThat (deployment ).isNotNull ()
165166 .extracting (dep -> dep .getSpec ().getTemplate ().getSpec ().getVolumes (), InstanceOfAssertFactories .list (Volume .class ))
@@ -170,7 +171,7 @@ private void assertDeploymentMountsConfigSecret(KafkaProxy proxy) {
170171 }
171172
172173 private void assertProxyConfigContents (KafkaProxy cr , Set <String > contains , Set <String > notContains ) {
173- await () .alias ("Secret as expected" ).untilAsserted (() -> {
174+ AWAIT .alias ("Secret as expected" ).untilAsserted (() -> {
174175 AbstractStringAssert <?> proxyConfig = assertThatProxyConfigFor (cr );
175176 if (!contains .isEmpty ()) {
176177 proxyConfig .contains (contains );
@@ -187,15 +188,15 @@ void testDelete() {
187188 KafkaProxy proxy = createdResources .proxy ;
188189 extension .delete (proxy );
189190
190- await () .alias ("Secret was deleted" ).untilAsserted (() -> {
191+ AWAIT .alias ("Secret was deleted" ).untilAsserted (() -> {
191192 var secret = extension .get (Secret .class , ProxyConfigSecret .secretName (proxy ));
192193 assertThat (secret ).isNull ();
193194 });
194- await () .alias ("Deployment was deleted" ).untilAsserted (() -> {
195+ AWAIT .alias ("Deployment was deleted" ).untilAsserted (() -> {
195196 var deployment = extension .get (Deployment .class , ProxyDeployment .deploymentName (proxy ));
196197 assertThat (deployment ).isNull ();
197198 });
198- await () .alias ("Services were deleted" ).untilAsserted (() -> {
199+ AWAIT .alias ("Services were deleted" ).untilAsserted (() -> {
199200 for (var cluster : createdResources .clusters ) {
200201 var service = extension .get (Service .class , ClusterService .serviceName (cluster ));
201202 assertThat (service ).isNull ();
@@ -212,7 +213,7 @@ void testUpdateVirtualClusterTargetBootstrap() {
212213 extension .replace (clusterRef );
213214
214215 assertDeploymentBecomesReady (proxy );
215- await () .untilAsserted (() -> {
216+ AWAIT .untilAsserted (() -> {
216217 var secret = extension .get (Secret .class , ProxyConfigSecret .secretName (proxy ));
217218 assertThat (secret )
218219 .isNotNull ()
@@ -223,7 +224,7 @@ void testUpdateVirtualClusterTargetBootstrap() {
223224 .contains (NEW_BOOTSTRAP );
224225 });
225226
226- await () .untilAsserted (() -> {
227+ AWAIT .untilAsserted (() -> {
227228 assertClusterServiceExists (proxy , CLUSTER_FOO );
228229 assertClusterServiceExists (proxy , CLUSTER_BAR );
229230 });
@@ -248,13 +249,13 @@ void testUpdateVirtualClusterClusterRef() {
248249
249250 // then
250251 assertDeploymentBecomesReady (proxy );
251- await () .untilAsserted (() -> {
252+ AWAIT .untilAsserted (() -> {
252253 assertThatProxyConfigFor (proxy )
253254 .doesNotContain (CLUSTER_FOO_BOOTSTRAP )
254255 .contains (NEW_BOOTSTRAP );
255256 });
256257
257- await () .untilAsserted (() -> {
258+ AWAIT .untilAsserted (() -> {
258259 assertClusterServiceExists (proxy , CLUSTER_FOO );
259260 assertClusterServiceExists (proxy , CLUSTER_BAR );
260261 });
@@ -268,7 +269,7 @@ void testAddVirtualCluster() {
268269 KafkaProxy proxy = createdResources .proxy ;
269270 KafkaClusterRef bazClusterRef = extension .create (clusterRef (CLUSTER_BAZ_REF , CLUSTER_BAZ_BOOTSTRAP ));
270271 extension .create (virtualKafkaCluster (CLUSTER_BAZ , proxy , bazClusterRef ));
271- await () .untilAsserted (() -> {
272+ AWAIT .untilAsserted (() -> {
272273 var secret = extension .get (Secret .class , ProxyConfigSecret .secretName (proxy ));
273274 assertThat (secret )
274275 .isNotNull ()
@@ -279,7 +280,7 @@ void testAddVirtualCluster() {
279280 });
280281 assertDeploymentBecomesReady (proxy );
281282
282- await () .untilAsserted (() -> {
283+ AWAIT .untilAsserted (() -> {
283284 assertClusterServiceExists (proxy , CLUSTER_FOO );
284285 assertClusterServiceExists (proxy , CLUSTER_BAR );
285286 assertClusterServiceExists (proxy , CLUSTER_BAZ );
@@ -293,7 +294,7 @@ void testDeleteVirtualCluster() {
293294 final var createdResources = doCreate ();
294295 KafkaProxy proxy = createdResources .proxy ;
295296 extension .delete (createdResources .cluster (CLUSTER_FOO ));
296- await () .untilAsserted (() -> {
297+ AWAIT .untilAsserted (() -> {
297298 var secret = extension .get (Secret .class , ProxyConfigSecret .secretName (proxy ));
298299 assertThat (secret )
299300 .isNotNull ()
@@ -305,14 +306,14 @@ void testDeleteVirtualCluster() {
305306 });
306307 assertDeploymentBecomesReady (proxy );
307308
308- await () .untilAsserted (() -> {
309+ AWAIT .untilAsserted (() -> {
309310 var service = extension .get (Service .class , CLUSTER_FOO );
310311 assertThat (service )
311312 .describedAs ("Expect Service for cluster 'foo' to have been deleted" )
312313 .isNull ();
313314 });
314315
315- await () .untilAsserted (() -> {
316+ AWAIT .untilAsserted (() -> {
316317 assertClusterServiceExists (proxy , CLUSTER_BAR );
317318 });
318319 LOGGER .atInfo ().log ("Test finished" );
@@ -398,7 +399,7 @@ KafkaProxy kafkaProxy(String name) {
398399 }
399400
400401 private void assertClusterServiceExists (KafkaProxy proxy , String clusterName ) {
401- await () .alias ("Service as expected" ).untilAsserted (() -> {
402+ AWAIT .alias ("Service as expected" ).untilAsserted (() -> {
402403 var service = extension .get (Service .class , clusterName );
403404 assertThat (service )
404405 .describedAs ("Expect Service for cluster '" + clusterName + "' to still exist" )
0 commit comments