1010import java .util .List ;
1111import java .util .Map ;
1212import java .util .Set ;
13+ import java .util .concurrent .TimeUnit ;
1314import java .util .stream .Collectors ;
1415
1516import org .assertj .core .api .AbstractStringAssert ;
2223import org .slf4j .Logger ;
2324import org .slf4j .LoggerFactory ;
2425
26+ import io .fabric8 .kubernetes .api .model .Pod ;
2527import io .fabric8 .kubernetes .api .model .Secret ;
2628import io .fabric8 .kubernetes .api .model .Service ;
2729import io .fabric8 .kubernetes .api .model .Volume ;
@@ -58,6 +60,19 @@ class ProxyReconcilerIT {
5860 static void checkKubeAvailable () {
5961 client = OperatorTestUtils .kubeClientIfAvailable ();
6062 Assumptions .assumeThat (client ).describedAs ("Test requires a viable kube client" ).isNotNull ();
63+ preloadOperatorImage ();
64+ }
65+
66+ // the initial operator image pull can take a long time and interfere with the tests
67+ private static void preloadOperatorImage () {
68+ String operandImage = ProxyDeployment .getOperandImage ();
69+ Pod pod = client .run ().withName ("preload-operator-image" )
70+ .withNewRunConfig ()
71+ .withImage (operandImage )
72+ .withRestartPolicy ("Never" )
73+ .withCommand ("ls" ).done ();
74+ client .resource (pod ).waitUntilCondition (it -> it .getStatus ().getPhase ().equals ("Succeeded" ), 2 , TimeUnit .MINUTES );
75+ client .resource (pod ).delete ();
6176 }
6277
6378 @ RegisterExtension
@@ -95,11 +110,24 @@ CreatedResources doCreate() {
95110
96111 assertProxyConfigContents (proxy , Set .of (CLUSTER_FOO_BOOTSTRAP , CLUSTER_BAR_BOOTSTRAP ), Set .of ());
97112 assertDeploymentMountsConfigSecret (proxy );
113+ assertDeploymentBecomesReady (proxy );
98114 assertServiceTargetsProxyInstances (proxy , clusters );
99-
100115 return new CreatedResources (proxy , clusters );
101116 }
102117
118+ private void assertDeploymentBecomesReady (KafkaProxy proxy ) {
119+ // wait longer for initial operator image download
120+ await ().alias ("Deployment as expected" ).untilAsserted (() -> {
121+ var deployment = extension .get (Deployment .class , ProxyDeployment .deploymentName (proxy ));
122+ assertThat (deployment ).isNotNull ()
123+ .extracting (Deployment ::getStatus )
124+ .describedAs ("All deployment replicas should become ready" )
125+ .satisfies (status -> {
126+ assertThat (status .getReplicas ()).isEqualTo (status .getReadyReplicas ());
127+ });
128+ });
129+ }
130+
103131 private void assertServiceTargetsProxyInstances (KafkaProxy proxy , Set <VirtualKafkaCluster > clusters ) {
104132 await ().alias ("cluster Services as expected" ).untilAsserted (() -> {
105133 for (var cluster : clusters ) {
@@ -171,6 +199,7 @@ void testUpdateVirtualCluster() {
171199 VirtualKafkaCluster cluster = createdResources .cluster (CLUSTER_FOO ).edit ().editSpec ().editTargetCluster ().editBootstrapping ().withBootstrapAddress (NEW_BOOTSTRAP )
172200 .endBootstrapping ().endTargetCluster ().endSpec ().build ();
173201 extension .replace (cluster );
202+ assertDeploymentBecomesReady (proxy );
174203 await ().untilAsserted (() -> {
175204 var secret = extension .get (Secret .class , ProxyConfigSecret .secretName (proxy ));
176205 assertThat (secret )
@@ -204,6 +233,7 @@ void testAddVirtualCluster() {
204233 .extracting (map -> map .get (ProxyConfigSecret .CONFIG_YAML_KEY ), InstanceOfAssertFactories .STRING )
205234 .contains (CLUSTER_BAZ_BOOTSTRAP );
206235 });
236+ assertDeploymentBecomesReady (proxy );
207237
208238 await ().untilAsserted (() -> {
209239 assertClusterServiceExists (proxy , CLUSTER_FOO );
@@ -229,6 +259,7 @@ void testDeleteVirtualCluster() {
229259 .doesNotContain (CLUSTER_FOO_BOOTSTRAP )
230260 .contains (CLUSTER_BAR_BOOTSTRAP );
231261 });
262+ assertDeploymentBecomesReady (proxy );
232263
233264 await ().untilAsserted (() -> {
234265 var service = extension .get (Service .class , CLUSTER_FOO );
@@ -259,6 +290,8 @@ void moveVirtualKafkaClusterToAnotherKafkaProxy() {
259290 // when
260291 extension .replace (virtualKafkaCluster (CLUSTER_BAR , CLUSTER_BAR_BOOTSTRAP , proxyB ));
261292 // then
293+ assertDeploymentBecomesReady (proxyA );
294+ assertDeploymentBecomesReady (proxyB );
262295 Set <String > doesNotContain = Set .of (CLUSTER_BAR_BOOTSTRAP );
263296 assertProxyConfigContents (proxyA , Set .of (CLUSTER_FOO_BOOTSTRAP ), doesNotContain );
264297 assertProxyConfigContents (proxyB , Set .of (CLUSTER_BAZ_BOOTSTRAP , CLUSTER_BAR_BOOTSTRAP ), Set .of ());
0 commit comments